aboutsummaryrefslogtreecommitdiff
path: root/src/psyc
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2014-01-06 00:09:37 +0000
committerGabor X Toth <*@tg-x.net>2014-01-06 00:09:37 +0000
commitc04d45b9738e1764d2e2c21efdbeb129f298d5d1 (patch)
tree9eec32efdd3fe3f9f459630af16058cc47436bce /src/psyc
parent83a0e31631dbc199c37c42f11004e1be544f04a8 (diff)
downloadgnunet-c04d45b9738e1764d2e2c21efdbeb129f298d5d1.tar.gz
gnunet-c04d45b9738e1764d2e2c21efdbeb129f298d5d1.zip
psyc: ipc messages
Diffstat (limited to 'src/psyc')
-rw-r--r--src/psyc/gnunet-service-psyc.c325
-rw-r--r--src/psyc/psyc.h19
-rw-r--r--src/psyc/psyc_api.c569
-rw-r--r--src/psyc/test_psyc.c74
4 files changed, 774 insertions, 213 deletions
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
index a3b1b8f82..628c39900 100644
--- a/src/psyc/gnunet-service-psyc.c
+++ b/src/psyc/gnunet-service-psyc.c
@@ -71,9 +71,9 @@ struct TransmitMessage
71 char *buf; 71 char *buf;
72 uint16_t size; 72 uint16_t size;
73 /** 73 /**
74 * enum GNUNET_PSYC_DataStatus 74 * enum MessageState
75 */ 75 */
76 uint8_t status; 76 uint8_t state;
77}; 77};
78 78
79/** 79/**
@@ -87,12 +87,21 @@ struct Channel
87 struct TransmitMessage *tmit_tail; 87 struct TransmitMessage *tmit_tail;
88 88
89 GNUNET_SCHEDULER_TaskIdentifier tmit_task; 89 GNUNET_SCHEDULER_TaskIdentifier tmit_task;
90 uint32_t tmit_mod_count; 90
91 uint32_t tmit_mod_recvd;
92 /** 91 /**
93 * enum GNUNET_PSYC_DataStatus 92 * Expected value size for the modifier being received from the PSYC service.
94 */ 93 */
95 uint8_t tmit_status; 94 uint32_t tmit_mod_value_size_expected;
95
96 /**
97 * Actual value size for the modifier being received from the PSYC service.
98 */
99 uint32_t tmit_mod_value_size;
100
101 /**
102 * enum MessageState
103 */
104 uint8_t tmit_state;
96 105
97 uint8_t in_transmit; 106 uint8_t in_transmit;
98 uint8_t is_master; 107 uint8_t is_master;
@@ -112,12 +121,27 @@ struct Master
112 struct GNUNET_MULTICAST_Origin *origin; 121 struct GNUNET_MULTICAST_Origin *origin;
113 struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle; 122 struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle;
114 123
124 /**
125 * Maximum message ID for this channel.
126 *
127 * Incremented before sending a message, thus the message_id in messages sent
128 * starts from 1.
129 */
115 uint64_t max_message_id; 130 uint64_t max_message_id;
131
132 /**
133 * ID of the last message that contains any state operations.
134 * 0 if there is no such message.
135 */
116 uint64_t max_state_message_id; 136 uint64_t max_state_message_id;
137
138 /**
139 * Maximum group generation for this channel.
140 */
117 uint64_t max_group_generation; 141 uint64_t max_group_generation;
118 142
119 /** 143 /**
120 * enum GNUNET_PSYC_Policy 144 * @see enum GNUNET_PSYC_Policy
121 */ 145 */
122 uint32_t policy; 146 uint32_t policy;
123}; 147};
@@ -196,6 +220,7 @@ client_cleanup (struct Channel *ch)
196 GNUNET_free (ch); 220 GNUNET_free (ch);
197} 221}
198 222
223
199/** 224/**
200 * Called whenever a client is disconnected. 225 * Called whenever a client is disconnected.
201 * Frees our resources associated with that client. 226 * Frees our resources associated with that client.
@@ -234,7 +259,8 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
234 } 259 }
235} 260}
236 261
237void 262
263static void
238join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, 264join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
239 const struct GNUNET_MessageHeader *join_req, 265 const struct GNUNET_MessageHeader *join_req,
240 struct GNUNET_MULTICAST_JoinHandle *jh) 266 struct GNUNET_MULTICAST_JoinHandle *jh)
@@ -242,7 +268,8 @@ join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
242 268
243} 269}
244 270
245void 271
272static void
246membership_test_cb (void *cls, 273membership_test_cb (void *cls,
247 const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, 274 const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
248 uint64_t message_id, uint64_t group_generation, 275 uint64_t message_id, uint64_t group_generation,
@@ -251,16 +278,18 @@ membership_test_cb (void *cls,
251 278
252} 279}
253 280
254void 281
282static void
255replay_fragment_cb (void *cls, 283replay_fragment_cb (void *cls,
256 const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, 284 const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
257 uint64_t fragment_id, uint64_t flags, 285 uint64_t fragment_id, uint64_t flags,
258 struct GNUNET_MULTICAST_ReplayHandle *rh) 286 struct GNUNET_MULTICAST_ReplayHandle *rh)
259{
260 287
288{
261} 289}
262 290
263void 291
292static void
264replay_message_cb (void *cls, 293replay_message_cb (void *cls,
265 const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, 294 const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
266 uint64_t message_id, 295 uint64_t message_id,
@@ -271,56 +300,30 @@ replay_message_cb (void *cls,
271 300
272} 301}
273 302
274void
275request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
276 const struct GNUNET_MessageHeader *req,
277 enum GNUNET_MULTICAST_MessageFlags flags)
278{
279
280}
281
282 303
283void 304static void
284fragment_store_result (void *cls, int64_t result, const char *err_msg) 305fragment_store_result (void *cls, int64_t result, const char *err_msg)
285{ 306{
286 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 307 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
287 "fragment_store() returned %l (%s)\n", result, err_msg); 308 "fragment_store() returned %l (%s)\n", result, err_msg);
288} 309}
289 310
311
290/** 312/**
291 * Send PSYC messages in an incoming multicast message to a client. 313 * Iterator callback for sending a message to a client.
314 *
315 * @see message_cb()
292 */ 316 */
293int 317static int
294send_to_client (void *cls, const struct GNUNET_HashCode *ch_key_hash, void *chan) 318message_to_client (void *cls, const struct GNUNET_HashCode *chan_key_hash,
319 void *chan)
295{ 320{
296 const struct GNUNET_MULTICAST_MessageHeader *msg = cls; 321 const struct GNUNET_MessageHeader *msg = cls;
297 struct Channel *ch = chan; 322 struct Channel *ch = chan;
298 323
299 uint16_t size = ntohs (msg->header.size); 324 GNUNET_SERVER_notification_context_add (nc, ch->client);
300 uint16_t pos = 0; 325 GNUNET_SERVER_notification_context_unicast (nc, ch->client, msg, GNUNET_NO);
301
302 while (sizeof (*msg) + pos < size)
303 {
304 const struct GNUNET_MessageHeader *pmsg
305 = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
306 uint16_t psize = ntohs (pmsg->size);
307 if (sizeof (*msg) + pos + psize > size)
308 {
309 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
310 "Ignoring message of type %u with invalid size. "
311 "(%u + %u + %u > %u)\n", ntohs (pmsg->type),
312 sizeof (*msg), pos, psize, size);
313 break;
314 }
315 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
316 "Sending message of type %u and size %u to client.\n",
317 ntohs (pmsg->type), psize);
318 326
319 GNUNET_SERVER_notification_context_add (nc, ch->client);
320 GNUNET_SERVER_notification_context_unicast (nc, ch->client, pmsg,
321 GNUNET_NO);
322 pos += psize;
323 }
324 return GNUNET_YES; 327 return GNUNET_YES;
325} 328}
326 329
@@ -330,7 +333,7 @@ send_to_client (void *cls, const struct GNUNET_HashCode *ch_key_hash, void *chan
330 * 333 *
331 * Store it using PSYCstore and send it to all clients of the channel. 334 * Store it using PSYCstore and send it to all clients of the channel.
332 */ 335 */
333void 336static void
334message_cb (void *cls, const struct GNUNET_MessageHeader *msg) 337message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
335{ 338{
336 uint16_t type = ntohs (msg->type); 339 uint16_t type = ntohs (msg->type);
@@ -344,34 +347,100 @@ message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
344 struct Master *mst = cls; 347 struct Master *mst = cls;
345 struct Slave *slv = cls; 348 struct Slave *slv = cls;
346 349
347 struct GNUNET_CRYPTO_EddsaPublicKey *ch_key 350 /* const struct GNUNET_MULTICAST_MessageHeader *mmsg
351 = (const struct GNUNET_MULTICAST_MessageHeader *) msg; */
352 struct GNUNET_CRYPTO_EddsaPublicKey *chan_key
348 = ch->is_master ? &mst->pub_key : &slv->chan_key; 353 = ch->is_master ? &mst->pub_key : &slv->chan_key;
349 struct GNUNET_HashCode *ch_key_hash 354 struct GNUNET_HashCode *chan_key_hash
350 = ch->is_master ? &mst->pub_key_hash : &slv->chan_key_hash; 355 = ch->is_master ? &mst->pub_key_hash : &slv->chan_key_hash;
351 356
352 switch (type) 357 switch (type)
353 { 358 {
354 case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE: 359 case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
355 GNUNET_PSYCSTORE_fragment_store (store, ch_key, 360 {
361 GNUNET_PSYCSTORE_fragment_store (store, chan_key,
356 (const struct 362 (const struct
357 GNUNET_MULTICAST_MessageHeader *) msg, 363 GNUNET_MULTICAST_MessageHeader *) msg,
358 0, NULL, NULL); 364 0, NULL, NULL);
359 GNUNET_CONTAINER_multihashmap_get_multiple (clients, ch_key_hash,
360 send_to_client, (void *) msg);
361 break;
362 365
366 uint16_t size = ntohs (msg->size);
367 uint16_t psize = 0;
368 uint16_t pos = 0;
369
370 for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
371 {
372 const struct GNUNET_MessageHeader *pmsg
373 = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
374 uint16_t psize = ntohs (pmsg->size);
375 if (sizeof (*msg) + pos + psize > size)
376 {
377 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
378 "Message received from multicast contains invalid PSYC "
379 "message. Not sending to clients.\n");
380 return;
381 }
382 }
383
384#if TODO
385 /* FIXME: apply modifiers to state in PSYCstore */
386 GNUNET_PSYCSTORE_state_modify (store, chan_key,
387 GNUNET_ntohll (mmsg->message_id),
388 meth->mod_count, mods,
389 rcb, rcb_cls);
390#endif
391
392 const struct GNUNET_MULTICAST_MessageHeader *mmsg
393 = (const struct GNUNET_MULTICAST_MessageHeader *) msg;
394 struct GNUNET_PSYC_MessageHeader *pmsg;
395
396 psize = sizeof (*pmsg) + size - sizeof (*mmsg);
397 pmsg = GNUNET_malloc (psize);
398 pmsg->header.size = htons (psize);
399 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
400 pmsg->message_id = mmsg->message_id;
401
402 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
403
404 GNUNET_CONTAINER_multihashmap_get_multiple (clients, chan_key_hash,
405 message_to_client,
406 (void *) pmsg);
407 GNUNET_free (pmsg);
408 break;
409 }
363 default: 410 default:
364 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 411 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
365 "Ignoring unknown message of type %u and size %u.\n", 412 "Discarding unknown message of type %u and size %u.\n",
366 type, size); 413 type, size);
367 } 414 }
368} 415}
369 416
370 417
371/** 418/**
419 * Send a request received from multicast to a client.
420 */
421static int
422request_to_client (void *cls, const struct GNUNET_HashCode *chan_key_hash,
423 void *chan)
424{
425 /* TODO */
426
427 return GNUNET_YES;
428}
429
430
431static void
432request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
433 const struct GNUNET_MessageHeader *req,
434 enum GNUNET_MULTICAST_MessageFlags flags)
435{
436
437}
438
439
440/**
372 * Response from PSYCstore with the current counter values for a channel master. 441 * Response from PSYCstore with the current counter values for a channel master.
373 */ 442 */
374void 443static void
375master_counters_cb (void *cls, int result, uint64_t max_fragment_id, 444master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
376 uint64_t max_message_id, uint64_t max_group_generation, 445 uint64_t max_message_id, uint64_t max_group_generation,
377 uint64_t max_state_message_id) 446 uint64_t max_state_message_id)
@@ -513,20 +582,13 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
513static void 582static void
514send_transmit_ack (struct Channel *ch) 583send_transmit_ack (struct Channel *ch)
515{ 584{
516 struct TransmitAck *res = GNUNET_malloc (sizeof (*res)); 585 struct GNUNET_MessageHeader res;
517 res->header.size = htons (sizeof (*res)); 586 res.size = htons (sizeof (res));
518 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK); 587 res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK);
519
520 res->buf_avail = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
521 struct TransmitMessage *tmit_msg = ch->tmit_tail;
522 if (NULL != tmit_msg && GNUNET_PSYC_DATA_CONT == tmit_msg->status)
523 res->buf_avail -= tmit_msg->size;
524 res->buf_avail = htons (res->buf_avail);
525 588
526 GNUNET_SERVER_notification_context_add (nc, ch->client); 589 GNUNET_SERVER_notification_context_add (nc, ch->client);
527 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header, 590 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res,
528 GNUNET_NO); 591 GNUNET_NO);
529 GNUNET_free (res);
530} 592}
531 593
532 594
@@ -555,7 +617,7 @@ transmit_notify (void *cls, size_t *data_size, void *data)
555 GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg); 617 GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg);
556 GNUNET_free (msg); 618 GNUNET_free (msg);
557 619
558 int ret = (GNUNET_YES == ch->in_transmit) ? GNUNET_NO : GNUNET_YES; 620 int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES;
559 621
560 if (0 == ch->tmit_task) 622 if (0 == ch->tmit_task)
561 { 623 {
@@ -638,6 +700,7 @@ transmit_message (struct Channel *ch, struct GNUNET_TIME_Relative delay)
638 : GNUNET_SCHEDULER_add_delayed (delay, slave_transmit_message, ch); 700 : GNUNET_SCHEDULER_add_delayed (delay, slave_transmit_message, ch);
639} 701}
640 702
703
641/** 704/**
642 * Queue incoming message parts from a client for transmission, and send them to 705 * Queue incoming message parts from a client for transmission, and send them to
643 * the multicast group when the buffer is full or reached the end of message. 706 * the multicast group when the buffer is full or reached the end of message.
@@ -659,21 +722,20 @@ queue_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg)
659 "for transmission to multicast.\n", 722 "for transmission to multicast.\n",
660 ntohs (msg->type), size); 723 ntohs (msg->type), size);
661 724
662 if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < size) 725 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size)
663 return GNUNET_SYSERR; 726 return GNUNET_SYSERR;
664 727
665 if (NULL == tmit_msg 728 if (NULL == tmit_msg
666 || tmit_msg->status != GNUNET_PSYC_DATA_CONT 729 || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < tmit_msg->size + size)
667 || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < tmit_msg->size + size)
668 { 730 {
669 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 731 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
670 "Appending message qto new buffer.\n"); 732 "Appending message to new buffer.\n");
671 /* Start filling up new buffer */ 733 /* Start filling up new buffer */
672 tmit_msg = GNUNET_new (struct TransmitMessage); 734 tmit_msg = GNUNET_new (struct TransmitMessage);
673 tmit_msg->buf = GNUNET_malloc (size); 735 tmit_msg->buf = GNUNET_malloc (size);
674 memcpy (tmit_msg->buf, msg, size); 736 memcpy (tmit_msg->buf, msg, size);
675 tmit_msg->size = size; 737 tmit_msg->size = size;
676 tmit_msg->status = ch->tmit_status; 738 tmit_msg->state = ch->tmit_state;
677 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); 739 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
678 } 740 }
679 else 741 else
@@ -684,23 +746,41 @@ queue_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg)
684 tmit_msg->buf = GNUNET_realloc (tmit_msg->buf, tmit_msg->size + size); 746 tmit_msg->buf = GNUNET_realloc (tmit_msg->buf, tmit_msg->size + size);
685 memcpy (tmit_msg->buf + tmit_msg->size, msg, size); 747 memcpy (tmit_msg->buf + tmit_msg->size, msg, size);
686 tmit_msg->size += size; 748 tmit_msg->size += size;
687 tmit_msg->status = ch->tmit_status; 749 tmit_msg->state = ch->tmit_state;
688 } 750 }
689 751
690 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "tmit_size: %u\n", tmit_msg->size); 752 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "tmit_size: %u\n", tmit_msg->size);
691 753
692 /* Wait a bit for the remaining message parts from the client 754 /* Wait a bit for the remaining message parts from the client
693 if there's still some space left in the buffer. */ 755 if there's still some space left in the buffer. */
694 if (GNUNET_PSYC_DATA_CONT == tmit_msg->status 756 if (tmit_msg->state < MSG_STATE_END
695 && (tmit_msg->size + sizeof (struct GNUNET_PSYC_MessageData) 757 && (tmit_msg->size + sizeof (struct GNUNET_MessageHeader)
696 < GNUNET_MULTICAST_FRAGMENT_MAX_SIZE)) 758 < GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD))
759 {
697 tmit_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2); 760 tmit_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2);
761 }
762 else
763 {
764 send_transmit_ack (ch);
765 }
698 766
699 transmit_message (ch, tmit_delay); 767 transmit_message (ch, tmit_delay);
700 768
701 return GNUNET_OK; 769 return GNUNET_OK;
702} 770}
703 771
772
773static void
774transmit_error (struct Channel *ch)
775{
776 struct GNUNET_MessageHeader *msg = GNUNET_malloc (sizeof (*msg));
777 msg->size = ntohs (sizeof (*msg));
778 msg->type = ntohs (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
779 queue_message (ch, msg);
780
781 GNUNET_SERVER_client_disconnect (ch->client);
782}
783
704/** 784/**
705 * Incoming method from a client. 785 * Incoming method from a client.
706 */ 786 */
@@ -708,28 +788,21 @@ static void
708handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client, 788handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client,
709 const struct GNUNET_MessageHeader *msg) 789 const struct GNUNET_MessageHeader *msg)
710{ 790{
711 const struct GNUNET_PSYC_MessageMethod *meth 791 /* const struct GNUNET_PSYC_MessageMethod *meth
712 = (const struct GNUNET_PSYC_MessageMethod *) msg; 792 = (const struct GNUNET_PSYC_MessageMethod *) msg; */
713 struct Channel *ch 793 struct Channel *ch
714 = GNUNET_SERVER_client_get_user_context (client, struct Channel); 794 = GNUNET_SERVER_client_get_user_context (client, struct Channel);
715 GNUNET_assert (NULL != ch); 795 GNUNET_assert (NULL != ch);
716 796
717 if (GNUNET_NO != ch->in_transmit) 797 if (MSG_STATE_START != ch->tmit_state)
718 { 798 {
719 /* FIXME: already transmitting a message, send back error message. */ 799 transmit_error (ch);
720 return; 800 return;
721 } 801 }
722 802 ch->tmit_state = MSG_STATE_METHOD;
723 ch->in_transmit = GNUNET_YES;
724 ch->tmit_mod_recvd = 0;
725 ch->tmit_mod_count = ntohl (meth->mod_count);
726 ch->tmit_status = GNUNET_PSYC_DATA_CONT;
727 803
728 queue_message (ch, msg); 804 queue_message (ch, msg);
729 805 send_transmit_ack (ch);
730 if (0 == ch->tmit_mod_count)
731 send_transmit_ack (ch);
732
733 GNUNET_SERVER_receive_done (client, GNUNET_OK); 806 GNUNET_SERVER_receive_done (client, GNUNET_OK);
734}; 807};
735 808
@@ -741,20 +814,52 @@ static void
741handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client, 814handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client,
742 const struct GNUNET_MessageHeader *msg) 815 const struct GNUNET_MessageHeader *msg)
743{ 816{
744 /*
745 const struct GNUNET_PSYC_MessageModifier *mod 817 const struct GNUNET_PSYC_MessageModifier *mod
746 = (const struct GNUNET_PSYC_MessageModifier *) msg; 818 = (const struct GNUNET_PSYC_MessageModifier *) msg;
747 */ 819
748 struct Channel *ch 820 struct Channel *ch
749 = GNUNET_SERVER_client_get_user_context (client, struct Channel); 821 = GNUNET_SERVER_client_get_user_context (client, struct Channel);
750 GNUNET_assert (NULL != ch); 822 GNUNET_assert (NULL != ch);
751 823
752 ch->tmit_mod_recvd++; 824 if (MSG_STATE_METHOD != ch->tmit_state
825 || MSG_STATE_MODIFIER != ch->tmit_state
826 || MSG_STATE_MOD_CONT != ch->tmit_state
827 || ch->tmit_mod_value_size_expected != ch->tmit_mod_value_size)
828 {
829 transmit_error (ch);
830 return;
831 }
832 ch->tmit_mod_value_size_expected = ntohl (mod->value_size);
833 ch->tmit_mod_value_size = ntohs (msg->size) - ntohs(mod->name_size) - 1;
834
753 queue_message (ch, msg); 835 queue_message (ch, msg);
836 GNUNET_SERVER_receive_done (client, GNUNET_OK);
837};
754 838
755 if (ch->tmit_mod_recvd == ch->tmit_mod_count)
756 send_transmit_ack (ch);
757 839
840/**
841 * Incoming modifier from a client.
842 */
843static void
844handle_transmit_mod_cont (void *cls, struct GNUNET_SERVER_Client *client,
845 const struct GNUNET_MessageHeader *msg)
846{
847 struct Channel *ch
848 = GNUNET_SERVER_client_get_user_context (client, struct Channel);
849 GNUNET_assert (NULL != ch);
850
851 ch->tmit_mod_value_size += ntohs (msg->size);
852
853 if (MSG_STATE_MODIFIER != ch->tmit_state
854 || MSG_STATE_MOD_CONT != ch->tmit_state
855 || ch->tmit_mod_value_size_expected < ch->tmit_mod_value_size)
856 {
857 transmit_error (ch);
858 return;
859 }
860 ch->tmit_state = MSG_STATE_MOD_CONT;
861
862 queue_message (ch, msg);
758 GNUNET_SERVER_receive_done (client, GNUNET_OK); 863 GNUNET_SERVER_receive_done (client, GNUNET_OK);
759}; 864};
760 865
@@ -766,18 +871,25 @@ static void
766handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client, 871handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client,
767 const struct GNUNET_MessageHeader *msg) 872 const struct GNUNET_MessageHeader *msg)
768{ 873{
769 const struct GNUNET_PSYC_MessageData *data
770 = (const struct GNUNET_PSYC_MessageData *) msg;
771 struct Channel *ch 874 struct Channel *ch
772 = GNUNET_SERVER_client_get_user_context (client, struct Channel); 875 = GNUNET_SERVER_client_get_user_context (client, struct Channel);
773 GNUNET_assert (NULL != ch); 876 GNUNET_assert (NULL != ch);
774 877
775 ch->tmit_status = ntohs (data->status); 878 if (MSG_STATE_METHOD != ch->tmit_state
879 || MSG_STATE_MODIFIER != ch->tmit_state
880 || MSG_STATE_MOD_CONT != ch->tmit_state
881 || ch->tmit_mod_value_size_expected != ch->tmit_mod_value_size)
882 {
883 transmit_error (ch);
884 return;
885 }
886 ch->tmit_state = MSG_STATE_DATA;
887
776 queue_message (ch, msg); 888 queue_message (ch, msg);
777 send_transmit_ack (ch); 889 send_transmit_ack (ch);
778 890
779 if (GNUNET_PSYC_DATA_CONT != ch->tmit_status) 891 if (MSG_STATE_END <= ch->tmit_state)
780 ch->in_transmit = GNUNET_NO; 892 ch->tmit_state = MSG_STATE_START;
781 893
782 GNUNET_SERVER_receive_done (client, GNUNET_OK); 894 GNUNET_SERVER_receive_done (client, GNUNET_OK);
783}; 895};
@@ -800,16 +912,21 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
800 912
801 { &handle_slave_join, NULL, 913 { &handle_slave_join, NULL,
802 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 }, 914 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
803 915#if TODO
916 { &handle_psyc_message, NULL,
917 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
918#endif
804 { &handle_transmit_method, NULL, 919 { &handle_transmit_method, NULL,
805 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD, 0 }, 920 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD, 0 },
806 921
807 { &handle_transmit_modifier, NULL, 922 { &handle_transmit_modifier, NULL,
808 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER, 0 }, 923 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER, 0 },
809 924
925 { &handle_transmit_mod_cont, NULL,
926 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT, 0 },
927
810 { &handle_transmit_data, NULL, 928 { &handle_transmit_data, NULL,
811 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA, 0 }, 929 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA, 0 },
812
813 { NULL, NULL, 0, 0 } 930 { NULL, NULL, 0, 0 }
814 }; 931 };
815 932
diff --git a/src/psyc/psyc.h b/src/psyc/psyc.h
index 85d10858e..90c07480a 100644
--- a/src/psyc/psyc.h
+++ b/src/psyc/psyc.h
@@ -29,6 +29,20 @@
29 29
30#include "gnunet_common.h" 30#include "gnunet_common.h"
31 31
32
33enum MessageState
34{
35 MSG_STATE_START = 0,
36 MSG_STATE_HEADER = 1,
37 MSG_STATE_METHOD = 2,
38 MSG_STATE_MODIFIER = 3,
39 MSG_STATE_MOD_CONT = 4,
40 MSG_STATE_DATA = 5,
41 MSG_STATE_END = 6,
42 MSG_STATE_CANCEL = 7,
43};
44
45
32GNUNET_NETWORK_STRUCT_BEGIN 46GNUNET_NETWORK_STRUCT_BEGIN
33 47
34/**** service -> library ****/ 48/**** service -> library ****/
@@ -53,8 +67,7 @@ struct OperationResult
53 */ 67 */
54 int64_t result_code GNUNET_PACKED; 68 int64_t result_code GNUNET_PACKED;
55 69
56 /* followed by 0-terminated error message (on error) */ 70 /* followed by NUL-terminated error message (on error) */
57
58}; 71};
59 72
60 73
@@ -74,6 +87,7 @@ struct CountersResult
74}; 87};
75 88
76 89
90#if REMOVE
77/** 91/**
78 * Transmit acknowledgment. 92 * Transmit acknowledgment.
79 * 93 *
@@ -95,6 +109,7 @@ struct TransmitAck
95 */ 109 */
96 uint16_t buf_avail; 110 uint16_t buf_avail;
97}; 111};
112#endif
98 113
99 114
100/**** library -> service ****/ 115/**** library -> service ****/
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index 290f3e375..a5a01fa92 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -30,15 +30,17 @@
30 * @author Gabor X Toth 30 * @author Gabor X Toth
31 */ 31 */
32 32
33#include <inttypes.h>
34
33#include "platform.h" 35#include "platform.h"
34#include "gnunet_util_lib.h" 36#include "gnunet_util_lib.h"
35#include "gnunet_env_lib.h" 37#include "gnunet_env_lib.h"
38#include "gnunet_multicast_service.h"
36#include "gnunet_psyc_service.h" 39#include "gnunet_psyc_service.h"
37#include "psyc.h" 40#include "psyc.h"
38 41
39#define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__) 42#define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__)
40 43
41
42struct OperationHandle 44struct OperationHandle
43{ 45{
44 struct OperationHandle *prev; 46 struct OperationHandle *prev;
@@ -91,31 +93,55 @@ struct GNUNET_PSYC_Channel
91 */ 93 */
92 struct GNUNET_TIME_Relative reconnect_delay; 94 struct GNUNET_TIME_Relative reconnect_delay;
93 95
94 GNUNET_PSYC_Method method_cb; 96 /**
97 * Message part callback.
98 */
99 GNUNET_PSYC_MessageCallback message_cb;
100
101 /**
102 * Message part callback for historic message.
103 */
104 GNUNET_PSYC_MessageCallback hist_message_cb;
95 105
106 /**
107 * Join handler callback.
108 */
96 GNUNET_PSYC_JoinCallback join_cb; 109 GNUNET_PSYC_JoinCallback join_cb;
97 110
111 /**
112 * Closure for @a message_cb and @a join_cb.
113 */
98 void *cb_cls; 114 void *cb_cls;
99 115
100 /** 116 /**
101 * Are we polling for incoming messages right now? 117 * ID of the message being received from the PSYC service.
102 */ 118 */
103 int in_receive; 119 uint64_t recv_message_id;
104 120
105 /** 121 /**
106 * Are we currently transmitting a message? 122 * State of the currently being received message from the PSYC service.
107 */ 123 */
108 int in_transmit; 124 enum MessageState recv_state;
109 125
110 /** 126 /**
111 * Is this a master or slave channel? 127 * Flags for the currently being received message from the PSYC service.
128 */
129 enum GNUNET_PSYC_MessageFlags recv_flags;
130
131 /**
132 * Expected value size for the modifier being received from the PSYC service.
133 */
134 uint32_t recv_mod_value_size_expected;
135
136 /**
137 * Actual value size for the modifier being received from the PSYC service.
112 */ 138 */
113 int is_master; 139 uint32_t recv_mod_value_size;
114 140
115 /** 141 /**
116 * Buffer space available for transmitting the next data fragment. 142 * Buffer space available for transmitting the next data fragment.
117 */ 143 */
118 uint16_t tmit_buf_avail; 144 uint16_t tmit_size; // FIXME
119 145
120 /** 146 /**
121 * Is transmission paused? 147 * Is transmission paused?
@@ -125,7 +151,22 @@ struct GNUNET_PSYC_Channel
125 /** 151 /**
126 * Are we still waiting for a PSYC_TRANSMIT_ACK? 152 * Are we still waiting for a PSYC_TRANSMIT_ACK?
127 */ 153 */
128 uint8_t tmit_ack_pending; 154 uint8_t tmit_ack_pending; // FIXME
155
156 /**
157 * Are we polling for incoming messages right now?
158 */
159 uint8_t in_receive;
160
161 /**
162 * Are we currently transmitting a message?
163 */
164 uint8_t in_transmit;
165
166 /**
167 * Is this a master or slave channel?
168 */
169 uint8_t is_master;
129}; 170};
130 171
131 172
@@ -135,9 +176,10 @@ struct GNUNET_PSYC_Channel
135struct GNUNET_PSYC_MasterTransmitHandle 176struct GNUNET_PSYC_MasterTransmitHandle
136{ 177{
137 struct GNUNET_PSYC_Master *master; 178 struct GNUNET_PSYC_Master *master;
138 GNUNET_PSYC_MasterTransmitNotify notify; 179 GNUNET_PSYC_MasterTransmitNotify notify_mod;
180 GNUNET_PSYC_MasterTransmitNotify notify_data;
139 void *notify_cls; 181 void *notify_cls;
140 enum GNUNET_PSYC_DataStatus status; 182 enum MessageState state;
141}; 183};
142 184
143 185
@@ -254,52 +296,383 @@ transmit_next (struct GNUNET_PSYC_Channel *ch);
254 296
255 297
256/** 298/**
257 * Request data from client to transmit. 299 * Reset data stored related to the last received message.
300 */
301static void
302recv_reset (struct GNUNET_PSYC_Channel *ch)
303{
304 ch->recv_state = MSG_STATE_START;
305 ch->recv_flags = 0;
306 ch->recv_message_id = 0;
307 ch->recv_mod_value_size =0;
308 ch->recv_mod_value_size_expected = 0;
309}
310
311
312static void
313recv_error (struct GNUNET_PSYC_Channel *ch)
314{
315 recv_reset (ch);
316
317 GNUNET_PSYC_MessageCallback message_cb
318 = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
319 ? ch->hist_message_cb
320 : ch->message_cb;
321
322 if (NULL != message_cb)
323 message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, NULL);
324}
325
326/**
327 * Request a modifier from a client to transmit.
258 * 328 *
259 * @param mst Master handle. 329 * @param mst Master handle.
260 */ 330 */
261static void 331static void
262master_transmit_data (struct GNUNET_PSYC_Master *mst) 332master_transmit_mod (struct GNUNET_PSYC_Master *mst)
263{ 333{
264 struct GNUNET_PSYC_Channel *ch = &mst->ch; 334 struct GNUNET_PSYC_Channel *ch = &mst->ch;
265 size_t data_size = ch->tmit_buf_avail; 335 uint16_t max_data_size
266 struct GNUNET_PSYC_MessageData *pdata; 336 = ch->tmit_size > sizeof (struct GNUNET_MessageHeader)
267 struct OperationHandle *op 337 ? GNUNET_PSYC_MODIFIER_MAX_PAYLOAD - ch->tmit_size
268 = GNUNET_malloc (sizeof (*op) + sizeof (*pdata) + data_size); 338 : GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD - ch->tmit_size;
269 pdata = (struct GNUNET_PSYC_MessageData *) &op[1]; 339 uint16_t data_size = max_data_size;
270 op->msg = (struct GNUNET_MessageHeader *) pdata;
271 pdata->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
272 340
273 switch (mst->tmit->notify (mst->tmit->notify_cls, &data_size, &pdata[1])) 341 struct GNUNET_MessageHeader *msg;
342 struct OperationHandle *op
343 = GNUNET_malloc (sizeof (*op) + sizeof (*msg) + data_size);
344 op->msg = msg = (struct GNUNET_MessageHeader *) &op[1];
345 msg->type
346 = MSG_STATE_MODIFIER == mst->tmit->state
347 ? htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER)
348 : htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
349
350 int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls,
351 &data_size, &msg[1]);
352 switch (notify_ret)
274 { 353 {
275 case GNUNET_NO: 354 case GNUNET_NO:
276 mst->tmit->status = GNUNET_PSYC_DATA_CONT; 355 if (0 != data_size)
356 mst->tmit->state = MSG_STATE_MOD_CONT;
277 break; 357 break;
278 358
279 case GNUNET_YES: 359 case GNUNET_YES:
280 mst->tmit->status = GNUNET_PSYC_DATA_END; 360 mst->tmit->state = (0 == data_size) ? MSG_STATE_DATA : MSG_STATE_MODIFIER;
281 break; 361 break;
282 362
283 default: 363 default:
284 mst->tmit->status = GNUNET_PSYC_DATA_CANCEL; 364 LOG (GNUNET_ERROR_TYPE_ERROR,
285 data_size = 0; 365 "MasterTransmitNotify returned error when requesting a modifier.\n");
286 LOG (GNUNET_ERROR_TYPE_ERROR, "MasterTransmitNotify returned error.\n"); 366
367 mst->tmit->state = MSG_STATE_START;
368 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
369 msg->size = htons (sizeof (*msg));
370
371 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
372 transmit_next (ch);
373 return;
287 } 374 }
288 375
289 if ((GNUNET_PSYC_DATA_CONT == mst->tmit->status && 0 == data_size)) 376 if ((GNUNET_NO == notify_ret && 0 == data_size))
290 { 377 {
291 /* Transmission paused, nothing to send. */ 378 /* Transmission paused, nothing to send. */
292 ch->tmit_paused = GNUNET_YES; 379 ch->tmit_paused = GNUNET_YES;
293 GNUNET_free (op); 380 GNUNET_free (op);
294 } 381 }
295 else 382
383 if (0 < data_size)
384 {
385 GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
386 msg->size = htons (sizeof (*msg) + data_size);
387 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
388 }
389
390 /* End of message. */
391 if (GNUNET_YES == notify_ret)
392 {
393 op = GNUNET_malloc (sizeof *(op) + sizeof (*msg));
394 op->msg = msg = (struct GNUNET_MessageHeader *) &op[1];
395 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
396 msg->size = htons (sizeof (*msg));
397 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
398 }
399
400 transmit_next (ch);
401}
402
403
404/**
405 * Request data from a client to transmit.
406 *
407 * @param mst Master handle.
408 */
409static void
410master_transmit_data (struct GNUNET_PSYC_Master *mst)
411{
412 struct GNUNET_PSYC_Channel *ch = &mst->ch;
413 struct GNUNET_MessageHeader *msg;
414 uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
415 struct OperationHandle *op
416 = GNUNET_malloc (sizeof (*op) + sizeof (*msg) + data_size);
417 op->msg = msg = (struct GNUNET_MessageHeader *) &op[1];
418 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
419
420 int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls,
421 &data_size, &msg[1]);
422 switch (notify_ret)
296 { 423 {
297 GNUNET_assert (data_size <= ch->tmit_buf_avail); 424 case GNUNET_NO:
298 pdata->header.size = htons (sizeof (*pdata) + data_size); 425 if (0 == data_size)
299 pdata->status = htons (mst->tmit->status); 426 {
427 /* Transmission paused, nothing to send. */
428 ch->tmit_paused = GNUNET_YES;
429 GNUNET_free (op);
430 }
431 break;
432
433 case GNUNET_YES:
434 mst->tmit->state = MSG_STATE_START;
435 break;
436
437 default:
438 LOG (GNUNET_ERROR_TYPE_ERROR,
439 "MasterTransmitNotify returned error when requesting data.\n");
440
441 mst->tmit->state = MSG_STATE_START;
442 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
443 msg->size = htons (sizeof (*msg));
444
300 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); 445 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
301 ch->tmit_ack_pending = GNUNET_YES;
302 transmit_next (ch); 446 transmit_next (ch);
447 return;
448 }
449
450 if (0 < data_size)
451 {
452 GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
453 msg->size = htons (sizeof (*msg) + data_size);
454 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
455 }
456
457 /* End of message. */
458 if (GNUNET_YES == notify_ret)
459 {
460 op = GNUNET_malloc (sizeof *(op) + sizeof (*msg));
461 op->msg = msg = (struct GNUNET_MessageHeader *) &op[1];
462 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
463 msg->size = htons (sizeof (*msg));
464 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
465 }
466
467 transmit_next (ch);
468}
469
470
471/**
472 * Handle incoming message from the PSYC service.
473 *
474 * @param ch The channel the message is sent to.
475 * @param pmsg The message.
476 */
477static void
478handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
479 const struct GNUNET_PSYC_MessageHeader *pmsg)
480{
481 const struct GNUNET_MessageHeader *msg;
482 uint16_t msize = ntohs (pmsg->header.size);
483 uint16_t pos = 0;
484 uint16_t size = 0;
485 uint16_t type, size_eq, size_min;
486
487 if (MSG_STATE_START == ch->recv_state)
488 {
489 ch->recv_message_id = GNUNET_ntohll (pmsg->message_id);
490 ch->recv_flags = ntohl (pmsg->flags);
491 }
492 else if (GNUNET_ntohll (pmsg->message_id) != ch->recv_message_id)
493 {
494 LOG (GNUNET_ERROR_TYPE_WARNING,
495 "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
496 GNUNET_ntohll (pmsg->message_id), ch->recv_message_id);
497 GNUNET_break_op (0);
498 recv_error (ch);
499 }
500 else if (ntohl (pmsg->flags) != ch->recv_flags)
501 {
502 LOG (GNUNET_ERROR_TYPE_WARNING,
503 "Unexpected message flags. Got: %lu, expected: %lu\n",
504 ntohl (pmsg->flags), ch->recv_flags);
505 GNUNET_break_op (0);
506 recv_error (ch);
507 }
508
509 for (pos = 0; sizeof (*pmsg) + pos < msize; pos += size)
510 {
511 msg = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
512 size = ntohs (msg->size);
513 type = ntohs (msg->type);
514 size_eq = size_min = 0;
515
516 if (msize < sizeof (*pmsg) + pos + size)
517 {
518 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
519 "Discarding message of type %u with invalid size. "
520 "(%u < %u + %u + %u)\n", ntohs (msg->type),
521 msize, sizeof (*msg), pos, size);
522 break;
523 }
524 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
525 "Received message part of type %u and size %u from PSYC.\n",
526 ntohs (msg->type), size);
527
528
529 switch (type)
530 {
531 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
532 size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
533 break;
534 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
535 size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
536 break;
537 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
538 size_min = sizeof (struct GNUNET_MessageHeader);
539 break;
540 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
541 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
542 size_eq = sizeof (struct GNUNET_MessageHeader);
543 break;
544 }
545
546 if (! ((0 < size_eq && size == size_eq)
547 || (0 < size_min && size_min <= size)))
548 {
549 GNUNET_break (0);
550 reschedule_connect (ch);
551 return;
552 }
553
554 switch (type)
555 {
556 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
557 {
558 struct GNUNET_PSYC_MessageMethod *meth
559 = (struct GNUNET_PSYC_MessageMethod *) msg;
560
561 if (MSG_STATE_HEADER != ch->recv_state)
562 {
563 LOG (GNUNET_ERROR_TYPE_WARNING,
564 "Discarding out of order message method.\n");
565 /* It is normal to receive an incomplete message right after connecting,
566 * but should not happen later.
567 * FIXME: add a check for this condition.
568 */
569 GNUNET_break_op (0);
570 recv_error (ch);
571 break;
572 }
573
574 if ('\0' != (char *) meth + msg->size - 1)
575 {
576 LOG (GNUNET_ERROR_TYPE_WARNING,
577 "Discarding message with malformed method. "
578 "Message ID: %" PRIu64 "\n", ch->recv_message_id);
579 GNUNET_break_op (0);
580 recv_error (ch);
581 break;
582 }
583 GNUNET_PSYC_MessageCallback message_cb
584 = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
585 ? ch->hist_message_cb
586 : ch->message_cb;
587
588 if (NULL != message_cb)
589 message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg);
590
591 ch->recv_state = MSG_STATE_METHOD;
592 break;
593 }
594 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
595 {
596 if (MSG_STATE_MODIFIER != ch->recv_state)
597 {
598 LOG (GNUNET_ERROR_TYPE_WARNING,
599 "Discarding out of order message modifier.\n");
600 GNUNET_break_op (0);
601 recv_error (ch);
602 break;
603 }
604
605 struct GNUNET_PSYC_MessageModifier *mod
606 = (struct GNUNET_PSYC_MessageModifier *) msg;
607
608 uint16_t name_size = ntohs (mod->name_size);
609 ch->recv_mod_value_size_expected = ntohs (mod->value_size);
610 ch->recv_mod_value_size = size - sizeof (*mod) - name_size - 1;
611
612 if (size < sizeof (*mod) + name_size + 1
613 || '\0' != (char *) &mod[1] + mod->name_size
614 || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
615 {
616 LOG (GNUNET_ERROR_TYPE_WARNING, "Discarding malformed modifier.\n");
617 GNUNET_break_op (0);
618 break;
619 }
620
621 ch->recv_state = MSG_STATE_MODIFIER;
622
623 GNUNET_PSYC_MessageCallback message_cb
624 = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
625 ? ch->hist_message_cb
626 : ch->message_cb;
627
628 if (NULL != message_cb)
629 message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg);
630
631 break;
632 }
633 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
634 {
635 ch->recv_mod_value_size += size - sizeof (*msg);
636
637 if (MSG_STATE_MODIFIER != ch->recv_state
638 || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
639 {
640 LOG (GNUNET_ERROR_TYPE_WARNING,
641 "Discarding out of order message modifier continuation.\n");
642 GNUNET_break_op (0);
643 recv_reset (ch);
644 break;
645 }
646
647 GNUNET_PSYC_MessageCallback message_cb
648 = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
649 ? ch->hist_message_cb
650 : ch->message_cb;
651
652 if (NULL != message_cb)
653 message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg);
654 break;
655 }
656 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
657 {
658 if (ch->recv_state < MSG_STATE_METHOD
659 || ch->recv_mod_value_size_expected != ch->recv_mod_value_size)
660 {
661 LOG (GNUNET_ERROR_TYPE_WARNING,
662 "Discarding out of order message data fragment.\n");
663 GNUNET_break_op (0);
664 recv_reset (ch);
665 break;
666 }
667
668 ch->recv_state = MSG_STATE_DATA;
669 break;
670 }
671 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
672 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
673 recv_reset (ch);
674 break;
675 }
303 } 676 }
304} 677}
305 678
@@ -319,11 +692,10 @@ message_handler (void *cls,
319 struct GNUNET_PSYC_Channel *ch = cls; 692 struct GNUNET_PSYC_Channel *ch = cls;
320 struct GNUNET_PSYC_Master *mst = cls; 693 struct GNUNET_PSYC_Master *mst = cls;
321 struct GNUNET_PSYC_Slave *slv = cls; 694 struct GNUNET_PSYC_Slave *slv = cls;
322 struct CountersResult *cres;
323 struct TransmitAck *tack;
324 695
325 if (NULL == msg) 696 if (NULL == msg)
326 { 697 {
698 GNUNET_break (0);
327 reschedule_connect (ch); 699 reschedule_connect (ch);
328 return; 700 return;
329 } 701 }
@@ -342,8 +714,8 @@ message_handler (void *cls,
342 case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: 714 case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
343 size_eq = sizeof (struct CountersResult); 715 size_eq = sizeof (struct CountersResult);
344 break; 716 break;
345 case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK: 717 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
346 size_eq = sizeof (struct TransmitAck); 718 size_min = sizeof (struct GNUNET_PSYC_MessageHeader);
347 break; 719 break;
348 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: 720 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
349 size_min = sizeof (struct GNUNET_PSYC_MessageMethod); 721 size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
@@ -352,11 +724,13 @@ message_handler (void *cls,
352 size_min = sizeof (struct GNUNET_PSYC_MessageModifier); 724 size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
353 break; 725 break;
354 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: 726 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
355 size_min = sizeof (struct GNUNET_PSYC_MessageData); 727 size_min = sizeof (struct GNUNET_MessageHeader);
728 break;
729 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
730 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
731 case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK:
732 size_eq = sizeof (struct GNUNET_MessageHeader);
356 break; 733 break;
357 default:
358 GNUNET_break_op (0);
359 return;
360 } 734 }
361 735
362 if (! ((0 < size_eq && size == size_eq) 736 if (! ((0 < size_eq && size == size_eq)
@@ -370,38 +744,63 @@ message_handler (void *cls,
370 switch (type) 744 switch (type)
371 { 745 {
372 case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK: 746 case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK:
373 cres = (struct CountersResult *) msg; 747 {
748 struct CountersResult *cres = (struct CountersResult *) msg;
374 mst->max_message_id = GNUNET_ntohll (cres->max_message_id); 749 mst->max_message_id = GNUNET_ntohll (cres->max_message_id);
375 if (NULL != mst->start_cb) 750 if (NULL != mst->start_cb)
376 mst->start_cb (ch->cb_cls, mst->max_message_id); 751 mst->start_cb (ch->cb_cls, mst->max_message_id);
377 break; 752 break;
378 753 }
379 case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: 754 case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
380 cres = (struct CountersResult *) msg; 755 {
381#if TODO 756#if TODO
757 struct CountersResult *cres = (struct CountersResult *) msg;
382 slv->max_message_id = GNUNET_ntohll (cres->max_message_id); 758 slv->max_message_id = GNUNET_ntohll (cres->max_message_id);
383 if (NULL != slv->join_ack_cb) 759 if (NULL != slv->join_ack_cb)
384 mst->join_ack_cb (ch->cb_cls, mst->max_message_id); 760 mst->join_ack_cb (ch->cb_cls, mst->max_message_id);
385#endif 761#endif
386 break; 762 break;
387 763 }
388 case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK: 764 case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK:
389 tack = (struct TransmitAck *) msg; 765 {
766 ch->tmit_ack_pending = GNUNET_NO;
767
390 if (ch->is_master) 768 if (ch->is_master)
391 { 769 {
392 GNUNET_assert (NULL != mst->tmit); 770 GNUNET_assert (NULL != mst->tmit);
393 if (GNUNET_PSYC_DATA_CONT != mst->tmit->status 771 switch (mst->tmit->state)
394 || NULL == mst->tmit->notify)
395 {
396 GNUNET_free (mst->tmit);
397 mst->tmit = NULL;
398 }
399 else
400 { 772 {
401 ch->tmit_buf_avail = ntohs (tack->buf_avail); 773 case MSG_STATE_MODIFIER:
402 ch->tmit_ack_pending = GNUNET_NO; 774 if (GNUNET_NO == ch->tmit_paused)
775 master_transmit_mod (mst);
776 break;
777
778 case MSG_STATE_MOD_CONT:
779 if (GNUNET_NO == ch->tmit_paused)
780 master_transmit_mod (mst);
781 break;
782
783 case MSG_STATE_DATA:
403 if (GNUNET_NO == ch->tmit_paused) 784 if (GNUNET_NO == ch->tmit_paused)
404 master_transmit_data (mst); 785 master_transmit_data (mst);
786 break;
787
788 case MSG_STATE_END:
789 case MSG_STATE_CANCEL:
790 if (NULL != mst->tmit)
791 {
792 GNUNET_free (mst->tmit);
793 mst->tmit = NULL;
794 }
795 else
796 {
797 LOG (GNUNET_ERROR_TYPE_WARNING,
798 "Ignoring transmit ack, there's no transmission going on.\n");
799 }
800 break;
801 default:
802 LOG (GNUNET_ERROR_TYPE_WARNING,
803 "Ignoring unexpected transmit ack.\n");
405 } 804 }
406 } 805 }
407 else 806 else
@@ -409,17 +808,10 @@ message_handler (void *cls,
409 /* TODO: slave */ 808 /* TODO: slave */
410 } 809 }
411 break; 810 break;
811 }
412 812
413 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: 813 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
414 814 handle_psyc_message(ch, (const struct GNUNET_PSYC_MessageHeader *) msg);
415 break;
416
417 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
418
419 break;
420
421 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
422
423 break; 815 break;
424 } 816 }
425 817
@@ -506,6 +898,7 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
506{ 898{
507 struct GNUNET_PSYC_Channel *ch = cls; 899 struct GNUNET_PSYC_Channel *ch = cls;
508 900
901 recv_reset (ch);
509 ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK; 902 ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
510 LOG (GNUNET_ERROR_TYPE_DEBUG, 903 LOG (GNUNET_ERROR_TYPE_DEBUG,
511 "Connecting to PSYC service.\n"); 904 "Connecting to PSYC service.\n");
@@ -588,7 +981,7 @@ disconnect (void *c)
588 * one in the future. 981 * one in the future.
589 * @param policy Channel policy specifying join and history restrictions. 982 * @param policy Channel policy specifying join and history restrictions.
590 * Used to automate join decisions. 983 * Used to automate join decisions.
591 * @param method Function to invoke on messages received from slaves. 984 * @param message_cb Function to invoke on message parts received from slaves.
592 * @param join_cb Function to invoke when a peer wants to join. 985 * @param join_cb Function to invoke when a peer wants to join.
593 * @param master_started_cb Function to invoke after the channel master started. 986 * @param master_started_cb Function to invoke after the channel master started.
594 * @param cls Closure for @a master_started_cb and @a join_cb. 987 * @param cls Closure for @a master_started_cb and @a join_cb.
@@ -598,7 +991,7 @@ struct GNUNET_PSYC_Master *
598GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, 991GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
599 const struct GNUNET_CRYPTO_EddsaPrivateKey *channel_key, 992 const struct GNUNET_CRYPTO_EddsaPrivateKey *channel_key,
600 enum GNUNET_PSYC_Policy policy, 993 enum GNUNET_PSYC_Policy policy,
601 GNUNET_PSYC_Method method, 994 GNUNET_PSYC_MessageCallback message_cb,
602 GNUNET_PSYC_JoinCallback join_cb, 995 GNUNET_PSYC_JoinCallback join_cb,
603 GNUNET_PSYC_MasterStartCallback master_started_cb, 996 GNUNET_PSYC_MasterStartCallback master_started_cb,
604 void *cls) 997 void *cls)
@@ -618,7 +1011,7 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
618 ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; 1011 ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
619 ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst); 1012 ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst);
620 1013
621 ch->method_cb = method; 1014 ch->message_cb = message_cb;
622 ch->join_cb = join_cb; 1015 ch->join_cb = join_cb;
623 ch->cb_cls = cls; 1016 ch->cb_cls = cls;
624 mst->start_cb = master_started_cb; 1017 mst->start_cb = master_started_cb;
@@ -705,19 +1098,17 @@ send_modifier (void *cls, struct GNUNET_ENV_Modifier *mod)
705 * 1098 *
706 * @param master Handle to the PSYC channel. 1099 * @param master Handle to the PSYC channel.
707 * @param method_name Which method should be invoked. 1100 * @param method_name Which method should be invoked.
708 * @param env Environment containing state operations and transient variables 1101 * @param notify_mod Function to call to obtain modifiers.
709 * for the message, or NULL. 1102 * @param notify_data Function to call to obtain fragments of the data.
710 * @param notify Function to call to obtain the arguments. 1103 * @param notify_cls Closure for @a notify_mod and @a notify_data.
711 * @param notify_cls Closure for @a notify.
712 * @param flags Flags for the message being transmitted. 1104 * @param flags Flags for the message being transmitted.
713 * @return Transmission handle, NULL on error (i.e. more than one request 1105 * @return Transmission handle, NULL on error (i.e. more than one request queued).
714 * queued).
715 */ 1106 */
716struct GNUNET_PSYC_MasterTransmitHandle * 1107struct GNUNET_PSYC_MasterTransmitHandle *
717GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, 1108GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
718 const char *method_name, 1109 const char *method_name,
719 const struct GNUNET_ENV_Environment *env, 1110 GNUNET_PSYC_MasterTransmitNotify notify_mod,
720 GNUNET_PSYC_MasterTransmitNotify notify, 1111 GNUNET_PSYC_MasterTransmitNotify notify_data,
721 void *notify_cls, 1112 void *notify_cls,
722 enum GNUNET_PSYC_MasterTransmitFlags flags) 1113 enum GNUNET_PSYC_MasterTransmitFlags flags)
723{ 1114{
@@ -737,18 +1128,17 @@ GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
737 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); 1128 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
738 pmeth->header.size = htons (sizeof (*pmeth) + size); 1129 pmeth->header.size = htons (sizeof (*pmeth) + size);
739 pmeth->flags = htonl (flags); 1130 pmeth->flags = htonl (flags);
740 pmeth->mod_count = GNUNET_ntohll (GNUNET_ENV_environment_get_mod_count (env));
741 memcpy (&pmeth[1], method_name, size); 1131 memcpy (&pmeth[1], method_name, size);
742 1132
743 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); 1133 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
744 GNUNET_ENV_environment_iterate (env, send_modifier, master);
745 transmit_next (ch); 1134 transmit_next (ch);
746 1135
747 master->tmit = GNUNET_malloc (sizeof (*master->tmit)); 1136 master->tmit = GNUNET_malloc (sizeof (*master->tmit));
748 master->tmit->master = master; 1137 master->tmit->master = master;
749 master->tmit->notify = notify; 1138 master->tmit->notify_mod = notify_mod;
1139 master->tmit->notify_data = notify_data;
750 master->tmit->notify_cls = notify_cls; 1140 master->tmit->notify_cls = notify_cls;
751 master->tmit->status = GNUNET_PSYC_DATA_CONT; 1141 master->tmit->state = MSG_STATE_START; // FIXME
752 return master->tmit; 1142 return master->tmit;
753} 1143}
754 1144
@@ -804,12 +1194,13 @@ GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th)
804 * @param relay_count Number of peers in the @a relays array. 1194 * @param relay_count Number of peers in the @a relays array.
805 * @param relays Peer identities of members of the multicast group, which serve 1195 * @param relays Peer identities of members of the multicast group, which serve
806 * as relays and used to join the group at. 1196 * as relays and used to join the group at.
807 * @param method Function to invoke on messages received from the channel, 1197 * @param message_cb Function to invoke on message parts received from the
808 * typically at least contains functions for @e join and @e part. 1198 * channel, typically at least contains method handlers for @e join and
1199 * @e part.
809 * @param join_cb function invoked once we have joined with the current 1200 * @param join_cb function invoked once we have joined with the current
810 * message ID of the channel 1201 * message ID of the channel
811 * @param slave_joined_cb Function to invoke when a peer wants to join. 1202 * @param slave_joined_cb Function to invoke when a peer wants to join.
812 * @param cls Closure for @a method_cb and @a slave_joined_cb. 1203 * @param cls Closure for @a message_cb and @a slave_joined_cb.
813 * @param method_name Method name for the join request. 1204 * @param method_name Method name for the join request.
814 * @param env Environment containing transient variables for the request, or NULL. 1205 * @param env Environment containing transient variables for the request, or NULL.
815 * @param data Payload for the join message. 1206 * @param data Payload for the join message.
@@ -823,7 +1214,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
823 const struct GNUNET_PeerIdentity *origin, 1214 const struct GNUNET_PeerIdentity *origin,
824 uint32_t relay_count, 1215 uint32_t relay_count,
825 const struct GNUNET_PeerIdentity *relays, 1216 const struct GNUNET_PeerIdentity *relays,
826 GNUNET_PSYC_Method method, 1217 GNUNET_PSYC_MessageCallback message_cb,
827 GNUNET_PSYC_JoinCallback join_cb, 1218 GNUNET_PSYC_JoinCallback join_cb,
828 GNUNET_PSYC_SlaveJoinCallback slave_joined_cb, 1219 GNUNET_PSYC_SlaveJoinCallback slave_joined_cb,
829 void *cls, 1220 void *cls,
@@ -845,6 +1236,10 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
845 req->relay_count = relay_count; 1236 req->relay_count = relay_count;
846 memcpy (&req[1], relays, relay_count * sizeof (*relays)); 1237 memcpy (&req[1], relays, relay_count * sizeof (*relays));
847 1238
1239 ch->message_cb = message_cb;
1240 ch->join_cb = join_cb;
1241 ch->cb_cls = cls;
1242
848 ch->cfg = cfg; 1243 ch->cfg = cfg;
849 ch->is_master = GNUNET_NO; 1244 ch->is_master = GNUNET_NO;
850 ch->reconnect_msg = (struct GNUNET_MessageHeader *) req; 1245 ch->reconnect_msg = (struct GNUNET_MessageHeader *) req;
@@ -1043,7 +1438,7 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel,
1043 * @param channel Which channel should be replayed? 1438 * @param channel Which channel should be replayed?
1044 * @param start_message_id Earliest interesting point in history. 1439 * @param start_message_id Earliest interesting point in history.
1045 * @param end_message_id Last (exclusive) interesting point in history. 1440 * @param end_message_id Last (exclusive) interesting point in history.
1046 * @param method Function to invoke on messages received from the story. 1441 * @param message_cb Function to invoke on message parts received from the story.
1047 * @param finish_cb Function to call when the requested story has been fully 1442 * @param finish_cb Function to call when the requested story has been fully
1048 * told (counting message IDs might not suffice, as some messages 1443 * told (counting message IDs might not suffice, as some messages
1049 * might be secret and thus the listener would not know the story is 1444 * might be secret and thus the listener would not know the story is
@@ -1057,8 +1452,8 @@ struct GNUNET_PSYC_Story *
1057GNUNET_PSYC_channel_story_tell (struct GNUNET_PSYC_Channel *channel, 1452GNUNET_PSYC_channel_story_tell (struct GNUNET_PSYC_Channel *channel,
1058 uint64_t start_message_id, 1453 uint64_t start_message_id,
1059 uint64_t end_message_id, 1454 uint64_t end_message_id,
1060 GNUNET_PSYC_Method method, 1455 GNUNET_PSYC_MessageCallback message_cb,
1061 GNUNET_PSYC_FinishCallback *finish_cb, 1456 GNUNET_PSYC_FinishCallback finish_cb,
1062 void *cls) 1457 void *cls)
1063{ 1458{
1064 return NULL; 1459 return NULL;
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c
index 2986fdf6a..704819c50 100644
--- a/src/psyc/test_psyc.c
+++ b/src/psyc/test_psyc.c
@@ -148,11 +148,16 @@ join (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
148struct TransmitClosure 148struct TransmitClosure
149{ 149{
150 struct GNUNET_PSYC_MasterTransmitHandle *handle; 150 struct GNUNET_PSYC_MasterTransmitHandle *handle;
151 uint8_t n; 151
152 char *mod_names[16];
153 char *mod_values[16];
154 char *data[16];
155
156 uint8_t mod_count;
157 uint8_t data_count;
158
152 uint8_t paused; 159 uint8_t paused;
153 uint8_t fragment_count; 160 uint8_t n;
154 char *fragments[16];
155 uint16_t fragment_sizes[16];
156}; 161};
157 162
158 163
@@ -167,16 +172,47 @@ transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
167 172
168 173
169static int 174static int
170transmit_notify (void *cls, size_t *data_size, void *data) 175tmit_notify_mod (void *cls, size_t *data_size, void *data)
171{ 176{
172 struct TransmitClosure *tmit = cls; 177 struct TransmitClosure *tmit = cls;
173 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 178 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
174 "Transmit notify: %lu bytes available, " 179 "Transmit notify modifier: %lu bytes available, "
180 "processing modifier %u/%u.\n",
181 *data_size, tmit->n + 1, tmit->fragment_count);
182 /* FIXME: continuation */
183 uint16_t name_size = strlen (tmit->mod_names[tmit->n]);
184 uint16_t value_size = strlen (tmit->mod_values[tmit->n]);
185 if (name_size + 1 + value_size <= *data_size)
186 return GNUNET_NO;
187
188 *data_size = name_size + 1 + value_size;
189 memcpy (data, tmit->fragments[tmit->n], *data_size);
190
191 if (++tmit->n < tmit->mod_count)
192 {
193 return GNUNET_NO;
194 }
195 else
196 {
197 tmit->n = 0;
198 return GNUNET_YES;
199 }
200}
201
202
203static int
204tmit_notify_data (void *cls, size_t *data_size, void *data)
205{
206 struct TransmitClosure *tmit = cls;
207 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
208 "Transmit notify data: %lu bytes available, "
175 "processing fragment %u/%u.\n", 209 "processing fragment %u/%u.\n",
176 *data_size, tmit->n + 1, tmit->fragment_count); 210 *data_size, tmit->n + 1, tmit->fragment_count);
177 GNUNET_assert (tmit->fragment_sizes[tmit->n] <= *data_size); 211 uint16_t size = strlen (tmit->data[tmit->n]);
212 if (size <= *data_size)
213 return GNUNET_NO;
178 214
179 if (GNUNET_YES == tmit->paused && tmit->n == tmit->fragment_count - 1) 215 if (GNUNET_YES == tmit->paused && tmit->n == tmit->data_count - 1)
180 { 216 {
181 /* Send last fragment later. */ 217 /* Send last fragment later. */
182 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n"); 218 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n");
@@ -188,13 +224,13 @@ transmit_notify (void *cls, size_t *data_size, void *data)
188 return GNUNET_NO; 224 return GNUNET_NO;
189 } 225 }
190 226
191 GNUNET_assert (tmit->fragment_sizes[tmit->n] <= *data_size); 227 *data_size = size;
192 *data_size = tmit->fragment_sizes[tmit->n]; 228 memcpy (data, tmit->data[tmit->n], size);
193 memcpy (data, tmit->fragments[tmit->n], *data_size);
194 229
195 return ++tmit->n < tmit->fragment_count ? GNUNET_NO : GNUNET_YES; 230 return ++tmit->n < tmit->data_count ? GNUNET_NO : GNUNET_YES;
196} 231}
197 232
233
198void 234void
199master_started (void *cls, uint64_t max_message_id) 235master_started (void *cls, uint64_t max_message_id)
200{ 236{
@@ -208,15 +244,13 @@ master_started (void *cls, uint64_t max_message_id)
208 "_foo_bar", "foo bar baz", 11); 244 "_foo_bar", "foo bar baz", 11);
209 245
210 struct TransmitClosure *tmit = GNUNET_new (struct TransmitClosure); 246 struct TransmitClosure *tmit = GNUNET_new (struct TransmitClosure);
211 tmit->fragment_count = 3; 247 tmit->data[0] = "foo";
212 tmit->fragments[0] = "foo"; 248 tmit->data[1] = "foo bar";
213 tmit->fragment_sizes[0] = 4; 249 tmit->data[2] = "foo bar baz";
214 tmit->fragments[1] = "foo bar"; 250 tmit->data_count = 3;
215 tmit->fragment_sizes[1] = 7;
216 tmit->fragments[2] = "foo bar baz";
217 tmit->fragment_sizes[2] = 11;
218 tmit->handle 251 tmit->handle
219 = GNUNET_PSYC_master_transmit (mst, "_test", env, transmit_notify, tmit, 252 = GNUNET_PSYC_master_transmit (mst, "_test", tmit_notify_mod,
253 tmit_notify_data, tmit,
220 GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN); 254 GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN);
221} 255}
222 256