diff options
author | Gabor X Toth <*@tg-x.net> | 2014-03-12 16:39:41 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2014-03-12 16:39:41 +0000 |
commit | dbdb091b11204e1e1caaa3f4260bb6cf1168cbd2 (patch) | |
tree | 23d24f1cd487b68d0fcb30f015c4f7f6ef9a015d /src/psyc/psyc_api.c | |
parent | 1bf8c98f6d843f30e9abfa6dde31e31e50170c06 (diff) | |
download | gnunet-dbdb091b11204e1e1caaa3f4260bb6cf1168cbd2.tar.gz gnunet-dbdb091b11204e1e1caaa3f4260bb6cf1168cbd2.zip |
PSYC: in-order delivery of fragments; tests for large messages
Cache message received fragments from multicast and deliver them
in the correct order to clients.
Test messages with large modifier and data payloads.
Diffstat (limited to 'src/psyc/psyc_api.c')
-rw-r--r-- | src/psyc/psyc_api.c | 44 |
1 files changed, 28 insertions, 16 deletions
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c index 8a1c9ffaa..16e8106d4 100644 --- a/src/psyc/psyc_api.c +++ b/src/psyc/psyc_api.c | |||
@@ -336,7 +336,7 @@ recv_error (struct GNUNET_PSYC_Channel *ch) | |||
336 | 336 | ||
337 | 337 | ||
338 | /** | 338 | /** |
339 | * Queue an incoming message part for transmission to the PSYC service. | 339 | * Queue a message part for transmission to the PSYC service. |
340 | * | 340 | * |
341 | * The message part is added to the current message buffer. | 341 | * The message part is added to the current message buffer. |
342 | * When this buffer is full, it is added to the transmission queue. | 342 | * When this buffer is full, it is added to the transmission queue. |
@@ -390,7 +390,7 @@ queue_message (struct GNUNET_PSYC_Channel *ch, | |||
390 | op->msg->size = sizeof (*op->msg) + size; | 390 | op->msg->size = sizeof (*op->msg) + size; |
391 | memcpy (&op->msg[1], msg, size); | 391 | memcpy (&op->msg[1], msg, size); |
392 | } | 392 | } |
393 | 393 | ||
394 | if (NULL != op | 394 | if (NULL != op |
395 | && (GNUNET_YES == end | 395 | && (GNUNET_YES == end |
396 | || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD | 396 | || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD |
@@ -433,12 +433,12 @@ channel_transmit_mod (struct GNUNET_PSYC_Channel *ch) | |||
433 | max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; | 433 | max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; |
434 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); | 434 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); |
435 | msg->size = sizeof (struct GNUNET_PSYC_MessageModifier); | 435 | msg->size = sizeof (struct GNUNET_PSYC_MessageModifier); |
436 | notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, | 436 | notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, &data_size, &mod[1], |
437 | &data_size, &mod[1], &mod->oper); | 437 | &mod->oper, &mod->value_size); |
438 | mod->name_size = strnlen ((char *) &mod[1], data_size); | 438 | mod->name_size = strnlen ((char *) &mod[1], data_size); |
439 | if (mod->name_size < data_size) | 439 | if (mod->name_size < data_size) |
440 | { | 440 | { |
441 | mod->value_size = htons (data_size - 1 - mod->name_size); | 441 | mod->value_size = htonl (mod->value_size); |
442 | mod->name_size = htons (mod->name_size); | 442 | mod->name_size = htons (mod->name_size); |
443 | } | 443 | } |
444 | else if (0 < data_size) | 444 | else if (0 < data_size) |
@@ -451,10 +451,10 @@ channel_transmit_mod (struct GNUNET_PSYC_Channel *ch) | |||
451 | case MSG_STATE_MOD_CONT: | 451 | case MSG_STATE_MOD_CONT: |
452 | { | 452 | { |
453 | max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; | 453 | max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; |
454 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); | 454 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); |
455 | msg->size = sizeof (struct GNUNET_MessageHeader); | 455 | msg->size = sizeof (struct GNUNET_MessageHeader); |
456 | notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, | 456 | notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, |
457 | &data_size, &msg[1], NULL); | 457 | &data_size, &msg[1], NULL, NULL); |
458 | break; | 458 | break; |
459 | } | 459 | } |
460 | default: | 460 | default: |
@@ -669,6 +669,8 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, | |||
669 | ch->recv_message_id = GNUNET_ntohll (msg->message_id); | 669 | ch->recv_message_id = GNUNET_ntohll (msg->message_id); |
670 | ch->recv_flags = flags; | 670 | ch->recv_flags = flags; |
671 | ch->recv_slave_key = msg->slave_key; | 671 | ch->recv_slave_key = msg->slave_key; |
672 | ch->recv_mod_value_size = 0; | ||
673 | ch->recv_mod_value_size_expected = 0; | ||
672 | } | 674 | } |
673 | else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id) | 675 | else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id) |
674 | { | 676 | { |
@@ -703,7 +705,7 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, | |||
703 | if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size) | 705 | if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size) |
704 | { | 706 | { |
705 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 707 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
706 | "Discarding message of type %u with invalid size %u.\n", | 708 | "Dropping message of type %u with invalid size %u.\n", |
707 | ptype, psize); | 709 | ptype, psize); |
708 | recv_error (ch); | 710 | recv_error (ch); |
709 | return; | 711 | return; |
@@ -753,7 +755,8 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, | |||
753 | if (MSG_STATE_START != ch->recv_state) | 755 | if (MSG_STATE_START != ch->recv_state) |
754 | { | 756 | { |
755 | LOG (GNUNET_ERROR_TYPE_WARNING, | 757 | LOG (GNUNET_ERROR_TYPE_WARNING, |
756 | "Discarding out of order message method.\n"); | 758 | "Dropping out of order message method (%u).\n", |
759 | ch->recv_state); | ||
757 | /* It is normal to receive an incomplete message right after connecting, | 760 | /* It is normal to receive an incomplete message right after connecting, |
758 | * but should not happen later. | 761 | * but should not happen later. |
759 | * FIXME: add a check for this condition. | 762 | * FIXME: add a check for this condition. |
@@ -766,7 +769,7 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, | |||
766 | if ('\0' != *((char *) meth + psize - 1)) | 769 | if ('\0' != *((char *) meth + psize - 1)) |
767 | { | 770 | { |
768 | LOG (GNUNET_ERROR_TYPE_WARNING, | 771 | LOG (GNUNET_ERROR_TYPE_WARNING, |
769 | "Discarding message with malformed method. " | 772 | "Dropping message with malformed method. " |
770 | "Message ID: %" PRIu64 "\n", ch->recv_message_id); | 773 | "Message ID: %" PRIu64 "\n", ch->recv_message_id); |
771 | GNUNET_break_op (0); | 774 | GNUNET_break_op (0); |
772 | recv_error (ch); | 775 | recv_error (ch); |
@@ -782,7 +785,8 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, | |||
782 | || MSG_STATE_MOD_CONT == ch->recv_state)) | 785 | || MSG_STATE_MOD_CONT == ch->recv_state)) |
783 | { | 786 | { |
784 | LOG (GNUNET_ERROR_TYPE_WARNING, | 787 | LOG (GNUNET_ERROR_TYPE_WARNING, |
785 | "Discarding out of order message modifier.\n"); | 788 | "Dropping out of order message modifier (%u).\n", |
789 | ch->recv_state); | ||
786 | GNUNET_break_op (0); | 790 | GNUNET_break_op (0); |
787 | recv_error (ch); | 791 | recv_error (ch); |
788 | return; | 792 | return; |
@@ -792,14 +796,14 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, | |||
792 | = (struct GNUNET_PSYC_MessageModifier *) pmsg; | 796 | = (struct GNUNET_PSYC_MessageModifier *) pmsg; |
793 | 797 | ||
794 | uint16_t name_size = ntohs (mod->name_size); | 798 | uint16_t name_size = ntohs (mod->name_size); |
795 | ch->recv_mod_value_size_expected = ntohs (mod->value_size); | 799 | ch->recv_mod_value_size_expected = ntohl (mod->value_size); |
796 | ch->recv_mod_value_size = psize - sizeof (*mod) - name_size - 1; | 800 | ch->recv_mod_value_size = psize - sizeof (*mod) - name_size - 1; |
797 | 801 | ||
798 | if (psize < sizeof (*mod) + name_size + 1 | 802 | if (psize < sizeof (*mod) + name_size + 1 |
799 | || '\0' != *((char *) &mod[1] + name_size) | 803 | || '\0' != *((char *) &mod[1] + name_size) |
800 | || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) | 804 | || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) |
801 | { | 805 | { |
802 | LOG (GNUNET_ERROR_TYPE_WARNING, "Discarding malformed modifier.\n"); | 806 | LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n"); |
803 | GNUNET_break_op (0); | 807 | GNUNET_break_op (0); |
804 | recv_error (ch); | 808 | recv_error (ch); |
805 | return; | 809 | return; |
@@ -816,7 +820,11 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, | |||
816 | || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) | 820 | || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) |
817 | { | 821 | { |
818 | LOG (GNUNET_ERROR_TYPE_WARNING, | 822 | LOG (GNUNET_ERROR_TYPE_WARNING, |
819 | "Discarding out of order message modifier continuation.\n"); | 823 | "Dropping out of order message modifier continuation " |
824 | "!(%u == %u || %u == %u) || %lu < %lu.\n", | ||
825 | MSG_STATE_MODIFIER, ch->recv_state, | ||
826 | MSG_STATE_MOD_CONT, ch->recv_state, | ||
827 | ch->recv_mod_value_size_expected, ch->recv_mod_value_size); | ||
820 | GNUNET_break_op (0); | 828 | GNUNET_break_op (0); |
821 | recv_error (ch); | 829 | recv_error (ch); |
822 | return; | 830 | return; |
@@ -829,7 +837,11 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, | |||
829 | || ch->recv_mod_value_size_expected != ch->recv_mod_value_size) | 837 | || ch->recv_mod_value_size_expected != ch->recv_mod_value_size) |
830 | { | 838 | { |
831 | LOG (GNUNET_ERROR_TYPE_WARNING, | 839 | LOG (GNUNET_ERROR_TYPE_WARNING, |
832 | "Discarding out of order message data fragment.\n"); | 840 | "Dropping out of order message data fragment " |
841 | "(%u < %u || %lu != %lu).\n", | ||
842 | ch->recv_state, MSG_STATE_METHOD, | ||
843 | ch->recv_mod_value_size_expected, ch->recv_mod_value_size); | ||
844 | |||
833 | GNUNET_break_op (0); | 845 | GNUNET_break_op (0); |
834 | recv_error (ch); | 846 | recv_error (ch); |
835 | return; | 847 | return; |
@@ -1412,7 +1424,7 @@ GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave, | |||
1412 | * @param th Handle of the request that is being resumed. | 1424 | * @param th Handle of the request that is being resumed. |
1413 | */ | 1425 | */ |
1414 | void | 1426 | void |
1415 | GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) | 1427 | GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_SlaveTransmitHandle *th) |
1416 | { | 1428 | { |
1417 | channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); | 1429 | channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); |
1418 | } | 1430 | } |