aboutsummaryrefslogtreecommitdiff
path: root/src/psyc/psyc_api.c
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2014-01-06 00:09:37 +0000
committerGabor X Toth <*@tg-x.net>2014-01-06 00:09:37 +0000
commitc04d45b9738e1764d2e2c21efdbeb129f298d5d1 (patch)
tree9eec32efdd3fe3f9f459630af16058cc47436bce /src/psyc/psyc_api.c
parent83a0e31631dbc199c37c42f11004e1be544f04a8 (diff)
downloadgnunet-c04d45b9738e1764d2e2c21efdbeb129f298d5d1.tar.gz
gnunet-c04d45b9738e1764d2e2c21efdbeb129f298d5d1.zip
psyc: ipc messages
Diffstat (limited to 'src/psyc/psyc_api.c')
-rw-r--r--src/psyc/psyc_api.c569
1 files changed, 482 insertions, 87 deletions
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index 290f3e375..a5a01fa92 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -30,15 +30,17 @@
30 * @author Gabor X Toth 30 * @author Gabor X Toth
31 */ 31 */
32 32
33#include <inttypes.h>
34
33#include "platform.h" 35#include "platform.h"
34#include "gnunet_util_lib.h" 36#include "gnunet_util_lib.h"
35#include "gnunet_env_lib.h" 37#include "gnunet_env_lib.h"
38#include "gnunet_multicast_service.h"
36#include "gnunet_psyc_service.h" 39#include "gnunet_psyc_service.h"
37#include "psyc.h" 40#include "psyc.h"
38 41
39#define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__) 42#define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__)
40 43
41
42struct OperationHandle 44struct OperationHandle
43{ 45{
44 struct OperationHandle *prev; 46 struct OperationHandle *prev;
@@ -91,31 +93,55 @@ struct GNUNET_PSYC_Channel
91 */ 93 */
92 struct GNUNET_TIME_Relative reconnect_delay; 94 struct GNUNET_TIME_Relative reconnect_delay;
93 95
94 GNUNET_PSYC_Method method_cb; 96 /**
97 * Message part callback.
98 */
99 GNUNET_PSYC_MessageCallback message_cb;
100
101 /**
102 * Message part callback for historic message.
103 */
104 GNUNET_PSYC_MessageCallback hist_message_cb;
95 105
106 /**
107 * Join handler callback.
108 */
96 GNUNET_PSYC_JoinCallback join_cb; 109 GNUNET_PSYC_JoinCallback join_cb;
97 110
111 /**
112 * Closure for @a message_cb and @a join_cb.
113 */
98 void *cb_cls; 114 void *cb_cls;
99 115
100 /** 116 /**
101 * Are we polling for incoming messages right now? 117 * ID of the message being received from the PSYC service.
102 */ 118 */
103 int in_receive; 119 uint64_t recv_message_id;
104 120
105 /** 121 /**
106 * Are we currently transmitting a message? 122 * State of the currently being received message from the PSYC service.
107 */ 123 */
108 int in_transmit; 124 enum MessageState recv_state;
109 125
110 /** 126 /**
111 * Is this a master or slave channel? 127 * Flags for the currently being received message from the PSYC service.
128 */
129 enum GNUNET_PSYC_MessageFlags recv_flags;
130
131 /**
132 * Expected value size for the modifier being received from the PSYC service.
133 */
134 uint32_t recv_mod_value_size_expected;
135
136 /**
137 * Actual value size for the modifier being received from the PSYC service.
112 */ 138 */
113 int is_master; 139 uint32_t recv_mod_value_size;
114 140
115 /** 141 /**
116 * Buffer space available for transmitting the next data fragment. 142 * Buffer space available for transmitting the next data fragment.
117 */ 143 */
118 uint16_t tmit_buf_avail; 144 uint16_t tmit_size; // FIXME
119 145
120 /** 146 /**
121 * Is transmission paused? 147 * Is transmission paused?
@@ -125,7 +151,22 @@ struct GNUNET_PSYC_Channel
125 /** 151 /**
126 * Are we still waiting for a PSYC_TRANSMIT_ACK? 152 * Are we still waiting for a PSYC_TRANSMIT_ACK?
127 */ 153 */
128 uint8_t tmit_ack_pending; 154 uint8_t tmit_ack_pending; // FIXME
155
156 /**
157 * Are we polling for incoming messages right now?
158 */
159 uint8_t in_receive;
160
161 /**
162 * Are we currently transmitting a message?
163 */
164 uint8_t in_transmit;
165
166 /**
167 * Is this a master or slave channel?
168 */
169 uint8_t is_master;
129}; 170};
130 171
131 172
@@ -135,9 +176,10 @@ struct GNUNET_PSYC_Channel
135struct GNUNET_PSYC_MasterTransmitHandle 176struct GNUNET_PSYC_MasterTransmitHandle
136{ 177{
137 struct GNUNET_PSYC_Master *master; 178 struct GNUNET_PSYC_Master *master;
138 GNUNET_PSYC_MasterTransmitNotify notify; 179 GNUNET_PSYC_MasterTransmitNotify notify_mod;
180 GNUNET_PSYC_MasterTransmitNotify notify_data;
139 void *notify_cls; 181 void *notify_cls;
140 enum GNUNET_PSYC_DataStatus status; 182 enum MessageState state;
141}; 183};
142 184
143 185
@@ -254,52 +296,383 @@ transmit_next (struct GNUNET_PSYC_Channel *ch);
254 296
255 297
256/** 298/**
257 * Request data from client to transmit. 299 * Reset data stored related to the last received message.
300 */
301static void
302recv_reset (struct GNUNET_PSYC_Channel *ch)
303{
304 ch->recv_state = MSG_STATE_START;
305 ch->recv_flags = 0;
306 ch->recv_message_id = 0;
307 ch->recv_mod_value_size =0;
308 ch->recv_mod_value_size_expected = 0;
309}
310
311
312static void
313recv_error (struct GNUNET_PSYC_Channel *ch)
314{
315 recv_reset (ch);
316
317 GNUNET_PSYC_MessageCallback message_cb
318 = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
319 ? ch->hist_message_cb
320 : ch->message_cb;
321
322 if (NULL != message_cb)
323 message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, NULL);
324}
325
326/**
327 * Request a modifier from a client to transmit.
258 * 328 *
259 * @param mst Master handle. 329 * @param mst Master handle.
260 */ 330 */
261static void 331static void
262master_transmit_data (struct GNUNET_PSYC_Master *mst) 332master_transmit_mod (struct GNUNET_PSYC_Master *mst)
263{ 333{
264 struct GNUNET_PSYC_Channel *ch = &mst->ch; 334 struct GNUNET_PSYC_Channel *ch = &mst->ch;
265 size_t data_size = ch->tmit_buf_avail; 335 uint16_t max_data_size
266 struct GNUNET_PSYC_MessageData *pdata; 336 = ch->tmit_size > sizeof (struct GNUNET_MessageHeader)
267 struct OperationHandle *op 337 ? GNUNET_PSYC_MODIFIER_MAX_PAYLOAD - ch->tmit_size
268 = GNUNET_malloc (sizeof (*op) + sizeof (*pdata) + data_size); 338 : GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD - ch->tmit_size;
269 pdata = (struct GNUNET_PSYC_MessageData *) &op[1]; 339 uint16_t data_size = max_data_size;
270 op->msg = (struct GNUNET_MessageHeader *) pdata;
271 pdata->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
272 340
273 switch (mst->tmit->notify (mst->tmit->notify_cls, &data_size, &pdata[1])) 341 struct GNUNET_MessageHeader *msg;
342 struct OperationHandle *op
343 = GNUNET_malloc (sizeof (*op) + sizeof (*msg) + data_size);
344 op->msg = msg = (struct GNUNET_MessageHeader *) &op[1];
345 msg->type
346 = MSG_STATE_MODIFIER == mst->tmit->state
347 ? htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER)
348 : htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
349
350 int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls,
351 &data_size, &msg[1]);
352 switch (notify_ret)
274 { 353 {
275 case GNUNET_NO: 354 case GNUNET_NO:
276 mst->tmit->status = GNUNET_PSYC_DATA_CONT; 355 if (0 != data_size)
356 mst->tmit->state = MSG_STATE_MOD_CONT;
277 break; 357 break;
278 358
279 case GNUNET_YES: 359 case GNUNET_YES:
280 mst->tmit->status = GNUNET_PSYC_DATA_END; 360 mst->tmit->state = (0 == data_size) ? MSG_STATE_DATA : MSG_STATE_MODIFIER;
281 break; 361 break;
282 362
283 default: 363 default:
284 mst->tmit->status = GNUNET_PSYC_DATA_CANCEL; 364 LOG (GNUNET_ERROR_TYPE_ERROR,
285 data_size = 0; 365 "MasterTransmitNotify returned error when requesting a modifier.\n");
286 LOG (GNUNET_ERROR_TYPE_ERROR, "MasterTransmitNotify returned error.\n"); 366
367 mst->tmit->state = MSG_STATE_START;
368 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
369 msg->size = htons (sizeof (*msg));
370
371 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
372 transmit_next (ch);
373 return;
287 } 374 }
288 375
289 if ((GNUNET_PSYC_DATA_CONT == mst->tmit->status && 0 == data_size)) 376 if ((GNUNET_NO == notify_ret && 0 == data_size))
290 { 377 {
291 /* Transmission paused, nothing to send. */ 378 /* Transmission paused, nothing to send. */
292 ch->tmit_paused = GNUNET_YES; 379 ch->tmit_paused = GNUNET_YES;
293 GNUNET_free (op); 380 GNUNET_free (op);
294 } 381 }
295 else 382
383 if (0 < data_size)
384 {
385 GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
386 msg->size = htons (sizeof (*msg) + data_size);
387 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
388 }
389
390 /* End of message. */
391 if (GNUNET_YES == notify_ret)
392 {
393 op = GNUNET_malloc (sizeof *(op) + sizeof (*msg));
394 op->msg = msg = (struct GNUNET_MessageHeader *) &op[1];
395 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
396 msg->size = htons (sizeof (*msg));
397 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
398 }
399
400 transmit_next (ch);
401}
402
403
404/**
405 * Request data from a client to transmit.
406 *
407 * @param mst Master handle.
408 */
409static void
410master_transmit_data (struct GNUNET_PSYC_Master *mst)
411{
412 struct GNUNET_PSYC_Channel *ch = &mst->ch;
413 struct GNUNET_MessageHeader *msg;
414 uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
415 struct OperationHandle *op
416 = GNUNET_malloc (sizeof (*op) + sizeof (*msg) + data_size);
417 op->msg = msg = (struct GNUNET_MessageHeader *) &op[1];
418 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
419
420 int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls,
421 &data_size, &msg[1]);
422 switch (notify_ret)
296 { 423 {
297 GNUNET_assert (data_size <= ch->tmit_buf_avail); 424 case GNUNET_NO:
298 pdata->header.size = htons (sizeof (*pdata) + data_size); 425 if (0 == data_size)
299 pdata->status = htons (mst->tmit->status); 426 {
427 /* Transmission paused, nothing to send. */
428 ch->tmit_paused = GNUNET_YES;
429 GNUNET_free (op);
430 }
431 break;
432
433 case GNUNET_YES:
434 mst->tmit->state = MSG_STATE_START;
435 break;
436
437 default:
438 LOG (GNUNET_ERROR_TYPE_ERROR,
439 "MasterTransmitNotify returned error when requesting data.\n");
440
441 mst->tmit->state = MSG_STATE_START;
442 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
443 msg->size = htons (sizeof (*msg));
444
300 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); 445 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
301 ch->tmit_ack_pending = GNUNET_YES;
302 transmit_next (ch); 446 transmit_next (ch);
447 return;
448 }
449
450 if (0 < data_size)
451 {
452 GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
453 msg->size = htons (sizeof (*msg) + data_size);
454 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
455 }
456
457 /* End of message. */
458 if (GNUNET_YES == notify_ret)
459 {
460 op = GNUNET_malloc (sizeof *(op) + sizeof (*msg));
461 op->msg = msg = (struct GNUNET_MessageHeader *) &op[1];
462 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
463 msg->size = htons (sizeof (*msg));
464 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
465 }
466
467 transmit_next (ch);
468}
469
470
471/**
472 * Handle incoming message from the PSYC service.
473 *
474 * @param ch The channel the message is sent to.
475 * @param pmsg The message.
476 */
477static void
478handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
479 const struct GNUNET_PSYC_MessageHeader *pmsg)
480{
481 const struct GNUNET_MessageHeader *msg;
482 uint16_t msize = ntohs (pmsg->header.size);
483 uint16_t pos = 0;
484 uint16_t size = 0;
485 uint16_t type, size_eq, size_min;
486
487 if (MSG_STATE_START == ch->recv_state)
488 {
489 ch->recv_message_id = GNUNET_ntohll (pmsg->message_id);
490 ch->recv_flags = ntohl (pmsg->flags);
491 }
492 else if (GNUNET_ntohll (pmsg->message_id) != ch->recv_message_id)
493 {
494 LOG (GNUNET_ERROR_TYPE_WARNING,
495 "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
496 GNUNET_ntohll (pmsg->message_id), ch->recv_message_id);
497 GNUNET_break_op (0);
498 recv_error (ch);
499 }
500 else if (ntohl (pmsg->flags) != ch->recv_flags)
501 {
502 LOG (GNUNET_ERROR_TYPE_WARNING,
503 "Unexpected message flags. Got: %lu, expected: %lu\n",
504 ntohl (pmsg->flags), ch->recv_flags);
505 GNUNET_break_op (0);
506 recv_error (ch);
507 }
508
509 for (pos = 0; sizeof (*pmsg) + pos < msize; pos += size)
510 {
511 msg = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
512 size = ntohs (msg->size);
513 type = ntohs (msg->type);
514 size_eq = size_min = 0;
515
516 if (msize < sizeof (*pmsg) + pos + size)
517 {
518 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
519 "Discarding message of type %u with invalid size. "
520 "(%u < %u + %u + %u)\n", ntohs (msg->type),
521 msize, sizeof (*msg), pos, size);
522 break;
523 }
524 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
525 "Received message part of type %u and size %u from PSYC.\n",
526 ntohs (msg->type), size);
527
528
529 switch (type)
530 {
531 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
532 size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
533 break;
534 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
535 size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
536 break;
537 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
538 size_min = sizeof (struct GNUNET_MessageHeader);
539 break;
540 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
541 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
542 size_eq = sizeof (struct GNUNET_MessageHeader);
543 break;
544 }
545
546 if (! ((0 < size_eq && size == size_eq)
547 || (0 < size_min && size_min <= size)))
548 {
549 GNUNET_break (0);
550 reschedule_connect (ch);
551 return;
552 }
553
554 switch (type)
555 {
556 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
557 {
558 struct GNUNET_PSYC_MessageMethod *meth
559 = (struct GNUNET_PSYC_MessageMethod *) msg;
560
561 if (MSG_STATE_HEADER != ch->recv_state)
562 {
563 LOG (GNUNET_ERROR_TYPE_WARNING,
564 "Discarding out of order message method.\n");
565 /* It is normal to receive an incomplete message right after connecting,
566 * but should not happen later.
567 * FIXME: add a check for this condition.
568 */
569 GNUNET_break_op (0);
570 recv_error (ch);
571 break;
572 }
573
574 if ('\0' != (char *) meth + msg->size - 1)
575 {
576 LOG (GNUNET_ERROR_TYPE_WARNING,
577 "Discarding message with malformed method. "
578 "Message ID: %" PRIu64 "\n", ch->recv_message_id);
579 GNUNET_break_op (0);
580 recv_error (ch);
581 break;
582 }
583 GNUNET_PSYC_MessageCallback message_cb
584 = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
585 ? ch->hist_message_cb
586 : ch->message_cb;
587
588 if (NULL != message_cb)
589 message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg);
590
591 ch->recv_state = MSG_STATE_METHOD;
592 break;
593 }
594 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
595 {
596 if (MSG_STATE_MODIFIER != ch->recv_state)
597 {
598 LOG (GNUNET_ERROR_TYPE_WARNING,
599 "Discarding out of order message modifier.\n");
600 GNUNET_break_op (0);
601 recv_error (ch);
602 break;
603 }
604
605 struct GNUNET_PSYC_MessageModifier *mod
606 = (struct GNUNET_PSYC_MessageModifier *) msg;
607
608 uint16_t name_size = ntohs (mod->name_size);
609 ch->recv_mod_value_size_expected = ntohs (mod->value_size);
610 ch->recv_mod_value_size = size - sizeof (*mod) - name_size - 1;
611
612 if (size < sizeof (*mod) + name_size + 1
613 || '\0' != (char *) &mod[1] + mod->name_size
614 || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
615 {
616 LOG (GNUNET_ERROR_TYPE_WARNING, "Discarding malformed modifier.\n");
617 GNUNET_break_op (0);
618 break;
619 }
620
621 ch->recv_state = MSG_STATE_MODIFIER;
622
623 GNUNET_PSYC_MessageCallback message_cb
624 = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
625 ? ch->hist_message_cb
626 : ch->message_cb;
627
628 if (NULL != message_cb)
629 message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg);
630
631 break;
632 }
633 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
634 {
635 ch->recv_mod_value_size += size - sizeof (*msg);
636
637 if (MSG_STATE_MODIFIER != ch->recv_state
638 || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
639 {
640 LOG (GNUNET_ERROR_TYPE_WARNING,
641 "Discarding out of order message modifier continuation.\n");
642 GNUNET_break_op (0);
643 recv_reset (ch);
644 break;
645 }
646
647 GNUNET_PSYC_MessageCallback message_cb
648 = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
649 ? ch->hist_message_cb
650 : ch->message_cb;
651
652 if (NULL != message_cb)
653 message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg);
654 break;
655 }
656 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
657 {
658 if (ch->recv_state < MSG_STATE_METHOD
659 || ch->recv_mod_value_size_expected != ch->recv_mod_value_size)
660 {
661 LOG (GNUNET_ERROR_TYPE_WARNING,
662 "Discarding out of order message data fragment.\n");
663 GNUNET_break_op (0);
664 recv_reset (ch);
665 break;
666 }
667
668 ch->recv_state = MSG_STATE_DATA;
669 break;
670 }
671 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
672 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
673 recv_reset (ch);
674 break;
675 }
303 } 676 }
304} 677}
305 678
@@ -319,11 +692,10 @@ message_handler (void *cls,
319 struct GNUNET_PSYC_Channel *ch = cls; 692 struct GNUNET_PSYC_Channel *ch = cls;
320 struct GNUNET_PSYC_Master *mst = cls; 693 struct GNUNET_PSYC_Master *mst = cls;
321 struct GNUNET_PSYC_Slave *slv = cls; 694 struct GNUNET_PSYC_Slave *slv = cls;
322 struct CountersResult *cres;
323 struct TransmitAck *tack;
324 695
325 if (NULL == msg) 696 if (NULL == msg)
326 { 697 {
698 GNUNET_break (0);
327 reschedule_connect (ch); 699 reschedule_connect (ch);
328 return; 700 return;
329 } 701 }
@@ -342,8 +714,8 @@ message_handler (void *cls,
342 case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: 714 case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
343 size_eq = sizeof (struct CountersResult); 715 size_eq = sizeof (struct CountersResult);
344 break; 716 break;
345 case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK: 717 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
346 size_eq = sizeof (struct TransmitAck); 718 size_min = sizeof (struct GNUNET_PSYC_MessageHeader);
347 break; 719 break;
348 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: 720 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
349 size_min = sizeof (struct GNUNET_PSYC_MessageMethod); 721 size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
@@ -352,11 +724,13 @@ message_handler (void *cls,
352 size_min = sizeof (struct GNUNET_PSYC_MessageModifier); 724 size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
353 break; 725 break;
354 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: 726 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
355 size_min = sizeof (struct GNUNET_PSYC_MessageData); 727 size_min = sizeof (struct GNUNET_MessageHeader);
728 break;
729 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
730 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
731 case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK:
732 size_eq = sizeof (struct GNUNET_MessageHeader);
356 break; 733 break;
357 default:
358 GNUNET_break_op (0);
359 return;
360 } 734 }
361 735
362 if (! ((0 < size_eq && size == size_eq) 736 if (! ((0 < size_eq && size == size_eq)
@@ -370,38 +744,63 @@ message_handler (void *cls,
370 switch (type) 744 switch (type)
371 { 745 {
372 case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK: 746 case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK:
373 cres = (struct CountersResult *) msg; 747 {
748 struct CountersResult *cres = (struct CountersResult *) msg;
374 mst->max_message_id = GNUNET_ntohll (cres->max_message_id); 749 mst->max_message_id = GNUNET_ntohll (cres->max_message_id);
375 if (NULL != mst->start_cb) 750 if (NULL != mst->start_cb)
376 mst->start_cb (ch->cb_cls, mst->max_message_id); 751 mst->start_cb (ch->cb_cls, mst->max_message_id);
377 break; 752 break;
378 753 }
379 case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: 754 case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
380 cres = (struct CountersResult *) msg; 755 {
381#if TODO 756#if TODO
757 struct CountersResult *cres = (struct CountersResult *) msg;
382 slv->max_message_id = GNUNET_ntohll (cres->max_message_id); 758 slv->max_message_id = GNUNET_ntohll (cres->max_message_id);
383 if (NULL != slv->join_ack_cb) 759 if (NULL != slv->join_ack_cb)
384 mst->join_ack_cb (ch->cb_cls, mst->max_message_id); 760 mst->join_ack_cb (ch->cb_cls, mst->max_message_id);
385#endif 761#endif
386 break; 762 break;
387 763 }
388 case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK: 764 case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK:
389 tack = (struct TransmitAck *) msg; 765 {
766 ch->tmit_ack_pending = GNUNET_NO;
767
390 if (ch->is_master) 768 if (ch->is_master)
391 { 769 {
392 GNUNET_assert (NULL != mst->tmit); 770 GNUNET_assert (NULL != mst->tmit);
393 if (GNUNET_PSYC_DATA_CONT != mst->tmit->status 771 switch (mst->tmit->state)
394 || NULL == mst->tmit->notify)
395 {
396 GNUNET_free (mst->tmit);
397 mst->tmit = NULL;
398 }
399 else
400 { 772 {
401 ch->tmit_buf_avail = ntohs (tack->buf_avail); 773 case MSG_STATE_MODIFIER:
402 ch->tmit_ack_pending = GNUNET_NO; 774 if (GNUNET_NO == ch->tmit_paused)
775 master_transmit_mod (mst);
776 break;
777
778 case MSG_STATE_MOD_CONT:
779 if (GNUNET_NO == ch->tmit_paused)
780 master_transmit_mod (mst);
781 break;
782
783 case MSG_STATE_DATA:
403 if (GNUNET_NO == ch->tmit_paused) 784 if (GNUNET_NO == ch->tmit_paused)
404 master_transmit_data (mst); 785 master_transmit_data (mst);
786 break;
787
788 case MSG_STATE_END:
789 case MSG_STATE_CANCEL:
790 if (NULL != mst->tmit)
791 {
792 GNUNET_free (mst->tmit);
793 mst->tmit = NULL;
794 }
795 else
796 {
797 LOG (GNUNET_ERROR_TYPE_WARNING,
798 "Ignoring transmit ack, there's no transmission going on.\n");
799 }
800 break;
801 default:
802 LOG (GNUNET_ERROR_TYPE_WARNING,
803 "Ignoring unexpected transmit ack.\n");
405 } 804 }
406 } 805 }
407 else 806 else
@@ -409,17 +808,10 @@ message_handler (void *cls,
409 /* TODO: slave */ 808 /* TODO: slave */
410 } 809 }
411 break; 810 break;
811 }
412 812
413 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: 813 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
414 814 handle_psyc_message(ch, (const struct GNUNET_PSYC_MessageHeader *) msg);
415 break;
416
417 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
418
419 break;
420
421 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
422
423 break; 815 break;
424 } 816 }
425 817
@@ -506,6 +898,7 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
506{ 898{
507 struct GNUNET_PSYC_Channel *ch = cls; 899 struct GNUNET_PSYC_Channel *ch = cls;
508 900
901 recv_reset (ch);
509 ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK; 902 ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
510 LOG (GNUNET_ERROR_TYPE_DEBUG, 903 LOG (GNUNET_ERROR_TYPE_DEBUG,
511 "Connecting to PSYC service.\n"); 904 "Connecting to PSYC service.\n");
@@ -588,7 +981,7 @@ disconnect (void *c)
588 * one in the future. 981 * one in the future.
589 * @param policy Channel policy specifying join and history restrictions. 982 * @param policy Channel policy specifying join and history restrictions.
590 * Used to automate join decisions. 983 * Used to automate join decisions.
591 * @param method Function to invoke on messages received from slaves. 984 * @param message_cb Function to invoke on message parts received from slaves.
592 * @param join_cb Function to invoke when a peer wants to join. 985 * @param join_cb Function to invoke when a peer wants to join.
593 * @param master_started_cb Function to invoke after the channel master started. 986 * @param master_started_cb Function to invoke after the channel master started.
594 * @param cls Closure for @a master_started_cb and @a join_cb. 987 * @param cls Closure for @a master_started_cb and @a join_cb.
@@ -598,7 +991,7 @@ struct GNUNET_PSYC_Master *
598GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, 991GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
599 const struct GNUNET_CRYPTO_EddsaPrivateKey *channel_key, 992 const struct GNUNET_CRYPTO_EddsaPrivateKey *channel_key,
600 enum GNUNET_PSYC_Policy policy, 993 enum GNUNET_PSYC_Policy policy,
601 GNUNET_PSYC_Method method, 994 GNUNET_PSYC_MessageCallback message_cb,
602 GNUNET_PSYC_JoinCallback join_cb, 995 GNUNET_PSYC_JoinCallback join_cb,
603 GNUNET_PSYC_MasterStartCallback master_started_cb, 996 GNUNET_PSYC_MasterStartCallback master_started_cb,
604 void *cls) 997 void *cls)
@@ -618,7 +1011,7 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
618 ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; 1011 ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
619 ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst); 1012 ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst);
620 1013
621 ch->method_cb = method; 1014 ch->message_cb = message_cb;
622 ch->join_cb = join_cb; 1015 ch->join_cb = join_cb;
623 ch->cb_cls = cls; 1016 ch->cb_cls = cls;
624 mst->start_cb = master_started_cb; 1017 mst->start_cb = master_started_cb;
@@ -705,19 +1098,17 @@ send_modifier (void *cls, struct GNUNET_ENV_Modifier *mod)
705 * 1098 *
706 * @param master Handle to the PSYC channel. 1099 * @param master Handle to the PSYC channel.
707 * @param method_name Which method should be invoked. 1100 * @param method_name Which method should be invoked.
708 * @param env Environment containing state operations and transient variables 1101 * @param notify_mod Function to call to obtain modifiers.
709 * for the message, or NULL. 1102 * @param notify_data Function to call to obtain fragments of the data.
710 * @param notify Function to call to obtain the arguments. 1103 * @param notify_cls Closure for @a notify_mod and @a notify_data.
711 * @param notify_cls Closure for @a notify.
712 * @param flags Flags for the message being transmitted. 1104 * @param flags Flags for the message being transmitted.
713 * @return Transmission handle, NULL on error (i.e. more than one request 1105 * @return Transmission handle, NULL on error (i.e. more than one request queued).
714 * queued).
715 */ 1106 */
716struct GNUNET_PSYC_MasterTransmitHandle * 1107struct GNUNET_PSYC_MasterTransmitHandle *
717GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, 1108GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
718 const char *method_name, 1109 const char *method_name,
719 const struct GNUNET_ENV_Environment *env, 1110 GNUNET_PSYC_MasterTransmitNotify notify_mod,
720 GNUNET_PSYC_MasterTransmitNotify notify, 1111 GNUNET_PSYC_MasterTransmitNotify notify_data,
721 void *notify_cls, 1112 void *notify_cls,
722 enum GNUNET_PSYC_MasterTransmitFlags flags) 1113 enum GNUNET_PSYC_MasterTransmitFlags flags)
723{ 1114{
@@ -737,18 +1128,17 @@ GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
737 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); 1128 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
738 pmeth->header.size = htons (sizeof (*pmeth) + size); 1129 pmeth->header.size = htons (sizeof (*pmeth) + size);
739 pmeth->flags = htonl (flags); 1130 pmeth->flags = htonl (flags);
740 pmeth->mod_count = GNUNET_ntohll (GNUNET_ENV_environment_get_mod_count (env));
741 memcpy (&pmeth[1], method_name, size); 1131 memcpy (&pmeth[1], method_name, size);
742 1132
743 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); 1133 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
744 GNUNET_ENV_environment_iterate (env, send_modifier, master);
745 transmit_next (ch); 1134 transmit_next (ch);
746 1135
747 master->tmit = GNUNET_malloc (sizeof (*master->tmit)); 1136 master->tmit = GNUNET_malloc (sizeof (*master->tmit));
748 master->tmit->master = master; 1137 master->tmit->master = master;
749 master->tmit->notify = notify; 1138 master->tmit->notify_mod = notify_mod;
1139 master->tmit->notify_data = notify_data;
750 master->tmit->notify_cls = notify_cls; 1140 master->tmit->notify_cls = notify_cls;
751 master->tmit->status = GNUNET_PSYC_DATA_CONT; 1141 master->tmit->state = MSG_STATE_START; // FIXME
752 return master->tmit; 1142 return master->tmit;
753} 1143}
754 1144
@@ -804,12 +1194,13 @@ GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th)
804 * @param relay_count Number of peers in the @a relays array. 1194 * @param relay_count Number of peers in the @a relays array.
805 * @param relays Peer identities of members of the multicast group, which serve 1195 * @param relays Peer identities of members of the multicast group, which serve
806 * as relays and used to join the group at. 1196 * as relays and used to join the group at.
807 * @param method Function to invoke on messages received from the channel, 1197 * @param message_cb Function to invoke on message parts received from the
808 * typically at least contains functions for @e join and @e part. 1198 * channel, typically at least contains method handlers for @e join and
1199 * @e part.
809 * @param join_cb function invoked once we have joined with the current 1200 * @param join_cb function invoked once we have joined with the current
810 * message ID of the channel 1201 * message ID of the channel
811 * @param slave_joined_cb Function to invoke when a peer wants to join. 1202 * @param slave_joined_cb Function to invoke when a peer wants to join.
812 * @param cls Closure for @a method_cb and @a slave_joined_cb. 1203 * @param cls Closure for @a message_cb and @a slave_joined_cb.
813 * @param method_name Method name for the join request. 1204 * @param method_name Method name for the join request.
814 * @param env Environment containing transient variables for the request, or NULL. 1205 * @param env Environment containing transient variables for the request, or NULL.
815 * @param data Payload for the join message. 1206 * @param data Payload for the join message.
@@ -823,7 +1214,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
823 const struct GNUNET_PeerIdentity *origin, 1214 const struct GNUNET_PeerIdentity *origin,
824 uint32_t relay_count, 1215 uint32_t relay_count,
825 const struct GNUNET_PeerIdentity *relays, 1216 const struct GNUNET_PeerIdentity *relays,
826 GNUNET_PSYC_Method method, 1217 GNUNET_PSYC_MessageCallback message_cb,
827 GNUNET_PSYC_JoinCallback join_cb, 1218 GNUNET_PSYC_JoinCallback join_cb,
828 GNUNET_PSYC_SlaveJoinCallback slave_joined_cb, 1219 GNUNET_PSYC_SlaveJoinCallback slave_joined_cb,
829 void *cls, 1220 void *cls,
@@ -845,6 +1236,10 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
845 req->relay_count = relay_count; 1236 req->relay_count = relay_count;
846 memcpy (&req[1], relays, relay_count * sizeof (*relays)); 1237 memcpy (&req[1], relays, relay_count * sizeof (*relays));
847 1238
1239 ch->message_cb = message_cb;
1240 ch->join_cb = join_cb;
1241 ch->cb_cls = cls;
1242
848 ch->cfg = cfg; 1243 ch->cfg = cfg;
849 ch->is_master = GNUNET_NO; 1244 ch->is_master = GNUNET_NO;
850 ch->reconnect_msg = (struct GNUNET_MessageHeader *) req; 1245 ch->reconnect_msg = (struct GNUNET_MessageHeader *) req;
@@ -1043,7 +1438,7 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel,
1043 * @param channel Which channel should be replayed? 1438 * @param channel Which channel should be replayed?
1044 * @param start_message_id Earliest interesting point in history. 1439 * @param start_message_id Earliest interesting point in history.
1045 * @param end_message_id Last (exclusive) interesting point in history. 1440 * @param end_message_id Last (exclusive) interesting point in history.
1046 * @param method Function to invoke on messages received from the story. 1441 * @param message_cb Function to invoke on message parts received from the story.
1047 * @param finish_cb Function to call when the requested story has been fully 1442 * @param finish_cb Function to call when the requested story has been fully
1048 * told (counting message IDs might not suffice, as some messages 1443 * told (counting message IDs might not suffice, as some messages
1049 * might be secret and thus the listener would not know the story is 1444 * might be secret and thus the listener would not know the story is
@@ -1057,8 +1452,8 @@ struct GNUNET_PSYC_Story *
1057GNUNET_PSYC_channel_story_tell (struct GNUNET_PSYC_Channel *channel, 1452GNUNET_PSYC_channel_story_tell (struct GNUNET_PSYC_Channel *channel,
1058 uint64_t start_message_id, 1453 uint64_t start_message_id,
1059 uint64_t end_message_id, 1454 uint64_t end_message_id,
1060 GNUNET_PSYC_Method method, 1455 GNUNET_PSYC_MessageCallback message_cb,
1061 GNUNET_PSYC_FinishCallback *finish_cb, 1456 GNUNET_PSYC_FinishCallback finish_cb,
1062 void *cls) 1457 void *cls)
1063{ 1458{
1064 return NULL; 1459 return NULL;