aboutsummaryrefslogtreecommitdiff
path: root/src/transport/transport_api2_communication.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport/transport_api2_communication.c')
-rw-r--r--src/transport/transport_api2_communication.c465
1 files changed, 229 insertions, 236 deletions
diff --git a/src/transport/transport_api2_communication.c b/src/transport/transport_api2_communication.c
index 8bc18845b..f9c387a87 100644
--- a/src/transport/transport_api2_communication.c
+++ b/src/transport/transport_api2_communication.c
@@ -16,7 +16,7 @@
16 along with this program. If not, see <http://www.gnu.org/licenses/>. 16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17 17
18 SPDX-License-Identifier: AGPL3.0-or-later 18 SPDX-License-Identifier: AGPL3.0-or-later
19*/ 19 */
20 20
21/** 21/**
22 * @file transport/transport_api2_communication.c 22 * @file transport/transport_api2_communication.c
@@ -42,8 +42,7 @@
42/** 42/**
43 * Information we track per packet to enable flow control. 43 * Information we track per packet to enable flow control.
44 */ 44 */
45struct FlowControl 45struct FlowControl {
46{
47 /** 46 /**
48 * Kept in a DLL. 47 * Kept in a DLL.
49 */ 48 */
@@ -80,8 +79,7 @@ struct FlowControl
80 * Information we track per message to tell the transport about 79 * Information we track per message to tell the transport about
81 * success or failures. 80 * success or failures.
82 */ 81 */
83struct AckPending 82struct AckPending {
84{
85 /** 83 /**
86 * Kept in a DLL. 84 * Kept in a DLL.
87 */ 85 */
@@ -112,8 +110,7 @@ struct AckPending
112/** 110/**
113 * Opaque handle to the transport service for communicators. 111 * Opaque handle to the transport service for communicators.
114 */ 112 */
115struct GNUNET_TRANSPORT_CommunicatorHandle 113struct GNUNET_TRANSPORT_CommunicatorHandle {
116{
117 /** 114 /**
118 * Head of DLL of addresses this communicator offers to the transport service. 115 * Head of DLL of addresses this communicator offers to the transport service.
119 */ 116 */
@@ -229,9 +226,7 @@ struct GNUNET_TRANSPORT_CommunicatorHandle
229 * Handle returned to identify the internal data structure the transport 226 * Handle returned to identify the internal data structure the transport
230 * API has created to manage a message queue to a particular peer. 227 * API has created to manage a message queue to a particular peer.
231 */ 228 */
232struct GNUNET_TRANSPORT_QueueHandle 229struct GNUNET_TRANSPORT_QueueHandle {
233{
234
235 /** 230 /**
236 * Kept in a DLL. 231 * Kept in a DLL.
237 */ 232 */
@@ -288,9 +283,7 @@ struct GNUNET_TRANSPORT_QueueHandle
288 * Internal representation of an address a communicator is 283 * Internal representation of an address a communicator is
289 * currently providing for the transport service. 284 * currently providing for the transport service.
290 */ 285 */
291struct GNUNET_TRANSPORT_AddressIdentifier 286struct GNUNET_TRANSPORT_AddressIdentifier {
292{
293
294 /** 287 /**
295 * Kept in a DLL. 288 * Kept in a DLL.
296 */ 289 */
@@ -336,7 +329,7 @@ struct GNUNET_TRANSPORT_AddressIdentifier
336 * @param ch handle to reconnect 329 * @param ch handle to reconnect
337 */ 330 */
338static void 331static void
339reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch); 332reconnect(struct GNUNET_TRANSPORT_CommunicatorHandle *ch);
340 333
341 334
342/** 335/**
@@ -346,20 +339,20 @@ reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch);
346 * @param ai address to add 339 * @param ai address to add
347 */ 340 */
348static void 341static void
349send_add_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai) 342send_add_address(struct GNUNET_TRANSPORT_AddressIdentifier *ai)
350{ 343{
351 struct GNUNET_MQ_Envelope *env; 344 struct GNUNET_MQ_Envelope *env;
352 struct GNUNET_TRANSPORT_AddAddressMessage *aam; 345 struct GNUNET_TRANSPORT_AddAddressMessage *aam;
353 346
354 if (NULL == ai->ch->mq) 347 if (NULL == ai->ch->mq)
355 return; 348 return;
356 env = GNUNET_MQ_msg_extra (aam, 349 env = GNUNET_MQ_msg_extra(aam,
357 strlen (ai->address) + 1, 350 strlen(ai->address) + 1,
358 GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS); 351 GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS);
359 aam->expiration = GNUNET_TIME_relative_hton (ai->expiration); 352 aam->expiration = GNUNET_TIME_relative_hton(ai->expiration);
360 aam->nt = htonl ((uint32_t) ai->nt); 353 aam->nt = htonl((uint32_t)ai->nt);
361 memcpy (&aam[1], ai->address, strlen (ai->address) + 1); 354 memcpy(&aam[1], ai->address, strlen(ai->address) + 1);
362 GNUNET_MQ_send (ai->ch->mq, env); 355 GNUNET_MQ_send(ai->ch->mq, env);
363} 356}
364 357
365 358
@@ -370,16 +363,16 @@ send_add_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
370 * @param ai address to delete 363 * @param ai address to delete
371 */ 364 */
372static void 365static void
373send_del_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai) 366send_del_address(struct GNUNET_TRANSPORT_AddressIdentifier *ai)
374{ 367{
375 struct GNUNET_MQ_Envelope *env; 368 struct GNUNET_MQ_Envelope *env;
376 struct GNUNET_TRANSPORT_DelAddressMessage *dam; 369 struct GNUNET_TRANSPORT_DelAddressMessage *dam;
377 370
378 if (NULL == ai->ch->mq) 371 if (NULL == ai->ch->mq)
379 return; 372 return;
380 env = GNUNET_MQ_msg (dam, GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS); 373 env = GNUNET_MQ_msg(dam, GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS);
381 dam->aid = htonl (ai->aid); 374 dam->aid = htonl(ai->aid);
382 GNUNET_MQ_send (ai->ch->mq, env); 375 GNUNET_MQ_send(ai->ch->mq, env);
383} 376}
384 377
385 378
@@ -390,23 +383,23 @@ send_del_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
390 * @param qh queue to add 383 * @param qh queue to add
391 */ 384 */
392static void 385static void
393send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh) 386send_add_queue(struct GNUNET_TRANSPORT_QueueHandle *qh)
394{ 387{
395 struct GNUNET_MQ_Envelope *env; 388 struct GNUNET_MQ_Envelope *env;
396 struct GNUNET_TRANSPORT_AddQueueMessage *aqm; 389 struct GNUNET_TRANSPORT_AddQueueMessage *aqm;
397 390
398 if (NULL == qh->ch->mq) 391 if (NULL == qh->ch->mq)
399 return; 392 return;
400 env = GNUNET_MQ_msg_extra (aqm, 393 env = GNUNET_MQ_msg_extra(aqm,
401 strlen (qh->address) + 1, 394 strlen(qh->address) + 1,
402 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP); 395 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP);
403 aqm->qid = htonl (qh->queue_id); 396 aqm->qid = htonl(qh->queue_id);
404 aqm->receiver = qh->peer; 397 aqm->receiver = qh->peer;
405 aqm->nt = htonl ((uint32_t) qh->nt); 398 aqm->nt = htonl((uint32_t)qh->nt);
406 aqm->mtu = htonl (qh->mtu); 399 aqm->mtu = htonl(qh->mtu);
407 aqm->cs = htonl ((uint32_t) qh->cs); 400 aqm->cs = htonl((uint32_t)qh->cs);
408 memcpy (&aqm[1], qh->address, strlen (qh->address) + 1); 401 memcpy(&aqm[1], qh->address, strlen(qh->address) + 1);
409 GNUNET_MQ_send (qh->ch->mq, env); 402 GNUNET_MQ_send(qh->ch->mq, env);
410} 403}
411 404
412 405
@@ -417,17 +410,17 @@ send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
417 * @param qh queue to delete 410 * @param qh queue to delete
418 */ 411 */
419static void 412static void
420send_del_queue (struct GNUNET_TRANSPORT_QueueHandle *qh) 413send_del_queue(struct GNUNET_TRANSPORT_QueueHandle *qh)
421{ 414{
422 struct GNUNET_MQ_Envelope *env; 415 struct GNUNET_MQ_Envelope *env;
423 struct GNUNET_TRANSPORT_DelQueueMessage *dqm; 416 struct GNUNET_TRANSPORT_DelQueueMessage *dqm;
424 417
425 if (NULL == qh->ch->mq) 418 if (NULL == qh->ch->mq)
426 return; 419 return;
427 env = GNUNET_MQ_msg (dqm, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN); 420 env = GNUNET_MQ_msg(dqm, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN);
428 dqm->qid = htonl (qh->queue_id); 421 dqm->qid = htonl(qh->queue_id);
429 dqm->receiver = qh->peer; 422 dqm->receiver = qh->peer;
430 GNUNET_MQ_send (qh->ch->mq, env); 423 GNUNET_MQ_send(qh->ch->mq, env);
431} 424}
432 425
433 426
@@ -440,27 +433,27 @@ send_del_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
440 * @param ch service to disconnect from 433 * @param ch service to disconnect from
441 */ 434 */
442static void 435static void
443disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch) 436disconnect(struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
444{ 437{
445 struct FlowControl *fcn; 438 struct FlowControl *fcn;
446 struct AckPending *apn; 439 struct AckPending *apn;
447 440
448 for (struct FlowControl *fc = ch->fc_head; NULL != fc; fc = fcn) 441 for (struct FlowControl *fc = ch->fc_head; NULL != fc; fc = fcn)
449 { 442 {
450 fcn = fc->next; 443 fcn = fc->next;
451 GNUNET_CONTAINER_DLL_remove (ch->fc_head, ch->fc_tail, fc); 444 GNUNET_CONTAINER_DLL_remove(ch->fc_head, ch->fc_tail, fc);
452 fc->cb (fc->cb_cls, GNUNET_SYSERR); 445 fc->cb(fc->cb_cls, GNUNET_SYSERR);
453 GNUNET_free (fc); 446 GNUNET_free(fc);
454 } 447 }
455 for (struct AckPending *ap = ch->ap_head; NULL != ap; ap = apn) 448 for (struct AckPending *ap = ch->ap_head; NULL != ap; ap = apn)
456 { 449 {
457 apn = ap->next; 450 apn = ap->next;
458 GNUNET_CONTAINER_DLL_remove (ch->ap_head, ch->ap_tail, ap); 451 GNUNET_CONTAINER_DLL_remove(ch->ap_head, ch->ap_tail, ap);
459 GNUNET_free (ap); 452 GNUNET_free(ap);
460 } 453 }
461 if (NULL == ch->mq) 454 if (NULL == ch->mq)
462 return; 455 return;
463 GNUNET_MQ_destroy (ch->mq); 456 GNUNET_MQ_destroy(ch->mq);
464 ch->mq = NULL; 457 ch->mq = NULL;
465} 458}
466 459
@@ -469,16 +462,16 @@ disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
469 * Function called on MQ errors. 462 * Function called on MQ errors.
470 */ 463 */
471static void 464static void
472error_handler (void *cls, enum GNUNET_MQ_Error error) 465error_handler(void *cls, enum GNUNET_MQ_Error error)
473{ 466{
474 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; 467 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
475 468
476 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 469 GNUNET_log(GNUNET_ERROR_TYPE_INFO,
477 "MQ failure %d, reconnecting to transport service.\n", 470 "MQ failure %d, reconnecting to transport service.\n",
478 error); 471 error);
479 disconnect (ch); 472 disconnect(ch);
480 /* TODO: maybe do this with exponential backoff/delay */ 473 /* TODO: maybe do this with exponential backoff/delay */
481 reconnect (ch); 474 reconnect(ch);
482} 475}
483 476
484 477
@@ -490,29 +483,29 @@ error_handler (void *cls, enum GNUNET_MQ_Error error)
490 * @param incoming_ack the ack 483 * @param incoming_ack the ack
491 */ 484 */
492static void 485static void
493handle_incoming_ack ( 486handle_incoming_ack(
494 void *cls, 487 void *cls,
495 const struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack) 488 const struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack)
496{ 489{
497 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; 490 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
498 491
499 for (struct FlowControl *fc = ch->fc_head; NULL != fc; fc = fc->next) 492 for (struct FlowControl *fc = ch->fc_head; NULL != fc; fc = fc->next)
500 {
501 if ((fc->id == incoming_ack->fc_id) &&
502 (0 == memcmp (&fc->sender,
503 &incoming_ack->sender,
504 sizeof (struct GNUNET_PeerIdentity))))
505 { 493 {
506 GNUNET_CONTAINER_DLL_remove (ch->fc_head, ch->fc_tail, fc); 494 if ((fc->id == incoming_ack->fc_id) &&
507 fc->cb (fc->cb_cls, GNUNET_OK); 495 (0 == memcmp(&fc->sender,
508 GNUNET_free (fc); 496 &incoming_ack->sender,
509 return; 497 sizeof(struct GNUNET_PeerIdentity))))
498 {
499 GNUNET_CONTAINER_DLL_remove(ch->fc_head, ch->fc_tail, fc);
500 fc->cb(fc->cb_cls, GNUNET_OK);
501 GNUNET_free(fc);
502 return;
503 }
510 } 504 }
511 } 505 GNUNET_break(0);
512 GNUNET_break (0); 506 disconnect(ch);
513 disconnect (ch);
514 /* TODO: maybe do this with exponential backoff/delay */ 507 /* TODO: maybe do this with exponential backoff/delay */
515 reconnect (ch); 508 reconnect(ch);
516} 509}
517 510
518 511
@@ -525,10 +518,10 @@ handle_incoming_ack (
525 * @return #GNUNET_OK if @a smt is well-formed 518 * @return #GNUNET_OK if @a smt is well-formed
526 */ 519 */
527static int 520static int
528check_create_queue (void *cls, const struct GNUNET_TRANSPORT_CreateQueue *cq) 521check_create_queue(void *cls, const struct GNUNET_TRANSPORT_CreateQueue *cq)
529{ 522{
530 (void) cls; 523 (void)cls;
531 GNUNET_MQ_check_zero_termination (cq); 524 GNUNET_MQ_check_zero_termination(cq);
532 return GNUNET_OK; 525 return GNUNET_OK;
533} 526}
534 527
@@ -540,26 +533,26 @@ check_create_queue (void *cls, const struct GNUNET_TRANSPORT_CreateQueue *cq)
540 * @param cq the queue creation request 533 * @param cq the queue creation request
541 */ 534 */
542static void 535static void
543handle_create_queue (void *cls, const struct GNUNET_TRANSPORT_CreateQueue *cq) 536handle_create_queue(void *cls, const struct GNUNET_TRANSPORT_CreateQueue *cq)
544{ 537{
545 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; 538 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
546 const char *addr = (const char *) &cq[1]; 539 const char *addr = (const char *)&cq[1];
547 struct GNUNET_TRANSPORT_CreateQueueResponse *cqr; 540 struct GNUNET_TRANSPORT_CreateQueueResponse *cqr;
548 struct GNUNET_MQ_Envelope *env; 541 struct GNUNET_MQ_Envelope *env;
549 542
550 if (GNUNET_OK != ch->mq_init (ch->mq_init_cls, &cq->receiver, addr)) 543 if (GNUNET_OK != ch->mq_init(ch->mq_init_cls, &cq->receiver, addr))
551 { 544 {
552 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 545 GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
553 "Address `%s' invalid for this communicator\n", 546 "Address `%s' invalid for this communicator\n",
554 addr); 547 addr);
555 env = GNUNET_MQ_msg (cqr, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL); 548 env = GNUNET_MQ_msg(cqr, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL);
556 } 549 }
557 else 550 else
558 { 551 {
559 env = GNUNET_MQ_msg (cqr, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK); 552 env = GNUNET_MQ_msg(cqr, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK);
560 } 553 }
561 cqr->request_id = cq->request_id; 554 cqr->request_id = cq->request_id;
562 GNUNET_MQ_send (ch->mq, env); 555 GNUNET_MQ_send(ch->mq, env);
563} 556}
564 557
565 558
@@ -572,10 +565,10 @@ handle_create_queue (void *cls, const struct GNUNET_TRANSPORT_CreateQueue *cq)
572 * @return #GNUNET_OK if @a smt is well-formed 565 * @return #GNUNET_OK if @a smt is well-formed
573 */ 566 */
574static int 567static int
575check_send_msg (void *cls, const struct GNUNET_TRANSPORT_SendMessageTo *smt) 568check_send_msg(void *cls, const struct GNUNET_TRANSPORT_SendMessageTo *smt)
576{ 569{
577 (void) cls; 570 (void)cls;
578 GNUNET_MQ_check_boxed_message (smt); 571 GNUNET_MQ_check_boxed_message(smt);
579 return GNUNET_OK; 572 return GNUNET_OK;
580} 573}
581 574
@@ -590,19 +583,19 @@ check_send_msg (void *cls, const struct GNUNET_TRANSPORT_SendMessageTo *smt)
590 * @param mid message that the ack is about 583 * @param mid message that the ack is about
591 */ 584 */
592static void 585static void
593send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch, 586send_ack(struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
594 int status, 587 int status,
595 const struct GNUNET_PeerIdentity *receiver, 588 const struct GNUNET_PeerIdentity *receiver,
596 uint64_t mid) 589 uint64_t mid)
597{ 590{
598 struct GNUNET_MQ_Envelope *env; 591 struct GNUNET_MQ_Envelope *env;
599 struct GNUNET_TRANSPORT_SendMessageToAck *ack; 592 struct GNUNET_TRANSPORT_SendMessageToAck *ack;
600 593
601 env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK); 594 env = GNUNET_MQ_msg(ack, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK);
602 ack->status = htonl (status); 595 ack->status = htonl(status);
603 ack->mid = mid; 596 ack->mid = mid;
604 ack->receiver = *receiver; 597 ack->receiver = *receiver;
605 GNUNET_MQ_send (ch->mq, env); 598 GNUNET_MQ_send(ch->mq, env);
606} 599}
607 600
608 601
@@ -613,14 +606,14 @@ send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
613 * @param cls an `struct AckPending *` 606 * @param cls an `struct AckPending *`
614 */ 607 */
615static void 608static void
616send_ack_cb (void *cls) 609send_ack_cb(void *cls)
617{ 610{
618 struct AckPending *ap = cls; 611 struct AckPending *ap = cls;
619 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ap->ch; 612 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ap->ch;
620 613
621 GNUNET_CONTAINER_DLL_remove (ch->ap_head, ch->ap_tail, ap); 614 GNUNET_CONTAINER_DLL_remove(ch->ap_head, ch->ap_tail, ap);
622 send_ack (ch, GNUNET_OK, &ap->receiver, ap->mid); 615 send_ack(ch, GNUNET_OK, &ap->receiver, ap->mid);
623 GNUNET_free (ap); 616 GNUNET_free(ap);
624} 617}
625 618
626 619
@@ -631,7 +624,7 @@ send_ack_cb (void *cls)
631 * @param smt the transmission request 624 * @param smt the transmission request
632 */ 625 */
633static void 626static void
634handle_send_msg (void *cls, const struct GNUNET_TRANSPORT_SendMessageTo *smt) 627handle_send_msg(void *cls, const struct GNUNET_TRANSPORT_SendMessageTo *smt)
635{ 628{
636 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; 629 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
637 const struct GNUNET_MessageHeader *mh; 630 const struct GNUNET_MessageHeader *mh;
@@ -641,27 +634,27 @@ handle_send_msg (void *cls, const struct GNUNET_TRANSPORT_SendMessageTo *smt)
641 634
642 for (qh = ch->queue_head; NULL != qh; qh = qh->next) 635 for (qh = ch->queue_head; NULL != qh; qh = qh->next)
643 if ((qh->queue_id == smt->qid) && 636 if ((qh->queue_id == smt->qid) &&
644 (0 == memcmp (&qh->peer, 637 (0 == memcmp(&qh->peer,
645 &smt->receiver, 638 &smt->receiver,
646 sizeof (struct GNUNET_PeerIdentity)))) 639 sizeof(struct GNUNET_PeerIdentity))))
647 break; 640 break;
648 if (NULL == qh) 641 if (NULL == qh)
649 { 642 {
650 /* queue is already gone, tell transport this one failed */ 643 /* queue is already gone, tell transport this one failed */
651 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 644 GNUNET_log(GNUNET_ERROR_TYPE_INFO,
652 "Transmission failed, queue no longer exists.\n"); 645 "Transmission failed, queue no longer exists.\n");
653 send_ack (ch, GNUNET_NO, &smt->receiver, smt->mid); 646 send_ack(ch, GNUNET_NO, &smt->receiver, smt->mid);
654 return; 647 return;
655 } 648 }
656 ap = GNUNET_new (struct AckPending); 649 ap = GNUNET_new(struct AckPending);
657 ap->ch = ch; 650 ap->ch = ch;
658 ap->receiver = smt->receiver; 651 ap->receiver = smt->receiver;
659 ap->mid = smt->mid; 652 ap->mid = smt->mid;
660 GNUNET_CONTAINER_DLL_insert (ch->ap_head, ch->ap_tail, ap); 653 GNUNET_CONTAINER_DLL_insert(ch->ap_head, ch->ap_tail, ap);
661 mh = (const struct GNUNET_MessageHeader *) &smt[1]; 654 mh = (const struct GNUNET_MessageHeader *)&smt[1];
662 env = GNUNET_MQ_msg_copy (mh); 655 env = GNUNET_MQ_msg_copy(mh);
663 GNUNET_MQ_notify_sent (env, &send_ack_cb, ap); 656 GNUNET_MQ_notify_sent(env, &send_ack_cb, ap);
664 GNUNET_MQ_send (qh->mq, env); 657 GNUNET_MQ_send(qh->mq, env);
665} 658}
666 659
667 660
@@ -674,12 +667,12 @@ handle_send_msg (void *cls, const struct GNUNET_TRANSPORT_SendMessageTo *smt)
674 * @return #GNUNET_OK if @a smt is well-formed 667 * @return #GNUNET_OK if @a smt is well-formed
675 */ 668 */
676static int 669static int
677check_backchannel_incoming ( 670check_backchannel_incoming(
678 void *cls, 671 void *cls,
679 const struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *bi) 672 const struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *bi)
680{ 673{
681 (void) cls; 674 (void)cls;
682 GNUNET_MQ_check_boxed_message (bi); 675 GNUNET_MQ_check_boxed_message(bi);
683 return GNUNET_OK; 676 return GNUNET_OK;
684} 677}
685 678
@@ -691,20 +684,20 @@ check_backchannel_incoming (
691 * @param bi the backchannel message 684 * @param bi the backchannel message
692 */ 685 */
693static void 686static void
694handle_backchannel_incoming ( 687handle_backchannel_incoming(
695 void *cls, 688 void *cls,
696 const struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *bi) 689 const struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *bi)
697{ 690{
698 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; 691 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
699 692
700 if (NULL != ch->notify_cb) 693 if (NULL != ch->notify_cb)
701 ch->notify_cb (ch->notify_cb_cls, 694 ch->notify_cb(ch->notify_cb_cls,
702 &bi->pid, 695 &bi->pid,
703 (const struct GNUNET_MessageHeader *) &bi[1]); 696 (const struct GNUNET_MessageHeader *)&bi[1]);
704 else 697 else
705 GNUNET_log ( 698 GNUNET_log(
706 GNUNET_ERROR_TYPE_INFO, 699 GNUNET_ERROR_TYPE_INFO,
707 _ ("Dropped backchanel message: handler not provided by communicator\n")); 700 _("Dropped backchanel message: handler not provided by communicator\n"));
708} 701}
709 702
710 703
@@ -714,46 +707,46 @@ handle_backchannel_incoming (
714 * @param ch handle to reconnect 707 * @param ch handle to reconnect
715 */ 708 */
716static void 709static void
717reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch) 710reconnect(struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
718{ 711{
719 struct GNUNET_MQ_MessageHandler handlers[] = 712 struct GNUNET_MQ_MessageHandler handlers[] =
720 {GNUNET_MQ_hd_fixed_size (incoming_ack, 713 { GNUNET_MQ_hd_fixed_size(incoming_ack,
721 GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK, 714 GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK,
722 struct GNUNET_TRANSPORT_IncomingMessageAck, 715 struct GNUNET_TRANSPORT_IncomingMessageAck,
723 ch),
724 GNUNET_MQ_hd_var_size (create_queue,
725 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE,
726 struct GNUNET_TRANSPORT_CreateQueue,
727 ch),
728 GNUNET_MQ_hd_var_size (send_msg,
729 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG,
730 struct GNUNET_TRANSPORT_SendMessageTo,
731 ch), 716 ch),
732 GNUNET_MQ_hd_var_size ( 717 GNUNET_MQ_hd_var_size(create_queue,
733 backchannel_incoming, 718 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE,
734 GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING, 719 struct GNUNET_TRANSPORT_CreateQueue,
735 struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming, 720 ch),
736 ch), 721 GNUNET_MQ_hd_var_size(send_msg,
737 GNUNET_MQ_handler_end ()}; 722 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG,
723 struct GNUNET_TRANSPORT_SendMessageTo,
724 ch),
725 GNUNET_MQ_hd_var_size(
726 backchannel_incoming,
727 GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING,
728 struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming,
729 ch),
730 GNUNET_MQ_handler_end() };
738 struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam; 731 struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam;
739 struct GNUNET_MQ_Envelope *env; 732 struct GNUNET_MQ_Envelope *env;
740 733
741 ch->mq = 734 ch->mq =
742 GNUNET_CLIENT_connect (ch->cfg, "transport", handlers, &error_handler, ch); 735 GNUNET_CLIENT_connect(ch->cfg, "transport", handlers, &error_handler, ch);
743 if (NULL == ch->mq) 736 if (NULL == ch->mq)
744 return; 737 return;
745 env = GNUNET_MQ_msg_extra (cam, 738 env = GNUNET_MQ_msg_extra(cam,
746 strlen (ch->addr_prefix) + 1, 739 strlen(ch->addr_prefix) + 1,
747 GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR); 740 GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR);
748 cam->cc = htonl ((uint32_t) ch->cc); 741 cam->cc = htonl((uint32_t)ch->cc);
749 memcpy (&cam[1], ch->addr_prefix, strlen (ch->addr_prefix) + 1); 742 memcpy(&cam[1], ch->addr_prefix, strlen(ch->addr_prefix) + 1);
750 GNUNET_MQ_send (ch->mq, env); 743 GNUNET_MQ_send(ch->mq, env);
751 for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head; NULL != ai; 744 for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head; NULL != ai;
752 ai = ai->next) 745 ai = ai->next)
753 send_add_address (ai); 746 send_add_address(ai);
754 for (struct GNUNET_TRANSPORT_QueueHandle *qh = ch->queue_head; NULL != qh; 747 for (struct GNUNET_TRANSPORT_QueueHandle *qh = ch->queue_head; NULL != qh;
755 qh = qh->next) 748 qh = qh->next)
756 send_add_queue (qh); 749 send_add_queue(qh);
757} 750}
758 751
759 752
@@ -776,7 +769,7 @@ reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
776 * @return NULL on error 769 * @return NULL on error
777 */ 770 */
778struct GNUNET_TRANSPORT_CommunicatorHandle * 771struct GNUNET_TRANSPORT_CommunicatorHandle *
779GNUNET_TRANSPORT_communicator_connect ( 772GNUNET_TRANSPORT_communicator_connect(
780 const struct GNUNET_CONFIGURATION_Handle *cfg, 773 const struct GNUNET_CONFIGURATION_Handle *cfg,
781 const char *config_section, 774 const char *config_section,
782 const char *addr_prefix, 775 const char *addr_prefix,
@@ -788,7 +781,7 @@ GNUNET_TRANSPORT_communicator_connect (
788{ 781{
789 struct GNUNET_TRANSPORT_CommunicatorHandle *ch; 782 struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
790 783
791 ch = GNUNET_new (struct GNUNET_TRANSPORT_CommunicatorHandle); 784 ch = GNUNET_new(struct GNUNET_TRANSPORT_CommunicatorHandle);
792 ch->cfg = cfg; 785 ch->cfg = cfg;
793 ch->config_section = config_section; 786 ch->config_section = config_section;
794 ch->addr_prefix = addr_prefix; 787 ch->addr_prefix = addr_prefix;
@@ -797,18 +790,18 @@ GNUNET_TRANSPORT_communicator_connect (
797 ch->notify_cb = notify_cb; 790 ch->notify_cb = notify_cb;
798 ch->notify_cb_cls = notify_cb_cls; 791 ch->notify_cb_cls = notify_cb_cls;
799 ch->cc = cc; 792 ch->cc = cc;
800 reconnect (ch); 793 reconnect(ch);
801 if (GNUNET_OK != 794 if (GNUNET_OK !=
802 GNUNET_CONFIGURATION_get_value_number (cfg, 795 GNUNET_CONFIGURATION_get_value_number(cfg,
803 config_section, 796 config_section,
804 "MAX_QUEUE_LENGTH", 797 "MAX_QUEUE_LENGTH",
805 &ch->max_queue_length)) 798 &ch->max_queue_length))
806 ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH; 799 ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
807 if (NULL == ch->mq) 800 if (NULL == ch->mq)
808 { 801 {
809 GNUNET_free (ch); 802 GNUNET_free(ch);
810 return NULL; 803 return NULL;
811 } 804 }
812 return ch; 805 return ch;
813} 806}
814 807
@@ -819,16 +812,16 @@ GNUNET_TRANSPORT_communicator_connect (
819 * @param ch handle returned from connect 812 * @param ch handle returned from connect
820 */ 813 */
821void 814void
822GNUNET_TRANSPORT_communicator_disconnect ( 815GNUNET_TRANSPORT_communicator_disconnect(
823 struct GNUNET_TRANSPORT_CommunicatorHandle *ch) 816 struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
824{ 817{
825 disconnect (ch); 818 disconnect(ch);
826 while (NULL != ch->ai_head) 819 while (NULL != ch->ai_head)
827 { 820 {
828 GNUNET_break (0); /* communicator forgot to remove address, warn! */ 821 GNUNET_break(0); /* communicator forgot to remove address, warn! */
829 GNUNET_TRANSPORT_communicator_address_remove (ch->ai_head); 822 GNUNET_TRANSPORT_communicator_address_remove(ch->ai_head);
830 } 823 }
831 GNUNET_free (ch); 824 GNUNET_free(ch);
832} 825}
833 826
834 827
@@ -856,7 +849,7 @@ GNUNET_TRANSPORT_communicator_disconnect (
856 * the tranport service is not yet up 849 * the tranport service is not yet up
857 */ 850 */
858int 851int
859GNUNET_TRANSPORT_communicator_receive ( 852GNUNET_TRANSPORT_communicator_receive(
860 struct GNUNET_TRANSPORT_CommunicatorHandle *ch, 853 struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
861 const struct GNUNET_PeerIdentity *sender, 854 const struct GNUNET_PeerIdentity *sender,
862 const struct GNUNET_MessageHeader *msg, 855 const struct GNUNET_MessageHeader *msg,
@@ -870,41 +863,41 @@ GNUNET_TRANSPORT_communicator_receive (
870 863
871 if (NULL == ch->mq) 864 if (NULL == ch->mq)
872 return GNUNET_SYSERR; 865 return GNUNET_SYSERR;
873 if ((NULL == cb) && (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length)) 866 if ((NULL == cb) && (GNUNET_MQ_get_length(ch->mq) >= ch->max_queue_length))
874 { 867 {
875 GNUNET_log ( 868 GNUNET_log(
876 GNUNET_ERROR_TYPE_WARNING, 869 GNUNET_ERROR_TYPE_WARNING,
877 "Dropping message: transprot is too slow, queue length %llu exceeded\n", 870 "Dropping message: transprot is too slow, queue length %llu exceeded\n",
878 ch->max_queue_length); 871 ch->max_queue_length);
879 return GNUNET_NO; 872 return GNUNET_NO;
880 } 873 }
881 874
882 msize = ntohs (msg->size); 875 msize = ntohs(msg->size);
883 env = 876 env =
884 GNUNET_MQ_msg_extra (im, msize, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG); 877 GNUNET_MQ_msg_extra(im, msize, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG);
885 if (NULL == env) 878 if (NULL == env)
886 { 879 {
887 GNUNET_break (0); 880 GNUNET_break(0);
888 return GNUNET_SYSERR; 881 return GNUNET_SYSERR;
889 } 882 }
890 im->expected_address_validity = 883 im->expected_address_validity =
891 GNUNET_TIME_relative_hton (expected_addr_validity); 884 GNUNET_TIME_relative_hton(expected_addr_validity);
892 im->sender = *sender; 885 im->sender = *sender;
893 memcpy (&im[1], msg, msize); 886 memcpy(&im[1], msg, msize);
894 if (NULL != cb) 887 if (NULL != cb)
895 { 888 {
896 struct FlowControl *fc; 889 struct FlowControl *fc;
897 890
898 im->fc_on = htonl (GNUNET_YES); 891 im->fc_on = htonl(GNUNET_YES);
899 im->fc_id = ch->fc_gen++; 892 im->fc_id = ch->fc_gen++;
900 fc = GNUNET_new (struct FlowControl); 893 fc = GNUNET_new(struct FlowControl);
901 fc->sender = *sender; 894 fc->sender = *sender;
902 fc->id = im->fc_id; 895 fc->id = im->fc_id;
903 fc->cb = cb; 896 fc->cb = cb;
904 fc->cb_cls = cb_cls; 897 fc->cb_cls = cb_cls;
905 GNUNET_CONTAINER_DLL_insert (ch->fc_head, ch->fc_tail, fc); 898 GNUNET_CONTAINER_DLL_insert(ch->fc_head, ch->fc_tail, fc);
906 } 899 }
907 GNUNET_MQ_send (ch->mq, env); 900 GNUNET_MQ_send(ch->mq, env);
908 return GNUNET_OK; 901 return GNUNET_OK;
909} 902}
910 903
@@ -929,7 +922,7 @@ GNUNET_TRANSPORT_communicator_receive (
929 * @return API handle identifying the new MQ 922 * @return API handle identifying the new MQ
930 */ 923 */
931struct GNUNET_TRANSPORT_QueueHandle * 924struct GNUNET_TRANSPORT_QueueHandle *
932GNUNET_TRANSPORT_communicator_mq_add ( 925GNUNET_TRANSPORT_communicator_mq_add(
933 struct GNUNET_TRANSPORT_CommunicatorHandle *ch, 926 struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
934 const struct GNUNET_PeerIdentity *peer, 927 const struct GNUNET_PeerIdentity *peer,
935 const char *address, 928 const char *address,
@@ -940,17 +933,17 @@ GNUNET_TRANSPORT_communicator_mq_add (
940{ 933{
941 struct GNUNET_TRANSPORT_QueueHandle *qh; 934 struct GNUNET_TRANSPORT_QueueHandle *qh;
942 935
943 qh = GNUNET_new (struct GNUNET_TRANSPORT_QueueHandle); 936 qh = GNUNET_new(struct GNUNET_TRANSPORT_QueueHandle);
944 qh->ch = ch; 937 qh->ch = ch;
945 qh->peer = *peer; 938 qh->peer = *peer;
946 qh->address = GNUNET_strdup (address); 939 qh->address = GNUNET_strdup(address);
947 qh->nt = nt; 940 qh->nt = nt;
948 qh->mtu = mtu; 941 qh->mtu = mtu;
949 qh->cs = cs; 942 qh->cs = cs;
950 qh->mq = mq; 943 qh->mq = mq;
951 qh->queue_id = ch->queue_gen++; 944 qh->queue_id = ch->queue_gen++;
952 GNUNET_CONTAINER_DLL_insert (ch->queue_head, ch->queue_tail, qh); 945 GNUNET_CONTAINER_DLL_insert(ch->queue_head, ch->queue_tail, qh);
953 send_add_queue (qh); 946 send_add_queue(qh);
954 return qh; 947 return qh;
955} 948}
956 949
@@ -962,15 +955,15 @@ GNUNET_TRANSPORT_communicator_mq_add (
962 * @param qh handle for the queue that must be invalidated 955 * @param qh handle for the queue that must be invalidated
963 */ 956 */
964void 957void
965GNUNET_TRANSPORT_communicator_mq_del (struct GNUNET_TRANSPORT_QueueHandle *qh) 958GNUNET_TRANSPORT_communicator_mq_del(struct GNUNET_TRANSPORT_QueueHandle *qh)
966{ 959{
967 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = qh->ch; 960 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = qh->ch;
968 961
969 send_del_queue (qh); 962 send_del_queue(qh);
970 GNUNET_CONTAINER_DLL_remove (ch->queue_head, ch->queue_tail, qh); 963 GNUNET_CONTAINER_DLL_remove(ch->queue_head, ch->queue_tail, qh);
971 GNUNET_MQ_destroy (qh->mq); 964 GNUNET_MQ_destroy(qh->mq);
972 GNUNET_free (qh->address); 965 GNUNET_free(qh->address);
973 GNUNET_free (qh); 966 GNUNET_free(qh);
974} 967}
975 968
976 969
@@ -984,7 +977,7 @@ GNUNET_TRANSPORT_communicator_mq_del (struct GNUNET_TRANSPORT_QueueHandle *qh)
984 * @param expiration when does the communicator forsee this address expiring? 977 * @param expiration when does the communicator forsee this address expiring?
985 */ 978 */
986struct GNUNET_TRANSPORT_AddressIdentifier * 979struct GNUNET_TRANSPORT_AddressIdentifier *
987GNUNET_TRANSPORT_communicator_address_add ( 980GNUNET_TRANSPORT_communicator_address_add(
988 struct GNUNET_TRANSPORT_CommunicatorHandle *ch, 981 struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
989 const char *address, 982 const char *address,
990 enum GNUNET_NetworkType nt, 983 enum GNUNET_NetworkType nt,
@@ -992,14 +985,14 @@ GNUNET_TRANSPORT_communicator_address_add (
992{ 985{
993 struct GNUNET_TRANSPORT_AddressIdentifier *ai; 986 struct GNUNET_TRANSPORT_AddressIdentifier *ai;
994 987
995 ai = GNUNET_new (struct GNUNET_TRANSPORT_AddressIdentifier); 988 ai = GNUNET_new(struct GNUNET_TRANSPORT_AddressIdentifier);
996 ai->ch = ch; 989 ai->ch = ch;
997 ai->address = GNUNET_strdup (address); 990 ai->address = GNUNET_strdup(address);
998 ai->nt = nt; 991 ai->nt = nt;
999 ai->expiration = expiration; 992 ai->expiration = expiration;
1000 ai->aid = ch->aid_gen++; 993 ai->aid = ch->aid_gen++;
1001 GNUNET_CONTAINER_DLL_insert (ch->ai_head, ch->ai_tail, ai); 994 GNUNET_CONTAINER_DLL_insert(ch->ai_head, ch->ai_tail, ai);
1002 send_add_address (ai); 995 send_add_address(ai);
1003 return ai; 996 return ai;
1004} 997}
1005 998
@@ -1011,15 +1004,15 @@ GNUNET_TRANSPORT_communicator_address_add (
1011 * @param ai address that is no longer provided 1004 * @param ai address that is no longer provided
1012 */ 1005 */
1013void 1006void
1014GNUNET_TRANSPORT_communicator_address_remove ( 1007GNUNET_TRANSPORT_communicator_address_remove(
1015 struct GNUNET_TRANSPORT_AddressIdentifier *ai) 1008 struct GNUNET_TRANSPORT_AddressIdentifier *ai)
1016{ 1009{
1017 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ai->ch; 1010 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ai->ch;
1018 1011
1019 send_del_address (ai); 1012 send_del_address(ai);
1020 GNUNET_CONTAINER_DLL_remove (ch->ai_head, ch->ai_tail, ai); 1013 GNUNET_CONTAINER_DLL_remove(ch->ai_head, ch->ai_tail, ai);
1021 GNUNET_free (ai->address); 1014 GNUNET_free(ai->address);
1022 GNUNET_free (ai); 1015 GNUNET_free(ai);
1023} 1016}
1024 1017
1025 1018
@@ -1041,7 +1034,7 @@ GNUNET_TRANSPORT_communicator_address_remove (
1041 * notify-API to @a pid's communicator @a comm 1034 * notify-API to @a pid's communicator @a comm
1042 */ 1035 */
1043void 1036void
1044GNUNET_TRANSPORT_communicator_notify ( 1037GNUNET_TRANSPORT_communicator_notify(
1045 struct GNUNET_TRANSPORT_CommunicatorHandle *ch, 1038 struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
1046 const struct GNUNET_PeerIdentity *pid, 1039 const struct GNUNET_PeerIdentity *pid,
1047 const char *comm, 1040 const char *comm,
@@ -1049,18 +1042,18 @@ GNUNET_TRANSPORT_communicator_notify (
1049{ 1042{
1050 struct GNUNET_MQ_Envelope *env; 1043 struct GNUNET_MQ_Envelope *env;
1051 struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb; 1044 struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb;
1052 size_t slen = strlen (comm) + 1; 1045 size_t slen = strlen(comm) + 1;
1053 uint16_t mlen = ntohs (header->size); 1046 uint16_t mlen = ntohs(header->size);
1054 1047
1055 GNUNET_assert (mlen + slen + sizeof (*cb) < UINT16_MAX); 1048 GNUNET_assert(mlen + slen + sizeof(*cb) < UINT16_MAX);
1056 env = 1049 env =
1057 GNUNET_MQ_msg_extra (cb, 1050 GNUNET_MQ_msg_extra(cb,
1058 slen + mlen, 1051 slen + mlen,
1059 GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL); 1052 GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL);
1060 cb->pid = *pid; 1053 cb->pid = *pid;
1061 memcpy (&cb[1], header, mlen); 1054 memcpy(&cb[1], header, mlen);
1062 memcpy (((char *) &cb[1]) + mlen, comm, slen); 1055 memcpy(((char *)&cb[1]) + mlen, comm, slen);
1063 GNUNET_MQ_send (ch->mq, env); 1056 GNUNET_MQ_send(ch->mq, env);
1064} 1057}
1065 1058
1066 1059