aboutsummaryrefslogtreecommitdiff
path: root/src/transport
diff options
context:
space:
mode:
authorSchanzenbach, Martin <mschanzenbach@posteo.de>2019-12-24 22:42:58 +0900
committerSchanzenbach, Martin <mschanzenbach@posteo.de>2019-12-24 22:42:58 +0900
commita967815bd7207de41c7e719ef34628f2eaed3ab0 (patch)
tree78e0cf7dec976385aa6482fed44b3c8226838966 /src/transport
parenta8bd85390032ee41c59cbdae645fe982bee51e7e (diff)
downloadgnunet-a967815bd7207de41c7e719ef34628f2eaed3ab0.tar.gz
gnunet-a967815bd7207de41c7e719ef34628f2eaed3ab0.zip
fix dropped pkts; wip
Diffstat (limited to 'src/transport')
-rw-r--r--src/transport/gnunet-communicator-unix.c50
1 files changed, 33 insertions, 17 deletions
diff --git a/src/transport/gnunet-communicator-unix.c b/src/transport/gnunet-communicator-unix.c
index 29ec087e1..5d7949b97 100644
--- a/src/transport/gnunet-communicator-unix.c
+++ b/src/transport/gnunet-communicator-unix.c
@@ -42,7 +42,7 @@
42 * otherwise we may read messages just to have them dropped 42 * otherwise we may read messages just to have them dropped
43 * by the communicator API. 43 * by the communicator API.
44 */ 44 */
45#define DEFAULT_MAX_QUEUE_LENGTH 8 45#define DEFAULT_MAX_QUEUE_LENGTH 8000
46 46
47/** 47/**
48 * Address prefix used by the communicator. 48 * Address prefix used by the communicator.
@@ -412,16 +412,6 @@ select_write_cb (void *cls)
412 412
413 /* take queue of the ready list */ 413 /* take queue of the ready list */
414 write_task = NULL; 414 write_task = NULL;
415 GNUNET_CONTAINER_DLL_remove (queue_head, queue_tail, queue);
416 if (NULL != queue_head)
417 write_task = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
418 unix_sock,
419 &select_write_cb,
420 NULL);
421
422 /* send 'msg' */
423 queue->msg = NULL;
424 GNUNET_MQ_impl_send_continue (queue->mq);
425resend: 415resend:
426 /* Send the data */ 416 /* Send the data */
427 sent = GNUNET_NETWORK_socket_sendto (unix_sock, 417 sent = GNUNET_NETWORK_socket_sendto (unix_sock,
@@ -437,6 +427,17 @@ resend:
437 (sent < 0) ? strerror (errno) : "ok"); 427 (sent < 0) ? strerror (errno) : "ok");
438 if (-1 != sent) 428 if (-1 != sent)
439 { 429 {
430 GNUNET_CONTAINER_DLL_remove (queue_head, queue_tail, queue);
431 if (NULL != queue_head)
432 write_task = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
433 unix_sock,
434 &select_write_cb,
435 NULL);
436
437 /* send 'msg' */
438 GNUNET_free (queue->msg);
439 queue->msg = NULL;
440 GNUNET_MQ_impl_send_continue (queue->mq);
440 GNUNET_STATISTICS_update (stats, 441 GNUNET_STATISTICS_update (stats,
441 "# bytes sent", 442 "# bytes sent",
442 (long long) sent, 443 (long long) sent,
@@ -448,6 +449,10 @@ resend:
448 "# network transmission failures", 449 "# network transmission failures",
449 1, 450 1,
450 GNUNET_NO); 451 GNUNET_NO);
452 write_task = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
453 unix_sock,
454 &select_write_cb,
455 NULL);
451 switch (errno) 456 switch (errno)
452 { 457 {
453 case EAGAIN: 458 case EAGAIN:
@@ -473,7 +478,7 @@ resend:
473 return; 478 return;
474 } 479 }
475 GNUNET_log ( 480 GNUNET_log (
476 GNUNET_ERROR_TYPE_DEBUG, 481 GNUNET_ERROR_TYPE_WARNING,
477 "Trying to increase socket buffer size from %u to %u for message size %u\n", 482 "Trying to increase socket buffer size from %u to %u for message size %u\n",
478 (unsigned int) size, 483 (unsigned int) size,
479 (unsigned int) ((msg_size / 1000) + 2) * 1000, 484 (unsigned int) ((msg_size / 1000) + 2) * 1000,
@@ -523,9 +528,9 @@ mq_send (struct GNUNET_MQ_Handle *mq,
523 528
524 GNUNET_assert (mq == queue->mq); 529 GNUNET_assert (mq == queue->mq);
525 GNUNET_assert (NULL == queue->msg); 530 GNUNET_assert (NULL == queue->msg);
526 //Convert to UNIXMessage 531 // Convert to UNIXMessage
527 queue->msg = GNUNET_malloc (msize + sizeof (struct UNIXMessage)); 532 queue->msg = GNUNET_malloc (msize + sizeof (struct UNIXMessage));
528 queue->msg->header.size = htons(msize + sizeof (struct UNIXMessage)); 533 queue->msg->header.size = htons (msize + sizeof (struct UNIXMessage));
529 queue->msg->sender = my_identity; 534 queue->msg->sender = my_identity;
530 memcpy (&queue->msg[1], msg, msize); 535 memcpy (&queue->msg[1], msg, msize);
531 GNUNET_CONTAINER_DLL_insert (queue_head, queue_tail, queue); 536 GNUNET_CONTAINER_DLL_insert (queue_head, queue_tail, queue);
@@ -697,14 +702,13 @@ receive_complete_cb (void *cls, int success)
697{ 702{
698 (void) cls; 703 (void) cls;
699 delivering_messages--; 704 delivering_messages--;
700
701 if (GNUNET_OK != success) 705 if (GNUNET_OK != success)
702 GNUNET_STATISTICS_update (stats, 706 GNUNET_STATISTICS_update (stats,
703 "# transport transmission failures", 707 "# transport transmission failures",
704 1, 708 1,
705 GNUNET_NO); 709 GNUNET_NO);
706 GNUNET_assert (NULL != unix_sock); 710 if ((NULL == read_task) && (delivering_messages < max_queue_length) &&
707 if ((NULL == read_task) && (delivering_messages < max_queue_length)) 711 (NULL != unix_sock))
708 read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, 712 read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
709 unix_sock, 713 unix_sock,
710 &select_read_cb, 714 &select_read_cb,
@@ -807,15 +811,26 @@ select_read_cb (void *cls)
807 &receive_complete_cb, 811 &receive_complete_cb,
808 NULL); 812 NULL);
809 if (GNUNET_SYSERR == ret) 813 if (GNUNET_SYSERR == ret)
814 {
815 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
816 "Transport not up!\n");
810 return; /* transport not up */ 817 return; /* transport not up */
818 }
811 if (GNUNET_NO == ret) 819 if (GNUNET_NO == ret)
820 {
821 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
822 "Error sending message to transport\n");
812 break; 823 break;
824 }
813 delivering_messages++; 825 delivering_messages++;
814 offset += csize; 826 offset += csize;
815 } 827 }
816 } 828 }
817 if (delivering_messages >= max_queue_length) 829 if (delivering_messages >= max_queue_length)
818 { 830 {
831 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
832 "Back pressure %llu\n", delivering_messages);
833
819 /* we should try to apply 'back pressure' */ 834 /* we should try to apply 'back pressure' */
820 GNUNET_SCHEDULER_cancel (read_task); 835 GNUNET_SCHEDULER_cancel (read_task);
821 read_task = NULL; 836 read_task = NULL;
@@ -991,6 +1006,7 @@ run (void *cls,
991 struct GNUNET_CRYPTO_EddsaPrivateKey *my_private_key; 1006 struct GNUNET_CRYPTO_EddsaPrivateKey *my_private_key;
992 1007
993 (void) cls; 1008 (void) cls;
1009 delivering_messages = 0;
994 1010
995 my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (cfg); 1011 my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (cfg);
996 if (NULL == my_private_key) 1012 if (NULL == my_private_key)