diff options
author | Gabor X Toth <*@tg-x.net> | 2014-01-06 00:09:43 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2014-01-06 00:09:43 +0000 |
commit | 1a0ffe2288b97b47a5b2bfbda2f9438680429422 (patch) | |
tree | 72db4cd67f06253a60bf3e2966fd0b1bf55eba5c | |
parent | 43d497d7c4ebb6efae37ae4bb2f812a68aa64a32 (diff) | |
download | gnunet-1a0ffe2288b97b47a5b2bfbda2f9438680429422.tar.gz gnunet-1a0ffe2288b97b47a5b2bfbda2f9438680429422.zip |
psyc: ipc messages, notify callback for modifiers, tests
-rwxr-xr-x | contrib/logread.pl | 2 | ||||
-rw-r--r-- | src/include/gnunet_protocols.h | 2 | ||||
-rw-r--r-- | src/include/gnunet_psyc_service.h | 7 | ||||
-rw-r--r-- | src/multicast/multicast_api.c | 9 | ||||
-rw-r--r-- | src/psyc/gnunet-service-psyc.c | 323 | ||||
-rw-r--r-- | src/psyc/psyc_api.c | 443 | ||||
-rw-r--r-- | src/psyc/test_psyc.c | 224 | ||||
-rw-r--r-- | src/psyc/test_psyc.conf | 7 |
8 files changed, 502 insertions, 515 deletions
diff --git a/contrib/logread.pl b/contrib/logread.pl index c6f82a68d..11baf2d86 100755 --- a/contrib/logread.pl +++ b/contrib/logread.pl | |||
@@ -98,7 +98,7 @@ while (<>) | |||
98 | s/\b(multicast|psyc|psycstore|social)\b/BLUE $1/ex; | 98 | s/\b(multicast|psyc|psycstore|social)\b/BLUE $1/ex; |
99 | 99 | ||
100 | # Add message type names | 100 | # Add message type names |
101 | s/(message(?:\s+of)?\s+type\s+)(\d+)/ | 101 | s/(message(?:\s+part)?(?:\s+of)?\s+type\s+)(\d+)/ |
102 | $1 . BRIGHT_CYAN (exists $msgtypes{$2} ? $msgtypes{$2} : 'UNKNOWN') . | 102 | $1 . BRIGHT_CYAN (exists $msgtypes{$2} ? $msgtypes{$2} : 'UNKNOWN') . |
103 | CYAN " ($2)"/e; | 103 | CYAN " ($2)"/e; |
104 | 104 | ||
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index 9ca4155e8..2470b3ab1 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h | |||
@@ -2130,7 +2130,7 @@ extern "C" | |||
2130 | 2130 | ||
2131 | #define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL 697 | 2131 | #define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL 697 |
2132 | 2132 | ||
2133 | #define GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK 698 | 2133 | #define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK 698 |
2134 | 2134 | ||
2135 | 2135 | ||
2136 | #define GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST 701 | 2136 | #define GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST 701 |
diff --git a/src/include/gnunet_psyc_service.h b/src/include/gnunet_psyc_service.h index eb17c9351..f843fbe1f 100644 --- a/src/include/gnunet_psyc_service.h +++ b/src/include/gnunet_psyc_service.h | |||
@@ -426,6 +426,11 @@ typedef int | |||
426 | uint16_t *data_size, | 426 | uint16_t *data_size, |
427 | void *data); | 427 | void *data); |
428 | 428 | ||
429 | typedef int | ||
430 | (*GNUNET_PSYC_MasterTransmitNotifyModifier) (void *cls, | ||
431 | uint16_t *data_size, | ||
432 | void *data, | ||
433 | uint8_t *oper); | ||
429 | 434 | ||
430 | /** | 435 | /** |
431 | * Flags for transmitting messages to a channel by the master. | 436 | * Flags for transmitting messages to a channel by the master. |
@@ -472,7 +477,7 @@ struct GNUNET_PSYC_MasterTransmitHandle; | |||
472 | struct GNUNET_PSYC_MasterTransmitHandle * | 477 | struct GNUNET_PSYC_MasterTransmitHandle * |
473 | GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, | 478 | GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, |
474 | const char *method_name, | 479 | const char *method_name, |
475 | GNUNET_PSYC_MasterTransmitNotify notify_mod, | 480 | GNUNET_PSYC_MasterTransmitNotifyModifier notify_mod, |
476 | GNUNET_PSYC_MasterTransmitNotify notify_data, | 481 | GNUNET_PSYC_MasterTransmitNotify notify_data, |
477 | void *notify_cls, | 482 | void *notify_cls, |
478 | enum GNUNET_PSYC_MasterTransmitFlags flags); | 483 | enum GNUNET_PSYC_MasterTransmitFlags flags); |
diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c index 6b784c2f0..bb6a57b58 100644 --- a/src/multicast/multicast_api.c +++ b/src/multicast/multicast_api.c | |||
@@ -362,8 +362,9 @@ schedule_origin_to_all (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc | |||
362 | struct GNUNET_MULTICAST_OriginMessageHandle *mh = &orig->msg_handle; | 362 | struct GNUNET_MULTICAST_OriginMessageHandle *mh = &orig->msg_handle; |
363 | 363 | ||
364 | size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD; | 364 | size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD; |
365 | char buf[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; | ||
365 | struct GNUNET_MULTICAST_MessageHeader *msg | 366 | struct GNUNET_MULTICAST_MessageHeader *msg |
366 | = GNUNET_malloc (buf_size); | 367 | = (struct GNUNET_MULTICAST_MessageHeader *) buf; |
367 | int ret = mh->notify (mh->notify_cls, &buf_size, &msg[1]); | 368 | int ret = mh->notify (mh->notify_cls, &buf_size, &msg[1]); |
368 | 369 | ||
369 | if (! (GNUNET_YES == ret || GNUNET_NO == ret) | 370 | if (! (GNUNET_YES == ret || GNUNET_NO == ret) |
@@ -380,12 +381,12 @@ schedule_origin_to_all (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc | |||
380 | 381 | ||
381 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE); | 382 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE); |
382 | msg->header.size = htons (sizeof (*msg) + buf_size); | 383 | msg->header.size = htons (sizeof (*msg) + buf_size); |
383 | msg->message_id = mh->message_id; | 384 | msg->message_id = GNUNET_htonll (mh->message_id); |
384 | msg->group_generation = mh->group_generation; | 385 | msg->group_generation = mh->group_generation; |
385 | 386 | ||
386 | /* FIXME: add fragment ID and signature in the service instead of here */ | 387 | /* FIXME: add fragment ID and signature in the service instead of here */ |
387 | msg->fragment_id = orig->next_fragment_id++; | 388 | msg->fragment_id = GNUNET_ntohll (orig->next_fragment_id++); |
388 | msg->fragment_offset = mh->fragment_offset; | 389 | msg->fragment_offset = GNUNET_ntohll (mh->fragment_offset); |
389 | mh->fragment_offset += sizeof (*msg) + buf_size; | 390 | mh->fragment_offset += sizeof (*msg) + buf_size; |
390 | msg->purpose.size = htonl (sizeof (*msg) + buf_size | 391 | msg->purpose.size = htonl (sizeof (*msg) + buf_size |
391 | - sizeof (msg->header) | 392 | - sizeof (msg->header) |
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c index 628c39900..e5de7dcda 100644 --- a/src/psyc/gnunet-service-psyc.c +++ b/src/psyc/gnunet-service-psyc.c | |||
@@ -171,8 +171,8 @@ struct Slave | |||
171 | }; | 171 | }; |
172 | 172 | ||
173 | 173 | ||
174 | static void | 174 | static inline void |
175 | transmit_message (struct Channel *ch, struct GNUNET_TIME_Relative delay); | 175 | transmit_message (struct Channel *ch); |
176 | 176 | ||
177 | 177 | ||
178 | /** | 178 | /** |
@@ -205,6 +205,7 @@ client_cleanup (struct Channel *ch) | |||
205 | struct Master *mst = (struct Master *) ch; | 205 | struct Master *mst = (struct Master *) ch; |
206 | if (NULL != mst->origin) | 206 | if (NULL != mst->origin) |
207 | GNUNET_MULTICAST_origin_stop (mst->origin); | 207 | GNUNET_MULTICAST_origin_stop (mst->origin); |
208 | GNUNET_CONTAINER_multihashmap_remove (clients, &mst->pub_key_hash, mst); | ||
208 | } | 209 | } |
209 | else | 210 | else |
210 | { | 211 | { |
@@ -251,7 +252,7 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) | |||
251 | /* Send pending messages to multicast before cleanup. */ | 252 | /* Send pending messages to multicast before cleanup. */ |
252 | if (NULL != ch->tmit_head) | 253 | if (NULL != ch->tmit_head) |
253 | { | 254 | { |
254 | transmit_message (ch, GNUNET_TIME_UNIT_ZERO); | 255 | transmit_message (ch); |
255 | } | 256 | } |
256 | else | 257 | else |
257 | { | 258 | { |
@@ -321,6 +322,10 @@ message_to_client (void *cls, const struct GNUNET_HashCode *chan_key_hash, | |||
321 | const struct GNUNET_MessageHeader *msg = cls; | 322 | const struct GNUNET_MessageHeader *msg = cls; |
322 | struct Channel *ch = chan; | 323 | struct Channel *ch = chan; |
323 | 324 | ||
325 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
326 | "Sending message of type %u and size %u to client 0x%zx.\n", | ||
327 | ntohs (msg->type), ntohs (msg->size), ch->client); | ||
328 | |||
324 | GNUNET_SERVER_notification_context_add (nc, ch->client); | 329 | GNUNET_SERVER_notification_context_add (nc, ch->client); |
325 | GNUNET_SERVER_notification_context_unicast (nc, ch->client, msg, GNUNET_NO); | 330 | GNUNET_SERVER_notification_context_unicast (nc, ch->client, msg, GNUNET_NO); |
326 | 331 | ||
@@ -363,24 +368,6 @@ message_cb (void *cls, const struct GNUNET_MessageHeader *msg) | |||
363 | GNUNET_MULTICAST_MessageHeader *) msg, | 368 | GNUNET_MULTICAST_MessageHeader *) msg, |
364 | 0, NULL, NULL); | 369 | 0, NULL, NULL); |
365 | 370 | ||
366 | uint16_t size = ntohs (msg->size); | ||
367 | uint16_t psize = 0; | ||
368 | uint16_t pos = 0; | ||
369 | |||
370 | for (pos = 0; sizeof (*msg) + pos < size; pos += psize) | ||
371 | { | ||
372 | const struct GNUNET_MessageHeader *pmsg | ||
373 | = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos); | ||
374 | uint16_t psize = ntohs (pmsg->size); | ||
375 | if (sizeof (*msg) + pos + psize > size) | ||
376 | { | ||
377 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
378 | "Message received from multicast contains invalid PSYC " | ||
379 | "message. Not sending to clients.\n"); | ||
380 | return; | ||
381 | } | ||
382 | } | ||
383 | |||
384 | #if TODO | 371 | #if TODO |
385 | /* FIXME: apply modifiers to state in PSYCstore */ | 372 | /* FIXME: apply modifiers to state in PSYCstore */ |
386 | GNUNET_PSYCSTORE_state_modify (store, chan_key, | 373 | GNUNET_PSYCSTORE_state_modify (store, chan_key, |
@@ -393,6 +380,26 @@ message_cb (void *cls, const struct GNUNET_MessageHeader *msg) | |||
393 | = (const struct GNUNET_MULTICAST_MessageHeader *) msg; | 380 | = (const struct GNUNET_MULTICAST_MessageHeader *) msg; |
394 | struct GNUNET_PSYC_MessageHeader *pmsg; | 381 | struct GNUNET_PSYC_MessageHeader *pmsg; |
395 | 382 | ||
383 | uint16_t size = ntohs (msg->size); | ||
384 | uint16_t psize = 0; | ||
385 | uint16_t pos = 0; | ||
386 | |||
387 | for (pos = 0; sizeof (*mmsg) + pos < size; pos += psize) | ||
388 | { | ||
389 | const struct GNUNET_MessageHeader *pmsg | ||
390 | = (const struct GNUNET_MessageHeader *) ((char *) &mmsg[1] + pos); | ||
391 | psize = ntohs (pmsg->size); | ||
392 | if (psize < sizeof (*pmsg) || sizeof (*mmsg) + pos + psize > size) | ||
393 | { | ||
394 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
395 | "Received invalid message part of type %u and size %u " | ||
396 | "from multicast. Not sending to clients.\n", | ||
397 | ntohs (pmsg->type), psize); | ||
398 | GNUNET_break_op (0); | ||
399 | return; | ||
400 | } | ||
401 | } | ||
402 | |||
396 | psize = sizeof (*pmsg) + size - sizeof (*mmsg); | 403 | psize = sizeof (*pmsg) + size - sizeof (*mmsg); |
397 | pmsg = GNUNET_malloc (psize); | 404 | pmsg = GNUNET_malloc (psize); |
398 | pmsg->header.size = htons (psize); | 405 | pmsg->header.size = htons (psize); |
@@ -572,19 +579,18 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client, | |||
572 | 579 | ||
573 | 580 | ||
574 | /** | 581 | /** |
575 | * Send transmission acknowledgement to a client. | 582 | * Send acknowledgement to a client. |
576 | * | 583 | * |
577 | * Sent after the last GNUNET_PSYC_MessageModifier and after each | 584 | * Sent after a message fragment has been passed on to multicast. |
578 | * GNUNET_PSYC_MessageData. | ||
579 | * | 585 | * |
580 | * @param ch The channel struct for the client. | 586 | * @param ch The channel struct for the client. |
581 | */ | 587 | */ |
582 | static void | 588 | static void |
583 | send_transmit_ack (struct Channel *ch) | 589 | send_message_ack (struct Channel *ch) |
584 | { | 590 | { |
585 | struct GNUNET_MessageHeader res; | 591 | struct GNUNET_MessageHeader res; |
586 | res.size = htons (sizeof (res)); | 592 | res.size = htons (sizeof (res)); |
587 | res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK); | 593 | res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK); |
588 | 594 | ||
589 | GNUNET_SERVER_notification_context_add (nc, ch->client); | 595 | GNUNET_SERVER_notification_context_add (nc, ch->client); |
590 | GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res, | 596 | GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res, |
@@ -599,9 +605,9 @@ static int | |||
599 | transmit_notify (void *cls, size_t *data_size, void *data) | 605 | transmit_notify (void *cls, size_t *data_size, void *data) |
600 | { | 606 | { |
601 | struct Channel *ch = cls; | 607 | struct Channel *ch = cls; |
602 | struct TransmitMessage *msg = ch->tmit_head; | 608 | struct TransmitMessage *tmit_msg = ch->tmit_head; |
603 | 609 | ||
604 | if (NULL == msg || *data_size < msg->size) | 610 | if (NULL == tmit_msg || *data_size < tmit_msg->size) |
605 | { | 611 | { |
606 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify: nothing to send.\n"); | 612 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify: nothing to send.\n"); |
607 | *data_size = 0; | 613 | *data_size = 0; |
@@ -609,21 +615,22 @@ transmit_notify (void *cls, size_t *data_size, void *data) | |||
609 | } | 615 | } |
610 | 616 | ||
611 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 617 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
612 | "transmit_notify: sending %u bytes.\n", msg->size); | 618 | "transmit_notify: sending %u bytes.\n", tmit_msg->size); |
613 | 619 | ||
614 | *data_size = msg->size; | 620 | *data_size = tmit_msg->size; |
615 | memcpy (data, msg->buf, *data_size); | 621 | memcpy (data, tmit_msg->buf, *data_size); |
616 | 622 | ||
617 | GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg); | 623 | GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg); |
618 | GNUNET_free (msg); | 624 | GNUNET_free (tmit_msg); |
619 | 625 | ||
620 | int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES; | 626 | int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES; |
627 | send_message_ack (ch); | ||
621 | 628 | ||
622 | if (0 == ch->tmit_task) | 629 | if (0 == ch->tmit_task) |
623 | { | 630 | { |
624 | if (NULL != ch->tmit_head) | 631 | if (NULL != ch->tmit_head) |
625 | { | 632 | { |
626 | transmit_message (ch, GNUNET_TIME_UNIT_ZERO); | 633 | transmit_message (ch); |
627 | } | 634 | } |
628 | else if (ch->disconnected) | 635 | else if (ch->disconnected) |
629 | { | 636 | { |
@@ -640,11 +647,9 @@ transmit_notify (void *cls, size_t *data_size, void *data) | |||
640 | * Transmit a message from a channel master to the multicast group. | 647 | * Transmit a message from a channel master to the multicast group. |
641 | */ | 648 | */ |
642 | static void | 649 | static void |
643 | master_transmit_message (void *cls, | 650 | master_transmit_message (struct Master *mst) |
644 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
645 | { | 651 | { |
646 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "master_transmit_message()\n"); | 652 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "master_transmit_message()\n"); |
647 | struct Master *mst = cls; | ||
648 | mst->channel.tmit_task = 0; | 653 | mst->channel.tmit_task = 0; |
649 | if (NULL == mst->tmit_handle) | 654 | if (NULL == mst->tmit_handle) |
650 | { | 655 | { |
@@ -664,10 +669,8 @@ master_transmit_message (void *cls, | |||
664 | * Transmit a message from a channel slave to the multicast group. | 669 | * Transmit a message from a channel slave to the multicast group. |
665 | */ | 670 | */ |
666 | static void | 671 | static void |
667 | slave_transmit_message (void *cls, | 672 | slave_transmit_message (struct Slave *slv) |
668 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
669 | { | 673 | { |
670 | struct Slave *slv = cls; | ||
671 | slv->channel.tmit_task = 0; | 674 | slv->channel.tmit_task = 0; |
672 | if (NULL == slv->tmit_handle) | 675 | if (NULL == slv->tmit_handle) |
673 | { | 676 | { |
@@ -682,214 +685,85 @@ slave_transmit_message (void *cls, | |||
682 | } | 685 | } |
683 | 686 | ||
684 | 687 | ||
685 | /** | 688 | static inline void |
686 | * Schedule message transmission from a channel to the multicast group. | 689 | transmit_message (struct Channel *ch) |
687 | * | ||
688 | * @param ch The channel. | ||
689 | * @param delay Transmission delay. | ||
690 | */ | ||
691 | static void | ||
692 | transmit_message (struct Channel *ch, struct GNUNET_TIME_Relative delay) | ||
693 | { | 690 | { |
694 | if (0 != ch->tmit_task) | 691 | ch->is_master |
695 | GNUNET_SCHEDULER_cancel (ch->tmit_task); | 692 | ? master_transmit_message ((struct Master *) ch) |
696 | 693 | : slave_transmit_message ((struct Slave *) ch); | |
697 | ch->tmit_task | ||
698 | = ch->is_master | ||
699 | ? GNUNET_SCHEDULER_add_delayed (delay, master_transmit_message, ch) | ||
700 | : GNUNET_SCHEDULER_add_delayed (delay, slave_transmit_message, ch); | ||
701 | } | ||
702 | |||
703 | |||
704 | /** | ||
705 | * Queue incoming message parts from a client for transmission, and send them to | ||
706 | * the multicast group when the buffer is full or reached the end of message. | ||
707 | * | ||
708 | * @param ch Channel struct for the client. | ||
709 | * @param msg Message from the client. | ||
710 | * | ||
711 | * @return #GNUNET_OK on success, else #GNUNET_SYSERR. | ||
712 | */ | ||
713 | static int | ||
714 | queue_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg) | ||
715 | { | ||
716 | uint16_t size = ntohs (msg->size); | ||
717 | struct GNUNET_TIME_Relative tmit_delay = GNUNET_TIME_UNIT_ZERO; | ||
718 | struct TransmitMessage *tmit_msg = ch->tmit_tail; | ||
719 | |||
720 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
721 | "Queueing message of type %u and size %u " | ||
722 | "for transmission to multicast.\n", | ||
723 | ntohs (msg->type), size); | ||
724 | |||
725 | if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size) | ||
726 | return GNUNET_SYSERR; | ||
727 | |||
728 | if (NULL == tmit_msg | ||
729 | || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < tmit_msg->size + size) | ||
730 | { | ||
731 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
732 | "Appending message to new buffer.\n"); | ||
733 | /* Start filling up new buffer */ | ||
734 | tmit_msg = GNUNET_new (struct TransmitMessage); | ||
735 | tmit_msg->buf = GNUNET_malloc (size); | ||
736 | memcpy (tmit_msg->buf, msg, size); | ||
737 | tmit_msg->size = size; | ||
738 | tmit_msg->state = ch->tmit_state; | ||
739 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); | ||
740 | } | ||
741 | else | ||
742 | { | ||
743 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
744 | "Appending message to existing buffer.\n"); | ||
745 | /* Append to existing buffer */ | ||
746 | tmit_msg->buf = GNUNET_realloc (tmit_msg->buf, tmit_msg->size + size); | ||
747 | memcpy (tmit_msg->buf + tmit_msg->size, msg, size); | ||
748 | tmit_msg->size += size; | ||
749 | tmit_msg->state = ch->tmit_state; | ||
750 | } | ||
751 | |||
752 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "tmit_size: %u\n", tmit_msg->size); | ||
753 | |||
754 | /* Wait a bit for the remaining message parts from the client | ||
755 | if there's still some space left in the buffer. */ | ||
756 | if (tmit_msg->state < MSG_STATE_END | ||
757 | && (tmit_msg->size + sizeof (struct GNUNET_MessageHeader) | ||
758 | < GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD)) | ||
759 | { | ||
760 | tmit_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2); | ||
761 | } | ||
762 | else | ||
763 | { | ||
764 | send_transmit_ack (ch); | ||
765 | } | ||
766 | |||
767 | transmit_message (ch, tmit_delay); | ||
768 | |||
769 | return GNUNET_OK; | ||
770 | } | 694 | } |
771 | 695 | ||
772 | 696 | ||
773 | static void | 697 | static void |
774 | transmit_error (struct Channel *ch) | 698 | transmit_error (struct Channel *ch) |
775 | { | 699 | { |
776 | struct GNUNET_MessageHeader *msg = GNUNET_malloc (sizeof (*msg)); | 700 | struct GNUNET_MessageHeader *msg; |
701 | struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) | ||
702 | + sizeof (*msg)); | ||
703 | msg = (struct GNUNET_MessageHeader *) &tmit_msg[1]; | ||
777 | msg->size = ntohs (sizeof (*msg)); | 704 | msg->size = ntohs (sizeof (*msg)); |
778 | msg->type = ntohs (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); | 705 | msg->type = ntohs (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); |
779 | queue_message (ch, msg); | ||
780 | 706 | ||
707 | tmit_msg->buf = (char *) &tmit_msg[1]; | ||
708 | tmit_msg->size = sizeof (*msg); | ||
709 | tmit_msg->state = ch->tmit_state; | ||
710 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); | ||
711 | transmit_message (ch); | ||
712 | |||
713 | /* FIXME: cleanup */ | ||
781 | GNUNET_SERVER_client_disconnect (ch->client); | 714 | GNUNET_SERVER_client_disconnect (ch->client); |
782 | } | 715 | } |
783 | 716 | ||
784 | /** | ||
785 | * Incoming method from a client. | ||
786 | */ | ||
787 | static void | ||
788 | handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client, | ||
789 | const struct GNUNET_MessageHeader *msg) | ||
790 | { | ||
791 | /* const struct GNUNET_PSYC_MessageMethod *meth | ||
792 | = (const struct GNUNET_PSYC_MessageMethod *) msg; */ | ||
793 | struct Channel *ch | ||
794 | = GNUNET_SERVER_client_get_user_context (client, struct Channel); | ||
795 | GNUNET_assert (NULL != ch); | ||
796 | |||
797 | if (MSG_STATE_START != ch->tmit_state) | ||
798 | { | ||
799 | transmit_error (ch); | ||
800 | return; | ||
801 | } | ||
802 | ch->tmit_state = MSG_STATE_METHOD; | ||
803 | |||
804 | queue_message (ch, msg); | ||
805 | send_transmit_ack (ch); | ||
806 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
807 | }; | ||
808 | |||
809 | |||
810 | /** | ||
811 | * Incoming modifier from a client. | ||
812 | */ | ||
813 | static void | ||
814 | handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client, | ||
815 | const struct GNUNET_MessageHeader *msg) | ||
816 | { | ||
817 | const struct GNUNET_PSYC_MessageModifier *mod | ||
818 | = (const struct GNUNET_PSYC_MessageModifier *) msg; | ||
819 | |||
820 | struct Channel *ch | ||
821 | = GNUNET_SERVER_client_get_user_context (client, struct Channel); | ||
822 | GNUNET_assert (NULL != ch); | ||
823 | |||
824 | if (MSG_STATE_METHOD != ch->tmit_state | ||
825 | || MSG_STATE_MODIFIER != ch->tmit_state | ||
826 | || MSG_STATE_MOD_CONT != ch->tmit_state | ||
827 | || ch->tmit_mod_value_size_expected != ch->tmit_mod_value_size) | ||
828 | { | ||
829 | transmit_error (ch); | ||
830 | return; | ||
831 | } | ||
832 | ch->tmit_mod_value_size_expected = ntohl (mod->value_size); | ||
833 | ch->tmit_mod_value_size = ntohs (msg->size) - ntohs(mod->name_size) - 1; | ||
834 | |||
835 | queue_message (ch, msg); | ||
836 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
837 | }; | ||
838 | |||
839 | 717 | ||
840 | /** | 718 | /** |
841 | * Incoming modifier from a client. | 719 | * Incoming message from a client. |
842 | */ | 720 | */ |
843 | static void | 721 | static void |
844 | handle_transmit_mod_cont (void *cls, struct GNUNET_SERVER_Client *client, | 722 | handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, |
845 | const struct GNUNET_MessageHeader *msg) | 723 | const struct GNUNET_MessageHeader *msg) |
846 | { | 724 | { |
847 | struct Channel *ch | 725 | struct Channel *ch |
848 | = GNUNET_SERVER_client_get_user_context (client, struct Channel); | 726 | = GNUNET_SERVER_client_get_user_context (client, struct Channel); |
849 | GNUNET_assert (NULL != ch); | 727 | GNUNET_assert (NULL != ch); |
850 | 728 | ||
851 | ch->tmit_mod_value_size += ntohs (msg->size); | 729 | uint16_t size = ntohs (msg->size); |
730 | uint16_t psize = 0, pos = 0; | ||
852 | 731 | ||
853 | if (MSG_STATE_MODIFIER != ch->tmit_state | 732 | if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg)) |
854 | || MSG_STATE_MOD_CONT != ch->tmit_state | ||
855 | || ch->tmit_mod_value_size_expected < ch->tmit_mod_value_size) | ||
856 | { | 733 | { |
734 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Message payload too large\n"); | ||
735 | GNUNET_break (0); | ||
857 | transmit_error (ch); | 736 | transmit_error (ch); |
858 | return; | 737 | return; |
859 | } | 738 | } |
860 | ch->tmit_state = MSG_STATE_MOD_CONT; | ||
861 | |||
862 | queue_message (ch, msg); | ||
863 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
864 | }; | ||
865 | 739 | ||
866 | 740 | for (pos = 0; sizeof (*msg) + pos < size; pos += psize) | |
867 | /** | ||
868 | * Incoming data from a client. | ||
869 | */ | ||
870 | static void | ||
871 | handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client, | ||
872 | const struct GNUNET_MessageHeader *msg) | ||
873 | { | ||
874 | struct Channel *ch | ||
875 | = GNUNET_SERVER_client_get_user_context (client, struct Channel); | ||
876 | GNUNET_assert (NULL != ch); | ||
877 | |||
878 | if (MSG_STATE_METHOD != ch->tmit_state | ||
879 | || MSG_STATE_MODIFIER != ch->tmit_state | ||
880 | || MSG_STATE_MOD_CONT != ch->tmit_state | ||
881 | || ch->tmit_mod_value_size_expected != ch->tmit_mod_value_size) | ||
882 | { | 741 | { |
883 | transmit_error (ch); | 742 | const struct GNUNET_MessageHeader *pmsg |
884 | return; | 743 | = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos); |
744 | psize = ntohs (pmsg->size); | ||
745 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
746 | "Received message part of type %u and size %u " | ||
747 | "from client.\n", ntohs (pmsg->type), psize); | ||
748 | if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size) | ||
749 | { | ||
750 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
751 | "Received invalid message part of type %u and size %u " | ||
752 | "from client.\n", ntohs (pmsg->type), psize); | ||
753 | GNUNET_break (0); | ||
754 | transmit_error (ch); | ||
755 | return; | ||
756 | } | ||
885 | } | 757 | } |
886 | ch->tmit_state = MSG_STATE_DATA; | ||
887 | 758 | ||
888 | queue_message (ch, msg); | 759 | size -= sizeof (*msg); |
889 | send_transmit_ack (ch); | 760 | struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size); |
890 | 761 | tmit_msg->buf = (char *) &tmit_msg[1]; | |
891 | if (MSG_STATE_END <= ch->tmit_state) | 762 | memcpy (tmit_msg->buf, &msg[1], size); |
892 | ch->tmit_state = MSG_STATE_START; | 763 | tmit_msg->size = size; |
764 | tmit_msg->state = ch->tmit_state; | ||
765 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); | ||
766 | transmit_message (ch); | ||
893 | 767 | ||
894 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 768 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
895 | }; | 769 | }; |
@@ -912,22 +786,9 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, | |||
912 | 786 | ||
913 | { &handle_slave_join, NULL, | 787 | { &handle_slave_join, NULL, |
914 | GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 }, | 788 | GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 }, |
915 | #if TODO | 789 | |
916 | { &handle_psyc_message, NULL, | 790 | { &handle_psyc_message, NULL, |
917 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 }, | 791 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 }, |
918 | #endif | ||
919 | { &handle_transmit_method, NULL, | ||
920 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD, 0 }, | ||
921 | |||
922 | { &handle_transmit_modifier, NULL, | ||
923 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER, 0 }, | ||
924 | |||
925 | { &handle_transmit_mod_cont, NULL, | ||
926 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT, 0 }, | ||
927 | |||
928 | { &handle_transmit_data, NULL, | ||
929 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA, 0 }, | ||
930 | { NULL, NULL, 0, 0 } | ||
931 | }; | 792 | }; |
932 | 793 | ||
933 | cfg = c; | 794 | cfg = c; |
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c index a5a01fa92..e904e00b5 100644 --- a/src/psyc/psyc_api.c +++ b/src/psyc/psyc_api.c | |||
@@ -45,7 +45,7 @@ struct OperationHandle | |||
45 | { | 45 | { |
46 | struct OperationHandle *prev; | 46 | struct OperationHandle *prev; |
47 | struct OperationHandle *next; | 47 | struct OperationHandle *next; |
48 | const struct GNUNET_MessageHeader *msg; | 48 | struct GNUNET_MessageHeader *msg; |
49 | }; | 49 | }; |
50 | 50 | ||
51 | /** | 51 | /** |
@@ -79,6 +79,11 @@ struct GNUNET_PSYC_Channel | |||
79 | struct OperationHandle *tmit_tail; | 79 | struct OperationHandle *tmit_tail; |
80 | 80 | ||
81 | /** | 81 | /** |
82 | * Message being transmitted to the PSYC service. | ||
83 | */ | ||
84 | struct OperationHandle *tmit_msg; | ||
85 | |||
86 | /** | ||
82 | * Message to send on reconnect. | 87 | * Message to send on reconnect. |
83 | */ | 88 | */ |
84 | struct GNUNET_MessageHeader *reconnect_msg; | 89 | struct GNUNET_MessageHeader *reconnect_msg; |
@@ -139,11 +144,6 @@ struct GNUNET_PSYC_Channel | |||
139 | uint32_t recv_mod_value_size; | 144 | uint32_t recv_mod_value_size; |
140 | 145 | ||
141 | /** | 146 | /** |
142 | * Buffer space available for transmitting the next data fragment. | ||
143 | */ | ||
144 | uint16_t tmit_size; // FIXME | ||
145 | |||
146 | /** | ||
147 | * Is transmission paused? | 147 | * Is transmission paused? |
148 | */ | 148 | */ |
149 | uint8_t tmit_paused; | 149 | uint8_t tmit_paused; |
@@ -151,7 +151,7 @@ struct GNUNET_PSYC_Channel | |||
151 | /** | 151 | /** |
152 | * Are we still waiting for a PSYC_TRANSMIT_ACK? | 152 | * Are we still waiting for a PSYC_TRANSMIT_ACK? |
153 | */ | 153 | */ |
154 | uint8_t tmit_ack_pending; // FIXME | 154 | uint8_t tmit_ack_pending; |
155 | 155 | ||
156 | /** | 156 | /** |
157 | * Are we polling for incoming messages right now? | 157 | * Are we polling for incoming messages right now? |
@@ -176,7 +176,7 @@ struct GNUNET_PSYC_Channel | |||
176 | struct GNUNET_PSYC_MasterTransmitHandle | 176 | struct GNUNET_PSYC_MasterTransmitHandle |
177 | { | 177 | { |
178 | struct GNUNET_PSYC_Master *master; | 178 | struct GNUNET_PSYC_Master *master; |
179 | GNUNET_PSYC_MasterTransmitNotify notify_mod; | 179 | GNUNET_PSYC_MasterTransmitNotifyModifier notify_mod; |
180 | GNUNET_PSYC_MasterTransmitNotify notify_data; | 180 | GNUNET_PSYC_MasterTransmitNotify notify_data; |
181 | void *notify_cls; | 181 | void *notify_cls; |
182 | enum MessageState state; | 182 | enum MessageState state; |
@@ -246,16 +246,14 @@ struct GNUNET_PSYC_StateQuery | |||
246 | }; | 246 | }; |
247 | 247 | ||
248 | 248 | ||
249 | /** | ||
250 | * Try again to connect to the PSYC service. | ||
251 | * | ||
252 | * @param cls Handle to the PSYC service. | ||
253 | * @param tc Scheduler context | ||
254 | */ | ||
255 | static void | 249 | static void |
256 | reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); | 250 | reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); |
257 | 251 | ||
258 | 252 | ||
253 | static void | ||
254 | master_transmit_data (struct GNUNET_PSYC_Master *mst); | ||
255 | |||
256 | |||
259 | /** | 257 | /** |
260 | * Reschedule a connect attempt to the service. | 258 | * Reschedule a connect attempt to the service. |
261 | * | 259 | * |
@@ -323,6 +321,79 @@ recv_error (struct GNUNET_PSYC_Channel *ch) | |||
323 | message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, NULL); | 321 | message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, NULL); |
324 | } | 322 | } |
325 | 323 | ||
324 | |||
325 | /** | ||
326 | * Queue an incoming message part for transmission to the PSYC service. | ||
327 | * | ||
328 | * The message part is added to the current message buffer. | ||
329 | * When this buffer is full, it is added to the transmission queue. | ||
330 | * | ||
331 | * @param ch Channel struct for the client. | ||
332 | * @param msg Modifier message part, or NULL when there's no more modifiers. | ||
333 | * @param end End of message. | ||
334 | */ | ||
335 | static void | ||
336 | queue_message (struct GNUNET_PSYC_Channel *ch, | ||
337 | const struct GNUNET_MessageHeader *msg, | ||
338 | uint8_t end) | ||
339 | { | ||
340 | uint16_t size = msg ? ntohs (msg->size) : 0; | ||
341 | |||
342 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
343 | "Queueing message of type %u and size %u (end: %u)).\n", | ||
344 | ntohs (msg->type), size, end); | ||
345 | |||
346 | struct OperationHandle *op = ch->tmit_msg; | ||
347 | if (NULL != op) | ||
348 | { | ||
349 | if (NULL == msg | ||
350 | || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < op->msg->size + size) | ||
351 | { | ||
352 | /* End of message or buffer is full, add it to transmission queue | ||
353 | * and start with empty buffer */ | ||
354 | op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | ||
355 | op->msg->size = htons (op->msg->size); | ||
356 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | ||
357 | ch->tmit_msg = op = NULL; | ||
358 | ch->tmit_ack_pending++; | ||
359 | } | ||
360 | else | ||
361 | { | ||
362 | /* Message fits in current buffer, append */ | ||
363 | ch->tmit_msg = op | ||
364 | = GNUNET_realloc (op, sizeof (*op) + op->msg->size + size); | ||
365 | op->msg = (struct GNUNET_MessageHeader *) &op[1]; | ||
366 | memcpy ((char *) op->msg + op->msg->size, msg, size); | ||
367 | op->msg->size += size; | ||
368 | } | ||
369 | } | ||
370 | |||
371 | if (NULL == op && NULL != msg) | ||
372 | { | ||
373 | /* Empty buffer, copy over message. */ | ||
374 | ch->tmit_msg = op | ||
375 | = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg) + size); | ||
376 | op->msg = (struct GNUNET_MessageHeader *) &op[1]; | ||
377 | op->msg->size = sizeof (*op->msg) + size; | ||
378 | memcpy (&op->msg[1], msg, size); | ||
379 | } | ||
380 | |||
381 | if (NULL != op | ||
382 | && (end || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD | ||
383 | < op->msg->size + sizeof (struct GNUNET_MessageHeader)))) | ||
384 | { | ||
385 | /* End of message or buffer is full, add it to transmission queue. */ | ||
386 | op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | ||
387 | op->msg->size = htons (op->msg->size); | ||
388 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | ||
389 | ch->tmit_msg = op = NULL; | ||
390 | ch->tmit_ack_pending++; | ||
391 | } | ||
392 | |||
393 | transmit_next (ch); | ||
394 | } | ||
395 | |||
396 | |||
326 | /** | 397 | /** |
327 | * Request a modifier from a client to transmit. | 398 | * Request a modifier from a client to transmit. |
328 | * | 399 | * |
@@ -332,32 +403,71 @@ static void | |||
332 | master_transmit_mod (struct GNUNET_PSYC_Master *mst) | 403 | master_transmit_mod (struct GNUNET_PSYC_Master *mst) |
333 | { | 404 | { |
334 | struct GNUNET_PSYC_Channel *ch = &mst->ch; | 405 | struct GNUNET_PSYC_Channel *ch = &mst->ch; |
335 | uint16_t max_data_size | 406 | uint16_t max_data_size, data_size; |
336 | = ch->tmit_size > sizeof (struct GNUNET_MessageHeader) | 407 | char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; |
337 | ? GNUNET_PSYC_MODIFIER_MAX_PAYLOAD - ch->tmit_size | 408 | struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; |
338 | : GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD - ch->tmit_size; | 409 | int notify_ret; |
339 | uint16_t data_size = max_data_size; | ||
340 | 410 | ||
341 | struct GNUNET_MessageHeader *msg; | 411 | switch (mst->tmit->state) |
342 | struct OperationHandle *op | 412 | { |
343 | = GNUNET_malloc (sizeof (*op) + sizeof (*msg) + data_size); | 413 | case MSG_STATE_MODIFIER: |
344 | op->msg = msg = (struct GNUNET_MessageHeader *) &op[1]; | 414 | { |
345 | msg->type | 415 | struct GNUNET_PSYC_MessageModifier *mod |
346 | = MSG_STATE_MODIFIER == mst->tmit->state | 416 | = (struct GNUNET_PSYC_MessageModifier *) msg; |
347 | ? htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER) | 417 | max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; |
348 | : htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); | 418 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); |
419 | msg->size = sizeof (struct GNUNET_PSYC_MessageModifier); | ||
420 | notify_ret = mst->tmit->notify_mod (mst->tmit->notify_cls, | ||
421 | &data_size, &mod[1], &mod->oper); | ||
422 | mod->name_size = strnlen ((char *) &mod[1], data_size); | ||
423 | if (mod->name_size < data_size) | ||
424 | { | ||
425 | mod->oper = htons (mod->oper); | ||
426 | mod->value_size = htons (data_size - 1 - mod->name_size); | ||
427 | mod->name_size = htons (mod->name_size); | ||
428 | } | ||
429 | else if (0 < data_size) | ||
430 | { | ||
431 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n"); | ||
432 | notify_ret = GNUNET_SYSERR; | ||
433 | } | ||
434 | break; | ||
435 | } | ||
436 | case MSG_STATE_MOD_CONT: | ||
437 | { | ||
438 | max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; | ||
439 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); | ||
440 | msg->size = sizeof (struct GNUNET_MessageHeader); | ||
441 | notify_ret = mst->tmit->notify_mod (mst->tmit->notify_cls, | ||
442 | &data_size, &msg[1], NULL); | ||
443 | break; | ||
444 | } | ||
445 | default: | ||
446 | GNUNET_assert (0); | ||
447 | } | ||
349 | 448 | ||
350 | int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls, | ||
351 | &data_size, &msg[1]); | ||
352 | switch (notify_ret) | 449 | switch (notify_ret) |
353 | { | 450 | { |
354 | case GNUNET_NO: | 451 | case GNUNET_NO: |
355 | if (0 != data_size) | 452 | if (0 == data_size) |
356 | mst->tmit->state = MSG_STATE_MOD_CONT; | 453 | { /* Transmission paused, nothing to send. */ |
454 | ch->tmit_paused = GNUNET_YES; | ||
455 | return; | ||
456 | } | ||
457 | mst->tmit->state = MSG_STATE_MOD_CONT; | ||
357 | break; | 458 | break; |
358 | 459 | ||
359 | case GNUNET_YES: | 460 | case GNUNET_YES: |
360 | mst->tmit->state = (0 == data_size) ? MSG_STATE_DATA : MSG_STATE_MODIFIER; | 461 | if (0 == data_size) |
462 | { | ||
463 | /* End of modifiers. */ | ||
464 | mst->tmit->state = MSG_STATE_DATA; | ||
465 | if (0 == ch->tmit_ack_pending) | ||
466 | master_transmit_data (mst); | ||
467 | |||
468 | return; | ||
469 | } | ||
470 | mst->tmit->state = MSG_STATE_MODIFIER; | ||
361 | break; | 471 | break; |
362 | 472 | ||
363 | default: | 473 | default: |
@@ -368,36 +478,18 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst) | |||
368 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); | 478 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); |
369 | msg->size = htons (sizeof (*msg)); | 479 | msg->size = htons (sizeof (*msg)); |
370 | 480 | ||
371 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | 481 | queue_message (ch, msg, GNUNET_YES); |
372 | transmit_next (ch); | ||
373 | return; | 482 | return; |
374 | } | 483 | } |
375 | 484 | ||
376 | if ((GNUNET_NO == notify_ret && 0 == data_size)) | ||
377 | { | ||
378 | /* Transmission paused, nothing to send. */ | ||
379 | ch->tmit_paused = GNUNET_YES; | ||
380 | GNUNET_free (op); | ||
381 | } | ||
382 | |||
383 | if (0 < data_size) | 485 | if (0 < data_size) |
384 | { | 486 | { |
385 | GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD); | 487 | GNUNET_assert (data_size <= max_data_size); |
386 | msg->size = htons (sizeof (*msg) + data_size); | 488 | msg->size = htons (msg->size + data_size); |
387 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | 489 | queue_message (ch, msg, GNUNET_NO); |
388 | } | ||
389 | |||
390 | /* End of message. */ | ||
391 | if (GNUNET_YES == notify_ret) | ||
392 | { | ||
393 | op = GNUNET_malloc (sizeof *(op) + sizeof (*msg)); | ||
394 | op->msg = msg = (struct GNUNET_MessageHeader *) &op[1]; | ||
395 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END); | ||
396 | msg->size = htons (sizeof (*msg)); | ||
397 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | ||
398 | } | 490 | } |
399 | 491 | ||
400 | transmit_next (ch); | 492 | master_transmit_mod (mst); |
401 | } | 493 | } |
402 | 494 | ||
403 | 495 | ||
@@ -410,11 +502,10 @@ static void | |||
410 | master_transmit_data (struct GNUNET_PSYC_Master *mst) | 502 | master_transmit_data (struct GNUNET_PSYC_Master *mst) |
411 | { | 503 | { |
412 | struct GNUNET_PSYC_Channel *ch = &mst->ch; | 504 | struct GNUNET_PSYC_Channel *ch = &mst->ch; |
413 | struct GNUNET_MessageHeader *msg; | ||
414 | uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD; | 505 | uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD; |
415 | struct OperationHandle *op | 506 | char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; |
416 | = GNUNET_malloc (sizeof (*op) + sizeof (*msg) + data_size); | 507 | struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; |
417 | op->msg = msg = (struct GNUNET_MessageHeader *) &op[1]; | 508 | |
418 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); | 509 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); |
419 | 510 | ||
420 | int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls, | 511 | int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls, |
@@ -426,7 +517,7 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst) | |||
426 | { | 517 | { |
427 | /* Transmission paused, nothing to send. */ | 518 | /* Transmission paused, nothing to send. */ |
428 | ch->tmit_paused = GNUNET_YES; | 519 | ch->tmit_paused = GNUNET_YES; |
429 | GNUNET_free (op); | 520 | return; |
430 | } | 521 | } |
431 | break; | 522 | break; |
432 | 523 | ||
@@ -441,9 +532,7 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst) | |||
441 | mst->tmit->state = MSG_STATE_START; | 532 | mst->tmit->state = MSG_STATE_START; |
442 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); | 533 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); |
443 | msg->size = htons (sizeof (*msg)); | 534 | msg->size = htons (sizeof (*msg)); |
444 | 535 | queue_message (ch, msg, GNUNET_YES); | |
445 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | ||
446 | transmit_next (ch); | ||
447 | return; | 536 | return; |
448 | } | 537 | } |
449 | 538 | ||
@@ -451,20 +540,16 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst) | |||
451 | { | 540 | { |
452 | GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD); | 541 | GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD); |
453 | msg->size = htons (sizeof (*msg) + data_size); | 542 | msg->size = htons (sizeof (*msg) + data_size); |
454 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | 543 | queue_message (ch, msg, !notify_ret); |
455 | } | 544 | } |
456 | 545 | ||
457 | /* End of message. */ | 546 | /* End of message. */ |
458 | if (GNUNET_YES == notify_ret) | 547 | if (GNUNET_YES == notify_ret) |
459 | { | 548 | { |
460 | op = GNUNET_malloc (sizeof *(op) + sizeof (*msg)); | ||
461 | op->msg = msg = (struct GNUNET_MessageHeader *) &op[1]; | ||
462 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END); | 549 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END); |
463 | msg->size = htons (sizeof (*msg)); | 550 | msg->size = htons (sizeof (*msg)); |
464 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | 551 | queue_message (ch, msg, GNUNET_YES); |
465 | } | 552 | } |
466 | |||
467 | transmit_next (ch); | ||
468 | } | 553 | } |
469 | 554 | ||
470 | 555 | ||
@@ -476,57 +561,55 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst) | |||
476 | */ | 561 | */ |
477 | static void | 562 | static void |
478 | handle_psyc_message (struct GNUNET_PSYC_Channel *ch, | 563 | handle_psyc_message (struct GNUNET_PSYC_Channel *ch, |
479 | const struct GNUNET_PSYC_MessageHeader *pmsg) | 564 | const struct GNUNET_PSYC_MessageHeader *msg) |
480 | { | 565 | { |
481 | const struct GNUNET_MessageHeader *msg; | 566 | uint16_t size = ntohs (msg->header.size); |
482 | uint16_t msize = ntohs (pmsg->header.size); | ||
483 | uint16_t pos = 0; | ||
484 | uint16_t size = 0; | ||
485 | uint16_t type, size_eq, size_min; | ||
486 | 567 | ||
487 | if (MSG_STATE_START == ch->recv_state) | 568 | if (MSG_STATE_START == ch->recv_state) |
488 | { | 569 | { |
489 | ch->recv_message_id = GNUNET_ntohll (pmsg->message_id); | 570 | ch->recv_message_id = GNUNET_ntohll (msg->message_id); |
490 | ch->recv_flags = ntohl (pmsg->flags); | 571 | ch->recv_flags = ntohl (msg->flags); |
491 | } | 572 | } |
492 | else if (GNUNET_ntohll (pmsg->message_id) != ch->recv_message_id) | 573 | else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id) |
493 | { | 574 | { |
494 | LOG (GNUNET_ERROR_TYPE_WARNING, | 575 | LOG (GNUNET_ERROR_TYPE_WARNING, |
495 | "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n", | 576 | "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n", |
496 | GNUNET_ntohll (pmsg->message_id), ch->recv_message_id); | 577 | GNUNET_ntohll (msg->message_id), ch->recv_message_id); |
497 | GNUNET_break_op (0); | 578 | GNUNET_break_op (0); |
498 | recv_error (ch); | 579 | recv_error (ch); |
499 | } | 580 | } |
500 | else if (ntohl (pmsg->flags) != ch->recv_flags) | 581 | else if (ntohl (msg->flags) != ch->recv_flags) |
501 | { | 582 | { |
502 | LOG (GNUNET_ERROR_TYPE_WARNING, | 583 | LOG (GNUNET_ERROR_TYPE_WARNING, |
503 | "Unexpected message flags. Got: %lu, expected: %lu\n", | 584 | "Unexpected message flags. Got: %lu, expected: %lu\n", |
504 | ntohl (pmsg->flags), ch->recv_flags); | 585 | ntohl (msg->flags), ch->recv_flags); |
505 | GNUNET_break_op (0); | 586 | GNUNET_break_op (0); |
506 | recv_error (ch); | 587 | recv_error (ch); |
507 | } | 588 | } |
508 | 589 | ||
509 | for (pos = 0; sizeof (*pmsg) + pos < msize; pos += size) | 590 | uint16_t pos = 0, psize = 0, ptype, size_eq, size_min; |
591 | |||
592 | for (pos = 0; sizeof (*msg) + pos < size; pos += psize) | ||
510 | { | 593 | { |
511 | msg = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos); | 594 | const struct GNUNET_MessageHeader *pmsg |
512 | size = ntohs (msg->size); | 595 | = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos); |
513 | type = ntohs (msg->type); | 596 | psize = ntohs (pmsg->size); |
597 | ptype = ntohs (pmsg->type); | ||
514 | size_eq = size_min = 0; | 598 | size_eq = size_min = 0; |
515 | 599 | ||
516 | if (msize < sizeof (*pmsg) + pos + size) | 600 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
601 | "Received message part of type %u and size %u from PSYC.\n", | ||
602 | ptype, psize); | ||
603 | |||
604 | if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size) | ||
517 | { | 605 | { |
518 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 606 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
519 | "Discarding message of type %u with invalid size. " | 607 | "Discarding message of type %u with invalid size %u.\n", |
520 | "(%u < %u + %u + %u)\n", ntohs (msg->type), | 608 | ptype, psize); |
521 | msize, sizeof (*msg), pos, size); | ||
522 | break; | 609 | break; |
523 | } | 610 | } |
524 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
525 | "Received message part of type %u and size %u from PSYC.\n", | ||
526 | ntohs (msg->type), size); | ||
527 | 611 | ||
528 | 612 | switch (ptype) | |
529 | switch (type) | ||
530 | { | 613 | { |
531 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: | 614 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: |
532 | size_min = sizeof (struct GNUNET_PSYC_MessageMethod); | 615 | size_min = sizeof (struct GNUNET_PSYC_MessageMethod); |
@@ -534,6 +617,7 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, | |||
534 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | 617 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: |
535 | size_min = sizeof (struct GNUNET_PSYC_MessageModifier); | 618 | size_min = sizeof (struct GNUNET_PSYC_MessageModifier); |
536 | break; | 619 | break; |
620 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: | ||
537 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: | 621 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: |
538 | size_min = sizeof (struct GNUNET_MessageHeader); | 622 | size_min = sizeof (struct GNUNET_MessageHeader); |
539 | break; | 623 | break; |
@@ -543,22 +627,22 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, | |||
543 | break; | 627 | break; |
544 | } | 628 | } |
545 | 629 | ||
546 | if (! ((0 < size_eq && size == size_eq) | 630 | if (! ((0 < size_eq && psize == size_eq) |
547 | || (0 < size_min && size_min <= size))) | 631 | || (0 < size_min && size_min <= psize))) |
548 | { | 632 | { |
549 | GNUNET_break (0); | 633 | GNUNET_break (0); |
550 | reschedule_connect (ch); | 634 | reschedule_connect (ch); |
551 | return; | 635 | return; |
552 | } | 636 | } |
553 | 637 | ||
554 | switch (type) | 638 | switch (ptype) |
555 | { | 639 | { |
556 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: | 640 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: |
557 | { | 641 | { |
558 | struct GNUNET_PSYC_MessageMethod *meth | 642 | struct GNUNET_PSYC_MessageMethod *meth |
559 | = (struct GNUNET_PSYC_MessageMethod *) msg; | 643 | = (struct GNUNET_PSYC_MessageMethod *) pmsg; |
560 | 644 | ||
561 | if (MSG_STATE_HEADER != ch->recv_state) | 645 | if (MSG_STATE_START != ch->recv_state) |
562 | { | 646 | { |
563 | LOG (GNUNET_ERROR_TYPE_WARNING, | 647 | LOG (GNUNET_ERROR_TYPE_WARNING, |
564 | "Discarding out of order message method.\n"); | 648 | "Discarding out of order message method.\n"); |
@@ -568,89 +652,66 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, | |||
568 | */ | 652 | */ |
569 | GNUNET_break_op (0); | 653 | GNUNET_break_op (0); |
570 | recv_error (ch); | 654 | recv_error (ch); |
571 | break; | 655 | return; |
572 | } | 656 | } |
573 | 657 | ||
574 | if ('\0' != (char *) meth + msg->size - 1) | 658 | if ('\0' != *((char *) meth + psize - 1)) |
575 | { | 659 | { |
576 | LOG (GNUNET_ERROR_TYPE_WARNING, | 660 | LOG (GNUNET_ERROR_TYPE_WARNING, |
577 | "Discarding message with malformed method. " | 661 | "Discarding message with malformed method. " |
578 | "Message ID: %" PRIu64 "\n", ch->recv_message_id); | 662 | "Message ID: %" PRIu64 "\n", ch->recv_message_id); |
579 | GNUNET_break_op (0); | 663 | GNUNET_break_op (0); |
580 | recv_error (ch); | 664 | recv_error (ch); |
581 | break; | 665 | return; |
582 | } | 666 | } |
583 | GNUNET_PSYC_MessageCallback message_cb | ||
584 | = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC | ||
585 | ? ch->hist_message_cb | ||
586 | : ch->message_cb; | ||
587 | |||
588 | if (NULL != message_cb) | ||
589 | message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg); | ||
590 | |||
591 | ch->recv_state = MSG_STATE_METHOD; | 667 | ch->recv_state = MSG_STATE_METHOD; |
592 | break; | 668 | break; |
593 | } | 669 | } |
594 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | 670 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: |
595 | { | 671 | { |
596 | if (MSG_STATE_MODIFIER != ch->recv_state) | 672 | if (!(MSG_STATE_METHOD == ch->recv_state |
673 | || MSG_STATE_MODIFIER == ch->recv_state | ||
674 | || MSG_STATE_MOD_CONT == ch->recv_state)) | ||
597 | { | 675 | { |
598 | LOG (GNUNET_ERROR_TYPE_WARNING, | 676 | LOG (GNUNET_ERROR_TYPE_WARNING, |
599 | "Discarding out of order message modifier.\n"); | 677 | "Discarding out of order message modifier.\n"); |
600 | GNUNET_break_op (0); | 678 | GNUNET_break_op (0); |
601 | recv_error (ch); | 679 | recv_error (ch); |
602 | break; | 680 | return; |
603 | } | 681 | } |
604 | 682 | ||
605 | struct GNUNET_PSYC_MessageModifier *mod | 683 | struct GNUNET_PSYC_MessageModifier *mod |
606 | = (struct GNUNET_PSYC_MessageModifier *) msg; | 684 | = (struct GNUNET_PSYC_MessageModifier *) pmsg; |
607 | 685 | ||
608 | uint16_t name_size = ntohs (mod->name_size); | 686 | uint16_t name_size = ntohs (mod->name_size); |
609 | ch->recv_mod_value_size_expected = ntohs (mod->value_size); | 687 | ch->recv_mod_value_size_expected = ntohs (mod->value_size); |
610 | ch->recv_mod_value_size = size - sizeof (*mod) - name_size - 1; | 688 | ch->recv_mod_value_size = psize - sizeof (*mod) - name_size - 1; |
611 | 689 | ||
612 | if (size < sizeof (*mod) + name_size + 1 | 690 | if (psize < sizeof (*mod) + name_size + 1 |
613 | || '\0' != (char *) &mod[1] + mod->name_size | 691 | || '\0' != *((char *) &mod[1] + name_size) |
614 | || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) | 692 | || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) |
615 | { | 693 | { |
616 | LOG (GNUNET_ERROR_TYPE_WARNING, "Discarding malformed modifier.\n"); | 694 | LOG (GNUNET_ERROR_TYPE_WARNING, "Discarding malformed modifier.\n"); |
617 | GNUNET_break_op (0); | 695 | GNUNET_break_op (0); |
618 | break; | 696 | return; |
619 | } | 697 | } |
620 | |||
621 | ch->recv_state = MSG_STATE_MODIFIER; | 698 | ch->recv_state = MSG_STATE_MODIFIER; |
622 | |||
623 | GNUNET_PSYC_MessageCallback message_cb | ||
624 | = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC | ||
625 | ? ch->hist_message_cb | ||
626 | : ch->message_cb; | ||
627 | |||
628 | if (NULL != message_cb) | ||
629 | message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg); | ||
630 | |||
631 | break; | 699 | break; |
632 | } | 700 | } |
633 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: | 701 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: |
634 | { | 702 | { |
635 | ch->recv_mod_value_size += size - sizeof (*msg); | 703 | ch->recv_mod_value_size += psize - sizeof (*pmsg); |
636 | 704 | ||
637 | if (MSG_STATE_MODIFIER != ch->recv_state | 705 | if (!(MSG_STATE_MODIFIER == ch->recv_state |
706 | || MSG_STATE_MOD_CONT == ch->recv_state) | ||
638 | || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) | 707 | || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) |
639 | { | 708 | { |
640 | LOG (GNUNET_ERROR_TYPE_WARNING, | 709 | LOG (GNUNET_ERROR_TYPE_WARNING, |
641 | "Discarding out of order message modifier continuation.\n"); | 710 | "Discarding out of order message modifier continuation.\n"); |
642 | GNUNET_break_op (0); | 711 | GNUNET_break_op (0); |
643 | recv_reset (ch); | 712 | recv_reset (ch); |
644 | break; | 713 | return; |
645 | } | 714 | } |
646 | |||
647 | GNUNET_PSYC_MessageCallback message_cb | ||
648 | = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC | ||
649 | ? ch->hist_message_cb | ||
650 | : ch->message_cb; | ||
651 | |||
652 | if (NULL != message_cb) | ||
653 | message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg); | ||
654 | break; | 715 | break; |
655 | } | 716 | } |
656 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: | 717 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: |
@@ -662,12 +723,23 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, | |||
662 | "Discarding out of order message data fragment.\n"); | 723 | "Discarding out of order message data fragment.\n"); |
663 | GNUNET_break_op (0); | 724 | GNUNET_break_op (0); |
664 | recv_reset (ch); | 725 | recv_reset (ch); |
665 | break; | 726 | return; |
666 | } | 727 | } |
667 | |||
668 | ch->recv_state = MSG_STATE_DATA; | 728 | ch->recv_state = MSG_STATE_DATA; |
669 | break; | 729 | break; |
670 | } | 730 | } |
731 | } | ||
732 | |||
733 | GNUNET_PSYC_MessageCallback message_cb | ||
734 | = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC | ||
735 | ? ch->hist_message_cb | ||
736 | : ch->message_cb; | ||
737 | |||
738 | if (NULL != message_cb) | ||
739 | message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, pmsg); | ||
740 | |||
741 | switch (ptype) | ||
742 | { | ||
671 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: | 743 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: |
672 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: | 744 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: |
673 | recv_reset (ch); | 745 | recv_reset (ch); |
@@ -717,18 +789,7 @@ message_handler (void *cls, | |||
717 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE: | 789 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE: |
718 | size_min = sizeof (struct GNUNET_PSYC_MessageHeader); | 790 | size_min = sizeof (struct GNUNET_PSYC_MessageHeader); |
719 | break; | 791 | break; |
720 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: | 792 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK: |
721 | size_min = sizeof (struct GNUNET_PSYC_MessageMethod); | ||
722 | break; | ||
723 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | ||
724 | size_min = sizeof (struct GNUNET_PSYC_MessageModifier); | ||
725 | break; | ||
726 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: | ||
727 | size_min = sizeof (struct GNUNET_MessageHeader); | ||
728 | break; | ||
729 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: | ||
730 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: | ||
731 | case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK: | ||
732 | size_eq = sizeof (struct GNUNET_MessageHeader); | 793 | size_eq = sizeof (struct GNUNET_MessageHeader); |
733 | break; | 794 | break; |
734 | } | 795 | } |
@@ -761,9 +822,15 @@ message_handler (void *cls, | |||
761 | #endif | 822 | #endif |
762 | break; | 823 | break; |
763 | } | 824 | } |
764 | case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK: | 825 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK: |
765 | { | 826 | { |
766 | ch->tmit_ack_pending = GNUNET_NO; | 827 | if (0 == ch->tmit_ack_pending) |
828 | { | ||
829 | LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n"); | ||
830 | GNUNET_break (0); | ||
831 | break; | ||
832 | } | ||
833 | ch->tmit_ack_pending--; | ||
767 | 834 | ||
768 | if (ch->is_master) | 835 | if (ch->is_master) |
769 | { | 836 | { |
@@ -771,10 +838,6 @@ message_handler (void *cls, | |||
771 | switch (mst->tmit->state) | 838 | switch (mst->tmit->state) |
772 | { | 839 | { |
773 | case MSG_STATE_MODIFIER: | 840 | case MSG_STATE_MODIFIER: |
774 | if (GNUNET_NO == ch->tmit_paused) | ||
775 | master_transmit_mod (mst); | ||
776 | break; | ||
777 | |||
778 | case MSG_STATE_MOD_CONT: | 841 | case MSG_STATE_MOD_CONT: |
779 | if (GNUNET_NO == ch->tmit_paused) | 842 | if (GNUNET_NO == ch->tmit_paused) |
780 | master_transmit_mod (mst); | 843 | master_transmit_mod (mst); |
@@ -795,12 +858,13 @@ message_handler (void *cls, | |||
795 | else | 858 | else |
796 | { | 859 | { |
797 | LOG (GNUNET_ERROR_TYPE_WARNING, | 860 | LOG (GNUNET_ERROR_TYPE_WARNING, |
798 | "Ignoring transmit ack, there's no transmission going on.\n"); | 861 | "Ignoring message ACK, there's no transmission going on.\n"); |
862 | GNUNET_break (0); | ||
799 | } | 863 | } |
800 | break; | 864 | break; |
801 | default: | 865 | default: |
802 | LOG (GNUNET_ERROR_TYPE_WARNING, | 866 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
803 | "Ignoring unexpected transmit ack.\n"); | 867 | "Ignoring message ACK in state %u.\n", mst->tmit->state); |
804 | } | 868 | } |
805 | } | 869 | } |
806 | else | 870 | else |
@@ -811,12 +875,15 @@ message_handler (void *cls, | |||
811 | } | 875 | } |
812 | 876 | ||
813 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE: | 877 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE: |
814 | handle_psyc_message(ch, (const struct GNUNET_PSYC_MessageHeader *) msg); | 878 | handle_psyc_message (ch, (const struct GNUNET_PSYC_MessageHeader *) msg); |
815 | break; | 879 | break; |
816 | } | 880 | } |
817 | 881 | ||
818 | GNUNET_CLIENT_receive (ch->client, &message_handler, ch, | 882 | if (NULL != ch->client) |
819 | GNUNET_TIME_UNIT_FOREVER_REL); | 883 | { |
884 | GNUNET_CLIENT_receive (ch->client, &message_handler, ch, | ||
885 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
886 | } | ||
820 | } | 887 | } |
821 | 888 | ||
822 | 889 | ||
@@ -1029,6 +1096,8 @@ void | |||
1029 | GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master) | 1096 | GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master) |
1030 | { | 1097 | { |
1031 | disconnect (master); | 1098 | disconnect (master); |
1099 | if (NULL != master->tmit) | ||
1100 | GNUNET_free (master->tmit); | ||
1032 | GNUNET_free (master); | 1101 | GNUNET_free (master); |
1033 | } | 1102 | } |
1034 | 1103 | ||
@@ -1069,30 +1138,6 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, | |||
1069 | } | 1138 | } |
1070 | 1139 | ||
1071 | 1140 | ||
1072 | /* FIXME: split up value into <64K chunks and transmit the continuations in | ||
1073 | * MOD_CONT msgs */ | ||
1074 | static int | ||
1075 | send_modifier (void *cls, struct GNUNET_ENV_Modifier *mod) | ||
1076 | { | ||
1077 | struct GNUNET_PSYC_Channel *ch = cls; | ||
1078 | size_t name_size = strlen (mod->name) + 1; | ||
1079 | struct GNUNET_PSYC_MessageModifier *pmod; | ||
1080 | struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*pmod) | ||
1081 | + name_size + mod->value_size); | ||
1082 | pmod = (struct GNUNET_PSYC_MessageModifier *) &op[1]; | ||
1083 | op->msg = (struct GNUNET_MessageHeader *) pmod; | ||
1084 | |||
1085 | pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); | ||
1086 | pmod->header.size = htons (sizeof (*pmod) + name_size + mod->value_size); | ||
1087 | pmod->name_size = htons (name_size); | ||
1088 | memcpy (&pmod[1], mod->name, name_size); | ||
1089 | memcpy ((char *) &pmod[1] + name_size, mod->value, mod->value_size); | ||
1090 | |||
1091 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | ||
1092 | return GNUNET_YES; | ||
1093 | } | ||
1094 | |||
1095 | |||
1096 | /** | 1141 | /** |
1097 | * Send a message to call a method to all members in the PSYC channel. | 1142 | * Send a message to call a method to all members in the PSYC channel. |
1098 | * | 1143 | * |
@@ -1107,7 +1152,7 @@ send_modifier (void *cls, struct GNUNET_ENV_Modifier *mod) | |||
1107 | struct GNUNET_PSYC_MasterTransmitHandle * | 1152 | struct GNUNET_PSYC_MasterTransmitHandle * |
1108 | GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, | 1153 | GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, |
1109 | const char *method_name, | 1154 | const char *method_name, |
1110 | GNUNET_PSYC_MasterTransmitNotify notify_mod, | 1155 | GNUNET_PSYC_MasterTransmitNotifyModifier notify_mod, |
1111 | GNUNET_PSYC_MasterTransmitNotify notify_data, | 1156 | GNUNET_PSYC_MasterTransmitNotify notify_data, |
1112 | void *notify_cls, | 1157 | void *notify_cls, |
1113 | enum GNUNET_PSYC_MasterTransmitFlags flags) | 1158 | enum GNUNET_PSYC_MasterTransmitFlags flags) |
@@ -1120,25 +1165,27 @@ GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, | |||
1120 | 1165 | ||
1121 | size_t size = strlen (method_name) + 1; | 1166 | size_t size = strlen (method_name) + 1; |
1122 | struct GNUNET_PSYC_MessageMethod *pmeth; | 1167 | struct GNUNET_PSYC_MessageMethod *pmeth; |
1123 | struct OperationHandle *op | 1168 | struct OperationHandle *op; |
1124 | = GNUNET_malloc (sizeof (*op) + sizeof (*pmeth) + size); | ||
1125 | pmeth = (struct GNUNET_PSYC_MessageMethod *) &op[1]; | ||
1126 | op->msg = (struct GNUNET_MessageHeader *) pmeth; | ||
1127 | 1169 | ||
1170 | ch->tmit_msg = op = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg) | ||
1171 | + sizeof (*pmeth) + size); | ||
1172 | op->msg = (struct GNUNET_MessageHeader *) &op[1]; | ||
1173 | op->msg->size = sizeof (*op->msg) + sizeof (*pmeth) + size; | ||
1174 | |||
1175 | pmeth = (struct GNUNET_PSYC_MessageMethod *) &op->msg[1]; | ||
1128 | pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); | 1176 | pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); |
1129 | pmeth->header.size = htons (sizeof (*pmeth) + size); | 1177 | pmeth->header.size = htons (sizeof (*pmeth) + size); |
1130 | pmeth->flags = htonl (flags); | 1178 | pmeth->flags = htonl (flags); |
1131 | memcpy (&pmeth[1], method_name, size); | 1179 | memcpy (&pmeth[1], method_name, size); |
1132 | 1180 | ||
1133 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | ||
1134 | transmit_next (ch); | ||
1135 | |||
1136 | master->tmit = GNUNET_malloc (sizeof (*master->tmit)); | 1181 | master->tmit = GNUNET_malloc (sizeof (*master->tmit)); |
1137 | master->tmit->master = master; | 1182 | master->tmit->master = master; |
1138 | master->tmit->notify_mod = notify_mod; | 1183 | master->tmit->notify_mod = notify_mod; |
1139 | master->tmit->notify_data = notify_data; | 1184 | master->tmit->notify_data = notify_data; |
1140 | master->tmit->notify_cls = notify_cls; | 1185 | master->tmit->notify_cls = notify_cls; |
1141 | master->tmit->state = MSG_STATE_START; // FIXME | 1186 | master->tmit->state = MSG_STATE_MODIFIER; |
1187 | |||
1188 | master_transmit_mod (master); | ||
1142 | return master->tmit; | 1189 | return master->tmit; |
1143 | } | 1190 | } |
1144 | 1191 | ||
@@ -1152,7 +1199,7 @@ void | |||
1152 | GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) | 1199 | GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) |
1153 | { | 1200 | { |
1154 | struct GNUNET_PSYC_Channel *ch = &th->master->ch; | 1201 | struct GNUNET_PSYC_Channel *ch = &th->master->ch; |
1155 | if (GNUNET_NO == ch->tmit_ack_pending) | 1202 | if (0 == ch->tmit_ack_pending) |
1156 | { | 1203 | { |
1157 | ch->tmit_paused = GNUNET_NO; | 1204 | ch->tmit_paused = GNUNET_NO; |
1158 | master_transmit_data (th->master); | 1205 | master_transmit_data (th->master); |
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c index 704819c50..33684b125 100644 --- a/src/psyc/test_psyc.c +++ b/src/psyc/test_psyc.c | |||
@@ -25,6 +25,8 @@ | |||
25 | * @author Christian Grothoff | 25 | * @author Christian Grothoff |
26 | */ | 26 | */ |
27 | 27 | ||
28 | #include <inttypes.h> | ||
29 | |||
28 | #include "platform.h" | 30 | #include "platform.h" |
29 | #include "gnunet_crypto_lib.h" | 31 | #include "gnunet_crypto_lib.h" |
30 | #include "gnunet_common.h" | 32 | #include "gnunet_common.h" |
@@ -35,7 +37,7 @@ | |||
35 | 37 | ||
36 | #define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10) | 38 | #define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10) |
37 | 39 | ||
38 | #define DEBUG_SERVICE 1 | 40 | #define DEBUG_SERVICE 0 |
39 | 41 | ||
40 | 42 | ||
41 | /** | 43 | /** |
@@ -62,17 +64,37 @@ static struct GNUNET_CRYPTO_EddsaPublicKey slave_pub_key; | |||
62 | 64 | ||
63 | struct GNUNET_PSYC_MasterTransmitHandle *mth; | 65 | struct GNUNET_PSYC_MasterTransmitHandle *mth; |
64 | 66 | ||
67 | struct TransmitClosure | ||
68 | { | ||
69 | struct GNUNET_PSYC_MasterTransmitHandle *handle; | ||
70 | struct GNUNET_ENV_Environment *env; | ||
71 | char *data[16]; | ||
72 | const char *mod_value; | ||
73 | size_t mod_value_size; | ||
74 | uint8_t data_count; | ||
75 | uint8_t paused; | ||
76 | uint8_t n; | ||
77 | }; | ||
78 | |||
79 | struct TransmitClosure *tmit; | ||
80 | |||
65 | /** | 81 | /** |
66 | * Clean up all resources used. | 82 | * Clean up all resources used. |
67 | */ | 83 | */ |
68 | static void | 84 | static void |
69 | cleanup () | 85 | cleanup () |
70 | { | 86 | { |
71 | if (mst != NULL) | 87 | if (NULL != mst) |
72 | { | 88 | { |
73 | GNUNET_PSYC_master_stop (mst); | 89 | GNUNET_PSYC_master_stop (mst); |
74 | mst = NULL; | 90 | mst = NULL; |
75 | } | 91 | } |
92 | if (NULL != tmit) | ||
93 | { | ||
94 | GNUNET_ENV_environment_destroy (tmit->env); | ||
95 | GNUNET_free (tmit); | ||
96 | tmit = NULL; | ||
97 | } | ||
76 | GNUNET_SCHEDULER_shutdown (); | 98 | GNUNET_SCHEDULER_shutdown (); |
77 | } | 99 | } |
78 | 100 | ||
@@ -121,44 +143,40 @@ end () | |||
121 | } | 143 | } |
122 | 144 | ||
123 | 145 | ||
124 | static int | 146 | static void |
125 | method (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, | 147 | message (void *cls, uint64_t message_id, uint32_t flags, |
126 | uint64_t message_id, const char *name, | 148 | const struct GNUNET_MessageHeader *msg) |
127 | size_t modifier_count, const struct GNUNET_ENV_Modifier *modifiers, | ||
128 | uint64_t data_offset, const void *data, size_t data_size, | ||
129 | enum GNUNET_PSYC_MessageFlags flags) | ||
130 | { | 149 | { |
131 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 150 | if (NULL == msg) |
132 | "Method: %s, modifiers: %lu, flags: %u\n%.*s\n", | 151 | { |
133 | name, modifier_count, flags, data_size, data); | 152 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
134 | return GNUNET_OK; | 153 | "Error while receiving message %llu\n", message_id); |
135 | } | 154 | return; |
155 | } | ||
136 | 156 | ||
157 | uint16_t type = ntohs (msg->type); | ||
158 | uint16_t size = ntohs (msg->size); | ||
137 | 159 | ||
138 | static int | 160 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
139 | join (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, | 161 | "Got message part of type %u and size %u " |
140 | const char *method_name, | 162 | "belonging to message ID %llu with flags %u\n", |
141 | size_t variable_count, const struct GNUNET_ENV_Modifier *variables, | 163 | type, size, message_id, flags); |
142 | const void *data, size_t data_size, struct GNUNET_PSYC_JoinHandle *jh) | 164 | |
143 | { | 165 | if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type) |
144 | return GNUNET_OK; | 166 | end (); |
145 | } | 167 | } |
146 | 168 | ||
147 | 169 | ||
148 | struct TransmitClosure | 170 | static void |
171 | join_request (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, | ||
172 | const char *method_name, | ||
173 | size_t variable_count, const struct GNUNET_ENV_Modifier *variables, | ||
174 | const void *data, size_t data_size, | ||
175 | struct GNUNET_PSYC_JoinHandle *jh) | ||
149 | { | 176 | { |
150 | struct GNUNET_PSYC_MasterTransmitHandle *handle; | 177 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
151 | 178 | "Got join request."); | |
152 | char *mod_names[16]; | 179 | } |
153 | char *mod_values[16]; | ||
154 | char *data[16]; | ||
155 | |||
156 | uint8_t mod_count; | ||
157 | uint8_t data_count; | ||
158 | |||
159 | uint8_t paused; | ||
160 | uint8_t n; | ||
161 | }; | ||
162 | 180 | ||
163 | 181 | ||
164 | static void | 182 | static void |
@@ -172,45 +190,95 @@ transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
172 | 190 | ||
173 | 191 | ||
174 | static int | 192 | static int |
175 | tmit_notify_mod (void *cls, size_t *data_size, void *data) | 193 | tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper) |
176 | { | 194 | { |
177 | struct TransmitClosure *tmit = cls; | 195 | struct TransmitClosure *tmit = cls; |
178 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 196 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
179 | "Transmit notify modifier: %lu bytes available, " | 197 | "Transmit notify modifier: %lu bytes available, " |
180 | "processing modifier %u/%u.\n", | 198 | "%u modifiers left to process.\n", |
181 | *data_size, tmit->n + 1, tmit->fragment_count); | 199 | *data_size, GNUNET_ENV_environment_get_count (tmit->env)); |
182 | /* FIXME: continuation */ | 200 | |
183 | uint16_t name_size = strlen (tmit->mod_names[tmit->n]); | 201 | enum GNUNET_ENV_Operator op = 0; |
184 | uint16_t value_size = strlen (tmit->mod_values[tmit->n]); | 202 | const char *name = NULL; |
185 | if (name_size + 1 + value_size <= *data_size) | 203 | const char *value = NULL; |
186 | return GNUNET_NO; | 204 | uint16_t name_size = 0; |
187 | 205 | size_t value_size = 0; | |
188 | *data_size = name_size + 1 + value_size; | 206 | |
189 | memcpy (data, tmit->fragments[tmit->n], *data_size); | 207 | if (NULL != tmit->mod_value && 0 < tmit->mod_value_size) |
190 | 208 | { /* Modifier continuation */ | |
191 | if (++tmit->n < tmit->mod_count) | 209 | value = tmit->mod_value; |
192 | { | 210 | if (tmit->mod_value_size <= *data_size) |
193 | return GNUNET_NO; | 211 | { |
212 | value_size = tmit->mod_value_size; | ||
213 | tmit->mod_value = NULL; | ||
214 | } | ||
215 | else | ||
216 | { | ||
217 | value_size = *data_size; | ||
218 | tmit->mod_value += value_size; | ||
219 | } | ||
220 | tmit->mod_value_size -= value_size; | ||
221 | |||
222 | if (*data_size < value_size) | ||
223 | { | ||
224 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
225 | "value larger than buffer: %u < %zu\n", | ||
226 | *data_size, value_size); | ||
227 | *data_size = 0; | ||
228 | return GNUNET_NO; | ||
229 | } | ||
230 | |||
231 | *data_size = value_size; | ||
232 | memcpy (data, value, value_size); | ||
194 | } | 233 | } |
195 | else | 234 | else if (NULL != oper) |
196 | { | 235 | { |
197 | tmit->n = 0; | 236 | if (GNUNET_NO == GNUNET_ENV_environment_shift (tmit->env, &op, &name, |
198 | return GNUNET_YES; | 237 | (void *) &value, &value_size)) |
238 | { /* No more modifiers, continue with data */ | ||
239 | *data_size = 0; | ||
240 | return GNUNET_YES; | ||
241 | } | ||
242 | |||
243 | *oper = op; | ||
244 | name_size = strlen (name); | ||
245 | |||
246 | if (name_size + 1 + value_size <= *data_size) | ||
247 | { | ||
248 | *data_size = name_size + 1 + value_size; | ||
249 | } | ||
250 | else | ||
251 | { | ||
252 | tmit->mod_value_size = value_size; | ||
253 | value_size = *data_size - name_size - 1; | ||
254 | tmit->mod_value_size -= value_size; | ||
255 | tmit->mod_value = value + value_size; | ||
256 | } | ||
257 | |||
258 | memcpy (data, name, name_size); | ||
259 | ((char *)data)[name_size] = '\0'; | ||
260 | memcpy ((char *)data + name_size + 1, value, value_size); | ||
199 | } | 261 | } |
262 | |||
263 | return 0 == tmit->mod_value_size ? GNUNET_YES : GNUNET_NO; | ||
200 | } | 264 | } |
201 | 265 | ||
202 | 266 | ||
203 | static int | 267 | static int |
204 | tmit_notify_data (void *cls, size_t *data_size, void *data) | 268 | tmit_notify_data (void *cls, uint16_t *data_size, void *data) |
205 | { | 269 | { |
206 | struct TransmitClosure *tmit = cls; | 270 | struct TransmitClosure *tmit = cls; |
271 | uint16_t size = strlen (tmit->data[tmit->n]); | ||
207 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 272 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
208 | "Transmit notify data: %lu bytes available, " | 273 | "Transmit notify data: %lu bytes available, " |
209 | "processing fragment %u/%u.\n", | 274 | "processing fragment %u/%u (size %u).\n", |
210 | *data_size, tmit->n + 1, tmit->fragment_count); | 275 | *data_size, tmit->n + 1, tmit->data_count, size); |
211 | uint16_t size = strlen (tmit->data[tmit->n]); | 276 | if (*data_size < size) |
212 | if (size <= *data_size) | 277 | { |
213 | return GNUNET_NO; | 278 | *data_size = 0; |
279 | GNUNET_assert (0); | ||
280 | return GNUNET_SYSERR; | ||
281 | } | ||
214 | 282 | ||
215 | if (GNUNET_YES == tmit->paused && tmit->n == tmit->data_count - 1) | 283 | if (GNUNET_YES == tmit->paused && tmit->n == tmit->data_count - 1) |
216 | { | 284 | { |
@@ -231,19 +299,18 @@ tmit_notify_data (void *cls, size_t *data_size, void *data) | |||
231 | } | 299 | } |
232 | 300 | ||
233 | 301 | ||
234 | void | 302 | static void |
235 | master_started (void *cls, uint64_t max_message_id) | 303 | master_started (void *cls, uint64_t max_message_id) |
236 | { | 304 | { |
237 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 305 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
238 | "Master started: %lu\n", max_message_id); | 306 | "Master started: %" PRIu64 "\n", max_message_id); |
239 | 307 | ||
240 | struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create (); | 308 | tmit = GNUNET_new (struct TransmitClosure); |
241 | GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN, | 309 | tmit->env = GNUNET_ENV_environment_create (); |
242 | "_foo", "bar baz", 7); | 310 | GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN, |
243 | GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN, | 311 | "_foo", "bar baz", 7); |
244 | "_foo_bar", "foo bar baz", 11); | 312 | GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN, |
245 | 313 | "_foo_bar", "foo bar baz", 11); | |
246 | struct TransmitClosure *tmit = GNUNET_new (struct TransmitClosure); | ||
247 | tmit->data[0] = "foo"; | 314 | tmit->data[0] = "foo"; |
248 | tmit->data[1] = "foo bar"; | 315 | tmit->data[1] = "foo bar"; |
249 | tmit->data[2] = "foo bar baz"; | 316 | tmit->data[2] = "foo bar baz"; |
@@ -255,7 +322,7 @@ master_started (void *cls, uint64_t max_message_id) | |||
255 | } | 322 | } |
256 | 323 | ||
257 | 324 | ||
258 | void | 325 | static void |
259 | slave_joined (void *cls, uint64_t max_message_id) | 326 | slave_joined (void *cls, uint64_t max_message_id) |
260 | { | 327 | { |
261 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave joined: %lu\n", max_message_id); | 328 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave joined: %lu\n", max_message_id); |
@@ -288,19 +355,19 @@ run (void *cls, | |||
288 | GNUNET_CRYPTO_eddsa_key_get_public (channel_key, &channel_pub_key); | 355 | GNUNET_CRYPTO_eddsa_key_get_public (channel_key, &channel_pub_key); |
289 | GNUNET_CRYPTO_eddsa_key_get_public (slave_key, &slave_pub_key); | 356 | GNUNET_CRYPTO_eddsa_key_get_public (slave_key, &slave_pub_key); |
290 | 357 | ||
291 | mst = GNUNET_PSYC_master_start (cfg, channel_key, | 358 | mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE, |
292 | GNUNET_PSYC_CHANNEL_PRIVATE, | 359 | &message, &join_request, &master_started, NULL); |
293 | &method, &join, &master_started, NULL); | 360 | return; /* FIXME: test slave */ |
294 | return; | 361 | |
295 | struct GNUNET_PeerIdentity origin; | 362 | struct GNUNET_PeerIdentity origin; |
296 | struct GNUNET_PeerIdentity relays[16]; | 363 | struct GNUNET_PeerIdentity relays[16]; |
297 | struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create (); | 364 | struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create (); |
298 | GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN, | 365 | GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN, |
299 | "_foo", "bar baz", 7); | 366 | "_foo", "bar baz", 7); |
300 | GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN, | 367 | GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN, |
301 | "_foo_bar", "foo bar baz", 11); | 368 | "_foo_bar", "foo bar baz", 11); |
302 | slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin, | 369 | slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin, |
303 | 16, relays, &method, &join, &slave_joined, | 370 | 16, relays, &message, &join_request, &slave_joined, |
304 | NULL, "_request_join", env, "some data", 9); | 371 | NULL, "_request_join", env, "some data", 9); |
305 | GNUNET_ENV_environment_destroy (env); | 372 | GNUNET_ENV_environment_destroy (env); |
306 | } | 373 | } |
@@ -319,8 +386,7 @@ main (int argc, char *argv[]) | |||
319 | opts, &run, NULL)) | 386 | opts, &run, NULL)) |
320 | return 1; | 387 | return 1; |
321 | #else | 388 | #else |
322 | if (0 != GNUNET_TESTING_service_run ("test-psyc", "psyc", | 389 | if (0 != GNUNET_TESTING_peer_run ("test-psyc", "test_psyc.conf", &run, NULL)) |
323 | "test_psyc.conf", &run, NULL)) | ||
324 | return 1; | 390 | return 1; |
325 | #endif | 391 | #endif |
326 | return res; | 392 | return res; |
diff --git a/src/psyc/test_psyc.conf b/src/psyc/test_psyc.conf index 1e646239a..7a1eb8404 100644 --- a/src/psyc/test_psyc.conf +++ b/src/psyc/test_psyc.conf | |||
@@ -8,3 +8,10 @@ BINARY = gnunet-service-psyc | |||
8 | UNIXPATH = $GNUNET_RUNTIME_DIR/test-gnunet-service-psyc.sock | 8 | UNIXPATH = $GNUNET_RUNTIME_DIR/test-gnunet-service-psyc.sock |
9 | UNIX_MATCH_UID = NO | 9 | UNIX_MATCH_UID = NO |
10 | UNIX_MATCH_GID = YES | 10 | UNIX_MATCH_GID = YES |
11 | |||
12 | [psycstore] | ||
13 | AUTOSTART = YES | ||
14 | BINARY = gnunet-service-psycstore | ||
15 | UNIXPATH = $GNUNET_RUNTIME_DIR/test-gnunet-service-psycstore.sock | ||
16 | UNIX_MATCH_UID = NO | ||
17 | UNIX_MATCH_GID = YES | ||