summaryrefslogtreecommitdiff
path: root/src/transport/transport_api2_communication.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport/transport_api2_communication.c')
-rw-r--r--src/transport/transport_api2_communication.c455
1 files changed, 230 insertions, 225 deletions
diff --git a/src/transport/transport_api2_communication.c b/src/transport/transport_api2_communication.c
index f9c387a87..de5b42c3f 100644
--- a/src/transport/transport_api2_communication.c
+++ b/src/transport/transport_api2_communication.c
@@ -42,7 +42,8 @@
42/** 42/**
43 * Information we track per packet to enable flow control. 43 * Information we track per packet to enable flow control.
44 */ 44 */
45struct FlowControl { 45struct FlowControl
46{
46 /** 47 /**
47 * Kept in a DLL. 48 * Kept in a DLL.
48 */ 49 */
@@ -79,7 +80,8 @@ struct FlowControl {
79 * Information we track per message to tell the transport about 80 * Information we track per message to tell the transport about
80 * success or failures. 81 * success or failures.
81 */ 82 */
82struct AckPending { 83struct AckPending
84{
83 /** 85 /**
84 * Kept in a DLL. 86 * Kept in a DLL.
85 */ 87 */
@@ -110,7 +112,8 @@ struct AckPending {
110/** 112/**
111 * Opaque handle to the transport service for communicators. 113 * Opaque handle to the transport service for communicators.
112 */ 114 */
113struct GNUNET_TRANSPORT_CommunicatorHandle { 115struct GNUNET_TRANSPORT_CommunicatorHandle
116{
114 /** 117 /**
115 * Head of DLL of addresses this communicator offers to the transport service. 118 * Head of DLL of addresses this communicator offers to the transport service.
116 */ 119 */
@@ -226,7 +229,8 @@ struct GNUNET_TRANSPORT_CommunicatorHandle {
226 * Handle returned to identify the internal data structure the transport 229 * Handle returned to identify the internal data structure the transport
227 * API has created to manage a message queue to a particular peer. 230 * API has created to manage a message queue to a particular peer.
228 */ 231 */
229struct GNUNET_TRANSPORT_QueueHandle { 232struct GNUNET_TRANSPORT_QueueHandle
233{
230 /** 234 /**
231 * Kept in a DLL. 235 * Kept in a DLL.
232 */ 236 */
@@ -283,7 +287,8 @@ struct GNUNET_TRANSPORT_QueueHandle {
283 * Internal representation of an address a communicator is 287 * Internal representation of an address a communicator is
284 * currently providing for the transport service. 288 * currently providing for the transport service.
285 */ 289 */
286struct GNUNET_TRANSPORT_AddressIdentifier { 290struct GNUNET_TRANSPORT_AddressIdentifier
291{
287 /** 292 /**
288 * Kept in a DLL. 293 * Kept in a DLL.
289 */ 294 */
@@ -329,7 +334,7 @@ struct GNUNET_TRANSPORT_AddressIdentifier {
329 * @param ch handle to reconnect 334 * @param ch handle to reconnect
330 */ 335 */
331static void 336static void
332reconnect(struct GNUNET_TRANSPORT_CommunicatorHandle *ch); 337reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch);
333 338
334 339
335/** 340/**
@@ -339,20 +344,20 @@ reconnect(struct GNUNET_TRANSPORT_CommunicatorHandle *ch);
339 * @param ai address to add 344 * @param ai address to add
340 */ 345 */
341static void 346static void
342send_add_address(struct GNUNET_TRANSPORT_AddressIdentifier *ai) 347send_add_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
343{ 348{
344 struct GNUNET_MQ_Envelope *env; 349 struct GNUNET_MQ_Envelope *env;
345 struct GNUNET_TRANSPORT_AddAddressMessage *aam; 350 struct GNUNET_TRANSPORT_AddAddressMessage *aam;
346 351
347 if (NULL == ai->ch->mq) 352 if (NULL == ai->ch->mq)
348 return; 353 return;
349 env = GNUNET_MQ_msg_extra(aam, 354 env = GNUNET_MQ_msg_extra (aam,
350 strlen(ai->address) + 1, 355 strlen (ai->address) + 1,
351 GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS); 356 GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS);
352 aam->expiration = GNUNET_TIME_relative_hton(ai->expiration); 357 aam->expiration = GNUNET_TIME_relative_hton (ai->expiration);
353 aam->nt = htonl((uint32_t)ai->nt); 358 aam->nt = htonl ((uint32_t) ai->nt);
354 memcpy(&aam[1], ai->address, strlen(ai->address) + 1); 359 memcpy (&aam[1], ai->address, strlen (ai->address) + 1);
355 GNUNET_MQ_send(ai->ch->mq, env); 360 GNUNET_MQ_send (ai->ch->mq, env);
356} 361}
357 362
358 363
@@ -363,16 +368,16 @@ send_add_address(struct GNUNET_TRANSPORT_AddressIdentifier *ai)
363 * @param ai address to delete 368 * @param ai address to delete
364 */ 369 */
365static void 370static void
366send_del_address(struct GNUNET_TRANSPORT_AddressIdentifier *ai) 371send_del_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
367{ 372{
368 struct GNUNET_MQ_Envelope *env; 373 struct GNUNET_MQ_Envelope *env;
369 struct GNUNET_TRANSPORT_DelAddressMessage *dam; 374 struct GNUNET_TRANSPORT_DelAddressMessage *dam;
370 375
371 if (NULL == ai->ch->mq) 376 if (NULL == ai->ch->mq)
372 return; 377 return;
373 env = GNUNET_MQ_msg(dam, GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS); 378 env = GNUNET_MQ_msg (dam, GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS);
374 dam->aid = htonl(ai->aid); 379 dam->aid = htonl (ai->aid);
375 GNUNET_MQ_send(ai->ch->mq, env); 380 GNUNET_MQ_send (ai->ch->mq, env);
376} 381}
377 382
378 383
@@ -383,23 +388,23 @@ send_del_address(struct GNUNET_TRANSPORT_AddressIdentifier *ai)
383 * @param qh queue to add 388 * @param qh queue to add
384 */ 389 */
385static void 390static void
386send_add_queue(struct GNUNET_TRANSPORT_QueueHandle *qh) 391send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
387{ 392{
388 struct GNUNET_MQ_Envelope *env; 393 struct GNUNET_MQ_Envelope *env;
389 struct GNUNET_TRANSPORT_AddQueueMessage *aqm; 394 struct GNUNET_TRANSPORT_AddQueueMessage *aqm;
390 395
391 if (NULL == qh->ch->mq) 396 if (NULL == qh->ch->mq)
392 return; 397 return;
393 env = GNUNET_MQ_msg_extra(aqm, 398 env = GNUNET_MQ_msg_extra (aqm,
394 strlen(qh->address) + 1, 399 strlen (qh->address) + 1,
395 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP); 400 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP);
396 aqm->qid = htonl(qh->queue_id); 401 aqm->qid = htonl (qh->queue_id);
397 aqm->receiver = qh->peer; 402 aqm->receiver = qh->peer;
398 aqm->nt = htonl((uint32_t)qh->nt); 403 aqm->nt = htonl ((uint32_t) qh->nt);
399 aqm->mtu = htonl(qh->mtu); 404 aqm->mtu = htonl (qh->mtu);
400 aqm->cs = htonl((uint32_t)qh->cs); 405 aqm->cs = htonl ((uint32_t) qh->cs);
401 memcpy(&aqm[1], qh->address, strlen(qh->address) + 1); 406 memcpy (&aqm[1], qh->address, strlen (qh->address) + 1);
402 GNUNET_MQ_send(qh->ch->mq, env); 407 GNUNET_MQ_send (qh->ch->mq, env);
403} 408}
404 409
405 410
@@ -410,17 +415,17 @@ send_add_queue(struct GNUNET_TRANSPORT_QueueHandle *qh)
410 * @param qh queue to delete 415 * @param qh queue to delete
411 */ 416 */
412static void 417static void
413send_del_queue(struct GNUNET_TRANSPORT_QueueHandle *qh) 418send_del_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
414{ 419{
415 struct GNUNET_MQ_Envelope *env; 420 struct GNUNET_MQ_Envelope *env;
416 struct GNUNET_TRANSPORT_DelQueueMessage *dqm; 421 struct GNUNET_TRANSPORT_DelQueueMessage *dqm;
417 422
418 if (NULL == qh->ch->mq) 423 if (NULL == qh->ch->mq)
419 return; 424 return;
420 env = GNUNET_MQ_msg(dqm, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN); 425 env = GNUNET_MQ_msg (dqm, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN);
421 dqm->qid = htonl(qh->queue_id); 426 dqm->qid = htonl (qh->queue_id);
422 dqm->receiver = qh->peer; 427 dqm->receiver = qh->peer;
423 GNUNET_MQ_send(qh->ch->mq, env); 428 GNUNET_MQ_send (qh->ch->mq, env);
424} 429}
425 430
426 431
@@ -433,27 +438,27 @@ send_del_queue(struct GNUNET_TRANSPORT_QueueHandle *qh)
433 * @param ch service to disconnect from 438 * @param ch service to disconnect from
434 */ 439 */
435static void 440static void
436disconnect(struct GNUNET_TRANSPORT_CommunicatorHandle *ch) 441disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
437{ 442{
438 struct FlowControl *fcn; 443 struct FlowControl *fcn;
439 struct AckPending *apn; 444 struct AckPending *apn;
440 445
441 for (struct FlowControl *fc = ch->fc_head; NULL != fc; fc = fcn) 446 for (struct FlowControl *fc = ch->fc_head; NULL != fc; fc = fcn)
442 { 447 {
443 fcn = fc->next; 448 fcn = fc->next;
444 GNUNET_CONTAINER_DLL_remove(ch->fc_head, ch->fc_tail, fc); 449 GNUNET_CONTAINER_DLL_remove (ch->fc_head, ch->fc_tail, fc);
445 fc->cb(fc->cb_cls, GNUNET_SYSERR); 450 fc->cb (fc->cb_cls, GNUNET_SYSERR);
446 GNUNET_free(fc); 451 GNUNET_free (fc);
447 } 452 }
448 for (struct AckPending *ap = ch->ap_head; NULL != ap; ap = apn) 453 for (struct AckPending *ap = ch->ap_head; NULL != ap; ap = apn)
449 { 454 {
450 apn = ap->next; 455 apn = ap->next;
451 GNUNET_CONTAINER_DLL_remove(ch->ap_head, ch->ap_tail, ap); 456 GNUNET_CONTAINER_DLL_remove (ch->ap_head, ch->ap_tail, ap);
452 GNUNET_free(ap); 457 GNUNET_free (ap);
453 } 458 }
454 if (NULL == ch->mq) 459 if (NULL == ch->mq)
455 return; 460 return;
456 GNUNET_MQ_destroy(ch->mq); 461 GNUNET_MQ_destroy (ch->mq);
457 ch->mq = NULL; 462 ch->mq = NULL;
458} 463}
459 464
@@ -462,16 +467,16 @@ disconnect(struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
462 * Function called on MQ errors. 467 * Function called on MQ errors.
463 */ 468 */
464static void 469static void
465error_handler(void *cls, enum GNUNET_MQ_Error error) 470error_handler (void *cls, enum GNUNET_MQ_Error error)
466{ 471{
467 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; 472 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
468 473
469 GNUNET_log(GNUNET_ERROR_TYPE_INFO, 474 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
470 "MQ failure %d, reconnecting to transport service.\n", 475 "MQ failure %d, reconnecting to transport service.\n",
471 error); 476 error);
472 disconnect(ch); 477 disconnect (ch);
473 /* TODO: maybe do this with exponential backoff/delay */ 478 /* TODO: maybe do this with exponential backoff/delay */
474 reconnect(ch); 479 reconnect (ch);
475} 480}
476 481
477 482
@@ -483,29 +488,29 @@ error_handler(void *cls, enum GNUNET_MQ_Error error)
483 * @param incoming_ack the ack 488 * @param incoming_ack the ack
484 */ 489 */
485static void 490static void
486handle_incoming_ack( 491handle_incoming_ack (
487 void *cls, 492 void *cls,
488 const struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack) 493 const struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack)
489{ 494{
490 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; 495 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
491 496
492 for (struct FlowControl *fc = ch->fc_head; NULL != fc; fc = fc->next) 497 for (struct FlowControl *fc = ch->fc_head; NULL != fc; fc = fc->next)
498 {
499 if ((fc->id == incoming_ack->fc_id) &&
500 (0 == memcmp (&fc->sender,
501 &incoming_ack->sender,
502 sizeof(struct GNUNET_PeerIdentity))))
493 { 503 {
494 if ((fc->id == incoming_ack->fc_id) && 504 GNUNET_CONTAINER_DLL_remove (ch->fc_head, ch->fc_tail, fc);
495 (0 == memcmp(&fc->sender, 505 fc->cb (fc->cb_cls, GNUNET_OK);
496 &incoming_ack->sender, 506 GNUNET_free (fc);
497 sizeof(struct GNUNET_PeerIdentity)))) 507 return;
498 {
499 GNUNET_CONTAINER_DLL_remove(ch->fc_head, ch->fc_tail, fc);
500 fc->cb(fc->cb_cls, GNUNET_OK);
501 GNUNET_free(fc);
502 return;
503 }
504 } 508 }
505 GNUNET_break(0); 509 }
506 disconnect(ch); 510 GNUNET_break (0);
511 disconnect (ch);
507 /* TODO: maybe do this with exponential backoff/delay */ 512 /* TODO: maybe do this with exponential backoff/delay */
508 reconnect(ch); 513 reconnect (ch);
509} 514}
510 515
511 516
@@ -518,10 +523,10 @@ handle_incoming_ack(
518 * @return #GNUNET_OK if @a smt is well-formed 523 * @return #GNUNET_OK if @a smt is well-formed
519 */ 524 */
520static int 525static int
521check_create_queue(void *cls, const struct GNUNET_TRANSPORT_CreateQueue *cq) 526check_create_queue (void *cls, const struct GNUNET_TRANSPORT_CreateQueue *cq)
522{ 527{
523 (void)cls; 528 (void) cls;
524 GNUNET_MQ_check_zero_termination(cq); 529 GNUNET_MQ_check_zero_termination (cq);
525 return GNUNET_OK; 530 return GNUNET_OK;
526} 531}
527 532
@@ -533,26 +538,26 @@ check_create_queue(void *cls, const struct GNUNET_TRANSPORT_CreateQueue *cq)
533 * @param cq the queue creation request 538 * @param cq the queue creation request
534 */ 539 */
535static void 540static void
536handle_create_queue(void *cls, const struct GNUNET_TRANSPORT_CreateQueue *cq) 541handle_create_queue (void *cls, const struct GNUNET_TRANSPORT_CreateQueue *cq)
537{ 542{
538 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; 543 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
539 const char *addr = (const char *)&cq[1]; 544 const char *addr = (const char *) &cq[1];
540 struct GNUNET_TRANSPORT_CreateQueueResponse *cqr; 545 struct GNUNET_TRANSPORT_CreateQueueResponse *cqr;
541 struct GNUNET_MQ_Envelope *env; 546 struct GNUNET_MQ_Envelope *env;
542 547
543 if (GNUNET_OK != ch->mq_init(ch->mq_init_cls, &cq->receiver, addr)) 548 if (GNUNET_OK != ch->mq_init (ch->mq_init_cls, &cq->receiver, addr))
544 { 549 {
545 GNUNET_log(GNUNET_ERROR_TYPE_ERROR, 550 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
546 "Address `%s' invalid for this communicator\n", 551 "Address `%s' invalid for this communicator\n",
547 addr); 552 addr);
548 env = GNUNET_MQ_msg(cqr, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL); 553 env = GNUNET_MQ_msg (cqr, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL);
549 } 554 }
550 else 555 else
551 { 556 {
552 env = GNUNET_MQ_msg(cqr, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK); 557 env = GNUNET_MQ_msg (cqr, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK);
553 } 558 }
554 cqr->request_id = cq->request_id; 559 cqr->request_id = cq->request_id;
555 GNUNET_MQ_send(ch->mq, env); 560 GNUNET_MQ_send (ch->mq, env);
556} 561}
557 562
558 563
@@ -565,10 +570,10 @@ handle_create_queue(void *cls, const struct GNUNET_TRANSPORT_CreateQueue *cq)
565 * @return #GNUNET_OK if @a smt is well-formed 570 * @return #GNUNET_OK if @a smt is well-formed
566 */ 571 */
567static int 572static int
568check_send_msg(void *cls, const struct GNUNET_TRANSPORT_SendMessageTo *smt) 573check_send_msg (void *cls, const struct GNUNET_TRANSPORT_SendMessageTo *smt)
569{ 574{
570 (void)cls; 575 (void) cls;
571 GNUNET_MQ_check_boxed_message(smt); 576 GNUNET_MQ_check_boxed_message (smt);
572 return GNUNET_OK; 577 return GNUNET_OK;
573} 578}
574 579
@@ -583,19 +588,19 @@ check_send_msg(void *cls, const struct GNUNET_TRANSPORT_SendMessageTo *smt)
583 * @param mid message that the ack is about 588 * @param mid message that the ack is about
584 */ 589 */
585static void 590static void
586send_ack(struct GNUNET_TRANSPORT_CommunicatorHandle *ch, 591send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
587 int status, 592 int status,
588 const struct GNUNET_PeerIdentity *receiver, 593 const struct GNUNET_PeerIdentity *receiver,
589 uint64_t mid) 594 uint64_t mid)
590{ 595{
591 struct GNUNET_MQ_Envelope *env; 596 struct GNUNET_MQ_Envelope *env;
592 struct GNUNET_TRANSPORT_SendMessageToAck *ack; 597 struct GNUNET_TRANSPORT_SendMessageToAck *ack;
593 598
594 env = GNUNET_MQ_msg(ack, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK); 599 env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK);
595 ack->status = htonl(status); 600 ack->status = htonl (status);
596 ack->mid = mid; 601 ack->mid = mid;
597 ack->receiver = *receiver; 602 ack->receiver = *receiver;
598 GNUNET_MQ_send(ch->mq, env); 603 GNUNET_MQ_send (ch->mq, env);
599} 604}
600 605
601 606
@@ -606,14 +611,14 @@ send_ack(struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
606 * @param cls an `struct AckPending *` 611 * @param cls an `struct AckPending *`
607 */ 612 */
608static void 613static void
609send_ack_cb(void *cls) 614send_ack_cb (void *cls)
610{ 615{
611 struct AckPending *ap = cls; 616 struct AckPending *ap = cls;
612 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ap->ch; 617 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ap->ch;
613 618
614 GNUNET_CONTAINER_DLL_remove(ch->ap_head, ch->ap_tail, ap); 619 GNUNET_CONTAINER_DLL_remove (ch->ap_head, ch->ap_tail, ap);
615 send_ack(ch, GNUNET_OK, &ap->receiver, ap->mid); 620 send_ack (ch, GNUNET_OK, &ap->receiver, ap->mid);
616 GNUNET_free(ap); 621 GNUNET_free (ap);
617} 622}
618 623
619 624
@@ -624,7 +629,7 @@ send_ack_cb(void *cls)
624 * @param smt the transmission request 629 * @param smt the transmission request
625 */ 630 */
626static void 631static void
627handle_send_msg(void *cls, const struct GNUNET_TRANSPORT_SendMessageTo *smt) 632handle_send_msg (void *cls, const struct GNUNET_TRANSPORT_SendMessageTo *smt)
628{ 633{
629 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; 634 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
630 const struct GNUNET_MessageHeader *mh; 635 const struct GNUNET_MessageHeader *mh;
@@ -634,27 +639,27 @@ handle_send_msg(void *cls, const struct GNUNET_TRANSPORT_SendMessageTo *smt)
634 639
635 for (qh = ch->queue_head; NULL != qh; qh = qh->next) 640 for (qh = ch->queue_head; NULL != qh; qh = qh->next)
636 if ((qh->queue_id == smt->qid) && 641 if ((qh->queue_id == smt->qid) &&
637 (0 == memcmp(&qh->peer, 642 (0 == memcmp (&qh->peer,
638 &smt->receiver, 643 &smt->receiver,
639 sizeof(struct GNUNET_PeerIdentity)))) 644 sizeof(struct GNUNET_PeerIdentity))))
640 break; 645 break;
641 if (NULL == qh) 646 if (NULL == qh)
642 { 647 {
643 /* queue is already gone, tell transport this one failed */ 648 /* queue is already gone, tell transport this one failed */
644 GNUNET_log(GNUNET_ERROR_TYPE_INFO, 649 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
645 "Transmission failed, queue no longer exists.\n"); 650 "Transmission failed, queue no longer exists.\n");
646 send_ack(ch, GNUNET_NO, &smt->receiver, smt->mid); 651 send_ack (ch, GNUNET_NO, &smt->receiver, smt->mid);
647 return; 652 return;
648 } 653 }
649 ap = GNUNET_new(struct AckPending); 654 ap = GNUNET_new (struct AckPending);
650 ap->ch = ch; 655 ap->ch = ch;
651 ap->receiver = smt->receiver; 656 ap->receiver = smt->receiver;
652 ap->mid = smt->mid; 657 ap->mid = smt->mid;
653 GNUNET_CONTAINER_DLL_insert(ch->ap_head, ch->ap_tail, ap); 658 GNUNET_CONTAINER_DLL_insert (ch->ap_head, ch->ap_tail, ap);
654 mh = (const struct GNUNET_MessageHeader *)&smt[1]; 659 mh = (const struct GNUNET_MessageHeader *) &smt[1];
655 env = GNUNET_MQ_msg_copy(mh); 660 env = GNUNET_MQ_msg_copy (mh);
656 GNUNET_MQ_notify_sent(env, &send_ack_cb, ap); 661 GNUNET_MQ_notify_sent (env, &send_ack_cb, ap);
657 GNUNET_MQ_send(qh->mq, env); 662 GNUNET_MQ_send (qh->mq, env);
658} 663}
659 664
660 665
@@ -667,12 +672,12 @@ handle_send_msg(void *cls, const struct GNUNET_TRANSPORT_SendMessageTo *smt)
667 * @return #GNUNET_OK if @a smt is well-formed 672 * @return #GNUNET_OK if @a smt is well-formed
668 */ 673 */
669static int 674static int
670check_backchannel_incoming( 675check_backchannel_incoming (
671 void *cls, 676 void *cls,
672 const struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *bi) 677 const struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *bi)
673{ 678{
674 (void)cls; 679 (void) cls;
675 GNUNET_MQ_check_boxed_message(bi); 680 GNUNET_MQ_check_boxed_message (bi);
676 return GNUNET_OK; 681 return GNUNET_OK;
677} 682}
678 683
@@ -684,20 +689,20 @@ check_backchannel_incoming(
684 * @param bi the backchannel message 689 * @param bi the backchannel message
685 */ 690 */
686static void 691static void
687handle_backchannel_incoming( 692handle_backchannel_incoming (
688 void *cls, 693 void *cls,
689 const struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *bi) 694 const struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *bi)
690{ 695{
691 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; 696 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
692 697
693 if (NULL != ch->notify_cb) 698 if (NULL != ch->notify_cb)
694 ch->notify_cb(ch->notify_cb_cls, 699 ch->notify_cb (ch->notify_cb_cls,
695 &bi->pid, 700 &bi->pid,
696 (const struct GNUNET_MessageHeader *)&bi[1]); 701 (const struct GNUNET_MessageHeader *) &bi[1]);
697 else 702 else
698 GNUNET_log( 703 GNUNET_log (
699 GNUNET_ERROR_TYPE_INFO, 704 GNUNET_ERROR_TYPE_INFO,
700 _("Dropped backchanel message: handler not provided by communicator\n")); 705 _ ("Dropped backchanel message: handler not provided by communicator\n"));
701} 706}
702 707
703 708
@@ -707,46 +712,46 @@ handle_backchannel_incoming(
707 * @param ch handle to reconnect 712 * @param ch handle to reconnect
708 */ 713 */
709static void 714static void
710reconnect(struct GNUNET_TRANSPORT_CommunicatorHandle *ch) 715reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
711{ 716{
712 struct GNUNET_MQ_MessageHandler handlers[] = 717 struct GNUNET_MQ_MessageHandler handlers[] =
713 { GNUNET_MQ_hd_fixed_size(incoming_ack, 718 { GNUNET_MQ_hd_fixed_size (incoming_ack,
714 GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK, 719 GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK,
715 struct GNUNET_TRANSPORT_IncomingMessageAck, 720 struct GNUNET_TRANSPORT_IncomingMessageAck,
716 ch), 721 ch),
717 GNUNET_MQ_hd_var_size(create_queue, 722 GNUNET_MQ_hd_var_size (create_queue,
718 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE, 723 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE,
719 struct GNUNET_TRANSPORT_CreateQueue, 724 struct GNUNET_TRANSPORT_CreateQueue,
720 ch), 725 ch),
721 GNUNET_MQ_hd_var_size(send_msg, 726 GNUNET_MQ_hd_var_size (send_msg,
722 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG, 727 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG,
723 struct GNUNET_TRANSPORT_SendMessageTo, 728 struct GNUNET_TRANSPORT_SendMessageTo,
724 ch), 729 ch),
725 GNUNET_MQ_hd_var_size( 730 GNUNET_MQ_hd_var_size (
726 backchannel_incoming, 731 backchannel_incoming,
727 GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING, 732 GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING,
728 struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming, 733 struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming,
729 ch), 734 ch),
730 GNUNET_MQ_handler_end() }; 735 GNUNET_MQ_handler_end () };
731 struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam; 736 struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam;
732 struct GNUNET_MQ_Envelope *env; 737 struct GNUNET_MQ_Envelope *env;
733 738
734 ch->mq = 739 ch->mq =
735 GNUNET_CLIENT_connect(ch->cfg, "transport", handlers, &error_handler, ch); 740 GNUNET_CLIENT_connect (ch->cfg, "transport", handlers, &error_handler, ch);
736 if (NULL == ch->mq) 741 if (NULL == ch->mq)
737 return; 742 return;
738 env = GNUNET_MQ_msg_extra(cam, 743 env = GNUNET_MQ_msg_extra (cam,
739 strlen(ch->addr_prefix) + 1, 744 strlen (ch->addr_prefix) + 1,
740 GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR); 745 GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR);
741 cam->cc = htonl((uint32_t)ch->cc); 746 cam->cc = htonl ((uint32_t) ch->cc);
742 memcpy(&cam[1], ch->addr_prefix, strlen(ch->addr_prefix) + 1); 747 memcpy (&cam[1], ch->addr_prefix, strlen (ch->addr_prefix) + 1);
743 GNUNET_MQ_send(ch->mq, env); 748 GNUNET_MQ_send (ch->mq, env);
744 for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head; NULL != ai; 749 for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head; NULL != ai;
745 ai = ai->next) 750 ai = ai->next)
746 send_add_address(ai); 751 send_add_address (ai);
747 for (struct GNUNET_TRANSPORT_QueueHandle *qh = ch->queue_head; NULL != qh; 752 for (struct GNUNET_TRANSPORT_QueueHandle *qh = ch->queue_head; NULL != qh;
748 qh = qh->next) 753 qh = qh->next)
749 send_add_queue(qh); 754 send_add_queue (qh);
750} 755}
751 756
752 757
@@ -769,7 +774,7 @@ reconnect(struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
769 * @return NULL on error 774 * @return NULL on error
770 */ 775 */
771struct GNUNET_TRANSPORT_CommunicatorHandle * 776struct GNUNET_TRANSPORT_CommunicatorHandle *
772GNUNET_TRANSPORT_communicator_connect( 777GNUNET_TRANSPORT_communicator_connect (
773 const struct GNUNET_CONFIGURATION_Handle *cfg, 778 const struct GNUNET_CONFIGURATION_Handle *cfg,
774 const char *config_section, 779 const char *config_section,
775 const char *addr_prefix, 780 const char *addr_prefix,
@@ -781,7 +786,7 @@ GNUNET_TRANSPORT_communicator_connect(
781{ 786{
782 struct GNUNET_TRANSPORT_CommunicatorHandle *ch; 787 struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
783 788
784 ch = GNUNET_new(struct GNUNET_TRANSPORT_CommunicatorHandle); 789 ch = GNUNET_new (struct GNUNET_TRANSPORT_CommunicatorHandle);
785 ch->cfg = cfg; 790 ch->cfg = cfg;
786 ch->config_section = config_section; 791 ch->config_section = config_section;
787 ch->addr_prefix = addr_prefix; 792 ch->addr_prefix = addr_prefix;
@@ -790,18 +795,18 @@ GNUNET_TRANSPORT_communicator_connect(
790 ch->notify_cb = notify_cb; 795 ch->notify_cb = notify_cb;
791 ch->notify_cb_cls = notify_cb_cls; 796 ch->notify_cb_cls = notify_cb_cls;
792 ch->cc = cc; 797 ch->cc = cc;
793 reconnect(ch); 798 reconnect (ch);
794 if (GNUNET_OK != 799 if (GNUNET_OK !=
795 GNUNET_CONFIGURATION_get_value_number(cfg, 800 GNUNET_CONFIGURATION_get_value_number (cfg,
796 config_section, 801 config_section,
797 "MAX_QUEUE_LENGTH", 802 "MAX_QUEUE_LENGTH",
798 &ch->max_queue_length)) 803 &ch->max_queue_length))
799 ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH; 804 ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
800 if (NULL == ch->mq) 805 if (NULL == ch->mq)
801 { 806 {
802 GNUNET_free(ch); 807 GNUNET_free (ch);
803 return NULL; 808 return NULL;
804 } 809 }
805 return ch; 810 return ch;
806} 811}
807 812
@@ -812,16 +817,16 @@ GNUNET_TRANSPORT_communicator_connect(
812 * @param ch handle returned from connect 817 * @param ch handle returned from connect
813 */ 818 */
814void 819void
815GNUNET_TRANSPORT_communicator_disconnect( 820GNUNET_TRANSPORT_communicator_disconnect (
816 struct GNUNET_TRANSPORT_CommunicatorHandle *ch) 821 struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
817{ 822{
818 disconnect(ch); 823 disconnect (ch);
819 while (NULL != ch->ai_head) 824 while (NULL != ch->ai_head)
820 { 825 {
821 GNUNET_break(0); /* communicator forgot to remove address, warn! */ 826 GNUNET_break (0); /* communicator forgot to remove address, warn! */
822 GNUNET_TRANSPORT_communicator_address_remove(ch->ai_head); 827 GNUNET_TRANSPORT_communicator_address_remove (ch->ai_head);
823 } 828 }
824 GNUNET_free(ch); 829 GNUNET_free (ch);
825} 830}
826 831
827 832
@@ -849,7 +854,7 @@ GNUNET_TRANSPORT_communicator_disconnect(
849 * the tranport service is not yet up 854 * the tranport service is not yet up
850 */ 855 */
851int 856int
852GNUNET_TRANSPORT_communicator_receive( 857GNUNET_TRANSPORT_communicator_receive (
853 struct GNUNET_TRANSPORT_CommunicatorHandle *ch, 858 struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
854 const struct GNUNET_PeerIdentity *sender, 859 const struct GNUNET_PeerIdentity *sender,
855 const struct GNUNET_MessageHeader *msg, 860 const struct GNUNET_MessageHeader *msg,
@@ -863,41 +868,41 @@ GNUNET_TRANSPORT_communicator_receive(
863 868
864 if (NULL == ch->mq) 869 if (NULL == ch->mq)
865 return GNUNET_SYSERR; 870 return GNUNET_SYSERR;
866 if ((NULL == cb) && (GNUNET_MQ_get_length(ch->mq) >= ch->max_queue_length)) 871 if ((NULL == cb) && (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length))
867 { 872 {
868 GNUNET_log( 873 GNUNET_log (
869 GNUNET_ERROR_TYPE_WARNING, 874 GNUNET_ERROR_TYPE_WARNING,
870 "Dropping message: transprot is too slow, queue length %llu exceeded\n", 875 "Dropping message: transprot is too slow, queue length %llu exceeded\n",
871 ch->max_queue_length); 876 ch->max_queue_length);
872 return GNUNET_NO; 877 return GNUNET_NO;
873 } 878 }
874 879
875 msize = ntohs(msg->size); 880 msize = ntohs (msg->size);
876 env = 881 env =
877 GNUNET_MQ_msg_extra(im, msize, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG); 882 GNUNET_MQ_msg_extra (im, msize, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG);
878 if (NULL == env) 883 if (NULL == env)
879 { 884 {
880 GNUNET_break(0); 885 GNUNET_break (0);
881 return GNUNET_SYSERR; 886 return GNUNET_SYSERR;
882 } 887 }
883 im->expected_address_validity = 888 im->expected_address_validity =
884 GNUNET_TIME_relative_hton(expected_addr_validity); 889 GNUNET_TIME_relative_hton (expected_addr_validity);
885 im->sender = *sender; 890 im->sender = *sender;
886 memcpy(&im[1], msg, msize); 891 memcpy (&im[1], msg, msize);
887 if (NULL != cb) 892 if (NULL != cb)
888 { 893 {
889 struct FlowControl *fc; 894 struct FlowControl *fc;
890 895
891 im->fc_on = htonl(GNUNET_YES); 896 im->fc_on = htonl (GNUNET_YES);
892 im->fc_id = ch->fc_gen++; 897 im->fc_id = ch->fc_gen++;
893 fc = GNUNET_new(struct FlowControl); 898 fc = GNUNET_new (struct FlowControl);
894 fc->sender = *sender; 899 fc->sender = *sender;
895 fc->id = im->fc_id; 900 fc->id = im->fc_id;
896 fc->cb = cb; 901 fc->cb = cb;
897 fc->cb_cls = cb_cls; 902 fc->cb_cls = cb_cls;
898 GNUNET_CONTAINER_DLL_insert(ch->fc_head, ch->fc_tail, fc); 903 GNUNET_CONTAINER_DLL_insert (ch->fc_head, ch->fc_tail, fc);
899 } 904 }
900 GNUNET_MQ_send(ch->mq, env); 905 GNUNET_MQ_send (ch->mq, env);
901 return GNUNET_OK; 906 return GNUNET_OK;
902} 907}
903 908
@@ -922,7 +927,7 @@ GNUNET_TRANSPORT_communicator_receive(
922 * @return API handle identifying the new MQ 927 * @return API handle identifying the new MQ
923 */ 928 */
924struct GNUNET_TRANSPORT_QueueHandle * 929struct GNUNET_TRANSPORT_QueueHandle *
925GNUNET_TRANSPORT_communicator_mq_add( 930GNUNET_TRANSPORT_communicator_mq_add (
926 struct GNUNET_TRANSPORT_CommunicatorHandle *ch, 931 struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
927 const struct GNUNET_PeerIdentity *peer, 932 const struct GNUNET_PeerIdentity *peer,
928 const char *address, 933 const char *address,
@@ -933,17 +938,17 @@ GNUNET_TRANSPORT_communicator_mq_add(
933{ 938{
934 struct GNUNET_TRANSPORT_QueueHandle *qh; 939 struct GNUNET_TRANSPORT_QueueHandle *qh;
935 940
936 qh = GNUNET_new(struct GNUNET_TRANSPORT_QueueHandle); 941 qh = GNUNET_new (struct GNUNET_TRANSPORT_QueueHandle);
937 qh->ch = ch; 942 qh->ch = ch;
938 qh->peer = *peer; 943 qh->peer = *peer;
939 qh->address = GNUNET_strdup(address); 944 qh->address = GNUNET_strdup (address);
940 qh->nt = nt; 945 qh->nt = nt;
941 qh->mtu = mtu; 946 qh->mtu = mtu;
942 qh->cs = cs; 947 qh->cs = cs;
943 qh->mq = mq; 948 qh->mq = mq;
944 qh->queue_id = ch->queue_gen++; 949 qh->queue_id = ch->queue_gen++;
945 GNUNET_CONTAINER_DLL_insert(ch->queue_head, ch->queue_tail, qh); 950 GNUNET_CONTAINER_DLL_insert (ch->queue_head, ch->queue_tail, qh);
946 send_add_queue(qh); 951 send_add_queue (qh);
947 return qh; 952 return qh;
948} 953}
949 954
@@ -955,15 +960,15 @@ GNUNET_TRANSPORT_communicator_mq_add(
955 * @param qh handle for the queue that must be invalidated 960 * @param qh handle for the queue that must be invalidated
956 */ 961 */
957void 962void
958GNUNET_TRANSPORT_communicator_mq_del(struct GNUNET_TRANSPORT_QueueHandle *qh) 963GNUNET_TRANSPORT_communicator_mq_del (struct GNUNET_TRANSPORT_QueueHandle *qh)
959{ 964{
960 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = qh->ch; 965 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = qh->ch;
961 966
962 send_del_queue(qh); 967 send_del_queue (qh);
963 GNUNET_CONTAINER_DLL_remove(ch->queue_head, ch->queue_tail, qh); 968 GNUNET_CONTAINER_DLL_remove (ch->queue_head, ch->queue_tail, qh);
964 GNUNET_MQ_destroy(qh->mq); 969 GNUNET_MQ_destroy (qh->mq);
965 GNUNET_free(qh->address); 970 GNUNET_free (qh->address);
966 GNUNET_free(qh); 971 GNUNET_free (qh);
967} 972}
968 973
969 974
@@ -977,7 +982,7 @@ GNUNET_TRANSPORT_communicator_mq_del(struct GNUNET_TRANSPORT_QueueHandle *qh)
977 * @param expiration when does the communicator forsee this address expiring? 982 * @param expiration when does the communicator forsee this address expiring?
978 */ 983 */
979struct GNUNET_TRANSPORT_AddressIdentifier * 984struct GNUNET_TRANSPORT_AddressIdentifier *
980GNUNET_TRANSPORT_communicator_address_add( 985GNUNET_TRANSPORT_communicator_address_add (
981 struct GNUNET_TRANSPORT_CommunicatorHandle *ch, 986 struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
982 const char *address, 987 const char *address,
983 enum GNUNET_NetworkType nt, 988 enum GNUNET_NetworkType nt,
@@ -985,14 +990,14 @@ GNUNET_TRANSPORT_communicator_address_add(
985{ 990{
986 struct GNUNET_TRANSPORT_AddressIdentifier *ai; 991 struct GNUNET_TRANSPORT_AddressIdentifier *ai;
987 992
988 ai = GNUNET_new(struct GNUNET_TRANSPORT_AddressIdentifier); 993 ai = GNUNET_new (struct GNUNET_TRANSPORT_AddressIdentifier);
989 ai->ch = ch; 994 ai->ch = ch;
990 ai->address = GNUNET_strdup(address); 995 ai->address = GNUNET_strdup (address);
991 ai->nt = nt; 996 ai->nt = nt;
992 ai->expiration = expiration; 997 ai->expiration = expiration;
993 ai->aid = ch->aid_gen++; 998 ai->aid = ch->aid_gen++;
994 GNUNET_CONTAINER_DLL_insert(ch->ai_head, ch->ai_tail, ai); 999 GNUNET_CONTAINER_DLL_insert (ch->ai_head, ch->ai_tail, ai);
995 send_add_address(ai); 1000 send_add_address (ai);
996 return ai; 1001 return ai;
997} 1002}
998 1003
@@ -1004,15 +1009,15 @@ GNUNET_TRANSPORT_communicator_address_add(
1004 * @param ai address that is no longer provided 1009 * @param ai address that is no longer provided
1005 */ 1010 */
1006void 1011void
1007GNUNET_TRANSPORT_communicator_address_remove( 1012GNUNET_TRANSPORT_communicator_address_remove (
1008 struct GNUNET_TRANSPORT_AddressIdentifier *ai) 1013 struct GNUNET_TRANSPORT_AddressIdentifier *ai)
1009{ 1014{
1010 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ai->ch; 1015 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ai->ch;
1011 1016
1012 send_del_address(ai); 1017 send_del_address (ai);
1013 GNUNET_CONTAINER_DLL_remove(ch->ai_head, ch->ai_tail, ai); 1018 GNUNET_CONTAINER_DLL_remove (ch->ai_head, ch->ai_tail, ai);
1014 GNUNET_free(ai->address); 1019 GNUNET_free (ai->address);
1015 GNUNET_free(ai); 1020 GNUNET_free (ai);
1016} 1021}
1017 1022
1018 1023
@@ -1034,7 +1039,7 @@ GNUNET_TRANSPORT_communicator_address_remove(
1034 * notify-API to @a pid's communicator @a comm 1039 * notify-API to @a pid's communicator @a comm
1035 */ 1040 */
1036void 1041void
1037GNUNET_TRANSPORT_communicator_notify( 1042GNUNET_TRANSPORT_communicator_notify (
1038 struct GNUNET_TRANSPORT_CommunicatorHandle *ch, 1043 struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
1039 const struct GNUNET_PeerIdentity *pid, 1044 const struct GNUNET_PeerIdentity *pid,
1040 const char *comm, 1045 const char *comm,
@@ -1042,18 +1047,18 @@ GNUNET_TRANSPORT_communicator_notify(
1042{ 1047{
1043 struct GNUNET_MQ_Envelope *env; 1048 struct GNUNET_MQ_Envelope *env;
1044 struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb; 1049 struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb;
1045 size_t slen = strlen(comm) + 1; 1050 size_t slen = strlen (comm) + 1;
1046 uint16_t mlen = ntohs(header->size); 1051 uint16_t mlen = ntohs (header->size);
1047 1052
1048 GNUNET_assert(mlen + slen + sizeof(*cb) < UINT16_MAX); 1053 GNUNET_assert (mlen + slen + sizeof(*cb) < UINT16_MAX);
1049 env = 1054 env =
1050 GNUNET_MQ_msg_extra(cb, 1055 GNUNET_MQ_msg_extra (cb,
1051 slen + mlen, 1056 slen + mlen,
1052 GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL); 1057 GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL);
1053 cb->pid = *pid; 1058 cb->pid = *pid;
1054 memcpy(&cb[1], header, mlen); 1059 memcpy (&cb[1], header, mlen);
1055 memcpy(((char *)&cb[1]) + mlen, comm, slen); 1060 memcpy (((char *) &cb[1]) + mlen, comm, slen);
1056 GNUNET_MQ_send(ch->mq, env); 1061 GNUNET_MQ_send (ch->mq, env);
1057} 1062}
1058 1063
1059 1064