aboutsummaryrefslogtreecommitdiff
path: root/src/cadet/cadet_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/cadet/cadet_api.c')
-rw-r--r--src/cadet/cadet_api.c285
1 files changed, 115 insertions, 170 deletions
diff --git a/src/cadet/cadet_api.c b/src/cadet/cadet_api.c
index 8638be27d..b16520429 100644
--- a/src/cadet/cadet_api.c
+++ b/src/cadet/cadet_api.c
@@ -30,7 +30,7 @@
30#include "cadet.h" 30#include "cadet.h"
31#include "cadet_protocol.h" 31#include "cadet_protocol.h"
32 32
33#define LOG(kind,...) GNUNET_log_from (kind, "cadet-api",__VA_ARGS__) 33#define LOG(kind, ...) GNUNET_log_from (kind, "cadet-api", __VA_ARGS__)
34 34
35/** 35/**
36 * Opaque handle to the service. 36 * Opaque handle to the service.
@@ -71,7 +71,6 @@ struct GNUNET_CADET_Handle
71 * Time to the next reconnect in case one reconnect fails 71 * Time to the next reconnect in case one reconnect fails
72 */ 72 */
73 struct GNUNET_TIME_Relative reconnect_time; 73 struct GNUNET_TIME_Relative reconnect_time;
74
75}; 74};
76 75
77 76
@@ -138,7 +137,6 @@ struct GNUNET_CADET_Channel
138 * How many messages are we allowed to send to the service right now? 137 * How many messages are we allowed to send to the service right now?
139 */ 138 */
140 unsigned int allow_send; 139 unsigned int allow_send;
141
142}; 140};
143 141
144 142
@@ -199,10 +197,9 @@ struct GNUNET_CADET_Port
199 */ 197 */
200static struct GNUNET_CADET_Port * 198static struct GNUNET_CADET_Port *
201find_port (const struct GNUNET_CADET_Handle *h, 199find_port (const struct GNUNET_CADET_Handle *h,
202 const struct GNUNET_HashCode *hash) 200 const struct GNUNET_HashCode *hash)
203{ 201{
204 return GNUNET_CONTAINER_multihashmap_get (h->ports, 202 return GNUNET_CONTAINER_multihashmap_get (h->ports, hash);
205 hash);
206} 203}
207 204
208 205
@@ -240,11 +237,10 @@ create_channel (struct GNUNET_CADET_Handle *h,
240 ch->cadet = h; 237 ch->cadet = h;
241 if (NULL == ccnp) 238 if (NULL == ccnp)
242 { 239 {
243 while (NULL != 240 while (NULL != find_channel (h, h->next_ccn))
244 find_channel (h, 241 h->next_ccn.channel_of_client =
245 h->next_ccn)) 242 htonl (GNUNET_CADET_LOCAL_CHANNEL_ID_CLI |
246 h->next_ccn.channel_of_client 243 (1 + ntohl (h->next_ccn.channel_of_client)));
247 = htonl (GNUNET_CADET_LOCAL_CHANNEL_ID_CLI | (1 + ntohl (h->next_ccn.channel_of_client)));
248 ccn = h->next_ccn; 244 ccn = h->next_ccn;
249 } 245 }
250 else 246 else
@@ -253,10 +249,11 @@ create_channel (struct GNUNET_CADET_Handle *h,
253 } 249 }
254 ch->ccn = ccn; 250 ch->ccn = ccn;
255 GNUNET_assert (GNUNET_OK == 251 GNUNET_assert (GNUNET_OK ==
256 GNUNET_CONTAINER_multihashmap32_put (h->channels, 252 GNUNET_CONTAINER_multihashmap32_put (
257 ntohl (ch->ccn.channel_of_client), 253 h->channels,
258 ch, 254 ntohl (ch->ccn.channel_of_client),
259 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 255 ch,
256 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
260 return ch; 257 return ch;
261} 258}
262 259
@@ -280,10 +277,11 @@ destroy_channel (struct GNUNET_CADET_Channel *ch)
280 "Destroying channel %X of %p\n", 277 "Destroying channel %X of %p\n",
281 htonl (ch->ccn.channel_of_client), 278 htonl (ch->ccn.channel_of_client),
282 h); 279 h);
283 GNUNET_assert (GNUNET_YES == 280 GNUNET_assert (
284 GNUNET_CONTAINER_multihashmap32_remove (h->channels, 281 GNUNET_YES ==
285 ntohl (ch->ccn.channel_of_client), 282 GNUNET_CONTAINER_multihashmap32_remove (h->channels,
286 ch)); 283 ntohl (ch->ccn.channel_of_client),
284 ch));
287 if (NULL != ch->mq_cont) 285 if (NULL != ch->mq_cont)
288 { 286 {
289 GNUNET_SCHEDULER_cancel (ch->mq_cont); 287 GNUNET_SCHEDULER_cancel (ch->mq_cont);
@@ -291,8 +289,7 @@ destroy_channel (struct GNUNET_CADET_Channel *ch)
291 } 289 }
292 /* signal channel destruction */ 290 /* signal channel destruction */
293 if (NULL != ch->disconnects) 291 if (NULL != ch->disconnects)
294 ch->disconnects (ch->ctx, 292 ch->disconnects (ch->ctx, ch);
295 ch);
296 if (NULL != ch->pending_env) 293 if (NULL != ch->pending_env)
297 GNUNET_MQ_discard (ch->pending_env); 294 GNUNET_MQ_discard (ch->pending_env);
298 GNUNET_MQ_destroy (ch->mq); 295 GNUNET_MQ_destroy (ch->mq);
@@ -320,9 +317,7 @@ reconnect (struct GNUNET_CADET_Handle *h);
320 * @return #GNUNET_OK (continue to iterate) 317 * @return #GNUNET_OK (continue to iterate)
321 */ 318 */
322static int 319static int
323open_port_cb (void *cls, 320open_port_cb (void *cls, const struct GNUNET_HashCode *id, void *value)
324 const struct GNUNET_HashCode *id,
325 void *value)
326{ 321{
327 struct GNUNET_CADET_Handle *h = cls; 322 struct GNUNET_CADET_Handle *h = cls;
328 struct GNUNET_CADET_Port *port = value; 323 struct GNUNET_CADET_Port *port = value;
@@ -330,11 +325,9 @@ open_port_cb (void *cls,
330 struct GNUNET_MQ_Envelope *env; 325 struct GNUNET_MQ_Envelope *env;
331 326
332 (void) id; 327 (void) id;
333 env = GNUNET_MQ_msg (msg, 328 env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN);
334 GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN);
335 msg->port = port->id; 329 msg->port = port->id;
336 GNUNET_MQ_send (h->mq, 330 GNUNET_MQ_send (h->mq, env);
337 env);
338 return GNUNET_OK; 331 return GNUNET_OK;
339} 332}
340 333
@@ -351,12 +344,9 @@ reconnect_cbk (void *cls)
351 struct GNUNET_CADET_Handle *h = cls; 344 struct GNUNET_CADET_Handle *h = cls;
352 345
353 h->reconnect_task = NULL; 346 h->reconnect_task = NULL;
354 h->reconnect_time 347 h->reconnect_time = GNUNET_TIME_STD_BACKOFF (h->reconnect_time);
355 = GNUNET_TIME_STD_BACKOFF (h->reconnect_time);
356 reconnect (h); 348 reconnect (h);
357 GNUNET_CONTAINER_multihashmap_iterate (h->ports, 349 GNUNET_CONTAINER_multihashmap_iterate (h->ports, &open_port_cb, h);
358 &open_port_cb,
359 h);
360} 350}
361 351
362 352
@@ -405,8 +395,7 @@ cadet_mq_send_now (void *cls)
405 "Sending message on channel %s to CADET, new window size is %u\n", 395 "Sending message on channel %s to CADET, new window size is %u\n",
406 GNUNET_i2s (&ch->peer), 396 GNUNET_i2s (&ch->peer),
407 ch->allow_send); 397 ch->allow_send);
408 GNUNET_MQ_send (ch->cadet->mq, 398 GNUNET_MQ_send (ch->cadet->mq, env);
409 env);
410 GNUNET_MQ_impl_send_continue (ch->mq); 399 GNUNET_MQ_impl_send_continue (ch->mq);
411} 400}
412 401
@@ -431,8 +420,10 @@ cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
431 struct GNUNET_CADET_Channel *ch = impl_state; 420 struct GNUNET_CADET_Channel *ch = impl_state;
432 struct GNUNET_CADET_Handle *h = ch->cadet; 421 struct GNUNET_CADET_Handle *h = ch->cadet;
433 uint16_t msize; 422 uint16_t msize;
423 struct GNUNET_MQ_Envelope *orig_env;
434 struct GNUNET_MQ_Envelope *env; 424 struct GNUNET_MQ_Envelope *env;
435 struct GNUNET_CADET_LocalData *cadet_msg = NULL; 425 struct GNUNET_CADET_LocalData *cadet_msg;
426 enum GNUNET_MQ_PriorityPreferences pp;
436 427
437 if (NULL == h->mq) 428 if (NULL == h->mq)
438 { 429 {
@@ -440,6 +431,8 @@ cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
440 GNUNET_MQ_impl_send_continue (mq); 431 GNUNET_MQ_impl_send_continue (mq);
441 return; 432 return;
442 } 433 }
434 orig_env = GNUNET_MQ_get_current_envelope (mq);
435 pp = GNUNET_MQ_env_get_options (orig_env);
443 436
444 /* check message size for sanity */ 437 /* check message size for sanity */
445 msize = ntohs (msg->size); 438 msize = ntohs (msg->size);
@@ -453,12 +446,11 @@ cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
453 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA, 446 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA,
454 msg); 447 msg);
455 cadet_msg->ccn = ch->ccn; 448 cadet_msg->ccn = ch->ccn;
449 cadet_msg->pp = htonl ((uint32_t) pp);
456 GNUNET_assert (NULL == ch->pending_env); 450 GNUNET_assert (NULL == ch->pending_env);
457 ch->pending_env = env; 451 ch->pending_env = env;
458 if (0 < ch->allow_send) 452 if (0 < ch->allow_send)
459 ch->mq_cont 453 ch->mq_cont = GNUNET_SCHEDULER_add_now (&cadet_mq_send_now, ch);
460 = GNUNET_SCHEDULER_add_now (&cadet_mq_send_now,
461 ch);
462} 454}
463 455
464 456
@@ -470,8 +462,7 @@ cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
470 * @param impl_state state of the implementation 462 * @param impl_state state of the implementation
471 */ 463 */
472static void 464static void
473cadet_mq_destroy_impl (struct GNUNET_MQ_Handle *mq, 465cadet_mq_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
474 void *impl_state)
475{ 466{
476 struct GNUNET_CADET_Channel *ch = impl_state; 467 struct GNUNET_CADET_Channel *ch = impl_state;
477 468
@@ -489,8 +480,7 @@ cadet_mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
489 * @param error error code 480 * @param error error code
490 */ 481 */
491static void 482static void
492cadet_mq_error_handler (void *cls, 483cadet_mq_error_handler (void *cls, enum GNUNET_MQ_Error error)
493 enum GNUNET_MQ_Error error)
494{ 484{
495 struct GNUNET_CADET_Channel *ch = cls; 485 struct GNUNET_CADET_Channel *ch = cls;
496 486
@@ -503,11 +493,10 @@ cadet_mq_error_handler (void *cls,
503 else 493 else
504 { 494 {
505 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 495 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
506 "MQ error in communication with CADET: %d\n", 496 "MQ error in communication with CADET: %d\n",
507 error); 497 error);
508 if (NULL != ch->disconnects) 498 if (NULL != ch->disconnects)
509 ch->disconnects (ch->ctx, 499 ch->disconnects (ch->ctx, ch);
510 ch);
511 GNUNET_CADET_channel_destroy (ch); 500 GNUNET_CADET_channel_destroy (ch);
512 } 501 }
513} 502}
@@ -521,8 +510,7 @@ cadet_mq_error_handler (void *cls,
521 * @param impl_state state specific to the implementation 510 * @param impl_state state specific to the implementation
522 */ 511 */
523static void 512static void
524cadet_mq_cancel_impl (struct GNUNET_MQ_Handle *mq, 513cadet_mq_cancel_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
525 void *impl_state)
526{ 514{
527 struct GNUNET_CADET_Channel *ch = impl_state; 515 struct GNUNET_CADET_Channel *ch = impl_state;
528 516
@@ -545,8 +533,9 @@ cadet_mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
545 * @param msg A message with the details of the new incoming channel 533 * @param msg A message with the details of the new incoming channel
546 */ 534 */
547static void 535static void
548handle_channel_created (void *cls, 536handle_channel_created (
549 const struct GNUNET_CADET_LocalChannelCreateMessage *msg) 537 void *cls,
538 const struct GNUNET_CADET_LocalChannelCreateMessage *msg)
550{ 539{
551 struct GNUNET_CADET_Handle *h = cls; 540 struct GNUNET_CADET_Handle *h = cls;
552 struct GNUNET_CADET_Channel *ch; 541 struct GNUNET_CADET_Channel *ch;
@@ -561,8 +550,7 @@ handle_channel_created (void *cls,
561 GNUNET_break (0); 550 GNUNET_break (0);
562 return; 551 return;
563 } 552 }
564 port = find_port (h, 553 port = find_port (h, port_number);
565 port_number);
566 if (NULL == port) 554 if (NULL == port)
567 { 555 {
568 /* We could have closed the port but the service didn't know about it yet 556 /* We could have closed the port but the service didn't know about it yet
@@ -575,16 +563,14 @@ handle_channel_created (void *cls,
575 "No handler for incoming channel %X (on port %s, recently closed?)\n", 563 "No handler for incoming channel %X (on port %s, recently closed?)\n",
576 ntohl (ccn.channel_of_client), 564 ntohl (ccn.channel_of_client),
577 GNUNET_h2s (port_number)); 565 GNUNET_h2s (port_number));
578 env = GNUNET_MQ_msg (d_msg, 566 env =
579 GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY); 567 GNUNET_MQ_msg (d_msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY);
580 d_msg->ccn = msg->ccn; 568 d_msg->ccn = msg->ccn;
581 GNUNET_MQ_send (h->mq, 569 GNUNET_MQ_send (h->mq, env);
582 env);
583 return; 570 return;
584 } 571 }
585 572
586 ch = create_channel (h, 573 ch = create_channel (h, &ccn);
587 &ccn);
588 ch->peer = msg->peer; 574 ch->peer = msg->peer;
589 ch->incoming_port = port; 575 ch->incoming_port = port;
590 LOG (GNUNET_ERROR_TYPE_DEBUG, 576 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -603,11 +589,8 @@ handle_channel_created (void *cls,
603 port->handlers, 589 port->handlers,
604 &cadet_mq_error_handler, 590 &cadet_mq_error_handler,
605 ch); 591 ch);
606 ch->ctx = port->connects (port->cls, 592 ch->ctx = port->connects (port->cls, ch, &msg->peer);
607 ch, 593 GNUNET_MQ_set_handlers_closure (ch->mq, ch->ctx);
608 &msg->peer);
609 GNUNET_MQ_set_handlers_closure (ch->mq,
610 ch->ctx);
611} 594}
612 595
613 596
@@ -618,14 +601,14 @@ handle_channel_created (void *cls,
618 * @param msg A message with the details of the channel being destroyed 601 * @param msg A message with the details of the channel being destroyed
619 */ 602 */
620static void 603static void
621handle_channel_destroy (void *cls, 604handle_channel_destroy (
622 const struct GNUNET_CADET_LocalChannelDestroyMessage *msg) 605 void *cls,
606 const struct GNUNET_CADET_LocalChannelDestroyMessage *msg)
623{ 607{
624 struct GNUNET_CADET_Handle *h = cls; 608 struct GNUNET_CADET_Handle *h = cls;
625 struct GNUNET_CADET_Channel *ch; 609 struct GNUNET_CADET_Channel *ch;
626 610
627 ch = find_channel (h, 611 ch = find_channel (h, msg->ccn);
628 msg->ccn);
629 if (NULL == ch) 612 if (NULL == ch)
630 { 613 {
631 LOG (GNUNET_ERROR_TYPE_DEBUG, 614 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -649,8 +632,7 @@ handle_channel_destroy (void *cls,
649 * #GNUNET_SYSERR otherwise 632 * #GNUNET_SYSERR otherwise
650 */ 633 */
651static int 634static int
652check_local_data (void *cls, 635check_local_data (void *cls, const struct GNUNET_CADET_LocalData *message)
653 const struct GNUNET_CADET_LocalData *message)
654{ 636{
655 uint16_t size; 637 uint16_t size;
656 638
@@ -672,8 +654,7 @@ check_local_data (void *cls,
672 * @param message A message encapsulating the data 654 * @param message A message encapsulating the data
673 */ 655 */
674static void 656static void
675handle_local_data (void *cls, 657handle_local_data (void *cls, const struct GNUNET_CADET_LocalData *message)
676 const struct GNUNET_CADET_LocalData *message)
677{ 658{
678 struct GNUNET_CADET_Handle *h = cls; 659 struct GNUNET_CADET_Handle *h = cls;
679 const struct GNUNET_MessageHeader *payload; 660 const struct GNUNET_MessageHeader *payload;
@@ -681,8 +662,7 @@ handle_local_data (void *cls,
681 uint16_t type; 662 uint16_t type;
682 int fwd; 663 int fwd;
683 664
684 ch = find_channel (h, 665 ch = find_channel (h, message->ccn);
685 message->ccn);
686 if (NULL == ch) 666 if (NULL == ch)
687 { 667 {
688 LOG (GNUNET_ERROR_TYPE_DEBUG, 668 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -700,8 +680,7 @@ handle_local_data (void *cls,
700 GNUNET_i2s (&ch->peer), 680 GNUNET_i2s (&ch->peer),
701 ntohl (message->ccn.channel_of_client), 681 ntohl (message->ccn.channel_of_client),
702 type); 682 type);
703 GNUNET_MQ_inject_message (ch->mq, 683 GNUNET_MQ_inject_message (ch->mq, payload);
704 payload);
705} 684}
706 685
707 686
@@ -713,14 +692,12 @@ handle_local_data (void *cls,
713 * @param message Message itself. 692 * @param message Message itself.
714 */ 693 */
715static void 694static void
716handle_local_ack (void *cls, 695handle_local_ack (void *cls, const struct GNUNET_CADET_LocalAck *message)
717 const struct GNUNET_CADET_LocalAck *message)
718{ 696{
719 struct GNUNET_CADET_Handle *h = cls; 697 struct GNUNET_CADET_Handle *h = cls;
720 struct GNUNET_CADET_Channel *ch; 698 struct GNUNET_CADET_Channel *ch;
721 699
722 ch = find_channel (h, 700 ch = find_channel (h, message->ccn);
723 message->ccn);
724 if (NULL == ch) 701 if (NULL == ch)
725 { 702 {
726 LOG (GNUNET_ERROR_TYPE_DEBUG, 703 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -745,9 +722,7 @@ handle_local_ack (void *cls,
745 } 722 }
746 if (NULL != ch->mq_cont) 723 if (NULL != ch->mq_cont)
747 return; /* already working on it! */ 724 return; /* already working on it! */
748 ch->mq_cont 725 ch->mq_cont = GNUNET_SCHEDULER_add_now (&cadet_mq_send_now, ch);
749 = GNUNET_SCHEDULER_add_now (&cadet_mq_send_now,
750 ch);
751} 726}
752 727
753 728
@@ -761,9 +736,7 @@ handle_local_ack (void *cls,
761 * @return #GNUNET_OK (continue to iterate) 736 * @return #GNUNET_OK (continue to iterate)
762 */ 737 */
763static int 738static int
764destroy_channel_cb (void *cls, 739destroy_channel_cb (void *cls, uint32_t cid, void *value)
765 uint32_t cid,
766 void *value)
767{ 740{
768 /* struct GNUNET_CADET_Handle *handle = cls; */ 741 /* struct GNUNET_CADET_Handle *handle = cls; */
769 struct GNUNET_CADET_Channel *ch = value; 742 struct GNUNET_CADET_Channel *ch = value;
@@ -771,7 +744,7 @@ destroy_channel_cb (void *cls,
771 (void) cls; 744 (void) cls;
772 (void) cid; 745 (void) cid;
773 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 746 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
774 "Destroying channel due to GNUNET_CADET_disconnect()\n"); 747 "Destroying channel due to GNUNET_CADET_disconnect()\n");
775 destroy_channel (ch); 748 destroy_channel (ch);
776 return GNUNET_OK; 749 return GNUNET_OK;
777} 750}
@@ -786,23 +759,17 @@ destroy_channel_cb (void *cls,
786 * @param error error code 759 * @param error error code
787 */ 760 */
788static void 761static void
789handle_mq_error (void *cls, 762handle_mq_error (void *cls, enum GNUNET_MQ_Error error)
790 enum GNUNET_MQ_Error error)
791{ 763{
792 struct GNUNET_CADET_Handle *h = cls; 764 struct GNUNET_CADET_Handle *h = cls;
793 765
794 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 766 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "MQ ERROR: %u\n", error);
795 "MQ ERROR: %u\n", 767 GNUNET_CONTAINER_multihashmap32_iterate (h->channels, &destroy_channel_cb, h);
796 error);
797 GNUNET_CONTAINER_multihashmap32_iterate (h->channels,
798 &destroy_channel_cb,
799 h);
800 GNUNET_MQ_destroy (h->mq); 768 GNUNET_MQ_destroy (h->mq);
801 h->mq = NULL; 769 h->mq = NULL;
802 GNUNET_assert (NULL == h->reconnect_task); 770 GNUNET_assert (NULL == h->reconnect_task);
803 h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time, 771 h->reconnect_task =
804 &reconnect_cbk, 772 GNUNET_SCHEDULER_add_delayed (h->reconnect_time, &reconnect_cbk, h);
805 h);
806} 773}
807 774
808 775
@@ -815,32 +782,28 @@ handle_mq_error (void *cls,
815static void 782static void
816reconnect (struct GNUNET_CADET_Handle *h) 783reconnect (struct GNUNET_CADET_Handle *h)
817{ 784{
818 struct GNUNET_MQ_MessageHandler handlers[] = { 785 struct GNUNET_MQ_MessageHandler handlers[] =
819 GNUNET_MQ_hd_fixed_size (channel_created, 786 {GNUNET_MQ_hd_fixed_size (channel_created,
820 GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE, 787 GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE,
821 struct GNUNET_CADET_LocalChannelCreateMessage, 788 struct GNUNET_CADET_LocalChannelCreateMessage,
822 h), 789 h),
823 GNUNET_MQ_hd_fixed_size (channel_destroy, 790 GNUNET_MQ_hd_fixed_size (channel_destroy,
824 GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY, 791 GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY,
825 struct GNUNET_CADET_LocalChannelDestroyMessage, 792 struct GNUNET_CADET_LocalChannelDestroyMessage,
826 h), 793 h),
827 GNUNET_MQ_hd_var_size (local_data, 794 GNUNET_MQ_hd_var_size (local_data,
828 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA, 795 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA,
829 struct GNUNET_CADET_LocalData, 796 struct GNUNET_CADET_LocalData,
830 h), 797 h),
831 GNUNET_MQ_hd_fixed_size (local_ack, 798 GNUNET_MQ_hd_fixed_size (local_ack,
832 GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK, 799 GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK,
833 struct GNUNET_CADET_LocalAck, 800 struct GNUNET_CADET_LocalAck,
834 h), 801 h),
835 GNUNET_MQ_handler_end () 802 GNUNET_MQ_handler_end ()};
836 };
837 803
838 GNUNET_assert (NULL == h->mq); 804 GNUNET_assert (NULL == h->mq);
839 h->mq = GNUNET_CLIENT_connect (h->cfg, 805 h->mq =
840 "cadet", 806 GNUNET_CLIENT_connect (h->cfg, "cadet", handlers, &handle_mq_error, h);
841 handlers,
842 &handle_mq_error,
843 h);
844} 807}
845 808
846 809
@@ -854,9 +817,7 @@ reconnect (struct GNUNET_CADET_Handle *h)
854 * @return #GNUNET_OK (continue to iterate) 817 * @return #GNUNET_OK (continue to iterate)
855 */ 818 */
856static int 819static int
857destroy_port_cb (void *cls, 820destroy_port_cb (void *cls, const struct GNUNET_HashCode *id, void *value)
858 const struct GNUNET_HashCode *id,
859 void *value)
860{ 821{
861 /* struct GNUNET_CADET_Handle *handle = cls; */ 822 /* struct GNUNET_CADET_Handle *handle = cls; */
862 struct GNUNET_CADET_Port *port = value; 823 struct GNUNET_CADET_Port *port = value;
@@ -914,20 +875,17 @@ GNUNET_CADET_disconnect (struct GNUNET_CADET_Handle *handle)
914void 875void
915GNUNET_CADET_close_port (struct GNUNET_CADET_Port *p) 876GNUNET_CADET_close_port (struct GNUNET_CADET_Port *p)
916{ 877{
917 GNUNET_assert (GNUNET_YES == 878 GNUNET_assert (
918 GNUNET_CONTAINER_multihashmap_remove (p->cadet->ports, 879 GNUNET_YES ==
919 &p->id, 880 GNUNET_CONTAINER_multihashmap_remove (p->cadet->ports, &p->id, p));
920 p));
921 if (NULL != p->cadet->mq) 881 if (NULL != p->cadet->mq)
922 { 882 {
923 struct GNUNET_CADET_PortMessage *msg; 883 struct GNUNET_CADET_PortMessage *msg;
924 struct GNUNET_MQ_Envelope *env; 884 struct GNUNET_MQ_Envelope *env;
925 885
926 env = GNUNET_MQ_msg (msg, 886 env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_CLOSE);
927 GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_CLOSE);
928 msg->port = p->id; 887 msg->port = p->id;
929 GNUNET_MQ_send (p->cadet->mq, 888 GNUNET_MQ_send (p->cadet->mq, env);
930 env);
931 } 889 }
932 GNUNET_free_non_null (p->handlers); 890 GNUNET_free_non_null (p->handlers);
933 GNUNET_free (p); 891 GNUNET_free (p);
@@ -952,14 +910,12 @@ GNUNET_CADET_channel_destroy (struct GNUNET_CADET_Channel *channel)
952 910
953 if (NULL != h->mq) 911 if (NULL != h->mq)
954 { 912 {
955 env = GNUNET_MQ_msg (msg, 913 env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY);
956 GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY);
957 msg->ccn = channel->ccn; 914 msg->ccn = channel->ccn;
958 GNUNET_MQ_send (h->mq, 915 GNUNET_MQ_send (h->mq, env);
959 env);
960 } 916 }
961 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 917 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
962 "Destroying channel due to GNUNET_CADET_channel_destroy()\n"); 918 "Destroying channel due to GNUNET_CADET_channel_destroy()\n");
963 channel->disconnects = NULL; 919 channel->disconnects = NULL;
964 destroy_channel (channel); 920 destroy_channel (channel);
965} 921}
@@ -993,14 +949,12 @@ GNUNET_CADET_receive_done (struct GNUNET_CADET_Channel *channel)
993 struct GNUNET_CADET_LocalAck *msg; 949 struct GNUNET_CADET_LocalAck *msg;
994 struct GNUNET_MQ_Envelope *env; 950 struct GNUNET_MQ_Envelope *env;
995 951
996 env = GNUNET_MQ_msg (msg, 952 env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
997 GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
998 LOG (GNUNET_ERROR_TYPE_DEBUG, 953 LOG (GNUNET_ERROR_TYPE_DEBUG,
999 "Sending ACK on channel %X\n", 954 "Sending ACK on channel %X\n",
1000 ntohl (channel->ccn.channel_of_client)); 955 ntohl (channel->ccn.channel_of_client));
1001 msg->ccn = channel->ccn; 956 msg->ccn = channel->ccn;
1002 GNUNET_MQ_send (channel->cadet->mq, 957 GNUNET_MQ_send (channel->cadet->mq, env);
1003 env);
1004} 958}
1005 959
1006 960
@@ -1016,12 +970,10 @@ GNUNET_CADET_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
1016{ 970{
1017 struct GNUNET_CADET_Handle *h; 971 struct GNUNET_CADET_Handle *h;
1018 972
1019 LOG (GNUNET_ERROR_TYPE_DEBUG, 973 LOG (GNUNET_ERROR_TYPE_DEBUG, "GNUNET_CADET_connect()\n");
1020 "GNUNET_CADET_connect()\n");
1021 h = GNUNET_new (struct GNUNET_CADET_Handle); 974 h = GNUNET_new (struct GNUNET_CADET_Handle);
1022 h->cfg = cfg; 975 h->cfg = cfg;
1023 h->ports = GNUNET_CONTAINER_multihashmap_create (4, 976 h->ports = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_YES);
1024 GNUNET_YES);
1025 h->channels = GNUNET_CONTAINER_multihashmap32_create (4); 977 h->channels = GNUNET_CONTAINER_multihashmap32_create (4);
1026 reconnect (h); 978 reconnect (h);
1027 if (NULL == h->mq) 979 if (NULL == h->mq)
@@ -1051,7 +1003,7 @@ struct GNUNET_CADET_Port *
1051GNUNET_CADET_open_port (struct GNUNET_CADET_Handle *h, 1003GNUNET_CADET_open_port (struct GNUNET_CADET_Handle *h,
1052 const struct GNUNET_HashCode *port, 1004 const struct GNUNET_HashCode *port,
1053 GNUNET_CADET_ConnectEventHandler connects, 1005 GNUNET_CADET_ConnectEventHandler connects,
1054 void * connects_cls, 1006 void *connects_cls,
1055 GNUNET_CADET_WindowSizeEventHandler window_changes, 1007 GNUNET_CADET_WindowSizeEventHandler window_changes,
1056 GNUNET_CADET_DisconnectEventHandler disconnects, 1008 GNUNET_CADET_DisconnectEventHandler disconnects,
1057 const struct GNUNET_MQ_MessageHandler *handlers) 1009 const struct GNUNET_MQ_MessageHandler *handlers)
@@ -1061,17 +1013,17 @@ GNUNET_CADET_open_port (struct GNUNET_CADET_Handle *h,
1061 GNUNET_assert (NULL != connects); 1013 GNUNET_assert (NULL != connects);
1062 GNUNET_assert (NULL != disconnects); 1014 GNUNET_assert (NULL != disconnects);
1063 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1015 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1064 "Listening to CADET port %s\n", 1016 "Listening to CADET port %s\n",
1065 GNUNET_h2s (port)); 1017 GNUNET_h2s (port));
1066 1018
1067 p = GNUNET_new (struct GNUNET_CADET_Port); 1019 p = GNUNET_new (struct GNUNET_CADET_Port);
1068 p->cadet = h; 1020 p->cadet = h;
1069 p->id = *port; 1021 p->id = *port;
1070 if (GNUNET_OK != 1022 if (GNUNET_OK != GNUNET_CONTAINER_multihashmap_put (
1071 GNUNET_CONTAINER_multihashmap_put (h->ports, 1023 h->ports,
1072 &p->id, 1024 &p->id,
1073 p, 1025 p,
1074 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)) 1026 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
1075 { 1027 {
1076 GNUNET_free (p); 1028 GNUNET_free (p);
1077 return NULL; 1029 return NULL;
@@ -1081,11 +1033,8 @@ GNUNET_CADET_open_port (struct GNUNET_CADET_Handle *h,
1081 p->window_changes = window_changes; 1033 p->window_changes = window_changes;
1082 p->disconnects = disconnects; 1034 p->disconnects = disconnects;
1083 p->handlers = GNUNET_MQ_copy_handlers (handlers); 1035 p->handlers = GNUNET_MQ_copy_handlers (handlers);
1084 1036
1085 GNUNET_assert (GNUNET_OK == 1037 GNUNET_assert (GNUNET_OK == open_port_cb (h, &p->id, p));
1086 open_port_cb (h,
1087 &p->id,
1088 p));
1089 return p; 1038 return p;
1090} 1039}
1091 1040
@@ -1124,11 +1073,10 @@ GNUNET_CADET_channel_create (struct GNUNET_CADET_Handle *h,
1124 1073
1125 GNUNET_assert (NULL != disconnects); 1074 GNUNET_assert (NULL != disconnects);
1126 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1075 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1127 "Creating channel to peer %s at port %s\n", 1076 "Creating channel to peer %s at port %s\n",
1128 GNUNET_i2s (destination), 1077 GNUNET_i2s (destination),
1129 GNUNET_h2s (port)); 1078 GNUNET_h2s (port));
1130 ch = create_channel (h, 1079 ch = create_channel (h, NULL);
1131 NULL);
1132 ch->ctx = channel_cls; 1080 ch->ctx = channel_cls;
1133 ch->peer = *destination; 1081 ch->peer = *destination;
1134 ch->window_changes = window_changes; 1082 ch->window_changes = window_changes;
@@ -1142,17 +1090,14 @@ GNUNET_CADET_channel_create (struct GNUNET_CADET_Handle *h,
1142 handlers, 1090 handlers,
1143 &cadet_mq_error_handler, 1091 &cadet_mq_error_handler,
1144 ch); 1092 ch);
1145 GNUNET_MQ_set_handlers_closure (ch->mq, 1093 GNUNET_MQ_set_handlers_closure (ch->mq, channel_cls);
1146 channel_cls);
1147 1094
1148 /* Request channel creation to service */ 1095 /* Request channel creation to service */
1149 env = GNUNET_MQ_msg (msg, 1096 env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE);
1150 GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE);
1151 msg->ccn = ch->ccn; 1097 msg->ccn = ch->ccn;
1152 msg->port = *port; 1098 msg->port = *port;
1153 msg->peer = *destination; 1099 msg->peer = *destination;
1154 GNUNET_MQ_send (h->mq, 1100 GNUNET_MQ_send (h->mq, env);
1155 env);
1156 return ch; 1101 return ch;
1157} 1102}
1158 1103