aboutsummaryrefslogtreecommitdiff
path: root/src/multicast
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2014-05-29 16:35:55 +0000
committerGabor X Toth <*@tg-x.net>2014-05-29 16:35:55 +0000
commit9955561e1b204ccf23fbf841f409bd3ef79be88c (patch)
tree0271c23ae9f1dad72266a0e6073d696e5afca027 /src/multicast
parenta5877668ba805c5e0efe622e6ce4c58ff5609bf9 (diff)
downloadgnunet-9955561e1b204ccf23fbf841f409bd3ef79be88c.tar.gz
gnunet-9955561e1b204ccf23fbf841f409bd3ef79be88c.zip
psyc, multicast: reorg code, use new client manager & psyc util lib
Diffstat (limited to 'src/multicast')
-rw-r--r--src/multicast/multicast_api.c614
1 files changed, 154 insertions, 460 deletions
diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c
index 36d564d52..b6d51896d 100644
--- a/src/multicast/multicast_api.c
+++ b/src/multicast/multicast_api.c
@@ -24,6 +24,7 @@
24 * @author Christian Grothoff 24 * @author Christian Grothoff
25 * @author Gabor X Toth 25 * @author Gabor X Toth
26 */ 26 */
27
27#include "platform.h" 28#include "platform.h"
28#include "gnunet_util_lib.h" 29#include "gnunet_util_lib.h"
29#include "gnunet_multicast_service.h" 30#include "gnunet_multicast_service.h"
@@ -33,26 +34,6 @@
33 34
34 35
35/** 36/**
36 * Started origins.
37 * Group's pub_key_hash -> struct GNUNET_MULTICAST_Origin
38 */
39static struct GNUNET_CONTAINER_MultiHashMap *origins;
40
41/**
42 * Joined members.
43 * group_key_hash -> struct GNUNET_MULTICAST_Member
44 */
45static struct GNUNET_CONTAINER_MultiHashMap *members;
46
47
48struct MessageQueue
49{
50 struct MessageQueue *prev;
51 struct MessageQueue *next;
52};
53
54
55/**
56 * Handle for a request to send a message to all multicast group members 37 * Handle for a request to send a message to all multicast group members
57 * (from the origin). 38 * (from the origin).
58 */ 39 */
@@ -90,47 +71,14 @@ struct GNUNET_MULTICAST_Group
90 const struct GNUNET_CONFIGURATION_Handle *cfg; 71 const struct GNUNET_CONFIGURATION_Handle *cfg;
91 72
92 /** 73 /**
93 * Socket (if available). 74 * Client connection to the service.
94 */ 75 */
95 struct GNUNET_CLIENT_Connection *client; 76 struct GNUNET_CLIENT_MANAGER_Connection *client;
96
97 /**
98 * Currently pending transmission request, or NULL for none.
99 */
100 struct GNUNET_CLIENT_TransmitHandle *th;
101
102 /**
103 * Head of operations to transmit.
104 */
105 struct MessageQueue *tmit_head;
106
107 /**
108 * Tail of operations to transmit.
109 */
110 struct MessageQueue *tmit_tail;
111
112 /**
113 * Message being transmitted to the Multicast service.
114 */
115 struct MessageQueue *tmit_msg;
116 77
117 /** 78 /**
118 * Message to send on reconnect. 79 * Message to send on reconnect.
119 */ 80 */
120 struct GNUNET_MessageHeader *reconnect_msg; 81 struct GNUNET_MessageHeader *connect_msg;
121
122 /**
123 * Task doing exponential back-off trying to reconnect.
124 */
125 GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
126
127 /**
128 * Time for next connect retry.
129 */
130 struct GNUNET_TIME_Relative reconnect_delay;
131
132 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
133 struct GNUNET_HashCode pub_key_hash;
134 82
135 GNUNET_MULTICAST_JoinRequestCallback join_req_cb; 83 GNUNET_MULTICAST_JoinRequestCallback join_req_cb;
136 GNUNET_MULTICAST_MembershipTestCallback member_test_cb; 84 GNUNET_MULTICAST_MembershipTestCallback member_test_cb;
@@ -140,11 +88,6 @@ struct GNUNET_MULTICAST_Group
140 void *cb_cls; 88 void *cb_cls;
141 89
142 /** 90 /**
143 * Are we polling for incoming messages right now?
144 */
145 uint8_t in_receive;
146
147 /**
148 * Are we currently transmitting a message? 91 * Are we currently transmitting a message?
149 */ 92 */
150 uint8_t in_transmit; 93 uint8_t in_transmit;
@@ -163,7 +106,6 @@ struct GNUNET_MULTICAST_Origin
163{ 106{
164 struct GNUNET_MULTICAST_Group grp; 107 struct GNUNET_MULTICAST_Group grp;
165 struct GNUNET_MULTICAST_OriginTransmitHandle tmit; 108 struct GNUNET_MULTICAST_OriginTransmitHandle tmit;
166 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
167 109
168 GNUNET_MULTICAST_RequestCallback request_cb; 110 GNUNET_MULTICAST_RequestCallback request_cb;
169}; 111};
@@ -229,294 +171,125 @@ struct GNUNET_MULTICAST_MemberReplayHandle
229}; 171};
230 172
231 173
232static void
233reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
234
235
236static void
237reschedule_connect (struct GNUNET_MULTICAST_Group *grp);
238
239
240/** 174/**
241 * Schedule transmission of the next message from our queue. 175 * Send first message to the service after connecting.
242 *
243 * @param grp PSYC channel handle
244 */ 176 */
245static void 177static void
246transmit_next (struct GNUNET_MULTICAST_Group *grp); 178group_send_connect_msg (struct GNUNET_MULTICAST_Group *grp)
247
248
249static void
250message_handler (void *cls, const struct GNUNET_MessageHeader *msg);
251
252
253/**
254 * Reschedule a connect attempt to the service.
255 *
256 * @param c channel to reconnect
257 */
258static void
259reschedule_connect (struct GNUNET_MULTICAST_Group *grp)
260{ 179{
261 GNUNET_assert (grp->reconnect_task == GNUNET_SCHEDULER_NO_TASK); 180 uint16_t cmsg_size = ntohs (grp->connect_msg->size);
262 181 struct GNUNET_MessageHeader * cmsg = GNUNET_malloc (cmsg_size);
263 if (NULL != grp->th) 182 memcpy (cmsg, grp->connect_msg, cmsg_size);
264 { 183 GNUNET_CLIENT_MANAGER_transmit_now (grp->client, cmsg);
265 GNUNET_CLIENT_notify_transmit_ready_cancel (grp->th);
266 grp->th = NULL;
267 }
268 if (NULL != grp->client)
269 {
270 GNUNET_CLIENT_disconnect (grp->client);
271 grp->client = NULL;
272 }
273 grp->in_receive = GNUNET_NO;
274 LOG (GNUNET_ERROR_TYPE_DEBUG,
275 "Scheduling task to reconnect to Multicast service in %s.\n",
276 GNUNET_STRINGS_relative_time_to_string (grp->reconnect_delay, GNUNET_YES));
277 grp->reconnect_task =
278 GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay, &reconnect, grp);
279 grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay);
280} 184}
281 185
282 186
283/** 187/**
284 * Reset stored data related to the last received message. 188 * Got disconnected from service. Reconnect.
285 */ 189 */
286static void 190static void
287recv_reset (struct GNUNET_MULTICAST_Group *grp) 191group_recv_disconnect (void *cls,
288{ 192 struct GNUNET_CLIENT_MANAGER_Connection *client,
289} 193 const struct GNUNET_MessageHeader *msg)
290
291
292static void
293recv_error (struct GNUNET_MULTICAST_Group *grp)
294{
295 if (NULL != grp->message_cb)
296 grp->message_cb (grp->cb_cls, NULL);
297
298 recv_reset (grp);
299}
300
301
302/**
303 * Transmit next message to service.
304 *
305 * @param cls The struct GNUNET_MULTICAST_Group.
306 * @param size Number of bytes available in @a buf.
307 * @param buf Where to copy the message.
308 *
309 * @return Number of bytes copied to @a buf.
310 */
311static size_t
312send_next_message (void *cls, size_t size, void *buf)
313{ 194{
314 LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n"); 195 struct GNUNET_MULTICAST_Group *
315 struct GNUNET_MULTICAST_Group *grp = cls; 196 grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
316 struct MessageQueue *mq = grp->tmit_head; 197 GNUNET_CLIENT_MANAGER_reconnect (client);
317 if (NULL == mq) 198 group_send_connect_msg (grp);
318 return 0;
319 struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1];
320 size_t ret = ntohs (qmsg->size);
321 grp->th = NULL;
322 if (ret > size)
323 {
324 reschedule_connect (grp);
325 return 0;
326 }
327 memcpy (buf, qmsg, ret);
328
329 GNUNET_CONTAINER_DLL_remove (grp->tmit_head, grp->tmit_tail, mq);
330 GNUNET_free (mq);
331
332 if (NULL != grp->tmit_head)
333 transmit_next (grp);
334
335 if (GNUNET_NO == grp->in_receive)
336 {
337 grp->in_receive = GNUNET_YES;
338 GNUNET_CLIENT_receive (grp->client, &message_handler, grp,
339 GNUNET_TIME_UNIT_FOREVER_REL);
340 }
341 return ret;
342} 199}
343 200
344 201
345/** 202/**
346 * Schedule transmission of the next message from our queue. 203 * Receive join request from service.
347 *
348 * @param grp Multicast group handle.
349 */ 204 */
350static void 205static void
351transmit_next (struct GNUNET_MULTICAST_Group *grp) 206group_recv_join_request (void *cls,
207 struct GNUNET_CLIENT_MANAGER_Connection *client,
208 const struct GNUNET_MessageHeader *msg)
352{ 209{
353 LOG (GNUNET_ERROR_TYPE_DEBUG, "transmit_next()\n"); 210 struct GNUNET_MULTICAST_Group *
354 if (NULL != grp->th || NULL == grp->client) 211 grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
355 return;
356
357 struct MessageQueue *mq = grp->tmit_head;
358 if (NULL == mq)
359 return;
360 struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1];
361
362 grp->th = GNUNET_CLIENT_notify_transmit_ready (grp->client,
363 ntohs (qmsg->size),
364 GNUNET_TIME_UNIT_FOREVER_REL,
365 GNUNET_NO,
366 &send_next_message,
367 grp);
368}
369 212
213 const struct MulticastJoinRequestMessage *
214 jreq = (const struct MulticastJoinRequestMessage *) msg;
370 215
371/** 216 struct GNUNET_MULTICAST_JoinHandle *jh = GNUNET_malloc (sizeof (*jh));
372 * Try again to connect to the Multicast service. 217 jh->group = grp;
373 * 218 jh->member_key = jreq->member_key;
374 * @param cls Channel handle. 219 jh->member_peer = jreq->member_peer;
375 * @param tc Scheduler context.
376 */
377static void
378reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
379{
380 struct GNUNET_MULTICAST_Group *grp = cls;
381
382 recv_reset (grp);
383 grp->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
384 LOG (GNUNET_ERROR_TYPE_DEBUG,
385 "Connecting to Multicast service.\n");
386 GNUNET_assert (NULL == grp->client);
387 grp->client = GNUNET_CLIENT_connect ("multicast", grp->cfg);
388 GNUNET_assert (NULL != grp->client);
389 uint16_t reconn_size = ntohs (grp->reconnect_msg->size);
390
391 if (NULL == grp->tmit_head ||
392 0 != memcmp (&grp->tmit_head[1], grp->reconnect_msg, reconn_size))
393 {
394 struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + reconn_size);
395 memcpy (&mq[1], grp->reconnect_msg, reconn_size);
396 GNUNET_CONTAINER_DLL_insert (grp->tmit_head, grp->tmit_tail, mq);
397 }
398 transmit_next (grp);
399}
400 220
221 const struct GNUNET_MessageHeader *jmsg = NULL;
222 if (sizeof (*jreq) + sizeof (*jmsg) <= ntohs (jreq->header.size))
223 jmsg = (const struct GNUNET_MessageHeader *) &jreq[1];
401 224
402/** 225 if (NULL != grp->join_req_cb)
403 * Disconnect from the Multicast service. 226 grp->join_req_cb (grp->cb_cls, &jreq->member_key, jmsg, jh);
404 *
405 * @param g Group handle to disconnect.
406 */
407static void
408disconnect (void *g)
409{
410 struct GNUNET_MULTICAST_Group *grp = g;
411
412 GNUNET_assert (NULL != grp);
413 if (grp->tmit_head != grp->tmit_tail)
414 {
415 LOG (GNUNET_ERROR_TYPE_ERROR,
416 "Disconnecting while there are still outstanding messages!\n");
417 GNUNET_break (0);
418 }
419 if (grp->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
420 {
421 GNUNET_SCHEDULER_cancel (grp->reconnect_task);
422 grp->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
423 }
424 if (NULL != grp->th)
425 {
426 GNUNET_CLIENT_notify_transmit_ready_cancel (grp->th);
427 grp->th = NULL;
428 }
429 if (NULL != grp->client)
430 {
431 GNUNET_CLIENT_disconnect (grp->client);
432 grp->client = NULL;
433 }
434 if (NULL != grp->reconnect_msg)
435 {
436 GNUNET_free (grp->reconnect_msg);
437 grp->reconnect_msg = NULL;
438 }
439} 227}
440 228
441 229
442/** 230/**
443 * Iterator callback for calling message callbacks for all groups. 231 * Receive multicast message from service.
444 */ 232 */
445static int 233static void
446message_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, void *group) 234group_recv_message (void *cls,
235 struct GNUNET_CLIENT_MANAGER_Connection *client,
236 const struct GNUNET_MessageHeader *msg)
447{ 237{
448 const struct GNUNET_MessageHeader *msg = cls; 238 struct GNUNET_MULTICAST_Group *
449 struct GNUNET_MULTICAST_Group *grp = group; 239 grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
240 struct GNUNET_MULTICAST_MessageHeader *
241 mmsg = (struct GNUNET_MULTICAST_MessageHeader *) msg;
450 242
451 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 243 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
452 "Calling message callback with a message " 244 "Calling message callback with a message of size %u.\n",
453 "of type %u and size %u.\n", 245 ntohs (mmsg->header.size));
454 ntohs (msg->type), ntohs (msg->size));
455 246
456 if (NULL != grp->message_cb) 247 if (NULL != grp->message_cb)
457 grp->message_cb (grp->cb_cls, msg); 248 grp->message_cb (grp->cb_cls, mmsg);
458
459 return GNUNET_YES;
460} 249}
461 250
462 251
463/** 252/**
464 * Iterator callback for calling request callbacks of origins. 253 * Origin receives uniquest request from a member.
465 */ 254 */
466static int 255static void
467request_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, void *origin) 256origin_recv_request (void *cls,
468{ 257 struct GNUNET_CLIENT_MANAGER_Connection *client,
469 const struct GNUNET_MULTICAST_RequestHeader *req = cls; 258 const struct GNUNET_MessageHeader *msg)
470 struct GNUNET_MULTICAST_Origin *orig = origin; 259{
260 struct GNUNET_MULTICAST_Group *grp;
261 struct GNUNET_MULTICAST_Origin *
262 orig = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
263 grp = &orig->grp;
264 struct GNUNET_MULTICAST_RequestHeader *
265 req = (struct GNUNET_MULTICAST_RequestHeader *) msg;
471 266
472 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 267 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
473 "Calling request callback for a request of type %u and size %u.\n", 268 "Calling request callback with a request of size %u.\n",
474 ntohs (req->header.type), ntohs (req->header.size)); 269 ntohs (req->header.size));
475 270
476 if (NULL != orig->request_cb) 271 if (NULL != orig->request_cb)
477 orig->request_cb (orig->grp.cb_cls, &req->member_key, 272 orig->request_cb (grp->cb_cls, req);
478 (const struct GNUNET_MessageHeader *) req, 0);
479 return GNUNET_YES;
480} 273}
481 274
482 275
483/** 276/**
484 * Iterator callback for calling join request callbacks of origins. 277 * Member receives join decision.
485 */ 278 */
486static int 279static void
487join_request_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, 280member_recv_join_decision (void *cls,
488 void *group) 281 struct GNUNET_CLIENT_MANAGER_Connection *client,
282 const struct GNUNET_MessageHeader *msg)
489{ 283{
490 const struct MulticastJoinRequestMessage *req = cls; 284 struct GNUNET_MULTICAST_Group *grp;
491 struct GNUNET_MULTICAST_Group *grp = group; 285 struct GNUNET_MULTICAST_Member *
492 286 mem = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
493 struct GNUNET_MULTICAST_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); 287 grp = &mem->grp;
494 jh->group = grp;
495 jh->member_key = req->member_key;
496 jh->member_peer = req->member_peer;
497
498 const struct GNUNET_MessageHeader *msg = NULL;
499 if (sizeof (*req) + sizeof (*msg) <= ntohs (req->header.size))
500 msg = (const struct GNUNET_MessageHeader *) &req[1];
501
502 if (NULL != grp->join_req_cb)
503 grp->join_req_cb (grp->cb_cls, &req->member_key, msg, jh);
504 return GNUNET_YES;
505}
506
507 288
508/** 289 const struct MulticastJoinDecisionMessageHeader *
509 * Iterator callback for calling join decision callbacks of members. 290 hdcsn = (const struct MulticastJoinDecisionMessageHeader *) msg;
510 */
511static int
512join_decision_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
513 void *member)
514{
515 const struct MulticastJoinDecisionMessageHeader *hdcsn = cls;
516 const struct MulticastJoinDecisionMessage * 291 const struct MulticastJoinDecisionMessage *
517 dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1]; 292 dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
518 struct GNUNET_MULTICAST_Member *mem = member;
519 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
520 293
521 uint16_t dcsn_size = ntohs (dcsn->header.size); 294 uint16_t dcsn_size = ntohs (dcsn->header.size);
522 int is_admitted = ntohl (dcsn->is_admitted); 295 int is_admitted = ntohl (dcsn->is_admitted);
@@ -549,116 +322,53 @@ join_decision_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
549 322
550 if (GNUNET_YES != is_admitted) 323 if (GNUNET_YES != is_admitted)
551 GNUNET_MULTICAST_member_part (mem); 324 GNUNET_MULTICAST_member_part (mem);
552
553 return GNUNET_YES;
554} 325}
555 326
327
556/** 328/**
557 * Function called when we receive a message from the service. 329 * Message handlers for an origin.
558 *
559 * @param cls struct GNUNET_MULTICAST_Group
560 * @param msg Message received, NULL on timeout or fatal error.
561 */ 330 */
562static void 331static struct GNUNET_CLIENT_MANAGER_MessageHandler origin_handlers[] =
563message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
564{ 332{
565 struct GNUNET_MULTICAST_Group *grp = cls; 333 { &group_recv_disconnect, NULL, 0, 0, GNUNET_NO },
566 334
567 if (NULL == msg) 335 { &group_recv_message, NULL,
568 { 336 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
569 // timeout / disconnected from service, reconnect 337 sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES },
570 reschedule_connect (grp);
571 return;
572 }
573 338
574 uint16_t size_eq = 0; 339 { &origin_recv_request, NULL,
575 uint16_t size_min = 0; 340 GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
576 uint16_t size = ntohs (msg->size); 341 sizeof (struct GNUNET_MULTICAST_RequestHeader), GNUNET_YES },
577 uint16_t type = ntohs (msg->type);
578 342
579 LOG (GNUNET_ERROR_TYPE_DEBUG, 343 { &group_recv_join_request, NULL,
580 "Received message of type %d and size %u from Multicast service\n", 344 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
581 type, size); 345 sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
582 346
583 switch (type) 347 { NULL, NULL, 0, 0, GNUNET_NO }
584 { 348};
585 case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
586 size_min = sizeof (struct GNUNET_MULTICAST_MessageHeader);
587 break;
588
589 case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
590 size_min = sizeof (struct GNUNET_MULTICAST_RequestHeader);
591 break;
592 349
593 case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST:
594 size_min = sizeof (struct MulticastJoinRequestMessage);
595 break;
596 350
597 case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION: 351/**
598 size_min = sizeof (struct MulticastJoinDecisionMessage); 352 * Message handlers for a member.
599 break; 353 */
354static struct GNUNET_CLIENT_MANAGER_MessageHandler member_handlers[] =
355{
356 { &group_recv_disconnect, NULL, 0, 0, GNUNET_NO },
600 357
601 default: 358 { &group_recv_message, NULL,
602 GNUNET_break_op (0); 359 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
603 type = 0; 360 sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES },
604 }
605 361
606 if (! ((0 < size_eq && size == size_eq) 362 { &group_recv_join_request, NULL,
607 || (0 < size_min && size_min <= size))) 363 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
608 { 364 sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
609 GNUNET_break_op (0);
610 type = 0;
611 }
612 365
613 switch (type) 366 { &member_recv_join_decision, NULL,
614 { 367 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
615 case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE: 368 sizeof (struct MulticastJoinDecisionMessage), GNUNET_YES },
616 if (origins != NULL)
617 GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
618 message_cb, (void *) msg);
619 if (members != NULL)
620 GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash,
621 message_cb, (void *) msg);
622 break;
623
624 case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
625 if (GNUNET_YES != grp->is_origin)
626 {
627 GNUNET_break (0);
628 break;
629 }
630 if (NULL != origins)
631 GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
632 request_cb, (void *) msg);
633 break;
634
635 case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST:
636 if (NULL != origins)
637 GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
638 join_request_cb, (void *) msg);
639 if (NULL != members)
640 GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash,
641 join_request_cb, (void *) msg);
642 break;
643
644 case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION:
645 if (GNUNET_NO != grp->is_origin)
646 {
647 GNUNET_break (0);
648 break;
649 }
650 if (NULL != members)
651 GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash,
652 join_decision_cb, (void *) msg);
653 break;
654 }
655 369
656 if (NULL != grp->client) 370 { NULL, NULL, 0, 0, GNUNET_NO }
657 { 371};
658 GNUNET_CLIENT_receive (grp->client, &message_handler, grp,
659 GNUNET_TIME_UNIT_FOREVER_REL);
660 }
661}
662 372
663 373
664/** 374/**
@@ -667,7 +377,7 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
667 * Must be called once and only once in response to an invocation of the 377 * Must be called once and only once in response to an invocation of the
668 * #GNUNET_MULTICAST_JoinRequestCallback. 378 * #GNUNET_MULTICAST_JoinRequestCallback.
669 * 379 *
670 * @param jh Join request handle. 380 * @param join Join request handle.
671 * @param is_admitted #GNUNET_YES if the join is approved, 381 * @param is_admitted #GNUNET_YES if the join is approved,
672 * #GNUNET_NO if it is disapproved, 382 * #GNUNET_NO if it is disapproved,
673 * #GNUNET_SYSERR if we cannot answer the request. 383 * #GNUNET_SYSERR if we cannot answer the request.
@@ -685,27 +395,25 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
685 * peer that issued the request even if admission is denied. 395 * peer that issued the request even if admission is denied.
686 */ 396 */
687struct GNUNET_MULTICAST_ReplayHandle * 397struct GNUNET_MULTICAST_ReplayHandle *
688GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *jh, 398GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join,
689 int is_admitted, 399 int is_admitted,
690 uint16_t relay_count, 400 uint16_t relay_count,
691 const struct GNUNET_PeerIdentity *relays, 401 const struct GNUNET_PeerIdentity *relays,
692 const struct GNUNET_MessageHeader *join_resp) 402 const struct GNUNET_MessageHeader *join_resp)
693{ 403{
694 struct GNUNET_MULTICAST_Group *grp = jh->group; 404 struct GNUNET_MULTICAST_Group *grp = join->group;
695 uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0; 405 uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
696 uint16_t relay_size = relay_count * sizeof (*relays); 406 uint16_t relay_size = relay_count * sizeof (*relays);
407
697 struct MulticastJoinDecisionMessageHeader * hdcsn; 408 struct MulticastJoinDecisionMessageHeader * hdcsn;
698 struct MulticastJoinDecisionMessage *dcsn; 409 struct MulticastJoinDecisionMessage *dcsn;
699 struct MessageQueue * 410 hdcsn = GNUNET_malloc (sizeof (*hdcsn) + sizeof (*dcsn)
700 mq = GNUNET_malloc (sizeof (*mq) + sizeof (*hdcsn) + sizeof (*dcsn) 411 + relay_size + join_resp_size);
701 + relay_size + join_resp_size);
702
703 hdcsn = (struct MulticastJoinDecisionMessageHeader *) &mq[1];
704 hdcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
705 hdcsn->header.size = htons (sizeof (*hdcsn) + sizeof (*dcsn) 412 hdcsn->header.size = htons (sizeof (*hdcsn) + sizeof (*dcsn)
706 + relay_size + join_resp_size); 413 + relay_size + join_resp_size);
707 hdcsn->member_key = jh->member_key; 414 hdcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
708 hdcsn->peer = jh->member_peer; 415 hdcsn->member_key = join->member_key;
416 hdcsn->peer = join->member_peer;
709 417
710 dcsn = (struct MulticastJoinDecisionMessage *) &hdcsn[1]; 418 dcsn = (struct MulticastJoinDecisionMessage *) &hdcsn[1];
711 dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION); 419 dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
@@ -717,10 +425,8 @@ GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *jh,
717 if (0 < join_resp_size) 425 if (0 < join_resp_size)
718 memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size); 426 memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size);
719 427
720 GNUNET_CONTAINER_DLL_insert_tail (grp->tmit_head, grp->tmit_tail, mq); 428 GNUNET_CLIENT_MANAGER_transmit (grp->client, &hdcsn->header);
721 transmit_next (grp); 429 GNUNET_free (join);
722
723 GNUNET_free (jh);
724 return NULL; 430 return NULL;
725} 431}
726 432
@@ -832,7 +538,7 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
832 start->max_fragment_id = max_fragment_id; 538 start->max_fragment_id = max_fragment_id;
833 memcpy (&start->group_key, priv_key, sizeof (*priv_key)); 539 memcpy (&start->group_key, priv_key, sizeof (*priv_key));
834 540
835 grp->reconnect_msg = (struct GNUNET_MessageHeader *) start; 541 grp->connect_msg = (struct GNUNET_MessageHeader *) start;
836 grp->is_origin = GNUNET_YES; 542 grp->is_origin = GNUNET_YES;
837 grp->cfg = cfg; 543 grp->cfg = cfg;
838 544
@@ -844,20 +550,10 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
844 grp->message_cb = message_cb; 550 grp->message_cb = message_cb;
845 551
846 orig->request_cb = request_cb; 552 orig->request_cb = request_cb;
847 orig->priv_key = *priv_key;
848
849 GNUNET_CRYPTO_eddsa_key_get_public (&orig->priv_key, &grp->pub_key);
850 GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key),
851 &grp->pub_key_hash);
852
853 if (NULL == origins)
854 origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
855
856 GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig,
857 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
858 553
859 grp->reconnect_delay = GNUNET_TIME_UNIT_ZERO; 554 grp->client = GNUNET_CLIENT_MANAGER_connect (cfg, "multicast", origin_handlers);
860 grp->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, grp); 555 GNUNET_CLIENT_MANAGER_set_user_context_ (grp->client, orig, sizeof (*grp));
556 group_send_connect_msg (grp);
861 557
862 return orig; 558 return orig;
863} 559}
@@ -871,8 +567,7 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
871void 567void
872GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig) 568GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig)
873{ 569{
874 disconnect (&orig->grp); 570 GNUNET_CLIENT_MANAGER_disconnect (orig->grp.client, GNUNET_YES);
875 GNUNET_CONTAINER_multihashmap_remove (origins, &orig->grp.pub_key_hash, orig);
876 GNUNET_free (orig); 571 GNUNET_free (orig);
877} 572}
878 573
@@ -885,26 +580,22 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
885 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit; 580 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
886 581
887 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; 582 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
888 struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + buf_size); 583 struct GNUNET_MULTICAST_MessageHeader *msg = GNUNET_malloc (buf_size);
889 GNUNET_CONTAINER_DLL_insert_tail (grp->tmit_head, grp->tmit_tail, mq);
890
891 struct GNUNET_MULTICAST_MessageHeader *
892 msg = (struct GNUNET_MULTICAST_MessageHeader *) &mq[1];
893 int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]); 584 int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]);
894 585
895 if (! (GNUNET_YES == ret || GNUNET_NO == ret) 586 if (! (GNUNET_YES == ret || GNUNET_NO == ret)
896 || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size) 587 || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
897 { 588 {
898 LOG (GNUNET_ERROR_TYPE_ERROR, 589 LOG (GNUNET_ERROR_TYPE_ERROR,
899 "OriginTransmitNotify() returned error or invalid message size.\n"); 590 "OriginTransmitNotify() returned error or invalid message size.\n");
900 /* FIXME: handle error */ 591 /* FIXME: handle error */
901 GNUNET_free (mq); 592 GNUNET_free (msg);
902 return; 593 return;
903 } 594 }
904 595
905 if (GNUNET_NO == ret && 0 == buf_size) 596 if (GNUNET_NO == ret && 0 == buf_size)
906 { 597 {
907 GNUNET_free (mq); 598 GNUNET_free (msg);
908 return; /* Transmission paused. */ 599 return; /* Transmission paused. */
909 } 600 }
910 601
@@ -915,7 +606,7 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
915 msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset); 606 msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset);
916 tmit->fragment_offset += sizeof (*msg) + buf_size; 607 tmit->fragment_offset += sizeof (*msg) + buf_size;
917 608
918 transmit_next (grp); 609 GNUNET_CLIENT_MANAGER_transmit (grp->client, &msg->header);
919} 610}
920 611
921 612
@@ -939,6 +630,12 @@ GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig,
939 GNUNET_MULTICAST_OriginTransmitNotify notify, 630 GNUNET_MULTICAST_OriginTransmitNotify notify,
940 void *notify_cls) 631 void *notify_cls)
941{ 632{
633/* FIXME
634 if (GNUNET_YES == orig->grp.in_transmit)
635 return NULL;
636 orig->grp.in_transmit = GNUNET_YES;
637*/
638
942 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit; 639 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
943 tmit->origin = orig; 640 tmit->origin = orig;
944 tmit->message_id = message_id; 641 tmit->message_id = message_id;
@@ -1047,10 +744,9 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1047 if (0 < join_msg_size) 744 if (0 < join_msg_size)
1048 memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size); 745 memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size);
1049 746
1050 grp->reconnect_msg = (struct GNUNET_MessageHeader *) join; 747 grp->connect_msg = (struct GNUNET_MessageHeader *) join;
1051 grp->is_origin = GNUNET_NO; 748 grp->is_origin = GNUNET_NO;
1052 grp->cfg = cfg; 749 grp->cfg = cfg;
1053 grp->pub_key = *group_key;
1054 750
1055 mem->join_dcsn_cb = join_decision_cb; 751 mem->join_dcsn_cb = join_decision_cb;
1056 grp->join_req_cb = join_request_cb; 752 grp->join_req_cb = join_request_cb;
@@ -1059,17 +755,9 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1059 grp->message_cb = message_cb; 755 grp->message_cb = message_cb;
1060 grp->cb_cls = cls; 756 grp->cb_cls = cls;
1061 757
1062 GNUNET_CRYPTO_eddsa_key_get_public (member_key, &grp->pub_key); 758 grp->client = GNUNET_CLIENT_MANAGER_connect (cfg, "multicast", member_handlers);
1063 GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key), &grp->pub_key_hash); 759 GNUNET_CLIENT_MANAGER_set_user_context_ (grp->client, mem, sizeof (*grp));
1064 760 group_send_connect_msg (grp);
1065 if (NULL == members)
1066 members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1067
1068 GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem,
1069 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1070
1071 grp->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
1072 grp->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, grp);
1073 761
1074 return mem; 762 return mem;
1075} 763}
@@ -1088,8 +776,7 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1088void 776void
1089GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem) 777GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem)
1090{ 778{
1091 disconnect (&mem->grp); 779 GNUNET_CLIENT_MANAGER_disconnect (mem->grp.client, GNUNET_YES);
1092 GNUNET_CONTAINER_multihashmap_remove (members, &mem->grp.pub_key_hash, mem);
1093 GNUNET_free (mem); 780 GNUNET_free (mem);
1094} 781}
1095 782
@@ -1162,25 +849,26 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem)
1162 struct GNUNET_MULTICAST_Group *grp = &mem->grp; 849 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1163 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit; 850 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
1164 851
1165 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD; 852 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
1166 struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + buf_size); 853 struct GNUNET_MULTICAST_RequestHeader *req = GNUNET_malloc (buf_size);
1167 GNUNET_CONTAINER_DLL_insert_tail (grp->tmit_head, grp->tmit_tail, mq);
1168
1169 struct GNUNET_MULTICAST_RequestHeader *
1170 req = (struct GNUNET_MULTICAST_RequestHeader *) &mq[1];
1171 int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]); 854 int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]);
1172 855
1173 if (! (GNUNET_YES == ret || GNUNET_NO == ret) 856 if (! (GNUNET_YES == ret || GNUNET_NO == ret)
1174 || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size) 857 || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
1175 { 858 {
1176 LOG (GNUNET_ERROR_TYPE_ERROR, 859 LOG (GNUNET_ERROR_TYPE_ERROR,
1177 "MemberTransmitNotify() returned error or invalid message size.\n"); 860 "MemberTransmitNotify() returned error or invalid message size.\n");
1178 /* FIXME: handle error */ 861 /* FIXME: handle error */
862 GNUNET_free (req);
1179 return; 863 return;
1180 } 864 }
1181 865
1182 if (GNUNET_NO == ret && 0 == buf_size) 866 if (GNUNET_NO == ret && 0 == buf_size)
1183 return; /* Transmission paused. */ 867 {
868 /* Transmission paused. */
869 GNUNET_free (req);
870 return;
871 }
1184 872
1185 req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST); 873 req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST);
1186 req->header.size = htons (sizeof (*req) + buf_size); 874 req->header.size = htons (sizeof (*req) + buf_size);
@@ -1188,7 +876,7 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem)
1188 req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset); 876 req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset);
1189 tmit->fragment_offset += sizeof (*req) + buf_size; 877 tmit->fragment_offset += sizeof (*req) + buf_size;
1190 878
1191 transmit_next (grp); 879 GNUNET_CLIENT_MANAGER_transmit (grp->client, &req->header);
1192} 880}
1193 881
1194 882
@@ -1207,6 +895,12 @@ GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem,
1207 GNUNET_MULTICAST_MemberTransmitNotify notify, 895 GNUNET_MULTICAST_MemberTransmitNotify notify,
1208 void *notify_cls) 896 void *notify_cls)
1209{ 897{
898/* FIXME
899 if (GNUNET_YES == mem->grp.in_transmit)
900 return NULL;
901 mem->grp.in_transmit = GNUNET_YES;
902*/
903
1210 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit; 904 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
1211 tmit->member = mem; 905 tmit->member = mem;
1212 tmit->request_id = request_id; 906 tmit->request_id = request_id;