aboutsummaryrefslogtreecommitdiff
path: root/src/cadet/cadet_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/cadet/cadet_api.c')
-rw-r--r--src/cadet/cadet_api.c315
1 files changed, 130 insertions, 185 deletions
diff --git a/src/cadet/cadet_api.c b/src/cadet/cadet_api.c
index b0016d2a8..e141787a0 100644
--- a/src/cadet/cadet_api.c
+++ b/src/cadet/cadet_api.c
@@ -30,7 +30,7 @@
30#include "cadet.h" 30#include "cadet.h"
31#include "cadet_protocol.h" 31#include "cadet_protocol.h"
32 32
33#define LOG(kind,...) GNUNET_log_from (kind, "cadet-api",__VA_ARGS__) 33#define LOG(kind, ...) GNUNET_log_from (kind, "cadet-api", __VA_ARGS__)
34 34
35/** 35/**
36 * Opaque handle to the service. 36 * Opaque handle to the service.
@@ -71,7 +71,6 @@ struct GNUNET_CADET_Handle
71 * Time to the next reconnect in case one reconnect fails 71 * Time to the next reconnect in case one reconnect fails
72 */ 72 */
73 struct GNUNET_TIME_Relative reconnect_time; 73 struct GNUNET_TIME_Relative reconnect_time;
74
75}; 74};
76 75
77 76
@@ -143,7 +142,6 @@ struct GNUNET_CADET_Channel
143 * How many messages are we allowed to send to the service right now? 142 * How many messages are we allowed to send to the service right now?
144 */ 143 */
145 unsigned int allow_send; 144 unsigned int allow_send;
146
147}; 145};
148 146
149 147
@@ -204,10 +202,9 @@ struct GNUNET_CADET_Port
204 */ 202 */
205static struct GNUNET_CADET_Port * 203static struct GNUNET_CADET_Port *
206find_port (const struct GNUNET_CADET_Handle *h, 204find_port (const struct GNUNET_CADET_Handle *h,
207 const struct GNUNET_HashCode *hash) 205 const struct GNUNET_HashCode *hash)
208{ 206{
209 return GNUNET_CONTAINER_multihashmap_get (h->ports, 207 return GNUNET_CONTAINER_multihashmap_get (h->ports, hash);
210 hash);
211} 208}
212 209
213 210
@@ -245,11 +242,10 @@ create_channel (struct GNUNET_CADET_Handle *h,
245 ch->cadet = h; 242 ch->cadet = h;
246 if (NULL == ccnp) 243 if (NULL == ccnp)
247 { 244 {
248 while (NULL != 245 while (NULL != find_channel (h, h->next_ccn))
249 find_channel (h, 246 h->next_ccn.channel_of_client =
250 h->next_ccn)) 247 htonl (GNUNET_CADET_LOCAL_CHANNEL_ID_CLI |
251 h->next_ccn.channel_of_client 248 (1 + ntohl (h->next_ccn.channel_of_client)));
252 = htonl (GNUNET_CADET_LOCAL_CHANNEL_ID_CLI | (1 + ntohl (h->next_ccn.channel_of_client)));
253 ccn = h->next_ccn; 249 ccn = h->next_ccn;
254 } 250 }
255 else 251 else
@@ -258,10 +254,11 @@ create_channel (struct GNUNET_CADET_Handle *h,
258 } 254 }
259 ch->ccn = ccn; 255 ch->ccn = ccn;
260 GNUNET_assert (GNUNET_OK == 256 GNUNET_assert (GNUNET_OK ==
261 GNUNET_CONTAINER_multihashmap32_put (h->channels, 257 GNUNET_CONTAINER_multihashmap32_put (
262 ntohl (ch->ccn.channel_of_client), 258 h->channels,
263 ch, 259 ntohl (ch->ccn.channel_of_client),
264 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 260 ch,
261 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
265 return ch; 262 return ch;
266} 263}
267 264
@@ -285,10 +282,11 @@ destroy_channel (struct GNUNET_CADET_Channel *ch)
285 "Destroying channel %X of %p\n", 282 "Destroying channel %X of %p\n",
286 htonl (ch->ccn.channel_of_client), 283 htonl (ch->ccn.channel_of_client),
287 h); 284 h);
288 GNUNET_assert (GNUNET_YES == 285 GNUNET_assert (
289 GNUNET_CONTAINER_multihashmap32_remove (h->channels, 286 GNUNET_YES ==
290 ntohl (ch->ccn.channel_of_client), 287 GNUNET_CONTAINER_multihashmap32_remove (h->channels,
291 ch)); 288 ntohl (ch->ccn.channel_of_client),
289 ch));
292 if (NULL != ch->mq_cont) 290 if (NULL != ch->mq_cont)
293 { 291 {
294 GNUNET_SCHEDULER_cancel (ch->mq_cont); 292 GNUNET_SCHEDULER_cancel (ch->mq_cont);
@@ -296,8 +294,7 @@ destroy_channel (struct GNUNET_CADET_Channel *ch)
296 } 294 }
297 /* signal channel destruction */ 295 /* signal channel destruction */
298 if (NULL != ch->disconnects) 296 if (NULL != ch->disconnects)
299 ch->disconnects (ch->ctx, 297 ch->disconnects (ch->ctx, ch);
300 ch);
301 if (NULL != ch->pending_env) 298 if (NULL != ch->pending_env)
302 GNUNET_MQ_discard (ch->pending_env); 299 GNUNET_MQ_discard (ch->pending_env);
303 GNUNET_MQ_destroy (ch->mq); 300 GNUNET_MQ_destroy (ch->mq);
@@ -325,9 +322,7 @@ reconnect (struct GNUNET_CADET_Handle *h);
325 * @return #GNUNET_OK (continue to iterate) 322 * @return #GNUNET_OK (continue to iterate)
326 */ 323 */
327static int 324static int
328open_port_cb (void *cls, 325open_port_cb (void *cls, const struct GNUNET_HashCode *id, void *value)
329 const struct GNUNET_HashCode *id,
330 void *value)
331{ 326{
332 struct GNUNET_CADET_Handle *h = cls; 327 struct GNUNET_CADET_Handle *h = cls;
333 struct GNUNET_CADET_Port *port = value; 328 struct GNUNET_CADET_Port *port = value;
@@ -335,11 +330,9 @@ open_port_cb (void *cls,
335 struct GNUNET_MQ_Envelope *env; 330 struct GNUNET_MQ_Envelope *env;
336 331
337 (void) id; 332 (void) id;
338 env = GNUNET_MQ_msg (msg, 333 env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN);
339 GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN);
340 msg->port = port->id; 334 msg->port = port->id;
341 GNUNET_MQ_send (h->mq, 335 GNUNET_MQ_send (h->mq, env);
342 env);
343 return GNUNET_OK; 336 return GNUNET_OK;
344} 337}
345 338
@@ -356,12 +349,9 @@ reconnect_cbk (void *cls)
356 struct GNUNET_CADET_Handle *h = cls; 349 struct GNUNET_CADET_Handle *h = cls;
357 350
358 h->reconnect_task = NULL; 351 h->reconnect_task = NULL;
359 h->reconnect_time 352 h->reconnect_time = GNUNET_TIME_STD_BACKOFF (h->reconnect_time);
360 = GNUNET_TIME_STD_BACKOFF (h->reconnect_time);
361 reconnect (h); 353 reconnect (h);
362 GNUNET_CONTAINER_multihashmap_iterate (h->ports, 354 GNUNET_CONTAINER_multihashmap_iterate (h->ports, &open_port_cb, h);
363 &open_port_cb,
364 h);
365} 355}
366 356
367 357
@@ -410,8 +400,7 @@ cadet_mq_send_now (void *cls)
410 "Sending message on channel %s to CADET, new window size is %u\n", 400 "Sending message on channel %s to CADET, new window size is %u\n",
411 GNUNET_i2s (&ch->peer), 401 GNUNET_i2s (&ch->peer),
412 ch->allow_send); 402 ch->allow_send);
413 GNUNET_MQ_send (ch->cadet->mq, 403 GNUNET_MQ_send (ch->cadet->mq, env);
414 env);
415 GNUNET_MQ_impl_send_continue (ch->mq); 404 GNUNET_MQ_impl_send_continue (ch->mq);
416} 405}
417 406
@@ -436,8 +425,10 @@ cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
436 struct GNUNET_CADET_Channel *ch = impl_state; 425 struct GNUNET_CADET_Channel *ch = impl_state;
437 struct GNUNET_CADET_Handle *h = ch->cadet; 426 struct GNUNET_CADET_Handle *h = ch->cadet;
438 uint16_t msize; 427 uint16_t msize;
428 struct GNUNET_MQ_Envelope *orig_env;
439 struct GNUNET_MQ_Envelope *env; 429 struct GNUNET_MQ_Envelope *env;
440 struct GNUNET_CADET_LocalData *cadet_msg = NULL; 430 struct GNUNET_CADET_LocalData *cadet_msg;
431 enum GNUNET_MQ_PriorityPreferences pp;
441 432
442 if (NULL == h->mq) 433 if (NULL == h->mq)
443 { 434 {
@@ -445,6 +436,8 @@ cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
445 GNUNET_MQ_impl_send_continue (mq); 436 GNUNET_MQ_impl_send_continue (mq);
446 return; 437 return;
447 } 438 }
439 orig_env = GNUNET_MQ_get_current_envelope (mq);
440 pp = GNUNET_MQ_env_get_options (orig_env);
448 441
449 /* check message size for sanity */ 442 /* check message size for sanity */
450 msize = ntohs (msg->size); 443 msize = ntohs (msg->size);
@@ -458,12 +451,11 @@ cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
458 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA, 451 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA,
459 msg); 452 msg);
460 cadet_msg->ccn = ch->ccn; 453 cadet_msg->ccn = ch->ccn;
454 cadet_msg->pp = htonl ((uint32_t) pp);
461 GNUNET_assert (NULL == ch->pending_env); 455 GNUNET_assert (NULL == ch->pending_env);
462 ch->pending_env = env; 456 ch->pending_env = env;
463 if (0 < ch->allow_send) 457 if (0 < ch->allow_send)
464 ch->mq_cont 458 ch->mq_cont = GNUNET_SCHEDULER_add_now (&cadet_mq_send_now, ch);
465 = GNUNET_SCHEDULER_add_now (&cadet_mq_send_now,
466 ch);
467} 459}
468 460
469 461
@@ -475,8 +467,7 @@ cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
475 * @param impl_state state of the implementation 467 * @param impl_state state of the implementation
476 */ 468 */
477static void 469static void
478cadet_mq_destroy_impl (struct GNUNET_MQ_Handle *mq, 470cadet_mq_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
479 void *impl_state)
480{ 471{
481 struct GNUNET_CADET_Channel *ch = impl_state; 472 struct GNUNET_CADET_Channel *ch = impl_state;
482 473
@@ -494,8 +485,7 @@ cadet_mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
494 * @param error error code 485 * @param error error code
495 */ 486 */
496static void 487static void
497cadet_mq_error_handler (void *cls, 488cadet_mq_error_handler (void *cls, enum GNUNET_MQ_Error error)
498 enum GNUNET_MQ_Error error)
499{ 489{
500 struct GNUNET_CADET_Channel *ch = cls; 490 struct GNUNET_CADET_Channel *ch = cls;
501 491
@@ -508,11 +498,10 @@ cadet_mq_error_handler (void *cls,
508 else 498 else
509 { 499 {
510 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 500 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
511 "MQ error in communication with CADET: %d\n", 501 "MQ error in communication with CADET: %d\n",
512 error); 502 error);
513 if (NULL != ch->disconnects) 503 if (NULL != ch->disconnects)
514 ch->disconnects (ch->ctx, 504 ch->disconnects (ch->ctx, ch);
515 ch);
516 GNUNET_CADET_channel_destroy (ch); 505 GNUNET_CADET_channel_destroy (ch);
517 } 506 }
518} 507}
@@ -526,8 +515,7 @@ cadet_mq_error_handler (void *cls,
526 * @param impl_state state specific to the implementation 515 * @param impl_state state specific to the implementation
527 */ 516 */
528static void 517static void
529cadet_mq_cancel_impl (struct GNUNET_MQ_Handle *mq, 518cadet_mq_cancel_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
530 void *impl_state)
531{ 519{
532 struct GNUNET_CADET_Channel *ch = impl_state; 520 struct GNUNET_CADET_Channel *ch = impl_state;
533 521
@@ -550,8 +538,9 @@ cadet_mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
550 * @param msg A message with the details of the new incoming channel 538 * @param msg A message with the details of the new incoming channel
551 */ 539 */
552static void 540static void
553handle_channel_created (void *cls, 541handle_channel_created (
554 const struct GNUNET_CADET_LocalChannelCreateMessage *msg) 542 void *cls,
543 const struct GNUNET_CADET_LocalChannelCreateMessage *msg)
555{ 544{
556 struct GNUNET_CADET_Handle *h = cls; 545 struct GNUNET_CADET_Handle *h = cls;
557 struct GNUNET_CADET_Channel *ch; 546 struct GNUNET_CADET_Channel *ch;
@@ -566,8 +555,7 @@ handle_channel_created (void *cls,
566 GNUNET_break (0); 555 GNUNET_break (0);
567 return; 556 return;
568 } 557 }
569 port = find_port (h, 558 port = find_port (h, port_number);
570 port_number);
571 if (NULL == port) 559 if (NULL == port)
572 { 560 {
573 /* We could have closed the port but the service didn't know about it yet 561 /* We could have closed the port but the service didn't know about it yet
@@ -580,16 +568,14 @@ handle_channel_created (void *cls,
580 "No handler for incoming channel %X (on port %s, recently closed?)\n", 568 "No handler for incoming channel %X (on port %s, recently closed?)\n",
581 ntohl (ccn.channel_of_client), 569 ntohl (ccn.channel_of_client),
582 GNUNET_h2s (port_number)); 570 GNUNET_h2s (port_number));
583 env = GNUNET_MQ_msg (d_msg, 571 env =
584 GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY); 572 GNUNET_MQ_msg (d_msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY);
585 d_msg->ccn = msg->ccn; 573 d_msg->ccn = msg->ccn;
586 GNUNET_MQ_send (h->mq, 574 GNUNET_MQ_send (h->mq, env);
587 env);
588 return; 575 return;
589 } 576 }
590 577
591 ch = create_channel (h, 578 ch = create_channel (h, &ccn);
592 &ccn);
593 ch->peer = msg->peer; 579 ch->peer = msg->peer;
594 ch->incoming_port = port; 580 ch->incoming_port = port;
595 ch->options = ntohl (msg->opt); 581 ch->options = ntohl (msg->opt);
@@ -609,11 +595,8 @@ handle_channel_created (void *cls,
609 port->handlers, 595 port->handlers,
610 &cadet_mq_error_handler, 596 &cadet_mq_error_handler,
611 ch); 597 ch);
612 ch->ctx = port->connects (port->cls, 598 ch->ctx = port->connects (port->cls, ch, &msg->peer);
613 ch, 599 GNUNET_MQ_set_handlers_closure (ch->mq, ch->ctx);
614 &msg->peer);
615 GNUNET_MQ_set_handlers_closure (ch->mq,
616 ch->ctx);
617} 600}
618 601
619 602
@@ -624,14 +607,14 @@ handle_channel_created (void *cls,
624 * @param msg A message with the details of the channel being destroyed 607 * @param msg A message with the details of the channel being destroyed
625 */ 608 */
626static void 609static void
627handle_channel_destroy (void *cls, 610handle_channel_destroy (
628 const struct GNUNET_CADET_LocalChannelDestroyMessage *msg) 611 void *cls,
612 const struct GNUNET_CADET_LocalChannelDestroyMessage *msg)
629{ 613{
630 struct GNUNET_CADET_Handle *h = cls; 614 struct GNUNET_CADET_Handle *h = cls;
631 struct GNUNET_CADET_Channel *ch; 615 struct GNUNET_CADET_Channel *ch;
632 616
633 ch = find_channel (h, 617 ch = find_channel (h, msg->ccn);
634 msg->ccn);
635 if (NULL == ch) 618 if (NULL == ch)
636 { 619 {
637 LOG (GNUNET_ERROR_TYPE_DEBUG, 620 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -655,8 +638,7 @@ handle_channel_destroy (void *cls,
655 * #GNUNET_SYSERR otherwise 638 * #GNUNET_SYSERR otherwise
656 */ 639 */
657static int 640static int
658check_local_data (void *cls, 641check_local_data (void *cls, const struct GNUNET_CADET_LocalData *message)
659 const struct GNUNET_CADET_LocalData *message)
660{ 642{
661 uint16_t size; 643 uint16_t size;
662 644
@@ -678,8 +660,7 @@ check_local_data (void *cls,
678 * @param message A message encapsulating the data 660 * @param message A message encapsulating the data
679 */ 661 */
680static void 662static void
681handle_local_data (void *cls, 663handle_local_data (void *cls, const struct GNUNET_CADET_LocalData *message)
682 const struct GNUNET_CADET_LocalData *message)
683{ 664{
684 struct GNUNET_CADET_Handle *h = cls; 665 struct GNUNET_CADET_Handle *h = cls;
685 const struct GNUNET_MessageHeader *payload; 666 const struct GNUNET_MessageHeader *payload;
@@ -687,8 +668,7 @@ handle_local_data (void *cls,
687 uint16_t type; 668 uint16_t type;
688 int fwd; 669 int fwd;
689 670
690 ch = find_channel (h, 671 ch = find_channel (h, message->ccn);
691 message->ccn);
692 if (NULL == ch) 672 if (NULL == ch)
693 { 673 {
694 LOG (GNUNET_ERROR_TYPE_DEBUG, 674 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -706,8 +686,7 @@ handle_local_data (void *cls,
706 GNUNET_i2s (&ch->peer), 686 GNUNET_i2s (&ch->peer),
707 ntohl (message->ccn.channel_of_client), 687 ntohl (message->ccn.channel_of_client),
708 type); 688 type);
709 GNUNET_MQ_inject_message (ch->mq, 689 GNUNET_MQ_inject_message (ch->mq, payload);
710 payload);
711} 690}
712 691
713 692
@@ -719,14 +698,12 @@ handle_local_data (void *cls,
719 * @param message Message itself. 698 * @param message Message itself.
720 */ 699 */
721static void 700static void
722handle_local_ack (void *cls, 701handle_local_ack (void *cls, const struct GNUNET_CADET_LocalAck *message)
723 const struct GNUNET_CADET_LocalAck *message)
724{ 702{
725 struct GNUNET_CADET_Handle *h = cls; 703 struct GNUNET_CADET_Handle *h = cls;
726 struct GNUNET_CADET_Channel *ch; 704 struct GNUNET_CADET_Channel *ch;
727 705
728 ch = find_channel (h, 706 ch = find_channel (h, message->ccn);
729 message->ccn);
730 if (NULL == ch) 707 if (NULL == ch)
731 { 708 {
732 LOG (GNUNET_ERROR_TYPE_DEBUG, 709 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -751,9 +728,7 @@ handle_local_ack (void *cls,
751 } 728 }
752 if (NULL != ch->mq_cont) 729 if (NULL != ch->mq_cont)
753 return; /* already working on it! */ 730 return; /* already working on it! */
754 ch->mq_cont 731 ch->mq_cont = GNUNET_SCHEDULER_add_now (&cadet_mq_send_now, ch);
755 = GNUNET_SCHEDULER_add_now (&cadet_mq_send_now,
756 ch);
757} 732}
758 733
759 734
@@ -767,9 +742,7 @@ handle_local_ack (void *cls,
767 * @return #GNUNET_OK (continue to iterate) 742 * @return #GNUNET_OK (continue to iterate)
768 */ 743 */
769static int 744static int
770destroy_channel_cb (void *cls, 745destroy_channel_cb (void *cls, uint32_t cid, void *value)
771 uint32_t cid,
772 void *value)
773{ 746{
774 /* struct GNUNET_CADET_Handle *handle = cls; */ 747 /* struct GNUNET_CADET_Handle *handle = cls; */
775 struct GNUNET_CADET_Channel *ch = value; 748 struct GNUNET_CADET_Channel *ch = value;
@@ -777,7 +750,7 @@ destroy_channel_cb (void *cls,
777 (void) cls; 750 (void) cls;
778 (void) cid; 751 (void) cid;
779 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 752 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
780 "Destroying channel due to GNUNET_CADET_disconnect()\n"); 753 "Destroying channel due to GNUNET_CADET_disconnect()\n");
781 destroy_channel (ch); 754 destroy_channel (ch);
782 return GNUNET_OK; 755 return GNUNET_OK;
783} 756}
@@ -792,23 +765,17 @@ destroy_channel_cb (void *cls,
792 * @param error error code 765 * @param error error code
793 */ 766 */
794static void 767static void
795handle_mq_error (void *cls, 768handle_mq_error (void *cls, enum GNUNET_MQ_Error error)
796 enum GNUNET_MQ_Error error)
797{ 769{
798 struct GNUNET_CADET_Handle *h = cls; 770 struct GNUNET_CADET_Handle *h = cls;
799 771
800 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 772 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "MQ ERROR: %u\n", error);
801 "MQ ERROR: %u\n", 773 GNUNET_CONTAINER_multihashmap32_iterate (h->channels, &destroy_channel_cb, h);
802 error);
803 GNUNET_CONTAINER_multihashmap32_iterate (h->channels,
804 &destroy_channel_cb,
805 h);
806 GNUNET_MQ_destroy (h->mq); 774 GNUNET_MQ_destroy (h->mq);
807 h->mq = NULL; 775 h->mq = NULL;
808 GNUNET_assert (NULL == h->reconnect_task); 776 GNUNET_assert (NULL == h->reconnect_task);
809 h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time, 777 h->reconnect_task =
810 &reconnect_cbk, 778 GNUNET_SCHEDULER_add_delayed (h->reconnect_time, &reconnect_cbk, h);
811 h);
812} 779}
813 780
814 781
@@ -821,32 +788,28 @@ handle_mq_error (void *cls,
821static void 788static void
822reconnect (struct GNUNET_CADET_Handle *h) 789reconnect (struct GNUNET_CADET_Handle *h)
823{ 790{
824 struct GNUNET_MQ_MessageHandler handlers[] = { 791 struct GNUNET_MQ_MessageHandler handlers[] =
825 GNUNET_MQ_hd_fixed_size (channel_created, 792 {GNUNET_MQ_hd_fixed_size (channel_created,
826 GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE, 793 GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE,
827 struct GNUNET_CADET_LocalChannelCreateMessage, 794 struct GNUNET_CADET_LocalChannelCreateMessage,
828 h), 795 h),
829 GNUNET_MQ_hd_fixed_size (channel_destroy, 796 GNUNET_MQ_hd_fixed_size (channel_destroy,
830 GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY, 797 GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY,
831 struct GNUNET_CADET_LocalChannelDestroyMessage, 798 struct GNUNET_CADET_LocalChannelDestroyMessage,
832 h), 799 h),
833 GNUNET_MQ_hd_var_size (local_data, 800 GNUNET_MQ_hd_var_size (local_data,
834 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA, 801 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA,
835 struct GNUNET_CADET_LocalData, 802 struct GNUNET_CADET_LocalData,
836 h), 803 h),
837 GNUNET_MQ_hd_fixed_size (local_ack, 804 GNUNET_MQ_hd_fixed_size (local_ack,
838 GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK, 805 GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK,
839 struct GNUNET_CADET_LocalAck, 806 struct GNUNET_CADET_LocalAck,
840 h), 807 h),
841 GNUNET_MQ_handler_end () 808 GNUNET_MQ_handler_end ()};
842 };
843 809
844 GNUNET_assert (NULL == h->mq); 810 GNUNET_assert (NULL == h->mq);
845 h->mq = GNUNET_CLIENT_connect (h->cfg, 811 h->mq =
846 "cadet", 812 GNUNET_CLIENT_connect (h->cfg, "cadet", handlers, &handle_mq_error, h);
847 handlers,
848 &handle_mq_error,
849 h);
850} 813}
851 814
852 815
@@ -860,9 +823,7 @@ reconnect (struct GNUNET_CADET_Handle *h)
860 * @return #GNUNET_OK (continue to iterate) 823 * @return #GNUNET_OK (continue to iterate)
861 */ 824 */
862static int 825static int
863destroy_port_cb (void *cls, 826destroy_port_cb (void *cls, const struct GNUNET_HashCode *id, void *value)
864 const struct GNUNET_HashCode *id,
865 void *value)
866{ 827{
867 /* struct GNUNET_CADET_Handle *handle = cls; */ 828 /* struct GNUNET_CADET_Handle *handle = cls; */
868 struct GNUNET_CADET_Port *port = value; 829 struct GNUNET_CADET_Port *port = value;
@@ -920,20 +881,17 @@ GNUNET_CADET_disconnect (struct GNUNET_CADET_Handle *handle)
920void 881void
921GNUNET_CADET_close_port (struct GNUNET_CADET_Port *p) 882GNUNET_CADET_close_port (struct GNUNET_CADET_Port *p)
922{ 883{
923 GNUNET_assert (GNUNET_YES == 884 GNUNET_assert (
924 GNUNET_CONTAINER_multihashmap_remove (p->cadet->ports, 885 GNUNET_YES ==
925 &p->id, 886 GNUNET_CONTAINER_multihashmap_remove (p->cadet->ports, &p->id, p));
926 p));
927 if (NULL != p->cadet->mq) 887 if (NULL != p->cadet->mq)
928 { 888 {
929 struct GNUNET_CADET_PortMessage *msg; 889 struct GNUNET_CADET_PortMessage *msg;
930 struct GNUNET_MQ_Envelope *env; 890 struct GNUNET_MQ_Envelope *env;
931 891
932 env = GNUNET_MQ_msg (msg, 892 env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_CLOSE);
933 GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_CLOSE);
934 msg->port = p->id; 893 msg->port = p->id;
935 GNUNET_MQ_send (p->cadet->mq, 894 GNUNET_MQ_send (p->cadet->mq, env);
936 env);
937 } 895 }
938 GNUNET_free_non_null (p->handlers); 896 GNUNET_free_non_null (p->handlers);
939 GNUNET_free (p); 897 GNUNET_free (p);
@@ -958,14 +916,12 @@ GNUNET_CADET_channel_destroy (struct GNUNET_CADET_Channel *channel)
958 916
959 if (NULL != h->mq) 917 if (NULL != h->mq)
960 { 918 {
961 env = GNUNET_MQ_msg (msg, 919 env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY);
962 GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY);
963 msg->ccn = channel->ccn; 920 msg->ccn = channel->ccn;
964 GNUNET_MQ_send (h->mq, 921 GNUNET_MQ_send (h->mq, env);
965 env);
966 } 922 }
967 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 923 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
968 "Destroying channel due to GNUNET_CADET_channel_destroy()\n"); 924 "Destroying channel due to GNUNET_CADET_channel_destroy()\n");
969 channel->disconnects = NULL; 925 channel->disconnects = NULL;
970 destroy_channel (channel); 926 destroy_channel (channel);
971} 927}
@@ -989,21 +945,21 @@ GNUNET_CADET_channel_get_info (struct GNUNET_CADET_Channel *channel,
989 945
990 switch (option) 946 switch (option)
991 { 947 {
992 case GNUNET_CADET_OPTION_NOBUFFER: 948 case GNUNET_CADET_OPTION_NOBUFFER:
993 case GNUNET_CADET_OPTION_RELIABLE: 949 case GNUNET_CADET_OPTION_RELIABLE:
994 case GNUNET_CADET_OPTION_OUT_OF_ORDER: 950 case GNUNET_CADET_OPTION_OUT_OF_ORDER:
995 if (0 != (option & channel->options)) 951 if (0 != (option & channel->options))
996 bool_flag = GNUNET_YES; 952 bool_flag = GNUNET_YES;
997 else 953 else
998 bool_flag = GNUNET_NO; 954 bool_flag = GNUNET_NO;
999 return (const union GNUNET_CADET_ChannelInfo *) &bool_flag; 955 return (const union GNUNET_CADET_ChannelInfo *) &bool_flag;
1000 break; 956 break;
1001 case GNUNET_CADET_OPTION_PEER: 957 case GNUNET_CADET_OPTION_PEER:
1002 return (const union GNUNET_CADET_ChannelInfo *) &channel->peer; 958 return (const union GNUNET_CADET_ChannelInfo *) &channel->peer;
1003 break; 959 break;
1004 default: 960 default:
1005 GNUNET_break (0); 961 GNUNET_break (0);
1006 return NULL; 962 return NULL;
1007 } 963 }
1008} 964}
1009 965
@@ -1019,14 +975,12 @@ GNUNET_CADET_receive_done (struct GNUNET_CADET_Channel *channel)
1019 struct GNUNET_CADET_LocalAck *msg; 975 struct GNUNET_CADET_LocalAck *msg;
1020 struct GNUNET_MQ_Envelope *env; 976 struct GNUNET_MQ_Envelope *env;
1021 977
1022 env = GNUNET_MQ_msg (msg, 978 env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
1023 GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
1024 LOG (GNUNET_ERROR_TYPE_DEBUG, 979 LOG (GNUNET_ERROR_TYPE_DEBUG,
1025 "Sending ACK on channel %X\n", 980 "Sending ACK on channel %X\n",
1026 ntohl (channel->ccn.channel_of_client)); 981 ntohl (channel->ccn.channel_of_client));
1027 msg->ccn = channel->ccn; 982 msg->ccn = channel->ccn;
1028 GNUNET_MQ_send (channel->cadet->mq, 983 GNUNET_MQ_send (channel->cadet->mq, env);
1029 env);
1030} 984}
1031 985
1032 986
@@ -1042,12 +996,10 @@ GNUNET_CADET_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
1042{ 996{
1043 struct GNUNET_CADET_Handle *h; 997 struct GNUNET_CADET_Handle *h;
1044 998
1045 LOG (GNUNET_ERROR_TYPE_DEBUG, 999 LOG (GNUNET_ERROR_TYPE_DEBUG, "GNUNET_CADET_connect()\n");
1046 "GNUNET_CADET_connect()\n");
1047 h = GNUNET_new (struct GNUNET_CADET_Handle); 1000 h = GNUNET_new (struct GNUNET_CADET_Handle);
1048 h->cfg = cfg; 1001 h->cfg = cfg;
1049 h->ports = GNUNET_CONTAINER_multihashmap_create (4, 1002 h->ports = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_YES);
1050 GNUNET_YES);
1051 h->channels = GNUNET_CONTAINER_multihashmap32_create (4); 1003 h->channels = GNUNET_CONTAINER_multihashmap32_create (4);
1052 reconnect (h); 1004 reconnect (h);
1053 if (NULL == h->mq) 1005 if (NULL == h->mq)
@@ -1077,7 +1029,7 @@ struct GNUNET_CADET_Port *
1077GNUNET_CADET_open_port (struct GNUNET_CADET_Handle *h, 1029GNUNET_CADET_open_port (struct GNUNET_CADET_Handle *h,
1078 const struct GNUNET_HashCode *port, 1030 const struct GNUNET_HashCode *port,
1079 GNUNET_CADET_ConnectEventHandler connects, 1031 GNUNET_CADET_ConnectEventHandler connects,
1080 void * connects_cls, 1032 void *connects_cls,
1081 GNUNET_CADET_WindowSizeEventHandler window_changes, 1033 GNUNET_CADET_WindowSizeEventHandler window_changes,
1082 GNUNET_CADET_DisconnectEventHandler disconnects, 1034 GNUNET_CADET_DisconnectEventHandler disconnects,
1083 const struct GNUNET_MQ_MessageHandler *handlers) 1035 const struct GNUNET_MQ_MessageHandler *handlers)
@@ -1087,17 +1039,17 @@ GNUNET_CADET_open_port (struct GNUNET_CADET_Handle *h,
1087 GNUNET_assert (NULL != connects); 1039 GNUNET_assert (NULL != connects);
1088 GNUNET_assert (NULL != disconnects); 1040 GNUNET_assert (NULL != disconnects);
1089 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1041 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1090 "Listening to CADET port %s\n", 1042 "Listening to CADET port %s\n",
1091 GNUNET_h2s (port)); 1043 GNUNET_h2s (port));
1092 1044
1093 p = GNUNET_new (struct GNUNET_CADET_Port); 1045 p = GNUNET_new (struct GNUNET_CADET_Port);
1094 p->cadet = h; 1046 p->cadet = h;
1095 p->id = *port; 1047 p->id = *port;
1096 if (GNUNET_OK != 1048 if (GNUNET_OK != GNUNET_CONTAINER_multihashmap_put (
1097 GNUNET_CONTAINER_multihashmap_put (h->ports, 1049 h->ports,
1098 &p->id, 1050 &p->id,
1099 p, 1051 p,
1100 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)) 1052 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
1101 { 1053 {
1102 GNUNET_free (p); 1054 GNUNET_free (p);
1103 return NULL; 1055 return NULL;
@@ -1107,11 +1059,8 @@ GNUNET_CADET_open_port (struct GNUNET_CADET_Handle *h,
1107 p->window_changes = window_changes; 1059 p->window_changes = window_changes;
1108 p->disconnects = disconnects; 1060 p->disconnects = disconnects;
1109 p->handlers = GNUNET_MQ_copy_handlers (handlers); 1061 p->handlers = GNUNET_MQ_copy_handlers (handlers);
1110 1062
1111 GNUNET_assert (GNUNET_OK == 1063 GNUNET_assert (GNUNET_OK == open_port_cb (h, &p->id, p));
1112 open_port_cb (h,
1113 &p->id,
1114 p));
1115 return p; 1064 return p;
1116} 1065}
1117 1066
@@ -1151,11 +1100,10 @@ GNUNET_CADET_channel_create (struct GNUNET_CADET_Handle *h,
1151 1100
1152 GNUNET_assert (NULL != disconnects); 1101 GNUNET_assert (NULL != disconnects);
1153 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1102 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1154 "Creating channel to peer %s at port %s\n", 1103 "Creating channel to peer %s at port %s\n",
1155 GNUNET_i2s (destination), 1104 GNUNET_i2s (destination),
1156 GNUNET_h2s (port)); 1105 GNUNET_h2s (port));
1157 ch = create_channel (h, 1106 ch = create_channel (h, NULL);
1158 NULL);
1159 ch->ctx = channel_cls; 1107 ch->ctx = channel_cls;
1160 ch->peer = *destination; 1108 ch->peer = *destination;
1161 ch->options = options; 1109 ch->options = options;
@@ -1170,18 +1118,15 @@ GNUNET_CADET_channel_create (struct GNUNET_CADET_Handle *h,
1170 handlers, 1118 handlers,
1171 &cadet_mq_error_handler, 1119 &cadet_mq_error_handler,
1172 ch); 1120 ch);
1173 GNUNET_MQ_set_handlers_closure (ch->mq, 1121 GNUNET_MQ_set_handlers_closure (ch->mq, channel_cls);
1174 channel_cls);
1175 1122
1176 /* Request channel creation to service */ 1123 /* Request channel creation to service */
1177 env = GNUNET_MQ_msg (msg, 1124 env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE);
1178 GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE);
1179 msg->ccn = ch->ccn; 1125 msg->ccn = ch->ccn;
1180 msg->port = *port; 1126 msg->port = *port;
1181 msg->peer = *destination; 1127 msg->peer = *destination;
1182 msg->opt = htonl (options); 1128 msg->opt = htonl (options);
1183 GNUNET_MQ_send (h->mq, 1129 GNUNET_MQ_send (h->mq, env);
1184 env);
1185 return ch; 1130 return ch;
1186} 1131}
1187 1132