diff options
author | Schanzenbach, Martin <mschanzenbach@posteo.de> | 2019-12-24 22:42:58 +0900 |
---|---|---|
committer | Schanzenbach, Martin <mschanzenbach@posteo.de> | 2019-12-24 22:42:58 +0900 |
commit | a967815bd7207de41c7e719ef34628f2eaed3ab0 (patch) | |
tree | 78e0cf7dec976385aa6482fed44b3c8226838966 /src/transport | |
parent | a8bd85390032ee41c59cbdae645fe982bee51e7e (diff) | |
download | gnunet-a967815bd7207de41c7e719ef34628f2eaed3ab0.tar.gz gnunet-a967815bd7207de41c7e719ef34628f2eaed3ab0.zip |
fix dropped pkts; wip
Diffstat (limited to 'src/transport')
-rw-r--r-- | src/transport/gnunet-communicator-unix.c | 50 |
1 files changed, 33 insertions, 17 deletions
diff --git a/src/transport/gnunet-communicator-unix.c b/src/transport/gnunet-communicator-unix.c index 29ec087e1..5d7949b97 100644 --- a/src/transport/gnunet-communicator-unix.c +++ b/src/transport/gnunet-communicator-unix.c | |||
@@ -42,7 +42,7 @@ | |||
42 | * otherwise we may read messages just to have them dropped | 42 | * otherwise we may read messages just to have them dropped |
43 | * by the communicator API. | 43 | * by the communicator API. |
44 | */ | 44 | */ |
45 | #define DEFAULT_MAX_QUEUE_LENGTH 8 | 45 | #define DEFAULT_MAX_QUEUE_LENGTH 8000 |
46 | 46 | ||
47 | /** | 47 | /** |
48 | * Address prefix used by the communicator. | 48 | * Address prefix used by the communicator. |
@@ -412,16 +412,6 @@ select_write_cb (void *cls) | |||
412 | 412 | ||
413 | /* take queue of the ready list */ | 413 | /* take queue of the ready list */ |
414 | write_task = NULL; | 414 | write_task = NULL; |
415 | GNUNET_CONTAINER_DLL_remove (queue_head, queue_tail, queue); | ||
416 | if (NULL != queue_head) | ||
417 | write_task = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL, | ||
418 | unix_sock, | ||
419 | &select_write_cb, | ||
420 | NULL); | ||
421 | |||
422 | /* send 'msg' */ | ||
423 | queue->msg = NULL; | ||
424 | GNUNET_MQ_impl_send_continue (queue->mq); | ||
425 | resend: | 415 | resend: |
426 | /* Send the data */ | 416 | /* Send the data */ |
427 | sent = GNUNET_NETWORK_socket_sendto (unix_sock, | 417 | sent = GNUNET_NETWORK_socket_sendto (unix_sock, |
@@ -437,6 +427,17 @@ resend: | |||
437 | (sent < 0) ? strerror (errno) : "ok"); | 427 | (sent < 0) ? strerror (errno) : "ok"); |
438 | if (-1 != sent) | 428 | if (-1 != sent) |
439 | { | 429 | { |
430 | GNUNET_CONTAINER_DLL_remove (queue_head, queue_tail, queue); | ||
431 | if (NULL != queue_head) | ||
432 | write_task = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL, | ||
433 | unix_sock, | ||
434 | &select_write_cb, | ||
435 | NULL); | ||
436 | |||
437 | /* send 'msg' */ | ||
438 | GNUNET_free (queue->msg); | ||
439 | queue->msg = NULL; | ||
440 | GNUNET_MQ_impl_send_continue (queue->mq); | ||
440 | GNUNET_STATISTICS_update (stats, | 441 | GNUNET_STATISTICS_update (stats, |
441 | "# bytes sent", | 442 | "# bytes sent", |
442 | (long long) sent, | 443 | (long long) sent, |
@@ -448,6 +449,10 @@ resend: | |||
448 | "# network transmission failures", | 449 | "# network transmission failures", |
449 | 1, | 450 | 1, |
450 | GNUNET_NO); | 451 | GNUNET_NO); |
452 | write_task = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL, | ||
453 | unix_sock, | ||
454 | &select_write_cb, | ||
455 | NULL); | ||
451 | switch (errno) | 456 | switch (errno) |
452 | { | 457 | { |
453 | case EAGAIN: | 458 | case EAGAIN: |
@@ -473,7 +478,7 @@ resend: | |||
473 | return; | 478 | return; |
474 | } | 479 | } |
475 | GNUNET_log ( | 480 | GNUNET_log ( |
476 | GNUNET_ERROR_TYPE_DEBUG, | 481 | GNUNET_ERROR_TYPE_WARNING, |
477 | "Trying to increase socket buffer size from %u to %u for message size %u\n", | 482 | "Trying to increase socket buffer size from %u to %u for message size %u\n", |
478 | (unsigned int) size, | 483 | (unsigned int) size, |
479 | (unsigned int) ((msg_size / 1000) + 2) * 1000, | 484 | (unsigned int) ((msg_size / 1000) + 2) * 1000, |
@@ -523,9 +528,9 @@ mq_send (struct GNUNET_MQ_Handle *mq, | |||
523 | 528 | ||
524 | GNUNET_assert (mq == queue->mq); | 529 | GNUNET_assert (mq == queue->mq); |
525 | GNUNET_assert (NULL == queue->msg); | 530 | GNUNET_assert (NULL == queue->msg); |
526 | //Convert to UNIXMessage | 531 | // Convert to UNIXMessage |
527 | queue->msg = GNUNET_malloc (msize + sizeof (struct UNIXMessage)); | 532 | queue->msg = GNUNET_malloc (msize + sizeof (struct UNIXMessage)); |
528 | queue->msg->header.size = htons(msize + sizeof (struct UNIXMessage)); | 533 | queue->msg->header.size = htons (msize + sizeof (struct UNIXMessage)); |
529 | queue->msg->sender = my_identity; | 534 | queue->msg->sender = my_identity; |
530 | memcpy (&queue->msg[1], msg, msize); | 535 | memcpy (&queue->msg[1], msg, msize); |
531 | GNUNET_CONTAINER_DLL_insert (queue_head, queue_tail, queue); | 536 | GNUNET_CONTAINER_DLL_insert (queue_head, queue_tail, queue); |
@@ -697,14 +702,13 @@ receive_complete_cb (void *cls, int success) | |||
697 | { | 702 | { |
698 | (void) cls; | 703 | (void) cls; |
699 | delivering_messages--; | 704 | delivering_messages--; |
700 | |||
701 | if (GNUNET_OK != success) | 705 | if (GNUNET_OK != success) |
702 | GNUNET_STATISTICS_update (stats, | 706 | GNUNET_STATISTICS_update (stats, |
703 | "# transport transmission failures", | 707 | "# transport transmission failures", |
704 | 1, | 708 | 1, |
705 | GNUNET_NO); | 709 | GNUNET_NO); |
706 | GNUNET_assert (NULL != unix_sock); | 710 | if ((NULL == read_task) && (delivering_messages < max_queue_length) && |
707 | if ((NULL == read_task) && (delivering_messages < max_queue_length)) | 711 | (NULL != unix_sock)) |
708 | read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, | 712 | read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, |
709 | unix_sock, | 713 | unix_sock, |
710 | &select_read_cb, | 714 | &select_read_cb, |
@@ -807,15 +811,26 @@ select_read_cb (void *cls) | |||
807 | &receive_complete_cb, | 811 | &receive_complete_cb, |
808 | NULL); | 812 | NULL); |
809 | if (GNUNET_SYSERR == ret) | 813 | if (GNUNET_SYSERR == ret) |
814 | { | ||
815 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
816 | "Transport not up!\n"); | ||
810 | return; /* transport not up */ | 817 | return; /* transport not up */ |
818 | } | ||
811 | if (GNUNET_NO == ret) | 819 | if (GNUNET_NO == ret) |
820 | { | ||
821 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
822 | "Error sending message to transport\n"); | ||
812 | break; | 823 | break; |
824 | } | ||
813 | delivering_messages++; | 825 | delivering_messages++; |
814 | offset += csize; | 826 | offset += csize; |
815 | } | 827 | } |
816 | } | 828 | } |
817 | if (delivering_messages >= max_queue_length) | 829 | if (delivering_messages >= max_queue_length) |
818 | { | 830 | { |
831 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
832 | "Back pressure %llu\n", delivering_messages); | ||
833 | |||
819 | /* we should try to apply 'back pressure' */ | 834 | /* we should try to apply 'back pressure' */ |
820 | GNUNET_SCHEDULER_cancel (read_task); | 835 | GNUNET_SCHEDULER_cancel (read_task); |
821 | read_task = NULL; | 836 | read_task = NULL; |
@@ -991,6 +1006,7 @@ run (void *cls, | |||
991 | struct GNUNET_CRYPTO_EddsaPrivateKey *my_private_key; | 1006 | struct GNUNET_CRYPTO_EddsaPrivateKey *my_private_key; |
992 | 1007 | ||
993 | (void) cls; | 1008 | (void) cls; |
1009 | delivering_messages = 0; | ||
994 | 1010 | ||
995 | my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (cfg); | 1011 | my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (cfg); |
996 | if (NULL == my_private_key) | 1012 | if (NULL == my_private_key) |