diff options
author | Gabor X Toth <*@tg-x.net> | 2014-01-06 00:09:37 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2014-01-06 00:09:37 +0000 |
commit | c04d45b9738e1764d2e2c21efdbeb129f298d5d1 (patch) | |
tree | 9eec32efdd3fe3f9f459630af16058cc47436bce /src/psyc | |
parent | 83a0e31631dbc199c37c42f11004e1be544f04a8 (diff) | |
download | gnunet-c04d45b9738e1764d2e2c21efdbeb129f298d5d1.tar.gz gnunet-c04d45b9738e1764d2e2c21efdbeb129f298d5d1.zip |
psyc: ipc messages
Diffstat (limited to 'src/psyc')
-rw-r--r-- | src/psyc/gnunet-service-psyc.c | 325 | ||||
-rw-r--r-- | src/psyc/psyc.h | 19 | ||||
-rw-r--r-- | src/psyc/psyc_api.c | 569 | ||||
-rw-r--r-- | src/psyc/test_psyc.c | 74 |
4 files changed, 774 insertions, 213 deletions
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c index a3b1b8f82..628c39900 100644 --- a/src/psyc/gnunet-service-psyc.c +++ b/src/psyc/gnunet-service-psyc.c | |||
@@ -71,9 +71,9 @@ struct TransmitMessage | |||
71 | char *buf; | 71 | char *buf; |
72 | uint16_t size; | 72 | uint16_t size; |
73 | /** | 73 | /** |
74 | * enum GNUNET_PSYC_DataStatus | 74 | * enum MessageState |
75 | */ | 75 | */ |
76 | uint8_t status; | 76 | uint8_t state; |
77 | }; | 77 | }; |
78 | 78 | ||
79 | /** | 79 | /** |
@@ -87,12 +87,21 @@ struct Channel | |||
87 | struct TransmitMessage *tmit_tail; | 87 | struct TransmitMessage *tmit_tail; |
88 | 88 | ||
89 | GNUNET_SCHEDULER_TaskIdentifier tmit_task; | 89 | GNUNET_SCHEDULER_TaskIdentifier tmit_task; |
90 | uint32_t tmit_mod_count; | 90 | |
91 | uint32_t tmit_mod_recvd; | ||
92 | /** | 91 | /** |
93 | * enum GNUNET_PSYC_DataStatus | 92 | * Expected value size for the modifier being received from the PSYC service. |
94 | */ | 93 | */ |
95 | uint8_t tmit_status; | 94 | uint32_t tmit_mod_value_size_expected; |
95 | |||
96 | /** | ||
97 | * Actual value size for the modifier being received from the PSYC service. | ||
98 | */ | ||
99 | uint32_t tmit_mod_value_size; | ||
100 | |||
101 | /** | ||
102 | * enum MessageState | ||
103 | */ | ||
104 | uint8_t tmit_state; | ||
96 | 105 | ||
97 | uint8_t in_transmit; | 106 | uint8_t in_transmit; |
98 | uint8_t is_master; | 107 | uint8_t is_master; |
@@ -112,12 +121,27 @@ struct Master | |||
112 | struct GNUNET_MULTICAST_Origin *origin; | 121 | struct GNUNET_MULTICAST_Origin *origin; |
113 | struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle; | 122 | struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle; |
114 | 123 | ||
124 | /** | ||
125 | * Maximum message ID for this channel. | ||
126 | * | ||
127 | * Incremented before sending a message, thus the message_id in messages sent | ||
128 | * starts from 1. | ||
129 | */ | ||
115 | uint64_t max_message_id; | 130 | uint64_t max_message_id; |
131 | |||
132 | /** | ||
133 | * ID of the last message that contains any state operations. | ||
134 | * 0 if there is no such message. | ||
135 | */ | ||
116 | uint64_t max_state_message_id; | 136 | uint64_t max_state_message_id; |
137 | |||
138 | /** | ||
139 | * Maximum group generation for this channel. | ||
140 | */ | ||
117 | uint64_t max_group_generation; | 141 | uint64_t max_group_generation; |
118 | 142 | ||
119 | /** | 143 | /** |
120 | * enum GNUNET_PSYC_Policy | 144 | * @see enum GNUNET_PSYC_Policy |
121 | */ | 145 | */ |
122 | uint32_t policy; | 146 | uint32_t policy; |
123 | }; | 147 | }; |
@@ -196,6 +220,7 @@ client_cleanup (struct Channel *ch) | |||
196 | GNUNET_free (ch); | 220 | GNUNET_free (ch); |
197 | } | 221 | } |
198 | 222 | ||
223 | |||
199 | /** | 224 | /** |
200 | * Called whenever a client is disconnected. | 225 | * Called whenever a client is disconnected. |
201 | * Frees our resources associated with that client. | 226 | * Frees our resources associated with that client. |
@@ -234,7 +259,8 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) | |||
234 | } | 259 | } |
235 | } | 260 | } |
236 | 261 | ||
237 | void | 262 | |
263 | static void | ||
238 | join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, | 264 | join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, |
239 | const struct GNUNET_MessageHeader *join_req, | 265 | const struct GNUNET_MessageHeader *join_req, |
240 | struct GNUNET_MULTICAST_JoinHandle *jh) | 266 | struct GNUNET_MULTICAST_JoinHandle *jh) |
@@ -242,7 +268,8 @@ join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, | |||
242 | 268 | ||
243 | } | 269 | } |
244 | 270 | ||
245 | void | 271 | |
272 | static void | ||
246 | membership_test_cb (void *cls, | 273 | membership_test_cb (void *cls, |
247 | const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, | 274 | const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, |
248 | uint64_t message_id, uint64_t group_generation, | 275 | uint64_t message_id, uint64_t group_generation, |
@@ -251,16 +278,18 @@ membership_test_cb (void *cls, | |||
251 | 278 | ||
252 | } | 279 | } |
253 | 280 | ||
254 | void | 281 | |
282 | static void | ||
255 | replay_fragment_cb (void *cls, | 283 | replay_fragment_cb (void *cls, |
256 | const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, | 284 | const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, |
257 | uint64_t fragment_id, uint64_t flags, | 285 | uint64_t fragment_id, uint64_t flags, |
258 | struct GNUNET_MULTICAST_ReplayHandle *rh) | 286 | struct GNUNET_MULTICAST_ReplayHandle *rh) |
259 | { | ||
260 | 287 | ||
288 | { | ||
261 | } | 289 | } |
262 | 290 | ||
263 | void | 291 | |
292 | static void | ||
264 | replay_message_cb (void *cls, | 293 | replay_message_cb (void *cls, |
265 | const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, | 294 | const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, |
266 | uint64_t message_id, | 295 | uint64_t message_id, |
@@ -271,56 +300,30 @@ replay_message_cb (void *cls, | |||
271 | 300 | ||
272 | } | 301 | } |
273 | 302 | ||
274 | void | ||
275 | request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, | ||
276 | const struct GNUNET_MessageHeader *req, | ||
277 | enum GNUNET_MULTICAST_MessageFlags flags) | ||
278 | { | ||
279 | |||
280 | } | ||
281 | |||
282 | 303 | ||
283 | void | 304 | static void |
284 | fragment_store_result (void *cls, int64_t result, const char *err_msg) | 305 | fragment_store_result (void *cls, int64_t result, const char *err_msg) |
285 | { | 306 | { |
286 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 307 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
287 | "fragment_store() returned %l (%s)\n", result, err_msg); | 308 | "fragment_store() returned %l (%s)\n", result, err_msg); |
288 | } | 309 | } |
289 | 310 | ||
311 | |||
290 | /** | 312 | /** |
291 | * Send PSYC messages in an incoming multicast message to a client. | 313 | * Iterator callback for sending a message to a client. |
314 | * | ||
315 | * @see message_cb() | ||
292 | */ | 316 | */ |
293 | int | 317 | static int |
294 | send_to_client (void *cls, const struct GNUNET_HashCode *ch_key_hash, void *chan) | 318 | message_to_client (void *cls, const struct GNUNET_HashCode *chan_key_hash, |
319 | void *chan) | ||
295 | { | 320 | { |
296 | const struct GNUNET_MULTICAST_MessageHeader *msg = cls; | 321 | const struct GNUNET_MessageHeader *msg = cls; |
297 | struct Channel *ch = chan; | 322 | struct Channel *ch = chan; |
298 | 323 | ||
299 | uint16_t size = ntohs (msg->header.size); | 324 | GNUNET_SERVER_notification_context_add (nc, ch->client); |
300 | uint16_t pos = 0; | 325 | GNUNET_SERVER_notification_context_unicast (nc, ch->client, msg, GNUNET_NO); |
301 | |||
302 | while (sizeof (*msg) + pos < size) | ||
303 | { | ||
304 | const struct GNUNET_MessageHeader *pmsg | ||
305 | = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos); | ||
306 | uint16_t psize = ntohs (pmsg->size); | ||
307 | if (sizeof (*msg) + pos + psize > size) | ||
308 | { | ||
309 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
310 | "Ignoring message of type %u with invalid size. " | ||
311 | "(%u + %u + %u > %u)\n", ntohs (pmsg->type), | ||
312 | sizeof (*msg), pos, psize, size); | ||
313 | break; | ||
314 | } | ||
315 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
316 | "Sending message of type %u and size %u to client.\n", | ||
317 | ntohs (pmsg->type), psize); | ||
318 | 326 | ||
319 | GNUNET_SERVER_notification_context_add (nc, ch->client); | ||
320 | GNUNET_SERVER_notification_context_unicast (nc, ch->client, pmsg, | ||
321 | GNUNET_NO); | ||
322 | pos += psize; | ||
323 | } | ||
324 | return GNUNET_YES; | 327 | return GNUNET_YES; |
325 | } | 328 | } |
326 | 329 | ||
@@ -330,7 +333,7 @@ send_to_client (void *cls, const struct GNUNET_HashCode *ch_key_hash, void *chan | |||
330 | * | 333 | * |
331 | * Store it using PSYCstore and send it to all clients of the channel. | 334 | * Store it using PSYCstore and send it to all clients of the channel. |
332 | */ | 335 | */ |
333 | void | 336 | static void |
334 | message_cb (void *cls, const struct GNUNET_MessageHeader *msg) | 337 | message_cb (void *cls, const struct GNUNET_MessageHeader *msg) |
335 | { | 338 | { |
336 | uint16_t type = ntohs (msg->type); | 339 | uint16_t type = ntohs (msg->type); |
@@ -344,34 +347,100 @@ message_cb (void *cls, const struct GNUNET_MessageHeader *msg) | |||
344 | struct Master *mst = cls; | 347 | struct Master *mst = cls; |
345 | struct Slave *slv = cls; | 348 | struct Slave *slv = cls; |
346 | 349 | ||
347 | struct GNUNET_CRYPTO_EddsaPublicKey *ch_key | 350 | /* const struct GNUNET_MULTICAST_MessageHeader *mmsg |
351 | = (const struct GNUNET_MULTICAST_MessageHeader *) msg; */ | ||
352 | struct GNUNET_CRYPTO_EddsaPublicKey *chan_key | ||
348 | = ch->is_master ? &mst->pub_key : &slv->chan_key; | 353 | = ch->is_master ? &mst->pub_key : &slv->chan_key; |
349 | struct GNUNET_HashCode *ch_key_hash | 354 | struct GNUNET_HashCode *chan_key_hash |
350 | = ch->is_master ? &mst->pub_key_hash : &slv->chan_key_hash; | 355 | = ch->is_master ? &mst->pub_key_hash : &slv->chan_key_hash; |
351 | 356 | ||
352 | switch (type) | 357 | switch (type) |
353 | { | 358 | { |
354 | case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE: | 359 | case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE: |
355 | GNUNET_PSYCSTORE_fragment_store (store, ch_key, | 360 | { |
361 | GNUNET_PSYCSTORE_fragment_store (store, chan_key, | ||
356 | (const struct | 362 | (const struct |
357 | GNUNET_MULTICAST_MessageHeader *) msg, | 363 | GNUNET_MULTICAST_MessageHeader *) msg, |
358 | 0, NULL, NULL); | 364 | 0, NULL, NULL); |
359 | GNUNET_CONTAINER_multihashmap_get_multiple (clients, ch_key_hash, | ||
360 | send_to_client, (void *) msg); | ||
361 | break; | ||
362 | 365 | ||
366 | uint16_t size = ntohs (msg->size); | ||
367 | uint16_t psize = 0; | ||
368 | uint16_t pos = 0; | ||
369 | |||
370 | for (pos = 0; sizeof (*msg) + pos < size; pos += psize) | ||
371 | { | ||
372 | const struct GNUNET_MessageHeader *pmsg | ||
373 | = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos); | ||
374 | uint16_t psize = ntohs (pmsg->size); | ||
375 | if (sizeof (*msg) + pos + psize > size) | ||
376 | { | ||
377 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
378 | "Message received from multicast contains invalid PSYC " | ||
379 | "message. Not sending to clients.\n"); | ||
380 | return; | ||
381 | } | ||
382 | } | ||
383 | |||
384 | #if TODO | ||
385 | /* FIXME: apply modifiers to state in PSYCstore */ | ||
386 | GNUNET_PSYCSTORE_state_modify (store, chan_key, | ||
387 | GNUNET_ntohll (mmsg->message_id), | ||
388 | meth->mod_count, mods, | ||
389 | rcb, rcb_cls); | ||
390 | #endif | ||
391 | |||
392 | const struct GNUNET_MULTICAST_MessageHeader *mmsg | ||
393 | = (const struct GNUNET_MULTICAST_MessageHeader *) msg; | ||
394 | struct GNUNET_PSYC_MessageHeader *pmsg; | ||
395 | |||
396 | psize = sizeof (*pmsg) + size - sizeof (*mmsg); | ||
397 | pmsg = GNUNET_malloc (psize); | ||
398 | pmsg->header.size = htons (psize); | ||
399 | pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | ||
400 | pmsg->message_id = mmsg->message_id; | ||
401 | |||
402 | memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); | ||
403 | |||
404 | GNUNET_CONTAINER_multihashmap_get_multiple (clients, chan_key_hash, | ||
405 | message_to_client, | ||
406 | (void *) pmsg); | ||
407 | GNUNET_free (pmsg); | ||
408 | break; | ||
409 | } | ||
363 | default: | 410 | default: |
364 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 411 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
365 | "Ignoring unknown message of type %u and size %u.\n", | 412 | "Discarding unknown message of type %u and size %u.\n", |
366 | type, size); | 413 | type, size); |
367 | } | 414 | } |
368 | } | 415 | } |
369 | 416 | ||
370 | 417 | ||
371 | /** | 418 | /** |
419 | * Send a request received from multicast to a client. | ||
420 | */ | ||
421 | static int | ||
422 | request_to_client (void *cls, const struct GNUNET_HashCode *chan_key_hash, | ||
423 | void *chan) | ||
424 | { | ||
425 | /* TODO */ | ||
426 | |||
427 | return GNUNET_YES; | ||
428 | } | ||
429 | |||
430 | |||
431 | static void | ||
432 | request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, | ||
433 | const struct GNUNET_MessageHeader *req, | ||
434 | enum GNUNET_MULTICAST_MessageFlags flags) | ||
435 | { | ||
436 | |||
437 | } | ||
438 | |||
439 | |||
440 | /** | ||
372 | * Response from PSYCstore with the current counter values for a channel master. | 441 | * Response from PSYCstore with the current counter values for a channel master. |
373 | */ | 442 | */ |
374 | void | 443 | static void |
375 | master_counters_cb (void *cls, int result, uint64_t max_fragment_id, | 444 | master_counters_cb (void *cls, int result, uint64_t max_fragment_id, |
376 | uint64_t max_message_id, uint64_t max_group_generation, | 445 | uint64_t max_message_id, uint64_t max_group_generation, |
377 | uint64_t max_state_message_id) | 446 | uint64_t max_state_message_id) |
@@ -513,20 +582,13 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client, | |||
513 | static void | 582 | static void |
514 | send_transmit_ack (struct Channel *ch) | 583 | send_transmit_ack (struct Channel *ch) |
515 | { | 584 | { |
516 | struct TransmitAck *res = GNUNET_malloc (sizeof (*res)); | 585 | struct GNUNET_MessageHeader res; |
517 | res->header.size = htons (sizeof (*res)); | 586 | res.size = htons (sizeof (res)); |
518 | res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK); | 587 | res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK); |
519 | |||
520 | res->buf_avail = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; | ||
521 | struct TransmitMessage *tmit_msg = ch->tmit_tail; | ||
522 | if (NULL != tmit_msg && GNUNET_PSYC_DATA_CONT == tmit_msg->status) | ||
523 | res->buf_avail -= tmit_msg->size; | ||
524 | res->buf_avail = htons (res->buf_avail); | ||
525 | 588 | ||
526 | GNUNET_SERVER_notification_context_add (nc, ch->client); | 589 | GNUNET_SERVER_notification_context_add (nc, ch->client); |
527 | GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header, | 590 | GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res, |
528 | GNUNET_NO); | 591 | GNUNET_NO); |
529 | GNUNET_free (res); | ||
530 | } | 592 | } |
531 | 593 | ||
532 | 594 | ||
@@ -555,7 +617,7 @@ transmit_notify (void *cls, size_t *data_size, void *data) | |||
555 | GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg); | 617 | GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg); |
556 | GNUNET_free (msg); | 618 | GNUNET_free (msg); |
557 | 619 | ||
558 | int ret = (GNUNET_YES == ch->in_transmit) ? GNUNET_NO : GNUNET_YES; | 620 | int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES; |
559 | 621 | ||
560 | if (0 == ch->tmit_task) | 622 | if (0 == ch->tmit_task) |
561 | { | 623 | { |
@@ -638,6 +700,7 @@ transmit_message (struct Channel *ch, struct GNUNET_TIME_Relative delay) | |||
638 | : GNUNET_SCHEDULER_add_delayed (delay, slave_transmit_message, ch); | 700 | : GNUNET_SCHEDULER_add_delayed (delay, slave_transmit_message, ch); |
639 | } | 701 | } |
640 | 702 | ||
703 | |||
641 | /** | 704 | /** |
642 | * Queue incoming message parts from a client for transmission, and send them to | 705 | * Queue incoming message parts from a client for transmission, and send them to |
643 | * the multicast group when the buffer is full or reached the end of message. | 706 | * the multicast group when the buffer is full or reached the end of message. |
@@ -659,21 +722,20 @@ queue_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg) | |||
659 | "for transmission to multicast.\n", | 722 | "for transmission to multicast.\n", |
660 | ntohs (msg->type), size); | 723 | ntohs (msg->type), size); |
661 | 724 | ||
662 | if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < size) | 725 | if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size) |
663 | return GNUNET_SYSERR; | 726 | return GNUNET_SYSERR; |
664 | 727 | ||
665 | if (NULL == tmit_msg | 728 | if (NULL == tmit_msg |
666 | || tmit_msg->status != GNUNET_PSYC_DATA_CONT | 729 | || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < tmit_msg->size + size) |
667 | || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < tmit_msg->size + size) | ||
668 | { | 730 | { |
669 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 731 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
670 | "Appending message qto new buffer.\n"); | 732 | "Appending message to new buffer.\n"); |
671 | /* Start filling up new buffer */ | 733 | /* Start filling up new buffer */ |
672 | tmit_msg = GNUNET_new (struct TransmitMessage); | 734 | tmit_msg = GNUNET_new (struct TransmitMessage); |
673 | tmit_msg->buf = GNUNET_malloc (size); | 735 | tmit_msg->buf = GNUNET_malloc (size); |
674 | memcpy (tmit_msg->buf, msg, size); | 736 | memcpy (tmit_msg->buf, msg, size); |
675 | tmit_msg->size = size; | 737 | tmit_msg->size = size; |
676 | tmit_msg->status = ch->tmit_status; | 738 | tmit_msg->state = ch->tmit_state; |
677 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); | 739 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); |
678 | } | 740 | } |
679 | else | 741 | else |
@@ -684,23 +746,41 @@ queue_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg) | |||
684 | tmit_msg->buf = GNUNET_realloc (tmit_msg->buf, tmit_msg->size + size); | 746 | tmit_msg->buf = GNUNET_realloc (tmit_msg->buf, tmit_msg->size + size); |
685 | memcpy (tmit_msg->buf + tmit_msg->size, msg, size); | 747 | memcpy (tmit_msg->buf + tmit_msg->size, msg, size); |
686 | tmit_msg->size += size; | 748 | tmit_msg->size += size; |
687 | tmit_msg->status = ch->tmit_status; | 749 | tmit_msg->state = ch->tmit_state; |
688 | } | 750 | } |
689 | 751 | ||
690 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "tmit_size: %u\n", tmit_msg->size); | 752 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "tmit_size: %u\n", tmit_msg->size); |
691 | 753 | ||
692 | /* Wait a bit for the remaining message parts from the client | 754 | /* Wait a bit for the remaining message parts from the client |
693 | if there's still some space left in the buffer. */ | 755 | if there's still some space left in the buffer. */ |
694 | if (GNUNET_PSYC_DATA_CONT == tmit_msg->status | 756 | if (tmit_msg->state < MSG_STATE_END |
695 | && (tmit_msg->size + sizeof (struct GNUNET_PSYC_MessageData) | 757 | && (tmit_msg->size + sizeof (struct GNUNET_MessageHeader) |
696 | < GNUNET_MULTICAST_FRAGMENT_MAX_SIZE)) | 758 | < GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD)) |
759 | { | ||
697 | tmit_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2); | 760 | tmit_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2); |
761 | } | ||
762 | else | ||
763 | { | ||
764 | send_transmit_ack (ch); | ||
765 | } | ||
698 | 766 | ||
699 | transmit_message (ch, tmit_delay); | 767 | transmit_message (ch, tmit_delay); |
700 | 768 | ||
701 | return GNUNET_OK; | 769 | return GNUNET_OK; |
702 | } | 770 | } |
703 | 771 | ||
772 | |||
773 | static void | ||
774 | transmit_error (struct Channel *ch) | ||
775 | { | ||
776 | struct GNUNET_MessageHeader *msg = GNUNET_malloc (sizeof (*msg)); | ||
777 | msg->size = ntohs (sizeof (*msg)); | ||
778 | msg->type = ntohs (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); | ||
779 | queue_message (ch, msg); | ||
780 | |||
781 | GNUNET_SERVER_client_disconnect (ch->client); | ||
782 | } | ||
783 | |||
704 | /** | 784 | /** |
705 | * Incoming method from a client. | 785 | * Incoming method from a client. |
706 | */ | 786 | */ |
@@ -708,28 +788,21 @@ static void | |||
708 | handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client, | 788 | handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client, |
709 | const struct GNUNET_MessageHeader *msg) | 789 | const struct GNUNET_MessageHeader *msg) |
710 | { | 790 | { |
711 | const struct GNUNET_PSYC_MessageMethod *meth | 791 | /* const struct GNUNET_PSYC_MessageMethod *meth |
712 | = (const struct GNUNET_PSYC_MessageMethod *) msg; | 792 | = (const struct GNUNET_PSYC_MessageMethod *) msg; */ |
713 | struct Channel *ch | 793 | struct Channel *ch |
714 | = GNUNET_SERVER_client_get_user_context (client, struct Channel); | 794 | = GNUNET_SERVER_client_get_user_context (client, struct Channel); |
715 | GNUNET_assert (NULL != ch); | 795 | GNUNET_assert (NULL != ch); |
716 | 796 | ||
717 | if (GNUNET_NO != ch->in_transmit) | 797 | if (MSG_STATE_START != ch->tmit_state) |
718 | { | 798 | { |
719 | /* FIXME: already transmitting a message, send back error message. */ | 799 | transmit_error (ch); |
720 | return; | 800 | return; |
721 | } | 801 | } |
722 | 802 | ch->tmit_state = MSG_STATE_METHOD; | |
723 | ch->in_transmit = GNUNET_YES; | ||
724 | ch->tmit_mod_recvd = 0; | ||
725 | ch->tmit_mod_count = ntohl (meth->mod_count); | ||
726 | ch->tmit_status = GNUNET_PSYC_DATA_CONT; | ||
727 | 803 | ||
728 | queue_message (ch, msg); | 804 | queue_message (ch, msg); |
729 | 805 | send_transmit_ack (ch); | |
730 | if (0 == ch->tmit_mod_count) | ||
731 | send_transmit_ack (ch); | ||
732 | |||
733 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 806 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
734 | }; | 807 | }; |
735 | 808 | ||
@@ -741,20 +814,52 @@ static void | |||
741 | handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client, | 814 | handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client, |
742 | const struct GNUNET_MessageHeader *msg) | 815 | const struct GNUNET_MessageHeader *msg) |
743 | { | 816 | { |
744 | /* | ||
745 | const struct GNUNET_PSYC_MessageModifier *mod | 817 | const struct GNUNET_PSYC_MessageModifier *mod |
746 | = (const struct GNUNET_PSYC_MessageModifier *) msg; | 818 | = (const struct GNUNET_PSYC_MessageModifier *) msg; |
747 | */ | 819 | |
748 | struct Channel *ch | 820 | struct Channel *ch |
749 | = GNUNET_SERVER_client_get_user_context (client, struct Channel); | 821 | = GNUNET_SERVER_client_get_user_context (client, struct Channel); |
750 | GNUNET_assert (NULL != ch); | 822 | GNUNET_assert (NULL != ch); |
751 | 823 | ||
752 | ch->tmit_mod_recvd++; | 824 | if (MSG_STATE_METHOD != ch->tmit_state |
825 | || MSG_STATE_MODIFIER != ch->tmit_state | ||
826 | || MSG_STATE_MOD_CONT != ch->tmit_state | ||
827 | || ch->tmit_mod_value_size_expected != ch->tmit_mod_value_size) | ||
828 | { | ||
829 | transmit_error (ch); | ||
830 | return; | ||
831 | } | ||
832 | ch->tmit_mod_value_size_expected = ntohl (mod->value_size); | ||
833 | ch->tmit_mod_value_size = ntohs (msg->size) - ntohs(mod->name_size) - 1; | ||
834 | |||
753 | queue_message (ch, msg); | 835 | queue_message (ch, msg); |
836 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
837 | }; | ||
754 | 838 | ||
755 | if (ch->tmit_mod_recvd == ch->tmit_mod_count) | ||
756 | send_transmit_ack (ch); | ||
757 | 839 | ||
840 | /** | ||
841 | * Incoming modifier from a client. | ||
842 | */ | ||
843 | static void | ||
844 | handle_transmit_mod_cont (void *cls, struct GNUNET_SERVER_Client *client, | ||
845 | const struct GNUNET_MessageHeader *msg) | ||
846 | { | ||
847 | struct Channel *ch | ||
848 | = GNUNET_SERVER_client_get_user_context (client, struct Channel); | ||
849 | GNUNET_assert (NULL != ch); | ||
850 | |||
851 | ch->tmit_mod_value_size += ntohs (msg->size); | ||
852 | |||
853 | if (MSG_STATE_MODIFIER != ch->tmit_state | ||
854 | || MSG_STATE_MOD_CONT != ch->tmit_state | ||
855 | || ch->tmit_mod_value_size_expected < ch->tmit_mod_value_size) | ||
856 | { | ||
857 | transmit_error (ch); | ||
858 | return; | ||
859 | } | ||
860 | ch->tmit_state = MSG_STATE_MOD_CONT; | ||
861 | |||
862 | queue_message (ch, msg); | ||
758 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 863 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
759 | }; | 864 | }; |
760 | 865 | ||
@@ -766,18 +871,25 @@ static void | |||
766 | handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client, | 871 | handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client, |
767 | const struct GNUNET_MessageHeader *msg) | 872 | const struct GNUNET_MessageHeader *msg) |
768 | { | 873 | { |
769 | const struct GNUNET_PSYC_MessageData *data | ||
770 | = (const struct GNUNET_PSYC_MessageData *) msg; | ||
771 | struct Channel *ch | 874 | struct Channel *ch |
772 | = GNUNET_SERVER_client_get_user_context (client, struct Channel); | 875 | = GNUNET_SERVER_client_get_user_context (client, struct Channel); |
773 | GNUNET_assert (NULL != ch); | 876 | GNUNET_assert (NULL != ch); |
774 | 877 | ||
775 | ch->tmit_status = ntohs (data->status); | 878 | if (MSG_STATE_METHOD != ch->tmit_state |
879 | || MSG_STATE_MODIFIER != ch->tmit_state | ||
880 | || MSG_STATE_MOD_CONT != ch->tmit_state | ||
881 | || ch->tmit_mod_value_size_expected != ch->tmit_mod_value_size) | ||
882 | { | ||
883 | transmit_error (ch); | ||
884 | return; | ||
885 | } | ||
886 | ch->tmit_state = MSG_STATE_DATA; | ||
887 | |||
776 | queue_message (ch, msg); | 888 | queue_message (ch, msg); |
777 | send_transmit_ack (ch); | 889 | send_transmit_ack (ch); |
778 | 890 | ||
779 | if (GNUNET_PSYC_DATA_CONT != ch->tmit_status) | 891 | if (MSG_STATE_END <= ch->tmit_state) |
780 | ch->in_transmit = GNUNET_NO; | 892 | ch->tmit_state = MSG_STATE_START; |
781 | 893 | ||
782 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 894 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
783 | }; | 895 | }; |
@@ -800,16 +912,21 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, | |||
800 | 912 | ||
801 | { &handle_slave_join, NULL, | 913 | { &handle_slave_join, NULL, |
802 | GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 }, | 914 | GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 }, |
803 | 915 | #if TODO | |
916 | { &handle_psyc_message, NULL, | ||
917 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 }, | ||
918 | #endif | ||
804 | { &handle_transmit_method, NULL, | 919 | { &handle_transmit_method, NULL, |
805 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD, 0 }, | 920 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD, 0 }, |
806 | 921 | ||
807 | { &handle_transmit_modifier, NULL, | 922 | { &handle_transmit_modifier, NULL, |
808 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER, 0 }, | 923 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER, 0 }, |
809 | 924 | ||
925 | { &handle_transmit_mod_cont, NULL, | ||
926 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT, 0 }, | ||
927 | |||
810 | { &handle_transmit_data, NULL, | 928 | { &handle_transmit_data, NULL, |
811 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA, 0 }, | 929 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA, 0 }, |
812 | |||
813 | { NULL, NULL, 0, 0 } | 930 | { NULL, NULL, 0, 0 } |
814 | }; | 931 | }; |
815 | 932 | ||
diff --git a/src/psyc/psyc.h b/src/psyc/psyc.h index 85d10858e..90c07480a 100644 --- a/src/psyc/psyc.h +++ b/src/psyc/psyc.h | |||
@@ -29,6 +29,20 @@ | |||
29 | 29 | ||
30 | #include "gnunet_common.h" | 30 | #include "gnunet_common.h" |
31 | 31 | ||
32 | |||
33 | enum MessageState | ||
34 | { | ||
35 | MSG_STATE_START = 0, | ||
36 | MSG_STATE_HEADER = 1, | ||
37 | MSG_STATE_METHOD = 2, | ||
38 | MSG_STATE_MODIFIER = 3, | ||
39 | MSG_STATE_MOD_CONT = 4, | ||
40 | MSG_STATE_DATA = 5, | ||
41 | MSG_STATE_END = 6, | ||
42 | MSG_STATE_CANCEL = 7, | ||
43 | }; | ||
44 | |||
45 | |||
32 | GNUNET_NETWORK_STRUCT_BEGIN | 46 | GNUNET_NETWORK_STRUCT_BEGIN |
33 | 47 | ||
34 | /**** service -> library ****/ | 48 | /**** service -> library ****/ |
@@ -53,8 +67,7 @@ struct OperationResult | |||
53 | */ | 67 | */ |
54 | int64_t result_code GNUNET_PACKED; | 68 | int64_t result_code GNUNET_PACKED; |
55 | 69 | ||
56 | /* followed by 0-terminated error message (on error) */ | 70 | /* followed by NUL-terminated error message (on error) */ |
57 | |||
58 | }; | 71 | }; |
59 | 72 | ||
60 | 73 | ||
@@ -74,6 +87,7 @@ struct CountersResult | |||
74 | }; | 87 | }; |
75 | 88 | ||
76 | 89 | ||
90 | #if REMOVE | ||
77 | /** | 91 | /** |
78 | * Transmit acknowledgment. | 92 | * Transmit acknowledgment. |
79 | * | 93 | * |
@@ -95,6 +109,7 @@ struct TransmitAck | |||
95 | */ | 109 | */ |
96 | uint16_t buf_avail; | 110 | uint16_t buf_avail; |
97 | }; | 111 | }; |
112 | #endif | ||
98 | 113 | ||
99 | 114 | ||
100 | /**** library -> service ****/ | 115 | /**** library -> service ****/ |
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c index 290f3e375..a5a01fa92 100644 --- a/src/psyc/psyc_api.c +++ b/src/psyc/psyc_api.c | |||
@@ -30,15 +30,17 @@ | |||
30 | * @author Gabor X Toth | 30 | * @author Gabor X Toth |
31 | */ | 31 | */ |
32 | 32 | ||
33 | #include <inttypes.h> | ||
34 | |||
33 | #include "platform.h" | 35 | #include "platform.h" |
34 | #include "gnunet_util_lib.h" | 36 | #include "gnunet_util_lib.h" |
35 | #include "gnunet_env_lib.h" | 37 | #include "gnunet_env_lib.h" |
38 | #include "gnunet_multicast_service.h" | ||
36 | #include "gnunet_psyc_service.h" | 39 | #include "gnunet_psyc_service.h" |
37 | #include "psyc.h" | 40 | #include "psyc.h" |
38 | 41 | ||
39 | #define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__) | 42 | #define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__) |
40 | 43 | ||
41 | |||
42 | struct OperationHandle | 44 | struct OperationHandle |
43 | { | 45 | { |
44 | struct OperationHandle *prev; | 46 | struct OperationHandle *prev; |
@@ -91,31 +93,55 @@ struct GNUNET_PSYC_Channel | |||
91 | */ | 93 | */ |
92 | struct GNUNET_TIME_Relative reconnect_delay; | 94 | struct GNUNET_TIME_Relative reconnect_delay; |
93 | 95 | ||
94 | GNUNET_PSYC_Method method_cb; | 96 | /** |
97 | * Message part callback. | ||
98 | */ | ||
99 | GNUNET_PSYC_MessageCallback message_cb; | ||
100 | |||
101 | /** | ||
102 | * Message part callback for historic message. | ||
103 | */ | ||
104 | GNUNET_PSYC_MessageCallback hist_message_cb; | ||
95 | 105 | ||
106 | /** | ||
107 | * Join handler callback. | ||
108 | */ | ||
96 | GNUNET_PSYC_JoinCallback join_cb; | 109 | GNUNET_PSYC_JoinCallback join_cb; |
97 | 110 | ||
111 | /** | ||
112 | * Closure for @a message_cb and @a join_cb. | ||
113 | */ | ||
98 | void *cb_cls; | 114 | void *cb_cls; |
99 | 115 | ||
100 | /** | 116 | /** |
101 | * Are we polling for incoming messages right now? | 117 | * ID of the message being received from the PSYC service. |
102 | */ | 118 | */ |
103 | int in_receive; | 119 | uint64_t recv_message_id; |
104 | 120 | ||
105 | /** | 121 | /** |
106 | * Are we currently transmitting a message? | 122 | * State of the currently being received message from the PSYC service. |
107 | */ | 123 | */ |
108 | int in_transmit; | 124 | enum MessageState recv_state; |
109 | 125 | ||
110 | /** | 126 | /** |
111 | * Is this a master or slave channel? | 127 | * Flags for the currently being received message from the PSYC service. |
128 | */ | ||
129 | enum GNUNET_PSYC_MessageFlags recv_flags; | ||
130 | |||
131 | /** | ||
132 | * Expected value size for the modifier being received from the PSYC service. | ||
133 | */ | ||
134 | uint32_t recv_mod_value_size_expected; | ||
135 | |||
136 | /** | ||
137 | * Actual value size for the modifier being received from the PSYC service. | ||
112 | */ | 138 | */ |
113 | int is_master; | 139 | uint32_t recv_mod_value_size; |
114 | 140 | ||
115 | /** | 141 | /** |
116 | * Buffer space available for transmitting the next data fragment. | 142 | * Buffer space available for transmitting the next data fragment. |
117 | */ | 143 | */ |
118 | uint16_t tmit_buf_avail; | 144 | uint16_t tmit_size; // FIXME |
119 | 145 | ||
120 | /** | 146 | /** |
121 | * Is transmission paused? | 147 | * Is transmission paused? |
@@ -125,7 +151,22 @@ struct GNUNET_PSYC_Channel | |||
125 | /** | 151 | /** |
126 | * Are we still waiting for a PSYC_TRANSMIT_ACK? | 152 | * Are we still waiting for a PSYC_TRANSMIT_ACK? |
127 | */ | 153 | */ |
128 | uint8_t tmit_ack_pending; | 154 | uint8_t tmit_ack_pending; // FIXME |
155 | |||
156 | /** | ||
157 | * Are we polling for incoming messages right now? | ||
158 | */ | ||
159 | uint8_t in_receive; | ||
160 | |||
161 | /** | ||
162 | * Are we currently transmitting a message? | ||
163 | */ | ||
164 | uint8_t in_transmit; | ||
165 | |||
166 | /** | ||
167 | * Is this a master or slave channel? | ||
168 | */ | ||
169 | uint8_t is_master; | ||
129 | }; | 170 | }; |
130 | 171 | ||
131 | 172 | ||
@@ -135,9 +176,10 @@ struct GNUNET_PSYC_Channel | |||
135 | struct GNUNET_PSYC_MasterTransmitHandle | 176 | struct GNUNET_PSYC_MasterTransmitHandle |
136 | { | 177 | { |
137 | struct GNUNET_PSYC_Master *master; | 178 | struct GNUNET_PSYC_Master *master; |
138 | GNUNET_PSYC_MasterTransmitNotify notify; | 179 | GNUNET_PSYC_MasterTransmitNotify notify_mod; |
180 | GNUNET_PSYC_MasterTransmitNotify notify_data; | ||
139 | void *notify_cls; | 181 | void *notify_cls; |
140 | enum GNUNET_PSYC_DataStatus status; | 182 | enum MessageState state; |
141 | }; | 183 | }; |
142 | 184 | ||
143 | 185 | ||
@@ -254,52 +296,383 @@ transmit_next (struct GNUNET_PSYC_Channel *ch); | |||
254 | 296 | ||
255 | 297 | ||
256 | /** | 298 | /** |
257 | * Request data from client to transmit. | 299 | * Reset data stored related to the last received message. |
300 | */ | ||
301 | static void | ||
302 | recv_reset (struct GNUNET_PSYC_Channel *ch) | ||
303 | { | ||
304 | ch->recv_state = MSG_STATE_START; | ||
305 | ch->recv_flags = 0; | ||
306 | ch->recv_message_id = 0; | ||
307 | ch->recv_mod_value_size =0; | ||
308 | ch->recv_mod_value_size_expected = 0; | ||
309 | } | ||
310 | |||
311 | |||
312 | static void | ||
313 | recv_error (struct GNUNET_PSYC_Channel *ch) | ||
314 | { | ||
315 | recv_reset (ch); | ||
316 | |||
317 | GNUNET_PSYC_MessageCallback message_cb | ||
318 | = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC | ||
319 | ? ch->hist_message_cb | ||
320 | : ch->message_cb; | ||
321 | |||
322 | if (NULL != message_cb) | ||
323 | message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, NULL); | ||
324 | } | ||
325 | |||
326 | /** | ||
327 | * Request a modifier from a client to transmit. | ||
258 | * | 328 | * |
259 | * @param mst Master handle. | 329 | * @param mst Master handle. |
260 | */ | 330 | */ |
261 | static void | 331 | static void |
262 | master_transmit_data (struct GNUNET_PSYC_Master *mst) | 332 | master_transmit_mod (struct GNUNET_PSYC_Master *mst) |
263 | { | 333 | { |
264 | struct GNUNET_PSYC_Channel *ch = &mst->ch; | 334 | struct GNUNET_PSYC_Channel *ch = &mst->ch; |
265 | size_t data_size = ch->tmit_buf_avail; | 335 | uint16_t max_data_size |
266 | struct GNUNET_PSYC_MessageData *pdata; | 336 | = ch->tmit_size > sizeof (struct GNUNET_MessageHeader) |
267 | struct OperationHandle *op | 337 | ? GNUNET_PSYC_MODIFIER_MAX_PAYLOAD - ch->tmit_size |
268 | = GNUNET_malloc (sizeof (*op) + sizeof (*pdata) + data_size); | 338 | : GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD - ch->tmit_size; |
269 | pdata = (struct GNUNET_PSYC_MessageData *) &op[1]; | 339 | uint16_t data_size = max_data_size; |
270 | op->msg = (struct GNUNET_MessageHeader *) pdata; | ||
271 | pdata->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); | ||
272 | 340 | ||
273 | switch (mst->tmit->notify (mst->tmit->notify_cls, &data_size, &pdata[1])) | 341 | struct GNUNET_MessageHeader *msg; |
342 | struct OperationHandle *op | ||
343 | = GNUNET_malloc (sizeof (*op) + sizeof (*msg) + data_size); | ||
344 | op->msg = msg = (struct GNUNET_MessageHeader *) &op[1]; | ||
345 | msg->type | ||
346 | = MSG_STATE_MODIFIER == mst->tmit->state | ||
347 | ? htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER) | ||
348 | : htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); | ||
349 | |||
350 | int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls, | ||
351 | &data_size, &msg[1]); | ||
352 | switch (notify_ret) | ||
274 | { | 353 | { |
275 | case GNUNET_NO: | 354 | case GNUNET_NO: |
276 | mst->tmit->status = GNUNET_PSYC_DATA_CONT; | 355 | if (0 != data_size) |
356 | mst->tmit->state = MSG_STATE_MOD_CONT; | ||
277 | break; | 357 | break; |
278 | 358 | ||
279 | case GNUNET_YES: | 359 | case GNUNET_YES: |
280 | mst->tmit->status = GNUNET_PSYC_DATA_END; | 360 | mst->tmit->state = (0 == data_size) ? MSG_STATE_DATA : MSG_STATE_MODIFIER; |
281 | break; | 361 | break; |
282 | 362 | ||
283 | default: | 363 | default: |
284 | mst->tmit->status = GNUNET_PSYC_DATA_CANCEL; | 364 | LOG (GNUNET_ERROR_TYPE_ERROR, |
285 | data_size = 0; | 365 | "MasterTransmitNotify returned error when requesting a modifier.\n"); |
286 | LOG (GNUNET_ERROR_TYPE_ERROR, "MasterTransmitNotify returned error.\n"); | 366 | |
367 | mst->tmit->state = MSG_STATE_START; | ||
368 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); | ||
369 | msg->size = htons (sizeof (*msg)); | ||
370 | |||
371 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | ||
372 | transmit_next (ch); | ||
373 | return; | ||
287 | } | 374 | } |
288 | 375 | ||
289 | if ((GNUNET_PSYC_DATA_CONT == mst->tmit->status && 0 == data_size)) | 376 | if ((GNUNET_NO == notify_ret && 0 == data_size)) |
290 | { | 377 | { |
291 | /* Transmission paused, nothing to send. */ | 378 | /* Transmission paused, nothing to send. */ |
292 | ch->tmit_paused = GNUNET_YES; | 379 | ch->tmit_paused = GNUNET_YES; |
293 | GNUNET_free (op); | 380 | GNUNET_free (op); |
294 | } | 381 | } |
295 | else | 382 | |
383 | if (0 < data_size) | ||
384 | { | ||
385 | GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD); | ||
386 | msg->size = htons (sizeof (*msg) + data_size); | ||
387 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | ||
388 | } | ||
389 | |||
390 | /* End of message. */ | ||
391 | if (GNUNET_YES == notify_ret) | ||
392 | { | ||
393 | op = GNUNET_malloc (sizeof *(op) + sizeof (*msg)); | ||
394 | op->msg = msg = (struct GNUNET_MessageHeader *) &op[1]; | ||
395 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END); | ||
396 | msg->size = htons (sizeof (*msg)); | ||
397 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | ||
398 | } | ||
399 | |||
400 | transmit_next (ch); | ||
401 | } | ||
402 | |||
403 | |||
404 | /** | ||
405 | * Request data from a client to transmit. | ||
406 | * | ||
407 | * @param mst Master handle. | ||
408 | */ | ||
409 | static void | ||
410 | master_transmit_data (struct GNUNET_PSYC_Master *mst) | ||
411 | { | ||
412 | struct GNUNET_PSYC_Channel *ch = &mst->ch; | ||
413 | struct GNUNET_MessageHeader *msg; | ||
414 | uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD; | ||
415 | struct OperationHandle *op | ||
416 | = GNUNET_malloc (sizeof (*op) + sizeof (*msg) + data_size); | ||
417 | op->msg = msg = (struct GNUNET_MessageHeader *) &op[1]; | ||
418 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); | ||
419 | |||
420 | int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls, | ||
421 | &data_size, &msg[1]); | ||
422 | switch (notify_ret) | ||
296 | { | 423 | { |
297 | GNUNET_assert (data_size <= ch->tmit_buf_avail); | 424 | case GNUNET_NO: |
298 | pdata->header.size = htons (sizeof (*pdata) + data_size); | 425 | if (0 == data_size) |
299 | pdata->status = htons (mst->tmit->status); | 426 | { |
427 | /* Transmission paused, nothing to send. */ | ||
428 | ch->tmit_paused = GNUNET_YES; | ||
429 | GNUNET_free (op); | ||
430 | } | ||
431 | break; | ||
432 | |||
433 | case GNUNET_YES: | ||
434 | mst->tmit->state = MSG_STATE_START; | ||
435 | break; | ||
436 | |||
437 | default: | ||
438 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
439 | "MasterTransmitNotify returned error when requesting data.\n"); | ||
440 | |||
441 | mst->tmit->state = MSG_STATE_START; | ||
442 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); | ||
443 | msg->size = htons (sizeof (*msg)); | ||
444 | |||
300 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | 445 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); |
301 | ch->tmit_ack_pending = GNUNET_YES; | ||
302 | transmit_next (ch); | 446 | transmit_next (ch); |
447 | return; | ||
448 | } | ||
449 | |||
450 | if (0 < data_size) | ||
451 | { | ||
452 | GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD); | ||
453 | msg->size = htons (sizeof (*msg) + data_size); | ||
454 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | ||
455 | } | ||
456 | |||
457 | /* End of message. */ | ||
458 | if (GNUNET_YES == notify_ret) | ||
459 | { | ||
460 | op = GNUNET_malloc (sizeof *(op) + sizeof (*msg)); | ||
461 | op->msg = msg = (struct GNUNET_MessageHeader *) &op[1]; | ||
462 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END); | ||
463 | msg->size = htons (sizeof (*msg)); | ||
464 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | ||
465 | } | ||
466 | |||
467 | transmit_next (ch); | ||
468 | } | ||
469 | |||
470 | |||
471 | /** | ||
472 | * Handle incoming message from the PSYC service. | ||
473 | * | ||
474 | * @param ch The channel the message is sent to. | ||
475 | * @param pmsg The message. | ||
476 | */ | ||
477 | static void | ||
478 | handle_psyc_message (struct GNUNET_PSYC_Channel *ch, | ||
479 | const struct GNUNET_PSYC_MessageHeader *pmsg) | ||
480 | { | ||
481 | const struct GNUNET_MessageHeader *msg; | ||
482 | uint16_t msize = ntohs (pmsg->header.size); | ||
483 | uint16_t pos = 0; | ||
484 | uint16_t size = 0; | ||
485 | uint16_t type, size_eq, size_min; | ||
486 | |||
487 | if (MSG_STATE_START == ch->recv_state) | ||
488 | { | ||
489 | ch->recv_message_id = GNUNET_ntohll (pmsg->message_id); | ||
490 | ch->recv_flags = ntohl (pmsg->flags); | ||
491 | } | ||
492 | else if (GNUNET_ntohll (pmsg->message_id) != ch->recv_message_id) | ||
493 | { | ||
494 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
495 | "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n", | ||
496 | GNUNET_ntohll (pmsg->message_id), ch->recv_message_id); | ||
497 | GNUNET_break_op (0); | ||
498 | recv_error (ch); | ||
499 | } | ||
500 | else if (ntohl (pmsg->flags) != ch->recv_flags) | ||
501 | { | ||
502 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
503 | "Unexpected message flags. Got: %lu, expected: %lu\n", | ||
504 | ntohl (pmsg->flags), ch->recv_flags); | ||
505 | GNUNET_break_op (0); | ||
506 | recv_error (ch); | ||
507 | } | ||
508 | |||
509 | for (pos = 0; sizeof (*pmsg) + pos < msize; pos += size) | ||
510 | { | ||
511 | msg = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos); | ||
512 | size = ntohs (msg->size); | ||
513 | type = ntohs (msg->type); | ||
514 | size_eq = size_min = 0; | ||
515 | |||
516 | if (msize < sizeof (*pmsg) + pos + size) | ||
517 | { | ||
518 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
519 | "Discarding message of type %u with invalid size. " | ||
520 | "(%u < %u + %u + %u)\n", ntohs (msg->type), | ||
521 | msize, sizeof (*msg), pos, size); | ||
522 | break; | ||
523 | } | ||
524 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
525 | "Received message part of type %u and size %u from PSYC.\n", | ||
526 | ntohs (msg->type), size); | ||
527 | |||
528 | |||
529 | switch (type) | ||
530 | { | ||
531 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: | ||
532 | size_min = sizeof (struct GNUNET_PSYC_MessageMethod); | ||
533 | break; | ||
534 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | ||
535 | size_min = sizeof (struct GNUNET_PSYC_MessageModifier); | ||
536 | break; | ||
537 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: | ||
538 | size_min = sizeof (struct GNUNET_MessageHeader); | ||
539 | break; | ||
540 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: | ||
541 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: | ||
542 | size_eq = sizeof (struct GNUNET_MessageHeader); | ||
543 | break; | ||
544 | } | ||
545 | |||
546 | if (! ((0 < size_eq && size == size_eq) | ||
547 | || (0 < size_min && size_min <= size))) | ||
548 | { | ||
549 | GNUNET_break (0); | ||
550 | reschedule_connect (ch); | ||
551 | return; | ||
552 | } | ||
553 | |||
554 | switch (type) | ||
555 | { | ||
556 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: | ||
557 | { | ||
558 | struct GNUNET_PSYC_MessageMethod *meth | ||
559 | = (struct GNUNET_PSYC_MessageMethod *) msg; | ||
560 | |||
561 | if (MSG_STATE_HEADER != ch->recv_state) | ||
562 | { | ||
563 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
564 | "Discarding out of order message method.\n"); | ||
565 | /* It is normal to receive an incomplete message right after connecting, | ||
566 | * but should not happen later. | ||
567 | * FIXME: add a check for this condition. | ||
568 | */ | ||
569 | GNUNET_break_op (0); | ||
570 | recv_error (ch); | ||
571 | break; | ||
572 | } | ||
573 | |||
574 | if ('\0' != (char *) meth + msg->size - 1) | ||
575 | { | ||
576 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
577 | "Discarding message with malformed method. " | ||
578 | "Message ID: %" PRIu64 "\n", ch->recv_message_id); | ||
579 | GNUNET_break_op (0); | ||
580 | recv_error (ch); | ||
581 | break; | ||
582 | } | ||
583 | GNUNET_PSYC_MessageCallback message_cb | ||
584 | = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC | ||
585 | ? ch->hist_message_cb | ||
586 | : ch->message_cb; | ||
587 | |||
588 | if (NULL != message_cb) | ||
589 | message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg); | ||
590 | |||
591 | ch->recv_state = MSG_STATE_METHOD; | ||
592 | break; | ||
593 | } | ||
594 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | ||
595 | { | ||
596 | if (MSG_STATE_MODIFIER != ch->recv_state) | ||
597 | { | ||
598 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
599 | "Discarding out of order message modifier.\n"); | ||
600 | GNUNET_break_op (0); | ||
601 | recv_error (ch); | ||
602 | break; | ||
603 | } | ||
604 | |||
605 | struct GNUNET_PSYC_MessageModifier *mod | ||
606 | = (struct GNUNET_PSYC_MessageModifier *) msg; | ||
607 | |||
608 | uint16_t name_size = ntohs (mod->name_size); | ||
609 | ch->recv_mod_value_size_expected = ntohs (mod->value_size); | ||
610 | ch->recv_mod_value_size = size - sizeof (*mod) - name_size - 1; | ||
611 | |||
612 | if (size < sizeof (*mod) + name_size + 1 | ||
613 | || '\0' != (char *) &mod[1] + mod->name_size | ||
614 | || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) | ||
615 | { | ||
616 | LOG (GNUNET_ERROR_TYPE_WARNING, "Discarding malformed modifier.\n"); | ||
617 | GNUNET_break_op (0); | ||
618 | break; | ||
619 | } | ||
620 | |||
621 | ch->recv_state = MSG_STATE_MODIFIER; | ||
622 | |||
623 | GNUNET_PSYC_MessageCallback message_cb | ||
624 | = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC | ||
625 | ? ch->hist_message_cb | ||
626 | : ch->message_cb; | ||
627 | |||
628 | if (NULL != message_cb) | ||
629 | message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg); | ||
630 | |||
631 | break; | ||
632 | } | ||
633 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: | ||
634 | { | ||
635 | ch->recv_mod_value_size += size - sizeof (*msg); | ||
636 | |||
637 | if (MSG_STATE_MODIFIER != ch->recv_state | ||
638 | || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) | ||
639 | { | ||
640 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
641 | "Discarding out of order message modifier continuation.\n"); | ||
642 | GNUNET_break_op (0); | ||
643 | recv_reset (ch); | ||
644 | break; | ||
645 | } | ||
646 | |||
647 | GNUNET_PSYC_MessageCallback message_cb | ||
648 | = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC | ||
649 | ? ch->hist_message_cb | ||
650 | : ch->message_cb; | ||
651 | |||
652 | if (NULL != message_cb) | ||
653 | message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg); | ||
654 | break; | ||
655 | } | ||
656 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: | ||
657 | { | ||
658 | if (ch->recv_state < MSG_STATE_METHOD | ||
659 | || ch->recv_mod_value_size_expected != ch->recv_mod_value_size) | ||
660 | { | ||
661 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
662 | "Discarding out of order message data fragment.\n"); | ||
663 | GNUNET_break_op (0); | ||
664 | recv_reset (ch); | ||
665 | break; | ||
666 | } | ||
667 | |||
668 | ch->recv_state = MSG_STATE_DATA; | ||
669 | break; | ||
670 | } | ||
671 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: | ||
672 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: | ||
673 | recv_reset (ch); | ||
674 | break; | ||
675 | } | ||
303 | } | 676 | } |
304 | } | 677 | } |
305 | 678 | ||
@@ -319,11 +692,10 @@ message_handler (void *cls, | |||
319 | struct GNUNET_PSYC_Channel *ch = cls; | 692 | struct GNUNET_PSYC_Channel *ch = cls; |
320 | struct GNUNET_PSYC_Master *mst = cls; | 693 | struct GNUNET_PSYC_Master *mst = cls; |
321 | struct GNUNET_PSYC_Slave *slv = cls; | 694 | struct GNUNET_PSYC_Slave *slv = cls; |
322 | struct CountersResult *cres; | ||
323 | struct TransmitAck *tack; | ||
324 | 695 | ||
325 | if (NULL == msg) | 696 | if (NULL == msg) |
326 | { | 697 | { |
698 | GNUNET_break (0); | ||
327 | reschedule_connect (ch); | 699 | reschedule_connect (ch); |
328 | return; | 700 | return; |
329 | } | 701 | } |
@@ -342,8 +714,8 @@ message_handler (void *cls, | |||
342 | case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: | 714 | case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: |
343 | size_eq = sizeof (struct CountersResult); | 715 | size_eq = sizeof (struct CountersResult); |
344 | break; | 716 | break; |
345 | case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK: | 717 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE: |
346 | size_eq = sizeof (struct TransmitAck); | 718 | size_min = sizeof (struct GNUNET_PSYC_MessageHeader); |
347 | break; | 719 | break; |
348 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: | 720 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: |
349 | size_min = sizeof (struct GNUNET_PSYC_MessageMethod); | 721 | size_min = sizeof (struct GNUNET_PSYC_MessageMethod); |
@@ -352,11 +724,13 @@ message_handler (void *cls, | |||
352 | size_min = sizeof (struct GNUNET_PSYC_MessageModifier); | 724 | size_min = sizeof (struct GNUNET_PSYC_MessageModifier); |
353 | break; | 725 | break; |
354 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: | 726 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: |
355 | size_min = sizeof (struct GNUNET_PSYC_MessageData); | 727 | size_min = sizeof (struct GNUNET_MessageHeader); |
728 | break; | ||
729 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: | ||
730 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: | ||
731 | case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK: | ||
732 | size_eq = sizeof (struct GNUNET_MessageHeader); | ||
356 | break; | 733 | break; |
357 | default: | ||
358 | GNUNET_break_op (0); | ||
359 | return; | ||
360 | } | 734 | } |
361 | 735 | ||
362 | if (! ((0 < size_eq && size == size_eq) | 736 | if (! ((0 < size_eq && size == size_eq) |
@@ -370,38 +744,63 @@ message_handler (void *cls, | |||
370 | switch (type) | 744 | switch (type) |
371 | { | 745 | { |
372 | case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK: | 746 | case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK: |
373 | cres = (struct CountersResult *) msg; | 747 | { |
748 | struct CountersResult *cres = (struct CountersResult *) msg; | ||
374 | mst->max_message_id = GNUNET_ntohll (cres->max_message_id); | 749 | mst->max_message_id = GNUNET_ntohll (cres->max_message_id); |
375 | if (NULL != mst->start_cb) | 750 | if (NULL != mst->start_cb) |
376 | mst->start_cb (ch->cb_cls, mst->max_message_id); | 751 | mst->start_cb (ch->cb_cls, mst->max_message_id); |
377 | break; | 752 | break; |
378 | 753 | } | |
379 | case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: | 754 | case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: |
380 | cres = (struct CountersResult *) msg; | 755 | { |
381 | #if TODO | 756 | #if TODO |
757 | struct CountersResult *cres = (struct CountersResult *) msg; | ||
382 | slv->max_message_id = GNUNET_ntohll (cres->max_message_id); | 758 | slv->max_message_id = GNUNET_ntohll (cres->max_message_id); |
383 | if (NULL != slv->join_ack_cb) | 759 | if (NULL != slv->join_ack_cb) |
384 | mst->join_ack_cb (ch->cb_cls, mst->max_message_id); | 760 | mst->join_ack_cb (ch->cb_cls, mst->max_message_id); |
385 | #endif | 761 | #endif |
386 | break; | 762 | break; |
387 | 763 | } | |
388 | case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK: | 764 | case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK: |
389 | tack = (struct TransmitAck *) msg; | 765 | { |
766 | ch->tmit_ack_pending = GNUNET_NO; | ||
767 | |||
390 | if (ch->is_master) | 768 | if (ch->is_master) |
391 | { | 769 | { |
392 | GNUNET_assert (NULL != mst->tmit); | 770 | GNUNET_assert (NULL != mst->tmit); |
393 | if (GNUNET_PSYC_DATA_CONT != mst->tmit->status | 771 | switch (mst->tmit->state) |
394 | || NULL == mst->tmit->notify) | ||
395 | { | ||
396 | GNUNET_free (mst->tmit); | ||
397 | mst->tmit = NULL; | ||
398 | } | ||
399 | else | ||
400 | { | 772 | { |
401 | ch->tmit_buf_avail = ntohs (tack->buf_avail); | 773 | case MSG_STATE_MODIFIER: |
402 | ch->tmit_ack_pending = GNUNET_NO; | 774 | if (GNUNET_NO == ch->tmit_paused) |
775 | master_transmit_mod (mst); | ||
776 | break; | ||
777 | |||
778 | case MSG_STATE_MOD_CONT: | ||
779 | if (GNUNET_NO == ch->tmit_paused) | ||
780 | master_transmit_mod (mst); | ||
781 | break; | ||
782 | |||
783 | case MSG_STATE_DATA: | ||
403 | if (GNUNET_NO == ch->tmit_paused) | 784 | if (GNUNET_NO == ch->tmit_paused) |
404 | master_transmit_data (mst); | 785 | master_transmit_data (mst); |
786 | break; | ||
787 | |||
788 | case MSG_STATE_END: | ||
789 | case MSG_STATE_CANCEL: | ||
790 | if (NULL != mst->tmit) | ||
791 | { | ||
792 | GNUNET_free (mst->tmit); | ||
793 | mst->tmit = NULL; | ||
794 | } | ||
795 | else | ||
796 | { | ||
797 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
798 | "Ignoring transmit ack, there's no transmission going on.\n"); | ||
799 | } | ||
800 | break; | ||
801 | default: | ||
802 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
803 | "Ignoring unexpected transmit ack.\n"); | ||
405 | } | 804 | } |
406 | } | 805 | } |
407 | else | 806 | else |
@@ -409,17 +808,10 @@ message_handler (void *cls, | |||
409 | /* TODO: slave */ | 808 | /* TODO: slave */ |
410 | } | 809 | } |
411 | break; | 810 | break; |
811 | } | ||
412 | 812 | ||
413 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: | 813 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE: |
414 | 814 | handle_psyc_message(ch, (const struct GNUNET_PSYC_MessageHeader *) msg); | |
415 | break; | ||
416 | |||
417 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | ||
418 | |||
419 | break; | ||
420 | |||
421 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: | ||
422 | |||
423 | break; | 815 | break; |
424 | } | 816 | } |
425 | 817 | ||
@@ -506,6 +898,7 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
506 | { | 898 | { |
507 | struct GNUNET_PSYC_Channel *ch = cls; | 899 | struct GNUNET_PSYC_Channel *ch = cls; |
508 | 900 | ||
901 | recv_reset (ch); | ||
509 | ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK; | 902 | ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK; |
510 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 903 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
511 | "Connecting to PSYC service.\n"); | 904 | "Connecting to PSYC service.\n"); |
@@ -588,7 +981,7 @@ disconnect (void *c) | |||
588 | * one in the future. | 981 | * one in the future. |
589 | * @param policy Channel policy specifying join and history restrictions. | 982 | * @param policy Channel policy specifying join and history restrictions. |
590 | * Used to automate join decisions. | 983 | * Used to automate join decisions. |
591 | * @param method Function to invoke on messages received from slaves. | 984 | * @param message_cb Function to invoke on message parts received from slaves. |
592 | * @param join_cb Function to invoke when a peer wants to join. | 985 | * @param join_cb Function to invoke when a peer wants to join. |
593 | * @param master_started_cb Function to invoke after the channel master started. | 986 | * @param master_started_cb Function to invoke after the channel master started. |
594 | * @param cls Closure for @a master_started_cb and @a join_cb. | 987 | * @param cls Closure for @a master_started_cb and @a join_cb. |
@@ -598,7 +991,7 @@ struct GNUNET_PSYC_Master * | |||
598 | GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, | 991 | GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, |
599 | const struct GNUNET_CRYPTO_EddsaPrivateKey *channel_key, | 992 | const struct GNUNET_CRYPTO_EddsaPrivateKey *channel_key, |
600 | enum GNUNET_PSYC_Policy policy, | 993 | enum GNUNET_PSYC_Policy policy, |
601 | GNUNET_PSYC_Method method, | 994 | GNUNET_PSYC_MessageCallback message_cb, |
602 | GNUNET_PSYC_JoinCallback join_cb, | 995 | GNUNET_PSYC_JoinCallback join_cb, |
603 | GNUNET_PSYC_MasterStartCallback master_started_cb, | 996 | GNUNET_PSYC_MasterStartCallback master_started_cb, |
604 | void *cls) | 997 | void *cls) |
@@ -618,7 +1011,7 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
618 | ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; | 1011 | ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; |
619 | ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst); | 1012 | ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst); |
620 | 1013 | ||
621 | ch->method_cb = method; | 1014 | ch->message_cb = message_cb; |
622 | ch->join_cb = join_cb; | 1015 | ch->join_cb = join_cb; |
623 | ch->cb_cls = cls; | 1016 | ch->cb_cls = cls; |
624 | mst->start_cb = master_started_cb; | 1017 | mst->start_cb = master_started_cb; |
@@ -705,19 +1098,17 @@ send_modifier (void *cls, struct GNUNET_ENV_Modifier *mod) | |||
705 | * | 1098 | * |
706 | * @param master Handle to the PSYC channel. | 1099 | * @param master Handle to the PSYC channel. |
707 | * @param method_name Which method should be invoked. | 1100 | * @param method_name Which method should be invoked. |
708 | * @param env Environment containing state operations and transient variables | 1101 | * @param notify_mod Function to call to obtain modifiers. |
709 | * for the message, or NULL. | 1102 | * @param notify_data Function to call to obtain fragments of the data. |
710 | * @param notify Function to call to obtain the arguments. | 1103 | * @param notify_cls Closure for @a notify_mod and @a notify_data. |
711 | * @param notify_cls Closure for @a notify. | ||
712 | * @param flags Flags for the message being transmitted. | 1104 | * @param flags Flags for the message being transmitted. |
713 | * @return Transmission handle, NULL on error (i.e. more than one request | 1105 | * @return Transmission handle, NULL on error (i.e. more than one request queued). |
714 | * queued). | ||
715 | */ | 1106 | */ |
716 | struct GNUNET_PSYC_MasterTransmitHandle * | 1107 | struct GNUNET_PSYC_MasterTransmitHandle * |
717 | GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, | 1108 | GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, |
718 | const char *method_name, | 1109 | const char *method_name, |
719 | const struct GNUNET_ENV_Environment *env, | 1110 | GNUNET_PSYC_MasterTransmitNotify notify_mod, |
720 | GNUNET_PSYC_MasterTransmitNotify notify, | 1111 | GNUNET_PSYC_MasterTransmitNotify notify_data, |
721 | void *notify_cls, | 1112 | void *notify_cls, |
722 | enum GNUNET_PSYC_MasterTransmitFlags flags) | 1113 | enum GNUNET_PSYC_MasterTransmitFlags flags) |
723 | { | 1114 | { |
@@ -737,18 +1128,17 @@ GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, | |||
737 | pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); | 1128 | pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); |
738 | pmeth->header.size = htons (sizeof (*pmeth) + size); | 1129 | pmeth->header.size = htons (sizeof (*pmeth) + size); |
739 | pmeth->flags = htonl (flags); | 1130 | pmeth->flags = htonl (flags); |
740 | pmeth->mod_count = GNUNET_ntohll (GNUNET_ENV_environment_get_mod_count (env)); | ||
741 | memcpy (&pmeth[1], method_name, size); | 1131 | memcpy (&pmeth[1], method_name, size); |
742 | 1132 | ||
743 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | 1133 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); |
744 | GNUNET_ENV_environment_iterate (env, send_modifier, master); | ||
745 | transmit_next (ch); | 1134 | transmit_next (ch); |
746 | 1135 | ||
747 | master->tmit = GNUNET_malloc (sizeof (*master->tmit)); | 1136 | master->tmit = GNUNET_malloc (sizeof (*master->tmit)); |
748 | master->tmit->master = master; | 1137 | master->tmit->master = master; |
749 | master->tmit->notify = notify; | 1138 | master->tmit->notify_mod = notify_mod; |
1139 | master->tmit->notify_data = notify_data; | ||
750 | master->tmit->notify_cls = notify_cls; | 1140 | master->tmit->notify_cls = notify_cls; |
751 | master->tmit->status = GNUNET_PSYC_DATA_CONT; | 1141 | master->tmit->state = MSG_STATE_START; // FIXME |
752 | return master->tmit; | 1142 | return master->tmit; |
753 | } | 1143 | } |
754 | 1144 | ||
@@ -804,12 +1194,13 @@ GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th) | |||
804 | * @param relay_count Number of peers in the @a relays array. | 1194 | * @param relay_count Number of peers in the @a relays array. |
805 | * @param relays Peer identities of members of the multicast group, which serve | 1195 | * @param relays Peer identities of members of the multicast group, which serve |
806 | * as relays and used to join the group at. | 1196 | * as relays and used to join the group at. |
807 | * @param method Function to invoke on messages received from the channel, | 1197 | * @param message_cb Function to invoke on message parts received from the |
808 | * typically at least contains functions for @e join and @e part. | 1198 | * channel, typically at least contains method handlers for @e join and |
1199 | * @e part. | ||
809 | * @param join_cb function invoked once we have joined with the current | 1200 | * @param join_cb function invoked once we have joined with the current |
810 | * message ID of the channel | 1201 | * message ID of the channel |
811 | * @param slave_joined_cb Function to invoke when a peer wants to join. | 1202 | * @param slave_joined_cb Function to invoke when a peer wants to join. |
812 | * @param cls Closure for @a method_cb and @a slave_joined_cb. | 1203 | * @param cls Closure for @a message_cb and @a slave_joined_cb. |
813 | * @param method_name Method name for the join request. | 1204 | * @param method_name Method name for the join request. |
814 | * @param env Environment containing transient variables for the request, or NULL. | 1205 | * @param env Environment containing transient variables for the request, or NULL. |
815 | * @param data Payload for the join message. | 1206 | * @param data Payload for the join message. |
@@ -823,7 +1214,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
823 | const struct GNUNET_PeerIdentity *origin, | 1214 | const struct GNUNET_PeerIdentity *origin, |
824 | uint32_t relay_count, | 1215 | uint32_t relay_count, |
825 | const struct GNUNET_PeerIdentity *relays, | 1216 | const struct GNUNET_PeerIdentity *relays, |
826 | GNUNET_PSYC_Method method, | 1217 | GNUNET_PSYC_MessageCallback message_cb, |
827 | GNUNET_PSYC_JoinCallback join_cb, | 1218 | GNUNET_PSYC_JoinCallback join_cb, |
828 | GNUNET_PSYC_SlaveJoinCallback slave_joined_cb, | 1219 | GNUNET_PSYC_SlaveJoinCallback slave_joined_cb, |
829 | void *cls, | 1220 | void *cls, |
@@ -845,6 +1236,10 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
845 | req->relay_count = relay_count; | 1236 | req->relay_count = relay_count; |
846 | memcpy (&req[1], relays, relay_count * sizeof (*relays)); | 1237 | memcpy (&req[1], relays, relay_count * sizeof (*relays)); |
847 | 1238 | ||
1239 | ch->message_cb = message_cb; | ||
1240 | ch->join_cb = join_cb; | ||
1241 | ch->cb_cls = cls; | ||
1242 | |||
848 | ch->cfg = cfg; | 1243 | ch->cfg = cfg; |
849 | ch->is_master = GNUNET_NO; | 1244 | ch->is_master = GNUNET_NO; |
850 | ch->reconnect_msg = (struct GNUNET_MessageHeader *) req; | 1245 | ch->reconnect_msg = (struct GNUNET_MessageHeader *) req; |
@@ -1043,7 +1438,7 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel, | |||
1043 | * @param channel Which channel should be replayed? | 1438 | * @param channel Which channel should be replayed? |
1044 | * @param start_message_id Earliest interesting point in history. | 1439 | * @param start_message_id Earliest interesting point in history. |
1045 | * @param end_message_id Last (exclusive) interesting point in history. | 1440 | * @param end_message_id Last (exclusive) interesting point in history. |
1046 | * @param method Function to invoke on messages received from the story. | 1441 | * @param message_cb Function to invoke on message parts received from the story. |
1047 | * @param finish_cb Function to call when the requested story has been fully | 1442 | * @param finish_cb Function to call when the requested story has been fully |
1048 | * told (counting message IDs might not suffice, as some messages | 1443 | * told (counting message IDs might not suffice, as some messages |
1049 | * might be secret and thus the listener would not know the story is | 1444 | * might be secret and thus the listener would not know the story is |
@@ -1057,8 +1452,8 @@ struct GNUNET_PSYC_Story * | |||
1057 | GNUNET_PSYC_channel_story_tell (struct GNUNET_PSYC_Channel *channel, | 1452 | GNUNET_PSYC_channel_story_tell (struct GNUNET_PSYC_Channel *channel, |
1058 | uint64_t start_message_id, | 1453 | uint64_t start_message_id, |
1059 | uint64_t end_message_id, | 1454 | uint64_t end_message_id, |
1060 | GNUNET_PSYC_Method method, | 1455 | GNUNET_PSYC_MessageCallback message_cb, |
1061 | GNUNET_PSYC_FinishCallback *finish_cb, | 1456 | GNUNET_PSYC_FinishCallback finish_cb, |
1062 | void *cls) | 1457 | void *cls) |
1063 | { | 1458 | { |
1064 | return NULL; | 1459 | return NULL; |
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c index 2986fdf6a..704819c50 100644 --- a/src/psyc/test_psyc.c +++ b/src/psyc/test_psyc.c | |||
@@ -148,11 +148,16 @@ join (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, | |||
148 | struct TransmitClosure | 148 | struct TransmitClosure |
149 | { | 149 | { |
150 | struct GNUNET_PSYC_MasterTransmitHandle *handle; | 150 | struct GNUNET_PSYC_MasterTransmitHandle *handle; |
151 | uint8_t n; | 151 | |
152 | char *mod_names[16]; | ||
153 | char *mod_values[16]; | ||
154 | char *data[16]; | ||
155 | |||
156 | uint8_t mod_count; | ||
157 | uint8_t data_count; | ||
158 | |||
152 | uint8_t paused; | 159 | uint8_t paused; |
153 | uint8_t fragment_count; | 160 | uint8_t n; |
154 | char *fragments[16]; | ||
155 | uint16_t fragment_sizes[16]; | ||
156 | }; | 161 | }; |
157 | 162 | ||
158 | 163 | ||
@@ -167,16 +172,47 @@ transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
167 | 172 | ||
168 | 173 | ||
169 | static int | 174 | static int |
170 | transmit_notify (void *cls, size_t *data_size, void *data) | 175 | tmit_notify_mod (void *cls, size_t *data_size, void *data) |
171 | { | 176 | { |
172 | struct TransmitClosure *tmit = cls; | 177 | struct TransmitClosure *tmit = cls; |
173 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 178 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
174 | "Transmit notify: %lu bytes available, " | 179 | "Transmit notify modifier: %lu bytes available, " |
180 | "processing modifier %u/%u.\n", | ||
181 | *data_size, tmit->n + 1, tmit->fragment_count); | ||
182 | /* FIXME: continuation */ | ||
183 | uint16_t name_size = strlen (tmit->mod_names[tmit->n]); | ||
184 | uint16_t value_size = strlen (tmit->mod_values[tmit->n]); | ||
185 | if (name_size + 1 + value_size <= *data_size) | ||
186 | return GNUNET_NO; | ||
187 | |||
188 | *data_size = name_size + 1 + value_size; | ||
189 | memcpy (data, tmit->fragments[tmit->n], *data_size); | ||
190 | |||
191 | if (++tmit->n < tmit->mod_count) | ||
192 | { | ||
193 | return GNUNET_NO; | ||
194 | } | ||
195 | else | ||
196 | { | ||
197 | tmit->n = 0; | ||
198 | return GNUNET_YES; | ||
199 | } | ||
200 | } | ||
201 | |||
202 | |||
203 | static int | ||
204 | tmit_notify_data (void *cls, size_t *data_size, void *data) | ||
205 | { | ||
206 | struct TransmitClosure *tmit = cls; | ||
207 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
208 | "Transmit notify data: %lu bytes available, " | ||
175 | "processing fragment %u/%u.\n", | 209 | "processing fragment %u/%u.\n", |
176 | *data_size, tmit->n + 1, tmit->fragment_count); | 210 | *data_size, tmit->n + 1, tmit->fragment_count); |
177 | GNUNET_assert (tmit->fragment_sizes[tmit->n] <= *data_size); | 211 | uint16_t size = strlen (tmit->data[tmit->n]); |
212 | if (size <= *data_size) | ||
213 | return GNUNET_NO; | ||
178 | 214 | ||
179 | if (GNUNET_YES == tmit->paused && tmit->n == tmit->fragment_count - 1) | 215 | if (GNUNET_YES == tmit->paused && tmit->n == tmit->data_count - 1) |
180 | { | 216 | { |
181 | /* Send last fragment later. */ | 217 | /* Send last fragment later. */ |
182 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n"); | 218 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n"); |
@@ -188,13 +224,13 @@ transmit_notify (void *cls, size_t *data_size, void *data) | |||
188 | return GNUNET_NO; | 224 | return GNUNET_NO; |
189 | } | 225 | } |
190 | 226 | ||
191 | GNUNET_assert (tmit->fragment_sizes[tmit->n] <= *data_size); | 227 | *data_size = size; |
192 | *data_size = tmit->fragment_sizes[tmit->n]; | 228 | memcpy (data, tmit->data[tmit->n], size); |
193 | memcpy (data, tmit->fragments[tmit->n], *data_size); | ||
194 | 229 | ||
195 | return ++tmit->n < tmit->fragment_count ? GNUNET_NO : GNUNET_YES; | 230 | return ++tmit->n < tmit->data_count ? GNUNET_NO : GNUNET_YES; |
196 | } | 231 | } |
197 | 232 | ||
233 | |||
198 | void | 234 | void |
199 | master_started (void *cls, uint64_t max_message_id) | 235 | master_started (void *cls, uint64_t max_message_id) |
200 | { | 236 | { |
@@ -208,15 +244,13 @@ master_started (void *cls, uint64_t max_message_id) | |||
208 | "_foo_bar", "foo bar baz", 11); | 244 | "_foo_bar", "foo bar baz", 11); |
209 | 245 | ||
210 | struct TransmitClosure *tmit = GNUNET_new (struct TransmitClosure); | 246 | struct TransmitClosure *tmit = GNUNET_new (struct TransmitClosure); |
211 | tmit->fragment_count = 3; | 247 | tmit->data[0] = "foo"; |
212 | tmit->fragments[0] = "foo"; | 248 | tmit->data[1] = "foo bar"; |
213 | tmit->fragment_sizes[0] = 4; | 249 | tmit->data[2] = "foo bar baz"; |
214 | tmit->fragments[1] = "foo bar"; | 250 | tmit->data_count = 3; |
215 | tmit->fragment_sizes[1] = 7; | ||
216 | tmit->fragments[2] = "foo bar baz"; | ||
217 | tmit->fragment_sizes[2] = 11; | ||
218 | tmit->handle | 251 | tmit->handle |
219 | = GNUNET_PSYC_master_transmit (mst, "_test", env, transmit_notify, tmit, | 252 | = GNUNET_PSYC_master_transmit (mst, "_test", tmit_notify_mod, |
253 | tmit_notify_data, tmit, | ||
220 | GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN); | 254 | GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN); |
221 | } | 255 | } |
222 | 256 | ||