aboutsummaryrefslogtreecommitdiff
path: root/src/psyc
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2016-09-23 22:51:13 +0000
committerGabor X Toth <*@tg-x.net>2016-09-23 22:51:13 +0000
commit238594c75922318bbe5b8b8ee62920bc3fd06d95 (patch)
treebe7b3dd68341d60ce4cd5f0a18bb77d0e6e61bd4 /src/psyc
parentf02f4bc7218f729074337f3b8a3fe93ec46c757b (diff)
downloadgnunet-238594c75922318bbe5b8b8ee62920bc3fd06d95.tar.gz
gnunet-238594c75922318bbe5b8b8ee62920bc3fd06d95.zip
psyc: switch to SERVICE API
Diffstat (limited to 'src/psyc')
-rw-r--r--src/psyc/gnunet-service-psyc.c630
1 files changed, 367 insertions, 263 deletions
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
index 7739fe65b..66612a451 100644
--- a/src/psyc/gnunet-service-psyc.c
+++ b/src/psyc/gnunet-service-psyc.c
@@ -44,14 +44,14 @@
44static const struct GNUNET_CONFIGURATION_Handle *cfg; 44static const struct GNUNET_CONFIGURATION_Handle *cfg;
45 45
46/** 46/**
47 * Handle to the statistics service. 47 * Service handle.
48 */ 48 */
49static struct GNUNET_STATISTICS_Handle *stats; 49struct GNUNET_SERVICE_Handle *service;
50 50
51/** 51/**
52 * Notification context, simplifies client broadcasts. 52 * Handle to the statistics service.
53 */ 53 */
54static struct GNUNET_SERVER_NotificationContext *nc; 54static struct GNUNET_STATISTICS_Handle *stats;
55 55
56/** 56/**
57 * Handle to the PSYCstore. 57 * Handle to the PSYCstore.
@@ -85,7 +85,7 @@ struct TransmitMessage
85 struct TransmitMessage *prev; 85 struct TransmitMessage *prev;
86 struct TransmitMessage *next; 86 struct TransmitMessage *next;
87 87
88 struct GNUNET_SERVER_Client *client; 88 struct GNUNET_SERVICE_Client *client;
89 89
90 /** 90 /**
91 * ID assigned to the message. 91 * ID assigned to the message.
@@ -185,12 +185,12 @@ struct FragmentQueue
185/** 185/**
186 * List of connected clients. 186 * List of connected clients.
187 */ 187 */
188struct Client 188struct ClientList
189{ 189{
190 struct Client *prev; 190 struct ClientList *prev;
191 struct Client *next; 191 struct ClientList *next;
192 192
193 struct GNUNET_SERVER_Client *client; 193 struct GNUNET_SERVICE_Client *client;
194}; 194};
195 195
196 196
@@ -199,8 +199,8 @@ struct Operation
199 struct Operation *prev; 199 struct Operation *prev;
200 struct Operation *next; 200 struct Operation *next;
201 201
202 struct GNUNET_SERVER_Client *client; 202 struct GNUNET_SERVICE_Client *client;
203 struct Channel *chn; 203 struct Channel *channel;
204 uint64_t op_id; 204 uint64_t op_id;
205 uint32_t flags; 205 uint32_t flags;
206}; 206};
@@ -211,8 +211,8 @@ struct Operation
211 */ 211 */
212struct Channel 212struct Channel
213{ 213{
214 struct Client *clients_head; 214 struct ClientList *clients_head;
215 struct Client *clients_tail; 215 struct ClientList *clients_tail;
216 216
217 struct Operation *op_head; 217 struct Operation *op_head;
218 struct Operation *op_tail; 218 struct Operation *op_tail;
@@ -270,11 +270,6 @@ struct Channel
270 uint32_t tmit_mod_value_size; 270 uint32_t tmit_mod_value_size;
271 271
272 /** 272 /**
273 * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
274 */
275 uint8_t is_master;
276
277 /**
278 * Is this channel ready to receive messages from client? 273 * Is this channel ready to receive messages from client?
279 * #GNUNET_YES or #GNUNET_NO 274 * #GNUNET_YES or #GNUNET_NO
280 */ 275 */
@@ -285,6 +280,16 @@ struct Channel
285 * #GNUNET_YES or #GNUNET_NO 280 * #GNUNET_YES or #GNUNET_NO
286 */ 281 */
287 uint8_t is_disconnected; 282 uint8_t is_disconnected;
283
284 /**
285 * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
286 */
287 uint8_t is_master;
288
289 union {
290 struct Master *master;
291 struct Slave *slave;
292 };
288}; 293};
289 294
290 295
@@ -296,7 +301,7 @@ struct Master
296 /** 301 /**
297 * Channel struct common for Master and Slave 302 * Channel struct common for Master and Slave
298 */ 303 */
299 struct Channel chn; 304 struct Channel channel;
300 305
301 /** 306 /**
302 * Private key of the channel. 307 * Private key of the channel.
@@ -353,7 +358,7 @@ struct Slave
353 /** 358 /**
354 * Channel struct common for Master and Slave 359 * Channel struct common for Master and Slave
355 */ 360 */
356 struct Channel chn; 361 struct Channel channel;
357 362
358 /** 363 /**
359 * Private key of the slave. 364 * Private key of the slave.
@@ -417,6 +422,24 @@ struct Slave
417}; 422};
418 423
419 424
425/**
426 * Client context.
427 */
428struct Client {
429 struct GNUNET_SERVICE_Client *client;
430 struct Channel *channel;
431};
432
433
434struct ReplayRequestKey
435{
436 uint64_t fragment_id;
437 uint64_t message_id;
438 uint64_t fragment_offset;
439 uint64_t flags;
440};
441
442
420static void 443static void
421transmit_message (struct Channel *chn); 444transmit_message (struct Channel *chn);
422 445
@@ -444,11 +467,6 @@ schedule_transmit_message (void *cls)
444static void 467static void
445shutdown_task (void *cls) 468shutdown_task (void *cls)
446{ 469{
447 if (NULL != nc)
448 {
449 GNUNET_SERVER_notification_context_destroy (nc);
450 nc = NULL;
451 }
452 if (NULL != stats) 470 if (NULL != stats)
453 { 471 {
454 GNUNET_STATISTICS_destroy (stats, GNUNET_YES); 472 GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
@@ -458,12 +476,12 @@ shutdown_task (void *cls)
458 476
459 477
460static struct Operation * 478static struct Operation *
461op_add (struct Channel *chn, struct GNUNET_SERVER_Client *client, 479op_add (struct Channel *chn, struct GNUNET_SERVICE_Client *client,
462 uint64_t op_id, uint32_t flags) 480 uint64_t op_id, uint32_t flags)
463{ 481{
464 struct Operation *op = GNUNET_malloc (sizeof (*op)); 482 struct Operation *op = GNUNET_malloc (sizeof (*op));
465 op->client = client; 483 op->client = client;
466 op->chn = chn; 484 op->channel = chn;
467 op->op_id = op_id; 485 op->op_id = op_id;
468 op->flags = flags; 486 op->flags = flags;
469 GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op); 487 GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
@@ -474,7 +492,7 @@ op_add (struct Channel *chn, struct GNUNET_SERVER_Client *client,
474static void 492static void
475op_remove (struct Operation *op) 493op_remove (struct Operation *op)
476{ 494{
477 GNUNET_CONTAINER_DLL_remove (op->chn->op_head, op->chn->op_tail, op); 495 GNUNET_CONTAINER_DLL_remove (op->channel->op_head, op->channel->op_tail, op);
478 GNUNET_free (op); 496 GNUNET_free (op);
479} 497}
480 498
@@ -485,7 +503,7 @@ op_remove (struct Operation *op)
485static void 503static void
486cleanup_master (struct Master *mst) 504cleanup_master (struct Master *mst)
487{ 505{
488 struct Channel *chn = &mst->chn; 506 struct Channel *chn = &mst->channel;
489 507
490 if (NULL != mst->origin) 508 if (NULL != mst->origin)
491 GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME 509 GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
@@ -500,7 +518,7 @@ cleanup_master (struct Master *mst)
500static void 518static void
501cleanup_slave (struct Slave *slv) 519cleanup_slave (struct Slave *slv)
502{ 520{
503 struct Channel *chn = &slv->chn; 521 struct Channel *chn = &slv->channel;
504 struct GNUNET_CONTAINER_MultiHashMap * 522 struct GNUNET_CONTAINER_MultiHashMap *
505 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, 523 chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
506 &chn->pub_key_hash); 524 &chn->pub_key_hash);
@@ -556,8 +574,8 @@ cleanup_channel (struct Channel *chn)
556 } 574 }
557 575
558 (GNUNET_YES == chn->is_master) 576 (GNUNET_YES == chn->is_master)
559 ? cleanup_master ((struct Master *) chn) 577 ? cleanup_master (chn->master)
560 : cleanup_slave ((struct Slave *) chn); 578 : cleanup_slave (chn->slave);
561 GNUNET_free (chn); 579 GNUNET_free (chn);
562} 580}
563 581
@@ -566,17 +584,17 @@ cleanup_channel (struct Channel *chn)
566 * Called whenever a client is disconnected. 584 * Called whenever a client is disconnected.
567 * Frees our resources associated with that client. 585 * Frees our resources associated with that client.
568 * 586 *
569 * @param cls Closure. 587 * @param cls closure
570 * @param client Identification of the client. 588 * @param client identification of the client
589 * @param app_ctx must match @a client
571 */ 590 */
572static void 591static void
573client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) 592client_notify_disconnect (void *cls,
593 struct GNUNET_SERVICE_Client *client,
594 void *app_ctx)
574{ 595{
575 if (NULL == client) 596 struct Client *c = app_ctx;
576 return; 597 struct Channel *chn = c->channel;
577
578 struct Channel *
579 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
580 598
581 if (NULL == chn) 599 if (NULL == chn)
582 { 600 {
@@ -593,7 +611,7 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
593 (GNUNET_YES == chn->is_master) ? "master" : "slave", 611 (GNUNET_YES == chn->is_master) ? "master" : "slave",
594 GNUNET_h2s (&chn->pub_key_hash)); 612 GNUNET_h2s (&chn->pub_key_hash));
595 613
596 struct Client *cli = chn->clients_head; 614 struct ClientList *cli = chn->clients_head;
597 while (NULL != cli) 615 while (NULL != cli)
598 { 616 {
599 if (cli->client == client) 617 if (cli->client == client)
@@ -637,6 +655,28 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
637 655
638 656
639/** 657/**
658 * A new client connected.
659 *
660 * @param cls NULL
661 * @param client client to add
662 * @param mq message queue for @a client
663 * @return @a client
664 */
665static void *
666client_notify_connect (void *cls,
667 struct GNUNET_SERVICE_Client *client,
668 struct GNUNET_MQ_Handle *mq)
669{
670 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client connected: %p\n", client);
671
672 struct Client *c = GNUNET_malloc (sizeof (*c));
673 c->client = client;
674
675 return c;
676}
677
678
679/**
640 * Send message to all clients connected to the channel. 680 * Send message to all clients connected to the channel.
641 */ 681 */
642static void 682static void
@@ -647,11 +687,15 @@ client_send_msg (const struct Channel *chn,
647 "%p Sending message to clients.\n", 687 "%p Sending message to clients.\n",
648 chn); 688 chn);
649 689
650 struct Client *cli = chn->clients_head; 690 struct ClientList *cli = chn->clients_head;
651 while (NULL != cli) 691 while (NULL != cli)
652 { 692 {
653 GNUNET_SERVER_notification_context_add (nc, cli->client); 693 struct GNUNET_MQ_Envelope *
654 GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, GNUNET_NO); 694 env = GNUNET_MQ_msg_copy (msg);
695
696 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cli->client),
697 env);
698
655 cli = cli->next; 699 cli = cli->next;
656 } 700 }
657} 701}
@@ -672,14 +716,14 @@ client_send_msg (const struct Channel *chn,
672 * Size of @a data. 716 * Size of @a data.
673 */ 717 */
674static void 718static void
675client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id, 719client_send_result (struct GNUNET_SERVICE_Client *client, uint64_t op_id,
676 int64_t result_code, const void *data, uint16_t data_size) 720 int64_t result_code, const void *data, uint16_t data_size)
677{ 721{
678 struct GNUNET_OperationResultMessage *res; 722 struct GNUNET_OperationResultMessage *res;
679 723 struct GNUNET_MQ_Envelope *
680 res = GNUNET_malloc (sizeof (*res) + data_size); 724 env = GNUNET_MQ_msg_extra (res,
681 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE); 725 data_size,
682 res->header.size = htons (sizeof (*res) + data_size); 726 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
683 res->result_code = GNUNET_htonll (result_code); 727 res->result_code = GNUNET_htonll (result_code);
684 res->op_id = op_id; 728 res->op_id = op_id;
685 if (0 < data_size) 729 if (0 < data_size)
@@ -692,10 +736,7 @@ client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
692 result_code, 736 result_code,
693 data_size); 737 data_size);
694 738
695 GNUNET_SERVER_notification_context_add (nc, client); 739 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
696 GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
697 GNUNET_NO);
698 GNUNET_free (res);
699} 740}
700 741
701 742
@@ -705,8 +746,8 @@ client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
705struct JoinMemTestClosure 746struct JoinMemTestClosure
706{ 747{
707 struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key; 748 struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
708 struct Channel *chn; 749 struct Channel *channel;
709 struct GNUNET_MULTICAST_JoinHandle *jh; 750 struct GNUNET_MULTICAST_JoinHandle *join_handle;
710 struct GNUNET_PSYC_JoinRequestMessage *join_msg; 751 struct GNUNET_PSYC_JoinRequestMessage *join_msg;
711}; 752};
712 753
@@ -720,15 +761,15 @@ join_mem_test_cb (void *cls, int64_t result,
720{ 761{
721 struct JoinMemTestClosure *jcls = cls; 762 struct JoinMemTestClosure *jcls = cls;
722 763
723 if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master) 764 if (GNUNET_NO == result && GNUNET_YES == jcls->channel->is_master)
724 { /* Pass on join request to client if this is a master channel */ 765 { /* Pass on join request to client if this is a master channel */
725 struct Master *mst = (struct Master *) jcls->chn; 766 struct Master *mst = jcls->channel->master;
726 struct GNUNET_HashCode slave_pub_hash; 767 struct GNUNET_HashCode slave_pub_hash;
727 GNUNET_CRYPTO_hash (&jcls->slave_pub_key, sizeof (jcls->slave_pub_key), 768 GNUNET_CRYPTO_hash (&jcls->slave_pub_key, sizeof (jcls->slave_pub_key),
728 &slave_pub_hash); 769 &slave_pub_hash);
729 GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_pub_hash, jcls->jh, 770 GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_pub_hash, jcls->join_handle,
730 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 771 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
731 client_send_msg (jcls->chn, &jcls->join_msg->header); 772 client_send_msg (jcls->channel, &jcls->join_msg->header);
732 } 773 }
733 else 774 else
734 { 775 {
@@ -739,7 +780,7 @@ join_mem_test_cb (void *cls, int64_t result,
739 err_msg_size, err_msg); 780 err_msg_size, err_msg);
740 } 781 }
741 // FIXME: add relays 782 // FIXME: add relays
742 GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL); 783 GNUNET_MULTICAST_join_decision (jcls->join_handle, result, 0, NULL, NULL);
743 } 784 }
744 GNUNET_free (jcls->join_msg); 785 GNUNET_free (jcls->join_msg);
745 GNUNET_free (jcls); 786 GNUNET_free (jcls);
@@ -786,8 +827,8 @@ mcast_recv_join_request (void *cls,
786 827
787 struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls)); 828 struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
788 jcls->slave_pub_key = *slave_pub_key; 829 jcls->slave_pub_key = *slave_pub_key;
789 jcls->chn = chn; 830 jcls->channel = chn;
790 jcls->jh = jh; 831 jcls->join_handle = jh;
791 jcls->join_msg = req; 832 jcls->join_msg = req;
792 833
793 GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_pub_key, 834 GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_pub_key,
@@ -807,7 +848,7 @@ mcast_recv_join_decision (void *cls, int is_admitted,
807 const struct GNUNET_MessageHeader *join_resp) 848 const struct GNUNET_MessageHeader *join_resp)
808{ 849{
809 struct Slave *slv = cls; 850 struct Slave *slv = cls;
810 struct Channel *chn = &slv->chn; 851 struct Channel *chn = &slv->channel;
811 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 852 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
812 "%p Got join decision: %d\n", 853 "%p Got join decision: %d\n",
813 slv, 854 slv,
@@ -1033,7 +1074,7 @@ static void
1033client_send_mcast_req (struct Master *mst, 1074client_send_mcast_req (struct Master *mst,
1034 const struct GNUNET_MULTICAST_RequestHeader *req) 1075 const struct GNUNET_MULTICAST_RequestHeader *req)
1035{ 1076{
1036 struct Channel *chn = &mst->chn; 1077 struct Channel *chn = &mst->channel;
1037 1078
1038 struct GNUNET_PSYC_MessageHeader *pmsg; 1079 struct GNUNET_PSYC_MessageHeader *pmsg;
1039 uint16_t size = ntohs (req->header.size); 1080 uint16_t size = ntohs (req->header.size);
@@ -1090,7 +1131,7 @@ fragment_queue_insert (struct Channel *chn,
1090 1131
1091 if (NULL == fragq) 1132 if (NULL == fragq)
1092 { 1133 {
1093 fragq = GNUNET_new (struct FragmentQueue); 1134 fragq = GNUNET_malloc (sizeof (*fragq));
1094 fragq->state = MSG_FRAG_STATE_HEADER; 1135 fragq->state = MSG_FRAG_STATE_HEADER;
1095 fragq->fragments 1136 fragq->fragments
1096 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 1137 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
@@ -1122,7 +1163,7 @@ fragment_queue_insert (struct Channel *chn,
1122 chn, 1163 chn,
1123 fragq->header_size, 1164 fragq->header_size,
1124 size); 1165 size);
1125 cache_entry = GNUNET_new (struct RecvCacheEntry); 1166 cache_entry = GNUNET_malloc (sizeof (*cache_entry));
1126 cache_entry->ref_count = 1; 1167 cache_entry->ref_count = 1;
1127 cache_entry->mmsg = GNUNET_malloc (size); 1168 cache_entry->mmsg = GNUNET_malloc (size);
1128 GNUNET_memcpy (cache_entry->mmsg, mmsg, size); 1169 GNUNET_memcpy (cache_entry->mmsg, mmsg, size);
@@ -1305,7 +1346,7 @@ fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1305 1346
1306struct StateModifyClosure 1347struct StateModifyClosure
1307{ 1348{
1308 struct Channel *chn; 1349 struct Channel *channel;
1309 uint64_t msg_id; 1350 uint64_t msg_id;
1310 struct GNUNET_HashCode msg_id_hash; 1351 struct GNUNET_HashCode msg_id_hash;
1311}; 1352};
@@ -1316,7 +1357,7 @@ store_recv_state_modify_result (void *cls, int64_t result,
1316 const char *err_msg, uint16_t err_msg_size) 1357 const char *err_msg, uint16_t err_msg_size)
1317{ 1358{
1318 struct StateModifyClosure *mcls = cls; 1359 struct StateModifyClosure *mcls = cls;
1319 struct Channel *chn = mcls->chn; 1360 struct Channel *chn = mcls->channel;
1320 uint64_t msg_id = mcls->msg_id; 1361 uint64_t msg_id = mcls->msg_id;
1321 1362
1322 struct FragmentQueue * 1363 struct FragmentQueue *
@@ -1434,7 +1475,7 @@ message_queue_run (struct Channel *chn)
1434 } 1475 }
1435 1476
1436 struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls)); 1477 struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
1437 mcls->chn = chn; 1478 mcls->channel = chn;
1438 mcls->msg_id = msg_id; 1479 mcls->msg_id = msg_id;
1439 mcls->msg_id_hash = msg_id_hash; 1480 mcls->msg_id_hash = msg_id_hash;
1440 1481
@@ -1604,7 +1645,7 @@ store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1604 uint64_t max_state_message_id) 1645 uint64_t max_state_message_id)
1605{ 1646{
1606 struct Master *mst = cls; 1647 struct Master *mst = cls;
1607 struct Channel *chn = &mst->chn; 1648 struct Channel *chn = &mst->channel;
1608 chn->store_op = NULL; 1649 chn->store_op = NULL;
1609 1650
1610 struct GNUNET_PSYC_CountersResultMessage res; 1651 struct GNUNET_PSYC_CountersResultMessage res;
@@ -1649,7 +1690,7 @@ store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1649 uint64_t max_state_message_id) 1690 uint64_t max_state_message_id)
1650{ 1691{
1651 struct Slave *slv = cls; 1692 struct Slave *slv = cls;
1652 struct Channel *chn = &slv->chn; 1693 struct Channel *chn = &slv->channel;
1653 chn->store_op = NULL; 1694 chn->store_op = NULL;
1654 1695
1655 struct GNUNET_PSYC_CountersResultMessage res; 1696 struct GNUNET_PSYC_CountersResultMessage res;
@@ -1703,11 +1744,11 @@ channel_init (struct Channel *chn)
1703 * Handle a connecting client starting a channel master. 1744 * Handle a connecting client starting a channel master.
1704 */ 1745 */
1705static void 1746static void
1706client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client, 1747handle_client_master_start (void *cls,
1707 const struct GNUNET_MessageHeader *msg) 1748 const struct MasterStartRequest *req)
1708{ 1749{
1709 const struct MasterStartRequest *req 1750 struct Client *c = cls;
1710 = (const struct MasterStartRequest *) msg; 1751 struct GNUNET_SERVICE_Client *client = c->client;
1711 1752
1712 struct GNUNET_CRYPTO_EddsaPublicKey pub_key; 1753 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1713 struct GNUNET_HashCode pub_key_hash; 1754 struct GNUNET_HashCode pub_key_hash;
@@ -1721,12 +1762,13 @@ client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1721 1762
1722 if (NULL == mst) 1763 if (NULL == mst)
1723 { 1764 {
1724 mst = GNUNET_new (struct Master); 1765 mst = GNUNET_malloc (sizeof (*mst));
1725 mst->policy = ntohl (req->policy); 1766 mst->policy = ntohl (req->policy);
1726 mst->priv_key = req->channel_key; 1767 mst->priv_key = req->channel_key;
1727 mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); 1768 mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1728 1769
1729 chn = &mst->chn; 1770 chn = c->channel = &mst->channel;
1771 chn->master = mst;
1730 chn->is_master = GNUNET_YES; 1772 chn->is_master = GNUNET_YES;
1731 chn->pub_key = pub_key; 1773 chn->pub_key = pub_key;
1732 chn->pub_key_hash = pub_key_hash; 1774 chn->pub_key_hash = pub_key_hash;
@@ -1739,29 +1781,34 @@ client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1739 } 1781 }
1740 else 1782 else
1741 { 1783 {
1742 chn = &mst->chn; 1784 chn = &mst->channel;
1743 1785
1744 struct GNUNET_PSYC_CountersResultMessage res; 1786 struct GNUNET_PSYC_CountersResultMessage *res;
1745 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); 1787 struct GNUNET_MQ_Envelope *
1746 res.header.size = htons (sizeof (res)); 1788 env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1747 res.result_code = htonl (GNUNET_OK); 1789 res->result_code = htonl (GNUNET_OK);
1748 res.max_message_id = GNUNET_htonll (mst->max_message_id); 1790 res->max_message_id = GNUNET_htonll (mst->max_message_id);
1749 1791
1750 GNUNET_SERVER_notification_context_add (nc, client); 1792 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1751 GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1752 GNUNET_NO);
1753 } 1793 }
1754 1794
1755 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1795 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1756 "%p Client connected as master to channel %s.\n", 1796 "%p Client connected as master to channel %s.\n",
1757 mst, GNUNET_h2s (&chn->pub_key_hash)); 1797 mst, GNUNET_h2s (&chn->pub_key_hash));
1758 1798
1759 struct Client *cli = GNUNET_new (struct Client); 1799 struct ClientList *cli = GNUNET_malloc (sizeof (*cli));
1760 cli->client = client; 1800 cli->client = client;
1761 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli); 1801 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1762 1802
1763 GNUNET_SERVER_client_set_user_context (client, chn); 1803 GNUNET_SERVICE_client_continue (client);
1764 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1804}
1805
1806
1807static int
1808check_client_slave_join (void *cls,
1809 const struct SlaveJoinRequest *req)
1810{
1811 return GNUNET_OK;
1765} 1812}
1766 1813
1767 1814
@@ -1769,11 +1816,12 @@ client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1769 * Handle a connecting client joining as a channel slave. 1816 * Handle a connecting client joining as a channel slave.
1770 */ 1817 */
1771static void 1818static void
1772client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client, 1819handle_client_slave_join (void *cls,
1773 const struct GNUNET_MessageHeader *msg) 1820 const struct SlaveJoinRequest *req)
1774{ 1821{
1775 const struct SlaveJoinRequest *req 1822 struct Client *c = cls;
1776 = (const struct SlaveJoinRequest *) msg; 1823 struct GNUNET_SERVICE_Client *client = c->client;
1824
1777 uint16_t req_size = ntohs (req->header.size); 1825 uint16_t req_size = ntohs (req->header.size);
1778 1826
1779 struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key; 1827 struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
@@ -1794,7 +1842,7 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1794 } 1842 }
1795 if (NULL == slv) 1843 if (NULL == slv)
1796 { 1844 {
1797 slv = GNUNET_new (struct Slave); 1845 slv = GNUNET_malloc (sizeof (*slv));
1798 slv->priv_key = req->slave_key; 1846 slv->priv_key = req->slave_key;
1799 slv->pub_key = slv_pub_key; 1847 slv->pub_key = slv_pub_key;
1800 slv->pub_key_hash = slv_pub_hash; 1848 slv->pub_key_hash = slv_pub_hash;
@@ -1825,7 +1873,7 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1825 join_msg_size, 1873 join_msg_size,
1826 req_size); 1874 req_size);
1827 GNUNET_break (0); 1875 GNUNET_break (0);
1828 GNUNET_SERVER_client_disconnect (client); 1876 GNUNET_SERVICE_client_drop (client);
1829 GNUNET_free (slv); 1877 GNUNET_free (slv);
1830 return; 1878 return;
1831 } 1879 }
@@ -1835,7 +1883,8 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1835 GNUNET_memcpy (slv->relays, &req[1], relay_size); 1883 GNUNET_memcpy (slv->relays, &req[1], relay_size);
1836 } 1884 }
1837 1885
1838 chn = &slv->chn; 1886 chn = c->channel = &slv->channel;
1887 chn->slave = slv;
1839 chn->is_master = GNUNET_NO; 1888 chn->is_master = GNUNET_NO;
1840 chn->pub_key = req->channel_pub_key; 1889 chn->pub_key = req->channel_pub_key;
1841 chn->pub_key_hash = pub_key_hash; 1890 chn->pub_key_hash = pub_key_hash;
@@ -1856,17 +1905,16 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1856 } 1905 }
1857 else 1906 else
1858 { 1907 {
1859 chn = &slv->chn; 1908 chn = &slv->channel;
1860 1909
1861 struct GNUNET_PSYC_CountersResultMessage res; 1910 struct GNUNET_PSYC_CountersResultMessage *res;
1862 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1863 res.header.size = htons (sizeof (res));
1864 res.result_code = htonl (GNUNET_OK);
1865 res.max_message_id = GNUNET_htonll (chn->max_message_id);
1866 1911
1867 GNUNET_SERVER_notification_context_add (nc, client); 1912 struct GNUNET_MQ_Envelope *
1868 GNUNET_SERVER_notification_context_unicast (nc, client, &res.header, 1913 env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1869 GNUNET_NO); 1914 res->result_code = htonl (GNUNET_OK);
1915 res->max_message_id = GNUNET_htonll (chn->max_message_id);
1916
1917 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1870 1918
1871 if (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags) 1919 if (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags)
1872 { 1920 {
@@ -1893,10 +1941,9 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1893 } 1941 }
1894 else if (NULL != slv->join_dcsn) 1942 else if (NULL != slv->join_dcsn)
1895 { 1943 {
1896 GNUNET_SERVER_notification_context_add (nc, client); 1944 struct GNUNET_MQ_Envelope *
1897 GNUNET_SERVER_notification_context_unicast (nc, client, 1945 env = GNUNET_MQ_msg_copy (&slv->join_dcsn->header);
1898 &slv->join_dcsn->header, 1946 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
1899 GNUNET_NO);
1900 } 1947 }
1901 } 1948 }
1902 1949
@@ -1904,12 +1951,11 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1904 "%p Client connected as slave to channel %s.\n", 1951 "%p Client connected as slave to channel %s.\n",
1905 slv, GNUNET_h2s (&chn->pub_key_hash)); 1952 slv, GNUNET_h2s (&chn->pub_key_hash));
1906 1953
1907 struct Client *cli = GNUNET_new (struct Client); 1954 struct ClientList *cli = GNUNET_malloc (sizeof (*cli));
1908 cli->client = client; 1955 cli->client = client;
1909 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli); 1956 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1910 1957
1911 GNUNET_SERVER_client_set_user_context (client, chn); 1958 GNUNET_SERVICE_client_continue (client);
1912 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1913} 1959}
1914 1960
1915 1961
@@ -1935,31 +1981,37 @@ mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1935} 1981}
1936 1982
1937 1983
1984static int
1985check_client_join_decision (void *cls,
1986 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
1987{
1988 return GNUNET_OK;
1989}
1990
1991
1938/** 1992/**
1939 * Join decision from client. 1993 * Join decision from client.
1940 */ 1994 */
1941static void 1995static void
1942client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client, 1996handle_client_join_decision (void *cls,
1943 const struct GNUNET_MessageHeader *msg) 1997 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
1944{ 1998{
1945 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn 1999 struct Client *c = cls;
1946 = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg; 2000 struct GNUNET_SERVICE_Client *client = c->client;
1947 struct Channel *chn; 2001 struct Channel *chn = c->channel;
1948 struct Master *mst;
1949 struct JoinDecisionClosure jcls;
1950
1951 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
1952 if (NULL == chn) 2002 if (NULL == chn)
1953 { 2003 {
1954 GNUNET_break (0); 2004 GNUNET_break (0);
1955 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 2005 GNUNET_SERVICE_client_drop (client);
1956 return; 2006 return;
1957 } 2007 }
1958 GNUNET_assert (GNUNET_YES == chn->is_master); 2008 GNUNET_assert (GNUNET_YES == chn->is_master);
1959 mst = (struct Master *) chn; 2009 struct Master *mst = chn->master;
2010
2011 struct JoinDecisionClosure jcls;
1960 jcls.is_admitted = ntohl (dcsn->is_admitted); 2012 jcls.is_admitted = ntohl (dcsn->is_admitted);
1961 jcls.msg 2013 jcls.msg
1962 = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size)) 2014 = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (dcsn->header.size))
1963 ? (struct GNUNET_MessageHeader *) &dcsn[1] 2015 ? (struct GNUNET_MessageHeader *) &dcsn[1]
1964 : NULL; 2016 : NULL;
1965 2017
@@ -1977,7 +2029,7 @@ client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1977 GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_pub_hash, 2029 GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_pub_hash,
1978 &mcast_send_join_decision, &jcls); 2030 &mcast_send_join_decision, &jcls);
1979 GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_pub_hash); 2031 GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_pub_hash);
1980 GNUNET_SERVER_receive_done (client, GNUNET_OK); 2032 GNUNET_SERVICE_client_continue (client);
1981} 2033}
1982 2034
1983 2035
@@ -1989,15 +2041,14 @@ client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1989 * @param chn The channel struct for the client. 2041 * @param chn The channel struct for the client.
1990 */ 2042 */
1991static void 2043static void
1992send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client) 2044send_message_ack (struct Channel *chn, struct GNUNET_SERVICE_Client *client)
1993{ 2045{
1994 struct GNUNET_MessageHeader res; 2046 struct GNUNET_MessageHeader *res;
1995 res.size = htons (sizeof (res)); 2047 struct GNUNET_MQ_Envelope *
1996 res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK); 2048 env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
1997 2049
1998 /* FIXME */ 2050 /* FIXME? */
1999 GNUNET_SERVER_notification_context_add (nc, client); 2051 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
2000 GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
2001} 2052}
2002 2053
2003 2054
@@ -2093,7 +2144,7 @@ slave_transmit_notify (void *cls, size_t *data_size, void *data)
2093static void 2144static void
2094master_transmit_message (struct Master *mst) 2145master_transmit_message (struct Master *mst)
2095{ 2146{
2096 struct Channel *chn = &mst->chn; 2147 struct Channel *chn = &mst->channel;
2097 struct TransmitMessage *tmit_msg = chn->tmit_head; 2148 struct TransmitMessage *tmit_msg = chn->tmit_head;
2098 if (NULL == tmit_msg) 2149 if (NULL == tmit_msg)
2099 return; 2150 return;
@@ -2120,13 +2171,13 @@ master_transmit_message (struct Master *mst)
2120static void 2171static void
2121slave_transmit_message (struct Slave *slv) 2172slave_transmit_message (struct Slave *slv)
2122{ 2173{
2123 if (NULL == slv->chn.tmit_head) 2174 if (NULL == slv->channel.tmit_head)
2124 return; 2175 return;
2125 if (NULL == slv->tmit_handle) 2176 if (NULL == slv->tmit_handle)
2126 { 2177 {
2127 slv->tmit_handle = (void *) &slv->tmit_handle; 2178 slv->tmit_handle = (void *) &slv->tmit_handle;
2128 struct GNUNET_MULTICAST_MemberTransmitHandle * 2179 struct GNUNET_MULTICAST_MemberTransmitHandle *
2129 tmit_handle = GNUNET_MULTICAST_member_to_origin (slv->member, slv->chn.tmit_head->id, 2180 tmit_handle = GNUNET_MULTICAST_member_to_origin (slv->member, slv->channel.tmit_head->id,
2130 slave_transmit_notify, slv); 2181 slave_transmit_notify, slv);
2131 if (NULL != slv->tmit_handle) 2182 if (NULL != slv->tmit_handle)
2132 slv->tmit_handle = tmit_handle; 2183 slv->tmit_handle = tmit_handle;
@@ -2142,8 +2193,8 @@ static void
2142transmit_message (struct Channel *chn) 2193transmit_message (struct Channel *chn)
2143{ 2194{
2144 chn->is_master 2195 chn->is_master
2145 ? master_transmit_message ((struct Master *) chn) 2196 ? master_transmit_message (chn->master)
2146 : slave_transmit_message ((struct Slave *) chn); 2197 : slave_transmit_message (chn->slave);
2147} 2198}
2148 2199
2149 2200
@@ -2226,7 +2277,7 @@ slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg)
2226 */ 2277 */
2227static struct TransmitMessage * 2278static struct TransmitMessage *
2228queue_message (struct Channel *chn, 2279queue_message (struct Channel *chn,
2229 struct GNUNET_SERVER_Client *client, 2280 struct GNUNET_SERVICE_Client *client,
2230 size_t data_size, 2281 size_t data_size,
2231 const void *data, 2282 const void *data,
2232 uint16_t first_ptype, uint16_t last_ptype) 2283 uint16_t first_ptype, uint16_t last_ptype)
@@ -2244,8 +2295,8 @@ queue_message (struct Channel *chn,
2244 GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg); 2295 GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
2245 2296
2246 chn->is_master 2297 chn->is_master
2247 ? master_queue_message ((struct Master *) chn, tmit_msg) 2298 ? master_queue_message (chn->master, tmit_msg)
2248 : slave_queue_message ((struct Slave *) chn, tmit_msg); 2299 : slave_queue_message (chn->slave, tmit_msg);
2249 return tmit_msg; 2300 return tmit_msg;
2250} 2301}
2251 2302
@@ -2257,7 +2308,7 @@ queue_message (struct Channel *chn,
2257 * @param client Client the message originates from. 2308 * @param client Client the message originates from.
2258 */ 2309 */
2259static void 2310static void
2260transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client) 2311transmit_cancel (struct Channel *chn, struct GNUNET_SERVICE_Client *client)
2261{ 2312{
2262 uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL; 2313 uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
2263 2314
@@ -2272,16 +2323,30 @@ transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client)
2272} 2323}
2273 2324
2274 2325
2326static int
2327check_client_psyc_message (void *cls,
2328 const struct GNUNET_MessageHeader *msg)
2329{
2330 return GNUNET_OK;
2331}
2332
2333
2275/** 2334/**
2276 * Incoming message from a master or slave client. 2335 * Incoming message from a master or slave client.
2277 */ 2336 */
2278static void 2337static void
2279client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, 2338handle_client_psyc_message (void *cls,
2280 const struct GNUNET_MessageHeader *msg) 2339 const struct GNUNET_MessageHeader *msg)
2281{ 2340{
2282 struct Channel * 2341 struct Client *c = cls;
2283 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); 2342 struct GNUNET_SERVICE_Client *client = c->client;
2284 GNUNET_assert (NULL != chn); 2343 struct Channel *chn = c->channel;
2344 if (NULL == chn)
2345 {
2346 GNUNET_break (0);
2347 GNUNET_SERVICE_client_drop (client);
2348 return;
2349 }
2285 2350
2286 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2351 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2287 "%p Received message from client.\n", chn); 2352 "%p Received message from client.\n", chn);
@@ -2292,7 +2357,7 @@ client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
2292 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 2357 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2293 "%p Channel is not ready yet, disconnecting client.\n", chn); 2358 "%p Channel is not ready yet, disconnecting client.\n", chn);
2294 GNUNET_break (0); 2359 GNUNET_break (0);
2295 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 2360 GNUNET_SERVICE_client_drop (client);
2296 return; 2361 return;
2297 } 2362 }
2298 2363
@@ -2306,7 +2371,7 @@ client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
2306 (unsigned int) (size - sizeof (*msg))); 2371 (unsigned int) (size - sizeof (*msg)));
2307 GNUNET_break (0); 2372 GNUNET_break (0);
2308 transmit_cancel (chn, client); 2373 transmit_cancel (chn, client);
2309 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 2374 GNUNET_SERVICE_client_drop (client);
2310 return; 2375 return;
2311 } 2376 }
2312 2377
@@ -2320,7 +2385,7 @@ client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
2320 "%p Received invalid message part from client.\n", chn); 2385 "%p Received invalid message part from client.\n", chn);
2321 GNUNET_break (0); 2386 GNUNET_break (0);
2322 transmit_cancel (chn, client); 2387 transmit_cancel (chn, client);
2323 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 2388 GNUNET_SERVICE_client_drop (client);
2324 return; 2389 return;
2325 } 2390 }
2326 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2391 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -2332,7 +2397,7 @@ client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
2332 transmit_message (chn); 2397 transmit_message (chn);
2333 /* FIXME: send a few ACKs even before transmit_notify is called */ 2398 /* FIXME: send a few ACKs even before transmit_notify is called */
2334 2399
2335 GNUNET_SERVER_receive_done (client, GNUNET_OK); 2400 GNUNET_SERVICE_client_continue (client);
2336}; 2401};
2337 2402
2338 2403
@@ -2348,7 +2413,7 @@ store_recv_membership_store_result (void *cls,
2348 struct Operation *op = cls; 2413 struct Operation *op = cls;
2349 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2414 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2350 "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.*s)\n", 2415 "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.*s)\n",
2351 op->chn, 2416 op->channel,
2352 result, 2417 result,
2353 (int) err_msg_size, 2418 (int) err_msg_size,
2354 err_msg); 2419 err_msg);
@@ -2363,15 +2428,18 @@ store_recv_membership_store_result (void *cls,
2363 * Client requests to add/remove a slave in the membership database. 2428 * Client requests to add/remove a slave in the membership database.
2364 */ 2429 */
2365static void 2430static void
2366client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client, 2431handle_client_membership_store (void *cls,
2367 const struct GNUNET_MessageHeader *msg) 2432 const struct ChannelMembershipStoreRequest *req)
2368{ 2433{
2369 struct Channel * 2434 struct Client *c = cls;
2370 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); 2435 struct GNUNET_SERVICE_Client *client = c->client;
2371 GNUNET_assert (NULL != chn); 2436 struct Channel *chn = c->channel;
2372 2437 if (NULL == chn)
2373 const struct ChannelMembershipStoreRequest * 2438 {
2374 req = (const struct ChannelMembershipStoreRequest *) msg; 2439 GNUNET_break (0);
2440 GNUNET_SERVICE_client_drop (client);
2441 return;
2442 }
2375 2443
2376 struct Operation *op = op_add (chn, client, req->op_id, 0); 2444 struct Operation *op = op_add (chn, client, req->op_id, 0);
2377 2445
@@ -2387,7 +2455,7 @@ client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
2387 req->did_join, announced_at, effective_since, 2455 req->did_join, announced_at, effective_since,
2388 0, /* FIXME: group_generation */ 2456 0, /* FIXME: group_generation */
2389 &store_recv_membership_store_result, op); 2457 &store_recv_membership_store_result, op);
2390 GNUNET_SERVER_receive_done (client, GNUNET_OK); 2458 GNUNET_SERVICE_client_continue (client);
2391} 2459}
2392 2460
2393 2461
@@ -2405,7 +2473,7 @@ store_recv_fragment_history (void *cls,
2405 { /* Requesting client already disconnected. */ 2473 { /* Requesting client already disconnected. */
2406 return GNUNET_NO; 2474 return GNUNET_NO;
2407 } 2475 }
2408 struct Channel *chn = op->chn; 2476 struct Channel *chn = op->channel;
2409 2477
2410 struct GNUNET_PSYC_MessageHeader *pmsg; 2478 struct GNUNET_PSYC_MessageHeader *pmsg;
2411 uint16_t msize = ntohs (mmsg->header.size); 2479 uint16_t msize = ntohs (mmsg->header.size);
@@ -2447,7 +2515,7 @@ store_recv_fragment_history_result (void *cls, int64_t result,
2447 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2515 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2448 "%p History replay #%" PRIu64 ": " 2516 "%p History replay #%" PRIu64 ": "
2449 "PSYCSTORE returned %" PRId64 " (%.*s)\n", 2517 "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2450 op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg); 2518 op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2451 2519
2452 if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE) 2520 if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
2453 { 2521 {
@@ -2459,20 +2527,32 @@ store_recv_fragment_history_result (void *cls, int64_t result,
2459} 2527}
2460 2528
2461 2529
2530static int
2531check_client_history_replay (void *cls,
2532 const struct GNUNET_PSYC_HistoryRequestMessage *req)
2533{
2534 return GNUNET_OK;
2535}
2536
2537
2462/** 2538/**
2463 * Client requests channel history. 2539 * Client requests channel history.
2464 */ 2540 */
2465static void 2541static void
2466client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client, 2542handle_client_history_replay (void *cls,
2467 const struct GNUNET_MessageHeader *msg) 2543 const struct GNUNET_PSYC_HistoryRequestMessage *req)
2468{ 2544{
2469 struct Channel * 2545 struct Client *c = cls;
2470 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); 2546 struct GNUNET_SERVICE_Client *client = c->client;
2471 GNUNET_assert (NULL != chn); 2547 struct Channel *chn = c->channel;
2548 if (NULL == chn)
2549 {
2550 GNUNET_break (0);
2551 GNUNET_SERVICE_client_drop (client);
2552 return;
2553 }
2472 2554
2473 const struct GNUNET_PSYC_HistoryRequestMessage * 2555 uint16_t size = ntohs (req->header.size);
2474 req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg;
2475 uint16_t size = ntohs (msg->size);
2476 const char *method_prefix = (const char *) &req[1]; 2556 const char *method_prefix = (const char *) &req[1];
2477 2557
2478 if (size < sizeof (*req) + 1 2558 if (size < sizeof (*req) + 1
@@ -2486,28 +2566,31 @@ client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
2486 size, 2566 size,
2487 (unsigned int) sizeof (*req) + 1); 2567 (unsigned int) sizeof (*req) + 1);
2488 GNUNET_break (0); 2568 GNUNET_break (0);
2489 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 2569 GNUNET_SERVICE_client_drop (client);
2490 return; 2570 return;
2491 } 2571 }
2492 2572
2493 struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags)); 2573 struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
2494 2574
2495 if (0 == req->message_limit) 2575 if (0 == req->message_limit)
2576 {
2496 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL, 2577 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2497 GNUNET_ntohll (req->start_message_id), 2578 GNUNET_ntohll (req->start_message_id),
2498 GNUNET_ntohll (req->end_message_id), 2579 GNUNET_ntohll (req->end_message_id),
2499 0, method_prefix, 2580 0, method_prefix,
2500 &store_recv_fragment_history, 2581 &store_recv_fragment_history,
2501 &store_recv_fragment_history_result, op); 2582 &store_recv_fragment_history_result, op);
2583 }
2502 else 2584 else
2585 {
2503 GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL, 2586 GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2504 GNUNET_ntohll (req->message_limit), 2587 GNUNET_ntohll (req->message_limit),
2505 method_prefix, 2588 method_prefix,
2506 &store_recv_fragment_history, 2589 &store_recv_fragment_history,
2507 &store_recv_fragment_history_result, 2590 &store_recv_fragment_history_result,
2508 op); 2591 op);
2509 2592 }
2510 GNUNET_SERVER_receive_done (client, GNUNET_OK); 2593 GNUNET_SERVICE_client_continue (client);
2511} 2594}
2512 2595
2513 2596
@@ -2520,18 +2603,19 @@ store_recv_state_var (void *cls, const char *name,
2520{ 2603{
2521 struct Operation *op = cls; 2604 struct Operation *op = cls;
2522 struct GNUNET_OperationResultMessage *res; 2605 struct GNUNET_OperationResultMessage *res;
2606 struct GNUNET_MQ_Envelope *env;
2523 2607
2524 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2608 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2525 "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n", 2609 "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n",
2526 op->chn, GNUNET_ntohll (op->op_id), name); 2610 op->channel, GNUNET_ntohll (op->op_id), name);
2527 2611
2528 if (NULL != name) /* First part */ 2612 if (NULL != name) /* First part */
2529 { 2613 {
2530 uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1; 2614 uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2531 struct GNUNET_PSYC_MessageModifier *mod; 2615 struct GNUNET_PSYC_MessageModifier *mod;
2532 res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + name_size + value_size); 2616 env = GNUNET_MQ_msg_extra (res,
2533 res->header.size = htons (sizeof (*res) + sizeof (*mod) + name_size + value_size); 2617 sizeof (*mod) + name_size + value_size,
2534 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT); 2618 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2535 res->op_id = op->op_id; 2619 res->op_id = op->op_id;
2536 2620
2537 mod = (struct GNUNET_PSYC_MessageModifier *) &res[1]; 2621 mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
@@ -2546,9 +2630,9 @@ store_recv_state_var (void *cls, const char *name,
2546 else /* Continuation */ 2630 else /* Continuation */
2547 { 2631 {
2548 struct GNUNET_MessageHeader *mod; 2632 struct GNUNET_MessageHeader *mod;
2549 res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + value_size); 2633 env = GNUNET_MQ_msg_extra (res,
2550 res->header.size = htons (sizeof (*res) + sizeof (*mod) + value_size); 2634 sizeof (*mod) + value_size,
2551 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT); 2635 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2552 res->op_id = op->op_id; 2636 res->op_id = op->op_id;
2553 2637
2554 mod = (struct GNUNET_MessageHeader *) &res[1]; 2638 mod = (struct GNUNET_MessageHeader *) &res[1];
@@ -2558,10 +2642,7 @@ store_recv_state_var (void *cls, const char *name,
2558 } 2642 }
2559 2643
2560 // FIXME: client might have been disconnected 2644 // FIXME: client might have been disconnected
2561 GNUNET_SERVER_notification_context_add (nc, op->client); 2645 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (op->client), env);
2562 GNUNET_SERVER_notification_context_unicast (nc, op->client, &res->header,
2563 GNUNET_NO);
2564 GNUNET_free (res);
2565 return GNUNET_YES; 2646 return GNUNET_YES;
2566} 2647}
2567 2648
@@ -2578,7 +2659,7 @@ store_recv_state_result (void *cls, int64_t result,
2578 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2659 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2579 "%p state_get #%" PRIu64 ": " 2660 "%p state_get #%" PRIu64 ": "
2580 "PSYCSTORE returned %" PRId64 " (%.*s)\n", 2661 "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2581 op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg); 2662 op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2582 2663
2583 // FIXME: client might have been disconnected 2664 // FIXME: client might have been disconnected
2584 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size); 2665 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
@@ -2586,97 +2667,94 @@ store_recv_state_result (void *cls, int64_t result,
2586} 2667}
2587 2668
2588 2669
2589/** 2670static int
2590 * Client requests best matching state variable from PSYCstore. 2671check_client_state_get (void *cls,
2591 */ 2672 const struct StateRequest *req)
2592static void
2593client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
2594 const struct GNUNET_MessageHeader *msg)
2595{ 2673{
2596 struct Channel * 2674 struct Client *c = cls;
2597 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); 2675 struct Channel *chn = c->channel;
2598 GNUNET_assert (NULL != chn); 2676 if (NULL == chn)
2599 2677 {
2600 const struct StateRequest * 2678 GNUNET_break (0);
2601 req = (const struct StateRequest *) msg; 2679 return GNUNET_SYSERR;
2680 }
2602 2681
2603 uint16_t name_size = ntohs (req->header.size) - sizeof (*req); 2682 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2604 const char *name = (const char *) &req[1]; 2683 const char *name = (const char *) &req[1];
2605 if (0 == name_size || '\0' != name[name_size - 1]) 2684 if (0 == name_size || '\0' != name[name_size - 1])
2606 { 2685 {
2607 GNUNET_break (0); 2686 GNUNET_break (0);
2608 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 2687 return GNUNET_SYSERR;
2609 return;
2610 } 2688 }
2611 2689
2612 struct Operation *op = op_add (chn, client, req->op_id, 0); 2690 return GNUNET_OK;
2613 GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2614 &store_recv_state_var,
2615 &store_recv_state_result, op);
2616 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2617} 2691}
2618 2692
2619 2693
2620/** 2694/**
2621 * Client requests state variables with a given prefix from PSYCstore. 2695 * Client requests best matching state variable from PSYCstore.
2622 */ 2696 */
2623static void 2697static void
2624client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client, 2698handle_client_state_get (void *cls,
2625 const struct GNUNET_MessageHeader *msg) 2699 const struct StateRequest *req)
2626{ 2700{
2627 struct Channel * 2701 struct Client *c = cls;
2628 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); 2702 struct GNUNET_SERVICE_Client *client = c->client;
2629 GNUNET_assert (NULL != chn); 2703 struct Channel *chn = c->channel;
2704
2705 const char *name = (const char *) &req[1];
2706 struct Operation *op = op_add (chn, client, req->op_id, 0);
2707 GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2708 &store_recv_state_var,
2709 &store_recv_state_result, op);
2710 GNUNET_SERVICE_client_continue (client);
2711}
2630 2712
2631 const struct StateRequest * 2713
2632 req = (const struct StateRequest *) msg; 2714static int
2715check_client_state_get_prefix (void *cls,
2716 const struct StateRequest *req)
2717{
2718 struct Client *c = cls;
2719 struct Channel *chn = c->channel;
2720 if (NULL == chn)
2721 {
2722 GNUNET_break (0);
2723 return GNUNET_SYSERR;
2724 }
2633 2725
2634 uint16_t name_size = ntohs (req->header.size) - sizeof (*req); 2726 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2635 const char *name = (const char *) &req[1]; 2727 const char *name = (const char *) &req[1];
2636 if (0 == name_size || '\0' != name[name_size - 1]) 2728 if (0 == name_size || '\0' != name[name_size - 1])
2637 { 2729 {
2638 GNUNET_break (0); 2730 GNUNET_break (0);
2639 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 2731 return GNUNET_SYSERR;
2640 return;
2641 } 2732 }
2642 2733
2734 return GNUNET_OK;
2735}
2736
2737
2738/**
2739 * Client requests state variables with a given prefix from PSYCstore.
2740 */
2741static void
2742handle_client_state_get_prefix (void *cls,
2743 const struct StateRequest *req)
2744{
2745 struct Client *c = cls;
2746 struct GNUNET_SERVICE_Client *client = c->client;
2747 struct Channel *chn = c->channel;
2748
2749 const char *name = (const char *) &req[1];
2643 struct Operation *op = op_add (chn, client, req->op_id, 0); 2750 struct Operation *op = op_add (chn, client, req->op_id, 0);
2644 GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name, 2751 GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2645 &store_recv_state_var, 2752 &store_recv_state_var,
2646 &store_recv_state_result, op); 2753 &store_recv_state_result, op);
2647 GNUNET_SERVER_receive_done (client, GNUNET_OK); 2754 GNUNET_SERVICE_client_continue (client);
2648} 2755}
2649 2756
2650 2757
2651static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
2652 { &client_recv_master_start, NULL,
2653 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
2654
2655 { &client_recv_slave_join, NULL,
2656 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
2657
2658 { &client_recv_join_decision, NULL,
2659 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
2660
2661 { &client_recv_psyc_message, NULL,
2662 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
2663
2664 { &client_recv_membership_store, NULL,
2665 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 },
2666
2667 { &client_recv_history_replay, NULL,
2668 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
2669
2670 { &client_recv_state_get, NULL,
2671 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
2672
2673 { &client_recv_state_get_prefix, NULL,
2674 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
2675
2676 { NULL, NULL, 0, 0 }
2677};
2678
2679
2680/** 2758/**
2681 * Initialize the PSYC service. 2759 * Initialize the PSYC service.
2682 * 2760 *
@@ -2685,37 +2763,63 @@ static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
2685 * @param c Configuration to use. 2763 * @param c Configuration to use.
2686 */ 2764 */
2687static void 2765static void
2688run (void *cls, struct GNUNET_SERVER_Handle *server, 2766run (void *cls,
2689 const struct GNUNET_CONFIGURATION_Handle *c) 2767 const struct GNUNET_CONFIGURATION_Handle *c,
2768 struct GNUNET_SERVICE_Handle *svc)
2690{ 2769{
2691 cfg = c; 2770 cfg = c;
2771 service = svc;
2692 store = GNUNET_PSYCSTORE_connect (cfg); 2772 store = GNUNET_PSYCSTORE_connect (cfg);
2693 stats = GNUNET_STATISTICS_create ("psyc", cfg); 2773 stats = GNUNET_STATISTICS_create ("psyc", cfg);
2694 masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); 2774 masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2695 slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); 2775 slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2696 channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); 2776 channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2697 recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); 2777 recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2698 nc = GNUNET_SERVER_notification_context_create (server, 1);
2699 GNUNET_SERVER_add_handlers (server, server_handlers);
2700 GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
2701 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); 2778 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
2702} 2779}
2703 2780
2704 2781
2705/** 2782/**
2706 * The main function for the service. 2783 * Define "main" method using service macro.
2707 * 2784 */
2708 * @param argc number of arguments from the command line 2785GNUNET_SERVICE_MAIN
2709 * @param argv command line arguments 2786("psyc",
2710 * @return 0 ok, 1 on error 2787 GNUNET_SERVICE_OPTION_NONE,
2711 */ 2788 run,
2712int 2789 client_notify_connect,
2713main (int argc, char *const *argv) 2790 client_notify_disconnect,
2714{ 2791 NULL,
2715 return (GNUNET_OK == 2792 GNUNET_MQ_hd_fixed_size (client_master_start,
2716 GNUNET_SERVICE_run (argc, argv, "psyc", 2793 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START,
2717 GNUNET_SERVICE_OPTION_NONE, 2794 struct MasterStartRequest,
2718 &run, NULL)) ? 0 : 1; 2795 NULL),
2719} 2796 GNUNET_MQ_hd_var_size (client_slave_join,
2797 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN,
2798 struct SlaveJoinRequest,
2799 NULL),
2800 GNUNET_MQ_hd_var_size (client_join_decision,
2801 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
2802 struct GNUNET_PSYC_JoinDecisionMessage,
2803 NULL),
2804 GNUNET_MQ_hd_var_size (client_psyc_message,
2805 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
2806 struct GNUNET_MessageHeader,
2807 NULL),
2808 GNUNET_MQ_hd_fixed_size (client_membership_store,
2809 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE,
2810 struct ChannelMembershipStoreRequest,
2811 NULL),
2812 GNUNET_MQ_hd_var_size (client_history_replay,
2813 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY,
2814 struct GNUNET_PSYC_HistoryRequestMessage,
2815 NULL),
2816 GNUNET_MQ_hd_var_size (client_state_get,
2817 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET,
2818 struct StateRequest,
2819 NULL),
2820 GNUNET_MQ_hd_var_size (client_state_get_prefix,
2821 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX,
2822 struct StateRequest,
2823 NULL));
2720 2824
2721/* end of gnunet-service-psyc.c */ 2825/* end of gnunet-service-psyc.c */