aboutsummaryrefslogtreecommitdiff
path: root/src/psyc
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2014-05-29 16:35:55 +0000
committerGabor X Toth <*@tg-x.net>2014-05-29 16:35:55 +0000
commit9955561e1b204ccf23fbf841f409bd3ef79be88c (patch)
tree0271c23ae9f1dad72266a0e6073d696e5afca027 /src/psyc
parenta5877668ba805c5e0efe622e6ce4c58ff5609bf9 (diff)
downloadgnunet-9955561e1b204ccf23fbf841f409bd3ef79be88c.tar.gz
gnunet-9955561e1b204ccf23fbf841f409bd3ef79be88c.zip
psyc, multicast: reorg code, use new client manager & psyc util lib
Diffstat (limited to 'src/psyc')
-rw-r--r--src/psyc/gnunet-service-psyc.c791
-rw-r--r--src/psyc/psyc_api.c1295
-rw-r--r--src/psyc/psyc_util_lib.c1
-rw-r--r--src/psyc/test_psyc.c129
4 files changed, 674 insertions, 1542 deletions
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
index 13f908b6c..9dcf40e0f 100644
--- a/src/psyc/gnunet-service-psyc.c
+++ b/src/psyc/gnunet-service-psyc.c
@@ -34,6 +34,7 @@
34#include "gnunet_multicast_service.h" 34#include "gnunet_multicast_service.h"
35#include "gnunet_psycstore_service.h" 35#include "gnunet_psycstore_service.h"
36#include "gnunet_psyc_service.h" 36#include "gnunet_psyc_service.h"
37#include "gnunet_psyc_util_lib.h"
37#include "psyc.h" 38#include "psyc.h"
38 39
39 40
@@ -174,10 +175,10 @@ struct FragmentQueue
174/** 175/**
175 * List of connected clients. 176 * List of connected clients.
176 */ 177 */
177struct ClientList 178struct ClientListItem
178{ 179{
179 struct ClientList *prev; 180 struct ClientListItem *prev;
180 struct ClientList *next; 181 struct ClientListItem *next;
181 struct GNUNET_SERVER_Client *client; 182 struct GNUNET_SERVER_Client *client;
182}; 183};
183 184
@@ -187,8 +188,8 @@ struct ClientList
187 */ 188 */
188struct Channel 189struct Channel
189{ 190{
190 struct ClientList *clients_head; 191 struct ClientListItem *clients_head;
191 struct ClientList *clients_tail; 192 struct ClientListItem *clients_tail;
192 193
193 struct TransmitMessage *tmit_head; 194 struct TransmitMessage *tmit_head;
194 struct TransmitMessage *tmit_tail; 195 struct TransmitMessage *tmit_tail;
@@ -282,7 +283,7 @@ struct Master
282 /** 283 /**
283 * Channel struct common for Master and Slave 284 * Channel struct common for Master and Slave
284 */ 285 */
285 struct Channel ch; 286 struct Channel chn;
286 287
287 /** 288 /**
288 * Private key of the channel. 289 * Private key of the channel.
@@ -339,7 +340,7 @@ struct Slave
339 /** 340 /**
340 * Channel struct common for Master and Slave 341 * Channel struct common for Master and Slave
341 */ 342 */
342 struct Channel ch; 343 struct Channel chn;
343 344
344 /** 345 /**
345 * Private key of the slave. 346 * Private key of the slave.
@@ -399,11 +400,11 @@ struct Slave
399 400
400 401
401static inline void 402static inline void
402transmit_message (struct Channel *ch); 403transmit_message (struct Channel *chn);
403 404
404 405
405static uint64_t 406static uint64_t
406message_queue_drop (struct Channel *ch); 407message_queue_drop (struct Channel *chn);
407 408
408 409
409/** 410/**
@@ -434,12 +435,12 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
434static void 435static void
435cleanup_master (struct Master *mst) 436cleanup_master (struct Master *mst)
436{ 437{
437 struct Channel *ch = &mst->ch; 438 struct Channel *chn = &mst->chn;
438 439
439 if (NULL != mst->origin) 440 if (NULL != mst->origin)
440 GNUNET_MULTICAST_origin_stop (mst->origin); 441 GNUNET_MULTICAST_origin_stop (mst->origin);
441 GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs); 442 GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
442 GNUNET_CONTAINER_multihashmap_remove (masters, &ch->pub_key_hash, ch); 443 GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, chn);
443} 444}
444 445
445 446
@@ -449,20 +450,20 @@ cleanup_master (struct Master *mst)
449static void 450static void
450cleanup_slave (struct Slave *slv) 451cleanup_slave (struct Slave *slv)
451{ 452{
452 struct Channel *ch = &slv->ch; 453 struct Channel *chn = &slv->chn;
453 struct GNUNET_CONTAINER_MultiHashMap * 454 struct GNUNET_CONTAINER_MultiHashMap *
454 ch_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, 455 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
455 &ch->pub_key_hash); 456 &chn->pub_key_hash);
456 GNUNET_assert (NULL != ch_slv); 457 GNUNET_assert (NULL != chn_slv);
457 GNUNET_CONTAINER_multihashmap_remove (ch_slv, &slv->pub_key_hash, slv); 458 GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
458 459
459 if (0 == GNUNET_CONTAINER_multihashmap_size (ch_slv)) 460 if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
460 { 461 {
461 GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &ch->pub_key_hash, 462 GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
462 ch_slv); 463 chn_slv);
463 GNUNET_CONTAINER_multihashmap_destroy (ch_slv); 464 GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
464 } 465 }
465 GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, slv); 466 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
466 467
467 if (NULL != slv->join_req) 468 if (NULL != slv->join_req)
468 GNUNET_free (slv->join_req); 469 GNUNET_free (slv->join_req);
@@ -470,7 +471,7 @@ cleanup_slave (struct Slave *slv)
470 GNUNET_free (slv->relays); 471 GNUNET_free (slv->relays);
471 if (NULL != slv->member) 472 if (NULL != slv->member)
472 GNUNET_MULTICAST_member_part (slv->member); 473 GNUNET_MULTICAST_member_part (slv->member);
473 GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, ch); 474 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, chn);
474} 475}
475 476
476 477
@@ -478,18 +479,18 @@ cleanup_slave (struct Slave *slv)
478 * Clean up channel data structures after a client disconnected. 479 * Clean up channel data structures after a client disconnected.
479 */ 480 */
480static void 481static void
481cleanup_channel (struct Channel *ch) 482cleanup_channel (struct Channel *chn)
482{ 483{
483 message_queue_drop (ch); 484 message_queue_drop (chn);
484 GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &ch->pub_key_hash); 485 GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &chn->pub_key_hash);
485 486
486 if (NULL != ch->store_op) 487 if (NULL != chn->store_op)
487 GNUNET_PSYCSTORE_operation_cancel (ch->store_op); 488 GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
488 489
489 (GNUNET_YES == ch->is_master) 490 (GNUNET_YES == chn->is_master)
490 ? cleanup_master ((struct Master *) ch) 491 ? cleanup_master ((struct Master *) chn)
491 : cleanup_slave ((struct Slave *) ch); 492 : cleanup_slave ((struct Slave *) chn);
492 GNUNET_free (ch); 493 GNUNET_free (chn);
493} 494}
494 495
495 496
@@ -507,41 +508,41 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
507 return; 508 return;
508 509
509 struct Channel * 510 struct Channel *
510 ch = GNUNET_SERVER_client_get_user_context (client, struct Channel); 511 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
511 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
512 "%p Client (%s) disconnected from channel %s\n",
513 ch, (GNUNET_YES == ch->is_master) ? "master" : "slave",
514 GNUNET_h2s (&ch->pub_key_hash));
515 512
516 if (NULL == ch) 513 if (NULL == chn)
517 { 514 {
518 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 515 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
519 "%p User context is NULL in client_disconnect()\n", ch); 516 "%p User context is NULL in client_disconnect()\n", chn);
520 GNUNET_break (0);
521 return; 517 return;
522 } 518 }
523 519
524 struct ClientList *cl = ch->clients_head; 520 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
525 while (NULL != cl) 521 "%p Client (%s) disconnected from channel %s\n",
522 chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
523 GNUNET_h2s (&chn->pub_key_hash));
524
525 struct ClientListItem *cli = chn->clients_head;
526 while (NULL != cli)
526 { 527 {
527 if (cl->client == client) 528 if (cli->client == client)
528 { 529 {
529 GNUNET_CONTAINER_DLL_remove (ch->clients_head, ch->clients_tail, cl); 530 GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
530 GNUNET_free (cl); 531 GNUNET_free (cli);
531 break; 532 break;
532 } 533 }
533 cl = cl->next; 534 cli = cli->next;
534 } 535 }
535 536
536 if (NULL == ch->clients_head) 537 if (NULL == chn->clients_head)
537 { /* Last client disconnected. */ 538 { /* Last client disconnected. */
538 if (NULL != ch->tmit_head) 539 if (NULL != chn->tmit_head)
539 { /* Send pending messages to multicast before cleanup. */ 540 { /* Send pending messages to multicast before cleanup. */
540 transmit_message (ch); 541 transmit_message (chn);
541 } 542 }
542 else 543 else
543 { 544 {
544 cleanup_channel (ch); 545 cleanup_channel (chn);
545 } 546 }
546 } 547 }
547} 548}
@@ -551,18 +552,18 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
551 * Send message to all clients connected to the channel. 552 * Send message to all clients connected to the channel.
552 */ 553 */
553static void 554static void
554msg_to_clients (const struct Channel *ch, 555client_send_msg (const struct Channel *chn,
555 const struct GNUNET_MessageHeader *msg) 556 const struct GNUNET_MessageHeader *msg)
556{ 557{
557 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 558 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
558 "%p Sending message to clients.\n", ch); 559 "%p Sending message to clients.\n", chn);
559 560
560 struct ClientList *cl = ch->clients_head; 561 struct ClientListItem *cli = chn->clients_head;
561 while (NULL != cl) 562 while (NULL != cli)
562 { 563 {
563 GNUNET_SERVER_notification_context_add (nc, cl->client); 564 GNUNET_SERVER_notification_context_add (nc, cli->client);
564 GNUNET_SERVER_notification_context_unicast (nc, cl->client, msg, GNUNET_NO); 565 GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, GNUNET_NO);
565 cl = cl->next; 566 cli = cli->next;
566 } 567 }
567} 568}
568 569
@@ -573,7 +574,7 @@ msg_to_clients (const struct Channel *ch,
573struct JoinMemTestClosure 574struct JoinMemTestClosure
574{ 575{
575 struct GNUNET_CRYPTO_EddsaPublicKey slave_key; 576 struct GNUNET_CRYPTO_EddsaPublicKey slave_key;
576 struct Channel *ch; 577 struct Channel *chn;
577 struct GNUNET_MULTICAST_JoinHandle *jh; 578 struct GNUNET_MULTICAST_JoinHandle *jh;
578 struct MasterJoinRequest *master_join_req; 579 struct MasterJoinRequest *master_join_req;
579}; 580};
@@ -587,15 +588,15 @@ join_mem_test_cb (void *cls, int64_t result, const char *err_msg)
587{ 588{
588 struct JoinMemTestClosure *jcls = cls; 589 struct JoinMemTestClosure *jcls = cls;
589 590
590 if (GNUNET_NO == result && GNUNET_YES == jcls->ch->is_master) 591 if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master)
591 { /* Pass on join request to client if this is a master channel */ 592 { /* Pass on join request to client if this is a master channel */
592 struct Master *mst = (struct Master *) jcls->ch; 593 struct Master *mst = (struct Master *) jcls->chn;
593 struct GNUNET_HashCode slave_key_hash; 594 struct GNUNET_HashCode slave_key_hash;
594 GNUNET_CRYPTO_hash (&jcls->slave_key, sizeof (jcls->slave_key), 595 GNUNET_CRYPTO_hash (&jcls->slave_key, sizeof (jcls->slave_key),
595 &slave_key_hash); 596 &slave_key_hash);
596 GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh, 597 GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh,
597 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 598 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
598 msg_to_clients (jcls->ch, &jcls->master_join_req->header); 599 client_send_msg (jcls->chn, &jcls->master_join_req->header);
599 } 600 }
600 else 601 else
601 { 602 {
@@ -611,13 +612,13 @@ join_mem_test_cb (void *cls, int64_t result, const char *err_msg)
611 * Incoming join request from multicast. 612 * Incoming join request from multicast.
612 */ 613 */
613static void 614static void
614mcast_join_request_cb (void *cls, 615mcast_recv_join_request (void *cls,
615 const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, 616 const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
616 const struct GNUNET_MessageHeader *join_msg, 617 const struct GNUNET_MessageHeader *join_msg,
617 struct GNUNET_MULTICAST_JoinHandle *jh) 618 struct GNUNET_MULTICAST_JoinHandle *jh)
618{ 619{
619 struct Channel *ch = cls; 620 struct Channel *chn = cls;
620 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", ch); 621 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", chn);
621 622
622 uint16_t join_msg_size = 0; 623 uint16_t join_msg_size = 0;
623 if (NULL != join_msg) 624 if (NULL != join_msg)
@@ -630,7 +631,7 @@ mcast_join_request_cb (void *cls,
630 { 631 {
631 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 632 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
632 "%p Got join message with invalid type %u.\n", 633 "%p Got join message with invalid type %u.\n",
633 ch, ntohs (join_msg->type)); 634 chn, ntohs (join_msg->type));
634 } 635 }
635 } 636 }
636 637
@@ -643,12 +644,12 @@ mcast_join_request_cb (void *cls,
643 644
644 struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls)); 645 struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
645 jcls->slave_key = *slave_key; 646 jcls->slave_key = *slave_key;
646 jcls->ch = ch; 647 jcls->chn = chn;
647 jcls->jh = jh; 648 jcls->jh = jh;
648 jcls->master_join_req = req; 649 jcls->master_join_req = req;
649 650
650 GNUNET_PSYCSTORE_membership_test (store, &ch->pub_key, slave_key, 651 GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
651 ch->max_message_id, 0, 652 chn->max_message_id, 0,
652 &join_mem_test_cb, jcls); 653 &join_mem_test_cb, jcls);
653} 654}
654 655
@@ -657,14 +658,14 @@ mcast_join_request_cb (void *cls,
657 * Join decision received from multicast. 658 * Join decision received from multicast.
658 */ 659 */
659static void 660static void
660mcast_join_decision_cb (void *cls, int is_admitted, 661mcast_recv_join_decision (void *cls, int is_admitted,
661 const struct GNUNET_PeerIdentity *peer, 662 const struct GNUNET_PeerIdentity *peer,
662 uint16_t relay_count, 663 uint16_t relay_count,
663 const struct GNUNET_PeerIdentity *relays, 664 const struct GNUNET_PeerIdentity *relays,
664 const struct GNUNET_MessageHeader *join_resp) 665 const struct GNUNET_MessageHeader *join_resp)
665{ 666{
666 struct Slave *slv = cls; 667 struct Slave *slv = cls;
667 struct Channel *ch = &slv->ch; 668 struct Channel *chn = &slv->chn;
668 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 669 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
669 "%p Got join decision: %d\n", slv, is_admitted); 670 "%p Got join decision: %d\n", slv, is_admitted);
670 671
@@ -677,11 +678,11 @@ mcast_join_decision_cb (void *cls, int is_admitted,
677 if (0 < join_resp_size) 678 if (0 < join_resp_size)
678 memcpy (&dcsn[1], join_resp, join_resp_size); 679 memcpy (&dcsn[1], join_resp, join_resp_size);
679 680
680 msg_to_clients (ch, &dcsn->header); 681 client_send_msg (chn, &dcsn->header);
681 682
682 if (GNUNET_YES == is_admitted) 683 if (GNUNET_YES == is_admitted)
683 { 684 {
684 ch->ready = GNUNET_YES; 685 chn->ready = GNUNET_YES;
685 } 686 }
686 else 687 else
687 { 688 {
@@ -691,20 +692,20 @@ mcast_join_decision_cb (void *cls, int is_admitted,
691 692
692 693
693static void 694static void
694mcast_membership_test_cb (void *cls, 695mcast_recv_membership_test (void *cls,
695 const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, 696 const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
696 uint64_t message_id, uint64_t group_generation, 697 uint64_t message_id, uint64_t group_generation,
697 struct GNUNET_MULTICAST_MembershipTestHandle *mth) 698 struct GNUNET_MULTICAST_MembershipTestHandle *mth)
698{ 699{
699 700
700} 701}
701 702
702 703
703static void 704static void
704mcast_replay_fragment_cb (void *cls, 705mcast_recv_replay_fragment (void *cls,
705 const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, 706 const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
706 uint64_t fragment_id, uint64_t flags, 707 uint64_t fragment_id, uint64_t flags,
707 struct GNUNET_MULTICAST_ReplayHandle *rh) 708 struct GNUNET_MULTICAST_ReplayHandle *rh)
708 709
709{ 710{
710 711
@@ -712,25 +713,17 @@ mcast_replay_fragment_cb (void *cls,
712 713
713 714
714static void 715static void
715mcast_replay_message_cb (void *cls, 716mcast_recv_replay_message (void *cls,
716 const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, 717 const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
717 uint64_t message_id, 718 uint64_t message_id,
718 uint64_t fragment_offset, 719 uint64_t fragment_offset,
719 uint64_t flags, 720 uint64_t flags,
720 struct GNUNET_MULTICAST_ReplayHandle *rh) 721 struct GNUNET_MULTICAST_ReplayHandle *rh)
721{ 722{
722 723
723} 724}
724 725
725 726
726static void
727fragment_store_result (void *cls, int64_t result, const char *err_msg)
728{
729 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
730 "fragment_store() returned %l (%s)\n", result, err_msg);
731}
732
733
734/** 727/**
735 * Convert an uint64_t in network byte order to a HashCode 728 * Convert an uint64_t in network byte order to a HashCode
736 * that can be used as key in a MultiHashMap 729 * that can be used as key in a MultiHashMap
@@ -772,17 +765,17 @@ hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
772 * Send multicast message to all clients connected to the channel. 765 * Send multicast message to all clients connected to the channel.
773 */ 766 */
774static void 767static void
775mmsg_to_clients (struct Channel *ch, 768client_send_mcast_msg (struct Channel *chn,
776 const struct GNUNET_MULTICAST_MessageHeader *mmsg) 769 const struct GNUNET_MULTICAST_MessageHeader *mmsg)
777{ 770{
778 uint16_t size = ntohs (mmsg->header.size);
779 struct GNUNET_PSYC_MessageHeader *pmsg; 771 struct GNUNET_PSYC_MessageHeader *pmsg;
772 uint16_t size = ntohs (mmsg->header.size);
780 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg); 773 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
781 774
782 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 775 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
783 "%p Sending message to client. " 776 "%p Sending multicast message to client. "
784 "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n", 777 "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
785 ch, GNUNET_ntohll (mmsg->fragment_id), 778 chn, GNUNET_ntohll (mmsg->fragment_id),
786 GNUNET_ntohll (mmsg->message_id)); 779 GNUNET_ntohll (mmsg->message_id));
787 780
788 pmsg = GNUNET_malloc (psize); 781 pmsg = GNUNET_malloc (psize);
@@ -791,7 +784,38 @@ mmsg_to_clients (struct Channel *ch,
791 pmsg->message_id = mmsg->message_id; 784 pmsg->message_id = mmsg->message_id;
792 785
793 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); 786 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
794 msg_to_clients (ch, &pmsg->header); 787 client_send_msg (chn, &pmsg->header);
788 GNUNET_free (pmsg);
789}
790
791
792/**
793 * Send multicast request to all clients connected to the channel.
794 */
795static void
796client_send_mcast_req (struct Master *mst,
797 const struct GNUNET_MULTICAST_RequestHeader *req)
798{
799 struct Channel *chn = &mst->chn;
800
801 struct GNUNET_PSYC_MessageHeader *pmsg;
802 uint16_t size = ntohs (req->header.size);
803 uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
804
805 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
806 "%p Sending multicast request to client. "
807 "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
808 chn, GNUNET_ntohll (req->fragment_id),
809 GNUNET_ntohll (req->request_id));
810
811 pmsg = GNUNET_malloc (psize);
812 pmsg->header.size = htons (psize);
813 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
814 pmsg->message_id = req->request_id;
815 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
816
817 memcpy (&pmsg[1], &req[1], size - sizeof (*req));
818 client_send_msg (chn, &pmsg->header);
795 GNUNET_free (pmsg); 819 GNUNET_free (pmsg);
796} 820}
797 821
@@ -799,14 +823,14 @@ mmsg_to_clients (struct Channel *ch,
799/** 823/**
800 * Insert a multicast message fragment into the queue belonging to the message. 824 * Insert a multicast message fragment into the queue belonging to the message.
801 * 825 *
802 * @param ch Channel. 826 * @param chn Channel.
803 * @param mmsg Multicast message fragment. 827 * @param mmsg Multicast message fragment.
804 * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode. 828 * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode.
805 * @param first_ptype First PSYC message part type in @a mmsg. 829 * @param first_ptype First PSYC message part type in @a mmsg.
806 * @param last_ptype Last PSYC message part type in @a mmsg. 830 * @param last_ptype Last PSYC message part type in @a mmsg.
807 */ 831 */
808static void 832static void
809fragment_queue_insert (struct Channel *ch, 833fragment_queue_insert (struct Channel *chn,
810 const struct GNUNET_MULTICAST_MessageHeader *mmsg, 834 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
811 uint16_t first_ptype, uint16_t last_ptype) 835 uint16_t first_ptype, uint16_t last_ptype)
812{ 836{
@@ -814,13 +838,13 @@ fragment_queue_insert (struct Channel *ch,
814 const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset); 838 const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
815 struct GNUNET_CONTAINER_MultiHashMap 839 struct GNUNET_CONTAINER_MultiHashMap
816 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, 840 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
817 &ch->pub_key_hash); 841 &chn->pub_key_hash);
818 842
819 struct GNUNET_HashCode msg_id_hash; 843 struct GNUNET_HashCode msg_id_hash;
820 hash_key_from_nll (&msg_id_hash, mmsg->message_id); 844 hash_key_from_nll (&msg_id_hash, mmsg->message_id);
821 845
822 struct FragmentQueue 846 struct FragmentQueue
823 *fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash); 847 *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
824 848
825 if (NULL == fragq) 849 if (NULL == fragq)
826 { 850 {
@@ -829,13 +853,13 @@ fragment_queue_insert (struct Channel *ch,
829 fragq->fragments 853 fragq->fragments
830 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 854 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
831 855
832 GNUNET_CONTAINER_multihashmap_put (ch->recv_frags, &msg_id_hash, fragq, 856 GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
833 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); 857 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
834 858
835 if (NULL == chan_msgs) 859 if (NULL == chan_msgs)
836 { 860 {
837 chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); 861 chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
838 GNUNET_CONTAINER_multihashmap_put (recv_cache, &ch->pub_key_hash, chan_msgs, 862 GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
839 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); 863 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
840 } 864 }
841 } 865 }
@@ -850,7 +874,7 @@ fragment_queue_insert (struct Channel *ch,
850 "%p Adding message fragment to cache. " 874 "%p Adding message fragment to cache. "
851 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", " 875 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", "
852 "header_size: %" PRIu64 " + %u).\n", 876 "header_size: %" PRIu64 " + %u).\n",
853 ch, GNUNET_ntohll (mmsg->message_id), 877 chn, GNUNET_ntohll (mmsg->message_id),
854 GNUNET_ntohll (mmsg->fragment_id), 878 GNUNET_ntohll (mmsg->fragment_id),
855 fragq->header_size, size); 879 fragq->header_size, size);
856 cache_entry = GNUNET_new (struct RecvCacheEntry); 880 cache_entry = GNUNET_new (struct RecvCacheEntry);
@@ -867,7 +891,7 @@ fragment_queue_insert (struct Channel *ch,
867 "%p Message fragment is already in cache. " 891 "%p Message fragment is already in cache. "
868 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 892 "message_id: %" PRIu64 ", fragment_id: %" PRIu64
869 ", ref_count: %u\n", 893 ", ref_count: %u\n",
870 ch, GNUNET_ntohll (mmsg->message_id), 894 chn, GNUNET_ntohll (mmsg->message_id),
871 GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count); 895 GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
872 } 896 }
873 897
@@ -890,11 +914,11 @@ fragment_queue_insert (struct Channel *ch,
890 { /* header is now complete */ 914 { /* header is now complete */
891 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 915 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
892 "%p Header of message %" PRIu64 " is complete.\n", 916 "%p Header of message %" PRIu64 " is complete.\n",
893 ch, GNUNET_ntohll (mmsg->message_id)); 917 chn, GNUNET_ntohll (mmsg->message_id));
894 918
895 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 919 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
896 "%p Adding message %" PRIu64 " to queue.\n", 920 "%p Adding message %" PRIu64 " to queue.\n",
897 ch, GNUNET_ntohll (mmsg->message_id)); 921 chn, GNUNET_ntohll (mmsg->message_id));
898 fragq->state = MSG_FRAG_STATE_DATA; 922 fragq->state = MSG_FRAG_STATE_DATA;
899 } 923 }
900 else 924 else
@@ -902,7 +926,7 @@ fragment_queue_insert (struct Channel *ch,
902 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 926 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
903 "%p Header of message %" PRIu64 " is NOT complete yet: " 927 "%p Header of message %" PRIu64 " is NOT complete yet: "
904 "%" PRIu64 " != %" PRIu64 "\n", 928 "%" PRIu64 " != %" PRIu64 "\n",
905 ch, GNUNET_ntohll (mmsg->message_id), frag_offset, 929 chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
906 fragq->header_size); 930 fragq->header_size);
907 } 931 }
908 } 932 }
@@ -916,7 +940,7 @@ fragment_queue_insert (struct Channel *ch,
916 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 940 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
917 "%p Message %" PRIu64 " is NOT complete yet: " 941 "%p Message %" PRIu64 " is NOT complete yet: "
918 "%" PRIu64 " != %" PRIu64 "\n", 942 "%" PRIu64 " != %" PRIu64 "\n",
919 ch, GNUNET_ntohll (mmsg->message_id), frag_offset, 943 chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
920 fragq->size); 944 fragq->size);
921 break; 945 break;
922 946
@@ -935,7 +959,7 @@ fragment_queue_insert (struct Channel *ch,
935 case MSG_FRAG_STATE_CANCEL: 959 case MSG_FRAG_STATE_CANCEL:
936 if (GNUNET_NO == fragq->queued) 960 if (GNUNET_NO == fragq->queued)
937 { 961 {
938 GNUNET_CONTAINER_heap_insert (ch->recv_msgs, NULL, 962 GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
939 GNUNET_ntohll (mmsg->message_id)); 963 GNUNET_ntohll (mmsg->message_id));
940 fragq->queued = GNUNET_YES; 964 fragq->queued = GNUNET_YES;
941 } 965 }
@@ -953,24 +977,24 @@ fragment_queue_insert (struct Channel *ch,
953 * Send fragments of a message in order to client, after all modifiers arrived 977 * Send fragments of a message in order to client, after all modifiers arrived
954 * from multicast. 978 * from multicast.
955 * 979 *
956 * @param ch Channel. 980 * @param chn Channel.
957 * @param msg_id ID of the message @a fragq belongs to. 981 * @param msg_id ID of the message @a fragq belongs to.
958 * @param fragq Fragment queue of the message. 982 * @param fragq Fragment queue of the message.
959 * @param drop Drop message without delivering to client? 983 * @param drop Drop message without delivering to client?
960 * #GNUNET_YES or #GNUNET_NO. 984 * #GNUNET_YES or #GNUNET_NO.
961 */ 985 */
962static void 986static void
963fragment_queue_run (struct Channel *ch, uint64_t msg_id, 987fragment_queue_run (struct Channel *chn, uint64_t msg_id,
964 struct FragmentQueue *fragq, uint8_t drop) 988 struct FragmentQueue *fragq, uint8_t drop)
965{ 989{
966 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 990 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
967 "%p Running message fragment queue for message %" PRIu64 991 "%p Running message fragment queue for message %" PRIu64
968 " (state: %u).\n", 992 " (state: %u).\n",
969 ch, msg_id, fragq->state); 993 chn, msg_id, fragq->state);
970 994
971 struct GNUNET_CONTAINER_MultiHashMap 995 struct GNUNET_CONTAINER_MultiHashMap
972 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, 996 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
973 &ch->pub_key_hash); 997 &chn->pub_key_hash);
974 GNUNET_assert (NULL != chan_msgs); 998 GNUNET_assert (NULL != chan_msgs);
975 uint64_t frag_id; 999 uint64_t frag_id;
976 1000
@@ -985,7 +1009,7 @@ fragment_queue_run (struct Channel *ch, uint64_t msg_id,
985 { 1009 {
986 if (GNUNET_NO == drop) 1010 if (GNUNET_NO == drop)
987 { 1011 {
988 mmsg_to_clients (ch, cache_entry->mmsg); 1012 client_send_mcast_msg (chn, cache_entry->mmsg);
989 } 1013 }
990 if (cache_entry->ref_count <= 1) 1014 if (cache_entry->ref_count <= 1)
991 { 1015 {
@@ -1014,7 +1038,7 @@ fragment_queue_run (struct Channel *ch, uint64_t msg_id,
1014 struct GNUNET_HashCode msg_id_hash; 1038 struct GNUNET_HashCode msg_id_hash;
1015 hash_key_from_nll (&msg_id_hash, msg_id); 1039 hash_key_from_nll (&msg_id_hash, msg_id);
1016 1040
1017 GNUNET_CONTAINER_multihashmap_remove (ch->recv_frags, &msg_id_hash, fragq); 1041 GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1018 GNUNET_CONTAINER_heap_destroy (fragq->fragments); 1042 GNUNET_CONTAINER_heap_destroy (fragq->fragments);
1019 GNUNET_free (fragq); 1043 GNUNET_free (fragq);
1020 } 1044 }
@@ -1034,33 +1058,33 @@ fragment_queue_run (struct Channel *ch, uint64_t msg_id,
1034 * - A stateful message is only sent if the previous stateful message 1058 * - A stateful message is only sent if the previous stateful message
1035 * has already been delivered to the client. 1059 * has already been delivered to the client.
1036 * 1060 *
1037 * @param ch Channel. 1061 * @param chn Channel.
1038 * 1062 *
1039 * @return Number of messages removed from queue and sent to client. 1063 * @return Number of messages removed from queue and sent to client.
1040 */ 1064 */
1041static uint64_t 1065static uint64_t
1042message_queue_run (struct Channel *ch) 1066message_queue_run (struct Channel *chn)
1043{ 1067{
1044 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1068 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1045 "%p Running message queue.\n", ch); 1069 "%p Running message queue.\n", chn);
1046 uint64_t n = 0; 1070 uint64_t n = 0;
1047 uint64_t msg_id; 1071 uint64_t msg_id;
1048 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (ch->recv_msgs, NULL, 1072 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1049 &msg_id)) 1073 &msg_id))
1050 { 1074 {
1051 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1075 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1052 "%p Processing message %" PRIu64 " in queue.\n", ch, msg_id); 1076 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
1053 struct GNUNET_HashCode msg_id_hash; 1077 struct GNUNET_HashCode msg_id_hash;
1054 hash_key_from_hll (&msg_id_hash, msg_id); 1078 hash_key_from_hll (&msg_id_hash, msg_id);
1055 1079
1056 struct FragmentQueue * 1080 struct FragmentQueue *
1057 fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash); 1081 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1058 1082
1059 if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER) 1083 if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1060 { 1084 {
1061 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1085 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1062 "%p No fragq (%p) or header not complete.\n", 1086 "%p No fragq (%p) or header not complete.\n",
1063 ch, fragq); 1087 chn, fragq);
1064 break; 1088 break;
1065 } 1089 }
1066 1090
@@ -1070,40 +1094,40 @@ message_queue_run (struct Channel *ch)
1070 if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta) 1094 if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1071 { 1095 {
1072 if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY) 1096 if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1073 && msg_id - 1 != ch->max_message_id) 1097 && msg_id - 1 != chn->max_message_id)
1074 { 1098 {
1075 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1099 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1076 "%p Out of order message. " 1100 "%p Out of order message. "
1077 "(%" PRIu64 " - 1 != %" PRIu64 ")\n", 1101 "(%" PRIu64 " - 1 != %" PRIu64 ")\n",
1078 ch, msg_id, ch->max_message_id); 1102 chn, msg_id, chn->max_message_id);
1079 break; 1103 break;
1080 } 1104 }
1081 } 1105 }
1082 else 1106 else
1083 { 1107 {
1084 if (msg_id - fragq->state_delta != ch->max_state_message_id) 1108 if (msg_id - fragq->state_delta != chn->max_state_message_id)
1085 { 1109 {
1086 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1110 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1087 "%p Out of order stateful message. " 1111 "%p Out of order stateful message. "
1088 "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n", 1112 "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1089 ch, msg_id, fragq->state_delta, ch->max_state_message_id); 1113 chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1090 break; 1114 break;
1091 } 1115 }
1092#if TODO 1116#if TODO
1093 /* FIXME: apply modifiers to state in PSYCstore */ 1117 /* FIXME: apply modifiers to state in PSYCstore */
1094 GNUNET_PSYCSTORE_state_modify (store, &ch->pub_key, message_id, 1118 GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, message_id,
1095 state_modify_result_cb, cls); 1119 store_recv_state_modify_result, cls);
1096#endif 1120#endif
1097 ch->max_state_message_id = msg_id; 1121 chn->max_state_message_id = msg_id;
1098 } 1122 }
1099 ch->max_message_id = msg_id; 1123 chn->max_message_id = msg_id;
1100 } 1124 }
1101 fragment_queue_run (ch, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state); 1125 fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1102 GNUNET_CONTAINER_heap_remove_root (ch->recv_msgs); 1126 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1103 n++; 1127 n++;
1104 } 1128 }
1105 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1129 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1106 "%p Removed %" PRIu64 " messages from queue.\n", ch, n); 1130 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1107 return n; 1131 return n;
1108} 1132}
1109 1133
@@ -1113,107 +1137,88 @@ message_queue_run (struct Channel *ch)
1113 * 1137 *
1114 * Remove all messages in queue without sending it to clients. 1138 * Remove all messages in queue without sending it to clients.
1115 * 1139 *
1116 * @param ch Channel. 1140 * @param chn Channel.
1117 * 1141 *
1118 * @return Number of messages removed from queue. 1142 * @return Number of messages removed from queue.
1119 */ 1143 */
1120static uint64_t 1144static uint64_t
1121message_queue_drop (struct Channel *ch) 1145message_queue_drop (struct Channel *chn)
1122{ 1146{
1123 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1147 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1124 "%p Dropping message queue.\n", ch); 1148 "%p Dropping message queue.\n", chn);
1125 uint64_t n = 0; 1149 uint64_t n = 0;
1126 uint64_t msg_id; 1150 uint64_t msg_id;
1127 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (ch->recv_msgs, NULL, 1151 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1128 &msg_id)) 1152 &msg_id))
1129 { 1153 {
1130 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1154 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1131 "%p Dropping message %" PRIu64 " from queue.\n", ch, msg_id); 1155 "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
1132 struct GNUNET_HashCode msg_id_hash; 1156 struct GNUNET_HashCode msg_id_hash;
1133 hash_key_from_hll (&msg_id_hash, msg_id); 1157 hash_key_from_hll (&msg_id_hash, msg_id);
1134 1158
1135 struct FragmentQueue * 1159 struct FragmentQueue *
1136 fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash); 1160 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
1137 1161
1138 fragment_queue_run (ch, msg_id, fragq, GNUNET_YES); 1162 fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
1139 GNUNET_CONTAINER_heap_remove_root (ch->recv_msgs); 1163 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1140 n++; 1164 n++;
1141 } 1165 }
1142 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1166 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1143 "%p Removed %" PRIu64 " messages from queue.\n", ch, n); 1167 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1144 return n; 1168 return n;
1145} 1169}
1146 1170
1147 1171
1148/** 1172/**
1149 * Handle incoming message from multicast. 1173 * Handle the result of a GNUNET_PSYCSTORE_fragment_store() operation.
1150 *
1151 * @param ch Channel.
1152 * @param mmsg Multicast message.
1153 *
1154 * @return #GNUNET_OK or #GNUNET_SYSERR
1155 */ 1174 */
1156static int 1175static void
1157client_multicast_message (struct Channel *ch, 1176store_recv_fragment_store_result (void *cls, int64_t result, const char *err_msg)
1158 const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1159{ 1177{
1160 GNUNET_PSYCSTORE_fragment_store (store, &ch->pub_key, mmsg, 0, NULL, NULL); 1178 struct Channel *chn = cls;
1161
1162 uint16_t size = ntohs (mmsg->header.size);
1163 uint16_t first_ptype = 0, last_ptype = 0;
1164
1165 if (GNUNET_SYSERR
1166 == GNUNET_PSYC_check_message_parts (size - sizeof (*mmsg),
1167 (const char *) &mmsg[1],
1168 &first_ptype, &last_ptype))
1169 {
1170 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1171 "%p Received message with invalid parts from multicast. "
1172 "Dropping message.\n", ch);
1173 GNUNET_break_op (0);
1174 return GNUNET_SYSERR;
1175 }
1176
1177 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1179 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1178 "Message parts: first: type %u, last: type %u\n", 1180 "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%s)\n",
1179 first_ptype, last_ptype); 1181 chn, result, err_msg);
1180
1181 fragment_queue_insert (ch, mmsg, first_ptype, last_ptype);
1182 message_queue_run (ch);
1183
1184 return GNUNET_OK;
1185} 1182}
1186 1183
1187 1184
1188/** 1185/**
1189 * Incoming message fragment from multicast. 1186 * Handle incoming message fragment from multicast.
1190 * 1187 *
1191 * Store it using PSYCstore and send it to the client of the channel. 1188 * Store it using PSYCstore and send it to the clients of the channel in order.
1192 */ 1189 */
1193static void 1190static void
1194mcast_message_cb (void *cls, const struct GNUNET_MessageHeader *msg) 1191mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
1195{ 1192{
1196 struct Channel *ch = cls; 1193 struct Channel *chn = cls;
1197 uint16_t type = ntohs (msg->type); 1194 uint16_t size = ntohs (mmsg->header.size);
1198 uint16_t size = ntohs (msg->size);
1199 1195
1200 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1196 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1201 "%p Received message of type %u and size %u from multicast.\n", 1197 "%p Received multicast message of size %u.\n",
1202 ch, type, size); 1198 chn, size);
1203 1199
1204 switch (type) 1200 GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
1205 { 1201 &store_recv_fragment_store_result, chn);
1206 case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE: 1202
1203 uint16_t first_ptype = 0, last_ptype = 0;
1204 if (GNUNET_SYSERR
1205 == GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
1206 (const char *) &mmsg[1],
1207 &first_ptype, &last_ptype))
1207 { 1208 {
1208 client_multicast_message (ch, (const struct
1209 GNUNET_MULTICAST_MessageHeader *) msg);
1210 break;
1211 }
1212 default:
1213 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1209 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1214 "%p Dropping unknown message of type %u and size %u.\n", 1210 "%p Dropping incoming multicast message with invalid parts.\n",
1215 ch, type, size); 1211 chn);
1212 GNUNET_break_op (0);
1213 return;
1216 } 1214 }
1215
1216 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1217 "Message parts: first: type %u, last: type %u\n",
1218 first_ptype, last_ptype);
1219
1220 fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
1221 message_queue_run (chn);
1217} 1222}
1218 1223
1219 1224
@@ -1226,59 +1231,35 @@ mcast_message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
1226 * @param flags Request flags. 1231 * @param flags Request flags.
1227 */ 1232 */
1228static void 1233static void
1229mcast_request_cb (void *cls, 1234mcast_recv_request (void *cls,
1230 const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, 1235 const struct GNUNET_MULTICAST_RequestHeader *req)
1231 const struct GNUNET_MessageHeader *msg,
1232 enum GNUNET_MULTICAST_MessageFlags flags)
1233{ 1236{
1234 struct Master *mst = cls; 1237 struct Master *mst = cls;
1235 struct Channel *ch = &mst->ch; 1238 uint16_t size = ntohs (req->header.size);
1236
1237 uint16_t type = ntohs (msg->type);
1238 uint16_t size = ntohs (msg->size);
1239 1239
1240 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1240 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1241 "%p Received request of type %u and size %u from multicast.\n", 1241 "%p Received multicast request of size %u.\n",
1242 ch, type, size); 1242 mst, size);
1243 1243
1244 switch (type) 1244 uint16_t first_ptype = 0, last_ptype = 0;
1245 { 1245 if (GNUNET_SYSERR
1246 case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST: 1246 == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
1247 (const char *) &req[1],
1248 &first_ptype, &last_ptype))
1247 { 1249 {
1248 const struct GNUNET_MULTICAST_RequestHeader *req 1250 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1249 = (const struct GNUNET_MULTICAST_RequestHeader *) msg; 1251 "%p Dropping incoming multicast request with invalid parts.\n",
1250 1252 mst);
1251 /* FIXME: see message_cb() */
1252 if (GNUNET_SYSERR == GNUNET_PSYC_check_message_parts (size - sizeof (*req),
1253 (const char *) &req[1],
1254 NULL, NULL))
1255 {
1256 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1257 "%p Dropping request with invalid parts "
1258 "received from multicast.\n", ch);
1259 GNUNET_break_op (0);
1260 break;
1261 }
1262
1263 struct GNUNET_PSYC_MessageHeader *pmsg;
1264 uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
1265 pmsg = GNUNET_malloc (psize);
1266 pmsg->header.size = htons (psize);
1267 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
1268 pmsg->message_id = req->request_id;
1269 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1270
1271 memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1272 msg_to_clients (ch, &pmsg->header);
1273 GNUNET_free (pmsg);
1274 break;
1275 }
1276 default:
1277 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1278 "%p Dropping unknown request of type %u and size %u.\n",
1279 ch, type, size);
1280 GNUNET_break_op (0); 1253 GNUNET_break_op (0);
1254 return;
1281 } 1255 }
1256
1257 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1258 "Message parts: first: type %u, last: type %u\n",
1259 first_ptype, last_ptype);
1260
1261 /* FIXME: in-order delivery */
1262 client_send_mcast_req (mst, req);
1282} 1263}
1283 1264
1284 1265
@@ -1286,13 +1267,13 @@ mcast_request_cb (void *cls,
1286 * Response from PSYCstore with the current counter values for a channel master. 1267 * Response from PSYCstore with the current counter values for a channel master.
1287 */ 1268 */
1288static void 1269static void
1289master_counters_cb (void *cls, int result, uint64_t max_fragment_id, 1270store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1290 uint64_t max_message_id, uint64_t max_group_generation, 1271 uint64_t max_message_id, uint64_t max_group_generation,
1291 uint64_t max_state_message_id) 1272 uint64_t max_state_message_id)
1292{ 1273{
1293 struct Master *mst = cls; 1274 struct Master *mst = cls;
1294 struct Channel *ch = &mst->ch; 1275 struct Channel *chn = &mst->chn;
1295 ch->store_op = NULL; 1276 chn->store_op = NULL;
1296 1277
1297 struct CountersResult res; 1278 struct CountersResult res;
1298 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); 1279 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
@@ -1303,28 +1284,28 @@ master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
1303 if (GNUNET_OK == result || GNUNET_NO == result) 1284 if (GNUNET_OK == result || GNUNET_NO == result)
1304 { 1285 {
1305 mst->max_message_id = max_message_id; 1286 mst->max_message_id = max_message_id;
1306 ch->max_message_id = max_message_id; 1287 chn->max_message_id = max_message_id;
1307 ch->max_state_message_id = max_state_message_id; 1288 chn->max_state_message_id = max_state_message_id;
1308 mst->max_group_generation = max_group_generation; 1289 mst->max_group_generation = max_group_generation;
1309 mst->origin 1290 mst->origin
1310 = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id, 1291 = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1311 &mcast_join_request_cb, 1292 &mcast_recv_join_request,
1312 &mcast_membership_test_cb, 1293 &mcast_recv_membership_test,
1313 &mcast_replay_fragment_cb, 1294 &mcast_recv_replay_fragment,
1314 &mcast_replay_message_cb, 1295 &mcast_recv_replay_message,
1315 &mcast_request_cb, 1296 &mcast_recv_request,
1316 &mcast_message_cb, ch); 1297 &mcast_recv_message, chn);
1317 ch->ready = GNUNET_YES; 1298 chn->ready = GNUNET_YES;
1318 } 1299 }
1319 else 1300 else
1320 { 1301 {
1321 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1302 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1322 "%p GNUNET_PSYCSTORE_counters_get() " 1303 "%p GNUNET_PSYCSTORE_counters_get() "
1323 "returned %d for channel %s.\n", 1304 "returned %d for channel %s.\n",
1324 ch, result, GNUNET_h2s (&ch->pub_key_hash)); 1305 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1325 } 1306 }
1326 1307
1327 msg_to_clients (ch, &res.header); 1308 client_send_msg (chn, &res.header);
1328} 1309}
1329 1310
1330 1311
@@ -1332,13 +1313,13 @@ master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
1332 * Response from PSYCstore with the current counter values for a channel slave. 1313 * Response from PSYCstore with the current counter values for a channel slave.
1333 */ 1314 */
1334void 1315void
1335slave_counters_cb (void *cls, int result, uint64_t max_fragment_id, 1316store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1336 uint64_t max_message_id, uint64_t max_group_generation, 1317 uint64_t max_message_id, uint64_t max_group_generation,
1337 uint64_t max_state_message_id) 1318 uint64_t max_state_message_id)
1338{ 1319{
1339 struct Slave *slv = cls; 1320 struct Slave *slv = cls;
1340 struct Channel *ch = &slv->ch; 1321 struct Channel *chn = &slv->chn;
1341 ch->store_op = NULL; 1322 chn->store_op = NULL;
1342 1323
1343 struct CountersResult res; 1324 struct CountersResult res;
1344 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); 1325 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
@@ -1348,38 +1329,38 @@ slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
1348 1329
1349 if (GNUNET_OK == result || GNUNET_NO == result) 1330 if (GNUNET_OK == result || GNUNET_NO == result)
1350 { 1331 {
1351 ch->max_message_id = max_message_id; 1332 chn->max_message_id = max_message_id;
1352 ch->max_state_message_id = max_state_message_id; 1333 chn->max_state_message_id = max_state_message_id;
1353 slv->member 1334 slv->member
1354 = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->priv_key, 1335 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1355 &slv->origin, 1336 &slv->origin,
1356 slv->relay_count, slv->relays, 1337 slv->relay_count, slv->relays,
1357 slv->join_req, 1338 slv->join_req,
1358 &mcast_join_request_cb, 1339 &mcast_recv_join_request,
1359 &mcast_join_decision_cb, 1340 &mcast_recv_join_decision,
1360 &mcast_membership_test_cb, 1341 &mcast_recv_membership_test,
1361 &mcast_replay_fragment_cb, 1342 &mcast_recv_replay_fragment,
1362 &mcast_replay_message_cb, 1343 &mcast_recv_replay_message,
1363 &mcast_message_cb, ch); 1344 &mcast_recv_message, chn);
1364 } 1345 }
1365 else 1346 else
1366 { 1347 {
1367 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1348 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1368 "%p GNUNET_PSYCSTORE_counters_get() " 1349 "%p GNUNET_PSYCSTORE_counters_get() "
1369 "returned %d for channel %s.\n", 1350 "returned %d for channel %s.\n",
1370 ch, result, GNUNET_h2s (&ch->pub_key_hash)); 1351 chn, result, GNUNET_h2s (&chn->pub_key_hash));
1371 } 1352 }
1372 1353
1373 msg_to_clients (ch, &res.header); 1354 client_send_msg (chn, &res.header);
1374} 1355}
1375 1356
1376 1357
1377static void 1358static void
1378channel_init (struct Channel *ch) 1359channel_init (struct Channel *chn)
1379{ 1360{
1380 ch->recv_msgs 1361 chn->recv_msgs
1381 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 1362 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1382 ch->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); 1363 chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1383} 1364}
1384 1365
1385 1366
@@ -1387,8 +1368,8 @@ channel_init (struct Channel *ch)
1387 * Handle a connecting client starting a channel master. 1368 * Handle a connecting client starting a channel master.
1388 */ 1369 */
1389static void 1370static void
1390client_master_start (void *cls, struct GNUNET_SERVER_Client *client, 1371client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1391 const struct GNUNET_MessageHeader *msg) 1372 const struct GNUNET_MessageHeader *msg)
1392{ 1373{
1393 const struct MasterStartRequest *req 1374 const struct MasterStartRequest *req
1394 = (const struct MasterStartRequest *) msg; 1375 = (const struct MasterStartRequest *) msg;
@@ -1401,7 +1382,7 @@ client_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1401 1382
1402 struct Master * 1383 struct Master *
1403 mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash); 1384 mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1404 struct Channel *ch; 1385 struct Channel *chn;
1405 1386
1406 if (NULL == mst) 1387 if (NULL == mst)
1407 { 1388 {
@@ -1410,20 +1391,20 @@ client_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1410 mst->priv_key = req->channel_key; 1391 mst->priv_key = req->channel_key;
1411 mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); 1392 mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1412 1393
1413 ch = &mst->ch; 1394 chn = &mst->chn;
1414 ch->is_master = GNUNET_YES; 1395 chn->is_master = GNUNET_YES;
1415 ch->pub_key = pub_key; 1396 chn->pub_key = pub_key;
1416 ch->pub_key_hash = pub_key_hash; 1397 chn->pub_key_hash = pub_key_hash;
1417 channel_init (ch); 1398 channel_init (chn);
1418 1399
1419 GNUNET_CONTAINER_multihashmap_put (masters, &ch->pub_key_hash, ch, 1400 GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
1420 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 1401 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1421 ch->store_op = GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, 1402 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1422 master_counters_cb, mst); 1403 store_recv_master_counters, mst);
1423 } 1404 }
1424 else 1405 else
1425 { 1406 {
1426 ch = &mst->ch; 1407 chn = &mst->chn;
1427 1408
1428 struct CountersResult res; 1409 struct CountersResult res;
1429 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); 1410 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
@@ -1438,13 +1419,13 @@ client_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1438 1419
1439 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1420 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1440 "%p Client connected as master to channel %s.\n", 1421 "%p Client connected as master to channel %s.\n",
1441 mst, GNUNET_h2s (&ch->pub_key_hash)); 1422 mst, GNUNET_h2s (&chn->pub_key_hash));
1442 1423
1443 struct ClientList *cl = GNUNET_new (struct ClientList); 1424 struct ClientListItem *cli = GNUNET_new (struct ClientListItem);
1444 cl->client = client; 1425 cli->client = client;
1445 GNUNET_CONTAINER_DLL_insert (ch->clients_head, ch->clients_tail, cl); 1426 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1446 1427
1447 GNUNET_SERVER_client_set_user_context (client, ch); 1428 GNUNET_SERVER_client_set_user_context (client, chn);
1448 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1429 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1449} 1430}
1450 1431
@@ -1453,8 +1434,8 @@ client_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1453 * Handle a connecting client joining as a channel slave. 1434 * Handle a connecting client joining as a channel slave.
1454 */ 1435 */
1455static void 1436static void
1456client_slave_join (void *cls, struct GNUNET_SERVER_Client *client, 1437client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1457 const struct GNUNET_MessageHeader *msg) 1438 const struct GNUNET_MessageHeader *msg)
1458{ 1439{
1459 const struct SlaveJoinRequest *req 1440 const struct SlaveJoinRequest *req
1460 = (const struct SlaveJoinRequest *) msg; 1441 = (const struct SlaveJoinRequest *) msg;
@@ -1467,13 +1448,13 @@ client_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1467 GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash); 1448 GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash);
1468 1449
1469 struct GNUNET_CONTAINER_MultiHashMap * 1450 struct GNUNET_CONTAINER_MultiHashMap *
1470 ch_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash); 1451 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1471 struct Slave *slv = NULL; 1452 struct Slave *slv = NULL;
1472 struct Channel *ch; 1453 struct Channel *chn;
1473 1454
1474 if (NULL != ch_slv) 1455 if (NULL != chn_slv)
1475 { 1456 {
1476 slv = GNUNET_CONTAINER_multihashmap_get (ch_slv, &slv_pub_key_hash); 1457 slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_key_hash);
1477 } 1458 }
1478 if (NULL == slv) 1459 if (NULL == slv)
1479 { 1460 {
@@ -1494,34 +1475,34 @@ client_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1494 memcpy (&slv->relays[i], &relays[i], sizeof (*relays)); 1475 memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
1495 } 1476 }
1496 1477
1497 ch = &slv->ch; 1478 chn = &slv->chn;
1498 ch->is_master = GNUNET_NO; 1479 chn->is_master = GNUNET_NO;
1499 ch->pub_key = req->channel_key; 1480 chn->pub_key = req->channel_key;
1500 ch->pub_key_hash = pub_key_hash; 1481 chn->pub_key_hash = pub_key_hash;
1501 channel_init (ch); 1482 channel_init (chn);
1502 1483
1503 if (NULL == ch_slv) 1484 if (NULL == chn_slv)
1504 { 1485 {
1505 ch_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); 1486 chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1506 GNUNET_CONTAINER_multihashmap_put (channel_slaves, &ch->pub_key_hash, ch_slv, 1487 GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
1507 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 1488 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1508 } 1489 }
1509 GNUNET_CONTAINER_multihashmap_put (ch_slv, &slv->pub_key_hash, ch, 1490 GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
1510 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); 1491 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1511 GNUNET_CONTAINER_multihashmap_put (slaves, &ch->pub_key_hash, ch, 1492 GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1512 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 1493 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1513 ch->store_op = GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, 1494 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1514 slave_counters_cb, slv); 1495 &store_recv_slave_counters, slv);
1515 } 1496 }
1516 else 1497 else
1517 { 1498 {
1518 ch = &slv->ch; 1499 chn = &slv->chn;
1519 1500
1520 struct CountersResult res; 1501 struct CountersResult res;
1521 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); 1502 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1522 res.header.size = htons (sizeof (res)); 1503 res.header.size = htons (sizeof (res));
1523 res.result_code = htonl (GNUNET_OK); 1504 res.result_code = htonl (GNUNET_OK);
1524 res.max_message_id = GNUNET_htonll (ch->max_message_id); 1505 res.max_message_id = GNUNET_htonll (chn->max_message_id);
1525 1506
1526 GNUNET_SERVER_notification_context_add (nc, client); 1507 GNUNET_SERVER_notification_context_add (nc, client);
1527 GNUNET_SERVER_notification_context_unicast (nc, client, &res.header, 1508 GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
@@ -1530,16 +1511,16 @@ client_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1530 if (NULL == slv->member) 1511 if (NULL == slv->member)
1531 { 1512 {
1532 slv->member 1513 slv->member
1533 = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->priv_key, 1514 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1534 &slv->origin, 1515 &slv->origin,
1535 slv->relay_count, slv->relays, 1516 slv->relay_count, slv->relays,
1536 slv->join_req, 1517 slv->join_req,
1537 &mcast_join_request_cb, 1518 &mcast_recv_join_request,
1538 &mcast_join_decision_cb, 1519 &mcast_recv_join_decision,
1539 &mcast_membership_test_cb, 1520 &mcast_recv_membership_test,
1540 &mcast_replay_fragment_cb, 1521 &mcast_recv_replay_fragment,
1541 &mcast_replay_message_cb, 1522 &mcast_recv_replay_message,
1542 &mcast_message_cb, ch); 1523 &mcast_recv_message, chn);
1543 1524
1544 } 1525 }
1545 else if (NULL != slv->join_dcsn) 1526 else if (NULL != slv->join_dcsn)
@@ -1553,13 +1534,13 @@ client_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1553 1534
1554 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1535 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1555 "%p Client connected as slave to channel %s.\n", 1536 "%p Client connected as slave to channel %s.\n",
1556 slv, GNUNET_h2s (&ch->pub_key_hash)); 1537 slv, GNUNET_h2s (&chn->pub_key_hash));
1557 1538
1558 struct ClientList *cl = GNUNET_new (struct ClientList); 1539 struct ClientListItem *cli = GNUNET_new (struct ClientListItem);
1559 cl->client = client; 1540 cli->client = client;
1560 GNUNET_CONTAINER_DLL_insert (ch->clients_head, ch->clients_tail, cl); 1541 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1561 1542
1562 GNUNET_SERVER_client_set_user_context (client, &slv->ch); 1543 GNUNET_SERVER_client_set_user_context (client, &slv->chn);
1563 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1544 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1564} 1545}
1565 1546
@@ -1575,8 +1556,8 @@ struct JoinDecisionClosure
1575 * Iterator callback for responding to join requests of a slave. 1556 * Iterator callback for responding to join requests of a slave.
1576 */ 1557 */
1577static int 1558static int
1578send_join_decision_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, 1559mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1579 void *jh) 1560 void *jh)
1580{ 1561{
1581 struct JoinDecisionClosure *jcls = cls; 1562 struct JoinDecisionClosure *jcls = cls;
1582 // FIXME: add relays 1563 // FIXME: add relays
@@ -1589,13 +1570,13 @@ send_join_decision_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1589 * Join decision from client. 1570 * Join decision from client.
1590 */ 1571 */
1591static void 1572static void
1592client_join_decision (void *cls, struct GNUNET_SERVER_Client *client, 1573client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1593 const struct GNUNET_MessageHeader *msg) 1574 const struct GNUNET_MessageHeader *msg)
1594{ 1575{
1595 struct Channel * 1576 struct Channel *
1596 ch = GNUNET_SERVER_client_get_user_context (client, struct Channel); 1577 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1597 GNUNET_assert (GNUNET_YES == ch->is_master); 1578 GNUNET_assert (GNUNET_YES == chn->is_master);
1598 struct Master *mst = (struct Master *) ch; 1579 struct Master *mst = (struct Master *) chn;
1599 1580
1600 struct MasterJoinDecision *dcsn = (struct MasterJoinDecision *) msg; 1581 struct MasterJoinDecision *dcsn = (struct MasterJoinDecision *) msg;
1601 struct JoinDecisionClosure jcls; 1582 struct JoinDecisionClosure jcls;
@@ -1612,13 +1593,13 @@ client_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1612 1593
1613 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1594 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1614 "%p Got join decision (%d) from client for channel %s..\n", 1595 "%p Got join decision (%d) from client for channel %s..\n",
1615 mst, jcls.is_admitted, GNUNET_h2s (&ch->pub_key_hash)); 1596 mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
1616 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1597 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1617 "%p ..and slave %s.\n", 1598 "%p ..and slave %s.\n",
1618 mst, GNUNET_h2s (&slave_key_hash)); 1599 mst, GNUNET_h2s (&slave_key_hash));
1619 1600
1620 GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash, 1601 GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash,
1621 &send_join_decision_cb, &jcls); 1602 &mcast_send_join_decision, &jcls);
1622 GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_key_hash); 1603 GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_key_hash);
1623 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1604 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1624} 1605}
@@ -1629,10 +1610,10 @@ client_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1629 * 1610 *
1630 * Sent after a message fragment has been passed on to multicast. 1611 * Sent after a message fragment has been passed on to multicast.
1631 * 1612 *
1632 * @param ch The channel struct for the client. 1613 * @param chn The channel struct for the client.
1633 */ 1614 */
1634static void 1615static void
1635send_message_ack (struct Channel *ch, struct GNUNET_SERVER_Client *client) 1616send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client)
1636{ 1617{
1637 struct GNUNET_MessageHeader res; 1618 struct GNUNET_MessageHeader res;
1638 res.size = htons (sizeof (res)); 1619 res.size = htons (sizeof (res));
@@ -1650,40 +1631,40 @@ send_message_ack (struct Channel *ch, struct GNUNET_SERVER_Client *client)
1650static int 1631static int
1651transmit_notify (void *cls, size_t *data_size, void *data) 1632transmit_notify (void *cls, size_t *data_size, void *data)
1652{ 1633{
1653 struct Channel *ch = cls; 1634 struct Channel *chn = cls;
1654 struct TransmitMessage *tmit_msg = ch->tmit_head; 1635 struct TransmitMessage *tmit_msg = chn->tmit_head;
1655 1636
1656 if (NULL == tmit_msg || *data_size < tmit_msg->size) 1637 if (NULL == tmit_msg || *data_size < tmit_msg->size)
1657 { 1638 {
1658 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1639 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1659 "%p transmit_notify: nothing to send.\n", ch); 1640 "%p transmit_notify: nothing to send.\n", chn);
1660 *data_size = 0; 1641 *data_size = 0;
1661 return GNUNET_NO; 1642 return GNUNET_NO;
1662 } 1643 }
1663 1644
1664 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1645 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1665 "%p transmit_notify: sending %u bytes.\n", ch, tmit_msg->size); 1646 "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
1666 1647
1667 *data_size = tmit_msg->size; 1648 *data_size = tmit_msg->size;
1668 memcpy (data, &tmit_msg[1], *data_size); 1649 memcpy (data, &tmit_msg[1], *data_size);
1669 1650
1670 int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES; 1651 int ret = (MSG_STATE_END < chn->tmit_state) ? GNUNET_NO : GNUNET_YES;
1671 if (NULL != tmit_msg->client) 1652 if (NULL != tmit_msg->client)
1672 send_message_ack (ch, tmit_msg->client); 1653 send_message_ack (chn, tmit_msg->client);
1673 1654
1674 GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg); 1655 GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
1675 GNUNET_free (tmit_msg); 1656 GNUNET_free (tmit_msg);
1676 1657
1677 if (0 == ch->tmit_task) 1658 if (0 == chn->tmit_task)
1678 { 1659 {
1679 if (NULL != ch->tmit_head) 1660 if (NULL != chn->tmit_head)
1680 { 1661 {
1681 transmit_message (ch); 1662 transmit_message (chn);
1682 } 1663 }
1683 else if (ch->disconnected) 1664 else if (chn->disconnected)
1684 { 1665 {
1685 /* FIXME: handle partial message (when still in_transmit) */ 1666 /* FIXME: handle partial message (when still in_transmit) */
1686 cleanup_channel (ch); 1667 cleanup_channel (chn);
1687 } 1668 }
1688 } 1669 }
1689 1670
@@ -1732,7 +1713,7 @@ static void
1732master_transmit_message (struct Master *mst) 1713master_transmit_message (struct Master *mst)
1733{ 1714{
1734 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_transmit_message()\n", mst); 1715 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_transmit_message()\n", mst);
1735 mst->ch.tmit_task = 0; 1716 mst->chn.tmit_task = 0;
1736 if (NULL == mst->tmit_handle) 1717 if (NULL == mst->tmit_handle)
1737 { 1718 {
1738 mst->tmit_handle 1719 mst->tmit_handle
@@ -1753,7 +1734,7 @@ master_transmit_message (struct Master *mst)
1753static void 1734static void
1754slave_transmit_message (struct Slave *slv) 1735slave_transmit_message (struct Slave *slv)
1755{ 1736{
1756 slv->ch.tmit_task = 0; 1737 slv->chn.tmit_task = 0;
1757 if (NULL == slv->tmit_handle) 1738 if (NULL == slv->tmit_handle)
1758 { 1739 {
1759 slv->tmit_handle 1740 slv->tmit_handle
@@ -1768,11 +1749,11 @@ slave_transmit_message (struct Slave *slv)
1768 1749
1769 1750
1770static inline void 1751static inline void
1771transmit_message (struct Channel *ch) 1752transmit_message (struct Channel *chn)
1772{ 1753{
1773 ch->is_master 1754 chn->is_master
1774 ? master_transmit_message ((struct Master *) ch) 1755 ? master_transmit_message ((struct Master *) chn)
1775 : slave_transmit_message ((struct Slave *) ch); 1756 : slave_transmit_message ((struct Slave *) chn);
1776} 1757}
1777 1758
1778 1759
@@ -1828,7 +1809,7 @@ slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg,
1828/** 1809/**
1829 * Queue PSYC message parts for sending to multicast. 1810 * Queue PSYC message parts for sending to multicast.
1830 * 1811 *
1831 * @param ch Channel to send to. 1812 * @param chn Channel to send to.
1832 * @param client Client the message originates from. 1813 * @param client Client the message originates from.
1833 * @param data_size Size of @a data. 1814 * @param data_size Size of @a data.
1834 * @param data Concatenated message parts. 1815 * @param data Concatenated message parts.
@@ -1836,7 +1817,7 @@ slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg,
1836 * @param last_ptype Last message part type in @a data. 1817 * @param last_ptype Last message part type in @a data.
1837 */ 1818 */
1838static void 1819static void
1839queue_message (struct Channel *ch, 1820queue_message (struct Channel *chn,
1840 struct GNUNET_SERVER_Client *client, 1821 struct GNUNET_SERVER_Client *client,
1841 size_t data_size, 1822 size_t data_size,
1842 const void *data, 1823 const void *data,
@@ -1847,14 +1828,14 @@ queue_message (struct Channel *ch,
1847 memcpy (&tmit_msg[1], data, data_size); 1828 memcpy (&tmit_msg[1], data, data_size);
1848 tmit_msg->client = client; 1829 tmit_msg->client = client;
1849 tmit_msg->size = data_size; 1830 tmit_msg->size = data_size;
1850 tmit_msg->state = ch->tmit_state; 1831 tmit_msg->state = chn->tmit_state;
1851 1832
1852 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); 1833 GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
1853 1834
1854 ch->is_master 1835 chn->is_master
1855 ? master_queue_message ((struct Master *) ch, tmit_msg, 1836 ? master_queue_message ((struct Master *) chn, tmit_msg,
1856 first_ptype, last_ptype) 1837 first_ptype, last_ptype)
1857 : slave_queue_message ((struct Slave *) ch, tmit_msg, 1838 : slave_queue_message ((struct Slave *) chn, tmit_msg,
1858 first_ptype, last_ptype); 1839 first_ptype, last_ptype);
1859} 1840}
1860 1841
@@ -1862,11 +1843,11 @@ queue_message (struct Channel *ch,
1862/** 1843/**
1863 * Cancel transmission of current message. 1844 * Cancel transmission of current message.
1864 * 1845 *
1865 * @param ch Channel to send to. 1846 * @param chn Channel to send to.
1866 * @param client Client the message originates from. 1847 * @param client Client the message originates from.
1867 */ 1848 */
1868static void 1849static void
1869transmit_cancel (struct Channel *ch, struct GNUNET_SERVER_Client *client) 1850transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client)
1870{ 1851{
1871 uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL; 1852 uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
1872 1853
@@ -1874,8 +1855,8 @@ transmit_cancel (struct Channel *ch, struct GNUNET_SERVER_Client *client)
1874 msg.size = htons (sizeof (msg)); 1855 msg.size = htons (sizeof (msg));
1875 msg.type = htons (type); 1856 msg.type = htons (type);
1876 1857
1877 queue_message (ch, client, sizeof (msg), &msg, type, type); 1858 queue_message (chn, client, sizeof (msg), &msg, type, type);
1878 transmit_message (ch); 1859 transmit_message (chn);
1879 1860
1880 /* FIXME: cleanup */ 1861 /* FIXME: cleanup */
1881} 1862}
@@ -1885,21 +1866,21 @@ transmit_cancel (struct Channel *ch, struct GNUNET_SERVER_Client *client)
1885 * Incoming message from a master or slave client. 1866 * Incoming message from a master or slave client.
1886 */ 1867 */
1887static void 1868static void
1888client_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, 1869client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
1889 const struct GNUNET_MessageHeader *msg) 1870 const struct GNUNET_MessageHeader *msg)
1890{ 1871{
1891 struct Channel * 1872 struct Channel *
1892 ch = GNUNET_SERVER_client_get_user_context (client, struct Channel); 1873 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1893 GNUNET_assert (NULL != ch); 1874 GNUNET_assert (NULL != chn);
1894 1875
1895 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1876 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1896 "%p Received message from client.\n", ch); 1877 "%p Received message from client.\n", chn);
1897 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg); 1878 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
1898 1879
1899 if (GNUNET_YES != ch->ready) 1880 if (GNUNET_YES != chn->ready)
1900 { 1881 {
1901 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1882 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1902 "%p Channel is not ready, dropping message from client.\n", ch); 1883 "%p Channel is not ready, dropping message from client.\n", chn);
1903 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 1884 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1904 return; 1885 return;
1905 } 1886 }
@@ -1907,30 +1888,30 @@ client_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
1907 uint16_t size = ntohs (msg->size); 1888 uint16_t size = ntohs (msg->size);
1908 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg)) 1889 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
1909 { 1890 {
1910 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large.\n", ch); 1891 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large.\n", chn);
1911 GNUNET_break (0); 1892 GNUNET_break (0);
1912 transmit_cancel (ch, client); 1893 transmit_cancel (chn, client);
1913 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 1894 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1914 return; 1895 return;
1915 } 1896 }
1916 1897
1917 uint16_t first_ptype = 0, last_ptype = 0; 1898 uint16_t first_ptype = 0, last_ptype = 0;
1918 if (GNUNET_SYSERR 1899 if (GNUNET_SYSERR
1919 == GNUNET_PSYC_check_message_parts (size - sizeof (*msg), 1900 == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
1920 (const char *) &msg[1], 1901 (const char *) &msg[1],
1921 &first_ptype, &last_ptype)) 1902 &first_ptype, &last_ptype))
1922 { 1903 {
1923 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1904 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1924 "%p Received invalid message part from client.\n", ch); 1905 "%p Received invalid message part from client.\n", chn);
1925 GNUNET_break (0); 1906 GNUNET_break (0);
1926 transmit_cancel (ch, client); 1907 transmit_cancel (chn, client);
1927 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 1908 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1928 return; 1909 return;
1929 } 1910 }
1930 1911
1931 queue_message (ch, client, size - sizeof (*msg), &msg[1], 1912 queue_message (chn, client, size - sizeof (*msg), &msg[1],
1932 first_ptype, last_ptype); 1913 first_ptype, last_ptype);
1933 transmit_message (ch); 1914 transmit_message (chn);
1934 1915
1935 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1916 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1936}; 1917};
@@ -1940,8 +1921,8 @@ client_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
1940 * Client requests to add a slave to the membership database. 1921 * Client requests to add a slave to the membership database.
1941 */ 1922 */
1942static void 1923static void
1943client_slave_add (void *cls, struct GNUNET_SERVER_Client *client, 1924client_recv_slave_add (void *cls, struct GNUNET_SERVER_Client *client,
1944 const struct GNUNET_MessageHeader *msg) 1925 const struct GNUNET_MessageHeader *msg)
1945{ 1926{
1946 1927
1947} 1928}
@@ -1951,8 +1932,8 @@ client_slave_add (void *cls, struct GNUNET_SERVER_Client *client,
1951 * Client requests to remove a slave from the membership database. 1932 * Client requests to remove a slave from the membership database.
1952 */ 1933 */
1953static void 1934static void
1954client_slave_remove (void *cls, struct GNUNET_SERVER_Client *client, 1935client_recv_slave_remove (void *cls, struct GNUNET_SERVER_Client *client,
1955 const struct GNUNET_MessageHeader *msg) 1936 const struct GNUNET_MessageHeader *msg)
1956{ 1937{
1957 1938
1958} 1939}
@@ -1962,8 +1943,8 @@ client_slave_remove (void *cls, struct GNUNET_SERVER_Client *client,
1962 * Client requests channel history from PSYCstore. 1943 * Client requests channel history from PSYCstore.
1963 */ 1944 */
1964static void 1945static void
1965client_story_request (void *cls, struct GNUNET_SERVER_Client *client, 1946client_recv_story_request (void *cls, struct GNUNET_SERVER_Client *client,
1966 const struct GNUNET_MessageHeader *msg) 1947 const struct GNUNET_MessageHeader *msg)
1967{ 1948{
1968 1949
1969} 1950}
@@ -1973,8 +1954,8 @@ client_story_request (void *cls, struct GNUNET_SERVER_Client *client,
1973 * Client requests best matching state variable from PSYCstore. 1954 * Client requests best matching state variable from PSYCstore.
1974 */ 1955 */
1975static void 1956static void
1976client_state_get (void *cls, struct GNUNET_SERVER_Client *client, 1957client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
1977 const struct GNUNET_MessageHeader *msg) 1958 const struct GNUNET_MessageHeader *msg)
1978{ 1959{
1979 1960
1980} 1961}
@@ -1984,8 +1965,8 @@ client_state_get (void *cls, struct GNUNET_SERVER_Client *client,
1984 * Client requests state variables with a given prefix from PSYCstore. 1965 * Client requests state variables with a given prefix from PSYCstore.
1985 */ 1966 */
1986static void 1967static void
1987client_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client, 1968client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
1988 const struct GNUNET_MessageHeader *msg) 1969 const struct GNUNET_MessageHeader *msg)
1989{ 1970{
1990 1971
1991} 1972}
@@ -2003,32 +1984,34 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
2003 const struct GNUNET_CONFIGURATION_Handle *c) 1984 const struct GNUNET_CONFIGURATION_Handle *c)
2004{ 1985{
2005 static const struct GNUNET_SERVER_MessageHandler handlers[] = { 1986 static const struct GNUNET_SERVER_MessageHandler handlers[] = {
2006 { &client_master_start, NULL, 1987 { &client_recv_master_start, NULL,
2007 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 }, 1988 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
2008 1989
2009 { &client_slave_join, NULL, 1990 { &client_recv_slave_join, NULL,
2010 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 }, 1991 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
2011 1992
2012 { &client_join_decision, NULL, 1993 { &client_recv_join_decision, NULL,
2013 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 }, 1994 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
2014 1995
2015 { &client_psyc_message, NULL, 1996 { &client_recv_psyc_message, NULL,
2016 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 }, 1997 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
2017 1998
2018 { &client_slave_add, NULL, 1999 { &client_recv_slave_add, NULL,
2019 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD, 0 }, 2000 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD, 0 },
2020 2001
2021 { &client_slave_remove, NULL, 2002 { &client_recv_slave_remove, NULL,
2022 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM, 0 }, 2003 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM, 0 },
2023 2004
2024 { &client_story_request, NULL, 2005 { &client_recv_story_request, NULL,
2025 GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST, 0 }, 2006 GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST, 0 },
2026 2007
2027 { &client_state_get, NULL, 2008 { &client_recv_state_get, NULL,
2028 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 }, 2009 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
2029 2010
2030 { &client_state_get_prefix, NULL, 2011 { &client_recv_state_get_prefix, NULL,
2031 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 } 2012 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
2013
2014 { NULL, NULL, 0, 0 }
2032 }; 2015 };
2033 2016
2034 cfg = c; 2017 cfg = c;
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index 7ec9d21b7..bfb6f43fb 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -37,29 +37,11 @@
37#include "gnunet_env_lib.h" 37#include "gnunet_env_lib.h"
38#include "gnunet_multicast_service.h" 38#include "gnunet_multicast_service.h"
39#include "gnunet_psyc_service.h" 39#include "gnunet_psyc_service.h"
40#include "gnunet_psyc_util_lib.h"
40#include "psyc.h" 41#include "psyc.h"
41 42
42#define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__) 43#define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__)
43 44
44struct MessageQueue
45{
46 struct MessageQueue *prev;
47 struct MessageQueue *next;
48 /* Followed by struct GNUNET_MessageHeader msg */
49};
50
51
52/**
53 * Handle for a pending PSYC transmission operation.
54 */
55struct GNUNET_PSYC_ChannelTransmitHandle
56{
57 struct GNUNET_PSYC_Channel *ch;
58 GNUNET_PSYC_TransmitNotifyModifier notify_mod;
59 GNUNET_PSYC_TransmitNotifyData notify_data;
60 void *notify_cls;
61 enum MessageState state;
62};
63 45
64/** 46/**
65 * Handle to access PSYC channel operations for both the master and slaves. 47 * Handle to access PSYC channel operations for both the master and slaves.
@@ -67,109 +49,29 @@ struct GNUNET_PSYC_ChannelTransmitHandle
67struct GNUNET_PSYC_Channel 49struct GNUNET_PSYC_Channel
68{ 50{
69 /** 51 /**
70 * Transmission handle;
71 */
72 struct GNUNET_PSYC_ChannelTransmitHandle tmit;
73
74 /**
75 * Configuration to use. 52 * Configuration to use.
76 */ 53 */
77 const struct GNUNET_CONFIGURATION_Handle *cfg; 54 const struct GNUNET_CONFIGURATION_Handle *cfg;
78 55
79 /** 56 /**
80 * Socket (if available). 57 * Client connection to the service.
81 */
82 struct GNUNET_CLIENT_Connection *client;
83
84 /**
85 * Currently pending transmission request, or NULL for none.
86 */ 58 */
87 struct GNUNET_CLIENT_TransmitHandle *th; 59 struct GNUNET_CLIENT_MANAGER_Connection *client;
88 60
89 /** 61 /**
90 * Head of messages to transmit to the service. 62 * Transmission handle;
91 */
92 struct MessageQueue *tmit_head;
93
94 /**
95 * Tail of operations to transmit to the service.
96 */ 63 */
97 struct MessageQueue *tmit_tail; 64 struct GNUNET_PSYC_TransmitHandle *tmit;
98 65
99 /** 66 /**
100 * Message currently being transmitted to the service. 67 * Receipt handle;
101 */ 68 */
102 struct MessageQueue *tmit_msg; 69 struct GNUNET_PSYC_ReceiveHandle *recv;
103 70
104 /** 71 /**
105 * Message to send on reconnect. 72 * Message to send on reconnect.
106 */ 73 */
107 struct GNUNET_MessageHeader *reconnect_msg; 74 struct GNUNET_MessageHeader *connect_msg;
108
109 /**
110 * Task doing exponential back-off trying to reconnect.
111 */
112 GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
113
114 /**
115 * Time for next connect retry.
116 */
117 struct GNUNET_TIME_Relative reconnect_delay;
118
119 /**
120 * Message part callback.
121 */
122 GNUNET_PSYC_MessageCallback message_cb;
123
124 /**
125 * Message part callback for historic message.
126 */
127 GNUNET_PSYC_MessageCallback hist_message_cb;
128
129 /**
130 * Closure for @a message_cb.
131 */
132 void *cb_cls;
133
134 /**
135 * ID of the message being received from the PSYC service.
136 */
137 uint64_t recv_message_id;
138
139 /**
140 * Public key of the slave from which a message is being received.
141 */
142 struct GNUNET_CRYPTO_EddsaPublicKey recv_slave_key;
143
144 /**
145 * State of the currently being received message from the PSYC service.
146 */
147 enum MessageState recv_state;
148
149 /**
150 * Flags for the currently being received message from the PSYC service.
151 */
152 enum GNUNET_PSYC_MessageFlags recv_flags;
153
154 /**
155 * Expected value size for the modifier being received from the PSYC service.
156 */
157 uint32_t recv_mod_value_size_expected;
158
159 /**
160 * Actual value size for the modifier being received from the PSYC service.
161 */
162 uint32_t recv_mod_value_size;
163
164 /**
165 * Is transmission paused?
166 */
167 uint8_t tmit_paused;
168
169 /**
170 * Are we still waiting for a PSYC_TRANSMIT_ACK?
171 */
172 uint8_t tmit_ack_pending;
173 75
174 /** 76 /**
175 * Are we polling for incoming messages right now? 77 * Are we polling for incoming messages right now?
@@ -177,11 +79,6 @@ struct GNUNET_PSYC_Channel
177 uint8_t in_receive; 79 uint8_t in_receive;
178 80
179 /** 81 /**
180 * Are we currently transmitting a message?
181 */
182 uint8_t in_transmit;
183
184 /**
185 * Is this a master or slave channel? 82 * Is this a master or slave channel?
186 */ 83 */
187 uint8_t is_master; 84 uint8_t is_master;
@@ -193,7 +90,7 @@ struct GNUNET_PSYC_Channel
193 */ 90 */
194struct GNUNET_PSYC_Master 91struct GNUNET_PSYC_Master
195{ 92{
196 struct GNUNET_PSYC_Channel ch; 93 struct GNUNET_PSYC_Channel chn;
197 94
198 GNUNET_PSYC_MasterStartCallback start_cb; 95 GNUNET_PSYC_MasterStartCallback start_cb;
199 96
@@ -201,6 +98,11 @@ struct GNUNET_PSYC_Master
201 * Join request callback. 98 * Join request callback.
202 */ 99 */
203 GNUNET_PSYC_JoinRequestCallback join_req_cb; 100 GNUNET_PSYC_JoinRequestCallback join_req_cb;
101
102 /**
103 * Closure for the callbacks.
104 */
105 void *cb_cls;
204}; 106};
205 107
206 108
@@ -209,11 +111,16 @@ struct GNUNET_PSYC_Master
209 */ 111 */
210struct GNUNET_PSYC_Slave 112struct GNUNET_PSYC_Slave
211{ 113{
212 struct GNUNET_PSYC_Channel ch; 114 struct GNUNET_PSYC_Channel chn;
213 115
214 GNUNET_PSYC_SlaveConnectCallback connect_cb; 116 GNUNET_PSYC_SlaveConnectCallback connect_cb;
215 117
216 GNUNET_PSYC_JoinDecisionCallback join_dcsn_cb; 118 GNUNET_PSYC_JoinDecisionCallback join_dcsn_cb;
119
120 /**
121 * Closure for the callbacks.
122 */
123 void *cb_cls;
217}; 124};
218 125
219 126
@@ -258,934 +165,170 @@ struct GNUNET_PSYC_StateQuery
258 165
259 166
260static void 167static void
261reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); 168channel_send_connect_msg (struct GNUNET_PSYC_Channel *chn)
262
263
264static void
265channel_transmit_data (struct GNUNET_PSYC_Channel *ch);
266
267
268/**
269 * Reschedule a connect attempt to the service.
270 *
271 * @param ch Channel to reconnect.
272 */
273static void
274reschedule_connect (struct GNUNET_PSYC_Channel *ch)
275{ 169{
276 GNUNET_assert (ch->reconnect_task == GNUNET_SCHEDULER_NO_TASK); 170 uint16_t cmsg_size = ntohs (chn->connect_msg->size);
277 171 struct GNUNET_MessageHeader * cmsg = GNUNET_malloc (cmsg_size);
278 if (NULL != ch->th) 172 memcpy (cmsg, chn->connect_msg, cmsg_size);
279 { 173 GNUNET_CLIENT_MANAGER_transmit_now (chn->client, cmsg);
280 GNUNET_CLIENT_notify_transmit_ready_cancel (ch->th);
281 ch->th = NULL;
282 }
283 if (NULL != ch->client)
284 {
285 GNUNET_CLIENT_disconnect (ch->client);
286 ch->client = NULL;
287 }
288 ch->in_receive = GNUNET_NO;
289 LOG (GNUNET_ERROR_TYPE_DEBUG,
290 "Scheduling task to reconnect to PSYC service in %s.\n",
291 GNUNET_STRINGS_relative_time_to_string (ch->reconnect_delay, GNUNET_YES));
292 ch->reconnect_task =
293 GNUNET_SCHEDULER_add_delayed (ch->reconnect_delay, &reconnect, ch);
294 ch->reconnect_delay = GNUNET_TIME_STD_BACKOFF (ch->reconnect_delay);
295} 174}
296 175
297 176
298/**
299 * Schedule transmission of the next message from our queue.
300 *
301 * @param ch PSYC channel handle
302 */
303static void
304transmit_next (struct GNUNET_PSYC_Channel *ch);
305
306
307/**
308 * Reset stored data related to the last received message.
309 */
310static void 177static void
311recv_reset (struct GNUNET_PSYC_Channel *ch) 178channel_recv_disconnect (void *cls,
179 struct GNUNET_CLIENT_MANAGER_Connection *client,
180 const struct GNUNET_MessageHeader *msg)
312{ 181{
313 ch->recv_state = MSG_STATE_START; 182 struct GNUNET_PSYC_Channel *
314 ch->recv_flags = 0; 183 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn));
315 ch->recv_message_id = 0; 184 GNUNET_CLIENT_MANAGER_reconnect (client);
316 //FIXME: ch->recv_slave_key = { 0 }; 185 channel_send_connect_msg (chn);
317 ch->recv_mod_value_size = 0;
318 ch->recv_mod_value_size_expected = 0;
319} 186}
320 187
321 188
322static void 189static void
323recv_error (struct GNUNET_PSYC_Channel *ch) 190channel_recv_message (void *cls,
191 struct GNUNET_CLIENT_MANAGER_Connection *client,
192 const struct GNUNET_MessageHeader *msg)
324{ 193{
325 GNUNET_PSYC_MessageCallback message_cb 194 struct GNUNET_PSYC_Channel *
326 = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC 195 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn));
327 ? ch->hist_message_cb 196 GNUNET_PSYC_receive_message (chn->recv,
328 : ch->message_cb; 197 (const struct GNUNET_PSYC_MessageHeader *) msg);
329
330 if (NULL != message_cb)
331 message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, NULL);
332
333 recv_reset (ch);
334} 198}
335 199
336 200
337/**
338 * Queue a message part for transmission to the PSYC service.
339 *
340 * The message part is added to the current message buffer.
341 * When this buffer is full, it is added to the transmission queue.
342 *
343 * @param ch Channel struct for the client.
344 * @param msg Modifier message part, or NULL when there's no more modifiers.
345 * @param end End of message.
346 */
347static void 201static void
348queue_message (struct GNUNET_PSYC_Channel *ch, 202channel_recv_message_ack (void *cls,
349 const struct GNUNET_MessageHeader *msg, 203 struct GNUNET_CLIENT_MANAGER_Connection *client,
350 uint8_t end) 204 const struct GNUNET_MessageHeader *msg)
351{ 205{
352 uint16_t size = msg ? ntohs (msg->size) : 0; 206 struct GNUNET_PSYC_Channel *
353 207 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn));
354 LOG (GNUNET_ERROR_TYPE_DEBUG, 208 GNUNET_PSYC_transmit_got_ack (chn->tmit);
355 "Queueing message of type %u and size %u (end: %u)).\n",
356 ntohs (msg->type), size, end);
357
358 struct MessageQueue *mq = ch->tmit_msg;
359 struct GNUNET_MessageHeader *qmsg = NULL;
360 if (NULL != mq)
361 {
362 qmsg = (struct GNUNET_MessageHeader *) &mq[1];
363 if (NULL == msg
364 || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < qmsg->size + size)
365 {
366 /* End of message or buffer is full, add it to transmission queue
367 * and start with empty buffer */
368 qmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
369 qmsg->size = htons (qmsg->size);
370 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq);
371 ch->tmit_msg = mq = NULL;
372 ch->tmit_ack_pending++;
373 }
374 else
375 {
376 /* Message fits in current buffer, append */
377 ch->tmit_msg
378 = mq = GNUNET_realloc (mq, sizeof (*mq) + qmsg->size + size);
379 qmsg = (struct GNUNET_MessageHeader *) &mq[1];
380 memcpy ((char *) qmsg + qmsg->size, msg, size);
381 qmsg->size += size;
382 }
383 }
384
385 if (NULL == mq && NULL != msg)
386 {
387 /* Empty buffer, copy over message. */
388 ch->tmit_msg
389 = mq = GNUNET_malloc (sizeof (*mq) + sizeof (*qmsg) + size);
390 qmsg = (struct GNUNET_MessageHeader *) &mq[1];
391 qmsg->size = sizeof (*qmsg) + size;
392 memcpy (&qmsg[1], msg, size);
393 }
394
395 if (NULL != mq
396 && (GNUNET_YES == end
397 || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
398 < qmsg->size + sizeof (struct GNUNET_MessageHeader))))
399 {
400 /* End of message or buffer is full, add it to transmission queue. */
401 qmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
402 qmsg->size = htons (qmsg->size);
403 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq);
404 ch->tmit_msg = mq = NULL;
405 ch->tmit_ack_pending++;
406 }
407
408 if (GNUNET_YES == end)
409 ch->in_transmit = GNUNET_NO;
410
411 transmit_next (ch);
412} 209}
413 210
414 211
415/**
416 * Request a modifier from a client to transmit.
417 *
418 * @param mst Master handle.
419 */
420static void 212static void
421channel_transmit_mod (struct GNUNET_PSYC_Channel *ch) 213master_recv_start_ack (void *cls,
214 struct GNUNET_CLIENT_MANAGER_Connection *client,
215 const struct GNUNET_MessageHeader *msg)
422{ 216{
423 uint16_t max_data_size, data_size; 217 struct GNUNET_PSYC_Master *
424 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; 218 mst = GNUNET_CLIENT_MANAGER_get_user_context_ (client,
425 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; 219 sizeof (struct GNUNET_PSYC_Channel));
426 int notify_ret;
427
428 switch (ch->tmit.state)
429 {
430 case MSG_STATE_MODIFIER:
431 {
432 struct GNUNET_PSYC_MessageModifier *mod
433 = (struct GNUNET_PSYC_MessageModifier *) msg;
434 max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
435 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
436 msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
437 notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, &data_size, &mod[1],
438 &mod->oper, &mod->value_size);
439 mod->name_size = strnlen ((char *) &mod[1], data_size);
440 if (mod->name_size < data_size)
441 {
442 mod->value_size = htonl (mod->value_size);
443 mod->name_size = htons (mod->name_size);
444 }
445 else if (0 < data_size)
446 {
447 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n");
448 notify_ret = GNUNET_SYSERR;
449 }
450 break;
451 }
452 case MSG_STATE_MOD_CONT:
453 {
454 max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
455 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
456 msg->size = sizeof (struct GNUNET_MessageHeader);
457 notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls,
458 &data_size, &msg[1], NULL, NULL);
459 break;
460 }
461 default:
462 GNUNET_assert (0);
463 }
464
465 switch (notify_ret)
466 {
467 case GNUNET_NO:
468 if (0 == data_size)
469 { /* Transmission paused, nothing to send. */
470 ch->tmit_paused = GNUNET_YES;
471 return;
472 }
473 ch->tmit.state = MSG_STATE_MOD_CONT;
474 break;
475
476 case GNUNET_YES:
477 if (0 == data_size)
478 {
479 /* End of modifiers. */
480 ch->tmit.state = MSG_STATE_DATA;
481 if (0 == ch->tmit_ack_pending)
482 channel_transmit_data (ch);
483
484 return;
485 }
486 ch->tmit.state = MSG_STATE_MODIFIER;
487 break;
488
489 default:
490 LOG (GNUNET_ERROR_TYPE_ERROR,
491 "MasterTransmitNotifyModifier returned error "
492 "when requesting a modifier.\n");
493
494 ch->tmit.state = MSG_STATE_CANCEL;
495 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
496 msg->size = htons (sizeof (*msg));
497
498 queue_message (ch, msg, GNUNET_YES);
499 return;
500 }
501
502 if (0 < data_size)
503 {
504 GNUNET_assert (data_size <= max_data_size);
505 msg->size = htons (msg->size + data_size);
506 queue_message (ch, msg, GNUNET_NO);
507 }
508
509 channel_transmit_mod (ch);
510}
511 220
512 221 struct CountersResult *cres = (struct CountersResult *) msg;
513/** 222 if (NULL != mst->start_cb)
514 * Request data from a client to transmit. 223 mst->start_cb (mst->cb_cls, GNUNET_ntohll (cres->max_message_id));
515 *
516 * @param mst Master handle.
517 */
518static void
519channel_transmit_data (struct GNUNET_PSYC_Channel *ch)
520{
521 uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
522 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
523 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
524
525 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
526
527 int notify_ret = ch->tmit.notify_data (ch->tmit.notify_cls,
528 &data_size, &msg[1]);
529 switch (notify_ret)
530 {
531 case GNUNET_NO:
532 if (0 == data_size)
533 {
534 /* Transmission paused, nothing to send. */
535 ch->tmit_paused = GNUNET_YES;
536 return;
537 }
538 break;
539
540 case GNUNET_YES:
541 ch->tmit.state = MSG_STATE_END;
542 break;
543
544 default:
545 LOG (GNUNET_ERROR_TYPE_ERROR,
546 "MasterTransmitNotify returned error when requesting data.\n");
547
548 ch->tmit.state = MSG_STATE_CANCEL;
549 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
550 msg->size = htons (sizeof (*msg));
551 queue_message (ch, msg, GNUNET_YES);
552 return;
553 }
554
555 if (0 < data_size)
556 {
557 GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
558 msg->size = htons (sizeof (*msg) + data_size);
559 queue_message (ch, msg, !notify_ret);
560 }
561
562 /* End of message. */
563 if (GNUNET_YES == notify_ret)
564 {
565 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
566 msg->size = htons (sizeof (*msg));
567 queue_message (ch, msg, GNUNET_YES);
568 }
569} 224}
570 225
571 226
572/**
573 * Send a message to a channel.
574 *
575 * @param ch Handle to the PSYC channel.
576 * @param method_name Which method should be invoked.
577 * @param notify_mod Function to call to obtain modifiers.
578 * @param notify_data Function to call to obtain fragments of the data.
579 * @param notify_cls Closure for @a notify_mod and @a notify_data.
580 * @param flags Flags for the message being transmitted.
581 *
582 * @return Transmission handle, NULL on error (i.e. more than one request queued).
583 */
584static struct GNUNET_PSYC_ChannelTransmitHandle *
585channel_transmit (struct GNUNET_PSYC_Channel *ch,
586 const char *method_name,
587 GNUNET_PSYC_TransmitNotifyModifier notify_mod,
588 GNUNET_PSYC_TransmitNotifyData notify_data,
589 void *notify_cls,
590 uint32_t flags)
591{
592 if (GNUNET_NO != ch->in_transmit)
593 return NULL;
594 ch->in_transmit = GNUNET_YES;
595
596 size_t size = strlen (method_name) + 1;
597 struct GNUNET_PSYC_MessageMethod *pmeth;
598 struct GNUNET_MessageHeader *qmsg;
599 struct MessageQueue *
600 mq = ch->tmit_msg = GNUNET_malloc (sizeof (*mq) + sizeof (*qmsg)
601 + sizeof (*pmeth) + size);
602 qmsg = (struct GNUNET_MessageHeader *) &mq[1];
603 qmsg->size = sizeof (*qmsg) + sizeof (*pmeth) + size;
604
605 pmeth = (struct GNUNET_PSYC_MessageMethod *) &qmsg[1];
606 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
607 pmeth->header.size = htons (sizeof (*pmeth) + size);
608 pmeth->flags = htonl (flags);
609 memcpy (&pmeth[1], method_name, size);
610
611 ch->tmit.ch = ch;
612 ch->tmit.notify_mod = notify_mod;
613 ch->tmit.notify_data = notify_data;
614 ch->tmit.notify_cls = notify_cls;
615 ch->tmit.state = MSG_STATE_MODIFIER;
616
617 channel_transmit_mod (ch);
618 return &ch->tmit;
619}
620
621
622/**
623 * Resume transmission to the channel.
624 *
625 * @param th Handle of the request that is being resumed.
626 */
627static void 227static void
628channel_transmit_resume (struct GNUNET_PSYC_ChannelTransmitHandle *th) 228master_recv_join_request (void *cls,
229 struct GNUNET_CLIENT_MANAGER_Connection *client,
230 const struct GNUNET_MessageHeader *msg)
629{ 231{
630 struct GNUNET_PSYC_Channel *ch = th->ch; 232 struct GNUNET_PSYC_Master *
631 if (0 == ch->tmit_ack_pending) 233 mst = GNUNET_CLIENT_MANAGER_get_user_context_ (client,
632 { 234 sizeof (struct GNUNET_PSYC_Channel));
633 ch->tmit_paused = GNUNET_NO;
634 channel_transmit_data (ch);
635 }
636}
637 235
236 const struct MasterJoinRequest *req = (const struct MasterJoinRequest *) msg;
638 237
639/** 238 struct GNUNET_PSYC_MessageHeader *pmsg = NULL;
640 * Abort transmission request to channel. 239 if (ntohs (req->header.size) <= sizeof (*req) + sizeof (*pmsg))
641 * 240 pmsg = (struct GNUNET_PSYC_MessageHeader *) &req[1];
642 * @param th Handle of the request that is being aborted.
643 */
644static void
645channel_transmit_cancel (struct GNUNET_PSYC_ChannelTransmitHandle *th)
646{
647 struct GNUNET_PSYC_Channel *ch = th->ch;
648 if (GNUNET_NO == ch->in_transmit)
649 return;
650}
651 241
242 struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh));
243 jh->mst = mst;
244 jh->slave_key = req->slave_key;
652 245
653/** 246 if (NULL != mst->join_req_cb)
654 * Handle incoming message from the PSYC service. 247 mst->join_req_cb (mst->cb_cls, &req->slave_key, pmsg, jh);
655 *
656 * @param ch The channel the message is sent to.
657 * @param pmsg The message.
658 */
659static void
660handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
661 const struct GNUNET_PSYC_MessageHeader *msg)
662{
663 uint16_t size = ntohs (msg->header.size);
664 uint32_t flags = ntohl (msg->flags);
665
666 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
667 (struct GNUNET_MessageHeader *) msg);
668
669 if (MSG_STATE_START == ch->recv_state)
670 {
671 ch->recv_message_id = GNUNET_ntohll (msg->message_id);
672 ch->recv_flags = flags;
673 ch->recv_slave_key = msg->slave_key;
674 ch->recv_mod_value_size = 0;
675 ch->recv_mod_value_size_expected = 0;
676 }
677 else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id)
678 {
679 // FIXME
680 LOG (GNUNET_ERROR_TYPE_WARNING,
681 "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
682 GNUNET_ntohll (msg->message_id), ch->recv_message_id);
683 GNUNET_break_op (0);
684 recv_error (ch);
685 return;
686 }
687 else if (flags != ch->recv_flags)
688 {
689 LOG (GNUNET_ERROR_TYPE_WARNING,
690 "Unexpected message flags. Got: %lu, expected: %lu\n",
691 flags, ch->recv_flags);
692 GNUNET_break_op (0);
693 recv_error (ch);
694 return;
695 }
696
697 uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
698
699 for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
700 {
701 const struct GNUNET_MessageHeader *pmsg
702 = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
703 psize = ntohs (pmsg->size);
704 ptype = ntohs (pmsg->type);
705 size_eq = size_min = 0;
706
707 if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
708 {
709 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
710 "Dropping message of type %u with invalid size %u.\n",
711 ptype, psize);
712 recv_error (ch);
713 return;
714 }
715
716 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
717 "Received message part from PSYC.\n");
718 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
719
720 switch (ptype)
721 {
722 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
723 size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
724 break;
725 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
726 size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
727 break;
728 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
729 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
730 size_min = sizeof (struct GNUNET_MessageHeader);
731 break;
732 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
733 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
734 size_eq = sizeof (struct GNUNET_MessageHeader);
735 break;
736 default:
737 GNUNET_break_op (0);
738 recv_error (ch);
739 return;
740 }
741
742 if (! ((0 < size_eq && psize == size_eq)
743 || (0 < size_min && size_min <= psize)))
744 {
745 GNUNET_break_op (0);
746 recv_error (ch);
747 return;
748 }
749
750 switch (ptype)
751 {
752 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
753 {
754 struct GNUNET_PSYC_MessageMethod *meth
755 = (struct GNUNET_PSYC_MessageMethod *) pmsg;
756
757 if (MSG_STATE_START != ch->recv_state)
758 {
759 LOG (GNUNET_ERROR_TYPE_WARNING,
760 "Dropping out of order message method (%u).\n",
761 ch->recv_state);
762 /* It is normal to receive an incomplete message right after connecting,
763 * but should not happen later.
764 * FIXME: add a check for this condition.
765 */
766 GNUNET_break_op (0);
767 recv_error (ch);
768 return;
769 }
770
771 if ('\0' != *((char *) meth + psize - 1))
772 {
773 LOG (GNUNET_ERROR_TYPE_WARNING,
774 "Dropping message with malformed method. "
775 "Message ID: %" PRIu64 "\n", ch->recv_message_id);
776 GNUNET_break_op (0);
777 recv_error (ch);
778 return;
779 }
780 ch->recv_state = MSG_STATE_METHOD;
781 break;
782 }
783 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
784 {
785 if (!(MSG_STATE_METHOD == ch->recv_state
786 || MSG_STATE_MODIFIER == ch->recv_state
787 || MSG_STATE_MOD_CONT == ch->recv_state))
788 {
789 LOG (GNUNET_ERROR_TYPE_WARNING,
790 "Dropping out of order message modifier (%u).\n",
791 ch->recv_state);
792 GNUNET_break_op (0);
793 recv_error (ch);
794 return;
795 }
796
797 struct GNUNET_PSYC_MessageModifier *mod
798 = (struct GNUNET_PSYC_MessageModifier *) pmsg;
799
800 uint16_t name_size = ntohs (mod->name_size);
801 ch->recv_mod_value_size_expected = ntohl (mod->value_size);
802 ch->recv_mod_value_size = psize - sizeof (*mod) - name_size - 1;
803
804 if (psize < sizeof (*mod) + name_size + 1
805 || '\0' != *((char *) &mod[1] + name_size)
806 || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
807 {
808 LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
809 GNUNET_break_op (0);
810 recv_error (ch);
811 return;
812 }
813 ch->recv_state = MSG_STATE_MODIFIER;
814 break;
815 }
816 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
817 {
818 ch->recv_mod_value_size += psize - sizeof (*pmsg);
819
820 if (!(MSG_STATE_MODIFIER == ch->recv_state
821 || MSG_STATE_MOD_CONT == ch->recv_state)
822 || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
823 {
824 LOG (GNUNET_ERROR_TYPE_WARNING,
825 "Dropping out of order message modifier continuation "
826 "!(%u == %u || %u == %u) || %lu < %lu.\n",
827 MSG_STATE_MODIFIER, ch->recv_state,
828 MSG_STATE_MOD_CONT, ch->recv_state,
829 ch->recv_mod_value_size_expected, ch->recv_mod_value_size);
830 GNUNET_break_op (0);
831 recv_error (ch);
832 return;
833 }
834 break;
835 }
836 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
837 {
838 if (ch->recv_state < MSG_STATE_METHOD
839 || ch->recv_mod_value_size_expected != ch->recv_mod_value_size)
840 {
841 LOG (GNUNET_ERROR_TYPE_WARNING,
842 "Dropping out of order message data fragment "
843 "(%u < %u || %lu != %lu).\n",
844 ch->recv_state, MSG_STATE_METHOD,
845 ch->recv_mod_value_size_expected, ch->recv_mod_value_size);
846
847 GNUNET_break_op (0);
848 recv_error (ch);
849 return;
850 }
851 ch->recv_state = MSG_STATE_DATA;
852 break;
853 }
854 }
855
856 GNUNET_PSYC_MessageCallback message_cb
857 = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
858 ? ch->hist_message_cb
859 : ch->message_cb;
860
861 if (NULL != message_cb)
862 message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, pmsg);
863
864 switch (ptype)
865 {
866 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
867 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
868 recv_reset (ch);
869 break;
870 }
871 }
872} 248}
873 249
874 250
875/**
876 * Handle incoming message acknowledgement from the PSYC service.
877 *
878 * @param ch The channel the acknowledgement is sent to.
879 */
880static void 251static void
881handle_psyc_message_ack (struct GNUNET_PSYC_Channel *ch) 252slave_recv_join_ack (void *cls,
253 struct GNUNET_CLIENT_MANAGER_Connection *client,
254 const struct GNUNET_MessageHeader *msg)
882{ 255{
883 if (0 == ch->tmit_ack_pending) 256 struct GNUNET_PSYC_Slave *
884 { 257 slv = GNUNET_CLIENT_MANAGER_get_user_context_ (client,
885 LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n"); 258 sizeof (struct GNUNET_PSYC_Channel));
886 GNUNET_break (0); 259 struct CountersResult *cres = (struct CountersResult *) msg;
887 return; 260 if (NULL != slv->connect_cb)
888 } 261 slv->connect_cb (slv->cb_cls, GNUNET_ntohll (cres->max_message_id));
889 ch->tmit_ack_pending--;
890
891 switch (ch->tmit.state)
892 {
893 case MSG_STATE_MODIFIER:
894 case MSG_STATE_MOD_CONT:
895 if (GNUNET_NO == ch->tmit_paused)
896 channel_transmit_mod (ch);
897 break;
898
899 case MSG_STATE_DATA:
900 if (GNUNET_NO == ch->tmit_paused)
901 channel_transmit_data (ch);
902 break;
903
904 case MSG_STATE_END:
905 case MSG_STATE_CANCEL:
906 break;
907
908 default:
909 LOG (GNUNET_ERROR_TYPE_DEBUG,
910 "Ignoring message ACK in state %u.\n", ch->tmit.state);
911 }
912} 262}
913 263
914 264
915static void 265static void
916handle_psyc_join_request (struct GNUNET_PSYC_Master *mst, 266slave_recv_join_decision (void *cls,
917 const struct MasterJoinRequest *req) 267 struct GNUNET_CLIENT_MANAGER_Connection *client,
268 const struct GNUNET_MessageHeader *msg)
918{ 269{
919 struct GNUNET_PSYC_MessageHeader *msg = NULL; 270 struct GNUNET_PSYC_Slave *
920 if (ntohs (req->header.size) <= sizeof (*req) + sizeof (*msg)) 271 slv = GNUNET_CLIENT_MANAGER_get_user_context_ (client,
921 msg = (struct GNUNET_PSYC_MessageHeader *) &req[1]; 272 sizeof (struct GNUNET_PSYC_Channel));
273 const struct SlaveJoinDecision *
274 dcsn = (const struct SlaveJoinDecision *) msg;
922 275
923 struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); 276 struct GNUNET_PSYC_MessageHeader *pmsg = NULL;
924 jh->mst = mst; 277 if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*pmsg))
925 jh->slave_key = req->slave_key; 278 pmsg = (struct GNUNET_PSYC_MessageHeader *) &dcsn[1];
926 279
927 if (NULL != mst->join_req_cb) 280 struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh));
928 mst->join_req_cb (mst->ch.cb_cls, &req->slave_key, msg, jh); 281 if (NULL != slv->join_dcsn_cb)
282 slv->join_dcsn_cb (slv->cb_cls, ntohl (dcsn->is_admitted), pmsg);
929} 283}
930 284
931 285
932static void 286static struct GNUNET_CLIENT_MANAGER_MessageHandler master_handlers[] =
933handle_psyc_join_decision (struct GNUNET_PSYC_Slave *slv,
934 const struct SlaveJoinDecision *dcsn)
935{ 287{
936 struct GNUNET_PSYC_MessageHeader *msg = NULL; 288 { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO },
937 if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*msg))
938 msg = (struct GNUNET_PSYC_MessageHeader *) &dcsn[1];
939 289
940 struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); 290 { &channel_recv_message, NULL,
941 if (NULL != slv->join_dcsn_cb) 291 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
942 slv->join_dcsn_cb (slv->ch.cb_cls, ntohl (dcsn->is_admitted), msg); 292 sizeof (struct GNUNET_PSYC_MessageHeader), GNUNET_YES },
943}
944 293
294 { &channel_recv_message_ack, NULL,
295 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK,
296 sizeof (struct GNUNET_MessageHeader), GNUNET_NO },
945 297
946/** 298 { &master_recv_start_ack, NULL,
947 * Type of a function to call when we receive a message 299 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK,
948 * from the service. 300 sizeof (struct CountersResult), GNUNET_NO },
949 *
950 * @param cls closure
951 * @param msg message received, NULL on timeout or fatal error
952 */
953static void
954message_handler (void *cls,
955 const struct GNUNET_MessageHeader *msg)
956{
957 struct GNUNET_PSYC_Channel *ch = cls;
958 struct GNUNET_PSYC_Master *mst = cls;
959 struct GNUNET_PSYC_Slave *slv = cls;
960
961 if (NULL == msg)
962 {
963 // timeout / disconnected from service, reconnect
964 reschedule_connect (ch);
965 return;
966 }
967 uint16_t size_eq = 0;
968 uint16_t size_min = 0;
969 uint16_t size = ntohs (msg->size);
970 uint16_t type = ntohs (msg->type);
971
972 LOG (GNUNET_ERROR_TYPE_DEBUG,
973 "Received message of type %d and size %u from PSYC service\n",
974 type, size);
975
976 switch (type)
977 {
978 case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK:
979 case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
980 size_eq = sizeof (struct CountersResult);
981 break;
982 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
983 size_min = sizeof (struct GNUNET_PSYC_MessageHeader);
984 break;
985 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
986 size_eq = sizeof (struct GNUNET_MessageHeader);
987 break;
988 case GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST:
989 size_min = sizeof (struct MasterJoinRequest);
990 break;
991 case GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION:
992 size_min = sizeof (struct SlaveJoinDecision);
993 break;
994 default:
995 GNUNET_break_op (0);
996 return;
997 }
998
999 if (! ((0 < size_eq && size == size_eq)
1000 || (0 < size_min && size_min <= size)))
1001 {
1002 GNUNET_break_op (0);
1003 return;
1004 }
1005
1006 switch (type)
1007 {
1008 case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK:
1009 {
1010 struct CountersResult *cres = (struct CountersResult *) msg;
1011 if (NULL != mst->start_cb)
1012 mst->start_cb (ch->cb_cls, GNUNET_ntohll (cres->max_message_id));
1013 break;
1014 }
1015 case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
1016 {
1017 struct CountersResult *cres = (struct CountersResult *) msg;
1018 if (NULL != slv->connect_cb)
1019 slv->connect_cb (ch->cb_cls, GNUNET_ntohll (cres->max_message_id));
1020 break;
1021 }
1022 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
1023 {
1024 handle_psyc_message_ack (ch);
1025 break;
1026 }
1027
1028 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
1029 handle_psyc_message (ch, (const struct GNUNET_PSYC_MessageHeader *) msg);
1030 break;
1031
1032 case GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST:
1033 handle_psyc_join_request ((struct GNUNET_PSYC_Master *) ch,
1034 (const struct MasterJoinRequest *) msg);
1035 break;
1036
1037 case GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION:
1038 handle_psyc_join_decision ((struct GNUNET_PSYC_Slave *) ch,
1039 (const struct SlaveJoinDecision *) msg);
1040 break;
1041 }
1042
1043 if (NULL != ch->client)
1044 {
1045 GNUNET_CLIENT_receive (ch->client, &message_handler, ch,
1046 GNUNET_TIME_UNIT_FOREVER_REL);
1047 }
1048}
1049 301
302 { &master_recv_join_request, NULL,
303 GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST,
304 sizeof (struct MasterJoinRequest), GNUNET_YES },
1050 305
1051/** 306 { NULL, NULL, 0, 0, GNUNET_NO }
1052 * Transmit next message to service. 307};
1053 *
1054 * @param cls The struct GNUNET_PSYC_Channel.
1055 * @param size Number of bytes available in @a buf.
1056 * @param buf Where to copy the message.
1057 *
1058 * @return Number of bytes copied to @a buf.
1059 */
1060static size_t
1061send_next_message (void *cls, size_t size, void *buf)
1062{
1063 LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n");
1064 struct GNUNET_PSYC_Channel *ch = cls;
1065 struct MessageQueue *mq = ch->tmit_head;
1066 if (NULL == mq)
1067 return 0;
1068 struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1];
1069 size_t ret = ntohs (qmsg->size);
1070 ch->th = NULL;
1071 if (ret > size)
1072 {
1073 reschedule_connect (ch);
1074 return 0;
1075 }
1076 memcpy (buf, qmsg, ret);
1077
1078 GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, mq);
1079 GNUNET_free (mq);
1080
1081 if (NULL != ch->tmit_head)
1082 transmit_next (ch);
1083
1084 if (GNUNET_NO == ch->in_receive)
1085 {
1086 ch->in_receive = GNUNET_YES;
1087 GNUNET_CLIENT_receive (ch->client, &message_handler, ch,
1088 GNUNET_TIME_UNIT_FOREVER_REL);
1089 }
1090 return ret;
1091}
1092 308
1093 309
1094/** 310static struct GNUNET_CLIENT_MANAGER_MessageHandler slave_handlers[] =
1095 * Schedule transmission of the next message from our queue.
1096 *
1097 * @param ch PSYC handle.
1098 */
1099static void
1100transmit_next (struct GNUNET_PSYC_Channel *ch)
1101{ 311{
1102 LOG (GNUNET_ERROR_TYPE_DEBUG, "transmit_next()\n"); 312 { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO },
1103 if (NULL != ch->th || NULL == ch->client)
1104 return;
1105
1106 struct MessageQueue *mq = ch->tmit_head;
1107 if (NULL == mq)
1108 return;
1109 struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1];
1110
1111 ch->th = GNUNET_CLIENT_notify_transmit_ready (ch->client,
1112 ntohs (qmsg->size),
1113 GNUNET_TIME_UNIT_FOREVER_REL,
1114 GNUNET_NO,
1115 &send_next_message,
1116 ch);
1117}
1118 313
314 { &channel_recv_message, NULL,
315 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
316 sizeof (struct GNUNET_PSYC_MessageHeader), GNUNET_YES },
1119 317
1120/** 318 { &channel_recv_message_ack, NULL,
1121 * Try again to connect to the PSYC service. 319 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK,
1122 * 320 sizeof (struct GNUNET_MessageHeader), GNUNET_NO },
1123 * @param cls Channel handle.
1124 * @param tc Scheduler context.
1125 */
1126static void
1127reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1128{
1129 struct GNUNET_PSYC_Channel *ch = cls;
1130
1131 recv_reset (ch);
1132 ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1133 LOG (GNUNET_ERROR_TYPE_DEBUG,
1134 "Connecting to PSYC service.\n");
1135 GNUNET_assert (NULL == ch->client);
1136 ch->client = GNUNET_CLIENT_connect ("psyc", ch->cfg);
1137 GNUNET_assert (NULL != ch->client);
1138 uint16_t reconn_size = ntohs (ch->reconnect_msg->size);
1139
1140 if (NULL == ch->tmit_head ||
1141 0 != memcmp (&ch->tmit_head[1], ch->reconnect_msg, reconn_size))
1142 {
1143 struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + reconn_size);
1144 memcpy (&mq[1], ch->reconnect_msg, reconn_size);
1145 GNUNET_CONTAINER_DLL_insert (ch->tmit_head, ch->tmit_tail, mq);
1146 }
1147 transmit_next (ch);
1148}
1149 321
322 { &slave_recv_join_ack, NULL,
323 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK,
324 sizeof (struct CountersResult), GNUNET_NO },
1150 325
1151/** 326 { &slave_recv_join_decision, NULL,
1152 * Disconnect from the PSYC service. 327 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
1153 * 328 sizeof (struct SlaveJoinDecision), GNUNET_YES },
1154 * @param c Channel handle to disconnect. 329
1155 */ 330 { NULL, NULL, 0, 0, GNUNET_NO }
1156static void 331};
1157disconnect (void *c)
1158{
1159 struct GNUNET_PSYC_Channel *ch = c;
1160
1161 GNUNET_assert (NULL != ch);
1162 if (ch->tmit_head != ch->tmit_tail)
1163 {
1164 LOG (GNUNET_ERROR_TYPE_ERROR,
1165 "Disconnecting while there are still outstanding messages!\n");
1166 GNUNET_break (0);
1167 }
1168 if (ch->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
1169 {
1170 GNUNET_SCHEDULER_cancel (ch->reconnect_task);
1171 ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1172 }
1173 if (NULL != ch->th)
1174 {
1175 GNUNET_CLIENT_notify_transmit_ready_cancel (ch->th);
1176 ch->th = NULL;
1177 }
1178 if (NULL != ch->client)
1179 {
1180 GNUNET_CLIENT_disconnect (ch->client);
1181 ch->client = NULL;
1182 }
1183 if (NULL != ch->reconnect_msg)
1184 {
1185 GNUNET_free (ch->reconnect_msg);
1186 ch->reconnect_msg = NULL;
1187 }
1188}
1189 332
1190 333
1191/** 334/**
@@ -1227,24 +370,29 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
1227 void *cls) 370 void *cls)
1228{ 371{
1229 struct GNUNET_PSYC_Master *mst = GNUNET_malloc (sizeof (*mst)); 372 struct GNUNET_PSYC_Master *mst = GNUNET_malloc (sizeof (*mst));
1230 struct GNUNET_PSYC_Channel *ch = &mst->ch; 373 struct GNUNET_PSYC_Channel *chn = &mst->chn;
1231 struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req));
1232 374
375 struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req));
1233 req->header.size = htons (sizeof (*req)); 376 req->header.size = htons (sizeof (*req));
1234 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START); 377 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START);
1235 req->channel_key = *channel_key; 378 req->channel_key = *channel_key;
1236 req->policy = policy; 379 req->policy = policy;
1237 380
381 chn->connect_msg = (struct GNUNET_MessageHeader *) req;
382 chn->cfg = cfg;
383 chn->is_master = GNUNET_YES;
384
1238 mst->start_cb = start_cb; 385 mst->start_cb = start_cb;
1239 mst->join_req_cb = join_request_cb; 386 mst->join_req_cb = join_request_cb;
1240 ch->message_cb = message_cb; 387 mst->cb_cls = cls;
1241 ch->cb_cls = cls; 388
1242 ch->cfg = cfg; 389 chn->client = GNUNET_CLIENT_MANAGER_connect (cfg, "psyc", master_handlers);
1243 ch->is_master = GNUNET_YES; 390 GNUNET_CLIENT_MANAGER_set_user_context_ (chn->client, mst, sizeof (*chn));
1244 ch->reconnect_msg = (struct GNUNET_MessageHeader *) req; 391
1245 ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; 392 chn->tmit = GNUNET_PSYC_transmit_create (chn->client);
1246 ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst); 393 chn->recv = GNUNET_PSYC_receive_create (message_cb, message_cb, cls);
1247 394
395 channel_send_connect_msg (chn);
1248 return mst; 396 return mst;
1249} 397}
1250 398
@@ -1253,12 +401,13 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
1253 * Stop a PSYC master channel. 401 * Stop a PSYC master channel.
1254 * 402 *
1255 * @param master PSYC channel master to stop. 403 * @param master PSYC channel master to stop.
404 * @param keep_active FIXME
1256 */ 405 */
1257void 406void
1258GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master) 407GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *mst)
1259{ 408{
1260 disconnect (master); 409 GNUNET_CLIENT_MANAGER_disconnect (mst->chn.client, GNUNET_YES);
1261 GNUNET_free (master); 410 GNUNET_free (mst);
1262} 411}
1263 412
1264 413
@@ -1292,7 +441,7 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
1292 const struct GNUNET_PeerIdentity *relays, 441 const struct GNUNET_PeerIdentity *relays,
1293 const struct GNUNET_PSYC_MessageHeader *join_resp) 442 const struct GNUNET_PSYC_MessageHeader *join_resp)
1294{ 443{
1295 struct GNUNET_PSYC_Channel *ch = &jh->mst->ch; 444 struct GNUNET_PSYC_Channel *chn = &jh->mst->chn;
1296 struct MasterJoinDecision *dcsn; 445 struct MasterJoinDecision *dcsn;
1297 uint16_t join_resp_size 446 uint16_t join_resp_size
1298 = (NULL != join_resp) ? ntohs (join_resp->header.size) : 0; 447 = (NULL != join_resp) ? ntohs (join_resp->header.size) : 0;
@@ -1302,9 +451,7 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
1302 < sizeof (*dcsn) + relay_size + join_resp_size) 451 < sizeof (*dcsn) + relay_size + join_resp_size)
1303 return GNUNET_SYSERR; 452 return GNUNET_SYSERR;
1304 453
1305 struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*dcsn) 454 dcsn = GNUNET_malloc (sizeof (*dcsn) + relay_size + join_resp_size);
1306 + relay_size + join_resp_size);
1307 dcsn = (struct MasterJoinDecision *) &mq[1];
1308 dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size); 455 dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size);
1309 dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION); 456 dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
1310 dcsn->is_admitted = htonl (is_admitted); 457 dcsn->is_admitted = htonl (is_admitted);
@@ -1313,8 +460,7 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
1313 if (0 < join_resp_size) 460 if (0 < join_resp_size)
1314 memcpy (&dcsn[1], join_resp, join_resp_size); 461 memcpy (&dcsn[1], join_resp, join_resp_size);
1315 462
1316 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq); 463 GNUNET_CLIENT_MANAGER_transmit (chn->client, &dcsn->header);
1317 transmit_next (ch);
1318 return GNUNET_OK; 464 return GNUNET_OK;
1319} 465}
1320 466
@@ -1332,40 +478,59 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
1332 * @return Transmission handle, NULL on error (i.e. more than one request queued). 478 * @return Transmission handle, NULL on error (i.e. more than one request queued).
1333 */ 479 */
1334struct GNUNET_PSYC_MasterTransmitHandle * 480struct GNUNET_PSYC_MasterTransmitHandle *
1335GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, 481GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *mst,
1336 const char *method_name, 482 const char *method_name,
1337 GNUNET_PSYC_TransmitNotifyModifier notify_mod, 483 GNUNET_PSYC_TransmitNotifyModifier notify_mod,
1338 GNUNET_PSYC_TransmitNotifyData notify_data, 484 GNUNET_PSYC_TransmitNotifyData notify_data,
1339 void *notify_cls, 485 void *notify_cls,
1340 enum GNUNET_PSYC_MasterTransmitFlags flags) 486 enum GNUNET_PSYC_MasterTransmitFlags flags)
1341{ 487{
1342 return (struct GNUNET_PSYC_MasterTransmitHandle *) 488 if (GNUNET_OK
1343 channel_transmit (&master->ch, method_name, notify_mod, notify_data, 489 == GNUNET_PSYC_transmit_message (mst->chn.tmit, method_name, NULL,
1344 notify_cls, flags); 490 notify_mod, notify_data, notify_cls,
491 flags))
492 return (struct GNUNET_PSYC_MasterTransmitHandle *) mst->chn.tmit;
493 else
494 return NULL;
1345} 495}
1346 496
1347 497
1348/** 498/**
1349 * Resume transmission to the channel. 499 * Resume transmission to the channel.
1350 * 500 *
1351 * @param th Handle of the request that is being resumed. 501 * @param tmit Handle of the request that is being resumed.
1352 */ 502 */
1353void 503void
1354GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) 504GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *tmit)
1355{ 505{
1356 channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); 506 GNUNET_PSYC_transmit_resume ((struct GNUNET_PSYC_TransmitHandle *) tmit);
1357} 507}
1358 508
1359 509
1360/** 510/**
1361 * Abort transmission request to the channel. 511 * Abort transmission request to the channel.
1362 * 512 *
1363 * @param th Handle of the request that is being aborted. 513 * @param tmit Handle of the request that is being aborted.
1364 */ 514 */
1365void 515void
1366GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th) 516GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *tmit)
1367{ 517{
1368 channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); 518 GNUNET_PSYC_transmit_cancel ((struct GNUNET_PSYC_TransmitHandle *) tmit);
519}
520
521
522/**
523 * Convert a channel @a master to a @e channel handle to access the @e channel
524 * APIs.
525 *
526 * @param master Channel master handle.
527 *
528 * @return Channel handle, valid for as long as @a master is valid.
529 */
530struct GNUNET_PSYC_Channel *
531GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master)
532{
533 return &master->chn;
1369} 534}
1370 535
1371 536
@@ -1420,7 +585,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1420 uint16_t data_size) 585 uint16_t data_size)
1421{ 586{
1422 struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv)); 587 struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv));
1423 struct GNUNET_PSYC_Channel *ch = &slv->ch; 588 struct GNUNET_PSYC_Channel *chn = &slv->chn;
1424 struct SlaveJoinRequest *req 589 struct SlaveJoinRequest *req
1425 = GNUNET_malloc (sizeof (*req) + relay_count * sizeof (*relays)); 590 = GNUNET_malloc (sizeof (*req) + relay_count * sizeof (*relays));
1426 req->header.size = htons (sizeof (*req) 591 req->header.size = htons (sizeof (*req)
@@ -1432,17 +597,21 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1432 req->relay_count = htonl (relay_count); 597 req->relay_count = htonl (relay_count);
1433 memcpy (&req[1], relays, relay_count * sizeof (*relays)); 598 memcpy (&req[1], relays, relay_count * sizeof (*relays));
1434 599
600 chn->connect_msg = (struct GNUNET_MessageHeader *) req;
601 chn->cfg = cfg;
602 chn->is_master = GNUNET_NO;
603
1435 slv->connect_cb = connect_cb; 604 slv->connect_cb = connect_cb;
1436 slv->join_dcsn_cb = join_decision_cb; 605 slv->join_dcsn_cb = join_decision_cb;
1437 ch->message_cb = message_cb; 606 slv->cb_cls = cls;
1438 ch->cb_cls = cls;
1439 607
1440 ch->cfg = cfg; 608 chn->client = GNUNET_CLIENT_MANAGER_connect (cfg, "psyc", slave_handlers);
1441 ch->is_master = GNUNET_NO; 609 GNUNET_CLIENT_MANAGER_set_user_context_ (chn->client, slv, sizeof (*chn));
1442 ch->reconnect_msg = (struct GNUNET_MessageHeader *) req;
1443 ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
1444 ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv);
1445 610
611 chn->recv = GNUNET_PSYC_receive_create (message_cb, message_cb, cls);
612 chn->tmit = GNUNET_PSYC_transmit_create (chn->client);
613
614 channel_send_connect_msg (chn);
1446 return slv; 615 return slv;
1447} 616}
1448 617
@@ -1456,10 +625,10 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1456 * @param slave Slave handle. 625 * @param slave Slave handle.
1457 */ 626 */
1458void 627void
1459GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave) 628GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slv)
1460{ 629{
1461 disconnect (slave); 630 GNUNET_CLIENT_MANAGER_disconnect (slv->chn.client, GNUNET_YES);
1462 GNUNET_free (slave); 631 GNUNET_free (slv);
1463} 632}
1464 633
1465 634
@@ -1477,69 +646,59 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave)
1477 * queued). 646 * queued).
1478 */ 647 */
1479struct GNUNET_PSYC_SlaveTransmitHandle * 648struct GNUNET_PSYC_SlaveTransmitHandle *
1480GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave, 649GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slv,
1481 const char *method_name, 650 const char *method_name,
1482 GNUNET_PSYC_TransmitNotifyModifier notify_mod, 651 GNUNET_PSYC_TransmitNotifyModifier notify_mod,
1483 GNUNET_PSYC_TransmitNotifyData notify_data, 652 GNUNET_PSYC_TransmitNotifyData notify_data,
1484 void *notify_cls, 653 void *notify_cls,
1485 enum GNUNET_PSYC_SlaveTransmitFlags flags) 654 enum GNUNET_PSYC_SlaveTransmitFlags flags)
655
1486{ 656{
1487 return (struct GNUNET_PSYC_SlaveTransmitHandle *) 657 if (GNUNET_OK
1488 channel_transmit (&slave->ch, method_name, 658 == GNUNET_PSYC_transmit_message (slv->chn.tmit, method_name, NULL,
1489 notify_mod, notify_data, notify_cls, flags); 659 notify_mod, notify_data, notify_cls,
660 flags))
661 return (struct GNUNET_PSYC_SlaveTransmitHandle *) slv->chn.tmit;
662 else
663 return NULL;
1490} 664}
1491 665
1492 666
1493/** 667/**
1494 * Resume transmission to the master. 668 * Resume transmission to the master.
1495 * 669 *
1496 * @param th Handle of the request that is being resumed. 670 * @param tmit Handle of the request that is being resumed.
1497 */ 671 */
1498void 672void
1499GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_SlaveTransmitHandle *th) 673GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_SlaveTransmitHandle *tmit)
1500{ 674{
1501 channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); 675 GNUNET_PSYC_transmit_resume ((struct GNUNET_PSYC_TransmitHandle *) tmit);
1502} 676}
1503 677
1504 678
1505/** 679/**
1506 * Abort transmission request to master. 680 * Abort transmission request to master.
1507 * 681 *
1508 * @param th Handle of the request that is being aborted. 682 * @param tmit Handle of the request that is being aborted.
1509 */ 683 */
1510void 684void
1511GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th) 685GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *tmit)
1512{ 686{
1513 channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); 687 GNUNET_PSYC_transmit_cancel ((struct GNUNET_PSYC_TransmitHandle *) tmit);
1514}
1515
1516
1517/**
1518 * Convert a channel @a master to a @e channel handle to access the @e channel
1519 * APIs.
1520 *
1521 * @param master Channel master handle.
1522 *
1523 * @return Channel handle, valid for as long as @a master is valid.
1524 */
1525struct GNUNET_PSYC_Channel *
1526GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master)
1527{
1528 return &master->ch;
1529} 688}
1530 689
1531 690
1532/** 691/**
1533 * Convert @a slave to a @e channel handle to access the @e channel APIs. 692 * Convert @a slave to a @e channel handle to access the @e channel APIs.
1534 * 693 *
1535 * @param slave Slave handle. 694 * @param slv Slave handle.
1536 * 695 *
1537 * @return Channel handle, valid for as long as @a slave is valid. 696 * @return Channel handle, valid for as long as @a slave is valid.
1538 */ 697 */
1539struct GNUNET_PSYC_Channel * 698struct GNUNET_PSYC_Channel *
1540GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave) 699GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slv)
1541{ 700{
1542 return &slave->ch; 701 return &slv->chn;
1543} 702}
1544 703
1545 704
@@ -1565,23 +724,17 @@ GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave)
1565 * @param effective_since Addition of slave is in effect since this message ID. 724 * @param effective_since Addition of slave is in effect since this message ID.
1566 */ 725 */
1567void 726void
1568GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel, 727GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn,
1569 const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, 728 const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
1570 uint64_t announced_at, 729 uint64_t announced_at,
1571 uint64_t effective_since) 730 uint64_t effective_since)
1572{ 731{
1573 struct ChannelSlaveAdd *slvadd; 732 struct ChannelSlaveAdd *add = GNUNET_malloc (sizeof (*add));
1574 struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*slvadd)); 733 add->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD);
1575 734 add->header.size = htons (sizeof (*add));
1576 slvadd = (struct ChannelSlaveAdd *) &mq[1]; 735 add->announced_at = GNUNET_htonll (announced_at);
1577 slvadd->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD); 736 add->effective_since = GNUNET_htonll (effective_since);
1578 slvadd->header.size = htons (sizeof (*slvadd)); 737 GNUNET_CLIENT_MANAGER_transmit (chn->client, &add->header);
1579 slvadd->announced_at = GNUNET_htonll (announced_at);
1580 slvadd->effective_since = GNUNET_htonll (effective_since);
1581 GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head,
1582 channel->tmit_tail,
1583 mq);
1584 transmit_next (channel);
1585} 738}
1586 739
1587 740
@@ -1607,21 +760,15 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel,
1607 * @param announced_at ID of the message that announced the membership change. 760 * @param announced_at ID of the message that announced the membership change.
1608 */ 761 */
1609void 762void
1610GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel, 763GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn,
1611 const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, 764 const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
1612 uint64_t announced_at) 765 uint64_t announced_at)
1613{ 766{
1614 struct ChannelSlaveRemove *slvrm; 767 struct ChannelSlaveRemove *rm = GNUNET_malloc (sizeof (*rm));
1615 struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*slvrm)); 768 rm->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM);
1616 769 rm->header.size = htons (sizeof (*rm));
1617 slvrm = (struct ChannelSlaveRemove *) &mq[1]; 770 rm->announced_at = GNUNET_htonll (announced_at);
1618 slvrm->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM); 771 GNUNET_CLIENT_MANAGER_transmit (chn->client, &rm->header);
1619 slvrm->header.size = htons (sizeof (*slvrm));
1620 slvrm->announced_at = GNUNET_htonll (announced_at);
1621 GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head,
1622 channel->tmit_tail,
1623 mq);
1624 transmit_next (channel);
1625} 772}
1626 773
1627 774
diff --git a/src/psyc/psyc_util_lib.c b/src/psyc/psyc_util_lib.c
index 6dd968190..0104c93e8 100644
--- a/src/psyc/psyc_util_lib.c
+++ b/src/psyc/psyc_util_lib.c
@@ -28,6 +28,7 @@
28 28
29#include "platform.h" 29#include "platform.h"
30#include "gnunet_util_lib.h" 30#include "gnunet_util_lib.h"
31#include "gnunet_env_lib.h"
31#include "gnunet_psyc_service.h" 32#include "gnunet_psyc_service.h"
32#include "gnunet_psyc_util_lib.h" 33#include "gnunet_psyc_util_lib.h"
33 34
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c
index 35e80868c..6468b8a2b 100644
--- a/src/psyc/test_psyc.c
+++ b/src/psyc/test_psyc.c
@@ -68,6 +68,7 @@ struct TransmitClosure
68 struct GNUNET_PSYC_MasterTransmitHandle *mst_tmit; 68 struct GNUNET_PSYC_MasterTransmitHandle *mst_tmit;
69 struct GNUNET_PSYC_SlaveTransmitHandle *slv_tmit; 69 struct GNUNET_PSYC_SlaveTransmitHandle *slv_tmit;
70 struct GNUNET_ENV_Environment *env; 70 struct GNUNET_ENV_Environment *env;
71 struct GNUNET_ENV_Modifier *mod;
71 char *data[16]; 72 char *data[16];
72 const char *mod_value; 73 const char *mod_value;
73 size_t mod_value_size; 74 size_t mod_value_size;
@@ -79,7 +80,7 @@ struct TransmitClosure
79 80
80struct TransmitClosure *tmit; 81struct TransmitClosure *tmit;
81 82
82static int join_req_count; 83static uint8_t join_req_count;
83 84
84enum 85enum
85{ 86{
@@ -183,7 +184,7 @@ master_message_cb (void *cls, uint64_t message_id, uint32_t flags,
183 184
184 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 185 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
185 "Master got message part of type %u and size %u " 186 "Master got message part of type %u and size %u "
186 "belonging to message ID %llu with flags %xu\n", 187 "belonging to message ID %llu with flags %x\n",
187 type, size, message_id, flags); 188 type, size, message_id, flags);
188 189
189 switch (test) 190 switch (test)
@@ -227,7 +228,7 @@ slave_message_cb (void *cls, uint64_t message_id, uint32_t flags,
227 228
228 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 229 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
229 "Slave got message part of type %u and size %u " 230 "Slave got message part of type %u and size %u "
230 "belonging to message ID %llu with flags %xu\n", 231 "belonging to message ID %llu with flags %x\n",
231 type, size, message_id, flags); 232 type, size, message_id, flags);
232 233
233 switch (test) 234 switch (test)
@@ -256,6 +257,48 @@ transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
256 257
257 258
258static int 259static int
260tmit_notify_data (void *cls, uint16_t *data_size, void *data)
261{
262 struct TransmitClosure *tmit = cls;
263 if (0 == tmit->data_count)
264 {
265 *data_size = 0;
266 return GNUNET_YES;
267 }
268
269 uint16_t size = strlen (tmit->data[tmit->n]);
270 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
271 "Transmit notify data: %u bytes available, "
272 "processing fragment %u/%u (size %u).\n",
273 *data_size, tmit->n + 1, tmit->data_count, size);
274 if (*data_size < size)
275 {
276 *data_size = 0;
277 GNUNET_assert (0);
278 return GNUNET_SYSERR;
279 }
280
281 if (GNUNET_YES != tmit->paused && 0 < tmit->data_delay[tmit->n])
282 {
283 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n");
284 tmit->paused = GNUNET_YES;
285 GNUNET_SCHEDULER_add_delayed (
286 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
287 tmit->data_delay[tmit->n]),
288 &transmit_resume, tmit);
289 *data_size = 0;
290 return GNUNET_NO;
291 }
292 tmit->paused = GNUNET_NO;
293
294 *data_size = size;
295 memcpy (data, tmit->data[tmit->n], size);
296
297 return ++tmit->n < tmit->data_count ? GNUNET_NO : GNUNET_YES;
298}
299
300
301static int
259tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper, 302tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
260 uint32_t *full_value_size) 303 uint32_t *full_value_size)
261{ 304{
@@ -265,41 +308,39 @@ tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
265 "%u modifiers left to process.\n", 308 "%u modifiers left to process.\n",
266 *data_size, GNUNET_ENV_environment_get_count (tmit->env)); 309 *data_size, GNUNET_ENV_environment_get_count (tmit->env));
267 310
268 enum GNUNET_ENV_Operator op = 0;
269 const char *name = NULL;
270 const char *value = NULL;
271 uint16_t name_size = 0; 311 uint16_t name_size = 0;
272 size_t value_size = 0; 312 size_t value_size = 0;
313 const char *value = NULL;
273 314
274 if (NULL != oper) 315 if (NULL != oper && NULL != tmit->mod)
275 { /* New modifier */ 316 { /* New modifier */
276 if (GNUNET_NO == GNUNET_ENV_environment_shift (tmit->env, &op, &name, 317 tmit->mod = tmit->mod->next;
277 (void *) &value, &value_size)) 318 if (NULL == tmit->mod)
278 { /* No more modifiers, continue with data */ 319 { /* No more modifiers, continue with data */
279 *data_size = 0; 320 *data_size = 0;
280 return GNUNET_YES; 321 return GNUNET_YES;
281 } 322 }
282 323
283 GNUNET_assert (value_size < UINT32_MAX); 324 GNUNET_assert (tmit->mod->value_size < UINT32_MAX);
284 *full_value_size = value_size; 325 *full_value_size = tmit->mod->value_size;
285 *oper = op; 326 *oper = tmit->mod->oper;
286 name_size = strlen (name); 327 name_size = strlen (tmit->mod->name);
287 328
288 if (name_size + 1 + value_size <= *data_size) 329 if (name_size + 1 + tmit->mod->value_size <= *data_size)
289 { 330 {
290 *data_size = name_size + 1 + value_size; 331 *data_size = name_size + 1 + tmit->mod->value_size;
291 } 332 }
292 else 333 else
293 { 334 {
294 tmit->mod_value_size = value_size; 335 tmit->mod_value_size = tmit->mod->value_size;
295 value_size = *data_size - name_size - 1; 336 value_size = *data_size - name_size - 1;
296 tmit->mod_value_size -= value_size; 337 tmit->mod_value_size -= value_size;
297 tmit->mod_value = value + value_size; 338 tmit->mod_value = tmit->mod->value + value_size;
298 } 339 }
299 340
300 memcpy (data, name, name_size); 341 memcpy (data, tmit->mod->name, name_size);
301 ((char *)data)[name_size] = '\0'; 342 ((char *)data)[name_size] = '\0';
302 memcpy ((char *)data + name_size + 1, value, value_size); 343 memcpy ((char *)data + name_size + 1, tmit->mod->value, value_size);
303 } 344 }
304 else if (NULL != tmit->mod_value && 0 < tmit->mod_value_size) 345 else if (NULL != tmit->mod_value && 0 < tmit->mod_value_size)
305 { /* Modifier continuation */ 346 { /* Modifier continuation */
@@ -333,48 +374,6 @@ tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
333} 374}
334 375
335 376
336static int
337tmit_notify_data (void *cls, uint16_t *data_size, void *data)
338{
339 struct TransmitClosure *tmit = cls;
340 if (0 == tmit->data_count)
341 {
342 *data_size = 0;
343 return GNUNET_YES;
344 }
345
346 uint16_t size = strlen (tmit->data[tmit->n]);
347 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
348 "Transmit notify data: %u bytes available, "
349 "processing fragment %u/%u (size %u).\n",
350 *data_size, tmit->n + 1, tmit->data_count, size);
351 if (*data_size < size)
352 {
353 *data_size = 0;
354 GNUNET_assert (0);
355 return GNUNET_SYSERR;
356 }
357
358 if (GNUNET_YES != tmit->paused && 0 < tmit->data_delay[tmit->n])
359 {
360 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n");
361 tmit->paused = GNUNET_YES;
362 GNUNET_SCHEDULER_add_delayed (
363 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
364 tmit->data_delay[tmit->n]),
365 &transmit_resume, tmit);
366 *data_size = 0;
367 return GNUNET_NO;
368 }
369 tmit->paused = GNUNET_NO;
370
371 *data_size = size;
372 memcpy (data, tmit->data[tmit->n], size);
373
374 return ++tmit->n < tmit->data_count ? GNUNET_NO : GNUNET_YES;
375}
376
377
378static void 377static void
379slave_join (); 378slave_join ();
380 379
@@ -388,7 +387,7 @@ join_decision_cb (void *cls, int is_admitted,
388 387
389 if (GNUNET_YES != is_admitted) 388 if (GNUNET_YES != is_admitted)
390 { /* First join request is refused, retry. */ 389 { /* First join request is refused, retry. */
391 //GNUNET_assert (1 == join_req_count); 390 GNUNET_assert (1 == join_req_count);
392 slave_join (); 391 slave_join ();
393 return; 392 return;
394 } 393 }
@@ -403,6 +402,7 @@ join_decision_cb (void *cls, int is_admitted,
403 "_abc", "abc def", 7); 402 "_abc", "abc def", 7);
404 GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN, 403 GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
405 "_abc_def", "abc def ghi", 11); 404 "_abc_def", "abc def ghi", 11);
405 tmit->mod = GNUNET_ENV_environment_head (tmit->env);
406 tmit->n = 0; 406 tmit->n = 0;
407 tmit->data[0] = "slave test"; 407 tmit->data[0] = "slave test";
408 tmit->data_count = 1; 408 tmit->data_count = 1;
@@ -421,8 +421,8 @@ join_request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key
421 struct GNUNET_HashCode slave_key_hash; 421 struct GNUNET_HashCode slave_key_hash;
422 GNUNET_CRYPTO_hash (slave_key, sizeof (*slave_key), &slave_key_hash); 422 GNUNET_CRYPTO_hash (slave_key, sizeof (*slave_key), &slave_key_hash);
423 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 423 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
424 "Got join request from %s.\n", 424 "Got join request #%u from %s.\n",
425 GNUNET_h2s (&slave_key_hash)); 425 join_req_count, GNUNET_h2s (&slave_key_hash));
426 426
427 /* Reject first request */ 427 /* Reject first request */
428 int is_admitted = (0 < join_req_count++) ? GNUNET_YES : GNUNET_NO; 428 int is_admitted = (0 < join_req_count++) ? GNUNET_YES : GNUNET_NO;
@@ -493,6 +493,7 @@ master_transmit ()
493 name_cont, val_cont, 493 name_cont, val_cont,
494 GNUNET_PSYC_MODIFIER_MAX_PAYLOAD - name_cont_size 494 GNUNET_PSYC_MODIFIER_MAX_PAYLOAD - name_cont_size
495 + GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD); 495 + GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD);
496 tmit->mod = GNUNET_ENV_environment_head (tmit->env);
496 tmit->data[0] = "foo"; 497 tmit->data[0] = "foo";
497 tmit->data[1] = GNUNET_malloc (GNUNET_PSYC_DATA_MAX_PAYLOAD + 1); 498 tmit->data[1] = GNUNET_malloc (GNUNET_PSYC_DATA_MAX_PAYLOAD + 1);
498 for (i = 0; i < GNUNET_PSYC_DATA_MAX_PAYLOAD; i++) 499 for (i = 0; i < GNUNET_PSYC_DATA_MAX_PAYLOAD; i++)