aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2014-05-06 10:26:24 +0000
committerGabor X Toth <*@tg-x.net>2014-05-06 10:26:24 +0000
commit4e1baae59f18ee5d7cd47afe28ced3daaaa5a5ad (patch)
tree4e0d4a1bf488d4969f44e3a8db26af6ca22d4db7 /src
parentb2061e704ac6309bb7ee4427a89a1572aa9f339e (diff)
downloadgnunet-4e1baae59f18ee5d7cd47afe28ced3daaaa5a5ad.tar.gz
gnunet-4e1baae59f18ee5d7cd47afe28ced3daaaa5a5ad.zip
psyc: in-order message delivery
Diffstat (limited to 'src')
-rw-r--r--src/include/gnunet_protocols.h2
-rw-r--r--src/include/gnunet_psyc_service.h43
-rw-r--r--src/multicast/multicast_api.c10
-rw-r--r--src/psyc/gnunet-service-psyc.c724
-rw-r--r--src/psyc/psyc.h29
-rw-r--r--src/psyc/psyc_api.c4
-rw-r--r--src/psyc/psyc_common.c38
-rw-r--r--src/psyc/test_psyc.c13
-rw-r--r--src/psycstore/psycstore_api.c4
9 files changed, 592 insertions, 275 deletions
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 715f91809..93965fd9c 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -2148,7 +2148,7 @@ extern "C"
2148/** 2148/**
2149 * C: client 2149 * C: client
2150 * S: service 2150 * S: service
2151 * M: muticast 2151 * M: multicast
2152 */ 2152 */
2153 2153
2154/** S->C: result of an operation */ 2154/** S->C: result of an operation */
diff --git a/src/include/gnunet_psyc_service.h b/src/include/gnunet_psyc_service.h
index 018f012f4..928e05242 100644
--- a/src/include/gnunet_psyc_service.h
+++ b/src/include/gnunet_psyc_service.h
@@ -167,7 +167,23 @@ enum GNUNET_PSYC_MessageFlags
167 /** 167 /**
168 * Request from slave to master. 168 * Request from slave to master.
169 */ 169 */
170 GNUNET_PSYC_MESSAGE_REQUEST = 1 << 1 170 GNUNET_PSYC_MESSAGE_REQUEST = 1 << 1,
171
172 /**
173 * Message can be delivered out of order.
174 */
175 GNUNET_PSYC_MESSAGE_ORDER_ANY = 1 << 2
176};
177
178
179/**
180 * Values for the @a state_delta field of GNUNET_PSYC_MessageHeader.
181 */
182enum GNUNET_PSYC_StateDeltaValues
183{
184 GNUNET_PSYC_STATE_RESET = 0,
185
186 GNUNET_PSYC_STATE_NOT_MODIFIED = UINT64_MAX
171}; 187};
172 188
173 189
@@ -175,6 +191,8 @@ GNUNET_NETWORK_STRUCT_BEGIN
175 191
176/** 192/**
177 * Header of a PSYC message. 193 * Header of a PSYC message.
194 *
195 * Only present when receiving a message.
178 */ 196 */
179struct GNUNET_PSYC_MessageHeader 197struct GNUNET_PSYC_MessageHeader
180{ 198{
@@ -223,6 +241,12 @@ struct GNUNET_PSYC_MessageMethod
223 */ 241 */
224 uint32_t flags GNUNET_PACKED; 242 uint32_t flags GNUNET_PACKED;
225 243
244 /**
245 * Number of message IDs since the last message that contained state
246 * operations. @see enum GNUNET_PSYC_StateDeltaValues
247 */
248 uint64_t state_delta GNUNET_PACKED;
249
226 /* Followed by NUL-terminated method name. */ 250 /* Followed by NUL-terminated method name. */
227}; 251};
228 252
@@ -479,22 +503,29 @@ typedef int
479enum GNUNET_PSYC_MasterTransmitFlags 503enum GNUNET_PSYC_MasterTransmitFlags
480{ 504{
481 GNUNET_PSYC_MASTER_TRANSMIT_NONE = 0, 505 GNUNET_PSYC_MASTER_TRANSMIT_NONE = 0,
506
482 /** 507 /**
483 * Whether this message should reset the channel state, 508 * Whether this message should reset the channel state,
484 * i.e. remove all previously stored state variables. 509 * i.e. remove all previously stored state variables.
485 */ 510 */
486 GNUNET_PSYC_MASTER_TRANSMIT_RESET_STATE = 1 << 0, 511
512 GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET = 1 << 0,
487 513
488 /** 514 /**
489 * Whether we need to increment the group generation counter after 515 * Whether this message contains any state modifiers.
490 * transmitting this message.
491 */ 516 */
492 GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN = 1 << 1, 517 GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY = 1 << 1,
493 518
494 /** 519 /**
495 * Add PSYC header variable with the hash of the current channel state. 520 * Add PSYC header variable with the hash of the current channel state.
496 */ 521 */
497 GNUNET_PSYC_MASTER_TRANSMIT_ADD_STATE_HASH = 1 << 2 522 GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH = 1 << 2,
523
524 /**
525 * Whether we need to increment the group generation counter after
526 * transmitting this message.
527 */
528 GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN = 1 << 3
498}; 529};
499 530
500 531
diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c
index b23320c72..da81de486 100644
--- a/src/multicast/multicast_api.c
+++ b/src/multicast/multicast_api.c
@@ -182,7 +182,7 @@ message_callback (void *cls, const struct GNUNET_HashCode *chan_key_hash,
182 { 182 {
183 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 183 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
184 "Calling origin's message callback " 184 "Calling origin's message callback "
185 "for a message of type %u and size %u.\n", 185 "with a message of type %u and size %u.\n",
186 ntohs (msg->type), ntohs (msg->size)); 186 ntohs (msg->type), ntohs (msg->size));
187 struct GNUNET_MULTICAST_Origin *orig = (struct GNUNET_MULTICAST_Origin *) grp; 187 struct GNUNET_MULTICAST_Origin *orig = (struct GNUNET_MULTICAST_Origin *) grp;
188 orig->message_cb (orig->cls, msg); 188 orig->message_cb (orig->cls, msg);
@@ -190,8 +190,8 @@ message_callback (void *cls, const struct GNUNET_HashCode *chan_key_hash,
190 else 190 else
191 { 191 {
192 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 192 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
193 "Calling slave's message callback " 193 "Calling member's message callback "
194 "for a message of type %u and size %u.\n", 194 "with a message of type %u and size %u.\n",
195 ntohs (msg->type), ntohs (msg->size)); 195 ntohs (msg->type), ntohs (msg->size));
196 struct GNUNET_MULTICAST_Member *mem = (struct GNUNET_MULTICAST_Member *) grp; 196 struct GNUNET_MULTICAST_Member *mem = (struct GNUNET_MULTICAST_Member *) grp;
197 mem->message_cb (mem->cls, msg); 197 mem->message_cb (mem->cls, msg);
@@ -477,8 +477,8 @@ schedule_origin_to_all (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc
477 msg->group_generation = mh->group_generation; 477 msg->group_generation = mh->group_generation;
478 478
479 /* FIXME: add fragment ID and signature in the service instead of here */ 479 /* FIXME: add fragment ID and signature in the service instead of here */
480 msg->fragment_id = GNUNET_ntohll (orig->next_fragment_id++); 480 msg->fragment_id = GNUNET_htonll (orig->next_fragment_id++);
481 msg->fragment_offset = GNUNET_ntohll (mh->fragment_offset); 481 msg->fragment_offset = GNUNET_htonll (mh->fragment_offset);
482 mh->fragment_offset += sizeof (*msg) + buf_size; 482 mh->fragment_offset += sizeof (*msg) + buf_size;
483 msg->purpose.size = htonl (sizeof (*msg) + buf_size 483 msg->purpose.size = htonl (sizeof (*msg) + buf_size
484 - sizeof (msg->header) 484 - sizeof (msg->header)
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
index fb3aa65f7..3a29c8ffd 100644
--- a/src/psyc/gnunet-service-psyc.c
+++ b/src/psyc/gnunet-service-psyc.c
@@ -73,19 +73,21 @@ struct TransmitMessage
73 struct TransmitMessage *next; 73 struct TransmitMessage *next;
74 74
75 /** 75 /**
76 * Buffer with message to be transmitted. 76 * ID assigned to the message.
77 */ 77 */
78 char *buf; 78 uint64_t id;
79 79
80 /** 80 /**
81 * Size of @a buf 81 * Size of @a buf
82 */ 82 */
83 uint16_t size 83 uint16_t size;
84; 84
85 /** 85 /**
86 * @see enum MessageState 86 * @see enum MessageState
87 */ 87 */
88 uint8_t state; 88 uint8_t state;
89
90 /* Followed by message */
89}; 91};
90 92
91 93
@@ -100,9 +102,9 @@ static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
100 102
101/** 103/**
102 * Entry in the chan_msgs hashmap of @a recv_cache: 104 * Entry in the chan_msgs hashmap of @a recv_cache:
103 * fragment_id -> FragmentEntry 105 * fragment_id -> RecvCacheEntry
104 */ 106 */
105struct FragmentEntry 107struct RecvCacheEntry
106{ 108{
107 struct GNUNET_MULTICAST_MessageHeader *mmsg; 109 struct GNUNET_MULTICAST_MessageHeader *mmsg;
108 uint16_t ref_count; 110 uint16_t ref_count;
@@ -110,20 +112,48 @@ struct FragmentEntry
110 112
111 113
112/** 114/**
113 * Entry in the @a recv_msgs hash map of a @a Channel. 115 * Entry in the @a recv_frags hash map of a @a Channel.
114 * message_id -> FragmentCache 116 * message_id -> FragmentQueue
115 */ 117 */
116struct FragmentCache 118struct FragmentQueue
117{ 119{
118 /** 120 /**
119 * Total size of header fragments (METHOD & MODIFIERs) 121 * Fragment IDs stored in @a recv_cache.
122 */
123 struct GNUNET_CONTAINER_Heap *fragments;
124
125 /**
126 * Total size of received fragments.
127 */
128 uint64_t size;
129
130 /**
131 * Total size of received header fragments (METHOD & MODIFIERs)
120 */ 132 */
121 uint64_t header_size; 133 uint64_t header_size;
122 134
123 /** 135 /**
124 * Fragment IDs stored in @a recv_cache. 136 * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
125 */ 137 */
126 struct GNUNET_CONTAINER_Heap *fragments; 138 uint64_t state_delta;
139
140 /**
141 * The @a flags field from struct GNUNET_PSYC_MessageMethod.
142 */
143 uint32_t flags;
144
145 /**
146 * Receive state of message.
147 *
148 * @see MessageFragmentState
149 */
150 uint8_t state;
151
152 /**
153 * Is the message queued for delivery to the client?
154 * i.e. added to the recv_msgs queue
155 */
156 uint8_t queued;
127}; 157};
128 158
129 159
@@ -139,12 +169,17 @@ struct Channel
139 169
140 /** 170 /**
141 * Received fragments not yet sent to the client. 171 * Received fragments not yet sent to the client.
142 * message_id -> FragmentCache 172 * message_id -> FragmentQueue
173 */
174 struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
175
176 /**
177 * Received message IDs not yet sent to the client.
143 */ 178 */
144 struct GNUNET_CONTAINER_MultiHashMap *recv_msgs; 179 struct GNUNET_CONTAINER_Heap *recv_msgs;
145 180
146 /** 181 /**
147 * FIXME 182 * FIXME: needed?
148 */ 183 */
149 GNUNET_SCHEDULER_TaskIdentifier tmit_task; 184 GNUNET_SCHEDULER_TaskIdentifier tmit_task;
150 185
@@ -159,6 +194,19 @@ struct Channel
159 struct GNUNET_HashCode pub_key_hash; 194 struct GNUNET_HashCode pub_key_hash;
160 195
161 /** 196 /**
197 * Last message ID sent to the client.
198 * 0 if there is no such message.
199 */
200 uint64_t max_message_id;
201
202 /**
203 * ID of the last stateful message, where the state operations has been
204 * processed and saved to PSYCstore and which has been sent to the client.
205 * 0 if there is no such message.
206 */
207 uint64_t max_state_message_id;
208
209 /**
162 * Expected value size for the modifier being received from the PSYC service. 210 * Expected value size for the modifier being received from the PSYC service.
163 */ 211 */
164 uint32_t tmit_mod_value_size_expected; 212 uint32_t tmit_mod_value_size_expected;
@@ -174,7 +222,7 @@ struct Channel
174 uint8_t tmit_state; 222 uint8_t tmit_state;
175 223
176 /** 224 /**
177 * FIXME 225 * FIXME: needed?
178 */ 226 */
179 uint8_t in_transmit; 227 uint8_t in_transmit;
180 228
@@ -221,7 +269,7 @@ struct Master
221 struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle; 269 struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle;
222 270
223 /** 271 /**
224 * Maximum message ID for this channel. 272 * Last message ID transmitted to this channel.
225 * 273 *
226 * Incremented before sending a message, thus the message_id in messages sent 274 * Incremented before sending a message, thus the message_id in messages sent
227 * starts from 1. 275 * starts from 1.
@@ -229,13 +277,13 @@ struct Master
229 uint64_t max_message_id; 277 uint64_t max_message_id;
230 278
231 /** 279 /**
232 * ID of the last message that contains any state operations. 280 * ID of the last message with state operations transmitted to the channel.
233 * 0 if there is no such message. 281 * 0 if there is no such message.
234 */ 282 */
235 uint64_t max_state_message_id; 283 uint64_t max_state_message_id;
236 284
237 /** 285 /**
238 * Maximum group generation for this channel. 286 * Maximum group generation transmitted to the channel.
239 */ 287 */
240 uint64_t max_group_generation; 288 uint64_t max_group_generation;
241 289
@@ -292,11 +340,6 @@ struct Slave
292 struct GNUNET_MessageHeader *join_req; 340 struct GNUNET_MessageHeader *join_req;
293 341
294 /** 342 /**
295 * Maximum message ID for this channel.
296 */
297 uint64_t max_message_id;
298
299 /**
300 * Maximum request ID for this channel. 343 * Maximum request ID for this channel.
301 */ 344 */
302 uint64_t max_request_id; 345 uint64_t max_request_id;
@@ -304,7 +347,7 @@ struct Slave
304 347
305 348
306static inline void 349static inline void
307transmit_message (struct Channel *ch, uint8_t inc_msg_id); 350transmit_message (struct Channel *ch);
308 351
309 352
310/** 353/**
@@ -386,7 +429,7 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
386 /* Send pending messages to multicast before cleanup. */ 429 /* Send pending messages to multicast before cleanup. */
387 if (NULL != ch->tmit_head) 430 if (NULL != ch->tmit_head)
388 { 431 {
389 transmit_message (ch, GNUNET_NO); 432 transmit_message (ch);
390 } 433 }
391 else 434 else
392 { 435 {
@@ -484,6 +527,7 @@ static inline void
484hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n) 527hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
485{ 528{
486 /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */ 529 /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
530 /* TODO: use built-in byte swap functions if available */
487 531
488 n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL); 532 n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL);
489 n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL); 533 n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
@@ -512,29 +556,40 @@ hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
512} 556}
513 557
514 558
559/**
560 * Insert a multicast message fragment into the queue belonging to the message.
561 *
562 * @param ch Channel.
563 * @param mmsg Multicast message fragment.
564 * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode.
565 * @param first_ptype First PSYC message part type in @a mmsg.
566 * @param last_ptype Last PSYC message part type in @a mmsg.
567 */
515static void 568static void
516fragment_cache_insert (struct Channel *ch, 569fragment_queue_insert (struct Channel *ch,
517 const struct GNUNET_HashCode *msg_id,
518 struct FragmentCache *frag_cache,
519 const struct GNUNET_MULTICAST_MessageHeader *mmsg, 570 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
520 uint16_t last_part_type) 571 uint16_t first_ptype, uint16_t last_ptype)
521{ 572{
522 uint16_t size = ntohs (mmsg->header.size); 573 const uint16_t size = ntohs (mmsg->header.size);
574 const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
523 struct GNUNET_CONTAINER_MultiHashMap 575 struct GNUNET_CONTAINER_MultiHashMap
524 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, 576 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
525 &ch->pub_key_hash); 577 &ch->pub_key_hash);
526 578
527 if (NULL == frag_cache) 579 struct GNUNET_HashCode msg_id_hash;
580 hash_key_from_nll (&msg_id_hash, mmsg->message_id);
581
582 struct FragmentQueue
583 *fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash);
584
585 if (NULL == fragq)
528 { 586 {
529 frag_cache = GNUNET_new (struct FragmentCache); 587 fragq = GNUNET_new (struct FragmentQueue);
530 frag_cache->fragments 588 fragq->state = MSG_FRAG_STATE_HEADER;
589 fragq->fragments
531 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 590 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
532 591
533 if (NULL == ch->recv_msgs) 592 GNUNET_CONTAINER_multihashmap_put (ch->recv_frags, &msg_id_hash, fragq,
534 {
535 ch->recv_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
536 }
537 GNUNET_CONTAINER_multihashmap_put (ch->recv_msgs, msg_id, frag_cache,
538 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); 593 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
539 594
540 if (NULL == chan_msgs) 595 if (NULL == chan_msgs)
@@ -545,190 +600,335 @@ fragment_cache_insert (struct Channel *ch,
545 } 600 }
546 } 601 }
547 602
548 struct GNUNET_HashCode *frag_id = GNUNET_new (struct GNUNET_HashCode); 603 struct GNUNET_HashCode frag_id_hash;
549 hash_key_from_nll (frag_id, mmsg->fragment_id); 604 hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
550 struct FragmentEntry 605 struct RecvCacheEntry
551 *frag_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, frag_id); 606 *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
552 if (NULL == frag_entry) 607 if (NULL == cache_entry)
553 { 608 {
554 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 609 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
555 "%p Adding message fragment to cache. " 610 "%p Adding message fragment to cache. "
556 "fragment_id: %" PRIu64 ", " 611 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", "
557 "header_size: %" PRIu64 " + %" PRIu64 ").\n", 612 "header_size: %" PRIu64 " + %u).\n",
558 ch, GNUNET_ntohll (mmsg->fragment_id), 613 ch, GNUNET_ntohll (mmsg->message_id),
559 frag_cache->header_size, size); 614 GNUNET_ntohll (mmsg->fragment_id),
560 frag_entry = GNUNET_new (struct FragmentEntry); 615 fragq->header_size, size);
561 frag_entry->ref_count = 1; 616 cache_entry = GNUNET_new (struct RecvCacheEntry);
562 frag_entry->mmsg = GNUNET_malloc (size); 617 cache_entry->ref_count = 1;
563 memcpy (frag_entry->mmsg, mmsg, size); 618 cache_entry->mmsg = GNUNET_malloc (size);
564 GNUNET_CONTAINER_multihashmap_put (chan_msgs, frag_id, frag_entry, 619 memcpy (cache_entry->mmsg, mmsg, size);
620 GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
565 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); 621 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
566 } 622 }
567 else 623 else
568 { 624 {
569 frag_entry->ref_count++; 625 cache_entry->ref_count++;
570 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 626 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
571 "%p Message fragment already in cache. " 627 "%p Message fragment is already in cache. "
572 "fragment_id: %" PRIu64 ", ref_count: %u\n", 628 "message_id: %" PRIu64 ", fragment_id: %" PRIu64
573 ch, GNUNET_ntohll (mmsg->fragment_id), frag_entry->ref_count); 629 ", ref_count: %u\n",
630 ch, GNUNET_ntohll (mmsg->message_id),
631 GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
632 }
633
634 if (MSG_FRAG_STATE_HEADER == fragq->state)
635 {
636 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
637 {
638 struct GNUNET_PSYC_MessageMethod *
639 pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
640 fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
641 fragq->flags = ntohl (pmeth->flags);
642 }
643
644 if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
645 {
646 fragq->header_size += size;
647 }
648 else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
649 || frag_offset == fragq->header_size)
650 { /* header is now complete */
651 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
652 "%p Header of message %" PRIu64 " is complete.\n",
653 ch, GNUNET_ntohll (mmsg->message_id));
654
655 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
656 "%p Adding message %" PRIu64 " to queue.\n",
657 ch, GNUNET_ntohll (mmsg->message_id));
658 fragq->state = MSG_FRAG_STATE_DATA;
659 }
660 else
661 {
662 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
663 "%p Header of message %" PRIu64 " is NOT complete yet: "
664 "%" PRIu64 " != %" PRIu64 "\n",
665 ch, GNUNET_ntohll (mmsg->message_id), frag_offset,
666 fragq->header_size);
667 }
574 } 668 }
575 669
576 switch (last_part_type) 670 switch (last_ptype)
577 { 671 {
578 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: 672 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
579 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: 673 if (frag_offset == fragq->size)
580 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: 674 fragq->state = MSG_FRAG_STATE_END;
581 frag_cache->header_size += size; 675 else
676 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
677 "%p Message %" PRIu64 " is NOT complete yet: "
678 "%" PRIu64 " != %" PRIu64 "\n",
679 ch, GNUNET_ntohll (mmsg->message_id), frag_offset,
680 fragq->size);
681 break;
682
683 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
684 /* Drop message without delivering to client if it's a single fragment */
685 fragq->state =
686 (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
687 ? MSG_FRAG_STATE_DROP
688 : MSG_FRAG_STATE_CANCEL;
689 }
690
691 switch (fragq->state)
692 {
693 case MSG_FRAG_STATE_DATA:
694 case MSG_FRAG_STATE_END:
695 case MSG_FRAG_STATE_CANCEL:
696 if (GNUNET_NO == fragq->queued)
697 {
698 GNUNET_CONTAINER_heap_insert (ch->recv_msgs, NULL,
699 GNUNET_ntohll (mmsg->message_id));
700 fragq->queued = GNUNET_YES;
701 }
582 } 702 }
583 GNUNET_CONTAINER_heap_insert (frag_cache->fragments, frag_id, 703
704 fragq->size += size;
705 GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
584 GNUNET_ntohll (mmsg->fragment_id)); 706 GNUNET_ntohll (mmsg->fragment_id));
585} 707}
586 708
587 709
710/**
711 * Run fragment queue of a message.
712 *
713 * Send fragments of a message in order to client, after all modifiers arrived
714 * from multicast.
715 *
716 * @param ch Channel.
717 * @param msg_id ID of the message @a fragq belongs to.
718 * @param fragq Fragment queue of the message.
719 * @param drop Drop message without delivering to client?
720 * #GNUNET_YES or #GNUNET_NO.
721 */
588static void 722static void
589fragment_cache_clear (struct Channel *ch, 723fragment_queue_run (struct Channel *ch, uint64_t msg_id,
590 const struct GNUNET_HashCode *msg_id, 724 struct FragmentQueue *fragq, uint8_t drop)
591 struct FragmentCache *frag_cache,
592 uint8_t send_to_client)
593{ 725{
594 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 726 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
595 "%p Clearing message fragment cache.\n", ch); 727 "%p Running message fragment queue for message %" PRIu64
728 " (state: %u).\n",
729 ch, msg_id, fragq->state);
596 730
597 struct GNUNET_CONTAINER_MultiHashMap 731 struct GNUNET_CONTAINER_MultiHashMap
598 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, 732 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
599 &ch->pub_key_hash); 733 &ch->pub_key_hash);
600 GNUNET_assert (NULL != chan_msgs); 734 GNUNET_assert (NULL != chan_msgs);
601 struct GNUNET_HashCode *frag_id; 735 uint64_t frag_id;
602 736
603 while ((frag_id = GNUNET_CONTAINER_heap_remove_root (frag_cache->fragments))) 737 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
738 &frag_id))
604 { 739 {
605 struct FragmentEntry 740 struct GNUNET_HashCode frag_id_hash;
606 *frag_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, frag_id); 741 hash_key_from_hll (&frag_id_hash, frag_id);
607 if (frag_entry != NULL) 742 struct RecvCacheEntry *cache_entry
743 = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
744 if (cache_entry != NULL)
608 { 745 {
609 if (GNUNET_YES == send_to_client) 746 if (GNUNET_NO == drop)
610 { 747 {
611 message_to_client (ch, frag_entry->mmsg); 748 message_to_client (ch, cache_entry->mmsg);
612 } 749 }
613 if (1 == frag_entry->ref_count) 750 if (cache_entry->ref_count <= 1)
614 { 751 {
615 GNUNET_CONTAINER_multihashmap_remove (chan_msgs, frag_id, frag_entry); 752 GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
616 GNUNET_free (frag_entry->mmsg); 753 cache_entry);
617 GNUNET_free (frag_entry); 754 GNUNET_free (cache_entry->mmsg);
755 GNUNET_free (cache_entry);
618 } 756 }
619 else 757 else
620 { 758 {
621 frag_entry->ref_count--; 759 cache_entry->ref_count--;
622 } 760 }
623 } 761 }
624 GNUNET_free (frag_id); 762#if CACHE_AGING_IMPLEMENTED
763 else if (GNUNET_NO == drop)
764 {
765 /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
766 }
767#endif
768
769 GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
625 } 770 }
626 771
627 GNUNET_CONTAINER_multihashmap_remove (ch->recv_msgs, msg_id, frag_cache); 772 if (MSG_FRAG_STATE_END <= fragq->state)
628 GNUNET_CONTAINER_heap_destroy (frag_cache->fragments); 773 {
629 GNUNET_free (frag_cache); 774 struct GNUNET_HashCode msg_id_hash;
775 hash_key_from_nll (&msg_id_hash, msg_id);
776
777 GNUNET_CONTAINER_multihashmap_remove (ch->recv_frags, &msg_id_hash, fragq);
778 GNUNET_CONTAINER_heap_destroy (fragq->fragments);
779 GNUNET_free (fragq);
780 }
781 else
782 {
783 fragq->queued = GNUNET_NO;
784 }
630} 785}
631 786
632 787
633/** 788/**
634 * Incoming message fragment from multicast. 789 * Run message queue.
635 * 790 *
636 * Store it using PSYCstore and send it to the client of the channel. 791 * Send messages in queue to client in order after a message has arrived from
792 * multicast, according to the following:
793 * - A message is only sent if all of its modifiers arrived.
794 * - A stateful message is only sent if the previous stateful message
795 * has already been delivered to the client.
796 *
797 * @param ch Channel.
798 * @return Number of messages removed from queue and sent to client.
637 */ 799 */
638static void 800static uint64_t
639message_cb (void *cls, const struct GNUNET_MessageHeader *msg) 801message_queue_run (struct Channel *ch)
640{ 802{
641 struct Channel *ch = cls; 803 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
642 uint16_t type = ntohs (msg->type); 804 "%p Running message queue.\n", ch);
643 uint16_t size = ntohs (msg->size); 805 uint64_t n = 0;
644 806 uint64_t msg_id;
645 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 807 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (ch->recv_msgs, NULL,
646 "%p Received message of type %u and size %u from multicast.\n", 808 &msg_id))
647 ch, type, size);
648
649 switch (type)
650 {
651 case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
652 { 809 {
653 GNUNET_PSYCSTORE_fragment_store (store, &ch->pub_key, 810 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
654 (const struct 811 "%p Processing message %" PRIu64 " in queue.\n", ch, msg_id);
655 GNUNET_MULTICAST_MessageHeader *) msg, 812 struct GNUNET_HashCode msg_id_hash;
656 0, NULL, NULL); 813 hash_key_from_hll (&msg_id_hash, msg_id);
657
658#if TODO
659 /* FIXME: apply modifiers to state in PSYCstore */
660 GNUNET_PSYCSTORE_state_modify (store, &ch->pub_key,
661 GNUNET_ntohll (mmsg->message_id),
662 meth->mod_count, mods,
663 rcb, rcb_cls);
664#endif
665
666 const struct GNUNET_MULTICAST_MessageHeader
667 *mmsg = (const struct GNUNET_MULTICAST_MessageHeader *) msg;
668 814
669 uint16_t ptype = GNUNET_PSYC_message_last_part (size - sizeof (*mmsg), 815 struct FragmentQueue *
670 (const char *) &mmsg[1]); 816 fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash);
671 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
672 "Last message part type %u\n", ptype);
673 817
674 if (GNUNET_NO == ptype) 818 if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
675 { 819 {
676 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 820 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
677 "%p Received message with invalid parts from multicast. " 821 "%p No fragq (%p) or header not complete.\n",
678 "Dropping message.\n", ch); 822 ch, fragq);
679 GNUNET_break_op (0);
680 break; 823 break;
681 } 824 }
682 825
683 struct GNUNET_HashCode msg_id; 826 if (MSG_FRAG_STATE_HEADER == fragq->state)
684 hash_key_from_nll (&msg_id, mmsg->message_id);
685
686 struct FragmentCache *frag_cache = NULL;
687 if (NULL != ch->recv_msgs)
688 frag_cache = GNUNET_CONTAINER_multihashmap_get (ch->recv_msgs, &msg_id);
689
690 switch (ptype)
691 { 827 {
692 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: 828 /* Check if there's a missing message before the current one */
693 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: 829 if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
694 /* FIXME: check state flag / max_state_message_id */
695 if (NULL == frag_cache)
696 { 830 {
697 message_to_client (ch, mmsg); 831 if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
698 break; 832 && msg_id - 1 != ch->max_message_id)
833 {
834 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
835 "%p Out of order message. "
836 "(%" PRIu64 " - 1 != %" PRIu64 ")\n",
837 ch, msg_id, ch->max_message_id);
838 break;
839 }
699 } 840 }
700 else 841 else
701 { 842 {
702 if (GNUNET_ntohll (mmsg->fragment_offset) == frag_cache->header_size) 843 if (msg_id - fragq->state_delta != ch->max_state_message_id)
703 { /* first data fragment after the header, send cached fragments */ 844 {
704 fragment_cache_clear (ch, &msg_id, frag_cache, GNUNET_YES); 845 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
705 message_to_client (ch, mmsg); 846 "%p Out of order stateful message. "
847 "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
848 ch, msg_id, fragq->state_delta, ch->max_state_message_id);
706 break; 849 break;
707 } 850 }
708 else 851#if TODO
709 { /* still missing fragments from the header, cache data fragment */ 852 /* FIXME: apply modifiers to state in PSYCstore */
710 /* fall thru */ 853 GNUNET_PSYCSTORE_state_modify (store, &ch->pub_key, message_id,
711 } 854 state_modify_result_cb, cls);
855#endif
856 ch->max_state_message_id = msg_id;
712 } 857 }
858 ch->max_message_id = msg_id;
859 }
860 fragment_queue_run (ch, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
861 GNUNET_CONTAINER_heap_remove_root (ch->recv_msgs);
862 n++;
863 }
864 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
865 "%p Removed %" PRIu64 " messages from queue.\n", ch, n);
866 return n;
867}
713 868
714 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
715 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
716 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
717 /* not all modifiers arrived yet, cache fragment */
718 fragment_cache_insert (ch, &msg_id, frag_cache, mmsg, ptype);
719 break;
720 869
721 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: 870/**
722 if (NULL != frag_cache) 871 * Handle incoming message from multicast.
723 { /* fragments not yet sent to client, remove from cache */ 872 *
724 fragment_cache_clear (ch, &msg_id, frag_cache, GNUNET_NO); 873 * @param ch Channel.
725 } 874 * @param mmsg Multicast message.
726 else 875 *
727 { 876 * @return #GNUNET_OK or #GNUNET_SYSERR
728 message_to_client (ch, mmsg); 877 */
729 } 878static int
730 break; 879handle_multicast_message (struct Channel *ch,
731 } 880 const struct GNUNET_MULTICAST_MessageHeader *mmsg)
881{
882 GNUNET_PSYCSTORE_fragment_store (store, &ch->pub_key, mmsg, 0, NULL, NULL);
883
884 uint16_t size = ntohs (mmsg->header.size);
885 uint16_t first_ptype = 0, last_ptype = 0;
886
887 if (GNUNET_SYSERR
888 == GNUNET_PSYC_check_message_parts (size - sizeof (*mmsg),
889 (const char *) &mmsg[1],
890 &first_ptype, &last_ptype))
891 {
892 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
893 "%p Received message with invalid parts from multicast. "
894 "Dropping message.\n", ch);
895 GNUNET_break_op (0);
896 return GNUNET_SYSERR;
897 }
898
899 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
900 "Message parts: first: type %u, last: type %u\n",
901 first_ptype, last_ptype);
902
903 fragment_queue_insert (ch, mmsg, first_ptype, last_ptype);
904 message_queue_run (ch);
905
906 return GNUNET_OK;
907}
908
909
910/**
911 * Incoming message fragment from multicast.
912 *
913 * Store it using PSYCstore and send it to the client of the channel.
914 */
915static void
916message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
917{
918 struct Channel *ch = cls;
919 uint16_t type = ntohs (msg->type);
920 uint16_t size = ntohs (msg->size);
921
922 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
923 "%p Received message of type %u and size %u from multicast.\n",
924 ch, type, size);
925
926 switch (type)
927 {
928 case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
929 {
930 handle_multicast_message (ch, (const struct
931 GNUNET_MULTICAST_MessageHeader *) msg);
732 break; 932 break;
733 } 933 }
734 default: 934 default:
@@ -770,8 +970,9 @@ request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
770 = (const struct GNUNET_MULTICAST_RequestHeader *) msg; 970 = (const struct GNUNET_MULTICAST_RequestHeader *) msg;
771 971
772 /* FIXME: see message_cb() */ 972 /* FIXME: see message_cb() */
773 if (GNUNET_NO == GNUNET_PSYC_message_last_part (size - sizeof (*req), 973 if (GNUNET_SYSERR == GNUNET_PSYC_check_message_parts (size - sizeof (*req),
774 (const char *) &req[1])) 974 (const char *) &req[1],
975 NULL, NULL))
775 { 976 {
776 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 977 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
777 "%p Dropping message with invalid parts " 978 "%p Dropping message with invalid parts "
@@ -825,7 +1026,8 @@ master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
825 if (GNUNET_OK == result || GNUNET_NO == result) 1026 if (GNUNET_OK == result || GNUNET_NO == result)
826 { 1027 {
827 mst->max_message_id = max_message_id; 1028 mst->max_message_id = max_message_id;
828 mst->max_state_message_id = max_state_message_id; 1029 ch->max_message_id = max_message_id;
1030 ch->max_state_message_id = max_state_message_id;
829 mst->max_group_generation = max_group_generation; 1031 mst->max_group_generation = max_group_generation;
830 mst->origin 1032 mst->origin
831 = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, 1033 = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key,
@@ -860,7 +1062,8 @@ slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
860 1062
861 if (GNUNET_OK == result || GNUNET_NO == result) 1063 if (GNUNET_OK == result || GNUNET_NO == result)
862 { 1064 {
863 slv->max_message_id = max_message_id; 1065 ch->max_message_id = max_message_id;
1066 ch->max_state_message_id = max_state_message_id;
864 slv->member 1067 slv->member
865 = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->slave_key, 1068 = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->slave_key,
866 &slv->origin, 1069 &slv->origin,
@@ -879,6 +1082,15 @@ slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
879} 1082}
880 1083
881 1084
1085static void
1086channel_init (struct Channel *ch)
1087{
1088 ch->recv_msgs
1089 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1090 ch->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1091}
1092
1093
882/** 1094/**
883 * Handle a connecting client starting a channel master. 1095 * Handle a connecting client starting a channel master.
884 */ 1096 */
@@ -888,14 +1100,18 @@ handle_master_start (void *cls, struct GNUNET_SERVER_Client *client,
888{ 1100{
889 const struct MasterStartRequest *req 1101 const struct MasterStartRequest *req
890 = (const struct MasterStartRequest *) msg; 1102 = (const struct MasterStartRequest *) msg;
1103
891 struct Master *mst = GNUNET_new (struct Master); 1104 struct Master *mst = GNUNET_new (struct Master);
1105 mst->policy = ntohl (req->policy);
1106 mst->priv_key = req->channel_key;
1107
892 struct Channel *ch = &mst->channel; 1108 struct Channel *ch = &mst->channel;
893 ch->client = client; 1109 ch->client = client;
894 ch->is_master = GNUNET_YES; 1110 ch->is_master = GNUNET_YES;
895 mst->policy = ntohl (req->policy);
896 mst->priv_key = req->channel_key;
897 GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key, &ch->pub_key); 1111 GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key, &ch->pub_key);
898 GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key), &ch->pub_key_hash); 1112 GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key), &ch->pub_key_hash);
1113 channel_init (ch);
1114
899 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1115 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
900 "%p Master connected to channel %s.\n", 1116 "%p Master connected to channel %s.\n",
901 mst, GNUNET_h2s (&ch->pub_key_hash)); 1117 mst, GNUNET_h2s (&ch->pub_key_hash));
@@ -919,13 +1135,7 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
919 const struct SlaveJoinRequest *req 1135 const struct SlaveJoinRequest *req
920 = (const struct SlaveJoinRequest *) msg; 1136 = (const struct SlaveJoinRequest *) msg;
921 struct Slave *slv = GNUNET_new (struct Slave); 1137 struct Slave *slv = GNUNET_new (struct Slave);
922 struct Channel *ch = &slv->channel;
923 slv->channel.client = client;
924 slv->channel.is_master = GNUNET_NO;
925 slv->slave_key = req->slave_key; 1138 slv->slave_key = req->slave_key;
926 ch->pub_key = req->channel_key;
927 GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key),
928 &ch->pub_key_hash);
929 slv->origin = req->origin; 1139 slv->origin = req->origin;
930 slv->relay_count = ntohl (req->relay_count); 1140 slv->relay_count = ntohl (req->relay_count);
931 if (0 < slv->relay_count) 1141 if (0 < slv->relay_count)
@@ -939,6 +1149,14 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
939 memcpy (&slv->relays[i], &relays[i], sizeof (*relays)); 1149 memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
940 } 1150 }
941 1151
1152 struct Channel *ch = &slv->channel;
1153 ch->client = client;
1154 ch->is_master = GNUNET_NO;
1155 ch->pub_key = req->channel_key;
1156 GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key),
1157 &ch->pub_key_hash);
1158 channel_init (ch);
1159
942 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1160 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
943 "%p Slave connected to channel %s.\n", 1161 "%p Slave connected to channel %s.\n",
944 slv, GNUNET_h2s (&ch->pub_key_hash)); 1162 slv, GNUNET_h2s (&ch->pub_key_hash));
@@ -991,7 +1209,7 @@ transmit_notify (void *cls, size_t *data_size, void *data)
991 "%p transmit_notify: sending %u bytes.\n", ch, tmit_msg->size); 1209 "%p transmit_notify: sending %u bytes.\n", ch, tmit_msg->size);
992 1210
993 *data_size = tmit_msg->size; 1211 *data_size = tmit_msg->size;
994 memcpy (data, tmit_msg->buf, *data_size); 1212 memcpy (data, &tmit_msg[1], *data_size);
995 1213
996 GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg); 1214 GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg);
997 GNUNET_free (tmit_msg); 1215 GNUNET_free (tmit_msg);
@@ -1003,7 +1221,7 @@ transmit_notify (void *cls, size_t *data_size, void *data)
1003 { 1221 {
1004 if (NULL != ch->tmit_head) 1222 if (NULL != ch->tmit_head)
1005 { 1223 {
1006 transmit_message (ch, GNUNET_NO); 1224 transmit_message (ch);
1007 } 1225 }
1008 else if (ch->disconnected) 1226 else if (ch->disconnected)
1009 { 1227 {
@@ -1054,14 +1272,12 @@ slave_transmit_notify (void *cls, size_t *data_size, void *data)
1054 * Transmit a message from a channel master to the multicast group. 1272 * Transmit a message from a channel master to the multicast group.
1055 */ 1273 */
1056static void 1274static void
1057master_transmit_message (struct Master *mst, uint8_t inc_msg_id) 1275master_transmit_message (struct Master *mst)
1058{ 1276{
1059 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_transmit_message()\n", mst); 1277 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_transmit_message()\n", mst);
1060 mst->channel.tmit_task = 0; 1278 mst->channel.tmit_task = 0;
1061 if (NULL == mst->tmit_handle) 1279 if (NULL == mst->tmit_handle)
1062 { 1280 {
1063 if (GNUNET_YES == inc_msg_id)
1064 mst->max_message_id++;
1065 mst->tmit_handle 1281 mst->tmit_handle
1066 = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id, 1282 = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
1067 mst->max_group_generation, 1283 mst->max_group_generation,
@@ -1078,13 +1294,11 @@ master_transmit_message (struct Master *mst, uint8_t inc_msg_id)
1078 * Transmit a message from a channel slave to the multicast group. 1294 * Transmit a message from a channel slave to the multicast group.
1079 */ 1295 */
1080static void 1296static void
1081slave_transmit_message (struct Slave *slv, uint8_t inc_msg_id) 1297slave_transmit_message (struct Slave *slv)
1082{ 1298{
1083 slv->channel.tmit_task = 0; 1299 slv->channel.tmit_task = 0;
1084 if (NULL == slv->tmit_handle) 1300 if (NULL == slv->tmit_handle)
1085 { 1301 {
1086 if (GNUNET_YES == inc_msg_id)
1087 slv->max_message_id++;
1088 slv->tmit_handle 1302 slv->tmit_handle
1089 = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id, 1303 = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
1090 slave_transmit_notify, slv); 1304 slave_transmit_notify, slv);
@@ -1097,29 +1311,94 @@ slave_transmit_message (struct Slave *slv, uint8_t inc_msg_id)
1097 1311
1098 1312
1099static inline void 1313static inline void
1100transmit_message (struct Channel *ch, uint8_t inc_msg_id) 1314transmit_message (struct Channel *ch)
1101{ 1315{
1102 ch->is_master 1316 ch->is_master
1103 ? master_transmit_message ((struct Master *) ch, inc_msg_id) 1317 ? master_transmit_message ((struct Master *) ch)
1104 : slave_transmit_message ((struct Slave *) ch, inc_msg_id); 1318 : slave_transmit_message ((struct Slave *) ch);
1105} 1319}
1106 1320
1107 1321
1322/**
1323 * Queue a message from a channel master for sending to the multicast group.
1324 */
1108static void 1325static void
1109transmit_error (struct Channel *ch) 1326master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
1327 uint16_t first_ptype, uint16_t last_ptype)
1328{
1329 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message()\n", mst);
1330
1331 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1332 {
1333 tmit_msg->id = ++mst->max_message_id;
1334 struct GNUNET_PSYC_MessageMethod *pmeth
1335 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
1336
1337 if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
1338 {
1339 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
1340 }
1341 else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
1342 {
1343 pmeth->state_delta = GNUNET_htonll (tmit_msg->id
1344 - mst->max_state_message_id);
1345 }
1346 else
1347 {
1348 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
1349 }
1350 }
1351}
1352
1353
1354/**
1355 * Queue a message from a channel slave for sending to the multicast group.
1356 */
1357static void
1358slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg,
1359 uint16_t first_ptype, uint16_t last_ptype)
1360{
1361 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
1362 {
1363 struct GNUNET_PSYC_MessageMethod *pmeth
1364 = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
1365 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
1366 tmit_msg->id = ++slv->max_request_id;
1367 }
1368}
1369
1370
1371static void
1372queue_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg,
1373 uint16_t first_ptype, uint16_t last_ptype)
1110{ 1374{
1111 struct GNUNET_MessageHeader *msg; 1375 uint16_t size = ntohs (msg->size) - sizeof (*msg);
1112 struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) 1376 struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size);
1113 + sizeof (*msg)); 1377 memcpy (&tmit_msg[1], &msg[1], size);
1114 msg = (struct GNUNET_MessageHeader *) &tmit_msg[1]; 1378 tmit_msg->size = size;
1115 msg->size = ntohs (sizeof (*msg));
1116 msg->type = ntohs (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
1117
1118 tmit_msg->buf = (char *) &tmit_msg[1];
1119 tmit_msg->size = sizeof (*msg);
1120 tmit_msg->state = ch->tmit_state; 1379 tmit_msg->state = ch->tmit_state;
1380
1121 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); 1381 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
1122 transmit_message (ch, GNUNET_NO); 1382
1383 ch->is_master
1384 ? master_queue_message ((struct Master *) ch, tmit_msg,
1385 first_ptype, last_ptype)
1386 : slave_queue_message ((struct Slave *) ch, tmit_msg,
1387 first_ptype, last_ptype);
1388}
1389
1390
1391static void
1392transmit_error (struct Channel *ch)
1393{
1394 uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
1395
1396 struct GNUNET_MessageHeader msg;
1397 msg.size = ntohs (sizeof (msg));
1398 msg.type = ntohs (type);
1399
1400 queue_message (ch, &msg, type, type);
1401 transmit_message (ch);
1123 1402
1124 /* FIXME: cleanup */ 1403 /* FIXME: cleanup */
1125} 1404}
@@ -1136,6 +1415,10 @@ handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
1136 = GNUNET_SERVER_client_get_user_context (client, struct Channel); 1415 = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1137 GNUNET_assert (NULL != ch); 1416 GNUNET_assert (NULL != ch);
1138 1417
1418 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1419 "%p Received message from client.\n", ch);
1420 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
1421
1139 if (GNUNET_YES != ch->ready) 1422 if (GNUNET_YES != ch->ready)
1140 { 1423 {
1141 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1424 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
@@ -1145,10 +1428,7 @@ handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
1145 return; 1428 return;
1146 } 1429 }
1147 1430
1148 uint8_t inc_msg_id = GNUNET_NO;
1149 uint16_t size = ntohs (msg->size); 1431 uint16_t size = ntohs (msg->size);
1150 uint16_t psize = 0, ptype = 0, pos = 0;
1151
1152 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg)) 1432 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
1153 { 1433 {
1154 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large\n", ch); 1434 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large\n", ch);
@@ -1158,42 +1438,22 @@ handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
1158 return; 1438 return;
1159 } 1439 }
1160 1440
1161 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1441 uint16_t first_ptype = 0, last_ptype = 0;
1162 "%p Received message from client.\n", ch); 1442 if (GNUNET_SYSERR
1163 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg); 1443 == GNUNET_PSYC_check_message_parts (size - sizeof (*msg),
1164 1444 (const char *) &msg[1],
1165 for (pos = 0; sizeof (*msg) + pos < size; pos += psize) 1445 &first_ptype, &last_ptype))
1166 { 1446 {
1167 const struct GNUNET_MessageHeader *pmsg 1447 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1168 = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos); 1448 "%p Received invalid message part from client.\n", ch);
1169 psize = ntohs (pmsg->size); 1449 GNUNET_break (0);
1170 ptype = ntohs (pmsg->type); 1450 transmit_error (ch);
1171 if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size) 1451 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1172 { 1452 return;
1173 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1174 "%p Received invalid message part of type %u and size %u "
1175 "from client.\n", ch, ptype, psize);
1176 GNUNET_break (0);
1177 transmit_error (ch);
1178 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1179 return;
1180 }
1181 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1182 "%p Received message part from client.\n", ch);
1183 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
1184
1185 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == ptype)
1186 inc_msg_id = GNUNET_YES;
1187 } 1453 }
1188 1454
1189 size -= sizeof (*msg); 1455 queue_message (ch, msg, first_ptype, last_ptype);
1190 struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size); 1456 transmit_message (ch);
1191 tmit_msg->buf = (char *) &tmit_msg[1];
1192 memcpy (tmit_msg->buf, &msg[1], size);
1193 tmit_msg->size = size;
1194 tmit_msg->state = ch->tmit_state;
1195 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
1196 transmit_message (ch, inc_msg_id);
1197 1457
1198 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1458 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1199}; 1459};
diff --git a/src/psyc/psyc.h b/src/psyc/psyc.h
index 582a8e168..f2d386548 100644
--- a/src/psyc/psyc.h
+++ b/src/psyc/psyc.h
@@ -31,8 +31,9 @@
31#include "gnunet_psyc_service.h" 31#include "gnunet_psyc_service.h"
32 32
33 33
34uint16_t 34int
35GNUNET_PSYC_message_last_part (uint16_t data_size, const char *data); 35GNUNET_PSYC_check_message_parts (uint16_t data_size, const char *data,
36 uint16_t *first_ptype, uint16_t *last_ptype);
36 37
37void 38void
38GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind, 39GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
@@ -41,14 +42,26 @@ GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
41 42
42enum MessageState 43enum MessageState
43{ 44{
44 MSG_STATE_START = 0, 45 MSG_STATE_START = 0,
45 MSG_STATE_HEADER = 1, 46 MSG_STATE_HEADER = 1,
46 MSG_STATE_METHOD = 2, 47 MSG_STATE_METHOD = 2,
47 MSG_STATE_MODIFIER = 3, 48 MSG_STATE_MODIFIER = 3,
48 MSG_STATE_MOD_CONT = 4, 49 MSG_STATE_MOD_CONT = 4,
49 MSG_STATE_DATA = 5, 50 MSG_STATE_DATA = 5,
50 MSG_STATE_END = 6, 51 MSG_STATE_END = 6,
51 MSG_STATE_CANCEL = 7, 52 MSG_STATE_CANCEL = 7,
53 MSG_STATE_ERROR = 8,
54};
55
56
57enum MessageFragmentState
58{
59 MSG_FRAG_STATE_START = 0,
60 MSG_FRAG_STATE_HEADER = 1,
61 MSG_FRAG_STATE_DATA = 2,
62 MSG_FRAG_STATE_END = 3,
63 MSG_FRAG_STATE_CANCEL = 4,
64 MSG_FRAG_STATE_DROP = 5,
52}; 65};
53 66
54 67
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index 16e8106d4..22f1da069 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -1502,7 +1502,7 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel,
1502 slvadd = (struct ChannelSlaveAdd *) &op[1]; 1502 slvadd = (struct ChannelSlaveAdd *) &op[1];
1503 op->msg = (struct GNUNET_MessageHeader *) slvadd; 1503 op->msg = (struct GNUNET_MessageHeader *) slvadd;
1504 1504
1505 slvadd->header.type = GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD; 1505 slvadd->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD);
1506 slvadd->header.size = htons (sizeof (*slvadd)); 1506 slvadd->header.size = htons (sizeof (*slvadd));
1507 slvadd->announced_at = GNUNET_htonll (announced_at); 1507 slvadd->announced_at = GNUNET_htonll (announced_at);
1508 slvadd->effective_since = GNUNET_htonll (effective_since); 1508 slvadd->effective_since = GNUNET_htonll (effective_since);
@@ -1544,7 +1544,7 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel,
1544 1544
1545 slvrm = (struct ChannelSlaveRemove *) &op[1]; 1545 slvrm = (struct ChannelSlaveRemove *) &op[1];
1546 op->msg = (struct GNUNET_MessageHeader *) slvrm; 1546 op->msg = (struct GNUNET_MessageHeader *) slvrm;
1547 slvrm->header.type = GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM; 1547 slvrm->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM);
1548 slvrm->header.size = htons (sizeof (*slvrm)); 1548 slvrm->header.size = htons (sizeof (*slvrm));
1549 slvrm->announced_at = GNUNET_htonll (announced_at); 1549 slvrm->announced_at = GNUNET_htonll (announced_at);
1550 GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head, 1550 GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head,
diff --git a/src/psyc/psyc_common.c b/src/psyc/psyc_common.c
index bf5643ff2..74729aca2 100644
--- a/src/psyc/psyc_common.c
+++ b/src/psyc/psyc_common.c
@@ -30,36 +30,48 @@
30/** 30/**
31 * Check if @a data contains a series of valid message parts. 31 * Check if @a data contains a series of valid message parts.
32 * 32 *
33 * @param data_size Size of @a data. 33 * @param data_size Size of @a data.
34 * @param data Data. 34 * @param data Data.
35 * @param[out] first_ptype Type of first message part.
36 * @param[out] last_ptype Type of last message part.
35 * 37 *
36 * @return Message type number 38 * @return Number of message parts found in @a data.
37 * or GNUNET_NO if the message contains invalid or no parts. 39 * or GNUNET_SYSERR if the message contains invalid parts.
38 */ 40 */
39uint16_t 41int
40GNUNET_PSYC_message_last_part (uint16_t data_size, const char *data) 42GNUNET_PSYC_check_message_parts (uint16_t data_size, const char *data,
43 uint16_t *first_ptype, uint16_t *last_ptype)
41{ 44{
42 const struct GNUNET_MessageHeader *pmsg; 45 const struct GNUNET_MessageHeader *pmsg;
43 uint16_t ptype = GNUNET_NO; 46 uint16_t parts = 0, ptype = 0, psize = 0, pos = 0;
44 uint16_t psize = 0; 47 if (NULL != first_ptype)
45 uint16_t pos = 0; 48 *first_ptype = 0;
49 if (NULL != last_ptype)
50 *last_ptype = 0;
46 51
47 for (pos = 0; pos < data_size; pos += psize) 52 for (pos = 0; pos < data_size; pos += psize, parts++)
48 { 53 {
49 pmsg = (const struct GNUNET_MessageHeader *) (data + pos); 54 pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
50 psize = ntohs (pmsg->size); 55 psize = ntohs (pmsg->size);
51 ptype = ntohs (pmsg->type); 56 ptype = ntohs (pmsg->type);
52 if (psize < sizeof (*pmsg) || pos + psize > data_size 57 if (0 == parts && NULL != first_ptype)
58 *first_ptype = ptype;
59 if (NULL != last_ptype
60 && *last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
61 *last_ptype = ptype;
62 if (psize < sizeof (*pmsg)
63 || pos + psize > data_size
53 || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD 64 || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD
54 || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype) 65 || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype)
55 { 66 {
56 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 67 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
57 "Invalid message part of type %u and size %u.\n", 68 "Invalid message part of type %u and size %u.\n",
58 ptype, psize); 69 ptype, psize);
59 return GNUNET_NO; 70 return GNUNET_SYSERR;
60 } 71 }
72 /* FIXME: check message part order */
61 } 73 }
62 return ptype; 74 return parts;
63} 75}
64 76
65 77
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c
index 360d56c06..f58ecb7f6 100644
--- a/src/psyc/test_psyc.c
+++ b/src/psyc/test_psyc.c
@@ -35,9 +35,9 @@
35#include "gnunet_env_lib.h" 35#include "gnunet_env_lib.h"
36#include "gnunet_psyc_service.h" 36#include "gnunet_psyc_service.h"
37 37
38#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) 38#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10)
39 39
40#define DEBUG_SERVICE 1 40#define DEBUG_SERVICE 0
41 41
42 42
43/** 43/**
@@ -120,7 +120,7 @@ cleanup ()
120 120
121 121
122/** 122/**
123 * Terminate the testcase (failure). 123 * Terminate the test case (failure).
124 * 124 *
125 * @param cls NULL 125 * @param cls NULL
126 * @param tc scheduler context 126 * @param tc scheduler context
@@ -134,7 +134,7 @@ end_badly (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
134 134
135 135
136/** 136/**
137 * Terminate the testcase (success). 137 * Terminate the test case (success).
138 * 138 *
139 * @param cls NULL 139 * @param cls NULL
140 * @param tc scheduler context 140 * @param tc scheduler context
@@ -148,7 +148,7 @@ end_normally (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
148 148
149 149
150/** 150/**
151 * Finish the testcase (successfully). 151 * Finish the test case (successfully).
152 */ 152 */
153static void 153static void
154end () 154end ()
@@ -518,7 +518,8 @@ run (void *cls,
518 518
519 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Starting master.\n"); 519 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Starting master.\n");
520 mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE, 520 mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE,
521 &master_message, &join_request, &master_started, NULL); 521 &master_message, &join_request,
522 &master_started, NULL);
522} 523}
523 524
524 525
diff --git a/src/psycstore/psycstore_api.c b/src/psycstore/psycstore_api.c
index 88ae1185b..a5fdea10b 100644
--- a/src/psycstore/psycstore_api.c
+++ b/src/psycstore/psycstore_api.c
@@ -1099,9 +1099,9 @@ GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h,
1099 req->message_id = GNUNET_htonll (message_id); 1099 req->message_id = GNUNET_htonll (message_id);
1100 req->name_size = htons (name_size); 1100 req->name_size = htons (name_size);
1101 req->flags 1101 req->flags
1102 = 0 == i 1102 = (0 == i)
1103 ? STATE_OP_FIRST 1103 ? STATE_OP_FIRST
1104 : modifier_count - 1 == i 1104 : (modifier_count - 1 == i)
1105 ? STATE_OP_LAST 1105 ? STATE_OP_LAST
1106 : 0; 1106 : 0;
1107 1107