diff options
author | Gabor X Toth <*@tg-x.net> | 2014-03-06 23:46:45 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2014-03-06 23:46:45 +0000 |
commit | 8a0b8a4604526e5f832c4971f9c3b1b48d79bea4 (patch) | |
tree | dfd18a61272a18381fe9ce9b09849a965480a303 /src/psyc/psyc_api.c | |
parent | a21beab58c1d2abc747359a98326f19aaad4e8cd (diff) | |
download | gnunet-8a0b8a4604526e5f832c4971f9c3b1b48d79bea4.tar.gz gnunet-8a0b8a4604526e5f832c4971f9c3b1b48d79bea4.zip |
PSYC: implement slave to master requests, tests, fixes, reorg
Multicast lib: handle member to origin requests.
Keep track of members and origins and call their callbacks when necessary.
Diffstat (limited to 'src/psyc/psyc_api.c')
-rw-r--r-- | src/psyc/psyc_api.c | 370 |
1 files changed, 214 insertions, 156 deletions
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c index 20394bbce..8a1c9ffaa 100644 --- a/src/psyc/psyc_api.c +++ b/src/psyc/psyc_api.c | |||
@@ -48,12 +48,30 @@ struct OperationHandle | |||
48 | struct GNUNET_MessageHeader *msg; | 48 | struct GNUNET_MessageHeader *msg; |
49 | }; | 49 | }; |
50 | 50 | ||
51 | |||
52 | /** | ||
53 | * Handle for a pending PSYC transmission operation. | ||
54 | */ | ||
55 | struct GNUNET_PSYC_ChannelTransmitHandle | ||
56 | { | ||
57 | struct GNUNET_PSYC_Channel *ch; | ||
58 | GNUNET_PSYC_TransmitNotifyModifier notify_mod; | ||
59 | GNUNET_PSYC_TransmitNotifyData notify_data; | ||
60 | void *notify_cls; | ||
61 | enum MessageState state; | ||
62 | }; | ||
63 | |||
51 | /** | 64 | /** |
52 | * Handle to access PSYC channel operations for both the master and slaves. | 65 | * Handle to access PSYC channel operations for both the master and slaves. |
53 | */ | 66 | */ |
54 | struct GNUNET_PSYC_Channel | 67 | struct GNUNET_PSYC_Channel |
55 | { | 68 | { |
56 | /** | 69 | /** |
70 | * Transmission handle; | ||
71 | */ | ||
72 | struct GNUNET_PSYC_ChannelTransmitHandle tmit; | ||
73 | |||
74 | /** | ||
57 | * Configuration to use. | 75 | * Configuration to use. |
58 | */ | 76 | */ |
59 | const struct GNUNET_CONFIGURATION_Handle *cfg; | 77 | const struct GNUNET_CONFIGURATION_Handle *cfg; |
@@ -124,6 +142,11 @@ struct GNUNET_PSYC_Channel | |||
124 | uint64_t recv_message_id; | 142 | uint64_t recv_message_id; |
125 | 143 | ||
126 | /** | 144 | /** |
145 | * Public key of the slave from which a message is being received. | ||
146 | */ | ||
147 | struct GNUNET_CRYPTO_EddsaPublicKey recv_slave_key; | ||
148 | |||
149 | /** | ||
127 | * State of the currently being received message from the PSYC service. | 150 | * State of the currently being received message from the PSYC service. |
128 | */ | 151 | */ |
129 | enum MessageState recv_state; | 152 | enum MessageState recv_state; |
@@ -171,27 +194,12 @@ struct GNUNET_PSYC_Channel | |||
171 | 194 | ||
172 | 195 | ||
173 | /** | 196 | /** |
174 | * Handle for a pending PSYC transmission operation. | ||
175 | */ | ||
176 | struct GNUNET_PSYC_MasterTransmitHandle | ||
177 | { | ||
178 | struct GNUNET_PSYC_Master *master; | ||
179 | GNUNET_PSYC_MasterTransmitNotifyModifier notify_mod; | ||
180 | GNUNET_PSYC_MasterTransmitNotify notify_data; | ||
181 | void *notify_cls; | ||
182 | enum MessageState state; | ||
183 | }; | ||
184 | |||
185 | |||
186 | /** | ||
187 | * Handle for the master of a PSYC channel. | 197 | * Handle for the master of a PSYC channel. |
188 | */ | 198 | */ |
189 | struct GNUNET_PSYC_Master | 199 | struct GNUNET_PSYC_Master |
190 | { | 200 | { |
191 | struct GNUNET_PSYC_Channel ch; | 201 | struct GNUNET_PSYC_Channel ch; |
192 | 202 | ||
193 | struct GNUNET_PSYC_MasterTransmitHandle *tmit; | ||
194 | |||
195 | GNUNET_PSYC_MasterStartCallback start_cb; | 203 | GNUNET_PSYC_MasterStartCallback start_cb; |
196 | 204 | ||
197 | uint64_t max_message_id; | 205 | uint64_t max_message_id; |
@@ -204,6 +212,10 @@ struct GNUNET_PSYC_Master | |||
204 | struct GNUNET_PSYC_Slave | 212 | struct GNUNET_PSYC_Slave |
205 | { | 213 | { |
206 | struct GNUNET_PSYC_Channel ch; | 214 | struct GNUNET_PSYC_Channel ch; |
215 | |||
216 | GNUNET_PSYC_SlaveJoinCallback join_cb; | ||
217 | |||
218 | uint64_t max_message_id; | ||
207 | }; | 219 | }; |
208 | 220 | ||
209 | 221 | ||
@@ -251,7 +263,7 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); | |||
251 | 263 | ||
252 | 264 | ||
253 | static void | 265 | static void |
254 | master_transmit_data (struct GNUNET_PSYC_Master *mst); | 266 | channel_transmit_data (struct GNUNET_PSYC_Channel *ch); |
255 | 267 | ||
256 | 268 | ||
257 | /** | 269 | /** |
@@ -302,7 +314,8 @@ recv_reset (struct GNUNET_PSYC_Channel *ch) | |||
302 | ch->recv_state = MSG_STATE_START; | 314 | ch->recv_state = MSG_STATE_START; |
303 | ch->recv_flags = 0; | 315 | ch->recv_flags = 0; |
304 | ch->recv_message_id = 0; | 316 | ch->recv_message_id = 0; |
305 | ch->recv_mod_value_size =0; | 317 | //FIXME: ch->recv_slave_key = { 0 }; |
318 | ch->recv_mod_value_size = 0; | ||
306 | ch->recv_mod_value_size_expected = 0; | 319 | ch->recv_mod_value_size_expected = 0; |
307 | } | 320 | } |
308 | 321 | ||
@@ -379,8 +392,9 @@ queue_message (struct GNUNET_PSYC_Channel *ch, | |||
379 | } | 392 | } |
380 | 393 | ||
381 | if (NULL != op | 394 | if (NULL != op |
382 | && (end || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD | 395 | && (GNUNET_YES == end |
383 | < op->msg->size + sizeof (struct GNUNET_MessageHeader)))) | 396 | || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD |
397 | < op->msg->size + sizeof (struct GNUNET_MessageHeader)))) | ||
384 | { | 398 | { |
385 | /* End of message or buffer is full, add it to transmission queue. */ | 399 | /* End of message or buffer is full, add it to transmission queue. */ |
386 | op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | 400 | op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); |
@@ -390,6 +404,9 @@ queue_message (struct GNUNET_PSYC_Channel *ch, | |||
390 | ch->tmit_ack_pending++; | 404 | ch->tmit_ack_pending++; |
391 | } | 405 | } |
392 | 406 | ||
407 | if (GNUNET_YES == end) | ||
408 | ch->in_transmit = GNUNET_NO; | ||
409 | |||
393 | transmit_next (ch); | 410 | transmit_next (ch); |
394 | } | 411 | } |
395 | 412 | ||
@@ -400,15 +417,14 @@ queue_message (struct GNUNET_PSYC_Channel *ch, | |||
400 | * @param mst Master handle. | 417 | * @param mst Master handle. |
401 | */ | 418 | */ |
402 | static void | 419 | static void |
403 | master_transmit_mod (struct GNUNET_PSYC_Master *mst) | 420 | channel_transmit_mod (struct GNUNET_PSYC_Channel *ch) |
404 | { | 421 | { |
405 | struct GNUNET_PSYC_Channel *ch = &mst->ch; | ||
406 | uint16_t max_data_size, data_size; | 422 | uint16_t max_data_size, data_size; |
407 | char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; | 423 | char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; |
408 | struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; | 424 | struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; |
409 | int notify_ret; | 425 | int notify_ret; |
410 | 426 | ||
411 | switch (mst->tmit->state) | 427 | switch (ch->tmit.state) |
412 | { | 428 | { |
413 | case MSG_STATE_MODIFIER: | 429 | case MSG_STATE_MODIFIER: |
414 | { | 430 | { |
@@ -417,12 +433,11 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst) | |||
417 | max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; | 433 | max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; |
418 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); | 434 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); |
419 | msg->size = sizeof (struct GNUNET_PSYC_MessageModifier); | 435 | msg->size = sizeof (struct GNUNET_PSYC_MessageModifier); |
420 | notify_ret = mst->tmit->notify_mod (mst->tmit->notify_cls, | 436 | notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, |
421 | &data_size, &mod[1], &mod->oper); | 437 | &data_size, &mod[1], &mod->oper); |
422 | mod->name_size = strnlen ((char *) &mod[1], data_size); | 438 | mod->name_size = strnlen ((char *) &mod[1], data_size); |
423 | if (mod->name_size < data_size) | 439 | if (mod->name_size < data_size) |
424 | { | 440 | { |
425 | mod->oper = htons (mod->oper); | ||
426 | mod->value_size = htons (data_size - 1 - mod->name_size); | 441 | mod->value_size = htons (data_size - 1 - mod->name_size); |
427 | mod->name_size = htons (mod->name_size); | 442 | mod->name_size = htons (mod->name_size); |
428 | } | 443 | } |
@@ -438,8 +453,8 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst) | |||
438 | max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; | 453 | max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; |
439 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); | 454 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); |
440 | msg->size = sizeof (struct GNUNET_MessageHeader); | 455 | msg->size = sizeof (struct GNUNET_MessageHeader); |
441 | notify_ret = mst->tmit->notify_mod (mst->tmit->notify_cls, | 456 | notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, |
442 | &data_size, &msg[1], NULL); | 457 | &data_size, &msg[1], NULL); |
443 | break; | 458 | break; |
444 | } | 459 | } |
445 | default: | 460 | default: |
@@ -454,27 +469,28 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst) | |||
454 | ch->tmit_paused = GNUNET_YES; | 469 | ch->tmit_paused = GNUNET_YES; |
455 | return; | 470 | return; |
456 | } | 471 | } |
457 | mst->tmit->state = MSG_STATE_MOD_CONT; | 472 | ch->tmit.state = MSG_STATE_MOD_CONT; |
458 | break; | 473 | break; |
459 | 474 | ||
460 | case GNUNET_YES: | 475 | case GNUNET_YES: |
461 | if (0 == data_size) | 476 | if (0 == data_size) |
462 | { | 477 | { |
463 | /* End of modifiers. */ | 478 | /* End of modifiers. */ |
464 | mst->tmit->state = MSG_STATE_DATA; | 479 | ch->tmit.state = MSG_STATE_DATA; |
465 | if (0 == ch->tmit_ack_pending) | 480 | if (0 == ch->tmit_ack_pending) |
466 | master_transmit_data (mst); | 481 | channel_transmit_data (ch); |
467 | 482 | ||
468 | return; | 483 | return; |
469 | } | 484 | } |
470 | mst->tmit->state = MSG_STATE_MODIFIER; | 485 | ch->tmit.state = MSG_STATE_MODIFIER; |
471 | break; | 486 | break; |
472 | 487 | ||
473 | default: | 488 | default: |
474 | LOG (GNUNET_ERROR_TYPE_ERROR, | 489 | LOG (GNUNET_ERROR_TYPE_ERROR, |
475 | "MasterTransmitNotify returned error when requesting a modifier.\n"); | 490 | "MasterTransmitNotifyModifier returned error " |
491 | "when requesting a modifier.\n"); | ||
476 | 492 | ||
477 | mst->tmit->state = MSG_STATE_START; | 493 | ch->tmit.state = MSG_STATE_CANCEL; |
478 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); | 494 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); |
479 | msg->size = htons (sizeof (*msg)); | 495 | msg->size = htons (sizeof (*msg)); |
480 | 496 | ||
@@ -489,7 +505,7 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst) | |||
489 | queue_message (ch, msg, GNUNET_NO); | 505 | queue_message (ch, msg, GNUNET_NO); |
490 | } | 506 | } |
491 | 507 | ||
492 | master_transmit_mod (mst); | 508 | channel_transmit_mod (ch); |
493 | } | 509 | } |
494 | 510 | ||
495 | 511 | ||
@@ -499,17 +515,16 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst) | |||
499 | * @param mst Master handle. | 515 | * @param mst Master handle. |
500 | */ | 516 | */ |
501 | static void | 517 | static void |
502 | master_transmit_data (struct GNUNET_PSYC_Master *mst) | 518 | channel_transmit_data (struct GNUNET_PSYC_Channel *ch) |
503 | { | 519 | { |
504 | struct GNUNET_PSYC_Channel *ch = &mst->ch; | ||
505 | uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD; | 520 | uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD; |
506 | char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; | 521 | char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; |
507 | struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; | 522 | struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; |
508 | 523 | ||
509 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); | 524 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); |
510 | 525 | ||
511 | int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls, | 526 | int notify_ret = ch->tmit.notify_data (ch->tmit.notify_cls, |
512 | &data_size, &msg[1]); | 527 | &data_size, &msg[1]); |
513 | switch (notify_ret) | 528 | switch (notify_ret) |
514 | { | 529 | { |
515 | case GNUNET_NO: | 530 | case GNUNET_NO: |
@@ -522,14 +537,14 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst) | |||
522 | break; | 537 | break; |
523 | 538 | ||
524 | case GNUNET_YES: | 539 | case GNUNET_YES: |
525 | mst->tmit->state = MSG_STATE_START; | 540 | ch->tmit.state = MSG_STATE_END; |
526 | break; | 541 | break; |
527 | 542 | ||
528 | default: | 543 | default: |
529 | LOG (GNUNET_ERROR_TYPE_ERROR, | 544 | LOG (GNUNET_ERROR_TYPE_ERROR, |
530 | "MasterTransmitNotify returned error when requesting data.\n"); | 545 | "MasterTransmitNotify returned error when requesting data.\n"); |
531 | 546 | ||
532 | mst->tmit->state = MSG_STATE_START; | 547 | ch->tmit.state = MSG_STATE_CANCEL; |
533 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); | 548 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); |
534 | msg->size = htons (sizeof (*msg)); | 549 | msg->size = htons (sizeof (*msg)); |
535 | queue_message (ch, msg, GNUNET_YES); | 550 | queue_message (ch, msg, GNUNET_YES); |
@@ -554,6 +569,86 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst) | |||
554 | 569 | ||
555 | 570 | ||
556 | /** | 571 | /** |
572 | * Send a message to a channel. | ||
573 | * | ||
574 | * @param ch Handle to the PSYC channel. | ||
575 | * @param method_name Which method should be invoked. | ||
576 | * @param notify_mod Function to call to obtain modifiers. | ||
577 | * @param notify_data Function to call to obtain fragments of the data. | ||
578 | * @param notify_cls Closure for @a notify_mod and @a notify_data. | ||
579 | * @param flags Flags for the message being transmitted. | ||
580 | * @return Transmission handle, NULL on error (i.e. more than one request queued). | ||
581 | */ | ||
582 | static struct GNUNET_PSYC_ChannelTransmitHandle * | ||
583 | channel_transmit (struct GNUNET_PSYC_Channel *ch, | ||
584 | const char *method_name, | ||
585 | GNUNET_PSYC_TransmitNotifyModifier notify_mod, | ||
586 | GNUNET_PSYC_TransmitNotifyData notify_data, | ||
587 | void *notify_cls, | ||
588 | uint32_t flags) | ||
589 | { | ||
590 | if (GNUNET_NO != ch->in_transmit) | ||
591 | return NULL; | ||
592 | ch->in_transmit = GNUNET_YES; | ||
593 | |||
594 | size_t size = strlen (method_name) + 1; | ||
595 | struct GNUNET_PSYC_MessageMethod *pmeth; | ||
596 | struct OperationHandle *op; | ||
597 | |||
598 | ch->tmit_msg = op = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg) | ||
599 | + sizeof (*pmeth) + size); | ||
600 | op->msg = (struct GNUNET_MessageHeader *) &op[1]; | ||
601 | op->msg->size = sizeof (*op->msg) + sizeof (*pmeth) + size; | ||
602 | |||
603 | pmeth = (struct GNUNET_PSYC_MessageMethod *) &op->msg[1]; | ||
604 | pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); | ||
605 | pmeth->header.size = htons (sizeof (*pmeth) + size); | ||
606 | pmeth->flags = htonl (flags); | ||
607 | memcpy (&pmeth[1], method_name, size); | ||
608 | |||
609 | ch->tmit.ch = ch; | ||
610 | ch->tmit.notify_mod = notify_mod; | ||
611 | ch->tmit.notify_data = notify_data; | ||
612 | ch->tmit.notify_cls = notify_cls; | ||
613 | ch->tmit.state = MSG_STATE_MODIFIER; | ||
614 | |||
615 | channel_transmit_mod (ch); | ||
616 | return &ch->tmit; | ||
617 | } | ||
618 | |||
619 | |||
620 | /** | ||
621 | * Resume transmission to the channel. | ||
622 | * | ||
623 | * @param th Handle of the request that is being resumed. | ||
624 | */ | ||
625 | static void | ||
626 | channel_transmit_resume (struct GNUNET_PSYC_ChannelTransmitHandle *th) | ||
627 | { | ||
628 | struct GNUNET_PSYC_Channel *ch = th->ch; | ||
629 | if (0 == ch->tmit_ack_pending) | ||
630 | { | ||
631 | ch->tmit_paused = GNUNET_NO; | ||
632 | channel_transmit_data (ch); | ||
633 | } | ||
634 | } | ||
635 | |||
636 | |||
637 | /** | ||
638 | * Abort transmission request to channel. | ||
639 | * | ||
640 | * @param th Handle of the request that is being aborted. | ||
641 | */ | ||
642 | static void | ||
643 | channel_transmit_cancel (struct GNUNET_PSYC_ChannelTransmitHandle *th) | ||
644 | { | ||
645 | struct GNUNET_PSYC_Channel *ch = th->ch; | ||
646 | if (GNUNET_NO == ch->in_transmit) | ||
647 | return; | ||
648 | } | ||
649 | |||
650 | |||
651 | /** | ||
557 | * Handle incoming message from the PSYC service. | 652 | * Handle incoming message from the PSYC service. |
558 | * | 653 | * |
559 | * @param ch The channel the message is sent to. | 654 | * @param ch The channel the message is sent to. |
@@ -564,14 +659,20 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, | |||
564 | const struct GNUNET_PSYC_MessageHeader *msg) | 659 | const struct GNUNET_PSYC_MessageHeader *msg) |
565 | { | 660 | { |
566 | uint16_t size = ntohs (msg->header.size); | 661 | uint16_t size = ntohs (msg->header.size); |
662 | uint32_t flags = ntohl (msg->flags); | ||
663 | |||
664 | GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, | ||
665 | (struct GNUNET_MessageHeader *) msg); | ||
567 | 666 | ||
568 | if (MSG_STATE_START == ch->recv_state) | 667 | if (MSG_STATE_START == ch->recv_state) |
569 | { | 668 | { |
570 | ch->recv_message_id = GNUNET_ntohll (msg->message_id); | 669 | ch->recv_message_id = GNUNET_ntohll (msg->message_id); |
571 | ch->recv_flags = ntohl (msg->flags); | 670 | ch->recv_flags = flags; |
671 | ch->recv_slave_key = msg->slave_key; | ||
572 | } | 672 | } |
573 | else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id) | 673 | else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id) |
574 | { | 674 | { |
675 | // FIXME | ||
575 | LOG (GNUNET_ERROR_TYPE_WARNING, | 676 | LOG (GNUNET_ERROR_TYPE_WARNING, |
576 | "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n", | 677 | "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n", |
577 | GNUNET_ntohll (msg->message_id), ch->recv_message_id); | 678 | GNUNET_ntohll (msg->message_id), ch->recv_message_id); |
@@ -579,11 +680,11 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, | |||
579 | recv_error (ch); | 680 | recv_error (ch); |
580 | return; | 681 | return; |
581 | } | 682 | } |
582 | else if (ntohl (msg->flags) != ch->recv_flags) | 683 | else if (flags != ch->recv_flags) |
583 | { | 684 | { |
584 | LOG (GNUNET_ERROR_TYPE_WARNING, | 685 | LOG (GNUNET_ERROR_TYPE_WARNING, |
585 | "Unexpected message flags. Got: %lu, expected: %lu\n", | 686 | "Unexpected message flags. Got: %lu, expected: %lu\n", |
586 | ntohl (msg->flags), ch->recv_flags); | 687 | flags, ch->recv_flags); |
587 | GNUNET_break_op (0); | 688 | GNUNET_break_op (0); |
588 | recv_error (ch); | 689 | recv_error (ch); |
589 | return; | 690 | return; |
@@ -599,10 +700,6 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, | |||
599 | ptype = ntohs (pmsg->type); | 700 | ptype = ntohs (pmsg->type); |
600 | size_eq = size_min = 0; | 701 | size_eq = size_min = 0; |
601 | 702 | ||
602 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
603 | "Received message part of type %u and size %u from PSYC.\n", | ||
604 | ptype, psize); | ||
605 | |||
606 | if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size) | 703 | if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size) |
607 | { | 704 | { |
608 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 705 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
@@ -612,6 +709,10 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, | |||
612 | return; | 709 | return; |
613 | } | 710 | } |
614 | 711 | ||
712 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
713 | "Received message part from PSYC.\n"); | ||
714 | GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg); | ||
715 | |||
615 | switch (ptype) | 716 | switch (ptype) |
616 | { | 717 | { |
617 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: | 718 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: |
@@ -758,6 +859,46 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, | |||
758 | 859 | ||
759 | 860 | ||
760 | /** | 861 | /** |
862 | * Handle incoming message acknowledgement from the PSYC service. | ||
863 | * | ||
864 | * @param ch The channel the acknowledgement is sent to. | ||
865 | */ | ||
866 | static void | ||
867 | handle_psyc_message_ack (struct GNUNET_PSYC_Channel *ch) | ||
868 | { | ||
869 | if (0 == ch->tmit_ack_pending) | ||
870 | { | ||
871 | LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n"); | ||
872 | GNUNET_break (0); | ||
873 | return; | ||
874 | } | ||
875 | ch->tmit_ack_pending--; | ||
876 | |||
877 | switch (ch->tmit.state) | ||
878 | { | ||
879 | case MSG_STATE_MODIFIER: | ||
880 | case MSG_STATE_MOD_CONT: | ||
881 | if (GNUNET_NO == ch->tmit_paused) | ||
882 | channel_transmit_mod (ch); | ||
883 | break; | ||
884 | |||
885 | case MSG_STATE_DATA: | ||
886 | if (GNUNET_NO == ch->tmit_paused) | ||
887 | channel_transmit_data (ch); | ||
888 | break; | ||
889 | |||
890 | case MSG_STATE_END: | ||
891 | case MSG_STATE_CANCEL: | ||
892 | break; | ||
893 | |||
894 | default: | ||
895 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
896 | "Ignoring message ACK in state %u.\n", ch->tmit.state); | ||
897 | } | ||
898 | } | ||
899 | |||
900 | |||
901 | /** | ||
761 | * Type of a function to call when we receive a message | 902 | * Type of a function to call when we receive a message |
762 | * from the service. | 903 | * from the service. |
763 | * | 904 | * |
@@ -775,7 +916,7 @@ message_handler (void *cls, | |||
775 | 916 | ||
776 | if (NULL == msg) | 917 | if (NULL == msg) |
777 | { | 918 | { |
778 | GNUNET_break (0); | 919 | // timeout / disconnected from server, reconnect |
779 | reschedule_connect (ch); | 920 | reschedule_connect (ch); |
780 | return; | 921 | return; |
781 | } | 922 | } |
@@ -824,63 +965,15 @@ message_handler (void *cls, | |||
824 | } | 965 | } |
825 | case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: | 966 | case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: |
826 | { | 967 | { |
827 | #if TODO | ||
828 | struct CountersResult *cres = (struct CountersResult *) msg; | 968 | struct CountersResult *cres = (struct CountersResult *) msg; |
829 | slv->max_message_id = GNUNET_ntohll (cres->max_message_id); | 969 | slv->max_message_id = GNUNET_ntohll (cres->max_message_id); |
830 | if (NULL != slv->join_ack_cb) | 970 | if (NULL != slv->join_cb) |
831 | mst->join_ack_cb (ch->cb_cls, mst->max_message_id); | 971 | slv->join_cb (ch->cb_cls, slv->max_message_id); |
832 | #endif | ||
833 | break; | 972 | break; |
834 | } | 973 | } |
835 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK: | 974 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK: |
836 | { | 975 | { |
837 | if (0 == ch->tmit_ack_pending) | 976 | handle_psyc_message_ack (ch); |
838 | { | ||
839 | LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n"); | ||
840 | GNUNET_break (0); | ||
841 | break; | ||
842 | } | ||
843 | ch->tmit_ack_pending--; | ||
844 | |||
845 | if (ch->is_master) | ||
846 | { | ||
847 | GNUNET_assert (NULL != mst->tmit); | ||
848 | switch (mst->tmit->state) | ||
849 | { | ||
850 | case MSG_STATE_MODIFIER: | ||
851 | case MSG_STATE_MOD_CONT: | ||
852 | if (GNUNET_NO == ch->tmit_paused) | ||
853 | master_transmit_mod (mst); | ||
854 | break; | ||
855 | |||
856 | case MSG_STATE_DATA: | ||
857 | if (GNUNET_NO == ch->tmit_paused) | ||
858 | master_transmit_data (mst); | ||
859 | break; | ||
860 | |||
861 | case MSG_STATE_END: | ||
862 | case MSG_STATE_CANCEL: | ||
863 | if (NULL != mst->tmit) | ||
864 | { | ||
865 | GNUNET_free (mst->tmit); | ||
866 | mst->tmit = NULL; | ||
867 | } | ||
868 | else | ||
869 | { | ||
870 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
871 | "Ignoring message ACK, there's no transmission going on.\n"); | ||
872 | GNUNET_break (0); | ||
873 | } | ||
874 | break; | ||
875 | default: | ||
876 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
877 | "Ignoring message ACK in state %u.\n", mst->tmit->state); | ||
878 | } | ||
879 | } | ||
880 | else | ||
881 | { | ||
882 | /* TODO: slave */ | ||
883 | } | ||
884 | break; | 977 | break; |
885 | } | 978 | } |
886 | 979 | ||
@@ -1106,8 +1199,6 @@ void | |||
1106 | GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master) | 1199 | GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master) |
1107 | { | 1200 | { |
1108 | disconnect (master); | 1201 | disconnect (master); |
1109 | if (NULL != master->tmit) | ||
1110 | GNUNET_free (master->tmit); | ||
1111 | GNUNET_free (master); | 1202 | GNUNET_free (master); |
1112 | } | 1203 | } |
1113 | 1204 | ||
@@ -1162,41 +1253,14 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, | |||
1162 | struct GNUNET_PSYC_MasterTransmitHandle * | 1253 | struct GNUNET_PSYC_MasterTransmitHandle * |
1163 | GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, | 1254 | GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, |
1164 | const char *method_name, | 1255 | const char *method_name, |
1165 | GNUNET_PSYC_MasterTransmitNotifyModifier notify_mod, | 1256 | GNUNET_PSYC_TransmitNotifyModifier notify_mod, |
1166 | GNUNET_PSYC_MasterTransmitNotify notify_data, | 1257 | GNUNET_PSYC_TransmitNotifyData notify_data, |
1167 | void *notify_cls, | 1258 | void *notify_cls, |
1168 | enum GNUNET_PSYC_MasterTransmitFlags flags) | 1259 | enum GNUNET_PSYC_MasterTransmitFlags flags) |
1169 | { | 1260 | { |
1170 | GNUNET_assert (NULL != master); | 1261 | return (struct GNUNET_PSYC_MasterTransmitHandle *) |
1171 | struct GNUNET_PSYC_Channel *ch = &master->ch; | 1262 | channel_transmit (&master->ch, method_name, notify_mod, notify_data, |
1172 | if (GNUNET_NO != ch->in_transmit) | 1263 | notify_cls, flags); |
1173 | return NULL; | ||
1174 | ch->in_transmit = GNUNET_YES; | ||
1175 | |||
1176 | size_t size = strlen (method_name) + 1; | ||
1177 | struct GNUNET_PSYC_MessageMethod *pmeth; | ||
1178 | struct OperationHandle *op; | ||
1179 | |||
1180 | ch->tmit_msg = op = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg) | ||
1181 | + sizeof (*pmeth) + size); | ||
1182 | op->msg = (struct GNUNET_MessageHeader *) &op[1]; | ||
1183 | op->msg->size = sizeof (*op->msg) + sizeof (*pmeth) + size; | ||
1184 | |||
1185 | pmeth = (struct GNUNET_PSYC_MessageMethod *) &op->msg[1]; | ||
1186 | pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); | ||
1187 | pmeth->header.size = htons (sizeof (*pmeth) + size); | ||
1188 | pmeth->flags = htonl (flags); | ||
1189 | memcpy (&pmeth[1], method_name, size); | ||
1190 | |||
1191 | master->tmit = GNUNET_malloc (sizeof (*master->tmit)); | ||
1192 | master->tmit->master = master; | ||
1193 | master->tmit->notify_mod = notify_mod; | ||
1194 | master->tmit->notify_data = notify_data; | ||
1195 | master->tmit->notify_cls = notify_cls; | ||
1196 | master->tmit->state = MSG_STATE_MODIFIER; | ||
1197 | |||
1198 | master_transmit_mod (master); | ||
1199 | return master->tmit; | ||
1200 | } | 1264 | } |
1201 | 1265 | ||
1202 | 1266 | ||
@@ -1208,12 +1272,7 @@ GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, | |||
1208 | void | 1272 | void |
1209 | GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) | 1273 | GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) |
1210 | { | 1274 | { |
1211 | struct GNUNET_PSYC_Channel *ch = &th->master->ch; | 1275 | channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); |
1212 | if (0 == ch->tmit_ack_pending) | ||
1213 | { | ||
1214 | ch->tmit_paused = GNUNET_NO; | ||
1215 | master_transmit_data (th->master); | ||
1216 | } | ||
1217 | } | 1276 | } |
1218 | 1277 | ||
1219 | 1278 | ||
@@ -1225,10 +1284,7 @@ GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) | |||
1225 | void | 1284 | void |
1226 | GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th) | 1285 | GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th) |
1227 | { | 1286 | { |
1228 | struct GNUNET_PSYC_Master *master = th->master; | 1287 | channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); |
1229 | struct GNUNET_PSYC_Channel *ch = &master->ch; | ||
1230 | if (GNUNET_NO != ch->in_transmit) | ||
1231 | return; | ||
1232 | } | 1288 | } |
1233 | 1289 | ||
1234 | 1290 | ||
@@ -1282,15 +1338,15 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
1282 | { | 1338 | { |
1283 | struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv)); | 1339 | struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv)); |
1284 | struct GNUNET_PSYC_Channel *ch = &slv->ch; | 1340 | struct GNUNET_PSYC_Channel *ch = &slv->ch; |
1285 | struct SlaveJoinRequest *req = GNUNET_malloc (sizeof (*req) | 1341 | struct SlaveJoinRequest *req |
1286 | + relay_count * sizeof (*relays)); | 1342 | = GNUNET_malloc (sizeof (*req) + relay_count * sizeof (*relays)); |
1287 | req->header.size = htons (sizeof (*req) | 1343 | req->header.size = htons (sizeof (*req) |
1288 | + relay_count * sizeof (*relays)); | 1344 | + relay_count * sizeof (*relays)); |
1289 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN); | 1345 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN); |
1290 | req->channel_key = *channel_key; | 1346 | req->channel_key = *channel_key; |
1291 | req->slave_key = *slave_key; | 1347 | req->slave_key = *slave_key; |
1292 | req->origin = *origin; | 1348 | req->origin = *origin; |
1293 | req->relay_count = relay_count; | 1349 | req->relay_count = htonl (relay_count); |
1294 | memcpy (&req[1], relays, relay_count * sizeof (*relays)); | 1350 | memcpy (&req[1], relays, relay_count * sizeof (*relays)); |
1295 | 1351 | ||
1296 | ch->message_cb = message_cb; | 1352 | ch->message_cb = message_cb; |
@@ -1303,6 +1359,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
1303 | ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; | 1359 | ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; |
1304 | ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv); | 1360 | ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv); |
1305 | 1361 | ||
1362 | slv->join_cb = slave_joined_cb; | ||
1306 | return slv; | 1363 | return slv; |
1307 | } | 1364 | } |
1308 | 1365 | ||
@@ -1328,9 +1385,8 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave) | |||
1328 | * | 1385 | * |
1329 | * @param slave Slave handle. | 1386 | * @param slave Slave handle. |
1330 | * @param method_name Which (PSYC) method should be invoked (on host). | 1387 | * @param method_name Which (PSYC) method should be invoked (on host). |
1331 | * @param env Environment containing transient variables for the message, or | 1388 | * @param notify_mod Function to call to obtain modifiers. |
1332 | * NULL. | 1389 | * @param notify_data Function to call to obtain fragments of the data. |
1333 | * @param notify Function to call when we are allowed to transmit (to get data). | ||
1334 | * @param notify_cls Closure for @a notify. | 1390 | * @param notify_cls Closure for @a notify. |
1335 | * @param flags Flags for the message being transmitted. | 1391 | * @param flags Flags for the message being transmitted. |
1336 | * @return Transmission handle, NULL on error (i.e. more than one request | 1392 | * @return Transmission handle, NULL on error (i.e. more than one request |
@@ -1339,12 +1395,14 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave) | |||
1339 | struct GNUNET_PSYC_SlaveTransmitHandle * | 1395 | struct GNUNET_PSYC_SlaveTransmitHandle * |
1340 | GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave, | 1396 | GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave, |
1341 | const char *method_name, | 1397 | const char *method_name, |
1342 | const struct GNUNET_ENV_Environment *env, | 1398 | GNUNET_PSYC_TransmitNotifyModifier notify_mod, |
1343 | GNUNET_PSYC_SlaveTransmitNotify notify, | 1399 | GNUNET_PSYC_TransmitNotifyData notify_data, |
1344 | void *notify_cls, | 1400 | void *notify_cls, |
1345 | enum GNUNET_PSYC_SlaveTransmitFlags flags) | 1401 | enum GNUNET_PSYC_SlaveTransmitFlags flags) |
1346 | { | 1402 | { |
1347 | return NULL; | 1403 | return (struct GNUNET_PSYC_SlaveTransmitHandle *) |
1404 | channel_transmit (&slave->ch, method_name, | ||
1405 | notify_mod, notify_data, notify_cls, flags); | ||
1348 | } | 1406 | } |
1349 | 1407 | ||
1350 | 1408 | ||
@@ -1356,7 +1414,7 @@ GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave, | |||
1356 | void | 1414 | void |
1357 | GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) | 1415 | GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) |
1358 | { | 1416 | { |
1359 | 1417 | channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); | |
1360 | } | 1418 | } |
1361 | 1419 | ||
1362 | 1420 | ||
@@ -1368,7 +1426,7 @@ GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) | |||
1368 | void | 1426 | void |
1369 | GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th) | 1427 | GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th) |
1370 | { | 1428 | { |
1371 | 1429 | channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); | |
1372 | } | 1430 | } |
1373 | 1431 | ||
1374 | 1432 | ||
@@ -1382,7 +1440,7 @@ GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th) | |||
1382 | struct GNUNET_PSYC_Channel * | 1440 | struct GNUNET_PSYC_Channel * |
1383 | GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master) | 1441 | GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master) |
1384 | { | 1442 | { |
1385 | return (struct GNUNET_PSYC_Channel *) master; | 1443 | return &master->ch; |
1386 | } | 1444 | } |
1387 | 1445 | ||
1388 | 1446 | ||
@@ -1395,7 +1453,7 @@ GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master) | |||
1395 | struct GNUNET_PSYC_Channel * | 1453 | struct GNUNET_PSYC_Channel * |
1396 | GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave) | 1454 | GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave) |
1397 | { | 1455 | { |
1398 | return (struct GNUNET_PSYC_Channel *) slave; | 1456 | return &slave->ch; |
1399 | } | 1457 | } |
1400 | 1458 | ||
1401 | 1459 | ||