aboutsummaryrefslogtreecommitdiff
path: root/src/multicast
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2016-08-03 23:20:37 +0000
committerGabor X Toth <*@tg-x.net>2016-08-03 23:20:37 +0000
commit7d83c569d7ea76376a7bc97db58a4ad912dfad8c (patch)
tree4af9a9a75ae7843922d4a00cf8dae14bc52421cb /src/multicast
parentb2cada934a6811b7fbc2a9d1a9bf84d48a2ba0e1 (diff)
downloadgnunet-7d83c569d7ea76376a7bc97db58a4ad912dfad8c.tar.gz
gnunet-7d83c569d7ea76376a7bc97db58a4ad912dfad8c.zip
multicast: switch to MQ
Diffstat (limited to 'src/multicast')
-rw-r--r--src/multicast/multicast_api.c588
1 files changed, 367 insertions, 221 deletions
diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c
index e390a621c..db0f0e759 100644
--- a/src/multicast/multicast_api.c
+++ b/src/multicast/multicast_api.c
@@ -27,6 +27,7 @@
27 27
28#include "platform.h" 28#include "platform.h"
29#include "gnunet_util_lib.h" 29#include "gnunet_util_lib.h"
30#include "gnunet_mq_lib.h"
30#include "gnunet_multicast_service.h" 31#include "gnunet_multicast_service.h"
31#include "multicast.h" 32#include "multicast.h"
32 33
@@ -73,12 +74,22 @@ struct GNUNET_MULTICAST_Group
73 /** 74 /**
74 * Client connection to the service. 75 * Client connection to the service.
75 */ 76 */
76 struct GNUNET_CLIENT_MANAGER_Connection *client; 77 struct GNUNET_MQ_Handle *mq;
77 78
78 /** 79 /**
79 * Message to send on reconnect. 80 * Time to wait until we try to reconnect on failure.
80 */ 81 */
81 struct GNUNET_MessageHeader *connect_msg; 82 struct GNUNET_TIME_Relative reconnect_backoff;
83
84 /**
85 * Task for reconnecting when the listener fails.
86 */
87 struct GNUNET_SCHEDULER_Task *reconnect_task;
88
89 /**
90 * Message to send on connect.
91 */
92 struct GNUNET_MQ_Envelope *connect_env;
82 93
83 GNUNET_MULTICAST_JoinRequestCallback join_req_cb; 94 GNUNET_MULTICAST_JoinRequestCallback join_req_cb;
84 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb; 95 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb;
@@ -198,31 +209,21 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem);
198 209
199 210
200/** 211/**
201 * Send first message to the service after connecting. 212 * Check join request message.
202 */ 213 */
203static void 214static int
204group_send_connect_msg (struct GNUNET_MULTICAST_Group *grp) 215check_group_join_request (void *cls,
216 const struct MulticastJoinRequestMessage *jreq)
205{ 217{
206 uint16_t cmsg_size = ntohs (grp->connect_msg->size); 218 uint16_t size = ntohs (jreq->header.size);
207 struct GNUNET_MessageHeader *cmsg = GNUNET_malloc (cmsg_size);
208 GNUNET_memcpy (cmsg, grp->connect_msg, cmsg_size);
209 GNUNET_CLIENT_MANAGER_transmit_now (grp->client, cmsg);
210 GNUNET_free (cmsg);
211}
212 219
220 if (sizeof (*jreq) == size)
221 return GNUNET_OK;
213 222
214/** 223 if (sizeof (*jreq) + sizeof (struct GNUNET_MessageHeader) <= size)
215 * Got disconnected from service. Reconnect. 224 return GNUNET_OK;
216 */ 225
217static void 226 return GNUNET_SYSERR;
218group_recv_disconnect (void *cls,
219 struct GNUNET_CLIENT_MANAGER_Connection *client,
220 const struct GNUNET_MessageHeader *msg)
221{
222 struct GNUNET_MULTICAST_Group *
223 grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
224 GNUNET_CLIENT_MANAGER_reconnect (client);
225 group_send_connect_msg (grp);
226} 227}
227 228
228 229
@@ -230,16 +231,13 @@ group_recv_disconnect (void *cls,
230 * Receive join request from service. 231 * Receive join request from service.
231 */ 232 */
232static void 233static void
233group_recv_join_request (void *cls, 234handle_group_join_request (void *cls,
234 struct GNUNET_CLIENT_MANAGER_Connection *client, 235 const struct MulticastJoinRequestMessage *jreq)
235 const struct GNUNET_MessageHeader *msg)
236{ 236{
237 struct GNUNET_MULTICAST_Group *grp; 237 struct GNUNET_MULTICAST_Group *grp = cls;
238 const struct MulticastJoinRequestMessage *jreq;
239 struct GNUNET_MULTICAST_JoinHandle *jh; 238 struct GNUNET_MULTICAST_JoinHandle *jh;
240 const struct GNUNET_MessageHeader *jmsg; 239 const struct GNUNET_MessageHeader *jmsg = NULL;
241 240
242 grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
243 if (NULL == grp) 241 if (NULL == grp)
244 { 242 {
245 GNUNET_break (0); 243 GNUNET_break (0);
@@ -247,17 +245,28 @@ group_recv_join_request (void *cls,
247 } 245 }
248 if (NULL == grp->join_req_cb) 246 if (NULL == grp->join_req_cb)
249 return; 247 return;
250 /* FIXME: this fails to check that 'msg' is well-formed! */ 248
251 jreq = (const struct MulticastJoinRequestMessage *) msg;
252 if (sizeof (*jreq) + sizeof (*jmsg) <= ntohs (jreq->header.size)) 249 if (sizeof (*jreq) + sizeof (*jmsg) <= ntohs (jreq->header.size))
253 jmsg = (const struct GNUNET_MessageHeader *) &jreq[1]; 250 jmsg = (const struct GNUNET_MessageHeader *) &jreq[1];
254 else 251
255 jmsg = NULL;
256 jh = GNUNET_malloc (sizeof (*jh)); 252 jh = GNUNET_malloc (sizeof (*jh));
257 jh->group = grp; 253 jh->group = grp;
258 jh->member_pub_key = jreq->member_pub_key; 254 jh->member_pub_key = jreq->member_pub_key;
259 jh->peer = jreq->peer; 255 jh->peer = jreq->peer;
260 grp->join_req_cb (grp->cb_cls, &jreq->member_pub_key, jmsg, jh); 256 grp->join_req_cb (grp->cb_cls, &jreq->member_pub_key, jmsg, jh);
257
258 grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
259}
260
261
262/**
263 * Check multicast message.
264 */
265static int
266check_group_message (void *cls,
267 const struct GNUNET_MULTICAST_MessageHeader *mmsg)
268{
269 return GNUNET_OK;
261} 270}
262 271
263 272
@@ -265,14 +274,10 @@ group_recv_join_request (void *cls,
265 * Receive multicast message from service. 274 * Receive multicast message from service.
266 */ 275 */
267static void 276static void
268group_recv_message (void *cls, 277handle_group_message (void *cls,
269 struct GNUNET_CLIENT_MANAGER_Connection *client, 278 const struct GNUNET_MULTICAST_MessageHeader *mmsg)
270 const struct GNUNET_MessageHeader *msg)
271{ 279{
272 struct GNUNET_MULTICAST_Group * 280 struct GNUNET_MULTICAST_Group *grp = cls;
273 grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
274 struct GNUNET_MULTICAST_MessageHeader *
275 mmsg = (struct GNUNET_MULTICAST_MessageHeader *) msg;
276 281
277 if (GNUNET_YES == grp->is_disconnecting) 282 if (GNUNET_YES == grp->is_disconnecting)
278 return; 283 return;
@@ -283,6 +288,8 @@ group_recv_message (void *cls,
283 288
284 if (NULL != grp->message_cb) 289 if (NULL != grp->message_cb)
285 grp->message_cb (grp->cb_cls, mmsg); 290 grp->message_cb (grp->cb_cls, mmsg);
291
292 grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
286} 293}
287 294
288 295
@@ -290,12 +297,10 @@ group_recv_message (void *cls,
290 * Receive message/request fragment acknowledgement from service. 297 * Receive message/request fragment acknowledgement from service.
291 */ 298 */
292static void 299static void
293group_recv_fragment_ack (void *cls, 300handle_group_fragment_ack (void *cls,
294 struct GNUNET_CLIENT_MANAGER_Connection *client, 301 const struct GNUNET_MessageHeader *msg)
295 const struct GNUNET_MessageHeader *msg)
296{ 302{
297 struct GNUNET_MULTICAST_Group * 303 struct GNUNET_MULTICAST_Group *grp = cls;
298 grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
299 304
300 LOG (GNUNET_ERROR_TYPE_DEBUG, 305 LOG (GNUNET_ERROR_TYPE_DEBUG,
301 "%p Got fragment ACK. in_transmit=%u, acks_pending=%u\n", 306 "%p Got fragment ACK. in_transmit=%u, acks_pending=%u\n",
@@ -316,22 +321,32 @@ group_recv_fragment_ack (void *cls,
316 origin_to_all ((struct GNUNET_MULTICAST_Origin *) grp); 321 origin_to_all ((struct GNUNET_MULTICAST_Origin *) grp);
317 else 322 else
318 member_to_origin ((struct GNUNET_MULTICAST_Member *) grp); 323 member_to_origin ((struct GNUNET_MULTICAST_Member *) grp);
324
325 grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
326}
327
328
329/**
330 * Check unicast request.
331 */
332static int
333check_origin_request (void *cls,
334 const struct GNUNET_MULTICAST_RequestHeader *req)
335{
336 return GNUNET_OK;
319} 337}
320 338
339
321/** 340/**
322 * Origin receives uniquest request from a member. 341 * Origin receives unicast request from a member.
323 */ 342 */
324static void 343static void
325origin_recv_request (void *cls, 344handle_origin_request (void *cls,
326 struct GNUNET_CLIENT_MANAGER_Connection *client, 345 const struct GNUNET_MULTICAST_RequestHeader *req)
327 const struct GNUNET_MessageHeader *msg)
328{ 346{
329 struct GNUNET_MULTICAST_Group *grp; 347 struct GNUNET_MULTICAST_Group *grp;
330 struct GNUNET_MULTICAST_Origin * 348 struct GNUNET_MULTICAST_Origin *orig = cls;
331 orig = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
332 grp = &orig->grp; 349 grp = &orig->grp;
333 struct GNUNET_MULTICAST_RequestHeader *
334 req = (struct GNUNET_MULTICAST_RequestHeader *) msg;
335 350
336 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 351 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
337 "Calling request callback with a request of size %u.\n", 352 "Calling request callback with a request of size %u.\n",
@@ -339,6 +354,8 @@ origin_recv_request (void *cls,
339 354
340 if (NULL != orig->request_cb) 355 if (NULL != orig->request_cb)
341 orig->request_cb (grp->cb_cls, req); 356 orig->request_cb (grp->cb_cls, req);
357
358 grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
342} 359}
343 360
344 361
@@ -346,14 +363,11 @@ origin_recv_request (void *cls,
346 * Receive multicast replay request from service. 363 * Receive multicast replay request from service.
347 */ 364 */
348static void 365static void
349group_recv_replay_request (void *cls, 366handle_group_replay_request (void *cls,
350 struct GNUNET_CLIENT_MANAGER_Connection *client, 367 const struct MulticastReplayRequestMessage *rep)
351 const struct GNUNET_MessageHeader *msg) 368
352{ 369{
353 struct GNUNET_MULTICAST_Group * 370 struct GNUNET_MULTICAST_Group *grp = cls;
354 grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
355 struct MulticastReplayRequestMessage *
356 rep = (struct MulticastReplayRequestMessage *) msg;
357 371
358 if (GNUNET_YES == grp->is_disconnecting) 372 if (GNUNET_YES == grp->is_disconnecting)
359 return; 373 return;
@@ -385,45 +399,72 @@ group_recv_replay_request (void *cls,
385 GNUNET_ntohll (rep->flags), rh); 399 GNUNET_ntohll (rep->flags), rh);
386 } 400 }
387 } 401 }
402
403 grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
388} 404}
389 405
390 406
391/** 407/**
392 * Receive multicast replay request from service. 408 * Check replay response.
409 */
410static int
411check_member_replay_response (void *cls,
412 const struct MulticastReplayResponseMessage *res)
413{
414 uint16_t size = ntohs (res->header.size);
415
416 if (sizeof (*res) == size)
417 return GNUNET_OK;
418
419 if (sizeof (*res) + sizeof (struct GNUNET_MULTICAST_MessageHeader) <= size)
420 return GNUNET_OK;
421
422 return GNUNET_SYSERR;
423}
424
425
426/**
427 * Receive replay response from service.
393 */ 428 */
394static void 429static void
395member_recv_replay_response (void *cls, 430handle_member_replay_response (void *cls,
396 struct GNUNET_CLIENT_MANAGER_Connection *client, 431 const struct MulticastReplayResponseMessage *res)
397 const struct GNUNET_MessageHeader *msg)
398{ 432{
399 struct GNUNET_MULTICAST_Group *grp; 433 struct GNUNET_MULTICAST_Group *grp;
400 struct GNUNET_MULTICAST_Member * 434 struct GNUNET_MULTICAST_Member *mem = cls;
401 mem = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
402 grp = &mem->grp; 435 grp = &mem->grp;
403 // FIXME: Something is missing here for the code to make sense 436
404 //struct MulticastReplayResponseMessage *
405 // res = (struct MulticastReplayResponseMessage *) msg;
406 if (GNUNET_YES == grp->is_disconnecting) 437 if (GNUNET_YES == grp->is_disconnecting)
407 return; 438 return;
408 439
409 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay response.\n"); 440 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay response.\n");
441
442 // FIXME: return result
443}
444
445
446/**
447 * Check join decision.
448 */
449static int
450check_member_join_decision (void *cls,
451 const struct MulticastJoinDecisionMessageHeader *hdcsn)
452{
453 return GNUNET_OK; // checked in handle below
410} 454}
411 455
456
412/** 457/**
413 * Member receives join decision. 458 * Member receives join decision.
414 */ 459 */
415static void 460static void
416member_recv_join_decision (void *cls, 461handle_member_join_decision (void *cls,
417 struct GNUNET_CLIENT_MANAGER_Connection *client, 462 const struct MulticastJoinDecisionMessageHeader *hdcsn)
418 const struct GNUNET_MessageHeader *msg)
419{ 463{
420 struct GNUNET_MULTICAST_Group *grp; 464 struct GNUNET_MULTICAST_Group *grp;
421 struct GNUNET_MULTICAST_Member * 465 struct GNUNET_MULTICAST_Member *mem = cls;
422 mem = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
423 grp = &mem->grp; 466 grp = &mem->grp;
424 467
425 const struct MulticastJoinDecisionMessageHeader *
426 hdcsn = (const struct MulticastJoinDecisionMessageHeader *) msg;
427 const struct MulticastJoinDecisionMessage * 468 const struct MulticastJoinDecisionMessage *
428 dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1]; 469 dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
429 470
@@ -474,79 +515,15 @@ member_recv_join_decision (void *cls,
474 // FIXME: 515 // FIXME:
475 //if (GNUNET_YES != is_admitted) 516 //if (GNUNET_YES != is_admitted)
476 // GNUNET_MULTICAST_member_part (mem); 517 // GNUNET_MULTICAST_member_part (mem);
477}
478
479 518
480/** 519 grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
481 * Message handlers for an origin. 520}
482 */
483static struct GNUNET_CLIENT_MANAGER_MessageHandler origin_handlers[] =
484{
485 { group_recv_disconnect, NULL, 0, 0, GNUNET_NO },
486
487 { group_recv_message, NULL,
488 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
489 sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES },
490
491 { origin_recv_request, NULL,
492 GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
493 sizeof (struct GNUNET_MULTICAST_RequestHeader), GNUNET_YES },
494
495 { group_recv_fragment_ack, NULL,
496 GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
497 sizeof (struct GNUNET_MessageHeader), GNUNET_YES },
498
499 { group_recv_join_request, NULL,
500 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
501 sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
502
503 { group_recv_replay_request, NULL,
504 GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
505 sizeof (struct MulticastReplayRequestMessage), GNUNET_NO },
506
507 { NULL, NULL, 0, 0, GNUNET_NO }
508};
509
510
511/**
512 * Message handlers for a member.
513 */
514static struct GNUNET_CLIENT_MANAGER_MessageHandler member_handlers[] =
515{
516 { group_recv_disconnect, NULL, 0, 0, GNUNET_NO },
517
518 { group_recv_message, NULL,
519 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
520 sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES },
521
522 { group_recv_fragment_ack, NULL,
523 GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
524 sizeof (struct GNUNET_MessageHeader), GNUNET_YES },
525
526 { group_recv_join_request, NULL,
527 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
528 sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
529
530 { member_recv_join_decision, NULL,
531 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
532 sizeof (struct MulticastJoinDecisionMessage), GNUNET_YES },
533
534 { group_recv_replay_request, NULL,
535 GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
536 sizeof (struct MulticastReplayRequestMessage), GNUNET_NO },
537
538 { member_recv_replay_response, NULL,
539 GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
540 sizeof (struct MulticastReplayRequestMessage), GNUNET_NO },
541
542 { NULL, NULL, 0, 0, GNUNET_NO }
543};
544 521
545 522
546static void 523static void
547group_cleanup (struct GNUNET_MULTICAST_Group *grp) 524group_cleanup (struct GNUNET_MULTICAST_Group *grp)
548{ 525{
549 GNUNET_free (grp->connect_msg); 526 GNUNET_free (grp->connect_env);
550 if (NULL != grp->disconnect_cb) 527 if (NULL != grp->disconnect_cb)
551 grp->disconnect_cb (grp->disconnect_cls); 528 grp->disconnect_cb (grp->disconnect_cls);
552} 529}
@@ -609,13 +586,11 @@ GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join,
609 uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0; 586 uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
610 uint16_t relay_size = relay_count * sizeof (*relays); 587 uint16_t relay_size = relay_count * sizeof (*relays);
611 588
612 struct MulticastJoinDecisionMessageHeader * hdcsn; 589 struct MulticastJoinDecisionMessageHeader *hdcsn;
613 struct MulticastJoinDecisionMessage *dcsn; 590 struct MulticastJoinDecisionMessage *dcsn;
614 hdcsn = GNUNET_malloc (sizeof (*hdcsn) + sizeof (*dcsn) 591 struct GNUNET_MQ_Envelope *
615 + relay_size + join_resp_size); 592 env = GNUNET_MQ_msg_extra (hdcsn, sizeof (*dcsn) + relay_size + join_resp_size,
616 hdcsn->header.size = htons (sizeof (*hdcsn) + sizeof (*dcsn) 593 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
617 + relay_size + join_resp_size);
618 hdcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
619 hdcsn->member_pub_key = join->member_pub_key; 594 hdcsn->member_pub_key = join->member_pub_key;
620 hdcsn->peer = join->peer; 595 hdcsn->peer = join->peer;
621 596
@@ -629,8 +604,7 @@ GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join,
629 if (0 < join_resp_size) 604 if (0 < join_resp_size)
630 GNUNET_memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size); 605 GNUNET_memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size);
631 606
632 GNUNET_CLIENT_MANAGER_transmit (grp->client, &hdcsn->header); 607 GNUNET_MQ_send (grp->mq, env);
633 GNUNET_free (hdcsn);
634 GNUNET_free (join); 608 GNUNET_free (join);
635 return NULL; 609 return NULL;
636} 610}
@@ -653,19 +627,15 @@ GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh,
653 enum GNUNET_MULTICAST_ReplayErrorCode ec) 627 enum GNUNET_MULTICAST_ReplayErrorCode ec)
654{ 628{
655 uint8_t msg_size = (NULL != msg) ? ntohs (msg->size) : 0; 629 uint8_t msg_size = (NULL != msg) ? ntohs (msg->size) : 0;
656 struct MulticastReplayResponseMessage * 630 struct MulticastReplayResponseMessage *res;
657 res = GNUNET_malloc (sizeof (*res) + msg_size); 631 struct GNUNET_MQ_Envelope *
658 *res = (struct MulticastReplayResponseMessage) { 632 env = GNUNET_MQ_msg_extra (res, msg_size,
659 .header = { 633 GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE);
660 .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE), 634 res->fragment_id = rh->req.fragment_id;
661 .size = htons (sizeof (*res) + msg_size), 635 res->message_id = rh->req.message_id;
662 }, 636 res->fragment_offset = rh->req.fragment_offset;
663 .fragment_id = rh->req.fragment_id, 637 res->flags = rh->req.flags;
664 .message_id = rh->req.message_id, 638 res->error_code = htonl (ec);
665 .fragment_offset = rh->req.fragment_offset,
666 .flags = rh->req.flags,
667 .error_code = htonl (ec),
668 };
669 639
670 if (GNUNET_MULTICAST_REC_OK == ec) 640 if (GNUNET_MULTICAST_REC_OK == ec)
671 { 641 {
@@ -673,8 +643,7 @@ GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh,
673 GNUNET_memcpy (&res[1], msg, msg_size); 643 GNUNET_memcpy (&res[1], msg, msg_size);
674 } 644 }
675 645
676 GNUNET_CLIENT_MANAGER_transmit (rh->grp->client, &res->header); 646 GNUNET_MQ_send (rh->grp->mq, env);
677 GNUNET_free (res);
678 647
679 if (GNUNET_MULTICAST_REC_OK != ec) 648 if (GNUNET_MULTICAST_REC_OK != ec)
680 GNUNET_free (rh); 649 GNUNET_free (rh);
@@ -692,18 +661,16 @@ GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh,
692void 661void
693GNUNET_MULTICAST_replay_response_end (struct GNUNET_MULTICAST_ReplayHandle *rh) 662GNUNET_MULTICAST_replay_response_end (struct GNUNET_MULTICAST_ReplayHandle *rh)
694{ 663{
695 struct MulticastReplayResponseMessage end = { 664 struct MulticastReplayResponseMessage *end;
696 .header = { 665 struct GNUNET_MQ_Envelope *
697 .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END), 666 env = GNUNET_MQ_msg (end, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END);
698 .size = htons (sizeof (end)), 667
699 }, 668 end->fragment_id = rh->req.fragment_id;
700 .fragment_id = rh->req.fragment_id, 669 end->message_id = rh->req.message_id;
701 .message_id = rh->req.message_id, 670 end->fragment_offset = rh->req.fragment_offset;
702 .fragment_offset = rh->req.fragment_offset, 671 end->flags = rh->req.flags;
703 .flags = rh->req.flags,
704 };
705 672
706 GNUNET_CLIENT_MANAGER_transmit (rh->grp->client, &end.header); 673 GNUNET_MQ_send (rh->grp->mq, env);
707 GNUNET_free (rh); 674 GNUNET_free (rh);
708} 675}
709 676
@@ -726,6 +693,92 @@ GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh,
726} 693}
727 694
728 695
696void
697origin_connect (struct GNUNET_MULTICAST_Origin *orig);
698
699
700static void
701origin_reconnect (void *cls)
702{
703 origin_connect (cls);
704}
705
706
707/**
708 * Origin client disconnected from service.
709 *
710 * Reconnect after backoff period.=
711 */
712void
713origin_disconnected (void *cls, enum GNUNET_MQ_Error error)
714{
715 struct GNUNET_MULTICAST_Origin *orig = cls;
716 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
717
718 LOG (GNUNET_ERROR_TYPE_DEBUG,
719 "Origin client disconnected (%d), re-connecting\n",
720 (int) error);
721 if (NULL != grp->mq)
722 {
723 GNUNET_MQ_destroy (grp->mq);
724 grp->mq = NULL;
725 }
726
727 grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_backoff,
728 &origin_reconnect,
729 orig);
730 grp->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (grp->reconnect_backoff);
731}
732
733
734/**
735 * Connect to service as origin.
736 */
737void
738origin_connect (struct GNUNET_MULTICAST_Origin *orig)
739{
740 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
741
742 GNUNET_MQ_hd_var_size (group_message,
743 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
744 struct GNUNET_MULTICAST_MessageHeader);
745
746 GNUNET_MQ_hd_var_size (origin_request,
747 GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
748 struct GNUNET_MULTICAST_RequestHeader);
749
750 GNUNET_MQ_hd_fixed_size (group_fragment_ack,
751 GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
752 struct GNUNET_MessageHeader);
753
754 GNUNET_MQ_hd_var_size (group_join_request,
755 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
756 struct MulticastJoinRequestMessage);
757
758 GNUNET_MQ_hd_fixed_size (group_replay_request,
759 GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
760 struct MulticastReplayRequestMessage);
761
762 struct GNUNET_MQ_MessageHandler handlers[] = {
763 make_group_message_handler (grp),
764 make_origin_request_handler (orig),
765 make_group_fragment_ack_handler (grp),
766 make_group_join_request_handler (grp),
767 make_group_replay_request_handler (grp),
768 GNUNET_MQ_handler_end ()
769 };
770
771 grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast",
772 handlers, origin_disconnected, orig);
773 if (NULL == grp->mq)
774 {
775 GNUNET_break (0);
776 return;
777 }
778 GNUNET_MQ_send_copy (grp->mq, grp->connect_env);
779}
780
781
729/** 782/**
730 * Start a multicast group. 783 * Start a multicast group.
731 * 784 *
@@ -776,14 +829,13 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
776{ 829{
777 struct GNUNET_MULTICAST_Origin *orig = GNUNET_malloc (sizeof (*orig)); 830 struct GNUNET_MULTICAST_Origin *orig = GNUNET_malloc (sizeof (*orig));
778 struct GNUNET_MULTICAST_Group *grp = &orig->grp; 831 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
779 struct MulticastOriginStartMessage *start = GNUNET_malloc (sizeof (*start));
780 832
781 start->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START); 833 struct MulticastOriginStartMessage *start;
782 start->header.size = htons (sizeof (*start)); 834 grp->connect_env = GNUNET_MQ_msg (start,
835 GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START);
783 start->max_fragment_id = max_fragment_id; 836 start->max_fragment_id = max_fragment_id;
784 GNUNET_memcpy (&start->group_key, priv_key, sizeof (*priv_key)); 837 GNUNET_memcpy (&start->group_key, priv_key, sizeof (*priv_key));
785 838
786 grp->connect_msg = (struct GNUNET_MessageHeader *) start;
787 grp->is_origin = GNUNET_YES; 839 grp->is_origin = GNUNET_YES;
788 grp->cfg = cfg; 840 grp->cfg = cfg;
789 841
@@ -795,10 +847,7 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
795 847
796 orig->request_cb = request_cb; 848 orig->request_cb = request_cb;
797 849
798 grp->client = GNUNET_CLIENT_MANAGER_connect (cfg, "multicast", origin_handlers); 850 origin_connect (orig);
799 GNUNET_CLIENT_MANAGER_set_user_context_ (grp->client, orig, sizeof (*grp));
800 group_send_connect_msg (grp);
801
802 return orig; 851 return orig;
803} 852}
804 853
@@ -820,8 +869,13 @@ GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig,
820 grp->disconnect_cb = stop_cb; 869 grp->disconnect_cb = stop_cb;
821 grp->disconnect_cls = stop_cls; 870 grp->disconnect_cls = stop_cls;
822 871
823 GNUNET_CLIENT_MANAGER_disconnect (orig->grp.client, GNUNET_YES, 872 // FIXME: wait till queued messages are sent
824 &origin_cleanup, orig); 873 if (NULL != grp->mq)
874 {
875 GNUNET_MQ_destroy (grp->mq);
876 grp->mq = NULL;
877 }
878 origin_cleanup (orig);
825} 879}
826 880
827 881
@@ -834,7 +888,11 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
834 GNUNET_assert (GNUNET_YES == grp->in_transmit); 888 GNUNET_assert (GNUNET_YES == grp->in_transmit);
835 889
836 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; 890 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
837 struct GNUNET_MULTICAST_MessageHeader *msg = GNUNET_malloc (buf_size); 891 struct GNUNET_MULTICAST_MessageHeader *msg;
892 struct GNUNET_MQ_Envelope *
893 env = GNUNET_MQ_msg_extra (msg, buf_size - sizeof(*msg),
894 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
895
838 int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]); 896 int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]);
839 897
840 if (! (GNUNET_YES == ret || GNUNET_NO == ret) 898 if (! (GNUNET_YES == ret || GNUNET_NO == ret)
@@ -844,7 +902,7 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
844 "%p OriginTransmitNotify() returned error or invalid message size.\n", 902 "%p OriginTransmitNotify() returned error or invalid message size.\n",
845 orig); 903 orig);
846 /* FIXME: handle error */ 904 /* FIXME: handle error */
847 GNUNET_free (msg); 905 GNUNET_free (env);
848 return; 906 return;
849 } 907 }
850 908
@@ -852,11 +910,10 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
852 { 910 {
853 LOG (GNUNET_ERROR_TYPE_DEBUG, 911 LOG (GNUNET_ERROR_TYPE_DEBUG,
854 "%p OriginTransmitNotify() - transmission paused.\n", orig); 912 "%p OriginTransmitNotify() - transmission paused.\n", orig);
855 GNUNET_free (msg); 913 GNUNET_free (env);
856 return; /* Transmission paused. */ 914 return; /* Transmission paused. */
857 } 915 }
858 916
859 msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
860 msg->header.size = htons (sizeof (*msg) + buf_size); 917 msg->header.size = htons (sizeof (*msg) + buf_size);
861 msg->message_id = GNUNET_htonll (tmit->message_id); 918 msg->message_id = GNUNET_htonll (tmit->message_id);
862 msg->group_generation = tmit->group_generation; 919 msg->group_generation = tmit->group_generation;
@@ -864,8 +921,7 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
864 tmit->fragment_offset += sizeof (*msg) + buf_size; 921 tmit->fragment_offset += sizeof (*msg) + buf_size;
865 922
866 grp->acks_pending++; 923 grp->acks_pending++;
867 GNUNET_CLIENT_MANAGER_transmit (grp->client, &msg->header); 924 GNUNET_MQ_send (grp->mq, env);
868 GNUNET_free (msg);
869 925
870 if (GNUNET_YES == ret) 926 if (GNUNET_YES == ret)
871 grp->in_transmit = GNUNET_NO; 927 grp->in_transmit = GNUNET_NO;
@@ -944,6 +1000,94 @@ GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHan
944} 1000}
945 1001
946 1002
1003 void
1004member_connect (struct GNUNET_MULTICAST_Member *mem);
1005
1006
1007static void
1008member_reconnect (void *cls)
1009{
1010 member_connect (cls);
1011}
1012
1013
1014/**
1015 * Member client disconnected from service.
1016 *
1017 * Reconnect after backoff period.
1018 */
1019void
1020member_disconnected (void *cls, enum GNUNET_MQ_Error error)
1021{
1022 struct GNUNET_MULTICAST_Member *mem = cls;
1023 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1024
1025 LOG (GNUNET_ERROR_TYPE_DEBUG,
1026 "Member client disconnected (%d), re-connecting\n",
1027 (int) error);
1028 GNUNET_MQ_destroy (grp->mq);
1029 grp->mq = NULL;
1030
1031 grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_backoff,
1032 &member_reconnect,
1033 mem);
1034 grp->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (grp->reconnect_backoff);
1035}
1036
1037
1038/**
1039 * Connect to service as member.
1040 */
1041void
1042member_connect (struct GNUNET_MULTICAST_Member *mem)
1043{
1044 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1045
1046 GNUNET_MQ_hd_var_size (group_message,
1047 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
1048 struct GNUNET_MULTICAST_MessageHeader);
1049
1050 GNUNET_MQ_hd_fixed_size (group_fragment_ack,
1051 GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
1052 struct GNUNET_MessageHeader);
1053
1054 GNUNET_MQ_hd_var_size (group_join_request,
1055 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
1056 struct MulticastJoinRequestMessage);
1057
1058 GNUNET_MQ_hd_var_size (member_join_decision,
1059 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
1060 struct MulticastJoinDecisionMessageHeader);
1061
1062 GNUNET_MQ_hd_fixed_size (group_replay_request,
1063 GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
1064 struct MulticastReplayRequestMessage);
1065
1066 GNUNET_MQ_hd_var_size (member_replay_response,
1067 GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
1068 struct MulticastReplayResponseMessage);
1069
1070 struct GNUNET_MQ_MessageHandler handlers[] = {
1071 make_group_message_handler (grp),
1072 make_group_fragment_ack_handler (grp),
1073 make_group_join_request_handler (grp),
1074 make_member_join_decision_handler (mem),
1075 make_group_replay_request_handler (grp),
1076 make_member_replay_response_handler (mem),
1077 GNUNET_MQ_handler_end ()
1078 };
1079
1080 grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast",
1081 handlers, member_disconnected, mem);
1082 if (NULL == grp->mq)
1083 {
1084 GNUNET_break (0);
1085 return;
1086 }
1087 GNUNET_MQ_send_copy (grp->mq, grp->connect_env);
1088}
1089
1090
947/** 1091/**
948 * Join a multicast group. 1092 * Join a multicast group.
949 * 1093 *
@@ -1015,10 +1159,9 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1015 1159
1016 uint16_t relay_size = relay_count * sizeof (*relays); 1160 uint16_t relay_size = relay_count * sizeof (*relays);
1017 uint16_t join_msg_size = (NULL != join_msg) ? ntohs (join_msg->size) : 0; 1161 uint16_t join_msg_size = (NULL != join_msg) ? ntohs (join_msg->size) : 0;
1018 struct MulticastMemberJoinMessage * 1162 struct MulticastMemberJoinMessage *join;
1019 join = GNUNET_malloc (sizeof (*join) + relay_size + join_msg_size); 1163 grp->connect_env = GNUNET_MQ_msg_extra (join, relay_size + join_msg_size,
1020 join->header.size = htons (sizeof (*join) + relay_size + join_msg_size); 1164 GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN);
1021 join->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN);
1022 join->group_pub_key = *group_pub_key; 1165 join->group_pub_key = *group_pub_key;
1023 join->member_key = *member_key; 1166 join->member_key = *member_key;
1024 join->origin = *origin; 1167 join->origin = *origin;
@@ -1028,7 +1171,7 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1028 if (0 < join_msg_size) 1171 if (0 < join_msg_size)
1029 GNUNET_memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size); 1172 GNUNET_memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size);
1030 1173
1031 grp->connect_msg = (struct GNUNET_MessageHeader *) join; 1174 grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
1032 grp->is_origin = GNUNET_NO; 1175 grp->is_origin = GNUNET_NO;
1033 grp->cfg = cfg; 1176 grp->cfg = cfg;
1034 1177
@@ -1039,10 +1182,7 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1039 grp->message_cb = message_cb; 1182 grp->message_cb = message_cb;
1040 grp->cb_cls = cls; 1183 grp->cb_cls = cls;
1041 1184
1042 grp->client = GNUNET_CLIENT_MANAGER_connect (cfg, "multicast", member_handlers); 1185 member_connect (mem);
1043 GNUNET_CLIENT_MANAGER_set_user_context_ (grp->client, mem, sizeof (*grp));
1044 group_send_connect_msg (grp);
1045
1046 return mem; 1186 return mem;
1047} 1187}
1048 1188
@@ -1076,8 +1216,13 @@ GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem,
1076 grp->replay_msg_cb = NULL; 1216 grp->replay_msg_cb = NULL;
1077 grp->replay_frag_cb = NULL; 1217 grp->replay_frag_cb = NULL;
1078 1218
1079 GNUNET_CLIENT_MANAGER_disconnect (mem->grp.client, GNUNET_YES, 1219 // FIXME: wait till queued messages are sent
1080 member_cleanup, mem); 1220 if (NULL != grp->mq)
1221 {
1222 GNUNET_MQ_destroy (grp->mq);
1223 grp->mq = NULL;
1224 }
1225 member_cleanup (mem);
1081} 1226}
1082 1227
1083 1228
@@ -1088,17 +1233,16 @@ member_replay_request (struct GNUNET_MULTICAST_Member *mem,
1088 uint64_t fragment_offset, 1233 uint64_t fragment_offset,
1089 uint64_t flags) 1234 uint64_t flags)
1090{ 1235{
1091 struct MulticastReplayRequestMessage rep = { 1236 struct MulticastReplayRequestMessage *rep;
1092 .header = { 1237 struct GNUNET_MQ_Envelope *
1093 .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST), 1238 env = GNUNET_MQ_msg (rep, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST);
1094 .size = htons (sizeof (rep)), 1239
1095 }, 1240 rep->fragment_id = GNUNET_htonll (fragment_id);
1096 .fragment_id = GNUNET_htonll (fragment_id), 1241 rep->message_id = GNUNET_htonll (message_id);
1097 .message_id = GNUNET_htonll (message_id), 1242 rep->fragment_offset = GNUNET_htonll (fragment_offset);
1098 .fragment_offset = GNUNET_htonll (fragment_offset), 1243 rep->flags = GNUNET_htonll (flags);
1099 .flags = GNUNET_htonll (flags), 1244
1100 }; 1245 GNUNET_MQ_send (mem->grp.mq, env);
1101 GNUNET_CLIENT_MANAGER_transmit (mem->grp.client, &rep.header);
1102} 1246}
1103 1247
1104 1248
@@ -1168,7 +1312,11 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem)
1168 GNUNET_assert (GNUNET_YES == grp->in_transmit); 1312 GNUNET_assert (GNUNET_YES == grp->in_transmit);
1169 1313
1170 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; 1314 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
1171 struct GNUNET_MULTICAST_RequestHeader *req = GNUNET_malloc (buf_size); 1315 struct GNUNET_MULTICAST_RequestHeader *req;
1316 struct GNUNET_MQ_Envelope *
1317 env = GNUNET_MQ_msg_extra (req, buf_size - sizeof(*req),
1318 GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST);
1319
1172 int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]); 1320 int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]);
1173 1321
1174 if (! (GNUNET_YES == ret || GNUNET_NO == ret) 1322 if (! (GNUNET_YES == ret || GNUNET_NO == ret)
@@ -1189,14 +1337,12 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem)
1189 return; 1337 return;
1190 } 1338 }
1191 1339
1192 req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST);
1193 req->header.size = htons (sizeof (*req) + buf_size); 1340 req->header.size = htons (sizeof (*req) + buf_size);
1194 req->request_id = GNUNET_htonll (tmit->request_id); 1341 req->request_id = GNUNET_htonll (tmit->request_id);
1195 req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset); 1342 req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset);
1196 tmit->fragment_offset += sizeof (*req) + buf_size; 1343 tmit->fragment_offset += sizeof (*req) + buf_size;
1197 1344
1198 GNUNET_CLIENT_MANAGER_transmit (grp->client, &req->header); 1345 GNUNET_MQ_send (grp->mq, env);
1199 GNUNET_free (req);
1200 1346
1201 if (GNUNET_YES == ret) 1347 if (GNUNET_YES == ret)
1202 grp->in_transmit = GNUNET_NO; 1348 grp->in_transmit = GNUNET_NO;