diff options
author | Gabor X Toth <*@tg-x.net> | 2014-05-29 16:35:55 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2014-05-29 16:35:55 +0000 |
commit | 9955561e1b204ccf23fbf841f409bd3ef79be88c (patch) | |
tree | 0271c23ae9f1dad72266a0e6073d696e5afca027 /src/psyc | |
parent | a5877668ba805c5e0efe622e6ce4c58ff5609bf9 (diff) | |
download | gnunet-9955561e1b204ccf23fbf841f409bd3ef79be88c.tar.gz gnunet-9955561e1b204ccf23fbf841f409bd3ef79be88c.zip |
psyc, multicast: reorg code, use new client manager & psyc util lib
Diffstat (limited to 'src/psyc')
-rw-r--r-- | src/psyc/gnunet-service-psyc.c | 791 | ||||
-rw-r--r-- | src/psyc/psyc_api.c | 1295 | ||||
-rw-r--r-- | src/psyc/psyc_util_lib.c | 1 | ||||
-rw-r--r-- | src/psyc/test_psyc.c | 129 |
4 files changed, 674 insertions, 1542 deletions
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c index 13f908b6c..9dcf40e0f 100644 --- a/src/psyc/gnunet-service-psyc.c +++ b/src/psyc/gnunet-service-psyc.c | |||
@@ -34,6 +34,7 @@ | |||
34 | #include "gnunet_multicast_service.h" | 34 | #include "gnunet_multicast_service.h" |
35 | #include "gnunet_psycstore_service.h" | 35 | #include "gnunet_psycstore_service.h" |
36 | #include "gnunet_psyc_service.h" | 36 | #include "gnunet_psyc_service.h" |
37 | #include "gnunet_psyc_util_lib.h" | ||
37 | #include "psyc.h" | 38 | #include "psyc.h" |
38 | 39 | ||
39 | 40 | ||
@@ -174,10 +175,10 @@ struct FragmentQueue | |||
174 | /** | 175 | /** |
175 | * List of connected clients. | 176 | * List of connected clients. |
176 | */ | 177 | */ |
177 | struct ClientList | 178 | struct ClientListItem |
178 | { | 179 | { |
179 | struct ClientList *prev; | 180 | struct ClientListItem *prev; |
180 | struct ClientList *next; | 181 | struct ClientListItem *next; |
181 | struct GNUNET_SERVER_Client *client; | 182 | struct GNUNET_SERVER_Client *client; |
182 | }; | 183 | }; |
183 | 184 | ||
@@ -187,8 +188,8 @@ struct ClientList | |||
187 | */ | 188 | */ |
188 | struct Channel | 189 | struct Channel |
189 | { | 190 | { |
190 | struct ClientList *clients_head; | 191 | struct ClientListItem *clients_head; |
191 | struct ClientList *clients_tail; | 192 | struct ClientListItem *clients_tail; |
192 | 193 | ||
193 | struct TransmitMessage *tmit_head; | 194 | struct TransmitMessage *tmit_head; |
194 | struct TransmitMessage *tmit_tail; | 195 | struct TransmitMessage *tmit_tail; |
@@ -282,7 +283,7 @@ struct Master | |||
282 | /** | 283 | /** |
283 | * Channel struct common for Master and Slave | 284 | * Channel struct common for Master and Slave |
284 | */ | 285 | */ |
285 | struct Channel ch; | 286 | struct Channel chn; |
286 | 287 | ||
287 | /** | 288 | /** |
288 | * Private key of the channel. | 289 | * Private key of the channel. |
@@ -339,7 +340,7 @@ struct Slave | |||
339 | /** | 340 | /** |
340 | * Channel struct common for Master and Slave | 341 | * Channel struct common for Master and Slave |
341 | */ | 342 | */ |
342 | struct Channel ch; | 343 | struct Channel chn; |
343 | 344 | ||
344 | /** | 345 | /** |
345 | * Private key of the slave. | 346 | * Private key of the slave. |
@@ -399,11 +400,11 @@ struct Slave | |||
399 | 400 | ||
400 | 401 | ||
401 | static inline void | 402 | static inline void |
402 | transmit_message (struct Channel *ch); | 403 | transmit_message (struct Channel *chn); |
403 | 404 | ||
404 | 405 | ||
405 | static uint64_t | 406 | static uint64_t |
406 | message_queue_drop (struct Channel *ch); | 407 | message_queue_drop (struct Channel *chn); |
407 | 408 | ||
408 | 409 | ||
409 | /** | 410 | /** |
@@ -434,12 +435,12 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
434 | static void | 435 | static void |
435 | cleanup_master (struct Master *mst) | 436 | cleanup_master (struct Master *mst) |
436 | { | 437 | { |
437 | struct Channel *ch = &mst->ch; | 438 | struct Channel *chn = &mst->chn; |
438 | 439 | ||
439 | if (NULL != mst->origin) | 440 | if (NULL != mst->origin) |
440 | GNUNET_MULTICAST_origin_stop (mst->origin); | 441 | GNUNET_MULTICAST_origin_stop (mst->origin); |
441 | GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs); | 442 | GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs); |
442 | GNUNET_CONTAINER_multihashmap_remove (masters, &ch->pub_key_hash, ch); | 443 | GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, chn); |
443 | } | 444 | } |
444 | 445 | ||
445 | 446 | ||
@@ -449,20 +450,20 @@ cleanup_master (struct Master *mst) | |||
449 | static void | 450 | static void |
450 | cleanup_slave (struct Slave *slv) | 451 | cleanup_slave (struct Slave *slv) |
451 | { | 452 | { |
452 | struct Channel *ch = &slv->ch; | 453 | struct Channel *chn = &slv->chn; |
453 | struct GNUNET_CONTAINER_MultiHashMap * | 454 | struct GNUNET_CONTAINER_MultiHashMap * |
454 | ch_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, | 455 | chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, |
455 | &ch->pub_key_hash); | 456 | &chn->pub_key_hash); |
456 | GNUNET_assert (NULL != ch_slv); | 457 | GNUNET_assert (NULL != chn_slv); |
457 | GNUNET_CONTAINER_multihashmap_remove (ch_slv, &slv->pub_key_hash, slv); | 458 | GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv); |
458 | 459 | ||
459 | if (0 == GNUNET_CONTAINER_multihashmap_size (ch_slv)) | 460 | if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv)) |
460 | { | 461 | { |
461 | GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &ch->pub_key_hash, | 462 | GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash, |
462 | ch_slv); | 463 | chn_slv); |
463 | GNUNET_CONTAINER_multihashmap_destroy (ch_slv); | 464 | GNUNET_CONTAINER_multihashmap_destroy (chn_slv); |
464 | } | 465 | } |
465 | GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, slv); | 466 | GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv); |
466 | 467 | ||
467 | if (NULL != slv->join_req) | 468 | if (NULL != slv->join_req) |
468 | GNUNET_free (slv->join_req); | 469 | GNUNET_free (slv->join_req); |
@@ -470,7 +471,7 @@ cleanup_slave (struct Slave *slv) | |||
470 | GNUNET_free (slv->relays); | 471 | GNUNET_free (slv->relays); |
471 | if (NULL != slv->member) | 472 | if (NULL != slv->member) |
472 | GNUNET_MULTICAST_member_part (slv->member); | 473 | GNUNET_MULTICAST_member_part (slv->member); |
473 | GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, ch); | 474 | GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, chn); |
474 | } | 475 | } |
475 | 476 | ||
476 | 477 | ||
@@ -478,18 +479,18 @@ cleanup_slave (struct Slave *slv) | |||
478 | * Clean up channel data structures after a client disconnected. | 479 | * Clean up channel data structures after a client disconnected. |
479 | */ | 480 | */ |
480 | static void | 481 | static void |
481 | cleanup_channel (struct Channel *ch) | 482 | cleanup_channel (struct Channel *chn) |
482 | { | 483 | { |
483 | message_queue_drop (ch); | 484 | message_queue_drop (chn); |
484 | GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &ch->pub_key_hash); | 485 | GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &chn->pub_key_hash); |
485 | 486 | ||
486 | if (NULL != ch->store_op) | 487 | if (NULL != chn->store_op) |
487 | GNUNET_PSYCSTORE_operation_cancel (ch->store_op); | 488 | GNUNET_PSYCSTORE_operation_cancel (chn->store_op); |
488 | 489 | ||
489 | (GNUNET_YES == ch->is_master) | 490 | (GNUNET_YES == chn->is_master) |
490 | ? cleanup_master ((struct Master *) ch) | 491 | ? cleanup_master ((struct Master *) chn) |
491 | : cleanup_slave ((struct Slave *) ch); | 492 | : cleanup_slave ((struct Slave *) chn); |
492 | GNUNET_free (ch); | 493 | GNUNET_free (chn); |
493 | } | 494 | } |
494 | 495 | ||
495 | 496 | ||
@@ -507,41 +508,41 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) | |||
507 | return; | 508 | return; |
508 | 509 | ||
509 | struct Channel * | 510 | struct Channel * |
510 | ch = GNUNET_SERVER_client_get_user_context (client, struct Channel); | 511 | chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); |
511 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
512 | "%p Client (%s) disconnected from channel %s\n", | ||
513 | ch, (GNUNET_YES == ch->is_master) ? "master" : "slave", | ||
514 | GNUNET_h2s (&ch->pub_key_hash)); | ||
515 | 512 | ||
516 | if (NULL == ch) | 513 | if (NULL == chn) |
517 | { | 514 | { |
518 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 515 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
519 | "%p User context is NULL in client_disconnect()\n", ch); | 516 | "%p User context is NULL in client_disconnect()\n", chn); |
520 | GNUNET_break (0); | ||
521 | return; | 517 | return; |
522 | } | 518 | } |
523 | 519 | ||
524 | struct ClientList *cl = ch->clients_head; | 520 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
525 | while (NULL != cl) | 521 | "%p Client (%s) disconnected from channel %s\n", |
522 | chn, (GNUNET_YES == chn->is_master) ? "master" : "slave", | ||
523 | GNUNET_h2s (&chn->pub_key_hash)); | ||
524 | |||
525 | struct ClientListItem *cli = chn->clients_head; | ||
526 | while (NULL != cli) | ||
526 | { | 527 | { |
527 | if (cl->client == client) | 528 | if (cli->client == client) |
528 | { | 529 | { |
529 | GNUNET_CONTAINER_DLL_remove (ch->clients_head, ch->clients_tail, cl); | 530 | GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli); |
530 | GNUNET_free (cl); | 531 | GNUNET_free (cli); |
531 | break; | 532 | break; |
532 | } | 533 | } |
533 | cl = cl->next; | 534 | cli = cli->next; |
534 | } | 535 | } |
535 | 536 | ||
536 | if (NULL == ch->clients_head) | 537 | if (NULL == chn->clients_head) |
537 | { /* Last client disconnected. */ | 538 | { /* Last client disconnected. */ |
538 | if (NULL != ch->tmit_head) | 539 | if (NULL != chn->tmit_head) |
539 | { /* Send pending messages to multicast before cleanup. */ | 540 | { /* Send pending messages to multicast before cleanup. */ |
540 | transmit_message (ch); | 541 | transmit_message (chn); |
541 | } | 542 | } |
542 | else | 543 | else |
543 | { | 544 | { |
544 | cleanup_channel (ch); | 545 | cleanup_channel (chn); |
545 | } | 546 | } |
546 | } | 547 | } |
547 | } | 548 | } |
@@ -551,18 +552,18 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) | |||
551 | * Send message to all clients connected to the channel. | 552 | * Send message to all clients connected to the channel. |
552 | */ | 553 | */ |
553 | static void | 554 | static void |
554 | msg_to_clients (const struct Channel *ch, | 555 | client_send_msg (const struct Channel *chn, |
555 | const struct GNUNET_MessageHeader *msg) | 556 | const struct GNUNET_MessageHeader *msg) |
556 | { | 557 | { |
557 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 558 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
558 | "%p Sending message to clients.\n", ch); | 559 | "%p Sending message to clients.\n", chn); |
559 | 560 | ||
560 | struct ClientList *cl = ch->clients_head; | 561 | struct ClientListItem *cli = chn->clients_head; |
561 | while (NULL != cl) | 562 | while (NULL != cli) |
562 | { | 563 | { |
563 | GNUNET_SERVER_notification_context_add (nc, cl->client); | 564 | GNUNET_SERVER_notification_context_add (nc, cli->client); |
564 | GNUNET_SERVER_notification_context_unicast (nc, cl->client, msg, GNUNET_NO); | 565 | GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, GNUNET_NO); |
565 | cl = cl->next; | 566 | cli = cli->next; |
566 | } | 567 | } |
567 | } | 568 | } |
568 | 569 | ||
@@ -573,7 +574,7 @@ msg_to_clients (const struct Channel *ch, | |||
573 | struct JoinMemTestClosure | 574 | struct JoinMemTestClosure |
574 | { | 575 | { |
575 | struct GNUNET_CRYPTO_EddsaPublicKey slave_key; | 576 | struct GNUNET_CRYPTO_EddsaPublicKey slave_key; |
576 | struct Channel *ch; | 577 | struct Channel *chn; |
577 | struct GNUNET_MULTICAST_JoinHandle *jh; | 578 | struct GNUNET_MULTICAST_JoinHandle *jh; |
578 | struct MasterJoinRequest *master_join_req; | 579 | struct MasterJoinRequest *master_join_req; |
579 | }; | 580 | }; |
@@ -587,15 +588,15 @@ join_mem_test_cb (void *cls, int64_t result, const char *err_msg) | |||
587 | { | 588 | { |
588 | struct JoinMemTestClosure *jcls = cls; | 589 | struct JoinMemTestClosure *jcls = cls; |
589 | 590 | ||
590 | if (GNUNET_NO == result && GNUNET_YES == jcls->ch->is_master) | 591 | if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master) |
591 | { /* Pass on join request to client if this is a master channel */ | 592 | { /* Pass on join request to client if this is a master channel */ |
592 | struct Master *mst = (struct Master *) jcls->ch; | 593 | struct Master *mst = (struct Master *) jcls->chn; |
593 | struct GNUNET_HashCode slave_key_hash; | 594 | struct GNUNET_HashCode slave_key_hash; |
594 | GNUNET_CRYPTO_hash (&jcls->slave_key, sizeof (jcls->slave_key), | 595 | GNUNET_CRYPTO_hash (&jcls->slave_key, sizeof (jcls->slave_key), |
595 | &slave_key_hash); | 596 | &slave_key_hash); |
596 | GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh, | 597 | GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh, |
597 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | 598 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); |
598 | msg_to_clients (jcls->ch, &jcls->master_join_req->header); | 599 | client_send_msg (jcls->chn, &jcls->master_join_req->header); |
599 | } | 600 | } |
600 | else | 601 | else |
601 | { | 602 | { |
@@ -611,13 +612,13 @@ join_mem_test_cb (void *cls, int64_t result, const char *err_msg) | |||
611 | * Incoming join request from multicast. | 612 | * Incoming join request from multicast. |
612 | */ | 613 | */ |
613 | static void | 614 | static void |
614 | mcast_join_request_cb (void *cls, | 615 | mcast_recv_join_request (void *cls, |
615 | const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, | 616 | const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, |
616 | const struct GNUNET_MessageHeader *join_msg, | 617 | const struct GNUNET_MessageHeader *join_msg, |
617 | struct GNUNET_MULTICAST_JoinHandle *jh) | 618 | struct GNUNET_MULTICAST_JoinHandle *jh) |
618 | { | 619 | { |
619 | struct Channel *ch = cls; | 620 | struct Channel *chn = cls; |
620 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", ch); | 621 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", chn); |
621 | 622 | ||
622 | uint16_t join_msg_size = 0; | 623 | uint16_t join_msg_size = 0; |
623 | if (NULL != join_msg) | 624 | if (NULL != join_msg) |
@@ -630,7 +631,7 @@ mcast_join_request_cb (void *cls, | |||
630 | { | 631 | { |
631 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | 632 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, |
632 | "%p Got join message with invalid type %u.\n", | 633 | "%p Got join message with invalid type %u.\n", |
633 | ch, ntohs (join_msg->type)); | 634 | chn, ntohs (join_msg->type)); |
634 | } | 635 | } |
635 | } | 636 | } |
636 | 637 | ||
@@ -643,12 +644,12 @@ mcast_join_request_cb (void *cls, | |||
643 | 644 | ||
644 | struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls)); | 645 | struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls)); |
645 | jcls->slave_key = *slave_key; | 646 | jcls->slave_key = *slave_key; |
646 | jcls->ch = ch; | 647 | jcls->chn = chn; |
647 | jcls->jh = jh; | 648 | jcls->jh = jh; |
648 | jcls->master_join_req = req; | 649 | jcls->master_join_req = req; |
649 | 650 | ||
650 | GNUNET_PSYCSTORE_membership_test (store, &ch->pub_key, slave_key, | 651 | GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key, |
651 | ch->max_message_id, 0, | 652 | chn->max_message_id, 0, |
652 | &join_mem_test_cb, jcls); | 653 | &join_mem_test_cb, jcls); |
653 | } | 654 | } |
654 | 655 | ||
@@ -657,14 +658,14 @@ mcast_join_request_cb (void *cls, | |||
657 | * Join decision received from multicast. | 658 | * Join decision received from multicast. |
658 | */ | 659 | */ |
659 | static void | 660 | static void |
660 | mcast_join_decision_cb (void *cls, int is_admitted, | 661 | mcast_recv_join_decision (void *cls, int is_admitted, |
661 | const struct GNUNET_PeerIdentity *peer, | 662 | const struct GNUNET_PeerIdentity *peer, |
662 | uint16_t relay_count, | 663 | uint16_t relay_count, |
663 | const struct GNUNET_PeerIdentity *relays, | 664 | const struct GNUNET_PeerIdentity *relays, |
664 | const struct GNUNET_MessageHeader *join_resp) | 665 | const struct GNUNET_MessageHeader *join_resp) |
665 | { | 666 | { |
666 | struct Slave *slv = cls; | 667 | struct Slave *slv = cls; |
667 | struct Channel *ch = &slv->ch; | 668 | struct Channel *chn = &slv->chn; |
668 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 669 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
669 | "%p Got join decision: %d\n", slv, is_admitted); | 670 | "%p Got join decision: %d\n", slv, is_admitted); |
670 | 671 | ||
@@ -677,11 +678,11 @@ mcast_join_decision_cb (void *cls, int is_admitted, | |||
677 | if (0 < join_resp_size) | 678 | if (0 < join_resp_size) |
678 | memcpy (&dcsn[1], join_resp, join_resp_size); | 679 | memcpy (&dcsn[1], join_resp, join_resp_size); |
679 | 680 | ||
680 | msg_to_clients (ch, &dcsn->header); | 681 | client_send_msg (chn, &dcsn->header); |
681 | 682 | ||
682 | if (GNUNET_YES == is_admitted) | 683 | if (GNUNET_YES == is_admitted) |
683 | { | 684 | { |
684 | ch->ready = GNUNET_YES; | 685 | chn->ready = GNUNET_YES; |
685 | } | 686 | } |
686 | else | 687 | else |
687 | { | 688 | { |
@@ -691,20 +692,20 @@ mcast_join_decision_cb (void *cls, int is_admitted, | |||
691 | 692 | ||
692 | 693 | ||
693 | static void | 694 | static void |
694 | mcast_membership_test_cb (void *cls, | 695 | mcast_recv_membership_test (void *cls, |
695 | const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, | 696 | const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, |
696 | uint64_t message_id, uint64_t group_generation, | 697 | uint64_t message_id, uint64_t group_generation, |
697 | struct GNUNET_MULTICAST_MembershipTestHandle *mth) | 698 | struct GNUNET_MULTICAST_MembershipTestHandle *mth) |
698 | { | 699 | { |
699 | 700 | ||
700 | } | 701 | } |
701 | 702 | ||
702 | 703 | ||
703 | static void | 704 | static void |
704 | mcast_replay_fragment_cb (void *cls, | 705 | mcast_recv_replay_fragment (void *cls, |
705 | const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, | 706 | const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, |
706 | uint64_t fragment_id, uint64_t flags, | 707 | uint64_t fragment_id, uint64_t flags, |
707 | struct GNUNET_MULTICAST_ReplayHandle *rh) | 708 | struct GNUNET_MULTICAST_ReplayHandle *rh) |
708 | 709 | ||
709 | { | 710 | { |
710 | 711 | ||
@@ -712,25 +713,17 @@ mcast_replay_fragment_cb (void *cls, | |||
712 | 713 | ||
713 | 714 | ||
714 | static void | 715 | static void |
715 | mcast_replay_message_cb (void *cls, | 716 | mcast_recv_replay_message (void *cls, |
716 | const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, | 717 | const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, |
717 | uint64_t message_id, | 718 | uint64_t message_id, |
718 | uint64_t fragment_offset, | 719 | uint64_t fragment_offset, |
719 | uint64_t flags, | 720 | uint64_t flags, |
720 | struct GNUNET_MULTICAST_ReplayHandle *rh) | 721 | struct GNUNET_MULTICAST_ReplayHandle *rh) |
721 | { | 722 | { |
722 | 723 | ||
723 | } | 724 | } |
724 | 725 | ||
725 | 726 | ||
726 | static void | ||
727 | fragment_store_result (void *cls, int64_t result, const char *err_msg) | ||
728 | { | ||
729 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
730 | "fragment_store() returned %l (%s)\n", result, err_msg); | ||
731 | } | ||
732 | |||
733 | |||
734 | /** | 727 | /** |
735 | * Convert an uint64_t in network byte order to a HashCode | 728 | * Convert an uint64_t in network byte order to a HashCode |
736 | * that can be used as key in a MultiHashMap | 729 | * that can be used as key in a MultiHashMap |
@@ -772,17 +765,17 @@ hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n) | |||
772 | * Send multicast message to all clients connected to the channel. | 765 | * Send multicast message to all clients connected to the channel. |
773 | */ | 766 | */ |
774 | static void | 767 | static void |
775 | mmsg_to_clients (struct Channel *ch, | 768 | client_send_mcast_msg (struct Channel *chn, |
776 | const struct GNUNET_MULTICAST_MessageHeader *mmsg) | 769 | const struct GNUNET_MULTICAST_MessageHeader *mmsg) |
777 | { | 770 | { |
778 | uint16_t size = ntohs (mmsg->header.size); | ||
779 | struct GNUNET_PSYC_MessageHeader *pmsg; | 771 | struct GNUNET_PSYC_MessageHeader *pmsg; |
772 | uint16_t size = ntohs (mmsg->header.size); | ||
780 | uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg); | 773 | uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg); |
781 | 774 | ||
782 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 775 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
783 | "%p Sending message to client. " | 776 | "%p Sending multicast message to client. " |
784 | "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n", | 777 | "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n", |
785 | ch, GNUNET_ntohll (mmsg->fragment_id), | 778 | chn, GNUNET_ntohll (mmsg->fragment_id), |
786 | GNUNET_ntohll (mmsg->message_id)); | 779 | GNUNET_ntohll (mmsg->message_id)); |
787 | 780 | ||
788 | pmsg = GNUNET_malloc (psize); | 781 | pmsg = GNUNET_malloc (psize); |
@@ -791,7 +784,38 @@ mmsg_to_clients (struct Channel *ch, | |||
791 | pmsg->message_id = mmsg->message_id; | 784 | pmsg->message_id = mmsg->message_id; |
792 | 785 | ||
793 | memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); | 786 | memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); |
794 | msg_to_clients (ch, &pmsg->header); | 787 | client_send_msg (chn, &pmsg->header); |
788 | GNUNET_free (pmsg); | ||
789 | } | ||
790 | |||
791 | |||
792 | /** | ||
793 | * Send multicast request to all clients connected to the channel. | ||
794 | */ | ||
795 | static void | ||
796 | client_send_mcast_req (struct Master *mst, | ||
797 | const struct GNUNET_MULTICAST_RequestHeader *req) | ||
798 | { | ||
799 | struct Channel *chn = &mst->chn; | ||
800 | |||
801 | struct GNUNET_PSYC_MessageHeader *pmsg; | ||
802 | uint16_t size = ntohs (req->header.size); | ||
803 | uint16_t psize = sizeof (*pmsg) + size - sizeof (*req); | ||
804 | |||
805 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
806 | "%p Sending multicast request to client. " | ||
807 | "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n", | ||
808 | chn, GNUNET_ntohll (req->fragment_id), | ||
809 | GNUNET_ntohll (req->request_id)); | ||
810 | |||
811 | pmsg = GNUNET_malloc (psize); | ||
812 | pmsg->header.size = htons (psize); | ||
813 | pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | ||
814 | pmsg->message_id = req->request_id; | ||
815 | pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST); | ||
816 | |||
817 | memcpy (&pmsg[1], &req[1], size - sizeof (*req)); | ||
818 | client_send_msg (chn, &pmsg->header); | ||
795 | GNUNET_free (pmsg); | 819 | GNUNET_free (pmsg); |
796 | } | 820 | } |
797 | 821 | ||
@@ -799,14 +823,14 @@ mmsg_to_clients (struct Channel *ch, | |||
799 | /** | 823 | /** |
800 | * Insert a multicast message fragment into the queue belonging to the message. | 824 | * Insert a multicast message fragment into the queue belonging to the message. |
801 | * | 825 | * |
802 | * @param ch Channel. | 826 | * @param chn Channel. |
803 | * @param mmsg Multicast message fragment. | 827 | * @param mmsg Multicast message fragment. |
804 | * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode. | 828 | * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode. |
805 | * @param first_ptype First PSYC message part type in @a mmsg. | 829 | * @param first_ptype First PSYC message part type in @a mmsg. |
806 | * @param last_ptype Last PSYC message part type in @a mmsg. | 830 | * @param last_ptype Last PSYC message part type in @a mmsg. |
807 | */ | 831 | */ |
808 | static void | 832 | static void |
809 | fragment_queue_insert (struct Channel *ch, | 833 | fragment_queue_insert (struct Channel *chn, |
810 | const struct GNUNET_MULTICAST_MessageHeader *mmsg, | 834 | const struct GNUNET_MULTICAST_MessageHeader *mmsg, |
811 | uint16_t first_ptype, uint16_t last_ptype) | 835 | uint16_t first_ptype, uint16_t last_ptype) |
812 | { | 836 | { |
@@ -814,13 +838,13 @@ fragment_queue_insert (struct Channel *ch, | |||
814 | const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset); | 838 | const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset); |
815 | struct GNUNET_CONTAINER_MultiHashMap | 839 | struct GNUNET_CONTAINER_MultiHashMap |
816 | *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, | 840 | *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, |
817 | &ch->pub_key_hash); | 841 | &chn->pub_key_hash); |
818 | 842 | ||
819 | struct GNUNET_HashCode msg_id_hash; | 843 | struct GNUNET_HashCode msg_id_hash; |
820 | hash_key_from_nll (&msg_id_hash, mmsg->message_id); | 844 | hash_key_from_nll (&msg_id_hash, mmsg->message_id); |
821 | 845 | ||
822 | struct FragmentQueue | 846 | struct FragmentQueue |
823 | *fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash); | 847 | *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash); |
824 | 848 | ||
825 | if (NULL == fragq) | 849 | if (NULL == fragq) |
826 | { | 850 | { |
@@ -829,13 +853,13 @@ fragment_queue_insert (struct Channel *ch, | |||
829 | fragq->fragments | 853 | fragq->fragments |
830 | = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); | 854 | = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); |
831 | 855 | ||
832 | GNUNET_CONTAINER_multihashmap_put (ch->recv_frags, &msg_id_hash, fragq, | 856 | GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq, |
833 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); | 857 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); |
834 | 858 | ||
835 | if (NULL == chan_msgs) | 859 | if (NULL == chan_msgs) |
836 | { | 860 | { |
837 | chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); | 861 | chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); |
838 | GNUNET_CONTAINER_multihashmap_put (recv_cache, &ch->pub_key_hash, chan_msgs, | 862 | GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs, |
839 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); | 863 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); |
840 | } | 864 | } |
841 | } | 865 | } |
@@ -850,7 +874,7 @@ fragment_queue_insert (struct Channel *ch, | |||
850 | "%p Adding message fragment to cache. " | 874 | "%p Adding message fragment to cache. " |
851 | "message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", " | 875 | "message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", " |
852 | "header_size: %" PRIu64 " + %u).\n", | 876 | "header_size: %" PRIu64 " + %u).\n", |
853 | ch, GNUNET_ntohll (mmsg->message_id), | 877 | chn, GNUNET_ntohll (mmsg->message_id), |
854 | GNUNET_ntohll (mmsg->fragment_id), | 878 | GNUNET_ntohll (mmsg->fragment_id), |
855 | fragq->header_size, size); | 879 | fragq->header_size, size); |
856 | cache_entry = GNUNET_new (struct RecvCacheEntry); | 880 | cache_entry = GNUNET_new (struct RecvCacheEntry); |
@@ -867,7 +891,7 @@ fragment_queue_insert (struct Channel *ch, | |||
867 | "%p Message fragment is already in cache. " | 891 | "%p Message fragment is already in cache. " |
868 | "message_id: %" PRIu64 ", fragment_id: %" PRIu64 | 892 | "message_id: %" PRIu64 ", fragment_id: %" PRIu64 |
869 | ", ref_count: %u\n", | 893 | ", ref_count: %u\n", |
870 | ch, GNUNET_ntohll (mmsg->message_id), | 894 | chn, GNUNET_ntohll (mmsg->message_id), |
871 | GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count); | 895 | GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count); |
872 | } | 896 | } |
873 | 897 | ||
@@ -890,11 +914,11 @@ fragment_queue_insert (struct Channel *ch, | |||
890 | { /* header is now complete */ | 914 | { /* header is now complete */ |
891 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 915 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
892 | "%p Header of message %" PRIu64 " is complete.\n", | 916 | "%p Header of message %" PRIu64 " is complete.\n", |
893 | ch, GNUNET_ntohll (mmsg->message_id)); | 917 | chn, GNUNET_ntohll (mmsg->message_id)); |
894 | 918 | ||
895 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 919 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
896 | "%p Adding message %" PRIu64 " to queue.\n", | 920 | "%p Adding message %" PRIu64 " to queue.\n", |
897 | ch, GNUNET_ntohll (mmsg->message_id)); | 921 | chn, GNUNET_ntohll (mmsg->message_id)); |
898 | fragq->state = MSG_FRAG_STATE_DATA; | 922 | fragq->state = MSG_FRAG_STATE_DATA; |
899 | } | 923 | } |
900 | else | 924 | else |
@@ -902,7 +926,7 @@ fragment_queue_insert (struct Channel *ch, | |||
902 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 926 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
903 | "%p Header of message %" PRIu64 " is NOT complete yet: " | 927 | "%p Header of message %" PRIu64 " is NOT complete yet: " |
904 | "%" PRIu64 " != %" PRIu64 "\n", | 928 | "%" PRIu64 " != %" PRIu64 "\n", |
905 | ch, GNUNET_ntohll (mmsg->message_id), frag_offset, | 929 | chn, GNUNET_ntohll (mmsg->message_id), frag_offset, |
906 | fragq->header_size); | 930 | fragq->header_size); |
907 | } | 931 | } |
908 | } | 932 | } |
@@ -916,7 +940,7 @@ fragment_queue_insert (struct Channel *ch, | |||
916 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 940 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
917 | "%p Message %" PRIu64 " is NOT complete yet: " | 941 | "%p Message %" PRIu64 " is NOT complete yet: " |
918 | "%" PRIu64 " != %" PRIu64 "\n", | 942 | "%" PRIu64 " != %" PRIu64 "\n", |
919 | ch, GNUNET_ntohll (mmsg->message_id), frag_offset, | 943 | chn, GNUNET_ntohll (mmsg->message_id), frag_offset, |
920 | fragq->size); | 944 | fragq->size); |
921 | break; | 945 | break; |
922 | 946 | ||
@@ -935,7 +959,7 @@ fragment_queue_insert (struct Channel *ch, | |||
935 | case MSG_FRAG_STATE_CANCEL: | 959 | case MSG_FRAG_STATE_CANCEL: |
936 | if (GNUNET_NO == fragq->queued) | 960 | if (GNUNET_NO == fragq->queued) |
937 | { | 961 | { |
938 | GNUNET_CONTAINER_heap_insert (ch->recv_msgs, NULL, | 962 | GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL, |
939 | GNUNET_ntohll (mmsg->message_id)); | 963 | GNUNET_ntohll (mmsg->message_id)); |
940 | fragq->queued = GNUNET_YES; | 964 | fragq->queued = GNUNET_YES; |
941 | } | 965 | } |
@@ -953,24 +977,24 @@ fragment_queue_insert (struct Channel *ch, | |||
953 | * Send fragments of a message in order to client, after all modifiers arrived | 977 | * Send fragments of a message in order to client, after all modifiers arrived |
954 | * from multicast. | 978 | * from multicast. |
955 | * | 979 | * |
956 | * @param ch Channel. | 980 | * @param chn Channel. |
957 | * @param msg_id ID of the message @a fragq belongs to. | 981 | * @param msg_id ID of the message @a fragq belongs to. |
958 | * @param fragq Fragment queue of the message. | 982 | * @param fragq Fragment queue of the message. |
959 | * @param drop Drop message without delivering to client? | 983 | * @param drop Drop message without delivering to client? |
960 | * #GNUNET_YES or #GNUNET_NO. | 984 | * #GNUNET_YES or #GNUNET_NO. |
961 | */ | 985 | */ |
962 | static void | 986 | static void |
963 | fragment_queue_run (struct Channel *ch, uint64_t msg_id, | 987 | fragment_queue_run (struct Channel *chn, uint64_t msg_id, |
964 | struct FragmentQueue *fragq, uint8_t drop) | 988 | struct FragmentQueue *fragq, uint8_t drop) |
965 | { | 989 | { |
966 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 990 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
967 | "%p Running message fragment queue for message %" PRIu64 | 991 | "%p Running message fragment queue for message %" PRIu64 |
968 | " (state: %u).\n", | 992 | " (state: %u).\n", |
969 | ch, msg_id, fragq->state); | 993 | chn, msg_id, fragq->state); |
970 | 994 | ||
971 | struct GNUNET_CONTAINER_MultiHashMap | 995 | struct GNUNET_CONTAINER_MultiHashMap |
972 | *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, | 996 | *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, |
973 | &ch->pub_key_hash); | 997 | &chn->pub_key_hash); |
974 | GNUNET_assert (NULL != chan_msgs); | 998 | GNUNET_assert (NULL != chan_msgs); |
975 | uint64_t frag_id; | 999 | uint64_t frag_id; |
976 | 1000 | ||
@@ -985,7 +1009,7 @@ fragment_queue_run (struct Channel *ch, uint64_t msg_id, | |||
985 | { | 1009 | { |
986 | if (GNUNET_NO == drop) | 1010 | if (GNUNET_NO == drop) |
987 | { | 1011 | { |
988 | mmsg_to_clients (ch, cache_entry->mmsg); | 1012 | client_send_mcast_msg (chn, cache_entry->mmsg); |
989 | } | 1013 | } |
990 | if (cache_entry->ref_count <= 1) | 1014 | if (cache_entry->ref_count <= 1) |
991 | { | 1015 | { |
@@ -1014,7 +1038,7 @@ fragment_queue_run (struct Channel *ch, uint64_t msg_id, | |||
1014 | struct GNUNET_HashCode msg_id_hash; | 1038 | struct GNUNET_HashCode msg_id_hash; |
1015 | hash_key_from_nll (&msg_id_hash, msg_id); | 1039 | hash_key_from_nll (&msg_id_hash, msg_id); |
1016 | 1040 | ||
1017 | GNUNET_CONTAINER_multihashmap_remove (ch->recv_frags, &msg_id_hash, fragq); | 1041 | GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq); |
1018 | GNUNET_CONTAINER_heap_destroy (fragq->fragments); | 1042 | GNUNET_CONTAINER_heap_destroy (fragq->fragments); |
1019 | GNUNET_free (fragq); | 1043 | GNUNET_free (fragq); |
1020 | } | 1044 | } |
@@ -1034,33 +1058,33 @@ fragment_queue_run (struct Channel *ch, uint64_t msg_id, | |||
1034 | * - A stateful message is only sent if the previous stateful message | 1058 | * - A stateful message is only sent if the previous stateful message |
1035 | * has already been delivered to the client. | 1059 | * has already been delivered to the client. |
1036 | * | 1060 | * |
1037 | * @param ch Channel. | 1061 | * @param chn Channel. |
1038 | * | 1062 | * |
1039 | * @return Number of messages removed from queue and sent to client. | 1063 | * @return Number of messages removed from queue and sent to client. |
1040 | */ | 1064 | */ |
1041 | static uint64_t | 1065 | static uint64_t |
1042 | message_queue_run (struct Channel *ch) | 1066 | message_queue_run (struct Channel *chn) |
1043 | { | 1067 | { |
1044 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 1068 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
1045 | "%p Running message queue.\n", ch); | 1069 | "%p Running message queue.\n", chn); |
1046 | uint64_t n = 0; | 1070 | uint64_t n = 0; |
1047 | uint64_t msg_id; | 1071 | uint64_t msg_id; |
1048 | while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (ch->recv_msgs, NULL, | 1072 | while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL, |
1049 | &msg_id)) | 1073 | &msg_id)) |
1050 | { | 1074 | { |
1051 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 1075 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
1052 | "%p Processing message %" PRIu64 " in queue.\n", ch, msg_id); | 1076 | "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id); |
1053 | struct GNUNET_HashCode msg_id_hash; | 1077 | struct GNUNET_HashCode msg_id_hash; |
1054 | hash_key_from_hll (&msg_id_hash, msg_id); | 1078 | hash_key_from_hll (&msg_id_hash, msg_id); |
1055 | 1079 | ||
1056 | struct FragmentQueue * | 1080 | struct FragmentQueue * |
1057 | fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash); | 1081 | fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash); |
1058 | 1082 | ||
1059 | if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER) | 1083 | if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER) |
1060 | { | 1084 | { |
1061 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 1085 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
1062 | "%p No fragq (%p) or header not complete.\n", | 1086 | "%p No fragq (%p) or header not complete.\n", |
1063 | ch, fragq); | 1087 | chn, fragq); |
1064 | break; | 1088 | break; |
1065 | } | 1089 | } |
1066 | 1090 | ||
@@ -1070,40 +1094,40 @@ message_queue_run (struct Channel *ch) | |||
1070 | if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta) | 1094 | if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta) |
1071 | { | 1095 | { |
1072 | if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY) | 1096 | if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY) |
1073 | && msg_id - 1 != ch->max_message_id) | 1097 | && msg_id - 1 != chn->max_message_id) |
1074 | { | 1098 | { |
1075 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 1099 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
1076 | "%p Out of order message. " | 1100 | "%p Out of order message. " |
1077 | "(%" PRIu64 " - 1 != %" PRIu64 ")\n", | 1101 | "(%" PRIu64 " - 1 != %" PRIu64 ")\n", |
1078 | ch, msg_id, ch->max_message_id); | 1102 | chn, msg_id, chn->max_message_id); |
1079 | break; | 1103 | break; |
1080 | } | 1104 | } |
1081 | } | 1105 | } |
1082 | else | 1106 | else |
1083 | { | 1107 | { |
1084 | if (msg_id - fragq->state_delta != ch->max_state_message_id) | 1108 | if (msg_id - fragq->state_delta != chn->max_state_message_id) |
1085 | { | 1109 | { |
1086 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 1110 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
1087 | "%p Out of order stateful message. " | 1111 | "%p Out of order stateful message. " |
1088 | "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n", | 1112 | "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n", |
1089 | ch, msg_id, fragq->state_delta, ch->max_state_message_id); | 1113 | chn, msg_id, fragq->state_delta, chn->max_state_message_id); |
1090 | break; | 1114 | break; |
1091 | } | 1115 | } |
1092 | #if TODO | 1116 | #if TODO |
1093 | /* FIXME: apply modifiers to state in PSYCstore */ | 1117 | /* FIXME: apply modifiers to state in PSYCstore */ |
1094 | GNUNET_PSYCSTORE_state_modify (store, &ch->pub_key, message_id, | 1118 | GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, message_id, |
1095 | state_modify_result_cb, cls); | 1119 | store_recv_state_modify_result, cls); |
1096 | #endif | 1120 | #endif |
1097 | ch->max_state_message_id = msg_id; | 1121 | chn->max_state_message_id = msg_id; |
1098 | } | 1122 | } |
1099 | ch->max_message_id = msg_id; | 1123 | chn->max_message_id = msg_id; |
1100 | } | 1124 | } |
1101 | fragment_queue_run (ch, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state); | 1125 | fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state); |
1102 | GNUNET_CONTAINER_heap_remove_root (ch->recv_msgs); | 1126 | GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs); |
1103 | n++; | 1127 | n++; |
1104 | } | 1128 | } |
1105 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1129 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1106 | "%p Removed %" PRIu64 " messages from queue.\n", ch, n); | 1130 | "%p Removed %" PRIu64 " messages from queue.\n", chn, n); |
1107 | return n; | 1131 | return n; |
1108 | } | 1132 | } |
1109 | 1133 | ||
@@ -1113,107 +1137,88 @@ message_queue_run (struct Channel *ch) | |||
1113 | * | 1137 | * |
1114 | * Remove all messages in queue without sending it to clients. | 1138 | * Remove all messages in queue without sending it to clients. |
1115 | * | 1139 | * |
1116 | * @param ch Channel. | 1140 | * @param chn Channel. |
1117 | * | 1141 | * |
1118 | * @return Number of messages removed from queue. | 1142 | * @return Number of messages removed from queue. |
1119 | */ | 1143 | */ |
1120 | static uint64_t | 1144 | static uint64_t |
1121 | message_queue_drop (struct Channel *ch) | 1145 | message_queue_drop (struct Channel *chn) |
1122 | { | 1146 | { |
1123 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 1147 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
1124 | "%p Dropping message queue.\n", ch); | 1148 | "%p Dropping message queue.\n", chn); |
1125 | uint64_t n = 0; | 1149 | uint64_t n = 0; |
1126 | uint64_t msg_id; | 1150 | uint64_t msg_id; |
1127 | while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (ch->recv_msgs, NULL, | 1151 | while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL, |
1128 | &msg_id)) | 1152 | &msg_id)) |
1129 | { | 1153 | { |
1130 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 1154 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
1131 | "%p Dropping message %" PRIu64 " from queue.\n", ch, msg_id); | 1155 | "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id); |
1132 | struct GNUNET_HashCode msg_id_hash; | 1156 | struct GNUNET_HashCode msg_id_hash; |
1133 | hash_key_from_hll (&msg_id_hash, msg_id); | 1157 | hash_key_from_hll (&msg_id_hash, msg_id); |
1134 | 1158 | ||
1135 | struct FragmentQueue * | 1159 | struct FragmentQueue * |
1136 | fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash); | 1160 | fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash); |
1137 | 1161 | ||
1138 | fragment_queue_run (ch, msg_id, fragq, GNUNET_YES); | 1162 | fragment_queue_run (chn, msg_id, fragq, GNUNET_YES); |
1139 | GNUNET_CONTAINER_heap_remove_root (ch->recv_msgs); | 1163 | GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs); |
1140 | n++; | 1164 | n++; |
1141 | } | 1165 | } |
1142 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1166 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1143 | "%p Removed %" PRIu64 " messages from queue.\n", ch, n); | 1167 | "%p Removed %" PRIu64 " messages from queue.\n", chn, n); |
1144 | return n; | 1168 | return n; |
1145 | } | 1169 | } |
1146 | 1170 | ||
1147 | 1171 | ||
1148 | /** | 1172 | /** |
1149 | * Handle incoming message from multicast. | 1173 | * Handle the result of a GNUNET_PSYCSTORE_fragment_store() operation. |
1150 | * | ||
1151 | * @param ch Channel. | ||
1152 | * @param mmsg Multicast message. | ||
1153 | * | ||
1154 | * @return #GNUNET_OK or #GNUNET_SYSERR | ||
1155 | */ | 1174 | */ |
1156 | static int | 1175 | static void |
1157 | client_multicast_message (struct Channel *ch, | 1176 | store_recv_fragment_store_result (void *cls, int64_t result, const char *err_msg) |
1158 | const struct GNUNET_MULTICAST_MessageHeader *mmsg) | ||
1159 | { | 1177 | { |
1160 | GNUNET_PSYCSTORE_fragment_store (store, &ch->pub_key, mmsg, 0, NULL, NULL); | 1178 | struct Channel *chn = cls; |
1161 | |||
1162 | uint16_t size = ntohs (mmsg->header.size); | ||
1163 | uint16_t first_ptype = 0, last_ptype = 0; | ||
1164 | |||
1165 | if (GNUNET_SYSERR | ||
1166 | == GNUNET_PSYC_check_message_parts (size - sizeof (*mmsg), | ||
1167 | (const char *) &mmsg[1], | ||
1168 | &first_ptype, &last_ptype)) | ||
1169 | { | ||
1170 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
1171 | "%p Received message with invalid parts from multicast. " | ||
1172 | "Dropping message.\n", ch); | ||
1173 | GNUNET_break_op (0); | ||
1174 | return GNUNET_SYSERR; | ||
1175 | } | ||
1176 | |||
1177 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1179 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1178 | "Message parts: first: type %u, last: type %u\n", | 1180 | "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%s)\n", |
1179 | first_ptype, last_ptype); | 1181 | chn, result, err_msg); |
1180 | |||
1181 | fragment_queue_insert (ch, mmsg, first_ptype, last_ptype); | ||
1182 | message_queue_run (ch); | ||
1183 | |||
1184 | return GNUNET_OK; | ||
1185 | } | 1182 | } |
1186 | 1183 | ||
1187 | 1184 | ||
1188 | /** | 1185 | /** |
1189 | * Incoming message fragment from multicast. | 1186 | * Handle incoming message fragment from multicast. |
1190 | * | 1187 | * |
1191 | * Store it using PSYCstore and send it to the client of the channel. | 1188 | * Store it using PSYCstore and send it to the clients of the channel in order. |
1192 | */ | 1189 | */ |
1193 | static void | 1190 | static void |
1194 | mcast_message_cb (void *cls, const struct GNUNET_MessageHeader *msg) | 1191 | mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg) |
1195 | { | 1192 | { |
1196 | struct Channel *ch = cls; | 1193 | struct Channel *chn = cls; |
1197 | uint16_t type = ntohs (msg->type); | 1194 | uint16_t size = ntohs (mmsg->header.size); |
1198 | uint16_t size = ntohs (msg->size); | ||
1199 | 1195 | ||
1200 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1196 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1201 | "%p Received message of type %u and size %u from multicast.\n", | 1197 | "%p Received multicast message of size %u.\n", |
1202 | ch, type, size); | 1198 | chn, size); |
1203 | 1199 | ||
1204 | switch (type) | 1200 | GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0, |
1205 | { | 1201 | &store_recv_fragment_store_result, chn); |
1206 | case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE: | 1202 | |
1203 | uint16_t first_ptype = 0, last_ptype = 0; | ||
1204 | if (GNUNET_SYSERR | ||
1205 | == GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg), | ||
1206 | (const char *) &mmsg[1], | ||
1207 | &first_ptype, &last_ptype)) | ||
1207 | { | 1208 | { |
1208 | client_multicast_message (ch, (const struct | ||
1209 | GNUNET_MULTICAST_MessageHeader *) msg); | ||
1210 | break; | ||
1211 | } | ||
1212 | default: | ||
1213 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 1209 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
1214 | "%p Dropping unknown message of type %u and size %u.\n", | 1210 | "%p Dropping incoming multicast message with invalid parts.\n", |
1215 | ch, type, size); | 1211 | chn); |
1212 | GNUNET_break_op (0); | ||
1213 | return; | ||
1216 | } | 1214 | } |
1215 | |||
1216 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1217 | "Message parts: first: type %u, last: type %u\n", | ||
1218 | first_ptype, last_ptype); | ||
1219 | |||
1220 | fragment_queue_insert (chn, mmsg, first_ptype, last_ptype); | ||
1221 | message_queue_run (chn); | ||
1217 | } | 1222 | } |
1218 | 1223 | ||
1219 | 1224 | ||
@@ -1226,59 +1231,35 @@ mcast_message_cb (void *cls, const struct GNUNET_MessageHeader *msg) | |||
1226 | * @param flags Request flags. | 1231 | * @param flags Request flags. |
1227 | */ | 1232 | */ |
1228 | static void | 1233 | static void |
1229 | mcast_request_cb (void *cls, | 1234 | mcast_recv_request (void *cls, |
1230 | const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, | 1235 | const struct GNUNET_MULTICAST_RequestHeader *req) |
1231 | const struct GNUNET_MessageHeader *msg, | ||
1232 | enum GNUNET_MULTICAST_MessageFlags flags) | ||
1233 | { | 1236 | { |
1234 | struct Master *mst = cls; | 1237 | struct Master *mst = cls; |
1235 | struct Channel *ch = &mst->ch; | 1238 | uint16_t size = ntohs (req->header.size); |
1236 | |||
1237 | uint16_t type = ntohs (msg->type); | ||
1238 | uint16_t size = ntohs (msg->size); | ||
1239 | 1239 | ||
1240 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1240 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1241 | "%p Received request of type %u and size %u from multicast.\n", | 1241 | "%p Received multicast request of size %u.\n", |
1242 | ch, type, size); | 1242 | mst, size); |
1243 | 1243 | ||
1244 | switch (type) | 1244 | uint16_t first_ptype = 0, last_ptype = 0; |
1245 | { | 1245 | if (GNUNET_SYSERR |
1246 | case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST: | 1246 | == GNUNET_PSYC_receive_check_parts (size - sizeof (*req), |
1247 | (const char *) &req[1], | ||
1248 | &first_ptype, &last_ptype)) | ||
1247 | { | 1249 | { |
1248 | const struct GNUNET_MULTICAST_RequestHeader *req | 1250 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
1249 | = (const struct GNUNET_MULTICAST_RequestHeader *) msg; | 1251 | "%p Dropping incoming multicast request with invalid parts.\n", |
1250 | 1252 | mst); | |
1251 | /* FIXME: see message_cb() */ | ||
1252 | if (GNUNET_SYSERR == GNUNET_PSYC_check_message_parts (size - sizeof (*req), | ||
1253 | (const char *) &req[1], | ||
1254 | NULL, NULL)) | ||
1255 | { | ||
1256 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
1257 | "%p Dropping request with invalid parts " | ||
1258 | "received from multicast.\n", ch); | ||
1259 | GNUNET_break_op (0); | ||
1260 | break; | ||
1261 | } | ||
1262 | |||
1263 | struct GNUNET_PSYC_MessageHeader *pmsg; | ||
1264 | uint16_t psize = sizeof (*pmsg) + size - sizeof (*req); | ||
1265 | pmsg = GNUNET_malloc (psize); | ||
1266 | pmsg->header.size = htons (psize); | ||
1267 | pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | ||
1268 | pmsg->message_id = req->request_id; | ||
1269 | pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST); | ||
1270 | |||
1271 | memcpy (&pmsg[1], &req[1], size - sizeof (*req)); | ||
1272 | msg_to_clients (ch, &pmsg->header); | ||
1273 | GNUNET_free (pmsg); | ||
1274 | break; | ||
1275 | } | ||
1276 | default: | ||
1277 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1278 | "%p Dropping unknown request of type %u and size %u.\n", | ||
1279 | ch, type, size); | ||
1280 | GNUNET_break_op (0); | 1253 | GNUNET_break_op (0); |
1254 | return; | ||
1281 | } | 1255 | } |
1256 | |||
1257 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1258 | "Message parts: first: type %u, last: type %u\n", | ||
1259 | first_ptype, last_ptype); | ||
1260 | |||
1261 | /* FIXME: in-order delivery */ | ||
1262 | client_send_mcast_req (mst, req); | ||
1282 | } | 1263 | } |
1283 | 1264 | ||
1284 | 1265 | ||
@@ -1286,13 +1267,13 @@ mcast_request_cb (void *cls, | |||
1286 | * Response from PSYCstore with the current counter values for a channel master. | 1267 | * Response from PSYCstore with the current counter values for a channel master. |
1287 | */ | 1268 | */ |
1288 | static void | 1269 | static void |
1289 | master_counters_cb (void *cls, int result, uint64_t max_fragment_id, | 1270 | store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id, |
1290 | uint64_t max_message_id, uint64_t max_group_generation, | 1271 | uint64_t max_message_id, uint64_t max_group_generation, |
1291 | uint64_t max_state_message_id) | 1272 | uint64_t max_state_message_id) |
1292 | { | 1273 | { |
1293 | struct Master *mst = cls; | 1274 | struct Master *mst = cls; |
1294 | struct Channel *ch = &mst->ch; | 1275 | struct Channel *chn = &mst->chn; |
1295 | ch->store_op = NULL; | 1276 | chn->store_op = NULL; |
1296 | 1277 | ||
1297 | struct CountersResult res; | 1278 | struct CountersResult res; |
1298 | res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); | 1279 | res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); |
@@ -1303,28 +1284,28 @@ master_counters_cb (void *cls, int result, uint64_t max_fragment_id, | |||
1303 | if (GNUNET_OK == result || GNUNET_NO == result) | 1284 | if (GNUNET_OK == result || GNUNET_NO == result) |
1304 | { | 1285 | { |
1305 | mst->max_message_id = max_message_id; | 1286 | mst->max_message_id = max_message_id; |
1306 | ch->max_message_id = max_message_id; | 1287 | chn->max_message_id = max_message_id; |
1307 | ch->max_state_message_id = max_state_message_id; | 1288 | chn->max_state_message_id = max_state_message_id; |
1308 | mst->max_group_generation = max_group_generation; | 1289 | mst->max_group_generation = max_group_generation; |
1309 | mst->origin | 1290 | mst->origin |
1310 | = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id, | 1291 | = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id, |
1311 | &mcast_join_request_cb, | 1292 | &mcast_recv_join_request, |
1312 | &mcast_membership_test_cb, | 1293 | &mcast_recv_membership_test, |
1313 | &mcast_replay_fragment_cb, | 1294 | &mcast_recv_replay_fragment, |
1314 | &mcast_replay_message_cb, | 1295 | &mcast_recv_replay_message, |
1315 | &mcast_request_cb, | 1296 | &mcast_recv_request, |
1316 | &mcast_message_cb, ch); | 1297 | &mcast_recv_message, chn); |
1317 | ch->ready = GNUNET_YES; | 1298 | chn->ready = GNUNET_YES; |
1318 | } | 1299 | } |
1319 | else | 1300 | else |
1320 | { | 1301 | { |
1321 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 1302 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
1322 | "%p GNUNET_PSYCSTORE_counters_get() " | 1303 | "%p GNUNET_PSYCSTORE_counters_get() " |
1323 | "returned %d for channel %s.\n", | 1304 | "returned %d for channel %s.\n", |
1324 | ch, result, GNUNET_h2s (&ch->pub_key_hash)); | 1305 | chn, result, GNUNET_h2s (&chn->pub_key_hash)); |
1325 | } | 1306 | } |
1326 | 1307 | ||
1327 | msg_to_clients (ch, &res.header); | 1308 | client_send_msg (chn, &res.header); |
1328 | } | 1309 | } |
1329 | 1310 | ||
1330 | 1311 | ||
@@ -1332,13 +1313,13 @@ master_counters_cb (void *cls, int result, uint64_t max_fragment_id, | |||
1332 | * Response from PSYCstore with the current counter values for a channel slave. | 1313 | * Response from PSYCstore with the current counter values for a channel slave. |
1333 | */ | 1314 | */ |
1334 | void | 1315 | void |
1335 | slave_counters_cb (void *cls, int result, uint64_t max_fragment_id, | 1316 | store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id, |
1336 | uint64_t max_message_id, uint64_t max_group_generation, | 1317 | uint64_t max_message_id, uint64_t max_group_generation, |
1337 | uint64_t max_state_message_id) | 1318 | uint64_t max_state_message_id) |
1338 | { | 1319 | { |
1339 | struct Slave *slv = cls; | 1320 | struct Slave *slv = cls; |
1340 | struct Channel *ch = &slv->ch; | 1321 | struct Channel *chn = &slv->chn; |
1341 | ch->store_op = NULL; | 1322 | chn->store_op = NULL; |
1342 | 1323 | ||
1343 | struct CountersResult res; | 1324 | struct CountersResult res; |
1344 | res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); | 1325 | res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); |
@@ -1348,38 +1329,38 @@ slave_counters_cb (void *cls, int result, uint64_t max_fragment_id, | |||
1348 | 1329 | ||
1349 | if (GNUNET_OK == result || GNUNET_NO == result) | 1330 | if (GNUNET_OK == result || GNUNET_NO == result) |
1350 | { | 1331 | { |
1351 | ch->max_message_id = max_message_id; | 1332 | chn->max_message_id = max_message_id; |
1352 | ch->max_state_message_id = max_state_message_id; | 1333 | chn->max_state_message_id = max_state_message_id; |
1353 | slv->member | 1334 | slv->member |
1354 | = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->priv_key, | 1335 | = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key, |
1355 | &slv->origin, | 1336 | &slv->origin, |
1356 | slv->relay_count, slv->relays, | 1337 | slv->relay_count, slv->relays, |
1357 | slv->join_req, | 1338 | slv->join_req, |
1358 | &mcast_join_request_cb, | 1339 | &mcast_recv_join_request, |
1359 | &mcast_join_decision_cb, | 1340 | &mcast_recv_join_decision, |
1360 | &mcast_membership_test_cb, | 1341 | &mcast_recv_membership_test, |
1361 | &mcast_replay_fragment_cb, | 1342 | &mcast_recv_replay_fragment, |
1362 | &mcast_replay_message_cb, | 1343 | &mcast_recv_replay_message, |
1363 | &mcast_message_cb, ch); | 1344 | &mcast_recv_message, chn); |
1364 | } | 1345 | } |
1365 | else | 1346 | else |
1366 | { | 1347 | { |
1367 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 1348 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
1368 | "%p GNUNET_PSYCSTORE_counters_get() " | 1349 | "%p GNUNET_PSYCSTORE_counters_get() " |
1369 | "returned %d for channel %s.\n", | 1350 | "returned %d for channel %s.\n", |
1370 | ch, result, GNUNET_h2s (&ch->pub_key_hash)); | 1351 | chn, result, GNUNET_h2s (&chn->pub_key_hash)); |
1371 | } | 1352 | } |
1372 | 1353 | ||
1373 | msg_to_clients (ch, &res.header); | 1354 | client_send_msg (chn, &res.header); |
1374 | } | 1355 | } |
1375 | 1356 | ||
1376 | 1357 | ||
1377 | static void | 1358 | static void |
1378 | channel_init (struct Channel *ch) | 1359 | channel_init (struct Channel *chn) |
1379 | { | 1360 | { |
1380 | ch->recv_msgs | 1361 | chn->recv_msgs |
1381 | = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); | 1362 | = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); |
1382 | ch->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); | 1363 | chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); |
1383 | } | 1364 | } |
1384 | 1365 | ||
1385 | 1366 | ||
@@ -1387,8 +1368,8 @@ channel_init (struct Channel *ch) | |||
1387 | * Handle a connecting client starting a channel master. | 1368 | * Handle a connecting client starting a channel master. |
1388 | */ | 1369 | */ |
1389 | static void | 1370 | static void |
1390 | client_master_start (void *cls, struct GNUNET_SERVER_Client *client, | 1371 | client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client, |
1391 | const struct GNUNET_MessageHeader *msg) | 1372 | const struct GNUNET_MessageHeader *msg) |
1392 | { | 1373 | { |
1393 | const struct MasterStartRequest *req | 1374 | const struct MasterStartRequest *req |
1394 | = (const struct MasterStartRequest *) msg; | 1375 | = (const struct MasterStartRequest *) msg; |
@@ -1401,7 +1382,7 @@ client_master_start (void *cls, struct GNUNET_SERVER_Client *client, | |||
1401 | 1382 | ||
1402 | struct Master * | 1383 | struct Master * |
1403 | mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash); | 1384 | mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash); |
1404 | struct Channel *ch; | 1385 | struct Channel *chn; |
1405 | 1386 | ||
1406 | if (NULL == mst) | 1387 | if (NULL == mst) |
1407 | { | 1388 | { |
@@ -1410,20 +1391,20 @@ client_master_start (void *cls, struct GNUNET_SERVER_Client *client, | |||
1410 | mst->priv_key = req->channel_key; | 1391 | mst->priv_key = req->channel_key; |
1411 | mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); | 1392 | mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); |
1412 | 1393 | ||
1413 | ch = &mst->ch; | 1394 | chn = &mst->chn; |
1414 | ch->is_master = GNUNET_YES; | 1395 | chn->is_master = GNUNET_YES; |
1415 | ch->pub_key = pub_key; | 1396 | chn->pub_key = pub_key; |
1416 | ch->pub_key_hash = pub_key_hash; | 1397 | chn->pub_key_hash = pub_key_hash; |
1417 | channel_init (ch); | 1398 | channel_init (chn); |
1418 | 1399 | ||
1419 | GNUNET_CONTAINER_multihashmap_put (masters, &ch->pub_key_hash, ch, | 1400 | GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn, |
1420 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | 1401 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); |
1421 | ch->store_op = GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, | 1402 | chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key, |
1422 | master_counters_cb, mst); | 1403 | store_recv_master_counters, mst); |
1423 | } | 1404 | } |
1424 | else | 1405 | else |
1425 | { | 1406 | { |
1426 | ch = &mst->ch; | 1407 | chn = &mst->chn; |
1427 | 1408 | ||
1428 | struct CountersResult res; | 1409 | struct CountersResult res; |
1429 | res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); | 1410 | res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); |
@@ -1438,13 +1419,13 @@ client_master_start (void *cls, struct GNUNET_SERVER_Client *client, | |||
1438 | 1419 | ||
1439 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1420 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1440 | "%p Client connected as master to channel %s.\n", | 1421 | "%p Client connected as master to channel %s.\n", |
1441 | mst, GNUNET_h2s (&ch->pub_key_hash)); | 1422 | mst, GNUNET_h2s (&chn->pub_key_hash)); |
1442 | 1423 | ||
1443 | struct ClientList *cl = GNUNET_new (struct ClientList); | 1424 | struct ClientListItem *cli = GNUNET_new (struct ClientListItem); |
1444 | cl->client = client; | 1425 | cli->client = client; |
1445 | GNUNET_CONTAINER_DLL_insert (ch->clients_head, ch->clients_tail, cl); | 1426 | GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli); |
1446 | 1427 | ||
1447 | GNUNET_SERVER_client_set_user_context (client, ch); | 1428 | GNUNET_SERVER_client_set_user_context (client, chn); |
1448 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 1429 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
1449 | } | 1430 | } |
1450 | 1431 | ||
@@ -1453,8 +1434,8 @@ client_master_start (void *cls, struct GNUNET_SERVER_Client *client, | |||
1453 | * Handle a connecting client joining as a channel slave. | 1434 | * Handle a connecting client joining as a channel slave. |
1454 | */ | 1435 | */ |
1455 | static void | 1436 | static void |
1456 | client_slave_join (void *cls, struct GNUNET_SERVER_Client *client, | 1437 | client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client, |
1457 | const struct GNUNET_MessageHeader *msg) | 1438 | const struct GNUNET_MessageHeader *msg) |
1458 | { | 1439 | { |
1459 | const struct SlaveJoinRequest *req | 1440 | const struct SlaveJoinRequest *req |
1460 | = (const struct SlaveJoinRequest *) msg; | 1441 | = (const struct SlaveJoinRequest *) msg; |
@@ -1467,13 +1448,13 @@ client_slave_join (void *cls, struct GNUNET_SERVER_Client *client, | |||
1467 | GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash); | 1448 | GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash); |
1468 | 1449 | ||
1469 | struct GNUNET_CONTAINER_MultiHashMap * | 1450 | struct GNUNET_CONTAINER_MultiHashMap * |
1470 | ch_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash); | 1451 | chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash); |
1471 | struct Slave *slv = NULL; | 1452 | struct Slave *slv = NULL; |
1472 | struct Channel *ch; | 1453 | struct Channel *chn; |
1473 | 1454 | ||
1474 | if (NULL != ch_slv) | 1455 | if (NULL != chn_slv) |
1475 | { | 1456 | { |
1476 | slv = GNUNET_CONTAINER_multihashmap_get (ch_slv, &slv_pub_key_hash); | 1457 | slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_key_hash); |
1477 | } | 1458 | } |
1478 | if (NULL == slv) | 1459 | if (NULL == slv) |
1479 | { | 1460 | { |
@@ -1494,34 +1475,34 @@ client_slave_join (void *cls, struct GNUNET_SERVER_Client *client, | |||
1494 | memcpy (&slv->relays[i], &relays[i], sizeof (*relays)); | 1475 | memcpy (&slv->relays[i], &relays[i], sizeof (*relays)); |
1495 | } | 1476 | } |
1496 | 1477 | ||
1497 | ch = &slv->ch; | 1478 | chn = &slv->chn; |
1498 | ch->is_master = GNUNET_NO; | 1479 | chn->is_master = GNUNET_NO; |
1499 | ch->pub_key = req->channel_key; | 1480 | chn->pub_key = req->channel_key; |
1500 | ch->pub_key_hash = pub_key_hash; | 1481 | chn->pub_key_hash = pub_key_hash; |
1501 | channel_init (ch); | 1482 | channel_init (chn); |
1502 | 1483 | ||
1503 | if (NULL == ch_slv) | 1484 | if (NULL == chn_slv) |
1504 | { | 1485 | { |
1505 | ch_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); | 1486 | chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); |
1506 | GNUNET_CONTAINER_multihashmap_put (channel_slaves, &ch->pub_key_hash, ch_slv, | 1487 | GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv, |
1507 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | 1488 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); |
1508 | } | 1489 | } |
1509 | GNUNET_CONTAINER_multihashmap_put (ch_slv, &slv->pub_key_hash, ch, | 1490 | GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn, |
1510 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); | 1491 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); |
1511 | GNUNET_CONTAINER_multihashmap_put (slaves, &ch->pub_key_hash, ch, | 1492 | GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn, |
1512 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | 1493 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); |
1513 | ch->store_op = GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, | 1494 | chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key, |
1514 | slave_counters_cb, slv); | 1495 | &store_recv_slave_counters, slv); |
1515 | } | 1496 | } |
1516 | else | 1497 | else |
1517 | { | 1498 | { |
1518 | ch = &slv->ch; | 1499 | chn = &slv->chn; |
1519 | 1500 | ||
1520 | struct CountersResult res; | 1501 | struct CountersResult res; |
1521 | res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); | 1502 | res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); |
1522 | res.header.size = htons (sizeof (res)); | 1503 | res.header.size = htons (sizeof (res)); |
1523 | res.result_code = htonl (GNUNET_OK); | 1504 | res.result_code = htonl (GNUNET_OK); |
1524 | res.max_message_id = GNUNET_htonll (ch->max_message_id); | 1505 | res.max_message_id = GNUNET_htonll (chn->max_message_id); |
1525 | 1506 | ||
1526 | GNUNET_SERVER_notification_context_add (nc, client); | 1507 | GNUNET_SERVER_notification_context_add (nc, client); |
1527 | GNUNET_SERVER_notification_context_unicast (nc, client, &res.header, | 1508 | GNUNET_SERVER_notification_context_unicast (nc, client, &res.header, |
@@ -1530,16 +1511,16 @@ client_slave_join (void *cls, struct GNUNET_SERVER_Client *client, | |||
1530 | if (NULL == slv->member) | 1511 | if (NULL == slv->member) |
1531 | { | 1512 | { |
1532 | slv->member | 1513 | slv->member |
1533 | = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->priv_key, | 1514 | = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key, |
1534 | &slv->origin, | 1515 | &slv->origin, |
1535 | slv->relay_count, slv->relays, | 1516 | slv->relay_count, slv->relays, |
1536 | slv->join_req, | 1517 | slv->join_req, |
1537 | &mcast_join_request_cb, | 1518 | &mcast_recv_join_request, |
1538 | &mcast_join_decision_cb, | 1519 | &mcast_recv_join_decision, |
1539 | &mcast_membership_test_cb, | 1520 | &mcast_recv_membership_test, |
1540 | &mcast_replay_fragment_cb, | 1521 | &mcast_recv_replay_fragment, |
1541 | &mcast_replay_message_cb, | 1522 | &mcast_recv_replay_message, |
1542 | &mcast_message_cb, ch); | 1523 | &mcast_recv_message, chn); |
1543 | 1524 | ||
1544 | } | 1525 | } |
1545 | else if (NULL != slv->join_dcsn) | 1526 | else if (NULL != slv->join_dcsn) |
@@ -1553,13 +1534,13 @@ client_slave_join (void *cls, struct GNUNET_SERVER_Client *client, | |||
1553 | 1534 | ||
1554 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1535 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1555 | "%p Client connected as slave to channel %s.\n", | 1536 | "%p Client connected as slave to channel %s.\n", |
1556 | slv, GNUNET_h2s (&ch->pub_key_hash)); | 1537 | slv, GNUNET_h2s (&chn->pub_key_hash)); |
1557 | 1538 | ||
1558 | struct ClientList *cl = GNUNET_new (struct ClientList); | 1539 | struct ClientListItem *cli = GNUNET_new (struct ClientListItem); |
1559 | cl->client = client; | 1540 | cli->client = client; |
1560 | GNUNET_CONTAINER_DLL_insert (ch->clients_head, ch->clients_tail, cl); | 1541 | GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli); |
1561 | 1542 | ||
1562 | GNUNET_SERVER_client_set_user_context (client, &slv->ch); | 1543 | GNUNET_SERVER_client_set_user_context (client, &slv->chn); |
1563 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 1544 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
1564 | } | 1545 | } |
1565 | 1546 | ||
@@ -1575,8 +1556,8 @@ struct JoinDecisionClosure | |||
1575 | * Iterator callback for responding to join requests of a slave. | 1556 | * Iterator callback for responding to join requests of a slave. |
1576 | */ | 1557 | */ |
1577 | static int | 1558 | static int |
1578 | send_join_decision_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, | 1559 | mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash, |
1579 | void *jh) | 1560 | void *jh) |
1580 | { | 1561 | { |
1581 | struct JoinDecisionClosure *jcls = cls; | 1562 | struct JoinDecisionClosure *jcls = cls; |
1582 | // FIXME: add relays | 1563 | // FIXME: add relays |
@@ -1589,13 +1570,13 @@ send_join_decision_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, | |||
1589 | * Join decision from client. | 1570 | * Join decision from client. |
1590 | */ | 1571 | */ |
1591 | static void | 1572 | static void |
1592 | client_join_decision (void *cls, struct GNUNET_SERVER_Client *client, | 1573 | client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client, |
1593 | const struct GNUNET_MessageHeader *msg) | 1574 | const struct GNUNET_MessageHeader *msg) |
1594 | { | 1575 | { |
1595 | struct Channel * | 1576 | struct Channel * |
1596 | ch = GNUNET_SERVER_client_get_user_context (client, struct Channel); | 1577 | chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); |
1597 | GNUNET_assert (GNUNET_YES == ch->is_master); | 1578 | GNUNET_assert (GNUNET_YES == chn->is_master); |
1598 | struct Master *mst = (struct Master *) ch; | 1579 | struct Master *mst = (struct Master *) chn; |
1599 | 1580 | ||
1600 | struct MasterJoinDecision *dcsn = (struct MasterJoinDecision *) msg; | 1581 | struct MasterJoinDecision *dcsn = (struct MasterJoinDecision *) msg; |
1601 | struct JoinDecisionClosure jcls; | 1582 | struct JoinDecisionClosure jcls; |
@@ -1612,13 +1593,13 @@ client_join_decision (void *cls, struct GNUNET_SERVER_Client *client, | |||
1612 | 1593 | ||
1613 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1594 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1614 | "%p Got join decision (%d) from client for channel %s..\n", | 1595 | "%p Got join decision (%d) from client for channel %s..\n", |
1615 | mst, jcls.is_admitted, GNUNET_h2s (&ch->pub_key_hash)); | 1596 | mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash)); |
1616 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1597 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1617 | "%p ..and slave %s.\n", | 1598 | "%p ..and slave %s.\n", |
1618 | mst, GNUNET_h2s (&slave_key_hash)); | 1599 | mst, GNUNET_h2s (&slave_key_hash)); |
1619 | 1600 | ||
1620 | GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash, | 1601 | GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash, |
1621 | &send_join_decision_cb, &jcls); | 1602 | &mcast_send_join_decision, &jcls); |
1622 | GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_key_hash); | 1603 | GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_key_hash); |
1623 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 1604 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
1624 | } | 1605 | } |
@@ -1629,10 +1610,10 @@ client_join_decision (void *cls, struct GNUNET_SERVER_Client *client, | |||
1629 | * | 1610 | * |
1630 | * Sent after a message fragment has been passed on to multicast. | 1611 | * Sent after a message fragment has been passed on to multicast. |
1631 | * | 1612 | * |
1632 | * @param ch The channel struct for the client. | 1613 | * @param chn The channel struct for the client. |
1633 | */ | 1614 | */ |
1634 | static void | 1615 | static void |
1635 | send_message_ack (struct Channel *ch, struct GNUNET_SERVER_Client *client) | 1616 | send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client) |
1636 | { | 1617 | { |
1637 | struct GNUNET_MessageHeader res; | 1618 | struct GNUNET_MessageHeader res; |
1638 | res.size = htons (sizeof (res)); | 1619 | res.size = htons (sizeof (res)); |
@@ -1650,40 +1631,40 @@ send_message_ack (struct Channel *ch, struct GNUNET_SERVER_Client *client) | |||
1650 | static int | 1631 | static int |
1651 | transmit_notify (void *cls, size_t *data_size, void *data) | 1632 | transmit_notify (void *cls, size_t *data_size, void *data) |
1652 | { | 1633 | { |
1653 | struct Channel *ch = cls; | 1634 | struct Channel *chn = cls; |
1654 | struct TransmitMessage *tmit_msg = ch->tmit_head; | 1635 | struct TransmitMessage *tmit_msg = chn->tmit_head; |
1655 | 1636 | ||
1656 | if (NULL == tmit_msg || *data_size < tmit_msg->size) | 1637 | if (NULL == tmit_msg || *data_size < tmit_msg->size) |
1657 | { | 1638 | { |
1658 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1639 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1659 | "%p transmit_notify: nothing to send.\n", ch); | 1640 | "%p transmit_notify: nothing to send.\n", chn); |
1660 | *data_size = 0; | 1641 | *data_size = 0; |
1661 | return GNUNET_NO; | 1642 | return GNUNET_NO; |
1662 | } | 1643 | } |
1663 | 1644 | ||
1664 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1645 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1665 | "%p transmit_notify: sending %u bytes.\n", ch, tmit_msg->size); | 1646 | "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size); |
1666 | 1647 | ||
1667 | *data_size = tmit_msg->size; | 1648 | *data_size = tmit_msg->size; |
1668 | memcpy (data, &tmit_msg[1], *data_size); | 1649 | memcpy (data, &tmit_msg[1], *data_size); |
1669 | 1650 | ||
1670 | int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES; | 1651 | int ret = (MSG_STATE_END < chn->tmit_state) ? GNUNET_NO : GNUNET_YES; |
1671 | if (NULL != tmit_msg->client) | 1652 | if (NULL != tmit_msg->client) |
1672 | send_message_ack (ch, tmit_msg->client); | 1653 | send_message_ack (chn, tmit_msg->client); |
1673 | 1654 | ||
1674 | GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg); | 1655 | GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg); |
1675 | GNUNET_free (tmit_msg); | 1656 | GNUNET_free (tmit_msg); |
1676 | 1657 | ||
1677 | if (0 == ch->tmit_task) | 1658 | if (0 == chn->tmit_task) |
1678 | { | 1659 | { |
1679 | if (NULL != ch->tmit_head) | 1660 | if (NULL != chn->tmit_head) |
1680 | { | 1661 | { |
1681 | transmit_message (ch); | 1662 | transmit_message (chn); |
1682 | } | 1663 | } |
1683 | else if (ch->disconnected) | 1664 | else if (chn->disconnected) |
1684 | { | 1665 | { |
1685 | /* FIXME: handle partial message (when still in_transmit) */ | 1666 | /* FIXME: handle partial message (when still in_transmit) */ |
1686 | cleanup_channel (ch); | 1667 | cleanup_channel (chn); |
1687 | } | 1668 | } |
1688 | } | 1669 | } |
1689 | 1670 | ||
@@ -1732,7 +1713,7 @@ static void | |||
1732 | master_transmit_message (struct Master *mst) | 1713 | master_transmit_message (struct Master *mst) |
1733 | { | 1714 | { |
1734 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_transmit_message()\n", mst); | 1715 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_transmit_message()\n", mst); |
1735 | mst->ch.tmit_task = 0; | 1716 | mst->chn.tmit_task = 0; |
1736 | if (NULL == mst->tmit_handle) | 1717 | if (NULL == mst->tmit_handle) |
1737 | { | 1718 | { |
1738 | mst->tmit_handle | 1719 | mst->tmit_handle |
@@ -1753,7 +1734,7 @@ master_transmit_message (struct Master *mst) | |||
1753 | static void | 1734 | static void |
1754 | slave_transmit_message (struct Slave *slv) | 1735 | slave_transmit_message (struct Slave *slv) |
1755 | { | 1736 | { |
1756 | slv->ch.tmit_task = 0; | 1737 | slv->chn.tmit_task = 0; |
1757 | if (NULL == slv->tmit_handle) | 1738 | if (NULL == slv->tmit_handle) |
1758 | { | 1739 | { |
1759 | slv->tmit_handle | 1740 | slv->tmit_handle |
@@ -1768,11 +1749,11 @@ slave_transmit_message (struct Slave *slv) | |||
1768 | 1749 | ||
1769 | 1750 | ||
1770 | static inline void | 1751 | static inline void |
1771 | transmit_message (struct Channel *ch) | 1752 | transmit_message (struct Channel *chn) |
1772 | { | 1753 | { |
1773 | ch->is_master | 1754 | chn->is_master |
1774 | ? master_transmit_message ((struct Master *) ch) | 1755 | ? master_transmit_message ((struct Master *) chn) |
1775 | : slave_transmit_message ((struct Slave *) ch); | 1756 | : slave_transmit_message ((struct Slave *) chn); |
1776 | } | 1757 | } |
1777 | 1758 | ||
1778 | 1759 | ||
@@ -1828,7 +1809,7 @@ slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg, | |||
1828 | /** | 1809 | /** |
1829 | * Queue PSYC message parts for sending to multicast. | 1810 | * Queue PSYC message parts for sending to multicast. |
1830 | * | 1811 | * |
1831 | * @param ch Channel to send to. | 1812 | * @param chn Channel to send to. |
1832 | * @param client Client the message originates from. | 1813 | * @param client Client the message originates from. |
1833 | * @param data_size Size of @a data. | 1814 | * @param data_size Size of @a data. |
1834 | * @param data Concatenated message parts. | 1815 | * @param data Concatenated message parts. |
@@ -1836,7 +1817,7 @@ slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg, | |||
1836 | * @param last_ptype Last message part type in @a data. | 1817 | * @param last_ptype Last message part type in @a data. |
1837 | */ | 1818 | */ |
1838 | static void | 1819 | static void |
1839 | queue_message (struct Channel *ch, | 1820 | queue_message (struct Channel *chn, |
1840 | struct GNUNET_SERVER_Client *client, | 1821 | struct GNUNET_SERVER_Client *client, |
1841 | size_t data_size, | 1822 | size_t data_size, |
1842 | const void *data, | 1823 | const void *data, |
@@ -1847,14 +1828,14 @@ queue_message (struct Channel *ch, | |||
1847 | memcpy (&tmit_msg[1], data, data_size); | 1828 | memcpy (&tmit_msg[1], data, data_size); |
1848 | tmit_msg->client = client; | 1829 | tmit_msg->client = client; |
1849 | tmit_msg->size = data_size; | 1830 | tmit_msg->size = data_size; |
1850 | tmit_msg->state = ch->tmit_state; | 1831 | tmit_msg->state = chn->tmit_state; |
1851 | 1832 | ||
1852 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); | 1833 | GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg); |
1853 | 1834 | ||
1854 | ch->is_master | 1835 | chn->is_master |
1855 | ? master_queue_message ((struct Master *) ch, tmit_msg, | 1836 | ? master_queue_message ((struct Master *) chn, tmit_msg, |
1856 | first_ptype, last_ptype) | 1837 | first_ptype, last_ptype) |
1857 | : slave_queue_message ((struct Slave *) ch, tmit_msg, | 1838 | : slave_queue_message ((struct Slave *) chn, tmit_msg, |
1858 | first_ptype, last_ptype); | 1839 | first_ptype, last_ptype); |
1859 | } | 1840 | } |
1860 | 1841 | ||
@@ -1862,11 +1843,11 @@ queue_message (struct Channel *ch, | |||
1862 | /** | 1843 | /** |
1863 | * Cancel transmission of current message. | 1844 | * Cancel transmission of current message. |
1864 | * | 1845 | * |
1865 | * @param ch Channel to send to. | 1846 | * @param chn Channel to send to. |
1866 | * @param client Client the message originates from. | 1847 | * @param client Client the message originates from. |
1867 | */ | 1848 | */ |
1868 | static void | 1849 | static void |
1869 | transmit_cancel (struct Channel *ch, struct GNUNET_SERVER_Client *client) | 1850 | transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client) |
1870 | { | 1851 | { |
1871 | uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL; | 1852 | uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL; |
1872 | 1853 | ||
@@ -1874,8 +1855,8 @@ transmit_cancel (struct Channel *ch, struct GNUNET_SERVER_Client *client) | |||
1874 | msg.size = htons (sizeof (msg)); | 1855 | msg.size = htons (sizeof (msg)); |
1875 | msg.type = htons (type); | 1856 | msg.type = htons (type); |
1876 | 1857 | ||
1877 | queue_message (ch, client, sizeof (msg), &msg, type, type); | 1858 | queue_message (chn, client, sizeof (msg), &msg, type, type); |
1878 | transmit_message (ch); | 1859 | transmit_message (chn); |
1879 | 1860 | ||
1880 | /* FIXME: cleanup */ | 1861 | /* FIXME: cleanup */ |
1881 | } | 1862 | } |
@@ -1885,21 +1866,21 @@ transmit_cancel (struct Channel *ch, struct GNUNET_SERVER_Client *client) | |||
1885 | * Incoming message from a master or slave client. | 1866 | * Incoming message from a master or slave client. |
1886 | */ | 1867 | */ |
1887 | static void | 1868 | static void |
1888 | client_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, | 1869 | client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, |
1889 | const struct GNUNET_MessageHeader *msg) | 1870 | const struct GNUNET_MessageHeader *msg) |
1890 | { | 1871 | { |
1891 | struct Channel * | 1872 | struct Channel * |
1892 | ch = GNUNET_SERVER_client_get_user_context (client, struct Channel); | 1873 | chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); |
1893 | GNUNET_assert (NULL != ch); | 1874 | GNUNET_assert (NULL != chn); |
1894 | 1875 | ||
1895 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1876 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1896 | "%p Received message from client.\n", ch); | 1877 | "%p Received message from client.\n", chn); |
1897 | GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg); | 1878 | GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg); |
1898 | 1879 | ||
1899 | if (GNUNET_YES != ch->ready) | 1880 | if (GNUNET_YES != chn->ready) |
1900 | { | 1881 | { |
1901 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 1882 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
1902 | "%p Channel is not ready, dropping message from client.\n", ch); | 1883 | "%p Channel is not ready, dropping message from client.\n", chn); |
1903 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); | 1884 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); |
1904 | return; | 1885 | return; |
1905 | } | 1886 | } |
@@ -1907,30 +1888,30 @@ client_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, | |||
1907 | uint16_t size = ntohs (msg->size); | 1888 | uint16_t size = ntohs (msg->size); |
1908 | if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg)) | 1889 | if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg)) |
1909 | { | 1890 | { |
1910 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large.\n", ch); | 1891 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large.\n", chn); |
1911 | GNUNET_break (0); | 1892 | GNUNET_break (0); |
1912 | transmit_cancel (ch, client); | 1893 | transmit_cancel (chn, client); |
1913 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); | 1894 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); |
1914 | return; | 1895 | return; |
1915 | } | 1896 | } |
1916 | 1897 | ||
1917 | uint16_t first_ptype = 0, last_ptype = 0; | 1898 | uint16_t first_ptype = 0, last_ptype = 0; |
1918 | if (GNUNET_SYSERR | 1899 | if (GNUNET_SYSERR |
1919 | == GNUNET_PSYC_check_message_parts (size - sizeof (*msg), | 1900 | == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg), |
1920 | (const char *) &msg[1], | 1901 | (const char *) &msg[1], |
1921 | &first_ptype, &last_ptype)) | 1902 | &first_ptype, &last_ptype)) |
1922 | { | 1903 | { |
1923 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 1904 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
1924 | "%p Received invalid message part from client.\n", ch); | 1905 | "%p Received invalid message part from client.\n", chn); |
1925 | GNUNET_break (0); | 1906 | GNUNET_break (0); |
1926 | transmit_cancel (ch, client); | 1907 | transmit_cancel (chn, client); |
1927 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); | 1908 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); |
1928 | return; | 1909 | return; |
1929 | } | 1910 | } |
1930 | 1911 | ||
1931 | queue_message (ch, client, size - sizeof (*msg), &msg[1], | 1912 | queue_message (chn, client, size - sizeof (*msg), &msg[1], |
1932 | first_ptype, last_ptype); | 1913 | first_ptype, last_ptype); |
1933 | transmit_message (ch); | 1914 | transmit_message (chn); |
1934 | 1915 | ||
1935 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 1916 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
1936 | }; | 1917 | }; |
@@ -1940,8 +1921,8 @@ client_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, | |||
1940 | * Client requests to add a slave to the membership database. | 1921 | * Client requests to add a slave to the membership database. |
1941 | */ | 1922 | */ |
1942 | static void | 1923 | static void |
1943 | client_slave_add (void *cls, struct GNUNET_SERVER_Client *client, | 1924 | client_recv_slave_add (void *cls, struct GNUNET_SERVER_Client *client, |
1944 | const struct GNUNET_MessageHeader *msg) | 1925 | const struct GNUNET_MessageHeader *msg) |
1945 | { | 1926 | { |
1946 | 1927 | ||
1947 | } | 1928 | } |
@@ -1951,8 +1932,8 @@ client_slave_add (void *cls, struct GNUNET_SERVER_Client *client, | |||
1951 | * Client requests to remove a slave from the membership database. | 1932 | * Client requests to remove a slave from the membership database. |
1952 | */ | 1933 | */ |
1953 | static void | 1934 | static void |
1954 | client_slave_remove (void *cls, struct GNUNET_SERVER_Client *client, | 1935 | client_recv_slave_remove (void *cls, struct GNUNET_SERVER_Client *client, |
1955 | const struct GNUNET_MessageHeader *msg) | 1936 | const struct GNUNET_MessageHeader *msg) |
1956 | { | 1937 | { |
1957 | 1938 | ||
1958 | } | 1939 | } |
@@ -1962,8 +1943,8 @@ client_slave_remove (void *cls, struct GNUNET_SERVER_Client *client, | |||
1962 | * Client requests channel history from PSYCstore. | 1943 | * Client requests channel history from PSYCstore. |
1963 | */ | 1944 | */ |
1964 | static void | 1945 | static void |
1965 | client_story_request (void *cls, struct GNUNET_SERVER_Client *client, | 1946 | client_recv_story_request (void *cls, struct GNUNET_SERVER_Client *client, |
1966 | const struct GNUNET_MessageHeader *msg) | 1947 | const struct GNUNET_MessageHeader *msg) |
1967 | { | 1948 | { |
1968 | 1949 | ||
1969 | } | 1950 | } |
@@ -1973,8 +1954,8 @@ client_story_request (void *cls, struct GNUNET_SERVER_Client *client, | |||
1973 | * Client requests best matching state variable from PSYCstore. | 1954 | * Client requests best matching state variable from PSYCstore. |
1974 | */ | 1955 | */ |
1975 | static void | 1956 | static void |
1976 | client_state_get (void *cls, struct GNUNET_SERVER_Client *client, | 1957 | client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client, |
1977 | const struct GNUNET_MessageHeader *msg) | 1958 | const struct GNUNET_MessageHeader *msg) |
1978 | { | 1959 | { |
1979 | 1960 | ||
1980 | } | 1961 | } |
@@ -1984,8 +1965,8 @@ client_state_get (void *cls, struct GNUNET_SERVER_Client *client, | |||
1984 | * Client requests state variables with a given prefix from PSYCstore. | 1965 | * Client requests state variables with a given prefix from PSYCstore. |
1985 | */ | 1966 | */ |
1986 | static void | 1967 | static void |
1987 | client_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client, | 1968 | client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client, |
1988 | const struct GNUNET_MessageHeader *msg) | 1969 | const struct GNUNET_MessageHeader *msg) |
1989 | { | 1970 | { |
1990 | 1971 | ||
1991 | } | 1972 | } |
@@ -2003,32 +1984,34 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, | |||
2003 | const struct GNUNET_CONFIGURATION_Handle *c) | 1984 | const struct GNUNET_CONFIGURATION_Handle *c) |
2004 | { | 1985 | { |
2005 | static const struct GNUNET_SERVER_MessageHandler handlers[] = { | 1986 | static const struct GNUNET_SERVER_MessageHandler handlers[] = { |
2006 | { &client_master_start, NULL, | 1987 | { &client_recv_master_start, NULL, |
2007 | GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 }, | 1988 | GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 }, |
2008 | 1989 | ||
2009 | { &client_slave_join, NULL, | 1990 | { &client_recv_slave_join, NULL, |
2010 | GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 }, | 1991 | GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 }, |
2011 | 1992 | ||
2012 | { &client_join_decision, NULL, | 1993 | { &client_recv_join_decision, NULL, |
2013 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 }, | 1994 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 }, |
2014 | 1995 | ||
2015 | { &client_psyc_message, NULL, | 1996 | { &client_recv_psyc_message, NULL, |
2016 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 }, | 1997 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 }, |
2017 | 1998 | ||
2018 | { &client_slave_add, NULL, | 1999 | { &client_recv_slave_add, NULL, |
2019 | GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD, 0 }, | 2000 | GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD, 0 }, |
2020 | 2001 | ||
2021 | { &client_slave_remove, NULL, | 2002 | { &client_recv_slave_remove, NULL, |
2022 | GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM, 0 }, | 2003 | GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM, 0 }, |
2023 | 2004 | ||
2024 | { &client_story_request, NULL, | 2005 | { &client_recv_story_request, NULL, |
2025 | GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST, 0 }, | 2006 | GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST, 0 }, |
2026 | 2007 | ||
2027 | { &client_state_get, NULL, | 2008 | { &client_recv_state_get, NULL, |
2028 | GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 }, | 2009 | GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 }, |
2029 | 2010 | ||
2030 | { &client_state_get_prefix, NULL, | 2011 | { &client_recv_state_get_prefix, NULL, |
2031 | GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 } | 2012 | GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 }, |
2013 | |||
2014 | { NULL, NULL, 0, 0 } | ||
2032 | }; | 2015 | }; |
2033 | 2016 | ||
2034 | cfg = c; | 2017 | cfg = c; |
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c index 7ec9d21b7..bfb6f43fb 100644 --- a/src/psyc/psyc_api.c +++ b/src/psyc/psyc_api.c | |||
@@ -37,29 +37,11 @@ | |||
37 | #include "gnunet_env_lib.h" | 37 | #include "gnunet_env_lib.h" |
38 | #include "gnunet_multicast_service.h" | 38 | #include "gnunet_multicast_service.h" |
39 | #include "gnunet_psyc_service.h" | 39 | #include "gnunet_psyc_service.h" |
40 | #include "gnunet_psyc_util_lib.h" | ||
40 | #include "psyc.h" | 41 | #include "psyc.h" |
41 | 42 | ||
42 | #define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__) | 43 | #define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__) |
43 | 44 | ||
44 | struct MessageQueue | ||
45 | { | ||
46 | struct MessageQueue *prev; | ||
47 | struct MessageQueue *next; | ||
48 | /* Followed by struct GNUNET_MessageHeader msg */ | ||
49 | }; | ||
50 | |||
51 | |||
52 | /** | ||
53 | * Handle for a pending PSYC transmission operation. | ||
54 | */ | ||
55 | struct GNUNET_PSYC_ChannelTransmitHandle | ||
56 | { | ||
57 | struct GNUNET_PSYC_Channel *ch; | ||
58 | GNUNET_PSYC_TransmitNotifyModifier notify_mod; | ||
59 | GNUNET_PSYC_TransmitNotifyData notify_data; | ||
60 | void *notify_cls; | ||
61 | enum MessageState state; | ||
62 | }; | ||
63 | 45 | ||
64 | /** | 46 | /** |
65 | * Handle to access PSYC channel operations for both the master and slaves. | 47 | * Handle to access PSYC channel operations for both the master and slaves. |
@@ -67,109 +49,29 @@ struct GNUNET_PSYC_ChannelTransmitHandle | |||
67 | struct GNUNET_PSYC_Channel | 49 | struct GNUNET_PSYC_Channel |
68 | { | 50 | { |
69 | /** | 51 | /** |
70 | * Transmission handle; | ||
71 | */ | ||
72 | struct GNUNET_PSYC_ChannelTransmitHandle tmit; | ||
73 | |||
74 | /** | ||
75 | * Configuration to use. | 52 | * Configuration to use. |
76 | */ | 53 | */ |
77 | const struct GNUNET_CONFIGURATION_Handle *cfg; | 54 | const struct GNUNET_CONFIGURATION_Handle *cfg; |
78 | 55 | ||
79 | /** | 56 | /** |
80 | * Socket (if available). | 57 | * Client connection to the service. |
81 | */ | ||
82 | struct GNUNET_CLIENT_Connection *client; | ||
83 | |||
84 | /** | ||
85 | * Currently pending transmission request, or NULL for none. | ||
86 | */ | 58 | */ |
87 | struct GNUNET_CLIENT_TransmitHandle *th; | 59 | struct GNUNET_CLIENT_MANAGER_Connection *client; |
88 | 60 | ||
89 | /** | 61 | /** |
90 | * Head of messages to transmit to the service. | 62 | * Transmission handle; |
91 | */ | ||
92 | struct MessageQueue *tmit_head; | ||
93 | |||
94 | /** | ||
95 | * Tail of operations to transmit to the service. | ||
96 | */ | 63 | */ |
97 | struct MessageQueue *tmit_tail; | 64 | struct GNUNET_PSYC_TransmitHandle *tmit; |
98 | 65 | ||
99 | /** | 66 | /** |
100 | * Message currently being transmitted to the service. | 67 | * Receipt handle; |
101 | */ | 68 | */ |
102 | struct MessageQueue *tmit_msg; | 69 | struct GNUNET_PSYC_ReceiveHandle *recv; |
103 | 70 | ||
104 | /** | 71 | /** |
105 | * Message to send on reconnect. | 72 | * Message to send on reconnect. |
106 | */ | 73 | */ |
107 | struct GNUNET_MessageHeader *reconnect_msg; | 74 | struct GNUNET_MessageHeader *connect_msg; |
108 | |||
109 | /** | ||
110 | * Task doing exponential back-off trying to reconnect. | ||
111 | */ | ||
112 | GNUNET_SCHEDULER_TaskIdentifier reconnect_task; | ||
113 | |||
114 | /** | ||
115 | * Time for next connect retry. | ||
116 | */ | ||
117 | struct GNUNET_TIME_Relative reconnect_delay; | ||
118 | |||
119 | /** | ||
120 | * Message part callback. | ||
121 | */ | ||
122 | GNUNET_PSYC_MessageCallback message_cb; | ||
123 | |||
124 | /** | ||
125 | * Message part callback for historic message. | ||
126 | */ | ||
127 | GNUNET_PSYC_MessageCallback hist_message_cb; | ||
128 | |||
129 | /** | ||
130 | * Closure for @a message_cb. | ||
131 | */ | ||
132 | void *cb_cls; | ||
133 | |||
134 | /** | ||
135 | * ID of the message being received from the PSYC service. | ||
136 | */ | ||
137 | uint64_t recv_message_id; | ||
138 | |||
139 | /** | ||
140 | * Public key of the slave from which a message is being received. | ||
141 | */ | ||
142 | struct GNUNET_CRYPTO_EddsaPublicKey recv_slave_key; | ||
143 | |||
144 | /** | ||
145 | * State of the currently being received message from the PSYC service. | ||
146 | */ | ||
147 | enum MessageState recv_state; | ||
148 | |||
149 | /** | ||
150 | * Flags for the currently being received message from the PSYC service. | ||
151 | */ | ||
152 | enum GNUNET_PSYC_MessageFlags recv_flags; | ||
153 | |||
154 | /** | ||
155 | * Expected value size for the modifier being received from the PSYC service. | ||
156 | */ | ||
157 | uint32_t recv_mod_value_size_expected; | ||
158 | |||
159 | /** | ||
160 | * Actual value size for the modifier being received from the PSYC service. | ||
161 | */ | ||
162 | uint32_t recv_mod_value_size; | ||
163 | |||
164 | /** | ||
165 | * Is transmission paused? | ||
166 | */ | ||
167 | uint8_t tmit_paused; | ||
168 | |||
169 | /** | ||
170 | * Are we still waiting for a PSYC_TRANSMIT_ACK? | ||
171 | */ | ||
172 | uint8_t tmit_ack_pending; | ||
173 | 75 | ||
174 | /** | 76 | /** |
175 | * Are we polling for incoming messages right now? | 77 | * Are we polling for incoming messages right now? |
@@ -177,11 +79,6 @@ struct GNUNET_PSYC_Channel | |||
177 | uint8_t in_receive; | 79 | uint8_t in_receive; |
178 | 80 | ||
179 | /** | 81 | /** |
180 | * Are we currently transmitting a message? | ||
181 | */ | ||
182 | uint8_t in_transmit; | ||
183 | |||
184 | /** | ||
185 | * Is this a master or slave channel? | 82 | * Is this a master or slave channel? |
186 | */ | 83 | */ |
187 | uint8_t is_master; | 84 | uint8_t is_master; |
@@ -193,7 +90,7 @@ struct GNUNET_PSYC_Channel | |||
193 | */ | 90 | */ |
194 | struct GNUNET_PSYC_Master | 91 | struct GNUNET_PSYC_Master |
195 | { | 92 | { |
196 | struct GNUNET_PSYC_Channel ch; | 93 | struct GNUNET_PSYC_Channel chn; |
197 | 94 | ||
198 | GNUNET_PSYC_MasterStartCallback start_cb; | 95 | GNUNET_PSYC_MasterStartCallback start_cb; |
199 | 96 | ||
@@ -201,6 +98,11 @@ struct GNUNET_PSYC_Master | |||
201 | * Join request callback. | 98 | * Join request callback. |
202 | */ | 99 | */ |
203 | GNUNET_PSYC_JoinRequestCallback join_req_cb; | 100 | GNUNET_PSYC_JoinRequestCallback join_req_cb; |
101 | |||
102 | /** | ||
103 | * Closure for the callbacks. | ||
104 | */ | ||
105 | void *cb_cls; | ||
204 | }; | 106 | }; |
205 | 107 | ||
206 | 108 | ||
@@ -209,11 +111,16 @@ struct GNUNET_PSYC_Master | |||
209 | */ | 111 | */ |
210 | struct GNUNET_PSYC_Slave | 112 | struct GNUNET_PSYC_Slave |
211 | { | 113 | { |
212 | struct GNUNET_PSYC_Channel ch; | 114 | struct GNUNET_PSYC_Channel chn; |
213 | 115 | ||
214 | GNUNET_PSYC_SlaveConnectCallback connect_cb; | 116 | GNUNET_PSYC_SlaveConnectCallback connect_cb; |
215 | 117 | ||
216 | GNUNET_PSYC_JoinDecisionCallback join_dcsn_cb; | 118 | GNUNET_PSYC_JoinDecisionCallback join_dcsn_cb; |
119 | |||
120 | /** | ||
121 | * Closure for the callbacks. | ||
122 | */ | ||
123 | void *cb_cls; | ||
217 | }; | 124 | }; |
218 | 125 | ||
219 | 126 | ||
@@ -258,934 +165,170 @@ struct GNUNET_PSYC_StateQuery | |||
258 | 165 | ||
259 | 166 | ||
260 | static void | 167 | static void |
261 | reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); | 168 | channel_send_connect_msg (struct GNUNET_PSYC_Channel *chn) |
262 | |||
263 | |||
264 | static void | ||
265 | channel_transmit_data (struct GNUNET_PSYC_Channel *ch); | ||
266 | |||
267 | |||
268 | /** | ||
269 | * Reschedule a connect attempt to the service. | ||
270 | * | ||
271 | * @param ch Channel to reconnect. | ||
272 | */ | ||
273 | static void | ||
274 | reschedule_connect (struct GNUNET_PSYC_Channel *ch) | ||
275 | { | 169 | { |
276 | GNUNET_assert (ch->reconnect_task == GNUNET_SCHEDULER_NO_TASK); | 170 | uint16_t cmsg_size = ntohs (chn->connect_msg->size); |
277 | 171 | struct GNUNET_MessageHeader * cmsg = GNUNET_malloc (cmsg_size); | |
278 | if (NULL != ch->th) | 172 | memcpy (cmsg, chn->connect_msg, cmsg_size); |
279 | { | 173 | GNUNET_CLIENT_MANAGER_transmit_now (chn->client, cmsg); |
280 | GNUNET_CLIENT_notify_transmit_ready_cancel (ch->th); | ||
281 | ch->th = NULL; | ||
282 | } | ||
283 | if (NULL != ch->client) | ||
284 | { | ||
285 | GNUNET_CLIENT_disconnect (ch->client); | ||
286 | ch->client = NULL; | ||
287 | } | ||
288 | ch->in_receive = GNUNET_NO; | ||
289 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
290 | "Scheduling task to reconnect to PSYC service in %s.\n", | ||
291 | GNUNET_STRINGS_relative_time_to_string (ch->reconnect_delay, GNUNET_YES)); | ||
292 | ch->reconnect_task = | ||
293 | GNUNET_SCHEDULER_add_delayed (ch->reconnect_delay, &reconnect, ch); | ||
294 | ch->reconnect_delay = GNUNET_TIME_STD_BACKOFF (ch->reconnect_delay); | ||
295 | } | 174 | } |
296 | 175 | ||
297 | 176 | ||
298 | /** | ||
299 | * Schedule transmission of the next message from our queue. | ||
300 | * | ||
301 | * @param ch PSYC channel handle | ||
302 | */ | ||
303 | static void | ||
304 | transmit_next (struct GNUNET_PSYC_Channel *ch); | ||
305 | |||
306 | |||
307 | /** | ||
308 | * Reset stored data related to the last received message. | ||
309 | */ | ||
310 | static void | 177 | static void |
311 | recv_reset (struct GNUNET_PSYC_Channel *ch) | 178 | channel_recv_disconnect (void *cls, |
179 | struct GNUNET_CLIENT_MANAGER_Connection *client, | ||
180 | const struct GNUNET_MessageHeader *msg) | ||
312 | { | 181 | { |
313 | ch->recv_state = MSG_STATE_START; | 182 | struct GNUNET_PSYC_Channel * |
314 | ch->recv_flags = 0; | 183 | chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); |
315 | ch->recv_message_id = 0; | 184 | GNUNET_CLIENT_MANAGER_reconnect (client); |
316 | //FIXME: ch->recv_slave_key = { 0 }; | 185 | channel_send_connect_msg (chn); |
317 | ch->recv_mod_value_size = 0; | ||
318 | ch->recv_mod_value_size_expected = 0; | ||
319 | } | 186 | } |
320 | 187 | ||
321 | 188 | ||
322 | static void | 189 | static void |
323 | recv_error (struct GNUNET_PSYC_Channel *ch) | 190 | channel_recv_message (void *cls, |
191 | struct GNUNET_CLIENT_MANAGER_Connection *client, | ||
192 | const struct GNUNET_MessageHeader *msg) | ||
324 | { | 193 | { |
325 | GNUNET_PSYC_MessageCallback message_cb | 194 | struct GNUNET_PSYC_Channel * |
326 | = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC | 195 | chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); |
327 | ? ch->hist_message_cb | 196 | GNUNET_PSYC_receive_message (chn->recv, |
328 | : ch->message_cb; | 197 | (const struct GNUNET_PSYC_MessageHeader *) msg); |
329 | |||
330 | if (NULL != message_cb) | ||
331 | message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, NULL); | ||
332 | |||
333 | recv_reset (ch); | ||
334 | } | 198 | } |
335 | 199 | ||
336 | 200 | ||
337 | /** | ||
338 | * Queue a message part for transmission to the PSYC service. | ||
339 | * | ||
340 | * The message part is added to the current message buffer. | ||
341 | * When this buffer is full, it is added to the transmission queue. | ||
342 | * | ||
343 | * @param ch Channel struct for the client. | ||
344 | * @param msg Modifier message part, or NULL when there's no more modifiers. | ||
345 | * @param end End of message. | ||
346 | */ | ||
347 | static void | 201 | static void |
348 | queue_message (struct GNUNET_PSYC_Channel *ch, | 202 | channel_recv_message_ack (void *cls, |
349 | const struct GNUNET_MessageHeader *msg, | 203 | struct GNUNET_CLIENT_MANAGER_Connection *client, |
350 | uint8_t end) | 204 | const struct GNUNET_MessageHeader *msg) |
351 | { | 205 | { |
352 | uint16_t size = msg ? ntohs (msg->size) : 0; | 206 | struct GNUNET_PSYC_Channel * |
353 | 207 | chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); | |
354 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 208 | GNUNET_PSYC_transmit_got_ack (chn->tmit); |
355 | "Queueing message of type %u and size %u (end: %u)).\n", | ||
356 | ntohs (msg->type), size, end); | ||
357 | |||
358 | struct MessageQueue *mq = ch->tmit_msg; | ||
359 | struct GNUNET_MessageHeader *qmsg = NULL; | ||
360 | if (NULL != mq) | ||
361 | { | ||
362 | qmsg = (struct GNUNET_MessageHeader *) &mq[1]; | ||
363 | if (NULL == msg | ||
364 | || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < qmsg->size + size) | ||
365 | { | ||
366 | /* End of message or buffer is full, add it to transmission queue | ||
367 | * and start with empty buffer */ | ||
368 | qmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | ||
369 | qmsg->size = htons (qmsg->size); | ||
370 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq); | ||
371 | ch->tmit_msg = mq = NULL; | ||
372 | ch->tmit_ack_pending++; | ||
373 | } | ||
374 | else | ||
375 | { | ||
376 | /* Message fits in current buffer, append */ | ||
377 | ch->tmit_msg | ||
378 | = mq = GNUNET_realloc (mq, sizeof (*mq) + qmsg->size + size); | ||
379 | qmsg = (struct GNUNET_MessageHeader *) &mq[1]; | ||
380 | memcpy ((char *) qmsg + qmsg->size, msg, size); | ||
381 | qmsg->size += size; | ||
382 | } | ||
383 | } | ||
384 | |||
385 | if (NULL == mq && NULL != msg) | ||
386 | { | ||
387 | /* Empty buffer, copy over message. */ | ||
388 | ch->tmit_msg | ||
389 | = mq = GNUNET_malloc (sizeof (*mq) + sizeof (*qmsg) + size); | ||
390 | qmsg = (struct GNUNET_MessageHeader *) &mq[1]; | ||
391 | qmsg->size = sizeof (*qmsg) + size; | ||
392 | memcpy (&qmsg[1], msg, size); | ||
393 | } | ||
394 | |||
395 | if (NULL != mq | ||
396 | && (GNUNET_YES == end | ||
397 | || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD | ||
398 | < qmsg->size + sizeof (struct GNUNET_MessageHeader)))) | ||
399 | { | ||
400 | /* End of message or buffer is full, add it to transmission queue. */ | ||
401 | qmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | ||
402 | qmsg->size = htons (qmsg->size); | ||
403 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq); | ||
404 | ch->tmit_msg = mq = NULL; | ||
405 | ch->tmit_ack_pending++; | ||
406 | } | ||
407 | |||
408 | if (GNUNET_YES == end) | ||
409 | ch->in_transmit = GNUNET_NO; | ||
410 | |||
411 | transmit_next (ch); | ||
412 | } | 209 | } |
413 | 210 | ||
414 | 211 | ||
415 | /** | ||
416 | * Request a modifier from a client to transmit. | ||
417 | * | ||
418 | * @param mst Master handle. | ||
419 | */ | ||
420 | static void | 212 | static void |
421 | channel_transmit_mod (struct GNUNET_PSYC_Channel *ch) | 213 | master_recv_start_ack (void *cls, |
214 | struct GNUNET_CLIENT_MANAGER_Connection *client, | ||
215 | const struct GNUNET_MessageHeader *msg) | ||
422 | { | 216 | { |
423 | uint16_t max_data_size, data_size; | 217 | struct GNUNET_PSYC_Master * |
424 | char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; | 218 | mst = GNUNET_CLIENT_MANAGER_get_user_context_ (client, |
425 | struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; | 219 | sizeof (struct GNUNET_PSYC_Channel)); |
426 | int notify_ret; | ||
427 | |||
428 | switch (ch->tmit.state) | ||
429 | { | ||
430 | case MSG_STATE_MODIFIER: | ||
431 | { | ||
432 | struct GNUNET_PSYC_MessageModifier *mod | ||
433 | = (struct GNUNET_PSYC_MessageModifier *) msg; | ||
434 | max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; | ||
435 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); | ||
436 | msg->size = sizeof (struct GNUNET_PSYC_MessageModifier); | ||
437 | notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, &data_size, &mod[1], | ||
438 | &mod->oper, &mod->value_size); | ||
439 | mod->name_size = strnlen ((char *) &mod[1], data_size); | ||
440 | if (mod->name_size < data_size) | ||
441 | { | ||
442 | mod->value_size = htonl (mod->value_size); | ||
443 | mod->name_size = htons (mod->name_size); | ||
444 | } | ||
445 | else if (0 < data_size) | ||
446 | { | ||
447 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n"); | ||
448 | notify_ret = GNUNET_SYSERR; | ||
449 | } | ||
450 | break; | ||
451 | } | ||
452 | case MSG_STATE_MOD_CONT: | ||
453 | { | ||
454 | max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; | ||
455 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); | ||
456 | msg->size = sizeof (struct GNUNET_MessageHeader); | ||
457 | notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, | ||
458 | &data_size, &msg[1], NULL, NULL); | ||
459 | break; | ||
460 | } | ||
461 | default: | ||
462 | GNUNET_assert (0); | ||
463 | } | ||
464 | |||
465 | switch (notify_ret) | ||
466 | { | ||
467 | case GNUNET_NO: | ||
468 | if (0 == data_size) | ||
469 | { /* Transmission paused, nothing to send. */ | ||
470 | ch->tmit_paused = GNUNET_YES; | ||
471 | return; | ||
472 | } | ||
473 | ch->tmit.state = MSG_STATE_MOD_CONT; | ||
474 | break; | ||
475 | |||
476 | case GNUNET_YES: | ||
477 | if (0 == data_size) | ||
478 | { | ||
479 | /* End of modifiers. */ | ||
480 | ch->tmit.state = MSG_STATE_DATA; | ||
481 | if (0 == ch->tmit_ack_pending) | ||
482 | channel_transmit_data (ch); | ||
483 | |||
484 | return; | ||
485 | } | ||
486 | ch->tmit.state = MSG_STATE_MODIFIER; | ||
487 | break; | ||
488 | |||
489 | default: | ||
490 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
491 | "MasterTransmitNotifyModifier returned error " | ||
492 | "when requesting a modifier.\n"); | ||
493 | |||
494 | ch->tmit.state = MSG_STATE_CANCEL; | ||
495 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); | ||
496 | msg->size = htons (sizeof (*msg)); | ||
497 | |||
498 | queue_message (ch, msg, GNUNET_YES); | ||
499 | return; | ||
500 | } | ||
501 | |||
502 | if (0 < data_size) | ||
503 | { | ||
504 | GNUNET_assert (data_size <= max_data_size); | ||
505 | msg->size = htons (msg->size + data_size); | ||
506 | queue_message (ch, msg, GNUNET_NO); | ||
507 | } | ||
508 | |||
509 | channel_transmit_mod (ch); | ||
510 | } | ||
511 | 220 | ||
512 | 221 | struct CountersResult *cres = (struct CountersResult *) msg; | |
513 | /** | 222 | if (NULL != mst->start_cb) |
514 | * Request data from a client to transmit. | 223 | mst->start_cb (mst->cb_cls, GNUNET_ntohll (cres->max_message_id)); |
515 | * | ||
516 | * @param mst Master handle. | ||
517 | */ | ||
518 | static void | ||
519 | channel_transmit_data (struct GNUNET_PSYC_Channel *ch) | ||
520 | { | ||
521 | uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD; | ||
522 | char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; | ||
523 | struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; | ||
524 | |||
525 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); | ||
526 | |||
527 | int notify_ret = ch->tmit.notify_data (ch->tmit.notify_cls, | ||
528 | &data_size, &msg[1]); | ||
529 | switch (notify_ret) | ||
530 | { | ||
531 | case GNUNET_NO: | ||
532 | if (0 == data_size) | ||
533 | { | ||
534 | /* Transmission paused, nothing to send. */ | ||
535 | ch->tmit_paused = GNUNET_YES; | ||
536 | return; | ||
537 | } | ||
538 | break; | ||
539 | |||
540 | case GNUNET_YES: | ||
541 | ch->tmit.state = MSG_STATE_END; | ||
542 | break; | ||
543 | |||
544 | default: | ||
545 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
546 | "MasterTransmitNotify returned error when requesting data.\n"); | ||
547 | |||
548 | ch->tmit.state = MSG_STATE_CANCEL; | ||
549 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); | ||
550 | msg->size = htons (sizeof (*msg)); | ||
551 | queue_message (ch, msg, GNUNET_YES); | ||
552 | return; | ||
553 | } | ||
554 | |||
555 | if (0 < data_size) | ||
556 | { | ||
557 | GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD); | ||
558 | msg->size = htons (sizeof (*msg) + data_size); | ||
559 | queue_message (ch, msg, !notify_ret); | ||
560 | } | ||
561 | |||
562 | /* End of message. */ | ||
563 | if (GNUNET_YES == notify_ret) | ||
564 | { | ||
565 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END); | ||
566 | msg->size = htons (sizeof (*msg)); | ||
567 | queue_message (ch, msg, GNUNET_YES); | ||
568 | } | ||
569 | } | 224 | } |
570 | 225 | ||
571 | 226 | ||
572 | /** | ||
573 | * Send a message to a channel. | ||
574 | * | ||
575 | * @param ch Handle to the PSYC channel. | ||
576 | * @param method_name Which method should be invoked. | ||
577 | * @param notify_mod Function to call to obtain modifiers. | ||
578 | * @param notify_data Function to call to obtain fragments of the data. | ||
579 | * @param notify_cls Closure for @a notify_mod and @a notify_data. | ||
580 | * @param flags Flags for the message being transmitted. | ||
581 | * | ||
582 | * @return Transmission handle, NULL on error (i.e. more than one request queued). | ||
583 | */ | ||
584 | static struct GNUNET_PSYC_ChannelTransmitHandle * | ||
585 | channel_transmit (struct GNUNET_PSYC_Channel *ch, | ||
586 | const char *method_name, | ||
587 | GNUNET_PSYC_TransmitNotifyModifier notify_mod, | ||
588 | GNUNET_PSYC_TransmitNotifyData notify_data, | ||
589 | void *notify_cls, | ||
590 | uint32_t flags) | ||
591 | { | ||
592 | if (GNUNET_NO != ch->in_transmit) | ||
593 | return NULL; | ||
594 | ch->in_transmit = GNUNET_YES; | ||
595 | |||
596 | size_t size = strlen (method_name) + 1; | ||
597 | struct GNUNET_PSYC_MessageMethod *pmeth; | ||
598 | struct GNUNET_MessageHeader *qmsg; | ||
599 | struct MessageQueue * | ||
600 | mq = ch->tmit_msg = GNUNET_malloc (sizeof (*mq) + sizeof (*qmsg) | ||
601 | + sizeof (*pmeth) + size); | ||
602 | qmsg = (struct GNUNET_MessageHeader *) &mq[1]; | ||
603 | qmsg->size = sizeof (*qmsg) + sizeof (*pmeth) + size; | ||
604 | |||
605 | pmeth = (struct GNUNET_PSYC_MessageMethod *) &qmsg[1]; | ||
606 | pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); | ||
607 | pmeth->header.size = htons (sizeof (*pmeth) + size); | ||
608 | pmeth->flags = htonl (flags); | ||
609 | memcpy (&pmeth[1], method_name, size); | ||
610 | |||
611 | ch->tmit.ch = ch; | ||
612 | ch->tmit.notify_mod = notify_mod; | ||
613 | ch->tmit.notify_data = notify_data; | ||
614 | ch->tmit.notify_cls = notify_cls; | ||
615 | ch->tmit.state = MSG_STATE_MODIFIER; | ||
616 | |||
617 | channel_transmit_mod (ch); | ||
618 | return &ch->tmit; | ||
619 | } | ||
620 | |||
621 | |||
622 | /** | ||
623 | * Resume transmission to the channel. | ||
624 | * | ||
625 | * @param th Handle of the request that is being resumed. | ||
626 | */ | ||
627 | static void | 227 | static void |
628 | channel_transmit_resume (struct GNUNET_PSYC_ChannelTransmitHandle *th) | 228 | master_recv_join_request (void *cls, |
229 | struct GNUNET_CLIENT_MANAGER_Connection *client, | ||
230 | const struct GNUNET_MessageHeader *msg) | ||
629 | { | 231 | { |
630 | struct GNUNET_PSYC_Channel *ch = th->ch; | 232 | struct GNUNET_PSYC_Master * |
631 | if (0 == ch->tmit_ack_pending) | 233 | mst = GNUNET_CLIENT_MANAGER_get_user_context_ (client, |
632 | { | 234 | sizeof (struct GNUNET_PSYC_Channel)); |
633 | ch->tmit_paused = GNUNET_NO; | ||
634 | channel_transmit_data (ch); | ||
635 | } | ||
636 | } | ||
637 | 235 | ||
236 | const struct MasterJoinRequest *req = (const struct MasterJoinRequest *) msg; | ||
638 | 237 | ||
639 | /** | 238 | struct GNUNET_PSYC_MessageHeader *pmsg = NULL; |
640 | * Abort transmission request to channel. | 239 | if (ntohs (req->header.size) <= sizeof (*req) + sizeof (*pmsg)) |
641 | * | 240 | pmsg = (struct GNUNET_PSYC_MessageHeader *) &req[1]; |
642 | * @param th Handle of the request that is being aborted. | ||
643 | */ | ||
644 | static void | ||
645 | channel_transmit_cancel (struct GNUNET_PSYC_ChannelTransmitHandle *th) | ||
646 | { | ||
647 | struct GNUNET_PSYC_Channel *ch = th->ch; | ||
648 | if (GNUNET_NO == ch->in_transmit) | ||
649 | return; | ||
650 | } | ||
651 | 241 | ||
242 | struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); | ||
243 | jh->mst = mst; | ||
244 | jh->slave_key = req->slave_key; | ||
652 | 245 | ||
653 | /** | 246 | if (NULL != mst->join_req_cb) |
654 | * Handle incoming message from the PSYC service. | 247 | mst->join_req_cb (mst->cb_cls, &req->slave_key, pmsg, jh); |
655 | * | ||
656 | * @param ch The channel the message is sent to. | ||
657 | * @param pmsg The message. | ||
658 | */ | ||
659 | static void | ||
660 | handle_psyc_message (struct GNUNET_PSYC_Channel *ch, | ||
661 | const struct GNUNET_PSYC_MessageHeader *msg) | ||
662 | { | ||
663 | uint16_t size = ntohs (msg->header.size); | ||
664 | uint32_t flags = ntohl (msg->flags); | ||
665 | |||
666 | GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, | ||
667 | (struct GNUNET_MessageHeader *) msg); | ||
668 | |||
669 | if (MSG_STATE_START == ch->recv_state) | ||
670 | { | ||
671 | ch->recv_message_id = GNUNET_ntohll (msg->message_id); | ||
672 | ch->recv_flags = flags; | ||
673 | ch->recv_slave_key = msg->slave_key; | ||
674 | ch->recv_mod_value_size = 0; | ||
675 | ch->recv_mod_value_size_expected = 0; | ||
676 | } | ||
677 | else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id) | ||
678 | { | ||
679 | // FIXME | ||
680 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
681 | "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n", | ||
682 | GNUNET_ntohll (msg->message_id), ch->recv_message_id); | ||
683 | GNUNET_break_op (0); | ||
684 | recv_error (ch); | ||
685 | return; | ||
686 | } | ||
687 | else if (flags != ch->recv_flags) | ||
688 | { | ||
689 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
690 | "Unexpected message flags. Got: %lu, expected: %lu\n", | ||
691 | flags, ch->recv_flags); | ||
692 | GNUNET_break_op (0); | ||
693 | recv_error (ch); | ||
694 | return; | ||
695 | } | ||
696 | |||
697 | uint16_t pos = 0, psize = 0, ptype, size_eq, size_min; | ||
698 | |||
699 | for (pos = 0; sizeof (*msg) + pos < size; pos += psize) | ||
700 | { | ||
701 | const struct GNUNET_MessageHeader *pmsg | ||
702 | = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos); | ||
703 | psize = ntohs (pmsg->size); | ||
704 | ptype = ntohs (pmsg->type); | ||
705 | size_eq = size_min = 0; | ||
706 | |||
707 | if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size) | ||
708 | { | ||
709 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
710 | "Dropping message of type %u with invalid size %u.\n", | ||
711 | ptype, psize); | ||
712 | recv_error (ch); | ||
713 | return; | ||
714 | } | ||
715 | |||
716 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
717 | "Received message part from PSYC.\n"); | ||
718 | GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg); | ||
719 | |||
720 | switch (ptype) | ||
721 | { | ||
722 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: | ||
723 | size_min = sizeof (struct GNUNET_PSYC_MessageMethod); | ||
724 | break; | ||
725 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | ||
726 | size_min = sizeof (struct GNUNET_PSYC_MessageModifier); | ||
727 | break; | ||
728 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: | ||
729 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: | ||
730 | size_min = sizeof (struct GNUNET_MessageHeader); | ||
731 | break; | ||
732 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: | ||
733 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: | ||
734 | size_eq = sizeof (struct GNUNET_MessageHeader); | ||
735 | break; | ||
736 | default: | ||
737 | GNUNET_break_op (0); | ||
738 | recv_error (ch); | ||
739 | return; | ||
740 | } | ||
741 | |||
742 | if (! ((0 < size_eq && psize == size_eq) | ||
743 | || (0 < size_min && size_min <= psize))) | ||
744 | { | ||
745 | GNUNET_break_op (0); | ||
746 | recv_error (ch); | ||
747 | return; | ||
748 | } | ||
749 | |||
750 | switch (ptype) | ||
751 | { | ||
752 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: | ||
753 | { | ||
754 | struct GNUNET_PSYC_MessageMethod *meth | ||
755 | = (struct GNUNET_PSYC_MessageMethod *) pmsg; | ||
756 | |||
757 | if (MSG_STATE_START != ch->recv_state) | ||
758 | { | ||
759 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
760 | "Dropping out of order message method (%u).\n", | ||
761 | ch->recv_state); | ||
762 | /* It is normal to receive an incomplete message right after connecting, | ||
763 | * but should not happen later. | ||
764 | * FIXME: add a check for this condition. | ||
765 | */ | ||
766 | GNUNET_break_op (0); | ||
767 | recv_error (ch); | ||
768 | return; | ||
769 | } | ||
770 | |||
771 | if ('\0' != *((char *) meth + psize - 1)) | ||
772 | { | ||
773 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
774 | "Dropping message with malformed method. " | ||
775 | "Message ID: %" PRIu64 "\n", ch->recv_message_id); | ||
776 | GNUNET_break_op (0); | ||
777 | recv_error (ch); | ||
778 | return; | ||
779 | } | ||
780 | ch->recv_state = MSG_STATE_METHOD; | ||
781 | break; | ||
782 | } | ||
783 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | ||
784 | { | ||
785 | if (!(MSG_STATE_METHOD == ch->recv_state | ||
786 | || MSG_STATE_MODIFIER == ch->recv_state | ||
787 | || MSG_STATE_MOD_CONT == ch->recv_state)) | ||
788 | { | ||
789 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
790 | "Dropping out of order message modifier (%u).\n", | ||
791 | ch->recv_state); | ||
792 | GNUNET_break_op (0); | ||
793 | recv_error (ch); | ||
794 | return; | ||
795 | } | ||
796 | |||
797 | struct GNUNET_PSYC_MessageModifier *mod | ||
798 | = (struct GNUNET_PSYC_MessageModifier *) pmsg; | ||
799 | |||
800 | uint16_t name_size = ntohs (mod->name_size); | ||
801 | ch->recv_mod_value_size_expected = ntohl (mod->value_size); | ||
802 | ch->recv_mod_value_size = psize - sizeof (*mod) - name_size - 1; | ||
803 | |||
804 | if (psize < sizeof (*mod) + name_size + 1 | ||
805 | || '\0' != *((char *) &mod[1] + name_size) | ||
806 | || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) | ||
807 | { | ||
808 | LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n"); | ||
809 | GNUNET_break_op (0); | ||
810 | recv_error (ch); | ||
811 | return; | ||
812 | } | ||
813 | ch->recv_state = MSG_STATE_MODIFIER; | ||
814 | break; | ||
815 | } | ||
816 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: | ||
817 | { | ||
818 | ch->recv_mod_value_size += psize - sizeof (*pmsg); | ||
819 | |||
820 | if (!(MSG_STATE_MODIFIER == ch->recv_state | ||
821 | || MSG_STATE_MOD_CONT == ch->recv_state) | ||
822 | || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) | ||
823 | { | ||
824 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
825 | "Dropping out of order message modifier continuation " | ||
826 | "!(%u == %u || %u == %u) || %lu < %lu.\n", | ||
827 | MSG_STATE_MODIFIER, ch->recv_state, | ||
828 | MSG_STATE_MOD_CONT, ch->recv_state, | ||
829 | ch->recv_mod_value_size_expected, ch->recv_mod_value_size); | ||
830 | GNUNET_break_op (0); | ||
831 | recv_error (ch); | ||
832 | return; | ||
833 | } | ||
834 | break; | ||
835 | } | ||
836 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: | ||
837 | { | ||
838 | if (ch->recv_state < MSG_STATE_METHOD | ||
839 | || ch->recv_mod_value_size_expected != ch->recv_mod_value_size) | ||
840 | { | ||
841 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
842 | "Dropping out of order message data fragment " | ||
843 | "(%u < %u || %lu != %lu).\n", | ||
844 | ch->recv_state, MSG_STATE_METHOD, | ||
845 | ch->recv_mod_value_size_expected, ch->recv_mod_value_size); | ||
846 | |||
847 | GNUNET_break_op (0); | ||
848 | recv_error (ch); | ||
849 | return; | ||
850 | } | ||
851 | ch->recv_state = MSG_STATE_DATA; | ||
852 | break; | ||
853 | } | ||
854 | } | ||
855 | |||
856 | GNUNET_PSYC_MessageCallback message_cb | ||
857 | = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC | ||
858 | ? ch->hist_message_cb | ||
859 | : ch->message_cb; | ||
860 | |||
861 | if (NULL != message_cb) | ||
862 | message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, pmsg); | ||
863 | |||
864 | switch (ptype) | ||
865 | { | ||
866 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: | ||
867 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: | ||
868 | recv_reset (ch); | ||
869 | break; | ||
870 | } | ||
871 | } | ||
872 | } | 248 | } |
873 | 249 | ||
874 | 250 | ||
875 | /** | ||
876 | * Handle incoming message acknowledgement from the PSYC service. | ||
877 | * | ||
878 | * @param ch The channel the acknowledgement is sent to. | ||
879 | */ | ||
880 | static void | 251 | static void |
881 | handle_psyc_message_ack (struct GNUNET_PSYC_Channel *ch) | 252 | slave_recv_join_ack (void *cls, |
253 | struct GNUNET_CLIENT_MANAGER_Connection *client, | ||
254 | const struct GNUNET_MessageHeader *msg) | ||
882 | { | 255 | { |
883 | if (0 == ch->tmit_ack_pending) | 256 | struct GNUNET_PSYC_Slave * |
884 | { | 257 | slv = GNUNET_CLIENT_MANAGER_get_user_context_ (client, |
885 | LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n"); | 258 | sizeof (struct GNUNET_PSYC_Channel)); |
886 | GNUNET_break (0); | 259 | struct CountersResult *cres = (struct CountersResult *) msg; |
887 | return; | 260 | if (NULL != slv->connect_cb) |
888 | } | 261 | slv->connect_cb (slv->cb_cls, GNUNET_ntohll (cres->max_message_id)); |
889 | ch->tmit_ack_pending--; | ||
890 | |||
891 | switch (ch->tmit.state) | ||
892 | { | ||
893 | case MSG_STATE_MODIFIER: | ||
894 | case MSG_STATE_MOD_CONT: | ||
895 | if (GNUNET_NO == ch->tmit_paused) | ||
896 | channel_transmit_mod (ch); | ||
897 | break; | ||
898 | |||
899 | case MSG_STATE_DATA: | ||
900 | if (GNUNET_NO == ch->tmit_paused) | ||
901 | channel_transmit_data (ch); | ||
902 | break; | ||
903 | |||
904 | case MSG_STATE_END: | ||
905 | case MSG_STATE_CANCEL: | ||
906 | break; | ||
907 | |||
908 | default: | ||
909 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
910 | "Ignoring message ACK in state %u.\n", ch->tmit.state); | ||
911 | } | ||
912 | } | 262 | } |
913 | 263 | ||
914 | 264 | ||
915 | static void | 265 | static void |
916 | handle_psyc_join_request (struct GNUNET_PSYC_Master *mst, | 266 | slave_recv_join_decision (void *cls, |
917 | const struct MasterJoinRequest *req) | 267 | struct GNUNET_CLIENT_MANAGER_Connection *client, |
268 | const struct GNUNET_MessageHeader *msg) | ||
918 | { | 269 | { |
919 | struct GNUNET_PSYC_MessageHeader *msg = NULL; | 270 | struct GNUNET_PSYC_Slave * |
920 | if (ntohs (req->header.size) <= sizeof (*req) + sizeof (*msg)) | 271 | slv = GNUNET_CLIENT_MANAGER_get_user_context_ (client, |
921 | msg = (struct GNUNET_PSYC_MessageHeader *) &req[1]; | 272 | sizeof (struct GNUNET_PSYC_Channel)); |
273 | const struct SlaveJoinDecision * | ||
274 | dcsn = (const struct SlaveJoinDecision *) msg; | ||
922 | 275 | ||
923 | struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); | 276 | struct GNUNET_PSYC_MessageHeader *pmsg = NULL; |
924 | jh->mst = mst; | 277 | if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*pmsg)) |
925 | jh->slave_key = req->slave_key; | 278 | pmsg = (struct GNUNET_PSYC_MessageHeader *) &dcsn[1]; |
926 | 279 | ||
927 | if (NULL != mst->join_req_cb) | 280 | struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); |
928 | mst->join_req_cb (mst->ch.cb_cls, &req->slave_key, msg, jh); | 281 | if (NULL != slv->join_dcsn_cb) |
282 | slv->join_dcsn_cb (slv->cb_cls, ntohl (dcsn->is_admitted), pmsg); | ||
929 | } | 283 | } |
930 | 284 | ||
931 | 285 | ||
932 | static void | 286 | static struct GNUNET_CLIENT_MANAGER_MessageHandler master_handlers[] = |
933 | handle_psyc_join_decision (struct GNUNET_PSYC_Slave *slv, | ||
934 | const struct SlaveJoinDecision *dcsn) | ||
935 | { | 287 | { |
936 | struct GNUNET_PSYC_MessageHeader *msg = NULL; | 288 | { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, |
937 | if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*msg)) | ||
938 | msg = (struct GNUNET_PSYC_MessageHeader *) &dcsn[1]; | ||
939 | 289 | ||
940 | struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); | 290 | { &channel_recv_message, NULL, |
941 | if (NULL != slv->join_dcsn_cb) | 291 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, |
942 | slv->join_dcsn_cb (slv->ch.cb_cls, ntohl (dcsn->is_admitted), msg); | 292 | sizeof (struct GNUNET_PSYC_MessageHeader), GNUNET_YES }, |
943 | } | ||
944 | 293 | ||
294 | { &channel_recv_message_ack, NULL, | ||
295 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK, | ||
296 | sizeof (struct GNUNET_MessageHeader), GNUNET_NO }, | ||
945 | 297 | ||
946 | /** | 298 | { &master_recv_start_ack, NULL, |
947 | * Type of a function to call when we receive a message | 299 | GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK, |
948 | * from the service. | 300 | sizeof (struct CountersResult), GNUNET_NO }, |
949 | * | ||
950 | * @param cls closure | ||
951 | * @param msg message received, NULL on timeout or fatal error | ||
952 | */ | ||
953 | static void | ||
954 | message_handler (void *cls, | ||
955 | const struct GNUNET_MessageHeader *msg) | ||
956 | { | ||
957 | struct GNUNET_PSYC_Channel *ch = cls; | ||
958 | struct GNUNET_PSYC_Master *mst = cls; | ||
959 | struct GNUNET_PSYC_Slave *slv = cls; | ||
960 | |||
961 | if (NULL == msg) | ||
962 | { | ||
963 | // timeout / disconnected from service, reconnect | ||
964 | reschedule_connect (ch); | ||
965 | return; | ||
966 | } | ||
967 | uint16_t size_eq = 0; | ||
968 | uint16_t size_min = 0; | ||
969 | uint16_t size = ntohs (msg->size); | ||
970 | uint16_t type = ntohs (msg->type); | ||
971 | |||
972 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
973 | "Received message of type %d and size %u from PSYC service\n", | ||
974 | type, size); | ||
975 | |||
976 | switch (type) | ||
977 | { | ||
978 | case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK: | ||
979 | case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: | ||
980 | size_eq = sizeof (struct CountersResult); | ||
981 | break; | ||
982 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE: | ||
983 | size_min = sizeof (struct GNUNET_PSYC_MessageHeader); | ||
984 | break; | ||
985 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK: | ||
986 | size_eq = sizeof (struct GNUNET_MessageHeader); | ||
987 | break; | ||
988 | case GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST: | ||
989 | size_min = sizeof (struct MasterJoinRequest); | ||
990 | break; | ||
991 | case GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION: | ||
992 | size_min = sizeof (struct SlaveJoinDecision); | ||
993 | break; | ||
994 | default: | ||
995 | GNUNET_break_op (0); | ||
996 | return; | ||
997 | } | ||
998 | |||
999 | if (! ((0 < size_eq && size == size_eq) | ||
1000 | || (0 < size_min && size_min <= size))) | ||
1001 | { | ||
1002 | GNUNET_break_op (0); | ||
1003 | return; | ||
1004 | } | ||
1005 | |||
1006 | switch (type) | ||
1007 | { | ||
1008 | case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK: | ||
1009 | { | ||
1010 | struct CountersResult *cres = (struct CountersResult *) msg; | ||
1011 | if (NULL != mst->start_cb) | ||
1012 | mst->start_cb (ch->cb_cls, GNUNET_ntohll (cres->max_message_id)); | ||
1013 | break; | ||
1014 | } | ||
1015 | case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: | ||
1016 | { | ||
1017 | struct CountersResult *cres = (struct CountersResult *) msg; | ||
1018 | if (NULL != slv->connect_cb) | ||
1019 | slv->connect_cb (ch->cb_cls, GNUNET_ntohll (cres->max_message_id)); | ||
1020 | break; | ||
1021 | } | ||
1022 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK: | ||
1023 | { | ||
1024 | handle_psyc_message_ack (ch); | ||
1025 | break; | ||
1026 | } | ||
1027 | |||
1028 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE: | ||
1029 | handle_psyc_message (ch, (const struct GNUNET_PSYC_MessageHeader *) msg); | ||
1030 | break; | ||
1031 | |||
1032 | case GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST: | ||
1033 | handle_psyc_join_request ((struct GNUNET_PSYC_Master *) ch, | ||
1034 | (const struct MasterJoinRequest *) msg); | ||
1035 | break; | ||
1036 | |||
1037 | case GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION: | ||
1038 | handle_psyc_join_decision ((struct GNUNET_PSYC_Slave *) ch, | ||
1039 | (const struct SlaveJoinDecision *) msg); | ||
1040 | break; | ||
1041 | } | ||
1042 | |||
1043 | if (NULL != ch->client) | ||
1044 | { | ||
1045 | GNUNET_CLIENT_receive (ch->client, &message_handler, ch, | ||
1046 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
1047 | } | ||
1048 | } | ||
1049 | 301 | ||
302 | { &master_recv_join_request, NULL, | ||
303 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST, | ||
304 | sizeof (struct MasterJoinRequest), GNUNET_YES }, | ||
1050 | 305 | ||
1051 | /** | 306 | { NULL, NULL, 0, 0, GNUNET_NO } |
1052 | * Transmit next message to service. | 307 | }; |
1053 | * | ||
1054 | * @param cls The struct GNUNET_PSYC_Channel. | ||
1055 | * @param size Number of bytes available in @a buf. | ||
1056 | * @param buf Where to copy the message. | ||
1057 | * | ||
1058 | * @return Number of bytes copied to @a buf. | ||
1059 | */ | ||
1060 | static size_t | ||
1061 | send_next_message (void *cls, size_t size, void *buf) | ||
1062 | { | ||
1063 | LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n"); | ||
1064 | struct GNUNET_PSYC_Channel *ch = cls; | ||
1065 | struct MessageQueue *mq = ch->tmit_head; | ||
1066 | if (NULL == mq) | ||
1067 | return 0; | ||
1068 | struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1]; | ||
1069 | size_t ret = ntohs (qmsg->size); | ||
1070 | ch->th = NULL; | ||
1071 | if (ret > size) | ||
1072 | { | ||
1073 | reschedule_connect (ch); | ||
1074 | return 0; | ||
1075 | } | ||
1076 | memcpy (buf, qmsg, ret); | ||
1077 | |||
1078 | GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, mq); | ||
1079 | GNUNET_free (mq); | ||
1080 | |||
1081 | if (NULL != ch->tmit_head) | ||
1082 | transmit_next (ch); | ||
1083 | |||
1084 | if (GNUNET_NO == ch->in_receive) | ||
1085 | { | ||
1086 | ch->in_receive = GNUNET_YES; | ||
1087 | GNUNET_CLIENT_receive (ch->client, &message_handler, ch, | ||
1088 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
1089 | } | ||
1090 | return ret; | ||
1091 | } | ||
1092 | 308 | ||
1093 | 309 | ||
1094 | /** | 310 | static struct GNUNET_CLIENT_MANAGER_MessageHandler slave_handlers[] = |
1095 | * Schedule transmission of the next message from our queue. | ||
1096 | * | ||
1097 | * @param ch PSYC handle. | ||
1098 | */ | ||
1099 | static void | ||
1100 | transmit_next (struct GNUNET_PSYC_Channel *ch) | ||
1101 | { | 311 | { |
1102 | LOG (GNUNET_ERROR_TYPE_DEBUG, "transmit_next()\n"); | 312 | { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, |
1103 | if (NULL != ch->th || NULL == ch->client) | ||
1104 | return; | ||
1105 | |||
1106 | struct MessageQueue *mq = ch->tmit_head; | ||
1107 | if (NULL == mq) | ||
1108 | return; | ||
1109 | struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1]; | ||
1110 | |||
1111 | ch->th = GNUNET_CLIENT_notify_transmit_ready (ch->client, | ||
1112 | ntohs (qmsg->size), | ||
1113 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
1114 | GNUNET_NO, | ||
1115 | &send_next_message, | ||
1116 | ch); | ||
1117 | } | ||
1118 | 313 | ||
314 | { &channel_recv_message, NULL, | ||
315 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, | ||
316 | sizeof (struct GNUNET_PSYC_MessageHeader), GNUNET_YES }, | ||
1119 | 317 | ||
1120 | /** | 318 | { &channel_recv_message_ack, NULL, |
1121 | * Try again to connect to the PSYC service. | 319 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK, |
1122 | * | 320 | sizeof (struct GNUNET_MessageHeader), GNUNET_NO }, |
1123 | * @param cls Channel handle. | ||
1124 | * @param tc Scheduler context. | ||
1125 | */ | ||
1126 | static void | ||
1127 | reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
1128 | { | ||
1129 | struct GNUNET_PSYC_Channel *ch = cls; | ||
1130 | |||
1131 | recv_reset (ch); | ||
1132 | ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK; | ||
1133 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1134 | "Connecting to PSYC service.\n"); | ||
1135 | GNUNET_assert (NULL == ch->client); | ||
1136 | ch->client = GNUNET_CLIENT_connect ("psyc", ch->cfg); | ||
1137 | GNUNET_assert (NULL != ch->client); | ||
1138 | uint16_t reconn_size = ntohs (ch->reconnect_msg->size); | ||
1139 | |||
1140 | if (NULL == ch->tmit_head || | ||
1141 | 0 != memcmp (&ch->tmit_head[1], ch->reconnect_msg, reconn_size)) | ||
1142 | { | ||
1143 | struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + reconn_size); | ||
1144 | memcpy (&mq[1], ch->reconnect_msg, reconn_size); | ||
1145 | GNUNET_CONTAINER_DLL_insert (ch->tmit_head, ch->tmit_tail, mq); | ||
1146 | } | ||
1147 | transmit_next (ch); | ||
1148 | } | ||
1149 | 321 | ||
322 | { &slave_recv_join_ack, NULL, | ||
323 | GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK, | ||
324 | sizeof (struct CountersResult), GNUNET_NO }, | ||
1150 | 325 | ||
1151 | /** | 326 | { &slave_recv_join_decision, NULL, |
1152 | * Disconnect from the PSYC service. | 327 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, |
1153 | * | 328 | sizeof (struct SlaveJoinDecision), GNUNET_YES }, |
1154 | * @param c Channel handle to disconnect. | 329 | |
1155 | */ | 330 | { NULL, NULL, 0, 0, GNUNET_NO } |
1156 | static void | 331 | }; |
1157 | disconnect (void *c) | ||
1158 | { | ||
1159 | struct GNUNET_PSYC_Channel *ch = c; | ||
1160 | |||
1161 | GNUNET_assert (NULL != ch); | ||
1162 | if (ch->tmit_head != ch->tmit_tail) | ||
1163 | { | ||
1164 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
1165 | "Disconnecting while there are still outstanding messages!\n"); | ||
1166 | GNUNET_break (0); | ||
1167 | } | ||
1168 | if (ch->reconnect_task != GNUNET_SCHEDULER_NO_TASK) | ||
1169 | { | ||
1170 | GNUNET_SCHEDULER_cancel (ch->reconnect_task); | ||
1171 | ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK; | ||
1172 | } | ||
1173 | if (NULL != ch->th) | ||
1174 | { | ||
1175 | GNUNET_CLIENT_notify_transmit_ready_cancel (ch->th); | ||
1176 | ch->th = NULL; | ||
1177 | } | ||
1178 | if (NULL != ch->client) | ||
1179 | { | ||
1180 | GNUNET_CLIENT_disconnect (ch->client); | ||
1181 | ch->client = NULL; | ||
1182 | } | ||
1183 | if (NULL != ch->reconnect_msg) | ||
1184 | { | ||
1185 | GNUNET_free (ch->reconnect_msg); | ||
1186 | ch->reconnect_msg = NULL; | ||
1187 | } | ||
1188 | } | ||
1189 | 332 | ||
1190 | 333 | ||
1191 | /** | 334 | /** |
@@ -1227,24 +370,29 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
1227 | void *cls) | 370 | void *cls) |
1228 | { | 371 | { |
1229 | struct GNUNET_PSYC_Master *mst = GNUNET_malloc (sizeof (*mst)); | 372 | struct GNUNET_PSYC_Master *mst = GNUNET_malloc (sizeof (*mst)); |
1230 | struct GNUNET_PSYC_Channel *ch = &mst->ch; | 373 | struct GNUNET_PSYC_Channel *chn = &mst->chn; |
1231 | struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req)); | ||
1232 | 374 | ||
375 | struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req)); | ||
1233 | req->header.size = htons (sizeof (*req)); | 376 | req->header.size = htons (sizeof (*req)); |
1234 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START); | 377 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START); |
1235 | req->channel_key = *channel_key; | 378 | req->channel_key = *channel_key; |
1236 | req->policy = policy; | 379 | req->policy = policy; |
1237 | 380 | ||
381 | chn->connect_msg = (struct GNUNET_MessageHeader *) req; | ||
382 | chn->cfg = cfg; | ||
383 | chn->is_master = GNUNET_YES; | ||
384 | |||
1238 | mst->start_cb = start_cb; | 385 | mst->start_cb = start_cb; |
1239 | mst->join_req_cb = join_request_cb; | 386 | mst->join_req_cb = join_request_cb; |
1240 | ch->message_cb = message_cb; | 387 | mst->cb_cls = cls; |
1241 | ch->cb_cls = cls; | 388 | |
1242 | ch->cfg = cfg; | 389 | chn->client = GNUNET_CLIENT_MANAGER_connect (cfg, "psyc", master_handlers); |
1243 | ch->is_master = GNUNET_YES; | 390 | GNUNET_CLIENT_MANAGER_set_user_context_ (chn->client, mst, sizeof (*chn)); |
1244 | ch->reconnect_msg = (struct GNUNET_MessageHeader *) req; | 391 | |
1245 | ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; | 392 | chn->tmit = GNUNET_PSYC_transmit_create (chn->client); |
1246 | ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst); | 393 | chn->recv = GNUNET_PSYC_receive_create (message_cb, message_cb, cls); |
1247 | 394 | ||
395 | channel_send_connect_msg (chn); | ||
1248 | return mst; | 396 | return mst; |
1249 | } | 397 | } |
1250 | 398 | ||
@@ -1253,12 +401,13 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
1253 | * Stop a PSYC master channel. | 401 | * Stop a PSYC master channel. |
1254 | * | 402 | * |
1255 | * @param master PSYC channel master to stop. | 403 | * @param master PSYC channel master to stop. |
404 | * @param keep_active FIXME | ||
1256 | */ | 405 | */ |
1257 | void | 406 | void |
1258 | GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master) | 407 | GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *mst) |
1259 | { | 408 | { |
1260 | disconnect (master); | 409 | GNUNET_CLIENT_MANAGER_disconnect (mst->chn.client, GNUNET_YES); |
1261 | GNUNET_free (master); | 410 | GNUNET_free (mst); |
1262 | } | 411 | } |
1263 | 412 | ||
1264 | 413 | ||
@@ -1292,7 +441,7 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, | |||
1292 | const struct GNUNET_PeerIdentity *relays, | 441 | const struct GNUNET_PeerIdentity *relays, |
1293 | const struct GNUNET_PSYC_MessageHeader *join_resp) | 442 | const struct GNUNET_PSYC_MessageHeader *join_resp) |
1294 | { | 443 | { |
1295 | struct GNUNET_PSYC_Channel *ch = &jh->mst->ch; | 444 | struct GNUNET_PSYC_Channel *chn = &jh->mst->chn; |
1296 | struct MasterJoinDecision *dcsn; | 445 | struct MasterJoinDecision *dcsn; |
1297 | uint16_t join_resp_size | 446 | uint16_t join_resp_size |
1298 | = (NULL != join_resp) ? ntohs (join_resp->header.size) : 0; | 447 | = (NULL != join_resp) ? ntohs (join_resp->header.size) : 0; |
@@ -1302,9 +451,7 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, | |||
1302 | < sizeof (*dcsn) + relay_size + join_resp_size) | 451 | < sizeof (*dcsn) + relay_size + join_resp_size) |
1303 | return GNUNET_SYSERR; | 452 | return GNUNET_SYSERR; |
1304 | 453 | ||
1305 | struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*dcsn) | 454 | dcsn = GNUNET_malloc (sizeof (*dcsn) + relay_size + join_resp_size); |
1306 | + relay_size + join_resp_size); | ||
1307 | dcsn = (struct MasterJoinDecision *) &mq[1]; | ||
1308 | dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size); | 455 | dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size); |
1309 | dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION); | 456 | dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION); |
1310 | dcsn->is_admitted = htonl (is_admitted); | 457 | dcsn->is_admitted = htonl (is_admitted); |
@@ -1313,8 +460,7 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, | |||
1313 | if (0 < join_resp_size) | 460 | if (0 < join_resp_size) |
1314 | memcpy (&dcsn[1], join_resp, join_resp_size); | 461 | memcpy (&dcsn[1], join_resp, join_resp_size); |
1315 | 462 | ||
1316 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq); | 463 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &dcsn->header); |
1317 | transmit_next (ch); | ||
1318 | return GNUNET_OK; | 464 | return GNUNET_OK; |
1319 | } | 465 | } |
1320 | 466 | ||
@@ -1332,40 +478,59 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, | |||
1332 | * @return Transmission handle, NULL on error (i.e. more than one request queued). | 478 | * @return Transmission handle, NULL on error (i.e. more than one request queued). |
1333 | */ | 479 | */ |
1334 | struct GNUNET_PSYC_MasterTransmitHandle * | 480 | struct GNUNET_PSYC_MasterTransmitHandle * |
1335 | GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, | 481 | GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *mst, |
1336 | const char *method_name, | 482 | const char *method_name, |
1337 | GNUNET_PSYC_TransmitNotifyModifier notify_mod, | 483 | GNUNET_PSYC_TransmitNotifyModifier notify_mod, |
1338 | GNUNET_PSYC_TransmitNotifyData notify_data, | 484 | GNUNET_PSYC_TransmitNotifyData notify_data, |
1339 | void *notify_cls, | 485 | void *notify_cls, |
1340 | enum GNUNET_PSYC_MasterTransmitFlags flags) | 486 | enum GNUNET_PSYC_MasterTransmitFlags flags) |
1341 | { | 487 | { |
1342 | return (struct GNUNET_PSYC_MasterTransmitHandle *) | 488 | if (GNUNET_OK |
1343 | channel_transmit (&master->ch, method_name, notify_mod, notify_data, | 489 | == GNUNET_PSYC_transmit_message (mst->chn.tmit, method_name, NULL, |
1344 | notify_cls, flags); | 490 | notify_mod, notify_data, notify_cls, |
491 | flags)) | ||
492 | return (struct GNUNET_PSYC_MasterTransmitHandle *) mst->chn.tmit; | ||
493 | else | ||
494 | return NULL; | ||
1345 | } | 495 | } |
1346 | 496 | ||
1347 | 497 | ||
1348 | /** | 498 | /** |
1349 | * Resume transmission to the channel. | 499 | * Resume transmission to the channel. |
1350 | * | 500 | * |
1351 | * @param th Handle of the request that is being resumed. | 501 | * @param tmit Handle of the request that is being resumed. |
1352 | */ | 502 | */ |
1353 | void | 503 | void |
1354 | GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) | 504 | GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *tmit) |
1355 | { | 505 | { |
1356 | channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); | 506 | GNUNET_PSYC_transmit_resume ((struct GNUNET_PSYC_TransmitHandle *) tmit); |
1357 | } | 507 | } |
1358 | 508 | ||
1359 | 509 | ||
1360 | /** | 510 | /** |
1361 | * Abort transmission request to the channel. | 511 | * Abort transmission request to the channel. |
1362 | * | 512 | * |
1363 | * @param th Handle of the request that is being aborted. | 513 | * @param tmit Handle of the request that is being aborted. |
1364 | */ | 514 | */ |
1365 | void | 515 | void |
1366 | GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th) | 516 | GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *tmit) |
1367 | { | 517 | { |
1368 | channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); | 518 | GNUNET_PSYC_transmit_cancel ((struct GNUNET_PSYC_TransmitHandle *) tmit); |
519 | } | ||
520 | |||
521 | |||
522 | /** | ||
523 | * Convert a channel @a master to a @e channel handle to access the @e channel | ||
524 | * APIs. | ||
525 | * | ||
526 | * @param master Channel master handle. | ||
527 | * | ||
528 | * @return Channel handle, valid for as long as @a master is valid. | ||
529 | */ | ||
530 | struct GNUNET_PSYC_Channel * | ||
531 | GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master) | ||
532 | { | ||
533 | return &master->chn; | ||
1369 | } | 534 | } |
1370 | 535 | ||
1371 | 536 | ||
@@ -1420,7 +585,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
1420 | uint16_t data_size) | 585 | uint16_t data_size) |
1421 | { | 586 | { |
1422 | struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv)); | 587 | struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv)); |
1423 | struct GNUNET_PSYC_Channel *ch = &slv->ch; | 588 | struct GNUNET_PSYC_Channel *chn = &slv->chn; |
1424 | struct SlaveJoinRequest *req | 589 | struct SlaveJoinRequest *req |
1425 | = GNUNET_malloc (sizeof (*req) + relay_count * sizeof (*relays)); | 590 | = GNUNET_malloc (sizeof (*req) + relay_count * sizeof (*relays)); |
1426 | req->header.size = htons (sizeof (*req) | 591 | req->header.size = htons (sizeof (*req) |
@@ -1432,17 +597,21 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
1432 | req->relay_count = htonl (relay_count); | 597 | req->relay_count = htonl (relay_count); |
1433 | memcpy (&req[1], relays, relay_count * sizeof (*relays)); | 598 | memcpy (&req[1], relays, relay_count * sizeof (*relays)); |
1434 | 599 | ||
600 | chn->connect_msg = (struct GNUNET_MessageHeader *) req; | ||
601 | chn->cfg = cfg; | ||
602 | chn->is_master = GNUNET_NO; | ||
603 | |||
1435 | slv->connect_cb = connect_cb; | 604 | slv->connect_cb = connect_cb; |
1436 | slv->join_dcsn_cb = join_decision_cb; | 605 | slv->join_dcsn_cb = join_decision_cb; |
1437 | ch->message_cb = message_cb; | 606 | slv->cb_cls = cls; |
1438 | ch->cb_cls = cls; | ||
1439 | 607 | ||
1440 | ch->cfg = cfg; | 608 | chn->client = GNUNET_CLIENT_MANAGER_connect (cfg, "psyc", slave_handlers); |
1441 | ch->is_master = GNUNET_NO; | 609 | GNUNET_CLIENT_MANAGER_set_user_context_ (chn->client, slv, sizeof (*chn)); |
1442 | ch->reconnect_msg = (struct GNUNET_MessageHeader *) req; | ||
1443 | ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; | ||
1444 | ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv); | ||
1445 | 610 | ||
611 | chn->recv = GNUNET_PSYC_receive_create (message_cb, message_cb, cls); | ||
612 | chn->tmit = GNUNET_PSYC_transmit_create (chn->client); | ||
613 | |||
614 | channel_send_connect_msg (chn); | ||
1446 | return slv; | 615 | return slv; |
1447 | } | 616 | } |
1448 | 617 | ||
@@ -1456,10 +625,10 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
1456 | * @param slave Slave handle. | 625 | * @param slave Slave handle. |
1457 | */ | 626 | */ |
1458 | void | 627 | void |
1459 | GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave) | 628 | GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slv) |
1460 | { | 629 | { |
1461 | disconnect (slave); | 630 | GNUNET_CLIENT_MANAGER_disconnect (slv->chn.client, GNUNET_YES); |
1462 | GNUNET_free (slave); | 631 | GNUNET_free (slv); |
1463 | } | 632 | } |
1464 | 633 | ||
1465 | 634 | ||
@@ -1477,69 +646,59 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave) | |||
1477 | * queued). | 646 | * queued). |
1478 | */ | 647 | */ |
1479 | struct GNUNET_PSYC_SlaveTransmitHandle * | 648 | struct GNUNET_PSYC_SlaveTransmitHandle * |
1480 | GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave, | 649 | GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slv, |
1481 | const char *method_name, | 650 | const char *method_name, |
1482 | GNUNET_PSYC_TransmitNotifyModifier notify_mod, | 651 | GNUNET_PSYC_TransmitNotifyModifier notify_mod, |
1483 | GNUNET_PSYC_TransmitNotifyData notify_data, | 652 | GNUNET_PSYC_TransmitNotifyData notify_data, |
1484 | void *notify_cls, | 653 | void *notify_cls, |
1485 | enum GNUNET_PSYC_SlaveTransmitFlags flags) | 654 | enum GNUNET_PSYC_SlaveTransmitFlags flags) |
655 | |||
1486 | { | 656 | { |
1487 | return (struct GNUNET_PSYC_SlaveTransmitHandle *) | 657 | if (GNUNET_OK |
1488 | channel_transmit (&slave->ch, method_name, | 658 | == GNUNET_PSYC_transmit_message (slv->chn.tmit, method_name, NULL, |
1489 | notify_mod, notify_data, notify_cls, flags); | 659 | notify_mod, notify_data, notify_cls, |
660 | flags)) | ||
661 | return (struct GNUNET_PSYC_SlaveTransmitHandle *) slv->chn.tmit; | ||
662 | else | ||
663 | return NULL; | ||
1490 | } | 664 | } |
1491 | 665 | ||
1492 | 666 | ||
1493 | /** | 667 | /** |
1494 | * Resume transmission to the master. | 668 | * Resume transmission to the master. |
1495 | * | 669 | * |
1496 | * @param th Handle of the request that is being resumed. | 670 | * @param tmit Handle of the request that is being resumed. |
1497 | */ | 671 | */ |
1498 | void | 672 | void |
1499 | GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_SlaveTransmitHandle *th) | 673 | GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_SlaveTransmitHandle *tmit) |
1500 | { | 674 | { |
1501 | channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); | 675 | GNUNET_PSYC_transmit_resume ((struct GNUNET_PSYC_TransmitHandle *) tmit); |
1502 | } | 676 | } |
1503 | 677 | ||
1504 | 678 | ||
1505 | /** | 679 | /** |
1506 | * Abort transmission request to master. | 680 | * Abort transmission request to master. |
1507 | * | 681 | * |
1508 | * @param th Handle of the request that is being aborted. | 682 | * @param tmit Handle of the request that is being aborted. |
1509 | */ | 683 | */ |
1510 | void | 684 | void |
1511 | GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th) | 685 | GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *tmit) |
1512 | { | 686 | { |
1513 | channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); | 687 | GNUNET_PSYC_transmit_cancel ((struct GNUNET_PSYC_TransmitHandle *) tmit); |
1514 | } | ||
1515 | |||
1516 | |||
1517 | /** | ||
1518 | * Convert a channel @a master to a @e channel handle to access the @e channel | ||
1519 | * APIs. | ||
1520 | * | ||
1521 | * @param master Channel master handle. | ||
1522 | * | ||
1523 | * @return Channel handle, valid for as long as @a master is valid. | ||
1524 | */ | ||
1525 | struct GNUNET_PSYC_Channel * | ||
1526 | GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master) | ||
1527 | { | ||
1528 | return &master->ch; | ||
1529 | } | 688 | } |
1530 | 689 | ||
1531 | 690 | ||
1532 | /** | 691 | /** |
1533 | * Convert @a slave to a @e channel handle to access the @e channel APIs. | 692 | * Convert @a slave to a @e channel handle to access the @e channel APIs. |
1534 | * | 693 | * |
1535 | * @param slave Slave handle. | 694 | * @param slv Slave handle. |
1536 | * | 695 | * |
1537 | * @return Channel handle, valid for as long as @a slave is valid. | 696 | * @return Channel handle, valid for as long as @a slave is valid. |
1538 | */ | 697 | */ |
1539 | struct GNUNET_PSYC_Channel * | 698 | struct GNUNET_PSYC_Channel * |
1540 | GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave) | 699 | GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slv) |
1541 | { | 700 | { |
1542 | return &slave->ch; | 701 | return &slv->chn; |
1543 | } | 702 | } |
1544 | 703 | ||
1545 | 704 | ||
@@ -1565,23 +724,17 @@ GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave) | |||
1565 | * @param effective_since Addition of slave is in effect since this message ID. | 724 | * @param effective_since Addition of slave is in effect since this message ID. |
1566 | */ | 725 | */ |
1567 | void | 726 | void |
1568 | GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel, | 727 | GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn, |
1569 | const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, | 728 | const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, |
1570 | uint64_t announced_at, | 729 | uint64_t announced_at, |
1571 | uint64_t effective_since) | 730 | uint64_t effective_since) |
1572 | { | 731 | { |
1573 | struct ChannelSlaveAdd *slvadd; | 732 | struct ChannelSlaveAdd *add = GNUNET_malloc (sizeof (*add)); |
1574 | struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*slvadd)); | 733 | add->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD); |
1575 | 734 | add->header.size = htons (sizeof (*add)); | |
1576 | slvadd = (struct ChannelSlaveAdd *) &mq[1]; | 735 | add->announced_at = GNUNET_htonll (announced_at); |
1577 | slvadd->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD); | 736 | add->effective_since = GNUNET_htonll (effective_since); |
1578 | slvadd->header.size = htons (sizeof (*slvadd)); | 737 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &add->header); |
1579 | slvadd->announced_at = GNUNET_htonll (announced_at); | ||
1580 | slvadd->effective_since = GNUNET_htonll (effective_since); | ||
1581 | GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head, | ||
1582 | channel->tmit_tail, | ||
1583 | mq); | ||
1584 | transmit_next (channel); | ||
1585 | } | 738 | } |
1586 | 739 | ||
1587 | 740 | ||
@@ -1607,21 +760,15 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel, | |||
1607 | * @param announced_at ID of the message that announced the membership change. | 760 | * @param announced_at ID of the message that announced the membership change. |
1608 | */ | 761 | */ |
1609 | void | 762 | void |
1610 | GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel, | 763 | GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn, |
1611 | const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, | 764 | const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, |
1612 | uint64_t announced_at) | 765 | uint64_t announced_at) |
1613 | { | 766 | { |
1614 | struct ChannelSlaveRemove *slvrm; | 767 | struct ChannelSlaveRemove *rm = GNUNET_malloc (sizeof (*rm)); |
1615 | struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*slvrm)); | 768 | rm->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM); |
1616 | 769 | rm->header.size = htons (sizeof (*rm)); | |
1617 | slvrm = (struct ChannelSlaveRemove *) &mq[1]; | 770 | rm->announced_at = GNUNET_htonll (announced_at); |
1618 | slvrm->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM); | 771 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &rm->header); |
1619 | slvrm->header.size = htons (sizeof (*slvrm)); | ||
1620 | slvrm->announced_at = GNUNET_htonll (announced_at); | ||
1621 | GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head, | ||
1622 | channel->tmit_tail, | ||
1623 | mq); | ||
1624 | transmit_next (channel); | ||
1625 | } | 772 | } |
1626 | 773 | ||
1627 | 774 | ||
diff --git a/src/psyc/psyc_util_lib.c b/src/psyc/psyc_util_lib.c index 6dd968190..0104c93e8 100644 --- a/src/psyc/psyc_util_lib.c +++ b/src/psyc/psyc_util_lib.c | |||
@@ -28,6 +28,7 @@ | |||
28 | 28 | ||
29 | #include "platform.h" | 29 | #include "platform.h" |
30 | #include "gnunet_util_lib.h" | 30 | #include "gnunet_util_lib.h" |
31 | #include "gnunet_env_lib.h" | ||
31 | #include "gnunet_psyc_service.h" | 32 | #include "gnunet_psyc_service.h" |
32 | #include "gnunet_psyc_util_lib.h" | 33 | #include "gnunet_psyc_util_lib.h" |
33 | 34 | ||
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c index 35e80868c..6468b8a2b 100644 --- a/src/psyc/test_psyc.c +++ b/src/psyc/test_psyc.c | |||
@@ -68,6 +68,7 @@ struct TransmitClosure | |||
68 | struct GNUNET_PSYC_MasterTransmitHandle *mst_tmit; | 68 | struct GNUNET_PSYC_MasterTransmitHandle *mst_tmit; |
69 | struct GNUNET_PSYC_SlaveTransmitHandle *slv_tmit; | 69 | struct GNUNET_PSYC_SlaveTransmitHandle *slv_tmit; |
70 | struct GNUNET_ENV_Environment *env; | 70 | struct GNUNET_ENV_Environment *env; |
71 | struct GNUNET_ENV_Modifier *mod; | ||
71 | char *data[16]; | 72 | char *data[16]; |
72 | const char *mod_value; | 73 | const char *mod_value; |
73 | size_t mod_value_size; | 74 | size_t mod_value_size; |
@@ -79,7 +80,7 @@ struct TransmitClosure | |||
79 | 80 | ||
80 | struct TransmitClosure *tmit; | 81 | struct TransmitClosure *tmit; |
81 | 82 | ||
82 | static int join_req_count; | 83 | static uint8_t join_req_count; |
83 | 84 | ||
84 | enum | 85 | enum |
85 | { | 86 | { |
@@ -183,7 +184,7 @@ master_message_cb (void *cls, uint64_t message_id, uint32_t flags, | |||
183 | 184 | ||
184 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 185 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
185 | "Master got message part of type %u and size %u " | 186 | "Master got message part of type %u and size %u " |
186 | "belonging to message ID %llu with flags %xu\n", | 187 | "belonging to message ID %llu with flags %x\n", |
187 | type, size, message_id, flags); | 188 | type, size, message_id, flags); |
188 | 189 | ||
189 | switch (test) | 190 | switch (test) |
@@ -227,7 +228,7 @@ slave_message_cb (void *cls, uint64_t message_id, uint32_t flags, | |||
227 | 228 | ||
228 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 229 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
229 | "Slave got message part of type %u and size %u " | 230 | "Slave got message part of type %u and size %u " |
230 | "belonging to message ID %llu with flags %xu\n", | 231 | "belonging to message ID %llu with flags %x\n", |
231 | type, size, message_id, flags); | 232 | type, size, message_id, flags); |
232 | 233 | ||
233 | switch (test) | 234 | switch (test) |
@@ -256,6 +257,48 @@ transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
256 | 257 | ||
257 | 258 | ||
258 | static int | 259 | static int |
260 | tmit_notify_data (void *cls, uint16_t *data_size, void *data) | ||
261 | { | ||
262 | struct TransmitClosure *tmit = cls; | ||
263 | if (0 == tmit->data_count) | ||
264 | { | ||
265 | *data_size = 0; | ||
266 | return GNUNET_YES; | ||
267 | } | ||
268 | |||
269 | uint16_t size = strlen (tmit->data[tmit->n]); | ||
270 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
271 | "Transmit notify data: %u bytes available, " | ||
272 | "processing fragment %u/%u (size %u).\n", | ||
273 | *data_size, tmit->n + 1, tmit->data_count, size); | ||
274 | if (*data_size < size) | ||
275 | { | ||
276 | *data_size = 0; | ||
277 | GNUNET_assert (0); | ||
278 | return GNUNET_SYSERR; | ||
279 | } | ||
280 | |||
281 | if (GNUNET_YES != tmit->paused && 0 < tmit->data_delay[tmit->n]) | ||
282 | { | ||
283 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n"); | ||
284 | tmit->paused = GNUNET_YES; | ||
285 | GNUNET_SCHEDULER_add_delayed ( | ||
286 | GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, | ||
287 | tmit->data_delay[tmit->n]), | ||
288 | &transmit_resume, tmit); | ||
289 | *data_size = 0; | ||
290 | return GNUNET_NO; | ||
291 | } | ||
292 | tmit->paused = GNUNET_NO; | ||
293 | |||
294 | *data_size = size; | ||
295 | memcpy (data, tmit->data[tmit->n], size); | ||
296 | |||
297 | return ++tmit->n < tmit->data_count ? GNUNET_NO : GNUNET_YES; | ||
298 | } | ||
299 | |||
300 | |||
301 | static int | ||
259 | tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper, | 302 | tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper, |
260 | uint32_t *full_value_size) | 303 | uint32_t *full_value_size) |
261 | { | 304 | { |
@@ -265,41 +308,39 @@ tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper, | |||
265 | "%u modifiers left to process.\n", | 308 | "%u modifiers left to process.\n", |
266 | *data_size, GNUNET_ENV_environment_get_count (tmit->env)); | 309 | *data_size, GNUNET_ENV_environment_get_count (tmit->env)); |
267 | 310 | ||
268 | enum GNUNET_ENV_Operator op = 0; | ||
269 | const char *name = NULL; | ||
270 | const char *value = NULL; | ||
271 | uint16_t name_size = 0; | 311 | uint16_t name_size = 0; |
272 | size_t value_size = 0; | 312 | size_t value_size = 0; |
313 | const char *value = NULL; | ||
273 | 314 | ||
274 | if (NULL != oper) | 315 | if (NULL != oper && NULL != tmit->mod) |
275 | { /* New modifier */ | 316 | { /* New modifier */ |
276 | if (GNUNET_NO == GNUNET_ENV_environment_shift (tmit->env, &op, &name, | 317 | tmit->mod = tmit->mod->next; |
277 | (void *) &value, &value_size)) | 318 | if (NULL == tmit->mod) |
278 | { /* No more modifiers, continue with data */ | 319 | { /* No more modifiers, continue with data */ |
279 | *data_size = 0; | 320 | *data_size = 0; |
280 | return GNUNET_YES; | 321 | return GNUNET_YES; |
281 | } | 322 | } |
282 | 323 | ||
283 | GNUNET_assert (value_size < UINT32_MAX); | 324 | GNUNET_assert (tmit->mod->value_size < UINT32_MAX); |
284 | *full_value_size = value_size; | 325 | *full_value_size = tmit->mod->value_size; |
285 | *oper = op; | 326 | *oper = tmit->mod->oper; |
286 | name_size = strlen (name); | 327 | name_size = strlen (tmit->mod->name); |
287 | 328 | ||
288 | if (name_size + 1 + value_size <= *data_size) | 329 | if (name_size + 1 + tmit->mod->value_size <= *data_size) |
289 | { | 330 | { |
290 | *data_size = name_size + 1 + value_size; | 331 | *data_size = name_size + 1 + tmit->mod->value_size; |
291 | } | 332 | } |
292 | else | 333 | else |
293 | { | 334 | { |
294 | tmit->mod_value_size = value_size; | 335 | tmit->mod_value_size = tmit->mod->value_size; |
295 | value_size = *data_size - name_size - 1; | 336 | value_size = *data_size - name_size - 1; |
296 | tmit->mod_value_size -= value_size; | 337 | tmit->mod_value_size -= value_size; |
297 | tmit->mod_value = value + value_size; | 338 | tmit->mod_value = tmit->mod->value + value_size; |
298 | } | 339 | } |
299 | 340 | ||
300 | memcpy (data, name, name_size); | 341 | memcpy (data, tmit->mod->name, name_size); |
301 | ((char *)data)[name_size] = '\0'; | 342 | ((char *)data)[name_size] = '\0'; |
302 | memcpy ((char *)data + name_size + 1, value, value_size); | 343 | memcpy ((char *)data + name_size + 1, tmit->mod->value, value_size); |
303 | } | 344 | } |
304 | else if (NULL != tmit->mod_value && 0 < tmit->mod_value_size) | 345 | else if (NULL != tmit->mod_value && 0 < tmit->mod_value_size) |
305 | { /* Modifier continuation */ | 346 | { /* Modifier continuation */ |
@@ -333,48 +374,6 @@ tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper, | |||
333 | } | 374 | } |
334 | 375 | ||
335 | 376 | ||
336 | static int | ||
337 | tmit_notify_data (void *cls, uint16_t *data_size, void *data) | ||
338 | { | ||
339 | struct TransmitClosure *tmit = cls; | ||
340 | if (0 == tmit->data_count) | ||
341 | { | ||
342 | *data_size = 0; | ||
343 | return GNUNET_YES; | ||
344 | } | ||
345 | |||
346 | uint16_t size = strlen (tmit->data[tmit->n]); | ||
347 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
348 | "Transmit notify data: %u bytes available, " | ||
349 | "processing fragment %u/%u (size %u).\n", | ||
350 | *data_size, tmit->n + 1, tmit->data_count, size); | ||
351 | if (*data_size < size) | ||
352 | { | ||
353 | *data_size = 0; | ||
354 | GNUNET_assert (0); | ||
355 | return GNUNET_SYSERR; | ||
356 | } | ||
357 | |||
358 | if (GNUNET_YES != tmit->paused && 0 < tmit->data_delay[tmit->n]) | ||
359 | { | ||
360 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n"); | ||
361 | tmit->paused = GNUNET_YES; | ||
362 | GNUNET_SCHEDULER_add_delayed ( | ||
363 | GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, | ||
364 | tmit->data_delay[tmit->n]), | ||
365 | &transmit_resume, tmit); | ||
366 | *data_size = 0; | ||
367 | return GNUNET_NO; | ||
368 | } | ||
369 | tmit->paused = GNUNET_NO; | ||
370 | |||
371 | *data_size = size; | ||
372 | memcpy (data, tmit->data[tmit->n], size); | ||
373 | |||
374 | return ++tmit->n < tmit->data_count ? GNUNET_NO : GNUNET_YES; | ||
375 | } | ||
376 | |||
377 | |||
378 | static void | 377 | static void |
379 | slave_join (); | 378 | slave_join (); |
380 | 379 | ||
@@ -388,7 +387,7 @@ join_decision_cb (void *cls, int is_admitted, | |||
388 | 387 | ||
389 | if (GNUNET_YES != is_admitted) | 388 | if (GNUNET_YES != is_admitted) |
390 | { /* First join request is refused, retry. */ | 389 | { /* First join request is refused, retry. */ |
391 | //GNUNET_assert (1 == join_req_count); | 390 | GNUNET_assert (1 == join_req_count); |
392 | slave_join (); | 391 | slave_join (); |
393 | return; | 392 | return; |
394 | } | 393 | } |
@@ -403,6 +402,7 @@ join_decision_cb (void *cls, int is_admitted, | |||
403 | "_abc", "abc def", 7); | 402 | "_abc", "abc def", 7); |
404 | GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN, | 403 | GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN, |
405 | "_abc_def", "abc def ghi", 11); | 404 | "_abc_def", "abc def ghi", 11); |
405 | tmit->mod = GNUNET_ENV_environment_head (tmit->env); | ||
406 | tmit->n = 0; | 406 | tmit->n = 0; |
407 | tmit->data[0] = "slave test"; | 407 | tmit->data[0] = "slave test"; |
408 | tmit->data_count = 1; | 408 | tmit->data_count = 1; |
@@ -421,8 +421,8 @@ join_request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key | |||
421 | struct GNUNET_HashCode slave_key_hash; | 421 | struct GNUNET_HashCode slave_key_hash; |
422 | GNUNET_CRYPTO_hash (slave_key, sizeof (*slave_key), &slave_key_hash); | 422 | GNUNET_CRYPTO_hash (slave_key, sizeof (*slave_key), &slave_key_hash); |
423 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 423 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
424 | "Got join request from %s.\n", | 424 | "Got join request #%u from %s.\n", |
425 | GNUNET_h2s (&slave_key_hash)); | 425 | join_req_count, GNUNET_h2s (&slave_key_hash)); |
426 | 426 | ||
427 | /* Reject first request */ | 427 | /* Reject first request */ |
428 | int is_admitted = (0 < join_req_count++) ? GNUNET_YES : GNUNET_NO; | 428 | int is_admitted = (0 < join_req_count++) ? GNUNET_YES : GNUNET_NO; |
@@ -493,6 +493,7 @@ master_transmit () | |||
493 | name_cont, val_cont, | 493 | name_cont, val_cont, |
494 | GNUNET_PSYC_MODIFIER_MAX_PAYLOAD - name_cont_size | 494 | GNUNET_PSYC_MODIFIER_MAX_PAYLOAD - name_cont_size |
495 | + GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD); | 495 | + GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD); |
496 | tmit->mod = GNUNET_ENV_environment_head (tmit->env); | ||
496 | tmit->data[0] = "foo"; | 497 | tmit->data[0] = "foo"; |
497 | tmit->data[1] = GNUNET_malloc (GNUNET_PSYC_DATA_MAX_PAYLOAD + 1); | 498 | tmit->data[1] = GNUNET_malloc (GNUNET_PSYC_DATA_MAX_PAYLOAD + 1); |
498 | for (i = 0; i < GNUNET_PSYC_DATA_MAX_PAYLOAD; i++) | 499 | for (i = 0; i < GNUNET_PSYC_DATA_MAX_PAYLOAD; i++) |