aboutsummaryrefslogtreecommitdiff
path: root/src/psyc/gnunet-service-psyc.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/psyc/gnunet-service-psyc.c')
-rw-r--r--src/psyc/gnunet-service-psyc.c323
1 files changed, 92 insertions, 231 deletions
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
index 628c39900..e5de7dcda 100644
--- a/src/psyc/gnunet-service-psyc.c
+++ b/src/psyc/gnunet-service-psyc.c
@@ -171,8 +171,8 @@ struct Slave
171}; 171};
172 172
173 173
174static void 174static inline void
175transmit_message (struct Channel *ch, struct GNUNET_TIME_Relative delay); 175transmit_message (struct Channel *ch);
176 176
177 177
178/** 178/**
@@ -205,6 +205,7 @@ client_cleanup (struct Channel *ch)
205 struct Master *mst = (struct Master *) ch; 205 struct Master *mst = (struct Master *) ch;
206 if (NULL != mst->origin) 206 if (NULL != mst->origin)
207 GNUNET_MULTICAST_origin_stop (mst->origin); 207 GNUNET_MULTICAST_origin_stop (mst->origin);
208 GNUNET_CONTAINER_multihashmap_remove (clients, &mst->pub_key_hash, mst);
208 } 209 }
209 else 210 else
210 { 211 {
@@ -251,7 +252,7 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
251 /* Send pending messages to multicast before cleanup. */ 252 /* Send pending messages to multicast before cleanup. */
252 if (NULL != ch->tmit_head) 253 if (NULL != ch->tmit_head)
253 { 254 {
254 transmit_message (ch, GNUNET_TIME_UNIT_ZERO); 255 transmit_message (ch);
255 } 256 }
256 else 257 else
257 { 258 {
@@ -321,6 +322,10 @@ message_to_client (void *cls, const struct GNUNET_HashCode *chan_key_hash,
321 const struct GNUNET_MessageHeader *msg = cls; 322 const struct GNUNET_MessageHeader *msg = cls;
322 struct Channel *ch = chan; 323 struct Channel *ch = chan;
323 324
325 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
326 "Sending message of type %u and size %u to client 0x%zx.\n",
327 ntohs (msg->type), ntohs (msg->size), ch->client);
328
324 GNUNET_SERVER_notification_context_add (nc, ch->client); 329 GNUNET_SERVER_notification_context_add (nc, ch->client);
325 GNUNET_SERVER_notification_context_unicast (nc, ch->client, msg, GNUNET_NO); 330 GNUNET_SERVER_notification_context_unicast (nc, ch->client, msg, GNUNET_NO);
326 331
@@ -363,24 +368,6 @@ message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
363 GNUNET_MULTICAST_MessageHeader *) msg, 368 GNUNET_MULTICAST_MessageHeader *) msg,
364 0, NULL, NULL); 369 0, NULL, NULL);
365 370
366 uint16_t size = ntohs (msg->size);
367 uint16_t psize = 0;
368 uint16_t pos = 0;
369
370 for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
371 {
372 const struct GNUNET_MessageHeader *pmsg
373 = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
374 uint16_t psize = ntohs (pmsg->size);
375 if (sizeof (*msg) + pos + psize > size)
376 {
377 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
378 "Message received from multicast contains invalid PSYC "
379 "message. Not sending to clients.\n");
380 return;
381 }
382 }
383
384#if TODO 371#if TODO
385 /* FIXME: apply modifiers to state in PSYCstore */ 372 /* FIXME: apply modifiers to state in PSYCstore */
386 GNUNET_PSYCSTORE_state_modify (store, chan_key, 373 GNUNET_PSYCSTORE_state_modify (store, chan_key,
@@ -393,6 +380,26 @@ message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
393 = (const struct GNUNET_MULTICAST_MessageHeader *) msg; 380 = (const struct GNUNET_MULTICAST_MessageHeader *) msg;
394 struct GNUNET_PSYC_MessageHeader *pmsg; 381 struct GNUNET_PSYC_MessageHeader *pmsg;
395 382
383 uint16_t size = ntohs (msg->size);
384 uint16_t psize = 0;
385 uint16_t pos = 0;
386
387 for (pos = 0; sizeof (*mmsg) + pos < size; pos += psize)
388 {
389 const struct GNUNET_MessageHeader *pmsg
390 = (const struct GNUNET_MessageHeader *) ((char *) &mmsg[1] + pos);
391 psize = ntohs (pmsg->size);
392 if (psize < sizeof (*pmsg) || sizeof (*mmsg) + pos + psize > size)
393 {
394 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
395 "Received invalid message part of type %u and size %u "
396 "from multicast. Not sending to clients.\n",
397 ntohs (pmsg->type), psize);
398 GNUNET_break_op (0);
399 return;
400 }
401 }
402
396 psize = sizeof (*pmsg) + size - sizeof (*mmsg); 403 psize = sizeof (*pmsg) + size - sizeof (*mmsg);
397 pmsg = GNUNET_malloc (psize); 404 pmsg = GNUNET_malloc (psize);
398 pmsg->header.size = htons (psize); 405 pmsg->header.size = htons (psize);
@@ -572,19 +579,18 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
572 579
573 580
574/** 581/**
575 * Send transmission acknowledgement to a client. 582 * Send acknowledgement to a client.
576 * 583 *
577 * Sent after the last GNUNET_PSYC_MessageModifier and after each 584 * Sent after a message fragment has been passed on to multicast.
578 * GNUNET_PSYC_MessageData.
579 * 585 *
580 * @param ch The channel struct for the client. 586 * @param ch The channel struct for the client.
581 */ 587 */
582static void 588static void
583send_transmit_ack (struct Channel *ch) 589send_message_ack (struct Channel *ch)
584{ 590{
585 struct GNUNET_MessageHeader res; 591 struct GNUNET_MessageHeader res;
586 res.size = htons (sizeof (res)); 592 res.size = htons (sizeof (res));
587 res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK); 593 res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
588 594
589 GNUNET_SERVER_notification_context_add (nc, ch->client); 595 GNUNET_SERVER_notification_context_add (nc, ch->client);
590 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res, 596 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res,
@@ -599,9 +605,9 @@ static int
599transmit_notify (void *cls, size_t *data_size, void *data) 605transmit_notify (void *cls, size_t *data_size, void *data)
600{ 606{
601 struct Channel *ch = cls; 607 struct Channel *ch = cls;
602 struct TransmitMessage *msg = ch->tmit_head; 608 struct TransmitMessage *tmit_msg = ch->tmit_head;
603 609
604 if (NULL == msg || *data_size < msg->size) 610 if (NULL == tmit_msg || *data_size < tmit_msg->size)
605 { 611 {
606 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify: nothing to send.\n"); 612 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify: nothing to send.\n");
607 *data_size = 0; 613 *data_size = 0;
@@ -609,21 +615,22 @@ transmit_notify (void *cls, size_t *data_size, void *data)
609 } 615 }
610 616
611 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 617 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
612 "transmit_notify: sending %u bytes.\n", msg->size); 618 "transmit_notify: sending %u bytes.\n", tmit_msg->size);
613 619
614 *data_size = msg->size; 620 *data_size = tmit_msg->size;
615 memcpy (data, msg->buf, *data_size); 621 memcpy (data, tmit_msg->buf, *data_size);
616 622
617 GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg); 623 GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg);
618 GNUNET_free (msg); 624 GNUNET_free (tmit_msg);
619 625
620 int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES; 626 int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES;
627 send_message_ack (ch);
621 628
622 if (0 == ch->tmit_task) 629 if (0 == ch->tmit_task)
623 { 630 {
624 if (NULL != ch->tmit_head) 631 if (NULL != ch->tmit_head)
625 { 632 {
626 transmit_message (ch, GNUNET_TIME_UNIT_ZERO); 633 transmit_message (ch);
627 } 634 }
628 else if (ch->disconnected) 635 else if (ch->disconnected)
629 { 636 {
@@ -640,11 +647,9 @@ transmit_notify (void *cls, size_t *data_size, void *data)
640 * Transmit a message from a channel master to the multicast group. 647 * Transmit a message from a channel master to the multicast group.
641 */ 648 */
642static void 649static void
643master_transmit_message (void *cls, 650master_transmit_message (struct Master *mst)
644 const struct GNUNET_SCHEDULER_TaskContext *tc)
645{ 651{
646 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "master_transmit_message()\n"); 652 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "master_transmit_message()\n");
647 struct Master *mst = cls;
648 mst->channel.tmit_task = 0; 653 mst->channel.tmit_task = 0;
649 if (NULL == mst->tmit_handle) 654 if (NULL == mst->tmit_handle)
650 { 655 {
@@ -664,10 +669,8 @@ master_transmit_message (void *cls,
664 * Transmit a message from a channel slave to the multicast group. 669 * Transmit a message from a channel slave to the multicast group.
665 */ 670 */
666static void 671static void
667slave_transmit_message (void *cls, 672slave_transmit_message (struct Slave *slv)
668 const struct GNUNET_SCHEDULER_TaskContext *tc)
669{ 673{
670 struct Slave *slv = cls;
671 slv->channel.tmit_task = 0; 674 slv->channel.tmit_task = 0;
672 if (NULL == slv->tmit_handle) 675 if (NULL == slv->tmit_handle)
673 { 676 {
@@ -682,214 +685,85 @@ slave_transmit_message (void *cls,
682} 685}
683 686
684 687
685/** 688static inline void
686 * Schedule message transmission from a channel to the multicast group. 689transmit_message (struct Channel *ch)
687 *
688 * @param ch The channel.
689 * @param delay Transmission delay.
690 */
691static void
692transmit_message (struct Channel *ch, struct GNUNET_TIME_Relative delay)
693{ 690{
694 if (0 != ch->tmit_task) 691 ch->is_master
695 GNUNET_SCHEDULER_cancel (ch->tmit_task); 692 ? master_transmit_message ((struct Master *) ch)
696 693 : slave_transmit_message ((struct Slave *) ch);
697 ch->tmit_task
698 = ch->is_master
699 ? GNUNET_SCHEDULER_add_delayed (delay, master_transmit_message, ch)
700 : GNUNET_SCHEDULER_add_delayed (delay, slave_transmit_message, ch);
701}
702
703
704/**
705 * Queue incoming message parts from a client for transmission, and send them to
706 * the multicast group when the buffer is full or reached the end of message.
707 *
708 * @param ch Channel struct for the client.
709 * @param msg Message from the client.
710 *
711 * @return #GNUNET_OK on success, else #GNUNET_SYSERR.
712 */
713static int
714queue_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg)
715{
716 uint16_t size = ntohs (msg->size);
717 struct GNUNET_TIME_Relative tmit_delay = GNUNET_TIME_UNIT_ZERO;
718 struct TransmitMessage *tmit_msg = ch->tmit_tail;
719
720 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
721 "Queueing message of type %u and size %u "
722 "for transmission to multicast.\n",
723 ntohs (msg->type), size);
724
725 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size)
726 return GNUNET_SYSERR;
727
728 if (NULL == tmit_msg
729 || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < tmit_msg->size + size)
730 {
731 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
732 "Appending message to new buffer.\n");
733 /* Start filling up new buffer */
734 tmit_msg = GNUNET_new (struct TransmitMessage);
735 tmit_msg->buf = GNUNET_malloc (size);
736 memcpy (tmit_msg->buf, msg, size);
737 tmit_msg->size = size;
738 tmit_msg->state = ch->tmit_state;
739 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
740 }
741 else
742 {
743 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
744 "Appending message to existing buffer.\n");
745 /* Append to existing buffer */
746 tmit_msg->buf = GNUNET_realloc (tmit_msg->buf, tmit_msg->size + size);
747 memcpy (tmit_msg->buf + tmit_msg->size, msg, size);
748 tmit_msg->size += size;
749 tmit_msg->state = ch->tmit_state;
750 }
751
752 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "tmit_size: %u\n", tmit_msg->size);
753
754 /* Wait a bit for the remaining message parts from the client
755 if there's still some space left in the buffer. */
756 if (tmit_msg->state < MSG_STATE_END
757 && (tmit_msg->size + sizeof (struct GNUNET_MessageHeader)
758 < GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD))
759 {
760 tmit_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2);
761 }
762 else
763 {
764 send_transmit_ack (ch);
765 }
766
767 transmit_message (ch, tmit_delay);
768
769 return GNUNET_OK;
770} 694}
771 695
772 696
773static void 697static void
774transmit_error (struct Channel *ch) 698transmit_error (struct Channel *ch)
775{ 699{
776 struct GNUNET_MessageHeader *msg = GNUNET_malloc (sizeof (*msg)); 700 struct GNUNET_MessageHeader *msg;
701 struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg)
702 + sizeof (*msg));
703 msg = (struct GNUNET_MessageHeader *) &tmit_msg[1];
777 msg->size = ntohs (sizeof (*msg)); 704 msg->size = ntohs (sizeof (*msg));
778 msg->type = ntohs (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); 705 msg->type = ntohs (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
779 queue_message (ch, msg);
780 706
707 tmit_msg->buf = (char *) &tmit_msg[1];
708 tmit_msg->size = sizeof (*msg);
709 tmit_msg->state = ch->tmit_state;
710 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
711 transmit_message (ch);
712
713 /* FIXME: cleanup */
781 GNUNET_SERVER_client_disconnect (ch->client); 714 GNUNET_SERVER_client_disconnect (ch->client);
782} 715}
783 716
784/**
785 * Incoming method from a client.
786 */
787static void
788handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client,
789 const struct GNUNET_MessageHeader *msg)
790{
791 /* const struct GNUNET_PSYC_MessageMethod *meth
792 = (const struct GNUNET_PSYC_MessageMethod *) msg; */
793 struct Channel *ch
794 = GNUNET_SERVER_client_get_user_context (client, struct Channel);
795 GNUNET_assert (NULL != ch);
796
797 if (MSG_STATE_START != ch->tmit_state)
798 {
799 transmit_error (ch);
800 return;
801 }
802 ch->tmit_state = MSG_STATE_METHOD;
803
804 queue_message (ch, msg);
805 send_transmit_ack (ch);
806 GNUNET_SERVER_receive_done (client, GNUNET_OK);
807};
808
809
810/**
811 * Incoming modifier from a client.
812 */
813static void
814handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client,
815 const struct GNUNET_MessageHeader *msg)
816{
817 const struct GNUNET_PSYC_MessageModifier *mod
818 = (const struct GNUNET_PSYC_MessageModifier *) msg;
819
820 struct Channel *ch
821 = GNUNET_SERVER_client_get_user_context (client, struct Channel);
822 GNUNET_assert (NULL != ch);
823
824 if (MSG_STATE_METHOD != ch->tmit_state
825 || MSG_STATE_MODIFIER != ch->tmit_state
826 || MSG_STATE_MOD_CONT != ch->tmit_state
827 || ch->tmit_mod_value_size_expected != ch->tmit_mod_value_size)
828 {
829 transmit_error (ch);
830 return;
831 }
832 ch->tmit_mod_value_size_expected = ntohl (mod->value_size);
833 ch->tmit_mod_value_size = ntohs (msg->size) - ntohs(mod->name_size) - 1;
834
835 queue_message (ch, msg);
836 GNUNET_SERVER_receive_done (client, GNUNET_OK);
837};
838
839 717
840/** 718/**
841 * Incoming modifier from a client. 719 * Incoming message from a client.
842 */ 720 */
843static void 721static void
844handle_transmit_mod_cont (void *cls, struct GNUNET_SERVER_Client *client, 722handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
845 const struct GNUNET_MessageHeader *msg) 723 const struct GNUNET_MessageHeader *msg)
846{ 724{
847 struct Channel *ch 725 struct Channel *ch
848 = GNUNET_SERVER_client_get_user_context (client, struct Channel); 726 = GNUNET_SERVER_client_get_user_context (client, struct Channel);
849 GNUNET_assert (NULL != ch); 727 GNUNET_assert (NULL != ch);
850 728
851 ch->tmit_mod_value_size += ntohs (msg->size); 729 uint16_t size = ntohs (msg->size);
730 uint16_t psize = 0, pos = 0;
852 731
853 if (MSG_STATE_MODIFIER != ch->tmit_state 732 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
854 || MSG_STATE_MOD_CONT != ch->tmit_state
855 || ch->tmit_mod_value_size_expected < ch->tmit_mod_value_size)
856 { 733 {
734 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Message payload too large\n");
735 GNUNET_break (0);
857 transmit_error (ch); 736 transmit_error (ch);
858 return; 737 return;
859 } 738 }
860 ch->tmit_state = MSG_STATE_MOD_CONT;
861
862 queue_message (ch, msg);
863 GNUNET_SERVER_receive_done (client, GNUNET_OK);
864};
865 739
866 740 for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
867/**
868 * Incoming data from a client.
869 */
870static void
871handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client,
872 const struct GNUNET_MessageHeader *msg)
873{
874 struct Channel *ch
875 = GNUNET_SERVER_client_get_user_context (client, struct Channel);
876 GNUNET_assert (NULL != ch);
877
878 if (MSG_STATE_METHOD != ch->tmit_state
879 || MSG_STATE_MODIFIER != ch->tmit_state
880 || MSG_STATE_MOD_CONT != ch->tmit_state
881 || ch->tmit_mod_value_size_expected != ch->tmit_mod_value_size)
882 { 741 {
883 transmit_error (ch); 742 const struct GNUNET_MessageHeader *pmsg
884 return; 743 = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
744 psize = ntohs (pmsg->size);
745 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
746 "Received message part of type %u and size %u "
747 "from client.\n", ntohs (pmsg->type), psize);
748 if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
749 {
750 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
751 "Received invalid message part of type %u and size %u "
752 "from client.\n", ntohs (pmsg->type), psize);
753 GNUNET_break (0);
754 transmit_error (ch);
755 return;
756 }
885 } 757 }
886 ch->tmit_state = MSG_STATE_DATA;
887 758
888 queue_message (ch, msg); 759 size -= sizeof (*msg);
889 send_transmit_ack (ch); 760 struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size);
890 761 tmit_msg->buf = (char *) &tmit_msg[1];
891 if (MSG_STATE_END <= ch->tmit_state) 762 memcpy (tmit_msg->buf, &msg[1], size);
892 ch->tmit_state = MSG_STATE_START; 763 tmit_msg->size = size;
764 tmit_msg->state = ch->tmit_state;
765 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
766 transmit_message (ch);
893 767
894 GNUNET_SERVER_receive_done (client, GNUNET_OK); 768 GNUNET_SERVER_receive_done (client, GNUNET_OK);
895}; 769};
@@ -912,22 +786,9 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
912 786
913 { &handle_slave_join, NULL, 787 { &handle_slave_join, NULL,
914 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 }, 788 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
915#if TODO 789
916 { &handle_psyc_message, NULL, 790 { &handle_psyc_message, NULL,
917 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 }, 791 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
918#endif
919 { &handle_transmit_method, NULL,
920 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD, 0 },
921
922 { &handle_transmit_modifier, NULL,
923 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER, 0 },
924
925 { &handle_transmit_mod_cont, NULL,
926 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT, 0 },
927
928 { &handle_transmit_data, NULL,
929 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA, 0 },
930 { NULL, NULL, 0, 0 }
931 }; 792 };
932 793
933 cfg = c; 794 cfg = c;