aboutsummaryrefslogtreecommitdiff
path: root/src/psyc
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2013-11-09 23:12:27 +0000
committerGabor X Toth <*@tg-x.net>2013-11-09 23:12:27 +0000
commit172ab07eeb1215cc9d22dabc589f7529ac2d59ea (patch)
treed75fc75033c7e93b58c1908f06c7b856bc317b8e /src/psyc
parentd10808d7f17c5f6f1356c22ef0992965cbaf5ce1 (diff)
downloadgnunet-172ab07eeb1215cc9d22dabc589f7529ac2d59ea.tar.gz
gnunet-172ab07eeb1215cc9d22dabc589f7529ac2d59ea.zip
psyc: handling messages from multicast and passing them to clients; pause/resume fixes
Diffstat (limited to 'src/psyc')
-rw-r--r--src/psyc/gnunet-service-psyc.c320
-rw-r--r--src/psyc/psyc_api.c94
-rw-r--r--src/psyc/test_psyc.c50
3 files changed, 355 insertions, 109 deletions
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
index 77eb82a4c..a3b1b8f82 100644
--- a/src/psyc/gnunet-service-psyc.c
+++ b/src/psyc/gnunet-service-psyc.c
@@ -56,7 +56,7 @@ static struct GNUNET_SERVER_NotificationContext *nc;
56static struct GNUNET_PSYCSTORE_Handle *store; 56static struct GNUNET_PSYCSTORE_Handle *store;
57 57
58/** 58/**
59 * channel's pub_key_hash -> struct Channel 59 * Channel's pub_key_hash -> struct Channel
60 */ 60 */
61static struct GNUNET_CONTAINER_MultiHashMap *clients; 61static struct GNUNET_CONTAINER_MultiHashMap *clients;
62 62
@@ -70,6 +70,9 @@ struct TransmitMessage
70 70
71 char *buf; 71 char *buf;
72 uint16_t size; 72 uint16_t size;
73 /**
74 * enum GNUNET_PSYC_DataStatus
75 */
73 uint8_t status; 76 uint8_t status;
74}; 77};
75 78
@@ -83,15 +86,17 @@ struct Channel
83 struct TransmitMessage *tmit_head; 86 struct TransmitMessage *tmit_head;
84 struct TransmitMessage *tmit_tail; 87 struct TransmitMessage *tmit_tail;
85 88
86 char *tmit_buf;
87 GNUNET_SCHEDULER_TaskIdentifier tmit_task; 89 GNUNET_SCHEDULER_TaskIdentifier tmit_task;
88 uint32_t tmit_mod_count; 90 uint32_t tmit_mod_count;
89 uint32_t tmit_mod_recvd; 91 uint32_t tmit_mod_recvd;
90 uint16_t tmit_size; 92 /**
93 * enum GNUNET_PSYC_DataStatus
94 */
91 uint8_t tmit_status; 95 uint8_t tmit_status;
92 96
93 uint8_t in_transmit; 97 uint8_t in_transmit;
94 uint8_t is_master; 98 uint8_t is_master;
99 uint8_t disconnected;
95}; 100};
96 101
97/** 102/**
@@ -142,6 +147,10 @@ struct Slave
142}; 147};
143 148
144 149
150static void
151transmit_message (struct Channel *ch, struct GNUNET_TIME_Relative delay);
152
153
145/** 154/**
146 * Task run during shutdown. 155 * Task run during shutdown.
147 * 156 *
@@ -163,6 +172,30 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
163 } 172 }
164} 173}
165 174
175
176static void
177client_cleanup (struct Channel *ch)
178{
179 if (ch->is_master)
180 {
181 struct Master *mst = (struct Master *) ch;
182 if (NULL != mst->origin)
183 GNUNET_MULTICAST_origin_stop (mst->origin);
184 }
185 else
186 {
187 struct Slave *slv = (struct Slave *) ch;
188 if (NULL != slv->join_req)
189 GNUNET_free (slv->join_req);
190 if (NULL != slv->relays)
191 GNUNET_free (slv->relays);
192 if (NULL != slv->member)
193 GNUNET_MULTICAST_member_part (slv->member);
194 }
195
196 GNUNET_free (ch);
197}
198
166/** 199/**
167 * Called whenever a client is disconnected. 200 * Called whenever a client is disconnected.
168 * Frees our resources associated with that client. 201 * Frees our resources associated with that client.
@@ -188,30 +221,17 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
188 return; 221 return;
189 } 222 }
190 223
191 if (NULL != ch->tmit_buf) 224 ch->disconnected = GNUNET_YES;
192 {
193 GNUNET_free (ch->tmit_buf);
194 ch->tmit_buf = NULL;
195 }
196 225
197 if (ch->is_master) 226 /* Send pending messages to multicast before cleanup. */
227 if (NULL != ch->tmit_head)
198 { 228 {
199 struct Master *mst = (struct Master *) ch; 229 transmit_message (ch, GNUNET_TIME_UNIT_ZERO);
200 if (NULL != mst->origin)
201 GNUNET_MULTICAST_origin_stop (mst->origin);
202 } 230 }
203 else 231 else
204 { 232 {
205 struct Slave *slv = (struct Slave *) ch; 233 client_cleanup (ch);
206 if (NULL != slv->join_req)
207 GNUNET_free (slv->join_req);
208 if (NULL != slv->relays)
209 GNUNET_free (slv->relays);
210 if (NULL != slv->member)
211 GNUNET_MULTICAST_member_part (slv->member);
212 } 234 }
213
214 GNUNET_free (ch);
215} 235}
216 236
217void 237void
@@ -259,14 +279,98 @@ request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
259 279
260} 280}
261 281
282
283void
284fragment_store_result (void *cls, int64_t result, const char *err_msg)
285{
286 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
287 "fragment_store() returned %l (%s)\n", result, err_msg);
288}
289
290/**
291 * Send PSYC messages in an incoming multicast message to a client.
292 */
293int
294send_to_client (void *cls, const struct GNUNET_HashCode *ch_key_hash, void *chan)
295{
296 const struct GNUNET_MULTICAST_MessageHeader *msg = cls;
297 struct Channel *ch = chan;
298
299 uint16_t size = ntohs (msg->header.size);
300 uint16_t pos = 0;
301
302 while (sizeof (*msg) + pos < size)
303 {
304 const struct GNUNET_MessageHeader *pmsg
305 = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
306 uint16_t psize = ntohs (pmsg->size);
307 if (sizeof (*msg) + pos + psize > size)
308 {
309 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
310 "Ignoring message of type %u with invalid size. "
311 "(%u + %u + %u > %u)\n", ntohs (pmsg->type),
312 sizeof (*msg), pos, psize, size);
313 break;
314 }
315 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
316 "Sending message of type %u and size %u to client.\n",
317 ntohs (pmsg->type), psize);
318
319 GNUNET_SERVER_notification_context_add (nc, ch->client);
320 GNUNET_SERVER_notification_context_unicast (nc, ch->client, pmsg,
321 GNUNET_NO);
322 pos += psize;
323 }
324 return GNUNET_YES;
325}
326
327
328/**
329 * Incoming message fragment from multicast.
330 *
331 * Store it using PSYCstore and send it to all clients of the channel.
332 */
262void 333void
263message_cb (void *cls, const struct GNUNET_MessageHeader *msg) 334message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
264{ 335{
336 uint16_t type = ntohs (msg->type);
337 uint16_t size = ntohs (msg->size);
338
265 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 339 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
266 "Received message of type %u from multicast.\n", 340 "Received message of type %u and size %u from multicast.\n",
267 ntohs (msg->type)); 341 type, size);
342
343 struct Channel *ch = cls;
344 struct Master *mst = cls;
345 struct Slave *slv = cls;
346
347 struct GNUNET_CRYPTO_EddsaPublicKey *ch_key
348 = ch->is_master ? &mst->pub_key : &slv->chan_key;
349 struct GNUNET_HashCode *ch_key_hash
350 = ch->is_master ? &mst->pub_key_hash : &slv->chan_key_hash;
351
352 switch (type)
353 {
354 case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
355 GNUNET_PSYCSTORE_fragment_store (store, ch_key,
356 (const struct
357 GNUNET_MULTICAST_MessageHeader *) msg,
358 0, NULL, NULL);
359 GNUNET_CONTAINER_multihashmap_get_multiple (clients, ch_key_hash,
360 send_to_client, (void *) msg);
361 break;
362
363 default:
364 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
365 "Ignoring unknown message of type %u and size %u.\n",
366 type, size);
367 }
268} 368}
269 369
370
371/**
372 * Response from PSYCstore with the current counter values for a channel master.
373 */
270void 374void
271master_counters_cb (void *cls, int result, uint64_t max_fragment_id, 375master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
272 uint64_t max_message_id, uint64_t max_group_generation, 376 uint64_t max_message_id, uint64_t max_group_generation,
@@ -299,6 +403,9 @@ master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
299} 403}
300 404
301 405
406/**
407 * Response from PSYCstore with the current counter values for a channel slave.
408 */
302void 409void
303slave_counters_cb (void *cls, int result, uint64_t max_fragment_id, 410slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
304 uint64_t max_message_id, uint64_t max_group_generation, 411 uint64_t max_message_id, uint64_t max_group_generation,
@@ -332,6 +439,9 @@ slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
332} 439}
333 440
334 441
442/**
443 * Handle a connecting client starting a channel master.
444 */
335static void 445static void
336handle_master_start (void *cls, struct GNUNET_SERVER_Client *client, 446handle_master_start (void *cls, struct GNUNET_SERVER_Client *client,
337 const struct GNUNET_MessageHeader *msg) 447 const struct GNUNET_MessageHeader *msg)
@@ -357,6 +467,9 @@ handle_master_start (void *cls, struct GNUNET_SERVER_Client *client,
357} 467}
358 468
359 469
470/**
471 * Handle a connecting client joining as a channel slave.
472 */
360static void 473static void
361handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client, 474handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
362 const struct GNUNET_MessageHeader *msg) 475 const struct GNUNET_MessageHeader *msg)
@@ -389,13 +502,26 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
389} 502}
390 503
391 504
505/**
506 * Send transmission acknowledgement to a client.
507 *
508 * Sent after the last GNUNET_PSYC_MessageModifier and after each
509 * GNUNET_PSYC_MessageData.
510 *
511 * @param ch The channel struct for the client.
512 */
392static void 513static void
393send_transmit_ack (struct Channel *ch) 514send_transmit_ack (struct Channel *ch)
394{ 515{
395 struct TransmitAck *res = GNUNET_malloc (sizeof (*res)); 516 struct TransmitAck *res = GNUNET_malloc (sizeof (*res));
396 res->header.size = htons (sizeof (*res)); 517 res->header.size = htons (sizeof (*res));
397 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK); 518 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK);
398 res->buf_avail = htons (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE - ch->tmit_size); 519
520 res->buf_avail = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
521 struct TransmitMessage *tmit_msg = ch->tmit_tail;
522 if (NULL != tmit_msg && GNUNET_PSYC_DATA_CONT == tmit_msg->status)
523 res->buf_avail -= tmit_msg->size;
524 res->buf_avail = htons (res->buf_avail);
399 525
400 GNUNET_SERVER_notification_context_add (nc, ch->client); 526 GNUNET_SERVER_notification_context_add (nc, ch->client);
401 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header, 527 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
@@ -404,30 +530,53 @@ send_transmit_ack (struct Channel *ch)
404} 530}
405 531
406 532
533/**
534 * Callback for the transmit functions of multicast.
535 */
407static int 536static int
408transmit_notify (void *cls, size_t *data_size, void *data) 537transmit_notify (void *cls, size_t *data_size, void *data)
409{ 538{
410 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify()\n");
411 struct Channel *ch = cls; 539 struct Channel *ch = cls;
412 struct TransmitMessage *msg = ch->tmit_head; 540 struct TransmitMessage *msg = ch->tmit_head;
413 541
414 if (NULL == msg || *data_size < ntohs (msg->size)) 542 if (NULL == msg || *data_size < msg->size)
415 { 543 {
544 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify: nothing to send.\n");
416 *data_size = 0; 545 *data_size = 0;
417 return GNUNET_NO; 546 return GNUNET_NO;
418 } 547 }
419 548
420 *data_size = ntohs (msg->size); 549 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
550 "transmit_notify: sending %u bytes.\n", msg->size);
551
552 *data_size = msg->size;
421 memcpy (data, msg->buf, *data_size); 553 memcpy (data, msg->buf, *data_size);
422 554
423 GNUNET_free (ch->tmit_buf);
424 ch->tmit_buf = NULL;
425 GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg); 555 GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg);
556 GNUNET_free (msg);
557
558 int ret = (GNUNET_YES == ch->in_transmit) ? GNUNET_NO : GNUNET_YES;
559
560 if (0 == ch->tmit_task)
561 {
562 if (NULL != ch->tmit_head)
563 {
564 transmit_message (ch, GNUNET_TIME_UNIT_ZERO);
565 }
566 else if (ch->disconnected)
567 {
568 /* FIXME: handle partial message (when still in_transmit) */
569 client_cleanup (ch);
570 }
571 }
426 572
427 return (GNUNET_YES == ch->in_transmit) ? GNUNET_NO : GNUNET_YES; 573 return ret;
428} 574}
429 575
430 576
577/**
578 * Transmit a message from a channel master to the multicast group.
579 */
431static void 580static void
432master_transmit_message (void *cls, 581master_transmit_message (void *cls,
433 const struct GNUNET_SCHEDULER_TaskContext *tc) 582 const struct GNUNET_SCHEDULER_TaskContext *tc)
@@ -449,6 +598,9 @@ master_transmit_message (void *cls,
449} 598}
450 599
451 600
601/**
602 * Transmit a message from a channel slave to the multicast group.
603 */
452static void 604static void
453slave_transmit_message (void *cls, 605slave_transmit_message (void *cls,
454 const struct GNUNET_SCHEDULER_TaskContext *tc) 606 const struct GNUNET_SCHEDULER_TaskContext *tc)
@@ -468,50 +620,90 @@ slave_transmit_message (void *cls,
468} 620}
469 621
470 622
623/**
624 * Schedule message transmission from a channel to the multicast group.
625 *
626 * @param ch The channel.
627 * @param delay Transmission delay.
628 */
629static void
630transmit_message (struct Channel *ch, struct GNUNET_TIME_Relative delay)
631{
632 if (0 != ch->tmit_task)
633 GNUNET_SCHEDULER_cancel (ch->tmit_task);
634
635 ch->tmit_task
636 = ch->is_master
637 ? GNUNET_SCHEDULER_add_delayed (delay, master_transmit_message, ch)
638 : GNUNET_SCHEDULER_add_delayed (delay, slave_transmit_message, ch);
639}
640
641/**
642 * Queue incoming message parts from a client for transmission, and send them to
643 * the multicast group when the buffer is full or reached the end of message.
644 *
645 * @param ch Channel struct for the client.
646 * @param msg Message from the client.
647 *
648 * @return #GNUNET_OK on success, else #GNUNET_SYSERR.
649 */
471static int 650static int
472buffer_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg) 651queue_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg)
473{ 652{
474 uint16_t size = ntohs (msg->size); 653 uint16_t size = ntohs (msg->size);
475 struct GNUNET_TIME_Relative tmit_delay = GNUNET_TIME_UNIT_ZERO; 654 struct GNUNET_TIME_Relative tmit_delay = GNUNET_TIME_UNIT_ZERO;
655 struct TransmitMessage *tmit_msg = ch->tmit_tail;
656
657 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
658 "Queueing message of type %u and size %u "
659 "for transmission to multicast.\n",
660 ntohs (msg->type), size);
476 661
477 if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < size) 662 if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < size)
478 return GNUNET_SYSERR; 663 return GNUNET_SYSERR;
479 664
480 if (0 == ch->tmit_size) 665 if (NULL == tmit_msg
481 { 666 || tmit_msg->status != GNUNET_PSYC_DATA_CONT
482 ch->tmit_buf = GNUNET_malloc (size); 667 || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < tmit_msg->size + size)
483 memcpy (ch->tmit_buf, msg, size);
484 ch->tmit_size = size;
485 }
486 else if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE <= ch->tmit_size + size)
487 {
488 ch->tmit_buf = GNUNET_realloc (ch->tmit_buf, ch->tmit_size + size);
489 memcpy (ch->tmit_buf + ch->tmit_size, msg, size);
490 ch->tmit_size += size;
491 }
492
493 if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE
494 < ch->tmit_size + sizeof (struct GNUNET_PSYC_MessageData))
495 { 668 {
496 struct TransmitMessage *tmit_msg = GNUNET_new (struct TransmitMessage); 669 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
497 tmit_msg->buf = (char *) msg; 670 "Appending message qto new buffer.\n");
671 /* Start filling up new buffer */
672 tmit_msg = GNUNET_new (struct TransmitMessage);
673 tmit_msg->buf = GNUNET_malloc (size);
674 memcpy (tmit_msg->buf, msg, size);
498 tmit_msg->size = size; 675 tmit_msg->size = size;
499 tmit_msg->status = ch->tmit_status; 676 tmit_msg->status = ch->tmit_status;
500 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); 677 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
501 tmit_delay = GNUNET_TIME_UNIT_ZERO; 678 }
679 else
680 {
681 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
682 "Appending message to existing buffer.\n");
683 /* Append to existing buffer */
684 tmit_msg->buf = GNUNET_realloc (tmit_msg->buf, tmit_msg->size + size);
685 memcpy (tmit_msg->buf + tmit_msg->size, msg, size);
686 tmit_msg->size += size;
687 tmit_msg->status = ch->tmit_status;
502 } 688 }
503 689
504 if (0 != ch->tmit_task) 690 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "tmit_size: %u\n", tmit_msg->size);
505 GNUNET_SCHEDULER_cancel (ch->tmit_task);
506 691
507 ch->tmit_task 692 /* Wait a bit for the remaining message parts from the client
508 = ch->is_master 693 if there's still some space left in the buffer. */
509 ? GNUNET_SCHEDULER_add_delayed (tmit_delay, master_transmit_message, ch) 694 if (GNUNET_PSYC_DATA_CONT == tmit_msg->status
510 : GNUNET_SCHEDULER_add_delayed (tmit_delay, slave_transmit_message, ch); 695 && (tmit_msg->size + sizeof (struct GNUNET_PSYC_MessageData)
696 < GNUNET_MULTICAST_FRAGMENT_MAX_SIZE))
697 tmit_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2);
698
699 transmit_message (ch, tmit_delay);
511 700
512 return GNUNET_OK; 701 return GNUNET_OK;
513} 702}
514 703
704/**
705 * Incoming method from a client.
706 */
515static void 707static void
516handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client, 708handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client,
517 const struct GNUNET_MessageHeader *msg) 709 const struct GNUNET_MessageHeader *msg)
@@ -524,18 +716,16 @@ handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client,
524 716
525 if (GNUNET_NO != ch->in_transmit) 717 if (GNUNET_NO != ch->in_transmit)
526 { 718 {
527 // FIXME: already transmitting a message, send back error message. 719 /* FIXME: already transmitting a message, send back error message. */
528 return; 720 return;
529 } 721 }
530 722
531 ch->in_transmit = GNUNET_YES; 723 ch->in_transmit = GNUNET_YES;
532 ch->tmit_buf = NULL;
533 ch->tmit_size = 0;
534 ch->tmit_mod_recvd = 0; 724 ch->tmit_mod_recvd = 0;
535 ch->tmit_mod_count = ntohl (meth->mod_count); 725 ch->tmit_mod_count = ntohl (meth->mod_count);
536 ch->tmit_status = GNUNET_PSYC_DATA_CONT; 726 ch->tmit_status = GNUNET_PSYC_DATA_CONT;
537 727
538 buffer_message (ch, msg); 728 queue_message (ch, msg);
539 729
540 if (0 == ch->tmit_mod_count) 730 if (0 == ch->tmit_mod_count)
541 send_transmit_ack (ch); 731 send_transmit_ack (ch);
@@ -544,18 +734,23 @@ handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client,
544}; 734};
545 735
546 736
737/**
738 * Incoming modifier from a client.
739 */
547static void 740static void
548handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client, 741handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client,
549 const struct GNUNET_MessageHeader *msg) 742 const struct GNUNET_MessageHeader *msg)
550{ 743{
744 /*
551 const struct GNUNET_PSYC_MessageModifier *mod 745 const struct GNUNET_PSYC_MessageModifier *mod
552 = (const struct GNUNET_PSYC_MessageModifier *) msg; 746 = (const struct GNUNET_PSYC_MessageModifier *) msg;
747 */
553 struct Channel *ch 748 struct Channel *ch
554 = GNUNET_SERVER_client_get_user_context (client, struct Channel); 749 = GNUNET_SERVER_client_get_user_context (client, struct Channel);
555 GNUNET_assert (NULL != ch); 750 GNUNET_assert (NULL != ch);
556 751
557 ch->tmit_mod_recvd++; 752 ch->tmit_mod_recvd++;
558 buffer_message (ch, msg); 753 queue_message (ch, msg);
559 754
560 if (ch->tmit_mod_recvd == ch->tmit_mod_count) 755 if (ch->tmit_mod_recvd == ch->tmit_mod_count)
561 send_transmit_ack (ch); 756 send_transmit_ack (ch);
@@ -564,6 +759,9 @@ handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client,
564}; 759};
565 760
566 761
762/**
763 * Incoming data from a client.
764 */
567static void 765static void
568handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client, 766handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client,
569 const struct GNUNET_MessageHeader *msg) 767 const struct GNUNET_MessageHeader *msg)
@@ -575,7 +773,7 @@ handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client,
575 GNUNET_assert (NULL != ch); 773 GNUNET_assert (NULL != ch);
576 774
577 ch->tmit_status = ntohs (data->status); 775 ch->tmit_status = ntohs (data->status);
578 buffer_message (ch, msg); 776 queue_message (ch, msg);
579 send_transmit_ack (ch); 777 send_transmit_ack (ch);
580 778
581 if (GNUNET_PSYC_DATA_CONT != ch->tmit_status) 779 if (GNUNET_PSYC_DATA_CONT != ch->tmit_status)
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index 18b5920b3..f971de1b8 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -69,12 +69,12 @@ struct GNUNET_PSYC_Channel
69 /** 69 /**
70 * Head of operations to transmit. 70 * Head of operations to transmit.
71 */ 71 */
72 struct OperationHandle *transmit_head; 72 struct OperationHandle *tmit_head;
73 73
74 /** 74 /**
75 * Tail of operations to transmit. 75 * Tail of operations to transmit.
76 */ 76 */
77 struct OperationHandle *transmit_tail; 77 struct OperationHandle *tmit_tail;
78 78
79 /** 79 /**
80 * Message to send on reconnect. 80 * Message to send on reconnect.
@@ -116,6 +116,16 @@ struct GNUNET_PSYC_Channel
116 * Buffer space available for transmitting the next data fragment. 116 * Buffer space available for transmitting the next data fragment.
117 */ 117 */
118 uint16_t tmit_buf_avail; 118 uint16_t tmit_buf_avail;
119
120 /**
121 * Is transmission paused?
122 */
123 uint8_t tmit_paused;
124
125 /**
126 * Are we still waiting for a PSYC_TRANSMIT_ACK?
127 */
128 uint8_t tmit_ack_pending;
119}; 129};
120 130
121 131
@@ -243,6 +253,11 @@ static void
243transmit_next (struct GNUNET_PSYC_Channel *ch); 253transmit_next (struct GNUNET_PSYC_Channel *ch);
244 254
245 255
256/**
257 * Request data from client to transmit.
258 *
259 * @param mst Master handle.
260 */
246static void 261static void
247master_transmit_data (struct GNUNET_PSYC_Master *mst) 262master_transmit_data (struct GNUNET_PSYC_Master *mst)
248{ 263{
@@ -268,12 +283,13 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst)
268 default: 283 default:
269 mst->tmit->status = GNUNET_PSYC_DATA_CANCEL; 284 mst->tmit->status = GNUNET_PSYC_DATA_CANCEL;
270 data_size = 0; 285 data_size = 0;
271 LOG (GNUNET_ERROR_TYPE_ERROR, "MasterTransmitNotify returned error\n"); 286 LOG (GNUNET_ERROR_TYPE_ERROR, "MasterTransmitNotify returned error.\n");
272 } 287 }
273 288
274 if ((GNUNET_PSYC_DATA_CONT == mst->tmit->status && 0 == data_size)) 289 if ((GNUNET_PSYC_DATA_CONT == mst->tmit->status && 0 == data_size))
275 { 290 {
276 /* Transmission paused, nothing to send. */ 291 /* Transmission paused, nothing to send. */
292 ch->tmit_paused = GNUNET_YES;
277 GNUNET_free (op); 293 GNUNET_free (op);
278 } 294 }
279 else 295 else
@@ -281,7 +297,8 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst)
281 GNUNET_assert (data_size <= ch->tmit_buf_avail); 297 GNUNET_assert (data_size <= ch->tmit_buf_avail);
282 pdata->header.size = htons (sizeof (*pdata) + data_size); 298 pdata->header.size = htons (sizeof (*pdata) + data_size);
283 pdata->status = htons (mst->tmit->status); 299 pdata->status = htons (mst->tmit->status);
284 GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op); 300 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
301 ch->tmit_ack_pending = GNUNET_YES;
285 transmit_next (ch); 302 transmit_next (ch);
286 } 303 }
287} 304}
@@ -305,7 +322,6 @@ message_handler (void *cls,
305 struct CountersResult *cres; 322 struct CountersResult *cres;
306 struct TransmitAck *tack; 323 struct TransmitAck *tack;
307 324
308
309 if (NULL == msg) 325 if (NULL == msg)
310 { 326 {
311 reschedule_connect (ch); 327 reschedule_connect (ch);
@@ -317,7 +333,8 @@ message_handler (void *cls,
317 uint16_t type = ntohs (msg->type); 333 uint16_t type = ntohs (msg->type);
318 334
319 LOG (GNUNET_ERROR_TYPE_DEBUG, 335 LOG (GNUNET_ERROR_TYPE_DEBUG,
320 "Received message of type %d from PSYC service\n", type); 336 "Received message of type %d and size %u from PSYC service\n",
337 type, size);
321 338
322 switch (type) 339 switch (type)
323 { 340 {
@@ -328,10 +345,16 @@ message_handler (void *cls,
328 case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK: 345 case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK:
329 size_eq = sizeof (struct TransmitAck); 346 size_eq = sizeof (struct TransmitAck);
330 break; 347 break;
348 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
349 size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
350 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
351 size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
352 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
353 size_min = sizeof (struct GNUNET_PSYC_MessageData);
331 } 354 }
332 355
333 if (! ((0 < size_eq && size == size_eq) 356 if (! ((0 < size_eq && size == size_eq)
334 || (0 < size_min && size >= size_min))) 357 || (0 < size_min && size_min <= size)))
335 { 358 {
336 GNUNET_break (0); 359 GNUNET_break (0);
337 reschedule_connect (ch); 360 reschedule_connect (ch);
@@ -370,7 +393,9 @@ message_handler (void *cls,
370 else 393 else
371 { 394 {
372 ch->tmit_buf_avail = ntohs (tack->buf_avail); 395 ch->tmit_buf_avail = ntohs (tack->buf_avail);
373 master_transmit_data (mst); 396 ch->tmit_ack_pending = GNUNET_NO;
397 if (GNUNET_NO == ch->tmit_paused)
398 master_transmit_data (mst);
374 } 399 }
375 } 400 }
376 else 401 else
@@ -378,6 +403,18 @@ message_handler (void *cls,
378 /* TODO: slave */ 403 /* TODO: slave */
379 } 404 }
380 break; 405 break;
406
407 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
408
409 break;
410
411 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
412
413 break;
414
415 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
416
417 break;
381 } 418 }
382 419
383 GNUNET_CLIENT_receive (ch->client, &message_handler, ch, 420 GNUNET_CLIENT_receive (ch->client, &message_handler, ch,
@@ -397,9 +434,9 @@ static size_t
397send_next_message (void *cls, size_t size, void *buf) 434send_next_message (void *cls, size_t size, void *buf)
398{ 435{
399 struct GNUNET_PSYC_Channel *ch = cls; 436 struct GNUNET_PSYC_Channel *ch = cls;
400 struct OperationHandle *op = ch->transmit_head; 437 struct OperationHandle *op = ch->tmit_head;
401 size_t ret; 438 size_t ret;
402 439 LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n");
403 ch->th = NULL; 440 ch->th = NULL;
404 if (NULL == op->msg) 441 if (NULL == op->msg)
405 return 0; 442 return 0;
@@ -409,15 +446,12 @@ send_next_message (void *cls, size_t size, void *buf)
409 reschedule_connect (ch); 446 reschedule_connect (ch);
410 return 0; 447 return 0;
411 } 448 }
412 LOG (GNUNET_ERROR_TYPE_DEBUG,
413 "Sending message of type %d to PSYC service\n",
414 ntohs (op->msg->type));
415 memcpy (buf, op->msg, ret); 449 memcpy (buf, op->msg, ret);
416 450
417 GNUNET_CONTAINER_DLL_remove (ch->transmit_head, ch->transmit_tail, op); 451 GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, op);
418 GNUNET_free (op); 452 GNUNET_free (op);
419 453
420 if (NULL != ch->transmit_head) 454 if (NULL != ch->tmit_head)
421 transmit_next (ch); 455 transmit_next (ch);
422 456
423 if (GNUNET_NO == ch->in_receive) 457 if (GNUNET_NO == ch->in_receive)
@@ -438,10 +472,11 @@ send_next_message (void *cls, size_t size, void *buf)
438static void 472static void
439transmit_next (struct GNUNET_PSYC_Channel *ch) 473transmit_next (struct GNUNET_PSYC_Channel *ch)
440{ 474{
475 LOG (GNUNET_ERROR_TYPE_DEBUG, "transmit_next()\n");
441 if (NULL != ch->th || NULL == ch->client) 476 if (NULL != ch->th || NULL == ch->client)
442 return; 477 return;
443 478
444 struct OperationHandle *op = ch->transmit_head; 479 struct OperationHandle *op = ch->tmit_head;
445 if (NULL == op) 480 if (NULL == op)
446 return; 481 return;
447 482
@@ -472,14 +507,14 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
472 ch->client = GNUNET_CLIENT_connect ("psyc", ch->cfg); 507 ch->client = GNUNET_CLIENT_connect ("psyc", ch->cfg);
473 GNUNET_assert (NULL != ch->client); 508 GNUNET_assert (NULL != ch->client);
474 509
475 if (NULL == ch->transmit_head || 510 if (NULL == ch->tmit_head ||
476 ch->transmit_head->msg->type != ch->reconnect_msg->type) 511 ch->tmit_head->msg->type != ch->reconnect_msg->type)
477 { 512 {
478 uint16_t reconn_size = ntohs (ch->reconnect_msg->size); 513 uint16_t reconn_size = ntohs (ch->reconnect_msg->size);
479 struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + reconn_size); 514 struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + reconn_size);
480 memcpy (&op[1], ch->reconnect_msg, reconn_size); 515 memcpy (&op[1], ch->reconnect_msg, reconn_size);
481 op->msg = (struct GNUNET_MessageHeader *) &op[1]; 516 op->msg = (struct GNUNET_MessageHeader *) &op[1];
482 GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op); 517 GNUNET_CONTAINER_DLL_insert (ch->tmit_head, ch->tmit_tail, op);
483 } 518 }
484 transmit_next (ch); 519 transmit_next (ch);
485} 520}
@@ -496,7 +531,7 @@ disconnect (void *c)
496 struct GNUNET_PSYC_Channel *ch = c; 531 struct GNUNET_PSYC_Channel *ch = c;
497 532
498 GNUNET_assert (NULL != ch); 533 GNUNET_assert (NULL != ch);
499 if (ch->transmit_head != ch->transmit_tail) 534 if (ch->tmit_head != ch->tmit_tail)
500 { 535 {
501 LOG (GNUNET_ERROR_TYPE_ERROR, 536 LOG (GNUNET_ERROR_TYPE_ERROR,
502 "Disconnecting while there are still outstanding messages!\n"); 537 "Disconnecting while there are still outstanding messages!\n");
@@ -654,7 +689,7 @@ send_modifier (void *cls, struct GNUNET_ENV_Modifier *mod)
654 memcpy (&pmod[1], mod->name, name_size); 689 memcpy (&pmod[1], mod->name, name_size);
655 memcpy ((char *) &pmod[1] + name_size, mod->value, mod->value_size); 690 memcpy ((char *) &pmod[1] + name_size, mod->value, mod->value_size);
656 691
657 GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op); 692 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
658 return GNUNET_YES; 693 return GNUNET_YES;
659} 694}
660 695
@@ -699,7 +734,7 @@ GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
699 pmeth->mod_count = GNUNET_ntohll (GNUNET_ENV_environment_get_mod_count (env)); 734 pmeth->mod_count = GNUNET_ntohll (GNUNET_ENV_environment_get_mod_count (env));
700 memcpy (&pmeth[1], method_name, size); 735 memcpy (&pmeth[1], method_name, size);
701 736
702 GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op); 737 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
703 GNUNET_ENV_environment_iterate (env, send_modifier, master); 738 GNUNET_ENV_environment_iterate (env, send_modifier, master);
704 transmit_next (ch); 739 transmit_next (ch);
705 740
@@ -720,7 +755,12 @@ GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
720void 755void
721GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) 756GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
722{ 757{
723 master_transmit_data (th->master); 758 struct GNUNET_PSYC_Channel *ch = &th->master->ch;
759 if (GNUNET_NO == ch->tmit_ack_pending)
760 {
761 ch->tmit_paused = GNUNET_NO;
762 master_transmit_data (th->master);
763 }
724} 764}
725 765
726 766
@@ -938,8 +978,8 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel,
938 slvadd->header.size = htons (sizeof (*slvadd)); 978 slvadd->header.size = htons (sizeof (*slvadd));
939 slvadd->announced_at = GNUNET_htonll (announced_at); 979 slvadd->announced_at = GNUNET_htonll (announced_at);
940 slvadd->effective_since = GNUNET_htonll (effective_since); 980 slvadd->effective_since = GNUNET_htonll (effective_since);
941 GNUNET_CONTAINER_DLL_insert_tail (channel->transmit_head, 981 GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head,
942 channel->transmit_tail, 982 channel->tmit_tail,
943 op); 983 op);
944 transmit_next (channel); 984 transmit_next (channel);
945} 985}
@@ -979,8 +1019,8 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel,
979 slvrm->header.type = GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM; 1019 slvrm->header.type = GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM;
980 slvrm->header.size = htons (sizeof (*slvrm)); 1020 slvrm->header.size = htons (sizeof (*slvrm));
981 slvrm->announced_at = GNUNET_htonll (announced_at); 1021 slvrm->announced_at = GNUNET_htonll (announced_at);
982 GNUNET_CONTAINER_DLL_insert_tail (channel->transmit_head, 1022 GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head,
983 channel->transmit_tail, 1023 channel->tmit_tail,
984 op); 1024 op);
985 transmit_next (channel); 1025 transmit_next (channel);
986} 1026}
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c
index 41c4ca8e4..2986fdf6a 100644
--- a/src/psyc/test_psyc.c
+++ b/src/psyc/test_psyc.c
@@ -144,10 +144,12 @@ join (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
144 return GNUNET_OK; 144 return GNUNET_OK;
145} 145}
146 146
147
147struct TransmitClosure 148struct TransmitClosure
148{ 149{
149 struct GNUNET_PSYC_MasterTransmitHandle *handle; 150 struct GNUNET_PSYC_MasterTransmitHandle *handle;
150 uint8_t n; 151 uint8_t n;
152 uint8_t paused;
151 uint8_t fragment_count; 153 uint8_t fragment_count;
152 char *fragments[16]; 154 char *fragments[16];
153 uint16_t fragment_sizes[16]; 155 uint16_t fragment_sizes[16];
@@ -157,8 +159,9 @@ struct TransmitClosure
157static void 159static void
158transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 160transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
159{ 161{
160 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Transmit resume\n"); 162 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission resumed.\n");
161 struct TransmitClosure *tmit = cls; 163 struct TransmitClosure *tmit = cls;
164 tmit->paused = GNUNET_NO;
162 GNUNET_PSYC_master_transmit_resume (tmit->handle); 165 GNUNET_PSYC_master_transmit_resume (tmit->handle);
163} 166}
164 167
@@ -167,33 +170,36 @@ static int
167transmit_notify (void *cls, size_t *data_size, void *data) 170transmit_notify (void *cls, size_t *data_size, void *data)
168{ 171{
169 struct TransmitClosure *tmit = cls; 172 struct TransmitClosure *tmit = cls;
170 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 173 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
171 "Transmit notify: %lu bytes\n", *data_size); 174 "Transmit notify: %lu bytes available, "
172 175 "processing fragment %u/%u.\n",
173 if (tmit->fragment_count <= tmit->n) 176 *data_size, tmit->n + 1, tmit->fragment_count);
174 return GNUNET_YES;
175
176 GNUNET_assert (tmit->fragment_sizes[tmit->n] <= *data_size); 177 GNUNET_assert (tmit->fragment_sizes[tmit->n] <= *data_size);
177 178
178 *data_size = tmit->fragment_sizes[tmit->n]; 179 if (GNUNET_YES == tmit->paused && tmit->n == tmit->fragment_count - 1)
179 memcpy (data, tmit->fragments[tmit->n], *data_size);
180 tmit->n++;
181
182 if (tmit->n == tmit->fragment_count - 1)
183 { 180 {
184 /* Send last fragment later. */ 181 /* Send last fragment later. */
185 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, &transmit_resume, 182 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n");
186 tmit); 183 tmit->paused = GNUNET_YES;
184 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
185 (GNUNET_TIME_UNIT_SECONDS, 3),
186 &transmit_resume, tmit);
187 *data_size = 0; 187 *data_size = 0;
188 return GNUNET_NO; 188 return GNUNET_NO;
189 } 189 }
190 return tmit->n <= tmit->fragment_count ? GNUNET_NO : GNUNET_YES; 190
191 GNUNET_assert (tmit->fragment_sizes[tmit->n] <= *data_size);
192 *data_size = tmit->fragment_sizes[tmit->n];
193 memcpy (data, tmit->fragments[tmit->n], *data_size);
194
195 return ++tmit->n < tmit->fragment_count ? GNUNET_NO : GNUNET_YES;
191} 196}
192 197
193void 198void
194master_started (void *cls, uint64_t max_message_id) 199master_started (void *cls, uint64_t max_message_id)
195{ 200{
196 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master started: %lu\n", max_message_id); 201 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
202 "Master started: %lu\n", max_message_id);
197 203
198 struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create (); 204 struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create ();
199 GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN, 205 GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN,
@@ -202,11 +208,13 @@ master_started (void *cls, uint64_t max_message_id)
202 "_foo_bar", "foo bar baz", 11); 208 "_foo_bar", "foo bar baz", 11);
203 209
204 struct TransmitClosure *tmit = GNUNET_new (struct TransmitClosure); 210 struct TransmitClosure *tmit = GNUNET_new (struct TransmitClosure);
205 tmit->fragment_count = 2; 211 tmit->fragment_count = 3;
206 tmit->fragments[0] = "foo bar"; 212 tmit->fragments[0] = "foo";
207 tmit->fragment_sizes[0] = 7; 213 tmit->fragment_sizes[0] = 4;
208 tmit->fragments[1] = "baz!"; 214 tmit->fragments[1] = "foo bar";
209 tmit->fragment_sizes[1] = 4; 215 tmit->fragment_sizes[1] = 7;
216 tmit->fragments[2] = "foo bar baz";
217 tmit->fragment_sizes[2] = 11;
210 tmit->handle 218 tmit->handle
211 = GNUNET_PSYC_master_transmit (mst, "_test", env, transmit_notify, tmit, 219 = GNUNET_PSYC_master_transmit (mst, "_test", env, transmit_notify, tmit,
212 GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN); 220 GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN);