diff options
author | Gabor X Toth <*@tg-x.net> | 2016-08-03 23:20:37 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2016-08-03 23:20:37 +0000 |
commit | 7d83c569d7ea76376a7bc97db58a4ad912dfad8c (patch) | |
tree | 4af9a9a75ae7843922d4a00cf8dae14bc52421cb /src/multicast | |
parent | b2cada934a6811b7fbc2a9d1a9bf84d48a2ba0e1 (diff) | |
download | gnunet-7d83c569d7ea76376a7bc97db58a4ad912dfad8c.tar.gz gnunet-7d83c569d7ea76376a7bc97db58a4ad912dfad8c.zip |
multicast: switch to MQ
Diffstat (limited to 'src/multicast')
-rw-r--r-- | src/multicast/multicast_api.c | 588 |
1 files changed, 367 insertions, 221 deletions
diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c index e390a621c..db0f0e759 100644 --- a/src/multicast/multicast_api.c +++ b/src/multicast/multicast_api.c | |||
@@ -27,6 +27,7 @@ | |||
27 | 27 | ||
28 | #include "platform.h" | 28 | #include "platform.h" |
29 | #include "gnunet_util_lib.h" | 29 | #include "gnunet_util_lib.h" |
30 | #include "gnunet_mq_lib.h" | ||
30 | #include "gnunet_multicast_service.h" | 31 | #include "gnunet_multicast_service.h" |
31 | #include "multicast.h" | 32 | #include "multicast.h" |
32 | 33 | ||
@@ -73,12 +74,22 @@ struct GNUNET_MULTICAST_Group | |||
73 | /** | 74 | /** |
74 | * Client connection to the service. | 75 | * Client connection to the service. |
75 | */ | 76 | */ |
76 | struct GNUNET_CLIENT_MANAGER_Connection *client; | 77 | struct GNUNET_MQ_Handle *mq; |
77 | 78 | ||
78 | /** | 79 | /** |
79 | * Message to send on reconnect. | 80 | * Time to wait until we try to reconnect on failure. |
80 | */ | 81 | */ |
81 | struct GNUNET_MessageHeader *connect_msg; | 82 | struct GNUNET_TIME_Relative reconnect_backoff; |
83 | |||
84 | /** | ||
85 | * Task for reconnecting when the listener fails. | ||
86 | */ | ||
87 | struct GNUNET_SCHEDULER_Task *reconnect_task; | ||
88 | |||
89 | /** | ||
90 | * Message to send on connect. | ||
91 | */ | ||
92 | struct GNUNET_MQ_Envelope *connect_env; | ||
82 | 93 | ||
83 | GNUNET_MULTICAST_JoinRequestCallback join_req_cb; | 94 | GNUNET_MULTICAST_JoinRequestCallback join_req_cb; |
84 | GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb; | 95 | GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb; |
@@ -198,31 +209,21 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem); | |||
198 | 209 | ||
199 | 210 | ||
200 | /** | 211 | /** |
201 | * Send first message to the service after connecting. | 212 | * Check join request message. |
202 | */ | 213 | */ |
203 | static void | 214 | static int |
204 | group_send_connect_msg (struct GNUNET_MULTICAST_Group *grp) | 215 | check_group_join_request (void *cls, |
216 | const struct MulticastJoinRequestMessage *jreq) | ||
205 | { | 217 | { |
206 | uint16_t cmsg_size = ntohs (grp->connect_msg->size); | 218 | uint16_t size = ntohs (jreq->header.size); |
207 | struct GNUNET_MessageHeader *cmsg = GNUNET_malloc (cmsg_size); | ||
208 | GNUNET_memcpy (cmsg, grp->connect_msg, cmsg_size); | ||
209 | GNUNET_CLIENT_MANAGER_transmit_now (grp->client, cmsg); | ||
210 | GNUNET_free (cmsg); | ||
211 | } | ||
212 | 219 | ||
220 | if (sizeof (*jreq) == size) | ||
221 | return GNUNET_OK; | ||
213 | 222 | ||
214 | /** | 223 | if (sizeof (*jreq) + sizeof (struct GNUNET_MessageHeader) <= size) |
215 | * Got disconnected from service. Reconnect. | 224 | return GNUNET_OK; |
216 | */ | 225 | |
217 | static void | 226 | return GNUNET_SYSERR; |
218 | group_recv_disconnect (void *cls, | ||
219 | struct GNUNET_CLIENT_MANAGER_Connection *client, | ||
220 | const struct GNUNET_MessageHeader *msg) | ||
221 | { | ||
222 | struct GNUNET_MULTICAST_Group * | ||
223 | grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp)); | ||
224 | GNUNET_CLIENT_MANAGER_reconnect (client); | ||
225 | group_send_connect_msg (grp); | ||
226 | } | 227 | } |
227 | 228 | ||
228 | 229 | ||
@@ -230,16 +231,13 @@ group_recv_disconnect (void *cls, | |||
230 | * Receive join request from service. | 231 | * Receive join request from service. |
231 | */ | 232 | */ |
232 | static void | 233 | static void |
233 | group_recv_join_request (void *cls, | 234 | handle_group_join_request (void *cls, |
234 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 235 | const struct MulticastJoinRequestMessage *jreq) |
235 | const struct GNUNET_MessageHeader *msg) | ||
236 | { | 236 | { |
237 | struct GNUNET_MULTICAST_Group *grp; | 237 | struct GNUNET_MULTICAST_Group *grp = cls; |
238 | const struct MulticastJoinRequestMessage *jreq; | ||
239 | struct GNUNET_MULTICAST_JoinHandle *jh; | 238 | struct GNUNET_MULTICAST_JoinHandle *jh; |
240 | const struct GNUNET_MessageHeader *jmsg; | 239 | const struct GNUNET_MessageHeader *jmsg = NULL; |
241 | 240 | ||
242 | grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp)); | ||
243 | if (NULL == grp) | 241 | if (NULL == grp) |
244 | { | 242 | { |
245 | GNUNET_break (0); | 243 | GNUNET_break (0); |
@@ -247,17 +245,28 @@ group_recv_join_request (void *cls, | |||
247 | } | 245 | } |
248 | if (NULL == grp->join_req_cb) | 246 | if (NULL == grp->join_req_cb) |
249 | return; | 247 | return; |
250 | /* FIXME: this fails to check that 'msg' is well-formed! */ | 248 | |
251 | jreq = (const struct MulticastJoinRequestMessage *) msg; | ||
252 | if (sizeof (*jreq) + sizeof (*jmsg) <= ntohs (jreq->header.size)) | 249 | if (sizeof (*jreq) + sizeof (*jmsg) <= ntohs (jreq->header.size)) |
253 | jmsg = (const struct GNUNET_MessageHeader *) &jreq[1]; | 250 | jmsg = (const struct GNUNET_MessageHeader *) &jreq[1]; |
254 | else | 251 | |
255 | jmsg = NULL; | ||
256 | jh = GNUNET_malloc (sizeof (*jh)); | 252 | jh = GNUNET_malloc (sizeof (*jh)); |
257 | jh->group = grp; | 253 | jh->group = grp; |
258 | jh->member_pub_key = jreq->member_pub_key; | 254 | jh->member_pub_key = jreq->member_pub_key; |
259 | jh->peer = jreq->peer; | 255 | jh->peer = jreq->peer; |
260 | grp->join_req_cb (grp->cb_cls, &jreq->member_pub_key, jmsg, jh); | 256 | grp->join_req_cb (grp->cb_cls, &jreq->member_pub_key, jmsg, jh); |
257 | |||
258 | grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; | ||
259 | } | ||
260 | |||
261 | |||
262 | /** | ||
263 | * Check multicast message. | ||
264 | */ | ||
265 | static int | ||
266 | check_group_message (void *cls, | ||
267 | const struct GNUNET_MULTICAST_MessageHeader *mmsg) | ||
268 | { | ||
269 | return GNUNET_OK; | ||
261 | } | 270 | } |
262 | 271 | ||
263 | 272 | ||
@@ -265,14 +274,10 @@ group_recv_join_request (void *cls, | |||
265 | * Receive multicast message from service. | 274 | * Receive multicast message from service. |
266 | */ | 275 | */ |
267 | static void | 276 | static void |
268 | group_recv_message (void *cls, | 277 | handle_group_message (void *cls, |
269 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 278 | const struct GNUNET_MULTICAST_MessageHeader *mmsg) |
270 | const struct GNUNET_MessageHeader *msg) | ||
271 | { | 279 | { |
272 | struct GNUNET_MULTICAST_Group * | 280 | struct GNUNET_MULTICAST_Group *grp = cls; |
273 | grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp)); | ||
274 | struct GNUNET_MULTICAST_MessageHeader * | ||
275 | mmsg = (struct GNUNET_MULTICAST_MessageHeader *) msg; | ||
276 | 281 | ||
277 | if (GNUNET_YES == grp->is_disconnecting) | 282 | if (GNUNET_YES == grp->is_disconnecting) |
278 | return; | 283 | return; |
@@ -283,6 +288,8 @@ group_recv_message (void *cls, | |||
283 | 288 | ||
284 | if (NULL != grp->message_cb) | 289 | if (NULL != grp->message_cb) |
285 | grp->message_cb (grp->cb_cls, mmsg); | 290 | grp->message_cb (grp->cb_cls, mmsg); |
291 | |||
292 | grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; | ||
286 | } | 293 | } |
287 | 294 | ||
288 | 295 | ||
@@ -290,12 +297,10 @@ group_recv_message (void *cls, | |||
290 | * Receive message/request fragment acknowledgement from service. | 297 | * Receive message/request fragment acknowledgement from service. |
291 | */ | 298 | */ |
292 | static void | 299 | static void |
293 | group_recv_fragment_ack (void *cls, | 300 | handle_group_fragment_ack (void *cls, |
294 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 301 | const struct GNUNET_MessageHeader *msg) |
295 | const struct GNUNET_MessageHeader *msg) | ||
296 | { | 302 | { |
297 | struct GNUNET_MULTICAST_Group * | 303 | struct GNUNET_MULTICAST_Group *grp = cls; |
298 | grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp)); | ||
299 | 304 | ||
300 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 305 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
301 | "%p Got fragment ACK. in_transmit=%u, acks_pending=%u\n", | 306 | "%p Got fragment ACK. in_transmit=%u, acks_pending=%u\n", |
@@ -316,22 +321,32 @@ group_recv_fragment_ack (void *cls, | |||
316 | origin_to_all ((struct GNUNET_MULTICAST_Origin *) grp); | 321 | origin_to_all ((struct GNUNET_MULTICAST_Origin *) grp); |
317 | else | 322 | else |
318 | member_to_origin ((struct GNUNET_MULTICAST_Member *) grp); | 323 | member_to_origin ((struct GNUNET_MULTICAST_Member *) grp); |
324 | |||
325 | grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; | ||
326 | } | ||
327 | |||
328 | |||
329 | /** | ||
330 | * Check unicast request. | ||
331 | */ | ||
332 | static int | ||
333 | check_origin_request (void *cls, | ||
334 | const struct GNUNET_MULTICAST_RequestHeader *req) | ||
335 | { | ||
336 | return GNUNET_OK; | ||
319 | } | 337 | } |
320 | 338 | ||
339 | |||
321 | /** | 340 | /** |
322 | * Origin receives uniquest request from a member. | 341 | * Origin receives unicast request from a member. |
323 | */ | 342 | */ |
324 | static void | 343 | static void |
325 | origin_recv_request (void *cls, | 344 | handle_origin_request (void *cls, |
326 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 345 | const struct GNUNET_MULTICAST_RequestHeader *req) |
327 | const struct GNUNET_MessageHeader *msg) | ||
328 | { | 346 | { |
329 | struct GNUNET_MULTICAST_Group *grp; | 347 | struct GNUNET_MULTICAST_Group *grp; |
330 | struct GNUNET_MULTICAST_Origin * | 348 | struct GNUNET_MULTICAST_Origin *orig = cls; |
331 | orig = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp)); | ||
332 | grp = &orig->grp; | 349 | grp = &orig->grp; |
333 | struct GNUNET_MULTICAST_RequestHeader * | ||
334 | req = (struct GNUNET_MULTICAST_RequestHeader *) msg; | ||
335 | 350 | ||
336 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 351 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
337 | "Calling request callback with a request of size %u.\n", | 352 | "Calling request callback with a request of size %u.\n", |
@@ -339,6 +354,8 @@ origin_recv_request (void *cls, | |||
339 | 354 | ||
340 | if (NULL != orig->request_cb) | 355 | if (NULL != orig->request_cb) |
341 | orig->request_cb (grp->cb_cls, req); | 356 | orig->request_cb (grp->cb_cls, req); |
357 | |||
358 | grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; | ||
342 | } | 359 | } |
343 | 360 | ||
344 | 361 | ||
@@ -346,14 +363,11 @@ origin_recv_request (void *cls, | |||
346 | * Receive multicast replay request from service. | 363 | * Receive multicast replay request from service. |
347 | */ | 364 | */ |
348 | static void | 365 | static void |
349 | group_recv_replay_request (void *cls, | 366 | handle_group_replay_request (void *cls, |
350 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 367 | const struct MulticastReplayRequestMessage *rep) |
351 | const struct GNUNET_MessageHeader *msg) | 368 | |
352 | { | 369 | { |
353 | struct GNUNET_MULTICAST_Group * | 370 | struct GNUNET_MULTICAST_Group *grp = cls; |
354 | grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp)); | ||
355 | struct MulticastReplayRequestMessage * | ||
356 | rep = (struct MulticastReplayRequestMessage *) msg; | ||
357 | 371 | ||
358 | if (GNUNET_YES == grp->is_disconnecting) | 372 | if (GNUNET_YES == grp->is_disconnecting) |
359 | return; | 373 | return; |
@@ -385,45 +399,72 @@ group_recv_replay_request (void *cls, | |||
385 | GNUNET_ntohll (rep->flags), rh); | 399 | GNUNET_ntohll (rep->flags), rh); |
386 | } | 400 | } |
387 | } | 401 | } |
402 | |||
403 | grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; | ||
388 | } | 404 | } |
389 | 405 | ||
390 | 406 | ||
391 | /** | 407 | /** |
392 | * Receive multicast replay request from service. | 408 | * Check replay response. |
409 | */ | ||
410 | static int | ||
411 | check_member_replay_response (void *cls, | ||
412 | const struct MulticastReplayResponseMessage *res) | ||
413 | { | ||
414 | uint16_t size = ntohs (res->header.size); | ||
415 | |||
416 | if (sizeof (*res) == size) | ||
417 | return GNUNET_OK; | ||
418 | |||
419 | if (sizeof (*res) + sizeof (struct GNUNET_MULTICAST_MessageHeader) <= size) | ||
420 | return GNUNET_OK; | ||
421 | |||
422 | return GNUNET_SYSERR; | ||
423 | } | ||
424 | |||
425 | |||
426 | /** | ||
427 | * Receive replay response from service. | ||
393 | */ | 428 | */ |
394 | static void | 429 | static void |
395 | member_recv_replay_response (void *cls, | 430 | handle_member_replay_response (void *cls, |
396 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 431 | const struct MulticastReplayResponseMessage *res) |
397 | const struct GNUNET_MessageHeader *msg) | ||
398 | { | 432 | { |
399 | struct GNUNET_MULTICAST_Group *grp; | 433 | struct GNUNET_MULTICAST_Group *grp; |
400 | struct GNUNET_MULTICAST_Member * | 434 | struct GNUNET_MULTICAST_Member *mem = cls; |
401 | mem = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp)); | ||
402 | grp = &mem->grp; | 435 | grp = &mem->grp; |
403 | // FIXME: Something is missing here for the code to make sense | 436 | |
404 | //struct MulticastReplayResponseMessage * | ||
405 | // res = (struct MulticastReplayResponseMessage *) msg; | ||
406 | if (GNUNET_YES == grp->is_disconnecting) | 437 | if (GNUNET_YES == grp->is_disconnecting) |
407 | return; | 438 | return; |
408 | 439 | ||
409 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay response.\n"); | 440 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay response.\n"); |
441 | |||
442 | // FIXME: return result | ||
443 | } | ||
444 | |||
445 | |||
446 | /** | ||
447 | * Check join decision. | ||
448 | */ | ||
449 | static int | ||
450 | check_member_join_decision (void *cls, | ||
451 | const struct MulticastJoinDecisionMessageHeader *hdcsn) | ||
452 | { | ||
453 | return GNUNET_OK; // checked in handle below | ||
410 | } | 454 | } |
411 | 455 | ||
456 | |||
412 | /** | 457 | /** |
413 | * Member receives join decision. | 458 | * Member receives join decision. |
414 | */ | 459 | */ |
415 | static void | 460 | static void |
416 | member_recv_join_decision (void *cls, | 461 | handle_member_join_decision (void *cls, |
417 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 462 | const struct MulticastJoinDecisionMessageHeader *hdcsn) |
418 | const struct GNUNET_MessageHeader *msg) | ||
419 | { | 463 | { |
420 | struct GNUNET_MULTICAST_Group *grp; | 464 | struct GNUNET_MULTICAST_Group *grp; |
421 | struct GNUNET_MULTICAST_Member * | 465 | struct GNUNET_MULTICAST_Member *mem = cls; |
422 | mem = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp)); | ||
423 | grp = &mem->grp; | 466 | grp = &mem->grp; |
424 | 467 | ||
425 | const struct MulticastJoinDecisionMessageHeader * | ||
426 | hdcsn = (const struct MulticastJoinDecisionMessageHeader *) msg; | ||
427 | const struct MulticastJoinDecisionMessage * | 468 | const struct MulticastJoinDecisionMessage * |
428 | dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1]; | 469 | dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1]; |
429 | 470 | ||
@@ -474,79 +515,15 @@ member_recv_join_decision (void *cls, | |||
474 | // FIXME: | 515 | // FIXME: |
475 | //if (GNUNET_YES != is_admitted) | 516 | //if (GNUNET_YES != is_admitted) |
476 | // GNUNET_MULTICAST_member_part (mem); | 517 | // GNUNET_MULTICAST_member_part (mem); |
477 | } | ||
478 | |||
479 | 518 | ||
480 | /** | 519 | grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; |
481 | * Message handlers for an origin. | 520 | } |
482 | */ | ||
483 | static struct GNUNET_CLIENT_MANAGER_MessageHandler origin_handlers[] = | ||
484 | { | ||
485 | { group_recv_disconnect, NULL, 0, 0, GNUNET_NO }, | ||
486 | |||
487 | { group_recv_message, NULL, | ||
488 | GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, | ||
489 | sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES }, | ||
490 | |||
491 | { origin_recv_request, NULL, | ||
492 | GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, | ||
493 | sizeof (struct GNUNET_MULTICAST_RequestHeader), GNUNET_YES }, | ||
494 | |||
495 | { group_recv_fragment_ack, NULL, | ||
496 | GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK, | ||
497 | sizeof (struct GNUNET_MessageHeader), GNUNET_YES }, | ||
498 | |||
499 | { group_recv_join_request, NULL, | ||
500 | GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, | ||
501 | sizeof (struct MulticastJoinRequestMessage), GNUNET_YES }, | ||
502 | |||
503 | { group_recv_replay_request, NULL, | ||
504 | GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, | ||
505 | sizeof (struct MulticastReplayRequestMessage), GNUNET_NO }, | ||
506 | |||
507 | { NULL, NULL, 0, 0, GNUNET_NO } | ||
508 | }; | ||
509 | |||
510 | |||
511 | /** | ||
512 | * Message handlers for a member. | ||
513 | */ | ||
514 | static struct GNUNET_CLIENT_MANAGER_MessageHandler member_handlers[] = | ||
515 | { | ||
516 | { group_recv_disconnect, NULL, 0, 0, GNUNET_NO }, | ||
517 | |||
518 | { group_recv_message, NULL, | ||
519 | GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, | ||
520 | sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES }, | ||
521 | |||
522 | { group_recv_fragment_ack, NULL, | ||
523 | GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK, | ||
524 | sizeof (struct GNUNET_MessageHeader), GNUNET_YES }, | ||
525 | |||
526 | { group_recv_join_request, NULL, | ||
527 | GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, | ||
528 | sizeof (struct MulticastJoinRequestMessage), GNUNET_YES }, | ||
529 | |||
530 | { member_recv_join_decision, NULL, | ||
531 | GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, | ||
532 | sizeof (struct MulticastJoinDecisionMessage), GNUNET_YES }, | ||
533 | |||
534 | { group_recv_replay_request, NULL, | ||
535 | GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, | ||
536 | sizeof (struct MulticastReplayRequestMessage), GNUNET_NO }, | ||
537 | |||
538 | { member_recv_replay_response, NULL, | ||
539 | GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, | ||
540 | sizeof (struct MulticastReplayRequestMessage), GNUNET_NO }, | ||
541 | |||
542 | { NULL, NULL, 0, 0, GNUNET_NO } | ||
543 | }; | ||
544 | 521 | ||
545 | 522 | ||
546 | static void | 523 | static void |
547 | group_cleanup (struct GNUNET_MULTICAST_Group *grp) | 524 | group_cleanup (struct GNUNET_MULTICAST_Group *grp) |
548 | { | 525 | { |
549 | GNUNET_free (grp->connect_msg); | 526 | GNUNET_free (grp->connect_env); |
550 | if (NULL != grp->disconnect_cb) | 527 | if (NULL != grp->disconnect_cb) |
551 | grp->disconnect_cb (grp->disconnect_cls); | 528 | grp->disconnect_cb (grp->disconnect_cls); |
552 | } | 529 | } |
@@ -609,13 +586,11 @@ GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join, | |||
609 | uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0; | 586 | uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0; |
610 | uint16_t relay_size = relay_count * sizeof (*relays); | 587 | uint16_t relay_size = relay_count * sizeof (*relays); |
611 | 588 | ||
612 | struct MulticastJoinDecisionMessageHeader * hdcsn; | 589 | struct MulticastJoinDecisionMessageHeader *hdcsn; |
613 | struct MulticastJoinDecisionMessage *dcsn; | 590 | struct MulticastJoinDecisionMessage *dcsn; |
614 | hdcsn = GNUNET_malloc (sizeof (*hdcsn) + sizeof (*dcsn) | 591 | struct GNUNET_MQ_Envelope * |
615 | + relay_size + join_resp_size); | 592 | env = GNUNET_MQ_msg_extra (hdcsn, sizeof (*dcsn) + relay_size + join_resp_size, |
616 | hdcsn->header.size = htons (sizeof (*hdcsn) + sizeof (*dcsn) | 593 | GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION); |
617 | + relay_size + join_resp_size); | ||
618 | hdcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION); | ||
619 | hdcsn->member_pub_key = join->member_pub_key; | 594 | hdcsn->member_pub_key = join->member_pub_key; |
620 | hdcsn->peer = join->peer; | 595 | hdcsn->peer = join->peer; |
621 | 596 | ||
@@ -629,8 +604,7 @@ GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join, | |||
629 | if (0 < join_resp_size) | 604 | if (0 < join_resp_size) |
630 | GNUNET_memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size); | 605 | GNUNET_memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size); |
631 | 606 | ||
632 | GNUNET_CLIENT_MANAGER_transmit (grp->client, &hdcsn->header); | 607 | GNUNET_MQ_send (grp->mq, env); |
633 | GNUNET_free (hdcsn); | ||
634 | GNUNET_free (join); | 608 | GNUNET_free (join); |
635 | return NULL; | 609 | return NULL; |
636 | } | 610 | } |
@@ -653,19 +627,15 @@ GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh, | |||
653 | enum GNUNET_MULTICAST_ReplayErrorCode ec) | 627 | enum GNUNET_MULTICAST_ReplayErrorCode ec) |
654 | { | 628 | { |
655 | uint8_t msg_size = (NULL != msg) ? ntohs (msg->size) : 0; | 629 | uint8_t msg_size = (NULL != msg) ? ntohs (msg->size) : 0; |
656 | struct MulticastReplayResponseMessage * | 630 | struct MulticastReplayResponseMessage *res; |
657 | res = GNUNET_malloc (sizeof (*res) + msg_size); | 631 | struct GNUNET_MQ_Envelope * |
658 | *res = (struct MulticastReplayResponseMessage) { | 632 | env = GNUNET_MQ_msg_extra (res, msg_size, |
659 | .header = { | 633 | GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE); |
660 | .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE), | 634 | res->fragment_id = rh->req.fragment_id; |
661 | .size = htons (sizeof (*res) + msg_size), | 635 | res->message_id = rh->req.message_id; |
662 | }, | 636 | res->fragment_offset = rh->req.fragment_offset; |
663 | .fragment_id = rh->req.fragment_id, | 637 | res->flags = rh->req.flags; |
664 | .message_id = rh->req.message_id, | 638 | res->error_code = htonl (ec); |
665 | .fragment_offset = rh->req.fragment_offset, | ||
666 | .flags = rh->req.flags, | ||
667 | .error_code = htonl (ec), | ||
668 | }; | ||
669 | 639 | ||
670 | if (GNUNET_MULTICAST_REC_OK == ec) | 640 | if (GNUNET_MULTICAST_REC_OK == ec) |
671 | { | 641 | { |
@@ -673,8 +643,7 @@ GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh, | |||
673 | GNUNET_memcpy (&res[1], msg, msg_size); | 643 | GNUNET_memcpy (&res[1], msg, msg_size); |
674 | } | 644 | } |
675 | 645 | ||
676 | GNUNET_CLIENT_MANAGER_transmit (rh->grp->client, &res->header); | 646 | GNUNET_MQ_send (rh->grp->mq, env); |
677 | GNUNET_free (res); | ||
678 | 647 | ||
679 | if (GNUNET_MULTICAST_REC_OK != ec) | 648 | if (GNUNET_MULTICAST_REC_OK != ec) |
680 | GNUNET_free (rh); | 649 | GNUNET_free (rh); |
@@ -692,18 +661,16 @@ GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh, | |||
692 | void | 661 | void |
693 | GNUNET_MULTICAST_replay_response_end (struct GNUNET_MULTICAST_ReplayHandle *rh) | 662 | GNUNET_MULTICAST_replay_response_end (struct GNUNET_MULTICAST_ReplayHandle *rh) |
694 | { | 663 | { |
695 | struct MulticastReplayResponseMessage end = { | 664 | struct MulticastReplayResponseMessage *end; |
696 | .header = { | 665 | struct GNUNET_MQ_Envelope * |
697 | .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END), | 666 | env = GNUNET_MQ_msg (end, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END); |
698 | .size = htons (sizeof (end)), | 667 | |
699 | }, | 668 | end->fragment_id = rh->req.fragment_id; |
700 | .fragment_id = rh->req.fragment_id, | 669 | end->message_id = rh->req.message_id; |
701 | .message_id = rh->req.message_id, | 670 | end->fragment_offset = rh->req.fragment_offset; |
702 | .fragment_offset = rh->req.fragment_offset, | 671 | end->flags = rh->req.flags; |
703 | .flags = rh->req.flags, | ||
704 | }; | ||
705 | 672 | ||
706 | GNUNET_CLIENT_MANAGER_transmit (rh->grp->client, &end.header); | 673 | GNUNET_MQ_send (rh->grp->mq, env); |
707 | GNUNET_free (rh); | 674 | GNUNET_free (rh); |
708 | } | 675 | } |
709 | 676 | ||
@@ -726,6 +693,92 @@ GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh, | |||
726 | } | 693 | } |
727 | 694 | ||
728 | 695 | ||
696 | void | ||
697 | origin_connect (struct GNUNET_MULTICAST_Origin *orig); | ||
698 | |||
699 | |||
700 | static void | ||
701 | origin_reconnect (void *cls) | ||
702 | { | ||
703 | origin_connect (cls); | ||
704 | } | ||
705 | |||
706 | |||
707 | /** | ||
708 | * Origin client disconnected from service. | ||
709 | * | ||
710 | * Reconnect after backoff period.= | ||
711 | */ | ||
712 | void | ||
713 | origin_disconnected (void *cls, enum GNUNET_MQ_Error error) | ||
714 | { | ||
715 | struct GNUNET_MULTICAST_Origin *orig = cls; | ||
716 | struct GNUNET_MULTICAST_Group *grp = &orig->grp; | ||
717 | |||
718 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
719 | "Origin client disconnected (%d), re-connecting\n", | ||
720 | (int) error); | ||
721 | if (NULL != grp->mq) | ||
722 | { | ||
723 | GNUNET_MQ_destroy (grp->mq); | ||
724 | grp->mq = NULL; | ||
725 | } | ||
726 | |||
727 | grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_backoff, | ||
728 | &origin_reconnect, | ||
729 | orig); | ||
730 | grp->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (grp->reconnect_backoff); | ||
731 | } | ||
732 | |||
733 | |||
734 | /** | ||
735 | * Connect to service as origin. | ||
736 | */ | ||
737 | void | ||
738 | origin_connect (struct GNUNET_MULTICAST_Origin *orig) | ||
739 | { | ||
740 | struct GNUNET_MULTICAST_Group *grp = &orig->grp; | ||
741 | |||
742 | GNUNET_MQ_hd_var_size (group_message, | ||
743 | GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, | ||
744 | struct GNUNET_MULTICAST_MessageHeader); | ||
745 | |||
746 | GNUNET_MQ_hd_var_size (origin_request, | ||
747 | GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, | ||
748 | struct GNUNET_MULTICAST_RequestHeader); | ||
749 | |||
750 | GNUNET_MQ_hd_fixed_size (group_fragment_ack, | ||
751 | GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK, | ||
752 | struct GNUNET_MessageHeader); | ||
753 | |||
754 | GNUNET_MQ_hd_var_size (group_join_request, | ||
755 | GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, | ||
756 | struct MulticastJoinRequestMessage); | ||
757 | |||
758 | GNUNET_MQ_hd_fixed_size (group_replay_request, | ||
759 | GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, | ||
760 | struct MulticastReplayRequestMessage); | ||
761 | |||
762 | struct GNUNET_MQ_MessageHandler handlers[] = { | ||
763 | make_group_message_handler (grp), | ||
764 | make_origin_request_handler (orig), | ||
765 | make_group_fragment_ack_handler (grp), | ||
766 | make_group_join_request_handler (grp), | ||
767 | make_group_replay_request_handler (grp), | ||
768 | GNUNET_MQ_handler_end () | ||
769 | }; | ||
770 | |||
771 | grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast", | ||
772 | handlers, origin_disconnected, orig); | ||
773 | if (NULL == grp->mq) | ||
774 | { | ||
775 | GNUNET_break (0); | ||
776 | return; | ||
777 | } | ||
778 | GNUNET_MQ_send_copy (grp->mq, grp->connect_env); | ||
779 | } | ||
780 | |||
781 | |||
729 | /** | 782 | /** |
730 | * Start a multicast group. | 783 | * Start a multicast group. |
731 | * | 784 | * |
@@ -776,14 +829,13 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
776 | { | 829 | { |
777 | struct GNUNET_MULTICAST_Origin *orig = GNUNET_malloc (sizeof (*orig)); | 830 | struct GNUNET_MULTICAST_Origin *orig = GNUNET_malloc (sizeof (*orig)); |
778 | struct GNUNET_MULTICAST_Group *grp = &orig->grp; | 831 | struct GNUNET_MULTICAST_Group *grp = &orig->grp; |
779 | struct MulticastOriginStartMessage *start = GNUNET_malloc (sizeof (*start)); | ||
780 | 832 | ||
781 | start->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START); | 833 | struct MulticastOriginStartMessage *start; |
782 | start->header.size = htons (sizeof (*start)); | 834 | grp->connect_env = GNUNET_MQ_msg (start, |
835 | GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START); | ||
783 | start->max_fragment_id = max_fragment_id; | 836 | start->max_fragment_id = max_fragment_id; |
784 | GNUNET_memcpy (&start->group_key, priv_key, sizeof (*priv_key)); | 837 | GNUNET_memcpy (&start->group_key, priv_key, sizeof (*priv_key)); |
785 | 838 | ||
786 | grp->connect_msg = (struct GNUNET_MessageHeader *) start; | ||
787 | grp->is_origin = GNUNET_YES; | 839 | grp->is_origin = GNUNET_YES; |
788 | grp->cfg = cfg; | 840 | grp->cfg = cfg; |
789 | 841 | ||
@@ -795,10 +847,7 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
795 | 847 | ||
796 | orig->request_cb = request_cb; | 848 | orig->request_cb = request_cb; |
797 | 849 | ||
798 | grp->client = GNUNET_CLIENT_MANAGER_connect (cfg, "multicast", origin_handlers); | 850 | origin_connect (orig); |
799 | GNUNET_CLIENT_MANAGER_set_user_context_ (grp->client, orig, sizeof (*grp)); | ||
800 | group_send_connect_msg (grp); | ||
801 | |||
802 | return orig; | 851 | return orig; |
803 | } | 852 | } |
804 | 853 | ||
@@ -820,8 +869,13 @@ GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig, | |||
820 | grp->disconnect_cb = stop_cb; | 869 | grp->disconnect_cb = stop_cb; |
821 | grp->disconnect_cls = stop_cls; | 870 | grp->disconnect_cls = stop_cls; |
822 | 871 | ||
823 | GNUNET_CLIENT_MANAGER_disconnect (orig->grp.client, GNUNET_YES, | 872 | // FIXME: wait till queued messages are sent |
824 | &origin_cleanup, orig); | 873 | if (NULL != grp->mq) |
874 | { | ||
875 | GNUNET_MQ_destroy (grp->mq); | ||
876 | grp->mq = NULL; | ||
877 | } | ||
878 | origin_cleanup (orig); | ||
825 | } | 879 | } |
826 | 880 | ||
827 | 881 | ||
@@ -834,7 +888,11 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig) | |||
834 | GNUNET_assert (GNUNET_YES == grp->in_transmit); | 888 | GNUNET_assert (GNUNET_YES == grp->in_transmit); |
835 | 889 | ||
836 | size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; | 890 | size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; |
837 | struct GNUNET_MULTICAST_MessageHeader *msg = GNUNET_malloc (buf_size); | 891 | struct GNUNET_MULTICAST_MessageHeader *msg; |
892 | struct GNUNET_MQ_Envelope * | ||
893 | env = GNUNET_MQ_msg_extra (msg, buf_size - sizeof(*msg), | ||
894 | GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE); | ||
895 | |||
838 | int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]); | 896 | int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]); |
839 | 897 | ||
840 | if (! (GNUNET_YES == ret || GNUNET_NO == ret) | 898 | if (! (GNUNET_YES == ret || GNUNET_NO == ret) |
@@ -844,7 +902,7 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig) | |||
844 | "%p OriginTransmitNotify() returned error or invalid message size.\n", | 902 | "%p OriginTransmitNotify() returned error or invalid message size.\n", |
845 | orig); | 903 | orig); |
846 | /* FIXME: handle error */ | 904 | /* FIXME: handle error */ |
847 | GNUNET_free (msg); | 905 | GNUNET_free (env); |
848 | return; | 906 | return; |
849 | } | 907 | } |
850 | 908 | ||
@@ -852,11 +910,10 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig) | |||
852 | { | 910 | { |
853 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 911 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
854 | "%p OriginTransmitNotify() - transmission paused.\n", orig); | 912 | "%p OriginTransmitNotify() - transmission paused.\n", orig); |
855 | GNUNET_free (msg); | 913 | GNUNET_free (env); |
856 | return; /* Transmission paused. */ | 914 | return; /* Transmission paused. */ |
857 | } | 915 | } |
858 | 916 | ||
859 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE); | ||
860 | msg->header.size = htons (sizeof (*msg) + buf_size); | 917 | msg->header.size = htons (sizeof (*msg) + buf_size); |
861 | msg->message_id = GNUNET_htonll (tmit->message_id); | 918 | msg->message_id = GNUNET_htonll (tmit->message_id); |
862 | msg->group_generation = tmit->group_generation; | 919 | msg->group_generation = tmit->group_generation; |
@@ -864,8 +921,7 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig) | |||
864 | tmit->fragment_offset += sizeof (*msg) + buf_size; | 921 | tmit->fragment_offset += sizeof (*msg) + buf_size; |
865 | 922 | ||
866 | grp->acks_pending++; | 923 | grp->acks_pending++; |
867 | GNUNET_CLIENT_MANAGER_transmit (grp->client, &msg->header); | 924 | GNUNET_MQ_send (grp->mq, env); |
868 | GNUNET_free (msg); | ||
869 | 925 | ||
870 | if (GNUNET_YES == ret) | 926 | if (GNUNET_YES == ret) |
871 | grp->in_transmit = GNUNET_NO; | 927 | grp->in_transmit = GNUNET_NO; |
@@ -944,6 +1000,94 @@ GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHan | |||
944 | } | 1000 | } |
945 | 1001 | ||
946 | 1002 | ||
1003 | void | ||
1004 | member_connect (struct GNUNET_MULTICAST_Member *mem); | ||
1005 | |||
1006 | |||
1007 | static void | ||
1008 | member_reconnect (void *cls) | ||
1009 | { | ||
1010 | member_connect (cls); | ||
1011 | } | ||
1012 | |||
1013 | |||
1014 | /** | ||
1015 | * Member client disconnected from service. | ||
1016 | * | ||
1017 | * Reconnect after backoff period. | ||
1018 | */ | ||
1019 | void | ||
1020 | member_disconnected (void *cls, enum GNUNET_MQ_Error error) | ||
1021 | { | ||
1022 | struct GNUNET_MULTICAST_Member *mem = cls; | ||
1023 | struct GNUNET_MULTICAST_Group *grp = &mem->grp; | ||
1024 | |||
1025 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1026 | "Member client disconnected (%d), re-connecting\n", | ||
1027 | (int) error); | ||
1028 | GNUNET_MQ_destroy (grp->mq); | ||
1029 | grp->mq = NULL; | ||
1030 | |||
1031 | grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_backoff, | ||
1032 | &member_reconnect, | ||
1033 | mem); | ||
1034 | grp->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (grp->reconnect_backoff); | ||
1035 | } | ||
1036 | |||
1037 | |||
1038 | /** | ||
1039 | * Connect to service as member. | ||
1040 | */ | ||
1041 | void | ||
1042 | member_connect (struct GNUNET_MULTICAST_Member *mem) | ||
1043 | { | ||
1044 | struct GNUNET_MULTICAST_Group *grp = &mem->grp; | ||
1045 | |||
1046 | GNUNET_MQ_hd_var_size (group_message, | ||
1047 | GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, | ||
1048 | struct GNUNET_MULTICAST_MessageHeader); | ||
1049 | |||
1050 | GNUNET_MQ_hd_fixed_size (group_fragment_ack, | ||
1051 | GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK, | ||
1052 | struct GNUNET_MessageHeader); | ||
1053 | |||
1054 | GNUNET_MQ_hd_var_size (group_join_request, | ||
1055 | GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, | ||
1056 | struct MulticastJoinRequestMessage); | ||
1057 | |||
1058 | GNUNET_MQ_hd_var_size (member_join_decision, | ||
1059 | GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, | ||
1060 | struct MulticastJoinDecisionMessageHeader); | ||
1061 | |||
1062 | GNUNET_MQ_hd_fixed_size (group_replay_request, | ||
1063 | GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, | ||
1064 | struct MulticastReplayRequestMessage); | ||
1065 | |||
1066 | GNUNET_MQ_hd_var_size (member_replay_response, | ||
1067 | GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, | ||
1068 | struct MulticastReplayResponseMessage); | ||
1069 | |||
1070 | struct GNUNET_MQ_MessageHandler handlers[] = { | ||
1071 | make_group_message_handler (grp), | ||
1072 | make_group_fragment_ack_handler (grp), | ||
1073 | make_group_join_request_handler (grp), | ||
1074 | make_member_join_decision_handler (mem), | ||
1075 | make_group_replay_request_handler (grp), | ||
1076 | make_member_replay_response_handler (mem), | ||
1077 | GNUNET_MQ_handler_end () | ||
1078 | }; | ||
1079 | |||
1080 | grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast", | ||
1081 | handlers, member_disconnected, mem); | ||
1082 | if (NULL == grp->mq) | ||
1083 | { | ||
1084 | GNUNET_break (0); | ||
1085 | return; | ||
1086 | } | ||
1087 | GNUNET_MQ_send_copy (grp->mq, grp->connect_env); | ||
1088 | } | ||
1089 | |||
1090 | |||
947 | /** | 1091 | /** |
948 | * Join a multicast group. | 1092 | * Join a multicast group. |
949 | * | 1093 | * |
@@ -1015,10 +1159,9 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
1015 | 1159 | ||
1016 | uint16_t relay_size = relay_count * sizeof (*relays); | 1160 | uint16_t relay_size = relay_count * sizeof (*relays); |
1017 | uint16_t join_msg_size = (NULL != join_msg) ? ntohs (join_msg->size) : 0; | 1161 | uint16_t join_msg_size = (NULL != join_msg) ? ntohs (join_msg->size) : 0; |
1018 | struct MulticastMemberJoinMessage * | 1162 | struct MulticastMemberJoinMessage *join; |
1019 | join = GNUNET_malloc (sizeof (*join) + relay_size + join_msg_size); | 1163 | grp->connect_env = GNUNET_MQ_msg_extra (join, relay_size + join_msg_size, |
1020 | join->header.size = htons (sizeof (*join) + relay_size + join_msg_size); | 1164 | GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN); |
1021 | join->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN); | ||
1022 | join->group_pub_key = *group_pub_key; | 1165 | join->group_pub_key = *group_pub_key; |
1023 | join->member_key = *member_key; | 1166 | join->member_key = *member_key; |
1024 | join->origin = *origin; | 1167 | join->origin = *origin; |
@@ -1028,7 +1171,7 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
1028 | if (0 < join_msg_size) | 1171 | if (0 < join_msg_size) |
1029 | GNUNET_memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size); | 1172 | GNUNET_memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size); |
1030 | 1173 | ||
1031 | grp->connect_msg = (struct GNUNET_MessageHeader *) join; | 1174 | grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; |
1032 | grp->is_origin = GNUNET_NO; | 1175 | grp->is_origin = GNUNET_NO; |
1033 | grp->cfg = cfg; | 1176 | grp->cfg = cfg; |
1034 | 1177 | ||
@@ -1039,10 +1182,7 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
1039 | grp->message_cb = message_cb; | 1182 | grp->message_cb = message_cb; |
1040 | grp->cb_cls = cls; | 1183 | grp->cb_cls = cls; |
1041 | 1184 | ||
1042 | grp->client = GNUNET_CLIENT_MANAGER_connect (cfg, "multicast", member_handlers); | 1185 | member_connect (mem); |
1043 | GNUNET_CLIENT_MANAGER_set_user_context_ (grp->client, mem, sizeof (*grp)); | ||
1044 | group_send_connect_msg (grp); | ||
1045 | |||
1046 | return mem; | 1186 | return mem; |
1047 | } | 1187 | } |
1048 | 1188 | ||
@@ -1076,8 +1216,13 @@ GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem, | |||
1076 | grp->replay_msg_cb = NULL; | 1216 | grp->replay_msg_cb = NULL; |
1077 | grp->replay_frag_cb = NULL; | 1217 | grp->replay_frag_cb = NULL; |
1078 | 1218 | ||
1079 | GNUNET_CLIENT_MANAGER_disconnect (mem->grp.client, GNUNET_YES, | 1219 | // FIXME: wait till queued messages are sent |
1080 | member_cleanup, mem); | 1220 | if (NULL != grp->mq) |
1221 | { | ||
1222 | GNUNET_MQ_destroy (grp->mq); | ||
1223 | grp->mq = NULL; | ||
1224 | } | ||
1225 | member_cleanup (mem); | ||
1081 | } | 1226 | } |
1082 | 1227 | ||
1083 | 1228 | ||
@@ -1088,17 +1233,16 @@ member_replay_request (struct GNUNET_MULTICAST_Member *mem, | |||
1088 | uint64_t fragment_offset, | 1233 | uint64_t fragment_offset, |
1089 | uint64_t flags) | 1234 | uint64_t flags) |
1090 | { | 1235 | { |
1091 | struct MulticastReplayRequestMessage rep = { | 1236 | struct MulticastReplayRequestMessage *rep; |
1092 | .header = { | 1237 | struct GNUNET_MQ_Envelope * |
1093 | .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST), | 1238 | env = GNUNET_MQ_msg (rep, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST); |
1094 | .size = htons (sizeof (rep)), | 1239 | |
1095 | }, | 1240 | rep->fragment_id = GNUNET_htonll (fragment_id); |
1096 | .fragment_id = GNUNET_htonll (fragment_id), | 1241 | rep->message_id = GNUNET_htonll (message_id); |
1097 | .message_id = GNUNET_htonll (message_id), | 1242 | rep->fragment_offset = GNUNET_htonll (fragment_offset); |
1098 | .fragment_offset = GNUNET_htonll (fragment_offset), | 1243 | rep->flags = GNUNET_htonll (flags); |
1099 | .flags = GNUNET_htonll (flags), | 1244 | |
1100 | }; | 1245 | GNUNET_MQ_send (mem->grp.mq, env); |
1101 | GNUNET_CLIENT_MANAGER_transmit (mem->grp.client, &rep.header); | ||
1102 | } | 1246 | } |
1103 | 1247 | ||
1104 | 1248 | ||
@@ -1168,7 +1312,11 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem) | |||
1168 | GNUNET_assert (GNUNET_YES == grp->in_transmit); | 1312 | GNUNET_assert (GNUNET_YES == grp->in_transmit); |
1169 | 1313 | ||
1170 | size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; | 1314 | size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; |
1171 | struct GNUNET_MULTICAST_RequestHeader *req = GNUNET_malloc (buf_size); | 1315 | struct GNUNET_MULTICAST_RequestHeader *req; |
1316 | struct GNUNET_MQ_Envelope * | ||
1317 | env = GNUNET_MQ_msg_extra (req, buf_size - sizeof(*req), | ||
1318 | GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST); | ||
1319 | |||
1172 | int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]); | 1320 | int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]); |
1173 | 1321 | ||
1174 | if (! (GNUNET_YES == ret || GNUNET_NO == ret) | 1322 | if (! (GNUNET_YES == ret || GNUNET_NO == ret) |
@@ -1189,14 +1337,12 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem) | |||
1189 | return; | 1337 | return; |
1190 | } | 1338 | } |
1191 | 1339 | ||
1192 | req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST); | ||
1193 | req->header.size = htons (sizeof (*req) + buf_size); | 1340 | req->header.size = htons (sizeof (*req) + buf_size); |
1194 | req->request_id = GNUNET_htonll (tmit->request_id); | 1341 | req->request_id = GNUNET_htonll (tmit->request_id); |
1195 | req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset); | 1342 | req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset); |
1196 | tmit->fragment_offset += sizeof (*req) + buf_size; | 1343 | tmit->fragment_offset += sizeof (*req) + buf_size; |
1197 | 1344 | ||
1198 | GNUNET_CLIENT_MANAGER_transmit (grp->client, &req->header); | 1345 | GNUNET_MQ_send (grp->mq, env); |
1199 | GNUNET_free (req); | ||
1200 | 1346 | ||
1201 | if (GNUNET_YES == ret) | 1347 | if (GNUNET_YES == ret) |
1202 | grp->in_transmit = GNUNET_NO; | 1348 | grp->in_transmit = GNUNET_NO; |