aboutsummaryrefslogtreecommitdiff
path: root/src/psyc
diff options
context:
space:
mode:
Diffstat (limited to 'src/psyc')
-rw-r--r--src/psyc/gnunet-service-psyc.c315
-rw-r--r--src/psyc/psyc.h4
-rw-r--r--src/psyc/psyc_api.c44
-rw-r--r--src/psyc/psyc_common.c24
-rw-r--r--src/psyc/test_psyc.c66
5 files changed, 390 insertions, 63 deletions
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
index dcb5031f1..4cdb490a1 100644
--- a/src/psyc/gnunet-service-psyc.c
+++ b/src/psyc/gnunet-service-psyc.c
@@ -24,6 +24,8 @@
24 * @author Gabor X Toth 24 * @author Gabor X Toth
25 */ 25 */
26 26
27#include <inttypes.h>
28
27#include "platform.h" 29#include "platform.h"
28#include "gnunet_util_lib.h" 30#include "gnunet_util_lib.h"
29#include "gnunet_constants.h" 31#include "gnunet_constants.h"
@@ -77,6 +79,45 @@ struct TransmitMessage
77 uint8_t state; 79 uint8_t state;
78}; 80};
79 81
82
83/**
84 * Cache for received message fragments.
85 * Message fragments are only sent to clients after all modifiers arrived.
86 *
87 * chan_key -> MultiHashMap chan_msgs
88 */
89static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
90
91
92/**
93 * Entry in the chan_msgs hashmap of @a recv_cache:
94 * fragment_id -> FragmentEntry
95 */
96struct FragmentEntry
97{
98 struct GNUNET_MULTICAST_MessageHeader *mmsg;
99 uint16_t ref_count;
100};
101
102
103/**
104 * Entry in the @a recv_msgs hash map of a @a Channel.
105 * message_id -> FragmentCache
106 */
107struct FragmentCache
108{
109 /**
110 * Total size of header fragments (METHOD & MODIFIERs)
111 */
112 uint64_t header_size;
113
114 /**
115 * Fragment IDs stored in @a recv_cache.
116 */
117 struct GNUNET_CONTAINER_Heap *fragments;
118};
119
120
80/** 121/**
81 * Common part of the client context for both a master and slave channel. 122 * Common part of the client context for both a master and slave channel.
82 */ 123 */
@@ -87,6 +128,12 @@ struct Channel
87 struct TransmitMessage *tmit_head; 128 struct TransmitMessage *tmit_head;
88 struct TransmitMessage *tmit_tail; 129 struct TransmitMessage *tmit_tail;
89 130
131 /**
132 * Received fragments not yet sent to the client.
133 * message_id -> FragmentCache
134 */
135 struct GNUNET_CONTAINER_MultiHashMap *recv_msgs;
136
90 GNUNET_SCHEDULER_TaskIdentifier tmit_task; 137 GNUNET_SCHEDULER_TaskIdentifier tmit_task;
91 138
92 /** 139 /**
@@ -213,6 +260,8 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
213static void 260static void
214client_cleanup (struct Channel *ch) 261client_cleanup (struct Channel *ch)
215{ 262{
263 /* FIXME: fragment_cache_clear */
264
216 if (ch->is_master) 265 if (ch->is_master)
217 { 266 {
218 struct Master *mst = (struct Master *) ch; 267 struct Master *mst = (struct Master *) ch;
@@ -323,6 +372,189 @@ fragment_store_result (void *cls, int64_t result, const char *err_msg)
323} 372}
324 373
325 374
375static void
376message_to_client (struct Channel *ch,
377 const struct GNUNET_MULTICAST_MessageHeader *mmsg)
378{
379 uint16_t size = ntohs (mmsg->header.size);
380 struct GNUNET_PSYC_MessageHeader *pmsg;
381 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
382
383 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
384 "%p Sending message to client. "
385 "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
386 ch, GNUNET_ntohll (mmsg->fragment_id),
387 GNUNET_ntohll (mmsg->message_id));
388
389 pmsg = GNUNET_malloc (psize);
390 pmsg->header.size = htons (psize);
391 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
392 pmsg->message_id = mmsg->message_id;
393
394 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
395
396 GNUNET_SERVER_notification_context_add (nc, ch->client);
397 GNUNET_SERVER_notification_context_unicast (nc, ch->client,
398 (const struct GNUNET_MessageHeader *) pmsg,
399 GNUNET_NO);
400 GNUNET_free (pmsg);
401}
402
403
404/**
405 * Convert an uint64_t in network byte order to a HashCode
406 * that can be used as key in a MultiHashMap
407 */
408static inline void
409hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
410{
411 /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
412
413 n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL);
414 n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
415
416 *key = (struct GNUNET_HashCode) {{ 0 }};
417 *((uint64_t *) key)
418 = (n << 32) | (n >> 32);
419}
420
421
422/**
423 * Convert an uint64_t in host byte order to a HashCode
424 * that can be used as key in a MultiHashMap
425 */
426static inline void
427hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
428{
429#if __BYTE_ORDER == __BIG_ENDIAN
430 hash_key_from_nll (key, n);
431#elif __BYTE_ORDER == __LITTLE_ENDIAN
432 *key = (struct GNUNET_HashCode) {{ 0 }};
433 *((uint64_t *) key) = n;
434#else
435 #error byteorder undefined
436#endif
437}
438
439
440static void
441fragment_cache_insert (struct Channel *ch,
442 const struct GNUNET_HashCode *chan_key_hash,
443 const struct GNUNET_HashCode *msg_id,
444 struct FragmentCache *frag_cache,
445 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
446 uint16_t last_part_type)
447{
448 uint16_t size = ntohs (mmsg->header.size);
449 struct GNUNET_CONTAINER_MultiHashMap
450 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, chan_key_hash);
451
452 if (NULL == frag_cache)
453 {
454 frag_cache = GNUNET_new (struct FragmentCache);
455 frag_cache->fragments
456 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
457
458 if (NULL == ch->recv_msgs)
459 {
460 ch->recv_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
461 }
462 GNUNET_CONTAINER_multihashmap_put (ch->recv_msgs, msg_id, frag_cache,
463 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
464
465 if (NULL == chan_msgs)
466 {
467 chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
468 GNUNET_CONTAINER_multihashmap_put (recv_cache, chan_key_hash, chan_msgs,
469 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
470 }
471 }
472
473 struct GNUNET_HashCode *frag_id = GNUNET_new (struct GNUNET_HashCode);
474 hash_key_from_nll (frag_id, mmsg->fragment_id);
475 struct FragmentEntry
476 *frag_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, frag_id);
477 if (NULL == frag_entry)
478 {
479 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
480 "%p Adding message fragment to cache. "
481 "fragment_id: %" PRIu64 ", "
482 "header_size: %" PRIu64 " + %" PRIu64 ").\n",
483 ch, GNUNET_ntohll (mmsg->fragment_id),
484 frag_cache->header_size, size);
485 frag_entry = GNUNET_new (struct FragmentEntry);
486 frag_entry->ref_count = 1;
487 frag_entry->mmsg = GNUNET_malloc (size);
488 memcpy (frag_entry->mmsg, mmsg, size);
489 GNUNET_CONTAINER_multihashmap_put (chan_msgs, frag_id, frag_entry,
490 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
491 }
492 else
493 {
494 frag_entry->ref_count++;
495 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
496 "%p Message fragment already in cache. "
497 "fragment_id: %" PRIu64 ", ref_count: %u\n",
498 ch, GNUNET_ntohll (mmsg->fragment_id), frag_entry->ref_count);
499 }
500
501 switch (last_part_type)
502 {
503 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
504 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
505 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
506 frag_cache->header_size += size;
507 }
508 GNUNET_CONTAINER_heap_insert (frag_cache->fragments, frag_id,
509 GNUNET_ntohll (mmsg->fragment_id));
510}
511
512
513static void
514fragment_cache_clear (struct Channel *ch,
515 const struct GNUNET_HashCode *chan_key_hash,
516 const struct GNUNET_HashCode *msg_id,
517 struct FragmentCache *frag_cache,
518 uint8_t send_to_client)
519{
520 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
521 "%p Clearing message fragment cache.\n", ch);
522
523 struct GNUNET_CONTAINER_MultiHashMap
524 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, chan_key_hash);
525 GNUNET_assert (NULL != chan_msgs);
526 struct GNUNET_HashCode *frag_id;
527
528 while ((frag_id = GNUNET_CONTAINER_heap_remove_root (frag_cache->fragments)))
529 {
530 struct FragmentEntry
531 *frag_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, frag_id);
532 if (frag_entry != NULL)
533 {
534 if (GNUNET_YES == send_to_client)
535 {
536 message_to_client (ch, frag_entry->mmsg);
537 }
538 if (1 == frag_entry->ref_count)
539 {
540 GNUNET_CONTAINER_multihashmap_remove (chan_msgs, frag_id, frag_entry);
541 GNUNET_free (frag_entry->mmsg);
542 GNUNET_free (frag_entry);
543 }
544 else
545 {
546 frag_entry->ref_count--;
547 }
548 }
549 GNUNET_free (frag_id);
550 }
551
552 GNUNET_CONTAINER_multihashmap_remove (ch->recv_msgs, msg_id, frag_cache);
553 GNUNET_CONTAINER_heap_destroy (frag_cache->fragments);
554 GNUNET_free (frag_cache);
555}
556
557
326/** 558/**
327 * Incoming message fragment from multicast. 559 * Incoming message fragment from multicast.
328 * 560 *
@@ -358,11 +590,15 @@ message_cb (struct Channel *ch,
358 rcb, rcb_cls); 590 rcb, rcb_cls);
359#endif 591#endif
360 592
361 const struct GNUNET_MULTICAST_MessageHeader *mmsg 593 const struct GNUNET_MULTICAST_MessageHeader
362 = (const struct GNUNET_MULTICAST_MessageHeader *) msg; 594 *mmsg = (const struct GNUNET_MULTICAST_MessageHeader *) msg;
363 595
364 if (GNUNET_YES != GNUNET_PSYC_check_message_parts (size - sizeof (*mmsg), 596 uint16_t ptype = GNUNET_PSYC_message_last_part (size - sizeof (*mmsg),
365 (const char *) &mmsg[1])) 597 (const char *) &mmsg[1]);
598 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
599 "Last message part type %u\n", ptype);
600
601 if (GNUNET_NO == ptype)
366 { 602 {
367 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 603 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
368 "%p Received message with invalid parts from multicast. " 604 "%p Received message with invalid parts from multicast. "
@@ -371,20 +607,55 @@ message_cb (struct Channel *ch,
371 break; 607 break;
372 } 608 }
373 609
374 struct GNUNET_PSYC_MessageHeader *pmsg; 610 struct GNUNET_HashCode msg_id;
375 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg); 611 hash_key_from_nll (&msg_id, mmsg->message_id);
376 pmsg = GNUNET_malloc (psize);
377 pmsg->header.size = htons (psize);
378 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
379 pmsg->message_id = mmsg->message_id;
380 612
381 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); 613 struct FragmentCache *frag_cache = NULL;
614 if (NULL != ch->recv_msgs)
615 frag_cache = GNUNET_CONTAINER_multihashmap_get (ch->recv_msgs, &msg_id);
382 616
383 GNUNET_SERVER_notification_context_add (nc, ch->client); 617 switch (ptype)
384 GNUNET_SERVER_notification_context_unicast (nc, ch->client, 618 {
385 (const struct GNUNET_MessageHeader *) pmsg, 619 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
386 GNUNET_NO); 620 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
387 GNUNET_free (pmsg); 621 /* FIXME: check state flag / max_state_message_id */
622 if (NULL == frag_cache)
623 {
624 message_to_client (ch, mmsg);
625 break;
626 }
627 else
628 {
629 if (GNUNET_ntohll (mmsg->fragment_offset) == frag_cache->header_size)
630 { /* first data fragment after the header, send cached fragments */
631 fragment_cache_clear (ch, chan_key_hash, &msg_id, frag_cache, GNUNET_YES);
632 message_to_client (ch, mmsg);
633 break;
634 }
635 else
636 { /* still missing fragments from the header, cache data fragment */
637 /* fall thru */
638 }
639 }
640
641 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
642 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
643 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
644 /* not all modifiers arrived yet, cache fragment */
645 fragment_cache_insert (ch, chan_key_hash, &msg_id, frag_cache, mmsg, ptype);
646 break;
647
648 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
649 if (NULL != frag_cache)
650 { /* fragments not yet sent to client, remove from cache */
651 fragment_cache_clear (ch, chan_key_hash, &msg_id, frag_cache, GNUNET_NO);
652 }
653 else
654 {
655 message_to_client (ch, mmsg);
656 }
657 break;
658 }
388 break; 659 break;
389 } 660 }
390 default: 661 default:
@@ -457,8 +728,9 @@ request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
457 const struct GNUNET_MULTICAST_RequestHeader *req 728 const struct GNUNET_MULTICAST_RequestHeader *req
458 = (const struct GNUNET_MULTICAST_RequestHeader *) msg; 729 = (const struct GNUNET_MULTICAST_RequestHeader *) msg;
459 730
460 if (GNUNET_YES != GNUNET_PSYC_check_message_parts (size - sizeof (*req), 731 /* FIXME: see message_cb() */
461 (const char *) &req[1])) 732 if (GNUNET_NO == GNUNET_PSYC_message_last_part (size - sizeof (*req),
733 (const char *) &req[1]))
462 { 734 {
463 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 735 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
464 "%p Dropping message with invalid parts " 736 "%p Dropping message with invalid parts "
@@ -826,7 +1098,7 @@ handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
826 if (GNUNET_YES != ch->ready) 1098 if (GNUNET_YES != ch->ready)
827 { 1099 {
828 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1100 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
829 "%p Ignoring message from client, channel is not ready yet.\n", 1101 "%p Dropping message from client, channel is not ready yet.\n",
830 ch); 1102 ch);
831 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 1103 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
832 return; 1104 return;
@@ -912,11 +1184,12 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
912 store = GNUNET_PSYCSTORE_connect (cfg); 1184 store = GNUNET_PSYCSTORE_connect (cfg);
913 stats = GNUNET_STATISTICS_create ("psyc", cfg); 1185 stats = GNUNET_STATISTICS_create ("psyc", cfg);
914 clients = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); 1186 clients = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1187 recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
915 nc = GNUNET_SERVER_notification_context_create (server, 1); 1188 nc = GNUNET_SERVER_notification_context_create (server, 1);
916 GNUNET_SERVER_add_handlers (server, handlers); 1189 GNUNET_SERVER_add_handlers (server, handlers);
917 GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL); 1190 GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
918 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, 1191 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
919 NULL); 1192 &shutdown_task, NULL);
920} 1193}
921 1194
922 1195
diff --git a/src/psyc/psyc.h b/src/psyc/psyc.h
index 1ffda5d08..940412a32 100644
--- a/src/psyc/psyc.h
+++ b/src/psyc/psyc.h
@@ -31,8 +31,8 @@
31#include "gnunet_psyc_service.h" 31#include "gnunet_psyc_service.h"
32 32
33 33
34int 34uint16_t
35GNUNET_PSYC_check_message_parts (uint16_t data_size, const char *data); 35GNUNET_PSYC_message_last_part (uint16_t data_size, const char *data);
36 36
37void 37void
38GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind, 38GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index 8a1c9ffaa..16e8106d4 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -336,7 +336,7 @@ recv_error (struct GNUNET_PSYC_Channel *ch)
336 336
337 337
338/** 338/**
339 * Queue an incoming message part for transmission to the PSYC service. 339 * Queue a message part for transmission to the PSYC service.
340 * 340 *
341 * The message part is added to the current message buffer. 341 * The message part is added to the current message buffer.
342 * When this buffer is full, it is added to the transmission queue. 342 * When this buffer is full, it is added to the transmission queue.
@@ -390,7 +390,7 @@ queue_message (struct GNUNET_PSYC_Channel *ch,
390 op->msg->size = sizeof (*op->msg) + size; 390 op->msg->size = sizeof (*op->msg) + size;
391 memcpy (&op->msg[1], msg, size); 391 memcpy (&op->msg[1], msg, size);
392 } 392 }
393 393
394 if (NULL != op 394 if (NULL != op
395 && (GNUNET_YES == end 395 && (GNUNET_YES == end
396 || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD 396 || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
@@ -433,12 +433,12 @@ channel_transmit_mod (struct GNUNET_PSYC_Channel *ch)
433 max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; 433 max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
434 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); 434 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
435 msg->size = sizeof (struct GNUNET_PSYC_MessageModifier); 435 msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
436 notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, 436 notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, &data_size, &mod[1],
437 &data_size, &mod[1], &mod->oper); 437 &mod->oper, &mod->value_size);
438 mod->name_size = strnlen ((char *) &mod[1], data_size); 438 mod->name_size = strnlen ((char *) &mod[1], data_size);
439 if (mod->name_size < data_size) 439 if (mod->name_size < data_size)
440 { 440 {
441 mod->value_size = htons (data_size - 1 - mod->name_size); 441 mod->value_size = htonl (mod->value_size);
442 mod->name_size = htons (mod->name_size); 442 mod->name_size = htons (mod->name_size);
443 } 443 }
444 else if (0 < data_size) 444 else if (0 < data_size)
@@ -451,10 +451,10 @@ channel_transmit_mod (struct GNUNET_PSYC_Channel *ch)
451 case MSG_STATE_MOD_CONT: 451 case MSG_STATE_MOD_CONT:
452 { 452 {
453 max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; 453 max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
454 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); 454 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
455 msg->size = sizeof (struct GNUNET_MessageHeader); 455 msg->size = sizeof (struct GNUNET_MessageHeader);
456 notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, 456 notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls,
457 &data_size, &msg[1], NULL); 457 &data_size, &msg[1], NULL, NULL);
458 break; 458 break;
459 } 459 }
460 default: 460 default:
@@ -669,6 +669,8 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
669 ch->recv_message_id = GNUNET_ntohll (msg->message_id); 669 ch->recv_message_id = GNUNET_ntohll (msg->message_id);
670 ch->recv_flags = flags; 670 ch->recv_flags = flags;
671 ch->recv_slave_key = msg->slave_key; 671 ch->recv_slave_key = msg->slave_key;
672 ch->recv_mod_value_size = 0;
673 ch->recv_mod_value_size_expected = 0;
672 } 674 }
673 else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id) 675 else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id)
674 { 676 {
@@ -703,7 +705,7 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
703 if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size) 705 if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
704 { 706 {
705 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 707 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
706 "Discarding message of type %u with invalid size %u.\n", 708 "Dropping message of type %u with invalid size %u.\n",
707 ptype, psize); 709 ptype, psize);
708 recv_error (ch); 710 recv_error (ch);
709 return; 711 return;
@@ -753,7 +755,8 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
753 if (MSG_STATE_START != ch->recv_state) 755 if (MSG_STATE_START != ch->recv_state)
754 { 756 {
755 LOG (GNUNET_ERROR_TYPE_WARNING, 757 LOG (GNUNET_ERROR_TYPE_WARNING,
756 "Discarding out of order message method.\n"); 758 "Dropping out of order message method (%u).\n",
759 ch->recv_state);
757 /* It is normal to receive an incomplete message right after connecting, 760 /* It is normal to receive an incomplete message right after connecting,
758 * but should not happen later. 761 * but should not happen later.
759 * FIXME: add a check for this condition. 762 * FIXME: add a check for this condition.
@@ -766,7 +769,7 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
766 if ('\0' != *((char *) meth + psize - 1)) 769 if ('\0' != *((char *) meth + psize - 1))
767 { 770 {
768 LOG (GNUNET_ERROR_TYPE_WARNING, 771 LOG (GNUNET_ERROR_TYPE_WARNING,
769 "Discarding message with malformed method. " 772 "Dropping message with malformed method. "
770 "Message ID: %" PRIu64 "\n", ch->recv_message_id); 773 "Message ID: %" PRIu64 "\n", ch->recv_message_id);
771 GNUNET_break_op (0); 774 GNUNET_break_op (0);
772 recv_error (ch); 775 recv_error (ch);
@@ -782,7 +785,8 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
782 || MSG_STATE_MOD_CONT == ch->recv_state)) 785 || MSG_STATE_MOD_CONT == ch->recv_state))
783 { 786 {
784 LOG (GNUNET_ERROR_TYPE_WARNING, 787 LOG (GNUNET_ERROR_TYPE_WARNING,
785 "Discarding out of order message modifier.\n"); 788 "Dropping out of order message modifier (%u).\n",
789 ch->recv_state);
786 GNUNET_break_op (0); 790 GNUNET_break_op (0);
787 recv_error (ch); 791 recv_error (ch);
788 return; 792 return;
@@ -792,14 +796,14 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
792 = (struct GNUNET_PSYC_MessageModifier *) pmsg; 796 = (struct GNUNET_PSYC_MessageModifier *) pmsg;
793 797
794 uint16_t name_size = ntohs (mod->name_size); 798 uint16_t name_size = ntohs (mod->name_size);
795 ch->recv_mod_value_size_expected = ntohs (mod->value_size); 799 ch->recv_mod_value_size_expected = ntohl (mod->value_size);
796 ch->recv_mod_value_size = psize - sizeof (*mod) - name_size - 1; 800 ch->recv_mod_value_size = psize - sizeof (*mod) - name_size - 1;
797 801
798 if (psize < sizeof (*mod) + name_size + 1 802 if (psize < sizeof (*mod) + name_size + 1
799 || '\0' != *((char *) &mod[1] + name_size) 803 || '\0' != *((char *) &mod[1] + name_size)
800 || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) 804 || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
801 { 805 {
802 LOG (GNUNET_ERROR_TYPE_WARNING, "Discarding malformed modifier.\n"); 806 LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
803 GNUNET_break_op (0); 807 GNUNET_break_op (0);
804 recv_error (ch); 808 recv_error (ch);
805 return; 809 return;
@@ -816,7 +820,11 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
816 || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) 820 || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
817 { 821 {
818 LOG (GNUNET_ERROR_TYPE_WARNING, 822 LOG (GNUNET_ERROR_TYPE_WARNING,
819 "Discarding out of order message modifier continuation.\n"); 823 "Dropping out of order message modifier continuation "
824 "!(%u == %u || %u == %u) || %lu < %lu.\n",
825 MSG_STATE_MODIFIER, ch->recv_state,
826 MSG_STATE_MOD_CONT, ch->recv_state,
827 ch->recv_mod_value_size_expected, ch->recv_mod_value_size);
820 GNUNET_break_op (0); 828 GNUNET_break_op (0);
821 recv_error (ch); 829 recv_error (ch);
822 return; 830 return;
@@ -829,7 +837,11 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
829 || ch->recv_mod_value_size_expected != ch->recv_mod_value_size) 837 || ch->recv_mod_value_size_expected != ch->recv_mod_value_size)
830 { 838 {
831 LOG (GNUNET_ERROR_TYPE_WARNING, 839 LOG (GNUNET_ERROR_TYPE_WARNING,
832 "Discarding out of order message data fragment.\n"); 840 "Dropping out of order message data fragment "
841 "(%u < %u || %lu != %lu).\n",
842 ch->recv_state, MSG_STATE_METHOD,
843 ch->recv_mod_value_size_expected, ch->recv_mod_value_size);
844
833 GNUNET_break_op (0); 845 GNUNET_break_op (0);
834 recv_error (ch); 846 recv_error (ch);
835 return; 847 return;
@@ -1412,7 +1424,7 @@ GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave,
1412 * @param th Handle of the request that is being resumed. 1424 * @param th Handle of the request that is being resumed.
1413 */ 1425 */
1414void 1426void
1415GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) 1427GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_SlaveTransmitHandle *th)
1416{ 1428{
1417 channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); 1429 channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
1418} 1430}
diff --git a/src/psyc/psyc_common.c b/src/psyc/psyc_common.c
index 7368011fc..bf5643ff2 100644
--- a/src/psyc/psyc_common.c
+++ b/src/psyc/psyc_common.c
@@ -33,28 +33,33 @@
33 * @param data_size Size of @a data. 33 * @param data_size Size of @a data.
34 * @param data Data. 34 * @param data Data.
35 * 35 *
36 * @return GNUNET_YES or GNUNET_NO 36 * @return Message type number
37 * or GNUNET_NO if the message contains invalid or no parts.
37 */ 38 */
38int 39uint16_t
39GNUNET_PSYC_check_message_parts (uint16_t data_size, const char *data) 40GNUNET_PSYC_message_last_part (uint16_t data_size, const char *data)
40{ 41{
41 const struct GNUNET_MessageHeader *pmsg; 42 const struct GNUNET_MessageHeader *pmsg;
43 uint16_t ptype = GNUNET_NO;
42 uint16_t psize = 0; 44 uint16_t psize = 0;
43 uint16_t pos = 0; 45 uint16_t pos = 0;
44 46
45 for (pos = 0; data_size + pos < data_size; pos += psize) 47 for (pos = 0; pos < data_size; pos += psize)
46 { 48 {
47 pmsg = (const struct GNUNET_MessageHeader *) (data + pos); 49 pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
48 psize = ntohs (pmsg->size); 50 psize = ntohs (pmsg->size);
49 if (psize < sizeof (*pmsg) || data_size + pos + psize > data_size) 51 ptype = ntohs (pmsg->type);
52 if (psize < sizeof (*pmsg) || pos + psize > data_size
53 || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD
54 || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype)
50 { 55 {
51 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 56 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
52 "Invalid message part of type %u and size %u.", 57 "Invalid message part of type %u and size %u.\n",
53 ntohs (pmsg->type), psize); 58 ptype, psize);
54 return GNUNET_NO; 59 return GNUNET_NO;
55 } 60 }
56 } 61 }
57 return GNUNET_YES; 62 return ptype;
58} 63}
59 64
60 65
@@ -89,7 +94,8 @@ GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
89 uint16_t name_size = ntohs (mod->name_size); 94 uint16_t name_size = ntohs (mod->name_size);
90 char oper = ' ' < mod->oper ? mod->oper : ' '; 95 char oper = ' ' < mod->oper ? mod->oper : ' ';
91 GNUNET_log (kind, "\t%c%.*s\t%.*s\n", oper, name_size, &mod[1], 96 GNUNET_log (kind, "\t%c%.*s\t%.*s\n", oper, name_size, &mod[1],
92 ntohs (mod->value_size), ((char *) &mod[1]) + name_size + 1); 97 size - sizeof (*mod) - name_size - 1,
98 ((char *) &mod[1]) + name_size + 1);
93 break; 99 break;
94 } 100 }
95 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: 101 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c
index 88947be60..360d56c06 100644
--- a/src/psyc/test_psyc.c
+++ b/src/psyc/test_psyc.c
@@ -35,7 +35,7 @@
35#include "gnunet_env_lib.h" 35#include "gnunet_env_lib.h"
36#include "gnunet_psyc_service.h" 36#include "gnunet_psyc_service.h"
37 37
38#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10) 38#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
39 39
40#define DEBUG_SERVICE 1 40#define DEBUG_SERVICE 1
41 41
@@ -72,6 +72,7 @@ struct TransmitClosure
72 char *data[16]; 72 char *data[16];
73 const char *mod_value; 73 const char *mod_value;
74 size_t mod_value_size; 74 size_t mod_value_size;
75 uint8_t data_delay[16];
75 uint8_t data_count; 76 uint8_t data_count;
76 uint8_t paused; 77 uint8_t paused;
77 uint8_t n; 78 uint8_t n;
@@ -259,13 +260,16 @@ transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
259{ 260{
260 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission resumed.\n"); 261 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission resumed.\n");
261 struct TransmitClosure *tmit = cls; 262 struct TransmitClosure *tmit = cls;
262 tmit->paused = GNUNET_NO; 263 if (NULL != tmit->mst_tmit)
263 GNUNET_PSYC_master_transmit_resume (tmit->mst_tmit); 264 GNUNET_PSYC_master_transmit_resume (tmit->mst_tmit);
265 else
266 GNUNET_PSYC_slave_transmit_resume (tmit->slv_tmit);
264} 267}
265 268
266 269
267static int 270static int
268tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper) 271tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
272 uint32_t *full_value_size)
269{ 273{
270 struct TransmitClosure *tmit = cls; 274 struct TransmitClosure *tmit = cls;
271 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 275 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -288,6 +292,8 @@ tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper)
288 return GNUNET_YES; 292 return GNUNET_YES;
289 } 293 }
290 294
295 GNUNET_assert (value_size < UINT32_MAX);
296 *full_value_size = value_size;
291 *oper = op; 297 *oper = op;
292 name_size = strlen (name); 298 name_size = strlen (name);
293 299
@@ -351,7 +357,7 @@ tmit_notify_data (void *cls, uint16_t *data_size, void *data)
351 357
352 uint16_t size = strlen (tmit->data[tmit->n]); 358 uint16_t size = strlen (tmit->data[tmit->n]);
353 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 359 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
354 "Transmit notify data: %lu bytes available, " 360 "Transmit notify data: %u bytes available, "
355 "processing fragment %u/%u (size %u).\n", 361 "processing fragment %u/%u (size %u).\n",
356 *data_size, tmit->n + 1, tmit->data_count, size); 362 *data_size, tmit->n + 1, tmit->data_count, size);
357 if (*data_size < size) 363 if (*data_size < size)
@@ -361,17 +367,18 @@ tmit_notify_data (void *cls, uint16_t *data_size, void *data)
361 return GNUNET_SYSERR; 367 return GNUNET_SYSERR;
362 } 368 }
363 369
364 if (GNUNET_YES == tmit->paused && tmit->n == tmit->data_count - 1) 370 if (GNUNET_YES != tmit->paused && 0 < tmit->data_delay[tmit->n])
365 { 371 {
366 /* Send last fragment later. */
367 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n"); 372 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n");
368 tmit->paused = GNUNET_YES; 373 tmit->paused = GNUNET_YES;
369 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 374 GNUNET_SCHEDULER_add_delayed (
370 (GNUNET_TIME_UNIT_SECONDS, 3), 375 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
371 &transmit_resume, tmit); 376 tmit->data_delay[tmit->n]),
377 &transmit_resume, tmit);
372 *data_size = 0; 378 *data_size = 0;
373 return GNUNET_NO; 379 return GNUNET_NO;
374 } 380 }
381 tmit->paused = GNUNET_NO;
375 382
376 *data_size = size; 383 *data_size = size;
377 memcpy (data, tmit->data[tmit->n], size); 384 memcpy (data, tmit->data[tmit->n], size);
@@ -416,8 +423,9 @@ slave_join ()
416 GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN, 423 GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN,
417 "_foo_bar", "foo bar baz", 11); 424 "_foo_bar", "foo bar baz", 11);
418 slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin, 425 slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin,
419 16, relays, &slave_message, &join_request, &slave_joined, 426 16, relays, &slave_message, &join_request,
420 NULL, "_request_join", env, "some data", 9); 427 &slave_joined, NULL, "_request_join", env,
428 "some data", 9);
421 GNUNET_ENV_environment_destroy (env); 429 GNUNET_ENV_environment_destroy (env);
422} 430}
423 431
@@ -427,17 +435,45 @@ master_transmit ()
427{ 435{
428 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master sending message to all.\n"); 436 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master sending message to all.\n");
429 test = TEST_MASTER_TRANSMIT; 437 test = TEST_MASTER_TRANSMIT;
438 uint32_t i, j;
439
440 char *name_max = "_test_max";
441 uint8_t name_max_size = sizeof ("_test_max");
442 char *val_max = GNUNET_malloc (GNUNET_PSYC_MODIFIER_MAX_PAYLOAD);
443 for (i = 0; i < GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; i++)
444 val_max[i] = (0 == i % 10000) ? '0' + i / 10000 : '.';
445
446 char *name_cont = "_test_cont";
447 uint8_t name_cont_size = sizeof ("_test_cont");
448 char *val_cont = GNUNET_malloc (GNUNET_PSYC_MODIFIER_MAX_PAYLOAD
449 + GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD);
450 for (i = 0; i < GNUNET_PSYC_MODIFIER_MAX_PAYLOAD - name_cont_size; i++)
451 val_cont[i] = (0 == i % 10000) ? '0' + i / 10000 : ':';
452 for (j = 0; j < GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; j++, i++)
453 val_cont[i] = (0 == j % 10000) ? '0' + j / 10000 : '!';
430 454
431 tmit = GNUNET_new (struct TransmitClosure); 455 tmit = GNUNET_new (struct TransmitClosure);
432 tmit->env = GNUNET_ENV_environment_create (); 456 tmit->env = GNUNET_ENV_environment_create ();
433 GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN, 457 GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
434 "_foo", "bar baz", 7); 458 "_foo", "bar baz", 7);
435 GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN, 459 GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
460 name_max, val_max,
461 GNUNET_PSYC_MODIFIER_MAX_PAYLOAD
462 - name_max_size);
463 GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
436 "_foo_bar", "foo bar baz", 11); 464 "_foo_bar", "foo bar baz", 11);
465 GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
466 name_cont, val_cont,
467 GNUNET_PSYC_MODIFIER_MAX_PAYLOAD - name_cont_size
468 + GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD);
437 tmit->data[0] = "foo"; 469 tmit->data[0] = "foo";
438 tmit->data[1] = "foo bar"; 470 tmit->data[1] = GNUNET_malloc (GNUNET_PSYC_DATA_MAX_PAYLOAD + 1);
439 tmit->data[2] = "foo bar baz"; 471 for (i = 0; i < GNUNET_PSYC_DATA_MAX_PAYLOAD; i++)
440 tmit->data_count = 3; 472 tmit->data[1][i] = (0 == i % 10000) ? '0' + i / 10000 : '_';
473 tmit->data[2] = "foo bar";
474 tmit->data[3] = "foo bar baz";
475 tmit->data_delay[1] = 3;
476 tmit->data_count = 4;
441 tmit->mst_tmit 477 tmit->mst_tmit
442 = GNUNET_PSYC_master_transmit (mst, "_notice_test", tmit_notify_mod, 478 = GNUNET_PSYC_master_transmit (mst, "_notice_test", tmit_notify_mod,
443 tmit_notify_data, tmit, 479 tmit_notify_data, tmit,