diff options
Diffstat (limited to 'src/transport/transport_api2_communication.c')
-rw-r--r-- | src/transport/transport_api2_communication.c | 194 |
1 files changed, 124 insertions, 70 deletions
diff --git a/src/transport/transport_api2_communication.c b/src/transport/transport_api2_communication.c index d446516bd..3a68c6eba 100644 --- a/src/transport/transport_api2_communication.c +++ b/src/transport/transport_api2_communication.c | |||
@@ -90,6 +90,11 @@ struct AckPending | |||
90 | struct AckPending *prev; | 90 | struct AckPending *prev; |
91 | 91 | ||
92 | /** | 92 | /** |
93 | * Communicator this entry belongs to. | ||
94 | */ | ||
95 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch; | ||
96 | |||
97 | /** | ||
93 | * Which peer is this about? | 98 | * Which peer is this about? |
94 | */ | 99 | */ |
95 | struct GNUNET_PeerIdentity receiver; | 100 | struct GNUNET_PeerIdentity receiver; |
@@ -134,17 +139,17 @@ struct GNUNET_TRANSPORT_CommunicatorHandle | |||
134 | /** | 139 | /** |
135 | * DLL of messages awaiting transmission confirmation (ack). | 140 | * DLL of messages awaiting transmission confirmation (ack). |
136 | */ | 141 | */ |
137 | struct AckPending *ac_tail; | 142 | struct AckPending *ap_tail; |
138 | 143 | ||
139 | /** | 144 | /** |
140 | * DLL of queues we offer. | 145 | * DLL of queues we offer. |
141 | */ | 146 | */ |
142 | struct QueueHandle *queue_head; | 147 | struct GNUNET_TRANSPORT_QueueHandle *queue_head; |
143 | 148 | ||
144 | /** | 149 | /** |
145 | * DLL of queues we offer. | 150 | * DLL of queues we offer. |
146 | */ | 151 | */ |
147 | struct QueueHandle *queue_tail; | 152 | struct GNUNET_TRANSPORT_QueueHandle *queue_tail; |
148 | 153 | ||
149 | /** | 154 | /** |
150 | * Our configuration. | 155 | * Our configuration. |
@@ -152,9 +157,14 @@ struct GNUNET_TRANSPORT_CommunicatorHandle | |||
152 | const struct GNUNET_CONFIGURATION_Handle *cfg; | 157 | const struct GNUNET_CONFIGURATION_Handle *cfg; |
153 | 158 | ||
154 | /** | 159 | /** |
155 | * Name of the communicator. | 160 | * Config section to use. |
161 | */ | ||
162 | const char *config_section; | ||
163 | |||
164 | /** | ||
165 | * Address prefix to use. | ||
156 | */ | 166 | */ |
157 | const char *name; | 167 | const char *addr_prefix; |
158 | 168 | ||
159 | /** | 169 | /** |
160 | * Function to call when the transport service wants us to initiate | 170 | * Function to call when the transport service wants us to initiate |
@@ -168,6 +178,11 @@ struct GNUNET_TRANSPORT_CommunicatorHandle | |||
168 | void *mq_init_cls; | 178 | void *mq_init_cls; |
169 | 179 | ||
170 | /** | 180 | /** |
181 | * Queue to talk to the transport service. | ||
182 | */ | ||
183 | struct GNUNET_MQ_Handle *mq; | ||
184 | |||
185 | /** | ||
171 | * Maximum permissable queue length. | 186 | * Maximum permissable queue length. |
172 | */ | 187 | */ |
173 | unsigned long long max_queue_length; | 188 | unsigned long long max_queue_length; |
@@ -202,6 +217,17 @@ struct GNUNET_TRANSPORT_CommunicatorHandle | |||
202 | */ | 217 | */ |
203 | struct GNUNET_TRANSPORT_QueueHandle | 218 | struct GNUNET_TRANSPORT_QueueHandle |
204 | { | 219 | { |
220 | |||
221 | /** | ||
222 | * Kept in a DLL. | ||
223 | */ | ||
224 | struct GNUNET_TRANSPORT_QueueHandle *next; | ||
225 | |||
226 | /** | ||
227 | * Kept in a DLL. | ||
228 | */ | ||
229 | struct GNUNET_TRANSPORT_QueueHandle *prev; | ||
230 | |||
205 | /** | 231 | /** |
206 | * Handle this queue belongs to. | 232 | * Handle this queue belongs to. |
207 | */ | 233 | */ |
@@ -308,7 +334,7 @@ send_add_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai) | |||
308 | env = GNUNET_MQ_msg_extra (aam, | 334 | env = GNUNET_MQ_msg_extra (aam, |
309 | strlen (ai->address) + 1, | 335 | strlen (ai->address) + 1, |
310 | GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS); | 336 | GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS); |
311 | aam->expiration = GNUNET_TIME_relative_to_nbo (ai->expiration); | 337 | aam->expiration = GNUNET_TIME_relative_hton (ai->expiration); |
312 | aam->nt = htonl ((uint32_t) ai->nt); | 338 | aam->nt = htonl ((uint32_t) ai->nt); |
313 | memcpy (&aam[1], | 339 | memcpy (&aam[1], |
314 | ai->address, | 340 | ai->address, |
@@ -334,7 +360,7 @@ send_del_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai) | |||
334 | return; | 360 | return; |
335 | env = GNUNET_MQ_msg (dam, | 361 | env = GNUNET_MQ_msg (dam, |
336 | GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS); | 362 | GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS); |
337 | dam.aid = htonl (ai->aid); | 363 | dam->aid = htonl (ai->aid); |
338 | GNUNET_MQ_send (ai->ch->mq, | 364 | GNUNET_MQ_send (ai->ch->mq, |
339 | env); | 365 | env); |
340 | } | 366 | } |
@@ -352,18 +378,18 @@ send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh) | |||
352 | struct GNUNET_MQ_Envelope *env; | 378 | struct GNUNET_MQ_Envelope *env; |
353 | struct GNUNET_TRANSPORT_AddQueueMessage *aqm; | 379 | struct GNUNET_TRANSPORT_AddQueueMessage *aqm; |
354 | 380 | ||
355 | if (NULL == ai->ch->mq) | 381 | if (NULL == qh->ch->mq) |
356 | return; | 382 | return; |
357 | env = GNUNET_MQ_msg_extra (aqm, | 383 | env = GNUNET_MQ_msg_extra (aqm, |
358 | strlen (ai->address) + 1, | 384 | strlen (qh->address) + 1, |
359 | GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_QUEUE); | 385 | GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP); |
360 | aqm.receiver = qh->peer; | 386 | aqm->receiver = qh->peer; |
361 | aqm.nt = htonl ((uint32_t) qh->nt); | 387 | aqm->nt = htonl ((uint32_t) qh->nt); |
362 | aqm.qid = htonl (qh->qid); | 388 | aqm->qid = htonl (qh->queue_id); |
363 | memcpy (&aqm[1], | 389 | memcpy (&aqm[1], |
364 | ai->address, | 390 | qh->address, |
365 | strlen (ai->address) + 1); | 391 | strlen (qh->address) + 1); |
366 | GNUNET_MQ_send (ai->ch->mq, | 392 | GNUNET_MQ_send (qh->ch->mq, |
367 | env); | 393 | env); |
368 | } | 394 | } |
369 | 395 | ||
@@ -380,13 +406,13 @@ send_del_queue (struct GNUNET_TRANSPORT_QueueHandle *qh) | |||
380 | struct GNUNET_MQ_Envelope *env; | 406 | struct GNUNET_MQ_Envelope *env; |
381 | struct GNUNET_TRANSPORT_DelQueueMessage *dqm; | 407 | struct GNUNET_TRANSPORT_DelQueueMessage *dqm; |
382 | 408 | ||
383 | if (NULL == ai->ch->mq) | 409 | if (NULL == qh->ch->mq) |
384 | return; | 410 | return; |
385 | env = GNUNET_MQ_msg (dqm, | 411 | env = GNUNET_MQ_msg (dqm, |
386 | GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_QUEUE); | 412 | GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN); |
387 | dqm.qid = htonl (qh->qid); | 413 | dqm->qid = htonl (qh->queue_id); |
388 | dqm.receiver = qh->peer; | 414 | dqm->receiver = qh->peer; |
389 | GNUNET_MQ_send (ai->ch->mq, | 415 | GNUNET_MQ_send (qh->ch->mq, |
390 | env); | 416 | env); |
391 | } | 417 | } |
392 | 418 | ||
@@ -444,7 +470,8 @@ error_handler (void *cls, | |||
444 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; | 470 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; |
445 | 471 | ||
446 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | 472 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, |
447 | "MQ failure, reconnecting to transport service.\n"); | 473 | "MQ failure %d, reconnecting to transport service.\n", |
474 | error); | ||
448 | disconnect (ch); | 475 | disconnect (ch); |
449 | /* TODO: maybe do this with exponential backoff/delay */ | 476 | /* TODO: maybe do this with exponential backoff/delay */ |
450 | reconnect (ch); | 477 | reconnect (ch); |
@@ -460,7 +487,7 @@ error_handler (void *cls, | |||
460 | */ | 487 | */ |
461 | static void | 488 | static void |
462 | handle_incoming_ack (void *cls, | 489 | handle_incoming_ack (void *cls, |
463 | struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack) | 490 | const struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack) |
464 | { | 491 | { |
465 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; | 492 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; |
466 | 493 | ||
@@ -470,7 +497,7 @@ handle_incoming_ack (void *cls, | |||
470 | { | 497 | { |
471 | if ( (fc->id == incoming_ack->fc_id) && | 498 | if ( (fc->id == incoming_ack->fc_id) && |
472 | (0 == memcmp (&fc->sender, | 499 | (0 == memcmp (&fc->sender, |
473 | incoming_ack->sender, | 500 | &incoming_ack->sender, |
474 | sizeof (struct GNUNET_PeerIdentity))) ) | 501 | sizeof (struct GNUNET_PeerIdentity))) ) |
475 | { | 502 | { |
476 | GNUNET_CONTAINER_DLL_remove (ch->fc_head, | 503 | GNUNET_CONTAINER_DLL_remove (ch->fc_head, |
@@ -499,11 +526,12 @@ handle_incoming_ack (void *cls, | |||
499 | */ | 526 | */ |
500 | static int | 527 | static int |
501 | check_create_queue (void *cls, | 528 | check_create_queue (void *cls, |
502 | struct GNUNET_TRANSPORT_CreateQueue *cq) | 529 | const struct GNUNET_TRANSPORT_CreateQueue *cq) |
503 | { | 530 | { |
504 | uint16_t len = ntohs (cq->header.size) - sizeof (*cq); | 531 | uint16_t len = ntohs (cq->header.size) - sizeof (*cq); |
505 | const char *addr = (const char *) &cq[1]; | 532 | const char *addr = (const char *) &cq[1]; |
506 | 533 | ||
534 | (void) cls; | ||
507 | if ( (0 == len) || | 535 | if ( (0 == len) || |
508 | ('\0' != addr[len-1]) ) | 536 | ('\0' != addr[len-1]) ) |
509 | { | 537 | { |
@@ -522,11 +550,13 @@ check_create_queue (void *cls, | |||
522 | */ | 550 | */ |
523 | static void | 551 | static void |
524 | handle_create_queue (void *cls, | 552 | handle_create_queue (void *cls, |
525 | struct GNUNET_TRANSPORT_CreateQueue *cq) | 553 | const struct GNUNET_TRANSPORT_CreateQueue *cq) |
526 | { | 554 | { |
527 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; | 555 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; |
528 | const char *addr = (const char *) &cq[1]; | 556 | const char *addr = (const char *) &cq[1]; |
529 | 557 | struct GNUNET_TRANSPORT_CreateQueueResponse *cqr; | |
558 | struct GNUNET_MQ_Envelope *env; | ||
559 | |||
530 | if (GNUNET_OK != | 560 | if (GNUNET_OK != |
531 | ch->mq_init (ch->mq_init_cls, | 561 | ch->mq_init (ch->mq_init_cls, |
532 | &cq->receiver, | 562 | &cq->receiver, |
@@ -535,8 +565,17 @@ handle_create_queue (void *cls, | |||
535 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 565 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
536 | "Address `%s' invalid for this communicator\n", | 566 | "Address `%s' invalid for this communicator\n", |
537 | addr); | 567 | addr); |
538 | // TODO: do we notify the transport!? | 568 | env = GNUNET_MQ_msg (cqr, |
569 | GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL); | ||
539 | } | 570 | } |
571 | else | ||
572 | { | ||
573 | env = GNUNET_MQ_msg (cqr, | ||
574 | GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK); | ||
575 | } | ||
576 | cqr->request_id = cq->request_id; | ||
577 | GNUNET_MQ_send (ch->mq, | ||
578 | env); | ||
540 | } | 579 | } |
541 | 580 | ||
542 | 581 | ||
@@ -550,11 +589,12 @@ handle_create_queue (void *cls, | |||
550 | */ | 589 | */ |
551 | static int | 590 | static int |
552 | check_send_msg (void *cls, | 591 | check_send_msg (void *cls, |
553 | struct GNUNET_TRANSPORT_SendMessageTo *smt) | 592 | const struct GNUNET_TRANSPORT_SendMessageTo *smt) |
554 | { | 593 | { |
555 | uint16_t len = ntohs (smt->header.size) - sizeof (*smt); | 594 | uint16_t len = ntohs (smt->header.size) - sizeof (*smt); |
556 | const struct GNUNET_MessageHeader *mh = (const struct GNUNET_MessageHeader *) &smt[1]; | 595 | const struct GNUNET_MessageHeader *mh = (const struct GNUNET_MessageHeader *) &smt[1]; |
557 | 596 | ||
597 | (void) cls; | ||
558 | if (ntohs (mh->size) != len) | 598 | if (ntohs (mh->size) != len) |
559 | { | 599 | { |
560 | GNUNET_break (0); | 600 | GNUNET_break (0); |
@@ -584,9 +624,9 @@ send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch, | |||
584 | 624 | ||
585 | env = GNUNET_MQ_msg (ack, | 625 | env = GNUNET_MQ_msg (ack, |
586 | GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK); | 626 | GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK); |
587 | ack->status = htonl (GNUNET_OK); | 627 | ack->status = htonl (status); |
588 | ack->mid = ap->mid; | 628 | ack->mid = mid; |
589 | ack->receiver = ap->receiver; | 629 | ack->receiver = *receiver; |
590 | GNUNET_MQ_send (ch->mq, | 630 | GNUNET_MQ_send (ch->mq, |
591 | env); | 631 | env); |
592 | } | 632 | } |
@@ -623,18 +663,18 @@ send_ack_cb (void *cls) | |||
623 | */ | 663 | */ |
624 | static void | 664 | static void |
625 | handle_send_msg (void *cls, | 665 | handle_send_msg (void *cls, |
626 | struct GNUNET_TRANSPORT_SendMessageTo *smt) | 666 | const struct GNUNET_TRANSPORT_SendMessageTo *smt) |
627 | { | 667 | { |
628 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; | 668 | struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; |
629 | const struct GNUNET_MessageHeader *mh; | 669 | const struct GNUNET_MessageHeader *mh; |
630 | struct GNUNET_MQ_Envelope *env; | 670 | struct GNUNET_MQ_Envelope *env; |
631 | struct AckPending *ap; | 671 | struct AckPending *ap; |
632 | struct QueueHandle *qh; | 672 | struct GNUNET_TRANSPORT_QueueHandle *qh; |
633 | 673 | ||
634 | for (qh = ch->queue_head;NULL != qh; qh = qh->next) | 674 | for (qh = ch->queue_head;NULL != qh; qh = qh->next) |
635 | if ( (qh->queue_id == smt->qid) && | 675 | if ( (qh->queue_id == smt->qid) && |
636 | (0 == memcmp (&qh->peer, | 676 | (0 == memcmp (&qh->peer, |
637 | &smt->target, | 677 | &smt->receiver, |
638 | sizeof (struct GNUNET_PeerIdentity))) ) | 678 | sizeof (struct GNUNET_PeerIdentity))) ) |
639 | break; | 679 | break; |
640 | if (NULL == qh) | 680 | if (NULL == qh) |
@@ -653,7 +693,7 @@ handle_send_msg (void *cls, | |||
653 | ap->receiver = smt->receiver; | 693 | ap->receiver = smt->receiver; |
654 | ap->mid = smt->mid; | 694 | ap->mid = smt->mid; |
655 | GNUNET_CONTAINER_DLL_insert (ch->ap_head, | 695 | GNUNET_CONTAINER_DLL_insert (ch->ap_head, |
656 | cp->ap_tail, | 696 | ch->ap_tail, |
657 | ap); | 697 | ap); |
658 | mh = (const struct GNUNET_MessageHeader *) &smt[1]; | 698 | mh = (const struct GNUNET_MessageHeader *) &smt[1]; |
659 | env = GNUNET_MQ_msg_copy (mh); | 699 | env = GNUNET_MQ_msg_copy (mh); |
@@ -679,7 +719,7 @@ reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch) | |||
679 | struct GNUNET_TRANSPORT_IncomingMessageAck, | 719 | struct GNUNET_TRANSPORT_IncomingMessageAck, |
680 | ch), | 720 | ch), |
681 | GNUNET_MQ_hd_var_size (create_queue, | 721 | GNUNET_MQ_hd_var_size (create_queue, |
682 | GNUNET_MESSAGE_TYPE_TRANSPORT_CREATE_QUEUE, | 722 | GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE, |
683 | struct GNUNET_TRANSPORT_CreateQueue, | 723 | struct GNUNET_TRANSPORT_CreateQueue, |
684 | ch), | 724 | ch), |
685 | GNUNET_MQ_hd_var_size (send_msg, | 725 | GNUNET_MQ_hd_var_size (send_msg, |
@@ -688,12 +728,24 @@ reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch) | |||
688 | ch), | 728 | ch), |
689 | GNUNET_MQ_handler_end() | 729 | GNUNET_MQ_handler_end() |
690 | }; | 730 | }; |
731 | struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam; | ||
732 | struct GNUNET_MQ_Envelope *env; | ||
691 | 733 | ||
692 | ch->mq = GNUNET_CLIENT_connect (cfg, | 734 | ch->mq = GNUNET_CLIENT_connect (ch->cfg, |
693 | "transport", | 735 | "transport", |
694 | handlers, | 736 | handlers, |
695 | &error_handler, | 737 | &error_handler, |
696 | ch); | 738 | ch); |
739 | if (NULL == ch->mq) | ||
740 | return; | ||
741 | env = GNUNET_MQ_msg_extra (cam, | ||
742 | strlen (ch->addr_prefix) + 1, | ||
743 | GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR); | ||
744 | memcpy (&cam[1], | ||
745 | ch->addr_prefix, | ||
746 | strlen (ch->addr_prefix) + 1); | ||
747 | GNUNET_MQ_send (ch->mq, | ||
748 | env); | ||
697 | for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head; | 749 | for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head; |
698 | NULL != ai; | 750 | NULL != ai; |
699 | ai = ai->next) | 751 | ai = ai->next) |
@@ -709,7 +761,9 @@ reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch) | |||
709 | * Connect to the transport service. | 761 | * Connect to the transport service. |
710 | * | 762 | * |
711 | * @param cfg configuration to use | 763 | * @param cfg configuration to use |
712 | * @param name name of the communicator that is connecting | 764 | * @param config_section section of the configuration to use for options |
765 | * @param addr_prefix address prefix for addresses supported by this | ||
766 | * communicator, could be NULL for incoming-only communicators | ||
713 | * @param mtu maximum message size supported by communicator, 0 if | 767 | * @param mtu maximum message size supported by communicator, 0 if |
714 | * sending is not supported, SIZE_MAX for no MTU | 768 | * sending is not supported, SIZE_MAX for no MTU |
715 | * @param mq_init function to call to initialize a message queue given | 769 | * @param mq_init function to call to initialize a message queue given |
@@ -720,7 +774,8 @@ reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch) | |||
720 | */ | 774 | */ |
721 | struct GNUNET_TRANSPORT_CommunicatorHandle * | 775 | struct GNUNET_TRANSPORT_CommunicatorHandle * |
722 | GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, | 776 | GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, |
723 | const char *name, | 777 | const char *config_section, |
778 | const char *addr_prefix, | ||
724 | size_t mtu, | 779 | size_t mtu, |
725 | GNUNET_TRANSPORT_CommunicatorMqInit mq_init, | 780 | GNUNET_TRANSPORT_CommunicatorMqInit mq_init, |
726 | void *mq_init_cls) | 781 | void *mq_init_cls) |
@@ -729,14 +784,15 @@ GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle | |||
729 | 784 | ||
730 | ch = GNUNET_new (struct GNUNET_TRANSPORT_CommunicatorHandle); | 785 | ch = GNUNET_new (struct GNUNET_TRANSPORT_CommunicatorHandle); |
731 | ch->cfg = cfg; | 786 | ch->cfg = cfg; |
732 | ch->name = name; | 787 | ch->config_section = config_section; |
788 | ch->addr_prefix = addr_prefix; | ||
733 | ch->mtu = mtu; | 789 | ch->mtu = mtu; |
734 | ch->mq_init = mq_init; | 790 | ch->mq_init = mq_init; |
735 | ch->mq_init_cls = mq_init_cls; | 791 | ch->mq_init_cls = mq_init_cls; |
736 | reconnect (ch); | 792 | reconnect (ch); |
737 | if (GNUNET_OK != | 793 | if (GNUNET_OK != |
738 | GNUNET_CONFIGURATION_get_value_number (cfg, | 794 | GNUNET_CONFIGURATION_get_value_number (cfg, |
739 | name, | 795 | config_section, |
740 | "MAX_QUEUE_LENGTH", | 796 | "MAX_QUEUE_LENGTH", |
741 | &ch->max_queue_length)) | 797 | &ch->max_queue_length)) |
742 | ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH; | 798 | ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH; |
@@ -798,32 +854,15 @@ GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandl | |||
798 | struct GNUNET_TRANSPORT_IncomingMessage *im; | 854 | struct GNUNET_TRANSPORT_IncomingMessage *im; |
799 | uint16_t msize; | 855 | uint16_t msize; |
800 | 856 | ||
801 | if (NULL == ai->ch->mq) | 857 | if (NULL == ch->mq) |
802 | return GNUNET_SYSERR; | 858 | return GNUNET_SYSERR; |
803 | if (NULL != cb) | 859 | if ( (NULL == cb) && |
860 | (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length) ) | ||
804 | { | 861 | { |
805 | struct FlowControl *fc; | 862 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
806 | 863 | "Dropping message: transprot is too slow, queue length %llu exceeded\n", | |
807 | im->fc_on = htonl (GNUNET_YES); | 864 | ch->max_queue_length); |
808 | im->fc_id = ai->ch->fc_gen++; | 865 | return GNUNET_NO; |
809 | fc = GNUNET_new (struct FlowControl); | ||
810 | fc->sender = *sender; | ||
811 | fc->id = im->fc_id; | ||
812 | fc->cb = cb; | ||
813 | fc->cb_cls = cb_cls; | ||
814 | GNUNET_CONTAINER_DLL_insert (ch->fc_head, | ||
815 | ch->fc_tail, | ||
816 | fc); | ||
817 | } | ||
818 | else | ||
819 | { | ||
820 | if (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length) | ||
821 | { | ||
822 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
823 | "Dropping message: transprot is too slow, queue length %u exceeded\n", | ||
824 | ch->max_queue_length); | ||
825 | return GNUNET_NO; | ||
826 | } | ||
827 | } | 866 | } |
828 | 867 | ||
829 | msize = ntohs (msg->size); | 868 | msize = ntohs (msg->size); |
@@ -839,7 +878,22 @@ GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandl | |||
839 | memcpy (&im[1], | 878 | memcpy (&im[1], |
840 | msg, | 879 | msg, |
841 | msize); | 880 | msize); |
842 | GNUNET_MQ_send (ai->ch->mq, | 881 | if (NULL != cb) |
882 | { | ||
883 | struct FlowControl *fc; | ||
884 | |||
885 | im->fc_on = htonl (GNUNET_YES); | ||
886 | im->fc_id = ch->fc_gen++; | ||
887 | fc = GNUNET_new (struct FlowControl); | ||
888 | fc->sender = *sender; | ||
889 | fc->id = im->fc_id; | ||
890 | fc->cb = cb; | ||
891 | fc->cb_cls = cb_cls; | ||
892 | GNUNET_CONTAINER_DLL_insert (ch->fc_head, | ||
893 | ch->fc_tail, | ||
894 | fc); | ||
895 | } | ||
896 | GNUNET_MQ_send (ch->mq, | ||
843 | env); | 897 | env); |
844 | return GNUNET_OK; | 898 | return GNUNET_OK; |
845 | } | 899 | } |
@@ -927,9 +981,9 @@ GNUNET_TRANSPORT_communicator_address_add (struct GNUNET_TRANSPORT_CommunicatorH | |||
927 | ai->address = GNUNET_strdup (address); | 981 | ai->address = GNUNET_strdup (address); |
928 | ai->nt = nt; | 982 | ai->nt = nt; |
929 | ai->expiration = expiration; | 983 | ai->expiration = expiration; |
930 | ai->aid = handle->aid_gen++; | 984 | ai->aid = ch->aid_gen++; |
931 | GNUNET_CONTAINER_DLL_insert (handle->ai_head, | 985 | GNUNET_CONTAINER_DLL_insert (ch->ai_head, |
932 | handle->ai_tail, | 986 | ch->ai_tail, |
933 | ai); | 987 | ai); |
934 | send_add_address (ai); | 988 | send_add_address (ai); |
935 | return ai; | 989 | return ai; |