diff options
author | Gabor X Toth <*@tg-x.net> | 2014-05-06 10:26:24 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2014-05-06 10:26:24 +0000 |
commit | 4e1baae59f18ee5d7cd47afe28ced3daaaa5a5ad (patch) | |
tree | 4e0d4a1bf488d4969f44e3a8db26af6ca22d4db7 /src | |
parent | b2061e704ac6309bb7ee4427a89a1572aa9f339e (diff) | |
download | gnunet-4e1baae59f18ee5d7cd47afe28ced3daaaa5a5ad.tar.gz gnunet-4e1baae59f18ee5d7cd47afe28ced3daaaa5a5ad.zip |
psyc: in-order message delivery
Diffstat (limited to 'src')
-rw-r--r-- | src/include/gnunet_protocols.h | 2 | ||||
-rw-r--r-- | src/include/gnunet_psyc_service.h | 43 | ||||
-rw-r--r-- | src/multicast/multicast_api.c | 10 | ||||
-rw-r--r-- | src/psyc/gnunet-service-psyc.c | 724 | ||||
-rw-r--r-- | src/psyc/psyc.h | 29 | ||||
-rw-r--r-- | src/psyc/psyc_api.c | 4 | ||||
-rw-r--r-- | src/psyc/psyc_common.c | 38 | ||||
-rw-r--r-- | src/psyc/test_psyc.c | 13 | ||||
-rw-r--r-- | src/psycstore/psycstore_api.c | 4 |
9 files changed, 592 insertions, 275 deletions
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index 715f91809..93965fd9c 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h | |||
@@ -2148,7 +2148,7 @@ extern "C" | |||
2148 | /** | 2148 | /** |
2149 | * C: client | 2149 | * C: client |
2150 | * S: service | 2150 | * S: service |
2151 | * M: muticast | 2151 | * M: multicast |
2152 | */ | 2152 | */ |
2153 | 2153 | ||
2154 | /** S->C: result of an operation */ | 2154 | /** S->C: result of an operation */ |
diff --git a/src/include/gnunet_psyc_service.h b/src/include/gnunet_psyc_service.h index 018f012f4..928e05242 100644 --- a/src/include/gnunet_psyc_service.h +++ b/src/include/gnunet_psyc_service.h | |||
@@ -167,7 +167,23 @@ enum GNUNET_PSYC_MessageFlags | |||
167 | /** | 167 | /** |
168 | * Request from slave to master. | 168 | * Request from slave to master. |
169 | */ | 169 | */ |
170 | GNUNET_PSYC_MESSAGE_REQUEST = 1 << 1 | 170 | GNUNET_PSYC_MESSAGE_REQUEST = 1 << 1, |
171 | |||
172 | /** | ||
173 | * Message can be delivered out of order. | ||
174 | */ | ||
175 | GNUNET_PSYC_MESSAGE_ORDER_ANY = 1 << 2 | ||
176 | }; | ||
177 | |||
178 | |||
179 | /** | ||
180 | * Values for the @a state_delta field of GNUNET_PSYC_MessageHeader. | ||
181 | */ | ||
182 | enum GNUNET_PSYC_StateDeltaValues | ||
183 | { | ||
184 | GNUNET_PSYC_STATE_RESET = 0, | ||
185 | |||
186 | GNUNET_PSYC_STATE_NOT_MODIFIED = UINT64_MAX | ||
171 | }; | 187 | }; |
172 | 188 | ||
173 | 189 | ||
@@ -175,6 +191,8 @@ GNUNET_NETWORK_STRUCT_BEGIN | |||
175 | 191 | ||
176 | /** | 192 | /** |
177 | * Header of a PSYC message. | 193 | * Header of a PSYC message. |
194 | * | ||
195 | * Only present when receiving a message. | ||
178 | */ | 196 | */ |
179 | struct GNUNET_PSYC_MessageHeader | 197 | struct GNUNET_PSYC_MessageHeader |
180 | { | 198 | { |
@@ -223,6 +241,12 @@ struct GNUNET_PSYC_MessageMethod | |||
223 | */ | 241 | */ |
224 | uint32_t flags GNUNET_PACKED; | 242 | uint32_t flags GNUNET_PACKED; |
225 | 243 | ||
244 | /** | ||
245 | * Number of message IDs since the last message that contained state | ||
246 | * operations. @see enum GNUNET_PSYC_StateDeltaValues | ||
247 | */ | ||
248 | uint64_t state_delta GNUNET_PACKED; | ||
249 | |||
226 | /* Followed by NUL-terminated method name. */ | 250 | /* Followed by NUL-terminated method name. */ |
227 | }; | 251 | }; |
228 | 252 | ||
@@ -479,22 +503,29 @@ typedef int | |||
479 | enum GNUNET_PSYC_MasterTransmitFlags | 503 | enum GNUNET_PSYC_MasterTransmitFlags |
480 | { | 504 | { |
481 | GNUNET_PSYC_MASTER_TRANSMIT_NONE = 0, | 505 | GNUNET_PSYC_MASTER_TRANSMIT_NONE = 0, |
506 | |||
482 | /** | 507 | /** |
483 | * Whether this message should reset the channel state, | 508 | * Whether this message should reset the channel state, |
484 | * i.e. remove all previously stored state variables. | 509 | * i.e. remove all previously stored state variables. |
485 | */ | 510 | */ |
486 | GNUNET_PSYC_MASTER_TRANSMIT_RESET_STATE = 1 << 0, | 511 | |
512 | GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET = 1 << 0, | ||
487 | 513 | ||
488 | /** | 514 | /** |
489 | * Whether we need to increment the group generation counter after | 515 | * Whether this message contains any state modifiers. |
490 | * transmitting this message. | ||
491 | */ | 516 | */ |
492 | GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN = 1 << 1, | 517 | GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY = 1 << 1, |
493 | 518 | ||
494 | /** | 519 | /** |
495 | * Add PSYC header variable with the hash of the current channel state. | 520 | * Add PSYC header variable with the hash of the current channel state. |
496 | */ | 521 | */ |
497 | GNUNET_PSYC_MASTER_TRANSMIT_ADD_STATE_HASH = 1 << 2 | 522 | GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH = 1 << 2, |
523 | |||
524 | /** | ||
525 | * Whether we need to increment the group generation counter after | ||
526 | * transmitting this message. | ||
527 | */ | ||
528 | GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN = 1 << 3 | ||
498 | }; | 529 | }; |
499 | 530 | ||
500 | 531 | ||
diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c index b23320c72..da81de486 100644 --- a/src/multicast/multicast_api.c +++ b/src/multicast/multicast_api.c | |||
@@ -182,7 +182,7 @@ message_callback (void *cls, const struct GNUNET_HashCode *chan_key_hash, | |||
182 | { | 182 | { |
183 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 183 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
184 | "Calling origin's message callback " | 184 | "Calling origin's message callback " |
185 | "for a message of type %u and size %u.\n", | 185 | "with a message of type %u and size %u.\n", |
186 | ntohs (msg->type), ntohs (msg->size)); | 186 | ntohs (msg->type), ntohs (msg->size)); |
187 | struct GNUNET_MULTICAST_Origin *orig = (struct GNUNET_MULTICAST_Origin *) grp; | 187 | struct GNUNET_MULTICAST_Origin *orig = (struct GNUNET_MULTICAST_Origin *) grp; |
188 | orig->message_cb (orig->cls, msg); | 188 | orig->message_cb (orig->cls, msg); |
@@ -190,8 +190,8 @@ message_callback (void *cls, const struct GNUNET_HashCode *chan_key_hash, | |||
190 | else | 190 | else |
191 | { | 191 | { |
192 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 192 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
193 | "Calling slave's message callback " | 193 | "Calling member's message callback " |
194 | "for a message of type %u and size %u.\n", | 194 | "with a message of type %u and size %u.\n", |
195 | ntohs (msg->type), ntohs (msg->size)); | 195 | ntohs (msg->type), ntohs (msg->size)); |
196 | struct GNUNET_MULTICAST_Member *mem = (struct GNUNET_MULTICAST_Member *) grp; | 196 | struct GNUNET_MULTICAST_Member *mem = (struct GNUNET_MULTICAST_Member *) grp; |
197 | mem->message_cb (mem->cls, msg); | 197 | mem->message_cb (mem->cls, msg); |
@@ -477,8 +477,8 @@ schedule_origin_to_all (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc | |||
477 | msg->group_generation = mh->group_generation; | 477 | msg->group_generation = mh->group_generation; |
478 | 478 | ||
479 | /* FIXME: add fragment ID and signature in the service instead of here */ | 479 | /* FIXME: add fragment ID and signature in the service instead of here */ |
480 | msg->fragment_id = GNUNET_ntohll (orig->next_fragment_id++); | 480 | msg->fragment_id = GNUNET_htonll (orig->next_fragment_id++); |
481 | msg->fragment_offset = GNUNET_ntohll (mh->fragment_offset); | 481 | msg->fragment_offset = GNUNET_htonll (mh->fragment_offset); |
482 | mh->fragment_offset += sizeof (*msg) + buf_size; | 482 | mh->fragment_offset += sizeof (*msg) + buf_size; |
483 | msg->purpose.size = htonl (sizeof (*msg) + buf_size | 483 | msg->purpose.size = htonl (sizeof (*msg) + buf_size |
484 | - sizeof (msg->header) | 484 | - sizeof (msg->header) |
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c index fb3aa65f7..3a29c8ffd 100644 --- a/src/psyc/gnunet-service-psyc.c +++ b/src/psyc/gnunet-service-psyc.c | |||
@@ -73,19 +73,21 @@ struct TransmitMessage | |||
73 | struct TransmitMessage *next; | 73 | struct TransmitMessage *next; |
74 | 74 | ||
75 | /** | 75 | /** |
76 | * Buffer with message to be transmitted. | 76 | * ID assigned to the message. |
77 | */ | 77 | */ |
78 | char *buf; | 78 | uint64_t id; |
79 | 79 | ||
80 | /** | 80 | /** |
81 | * Size of @a buf | 81 | * Size of @a buf |
82 | */ | 82 | */ |
83 | uint16_t size | 83 | uint16_t size; |
84 | ; | 84 | |
85 | /** | 85 | /** |
86 | * @see enum MessageState | 86 | * @see enum MessageState |
87 | */ | 87 | */ |
88 | uint8_t state; | 88 | uint8_t state; |
89 | |||
90 | /* Followed by message */ | ||
89 | }; | 91 | }; |
90 | 92 | ||
91 | 93 | ||
@@ -100,9 +102,9 @@ static struct GNUNET_CONTAINER_MultiHashMap *recv_cache; | |||
100 | 102 | ||
101 | /** | 103 | /** |
102 | * Entry in the chan_msgs hashmap of @a recv_cache: | 104 | * Entry in the chan_msgs hashmap of @a recv_cache: |
103 | * fragment_id -> FragmentEntry | 105 | * fragment_id -> RecvCacheEntry |
104 | */ | 106 | */ |
105 | struct FragmentEntry | 107 | struct RecvCacheEntry |
106 | { | 108 | { |
107 | struct GNUNET_MULTICAST_MessageHeader *mmsg; | 109 | struct GNUNET_MULTICAST_MessageHeader *mmsg; |
108 | uint16_t ref_count; | 110 | uint16_t ref_count; |
@@ -110,20 +112,48 @@ struct FragmentEntry | |||
110 | 112 | ||
111 | 113 | ||
112 | /** | 114 | /** |
113 | * Entry in the @a recv_msgs hash map of a @a Channel. | 115 | * Entry in the @a recv_frags hash map of a @a Channel. |
114 | * message_id -> FragmentCache | 116 | * message_id -> FragmentQueue |
115 | */ | 117 | */ |
116 | struct FragmentCache | 118 | struct FragmentQueue |
117 | { | 119 | { |
118 | /** | 120 | /** |
119 | * Total size of header fragments (METHOD & MODIFIERs) | 121 | * Fragment IDs stored in @a recv_cache. |
122 | */ | ||
123 | struct GNUNET_CONTAINER_Heap *fragments; | ||
124 | |||
125 | /** | ||
126 | * Total size of received fragments. | ||
127 | */ | ||
128 | uint64_t size; | ||
129 | |||
130 | /** | ||
131 | * Total size of received header fragments (METHOD & MODIFIERs) | ||
120 | */ | 132 | */ |
121 | uint64_t header_size; | 133 | uint64_t header_size; |
122 | 134 | ||
123 | /** | 135 | /** |
124 | * Fragment IDs stored in @a recv_cache. | 136 | * The @a state_delta field from struct GNUNET_PSYC_MessageMethod. |
125 | */ | 137 | */ |
126 | struct GNUNET_CONTAINER_Heap *fragments; | 138 | uint64_t state_delta; |
139 | |||
140 | /** | ||
141 | * The @a flags field from struct GNUNET_PSYC_MessageMethod. | ||
142 | */ | ||
143 | uint32_t flags; | ||
144 | |||
145 | /** | ||
146 | * Receive state of message. | ||
147 | * | ||
148 | * @see MessageFragmentState | ||
149 | */ | ||
150 | uint8_t state; | ||
151 | |||
152 | /** | ||
153 | * Is the message queued for delivery to the client? | ||
154 | * i.e. added to the recv_msgs queue | ||
155 | */ | ||
156 | uint8_t queued; | ||
127 | }; | 157 | }; |
128 | 158 | ||
129 | 159 | ||
@@ -139,12 +169,17 @@ struct Channel | |||
139 | 169 | ||
140 | /** | 170 | /** |
141 | * Received fragments not yet sent to the client. | 171 | * Received fragments not yet sent to the client. |
142 | * message_id -> FragmentCache | 172 | * message_id -> FragmentQueue |
173 | */ | ||
174 | struct GNUNET_CONTAINER_MultiHashMap *recv_frags; | ||
175 | |||
176 | /** | ||
177 | * Received message IDs not yet sent to the client. | ||
143 | */ | 178 | */ |
144 | struct GNUNET_CONTAINER_MultiHashMap *recv_msgs; | 179 | struct GNUNET_CONTAINER_Heap *recv_msgs; |
145 | 180 | ||
146 | /** | 181 | /** |
147 | * FIXME | 182 | * FIXME: needed? |
148 | */ | 183 | */ |
149 | GNUNET_SCHEDULER_TaskIdentifier tmit_task; | 184 | GNUNET_SCHEDULER_TaskIdentifier tmit_task; |
150 | 185 | ||
@@ -159,6 +194,19 @@ struct Channel | |||
159 | struct GNUNET_HashCode pub_key_hash; | 194 | struct GNUNET_HashCode pub_key_hash; |
160 | 195 | ||
161 | /** | 196 | /** |
197 | * Last message ID sent to the client. | ||
198 | * 0 if there is no such message. | ||
199 | */ | ||
200 | uint64_t max_message_id; | ||
201 | |||
202 | /** | ||
203 | * ID of the last stateful message, where the state operations has been | ||
204 | * processed and saved to PSYCstore and which has been sent to the client. | ||
205 | * 0 if there is no such message. | ||
206 | */ | ||
207 | uint64_t max_state_message_id; | ||
208 | |||
209 | /** | ||
162 | * Expected value size for the modifier being received from the PSYC service. | 210 | * Expected value size for the modifier being received from the PSYC service. |
163 | */ | 211 | */ |
164 | uint32_t tmit_mod_value_size_expected; | 212 | uint32_t tmit_mod_value_size_expected; |
@@ -174,7 +222,7 @@ struct Channel | |||
174 | uint8_t tmit_state; | 222 | uint8_t tmit_state; |
175 | 223 | ||
176 | /** | 224 | /** |
177 | * FIXME | 225 | * FIXME: needed? |
178 | */ | 226 | */ |
179 | uint8_t in_transmit; | 227 | uint8_t in_transmit; |
180 | 228 | ||
@@ -221,7 +269,7 @@ struct Master | |||
221 | struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle; | 269 | struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle; |
222 | 270 | ||
223 | /** | 271 | /** |
224 | * Maximum message ID for this channel. | 272 | * Last message ID transmitted to this channel. |
225 | * | 273 | * |
226 | * Incremented before sending a message, thus the message_id in messages sent | 274 | * Incremented before sending a message, thus the message_id in messages sent |
227 | * starts from 1. | 275 | * starts from 1. |
@@ -229,13 +277,13 @@ struct Master | |||
229 | uint64_t max_message_id; | 277 | uint64_t max_message_id; |
230 | 278 | ||
231 | /** | 279 | /** |
232 | * ID of the last message that contains any state operations. | 280 | * ID of the last message with state operations transmitted to the channel. |
233 | * 0 if there is no such message. | 281 | * 0 if there is no such message. |
234 | */ | 282 | */ |
235 | uint64_t max_state_message_id; | 283 | uint64_t max_state_message_id; |
236 | 284 | ||
237 | /** | 285 | /** |
238 | * Maximum group generation for this channel. | 286 | * Maximum group generation transmitted to the channel. |
239 | */ | 287 | */ |
240 | uint64_t max_group_generation; | 288 | uint64_t max_group_generation; |
241 | 289 | ||
@@ -292,11 +340,6 @@ struct Slave | |||
292 | struct GNUNET_MessageHeader *join_req; | 340 | struct GNUNET_MessageHeader *join_req; |
293 | 341 | ||
294 | /** | 342 | /** |
295 | * Maximum message ID for this channel. | ||
296 | */ | ||
297 | uint64_t max_message_id; | ||
298 | |||
299 | /** | ||
300 | * Maximum request ID for this channel. | 343 | * Maximum request ID for this channel. |
301 | */ | 344 | */ |
302 | uint64_t max_request_id; | 345 | uint64_t max_request_id; |
@@ -304,7 +347,7 @@ struct Slave | |||
304 | 347 | ||
305 | 348 | ||
306 | static inline void | 349 | static inline void |
307 | transmit_message (struct Channel *ch, uint8_t inc_msg_id); | 350 | transmit_message (struct Channel *ch); |
308 | 351 | ||
309 | 352 | ||
310 | /** | 353 | /** |
@@ -386,7 +429,7 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) | |||
386 | /* Send pending messages to multicast before cleanup. */ | 429 | /* Send pending messages to multicast before cleanup. */ |
387 | if (NULL != ch->tmit_head) | 430 | if (NULL != ch->tmit_head) |
388 | { | 431 | { |
389 | transmit_message (ch, GNUNET_NO); | 432 | transmit_message (ch); |
390 | } | 433 | } |
391 | else | 434 | else |
392 | { | 435 | { |
@@ -484,6 +527,7 @@ static inline void | |||
484 | hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n) | 527 | hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n) |
485 | { | 528 | { |
486 | /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */ | 529 | /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */ |
530 | /* TODO: use built-in byte swap functions if available */ | ||
487 | 531 | ||
488 | n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL); | 532 | n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL); |
489 | n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL); | 533 | n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL); |
@@ -512,29 +556,40 @@ hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n) | |||
512 | } | 556 | } |
513 | 557 | ||
514 | 558 | ||
559 | /** | ||
560 | * Insert a multicast message fragment into the queue belonging to the message. | ||
561 | * | ||
562 | * @param ch Channel. | ||
563 | * @param mmsg Multicast message fragment. | ||
564 | * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode. | ||
565 | * @param first_ptype First PSYC message part type in @a mmsg. | ||
566 | * @param last_ptype Last PSYC message part type in @a mmsg. | ||
567 | */ | ||
515 | static void | 568 | static void |
516 | fragment_cache_insert (struct Channel *ch, | 569 | fragment_queue_insert (struct Channel *ch, |
517 | const struct GNUNET_HashCode *msg_id, | ||
518 | struct FragmentCache *frag_cache, | ||
519 | const struct GNUNET_MULTICAST_MessageHeader *mmsg, | 570 | const struct GNUNET_MULTICAST_MessageHeader *mmsg, |
520 | uint16_t last_part_type) | 571 | uint16_t first_ptype, uint16_t last_ptype) |
521 | { | 572 | { |
522 | uint16_t size = ntohs (mmsg->header.size); | 573 | const uint16_t size = ntohs (mmsg->header.size); |
574 | const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset); | ||
523 | struct GNUNET_CONTAINER_MultiHashMap | 575 | struct GNUNET_CONTAINER_MultiHashMap |
524 | *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, | 576 | *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, |
525 | &ch->pub_key_hash); | 577 | &ch->pub_key_hash); |
526 | 578 | ||
527 | if (NULL == frag_cache) | 579 | struct GNUNET_HashCode msg_id_hash; |
580 | hash_key_from_nll (&msg_id_hash, mmsg->message_id); | ||
581 | |||
582 | struct FragmentQueue | ||
583 | *fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash); | ||
584 | |||
585 | if (NULL == fragq) | ||
528 | { | 586 | { |
529 | frag_cache = GNUNET_new (struct FragmentCache); | 587 | fragq = GNUNET_new (struct FragmentQueue); |
530 | frag_cache->fragments | 588 | fragq->state = MSG_FRAG_STATE_HEADER; |
589 | fragq->fragments | ||
531 | = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); | 590 | = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); |
532 | 591 | ||
533 | if (NULL == ch->recv_msgs) | 592 | GNUNET_CONTAINER_multihashmap_put (ch->recv_frags, &msg_id_hash, fragq, |
534 | { | ||
535 | ch->recv_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); | ||
536 | } | ||
537 | GNUNET_CONTAINER_multihashmap_put (ch->recv_msgs, msg_id, frag_cache, | ||
538 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); | 593 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); |
539 | 594 | ||
540 | if (NULL == chan_msgs) | 595 | if (NULL == chan_msgs) |
@@ -545,190 +600,335 @@ fragment_cache_insert (struct Channel *ch, | |||
545 | } | 600 | } |
546 | } | 601 | } |
547 | 602 | ||
548 | struct GNUNET_HashCode *frag_id = GNUNET_new (struct GNUNET_HashCode); | 603 | struct GNUNET_HashCode frag_id_hash; |
549 | hash_key_from_nll (frag_id, mmsg->fragment_id); | 604 | hash_key_from_nll (&frag_id_hash, mmsg->fragment_id); |
550 | struct FragmentEntry | 605 | struct RecvCacheEntry |
551 | *frag_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, frag_id); | 606 | *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash); |
552 | if (NULL == frag_entry) | 607 | if (NULL == cache_entry) |
553 | { | 608 | { |
554 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 609 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
555 | "%p Adding message fragment to cache. " | 610 | "%p Adding message fragment to cache. " |
556 | "fragment_id: %" PRIu64 ", " | 611 | "message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", " |
557 | "header_size: %" PRIu64 " + %" PRIu64 ").\n", | 612 | "header_size: %" PRIu64 " + %u).\n", |
558 | ch, GNUNET_ntohll (mmsg->fragment_id), | 613 | ch, GNUNET_ntohll (mmsg->message_id), |
559 | frag_cache->header_size, size); | 614 | GNUNET_ntohll (mmsg->fragment_id), |
560 | frag_entry = GNUNET_new (struct FragmentEntry); | 615 | fragq->header_size, size); |
561 | frag_entry->ref_count = 1; | 616 | cache_entry = GNUNET_new (struct RecvCacheEntry); |
562 | frag_entry->mmsg = GNUNET_malloc (size); | 617 | cache_entry->ref_count = 1; |
563 | memcpy (frag_entry->mmsg, mmsg, size); | 618 | cache_entry->mmsg = GNUNET_malloc (size); |
564 | GNUNET_CONTAINER_multihashmap_put (chan_msgs, frag_id, frag_entry, | 619 | memcpy (cache_entry->mmsg, mmsg, size); |
620 | GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry, | ||
565 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); | 621 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); |
566 | } | 622 | } |
567 | else | 623 | else |
568 | { | 624 | { |
569 | frag_entry->ref_count++; | 625 | cache_entry->ref_count++; |
570 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 626 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
571 | "%p Message fragment already in cache. " | 627 | "%p Message fragment is already in cache. " |
572 | "fragment_id: %" PRIu64 ", ref_count: %u\n", | 628 | "message_id: %" PRIu64 ", fragment_id: %" PRIu64 |
573 | ch, GNUNET_ntohll (mmsg->fragment_id), frag_entry->ref_count); | 629 | ", ref_count: %u\n", |
630 | ch, GNUNET_ntohll (mmsg->message_id), | ||
631 | GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count); | ||
632 | } | ||
633 | |||
634 | if (MSG_FRAG_STATE_HEADER == fragq->state) | ||
635 | { | ||
636 | if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype) | ||
637 | { | ||
638 | struct GNUNET_PSYC_MessageMethod * | ||
639 | pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1]; | ||
640 | fragq->state_delta = GNUNET_ntohll (pmeth->state_delta); | ||
641 | fragq->flags = ntohl (pmeth->flags); | ||
642 | } | ||
643 | |||
644 | if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA) | ||
645 | { | ||
646 | fragq->header_size += size; | ||
647 | } | ||
648 | else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype | ||
649 | || frag_offset == fragq->header_size) | ||
650 | { /* header is now complete */ | ||
651 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
652 | "%p Header of message %" PRIu64 " is complete.\n", | ||
653 | ch, GNUNET_ntohll (mmsg->message_id)); | ||
654 | |||
655 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
656 | "%p Adding message %" PRIu64 " to queue.\n", | ||
657 | ch, GNUNET_ntohll (mmsg->message_id)); | ||
658 | fragq->state = MSG_FRAG_STATE_DATA; | ||
659 | } | ||
660 | else | ||
661 | { | ||
662 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
663 | "%p Header of message %" PRIu64 " is NOT complete yet: " | ||
664 | "%" PRIu64 " != %" PRIu64 "\n", | ||
665 | ch, GNUNET_ntohll (mmsg->message_id), frag_offset, | ||
666 | fragq->header_size); | ||
667 | } | ||
574 | } | 668 | } |
575 | 669 | ||
576 | switch (last_part_type) | 670 | switch (last_ptype) |
577 | { | 671 | { |
578 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: | 672 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: |
579 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | 673 | if (frag_offset == fragq->size) |
580 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: | 674 | fragq->state = MSG_FRAG_STATE_END; |
581 | frag_cache->header_size += size; | 675 | else |
676 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
677 | "%p Message %" PRIu64 " is NOT complete yet: " | ||
678 | "%" PRIu64 " != %" PRIu64 "\n", | ||
679 | ch, GNUNET_ntohll (mmsg->message_id), frag_offset, | ||
680 | fragq->size); | ||
681 | break; | ||
682 | |||
683 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: | ||
684 | /* Drop message without delivering to client if it's a single fragment */ | ||
685 | fragq->state = | ||
686 | (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype) | ||
687 | ? MSG_FRAG_STATE_DROP | ||
688 | : MSG_FRAG_STATE_CANCEL; | ||
689 | } | ||
690 | |||
691 | switch (fragq->state) | ||
692 | { | ||
693 | case MSG_FRAG_STATE_DATA: | ||
694 | case MSG_FRAG_STATE_END: | ||
695 | case MSG_FRAG_STATE_CANCEL: | ||
696 | if (GNUNET_NO == fragq->queued) | ||
697 | { | ||
698 | GNUNET_CONTAINER_heap_insert (ch->recv_msgs, NULL, | ||
699 | GNUNET_ntohll (mmsg->message_id)); | ||
700 | fragq->queued = GNUNET_YES; | ||
701 | } | ||
582 | } | 702 | } |
583 | GNUNET_CONTAINER_heap_insert (frag_cache->fragments, frag_id, | 703 | |
704 | fragq->size += size; | ||
705 | GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL, | ||
584 | GNUNET_ntohll (mmsg->fragment_id)); | 706 | GNUNET_ntohll (mmsg->fragment_id)); |
585 | } | 707 | } |
586 | 708 | ||
587 | 709 | ||
710 | /** | ||
711 | * Run fragment queue of a message. | ||
712 | * | ||
713 | * Send fragments of a message in order to client, after all modifiers arrived | ||
714 | * from multicast. | ||
715 | * | ||
716 | * @param ch Channel. | ||
717 | * @param msg_id ID of the message @a fragq belongs to. | ||
718 | * @param fragq Fragment queue of the message. | ||
719 | * @param drop Drop message without delivering to client? | ||
720 | * #GNUNET_YES or #GNUNET_NO. | ||
721 | */ | ||
588 | static void | 722 | static void |
589 | fragment_cache_clear (struct Channel *ch, | 723 | fragment_queue_run (struct Channel *ch, uint64_t msg_id, |
590 | const struct GNUNET_HashCode *msg_id, | 724 | struct FragmentQueue *fragq, uint8_t drop) |
591 | struct FragmentCache *frag_cache, | ||
592 | uint8_t send_to_client) | ||
593 | { | 725 | { |
594 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 726 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
595 | "%p Clearing message fragment cache.\n", ch); | 727 | "%p Running message fragment queue for message %" PRIu64 |
728 | " (state: %u).\n", | ||
729 | ch, msg_id, fragq->state); | ||
596 | 730 | ||
597 | struct GNUNET_CONTAINER_MultiHashMap | 731 | struct GNUNET_CONTAINER_MultiHashMap |
598 | *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, | 732 | *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, |
599 | &ch->pub_key_hash); | 733 | &ch->pub_key_hash); |
600 | GNUNET_assert (NULL != chan_msgs); | 734 | GNUNET_assert (NULL != chan_msgs); |
601 | struct GNUNET_HashCode *frag_id; | 735 | uint64_t frag_id; |
602 | 736 | ||
603 | while ((frag_id = GNUNET_CONTAINER_heap_remove_root (frag_cache->fragments))) | 737 | while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL, |
738 | &frag_id)) | ||
604 | { | 739 | { |
605 | struct FragmentEntry | 740 | struct GNUNET_HashCode frag_id_hash; |
606 | *frag_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, frag_id); | 741 | hash_key_from_hll (&frag_id_hash, frag_id); |
607 | if (frag_entry != NULL) | 742 | struct RecvCacheEntry *cache_entry |
743 | = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash); | ||
744 | if (cache_entry != NULL) | ||
608 | { | 745 | { |
609 | if (GNUNET_YES == send_to_client) | 746 | if (GNUNET_NO == drop) |
610 | { | 747 | { |
611 | message_to_client (ch, frag_entry->mmsg); | 748 | message_to_client (ch, cache_entry->mmsg); |
612 | } | 749 | } |
613 | if (1 == frag_entry->ref_count) | 750 | if (cache_entry->ref_count <= 1) |
614 | { | 751 | { |
615 | GNUNET_CONTAINER_multihashmap_remove (chan_msgs, frag_id, frag_entry); | 752 | GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash, |
616 | GNUNET_free (frag_entry->mmsg); | 753 | cache_entry); |
617 | GNUNET_free (frag_entry); | 754 | GNUNET_free (cache_entry->mmsg); |
755 | GNUNET_free (cache_entry); | ||
618 | } | 756 | } |
619 | else | 757 | else |
620 | { | 758 | { |
621 | frag_entry->ref_count--; | 759 | cache_entry->ref_count--; |
622 | } | 760 | } |
623 | } | 761 | } |
624 | GNUNET_free (frag_id); | 762 | #if CACHE_AGING_IMPLEMENTED |
763 | else if (GNUNET_NO == drop) | ||
764 | { | ||
765 | /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */ | ||
766 | } | ||
767 | #endif | ||
768 | |||
769 | GNUNET_CONTAINER_heap_remove_root (fragq->fragments); | ||
625 | } | 770 | } |
626 | 771 | ||
627 | GNUNET_CONTAINER_multihashmap_remove (ch->recv_msgs, msg_id, frag_cache); | 772 | if (MSG_FRAG_STATE_END <= fragq->state) |
628 | GNUNET_CONTAINER_heap_destroy (frag_cache->fragments); | 773 | { |
629 | GNUNET_free (frag_cache); | 774 | struct GNUNET_HashCode msg_id_hash; |
775 | hash_key_from_nll (&msg_id_hash, msg_id); | ||
776 | |||
777 | GNUNET_CONTAINER_multihashmap_remove (ch->recv_frags, &msg_id_hash, fragq); | ||
778 | GNUNET_CONTAINER_heap_destroy (fragq->fragments); | ||
779 | GNUNET_free (fragq); | ||
780 | } | ||
781 | else | ||
782 | { | ||
783 | fragq->queued = GNUNET_NO; | ||
784 | } | ||
630 | } | 785 | } |
631 | 786 | ||
632 | 787 | ||
633 | /** | 788 | /** |
634 | * Incoming message fragment from multicast. | 789 | * Run message queue. |
635 | * | 790 | * |
636 | * Store it using PSYCstore and send it to the client of the channel. | 791 | * Send messages in queue to client in order after a message has arrived from |
792 | * multicast, according to the following: | ||
793 | * - A message is only sent if all of its modifiers arrived. | ||
794 | * - A stateful message is only sent if the previous stateful message | ||
795 | * has already been delivered to the client. | ||
796 | * | ||
797 | * @param ch Channel. | ||
798 | * @return Number of messages removed from queue and sent to client. | ||
637 | */ | 799 | */ |
638 | static void | 800 | static uint64_t |
639 | message_cb (void *cls, const struct GNUNET_MessageHeader *msg) | 801 | message_queue_run (struct Channel *ch) |
640 | { | 802 | { |
641 | struct Channel *ch = cls; | 803 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
642 | uint16_t type = ntohs (msg->type); | 804 | "%p Running message queue.\n", ch); |
643 | uint16_t size = ntohs (msg->size); | 805 | uint64_t n = 0; |
644 | 806 | uint64_t msg_id; | |
645 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 807 | while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (ch->recv_msgs, NULL, |
646 | "%p Received message of type %u and size %u from multicast.\n", | 808 | &msg_id)) |
647 | ch, type, size); | ||
648 | |||
649 | switch (type) | ||
650 | { | ||
651 | case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE: | ||
652 | { | 809 | { |
653 | GNUNET_PSYCSTORE_fragment_store (store, &ch->pub_key, | 810 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
654 | (const struct | 811 | "%p Processing message %" PRIu64 " in queue.\n", ch, msg_id); |
655 | GNUNET_MULTICAST_MessageHeader *) msg, | 812 | struct GNUNET_HashCode msg_id_hash; |
656 | 0, NULL, NULL); | 813 | hash_key_from_hll (&msg_id_hash, msg_id); |
657 | |||
658 | #if TODO | ||
659 | /* FIXME: apply modifiers to state in PSYCstore */ | ||
660 | GNUNET_PSYCSTORE_state_modify (store, &ch->pub_key, | ||
661 | GNUNET_ntohll (mmsg->message_id), | ||
662 | meth->mod_count, mods, | ||
663 | rcb, rcb_cls); | ||
664 | #endif | ||
665 | |||
666 | const struct GNUNET_MULTICAST_MessageHeader | ||
667 | *mmsg = (const struct GNUNET_MULTICAST_MessageHeader *) msg; | ||
668 | 814 | ||
669 | uint16_t ptype = GNUNET_PSYC_message_last_part (size - sizeof (*mmsg), | 815 | struct FragmentQueue * |
670 | (const char *) &mmsg[1]); | 816 | fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash); |
671 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
672 | "Last message part type %u\n", ptype); | ||
673 | 817 | ||
674 | if (GNUNET_NO == ptype) | 818 | if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER) |
675 | { | 819 | { |
676 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 820 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
677 | "%p Received message with invalid parts from multicast. " | 821 | "%p No fragq (%p) or header not complete.\n", |
678 | "Dropping message.\n", ch); | 822 | ch, fragq); |
679 | GNUNET_break_op (0); | ||
680 | break; | 823 | break; |
681 | } | 824 | } |
682 | 825 | ||
683 | struct GNUNET_HashCode msg_id; | 826 | if (MSG_FRAG_STATE_HEADER == fragq->state) |
684 | hash_key_from_nll (&msg_id, mmsg->message_id); | ||
685 | |||
686 | struct FragmentCache *frag_cache = NULL; | ||
687 | if (NULL != ch->recv_msgs) | ||
688 | frag_cache = GNUNET_CONTAINER_multihashmap_get (ch->recv_msgs, &msg_id); | ||
689 | |||
690 | switch (ptype) | ||
691 | { | 827 | { |
692 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: | 828 | /* Check if there's a missing message before the current one */ |
693 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: | 829 | if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta) |
694 | /* FIXME: check state flag / max_state_message_id */ | ||
695 | if (NULL == frag_cache) | ||
696 | { | 830 | { |
697 | message_to_client (ch, mmsg); | 831 | if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY) |
698 | break; | 832 | && msg_id - 1 != ch->max_message_id) |
833 | { | ||
834 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
835 | "%p Out of order message. " | ||
836 | "(%" PRIu64 " - 1 != %" PRIu64 ")\n", | ||
837 | ch, msg_id, ch->max_message_id); | ||
838 | break; | ||
839 | } | ||
699 | } | 840 | } |
700 | else | 841 | else |
701 | { | 842 | { |
702 | if (GNUNET_ntohll (mmsg->fragment_offset) == frag_cache->header_size) | 843 | if (msg_id - fragq->state_delta != ch->max_state_message_id) |
703 | { /* first data fragment after the header, send cached fragments */ | 844 | { |
704 | fragment_cache_clear (ch, &msg_id, frag_cache, GNUNET_YES); | 845 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
705 | message_to_client (ch, mmsg); | 846 | "%p Out of order stateful message. " |
847 | "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n", | ||
848 | ch, msg_id, fragq->state_delta, ch->max_state_message_id); | ||
706 | break; | 849 | break; |
707 | } | 850 | } |
708 | else | 851 | #if TODO |
709 | { /* still missing fragments from the header, cache data fragment */ | 852 | /* FIXME: apply modifiers to state in PSYCstore */ |
710 | /* fall thru */ | 853 | GNUNET_PSYCSTORE_state_modify (store, &ch->pub_key, message_id, |
711 | } | 854 | state_modify_result_cb, cls); |
855 | #endif | ||
856 | ch->max_state_message_id = msg_id; | ||
712 | } | 857 | } |
858 | ch->max_message_id = msg_id; | ||
859 | } | ||
860 | fragment_queue_run (ch, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state); | ||
861 | GNUNET_CONTAINER_heap_remove_root (ch->recv_msgs); | ||
862 | n++; | ||
863 | } | ||
864 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
865 | "%p Removed %" PRIu64 " messages from queue.\n", ch, n); | ||
866 | return n; | ||
867 | } | ||
713 | 868 | ||
714 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: | ||
715 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | ||
716 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: | ||
717 | /* not all modifiers arrived yet, cache fragment */ | ||
718 | fragment_cache_insert (ch, &msg_id, frag_cache, mmsg, ptype); | ||
719 | break; | ||
720 | 869 | ||
721 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: | 870 | /** |
722 | if (NULL != frag_cache) | 871 | * Handle incoming message from multicast. |
723 | { /* fragments not yet sent to client, remove from cache */ | 872 | * |
724 | fragment_cache_clear (ch, &msg_id, frag_cache, GNUNET_NO); | 873 | * @param ch Channel. |
725 | } | 874 | * @param mmsg Multicast message. |
726 | else | 875 | * |
727 | { | 876 | * @return #GNUNET_OK or #GNUNET_SYSERR |
728 | message_to_client (ch, mmsg); | 877 | */ |
729 | } | 878 | static int |
730 | break; | 879 | handle_multicast_message (struct Channel *ch, |
731 | } | 880 | const struct GNUNET_MULTICAST_MessageHeader *mmsg) |
881 | { | ||
882 | GNUNET_PSYCSTORE_fragment_store (store, &ch->pub_key, mmsg, 0, NULL, NULL); | ||
883 | |||
884 | uint16_t size = ntohs (mmsg->header.size); | ||
885 | uint16_t first_ptype = 0, last_ptype = 0; | ||
886 | |||
887 | if (GNUNET_SYSERR | ||
888 | == GNUNET_PSYC_check_message_parts (size - sizeof (*mmsg), | ||
889 | (const char *) &mmsg[1], | ||
890 | &first_ptype, &last_ptype)) | ||
891 | { | ||
892 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
893 | "%p Received message with invalid parts from multicast. " | ||
894 | "Dropping message.\n", ch); | ||
895 | GNUNET_break_op (0); | ||
896 | return GNUNET_SYSERR; | ||
897 | } | ||
898 | |||
899 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
900 | "Message parts: first: type %u, last: type %u\n", | ||
901 | first_ptype, last_ptype); | ||
902 | |||
903 | fragment_queue_insert (ch, mmsg, first_ptype, last_ptype); | ||
904 | message_queue_run (ch); | ||
905 | |||
906 | return GNUNET_OK; | ||
907 | } | ||
908 | |||
909 | |||
910 | /** | ||
911 | * Incoming message fragment from multicast. | ||
912 | * | ||
913 | * Store it using PSYCstore and send it to the client of the channel. | ||
914 | */ | ||
915 | static void | ||
916 | message_cb (void *cls, const struct GNUNET_MessageHeader *msg) | ||
917 | { | ||
918 | struct Channel *ch = cls; | ||
919 | uint16_t type = ntohs (msg->type); | ||
920 | uint16_t size = ntohs (msg->size); | ||
921 | |||
922 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
923 | "%p Received message of type %u and size %u from multicast.\n", | ||
924 | ch, type, size); | ||
925 | |||
926 | switch (type) | ||
927 | { | ||
928 | case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE: | ||
929 | { | ||
930 | handle_multicast_message (ch, (const struct | ||
931 | GNUNET_MULTICAST_MessageHeader *) msg); | ||
732 | break; | 932 | break; |
733 | } | 933 | } |
734 | default: | 934 | default: |
@@ -770,8 +970,9 @@ request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, | |||
770 | = (const struct GNUNET_MULTICAST_RequestHeader *) msg; | 970 | = (const struct GNUNET_MULTICAST_RequestHeader *) msg; |
771 | 971 | ||
772 | /* FIXME: see message_cb() */ | 972 | /* FIXME: see message_cb() */ |
773 | if (GNUNET_NO == GNUNET_PSYC_message_last_part (size - sizeof (*req), | 973 | if (GNUNET_SYSERR == GNUNET_PSYC_check_message_parts (size - sizeof (*req), |
774 | (const char *) &req[1])) | 974 | (const char *) &req[1], |
975 | NULL, NULL)) | ||
775 | { | 976 | { |
776 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 977 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
777 | "%p Dropping message with invalid parts " | 978 | "%p Dropping message with invalid parts " |
@@ -825,7 +1026,8 @@ master_counters_cb (void *cls, int result, uint64_t max_fragment_id, | |||
825 | if (GNUNET_OK == result || GNUNET_NO == result) | 1026 | if (GNUNET_OK == result || GNUNET_NO == result) |
826 | { | 1027 | { |
827 | mst->max_message_id = max_message_id; | 1028 | mst->max_message_id = max_message_id; |
828 | mst->max_state_message_id = max_state_message_id; | 1029 | ch->max_message_id = max_message_id; |
1030 | ch->max_state_message_id = max_state_message_id; | ||
829 | mst->max_group_generation = max_group_generation; | 1031 | mst->max_group_generation = max_group_generation; |
830 | mst->origin | 1032 | mst->origin |
831 | = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, | 1033 | = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, |
@@ -860,7 +1062,8 @@ slave_counters_cb (void *cls, int result, uint64_t max_fragment_id, | |||
860 | 1062 | ||
861 | if (GNUNET_OK == result || GNUNET_NO == result) | 1063 | if (GNUNET_OK == result || GNUNET_NO == result) |
862 | { | 1064 | { |
863 | slv->max_message_id = max_message_id; | 1065 | ch->max_message_id = max_message_id; |
1066 | ch->max_state_message_id = max_state_message_id; | ||
864 | slv->member | 1067 | slv->member |
865 | = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->slave_key, | 1068 | = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->slave_key, |
866 | &slv->origin, | 1069 | &slv->origin, |
@@ -879,6 +1082,15 @@ slave_counters_cb (void *cls, int result, uint64_t max_fragment_id, | |||
879 | } | 1082 | } |
880 | 1083 | ||
881 | 1084 | ||
1085 | static void | ||
1086 | channel_init (struct Channel *ch) | ||
1087 | { | ||
1088 | ch->recv_msgs | ||
1089 | = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); | ||
1090 | ch->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); | ||
1091 | } | ||
1092 | |||
1093 | |||
882 | /** | 1094 | /** |
883 | * Handle a connecting client starting a channel master. | 1095 | * Handle a connecting client starting a channel master. |
884 | */ | 1096 | */ |
@@ -888,14 +1100,18 @@ handle_master_start (void *cls, struct GNUNET_SERVER_Client *client, | |||
888 | { | 1100 | { |
889 | const struct MasterStartRequest *req | 1101 | const struct MasterStartRequest *req |
890 | = (const struct MasterStartRequest *) msg; | 1102 | = (const struct MasterStartRequest *) msg; |
1103 | |||
891 | struct Master *mst = GNUNET_new (struct Master); | 1104 | struct Master *mst = GNUNET_new (struct Master); |
1105 | mst->policy = ntohl (req->policy); | ||
1106 | mst->priv_key = req->channel_key; | ||
1107 | |||
892 | struct Channel *ch = &mst->channel; | 1108 | struct Channel *ch = &mst->channel; |
893 | ch->client = client; | 1109 | ch->client = client; |
894 | ch->is_master = GNUNET_YES; | 1110 | ch->is_master = GNUNET_YES; |
895 | mst->policy = ntohl (req->policy); | ||
896 | mst->priv_key = req->channel_key; | ||
897 | GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key, &ch->pub_key); | 1111 | GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key, &ch->pub_key); |
898 | GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key), &ch->pub_key_hash); | 1112 | GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key), &ch->pub_key_hash); |
1113 | channel_init (ch); | ||
1114 | |||
899 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1115 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
900 | "%p Master connected to channel %s.\n", | 1116 | "%p Master connected to channel %s.\n", |
901 | mst, GNUNET_h2s (&ch->pub_key_hash)); | 1117 | mst, GNUNET_h2s (&ch->pub_key_hash)); |
@@ -919,13 +1135,7 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client, | |||
919 | const struct SlaveJoinRequest *req | 1135 | const struct SlaveJoinRequest *req |
920 | = (const struct SlaveJoinRequest *) msg; | 1136 | = (const struct SlaveJoinRequest *) msg; |
921 | struct Slave *slv = GNUNET_new (struct Slave); | 1137 | struct Slave *slv = GNUNET_new (struct Slave); |
922 | struct Channel *ch = &slv->channel; | ||
923 | slv->channel.client = client; | ||
924 | slv->channel.is_master = GNUNET_NO; | ||
925 | slv->slave_key = req->slave_key; | 1138 | slv->slave_key = req->slave_key; |
926 | ch->pub_key = req->channel_key; | ||
927 | GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key), | ||
928 | &ch->pub_key_hash); | ||
929 | slv->origin = req->origin; | 1139 | slv->origin = req->origin; |
930 | slv->relay_count = ntohl (req->relay_count); | 1140 | slv->relay_count = ntohl (req->relay_count); |
931 | if (0 < slv->relay_count) | 1141 | if (0 < slv->relay_count) |
@@ -939,6 +1149,14 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client, | |||
939 | memcpy (&slv->relays[i], &relays[i], sizeof (*relays)); | 1149 | memcpy (&slv->relays[i], &relays[i], sizeof (*relays)); |
940 | } | 1150 | } |
941 | 1151 | ||
1152 | struct Channel *ch = &slv->channel; | ||
1153 | ch->client = client; | ||
1154 | ch->is_master = GNUNET_NO; | ||
1155 | ch->pub_key = req->channel_key; | ||
1156 | GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key), | ||
1157 | &ch->pub_key_hash); | ||
1158 | channel_init (ch); | ||
1159 | |||
942 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1160 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
943 | "%p Slave connected to channel %s.\n", | 1161 | "%p Slave connected to channel %s.\n", |
944 | slv, GNUNET_h2s (&ch->pub_key_hash)); | 1162 | slv, GNUNET_h2s (&ch->pub_key_hash)); |
@@ -991,7 +1209,7 @@ transmit_notify (void *cls, size_t *data_size, void *data) | |||
991 | "%p transmit_notify: sending %u bytes.\n", ch, tmit_msg->size); | 1209 | "%p transmit_notify: sending %u bytes.\n", ch, tmit_msg->size); |
992 | 1210 | ||
993 | *data_size = tmit_msg->size; | 1211 | *data_size = tmit_msg->size; |
994 | memcpy (data, tmit_msg->buf, *data_size); | 1212 | memcpy (data, &tmit_msg[1], *data_size); |
995 | 1213 | ||
996 | GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg); | 1214 | GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg); |
997 | GNUNET_free (tmit_msg); | 1215 | GNUNET_free (tmit_msg); |
@@ -1003,7 +1221,7 @@ transmit_notify (void *cls, size_t *data_size, void *data) | |||
1003 | { | 1221 | { |
1004 | if (NULL != ch->tmit_head) | 1222 | if (NULL != ch->tmit_head) |
1005 | { | 1223 | { |
1006 | transmit_message (ch, GNUNET_NO); | 1224 | transmit_message (ch); |
1007 | } | 1225 | } |
1008 | else if (ch->disconnected) | 1226 | else if (ch->disconnected) |
1009 | { | 1227 | { |
@@ -1054,14 +1272,12 @@ slave_transmit_notify (void *cls, size_t *data_size, void *data) | |||
1054 | * Transmit a message from a channel master to the multicast group. | 1272 | * Transmit a message from a channel master to the multicast group. |
1055 | */ | 1273 | */ |
1056 | static void | 1274 | static void |
1057 | master_transmit_message (struct Master *mst, uint8_t inc_msg_id) | 1275 | master_transmit_message (struct Master *mst) |
1058 | { | 1276 | { |
1059 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_transmit_message()\n", mst); | 1277 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_transmit_message()\n", mst); |
1060 | mst->channel.tmit_task = 0; | 1278 | mst->channel.tmit_task = 0; |
1061 | if (NULL == mst->tmit_handle) | 1279 | if (NULL == mst->tmit_handle) |
1062 | { | 1280 | { |
1063 | if (GNUNET_YES == inc_msg_id) | ||
1064 | mst->max_message_id++; | ||
1065 | mst->tmit_handle | 1281 | mst->tmit_handle |
1066 | = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id, | 1282 | = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id, |
1067 | mst->max_group_generation, | 1283 | mst->max_group_generation, |
@@ -1078,13 +1294,11 @@ master_transmit_message (struct Master *mst, uint8_t inc_msg_id) | |||
1078 | * Transmit a message from a channel slave to the multicast group. | 1294 | * Transmit a message from a channel slave to the multicast group. |
1079 | */ | 1295 | */ |
1080 | static void | 1296 | static void |
1081 | slave_transmit_message (struct Slave *slv, uint8_t inc_msg_id) | 1297 | slave_transmit_message (struct Slave *slv) |
1082 | { | 1298 | { |
1083 | slv->channel.tmit_task = 0; | 1299 | slv->channel.tmit_task = 0; |
1084 | if (NULL == slv->tmit_handle) | 1300 | if (NULL == slv->tmit_handle) |
1085 | { | 1301 | { |
1086 | if (GNUNET_YES == inc_msg_id) | ||
1087 | slv->max_message_id++; | ||
1088 | slv->tmit_handle | 1302 | slv->tmit_handle |
1089 | = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id, | 1303 | = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id, |
1090 | slave_transmit_notify, slv); | 1304 | slave_transmit_notify, slv); |
@@ -1097,29 +1311,94 @@ slave_transmit_message (struct Slave *slv, uint8_t inc_msg_id) | |||
1097 | 1311 | ||
1098 | 1312 | ||
1099 | static inline void | 1313 | static inline void |
1100 | transmit_message (struct Channel *ch, uint8_t inc_msg_id) | 1314 | transmit_message (struct Channel *ch) |
1101 | { | 1315 | { |
1102 | ch->is_master | 1316 | ch->is_master |
1103 | ? master_transmit_message ((struct Master *) ch, inc_msg_id) | 1317 | ? master_transmit_message ((struct Master *) ch) |
1104 | : slave_transmit_message ((struct Slave *) ch, inc_msg_id); | 1318 | : slave_transmit_message ((struct Slave *) ch); |
1105 | } | 1319 | } |
1106 | 1320 | ||
1107 | 1321 | ||
1322 | /** | ||
1323 | * Queue a message from a channel master for sending to the multicast group. | ||
1324 | */ | ||
1108 | static void | 1325 | static void |
1109 | transmit_error (struct Channel *ch) | 1326 | master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg, |
1327 | uint16_t first_ptype, uint16_t last_ptype) | ||
1328 | { | ||
1329 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message()\n", mst); | ||
1330 | |||
1331 | if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype) | ||
1332 | { | ||
1333 | tmit_msg->id = ++mst->max_message_id; | ||
1334 | struct GNUNET_PSYC_MessageMethod *pmeth | ||
1335 | = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1]; | ||
1336 | |||
1337 | if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET) | ||
1338 | { | ||
1339 | pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET); | ||
1340 | } | ||
1341 | else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY) | ||
1342 | { | ||
1343 | pmeth->state_delta = GNUNET_htonll (tmit_msg->id | ||
1344 | - mst->max_state_message_id); | ||
1345 | } | ||
1346 | else | ||
1347 | { | ||
1348 | pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED); | ||
1349 | } | ||
1350 | } | ||
1351 | } | ||
1352 | |||
1353 | |||
1354 | /** | ||
1355 | * Queue a message from a channel slave for sending to the multicast group. | ||
1356 | */ | ||
1357 | static void | ||
1358 | slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg, | ||
1359 | uint16_t first_ptype, uint16_t last_ptype) | ||
1360 | { | ||
1361 | if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype) | ||
1362 | { | ||
1363 | struct GNUNET_PSYC_MessageMethod *pmeth | ||
1364 | = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1]; | ||
1365 | pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED); | ||
1366 | tmit_msg->id = ++slv->max_request_id; | ||
1367 | } | ||
1368 | } | ||
1369 | |||
1370 | |||
1371 | static void | ||
1372 | queue_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg, | ||
1373 | uint16_t first_ptype, uint16_t last_ptype) | ||
1110 | { | 1374 | { |
1111 | struct GNUNET_MessageHeader *msg; | 1375 | uint16_t size = ntohs (msg->size) - sizeof (*msg); |
1112 | struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) | 1376 | struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size); |
1113 | + sizeof (*msg)); | 1377 | memcpy (&tmit_msg[1], &msg[1], size); |
1114 | msg = (struct GNUNET_MessageHeader *) &tmit_msg[1]; | 1378 | tmit_msg->size = size; |
1115 | msg->size = ntohs (sizeof (*msg)); | ||
1116 | msg->type = ntohs (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); | ||
1117 | |||
1118 | tmit_msg->buf = (char *) &tmit_msg[1]; | ||
1119 | tmit_msg->size = sizeof (*msg); | ||
1120 | tmit_msg->state = ch->tmit_state; | 1379 | tmit_msg->state = ch->tmit_state; |
1380 | |||
1121 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); | 1381 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); |
1122 | transmit_message (ch, GNUNET_NO); | 1382 | |
1383 | ch->is_master | ||
1384 | ? master_queue_message ((struct Master *) ch, tmit_msg, | ||
1385 | first_ptype, last_ptype) | ||
1386 | : slave_queue_message ((struct Slave *) ch, tmit_msg, | ||
1387 | first_ptype, last_ptype); | ||
1388 | } | ||
1389 | |||
1390 | |||
1391 | static void | ||
1392 | transmit_error (struct Channel *ch) | ||
1393 | { | ||
1394 | uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL; | ||
1395 | |||
1396 | struct GNUNET_MessageHeader msg; | ||
1397 | msg.size = ntohs (sizeof (msg)); | ||
1398 | msg.type = ntohs (type); | ||
1399 | |||
1400 | queue_message (ch, &msg, type, type); | ||
1401 | transmit_message (ch); | ||
1123 | 1402 | ||
1124 | /* FIXME: cleanup */ | 1403 | /* FIXME: cleanup */ |
1125 | } | 1404 | } |
@@ -1136,6 +1415,10 @@ handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, | |||
1136 | = GNUNET_SERVER_client_get_user_context (client, struct Channel); | 1415 | = GNUNET_SERVER_client_get_user_context (client, struct Channel); |
1137 | GNUNET_assert (NULL != ch); | 1416 | GNUNET_assert (NULL != ch); |
1138 | 1417 | ||
1418 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1419 | "%p Received message from client.\n", ch); | ||
1420 | GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg); | ||
1421 | |||
1139 | if (GNUNET_YES != ch->ready) | 1422 | if (GNUNET_YES != ch->ready) |
1140 | { | 1423 | { |
1141 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 1424 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
@@ -1145,10 +1428,7 @@ handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, | |||
1145 | return; | 1428 | return; |
1146 | } | 1429 | } |
1147 | 1430 | ||
1148 | uint8_t inc_msg_id = GNUNET_NO; | ||
1149 | uint16_t size = ntohs (msg->size); | 1431 | uint16_t size = ntohs (msg->size); |
1150 | uint16_t psize = 0, ptype = 0, pos = 0; | ||
1151 | |||
1152 | if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg)) | 1432 | if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg)) |
1153 | { | 1433 | { |
1154 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large\n", ch); | 1434 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large\n", ch); |
@@ -1158,42 +1438,22 @@ handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, | |||
1158 | return; | 1438 | return; |
1159 | } | 1439 | } |
1160 | 1440 | ||
1161 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1441 | uint16_t first_ptype = 0, last_ptype = 0; |
1162 | "%p Received message from client.\n", ch); | 1442 | if (GNUNET_SYSERR |
1163 | GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg); | 1443 | == GNUNET_PSYC_check_message_parts (size - sizeof (*msg), |
1164 | 1444 | (const char *) &msg[1], | |
1165 | for (pos = 0; sizeof (*msg) + pos < size; pos += psize) | 1445 | &first_ptype, &last_ptype)) |
1166 | { | 1446 | { |
1167 | const struct GNUNET_MessageHeader *pmsg | 1447 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
1168 | = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos); | 1448 | "%p Received invalid message part from client.\n", ch); |
1169 | psize = ntohs (pmsg->size); | 1449 | GNUNET_break (0); |
1170 | ptype = ntohs (pmsg->type); | 1450 | transmit_error (ch); |
1171 | if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size) | 1451 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); |
1172 | { | 1452 | return; |
1173 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
1174 | "%p Received invalid message part of type %u and size %u " | ||
1175 | "from client.\n", ch, ptype, psize); | ||
1176 | GNUNET_break (0); | ||
1177 | transmit_error (ch); | ||
1178 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); | ||
1179 | return; | ||
1180 | } | ||
1181 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1182 | "%p Received message part from client.\n", ch); | ||
1183 | GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg); | ||
1184 | |||
1185 | if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == ptype) | ||
1186 | inc_msg_id = GNUNET_YES; | ||
1187 | } | 1453 | } |
1188 | 1454 | ||
1189 | size -= sizeof (*msg); | 1455 | queue_message (ch, msg, first_ptype, last_ptype); |
1190 | struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size); | 1456 | transmit_message (ch); |
1191 | tmit_msg->buf = (char *) &tmit_msg[1]; | ||
1192 | memcpy (tmit_msg->buf, &msg[1], size); | ||
1193 | tmit_msg->size = size; | ||
1194 | tmit_msg->state = ch->tmit_state; | ||
1195 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); | ||
1196 | transmit_message (ch, inc_msg_id); | ||
1197 | 1457 | ||
1198 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 1458 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
1199 | }; | 1459 | }; |
diff --git a/src/psyc/psyc.h b/src/psyc/psyc.h index 582a8e168..f2d386548 100644 --- a/src/psyc/psyc.h +++ b/src/psyc/psyc.h | |||
@@ -31,8 +31,9 @@ | |||
31 | #include "gnunet_psyc_service.h" | 31 | #include "gnunet_psyc_service.h" |
32 | 32 | ||
33 | 33 | ||
34 | uint16_t | 34 | int |
35 | GNUNET_PSYC_message_last_part (uint16_t data_size, const char *data); | 35 | GNUNET_PSYC_check_message_parts (uint16_t data_size, const char *data, |
36 | uint16_t *first_ptype, uint16_t *last_ptype); | ||
36 | 37 | ||
37 | void | 38 | void |
38 | GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind, | 39 | GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind, |
@@ -41,14 +42,26 @@ GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind, | |||
41 | 42 | ||
42 | enum MessageState | 43 | enum MessageState |
43 | { | 44 | { |
44 | MSG_STATE_START = 0, | 45 | MSG_STATE_START = 0, |
45 | MSG_STATE_HEADER = 1, | 46 | MSG_STATE_HEADER = 1, |
46 | MSG_STATE_METHOD = 2, | 47 | MSG_STATE_METHOD = 2, |
47 | MSG_STATE_MODIFIER = 3, | 48 | MSG_STATE_MODIFIER = 3, |
48 | MSG_STATE_MOD_CONT = 4, | 49 | MSG_STATE_MOD_CONT = 4, |
49 | MSG_STATE_DATA = 5, | 50 | MSG_STATE_DATA = 5, |
50 | MSG_STATE_END = 6, | 51 | MSG_STATE_END = 6, |
51 | MSG_STATE_CANCEL = 7, | 52 | MSG_STATE_CANCEL = 7, |
53 | MSG_STATE_ERROR = 8, | ||
54 | }; | ||
55 | |||
56 | |||
57 | enum MessageFragmentState | ||
58 | { | ||
59 | MSG_FRAG_STATE_START = 0, | ||
60 | MSG_FRAG_STATE_HEADER = 1, | ||
61 | MSG_FRAG_STATE_DATA = 2, | ||
62 | MSG_FRAG_STATE_END = 3, | ||
63 | MSG_FRAG_STATE_CANCEL = 4, | ||
64 | MSG_FRAG_STATE_DROP = 5, | ||
52 | }; | 65 | }; |
53 | 66 | ||
54 | 67 | ||
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c index 16e8106d4..22f1da069 100644 --- a/src/psyc/psyc_api.c +++ b/src/psyc/psyc_api.c | |||
@@ -1502,7 +1502,7 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel, | |||
1502 | slvadd = (struct ChannelSlaveAdd *) &op[1]; | 1502 | slvadd = (struct ChannelSlaveAdd *) &op[1]; |
1503 | op->msg = (struct GNUNET_MessageHeader *) slvadd; | 1503 | op->msg = (struct GNUNET_MessageHeader *) slvadd; |
1504 | 1504 | ||
1505 | slvadd->header.type = GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD; | 1505 | slvadd->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD); |
1506 | slvadd->header.size = htons (sizeof (*slvadd)); | 1506 | slvadd->header.size = htons (sizeof (*slvadd)); |
1507 | slvadd->announced_at = GNUNET_htonll (announced_at); | 1507 | slvadd->announced_at = GNUNET_htonll (announced_at); |
1508 | slvadd->effective_since = GNUNET_htonll (effective_since); | 1508 | slvadd->effective_since = GNUNET_htonll (effective_since); |
@@ -1544,7 +1544,7 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel, | |||
1544 | 1544 | ||
1545 | slvrm = (struct ChannelSlaveRemove *) &op[1]; | 1545 | slvrm = (struct ChannelSlaveRemove *) &op[1]; |
1546 | op->msg = (struct GNUNET_MessageHeader *) slvrm; | 1546 | op->msg = (struct GNUNET_MessageHeader *) slvrm; |
1547 | slvrm->header.type = GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM; | 1547 | slvrm->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM); |
1548 | slvrm->header.size = htons (sizeof (*slvrm)); | 1548 | slvrm->header.size = htons (sizeof (*slvrm)); |
1549 | slvrm->announced_at = GNUNET_htonll (announced_at); | 1549 | slvrm->announced_at = GNUNET_htonll (announced_at); |
1550 | GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head, | 1550 | GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head, |
diff --git a/src/psyc/psyc_common.c b/src/psyc/psyc_common.c index bf5643ff2..74729aca2 100644 --- a/src/psyc/psyc_common.c +++ b/src/psyc/psyc_common.c | |||
@@ -30,36 +30,48 @@ | |||
30 | /** | 30 | /** |
31 | * Check if @a data contains a series of valid message parts. | 31 | * Check if @a data contains a series of valid message parts. |
32 | * | 32 | * |
33 | * @param data_size Size of @a data. | 33 | * @param data_size Size of @a data. |
34 | * @param data Data. | 34 | * @param data Data. |
35 | * @param[out] first_ptype Type of first message part. | ||
36 | * @param[out] last_ptype Type of last message part. | ||
35 | * | 37 | * |
36 | * @return Message type number | 38 | * @return Number of message parts found in @a data. |
37 | * or GNUNET_NO if the message contains invalid or no parts. | 39 | * or GNUNET_SYSERR if the message contains invalid parts. |
38 | */ | 40 | */ |
39 | uint16_t | 41 | int |
40 | GNUNET_PSYC_message_last_part (uint16_t data_size, const char *data) | 42 | GNUNET_PSYC_check_message_parts (uint16_t data_size, const char *data, |
43 | uint16_t *first_ptype, uint16_t *last_ptype) | ||
41 | { | 44 | { |
42 | const struct GNUNET_MessageHeader *pmsg; | 45 | const struct GNUNET_MessageHeader *pmsg; |
43 | uint16_t ptype = GNUNET_NO; | 46 | uint16_t parts = 0, ptype = 0, psize = 0, pos = 0; |
44 | uint16_t psize = 0; | 47 | if (NULL != first_ptype) |
45 | uint16_t pos = 0; | 48 | *first_ptype = 0; |
49 | if (NULL != last_ptype) | ||
50 | *last_ptype = 0; | ||
46 | 51 | ||
47 | for (pos = 0; pos < data_size; pos += psize) | 52 | for (pos = 0; pos < data_size; pos += psize, parts++) |
48 | { | 53 | { |
49 | pmsg = (const struct GNUNET_MessageHeader *) (data + pos); | 54 | pmsg = (const struct GNUNET_MessageHeader *) (data + pos); |
50 | psize = ntohs (pmsg->size); | 55 | psize = ntohs (pmsg->size); |
51 | ptype = ntohs (pmsg->type); | 56 | ptype = ntohs (pmsg->type); |
52 | if (psize < sizeof (*pmsg) || pos + psize > data_size | 57 | if (0 == parts && NULL != first_ptype) |
58 | *first_ptype = ptype; | ||
59 | if (NULL != last_ptype | ||
60 | && *last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END) | ||
61 | *last_ptype = ptype; | ||
62 | if (psize < sizeof (*pmsg) | ||
63 | || pos + psize > data_size | ||
53 | || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD | 64 | || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD |
54 | || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype) | 65 | || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype) |
55 | { | 66 | { |
56 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 67 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
57 | "Invalid message part of type %u and size %u.\n", | 68 | "Invalid message part of type %u and size %u.\n", |
58 | ptype, psize); | 69 | ptype, psize); |
59 | return GNUNET_NO; | 70 | return GNUNET_SYSERR; |
60 | } | 71 | } |
72 | /* FIXME: check message part order */ | ||
61 | } | 73 | } |
62 | return ptype; | 74 | return parts; |
63 | } | 75 | } |
64 | 76 | ||
65 | 77 | ||
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c index 360d56c06..f58ecb7f6 100644 --- a/src/psyc/test_psyc.c +++ b/src/psyc/test_psyc.c | |||
@@ -35,9 +35,9 @@ | |||
35 | #include "gnunet_env_lib.h" | 35 | #include "gnunet_env_lib.h" |
36 | #include "gnunet_psyc_service.h" | 36 | #include "gnunet_psyc_service.h" |
37 | 37 | ||
38 | #define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) | 38 | #define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10) |
39 | 39 | ||
40 | #define DEBUG_SERVICE 1 | 40 | #define DEBUG_SERVICE 0 |
41 | 41 | ||
42 | 42 | ||
43 | /** | 43 | /** |
@@ -120,7 +120,7 @@ cleanup () | |||
120 | 120 | ||
121 | 121 | ||
122 | /** | 122 | /** |
123 | * Terminate the testcase (failure). | 123 | * Terminate the test case (failure). |
124 | * | 124 | * |
125 | * @param cls NULL | 125 | * @param cls NULL |
126 | * @param tc scheduler context | 126 | * @param tc scheduler context |
@@ -134,7 +134,7 @@ end_badly (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
134 | 134 | ||
135 | 135 | ||
136 | /** | 136 | /** |
137 | * Terminate the testcase (success). | 137 | * Terminate the test case (success). |
138 | * | 138 | * |
139 | * @param cls NULL | 139 | * @param cls NULL |
140 | * @param tc scheduler context | 140 | * @param tc scheduler context |
@@ -148,7 +148,7 @@ end_normally (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
148 | 148 | ||
149 | 149 | ||
150 | /** | 150 | /** |
151 | * Finish the testcase (successfully). | 151 | * Finish the test case (successfully). |
152 | */ | 152 | */ |
153 | static void | 153 | static void |
154 | end () | 154 | end () |
@@ -518,7 +518,8 @@ run (void *cls, | |||
518 | 518 | ||
519 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Starting master.\n"); | 519 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Starting master.\n"); |
520 | mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE, | 520 | mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE, |
521 | &master_message, &join_request, &master_started, NULL); | 521 | &master_message, &join_request, |
522 | &master_started, NULL); | ||
522 | } | 523 | } |
523 | 524 | ||
524 | 525 | ||
diff --git a/src/psycstore/psycstore_api.c b/src/psycstore/psycstore_api.c index 88ae1185b..a5fdea10b 100644 --- a/src/psycstore/psycstore_api.c +++ b/src/psycstore/psycstore_api.c | |||
@@ -1099,9 +1099,9 @@ GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h, | |||
1099 | req->message_id = GNUNET_htonll (message_id); | 1099 | req->message_id = GNUNET_htonll (message_id); |
1100 | req->name_size = htons (name_size); | 1100 | req->name_size = htons (name_size); |
1101 | req->flags | 1101 | req->flags |
1102 | = 0 == i | 1102 | = (0 == i) |
1103 | ? STATE_OP_FIRST | 1103 | ? STATE_OP_FIRST |
1104 | : modifier_count - 1 == i | 1104 | : (modifier_count - 1 == i) |
1105 | ? STATE_OP_LAST | 1105 | ? STATE_OP_LAST |
1106 | : 0; | 1106 | : 0; |
1107 | 1107 | ||