diff options
author | Gabor X Toth <*@tg-x.net> | 2013-11-09 23:12:27 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2013-11-09 23:12:27 +0000 |
commit | 172ab07eeb1215cc9d22dabc589f7529ac2d59ea (patch) | |
tree | d75fc75033c7e93b58c1908f06c7b856bc317b8e /src/psyc | |
parent | d10808d7f17c5f6f1356c22ef0992965cbaf5ce1 (diff) | |
download | gnunet-172ab07eeb1215cc9d22dabc589f7529ac2d59ea.tar.gz gnunet-172ab07eeb1215cc9d22dabc589f7529ac2d59ea.zip |
psyc: handling messages from multicast and passing them to clients; pause/resume fixes
Diffstat (limited to 'src/psyc')
-rw-r--r-- | src/psyc/gnunet-service-psyc.c | 320 | ||||
-rw-r--r-- | src/psyc/psyc_api.c | 94 | ||||
-rw-r--r-- | src/psyc/test_psyc.c | 50 |
3 files changed, 355 insertions, 109 deletions
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c index 77eb82a4c..a3b1b8f82 100644 --- a/src/psyc/gnunet-service-psyc.c +++ b/src/psyc/gnunet-service-psyc.c | |||
@@ -56,7 +56,7 @@ static struct GNUNET_SERVER_NotificationContext *nc; | |||
56 | static struct GNUNET_PSYCSTORE_Handle *store; | 56 | static struct GNUNET_PSYCSTORE_Handle *store; |
57 | 57 | ||
58 | /** | 58 | /** |
59 | * channel's pub_key_hash -> struct Channel | 59 | * Channel's pub_key_hash -> struct Channel |
60 | */ | 60 | */ |
61 | static struct GNUNET_CONTAINER_MultiHashMap *clients; | 61 | static struct GNUNET_CONTAINER_MultiHashMap *clients; |
62 | 62 | ||
@@ -70,6 +70,9 @@ struct TransmitMessage | |||
70 | 70 | ||
71 | char *buf; | 71 | char *buf; |
72 | uint16_t size; | 72 | uint16_t size; |
73 | /** | ||
74 | * enum GNUNET_PSYC_DataStatus | ||
75 | */ | ||
73 | uint8_t status; | 76 | uint8_t status; |
74 | }; | 77 | }; |
75 | 78 | ||
@@ -83,15 +86,17 @@ struct Channel | |||
83 | struct TransmitMessage *tmit_head; | 86 | struct TransmitMessage *tmit_head; |
84 | struct TransmitMessage *tmit_tail; | 87 | struct TransmitMessage *tmit_tail; |
85 | 88 | ||
86 | char *tmit_buf; | ||
87 | GNUNET_SCHEDULER_TaskIdentifier tmit_task; | 89 | GNUNET_SCHEDULER_TaskIdentifier tmit_task; |
88 | uint32_t tmit_mod_count; | 90 | uint32_t tmit_mod_count; |
89 | uint32_t tmit_mod_recvd; | 91 | uint32_t tmit_mod_recvd; |
90 | uint16_t tmit_size; | 92 | /** |
93 | * enum GNUNET_PSYC_DataStatus | ||
94 | */ | ||
91 | uint8_t tmit_status; | 95 | uint8_t tmit_status; |
92 | 96 | ||
93 | uint8_t in_transmit; | 97 | uint8_t in_transmit; |
94 | uint8_t is_master; | 98 | uint8_t is_master; |
99 | uint8_t disconnected; | ||
95 | }; | 100 | }; |
96 | 101 | ||
97 | /** | 102 | /** |
@@ -142,6 +147,10 @@ struct Slave | |||
142 | }; | 147 | }; |
143 | 148 | ||
144 | 149 | ||
150 | static void | ||
151 | transmit_message (struct Channel *ch, struct GNUNET_TIME_Relative delay); | ||
152 | |||
153 | |||
145 | /** | 154 | /** |
146 | * Task run during shutdown. | 155 | * Task run during shutdown. |
147 | * | 156 | * |
@@ -163,6 +172,30 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
163 | } | 172 | } |
164 | } | 173 | } |
165 | 174 | ||
175 | |||
176 | static void | ||
177 | client_cleanup (struct Channel *ch) | ||
178 | { | ||
179 | if (ch->is_master) | ||
180 | { | ||
181 | struct Master *mst = (struct Master *) ch; | ||
182 | if (NULL != mst->origin) | ||
183 | GNUNET_MULTICAST_origin_stop (mst->origin); | ||
184 | } | ||
185 | else | ||
186 | { | ||
187 | struct Slave *slv = (struct Slave *) ch; | ||
188 | if (NULL != slv->join_req) | ||
189 | GNUNET_free (slv->join_req); | ||
190 | if (NULL != slv->relays) | ||
191 | GNUNET_free (slv->relays); | ||
192 | if (NULL != slv->member) | ||
193 | GNUNET_MULTICAST_member_part (slv->member); | ||
194 | } | ||
195 | |||
196 | GNUNET_free (ch); | ||
197 | } | ||
198 | |||
166 | /** | 199 | /** |
167 | * Called whenever a client is disconnected. | 200 | * Called whenever a client is disconnected. |
168 | * Frees our resources associated with that client. | 201 | * Frees our resources associated with that client. |
@@ -188,30 +221,17 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) | |||
188 | return; | 221 | return; |
189 | } | 222 | } |
190 | 223 | ||
191 | if (NULL != ch->tmit_buf) | 224 | ch->disconnected = GNUNET_YES; |
192 | { | ||
193 | GNUNET_free (ch->tmit_buf); | ||
194 | ch->tmit_buf = NULL; | ||
195 | } | ||
196 | 225 | ||
197 | if (ch->is_master) | 226 | /* Send pending messages to multicast before cleanup. */ |
227 | if (NULL != ch->tmit_head) | ||
198 | { | 228 | { |
199 | struct Master *mst = (struct Master *) ch; | 229 | transmit_message (ch, GNUNET_TIME_UNIT_ZERO); |
200 | if (NULL != mst->origin) | ||
201 | GNUNET_MULTICAST_origin_stop (mst->origin); | ||
202 | } | 230 | } |
203 | else | 231 | else |
204 | { | 232 | { |
205 | struct Slave *slv = (struct Slave *) ch; | 233 | client_cleanup (ch); |
206 | if (NULL != slv->join_req) | ||
207 | GNUNET_free (slv->join_req); | ||
208 | if (NULL != slv->relays) | ||
209 | GNUNET_free (slv->relays); | ||
210 | if (NULL != slv->member) | ||
211 | GNUNET_MULTICAST_member_part (slv->member); | ||
212 | } | 234 | } |
213 | |||
214 | GNUNET_free (ch); | ||
215 | } | 235 | } |
216 | 236 | ||
217 | void | 237 | void |
@@ -259,14 +279,98 @@ request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, | |||
259 | 279 | ||
260 | } | 280 | } |
261 | 281 | ||
282 | |||
283 | void | ||
284 | fragment_store_result (void *cls, int64_t result, const char *err_msg) | ||
285 | { | ||
286 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
287 | "fragment_store() returned %l (%s)\n", result, err_msg); | ||
288 | } | ||
289 | |||
290 | /** | ||
291 | * Send PSYC messages in an incoming multicast message to a client. | ||
292 | */ | ||
293 | int | ||
294 | send_to_client (void *cls, const struct GNUNET_HashCode *ch_key_hash, void *chan) | ||
295 | { | ||
296 | const struct GNUNET_MULTICAST_MessageHeader *msg = cls; | ||
297 | struct Channel *ch = chan; | ||
298 | |||
299 | uint16_t size = ntohs (msg->header.size); | ||
300 | uint16_t pos = 0; | ||
301 | |||
302 | while (sizeof (*msg) + pos < size) | ||
303 | { | ||
304 | const struct GNUNET_MessageHeader *pmsg | ||
305 | = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos); | ||
306 | uint16_t psize = ntohs (pmsg->size); | ||
307 | if (sizeof (*msg) + pos + psize > size) | ||
308 | { | ||
309 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
310 | "Ignoring message of type %u with invalid size. " | ||
311 | "(%u + %u + %u > %u)\n", ntohs (pmsg->type), | ||
312 | sizeof (*msg), pos, psize, size); | ||
313 | break; | ||
314 | } | ||
315 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
316 | "Sending message of type %u and size %u to client.\n", | ||
317 | ntohs (pmsg->type), psize); | ||
318 | |||
319 | GNUNET_SERVER_notification_context_add (nc, ch->client); | ||
320 | GNUNET_SERVER_notification_context_unicast (nc, ch->client, pmsg, | ||
321 | GNUNET_NO); | ||
322 | pos += psize; | ||
323 | } | ||
324 | return GNUNET_YES; | ||
325 | } | ||
326 | |||
327 | |||
328 | /** | ||
329 | * Incoming message fragment from multicast. | ||
330 | * | ||
331 | * Store it using PSYCstore and send it to all clients of the channel. | ||
332 | */ | ||
262 | void | 333 | void |
263 | message_cb (void *cls, const struct GNUNET_MessageHeader *msg) | 334 | message_cb (void *cls, const struct GNUNET_MessageHeader *msg) |
264 | { | 335 | { |
336 | uint16_t type = ntohs (msg->type); | ||
337 | uint16_t size = ntohs (msg->size); | ||
338 | |||
265 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 339 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
266 | "Received message of type %u from multicast.\n", | 340 | "Received message of type %u and size %u from multicast.\n", |
267 | ntohs (msg->type)); | 341 | type, size); |
342 | |||
343 | struct Channel *ch = cls; | ||
344 | struct Master *mst = cls; | ||
345 | struct Slave *slv = cls; | ||
346 | |||
347 | struct GNUNET_CRYPTO_EddsaPublicKey *ch_key | ||
348 | = ch->is_master ? &mst->pub_key : &slv->chan_key; | ||
349 | struct GNUNET_HashCode *ch_key_hash | ||
350 | = ch->is_master ? &mst->pub_key_hash : &slv->chan_key_hash; | ||
351 | |||
352 | switch (type) | ||
353 | { | ||
354 | case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE: | ||
355 | GNUNET_PSYCSTORE_fragment_store (store, ch_key, | ||
356 | (const struct | ||
357 | GNUNET_MULTICAST_MessageHeader *) msg, | ||
358 | 0, NULL, NULL); | ||
359 | GNUNET_CONTAINER_multihashmap_get_multiple (clients, ch_key_hash, | ||
360 | send_to_client, (void *) msg); | ||
361 | break; | ||
362 | |||
363 | default: | ||
364 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
365 | "Ignoring unknown message of type %u and size %u.\n", | ||
366 | type, size); | ||
367 | } | ||
268 | } | 368 | } |
269 | 369 | ||
370 | |||
371 | /** | ||
372 | * Response from PSYCstore with the current counter values for a channel master. | ||
373 | */ | ||
270 | void | 374 | void |
271 | master_counters_cb (void *cls, int result, uint64_t max_fragment_id, | 375 | master_counters_cb (void *cls, int result, uint64_t max_fragment_id, |
272 | uint64_t max_message_id, uint64_t max_group_generation, | 376 | uint64_t max_message_id, uint64_t max_group_generation, |
@@ -299,6 +403,9 @@ master_counters_cb (void *cls, int result, uint64_t max_fragment_id, | |||
299 | } | 403 | } |
300 | 404 | ||
301 | 405 | ||
406 | /** | ||
407 | * Response from PSYCstore with the current counter values for a channel slave. | ||
408 | */ | ||
302 | void | 409 | void |
303 | slave_counters_cb (void *cls, int result, uint64_t max_fragment_id, | 410 | slave_counters_cb (void *cls, int result, uint64_t max_fragment_id, |
304 | uint64_t max_message_id, uint64_t max_group_generation, | 411 | uint64_t max_message_id, uint64_t max_group_generation, |
@@ -332,6 +439,9 @@ slave_counters_cb (void *cls, int result, uint64_t max_fragment_id, | |||
332 | } | 439 | } |
333 | 440 | ||
334 | 441 | ||
442 | /** | ||
443 | * Handle a connecting client starting a channel master. | ||
444 | */ | ||
335 | static void | 445 | static void |
336 | handle_master_start (void *cls, struct GNUNET_SERVER_Client *client, | 446 | handle_master_start (void *cls, struct GNUNET_SERVER_Client *client, |
337 | const struct GNUNET_MessageHeader *msg) | 447 | const struct GNUNET_MessageHeader *msg) |
@@ -357,6 +467,9 @@ handle_master_start (void *cls, struct GNUNET_SERVER_Client *client, | |||
357 | } | 467 | } |
358 | 468 | ||
359 | 469 | ||
470 | /** | ||
471 | * Handle a connecting client joining as a channel slave. | ||
472 | */ | ||
360 | static void | 473 | static void |
361 | handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client, | 474 | handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client, |
362 | const struct GNUNET_MessageHeader *msg) | 475 | const struct GNUNET_MessageHeader *msg) |
@@ -389,13 +502,26 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client, | |||
389 | } | 502 | } |
390 | 503 | ||
391 | 504 | ||
505 | /** | ||
506 | * Send transmission acknowledgement to a client. | ||
507 | * | ||
508 | * Sent after the last GNUNET_PSYC_MessageModifier and after each | ||
509 | * GNUNET_PSYC_MessageData. | ||
510 | * | ||
511 | * @param ch The channel struct for the client. | ||
512 | */ | ||
392 | static void | 513 | static void |
393 | send_transmit_ack (struct Channel *ch) | 514 | send_transmit_ack (struct Channel *ch) |
394 | { | 515 | { |
395 | struct TransmitAck *res = GNUNET_malloc (sizeof (*res)); | 516 | struct TransmitAck *res = GNUNET_malloc (sizeof (*res)); |
396 | res->header.size = htons (sizeof (*res)); | 517 | res->header.size = htons (sizeof (*res)); |
397 | res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK); | 518 | res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK); |
398 | res->buf_avail = htons (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE - ch->tmit_size); | 519 | |
520 | res->buf_avail = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; | ||
521 | struct TransmitMessage *tmit_msg = ch->tmit_tail; | ||
522 | if (NULL != tmit_msg && GNUNET_PSYC_DATA_CONT == tmit_msg->status) | ||
523 | res->buf_avail -= tmit_msg->size; | ||
524 | res->buf_avail = htons (res->buf_avail); | ||
399 | 525 | ||
400 | GNUNET_SERVER_notification_context_add (nc, ch->client); | 526 | GNUNET_SERVER_notification_context_add (nc, ch->client); |
401 | GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header, | 527 | GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header, |
@@ -404,30 +530,53 @@ send_transmit_ack (struct Channel *ch) | |||
404 | } | 530 | } |
405 | 531 | ||
406 | 532 | ||
533 | /** | ||
534 | * Callback for the transmit functions of multicast. | ||
535 | */ | ||
407 | static int | 536 | static int |
408 | transmit_notify (void *cls, size_t *data_size, void *data) | 537 | transmit_notify (void *cls, size_t *data_size, void *data) |
409 | { | 538 | { |
410 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify()\n"); | ||
411 | struct Channel *ch = cls; | 539 | struct Channel *ch = cls; |
412 | struct TransmitMessage *msg = ch->tmit_head; | 540 | struct TransmitMessage *msg = ch->tmit_head; |
413 | 541 | ||
414 | if (NULL == msg || *data_size < ntohs (msg->size)) | 542 | if (NULL == msg || *data_size < msg->size) |
415 | { | 543 | { |
544 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify: nothing to send.\n"); | ||
416 | *data_size = 0; | 545 | *data_size = 0; |
417 | return GNUNET_NO; | 546 | return GNUNET_NO; |
418 | } | 547 | } |
419 | 548 | ||
420 | *data_size = ntohs (msg->size); | 549 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
550 | "transmit_notify: sending %u bytes.\n", msg->size); | ||
551 | |||
552 | *data_size = msg->size; | ||
421 | memcpy (data, msg->buf, *data_size); | 553 | memcpy (data, msg->buf, *data_size); |
422 | 554 | ||
423 | GNUNET_free (ch->tmit_buf); | ||
424 | ch->tmit_buf = NULL; | ||
425 | GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg); | 555 | GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg); |
556 | GNUNET_free (msg); | ||
557 | |||
558 | int ret = (GNUNET_YES == ch->in_transmit) ? GNUNET_NO : GNUNET_YES; | ||
559 | |||
560 | if (0 == ch->tmit_task) | ||
561 | { | ||
562 | if (NULL != ch->tmit_head) | ||
563 | { | ||
564 | transmit_message (ch, GNUNET_TIME_UNIT_ZERO); | ||
565 | } | ||
566 | else if (ch->disconnected) | ||
567 | { | ||
568 | /* FIXME: handle partial message (when still in_transmit) */ | ||
569 | client_cleanup (ch); | ||
570 | } | ||
571 | } | ||
426 | 572 | ||
427 | return (GNUNET_YES == ch->in_transmit) ? GNUNET_NO : GNUNET_YES; | 573 | return ret; |
428 | } | 574 | } |
429 | 575 | ||
430 | 576 | ||
577 | /** | ||
578 | * Transmit a message from a channel master to the multicast group. | ||
579 | */ | ||
431 | static void | 580 | static void |
432 | master_transmit_message (void *cls, | 581 | master_transmit_message (void *cls, |
433 | const struct GNUNET_SCHEDULER_TaskContext *tc) | 582 | const struct GNUNET_SCHEDULER_TaskContext *tc) |
@@ -449,6 +598,9 @@ master_transmit_message (void *cls, | |||
449 | } | 598 | } |
450 | 599 | ||
451 | 600 | ||
601 | /** | ||
602 | * Transmit a message from a channel slave to the multicast group. | ||
603 | */ | ||
452 | static void | 604 | static void |
453 | slave_transmit_message (void *cls, | 605 | slave_transmit_message (void *cls, |
454 | const struct GNUNET_SCHEDULER_TaskContext *tc) | 606 | const struct GNUNET_SCHEDULER_TaskContext *tc) |
@@ -468,50 +620,90 @@ slave_transmit_message (void *cls, | |||
468 | } | 620 | } |
469 | 621 | ||
470 | 622 | ||
623 | /** | ||
624 | * Schedule message transmission from a channel to the multicast group. | ||
625 | * | ||
626 | * @param ch The channel. | ||
627 | * @param delay Transmission delay. | ||
628 | */ | ||
629 | static void | ||
630 | transmit_message (struct Channel *ch, struct GNUNET_TIME_Relative delay) | ||
631 | { | ||
632 | if (0 != ch->tmit_task) | ||
633 | GNUNET_SCHEDULER_cancel (ch->tmit_task); | ||
634 | |||
635 | ch->tmit_task | ||
636 | = ch->is_master | ||
637 | ? GNUNET_SCHEDULER_add_delayed (delay, master_transmit_message, ch) | ||
638 | : GNUNET_SCHEDULER_add_delayed (delay, slave_transmit_message, ch); | ||
639 | } | ||
640 | |||
641 | /** | ||
642 | * Queue incoming message parts from a client for transmission, and send them to | ||
643 | * the multicast group when the buffer is full or reached the end of message. | ||
644 | * | ||
645 | * @param ch Channel struct for the client. | ||
646 | * @param msg Message from the client. | ||
647 | * | ||
648 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR. | ||
649 | */ | ||
471 | static int | 650 | static int |
472 | buffer_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg) | 651 | queue_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg) |
473 | { | 652 | { |
474 | uint16_t size = ntohs (msg->size); | 653 | uint16_t size = ntohs (msg->size); |
475 | struct GNUNET_TIME_Relative tmit_delay = GNUNET_TIME_UNIT_ZERO; | 654 | struct GNUNET_TIME_Relative tmit_delay = GNUNET_TIME_UNIT_ZERO; |
655 | struct TransmitMessage *tmit_msg = ch->tmit_tail; | ||
656 | |||
657 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
658 | "Queueing message of type %u and size %u " | ||
659 | "for transmission to multicast.\n", | ||
660 | ntohs (msg->type), size); | ||
476 | 661 | ||
477 | if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < size) | 662 | if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < size) |
478 | return GNUNET_SYSERR; | 663 | return GNUNET_SYSERR; |
479 | 664 | ||
480 | if (0 == ch->tmit_size) | 665 | if (NULL == tmit_msg |
481 | { | 666 | || tmit_msg->status != GNUNET_PSYC_DATA_CONT |
482 | ch->tmit_buf = GNUNET_malloc (size); | 667 | || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < tmit_msg->size + size) |
483 | memcpy (ch->tmit_buf, msg, size); | ||
484 | ch->tmit_size = size; | ||
485 | } | ||
486 | else if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE <= ch->tmit_size + size) | ||
487 | { | ||
488 | ch->tmit_buf = GNUNET_realloc (ch->tmit_buf, ch->tmit_size + size); | ||
489 | memcpy (ch->tmit_buf + ch->tmit_size, msg, size); | ||
490 | ch->tmit_size += size; | ||
491 | } | ||
492 | |||
493 | if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE | ||
494 | < ch->tmit_size + sizeof (struct GNUNET_PSYC_MessageData)) | ||
495 | { | 668 | { |
496 | struct TransmitMessage *tmit_msg = GNUNET_new (struct TransmitMessage); | 669 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
497 | tmit_msg->buf = (char *) msg; | 670 | "Appending message qto new buffer.\n"); |
671 | /* Start filling up new buffer */ | ||
672 | tmit_msg = GNUNET_new (struct TransmitMessage); | ||
673 | tmit_msg->buf = GNUNET_malloc (size); | ||
674 | memcpy (tmit_msg->buf, msg, size); | ||
498 | tmit_msg->size = size; | 675 | tmit_msg->size = size; |
499 | tmit_msg->status = ch->tmit_status; | 676 | tmit_msg->status = ch->tmit_status; |
500 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); | 677 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); |
501 | tmit_delay = GNUNET_TIME_UNIT_ZERO; | 678 | } |
679 | else | ||
680 | { | ||
681 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
682 | "Appending message to existing buffer.\n"); | ||
683 | /* Append to existing buffer */ | ||
684 | tmit_msg->buf = GNUNET_realloc (tmit_msg->buf, tmit_msg->size + size); | ||
685 | memcpy (tmit_msg->buf + tmit_msg->size, msg, size); | ||
686 | tmit_msg->size += size; | ||
687 | tmit_msg->status = ch->tmit_status; | ||
502 | } | 688 | } |
503 | 689 | ||
504 | if (0 != ch->tmit_task) | 690 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "tmit_size: %u\n", tmit_msg->size); |
505 | GNUNET_SCHEDULER_cancel (ch->tmit_task); | ||
506 | 691 | ||
507 | ch->tmit_task | 692 | /* Wait a bit for the remaining message parts from the client |
508 | = ch->is_master | 693 | if there's still some space left in the buffer. */ |
509 | ? GNUNET_SCHEDULER_add_delayed (tmit_delay, master_transmit_message, ch) | 694 | if (GNUNET_PSYC_DATA_CONT == tmit_msg->status |
510 | : GNUNET_SCHEDULER_add_delayed (tmit_delay, slave_transmit_message, ch); | 695 | && (tmit_msg->size + sizeof (struct GNUNET_PSYC_MessageData) |
696 | < GNUNET_MULTICAST_FRAGMENT_MAX_SIZE)) | ||
697 | tmit_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2); | ||
698 | |||
699 | transmit_message (ch, tmit_delay); | ||
511 | 700 | ||
512 | return GNUNET_OK; | 701 | return GNUNET_OK; |
513 | } | 702 | } |
514 | 703 | ||
704 | /** | ||
705 | * Incoming method from a client. | ||
706 | */ | ||
515 | static void | 707 | static void |
516 | handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client, | 708 | handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client, |
517 | const struct GNUNET_MessageHeader *msg) | 709 | const struct GNUNET_MessageHeader *msg) |
@@ -524,18 +716,16 @@ handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client, | |||
524 | 716 | ||
525 | if (GNUNET_NO != ch->in_transmit) | 717 | if (GNUNET_NO != ch->in_transmit) |
526 | { | 718 | { |
527 | // FIXME: already transmitting a message, send back error message. | 719 | /* FIXME: already transmitting a message, send back error message. */ |
528 | return; | 720 | return; |
529 | } | 721 | } |
530 | 722 | ||
531 | ch->in_transmit = GNUNET_YES; | 723 | ch->in_transmit = GNUNET_YES; |
532 | ch->tmit_buf = NULL; | ||
533 | ch->tmit_size = 0; | ||
534 | ch->tmit_mod_recvd = 0; | 724 | ch->tmit_mod_recvd = 0; |
535 | ch->tmit_mod_count = ntohl (meth->mod_count); | 725 | ch->tmit_mod_count = ntohl (meth->mod_count); |
536 | ch->tmit_status = GNUNET_PSYC_DATA_CONT; | 726 | ch->tmit_status = GNUNET_PSYC_DATA_CONT; |
537 | 727 | ||
538 | buffer_message (ch, msg); | 728 | queue_message (ch, msg); |
539 | 729 | ||
540 | if (0 == ch->tmit_mod_count) | 730 | if (0 == ch->tmit_mod_count) |
541 | send_transmit_ack (ch); | 731 | send_transmit_ack (ch); |
@@ -544,18 +734,23 @@ handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client, | |||
544 | }; | 734 | }; |
545 | 735 | ||
546 | 736 | ||
737 | /** | ||
738 | * Incoming modifier from a client. | ||
739 | */ | ||
547 | static void | 740 | static void |
548 | handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client, | 741 | handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client, |
549 | const struct GNUNET_MessageHeader *msg) | 742 | const struct GNUNET_MessageHeader *msg) |
550 | { | 743 | { |
744 | /* | ||
551 | const struct GNUNET_PSYC_MessageModifier *mod | 745 | const struct GNUNET_PSYC_MessageModifier *mod |
552 | = (const struct GNUNET_PSYC_MessageModifier *) msg; | 746 | = (const struct GNUNET_PSYC_MessageModifier *) msg; |
747 | */ | ||
553 | struct Channel *ch | 748 | struct Channel *ch |
554 | = GNUNET_SERVER_client_get_user_context (client, struct Channel); | 749 | = GNUNET_SERVER_client_get_user_context (client, struct Channel); |
555 | GNUNET_assert (NULL != ch); | 750 | GNUNET_assert (NULL != ch); |
556 | 751 | ||
557 | ch->tmit_mod_recvd++; | 752 | ch->tmit_mod_recvd++; |
558 | buffer_message (ch, msg); | 753 | queue_message (ch, msg); |
559 | 754 | ||
560 | if (ch->tmit_mod_recvd == ch->tmit_mod_count) | 755 | if (ch->tmit_mod_recvd == ch->tmit_mod_count) |
561 | send_transmit_ack (ch); | 756 | send_transmit_ack (ch); |
@@ -564,6 +759,9 @@ handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client, | |||
564 | }; | 759 | }; |
565 | 760 | ||
566 | 761 | ||
762 | /** | ||
763 | * Incoming data from a client. | ||
764 | */ | ||
567 | static void | 765 | static void |
568 | handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client, | 766 | handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client, |
569 | const struct GNUNET_MessageHeader *msg) | 767 | const struct GNUNET_MessageHeader *msg) |
@@ -575,7 +773,7 @@ handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client, | |||
575 | GNUNET_assert (NULL != ch); | 773 | GNUNET_assert (NULL != ch); |
576 | 774 | ||
577 | ch->tmit_status = ntohs (data->status); | 775 | ch->tmit_status = ntohs (data->status); |
578 | buffer_message (ch, msg); | 776 | queue_message (ch, msg); |
579 | send_transmit_ack (ch); | 777 | send_transmit_ack (ch); |
580 | 778 | ||
581 | if (GNUNET_PSYC_DATA_CONT != ch->tmit_status) | 779 | if (GNUNET_PSYC_DATA_CONT != ch->tmit_status) |
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c index 18b5920b3..f971de1b8 100644 --- a/src/psyc/psyc_api.c +++ b/src/psyc/psyc_api.c | |||
@@ -69,12 +69,12 @@ struct GNUNET_PSYC_Channel | |||
69 | /** | 69 | /** |
70 | * Head of operations to transmit. | 70 | * Head of operations to transmit. |
71 | */ | 71 | */ |
72 | struct OperationHandle *transmit_head; | 72 | struct OperationHandle *tmit_head; |
73 | 73 | ||
74 | /** | 74 | /** |
75 | * Tail of operations to transmit. | 75 | * Tail of operations to transmit. |
76 | */ | 76 | */ |
77 | struct OperationHandle *transmit_tail; | 77 | struct OperationHandle *tmit_tail; |
78 | 78 | ||
79 | /** | 79 | /** |
80 | * Message to send on reconnect. | 80 | * Message to send on reconnect. |
@@ -116,6 +116,16 @@ struct GNUNET_PSYC_Channel | |||
116 | * Buffer space available for transmitting the next data fragment. | 116 | * Buffer space available for transmitting the next data fragment. |
117 | */ | 117 | */ |
118 | uint16_t tmit_buf_avail; | 118 | uint16_t tmit_buf_avail; |
119 | |||
120 | /** | ||
121 | * Is transmission paused? | ||
122 | */ | ||
123 | uint8_t tmit_paused; | ||
124 | |||
125 | /** | ||
126 | * Are we still waiting for a PSYC_TRANSMIT_ACK? | ||
127 | */ | ||
128 | uint8_t tmit_ack_pending; | ||
119 | }; | 129 | }; |
120 | 130 | ||
121 | 131 | ||
@@ -243,6 +253,11 @@ static void | |||
243 | transmit_next (struct GNUNET_PSYC_Channel *ch); | 253 | transmit_next (struct GNUNET_PSYC_Channel *ch); |
244 | 254 | ||
245 | 255 | ||
256 | /** | ||
257 | * Request data from client to transmit. | ||
258 | * | ||
259 | * @param mst Master handle. | ||
260 | */ | ||
246 | static void | 261 | static void |
247 | master_transmit_data (struct GNUNET_PSYC_Master *mst) | 262 | master_transmit_data (struct GNUNET_PSYC_Master *mst) |
248 | { | 263 | { |
@@ -268,12 +283,13 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst) | |||
268 | default: | 283 | default: |
269 | mst->tmit->status = GNUNET_PSYC_DATA_CANCEL; | 284 | mst->tmit->status = GNUNET_PSYC_DATA_CANCEL; |
270 | data_size = 0; | 285 | data_size = 0; |
271 | LOG (GNUNET_ERROR_TYPE_ERROR, "MasterTransmitNotify returned error\n"); | 286 | LOG (GNUNET_ERROR_TYPE_ERROR, "MasterTransmitNotify returned error.\n"); |
272 | } | 287 | } |
273 | 288 | ||
274 | if ((GNUNET_PSYC_DATA_CONT == mst->tmit->status && 0 == data_size)) | 289 | if ((GNUNET_PSYC_DATA_CONT == mst->tmit->status && 0 == data_size)) |
275 | { | 290 | { |
276 | /* Transmission paused, nothing to send. */ | 291 | /* Transmission paused, nothing to send. */ |
292 | ch->tmit_paused = GNUNET_YES; | ||
277 | GNUNET_free (op); | 293 | GNUNET_free (op); |
278 | } | 294 | } |
279 | else | 295 | else |
@@ -281,7 +297,8 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst) | |||
281 | GNUNET_assert (data_size <= ch->tmit_buf_avail); | 297 | GNUNET_assert (data_size <= ch->tmit_buf_avail); |
282 | pdata->header.size = htons (sizeof (*pdata) + data_size); | 298 | pdata->header.size = htons (sizeof (*pdata) + data_size); |
283 | pdata->status = htons (mst->tmit->status); | 299 | pdata->status = htons (mst->tmit->status); |
284 | GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op); | 300 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); |
301 | ch->tmit_ack_pending = GNUNET_YES; | ||
285 | transmit_next (ch); | 302 | transmit_next (ch); |
286 | } | 303 | } |
287 | } | 304 | } |
@@ -305,7 +322,6 @@ message_handler (void *cls, | |||
305 | struct CountersResult *cres; | 322 | struct CountersResult *cres; |
306 | struct TransmitAck *tack; | 323 | struct TransmitAck *tack; |
307 | 324 | ||
308 | |||
309 | if (NULL == msg) | 325 | if (NULL == msg) |
310 | { | 326 | { |
311 | reschedule_connect (ch); | 327 | reschedule_connect (ch); |
@@ -317,7 +333,8 @@ message_handler (void *cls, | |||
317 | uint16_t type = ntohs (msg->type); | 333 | uint16_t type = ntohs (msg->type); |
318 | 334 | ||
319 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 335 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
320 | "Received message of type %d from PSYC service\n", type); | 336 | "Received message of type %d and size %u from PSYC service\n", |
337 | type, size); | ||
321 | 338 | ||
322 | switch (type) | 339 | switch (type) |
323 | { | 340 | { |
@@ -328,10 +345,16 @@ message_handler (void *cls, | |||
328 | case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK: | 345 | case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK: |
329 | size_eq = sizeof (struct TransmitAck); | 346 | size_eq = sizeof (struct TransmitAck); |
330 | break; | 347 | break; |
348 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: | ||
349 | size_min = sizeof (struct GNUNET_PSYC_MessageMethod); | ||
350 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | ||
351 | size_min = sizeof (struct GNUNET_PSYC_MessageModifier); | ||
352 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: | ||
353 | size_min = sizeof (struct GNUNET_PSYC_MessageData); | ||
331 | } | 354 | } |
332 | 355 | ||
333 | if (! ((0 < size_eq && size == size_eq) | 356 | if (! ((0 < size_eq && size == size_eq) |
334 | || (0 < size_min && size >= size_min))) | 357 | || (0 < size_min && size_min <= size))) |
335 | { | 358 | { |
336 | GNUNET_break (0); | 359 | GNUNET_break (0); |
337 | reschedule_connect (ch); | 360 | reschedule_connect (ch); |
@@ -370,7 +393,9 @@ message_handler (void *cls, | |||
370 | else | 393 | else |
371 | { | 394 | { |
372 | ch->tmit_buf_avail = ntohs (tack->buf_avail); | 395 | ch->tmit_buf_avail = ntohs (tack->buf_avail); |
373 | master_transmit_data (mst); | 396 | ch->tmit_ack_pending = GNUNET_NO; |
397 | if (GNUNET_NO == ch->tmit_paused) | ||
398 | master_transmit_data (mst); | ||
374 | } | 399 | } |
375 | } | 400 | } |
376 | else | 401 | else |
@@ -378,6 +403,18 @@ message_handler (void *cls, | |||
378 | /* TODO: slave */ | 403 | /* TODO: slave */ |
379 | } | 404 | } |
380 | break; | 405 | break; |
406 | |||
407 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: | ||
408 | |||
409 | break; | ||
410 | |||
411 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | ||
412 | |||
413 | break; | ||
414 | |||
415 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: | ||
416 | |||
417 | break; | ||
381 | } | 418 | } |
382 | 419 | ||
383 | GNUNET_CLIENT_receive (ch->client, &message_handler, ch, | 420 | GNUNET_CLIENT_receive (ch->client, &message_handler, ch, |
@@ -397,9 +434,9 @@ static size_t | |||
397 | send_next_message (void *cls, size_t size, void *buf) | 434 | send_next_message (void *cls, size_t size, void *buf) |
398 | { | 435 | { |
399 | struct GNUNET_PSYC_Channel *ch = cls; | 436 | struct GNUNET_PSYC_Channel *ch = cls; |
400 | struct OperationHandle *op = ch->transmit_head; | 437 | struct OperationHandle *op = ch->tmit_head; |
401 | size_t ret; | 438 | size_t ret; |
402 | 439 | LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n"); | |
403 | ch->th = NULL; | 440 | ch->th = NULL; |
404 | if (NULL == op->msg) | 441 | if (NULL == op->msg) |
405 | return 0; | 442 | return 0; |
@@ -409,15 +446,12 @@ send_next_message (void *cls, size_t size, void *buf) | |||
409 | reschedule_connect (ch); | 446 | reschedule_connect (ch); |
410 | return 0; | 447 | return 0; |
411 | } | 448 | } |
412 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
413 | "Sending message of type %d to PSYC service\n", | ||
414 | ntohs (op->msg->type)); | ||
415 | memcpy (buf, op->msg, ret); | 449 | memcpy (buf, op->msg, ret); |
416 | 450 | ||
417 | GNUNET_CONTAINER_DLL_remove (ch->transmit_head, ch->transmit_tail, op); | 451 | GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, op); |
418 | GNUNET_free (op); | 452 | GNUNET_free (op); |
419 | 453 | ||
420 | if (NULL != ch->transmit_head) | 454 | if (NULL != ch->tmit_head) |
421 | transmit_next (ch); | 455 | transmit_next (ch); |
422 | 456 | ||
423 | if (GNUNET_NO == ch->in_receive) | 457 | if (GNUNET_NO == ch->in_receive) |
@@ -438,10 +472,11 @@ send_next_message (void *cls, size_t size, void *buf) | |||
438 | static void | 472 | static void |
439 | transmit_next (struct GNUNET_PSYC_Channel *ch) | 473 | transmit_next (struct GNUNET_PSYC_Channel *ch) |
440 | { | 474 | { |
475 | LOG (GNUNET_ERROR_TYPE_DEBUG, "transmit_next()\n"); | ||
441 | if (NULL != ch->th || NULL == ch->client) | 476 | if (NULL != ch->th || NULL == ch->client) |
442 | return; | 477 | return; |
443 | 478 | ||
444 | struct OperationHandle *op = ch->transmit_head; | 479 | struct OperationHandle *op = ch->tmit_head; |
445 | if (NULL == op) | 480 | if (NULL == op) |
446 | return; | 481 | return; |
447 | 482 | ||
@@ -472,14 +507,14 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
472 | ch->client = GNUNET_CLIENT_connect ("psyc", ch->cfg); | 507 | ch->client = GNUNET_CLIENT_connect ("psyc", ch->cfg); |
473 | GNUNET_assert (NULL != ch->client); | 508 | GNUNET_assert (NULL != ch->client); |
474 | 509 | ||
475 | if (NULL == ch->transmit_head || | 510 | if (NULL == ch->tmit_head || |
476 | ch->transmit_head->msg->type != ch->reconnect_msg->type) | 511 | ch->tmit_head->msg->type != ch->reconnect_msg->type) |
477 | { | 512 | { |
478 | uint16_t reconn_size = ntohs (ch->reconnect_msg->size); | 513 | uint16_t reconn_size = ntohs (ch->reconnect_msg->size); |
479 | struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + reconn_size); | 514 | struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + reconn_size); |
480 | memcpy (&op[1], ch->reconnect_msg, reconn_size); | 515 | memcpy (&op[1], ch->reconnect_msg, reconn_size); |
481 | op->msg = (struct GNUNET_MessageHeader *) &op[1]; | 516 | op->msg = (struct GNUNET_MessageHeader *) &op[1]; |
482 | GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op); | 517 | GNUNET_CONTAINER_DLL_insert (ch->tmit_head, ch->tmit_tail, op); |
483 | } | 518 | } |
484 | transmit_next (ch); | 519 | transmit_next (ch); |
485 | } | 520 | } |
@@ -496,7 +531,7 @@ disconnect (void *c) | |||
496 | struct GNUNET_PSYC_Channel *ch = c; | 531 | struct GNUNET_PSYC_Channel *ch = c; |
497 | 532 | ||
498 | GNUNET_assert (NULL != ch); | 533 | GNUNET_assert (NULL != ch); |
499 | if (ch->transmit_head != ch->transmit_tail) | 534 | if (ch->tmit_head != ch->tmit_tail) |
500 | { | 535 | { |
501 | LOG (GNUNET_ERROR_TYPE_ERROR, | 536 | LOG (GNUNET_ERROR_TYPE_ERROR, |
502 | "Disconnecting while there are still outstanding messages!\n"); | 537 | "Disconnecting while there are still outstanding messages!\n"); |
@@ -654,7 +689,7 @@ send_modifier (void *cls, struct GNUNET_ENV_Modifier *mod) | |||
654 | memcpy (&pmod[1], mod->name, name_size); | 689 | memcpy (&pmod[1], mod->name, name_size); |
655 | memcpy ((char *) &pmod[1] + name_size, mod->value, mod->value_size); | 690 | memcpy ((char *) &pmod[1] + name_size, mod->value, mod->value_size); |
656 | 691 | ||
657 | GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op); | 692 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); |
658 | return GNUNET_YES; | 693 | return GNUNET_YES; |
659 | } | 694 | } |
660 | 695 | ||
@@ -699,7 +734,7 @@ GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, | |||
699 | pmeth->mod_count = GNUNET_ntohll (GNUNET_ENV_environment_get_mod_count (env)); | 734 | pmeth->mod_count = GNUNET_ntohll (GNUNET_ENV_environment_get_mod_count (env)); |
700 | memcpy (&pmeth[1], method_name, size); | 735 | memcpy (&pmeth[1], method_name, size); |
701 | 736 | ||
702 | GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op); | 737 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); |
703 | GNUNET_ENV_environment_iterate (env, send_modifier, master); | 738 | GNUNET_ENV_environment_iterate (env, send_modifier, master); |
704 | transmit_next (ch); | 739 | transmit_next (ch); |
705 | 740 | ||
@@ -720,7 +755,12 @@ GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, | |||
720 | void | 755 | void |
721 | GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) | 756 | GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) |
722 | { | 757 | { |
723 | master_transmit_data (th->master); | 758 | struct GNUNET_PSYC_Channel *ch = &th->master->ch; |
759 | if (GNUNET_NO == ch->tmit_ack_pending) | ||
760 | { | ||
761 | ch->tmit_paused = GNUNET_NO; | ||
762 | master_transmit_data (th->master); | ||
763 | } | ||
724 | } | 764 | } |
725 | 765 | ||
726 | 766 | ||
@@ -938,8 +978,8 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel, | |||
938 | slvadd->header.size = htons (sizeof (*slvadd)); | 978 | slvadd->header.size = htons (sizeof (*slvadd)); |
939 | slvadd->announced_at = GNUNET_htonll (announced_at); | 979 | slvadd->announced_at = GNUNET_htonll (announced_at); |
940 | slvadd->effective_since = GNUNET_htonll (effective_since); | 980 | slvadd->effective_since = GNUNET_htonll (effective_since); |
941 | GNUNET_CONTAINER_DLL_insert_tail (channel->transmit_head, | 981 | GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head, |
942 | channel->transmit_tail, | 982 | channel->tmit_tail, |
943 | op); | 983 | op); |
944 | transmit_next (channel); | 984 | transmit_next (channel); |
945 | } | 985 | } |
@@ -979,8 +1019,8 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel, | |||
979 | slvrm->header.type = GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM; | 1019 | slvrm->header.type = GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM; |
980 | slvrm->header.size = htons (sizeof (*slvrm)); | 1020 | slvrm->header.size = htons (sizeof (*slvrm)); |
981 | slvrm->announced_at = GNUNET_htonll (announced_at); | 1021 | slvrm->announced_at = GNUNET_htonll (announced_at); |
982 | GNUNET_CONTAINER_DLL_insert_tail (channel->transmit_head, | 1022 | GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head, |
983 | channel->transmit_tail, | 1023 | channel->tmit_tail, |
984 | op); | 1024 | op); |
985 | transmit_next (channel); | 1025 | transmit_next (channel); |
986 | } | 1026 | } |
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c index 41c4ca8e4..2986fdf6a 100644 --- a/src/psyc/test_psyc.c +++ b/src/psyc/test_psyc.c | |||
@@ -144,10 +144,12 @@ join (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, | |||
144 | return GNUNET_OK; | 144 | return GNUNET_OK; |
145 | } | 145 | } |
146 | 146 | ||
147 | |||
147 | struct TransmitClosure | 148 | struct TransmitClosure |
148 | { | 149 | { |
149 | struct GNUNET_PSYC_MasterTransmitHandle *handle; | 150 | struct GNUNET_PSYC_MasterTransmitHandle *handle; |
150 | uint8_t n; | 151 | uint8_t n; |
152 | uint8_t paused; | ||
151 | uint8_t fragment_count; | 153 | uint8_t fragment_count; |
152 | char *fragments[16]; | 154 | char *fragments[16]; |
153 | uint16_t fragment_sizes[16]; | 155 | uint16_t fragment_sizes[16]; |
@@ -157,8 +159,9 @@ struct TransmitClosure | |||
157 | static void | 159 | static void |
158 | transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | 160 | transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) |
159 | { | 161 | { |
160 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Transmit resume\n"); | 162 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission resumed.\n"); |
161 | struct TransmitClosure *tmit = cls; | 163 | struct TransmitClosure *tmit = cls; |
164 | tmit->paused = GNUNET_NO; | ||
162 | GNUNET_PSYC_master_transmit_resume (tmit->handle); | 165 | GNUNET_PSYC_master_transmit_resume (tmit->handle); |
163 | } | 166 | } |
164 | 167 | ||
@@ -167,33 +170,36 @@ static int | |||
167 | transmit_notify (void *cls, size_t *data_size, void *data) | 170 | transmit_notify (void *cls, size_t *data_size, void *data) |
168 | { | 171 | { |
169 | struct TransmitClosure *tmit = cls; | 172 | struct TransmitClosure *tmit = cls; |
170 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 173 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
171 | "Transmit notify: %lu bytes\n", *data_size); | 174 | "Transmit notify: %lu bytes available, " |
172 | 175 | "processing fragment %u/%u.\n", | |
173 | if (tmit->fragment_count <= tmit->n) | 176 | *data_size, tmit->n + 1, tmit->fragment_count); |
174 | return GNUNET_YES; | ||
175 | |||
176 | GNUNET_assert (tmit->fragment_sizes[tmit->n] <= *data_size); | 177 | GNUNET_assert (tmit->fragment_sizes[tmit->n] <= *data_size); |
177 | 178 | ||
178 | *data_size = tmit->fragment_sizes[tmit->n]; | 179 | if (GNUNET_YES == tmit->paused && tmit->n == tmit->fragment_count - 1) |
179 | memcpy (data, tmit->fragments[tmit->n], *data_size); | ||
180 | tmit->n++; | ||
181 | |||
182 | if (tmit->n == tmit->fragment_count - 1) | ||
183 | { | 180 | { |
184 | /* Send last fragment later. */ | 181 | /* Send last fragment later. */ |
185 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, &transmit_resume, | 182 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n"); |
186 | tmit); | 183 | tmit->paused = GNUNET_YES; |
184 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply | ||
185 | (GNUNET_TIME_UNIT_SECONDS, 3), | ||
186 | &transmit_resume, tmit); | ||
187 | *data_size = 0; | 187 | *data_size = 0; |
188 | return GNUNET_NO; | 188 | return GNUNET_NO; |
189 | } | 189 | } |
190 | return tmit->n <= tmit->fragment_count ? GNUNET_NO : GNUNET_YES; | 190 | |
191 | GNUNET_assert (tmit->fragment_sizes[tmit->n] <= *data_size); | ||
192 | *data_size = tmit->fragment_sizes[tmit->n]; | ||
193 | memcpy (data, tmit->fragments[tmit->n], *data_size); | ||
194 | |||
195 | return ++tmit->n < tmit->fragment_count ? GNUNET_NO : GNUNET_YES; | ||
191 | } | 196 | } |
192 | 197 | ||
193 | void | 198 | void |
194 | master_started (void *cls, uint64_t max_message_id) | 199 | master_started (void *cls, uint64_t max_message_id) |
195 | { | 200 | { |
196 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master started: %lu\n", max_message_id); | 201 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
202 | "Master started: %lu\n", max_message_id); | ||
197 | 203 | ||
198 | struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create (); | 204 | struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create (); |
199 | GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN, | 205 | GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN, |
@@ -202,11 +208,13 @@ master_started (void *cls, uint64_t max_message_id) | |||
202 | "_foo_bar", "foo bar baz", 11); | 208 | "_foo_bar", "foo bar baz", 11); |
203 | 209 | ||
204 | struct TransmitClosure *tmit = GNUNET_new (struct TransmitClosure); | 210 | struct TransmitClosure *tmit = GNUNET_new (struct TransmitClosure); |
205 | tmit->fragment_count = 2; | 211 | tmit->fragment_count = 3; |
206 | tmit->fragments[0] = "foo bar"; | 212 | tmit->fragments[0] = "foo"; |
207 | tmit->fragment_sizes[0] = 7; | 213 | tmit->fragment_sizes[0] = 4; |
208 | tmit->fragments[1] = "baz!"; | 214 | tmit->fragments[1] = "foo bar"; |
209 | tmit->fragment_sizes[1] = 4; | 215 | tmit->fragment_sizes[1] = 7; |
216 | tmit->fragments[2] = "foo bar baz"; | ||
217 | tmit->fragment_sizes[2] = 11; | ||
210 | tmit->handle | 218 | tmit->handle |
211 | = GNUNET_PSYC_master_transmit (mst, "_test", env, transmit_notify, tmit, | 219 | = GNUNET_PSYC_master_transmit (mst, "_test", env, transmit_notify, tmit, |
212 | GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN); | 220 | GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN); |