aboutsummaryrefslogtreecommitdiff
path: root/src/psyc/psyc_api.c
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2014-03-12 16:39:41 +0000
committerGabor X Toth <*@tg-x.net>2014-03-12 16:39:41 +0000
commitdbdb091b11204e1e1caaa3f4260bb6cf1168cbd2 (patch)
tree23d24f1cd487b68d0fcb30f015c4f7f6ef9a015d /src/psyc/psyc_api.c
parent1bf8c98f6d843f30e9abfa6dde31e31e50170c06 (diff)
downloadgnunet-dbdb091b11204e1e1caaa3f4260bb6cf1168cbd2.tar.gz
gnunet-dbdb091b11204e1e1caaa3f4260bb6cf1168cbd2.zip
PSYC: in-order delivery of fragments; tests for large messages
Cache message received fragments from multicast and deliver them in the correct order to clients. Test messages with large modifier and data payloads.
Diffstat (limited to 'src/psyc/psyc_api.c')
-rw-r--r--src/psyc/psyc_api.c44
1 files changed, 28 insertions, 16 deletions
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index 8a1c9ffaa..16e8106d4 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -336,7 +336,7 @@ recv_error (struct GNUNET_PSYC_Channel *ch)
336 336
337 337
338/** 338/**
339 * Queue an incoming message part for transmission to the PSYC service. 339 * Queue a message part for transmission to the PSYC service.
340 * 340 *
341 * The message part is added to the current message buffer. 341 * The message part is added to the current message buffer.
342 * When this buffer is full, it is added to the transmission queue. 342 * When this buffer is full, it is added to the transmission queue.
@@ -390,7 +390,7 @@ queue_message (struct GNUNET_PSYC_Channel *ch,
390 op->msg->size = sizeof (*op->msg) + size; 390 op->msg->size = sizeof (*op->msg) + size;
391 memcpy (&op->msg[1], msg, size); 391 memcpy (&op->msg[1], msg, size);
392 } 392 }
393 393
394 if (NULL != op 394 if (NULL != op
395 && (GNUNET_YES == end 395 && (GNUNET_YES == end
396 || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD 396 || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
@@ -433,12 +433,12 @@ channel_transmit_mod (struct GNUNET_PSYC_Channel *ch)
433 max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; 433 max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
434 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); 434 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
435 msg->size = sizeof (struct GNUNET_PSYC_MessageModifier); 435 msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
436 notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, 436 notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, &data_size, &mod[1],
437 &data_size, &mod[1], &mod->oper); 437 &mod->oper, &mod->value_size);
438 mod->name_size = strnlen ((char *) &mod[1], data_size); 438 mod->name_size = strnlen ((char *) &mod[1], data_size);
439 if (mod->name_size < data_size) 439 if (mod->name_size < data_size)
440 { 440 {
441 mod->value_size = htons (data_size - 1 - mod->name_size); 441 mod->value_size = htonl (mod->value_size);
442 mod->name_size = htons (mod->name_size); 442 mod->name_size = htons (mod->name_size);
443 } 443 }
444 else if (0 < data_size) 444 else if (0 < data_size)
@@ -451,10 +451,10 @@ channel_transmit_mod (struct GNUNET_PSYC_Channel *ch)
451 case MSG_STATE_MOD_CONT: 451 case MSG_STATE_MOD_CONT:
452 { 452 {
453 max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; 453 max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
454 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); 454 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
455 msg->size = sizeof (struct GNUNET_MessageHeader); 455 msg->size = sizeof (struct GNUNET_MessageHeader);
456 notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, 456 notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls,
457 &data_size, &msg[1], NULL); 457 &data_size, &msg[1], NULL, NULL);
458 break; 458 break;
459 } 459 }
460 default: 460 default:
@@ -669,6 +669,8 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
669 ch->recv_message_id = GNUNET_ntohll (msg->message_id); 669 ch->recv_message_id = GNUNET_ntohll (msg->message_id);
670 ch->recv_flags = flags; 670 ch->recv_flags = flags;
671 ch->recv_slave_key = msg->slave_key; 671 ch->recv_slave_key = msg->slave_key;
672 ch->recv_mod_value_size = 0;
673 ch->recv_mod_value_size_expected = 0;
672 } 674 }
673 else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id) 675 else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id)
674 { 676 {
@@ -703,7 +705,7 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
703 if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size) 705 if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
704 { 706 {
705 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 707 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
706 "Discarding message of type %u with invalid size %u.\n", 708 "Dropping message of type %u with invalid size %u.\n",
707 ptype, psize); 709 ptype, psize);
708 recv_error (ch); 710 recv_error (ch);
709 return; 711 return;
@@ -753,7 +755,8 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
753 if (MSG_STATE_START != ch->recv_state) 755 if (MSG_STATE_START != ch->recv_state)
754 { 756 {
755 LOG (GNUNET_ERROR_TYPE_WARNING, 757 LOG (GNUNET_ERROR_TYPE_WARNING,
756 "Discarding out of order message method.\n"); 758 "Dropping out of order message method (%u).\n",
759 ch->recv_state);
757 /* It is normal to receive an incomplete message right after connecting, 760 /* It is normal to receive an incomplete message right after connecting,
758 * but should not happen later. 761 * but should not happen later.
759 * FIXME: add a check for this condition. 762 * FIXME: add a check for this condition.
@@ -766,7 +769,7 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
766 if ('\0' != *((char *) meth + psize - 1)) 769 if ('\0' != *((char *) meth + psize - 1))
767 { 770 {
768 LOG (GNUNET_ERROR_TYPE_WARNING, 771 LOG (GNUNET_ERROR_TYPE_WARNING,
769 "Discarding message with malformed method. " 772 "Dropping message with malformed method. "
770 "Message ID: %" PRIu64 "\n", ch->recv_message_id); 773 "Message ID: %" PRIu64 "\n", ch->recv_message_id);
771 GNUNET_break_op (0); 774 GNUNET_break_op (0);
772 recv_error (ch); 775 recv_error (ch);
@@ -782,7 +785,8 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
782 || MSG_STATE_MOD_CONT == ch->recv_state)) 785 || MSG_STATE_MOD_CONT == ch->recv_state))
783 { 786 {
784 LOG (GNUNET_ERROR_TYPE_WARNING, 787 LOG (GNUNET_ERROR_TYPE_WARNING,
785 "Discarding out of order message modifier.\n"); 788 "Dropping out of order message modifier (%u).\n",
789 ch->recv_state);
786 GNUNET_break_op (0); 790 GNUNET_break_op (0);
787 recv_error (ch); 791 recv_error (ch);
788 return; 792 return;
@@ -792,14 +796,14 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
792 = (struct GNUNET_PSYC_MessageModifier *) pmsg; 796 = (struct GNUNET_PSYC_MessageModifier *) pmsg;
793 797
794 uint16_t name_size = ntohs (mod->name_size); 798 uint16_t name_size = ntohs (mod->name_size);
795 ch->recv_mod_value_size_expected = ntohs (mod->value_size); 799 ch->recv_mod_value_size_expected = ntohl (mod->value_size);
796 ch->recv_mod_value_size = psize - sizeof (*mod) - name_size - 1; 800 ch->recv_mod_value_size = psize - sizeof (*mod) - name_size - 1;
797 801
798 if (psize < sizeof (*mod) + name_size + 1 802 if (psize < sizeof (*mod) + name_size + 1
799 || '\0' != *((char *) &mod[1] + name_size) 803 || '\0' != *((char *) &mod[1] + name_size)
800 || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) 804 || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
801 { 805 {
802 LOG (GNUNET_ERROR_TYPE_WARNING, "Discarding malformed modifier.\n"); 806 LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
803 GNUNET_break_op (0); 807 GNUNET_break_op (0);
804 recv_error (ch); 808 recv_error (ch);
805 return; 809 return;
@@ -816,7 +820,11 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
816 || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) 820 || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
817 { 821 {
818 LOG (GNUNET_ERROR_TYPE_WARNING, 822 LOG (GNUNET_ERROR_TYPE_WARNING,
819 "Discarding out of order message modifier continuation.\n"); 823 "Dropping out of order message modifier continuation "
824 "!(%u == %u || %u == %u) || %lu < %lu.\n",
825 MSG_STATE_MODIFIER, ch->recv_state,
826 MSG_STATE_MOD_CONT, ch->recv_state,
827 ch->recv_mod_value_size_expected, ch->recv_mod_value_size);
820 GNUNET_break_op (0); 828 GNUNET_break_op (0);
821 recv_error (ch); 829 recv_error (ch);
822 return; 830 return;
@@ -829,7 +837,11 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
829 || ch->recv_mod_value_size_expected != ch->recv_mod_value_size) 837 || ch->recv_mod_value_size_expected != ch->recv_mod_value_size)
830 { 838 {
831 LOG (GNUNET_ERROR_TYPE_WARNING, 839 LOG (GNUNET_ERROR_TYPE_WARNING,
832 "Discarding out of order message data fragment.\n"); 840 "Dropping out of order message data fragment "
841 "(%u < %u || %lu != %lu).\n",
842 ch->recv_state, MSG_STATE_METHOD,
843 ch->recv_mod_value_size_expected, ch->recv_mod_value_size);
844
833 GNUNET_break_op (0); 845 GNUNET_break_op (0);
834 recv_error (ch); 846 recv_error (ch);
835 return; 847 return;
@@ -1412,7 +1424,7 @@ GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave,
1412 * @param th Handle of the request that is being resumed. 1424 * @param th Handle of the request that is being resumed.
1413 */ 1425 */
1414void 1426void
1415GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) 1427GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_SlaveTransmitHandle *th)
1416{ 1428{
1417 channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); 1429 channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
1418} 1430}