diff options
author | Gabor X Toth <*@tg-x.net> | 2014-05-29 16:35:55 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2014-05-29 16:35:55 +0000 |
commit | 9955561e1b204ccf23fbf841f409bd3ef79be88c (patch) | |
tree | 0271c23ae9f1dad72266a0e6073d696e5afca027 /src/multicast | |
parent | a5877668ba805c5e0efe622e6ce4c58ff5609bf9 (diff) | |
download | gnunet-9955561e1b204ccf23fbf841f409bd3ef79be88c.tar.gz gnunet-9955561e1b204ccf23fbf841f409bd3ef79be88c.zip |
psyc, multicast: reorg code, use new client manager & psyc util lib
Diffstat (limited to 'src/multicast')
-rw-r--r-- | src/multicast/multicast_api.c | 614 |
1 files changed, 154 insertions, 460 deletions
diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c index 36d564d52..b6d51896d 100644 --- a/src/multicast/multicast_api.c +++ b/src/multicast/multicast_api.c | |||
@@ -24,6 +24,7 @@ | |||
24 | * @author Christian Grothoff | 24 | * @author Christian Grothoff |
25 | * @author Gabor X Toth | 25 | * @author Gabor X Toth |
26 | */ | 26 | */ |
27 | |||
27 | #include "platform.h" | 28 | #include "platform.h" |
28 | #include "gnunet_util_lib.h" | 29 | #include "gnunet_util_lib.h" |
29 | #include "gnunet_multicast_service.h" | 30 | #include "gnunet_multicast_service.h" |
@@ -33,26 +34,6 @@ | |||
33 | 34 | ||
34 | 35 | ||
35 | /** | 36 | /** |
36 | * Started origins. | ||
37 | * Group's pub_key_hash -> struct GNUNET_MULTICAST_Origin | ||
38 | */ | ||
39 | static struct GNUNET_CONTAINER_MultiHashMap *origins; | ||
40 | |||
41 | /** | ||
42 | * Joined members. | ||
43 | * group_key_hash -> struct GNUNET_MULTICAST_Member | ||
44 | */ | ||
45 | static struct GNUNET_CONTAINER_MultiHashMap *members; | ||
46 | |||
47 | |||
48 | struct MessageQueue | ||
49 | { | ||
50 | struct MessageQueue *prev; | ||
51 | struct MessageQueue *next; | ||
52 | }; | ||
53 | |||
54 | |||
55 | /** | ||
56 | * Handle for a request to send a message to all multicast group members | 37 | * Handle for a request to send a message to all multicast group members |
57 | * (from the origin). | 38 | * (from the origin). |
58 | */ | 39 | */ |
@@ -90,47 +71,14 @@ struct GNUNET_MULTICAST_Group | |||
90 | const struct GNUNET_CONFIGURATION_Handle *cfg; | 71 | const struct GNUNET_CONFIGURATION_Handle *cfg; |
91 | 72 | ||
92 | /** | 73 | /** |
93 | * Socket (if available). | 74 | * Client connection to the service. |
94 | */ | 75 | */ |
95 | struct GNUNET_CLIENT_Connection *client; | 76 | struct GNUNET_CLIENT_MANAGER_Connection *client; |
96 | |||
97 | /** | ||
98 | * Currently pending transmission request, or NULL for none. | ||
99 | */ | ||
100 | struct GNUNET_CLIENT_TransmitHandle *th; | ||
101 | |||
102 | /** | ||
103 | * Head of operations to transmit. | ||
104 | */ | ||
105 | struct MessageQueue *tmit_head; | ||
106 | |||
107 | /** | ||
108 | * Tail of operations to transmit. | ||
109 | */ | ||
110 | struct MessageQueue *tmit_tail; | ||
111 | |||
112 | /** | ||
113 | * Message being transmitted to the Multicast service. | ||
114 | */ | ||
115 | struct MessageQueue *tmit_msg; | ||
116 | 77 | ||
117 | /** | 78 | /** |
118 | * Message to send on reconnect. | 79 | * Message to send on reconnect. |
119 | */ | 80 | */ |
120 | struct GNUNET_MessageHeader *reconnect_msg; | 81 | struct GNUNET_MessageHeader *connect_msg; |
121 | |||
122 | /** | ||
123 | * Task doing exponential back-off trying to reconnect. | ||
124 | */ | ||
125 | GNUNET_SCHEDULER_TaskIdentifier reconnect_task; | ||
126 | |||
127 | /** | ||
128 | * Time for next connect retry. | ||
129 | */ | ||
130 | struct GNUNET_TIME_Relative reconnect_delay; | ||
131 | |||
132 | struct GNUNET_CRYPTO_EddsaPublicKey pub_key; | ||
133 | struct GNUNET_HashCode pub_key_hash; | ||
134 | 82 | ||
135 | GNUNET_MULTICAST_JoinRequestCallback join_req_cb; | 83 | GNUNET_MULTICAST_JoinRequestCallback join_req_cb; |
136 | GNUNET_MULTICAST_MembershipTestCallback member_test_cb; | 84 | GNUNET_MULTICAST_MembershipTestCallback member_test_cb; |
@@ -140,11 +88,6 @@ struct GNUNET_MULTICAST_Group | |||
140 | void *cb_cls; | 88 | void *cb_cls; |
141 | 89 | ||
142 | /** | 90 | /** |
143 | * Are we polling for incoming messages right now? | ||
144 | */ | ||
145 | uint8_t in_receive; | ||
146 | |||
147 | /** | ||
148 | * Are we currently transmitting a message? | 91 | * Are we currently transmitting a message? |
149 | */ | 92 | */ |
150 | uint8_t in_transmit; | 93 | uint8_t in_transmit; |
@@ -163,7 +106,6 @@ struct GNUNET_MULTICAST_Origin | |||
163 | { | 106 | { |
164 | struct GNUNET_MULTICAST_Group grp; | 107 | struct GNUNET_MULTICAST_Group grp; |
165 | struct GNUNET_MULTICAST_OriginTransmitHandle tmit; | 108 | struct GNUNET_MULTICAST_OriginTransmitHandle tmit; |
166 | struct GNUNET_CRYPTO_EddsaPrivateKey priv_key; | ||
167 | 109 | ||
168 | GNUNET_MULTICAST_RequestCallback request_cb; | 110 | GNUNET_MULTICAST_RequestCallback request_cb; |
169 | }; | 111 | }; |
@@ -229,294 +171,125 @@ struct GNUNET_MULTICAST_MemberReplayHandle | |||
229 | }; | 171 | }; |
230 | 172 | ||
231 | 173 | ||
232 | static void | ||
233 | reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); | ||
234 | |||
235 | |||
236 | static void | ||
237 | reschedule_connect (struct GNUNET_MULTICAST_Group *grp); | ||
238 | |||
239 | |||
240 | /** | 174 | /** |
241 | * Schedule transmission of the next message from our queue. | 175 | * Send first message to the service after connecting. |
242 | * | ||
243 | * @param grp PSYC channel handle | ||
244 | */ | 176 | */ |
245 | static void | 177 | static void |
246 | transmit_next (struct GNUNET_MULTICAST_Group *grp); | 178 | group_send_connect_msg (struct GNUNET_MULTICAST_Group *grp) |
247 | |||
248 | |||
249 | static void | ||
250 | message_handler (void *cls, const struct GNUNET_MessageHeader *msg); | ||
251 | |||
252 | |||
253 | /** | ||
254 | * Reschedule a connect attempt to the service. | ||
255 | * | ||
256 | * @param c channel to reconnect | ||
257 | */ | ||
258 | static void | ||
259 | reschedule_connect (struct GNUNET_MULTICAST_Group *grp) | ||
260 | { | 179 | { |
261 | GNUNET_assert (grp->reconnect_task == GNUNET_SCHEDULER_NO_TASK); | 180 | uint16_t cmsg_size = ntohs (grp->connect_msg->size); |
262 | 181 | struct GNUNET_MessageHeader * cmsg = GNUNET_malloc (cmsg_size); | |
263 | if (NULL != grp->th) | 182 | memcpy (cmsg, grp->connect_msg, cmsg_size); |
264 | { | 183 | GNUNET_CLIENT_MANAGER_transmit_now (grp->client, cmsg); |
265 | GNUNET_CLIENT_notify_transmit_ready_cancel (grp->th); | ||
266 | grp->th = NULL; | ||
267 | } | ||
268 | if (NULL != grp->client) | ||
269 | { | ||
270 | GNUNET_CLIENT_disconnect (grp->client); | ||
271 | grp->client = NULL; | ||
272 | } | ||
273 | grp->in_receive = GNUNET_NO; | ||
274 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
275 | "Scheduling task to reconnect to Multicast service in %s.\n", | ||
276 | GNUNET_STRINGS_relative_time_to_string (grp->reconnect_delay, GNUNET_YES)); | ||
277 | grp->reconnect_task = | ||
278 | GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay, &reconnect, grp); | ||
279 | grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay); | ||
280 | } | 184 | } |
281 | 185 | ||
282 | 186 | ||
283 | /** | 187 | /** |
284 | * Reset stored data related to the last received message. | 188 | * Got disconnected from service. Reconnect. |
285 | */ | 189 | */ |
286 | static void | 190 | static void |
287 | recv_reset (struct GNUNET_MULTICAST_Group *grp) | 191 | group_recv_disconnect (void *cls, |
288 | { | 192 | struct GNUNET_CLIENT_MANAGER_Connection *client, |
289 | } | 193 | const struct GNUNET_MessageHeader *msg) |
290 | |||
291 | |||
292 | static void | ||
293 | recv_error (struct GNUNET_MULTICAST_Group *grp) | ||
294 | { | ||
295 | if (NULL != grp->message_cb) | ||
296 | grp->message_cb (grp->cb_cls, NULL); | ||
297 | |||
298 | recv_reset (grp); | ||
299 | } | ||
300 | |||
301 | |||
302 | /** | ||
303 | * Transmit next message to service. | ||
304 | * | ||
305 | * @param cls The struct GNUNET_MULTICAST_Group. | ||
306 | * @param size Number of bytes available in @a buf. | ||
307 | * @param buf Where to copy the message. | ||
308 | * | ||
309 | * @return Number of bytes copied to @a buf. | ||
310 | */ | ||
311 | static size_t | ||
312 | send_next_message (void *cls, size_t size, void *buf) | ||
313 | { | 194 | { |
314 | LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n"); | 195 | struct GNUNET_MULTICAST_Group * |
315 | struct GNUNET_MULTICAST_Group *grp = cls; | 196 | grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp)); |
316 | struct MessageQueue *mq = grp->tmit_head; | 197 | GNUNET_CLIENT_MANAGER_reconnect (client); |
317 | if (NULL == mq) | 198 | group_send_connect_msg (grp); |
318 | return 0; | ||
319 | struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1]; | ||
320 | size_t ret = ntohs (qmsg->size); | ||
321 | grp->th = NULL; | ||
322 | if (ret > size) | ||
323 | { | ||
324 | reschedule_connect (grp); | ||
325 | return 0; | ||
326 | } | ||
327 | memcpy (buf, qmsg, ret); | ||
328 | |||
329 | GNUNET_CONTAINER_DLL_remove (grp->tmit_head, grp->tmit_tail, mq); | ||
330 | GNUNET_free (mq); | ||
331 | |||
332 | if (NULL != grp->tmit_head) | ||
333 | transmit_next (grp); | ||
334 | |||
335 | if (GNUNET_NO == grp->in_receive) | ||
336 | { | ||
337 | grp->in_receive = GNUNET_YES; | ||
338 | GNUNET_CLIENT_receive (grp->client, &message_handler, grp, | ||
339 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
340 | } | ||
341 | return ret; | ||
342 | } | 199 | } |
343 | 200 | ||
344 | 201 | ||
345 | /** | 202 | /** |
346 | * Schedule transmission of the next message from our queue. | 203 | * Receive join request from service. |
347 | * | ||
348 | * @param grp Multicast group handle. | ||
349 | */ | 204 | */ |
350 | static void | 205 | static void |
351 | transmit_next (struct GNUNET_MULTICAST_Group *grp) | 206 | group_recv_join_request (void *cls, |
207 | struct GNUNET_CLIENT_MANAGER_Connection *client, | ||
208 | const struct GNUNET_MessageHeader *msg) | ||
352 | { | 209 | { |
353 | LOG (GNUNET_ERROR_TYPE_DEBUG, "transmit_next()\n"); | 210 | struct GNUNET_MULTICAST_Group * |
354 | if (NULL != grp->th || NULL == grp->client) | 211 | grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp)); |
355 | return; | ||
356 | |||
357 | struct MessageQueue *mq = grp->tmit_head; | ||
358 | if (NULL == mq) | ||
359 | return; | ||
360 | struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1]; | ||
361 | |||
362 | grp->th = GNUNET_CLIENT_notify_transmit_ready (grp->client, | ||
363 | ntohs (qmsg->size), | ||
364 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
365 | GNUNET_NO, | ||
366 | &send_next_message, | ||
367 | grp); | ||
368 | } | ||
369 | 212 | ||
213 | const struct MulticastJoinRequestMessage * | ||
214 | jreq = (const struct MulticastJoinRequestMessage *) msg; | ||
370 | 215 | ||
371 | /** | 216 | struct GNUNET_MULTICAST_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); |
372 | * Try again to connect to the Multicast service. | 217 | jh->group = grp; |
373 | * | 218 | jh->member_key = jreq->member_key; |
374 | * @param cls Channel handle. | 219 | jh->member_peer = jreq->member_peer; |
375 | * @param tc Scheduler context. | ||
376 | */ | ||
377 | static void | ||
378 | reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
379 | { | ||
380 | struct GNUNET_MULTICAST_Group *grp = cls; | ||
381 | |||
382 | recv_reset (grp); | ||
383 | grp->reconnect_task = GNUNET_SCHEDULER_NO_TASK; | ||
384 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
385 | "Connecting to Multicast service.\n"); | ||
386 | GNUNET_assert (NULL == grp->client); | ||
387 | grp->client = GNUNET_CLIENT_connect ("multicast", grp->cfg); | ||
388 | GNUNET_assert (NULL != grp->client); | ||
389 | uint16_t reconn_size = ntohs (grp->reconnect_msg->size); | ||
390 | |||
391 | if (NULL == grp->tmit_head || | ||
392 | 0 != memcmp (&grp->tmit_head[1], grp->reconnect_msg, reconn_size)) | ||
393 | { | ||
394 | struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + reconn_size); | ||
395 | memcpy (&mq[1], grp->reconnect_msg, reconn_size); | ||
396 | GNUNET_CONTAINER_DLL_insert (grp->tmit_head, grp->tmit_tail, mq); | ||
397 | } | ||
398 | transmit_next (grp); | ||
399 | } | ||
400 | 220 | ||
221 | const struct GNUNET_MessageHeader *jmsg = NULL; | ||
222 | if (sizeof (*jreq) + sizeof (*jmsg) <= ntohs (jreq->header.size)) | ||
223 | jmsg = (const struct GNUNET_MessageHeader *) &jreq[1]; | ||
401 | 224 | ||
402 | /** | 225 | if (NULL != grp->join_req_cb) |
403 | * Disconnect from the Multicast service. | 226 | grp->join_req_cb (grp->cb_cls, &jreq->member_key, jmsg, jh); |
404 | * | ||
405 | * @param g Group handle to disconnect. | ||
406 | */ | ||
407 | static void | ||
408 | disconnect (void *g) | ||
409 | { | ||
410 | struct GNUNET_MULTICAST_Group *grp = g; | ||
411 | |||
412 | GNUNET_assert (NULL != grp); | ||
413 | if (grp->tmit_head != grp->tmit_tail) | ||
414 | { | ||
415 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
416 | "Disconnecting while there are still outstanding messages!\n"); | ||
417 | GNUNET_break (0); | ||
418 | } | ||
419 | if (grp->reconnect_task != GNUNET_SCHEDULER_NO_TASK) | ||
420 | { | ||
421 | GNUNET_SCHEDULER_cancel (grp->reconnect_task); | ||
422 | grp->reconnect_task = GNUNET_SCHEDULER_NO_TASK; | ||
423 | } | ||
424 | if (NULL != grp->th) | ||
425 | { | ||
426 | GNUNET_CLIENT_notify_transmit_ready_cancel (grp->th); | ||
427 | grp->th = NULL; | ||
428 | } | ||
429 | if (NULL != grp->client) | ||
430 | { | ||
431 | GNUNET_CLIENT_disconnect (grp->client); | ||
432 | grp->client = NULL; | ||
433 | } | ||
434 | if (NULL != grp->reconnect_msg) | ||
435 | { | ||
436 | GNUNET_free (grp->reconnect_msg); | ||
437 | grp->reconnect_msg = NULL; | ||
438 | } | ||
439 | } | 227 | } |
440 | 228 | ||
441 | 229 | ||
442 | /** | 230 | /** |
443 | * Iterator callback for calling message callbacks for all groups. | 231 | * Receive multicast message from service. |
444 | */ | 232 | */ |
445 | static int | 233 | static void |
446 | message_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, void *group) | 234 | group_recv_message (void *cls, |
235 | struct GNUNET_CLIENT_MANAGER_Connection *client, | ||
236 | const struct GNUNET_MessageHeader *msg) | ||
447 | { | 237 | { |
448 | const struct GNUNET_MessageHeader *msg = cls; | 238 | struct GNUNET_MULTICAST_Group * |
449 | struct GNUNET_MULTICAST_Group *grp = group; | 239 | grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp)); |
240 | struct GNUNET_MULTICAST_MessageHeader * | ||
241 | mmsg = (struct GNUNET_MULTICAST_MessageHeader *) msg; | ||
450 | 242 | ||
451 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 243 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
452 | "Calling message callback with a message " | 244 | "Calling message callback with a message of size %u.\n", |
453 | "of type %u and size %u.\n", | 245 | ntohs (mmsg->header.size)); |
454 | ntohs (msg->type), ntohs (msg->size)); | ||
455 | 246 | ||
456 | if (NULL != grp->message_cb) | 247 | if (NULL != grp->message_cb) |
457 | grp->message_cb (grp->cb_cls, msg); | 248 | grp->message_cb (grp->cb_cls, mmsg); |
458 | |||
459 | return GNUNET_YES; | ||
460 | } | 249 | } |
461 | 250 | ||
462 | 251 | ||
463 | /** | 252 | /** |
464 | * Iterator callback for calling request callbacks of origins. | 253 | * Origin receives uniquest request from a member. |
465 | */ | 254 | */ |
466 | static int | 255 | static void |
467 | request_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, void *origin) | 256 | origin_recv_request (void *cls, |
468 | { | 257 | struct GNUNET_CLIENT_MANAGER_Connection *client, |
469 | const struct GNUNET_MULTICAST_RequestHeader *req = cls; | 258 | const struct GNUNET_MessageHeader *msg) |
470 | struct GNUNET_MULTICAST_Origin *orig = origin; | 259 | { |
260 | struct GNUNET_MULTICAST_Group *grp; | ||
261 | struct GNUNET_MULTICAST_Origin * | ||
262 | orig = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp)); | ||
263 | grp = &orig->grp; | ||
264 | struct GNUNET_MULTICAST_RequestHeader * | ||
265 | req = (struct GNUNET_MULTICAST_RequestHeader *) msg; | ||
471 | 266 | ||
472 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 267 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
473 | "Calling request callback for a request of type %u and size %u.\n", | 268 | "Calling request callback with a request of size %u.\n", |
474 | ntohs (req->header.type), ntohs (req->header.size)); | 269 | ntohs (req->header.size)); |
475 | 270 | ||
476 | if (NULL != orig->request_cb) | 271 | if (NULL != orig->request_cb) |
477 | orig->request_cb (orig->grp.cb_cls, &req->member_key, | 272 | orig->request_cb (grp->cb_cls, req); |
478 | (const struct GNUNET_MessageHeader *) req, 0); | ||
479 | return GNUNET_YES; | ||
480 | } | 273 | } |
481 | 274 | ||
482 | 275 | ||
483 | /** | 276 | /** |
484 | * Iterator callback for calling join request callbacks of origins. | 277 | * Member receives join decision. |
485 | */ | 278 | */ |
486 | static int | 279 | static void |
487 | join_request_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, | 280 | member_recv_join_decision (void *cls, |
488 | void *group) | 281 | struct GNUNET_CLIENT_MANAGER_Connection *client, |
282 | const struct GNUNET_MessageHeader *msg) | ||
489 | { | 283 | { |
490 | const struct MulticastJoinRequestMessage *req = cls; | 284 | struct GNUNET_MULTICAST_Group *grp; |
491 | struct GNUNET_MULTICAST_Group *grp = group; | 285 | struct GNUNET_MULTICAST_Member * |
492 | 286 | mem = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp)); | |
493 | struct GNUNET_MULTICAST_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); | 287 | grp = &mem->grp; |
494 | jh->group = grp; | ||
495 | jh->member_key = req->member_key; | ||
496 | jh->member_peer = req->member_peer; | ||
497 | |||
498 | const struct GNUNET_MessageHeader *msg = NULL; | ||
499 | if (sizeof (*req) + sizeof (*msg) <= ntohs (req->header.size)) | ||
500 | msg = (const struct GNUNET_MessageHeader *) &req[1]; | ||
501 | |||
502 | if (NULL != grp->join_req_cb) | ||
503 | grp->join_req_cb (grp->cb_cls, &req->member_key, msg, jh); | ||
504 | return GNUNET_YES; | ||
505 | } | ||
506 | |||
507 | 288 | ||
508 | /** | 289 | const struct MulticastJoinDecisionMessageHeader * |
509 | * Iterator callback for calling join decision callbacks of members. | 290 | hdcsn = (const struct MulticastJoinDecisionMessageHeader *) msg; |
510 | */ | ||
511 | static int | ||
512 | join_decision_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, | ||
513 | void *member) | ||
514 | { | ||
515 | const struct MulticastJoinDecisionMessageHeader *hdcsn = cls; | ||
516 | const struct MulticastJoinDecisionMessage * | 291 | const struct MulticastJoinDecisionMessage * |
517 | dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1]; | 292 | dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1]; |
518 | struct GNUNET_MULTICAST_Member *mem = member; | ||
519 | struct GNUNET_MULTICAST_Group *grp = &mem->grp; | ||
520 | 293 | ||
521 | uint16_t dcsn_size = ntohs (dcsn->header.size); | 294 | uint16_t dcsn_size = ntohs (dcsn->header.size); |
522 | int is_admitted = ntohl (dcsn->is_admitted); | 295 | int is_admitted = ntohl (dcsn->is_admitted); |
@@ -549,116 +322,53 @@ join_decision_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, | |||
549 | 322 | ||
550 | if (GNUNET_YES != is_admitted) | 323 | if (GNUNET_YES != is_admitted) |
551 | GNUNET_MULTICAST_member_part (mem); | 324 | GNUNET_MULTICAST_member_part (mem); |
552 | |||
553 | return GNUNET_YES; | ||
554 | } | 325 | } |
555 | 326 | ||
327 | |||
556 | /** | 328 | /** |
557 | * Function called when we receive a message from the service. | 329 | * Message handlers for an origin. |
558 | * | ||
559 | * @param cls struct GNUNET_MULTICAST_Group | ||
560 | * @param msg Message received, NULL on timeout or fatal error. | ||
561 | */ | 330 | */ |
562 | static void | 331 | static struct GNUNET_CLIENT_MANAGER_MessageHandler origin_handlers[] = |
563 | message_handler (void *cls, const struct GNUNET_MessageHeader *msg) | ||
564 | { | 332 | { |
565 | struct GNUNET_MULTICAST_Group *grp = cls; | 333 | { &group_recv_disconnect, NULL, 0, 0, GNUNET_NO }, |
566 | 334 | ||
567 | if (NULL == msg) | 335 | { &group_recv_message, NULL, |
568 | { | 336 | GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, |
569 | // timeout / disconnected from service, reconnect | 337 | sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES }, |
570 | reschedule_connect (grp); | ||
571 | return; | ||
572 | } | ||
573 | 338 | ||
574 | uint16_t size_eq = 0; | 339 | { &origin_recv_request, NULL, |
575 | uint16_t size_min = 0; | 340 | GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, |
576 | uint16_t size = ntohs (msg->size); | 341 | sizeof (struct GNUNET_MULTICAST_RequestHeader), GNUNET_YES }, |
577 | uint16_t type = ntohs (msg->type); | ||
578 | 342 | ||
579 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 343 | { &group_recv_join_request, NULL, |
580 | "Received message of type %d and size %u from Multicast service\n", | 344 | GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, |
581 | type, size); | 345 | sizeof (struct MulticastJoinRequestMessage), GNUNET_YES }, |
582 | 346 | ||
583 | switch (type) | 347 | { NULL, NULL, 0, 0, GNUNET_NO } |
584 | { | 348 | }; |
585 | case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE: | ||
586 | size_min = sizeof (struct GNUNET_MULTICAST_MessageHeader); | ||
587 | break; | ||
588 | |||
589 | case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST: | ||
590 | size_min = sizeof (struct GNUNET_MULTICAST_RequestHeader); | ||
591 | break; | ||
592 | 349 | ||
593 | case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST: | ||
594 | size_min = sizeof (struct MulticastJoinRequestMessage); | ||
595 | break; | ||
596 | 350 | ||
597 | case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION: | 351 | /** |
598 | size_min = sizeof (struct MulticastJoinDecisionMessage); | 352 | * Message handlers for a member. |
599 | break; | 353 | */ |
354 | static struct GNUNET_CLIENT_MANAGER_MessageHandler member_handlers[] = | ||
355 | { | ||
356 | { &group_recv_disconnect, NULL, 0, 0, GNUNET_NO }, | ||
600 | 357 | ||
601 | default: | 358 | { &group_recv_message, NULL, |
602 | GNUNET_break_op (0); | 359 | GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, |
603 | type = 0; | 360 | sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES }, |
604 | } | ||
605 | 361 | ||
606 | if (! ((0 < size_eq && size == size_eq) | 362 | { &group_recv_join_request, NULL, |
607 | || (0 < size_min && size_min <= size))) | 363 | GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, |
608 | { | 364 | sizeof (struct MulticastJoinRequestMessage), GNUNET_YES }, |
609 | GNUNET_break_op (0); | ||
610 | type = 0; | ||
611 | } | ||
612 | 365 | ||
613 | switch (type) | 366 | { &member_recv_join_decision, NULL, |
614 | { | 367 | GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, |
615 | case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE: | 368 | sizeof (struct MulticastJoinDecisionMessage), GNUNET_YES }, |
616 | if (origins != NULL) | ||
617 | GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash, | ||
618 | message_cb, (void *) msg); | ||
619 | if (members != NULL) | ||
620 | GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash, | ||
621 | message_cb, (void *) msg); | ||
622 | break; | ||
623 | |||
624 | case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST: | ||
625 | if (GNUNET_YES != grp->is_origin) | ||
626 | { | ||
627 | GNUNET_break (0); | ||
628 | break; | ||
629 | } | ||
630 | if (NULL != origins) | ||
631 | GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash, | ||
632 | request_cb, (void *) msg); | ||
633 | break; | ||
634 | |||
635 | case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST: | ||
636 | if (NULL != origins) | ||
637 | GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash, | ||
638 | join_request_cb, (void *) msg); | ||
639 | if (NULL != members) | ||
640 | GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash, | ||
641 | join_request_cb, (void *) msg); | ||
642 | break; | ||
643 | |||
644 | case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION: | ||
645 | if (GNUNET_NO != grp->is_origin) | ||
646 | { | ||
647 | GNUNET_break (0); | ||
648 | break; | ||
649 | } | ||
650 | if (NULL != members) | ||
651 | GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash, | ||
652 | join_decision_cb, (void *) msg); | ||
653 | break; | ||
654 | } | ||
655 | 369 | ||
656 | if (NULL != grp->client) | 370 | { NULL, NULL, 0, 0, GNUNET_NO } |
657 | { | 371 | }; |
658 | GNUNET_CLIENT_receive (grp->client, &message_handler, grp, | ||
659 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
660 | } | ||
661 | } | ||
662 | 372 | ||
663 | 373 | ||
664 | /** | 374 | /** |
@@ -667,7 +377,7 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) | |||
667 | * Must be called once and only once in response to an invocation of the | 377 | * Must be called once and only once in response to an invocation of the |
668 | * #GNUNET_MULTICAST_JoinRequestCallback. | 378 | * #GNUNET_MULTICAST_JoinRequestCallback. |
669 | * | 379 | * |
670 | * @param jh Join request handle. | 380 | * @param join Join request handle. |
671 | * @param is_admitted #GNUNET_YES if the join is approved, | 381 | * @param is_admitted #GNUNET_YES if the join is approved, |
672 | * #GNUNET_NO if it is disapproved, | 382 | * #GNUNET_NO if it is disapproved, |
673 | * #GNUNET_SYSERR if we cannot answer the request. | 383 | * #GNUNET_SYSERR if we cannot answer the request. |
@@ -685,27 +395,25 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) | |||
685 | * peer that issued the request even if admission is denied. | 395 | * peer that issued the request even if admission is denied. |
686 | */ | 396 | */ |
687 | struct GNUNET_MULTICAST_ReplayHandle * | 397 | struct GNUNET_MULTICAST_ReplayHandle * |
688 | GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *jh, | 398 | GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join, |
689 | int is_admitted, | 399 | int is_admitted, |
690 | uint16_t relay_count, | 400 | uint16_t relay_count, |
691 | const struct GNUNET_PeerIdentity *relays, | 401 | const struct GNUNET_PeerIdentity *relays, |
692 | const struct GNUNET_MessageHeader *join_resp) | 402 | const struct GNUNET_MessageHeader *join_resp) |
693 | { | 403 | { |
694 | struct GNUNET_MULTICAST_Group *grp = jh->group; | 404 | struct GNUNET_MULTICAST_Group *grp = join->group; |
695 | uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0; | 405 | uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0; |
696 | uint16_t relay_size = relay_count * sizeof (*relays); | 406 | uint16_t relay_size = relay_count * sizeof (*relays); |
407 | |||
697 | struct MulticastJoinDecisionMessageHeader * hdcsn; | 408 | struct MulticastJoinDecisionMessageHeader * hdcsn; |
698 | struct MulticastJoinDecisionMessage *dcsn; | 409 | struct MulticastJoinDecisionMessage *dcsn; |
699 | struct MessageQueue * | 410 | hdcsn = GNUNET_malloc (sizeof (*hdcsn) + sizeof (*dcsn) |
700 | mq = GNUNET_malloc (sizeof (*mq) + sizeof (*hdcsn) + sizeof (*dcsn) | 411 | + relay_size + join_resp_size); |
701 | + relay_size + join_resp_size); | ||
702 | |||
703 | hdcsn = (struct MulticastJoinDecisionMessageHeader *) &mq[1]; | ||
704 | hdcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION); | ||
705 | hdcsn->header.size = htons (sizeof (*hdcsn) + sizeof (*dcsn) | 412 | hdcsn->header.size = htons (sizeof (*hdcsn) + sizeof (*dcsn) |
706 | + relay_size + join_resp_size); | 413 | + relay_size + join_resp_size); |
707 | hdcsn->member_key = jh->member_key; | 414 | hdcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION); |
708 | hdcsn->peer = jh->member_peer; | 415 | hdcsn->member_key = join->member_key; |
416 | hdcsn->peer = join->member_peer; | ||
709 | 417 | ||
710 | dcsn = (struct MulticastJoinDecisionMessage *) &hdcsn[1]; | 418 | dcsn = (struct MulticastJoinDecisionMessage *) &hdcsn[1]; |
711 | dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION); | 419 | dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION); |
@@ -717,10 +425,8 @@ GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *jh, | |||
717 | if (0 < join_resp_size) | 425 | if (0 < join_resp_size) |
718 | memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size); | 426 | memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size); |
719 | 427 | ||
720 | GNUNET_CONTAINER_DLL_insert_tail (grp->tmit_head, grp->tmit_tail, mq); | 428 | GNUNET_CLIENT_MANAGER_transmit (grp->client, &hdcsn->header); |
721 | transmit_next (grp); | 429 | GNUNET_free (join); |
722 | |||
723 | GNUNET_free (jh); | ||
724 | return NULL; | 430 | return NULL; |
725 | } | 431 | } |
726 | 432 | ||
@@ -832,7 +538,7 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
832 | start->max_fragment_id = max_fragment_id; | 538 | start->max_fragment_id = max_fragment_id; |
833 | memcpy (&start->group_key, priv_key, sizeof (*priv_key)); | 539 | memcpy (&start->group_key, priv_key, sizeof (*priv_key)); |
834 | 540 | ||
835 | grp->reconnect_msg = (struct GNUNET_MessageHeader *) start; | 541 | grp->connect_msg = (struct GNUNET_MessageHeader *) start; |
836 | grp->is_origin = GNUNET_YES; | 542 | grp->is_origin = GNUNET_YES; |
837 | grp->cfg = cfg; | 543 | grp->cfg = cfg; |
838 | 544 | ||
@@ -844,20 +550,10 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
844 | grp->message_cb = message_cb; | 550 | grp->message_cb = message_cb; |
845 | 551 | ||
846 | orig->request_cb = request_cb; | 552 | orig->request_cb = request_cb; |
847 | orig->priv_key = *priv_key; | ||
848 | |||
849 | GNUNET_CRYPTO_eddsa_key_get_public (&orig->priv_key, &grp->pub_key); | ||
850 | GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key), | ||
851 | &grp->pub_key_hash); | ||
852 | |||
853 | if (NULL == origins) | ||
854 | origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); | ||
855 | |||
856 | GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig, | ||
857 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | ||
858 | 553 | ||
859 | grp->reconnect_delay = GNUNET_TIME_UNIT_ZERO; | 554 | grp->client = GNUNET_CLIENT_MANAGER_connect (cfg, "multicast", origin_handlers); |
860 | grp->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, grp); | 555 | GNUNET_CLIENT_MANAGER_set_user_context_ (grp->client, orig, sizeof (*grp)); |
556 | group_send_connect_msg (grp); | ||
861 | 557 | ||
862 | return orig; | 558 | return orig; |
863 | } | 559 | } |
@@ -871,8 +567,7 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
871 | void | 567 | void |
872 | GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig) | 568 | GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig) |
873 | { | 569 | { |
874 | disconnect (&orig->grp); | 570 | GNUNET_CLIENT_MANAGER_disconnect (orig->grp.client, GNUNET_YES); |
875 | GNUNET_CONTAINER_multihashmap_remove (origins, &orig->grp.pub_key_hash, orig); | ||
876 | GNUNET_free (orig); | 571 | GNUNET_free (orig); |
877 | } | 572 | } |
878 | 573 | ||
@@ -885,26 +580,22 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig) | |||
885 | struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit; | 580 | struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit; |
886 | 581 | ||
887 | size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; | 582 | size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; |
888 | struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + buf_size); | 583 | struct GNUNET_MULTICAST_MessageHeader *msg = GNUNET_malloc (buf_size); |
889 | GNUNET_CONTAINER_DLL_insert_tail (grp->tmit_head, grp->tmit_tail, mq); | ||
890 | |||
891 | struct GNUNET_MULTICAST_MessageHeader * | ||
892 | msg = (struct GNUNET_MULTICAST_MessageHeader *) &mq[1]; | ||
893 | int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]); | 584 | int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]); |
894 | 585 | ||
895 | if (! (GNUNET_YES == ret || GNUNET_NO == ret) | 586 | if (! (GNUNET_YES == ret || GNUNET_NO == ret) |
896 | || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size) | 587 | || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size) |
897 | { | 588 | { |
898 | LOG (GNUNET_ERROR_TYPE_ERROR, | 589 | LOG (GNUNET_ERROR_TYPE_ERROR, |
899 | "OriginTransmitNotify() returned error or invalid message size.\n"); | 590 | "OriginTransmitNotify() returned error or invalid message size.\n"); |
900 | /* FIXME: handle error */ | 591 | /* FIXME: handle error */ |
901 | GNUNET_free (mq); | 592 | GNUNET_free (msg); |
902 | return; | 593 | return; |
903 | } | 594 | } |
904 | 595 | ||
905 | if (GNUNET_NO == ret && 0 == buf_size) | 596 | if (GNUNET_NO == ret && 0 == buf_size) |
906 | { | 597 | { |
907 | GNUNET_free (mq); | 598 | GNUNET_free (msg); |
908 | return; /* Transmission paused. */ | 599 | return; /* Transmission paused. */ |
909 | } | 600 | } |
910 | 601 | ||
@@ -915,7 +606,7 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig) | |||
915 | msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset); | 606 | msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset); |
916 | tmit->fragment_offset += sizeof (*msg) + buf_size; | 607 | tmit->fragment_offset += sizeof (*msg) + buf_size; |
917 | 608 | ||
918 | transmit_next (grp); | 609 | GNUNET_CLIENT_MANAGER_transmit (grp->client, &msg->header); |
919 | } | 610 | } |
920 | 611 | ||
921 | 612 | ||
@@ -939,6 +630,12 @@ GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig, | |||
939 | GNUNET_MULTICAST_OriginTransmitNotify notify, | 630 | GNUNET_MULTICAST_OriginTransmitNotify notify, |
940 | void *notify_cls) | 631 | void *notify_cls) |
941 | { | 632 | { |
633 | /* FIXME | ||
634 | if (GNUNET_YES == orig->grp.in_transmit) | ||
635 | return NULL; | ||
636 | orig->grp.in_transmit = GNUNET_YES; | ||
637 | */ | ||
638 | |||
942 | struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit; | 639 | struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit; |
943 | tmit->origin = orig; | 640 | tmit->origin = orig; |
944 | tmit->message_id = message_id; | 641 | tmit->message_id = message_id; |
@@ -1047,10 +744,9 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
1047 | if (0 < join_msg_size) | 744 | if (0 < join_msg_size) |
1048 | memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size); | 745 | memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size); |
1049 | 746 | ||
1050 | grp->reconnect_msg = (struct GNUNET_MessageHeader *) join; | 747 | grp->connect_msg = (struct GNUNET_MessageHeader *) join; |
1051 | grp->is_origin = GNUNET_NO; | 748 | grp->is_origin = GNUNET_NO; |
1052 | grp->cfg = cfg; | 749 | grp->cfg = cfg; |
1053 | grp->pub_key = *group_key; | ||
1054 | 750 | ||
1055 | mem->join_dcsn_cb = join_decision_cb; | 751 | mem->join_dcsn_cb = join_decision_cb; |
1056 | grp->join_req_cb = join_request_cb; | 752 | grp->join_req_cb = join_request_cb; |
@@ -1059,17 +755,9 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
1059 | grp->message_cb = message_cb; | 755 | grp->message_cb = message_cb; |
1060 | grp->cb_cls = cls; | 756 | grp->cb_cls = cls; |
1061 | 757 | ||
1062 | GNUNET_CRYPTO_eddsa_key_get_public (member_key, &grp->pub_key); | 758 | grp->client = GNUNET_CLIENT_MANAGER_connect (cfg, "multicast", member_handlers); |
1063 | GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key), &grp->pub_key_hash); | 759 | GNUNET_CLIENT_MANAGER_set_user_context_ (grp->client, mem, sizeof (*grp)); |
1064 | 760 | group_send_connect_msg (grp); | |
1065 | if (NULL == members) | ||
1066 | members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); | ||
1067 | |||
1068 | GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem, | ||
1069 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | ||
1070 | |||
1071 | grp->reconnect_delay = GNUNET_TIME_UNIT_ZERO; | ||
1072 | grp->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, grp); | ||
1073 | 761 | ||
1074 | return mem; | 762 | return mem; |
1075 | } | 763 | } |
@@ -1088,8 +776,7 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
1088 | void | 776 | void |
1089 | GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem) | 777 | GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem) |
1090 | { | 778 | { |
1091 | disconnect (&mem->grp); | 779 | GNUNET_CLIENT_MANAGER_disconnect (mem->grp.client, GNUNET_YES); |
1092 | GNUNET_CONTAINER_multihashmap_remove (members, &mem->grp.pub_key_hash, mem); | ||
1093 | GNUNET_free (mem); | 780 | GNUNET_free (mem); |
1094 | } | 781 | } |
1095 | 782 | ||
@@ -1162,25 +849,26 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem) | |||
1162 | struct GNUNET_MULTICAST_Group *grp = &mem->grp; | 849 | struct GNUNET_MULTICAST_Group *grp = &mem->grp; |
1163 | struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit; | 850 | struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit; |
1164 | 851 | ||
1165 | size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD; | 852 | size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; |
1166 | struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + buf_size); | 853 | struct GNUNET_MULTICAST_RequestHeader *req = GNUNET_malloc (buf_size); |
1167 | GNUNET_CONTAINER_DLL_insert_tail (grp->tmit_head, grp->tmit_tail, mq); | ||
1168 | |||
1169 | struct GNUNET_MULTICAST_RequestHeader * | ||
1170 | req = (struct GNUNET_MULTICAST_RequestHeader *) &mq[1]; | ||
1171 | int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]); | 854 | int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]); |
1172 | 855 | ||
1173 | if (! (GNUNET_YES == ret || GNUNET_NO == ret) | 856 | if (! (GNUNET_YES == ret || GNUNET_NO == ret) |
1174 | || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size) | 857 | || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size) |
1175 | { | 858 | { |
1176 | LOG (GNUNET_ERROR_TYPE_ERROR, | 859 | LOG (GNUNET_ERROR_TYPE_ERROR, |
1177 | "MemberTransmitNotify() returned error or invalid message size.\n"); | 860 | "MemberTransmitNotify() returned error or invalid message size.\n"); |
1178 | /* FIXME: handle error */ | 861 | /* FIXME: handle error */ |
862 | GNUNET_free (req); | ||
1179 | return; | 863 | return; |
1180 | } | 864 | } |
1181 | 865 | ||
1182 | if (GNUNET_NO == ret && 0 == buf_size) | 866 | if (GNUNET_NO == ret && 0 == buf_size) |
1183 | return; /* Transmission paused. */ | 867 | { |
868 | /* Transmission paused. */ | ||
869 | GNUNET_free (req); | ||
870 | return; | ||
871 | } | ||
1184 | 872 | ||
1185 | req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST); | 873 | req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST); |
1186 | req->header.size = htons (sizeof (*req) + buf_size); | 874 | req->header.size = htons (sizeof (*req) + buf_size); |
@@ -1188,7 +876,7 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem) | |||
1188 | req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset); | 876 | req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset); |
1189 | tmit->fragment_offset += sizeof (*req) + buf_size; | 877 | tmit->fragment_offset += sizeof (*req) + buf_size; |
1190 | 878 | ||
1191 | transmit_next (grp); | 879 | GNUNET_CLIENT_MANAGER_transmit (grp->client, &req->header); |
1192 | } | 880 | } |
1193 | 881 | ||
1194 | 882 | ||
@@ -1207,6 +895,12 @@ GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem, | |||
1207 | GNUNET_MULTICAST_MemberTransmitNotify notify, | 895 | GNUNET_MULTICAST_MemberTransmitNotify notify, |
1208 | void *notify_cls) | 896 | void *notify_cls) |
1209 | { | 897 | { |
898 | /* FIXME | ||
899 | if (GNUNET_YES == mem->grp.in_transmit) | ||
900 | return NULL; | ||
901 | mem->grp.in_transmit = GNUNET_YES; | ||
902 | */ | ||
903 | |||
1210 | struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit; | 904 | struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit; |
1211 | tmit->member = mem; | 905 | tmit->member = mem; |
1212 | tmit->request_id = request_id; | 906 | tmit->request_id = request_id; |