aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2014-01-06 00:09:43 +0000
committerGabor X Toth <*@tg-x.net>2014-01-06 00:09:43 +0000
commit1a0ffe2288b97b47a5b2bfbda2f9438680429422 (patch)
tree72db4cd67f06253a60bf3e2966fd0b1bf55eba5c
parent43d497d7c4ebb6efae37ae4bb2f812a68aa64a32 (diff)
downloadgnunet-1a0ffe2288b97b47a5b2bfbda2f9438680429422.tar.gz
gnunet-1a0ffe2288b97b47a5b2bfbda2f9438680429422.zip
psyc: ipc messages, notify callback for modifiers, tests
-rwxr-xr-xcontrib/logread.pl2
-rw-r--r--src/include/gnunet_protocols.h2
-rw-r--r--src/include/gnunet_psyc_service.h7
-rw-r--r--src/multicast/multicast_api.c9
-rw-r--r--src/psyc/gnunet-service-psyc.c323
-rw-r--r--src/psyc/psyc_api.c443
-rw-r--r--src/psyc/test_psyc.c224
-rw-r--r--src/psyc/test_psyc.conf7
8 files changed, 502 insertions, 515 deletions
diff --git a/contrib/logread.pl b/contrib/logread.pl
index c6f82a68d..11baf2d86 100755
--- a/contrib/logread.pl
+++ b/contrib/logread.pl
@@ -98,7 +98,7 @@ while (<>)
98 s/\b(multicast|psyc|psycstore|social)\b/BLUE $1/ex; 98 s/\b(multicast|psyc|psycstore|social)\b/BLUE $1/ex;
99 99
100 # Add message type names 100 # Add message type names
101 s/(message(?:\s+of)?\s+type\s+)(\d+)/ 101 s/(message(?:\s+part)?(?:\s+of)?\s+type\s+)(\d+)/
102 $1 . BRIGHT_CYAN (exists $msgtypes{$2} ? $msgtypes{$2} : 'UNKNOWN') . 102 $1 . BRIGHT_CYAN (exists $msgtypes{$2} ? $msgtypes{$2} : 'UNKNOWN') .
103 CYAN " ($2)"/e; 103 CYAN " ($2)"/e;
104 104
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 9ca4155e8..2470b3ab1 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -2130,7 +2130,7 @@ extern "C"
2130 2130
2131#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL 697 2131#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL 697
2132 2132
2133#define GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK 698 2133#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK 698
2134 2134
2135 2135
2136#define GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST 701 2136#define GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST 701
diff --git a/src/include/gnunet_psyc_service.h b/src/include/gnunet_psyc_service.h
index eb17c9351..f843fbe1f 100644
--- a/src/include/gnunet_psyc_service.h
+++ b/src/include/gnunet_psyc_service.h
@@ -426,6 +426,11 @@ typedef int
426 uint16_t *data_size, 426 uint16_t *data_size,
427 void *data); 427 void *data);
428 428
429typedef int
430(*GNUNET_PSYC_MasterTransmitNotifyModifier) (void *cls,
431 uint16_t *data_size,
432 void *data,
433 uint8_t *oper);
429 434
430/** 435/**
431 * Flags for transmitting messages to a channel by the master. 436 * Flags for transmitting messages to a channel by the master.
@@ -472,7 +477,7 @@ struct GNUNET_PSYC_MasterTransmitHandle;
472struct GNUNET_PSYC_MasterTransmitHandle * 477struct GNUNET_PSYC_MasterTransmitHandle *
473GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, 478GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
474 const char *method_name, 479 const char *method_name,
475 GNUNET_PSYC_MasterTransmitNotify notify_mod, 480 GNUNET_PSYC_MasterTransmitNotifyModifier notify_mod,
476 GNUNET_PSYC_MasterTransmitNotify notify_data, 481 GNUNET_PSYC_MasterTransmitNotify notify_data,
477 void *notify_cls, 482 void *notify_cls,
478 enum GNUNET_PSYC_MasterTransmitFlags flags); 483 enum GNUNET_PSYC_MasterTransmitFlags flags);
diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c
index 6b784c2f0..bb6a57b58 100644
--- a/src/multicast/multicast_api.c
+++ b/src/multicast/multicast_api.c
@@ -362,8 +362,9 @@ schedule_origin_to_all (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc
362 struct GNUNET_MULTICAST_OriginMessageHandle *mh = &orig->msg_handle; 362 struct GNUNET_MULTICAST_OriginMessageHandle *mh = &orig->msg_handle;
363 363
364 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD; 364 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD;
365 char buf[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
365 struct GNUNET_MULTICAST_MessageHeader *msg 366 struct GNUNET_MULTICAST_MessageHeader *msg
366 = GNUNET_malloc (buf_size); 367 = (struct GNUNET_MULTICAST_MessageHeader *) buf;
367 int ret = mh->notify (mh->notify_cls, &buf_size, &msg[1]); 368 int ret = mh->notify (mh->notify_cls, &buf_size, &msg[1]);
368 369
369 if (! (GNUNET_YES == ret || GNUNET_NO == ret) 370 if (! (GNUNET_YES == ret || GNUNET_NO == ret)
@@ -380,12 +381,12 @@ schedule_origin_to_all (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc
380 381
381 msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE); 382 msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
382 msg->header.size = htons (sizeof (*msg) + buf_size); 383 msg->header.size = htons (sizeof (*msg) + buf_size);
383 msg->message_id = mh->message_id; 384 msg->message_id = GNUNET_htonll (mh->message_id);
384 msg->group_generation = mh->group_generation; 385 msg->group_generation = mh->group_generation;
385 386
386 /* FIXME: add fragment ID and signature in the service instead of here */ 387 /* FIXME: add fragment ID and signature in the service instead of here */
387 msg->fragment_id = orig->next_fragment_id++; 388 msg->fragment_id = GNUNET_ntohll (orig->next_fragment_id++);
388 msg->fragment_offset = mh->fragment_offset; 389 msg->fragment_offset = GNUNET_ntohll (mh->fragment_offset);
389 mh->fragment_offset += sizeof (*msg) + buf_size; 390 mh->fragment_offset += sizeof (*msg) + buf_size;
390 msg->purpose.size = htonl (sizeof (*msg) + buf_size 391 msg->purpose.size = htonl (sizeof (*msg) + buf_size
391 - sizeof (msg->header) 392 - sizeof (msg->header)
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
index 628c39900..e5de7dcda 100644
--- a/src/psyc/gnunet-service-psyc.c
+++ b/src/psyc/gnunet-service-psyc.c
@@ -171,8 +171,8 @@ struct Slave
171}; 171};
172 172
173 173
174static void 174static inline void
175transmit_message (struct Channel *ch, struct GNUNET_TIME_Relative delay); 175transmit_message (struct Channel *ch);
176 176
177 177
178/** 178/**
@@ -205,6 +205,7 @@ client_cleanup (struct Channel *ch)
205 struct Master *mst = (struct Master *) ch; 205 struct Master *mst = (struct Master *) ch;
206 if (NULL != mst->origin) 206 if (NULL != mst->origin)
207 GNUNET_MULTICAST_origin_stop (mst->origin); 207 GNUNET_MULTICAST_origin_stop (mst->origin);
208 GNUNET_CONTAINER_multihashmap_remove (clients, &mst->pub_key_hash, mst);
208 } 209 }
209 else 210 else
210 { 211 {
@@ -251,7 +252,7 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
251 /* Send pending messages to multicast before cleanup. */ 252 /* Send pending messages to multicast before cleanup. */
252 if (NULL != ch->tmit_head) 253 if (NULL != ch->tmit_head)
253 { 254 {
254 transmit_message (ch, GNUNET_TIME_UNIT_ZERO); 255 transmit_message (ch);
255 } 256 }
256 else 257 else
257 { 258 {
@@ -321,6 +322,10 @@ message_to_client (void *cls, const struct GNUNET_HashCode *chan_key_hash,
321 const struct GNUNET_MessageHeader *msg = cls; 322 const struct GNUNET_MessageHeader *msg = cls;
322 struct Channel *ch = chan; 323 struct Channel *ch = chan;
323 324
325 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
326 "Sending message of type %u and size %u to client 0x%zx.\n",
327 ntohs (msg->type), ntohs (msg->size), ch->client);
328
324 GNUNET_SERVER_notification_context_add (nc, ch->client); 329 GNUNET_SERVER_notification_context_add (nc, ch->client);
325 GNUNET_SERVER_notification_context_unicast (nc, ch->client, msg, GNUNET_NO); 330 GNUNET_SERVER_notification_context_unicast (nc, ch->client, msg, GNUNET_NO);
326 331
@@ -363,24 +368,6 @@ message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
363 GNUNET_MULTICAST_MessageHeader *) msg, 368 GNUNET_MULTICAST_MessageHeader *) msg,
364 0, NULL, NULL); 369 0, NULL, NULL);
365 370
366 uint16_t size = ntohs (msg->size);
367 uint16_t psize = 0;
368 uint16_t pos = 0;
369
370 for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
371 {
372 const struct GNUNET_MessageHeader *pmsg
373 = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
374 uint16_t psize = ntohs (pmsg->size);
375 if (sizeof (*msg) + pos + psize > size)
376 {
377 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
378 "Message received from multicast contains invalid PSYC "
379 "message. Not sending to clients.\n");
380 return;
381 }
382 }
383
384#if TODO 371#if TODO
385 /* FIXME: apply modifiers to state in PSYCstore */ 372 /* FIXME: apply modifiers to state in PSYCstore */
386 GNUNET_PSYCSTORE_state_modify (store, chan_key, 373 GNUNET_PSYCSTORE_state_modify (store, chan_key,
@@ -393,6 +380,26 @@ message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
393 = (const struct GNUNET_MULTICAST_MessageHeader *) msg; 380 = (const struct GNUNET_MULTICAST_MessageHeader *) msg;
394 struct GNUNET_PSYC_MessageHeader *pmsg; 381 struct GNUNET_PSYC_MessageHeader *pmsg;
395 382
383 uint16_t size = ntohs (msg->size);
384 uint16_t psize = 0;
385 uint16_t pos = 0;
386
387 for (pos = 0; sizeof (*mmsg) + pos < size; pos += psize)
388 {
389 const struct GNUNET_MessageHeader *pmsg
390 = (const struct GNUNET_MessageHeader *) ((char *) &mmsg[1] + pos);
391 psize = ntohs (pmsg->size);
392 if (psize < sizeof (*pmsg) || sizeof (*mmsg) + pos + psize > size)
393 {
394 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
395 "Received invalid message part of type %u and size %u "
396 "from multicast. Not sending to clients.\n",
397 ntohs (pmsg->type), psize);
398 GNUNET_break_op (0);
399 return;
400 }
401 }
402
396 psize = sizeof (*pmsg) + size - sizeof (*mmsg); 403 psize = sizeof (*pmsg) + size - sizeof (*mmsg);
397 pmsg = GNUNET_malloc (psize); 404 pmsg = GNUNET_malloc (psize);
398 pmsg->header.size = htons (psize); 405 pmsg->header.size = htons (psize);
@@ -572,19 +579,18 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
572 579
573 580
574/** 581/**
575 * Send transmission acknowledgement to a client. 582 * Send acknowledgement to a client.
576 * 583 *
577 * Sent after the last GNUNET_PSYC_MessageModifier and after each 584 * Sent after a message fragment has been passed on to multicast.
578 * GNUNET_PSYC_MessageData.
579 * 585 *
580 * @param ch The channel struct for the client. 586 * @param ch The channel struct for the client.
581 */ 587 */
582static void 588static void
583send_transmit_ack (struct Channel *ch) 589send_message_ack (struct Channel *ch)
584{ 590{
585 struct GNUNET_MessageHeader res; 591 struct GNUNET_MessageHeader res;
586 res.size = htons (sizeof (res)); 592 res.size = htons (sizeof (res));
587 res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK); 593 res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
588 594
589 GNUNET_SERVER_notification_context_add (nc, ch->client); 595 GNUNET_SERVER_notification_context_add (nc, ch->client);
590 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res, 596 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res,
@@ -599,9 +605,9 @@ static int
599transmit_notify (void *cls, size_t *data_size, void *data) 605transmit_notify (void *cls, size_t *data_size, void *data)
600{ 606{
601 struct Channel *ch = cls; 607 struct Channel *ch = cls;
602 struct TransmitMessage *msg = ch->tmit_head; 608 struct TransmitMessage *tmit_msg = ch->tmit_head;
603 609
604 if (NULL == msg || *data_size < msg->size) 610 if (NULL == tmit_msg || *data_size < tmit_msg->size)
605 { 611 {
606 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify: nothing to send.\n"); 612 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify: nothing to send.\n");
607 *data_size = 0; 613 *data_size = 0;
@@ -609,21 +615,22 @@ transmit_notify (void *cls, size_t *data_size, void *data)
609 } 615 }
610 616
611 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 617 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
612 "transmit_notify: sending %u bytes.\n", msg->size); 618 "transmit_notify: sending %u bytes.\n", tmit_msg->size);
613 619
614 *data_size = msg->size; 620 *data_size = tmit_msg->size;
615 memcpy (data, msg->buf, *data_size); 621 memcpy (data, tmit_msg->buf, *data_size);
616 622
617 GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg); 623 GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg);
618 GNUNET_free (msg); 624 GNUNET_free (tmit_msg);
619 625
620 int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES; 626 int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES;
627 send_message_ack (ch);
621 628
622 if (0 == ch->tmit_task) 629 if (0 == ch->tmit_task)
623 { 630 {
624 if (NULL != ch->tmit_head) 631 if (NULL != ch->tmit_head)
625 { 632 {
626 transmit_message (ch, GNUNET_TIME_UNIT_ZERO); 633 transmit_message (ch);
627 } 634 }
628 else if (ch->disconnected) 635 else if (ch->disconnected)
629 { 636 {
@@ -640,11 +647,9 @@ transmit_notify (void *cls, size_t *data_size, void *data)
640 * Transmit a message from a channel master to the multicast group. 647 * Transmit a message from a channel master to the multicast group.
641 */ 648 */
642static void 649static void
643master_transmit_message (void *cls, 650master_transmit_message (struct Master *mst)
644 const struct GNUNET_SCHEDULER_TaskContext *tc)
645{ 651{
646 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "master_transmit_message()\n"); 652 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "master_transmit_message()\n");
647 struct Master *mst = cls;
648 mst->channel.tmit_task = 0; 653 mst->channel.tmit_task = 0;
649 if (NULL == mst->tmit_handle) 654 if (NULL == mst->tmit_handle)
650 { 655 {
@@ -664,10 +669,8 @@ master_transmit_message (void *cls,
664 * Transmit a message from a channel slave to the multicast group. 669 * Transmit a message from a channel slave to the multicast group.
665 */ 670 */
666static void 671static void
667slave_transmit_message (void *cls, 672slave_transmit_message (struct Slave *slv)
668 const struct GNUNET_SCHEDULER_TaskContext *tc)
669{ 673{
670 struct Slave *slv = cls;
671 slv->channel.tmit_task = 0; 674 slv->channel.tmit_task = 0;
672 if (NULL == slv->tmit_handle) 675 if (NULL == slv->tmit_handle)
673 { 676 {
@@ -682,214 +685,85 @@ slave_transmit_message (void *cls,
682} 685}
683 686
684 687
685/** 688static inline void
686 * Schedule message transmission from a channel to the multicast group. 689transmit_message (struct Channel *ch)
687 *
688 * @param ch The channel.
689 * @param delay Transmission delay.
690 */
691static void
692transmit_message (struct Channel *ch, struct GNUNET_TIME_Relative delay)
693{ 690{
694 if (0 != ch->tmit_task) 691 ch->is_master
695 GNUNET_SCHEDULER_cancel (ch->tmit_task); 692 ? master_transmit_message ((struct Master *) ch)
696 693 : slave_transmit_message ((struct Slave *) ch);
697 ch->tmit_task
698 = ch->is_master
699 ? GNUNET_SCHEDULER_add_delayed (delay, master_transmit_message, ch)
700 : GNUNET_SCHEDULER_add_delayed (delay, slave_transmit_message, ch);
701}
702
703
704/**
705 * Queue incoming message parts from a client for transmission, and send them to
706 * the multicast group when the buffer is full or reached the end of message.
707 *
708 * @param ch Channel struct for the client.
709 * @param msg Message from the client.
710 *
711 * @return #GNUNET_OK on success, else #GNUNET_SYSERR.
712 */
713static int
714queue_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg)
715{
716 uint16_t size = ntohs (msg->size);
717 struct GNUNET_TIME_Relative tmit_delay = GNUNET_TIME_UNIT_ZERO;
718 struct TransmitMessage *tmit_msg = ch->tmit_tail;
719
720 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
721 "Queueing message of type %u and size %u "
722 "for transmission to multicast.\n",
723 ntohs (msg->type), size);
724
725 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size)
726 return GNUNET_SYSERR;
727
728 if (NULL == tmit_msg
729 || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < tmit_msg->size + size)
730 {
731 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
732 "Appending message to new buffer.\n");
733 /* Start filling up new buffer */
734 tmit_msg = GNUNET_new (struct TransmitMessage);
735 tmit_msg->buf = GNUNET_malloc (size);
736 memcpy (tmit_msg->buf, msg, size);
737 tmit_msg->size = size;
738 tmit_msg->state = ch->tmit_state;
739 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
740 }
741 else
742 {
743 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
744 "Appending message to existing buffer.\n");
745 /* Append to existing buffer */
746 tmit_msg->buf = GNUNET_realloc (tmit_msg->buf, tmit_msg->size + size);
747 memcpy (tmit_msg->buf + tmit_msg->size, msg, size);
748 tmit_msg->size += size;
749 tmit_msg->state = ch->tmit_state;
750 }
751
752 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "tmit_size: %u\n", tmit_msg->size);
753
754 /* Wait a bit for the remaining message parts from the client
755 if there's still some space left in the buffer. */
756 if (tmit_msg->state < MSG_STATE_END
757 && (tmit_msg->size + sizeof (struct GNUNET_MessageHeader)
758 < GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD))
759 {
760 tmit_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2);
761 }
762 else
763 {
764 send_transmit_ack (ch);
765 }
766
767 transmit_message (ch, tmit_delay);
768
769 return GNUNET_OK;
770} 694}
771 695
772 696
773static void 697static void
774transmit_error (struct Channel *ch) 698transmit_error (struct Channel *ch)
775{ 699{
776 struct GNUNET_MessageHeader *msg = GNUNET_malloc (sizeof (*msg)); 700 struct GNUNET_MessageHeader *msg;
701 struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg)
702 + sizeof (*msg));
703 msg = (struct GNUNET_MessageHeader *) &tmit_msg[1];
777 msg->size = ntohs (sizeof (*msg)); 704 msg->size = ntohs (sizeof (*msg));
778 msg->type = ntohs (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); 705 msg->type = ntohs (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
779 queue_message (ch, msg);
780 706
707 tmit_msg->buf = (char *) &tmit_msg[1];
708 tmit_msg->size = sizeof (*msg);
709 tmit_msg->state = ch->tmit_state;
710 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
711 transmit_message (ch);
712
713 /* FIXME: cleanup */
781 GNUNET_SERVER_client_disconnect (ch->client); 714 GNUNET_SERVER_client_disconnect (ch->client);
782} 715}
783 716
784/**
785 * Incoming method from a client.
786 */
787static void
788handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client,
789 const struct GNUNET_MessageHeader *msg)
790{
791 /* const struct GNUNET_PSYC_MessageMethod *meth
792 = (const struct GNUNET_PSYC_MessageMethod *) msg; */
793 struct Channel *ch
794 = GNUNET_SERVER_client_get_user_context (client, struct Channel);
795 GNUNET_assert (NULL != ch);
796
797 if (MSG_STATE_START != ch->tmit_state)
798 {
799 transmit_error (ch);
800 return;
801 }
802 ch->tmit_state = MSG_STATE_METHOD;
803
804 queue_message (ch, msg);
805 send_transmit_ack (ch);
806 GNUNET_SERVER_receive_done (client, GNUNET_OK);
807};
808
809
810/**
811 * Incoming modifier from a client.
812 */
813static void
814handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client,
815 const struct GNUNET_MessageHeader *msg)
816{
817 const struct GNUNET_PSYC_MessageModifier *mod
818 = (const struct GNUNET_PSYC_MessageModifier *) msg;
819
820 struct Channel *ch
821 = GNUNET_SERVER_client_get_user_context (client, struct Channel);
822 GNUNET_assert (NULL != ch);
823
824 if (MSG_STATE_METHOD != ch->tmit_state
825 || MSG_STATE_MODIFIER != ch->tmit_state
826 || MSG_STATE_MOD_CONT != ch->tmit_state
827 || ch->tmit_mod_value_size_expected != ch->tmit_mod_value_size)
828 {
829 transmit_error (ch);
830 return;
831 }
832 ch->tmit_mod_value_size_expected = ntohl (mod->value_size);
833 ch->tmit_mod_value_size = ntohs (msg->size) - ntohs(mod->name_size) - 1;
834
835 queue_message (ch, msg);
836 GNUNET_SERVER_receive_done (client, GNUNET_OK);
837};
838
839 717
840/** 718/**
841 * Incoming modifier from a client. 719 * Incoming message from a client.
842 */ 720 */
843static void 721static void
844handle_transmit_mod_cont (void *cls, struct GNUNET_SERVER_Client *client, 722handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
845 const struct GNUNET_MessageHeader *msg) 723 const struct GNUNET_MessageHeader *msg)
846{ 724{
847 struct Channel *ch 725 struct Channel *ch
848 = GNUNET_SERVER_client_get_user_context (client, struct Channel); 726 = GNUNET_SERVER_client_get_user_context (client, struct Channel);
849 GNUNET_assert (NULL != ch); 727 GNUNET_assert (NULL != ch);
850 728
851 ch->tmit_mod_value_size += ntohs (msg->size); 729 uint16_t size = ntohs (msg->size);
730 uint16_t psize = 0, pos = 0;
852 731
853 if (MSG_STATE_MODIFIER != ch->tmit_state 732 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
854 || MSG_STATE_MOD_CONT != ch->tmit_state
855 || ch->tmit_mod_value_size_expected < ch->tmit_mod_value_size)
856 { 733 {
734 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Message payload too large\n");
735 GNUNET_break (0);
857 transmit_error (ch); 736 transmit_error (ch);
858 return; 737 return;
859 } 738 }
860 ch->tmit_state = MSG_STATE_MOD_CONT;
861
862 queue_message (ch, msg);
863 GNUNET_SERVER_receive_done (client, GNUNET_OK);
864};
865 739
866 740 for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
867/**
868 * Incoming data from a client.
869 */
870static void
871handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client,
872 const struct GNUNET_MessageHeader *msg)
873{
874 struct Channel *ch
875 = GNUNET_SERVER_client_get_user_context (client, struct Channel);
876 GNUNET_assert (NULL != ch);
877
878 if (MSG_STATE_METHOD != ch->tmit_state
879 || MSG_STATE_MODIFIER != ch->tmit_state
880 || MSG_STATE_MOD_CONT != ch->tmit_state
881 || ch->tmit_mod_value_size_expected != ch->tmit_mod_value_size)
882 { 741 {
883 transmit_error (ch); 742 const struct GNUNET_MessageHeader *pmsg
884 return; 743 = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
744 psize = ntohs (pmsg->size);
745 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
746 "Received message part of type %u and size %u "
747 "from client.\n", ntohs (pmsg->type), psize);
748 if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
749 {
750 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
751 "Received invalid message part of type %u and size %u "
752 "from client.\n", ntohs (pmsg->type), psize);
753 GNUNET_break (0);
754 transmit_error (ch);
755 return;
756 }
885 } 757 }
886 ch->tmit_state = MSG_STATE_DATA;
887 758
888 queue_message (ch, msg); 759 size -= sizeof (*msg);
889 send_transmit_ack (ch); 760 struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size);
890 761 tmit_msg->buf = (char *) &tmit_msg[1];
891 if (MSG_STATE_END <= ch->tmit_state) 762 memcpy (tmit_msg->buf, &msg[1], size);
892 ch->tmit_state = MSG_STATE_START; 763 tmit_msg->size = size;
764 tmit_msg->state = ch->tmit_state;
765 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
766 transmit_message (ch);
893 767
894 GNUNET_SERVER_receive_done (client, GNUNET_OK); 768 GNUNET_SERVER_receive_done (client, GNUNET_OK);
895}; 769};
@@ -912,22 +786,9 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
912 786
913 { &handle_slave_join, NULL, 787 { &handle_slave_join, NULL,
914 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 }, 788 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
915#if TODO 789
916 { &handle_psyc_message, NULL, 790 { &handle_psyc_message, NULL,
917 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 }, 791 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
918#endif
919 { &handle_transmit_method, NULL,
920 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD, 0 },
921
922 { &handle_transmit_modifier, NULL,
923 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER, 0 },
924
925 { &handle_transmit_mod_cont, NULL,
926 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT, 0 },
927
928 { &handle_transmit_data, NULL,
929 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA, 0 },
930 { NULL, NULL, 0, 0 }
931 }; 792 };
932 793
933 cfg = c; 794 cfg = c;
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index a5a01fa92..e904e00b5 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -45,7 +45,7 @@ struct OperationHandle
45{ 45{
46 struct OperationHandle *prev; 46 struct OperationHandle *prev;
47 struct OperationHandle *next; 47 struct OperationHandle *next;
48 const struct GNUNET_MessageHeader *msg; 48 struct GNUNET_MessageHeader *msg;
49}; 49};
50 50
51/** 51/**
@@ -79,6 +79,11 @@ struct GNUNET_PSYC_Channel
79 struct OperationHandle *tmit_tail; 79 struct OperationHandle *tmit_tail;
80 80
81 /** 81 /**
82 * Message being transmitted to the PSYC service.
83 */
84 struct OperationHandle *tmit_msg;
85
86 /**
82 * Message to send on reconnect. 87 * Message to send on reconnect.
83 */ 88 */
84 struct GNUNET_MessageHeader *reconnect_msg; 89 struct GNUNET_MessageHeader *reconnect_msg;
@@ -139,11 +144,6 @@ struct GNUNET_PSYC_Channel
139 uint32_t recv_mod_value_size; 144 uint32_t recv_mod_value_size;
140 145
141 /** 146 /**
142 * Buffer space available for transmitting the next data fragment.
143 */
144 uint16_t tmit_size; // FIXME
145
146 /**
147 * Is transmission paused? 147 * Is transmission paused?
148 */ 148 */
149 uint8_t tmit_paused; 149 uint8_t tmit_paused;
@@ -151,7 +151,7 @@ struct GNUNET_PSYC_Channel
151 /** 151 /**
152 * Are we still waiting for a PSYC_TRANSMIT_ACK? 152 * Are we still waiting for a PSYC_TRANSMIT_ACK?
153 */ 153 */
154 uint8_t tmit_ack_pending; // FIXME 154 uint8_t tmit_ack_pending;
155 155
156 /** 156 /**
157 * Are we polling for incoming messages right now? 157 * Are we polling for incoming messages right now?
@@ -176,7 +176,7 @@ struct GNUNET_PSYC_Channel
176struct GNUNET_PSYC_MasterTransmitHandle 176struct GNUNET_PSYC_MasterTransmitHandle
177{ 177{
178 struct GNUNET_PSYC_Master *master; 178 struct GNUNET_PSYC_Master *master;
179 GNUNET_PSYC_MasterTransmitNotify notify_mod; 179 GNUNET_PSYC_MasterTransmitNotifyModifier notify_mod;
180 GNUNET_PSYC_MasterTransmitNotify notify_data; 180 GNUNET_PSYC_MasterTransmitNotify notify_data;
181 void *notify_cls; 181 void *notify_cls;
182 enum MessageState state; 182 enum MessageState state;
@@ -246,16 +246,14 @@ struct GNUNET_PSYC_StateQuery
246}; 246};
247 247
248 248
249/**
250 * Try again to connect to the PSYC service.
251 *
252 * @param cls Handle to the PSYC service.
253 * @param tc Scheduler context
254 */
255static void 249static void
256reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); 250reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
257 251
258 252
253static void
254master_transmit_data (struct GNUNET_PSYC_Master *mst);
255
256
259/** 257/**
260 * Reschedule a connect attempt to the service. 258 * Reschedule a connect attempt to the service.
261 * 259 *
@@ -323,6 +321,79 @@ recv_error (struct GNUNET_PSYC_Channel *ch)
323 message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, NULL); 321 message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, NULL);
324} 322}
325 323
324
325/**
326 * Queue an incoming message part for transmission to the PSYC service.
327 *
328 * The message part is added to the current message buffer.
329 * When this buffer is full, it is added to the transmission queue.
330 *
331 * @param ch Channel struct for the client.
332 * @param msg Modifier message part, or NULL when there's no more modifiers.
333 * @param end End of message.
334 */
335static void
336queue_message (struct GNUNET_PSYC_Channel *ch,
337 const struct GNUNET_MessageHeader *msg,
338 uint8_t end)
339{
340 uint16_t size = msg ? ntohs (msg->size) : 0;
341
342 LOG (GNUNET_ERROR_TYPE_DEBUG,
343 "Queueing message of type %u and size %u (end: %u)).\n",
344 ntohs (msg->type), size, end);
345
346 struct OperationHandle *op = ch->tmit_msg;
347 if (NULL != op)
348 {
349 if (NULL == msg
350 || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < op->msg->size + size)
351 {
352 /* End of message or buffer is full, add it to transmission queue
353 * and start with empty buffer */
354 op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
355 op->msg->size = htons (op->msg->size);
356 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
357 ch->tmit_msg = op = NULL;
358 ch->tmit_ack_pending++;
359 }
360 else
361 {
362 /* Message fits in current buffer, append */
363 ch->tmit_msg = op
364 = GNUNET_realloc (op, sizeof (*op) + op->msg->size + size);
365 op->msg = (struct GNUNET_MessageHeader *) &op[1];
366 memcpy ((char *) op->msg + op->msg->size, msg, size);
367 op->msg->size += size;
368 }
369 }
370
371 if (NULL == op && NULL != msg)
372 {
373 /* Empty buffer, copy over message. */
374 ch->tmit_msg = op
375 = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg) + size);
376 op->msg = (struct GNUNET_MessageHeader *) &op[1];
377 op->msg->size = sizeof (*op->msg) + size;
378 memcpy (&op->msg[1], msg, size);
379 }
380
381 if (NULL != op
382 && (end || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
383 < op->msg->size + sizeof (struct GNUNET_MessageHeader))))
384 {
385 /* End of message or buffer is full, add it to transmission queue. */
386 op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
387 op->msg->size = htons (op->msg->size);
388 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
389 ch->tmit_msg = op = NULL;
390 ch->tmit_ack_pending++;
391 }
392
393 transmit_next (ch);
394}
395
396
326/** 397/**
327 * Request a modifier from a client to transmit. 398 * Request a modifier from a client to transmit.
328 * 399 *
@@ -332,32 +403,71 @@ static void
332master_transmit_mod (struct GNUNET_PSYC_Master *mst) 403master_transmit_mod (struct GNUNET_PSYC_Master *mst)
333{ 404{
334 struct GNUNET_PSYC_Channel *ch = &mst->ch; 405 struct GNUNET_PSYC_Channel *ch = &mst->ch;
335 uint16_t max_data_size 406 uint16_t max_data_size, data_size;
336 = ch->tmit_size > sizeof (struct GNUNET_MessageHeader) 407 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
337 ? GNUNET_PSYC_MODIFIER_MAX_PAYLOAD - ch->tmit_size 408 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
338 : GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD - ch->tmit_size; 409 int notify_ret;
339 uint16_t data_size = max_data_size;
340 410
341 struct GNUNET_MessageHeader *msg; 411 switch (mst->tmit->state)
342 struct OperationHandle *op 412 {
343 = GNUNET_malloc (sizeof (*op) + sizeof (*msg) + data_size); 413 case MSG_STATE_MODIFIER:
344 op->msg = msg = (struct GNUNET_MessageHeader *) &op[1]; 414 {
345 msg->type 415 struct GNUNET_PSYC_MessageModifier *mod
346 = MSG_STATE_MODIFIER == mst->tmit->state 416 = (struct GNUNET_PSYC_MessageModifier *) msg;
347 ? htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER) 417 max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
348 : htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); 418 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
419 msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
420 notify_ret = mst->tmit->notify_mod (mst->tmit->notify_cls,
421 &data_size, &mod[1], &mod->oper);
422 mod->name_size = strnlen ((char *) &mod[1], data_size);
423 if (mod->name_size < data_size)
424 {
425 mod->oper = htons (mod->oper);
426 mod->value_size = htons (data_size - 1 - mod->name_size);
427 mod->name_size = htons (mod->name_size);
428 }
429 else if (0 < data_size)
430 {
431 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n");
432 notify_ret = GNUNET_SYSERR;
433 }
434 break;
435 }
436 case MSG_STATE_MOD_CONT:
437 {
438 max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
439 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
440 msg->size = sizeof (struct GNUNET_MessageHeader);
441 notify_ret = mst->tmit->notify_mod (mst->tmit->notify_cls,
442 &data_size, &msg[1], NULL);
443 break;
444 }
445 default:
446 GNUNET_assert (0);
447 }
349 448
350 int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls,
351 &data_size, &msg[1]);
352 switch (notify_ret) 449 switch (notify_ret)
353 { 450 {
354 case GNUNET_NO: 451 case GNUNET_NO:
355 if (0 != data_size) 452 if (0 == data_size)
356 mst->tmit->state = MSG_STATE_MOD_CONT; 453 { /* Transmission paused, nothing to send. */
454 ch->tmit_paused = GNUNET_YES;
455 return;
456 }
457 mst->tmit->state = MSG_STATE_MOD_CONT;
357 break; 458 break;
358 459
359 case GNUNET_YES: 460 case GNUNET_YES:
360 mst->tmit->state = (0 == data_size) ? MSG_STATE_DATA : MSG_STATE_MODIFIER; 461 if (0 == data_size)
462 {
463 /* End of modifiers. */
464 mst->tmit->state = MSG_STATE_DATA;
465 if (0 == ch->tmit_ack_pending)
466 master_transmit_data (mst);
467
468 return;
469 }
470 mst->tmit->state = MSG_STATE_MODIFIER;
361 break; 471 break;
362 472
363 default: 473 default:
@@ -368,36 +478,18 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst)
368 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); 478 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
369 msg->size = htons (sizeof (*msg)); 479 msg->size = htons (sizeof (*msg));
370 480
371 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); 481 queue_message (ch, msg, GNUNET_YES);
372 transmit_next (ch);
373 return; 482 return;
374 } 483 }
375 484
376 if ((GNUNET_NO == notify_ret && 0 == data_size))
377 {
378 /* Transmission paused, nothing to send. */
379 ch->tmit_paused = GNUNET_YES;
380 GNUNET_free (op);
381 }
382
383 if (0 < data_size) 485 if (0 < data_size)
384 { 486 {
385 GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD); 487 GNUNET_assert (data_size <= max_data_size);
386 msg->size = htons (sizeof (*msg) + data_size); 488 msg->size = htons (msg->size + data_size);
387 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); 489 queue_message (ch, msg, GNUNET_NO);
388 }
389
390 /* End of message. */
391 if (GNUNET_YES == notify_ret)
392 {
393 op = GNUNET_malloc (sizeof *(op) + sizeof (*msg));
394 op->msg = msg = (struct GNUNET_MessageHeader *) &op[1];
395 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
396 msg->size = htons (sizeof (*msg));
397 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
398 } 490 }
399 491
400 transmit_next (ch); 492 master_transmit_mod (mst);
401} 493}
402 494
403 495
@@ -410,11 +502,10 @@ static void
410master_transmit_data (struct GNUNET_PSYC_Master *mst) 502master_transmit_data (struct GNUNET_PSYC_Master *mst)
411{ 503{
412 struct GNUNET_PSYC_Channel *ch = &mst->ch; 504 struct GNUNET_PSYC_Channel *ch = &mst->ch;
413 struct GNUNET_MessageHeader *msg;
414 uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD; 505 uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
415 struct OperationHandle *op 506 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
416 = GNUNET_malloc (sizeof (*op) + sizeof (*msg) + data_size); 507 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
417 op->msg = msg = (struct GNUNET_MessageHeader *) &op[1]; 508
418 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); 509 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
419 510
420 int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls, 511 int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls,
@@ -426,7 +517,7 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst)
426 { 517 {
427 /* Transmission paused, nothing to send. */ 518 /* Transmission paused, nothing to send. */
428 ch->tmit_paused = GNUNET_YES; 519 ch->tmit_paused = GNUNET_YES;
429 GNUNET_free (op); 520 return;
430 } 521 }
431 break; 522 break;
432 523
@@ -441,9 +532,7 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst)
441 mst->tmit->state = MSG_STATE_START; 532 mst->tmit->state = MSG_STATE_START;
442 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); 533 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
443 msg->size = htons (sizeof (*msg)); 534 msg->size = htons (sizeof (*msg));
444 535 queue_message (ch, msg, GNUNET_YES);
445 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
446 transmit_next (ch);
447 return; 536 return;
448 } 537 }
449 538
@@ -451,20 +540,16 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst)
451 { 540 {
452 GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD); 541 GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
453 msg->size = htons (sizeof (*msg) + data_size); 542 msg->size = htons (sizeof (*msg) + data_size);
454 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); 543 queue_message (ch, msg, !notify_ret);
455 } 544 }
456 545
457 /* End of message. */ 546 /* End of message. */
458 if (GNUNET_YES == notify_ret) 547 if (GNUNET_YES == notify_ret)
459 { 548 {
460 op = GNUNET_malloc (sizeof *(op) + sizeof (*msg));
461 op->msg = msg = (struct GNUNET_MessageHeader *) &op[1];
462 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END); 549 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
463 msg->size = htons (sizeof (*msg)); 550 msg->size = htons (sizeof (*msg));
464 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); 551 queue_message (ch, msg, GNUNET_YES);
465 } 552 }
466
467 transmit_next (ch);
468} 553}
469 554
470 555
@@ -476,57 +561,55 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst)
476 */ 561 */
477static void 562static void
478handle_psyc_message (struct GNUNET_PSYC_Channel *ch, 563handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
479 const struct GNUNET_PSYC_MessageHeader *pmsg) 564 const struct GNUNET_PSYC_MessageHeader *msg)
480{ 565{
481 const struct GNUNET_MessageHeader *msg; 566 uint16_t size = ntohs (msg->header.size);
482 uint16_t msize = ntohs (pmsg->header.size);
483 uint16_t pos = 0;
484 uint16_t size = 0;
485 uint16_t type, size_eq, size_min;
486 567
487 if (MSG_STATE_START == ch->recv_state) 568 if (MSG_STATE_START == ch->recv_state)
488 { 569 {
489 ch->recv_message_id = GNUNET_ntohll (pmsg->message_id); 570 ch->recv_message_id = GNUNET_ntohll (msg->message_id);
490 ch->recv_flags = ntohl (pmsg->flags); 571 ch->recv_flags = ntohl (msg->flags);
491 } 572 }
492 else if (GNUNET_ntohll (pmsg->message_id) != ch->recv_message_id) 573 else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id)
493 { 574 {
494 LOG (GNUNET_ERROR_TYPE_WARNING, 575 LOG (GNUNET_ERROR_TYPE_WARNING,
495 "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n", 576 "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
496 GNUNET_ntohll (pmsg->message_id), ch->recv_message_id); 577 GNUNET_ntohll (msg->message_id), ch->recv_message_id);
497 GNUNET_break_op (0); 578 GNUNET_break_op (0);
498 recv_error (ch); 579 recv_error (ch);
499 } 580 }
500 else if (ntohl (pmsg->flags) != ch->recv_flags) 581 else if (ntohl (msg->flags) != ch->recv_flags)
501 { 582 {
502 LOG (GNUNET_ERROR_TYPE_WARNING, 583 LOG (GNUNET_ERROR_TYPE_WARNING,
503 "Unexpected message flags. Got: %lu, expected: %lu\n", 584 "Unexpected message flags. Got: %lu, expected: %lu\n",
504 ntohl (pmsg->flags), ch->recv_flags); 585 ntohl (msg->flags), ch->recv_flags);
505 GNUNET_break_op (0); 586 GNUNET_break_op (0);
506 recv_error (ch); 587 recv_error (ch);
507 } 588 }
508 589
509 for (pos = 0; sizeof (*pmsg) + pos < msize; pos += size) 590 uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
591
592 for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
510 { 593 {
511 msg = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos); 594 const struct GNUNET_MessageHeader *pmsg
512 size = ntohs (msg->size); 595 = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
513 type = ntohs (msg->type); 596 psize = ntohs (pmsg->size);
597 ptype = ntohs (pmsg->type);
514 size_eq = size_min = 0; 598 size_eq = size_min = 0;
515 599
516 if (msize < sizeof (*pmsg) + pos + size) 600 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
601 "Received message part of type %u and size %u from PSYC.\n",
602 ptype, psize);
603
604 if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
517 { 605 {
518 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 606 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
519 "Discarding message of type %u with invalid size. " 607 "Discarding message of type %u with invalid size %u.\n",
520 "(%u < %u + %u + %u)\n", ntohs (msg->type), 608 ptype, psize);
521 msize, sizeof (*msg), pos, size);
522 break; 609 break;
523 } 610 }
524 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
525 "Received message part of type %u and size %u from PSYC.\n",
526 ntohs (msg->type), size);
527 611
528 612 switch (ptype)
529 switch (type)
530 { 613 {
531 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: 614 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
532 size_min = sizeof (struct GNUNET_PSYC_MessageMethod); 615 size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
@@ -534,6 +617,7 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
534 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: 617 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
535 size_min = sizeof (struct GNUNET_PSYC_MessageModifier); 618 size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
536 break; 619 break;
620 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
537 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: 621 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
538 size_min = sizeof (struct GNUNET_MessageHeader); 622 size_min = sizeof (struct GNUNET_MessageHeader);
539 break; 623 break;
@@ -543,22 +627,22 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
543 break; 627 break;
544 } 628 }
545 629
546 if (! ((0 < size_eq && size == size_eq) 630 if (! ((0 < size_eq && psize == size_eq)
547 || (0 < size_min && size_min <= size))) 631 || (0 < size_min && size_min <= psize)))
548 { 632 {
549 GNUNET_break (0); 633 GNUNET_break (0);
550 reschedule_connect (ch); 634 reschedule_connect (ch);
551 return; 635 return;
552 } 636 }
553 637
554 switch (type) 638 switch (ptype)
555 { 639 {
556 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: 640 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
557 { 641 {
558 struct GNUNET_PSYC_MessageMethod *meth 642 struct GNUNET_PSYC_MessageMethod *meth
559 = (struct GNUNET_PSYC_MessageMethod *) msg; 643 = (struct GNUNET_PSYC_MessageMethod *) pmsg;
560 644
561 if (MSG_STATE_HEADER != ch->recv_state) 645 if (MSG_STATE_START != ch->recv_state)
562 { 646 {
563 LOG (GNUNET_ERROR_TYPE_WARNING, 647 LOG (GNUNET_ERROR_TYPE_WARNING,
564 "Discarding out of order message method.\n"); 648 "Discarding out of order message method.\n");
@@ -568,89 +652,66 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
568 */ 652 */
569 GNUNET_break_op (0); 653 GNUNET_break_op (0);
570 recv_error (ch); 654 recv_error (ch);
571 break; 655 return;
572 } 656 }
573 657
574 if ('\0' != (char *) meth + msg->size - 1) 658 if ('\0' != *((char *) meth + psize - 1))
575 { 659 {
576 LOG (GNUNET_ERROR_TYPE_WARNING, 660 LOG (GNUNET_ERROR_TYPE_WARNING,
577 "Discarding message with malformed method. " 661 "Discarding message with malformed method. "
578 "Message ID: %" PRIu64 "\n", ch->recv_message_id); 662 "Message ID: %" PRIu64 "\n", ch->recv_message_id);
579 GNUNET_break_op (0); 663 GNUNET_break_op (0);
580 recv_error (ch); 664 recv_error (ch);
581 break; 665 return;
582 } 666 }
583 GNUNET_PSYC_MessageCallback message_cb
584 = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
585 ? ch->hist_message_cb
586 : ch->message_cb;
587
588 if (NULL != message_cb)
589 message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg);
590
591 ch->recv_state = MSG_STATE_METHOD; 667 ch->recv_state = MSG_STATE_METHOD;
592 break; 668 break;
593 } 669 }
594 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: 670 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
595 { 671 {
596 if (MSG_STATE_MODIFIER != ch->recv_state) 672 if (!(MSG_STATE_METHOD == ch->recv_state
673 || MSG_STATE_MODIFIER == ch->recv_state
674 || MSG_STATE_MOD_CONT == ch->recv_state))
597 { 675 {
598 LOG (GNUNET_ERROR_TYPE_WARNING, 676 LOG (GNUNET_ERROR_TYPE_WARNING,
599 "Discarding out of order message modifier.\n"); 677 "Discarding out of order message modifier.\n");
600 GNUNET_break_op (0); 678 GNUNET_break_op (0);
601 recv_error (ch); 679 recv_error (ch);
602 break; 680 return;
603 } 681 }
604 682
605 struct GNUNET_PSYC_MessageModifier *mod 683 struct GNUNET_PSYC_MessageModifier *mod
606 = (struct GNUNET_PSYC_MessageModifier *) msg; 684 = (struct GNUNET_PSYC_MessageModifier *) pmsg;
607 685
608 uint16_t name_size = ntohs (mod->name_size); 686 uint16_t name_size = ntohs (mod->name_size);
609 ch->recv_mod_value_size_expected = ntohs (mod->value_size); 687 ch->recv_mod_value_size_expected = ntohs (mod->value_size);
610 ch->recv_mod_value_size = size - sizeof (*mod) - name_size - 1; 688 ch->recv_mod_value_size = psize - sizeof (*mod) - name_size - 1;
611 689
612 if (size < sizeof (*mod) + name_size + 1 690 if (psize < sizeof (*mod) + name_size + 1
613 || '\0' != (char *) &mod[1] + mod->name_size 691 || '\0' != *((char *) &mod[1] + name_size)
614 || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) 692 || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
615 { 693 {
616 LOG (GNUNET_ERROR_TYPE_WARNING, "Discarding malformed modifier.\n"); 694 LOG (GNUNET_ERROR_TYPE_WARNING, "Discarding malformed modifier.\n");
617 GNUNET_break_op (0); 695 GNUNET_break_op (0);
618 break; 696 return;
619 } 697 }
620
621 ch->recv_state = MSG_STATE_MODIFIER; 698 ch->recv_state = MSG_STATE_MODIFIER;
622
623 GNUNET_PSYC_MessageCallback message_cb
624 = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
625 ? ch->hist_message_cb
626 : ch->message_cb;
627
628 if (NULL != message_cb)
629 message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg);
630
631 break; 699 break;
632 } 700 }
633 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: 701 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
634 { 702 {
635 ch->recv_mod_value_size += size - sizeof (*msg); 703 ch->recv_mod_value_size += psize - sizeof (*pmsg);
636 704
637 if (MSG_STATE_MODIFIER != ch->recv_state 705 if (!(MSG_STATE_MODIFIER == ch->recv_state
706 || MSG_STATE_MOD_CONT == ch->recv_state)
638 || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) 707 || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
639 { 708 {
640 LOG (GNUNET_ERROR_TYPE_WARNING, 709 LOG (GNUNET_ERROR_TYPE_WARNING,
641 "Discarding out of order message modifier continuation.\n"); 710 "Discarding out of order message modifier continuation.\n");
642 GNUNET_break_op (0); 711 GNUNET_break_op (0);
643 recv_reset (ch); 712 recv_reset (ch);
644 break; 713 return;
645 } 714 }
646
647 GNUNET_PSYC_MessageCallback message_cb
648 = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
649 ? ch->hist_message_cb
650 : ch->message_cb;
651
652 if (NULL != message_cb)
653 message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg);
654 break; 715 break;
655 } 716 }
656 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: 717 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
@@ -662,12 +723,23 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
662 "Discarding out of order message data fragment.\n"); 723 "Discarding out of order message data fragment.\n");
663 GNUNET_break_op (0); 724 GNUNET_break_op (0);
664 recv_reset (ch); 725 recv_reset (ch);
665 break; 726 return;
666 } 727 }
667
668 ch->recv_state = MSG_STATE_DATA; 728 ch->recv_state = MSG_STATE_DATA;
669 break; 729 break;
670 } 730 }
731 }
732
733 GNUNET_PSYC_MessageCallback message_cb
734 = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
735 ? ch->hist_message_cb
736 : ch->message_cb;
737
738 if (NULL != message_cb)
739 message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, pmsg);
740
741 switch (ptype)
742 {
671 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: 743 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
672 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: 744 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
673 recv_reset (ch); 745 recv_reset (ch);
@@ -717,18 +789,7 @@ message_handler (void *cls,
717 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE: 789 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
718 size_min = sizeof (struct GNUNET_PSYC_MessageHeader); 790 size_min = sizeof (struct GNUNET_PSYC_MessageHeader);
719 break; 791 break;
720 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: 792 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
721 size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
722 break;
723 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
724 size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
725 break;
726 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
727 size_min = sizeof (struct GNUNET_MessageHeader);
728 break;
729 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
730 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
731 case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK:
732 size_eq = sizeof (struct GNUNET_MessageHeader); 793 size_eq = sizeof (struct GNUNET_MessageHeader);
733 break; 794 break;
734 } 795 }
@@ -761,9 +822,15 @@ message_handler (void *cls,
761#endif 822#endif
762 break; 823 break;
763 } 824 }
764 case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK: 825 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
765 { 826 {
766 ch->tmit_ack_pending = GNUNET_NO; 827 if (0 == ch->tmit_ack_pending)
828 {
829 LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
830 GNUNET_break (0);
831 break;
832 }
833 ch->tmit_ack_pending--;
767 834
768 if (ch->is_master) 835 if (ch->is_master)
769 { 836 {
@@ -771,10 +838,6 @@ message_handler (void *cls,
771 switch (mst->tmit->state) 838 switch (mst->tmit->state)
772 { 839 {
773 case MSG_STATE_MODIFIER: 840 case MSG_STATE_MODIFIER:
774 if (GNUNET_NO == ch->tmit_paused)
775 master_transmit_mod (mst);
776 break;
777
778 case MSG_STATE_MOD_CONT: 841 case MSG_STATE_MOD_CONT:
779 if (GNUNET_NO == ch->tmit_paused) 842 if (GNUNET_NO == ch->tmit_paused)
780 master_transmit_mod (mst); 843 master_transmit_mod (mst);
@@ -795,12 +858,13 @@ message_handler (void *cls,
795 else 858 else
796 { 859 {
797 LOG (GNUNET_ERROR_TYPE_WARNING, 860 LOG (GNUNET_ERROR_TYPE_WARNING,
798 "Ignoring transmit ack, there's no transmission going on.\n"); 861 "Ignoring message ACK, there's no transmission going on.\n");
862 GNUNET_break (0);
799 } 863 }
800 break; 864 break;
801 default: 865 default:
802 LOG (GNUNET_ERROR_TYPE_WARNING, 866 LOG (GNUNET_ERROR_TYPE_DEBUG,
803 "Ignoring unexpected transmit ack.\n"); 867 "Ignoring message ACK in state %u.\n", mst->tmit->state);
804 } 868 }
805 } 869 }
806 else 870 else
@@ -811,12 +875,15 @@ message_handler (void *cls,
811 } 875 }
812 876
813 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE: 877 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
814 handle_psyc_message(ch, (const struct GNUNET_PSYC_MessageHeader *) msg); 878 handle_psyc_message (ch, (const struct GNUNET_PSYC_MessageHeader *) msg);
815 break; 879 break;
816 } 880 }
817 881
818 GNUNET_CLIENT_receive (ch->client, &message_handler, ch, 882 if (NULL != ch->client)
819 GNUNET_TIME_UNIT_FOREVER_REL); 883 {
884 GNUNET_CLIENT_receive (ch->client, &message_handler, ch,
885 GNUNET_TIME_UNIT_FOREVER_REL);
886 }
820} 887}
821 888
822 889
@@ -1029,6 +1096,8 @@ void
1029GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master) 1096GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master)
1030{ 1097{
1031 disconnect (master); 1098 disconnect (master);
1099 if (NULL != master->tmit)
1100 GNUNET_free (master->tmit);
1032 GNUNET_free (master); 1101 GNUNET_free (master);
1033} 1102}
1034 1103
@@ -1069,30 +1138,6 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
1069} 1138}
1070 1139
1071 1140
1072/* FIXME: split up value into <64K chunks and transmit the continuations in
1073 * MOD_CONT msgs */
1074static int
1075send_modifier (void *cls, struct GNUNET_ENV_Modifier *mod)
1076{
1077 struct GNUNET_PSYC_Channel *ch = cls;
1078 size_t name_size = strlen (mod->name) + 1;
1079 struct GNUNET_PSYC_MessageModifier *pmod;
1080 struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*pmod)
1081 + name_size + mod->value_size);
1082 pmod = (struct GNUNET_PSYC_MessageModifier *) &op[1];
1083 op->msg = (struct GNUNET_MessageHeader *) pmod;
1084
1085 pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
1086 pmod->header.size = htons (sizeof (*pmod) + name_size + mod->value_size);
1087 pmod->name_size = htons (name_size);
1088 memcpy (&pmod[1], mod->name, name_size);
1089 memcpy ((char *) &pmod[1] + name_size, mod->value, mod->value_size);
1090
1091 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
1092 return GNUNET_YES;
1093}
1094
1095
1096/** 1141/**
1097 * Send a message to call a method to all members in the PSYC channel. 1142 * Send a message to call a method to all members in the PSYC channel.
1098 * 1143 *
@@ -1107,7 +1152,7 @@ send_modifier (void *cls, struct GNUNET_ENV_Modifier *mod)
1107struct GNUNET_PSYC_MasterTransmitHandle * 1152struct GNUNET_PSYC_MasterTransmitHandle *
1108GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, 1153GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
1109 const char *method_name, 1154 const char *method_name,
1110 GNUNET_PSYC_MasterTransmitNotify notify_mod, 1155 GNUNET_PSYC_MasterTransmitNotifyModifier notify_mod,
1111 GNUNET_PSYC_MasterTransmitNotify notify_data, 1156 GNUNET_PSYC_MasterTransmitNotify notify_data,
1112 void *notify_cls, 1157 void *notify_cls,
1113 enum GNUNET_PSYC_MasterTransmitFlags flags) 1158 enum GNUNET_PSYC_MasterTransmitFlags flags)
@@ -1120,25 +1165,27 @@ GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
1120 1165
1121 size_t size = strlen (method_name) + 1; 1166 size_t size = strlen (method_name) + 1;
1122 struct GNUNET_PSYC_MessageMethod *pmeth; 1167 struct GNUNET_PSYC_MessageMethod *pmeth;
1123 struct OperationHandle *op 1168 struct OperationHandle *op;
1124 = GNUNET_malloc (sizeof (*op) + sizeof (*pmeth) + size);
1125 pmeth = (struct GNUNET_PSYC_MessageMethod *) &op[1];
1126 op->msg = (struct GNUNET_MessageHeader *) pmeth;
1127 1169
1170 ch->tmit_msg = op = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg)
1171 + sizeof (*pmeth) + size);
1172 op->msg = (struct GNUNET_MessageHeader *) &op[1];
1173 op->msg->size = sizeof (*op->msg) + sizeof (*pmeth) + size;
1174
1175 pmeth = (struct GNUNET_PSYC_MessageMethod *) &op->msg[1];
1128 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); 1176 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
1129 pmeth->header.size = htons (sizeof (*pmeth) + size); 1177 pmeth->header.size = htons (sizeof (*pmeth) + size);
1130 pmeth->flags = htonl (flags); 1178 pmeth->flags = htonl (flags);
1131 memcpy (&pmeth[1], method_name, size); 1179 memcpy (&pmeth[1], method_name, size);
1132 1180
1133 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
1134 transmit_next (ch);
1135
1136 master->tmit = GNUNET_malloc (sizeof (*master->tmit)); 1181 master->tmit = GNUNET_malloc (sizeof (*master->tmit));
1137 master->tmit->master = master; 1182 master->tmit->master = master;
1138 master->tmit->notify_mod = notify_mod; 1183 master->tmit->notify_mod = notify_mod;
1139 master->tmit->notify_data = notify_data; 1184 master->tmit->notify_data = notify_data;
1140 master->tmit->notify_cls = notify_cls; 1185 master->tmit->notify_cls = notify_cls;
1141 master->tmit->state = MSG_STATE_START; // FIXME 1186 master->tmit->state = MSG_STATE_MODIFIER;
1187
1188 master_transmit_mod (master);
1142 return master->tmit; 1189 return master->tmit;
1143} 1190}
1144 1191
@@ -1152,7 +1199,7 @@ void
1152GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) 1199GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
1153{ 1200{
1154 struct GNUNET_PSYC_Channel *ch = &th->master->ch; 1201 struct GNUNET_PSYC_Channel *ch = &th->master->ch;
1155 if (GNUNET_NO == ch->tmit_ack_pending) 1202 if (0 == ch->tmit_ack_pending)
1156 { 1203 {
1157 ch->tmit_paused = GNUNET_NO; 1204 ch->tmit_paused = GNUNET_NO;
1158 master_transmit_data (th->master); 1205 master_transmit_data (th->master);
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c
index 704819c50..33684b125 100644
--- a/src/psyc/test_psyc.c
+++ b/src/psyc/test_psyc.c
@@ -25,6 +25,8 @@
25 * @author Christian Grothoff 25 * @author Christian Grothoff
26 */ 26 */
27 27
28#include <inttypes.h>
29
28#include "platform.h" 30#include "platform.h"
29#include "gnunet_crypto_lib.h" 31#include "gnunet_crypto_lib.h"
30#include "gnunet_common.h" 32#include "gnunet_common.h"
@@ -35,7 +37,7 @@
35 37
36#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10) 38#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10)
37 39
38#define DEBUG_SERVICE 1 40#define DEBUG_SERVICE 0
39 41
40 42
41/** 43/**
@@ -62,17 +64,37 @@ static struct GNUNET_CRYPTO_EddsaPublicKey slave_pub_key;
62 64
63struct GNUNET_PSYC_MasterTransmitHandle *mth; 65struct GNUNET_PSYC_MasterTransmitHandle *mth;
64 66
67struct TransmitClosure
68{
69 struct GNUNET_PSYC_MasterTransmitHandle *handle;
70 struct GNUNET_ENV_Environment *env;
71 char *data[16];
72 const char *mod_value;
73 size_t mod_value_size;
74 uint8_t data_count;
75 uint8_t paused;
76 uint8_t n;
77};
78
79struct TransmitClosure *tmit;
80
65/** 81/**
66 * Clean up all resources used. 82 * Clean up all resources used.
67 */ 83 */
68static void 84static void
69cleanup () 85cleanup ()
70{ 86{
71 if (mst != NULL) 87 if (NULL != mst)
72 { 88 {
73 GNUNET_PSYC_master_stop (mst); 89 GNUNET_PSYC_master_stop (mst);
74 mst = NULL; 90 mst = NULL;
75 } 91 }
92 if (NULL != tmit)
93 {
94 GNUNET_ENV_environment_destroy (tmit->env);
95 GNUNET_free (tmit);
96 tmit = NULL;
97 }
76 GNUNET_SCHEDULER_shutdown (); 98 GNUNET_SCHEDULER_shutdown ();
77} 99}
78 100
@@ -121,44 +143,40 @@ end ()
121} 143}
122 144
123 145
124static int 146static void
125method (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, 147message (void *cls, uint64_t message_id, uint32_t flags,
126 uint64_t message_id, const char *name, 148 const struct GNUNET_MessageHeader *msg)
127 size_t modifier_count, const struct GNUNET_ENV_Modifier *modifiers,
128 uint64_t data_offset, const void *data, size_t data_size,
129 enum GNUNET_PSYC_MessageFlags flags)
130{ 149{
131 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 150 if (NULL == msg)
132 "Method: %s, modifiers: %lu, flags: %u\n%.*s\n", 151 {
133 name, modifier_count, flags, data_size, data); 152 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
134 return GNUNET_OK; 153 "Error while receiving message %llu\n", message_id);
135} 154 return;
155 }
136 156
157 uint16_t type = ntohs (msg->type);
158 uint16_t size = ntohs (msg->size);
137 159
138static int 160 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
139join (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, 161 "Got message part of type %u and size %u "
140 const char *method_name, 162 "belonging to message ID %llu with flags %u\n",
141 size_t variable_count, const struct GNUNET_ENV_Modifier *variables, 163 type, size, message_id, flags);
142 const void *data, size_t data_size, struct GNUNET_PSYC_JoinHandle *jh) 164
143{ 165 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type)
144 return GNUNET_OK; 166 end ();
145} 167}
146 168
147 169
148struct TransmitClosure 170static void
171join_request (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
172 const char *method_name,
173 size_t variable_count, const struct GNUNET_ENV_Modifier *variables,
174 const void *data, size_t data_size,
175 struct GNUNET_PSYC_JoinHandle *jh)
149{ 176{
150 struct GNUNET_PSYC_MasterTransmitHandle *handle; 177 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
151 178 "Got join request.");
152 char *mod_names[16]; 179}
153 char *mod_values[16];
154 char *data[16];
155
156 uint8_t mod_count;
157 uint8_t data_count;
158
159 uint8_t paused;
160 uint8_t n;
161};
162 180
163 181
164static void 182static void
@@ -172,45 +190,95 @@ transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
172 190
173 191
174static int 192static int
175tmit_notify_mod (void *cls, size_t *data_size, void *data) 193tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper)
176{ 194{
177 struct TransmitClosure *tmit = cls; 195 struct TransmitClosure *tmit = cls;
178 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 196 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
179 "Transmit notify modifier: %lu bytes available, " 197 "Transmit notify modifier: %lu bytes available, "
180 "processing modifier %u/%u.\n", 198 "%u modifiers left to process.\n",
181 *data_size, tmit->n + 1, tmit->fragment_count); 199 *data_size, GNUNET_ENV_environment_get_count (tmit->env));
182 /* FIXME: continuation */ 200
183 uint16_t name_size = strlen (tmit->mod_names[tmit->n]); 201 enum GNUNET_ENV_Operator op = 0;
184 uint16_t value_size = strlen (tmit->mod_values[tmit->n]); 202 const char *name = NULL;
185 if (name_size + 1 + value_size <= *data_size) 203 const char *value = NULL;
186 return GNUNET_NO; 204 uint16_t name_size = 0;
187 205 size_t value_size = 0;
188 *data_size = name_size + 1 + value_size; 206
189 memcpy (data, tmit->fragments[tmit->n], *data_size); 207 if (NULL != tmit->mod_value && 0 < tmit->mod_value_size)
190 208 { /* Modifier continuation */
191 if (++tmit->n < tmit->mod_count) 209 value = tmit->mod_value;
192 { 210 if (tmit->mod_value_size <= *data_size)
193 return GNUNET_NO; 211 {
212 value_size = tmit->mod_value_size;
213 tmit->mod_value = NULL;
214 }
215 else
216 {
217 value_size = *data_size;
218 tmit->mod_value += value_size;
219 }
220 tmit->mod_value_size -= value_size;
221
222 if (*data_size < value_size)
223 {
224 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
225 "value larger than buffer: %u < %zu\n",
226 *data_size, value_size);
227 *data_size = 0;
228 return GNUNET_NO;
229 }
230
231 *data_size = value_size;
232 memcpy (data, value, value_size);
194 } 233 }
195 else 234 else if (NULL != oper)
196 { 235 {
197 tmit->n = 0; 236 if (GNUNET_NO == GNUNET_ENV_environment_shift (tmit->env, &op, &name,
198 return GNUNET_YES; 237 (void *) &value, &value_size))
238 { /* No more modifiers, continue with data */
239 *data_size = 0;
240 return GNUNET_YES;
241 }
242
243 *oper = op;
244 name_size = strlen (name);
245
246 if (name_size + 1 + value_size <= *data_size)
247 {
248 *data_size = name_size + 1 + value_size;
249 }
250 else
251 {
252 tmit->mod_value_size = value_size;
253 value_size = *data_size - name_size - 1;
254 tmit->mod_value_size -= value_size;
255 tmit->mod_value = value + value_size;
256 }
257
258 memcpy (data, name, name_size);
259 ((char *)data)[name_size] = '\0';
260 memcpy ((char *)data + name_size + 1, value, value_size);
199 } 261 }
262
263 return 0 == tmit->mod_value_size ? GNUNET_YES : GNUNET_NO;
200} 264}
201 265
202 266
203static int 267static int
204tmit_notify_data (void *cls, size_t *data_size, void *data) 268tmit_notify_data (void *cls, uint16_t *data_size, void *data)
205{ 269{
206 struct TransmitClosure *tmit = cls; 270 struct TransmitClosure *tmit = cls;
271 uint16_t size = strlen (tmit->data[tmit->n]);
207 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 272 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
208 "Transmit notify data: %lu bytes available, " 273 "Transmit notify data: %lu bytes available, "
209 "processing fragment %u/%u.\n", 274 "processing fragment %u/%u (size %u).\n",
210 *data_size, tmit->n + 1, tmit->fragment_count); 275 *data_size, tmit->n + 1, tmit->data_count, size);
211 uint16_t size = strlen (tmit->data[tmit->n]); 276 if (*data_size < size)
212 if (size <= *data_size) 277 {
213 return GNUNET_NO; 278 *data_size = 0;
279 GNUNET_assert (0);
280 return GNUNET_SYSERR;
281 }
214 282
215 if (GNUNET_YES == tmit->paused && tmit->n == tmit->data_count - 1) 283 if (GNUNET_YES == tmit->paused && tmit->n == tmit->data_count - 1)
216 { 284 {
@@ -231,19 +299,18 @@ tmit_notify_data (void *cls, size_t *data_size, void *data)
231} 299}
232 300
233 301
234void 302static void
235master_started (void *cls, uint64_t max_message_id) 303master_started (void *cls, uint64_t max_message_id)
236{ 304{
237 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 305 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
238 "Master started: %lu\n", max_message_id); 306 "Master started: %" PRIu64 "\n", max_message_id);
239 307
240 struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create (); 308 tmit = GNUNET_new (struct TransmitClosure);
241 GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN, 309 tmit->env = GNUNET_ENV_environment_create ();
242 "_foo", "bar baz", 7); 310 GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
243 GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN, 311 "_foo", "bar baz", 7);
244 "_foo_bar", "foo bar baz", 11); 312 GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
245 313 "_foo_bar", "foo bar baz", 11);
246 struct TransmitClosure *tmit = GNUNET_new (struct TransmitClosure);
247 tmit->data[0] = "foo"; 314 tmit->data[0] = "foo";
248 tmit->data[1] = "foo bar"; 315 tmit->data[1] = "foo bar";
249 tmit->data[2] = "foo bar baz"; 316 tmit->data[2] = "foo bar baz";
@@ -255,7 +322,7 @@ master_started (void *cls, uint64_t max_message_id)
255} 322}
256 323
257 324
258void 325static void
259slave_joined (void *cls, uint64_t max_message_id) 326slave_joined (void *cls, uint64_t max_message_id)
260{ 327{
261 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave joined: %lu\n", max_message_id); 328 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave joined: %lu\n", max_message_id);
@@ -288,19 +355,19 @@ run (void *cls,
288 GNUNET_CRYPTO_eddsa_key_get_public (channel_key, &channel_pub_key); 355 GNUNET_CRYPTO_eddsa_key_get_public (channel_key, &channel_pub_key);
289 GNUNET_CRYPTO_eddsa_key_get_public (slave_key, &slave_pub_key); 356 GNUNET_CRYPTO_eddsa_key_get_public (slave_key, &slave_pub_key);
290 357
291 mst = GNUNET_PSYC_master_start (cfg, channel_key, 358 mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE,
292 GNUNET_PSYC_CHANNEL_PRIVATE, 359 &message, &join_request, &master_started, NULL);
293 &method, &join, &master_started, NULL); 360 return; /* FIXME: test slave */
294 return; 361
295 struct GNUNET_PeerIdentity origin; 362 struct GNUNET_PeerIdentity origin;
296 struct GNUNET_PeerIdentity relays[16]; 363 struct GNUNET_PeerIdentity relays[16];
297 struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create (); 364 struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create ();
298 GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN, 365 GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN,
299 "_foo", "bar baz", 7); 366 "_foo", "bar baz", 7);
300 GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN, 367 GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN,
301 "_foo_bar", "foo bar baz", 11); 368 "_foo_bar", "foo bar baz", 11);
302 slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin, 369 slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin,
303 16, relays, &method, &join, &slave_joined, 370 16, relays, &message, &join_request, &slave_joined,
304 NULL, "_request_join", env, "some data", 9); 371 NULL, "_request_join", env, "some data", 9);
305 GNUNET_ENV_environment_destroy (env); 372 GNUNET_ENV_environment_destroy (env);
306} 373}
@@ -319,8 +386,7 @@ main (int argc, char *argv[])
319 opts, &run, NULL)) 386 opts, &run, NULL))
320 return 1; 387 return 1;
321#else 388#else
322 if (0 != GNUNET_TESTING_service_run ("test-psyc", "psyc", 389 if (0 != GNUNET_TESTING_peer_run ("test-psyc", "test_psyc.conf", &run, NULL))
323 "test_psyc.conf", &run, NULL))
324 return 1; 390 return 1;
325#endif 391#endif
326 return res; 392 return res;
diff --git a/src/psyc/test_psyc.conf b/src/psyc/test_psyc.conf
index 1e646239a..7a1eb8404 100644
--- a/src/psyc/test_psyc.conf
+++ b/src/psyc/test_psyc.conf
@@ -8,3 +8,10 @@ BINARY = gnunet-service-psyc
8UNIXPATH = $GNUNET_RUNTIME_DIR/test-gnunet-service-psyc.sock 8UNIXPATH = $GNUNET_RUNTIME_DIR/test-gnunet-service-psyc.sock
9UNIX_MATCH_UID = NO 9UNIX_MATCH_UID = NO
10UNIX_MATCH_GID = YES 10UNIX_MATCH_GID = YES
11
12[psycstore]
13AUTOSTART = YES
14BINARY = gnunet-service-psycstore
15UNIXPATH = $GNUNET_RUNTIME_DIR/test-gnunet-service-psycstore.sock
16UNIX_MATCH_UID = NO
17UNIX_MATCH_GID = YES