aboutsummaryrefslogtreecommitdiff
path: root/src/psyc
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2014-03-12 16:39:41 +0000
committerGabor X Toth <*@tg-x.net>2014-03-12 16:39:41 +0000
commitdbdb091b11204e1e1caaa3f4260bb6cf1168cbd2 (patch)
tree23d24f1cd487b68d0fcb30f015c4f7f6ef9a015d /src/psyc
parent1bf8c98f6d843f30e9abfa6dde31e31e50170c06 (diff)
downloadgnunet-dbdb091b11204e1e1caaa3f4260bb6cf1168cbd2.tar.gz
gnunet-dbdb091b11204e1e1caaa3f4260bb6cf1168cbd2.zip
PSYC: in-order delivery of fragments; tests for large messages
Cache message received fragments from multicast and deliver them in the correct order to clients. Test messages with large modifier and data payloads.
Diffstat (limited to 'src/psyc')
-rw-r--r--src/psyc/gnunet-service-psyc.c315
-rw-r--r--src/psyc/psyc.h4
-rw-r--r--src/psyc/psyc_api.c44
-rw-r--r--src/psyc/psyc_common.c24
-rw-r--r--src/psyc/test_psyc.c66
5 files changed, 390 insertions, 63 deletions
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
index dcb5031f1..4cdb490a1 100644
--- a/src/psyc/gnunet-service-psyc.c
+++ b/src/psyc/gnunet-service-psyc.c
@@ -24,6 +24,8 @@
24 * @author Gabor X Toth 24 * @author Gabor X Toth
25 */ 25 */
26 26
27#include <inttypes.h>
28
27#include "platform.h" 29#include "platform.h"
28#include "gnunet_util_lib.h" 30#include "gnunet_util_lib.h"
29#include "gnunet_constants.h" 31#include "gnunet_constants.h"
@@ -77,6 +79,45 @@ struct TransmitMessage
77 uint8_t state; 79 uint8_t state;
78}; 80};
79 81
82
83/**
84 * Cache for received message fragments.
85 * Message fragments are only sent to clients after all modifiers arrived.
86 *
87 * chan_key -> MultiHashMap chan_msgs
88 */
89static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
90
91
92/**
93 * Entry in the chan_msgs hashmap of @a recv_cache:
94 * fragment_id -> FragmentEntry
95 */
96struct FragmentEntry
97{
98 struct GNUNET_MULTICAST_MessageHeader *mmsg;
99 uint16_t ref_count;
100};
101
102
103/**
104 * Entry in the @a recv_msgs hash map of a @a Channel.
105 * message_id -> FragmentCache
106 */
107struct FragmentCache
108{
109 /**
110 * Total size of header fragments (METHOD & MODIFIERs)
111 */
112 uint64_t header_size;
113
114 /**
115 * Fragment IDs stored in @a recv_cache.
116 */
117 struct GNUNET_CONTAINER_Heap *fragments;
118};
119
120
80/** 121/**
81 * Common part of the client context for both a master and slave channel. 122 * Common part of the client context for both a master and slave channel.
82 */ 123 */
@@ -87,6 +128,12 @@ struct Channel
87 struct TransmitMessage *tmit_head; 128 struct TransmitMessage *tmit_head;
88 struct TransmitMessage *tmit_tail; 129 struct TransmitMessage *tmit_tail;
89 130
131 /**
132 * Received fragments not yet sent to the client.
133 * message_id -> FragmentCache
134 */
135 struct GNUNET_CONTAINER_MultiHashMap *recv_msgs;
136
90 GNUNET_SCHEDULER_TaskIdentifier tmit_task; 137 GNUNET_SCHEDULER_TaskIdentifier tmit_task;
91 138
92 /** 139 /**
@@ -213,6 +260,8 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
213static void 260static void
214client_cleanup (struct Channel *ch) 261client_cleanup (struct Channel *ch)
215{ 262{
263 /* FIXME: fragment_cache_clear */
264
216 if (ch->is_master) 265 if (ch->is_master)
217 { 266 {
218 struct Master *mst = (struct Master *) ch; 267 struct Master *mst = (struct Master *) ch;
@@ -323,6 +372,189 @@ fragment_store_result (void *cls, int64_t result, const char *err_msg)
323} 372}
324 373
325 374
375static void
376message_to_client (struct Channel *ch,
377 const struct GNUNET_MULTICAST_MessageHeader *mmsg)
378{
379 uint16_t size = ntohs (mmsg->header.size);
380 struct GNUNET_PSYC_MessageHeader *pmsg;
381 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
382
383 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
384 "%p Sending message to client. "
385 "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
386 ch, GNUNET_ntohll (mmsg->fragment_id),
387 GNUNET_ntohll (mmsg->message_id));
388
389 pmsg = GNUNET_malloc (psize);
390 pmsg->header.size = htons (psize);
391 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
392 pmsg->message_id = mmsg->message_id;
393
394 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
395
396 GNUNET_SERVER_notification_context_add (nc, ch->client);
397 GNUNET_SERVER_notification_context_unicast (nc, ch->client,
398 (const struct GNUNET_MessageHeader *) pmsg,
399 GNUNET_NO);
400 GNUNET_free (pmsg);
401}
402
403
404/**
405 * Convert an uint64_t in network byte order to a HashCode
406 * that can be used as key in a MultiHashMap
407 */
408static inline void
409hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
410{
411 /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
412
413 n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL);
414 n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
415
416 *key = (struct GNUNET_HashCode) {{ 0 }};
417 *((uint64_t *) key)
418 = (n << 32) | (n >> 32);
419}
420
421
422/**
423 * Convert an uint64_t in host byte order to a HashCode
424 * that can be used as key in a MultiHashMap
425 */
426static inline void
427hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
428{
429#if __BYTE_ORDER == __BIG_ENDIAN
430 hash_key_from_nll (key, n);
431#elif __BYTE_ORDER == __LITTLE_ENDIAN
432 *key = (struct GNUNET_HashCode) {{ 0 }};
433 *((uint64_t *) key) = n;
434#else
435 #error byteorder undefined
436#endif
437}
438
439
440static void
441fragment_cache_insert (struct Channel *ch,
442 const struct GNUNET_HashCode *chan_key_hash,
443 const struct GNUNET_HashCode *msg_id,
444 struct FragmentCache *frag_cache,
445 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
446 uint16_t last_part_type)
447{
448 uint16_t size = ntohs (mmsg->header.size);
449 struct GNUNET_CONTAINER_MultiHashMap
450 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, chan_key_hash);
451
452 if (NULL == frag_cache)
453 {
454 frag_cache = GNUNET_new (struct FragmentCache);
455 frag_cache->fragments
456 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
457
458 if (NULL == ch->recv_msgs)
459 {
460 ch->recv_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
461 }
462 GNUNET_CONTAINER_multihashmap_put (ch->recv_msgs, msg_id, frag_cache,
463 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
464
465 if (NULL == chan_msgs)
466 {
467 chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
468 GNUNET_CONTAINER_multihashmap_put (recv_cache, chan_key_hash, chan_msgs,
469 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
470 }
471 }
472
473 struct GNUNET_HashCode *frag_id = GNUNET_new (struct GNUNET_HashCode);
474 hash_key_from_nll (frag_id, mmsg->fragment_id);
475 struct FragmentEntry
476 *frag_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, frag_id);
477 if (NULL == frag_entry)
478 {
479 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
480 "%p Adding message fragment to cache. "
481 "fragment_id: %" PRIu64 ", "
482 "header_size: %" PRIu64 " + %" PRIu64 ").\n",
483 ch, GNUNET_ntohll (mmsg->fragment_id),
484 frag_cache->header_size, size);
485 frag_entry = GNUNET_new (struct FragmentEntry);
486 frag_entry->ref_count = 1;
487 frag_entry->mmsg = GNUNET_malloc (size);
488 memcpy (frag_entry->mmsg, mmsg, size);
489 GNUNET_CONTAINER_multihashmap_put (chan_msgs, frag_id, frag_entry,
490 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
491 }
492 else
493 {
494 frag_entry->ref_count++;
495 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
496 "%p Message fragment already in cache. "
497 "fragment_id: %" PRIu64 ", ref_count: %u\n",
498 ch, GNUNET_ntohll (mmsg->fragment_id), frag_entry->ref_count);
499 }
500
501 switch (last_part_type)
502 {
503 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
504 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
505 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
506 frag_cache->header_size += size;
507 }
508 GNUNET_CONTAINER_heap_insert (frag_cache->fragments, frag_id,
509 GNUNET_ntohll (mmsg->fragment_id));
510}
511
512
513static void
514fragment_cache_clear (struct Channel *ch,
515 const struct GNUNET_HashCode *chan_key_hash,
516 const struct GNUNET_HashCode *msg_id,
517 struct FragmentCache *frag_cache,
518 uint8_t send_to_client)
519{
520 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
521 "%p Clearing message fragment cache.\n", ch);
522
523 struct GNUNET_CONTAINER_MultiHashMap
524 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, chan_key_hash);
525 GNUNET_assert (NULL != chan_msgs);
526 struct GNUNET_HashCode *frag_id;
527
528 while ((frag_id = GNUNET_CONTAINER_heap_remove_root (frag_cache->fragments)))
529 {
530 struct FragmentEntry
531 *frag_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, frag_id);
532 if (frag_entry != NULL)
533 {
534 if (GNUNET_YES == send_to_client)
535 {
536 message_to_client (ch, frag_entry->mmsg);
537 }
538 if (1 == frag_entry->ref_count)
539 {
540 GNUNET_CONTAINER_multihashmap_remove (chan_msgs, frag_id, frag_entry);
541 GNUNET_free (frag_entry->mmsg);
542 GNUNET_free (frag_entry);
543 }
544 else
545 {
546 frag_entry->ref_count--;
547 }
548 }
549 GNUNET_free (frag_id);
550 }
551
552 GNUNET_CONTAINER_multihashmap_remove (ch->recv_msgs, msg_id, frag_cache);
553 GNUNET_CONTAINER_heap_destroy (frag_cache->fragments);
554 GNUNET_free (frag_cache);
555}
556
557
326/** 558/**
327 * Incoming message fragment from multicast. 559 * Incoming message fragment from multicast.
328 * 560 *
@@ -358,11 +590,15 @@ message_cb (struct Channel *ch,
358 rcb, rcb_cls); 590 rcb, rcb_cls);
359#endif 591#endif
360 592
361 const struct GNUNET_MULTICAST_MessageHeader *mmsg 593 const struct GNUNET_MULTICAST_MessageHeader
362 = (const struct GNUNET_MULTICAST_MessageHeader *) msg; 594 *mmsg = (const struct GNUNET_MULTICAST_MessageHeader *) msg;
363 595
364 if (GNUNET_YES != GNUNET_PSYC_check_message_parts (size - sizeof (*mmsg), 596 uint16_t ptype = GNUNET_PSYC_message_last_part (size - sizeof (*mmsg),
365 (const char *) &mmsg[1])) 597 (const char *) &mmsg[1]);
598 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
599 "Last message part type %u\n", ptype);
600
601 if (GNUNET_NO == ptype)
366 { 602 {
367 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 603 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
368 "%p Received message with invalid parts from multicast. " 604 "%p Received message with invalid parts from multicast. "
@@ -371,20 +607,55 @@ message_cb (struct Channel *ch,
371 break; 607 break;
372 } 608 }
373 609
374 struct GNUNET_PSYC_MessageHeader *pmsg; 610 struct GNUNET_HashCode msg_id;
375 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg); 611 hash_key_from_nll (&msg_id, mmsg->message_id);
376 pmsg = GNUNET_malloc (psize);
377 pmsg->header.size = htons (psize);
378 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
379 pmsg->message_id = mmsg->message_id;
380 612
381 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); 613 struct FragmentCache *frag_cache = NULL;
614 if (NULL != ch->recv_msgs)
615 frag_cache = GNUNET_CONTAINER_multihashmap_get (ch->recv_msgs, &msg_id);
382 616
383 GNUNET_SERVER_notification_context_add (nc, ch->client); 617 switch (ptype)
384 GNUNET_SERVER_notification_context_unicast (nc, ch->client, 618 {
385 (const struct GNUNET_MessageHeader *) pmsg, 619 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
386 GNUNET_NO); 620 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
387 GNUNET_free (pmsg); 621 /* FIXME: check state flag / max_state_message_id */
622 if (NULL == frag_cache)
623 {
624 message_to_client (ch, mmsg);
625 break;
626 }
627 else
628 {
629 if (GNUNET_ntohll (mmsg->fragment_offset) == frag_cache->header_size)
630 { /* first data fragment after the header, send cached fragments */
631 fragment_cache_clear (ch, chan_key_hash, &msg_id, frag_cache, GNUNET_YES);
632 message_to_client (ch, mmsg);
633 break;
634 }
635 else
636 { /* still missing fragments from the header, cache data fragment */
637 /* fall thru */
638 }
639 }
640
641 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
642 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
643 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
644 /* not all modifiers arrived yet, cache fragment */
645 fragment_cache_insert (ch, chan_key_hash, &msg_id, frag_cache, mmsg, ptype);
646 break;
647
648 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
649 if (NULL != frag_cache)
650 { /* fragments not yet sent to client, remove from cache */
651 fragment_cache_clear (ch, chan_key_hash, &msg_id, frag_cache, GNUNET_NO);
652 }
653 else
654 {
655 message_to_client (ch, mmsg);
656 }
657 break;
658 }
388 break; 659 break;
389 } 660 }
390 default: 661 default:
@@ -457,8 +728,9 @@ request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
457 const struct GNUNET_MULTICAST_RequestHeader *req 728 const struct GNUNET_MULTICAST_RequestHeader *req
458 = (const struct GNUNET_MULTICAST_RequestHeader *) msg; 729 = (const struct GNUNET_MULTICAST_RequestHeader *) msg;
459 730
460 if (GNUNET_YES != GNUNET_PSYC_check_message_parts (size - sizeof (*req), 731 /* FIXME: see message_cb() */
461 (const char *) &req[1])) 732 if (GNUNET_NO == GNUNET_PSYC_message_last_part (size - sizeof (*req),
733 (const char *) &req[1]))
462 { 734 {
463 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 735 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
464 "%p Dropping message with invalid parts " 736 "%p Dropping message with invalid parts "
@@ -826,7 +1098,7 @@ handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
826 if (GNUNET_YES != ch->ready) 1098 if (GNUNET_YES != ch->ready)
827 { 1099 {
828 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1100 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
829 "%p Ignoring message from client, channel is not ready yet.\n", 1101 "%p Dropping message from client, channel is not ready yet.\n",
830 ch); 1102 ch);
831 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 1103 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
832 return; 1104 return;
@@ -912,11 +1184,12 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
912 store = GNUNET_PSYCSTORE_connect (cfg); 1184 store = GNUNET_PSYCSTORE_connect (cfg);
913 stats = GNUNET_STATISTICS_create ("psyc", cfg); 1185 stats = GNUNET_STATISTICS_create ("psyc", cfg);
914 clients = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); 1186 clients = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1187 recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
915 nc = GNUNET_SERVER_notification_context_create (server, 1); 1188 nc = GNUNET_SERVER_notification_context_create (server, 1);
916 GNUNET_SERVER_add_handlers (server, handlers); 1189 GNUNET_SERVER_add_handlers (server, handlers);
917 GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL); 1190 GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
918 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, 1191 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
919 NULL); 1192 &shutdown_task, NULL);
920} 1193}
921 1194
922 1195
diff --git a/src/psyc/psyc.h b/src/psyc/psyc.h
index 1ffda5d08..940412a32 100644
--- a/src/psyc/psyc.h
+++ b/src/psyc/psyc.h
@@ -31,8 +31,8 @@
31#include "gnunet_psyc_service.h" 31#include "gnunet_psyc_service.h"
32 32
33 33
34int 34uint16_t
35GNUNET_PSYC_check_message_parts (uint16_t data_size, const char *data); 35GNUNET_PSYC_message_last_part (uint16_t data_size, const char *data);
36 36
37void 37void
38GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind, 38GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index 8a1c9ffaa..16e8106d4 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -336,7 +336,7 @@ recv_error (struct GNUNET_PSYC_Channel *ch)
336 336
337 337
338/** 338/**
339 * Queue an incoming message part for transmission to the PSYC service. 339 * Queue a message part for transmission to the PSYC service.
340 * 340 *
341 * The message part is added to the current message buffer. 341 * The message part is added to the current message buffer.
342 * When this buffer is full, it is added to the transmission queue. 342 * When this buffer is full, it is added to the transmission queue.
@@ -390,7 +390,7 @@ queue_message (struct GNUNET_PSYC_Channel *ch,
390 op->msg->size = sizeof (*op->msg) + size; 390 op->msg->size = sizeof (*op->msg) + size;
391 memcpy (&op->msg[1], msg, size); 391 memcpy (&op->msg[1], msg, size);
392 } 392 }
393 393
394 if (NULL != op 394 if (NULL != op
395 && (GNUNET_YES == end 395 && (GNUNET_YES == end
396 || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD 396 || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
@@ -433,12 +433,12 @@ channel_transmit_mod (struct GNUNET_PSYC_Channel *ch)
433 max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; 433 max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
434 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); 434 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
435 msg->size = sizeof (struct GNUNET_PSYC_MessageModifier); 435 msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
436 notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, 436 notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, &data_size, &mod[1],
437 &data_size, &mod[1], &mod->oper); 437 &mod->oper, &mod->value_size);
438 mod->name_size = strnlen ((char *) &mod[1], data_size); 438 mod->name_size = strnlen ((char *) &mod[1], data_size);
439 if (mod->name_size < data_size) 439 if (mod->name_size < data_size)
440 { 440 {
441 mod->value_size = htons (data_size - 1 - mod->name_size); 441 mod->value_size = htonl (mod->value_size);
442 mod->name_size = htons (mod->name_size); 442 mod->name_size = htons (mod->name_size);
443 } 443 }
444 else if (0 < data_size) 444 else if (0 < data_size)
@@ -451,10 +451,10 @@ channel_transmit_mod (struct GNUNET_PSYC_Channel *ch)
451 case MSG_STATE_MOD_CONT: 451 case MSG_STATE_MOD_CONT:
452 { 452 {
453 max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; 453 max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
454 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); 454 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
455 msg->size = sizeof (struct GNUNET_MessageHeader); 455 msg->size = sizeof (struct GNUNET_MessageHeader);
456 notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, 456 notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls,
457 &data_size, &msg[1], NULL); 457 &data_size, &msg[1], NULL, NULL);
458 break; 458 break;
459 } 459 }
460 default: 460 default:
@@ -669,6 +669,8 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
669 ch->recv_message_id = GNUNET_ntohll (msg->message_id); 669 ch->recv_message_id = GNUNET_ntohll (msg->message_id);
670 ch->recv_flags = flags; 670 ch->recv_flags = flags;
671 ch->recv_slave_key = msg->slave_key; 671 ch->recv_slave_key = msg->slave_key;
672 ch->recv_mod_value_size = 0;
673 ch->recv_mod_value_size_expected = 0;
672 } 674 }
673 else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id) 675 else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id)
674 { 676 {
@@ -703,7 +705,7 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
703 if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size) 705 if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
704 { 706 {
705 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 707 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
706 "Discarding message of type %u with invalid size %u.\n", 708 "Dropping message of type %u with invalid size %u.\n",
707 ptype, psize); 709 ptype, psize);
708 recv_error (ch); 710 recv_error (ch);
709 return; 711 return;
@@ -753,7 +755,8 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
753 if (MSG_STATE_START != ch->recv_state) 755 if (MSG_STATE_START != ch->recv_state)
754 { 756 {
755 LOG (GNUNET_ERROR_TYPE_WARNING, 757 LOG (GNUNET_ERROR_TYPE_WARNING,
756 "Discarding out of order message method.\n"); 758 "Dropping out of order message method (%u).\n",
759 ch->recv_state);
757 /* It is normal to receive an incomplete message right after connecting, 760 /* It is normal to receive an incomplete message right after connecting,
758 * but should not happen later. 761 * but should not happen later.
759 * FIXME: add a check for this condition. 762 * FIXME: add a check for this condition.
@@ -766,7 +769,7 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
766 if ('\0' != *((char *) meth + psize - 1)) 769 if ('\0' != *((char *) meth + psize - 1))
767 { 770 {
768 LOG (GNUNET_ERROR_TYPE_WARNING, 771 LOG (GNUNET_ERROR_TYPE_WARNING,
769 "Discarding message with malformed method. " 772 "Dropping message with malformed method. "
770 "Message ID: %" PRIu64 "\n", ch->recv_message_id); 773 "Message ID: %" PRIu64 "\n", ch->recv_message_id);
771 GNUNET_break_op (0); 774 GNUNET_break_op (0);
772 recv_error (ch); 775 recv_error (ch);
@@ -782,7 +785,8 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
782 || MSG_STATE_MOD_CONT == ch->recv_state)) 785 || MSG_STATE_MOD_CONT == ch->recv_state))
783 { 786 {
784 LOG (GNUNET_ERROR_TYPE_WARNING, 787 LOG (GNUNET_ERROR_TYPE_WARNING,
785 "Discarding out of order message modifier.\n"); 788 "Dropping out of order message modifier (%u).\n",
789 ch->recv_state);
786 GNUNET_break_op (0); 790 GNUNET_break_op (0);
787 recv_error (ch); 791 recv_error (ch);
788 return; 792 return;
@@ -792,14 +796,14 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
792 = (struct GNUNET_PSYC_MessageModifier *) pmsg; 796 = (struct GNUNET_PSYC_MessageModifier *) pmsg;
793 797
794 uint16_t name_size = ntohs (mod->name_size); 798 uint16_t name_size = ntohs (mod->name_size);
795 ch->recv_mod_value_size_expected = ntohs (mod->value_size); 799 ch->recv_mod_value_size_expected = ntohl (mod->value_size);
796 ch->recv_mod_value_size = psize - sizeof (*mod) - name_size - 1; 800 ch->recv_mod_value_size = psize - sizeof (*mod) - name_size - 1;
797 801
798 if (psize < sizeof (*mod) + name_size + 1 802 if (psize < sizeof (*mod) + name_size + 1
799 || '\0' != *((char *) &mod[1] + name_size) 803 || '\0' != *((char *) &mod[1] + name_size)
800 || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) 804 || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
801 { 805 {
802 LOG (GNUNET_ERROR_TYPE_WARNING, "Discarding malformed modifier.\n"); 806 LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
803 GNUNET_break_op (0); 807 GNUNET_break_op (0);
804 recv_error (ch); 808 recv_error (ch);
805 return; 809 return;
@@ -816,7 +820,11 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
816 || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) 820 || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
817 { 821 {
818 LOG (GNUNET_ERROR_TYPE_WARNING, 822 LOG (GNUNET_ERROR_TYPE_WARNING,
819 "Discarding out of order message modifier continuation.\n"); 823 "Dropping out of order message modifier continuation "
824 "!(%u == %u || %u == %u) || %lu < %lu.\n",
825 MSG_STATE_MODIFIER, ch->recv_state,
826 MSG_STATE_MOD_CONT, ch->recv_state,
827 ch->recv_mod_value_size_expected, ch->recv_mod_value_size);
820 GNUNET_break_op (0); 828 GNUNET_break_op (0);
821 recv_error (ch); 829 recv_error (ch);
822 return; 830 return;
@@ -829,7 +837,11 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
829 || ch->recv_mod_value_size_expected != ch->recv_mod_value_size) 837 || ch->recv_mod_value_size_expected != ch->recv_mod_value_size)
830 { 838 {
831 LOG (GNUNET_ERROR_TYPE_WARNING, 839 LOG (GNUNET_ERROR_TYPE_WARNING,
832 "Discarding out of order message data fragment.\n"); 840 "Dropping out of order message data fragment "
841 "(%u < %u || %lu != %lu).\n",
842 ch->recv_state, MSG_STATE_METHOD,
843 ch->recv_mod_value_size_expected, ch->recv_mod_value_size);
844
833 GNUNET_break_op (0); 845 GNUNET_break_op (0);
834 recv_error (ch); 846 recv_error (ch);
835 return; 847 return;
@@ -1412,7 +1424,7 @@ GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave,
1412 * @param th Handle of the request that is being resumed. 1424 * @param th Handle of the request that is being resumed.
1413 */ 1425 */
1414void 1426void
1415GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) 1427GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_SlaveTransmitHandle *th)
1416{ 1428{
1417 channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); 1429 channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
1418} 1430}
diff --git a/src/psyc/psyc_common.c b/src/psyc/psyc_common.c
index 7368011fc..bf5643ff2 100644
--- a/src/psyc/psyc_common.c
+++ b/src/psyc/psyc_common.c
@@ -33,28 +33,33 @@
33 * @param data_size Size of @a data. 33 * @param data_size Size of @a data.
34 * @param data Data. 34 * @param data Data.
35 * 35 *
36 * @return GNUNET_YES or GNUNET_NO 36 * @return Message type number
37 * or GNUNET_NO if the message contains invalid or no parts.
37 */ 38 */
38int 39uint16_t
39GNUNET_PSYC_check_message_parts (uint16_t data_size, const char *data) 40GNUNET_PSYC_message_last_part (uint16_t data_size, const char *data)
40{ 41{
41 const struct GNUNET_MessageHeader *pmsg; 42 const struct GNUNET_MessageHeader *pmsg;
43 uint16_t ptype = GNUNET_NO;
42 uint16_t psize = 0; 44 uint16_t psize = 0;
43 uint16_t pos = 0; 45 uint16_t pos = 0;
44 46
45 for (pos = 0; data_size + pos < data_size; pos += psize) 47 for (pos = 0; pos < data_size; pos += psize)
46 { 48 {
47 pmsg = (const struct GNUNET_MessageHeader *) (data + pos); 49 pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
48 psize = ntohs (pmsg->size); 50 psize = ntohs (pmsg->size);
49 if (psize < sizeof (*pmsg) || data_size + pos + psize > data_size) 51 ptype = ntohs (pmsg->type);
52 if (psize < sizeof (*pmsg) || pos + psize > data_size
53 || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD
54 || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype)
50 { 55 {
51 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 56 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
52 "Invalid message part of type %u and size %u.", 57 "Invalid message part of type %u and size %u.\n",
53 ntohs (pmsg->type), psize); 58 ptype, psize);
54 return GNUNET_NO; 59 return GNUNET_NO;
55 } 60 }
56 } 61 }
57 return GNUNET_YES; 62 return ptype;
58} 63}
59 64
60 65
@@ -89,7 +94,8 @@ GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
89 uint16_t name_size = ntohs (mod->name_size); 94 uint16_t name_size = ntohs (mod->name_size);
90 char oper = ' ' < mod->oper ? mod->oper : ' '; 95 char oper = ' ' < mod->oper ? mod->oper : ' ';
91 GNUNET_log (kind, "\t%c%.*s\t%.*s\n", oper, name_size, &mod[1], 96 GNUNET_log (kind, "\t%c%.*s\t%.*s\n", oper, name_size, &mod[1],
92 ntohs (mod->value_size), ((char *) &mod[1]) + name_size + 1); 97 size - sizeof (*mod) - name_size - 1,
98 ((char *) &mod[1]) + name_size + 1);
93 break; 99 break;
94 } 100 }
95 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: 101 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c
index 88947be60..360d56c06 100644
--- a/src/psyc/test_psyc.c
+++ b/src/psyc/test_psyc.c
@@ -35,7 +35,7 @@
35#include "gnunet_env_lib.h" 35#include "gnunet_env_lib.h"
36#include "gnunet_psyc_service.h" 36#include "gnunet_psyc_service.h"
37 37
38#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10) 38#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
39 39
40#define DEBUG_SERVICE 1 40#define DEBUG_SERVICE 1
41 41
@@ -72,6 +72,7 @@ struct TransmitClosure
72 char *data[16]; 72 char *data[16];
73 const char *mod_value; 73 const char *mod_value;
74 size_t mod_value_size; 74 size_t mod_value_size;
75 uint8_t data_delay[16];
75 uint8_t data_count; 76 uint8_t data_count;
76 uint8_t paused; 77 uint8_t paused;
77 uint8_t n; 78 uint8_t n;
@@ -259,13 +260,16 @@ transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
259{ 260{
260 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission resumed.\n"); 261 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission resumed.\n");
261 struct TransmitClosure *tmit = cls; 262 struct TransmitClosure *tmit = cls;
262 tmit->paused = GNUNET_NO; 263 if (NULL != tmit->mst_tmit)
263 GNUNET_PSYC_master_transmit_resume (tmit->mst_tmit); 264 GNUNET_PSYC_master_transmit_resume (tmit->mst_tmit);
265 else
266 GNUNET_PSYC_slave_transmit_resume (tmit->slv_tmit);
264} 267}
265 268
266 269
267static int 270static int
268tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper) 271tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
272 uint32_t *full_value_size)
269{ 273{
270 struct TransmitClosure *tmit = cls; 274 struct TransmitClosure *tmit = cls;
271 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 275 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -288,6 +292,8 @@ tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper)
288 return GNUNET_YES; 292 return GNUNET_YES;
289 } 293 }
290 294
295 GNUNET_assert (value_size < UINT32_MAX);
296 *full_value_size = value_size;
291 *oper = op; 297 *oper = op;
292 name_size = strlen (name); 298 name_size = strlen (name);
293 299
@@ -351,7 +357,7 @@ tmit_notify_data (void *cls, uint16_t *data_size, void *data)
351 357
352 uint16_t size = strlen (tmit->data[tmit->n]); 358 uint16_t size = strlen (tmit->data[tmit->n]);
353 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 359 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
354 "Transmit notify data: %lu bytes available, " 360 "Transmit notify data: %u bytes available, "
355 "processing fragment %u/%u (size %u).\n", 361 "processing fragment %u/%u (size %u).\n",
356 *data_size, tmit->n + 1, tmit->data_count, size); 362 *data_size, tmit->n + 1, tmit->data_count, size);
357 if (*data_size < size) 363 if (*data_size < size)
@@ -361,17 +367,18 @@ tmit_notify_data (void *cls, uint16_t *data_size, void *data)
361 return GNUNET_SYSERR; 367 return GNUNET_SYSERR;
362 } 368 }
363 369
364 if (GNUNET_YES == tmit->paused && tmit->n == tmit->data_count - 1) 370 if (GNUNET_YES != tmit->paused && 0 < tmit->data_delay[tmit->n])
365 { 371 {
366 /* Send last fragment later. */
367 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n"); 372 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n");
368 tmit->paused = GNUNET_YES; 373 tmit->paused = GNUNET_YES;
369 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 374 GNUNET_SCHEDULER_add_delayed (
370 (GNUNET_TIME_UNIT_SECONDS, 3), 375 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
371 &transmit_resume, tmit); 376 tmit->data_delay[tmit->n]),
377 &transmit_resume, tmit);
372 *data_size = 0; 378 *data_size = 0;
373 return GNUNET_NO; 379 return GNUNET_NO;
374 } 380 }
381 tmit->paused = GNUNET_NO;
375 382
376 *data_size = size; 383 *data_size = size;
377 memcpy (data, tmit->data[tmit->n], size); 384 memcpy (data, tmit->data[tmit->n], size);
@@ -416,8 +423,9 @@ slave_join ()
416 GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN, 423 GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN,
417 "_foo_bar", "foo bar baz", 11); 424 "_foo_bar", "foo bar baz", 11);
418 slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin, 425 slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin,
419 16, relays, &slave_message, &join_request, &slave_joined, 426 16, relays, &slave_message, &join_request,
420 NULL, "_request_join", env, "some data", 9); 427 &slave_joined, NULL, "_request_join", env,
428 "some data", 9);
421 GNUNET_ENV_environment_destroy (env); 429 GNUNET_ENV_environment_destroy (env);
422} 430}
423 431
@@ -427,17 +435,45 @@ master_transmit ()
427{ 435{
428 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master sending message to all.\n"); 436 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master sending message to all.\n");
429 test = TEST_MASTER_TRANSMIT; 437 test = TEST_MASTER_TRANSMIT;
438 uint32_t i, j;
439
440 char *name_max = "_test_max";
441 uint8_t name_max_size = sizeof ("_test_max");
442 char *val_max = GNUNET_malloc (GNUNET_PSYC_MODIFIER_MAX_PAYLOAD);
443 for (i = 0; i < GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; i++)
444 val_max[i] = (0 == i % 10000) ? '0' + i / 10000 : '.';
445
446 char *name_cont = "_test_cont";
447 uint8_t name_cont_size = sizeof ("_test_cont");
448 char *val_cont = GNUNET_malloc (GNUNET_PSYC_MODIFIER_MAX_PAYLOAD
449 + GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD);
450 for (i = 0; i < GNUNET_PSYC_MODIFIER_MAX_PAYLOAD - name_cont_size; i++)
451 val_cont[i] = (0 == i % 10000) ? '0' + i / 10000 : ':';
452 for (j = 0; j < GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; j++, i++)
453 val_cont[i] = (0 == j % 10000) ? '0' + j / 10000 : '!';
430 454
431 tmit = GNUNET_new (struct TransmitClosure); 455 tmit = GNUNET_new (struct TransmitClosure);
432 tmit->env = GNUNET_ENV_environment_create (); 456 tmit->env = GNUNET_ENV_environment_create ();
433 GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN, 457 GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
434 "_foo", "bar baz", 7); 458 "_foo", "bar baz", 7);
435 GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN, 459 GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
460 name_max, val_max,
461 GNUNET_PSYC_MODIFIER_MAX_PAYLOAD
462 - name_max_size);
463 GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
436 "_foo_bar", "foo bar baz", 11); 464 "_foo_bar", "foo bar baz", 11);
465 GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
466 name_cont, val_cont,
467 GNUNET_PSYC_MODIFIER_MAX_PAYLOAD - name_cont_size
468 + GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD);
437 tmit->data[0] = "foo"; 469 tmit->data[0] = "foo";
438 tmit->data[1] = "foo bar"; 470 tmit->data[1] = GNUNET_malloc (GNUNET_PSYC_DATA_MAX_PAYLOAD + 1);
439 tmit->data[2] = "foo bar baz"; 471 for (i = 0; i < GNUNET_PSYC_DATA_MAX_PAYLOAD; i++)
440 tmit->data_count = 3; 472 tmit->data[1][i] = (0 == i % 10000) ? '0' + i / 10000 : '_';
473 tmit->data[2] = "foo bar";
474 tmit->data[3] = "foo bar baz";
475 tmit->data_delay[1] = 3;
476 tmit->data_count = 4;
441 tmit->mst_tmit 477 tmit->mst_tmit
442 = GNUNET_PSYC_master_transmit (mst, "_notice_test", tmit_notify_mod, 478 = GNUNET_PSYC_master_transmit (mst, "_notice_test", tmit_notify_mod,
443 tmit_notify_data, tmit, 479 tmit_notify_data, tmit,