aboutsummaryrefslogtreecommitdiff
path: root/src/psyc/psyc_api.c
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2014-03-06 23:46:45 +0000
committerGabor X Toth <*@tg-x.net>2014-03-06 23:46:45 +0000
commit8a0b8a4604526e5f832c4971f9c3b1b48d79bea4 (patch)
treedfd18a61272a18381fe9ce9b09849a965480a303 /src/psyc/psyc_api.c
parenta21beab58c1d2abc747359a98326f19aaad4e8cd (diff)
downloadgnunet-8a0b8a4604526e5f832c4971f9c3b1b48d79bea4.tar.gz
gnunet-8a0b8a4604526e5f832c4971f9c3b1b48d79bea4.zip
PSYC: implement slave to master requests, tests, fixes, reorg
Multicast lib: handle member to origin requests. Keep track of members and origins and call their callbacks when necessary.
Diffstat (limited to 'src/psyc/psyc_api.c')
-rw-r--r--src/psyc/psyc_api.c370
1 files changed, 214 insertions, 156 deletions
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index 20394bbce..8a1c9ffaa 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -48,12 +48,30 @@ struct OperationHandle
48 struct GNUNET_MessageHeader *msg; 48 struct GNUNET_MessageHeader *msg;
49}; 49};
50 50
51
52/**
53 * Handle for a pending PSYC transmission operation.
54 */
55struct GNUNET_PSYC_ChannelTransmitHandle
56{
57 struct GNUNET_PSYC_Channel *ch;
58 GNUNET_PSYC_TransmitNotifyModifier notify_mod;
59 GNUNET_PSYC_TransmitNotifyData notify_data;
60 void *notify_cls;
61 enum MessageState state;
62};
63
51/** 64/**
52 * Handle to access PSYC channel operations for both the master and slaves. 65 * Handle to access PSYC channel operations for both the master and slaves.
53 */ 66 */
54struct GNUNET_PSYC_Channel 67struct GNUNET_PSYC_Channel
55{ 68{
56 /** 69 /**
70 * Transmission handle;
71 */
72 struct GNUNET_PSYC_ChannelTransmitHandle tmit;
73
74 /**
57 * Configuration to use. 75 * Configuration to use.
58 */ 76 */
59 const struct GNUNET_CONFIGURATION_Handle *cfg; 77 const struct GNUNET_CONFIGURATION_Handle *cfg;
@@ -124,6 +142,11 @@ struct GNUNET_PSYC_Channel
124 uint64_t recv_message_id; 142 uint64_t recv_message_id;
125 143
126 /** 144 /**
145 * Public key of the slave from which a message is being received.
146 */
147 struct GNUNET_CRYPTO_EddsaPublicKey recv_slave_key;
148
149 /**
127 * State of the currently being received message from the PSYC service. 150 * State of the currently being received message from the PSYC service.
128 */ 151 */
129 enum MessageState recv_state; 152 enum MessageState recv_state;
@@ -171,27 +194,12 @@ struct GNUNET_PSYC_Channel
171 194
172 195
173/** 196/**
174 * Handle for a pending PSYC transmission operation.
175 */
176struct GNUNET_PSYC_MasterTransmitHandle
177{
178 struct GNUNET_PSYC_Master *master;
179 GNUNET_PSYC_MasterTransmitNotifyModifier notify_mod;
180 GNUNET_PSYC_MasterTransmitNotify notify_data;
181 void *notify_cls;
182 enum MessageState state;
183};
184
185
186/**
187 * Handle for the master of a PSYC channel. 197 * Handle for the master of a PSYC channel.
188 */ 198 */
189struct GNUNET_PSYC_Master 199struct GNUNET_PSYC_Master
190{ 200{
191 struct GNUNET_PSYC_Channel ch; 201 struct GNUNET_PSYC_Channel ch;
192 202
193 struct GNUNET_PSYC_MasterTransmitHandle *tmit;
194
195 GNUNET_PSYC_MasterStartCallback start_cb; 203 GNUNET_PSYC_MasterStartCallback start_cb;
196 204
197 uint64_t max_message_id; 205 uint64_t max_message_id;
@@ -204,6 +212,10 @@ struct GNUNET_PSYC_Master
204struct GNUNET_PSYC_Slave 212struct GNUNET_PSYC_Slave
205{ 213{
206 struct GNUNET_PSYC_Channel ch; 214 struct GNUNET_PSYC_Channel ch;
215
216 GNUNET_PSYC_SlaveJoinCallback join_cb;
217
218 uint64_t max_message_id;
207}; 219};
208 220
209 221
@@ -251,7 +263,7 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
251 263
252 264
253static void 265static void
254master_transmit_data (struct GNUNET_PSYC_Master *mst); 266channel_transmit_data (struct GNUNET_PSYC_Channel *ch);
255 267
256 268
257/** 269/**
@@ -302,7 +314,8 @@ recv_reset (struct GNUNET_PSYC_Channel *ch)
302 ch->recv_state = MSG_STATE_START; 314 ch->recv_state = MSG_STATE_START;
303 ch->recv_flags = 0; 315 ch->recv_flags = 0;
304 ch->recv_message_id = 0; 316 ch->recv_message_id = 0;
305 ch->recv_mod_value_size =0; 317 //FIXME: ch->recv_slave_key = { 0 };
318 ch->recv_mod_value_size = 0;
306 ch->recv_mod_value_size_expected = 0; 319 ch->recv_mod_value_size_expected = 0;
307} 320}
308 321
@@ -379,8 +392,9 @@ queue_message (struct GNUNET_PSYC_Channel *ch,
379 } 392 }
380 393
381 if (NULL != op 394 if (NULL != op
382 && (end || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD 395 && (GNUNET_YES == end
383 < op->msg->size + sizeof (struct GNUNET_MessageHeader)))) 396 || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
397 < op->msg->size + sizeof (struct GNUNET_MessageHeader))))
384 { 398 {
385 /* End of message or buffer is full, add it to transmission queue. */ 399 /* End of message or buffer is full, add it to transmission queue. */
386 op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); 400 op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
@@ -390,6 +404,9 @@ queue_message (struct GNUNET_PSYC_Channel *ch,
390 ch->tmit_ack_pending++; 404 ch->tmit_ack_pending++;
391 } 405 }
392 406
407 if (GNUNET_YES == end)
408 ch->in_transmit = GNUNET_NO;
409
393 transmit_next (ch); 410 transmit_next (ch);
394} 411}
395 412
@@ -400,15 +417,14 @@ queue_message (struct GNUNET_PSYC_Channel *ch,
400 * @param mst Master handle. 417 * @param mst Master handle.
401 */ 418 */
402static void 419static void
403master_transmit_mod (struct GNUNET_PSYC_Master *mst) 420channel_transmit_mod (struct GNUNET_PSYC_Channel *ch)
404{ 421{
405 struct GNUNET_PSYC_Channel *ch = &mst->ch;
406 uint16_t max_data_size, data_size; 422 uint16_t max_data_size, data_size;
407 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; 423 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
408 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; 424 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
409 int notify_ret; 425 int notify_ret;
410 426
411 switch (mst->tmit->state) 427 switch (ch->tmit.state)
412 { 428 {
413 case MSG_STATE_MODIFIER: 429 case MSG_STATE_MODIFIER:
414 { 430 {
@@ -417,12 +433,11 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst)
417 max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; 433 max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
418 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); 434 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
419 msg->size = sizeof (struct GNUNET_PSYC_MessageModifier); 435 msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
420 notify_ret = mst->tmit->notify_mod (mst->tmit->notify_cls, 436 notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls,
421 &data_size, &mod[1], &mod->oper); 437 &data_size, &mod[1], &mod->oper);
422 mod->name_size = strnlen ((char *) &mod[1], data_size); 438 mod->name_size = strnlen ((char *) &mod[1], data_size);
423 if (mod->name_size < data_size) 439 if (mod->name_size < data_size)
424 { 440 {
425 mod->oper = htons (mod->oper);
426 mod->value_size = htons (data_size - 1 - mod->name_size); 441 mod->value_size = htons (data_size - 1 - mod->name_size);
427 mod->name_size = htons (mod->name_size); 442 mod->name_size = htons (mod->name_size);
428 } 443 }
@@ -438,8 +453,8 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst)
438 max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; 453 max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
439 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); 454 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
440 msg->size = sizeof (struct GNUNET_MessageHeader); 455 msg->size = sizeof (struct GNUNET_MessageHeader);
441 notify_ret = mst->tmit->notify_mod (mst->tmit->notify_cls, 456 notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls,
442 &data_size, &msg[1], NULL); 457 &data_size, &msg[1], NULL);
443 break; 458 break;
444 } 459 }
445 default: 460 default:
@@ -454,27 +469,28 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst)
454 ch->tmit_paused = GNUNET_YES; 469 ch->tmit_paused = GNUNET_YES;
455 return; 470 return;
456 } 471 }
457 mst->tmit->state = MSG_STATE_MOD_CONT; 472 ch->tmit.state = MSG_STATE_MOD_CONT;
458 break; 473 break;
459 474
460 case GNUNET_YES: 475 case GNUNET_YES:
461 if (0 == data_size) 476 if (0 == data_size)
462 { 477 {
463 /* End of modifiers. */ 478 /* End of modifiers. */
464 mst->tmit->state = MSG_STATE_DATA; 479 ch->tmit.state = MSG_STATE_DATA;
465 if (0 == ch->tmit_ack_pending) 480 if (0 == ch->tmit_ack_pending)
466 master_transmit_data (mst); 481 channel_transmit_data (ch);
467 482
468 return; 483 return;
469 } 484 }
470 mst->tmit->state = MSG_STATE_MODIFIER; 485 ch->tmit.state = MSG_STATE_MODIFIER;
471 break; 486 break;
472 487
473 default: 488 default:
474 LOG (GNUNET_ERROR_TYPE_ERROR, 489 LOG (GNUNET_ERROR_TYPE_ERROR,
475 "MasterTransmitNotify returned error when requesting a modifier.\n"); 490 "MasterTransmitNotifyModifier returned error "
491 "when requesting a modifier.\n");
476 492
477 mst->tmit->state = MSG_STATE_START; 493 ch->tmit.state = MSG_STATE_CANCEL;
478 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); 494 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
479 msg->size = htons (sizeof (*msg)); 495 msg->size = htons (sizeof (*msg));
480 496
@@ -489,7 +505,7 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst)
489 queue_message (ch, msg, GNUNET_NO); 505 queue_message (ch, msg, GNUNET_NO);
490 } 506 }
491 507
492 master_transmit_mod (mst); 508 channel_transmit_mod (ch);
493} 509}
494 510
495 511
@@ -499,17 +515,16 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst)
499 * @param mst Master handle. 515 * @param mst Master handle.
500 */ 516 */
501static void 517static void
502master_transmit_data (struct GNUNET_PSYC_Master *mst) 518channel_transmit_data (struct GNUNET_PSYC_Channel *ch)
503{ 519{
504 struct GNUNET_PSYC_Channel *ch = &mst->ch;
505 uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD; 520 uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
506 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; 521 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
507 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; 522 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
508 523
509 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); 524 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
510 525
511 int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls, 526 int notify_ret = ch->tmit.notify_data (ch->tmit.notify_cls,
512 &data_size, &msg[1]); 527 &data_size, &msg[1]);
513 switch (notify_ret) 528 switch (notify_ret)
514 { 529 {
515 case GNUNET_NO: 530 case GNUNET_NO:
@@ -522,14 +537,14 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst)
522 break; 537 break;
523 538
524 case GNUNET_YES: 539 case GNUNET_YES:
525 mst->tmit->state = MSG_STATE_START; 540 ch->tmit.state = MSG_STATE_END;
526 break; 541 break;
527 542
528 default: 543 default:
529 LOG (GNUNET_ERROR_TYPE_ERROR, 544 LOG (GNUNET_ERROR_TYPE_ERROR,
530 "MasterTransmitNotify returned error when requesting data.\n"); 545 "MasterTransmitNotify returned error when requesting data.\n");
531 546
532 mst->tmit->state = MSG_STATE_START; 547 ch->tmit.state = MSG_STATE_CANCEL;
533 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); 548 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
534 msg->size = htons (sizeof (*msg)); 549 msg->size = htons (sizeof (*msg));
535 queue_message (ch, msg, GNUNET_YES); 550 queue_message (ch, msg, GNUNET_YES);
@@ -554,6 +569,86 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst)
554 569
555 570
556/** 571/**
572 * Send a message to a channel.
573 *
574 * @param ch Handle to the PSYC channel.
575 * @param method_name Which method should be invoked.
576 * @param notify_mod Function to call to obtain modifiers.
577 * @param notify_data Function to call to obtain fragments of the data.
578 * @param notify_cls Closure for @a notify_mod and @a notify_data.
579 * @param flags Flags for the message being transmitted.
580 * @return Transmission handle, NULL on error (i.e. more than one request queued).
581 */
582static struct GNUNET_PSYC_ChannelTransmitHandle *
583channel_transmit (struct GNUNET_PSYC_Channel *ch,
584 const char *method_name,
585 GNUNET_PSYC_TransmitNotifyModifier notify_mod,
586 GNUNET_PSYC_TransmitNotifyData notify_data,
587 void *notify_cls,
588 uint32_t flags)
589{
590 if (GNUNET_NO != ch->in_transmit)
591 return NULL;
592 ch->in_transmit = GNUNET_YES;
593
594 size_t size = strlen (method_name) + 1;
595 struct GNUNET_PSYC_MessageMethod *pmeth;
596 struct OperationHandle *op;
597
598 ch->tmit_msg = op = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg)
599 + sizeof (*pmeth) + size);
600 op->msg = (struct GNUNET_MessageHeader *) &op[1];
601 op->msg->size = sizeof (*op->msg) + sizeof (*pmeth) + size;
602
603 pmeth = (struct GNUNET_PSYC_MessageMethod *) &op->msg[1];
604 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
605 pmeth->header.size = htons (sizeof (*pmeth) + size);
606 pmeth->flags = htonl (flags);
607 memcpy (&pmeth[1], method_name, size);
608
609 ch->tmit.ch = ch;
610 ch->tmit.notify_mod = notify_mod;
611 ch->tmit.notify_data = notify_data;
612 ch->tmit.notify_cls = notify_cls;
613 ch->tmit.state = MSG_STATE_MODIFIER;
614
615 channel_transmit_mod (ch);
616 return &ch->tmit;
617}
618
619
620/**
621 * Resume transmission to the channel.
622 *
623 * @param th Handle of the request that is being resumed.
624 */
625static void
626channel_transmit_resume (struct GNUNET_PSYC_ChannelTransmitHandle *th)
627{
628 struct GNUNET_PSYC_Channel *ch = th->ch;
629 if (0 == ch->tmit_ack_pending)
630 {
631 ch->tmit_paused = GNUNET_NO;
632 channel_transmit_data (ch);
633 }
634}
635
636
637/**
638 * Abort transmission request to channel.
639 *
640 * @param th Handle of the request that is being aborted.
641 */
642static void
643channel_transmit_cancel (struct GNUNET_PSYC_ChannelTransmitHandle *th)
644{
645 struct GNUNET_PSYC_Channel *ch = th->ch;
646 if (GNUNET_NO == ch->in_transmit)
647 return;
648}
649
650
651/**
557 * Handle incoming message from the PSYC service. 652 * Handle incoming message from the PSYC service.
558 * 653 *
559 * @param ch The channel the message is sent to. 654 * @param ch The channel the message is sent to.
@@ -564,14 +659,20 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
564 const struct GNUNET_PSYC_MessageHeader *msg) 659 const struct GNUNET_PSYC_MessageHeader *msg)
565{ 660{
566 uint16_t size = ntohs (msg->header.size); 661 uint16_t size = ntohs (msg->header.size);
662 uint32_t flags = ntohl (msg->flags);
663
664 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
665 (struct GNUNET_MessageHeader *) msg);
567 666
568 if (MSG_STATE_START == ch->recv_state) 667 if (MSG_STATE_START == ch->recv_state)
569 { 668 {
570 ch->recv_message_id = GNUNET_ntohll (msg->message_id); 669 ch->recv_message_id = GNUNET_ntohll (msg->message_id);
571 ch->recv_flags = ntohl (msg->flags); 670 ch->recv_flags = flags;
671 ch->recv_slave_key = msg->slave_key;
572 } 672 }
573 else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id) 673 else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id)
574 { 674 {
675 // FIXME
575 LOG (GNUNET_ERROR_TYPE_WARNING, 676 LOG (GNUNET_ERROR_TYPE_WARNING,
576 "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n", 677 "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
577 GNUNET_ntohll (msg->message_id), ch->recv_message_id); 678 GNUNET_ntohll (msg->message_id), ch->recv_message_id);
@@ -579,11 +680,11 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
579 recv_error (ch); 680 recv_error (ch);
580 return; 681 return;
581 } 682 }
582 else if (ntohl (msg->flags) != ch->recv_flags) 683 else if (flags != ch->recv_flags)
583 { 684 {
584 LOG (GNUNET_ERROR_TYPE_WARNING, 685 LOG (GNUNET_ERROR_TYPE_WARNING,
585 "Unexpected message flags. Got: %lu, expected: %lu\n", 686 "Unexpected message flags. Got: %lu, expected: %lu\n",
586 ntohl (msg->flags), ch->recv_flags); 687 flags, ch->recv_flags);
587 GNUNET_break_op (0); 688 GNUNET_break_op (0);
588 recv_error (ch); 689 recv_error (ch);
589 return; 690 return;
@@ -599,10 +700,6 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
599 ptype = ntohs (pmsg->type); 700 ptype = ntohs (pmsg->type);
600 size_eq = size_min = 0; 701 size_eq = size_min = 0;
601 702
602 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
603 "Received message part of type %u and size %u from PSYC.\n",
604 ptype, psize);
605
606 if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size) 703 if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
607 { 704 {
608 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 705 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
@@ -612,6 +709,10 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
612 return; 709 return;
613 } 710 }
614 711
712 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
713 "Received message part from PSYC.\n");
714 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
715
615 switch (ptype) 716 switch (ptype)
616 { 717 {
617 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: 718 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
@@ -758,6 +859,46 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
758 859
759 860
760/** 861/**
862 * Handle incoming message acknowledgement from the PSYC service.
863 *
864 * @param ch The channel the acknowledgement is sent to.
865 */
866static void
867handle_psyc_message_ack (struct GNUNET_PSYC_Channel *ch)
868{
869 if (0 == ch->tmit_ack_pending)
870 {
871 LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
872 GNUNET_break (0);
873 return;
874 }
875 ch->tmit_ack_pending--;
876
877 switch (ch->tmit.state)
878 {
879 case MSG_STATE_MODIFIER:
880 case MSG_STATE_MOD_CONT:
881 if (GNUNET_NO == ch->tmit_paused)
882 channel_transmit_mod (ch);
883 break;
884
885 case MSG_STATE_DATA:
886 if (GNUNET_NO == ch->tmit_paused)
887 channel_transmit_data (ch);
888 break;
889
890 case MSG_STATE_END:
891 case MSG_STATE_CANCEL:
892 break;
893
894 default:
895 LOG (GNUNET_ERROR_TYPE_DEBUG,
896 "Ignoring message ACK in state %u.\n", ch->tmit.state);
897 }
898}
899
900
901/**
761 * Type of a function to call when we receive a message 902 * Type of a function to call when we receive a message
762 * from the service. 903 * from the service.
763 * 904 *
@@ -775,7 +916,7 @@ message_handler (void *cls,
775 916
776 if (NULL == msg) 917 if (NULL == msg)
777 { 918 {
778 GNUNET_break (0); 919 // timeout / disconnected from server, reconnect
779 reschedule_connect (ch); 920 reschedule_connect (ch);
780 return; 921 return;
781 } 922 }
@@ -824,63 +965,15 @@ message_handler (void *cls,
824 } 965 }
825 case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: 966 case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
826 { 967 {
827#if TODO
828 struct CountersResult *cres = (struct CountersResult *) msg; 968 struct CountersResult *cres = (struct CountersResult *) msg;
829 slv->max_message_id = GNUNET_ntohll (cres->max_message_id); 969 slv->max_message_id = GNUNET_ntohll (cres->max_message_id);
830 if (NULL != slv->join_ack_cb) 970 if (NULL != slv->join_cb)
831 mst->join_ack_cb (ch->cb_cls, mst->max_message_id); 971 slv->join_cb (ch->cb_cls, slv->max_message_id);
832#endif
833 break; 972 break;
834 } 973 }
835 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK: 974 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
836 { 975 {
837 if (0 == ch->tmit_ack_pending) 976 handle_psyc_message_ack (ch);
838 {
839 LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
840 GNUNET_break (0);
841 break;
842 }
843 ch->tmit_ack_pending--;
844
845 if (ch->is_master)
846 {
847 GNUNET_assert (NULL != mst->tmit);
848 switch (mst->tmit->state)
849 {
850 case MSG_STATE_MODIFIER:
851 case MSG_STATE_MOD_CONT:
852 if (GNUNET_NO == ch->tmit_paused)
853 master_transmit_mod (mst);
854 break;
855
856 case MSG_STATE_DATA:
857 if (GNUNET_NO == ch->tmit_paused)
858 master_transmit_data (mst);
859 break;
860
861 case MSG_STATE_END:
862 case MSG_STATE_CANCEL:
863 if (NULL != mst->tmit)
864 {
865 GNUNET_free (mst->tmit);
866 mst->tmit = NULL;
867 }
868 else
869 {
870 LOG (GNUNET_ERROR_TYPE_WARNING,
871 "Ignoring message ACK, there's no transmission going on.\n");
872 GNUNET_break (0);
873 }
874 break;
875 default:
876 LOG (GNUNET_ERROR_TYPE_DEBUG,
877 "Ignoring message ACK in state %u.\n", mst->tmit->state);
878 }
879 }
880 else
881 {
882 /* TODO: slave */
883 }
884 break; 977 break;
885 } 978 }
886 979
@@ -1106,8 +1199,6 @@ void
1106GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master) 1199GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master)
1107{ 1200{
1108 disconnect (master); 1201 disconnect (master);
1109 if (NULL != master->tmit)
1110 GNUNET_free (master->tmit);
1111 GNUNET_free (master); 1202 GNUNET_free (master);
1112} 1203}
1113 1204
@@ -1162,41 +1253,14 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
1162struct GNUNET_PSYC_MasterTransmitHandle * 1253struct GNUNET_PSYC_MasterTransmitHandle *
1163GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, 1254GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
1164 const char *method_name, 1255 const char *method_name,
1165 GNUNET_PSYC_MasterTransmitNotifyModifier notify_mod, 1256 GNUNET_PSYC_TransmitNotifyModifier notify_mod,
1166 GNUNET_PSYC_MasterTransmitNotify notify_data, 1257 GNUNET_PSYC_TransmitNotifyData notify_data,
1167 void *notify_cls, 1258 void *notify_cls,
1168 enum GNUNET_PSYC_MasterTransmitFlags flags) 1259 enum GNUNET_PSYC_MasterTransmitFlags flags)
1169{ 1260{
1170 GNUNET_assert (NULL != master); 1261 return (struct GNUNET_PSYC_MasterTransmitHandle *)
1171 struct GNUNET_PSYC_Channel *ch = &master->ch; 1262 channel_transmit (&master->ch, method_name, notify_mod, notify_data,
1172 if (GNUNET_NO != ch->in_transmit) 1263 notify_cls, flags);
1173 return NULL;
1174 ch->in_transmit = GNUNET_YES;
1175
1176 size_t size = strlen (method_name) + 1;
1177 struct GNUNET_PSYC_MessageMethod *pmeth;
1178 struct OperationHandle *op;
1179
1180 ch->tmit_msg = op = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg)
1181 + sizeof (*pmeth) + size);
1182 op->msg = (struct GNUNET_MessageHeader *) &op[1];
1183 op->msg->size = sizeof (*op->msg) + sizeof (*pmeth) + size;
1184
1185 pmeth = (struct GNUNET_PSYC_MessageMethod *) &op->msg[1];
1186 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
1187 pmeth->header.size = htons (sizeof (*pmeth) + size);
1188 pmeth->flags = htonl (flags);
1189 memcpy (&pmeth[1], method_name, size);
1190
1191 master->tmit = GNUNET_malloc (sizeof (*master->tmit));
1192 master->tmit->master = master;
1193 master->tmit->notify_mod = notify_mod;
1194 master->tmit->notify_data = notify_data;
1195 master->tmit->notify_cls = notify_cls;
1196 master->tmit->state = MSG_STATE_MODIFIER;
1197
1198 master_transmit_mod (master);
1199 return master->tmit;
1200} 1264}
1201 1265
1202 1266
@@ -1208,12 +1272,7 @@ GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
1208void 1272void
1209GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) 1273GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
1210{ 1274{
1211 struct GNUNET_PSYC_Channel *ch = &th->master->ch; 1275 channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
1212 if (0 == ch->tmit_ack_pending)
1213 {
1214 ch->tmit_paused = GNUNET_NO;
1215 master_transmit_data (th->master);
1216 }
1217} 1276}
1218 1277
1219 1278
@@ -1225,10 +1284,7 @@ GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
1225void 1284void
1226GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th) 1285GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th)
1227{ 1286{
1228 struct GNUNET_PSYC_Master *master = th->master; 1287 channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
1229 struct GNUNET_PSYC_Channel *ch = &master->ch;
1230 if (GNUNET_NO != ch->in_transmit)
1231 return;
1232} 1288}
1233 1289
1234 1290
@@ -1282,15 +1338,15 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1282{ 1338{
1283 struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv)); 1339 struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv));
1284 struct GNUNET_PSYC_Channel *ch = &slv->ch; 1340 struct GNUNET_PSYC_Channel *ch = &slv->ch;
1285 struct SlaveJoinRequest *req = GNUNET_malloc (sizeof (*req) 1341 struct SlaveJoinRequest *req
1286 + relay_count * sizeof (*relays)); 1342 = GNUNET_malloc (sizeof (*req) + relay_count * sizeof (*relays));
1287 req->header.size = htons (sizeof (*req) 1343 req->header.size = htons (sizeof (*req)
1288 + relay_count * sizeof (*relays)); 1344 + relay_count * sizeof (*relays));
1289 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN); 1345 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN);
1290 req->channel_key = *channel_key; 1346 req->channel_key = *channel_key;
1291 req->slave_key = *slave_key; 1347 req->slave_key = *slave_key;
1292 req->origin = *origin; 1348 req->origin = *origin;
1293 req->relay_count = relay_count; 1349 req->relay_count = htonl (relay_count);
1294 memcpy (&req[1], relays, relay_count * sizeof (*relays)); 1350 memcpy (&req[1], relays, relay_count * sizeof (*relays));
1295 1351
1296 ch->message_cb = message_cb; 1352 ch->message_cb = message_cb;
@@ -1303,6 +1359,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1303 ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; 1359 ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
1304 ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv); 1360 ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv);
1305 1361
1362 slv->join_cb = slave_joined_cb;
1306 return slv; 1363 return slv;
1307} 1364}
1308 1365
@@ -1328,9 +1385,8 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave)
1328 * 1385 *
1329 * @param slave Slave handle. 1386 * @param slave Slave handle.
1330 * @param method_name Which (PSYC) method should be invoked (on host). 1387 * @param method_name Which (PSYC) method should be invoked (on host).
1331 * @param env Environment containing transient variables for the message, or 1388 * @param notify_mod Function to call to obtain modifiers.
1332 * NULL. 1389 * @param notify_data Function to call to obtain fragments of the data.
1333 * @param notify Function to call when we are allowed to transmit (to get data).
1334 * @param notify_cls Closure for @a notify. 1390 * @param notify_cls Closure for @a notify.
1335 * @param flags Flags for the message being transmitted. 1391 * @param flags Flags for the message being transmitted.
1336 * @return Transmission handle, NULL on error (i.e. more than one request 1392 * @return Transmission handle, NULL on error (i.e. more than one request
@@ -1339,12 +1395,14 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave)
1339struct GNUNET_PSYC_SlaveTransmitHandle * 1395struct GNUNET_PSYC_SlaveTransmitHandle *
1340GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave, 1396GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave,
1341 const char *method_name, 1397 const char *method_name,
1342 const struct GNUNET_ENV_Environment *env, 1398 GNUNET_PSYC_TransmitNotifyModifier notify_mod,
1343 GNUNET_PSYC_SlaveTransmitNotify notify, 1399 GNUNET_PSYC_TransmitNotifyData notify_data,
1344 void *notify_cls, 1400 void *notify_cls,
1345 enum GNUNET_PSYC_SlaveTransmitFlags flags) 1401 enum GNUNET_PSYC_SlaveTransmitFlags flags)
1346{ 1402{
1347 return NULL; 1403 return (struct GNUNET_PSYC_SlaveTransmitHandle *)
1404 channel_transmit (&slave->ch, method_name,
1405 notify_mod, notify_data, notify_cls, flags);
1348} 1406}
1349 1407
1350 1408
@@ -1356,7 +1414,7 @@ GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave,
1356void 1414void
1357GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) 1415GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
1358{ 1416{
1359 1417 channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
1360} 1418}
1361 1419
1362 1420
@@ -1368,7 +1426,7 @@ GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
1368void 1426void
1369GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th) 1427GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th)
1370{ 1428{
1371 1429 channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
1372} 1430}
1373 1431
1374 1432
@@ -1382,7 +1440,7 @@ GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th)
1382struct GNUNET_PSYC_Channel * 1440struct GNUNET_PSYC_Channel *
1383GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master) 1441GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master)
1384{ 1442{
1385 return (struct GNUNET_PSYC_Channel *) master; 1443 return &master->ch;
1386} 1444}
1387 1445
1388 1446
@@ -1395,7 +1453,7 @@ GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master)
1395struct GNUNET_PSYC_Channel * 1453struct GNUNET_PSYC_Channel *
1396GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave) 1454GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave)
1397{ 1455{
1398 return (struct GNUNET_PSYC_Channel *) slave; 1456 return &slave->ch;
1399} 1457}
1400 1458
1401 1459