aboutsummaryrefslogtreecommitdiff
path: root/src/transport/transport_api2_communication.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2019-06-04 14:10:57 +0200
committerChristian Grothoff <christian@grothoff.org>2019-06-04 14:10:57 +0200
commitd6845922193d63e28d743ab77d843af9023d8a14 (patch)
treefec7bb3a0ca63c6a43ac6e61eb7aa1faf9412e9c /src/transport/transport_api2_communication.c
parentb544508d067006323c6f51d84adfaf8adbcd4ee8 (diff)
downloadgnunet-d6845922193d63e28d743ab77d843af9023d8a14.tar.gz
gnunet-d6845922193d63e28d743ab77d843af9023d8a14.zip
use macro
Diffstat (limited to 'src/transport/transport_api2_communication.c')
-rw-r--r--src/transport/transport_api2_communication.c395
1 files changed, 160 insertions, 235 deletions
diff --git a/src/transport/transport_api2_communication.c b/src/transport/transport_api2_communication.c
index 6d497c967..8bc18845b 100644
--- a/src/transport/transport_api2_communication.c
+++ b/src/transport/transport_api2_communication.c
@@ -222,7 +222,6 @@ struct GNUNET_TRANSPORT_CommunicatorHandle
222 * Characteristics of the communicator. 222 * Characteristics of the communicator.
223 */ 223 */
224 enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc; 224 enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
225
226}; 225};
227 226
228 227
@@ -282,7 +281,6 @@ struct GNUNET_TRANSPORT_QueueHandle
282 * Maximum transmission unit for the queue. 281 * Maximum transmission unit for the queue.
283 */ 282 */
284 uint32_t mtu; 283 uint32_t mtu;
285
286}; 284};
287 285
288 286
@@ -329,7 +327,6 @@ struct GNUNET_TRANSPORT_AddressIdentifier
329 * Network type for the address. 327 * Network type for the address.
330 */ 328 */
331 enum GNUNET_NetworkType nt; 329 enum GNUNET_NetworkType nt;
332
333}; 330};
334 331
335 332
@@ -357,15 +354,12 @@ send_add_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
357 if (NULL == ai->ch->mq) 354 if (NULL == ai->ch->mq)
358 return; 355 return;
359 env = GNUNET_MQ_msg_extra (aam, 356 env = GNUNET_MQ_msg_extra (aam,
360 strlen (ai->address) + 1, 357 strlen (ai->address) + 1,
361 GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS); 358 GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS);
362 aam->expiration = GNUNET_TIME_relative_hton (ai->expiration); 359 aam->expiration = GNUNET_TIME_relative_hton (ai->expiration);
363 aam->nt = htonl ((uint32_t) ai->nt); 360 aam->nt = htonl ((uint32_t) ai->nt);
364 memcpy (&aam[1], 361 memcpy (&aam[1], ai->address, strlen (ai->address) + 1);
365 ai->address, 362 GNUNET_MQ_send (ai->ch->mq, env);
366 strlen (ai->address) + 1);
367 GNUNET_MQ_send (ai->ch->mq,
368 env);
369} 363}
370 364
371 365
@@ -383,11 +377,9 @@ send_del_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
383 377
384 if (NULL == ai->ch->mq) 378 if (NULL == ai->ch->mq)
385 return; 379 return;
386 env = GNUNET_MQ_msg (dam, 380 env = GNUNET_MQ_msg (dam, GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS);
387 GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS);
388 dam->aid = htonl (ai->aid); 381 dam->aid = htonl (ai->aid);
389 GNUNET_MQ_send (ai->ch->mq, 382 GNUNET_MQ_send (ai->ch->mq, env);
390 env);
391} 383}
392 384
393 385
@@ -406,18 +398,15 @@ send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
406 if (NULL == qh->ch->mq) 398 if (NULL == qh->ch->mq)
407 return; 399 return;
408 env = GNUNET_MQ_msg_extra (aqm, 400 env = GNUNET_MQ_msg_extra (aqm,
409 strlen (qh->address) + 1, 401 strlen (qh->address) + 1,
410 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP); 402 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP);
411 aqm->qid = htonl (qh->queue_id); 403 aqm->qid = htonl (qh->queue_id);
412 aqm->receiver = qh->peer; 404 aqm->receiver = qh->peer;
413 aqm->nt = htonl ((uint32_t) qh->nt); 405 aqm->nt = htonl ((uint32_t) qh->nt);
414 aqm->mtu = htonl (qh->mtu); 406 aqm->mtu = htonl (qh->mtu);
415 aqm->cs = htonl ((uint32_t) qh->cs); 407 aqm->cs = htonl ((uint32_t) qh->cs);
416 memcpy (&aqm[1], 408 memcpy (&aqm[1], qh->address, strlen (qh->address) + 1);
417 qh->address, 409 GNUNET_MQ_send (qh->ch->mq, env);
418 strlen (qh->address) + 1);
419 GNUNET_MQ_send (qh->ch->mq,
420 env);
421} 410}
422 411
423 412
@@ -435,12 +424,10 @@ send_del_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
435 424
436 if (NULL == qh->ch->mq) 425 if (NULL == qh->ch->mq)
437 return; 426 return;
438 env = GNUNET_MQ_msg (dqm, 427 env = GNUNET_MQ_msg (dqm, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN);
439 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN);
440 dqm->qid = htonl (qh->queue_id); 428 dqm->qid = htonl (qh->queue_id);
441 dqm->receiver = qh->peer; 429 dqm->receiver = qh->peer;
442 GNUNET_MQ_send (qh->ch->mq, 430 GNUNET_MQ_send (qh->ch->mq, env);
443 env);
444} 431}
445 432
446 433
@@ -458,26 +445,17 @@ disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
458 struct FlowControl *fcn; 445 struct FlowControl *fcn;
459 struct AckPending *apn; 446 struct AckPending *apn;
460 447
461 for (struct FlowControl *fc = ch->fc_head; 448 for (struct FlowControl *fc = ch->fc_head; NULL != fc; fc = fcn)
462 NULL != fc;
463 fc = fcn)
464 { 449 {
465 fcn = fc->next; 450 fcn = fc->next;
466 GNUNET_CONTAINER_DLL_remove (ch->fc_head, 451 GNUNET_CONTAINER_DLL_remove (ch->fc_head, ch->fc_tail, fc);
467 ch->fc_tail, 452 fc->cb (fc->cb_cls, GNUNET_SYSERR);
468 fc);
469 fc->cb (fc->cb_cls,
470 GNUNET_SYSERR);
471 GNUNET_free (fc); 453 GNUNET_free (fc);
472 } 454 }
473 for (struct AckPending *ap = ch->ap_head; 455 for (struct AckPending *ap = ch->ap_head; NULL != ap; ap = apn)
474 NULL != ap;
475 ap = apn)
476 { 456 {
477 apn = ap->next; 457 apn = ap->next;
478 GNUNET_CONTAINER_DLL_remove (ch->ap_head, 458 GNUNET_CONTAINER_DLL_remove (ch->ap_head, ch->ap_tail, ap);
479 ch->ap_tail,
480 ap);
481 GNUNET_free (ap); 459 GNUNET_free (ap);
482 } 460 }
483 if (NULL == ch->mq) 461 if (NULL == ch->mq)
@@ -491,14 +469,13 @@ disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
491 * Function called on MQ errors. 469 * Function called on MQ errors.
492 */ 470 */
493static void 471static void
494error_handler (void *cls, 472error_handler (void *cls, enum GNUNET_MQ_Error error)
495 enum GNUNET_MQ_Error error)
496{ 473{
497 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; 474 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
498 475
499 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 476 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
500 "MQ failure %d, reconnecting to transport service.\n", 477 "MQ failure %d, reconnecting to transport service.\n",
501 error); 478 error);
502 disconnect (ch); 479 disconnect (ch);
503 /* TODO: maybe do this with exponential backoff/delay */ 480 /* TODO: maybe do this with exponential backoff/delay */
504 reconnect (ch); 481 reconnect (ch);
@@ -513,25 +490,21 @@ error_handler (void *cls,
513 * @param incoming_ack the ack 490 * @param incoming_ack the ack
514 */ 491 */
515static void 492static void
516handle_incoming_ack (void *cls, 493handle_incoming_ack (
517 const struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack) 494 void *cls,
495 const struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack)
518{ 496{
519 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; 497 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
520 498
521 for (struct FlowControl *fc = ch->fc_head; 499 for (struct FlowControl *fc = ch->fc_head; NULL != fc; fc = fc->next)
522 NULL != fc;
523 fc = fc->next)
524 { 500 {
525 if ( (fc->id == incoming_ack->fc_id) && 501 if ((fc->id == incoming_ack->fc_id) &&
526 (0 == memcmp (&fc->sender, 502 (0 == memcmp (&fc->sender,
527 &incoming_ack->sender, 503 &incoming_ack->sender,
528 sizeof (struct GNUNET_PeerIdentity))) ) 504 sizeof (struct GNUNET_PeerIdentity))))
529 { 505 {
530 GNUNET_CONTAINER_DLL_remove (ch->fc_head, 506 GNUNET_CONTAINER_DLL_remove (ch->fc_head, ch->fc_tail, fc);
531 ch->fc_tail, 507 fc->cb (fc->cb_cls, GNUNET_OK);
532 fc);
533 fc->cb (fc->cb_cls,
534 GNUNET_OK);
535 GNUNET_free (fc); 508 GNUNET_free (fc);
536 return; 509 return;
537 } 510 }
@@ -552,19 +525,10 @@ handle_incoming_ack (void *cls,
552 * @return #GNUNET_OK if @a smt is well-formed 525 * @return #GNUNET_OK if @a smt is well-formed
553 */ 526 */
554static int 527static int
555check_create_queue (void *cls, 528check_create_queue (void *cls, const struct GNUNET_TRANSPORT_CreateQueue *cq)
556 const struct GNUNET_TRANSPORT_CreateQueue *cq)
557{ 529{
558 uint16_t len = ntohs (cq->header.size) - sizeof (*cq);
559 const char *addr = (const char *) &cq[1];
560
561 (void) cls; 530 (void) cls;
562 if ( (0 == len) || 531 GNUNET_MQ_check_zero_termination (cq);
563 ('\0' != addr[len-1]) )
564 {
565 GNUNET_break (0);
566 return GNUNET_SYSERR;
567 }
568 return GNUNET_OK; 532 return GNUNET_OK;
569} 533}
570 534
@@ -576,33 +540,26 @@ check_create_queue (void *cls,
576 * @param cq the queue creation request 540 * @param cq the queue creation request
577 */ 541 */
578static void 542static void
579handle_create_queue (void *cls, 543handle_create_queue (void *cls, const struct GNUNET_TRANSPORT_CreateQueue *cq)
580 const struct GNUNET_TRANSPORT_CreateQueue *cq)
581{ 544{
582 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; 545 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
583 const char *addr = (const char *) &cq[1]; 546 const char *addr = (const char *) &cq[1];
584 struct GNUNET_TRANSPORT_CreateQueueResponse *cqr; 547 struct GNUNET_TRANSPORT_CreateQueueResponse *cqr;
585 struct GNUNET_MQ_Envelope *env; 548 struct GNUNET_MQ_Envelope *env;
586 549
587 if (GNUNET_OK != 550 if (GNUNET_OK != ch->mq_init (ch->mq_init_cls, &cq->receiver, addr))
588 ch->mq_init (ch->mq_init_cls,
589 &cq->receiver,
590 addr))
591 { 551 {
592 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 552 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
593 "Address `%s' invalid for this communicator\n", 553 "Address `%s' invalid for this communicator\n",
594 addr); 554 addr);
595 env = GNUNET_MQ_msg (cqr, 555 env = GNUNET_MQ_msg (cqr, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL);
596 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL);
597 } 556 }
598 else 557 else
599 { 558 {
600 env = GNUNET_MQ_msg (cqr, 559 env = GNUNET_MQ_msg (cqr, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK);
601 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK);
602 } 560 }
603 cqr->request_id = cq->request_id; 561 cqr->request_id = cq->request_id;
604 GNUNET_MQ_send (ch->mq, 562 GNUNET_MQ_send (ch->mq, env);
605 env);
606} 563}
607 564
608 565
@@ -615,8 +572,7 @@ handle_create_queue (void *cls,
615 * @return #GNUNET_OK if @a smt is well-formed 572 * @return #GNUNET_OK if @a smt is well-formed
616 */ 573 */
617static int 574static int
618check_send_msg (void *cls, 575check_send_msg (void *cls, const struct GNUNET_TRANSPORT_SendMessageTo *smt)
619 const struct GNUNET_TRANSPORT_SendMessageTo *smt)
620{ 576{
621 (void) cls; 577 (void) cls;
622 GNUNET_MQ_check_boxed_message (smt); 578 GNUNET_MQ_check_boxed_message (smt);
@@ -635,20 +591,18 @@ check_send_msg (void *cls,
635 */ 591 */
636static void 592static void
637send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch, 593send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
638 int status, 594 int status,
639 const struct GNUNET_PeerIdentity *receiver, 595 const struct GNUNET_PeerIdentity *receiver,
640 uint64_t mid) 596 uint64_t mid)
641{ 597{
642 struct GNUNET_MQ_Envelope *env; 598 struct GNUNET_MQ_Envelope *env;
643 struct GNUNET_TRANSPORT_SendMessageToAck *ack; 599 struct GNUNET_TRANSPORT_SendMessageToAck *ack;
644 600
645 env = GNUNET_MQ_msg (ack, 601 env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK);
646 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK);
647 ack->status = htonl (status); 602 ack->status = htonl (status);
648 ack->mid = mid; 603 ack->mid = mid;
649 ack->receiver = *receiver; 604 ack->receiver = *receiver;
650 GNUNET_MQ_send (ch->mq, 605 GNUNET_MQ_send (ch->mq, env);
651 env);
652} 606}
653 607
654 608
@@ -664,13 +618,8 @@ send_ack_cb (void *cls)
664 struct AckPending *ap = cls; 618 struct AckPending *ap = cls;
665 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ap->ch; 619 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ap->ch;
666 620
667 GNUNET_CONTAINER_DLL_remove (ch->ap_head, 621 GNUNET_CONTAINER_DLL_remove (ch->ap_head, ch->ap_tail, ap);
668 ch->ap_tail, 622 send_ack (ch, GNUNET_OK, &ap->receiver, ap->mid);
669 ap);
670 send_ack (ch,
671 GNUNET_OK,
672 &ap->receiver,
673 ap->mid);
674 GNUNET_free (ap); 623 GNUNET_free (ap);
675} 624}
676 625
@@ -682,8 +631,7 @@ send_ack_cb (void *cls)
682 * @param smt the transmission request 631 * @param smt the transmission request
683 */ 632 */
684static void 633static void
685handle_send_msg (void *cls, 634handle_send_msg (void *cls, const struct GNUNET_TRANSPORT_SendMessageTo *smt)
686 const struct GNUNET_TRANSPORT_SendMessageTo *smt)
687{ 635{
688 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; 636 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
689 const struct GNUNET_MessageHeader *mh; 637 const struct GNUNET_MessageHeader *mh;
@@ -691,37 +639,29 @@ handle_send_msg (void *cls,
691 struct AckPending *ap; 639 struct AckPending *ap;
692 struct GNUNET_TRANSPORT_QueueHandle *qh; 640 struct GNUNET_TRANSPORT_QueueHandle *qh;
693 641
694 for (qh = ch->queue_head;NULL != qh; qh = qh->next) 642 for (qh = ch->queue_head; NULL != qh; qh = qh->next)
695 if ( (qh->queue_id == smt->qid) && 643 if ((qh->queue_id == smt->qid) &&
696 (0 == memcmp (&qh->peer, 644 (0 == memcmp (&qh->peer,
697 &smt->receiver, 645 &smt->receiver,
698 sizeof (struct GNUNET_PeerIdentity))) ) 646 sizeof (struct GNUNET_PeerIdentity))))
699 break; 647 break;
700 if (NULL == qh) 648 if (NULL == qh)
701 { 649 {
702 /* queue is already gone, tell transport this one failed */ 650 /* queue is already gone, tell transport this one failed */
703 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 651 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
704 "Transmission failed, queue no longer exists.\n"); 652 "Transmission failed, queue no longer exists.\n");
705 send_ack (ch, 653 send_ack (ch, GNUNET_NO, &smt->receiver, smt->mid);
706 GNUNET_NO,
707 &smt->receiver,
708 smt->mid);
709 return; 654 return;
710 } 655 }
711 ap = GNUNET_new (struct AckPending); 656 ap = GNUNET_new (struct AckPending);
712 ap->ch = ch; 657 ap->ch = ch;
713 ap->receiver = smt->receiver; 658 ap->receiver = smt->receiver;
714 ap->mid = smt->mid; 659 ap->mid = smt->mid;
715 GNUNET_CONTAINER_DLL_insert (ch->ap_head, 660 GNUNET_CONTAINER_DLL_insert (ch->ap_head, ch->ap_tail, ap);
716 ch->ap_tail,
717 ap);
718 mh = (const struct GNUNET_MessageHeader *) &smt[1]; 661 mh = (const struct GNUNET_MessageHeader *) &smt[1];
719 env = GNUNET_MQ_msg_copy (mh); 662 env = GNUNET_MQ_msg_copy (mh);
720 GNUNET_MQ_notify_sent (env, 663 GNUNET_MQ_notify_sent (env, &send_ack_cb, ap);
721 &send_ack_cb, 664 GNUNET_MQ_send (qh->mq, env);
722 ap);
723 GNUNET_MQ_send (qh->mq,
724 env);
725} 665}
726 666
727 667
@@ -734,8 +674,9 @@ handle_send_msg (void *cls,
734 * @return #GNUNET_OK if @a smt is well-formed 674 * @return #GNUNET_OK if @a smt is well-formed
735 */ 675 */
736static int 676static int
737check_backchannel_incoming (void *cls, 677check_backchannel_incoming (
738 const struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *bi) 678 void *cls,
679 const struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *bi)
739{ 680{
740 (void) cls; 681 (void) cls;
741 GNUNET_MQ_check_boxed_message (bi); 682 GNUNET_MQ_check_boxed_message (bi);
@@ -750,18 +691,20 @@ check_backchannel_incoming (void *cls,
750 * @param bi the backchannel message 691 * @param bi the backchannel message
751 */ 692 */
752static void 693static void
753handle_backchannel_incoming (void *cls, 694handle_backchannel_incoming (
754 const struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *bi) 695 void *cls,
696 const struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *bi)
755{ 697{
756 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; 698 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
757 699
758 if (NULL != ch->notify_cb) 700 if (NULL != ch->notify_cb)
759 ch->notify_cb (ch->notify_cb_cls, 701 ch->notify_cb (ch->notify_cb_cls,
760 &bi->pid, 702 &bi->pid,
761 (const struct GNUNET_MessageHeader *) &bi[1]); 703 (const struct GNUNET_MessageHeader *) &bi[1]);
762 else 704 else
763 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 705 GNUNET_log (
764 _("Dropped backchanel message: handler not provided by communicator\n")); 706 GNUNET_ERROR_TYPE_INFO,
707 _ ("Dropped backchanel message: handler not provided by communicator\n"));
765} 708}
766 709
767 710
@@ -773,50 +716,42 @@ handle_backchannel_incoming (void *cls,
773static void 716static void
774reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch) 717reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
775{ 718{
776 struct GNUNET_MQ_MessageHandler handlers[] = { 719 struct GNUNET_MQ_MessageHandler handlers[] =
777 GNUNET_MQ_hd_fixed_size (incoming_ack, 720 {GNUNET_MQ_hd_fixed_size (incoming_ack,
778 GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK, 721 GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK,
779 struct GNUNET_TRANSPORT_IncomingMessageAck, 722 struct GNUNET_TRANSPORT_IncomingMessageAck,
780 ch), 723 ch),
781 GNUNET_MQ_hd_var_size (create_queue, 724 GNUNET_MQ_hd_var_size (create_queue,
782 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE, 725 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE,
783 struct GNUNET_TRANSPORT_CreateQueue, 726 struct GNUNET_TRANSPORT_CreateQueue,
784 ch), 727 ch),
785 GNUNET_MQ_hd_var_size (send_msg, 728 GNUNET_MQ_hd_var_size (send_msg,
786 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG, 729 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG,
787 struct GNUNET_TRANSPORT_SendMessageTo, 730 struct GNUNET_TRANSPORT_SendMessageTo,
788 ch), 731 ch),
789 GNUNET_MQ_hd_var_size (backchannel_incoming, 732 GNUNET_MQ_hd_var_size (
790 GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING, 733 backchannel_incoming,
791 struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming, 734 GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING,
792 ch), 735 struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming,
793 GNUNET_MQ_handler_end() 736 ch),
794 }; 737 GNUNET_MQ_handler_end ()};
795 struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam; 738 struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam;
796 struct GNUNET_MQ_Envelope *env; 739 struct GNUNET_MQ_Envelope *env;
797 740
798 ch->mq = GNUNET_CLIENT_connect (ch->cfg, 741 ch->mq =
799 "transport", 742 GNUNET_CLIENT_connect (ch->cfg, "transport", handlers, &error_handler, ch);
800 handlers,
801 &error_handler,
802 ch);
803 if (NULL == ch->mq) 743 if (NULL == ch->mq)
804 return; 744 return;
805 env = GNUNET_MQ_msg_extra (cam, 745 env = GNUNET_MQ_msg_extra (cam,
806 strlen (ch->addr_prefix) + 1, 746 strlen (ch->addr_prefix) + 1,
807 GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR); 747 GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR);
808 cam->cc = htonl ((uint32_t) ch->cc); 748 cam->cc = htonl ((uint32_t) ch->cc);
809 memcpy (&cam[1], 749 memcpy (&cam[1], ch->addr_prefix, strlen (ch->addr_prefix) + 1);
810 ch->addr_prefix, 750 GNUNET_MQ_send (ch->mq, env);
811 strlen (ch->addr_prefix) + 1); 751 for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head; NULL != ai;
812 GNUNET_MQ_send (ch->mq,
813 env);
814 for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head;
815 NULL != ai;
816 ai = ai->next) 752 ai = ai->next)
817 send_add_address (ai); 753 send_add_address (ai);
818 for (struct GNUNET_TRANSPORT_QueueHandle *qh = ch->queue_head; 754 for (struct GNUNET_TRANSPORT_QueueHandle *qh = ch->queue_head; NULL != qh;
819 NULL != qh;
820 qh = qh->next) 755 qh = qh->next)
821 send_add_queue (qh); 756 send_add_queue (qh);
822} 757}
@@ -841,14 +776,15 @@ reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
841 * @return NULL on error 776 * @return NULL on error
842 */ 777 */
843struct GNUNET_TRANSPORT_CommunicatorHandle * 778struct GNUNET_TRANSPORT_CommunicatorHandle *
844GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, 779GNUNET_TRANSPORT_communicator_connect (
845 const char *config_section, 780 const struct GNUNET_CONFIGURATION_Handle *cfg,
846 const char *addr_prefix, 781 const char *config_section,
847 enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc, 782 const char *addr_prefix,
848 GNUNET_TRANSPORT_CommunicatorMqInit mq_init, 783 enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc,
849 void *mq_init_cls, 784 GNUNET_TRANSPORT_CommunicatorMqInit mq_init,
850 GNUNET_TRANSPORT_CommunicatorNotify notify_cb, 785 void *mq_init_cls,
851 void *notify_cb_cls) 786 GNUNET_TRANSPORT_CommunicatorNotify notify_cb,
787 void *notify_cb_cls)
852{ 788{
853 struct GNUNET_TRANSPORT_CommunicatorHandle *ch; 789 struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
854 790
@@ -864,9 +800,9 @@ GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle
864 reconnect (ch); 800 reconnect (ch);
865 if (GNUNET_OK != 801 if (GNUNET_OK !=
866 GNUNET_CONFIGURATION_get_value_number (cfg, 802 GNUNET_CONFIGURATION_get_value_number (cfg,
867 config_section, 803 config_section,
868 "MAX_QUEUE_LENGTH", 804 "MAX_QUEUE_LENGTH",
869 &ch->max_queue_length)) 805 &ch->max_queue_length))
870 ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH; 806 ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
871 if (NULL == ch->mq) 807 if (NULL == ch->mq)
872 { 808 {
@@ -883,7 +819,8 @@ GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle
883 * @param ch handle returned from connect 819 * @param ch handle returned from connect
884 */ 820 */
885void 821void
886GNUNET_TRANSPORT_communicator_disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch) 822GNUNET_TRANSPORT_communicator_disconnect (
823 struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
887{ 824{
888 disconnect (ch); 825 disconnect (ch);
889 while (NULL != ch->ai_head) 826 while (NULL != ch->ai_head)
@@ -919,12 +856,13 @@ GNUNET_TRANSPORT_communicator_disconnect (struct GNUNET_TRANSPORT_CommunicatorHa
919 * the tranport service is not yet up 856 * the tranport service is not yet up
920 */ 857 */
921int 858int
922GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandle *ch, 859GNUNET_TRANSPORT_communicator_receive (
923 const struct GNUNET_PeerIdentity *sender, 860 struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
924 const struct GNUNET_MessageHeader *msg, 861 const struct GNUNET_PeerIdentity *sender,
925 struct GNUNET_TIME_Relative expected_addr_validity, 862 const struct GNUNET_MessageHeader *msg,
926 GNUNET_TRANSPORT_MessageCompletedCallback cb, 863 struct GNUNET_TIME_Relative expected_addr_validity,
927 void *cb_cls) 864 GNUNET_TRANSPORT_MessageCompletedCallback cb,
865 void *cb_cls)
928{ 866{
929 struct GNUNET_MQ_Envelope *env; 867 struct GNUNET_MQ_Envelope *env;
930 struct GNUNET_TRANSPORT_IncomingMessage *im; 868 struct GNUNET_TRANSPORT_IncomingMessage *im;
@@ -932,29 +870,27 @@ GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandl
932 870
933 if (NULL == ch->mq) 871 if (NULL == ch->mq)
934 return GNUNET_SYSERR; 872 return GNUNET_SYSERR;
935 if ( (NULL == cb) && 873 if ((NULL == cb) && (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length))
936 (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length) )
937 { 874 {
938 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 875 GNUNET_log (
939 "Dropping message: transprot is too slow, queue length %llu exceeded\n", 876 GNUNET_ERROR_TYPE_WARNING,
940 ch->max_queue_length); 877 "Dropping message: transprot is too slow, queue length %llu exceeded\n",
878 ch->max_queue_length);
941 return GNUNET_NO; 879 return GNUNET_NO;
942 } 880 }
943 881
944 msize = ntohs (msg->size); 882 msize = ntohs (msg->size);
945 env = GNUNET_MQ_msg_extra (im, 883 env =
946 msize, 884 GNUNET_MQ_msg_extra (im, msize, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG);
947 GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG);
948 if (NULL == env) 885 if (NULL == env)
949 { 886 {
950 GNUNET_break (0); 887 GNUNET_break (0);
951 return GNUNET_SYSERR; 888 return GNUNET_SYSERR;
952 } 889 }
953 im->expected_address_validity = GNUNET_TIME_relative_hton (expected_addr_validity); 890 im->expected_address_validity =
891 GNUNET_TIME_relative_hton (expected_addr_validity);
954 im->sender = *sender; 892 im->sender = *sender;
955 memcpy (&im[1], 893 memcpy (&im[1], msg, msize);
956 msg,
957 msize);
958 if (NULL != cb) 894 if (NULL != cb)
959 { 895 {
960 struct FlowControl *fc; 896 struct FlowControl *fc;
@@ -966,12 +902,9 @@ GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandl
966 fc->id = im->fc_id; 902 fc->id = im->fc_id;
967 fc->cb = cb; 903 fc->cb = cb;
968 fc->cb_cls = cb_cls; 904 fc->cb_cls = cb_cls;
969 GNUNET_CONTAINER_DLL_insert (ch->fc_head, 905 GNUNET_CONTAINER_DLL_insert (ch->fc_head, ch->fc_tail, fc);
970 ch->fc_tail,
971 fc);
972 } 906 }
973 GNUNET_MQ_send (ch->mq, 907 GNUNET_MQ_send (ch->mq, env);
974 env);
975 return GNUNET_OK; 908 return GNUNET_OK;
976} 909}
977 910
@@ -996,13 +929,14 @@ GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandl
996 * @return API handle identifying the new MQ 929 * @return API handle identifying the new MQ
997 */ 930 */
998struct GNUNET_TRANSPORT_QueueHandle * 931struct GNUNET_TRANSPORT_QueueHandle *
999GNUNET_TRANSPORT_communicator_mq_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch, 932GNUNET_TRANSPORT_communicator_mq_add (
1000 const struct GNUNET_PeerIdentity *peer, 933 struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
1001 const char *address, 934 const struct GNUNET_PeerIdentity *peer,
1002 uint32_t mtu, 935 const char *address,
1003 enum GNUNET_NetworkType nt, 936 uint32_t mtu,
1004 enum GNUNET_TRANSPORT_ConnectionStatus cs, 937 enum GNUNET_NetworkType nt,
1005 struct GNUNET_MQ_Handle *mq) 938 enum GNUNET_TRANSPORT_ConnectionStatus cs,
939 struct GNUNET_MQ_Handle *mq)
1006{ 940{
1007 struct GNUNET_TRANSPORT_QueueHandle *qh; 941 struct GNUNET_TRANSPORT_QueueHandle *qh;
1008 942
@@ -1015,9 +949,7 @@ GNUNET_TRANSPORT_communicator_mq_add (struct GNUNET_TRANSPORT_CommunicatorHandle
1015 qh->cs = cs; 949 qh->cs = cs;
1016 qh->mq = mq; 950 qh->mq = mq;
1017 qh->queue_id = ch->queue_gen++; 951 qh->queue_id = ch->queue_gen++;
1018 GNUNET_CONTAINER_DLL_insert (ch->queue_head, 952 GNUNET_CONTAINER_DLL_insert (ch->queue_head, ch->queue_tail, qh);
1019 ch->queue_tail,
1020 qh);
1021 send_add_queue (qh); 953 send_add_queue (qh);
1022 return qh; 954 return qh;
1023} 955}
@@ -1035,9 +967,7 @@ GNUNET_TRANSPORT_communicator_mq_del (struct GNUNET_TRANSPORT_QueueHandle *qh)
1035 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = qh->ch; 967 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = qh->ch;
1036 968
1037 send_del_queue (qh); 969 send_del_queue (qh);
1038 GNUNET_CONTAINER_DLL_remove (ch->queue_head, 970 GNUNET_CONTAINER_DLL_remove (ch->queue_head, ch->queue_tail, qh);
1039 ch->queue_tail,
1040 qh);
1041 GNUNET_MQ_destroy (qh->mq); 971 GNUNET_MQ_destroy (qh->mq);
1042 GNUNET_free (qh->address); 972 GNUNET_free (qh->address);
1043 GNUNET_free (qh); 973 GNUNET_free (qh);
@@ -1054,10 +984,11 @@ GNUNET_TRANSPORT_communicator_mq_del (struct GNUNET_TRANSPORT_QueueHandle *qh)
1054 * @param expiration when does the communicator forsee this address expiring? 984 * @param expiration when does the communicator forsee this address expiring?
1055 */ 985 */
1056struct GNUNET_TRANSPORT_AddressIdentifier * 986struct GNUNET_TRANSPORT_AddressIdentifier *
1057GNUNET_TRANSPORT_communicator_address_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch, 987GNUNET_TRANSPORT_communicator_address_add (
1058 const char *address, 988 struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
1059 enum GNUNET_NetworkType nt, 989 const char *address,
1060 struct GNUNET_TIME_Relative expiration) 990 enum GNUNET_NetworkType nt,
991 struct GNUNET_TIME_Relative expiration)
1061{ 992{
1062 struct GNUNET_TRANSPORT_AddressIdentifier *ai; 993 struct GNUNET_TRANSPORT_AddressIdentifier *ai;
1063 994
@@ -1067,9 +998,7 @@ GNUNET_TRANSPORT_communicator_address_add (struct GNUNET_TRANSPORT_CommunicatorH
1067 ai->nt = nt; 998 ai->nt = nt;
1068 ai->expiration = expiration; 999 ai->expiration = expiration;
1069 ai->aid = ch->aid_gen++; 1000 ai->aid = ch->aid_gen++;
1070 GNUNET_CONTAINER_DLL_insert (ch->ai_head, 1001 GNUNET_CONTAINER_DLL_insert (ch->ai_head, ch->ai_tail, ai);
1071 ch->ai_tail,
1072 ai);
1073 send_add_address (ai); 1002 send_add_address (ai);
1074 return ai; 1003 return ai;
1075} 1004}
@@ -1082,14 +1011,13 @@ GNUNET_TRANSPORT_communicator_address_add (struct GNUNET_TRANSPORT_CommunicatorH
1082 * @param ai address that is no longer provided 1011 * @param ai address that is no longer provided
1083 */ 1012 */
1084void 1013void
1085GNUNET_TRANSPORT_communicator_address_remove (struct GNUNET_TRANSPORT_AddressIdentifier *ai) 1014GNUNET_TRANSPORT_communicator_address_remove (
1015 struct GNUNET_TRANSPORT_AddressIdentifier *ai)
1086{ 1016{
1087 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ai->ch; 1017 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ai->ch;
1088 1018
1089 send_del_address (ai); 1019 send_del_address (ai);
1090 GNUNET_CONTAINER_DLL_remove (ch->ai_head, 1020 GNUNET_CONTAINER_DLL_remove (ch->ai_head, ch->ai_tail, ai);
1091 ch->ai_tail,
1092 ai);
1093 GNUNET_free (ai->address); 1021 GNUNET_free (ai->address);
1094 GNUNET_free (ai); 1022 GNUNET_free (ai);
1095} 1023}
@@ -1113,10 +1041,11 @@ GNUNET_TRANSPORT_communicator_address_remove (struct GNUNET_TRANSPORT_AddressIde
1113 * notify-API to @a pid's communicator @a comm 1041 * notify-API to @a pid's communicator @a comm
1114 */ 1042 */
1115void 1043void
1116GNUNET_TRANSPORT_communicator_notify (struct GNUNET_TRANSPORT_CommunicatorHandle *ch, 1044GNUNET_TRANSPORT_communicator_notify (
1117 const struct GNUNET_PeerIdentity *pid, 1045 struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
1118 const char *comm, 1046 const struct GNUNET_PeerIdentity *pid,
1119 const struct GNUNET_MessageHeader *header) 1047 const char *comm,
1048 const struct GNUNET_MessageHeader *header)
1120{ 1049{
1121 struct GNUNET_MQ_Envelope *env; 1050 struct GNUNET_MQ_Envelope *env;
1122 struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb; 1051 struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb;
@@ -1124,18 +1053,14 @@ GNUNET_TRANSPORT_communicator_notify (struct GNUNET_TRANSPORT_CommunicatorHandle
1124 uint16_t mlen = ntohs (header->size); 1053 uint16_t mlen = ntohs (header->size);
1125 1054
1126 GNUNET_assert (mlen + slen + sizeof (*cb) < UINT16_MAX); 1055 GNUNET_assert (mlen + slen + sizeof (*cb) < UINT16_MAX);
1127 env = GNUNET_MQ_msg_extra (cb, 1056 env =
1128 slen + mlen, 1057 GNUNET_MQ_msg_extra (cb,
1129 GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL); 1058 slen + mlen,
1059 GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL);
1130 cb->pid = *pid; 1060 cb->pid = *pid;
1131 memcpy (&cb[1], 1061 memcpy (&cb[1], header, mlen);
1132 header, 1062 memcpy (((char *) &cb[1]) + mlen, comm, slen);
1133 mlen); 1063 GNUNET_MQ_send (ch->mq, env);
1134 memcpy (((char *)&cb[1]) + mlen,
1135 comm,
1136 slen);
1137 GNUNET_MQ_send (ch->mq,
1138 env);
1139} 1064}
1140 1065
1141 1066