aboutsummaryrefslogtreecommitdiff
path: root/src/psyc
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2015-05-07 12:15:58 +0000
committerGabor X Toth <*@tg-x.net>2015-05-07 12:15:58 +0000
commit4725d59b468f1f30ba2910992333ca157682ce29 (patch)
tree23715ee20879c94a3363e28ea184370a4a71e44d /src/psyc
parenta5edf8ac9f03a368c87ea6163994d4ac3d62af06 (diff)
downloadgnunet-4725d59b468f1f30ba2910992333ca157682ce29.tar.gz
gnunet-4725d59b468f1f30ba2910992333ca157682ce29.zip
psyc/social: request history & state from psycstore; more documentation, tests, cleanup
Diffstat (limited to 'src/psyc')
-rw-r--r--src/psyc/gnunet-service-psyc.c355
-rw-r--r--src/psyc/psyc.h36
-rw-r--r--src/psyc/psyc_api.c532
-rw-r--r--src/psyc/psyc_util_lib.c38
-rw-r--r--src/psyc/test_psyc.c352
5 files changed, 803 insertions, 510 deletions
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
index 5ebbe6444..2bc128c4f 100644
--- a/src/psyc/gnunet-service-psyc.c
+++ b/src/psyc/gnunet-service-psyc.c
@@ -181,11 +181,24 @@ struct FragmentQueue
181/** 181/**
182 * List of connected clients. 182 * List of connected clients.
183 */ 183 */
184struct ClientListItem 184struct Client
185{ 185{
186 struct ClientListItem *prev; 186 struct Client *prev;
187 struct ClientListItem *next; 187 struct Client *next;
188
189 struct GNUNET_SERVER_Client *client;
190};
191
192
193struct Operation
194{
195 struct Operation *prev;
196 struct Operation *next;
197
188 struct GNUNET_SERVER_Client *client; 198 struct GNUNET_SERVER_Client *client;
199 struct Channel *chn;
200 uint64_t op_id;
201 uint32_t flags;
189}; 202};
190 203
191 204
@@ -194,8 +207,11 @@ struct ClientListItem
194 */ 207 */
195struct Channel 208struct Channel
196{ 209{
197 struct ClientListItem *clients_head; 210 struct Client *clients_head;
198 struct ClientListItem *clients_tail; 211 struct Client *clients_tail;
212
213 struct Operation *op_head;
214 struct Operation *op_tail;
199 215
200 struct TransmitMessage *tmit_head; 216 struct TransmitMessage *tmit_head;
201 struct TransmitMessage *tmit_tail; 217 struct TransmitMessage *tmit_tail;
@@ -397,14 +413,6 @@ struct Slave
397}; 413};
398 414
399 415
400struct OperationClosure
401{
402 struct GNUNET_SERVER_Client *client;
403 struct Channel *chn;
404 uint64_t op_id;
405};
406
407
408static void 416static void
409transmit_message (struct Channel *chn); 417transmit_message (struct Channel *chn);
410 418
@@ -435,6 +443,28 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
435} 443}
436 444
437 445
446static struct Operation *
447op_add (struct Channel *chn, struct GNUNET_SERVER_Client *client,
448 uint64_t op_id, uint32_t flags)
449{
450 struct Operation *op = GNUNET_malloc (sizeof (*op));
451 op->client = client;
452 op->chn = chn;
453 op->op_id = op_id;
454 op->flags = flags;
455 GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
456 return op;
457}
458
459
460static void
461op_remove (struct Channel *chn, struct Operation *op)
462{
463 GNUNET_CONTAINER_DLL_remove (chn->op_head, chn->op_tail, op);
464 GNUNET_free (op);
465}
466
467
438/** 468/**
439 * Clean up master data structures after a client disconnected. 469 * Clean up master data structures after a client disconnected.
440 */ 470 */
@@ -541,7 +571,7 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
541 chn, (GNUNET_YES == chn->is_master) ? "master" : "slave", 571 chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
542 GNUNET_h2s (&chn->pub_key_hash)); 572 GNUNET_h2s (&chn->pub_key_hash));
543 573
544 struct ClientListItem *cli = chn->clients_head; 574 struct Client *cli = chn->clients_head;
545 while (NULL != cli) 575 while (NULL != cli)
546 { 576 {
547 if (cli->client == client) 577 if (cli->client == client)
@@ -553,6 +583,17 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
553 cli = cli->next; 583 cli = cli->next;
554 } 584 }
555 585
586 struct Operation *op = chn->op_head;
587 while (NULL != op)
588 {
589 if (op->client == client)
590 {
591 op->client = NULL;
592 break;
593 }
594 op = op->next;
595 }
596
556 if (NULL == chn->clients_head) 597 if (NULL == chn->clients_head)
557 { /* Last client disconnected. */ 598 { /* Last client disconnected. */
558 if (NULL != chn->tmit_head) 599 if (NULL != chn->tmit_head)
@@ -574,10 +615,10 @@ static void
574client_send_msg (const struct Channel *chn, 615client_send_msg (const struct Channel *chn,
575 const struct GNUNET_MessageHeader *msg) 616 const struct GNUNET_MessageHeader *msg)
576{ 617{
577 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 618 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
578 "%p Sending message to clients.\n", chn); 619 "%p Sending message to clients.\n", chn);
579 620
580 struct ClientListItem *cli = chn->clients_head; 621 struct Client *cli = chn->clients_head;
581 while (NULL != cli) 622 while (NULL != cli)
582 { 623 {
583 GNUNET_SERVER_notification_context_add (nc, cli->client); 624 GNUNET_SERVER_notification_context_add (nc, cli->client);
@@ -596,33 +637,29 @@ client_send_msg (const struct Channel *chn,
596 * Code to transmit. 637 * Code to transmit.
597 * @param op_id 638 * @param op_id
598 * Operation ID in network byte order. 639 * Operation ID in network byte order.
599 * @param err_msg 640 * @param data
600 * Error message to include (or NULL for none). 641 * Data payload or NULL.
642 * @param data_size
643 * Size of @a data.
601 */ 644 */
602static void 645static void
603client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id, 646client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
604 int64_t result_code, const char *err_msg) 647 int64_t result_code, const void *data, uint16_t data_size)
605{ 648{
606 struct OperationResult *res; 649 struct GNUNET_OperationResultMessage *res;
607 size_t err_size = 0;
608 650
609 if (NULL != err_msg) 651 res = GNUNET_malloc (sizeof (*res) + data_size);
610 err_size = strnlen (err_msg,
611 GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (*res)) + 1;
612 res = GNUNET_malloc (sizeof (struct OperationResult) + err_size);
613 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE); 652 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
614 res->header.size = htons (sizeof (struct OperationResult) + err_size); 653 res->header.size = htons (sizeof (*res) + data_size);
615 res->result_code = GNUNET_htonll (result_code + INT64_MAX + 1); 654 res->result_code = GNUNET_htonll_signed (result_code);
616 res->op_id = op_id; 655 res->op_id = op_id;
617 if (0 < err_size) 656 if (0 < data_size)
618 { 657 memcpy (&res[1], data, data_size);
619 memcpy (&res[1], err_msg, err_size); 658
620 ((char *) &res[1])[err_size - 1] = '\0';
621 }
622 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 659 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
623 "%p Sending result to client for operation #%" PRIu64 ": " 660 "%p Sending result to client for operation #%" PRIu64 ": "
624 "%" PRId64 " (%s)\n", 661 "%" PRId64 " (size: %u)\n",
625 client, GNUNET_ntohll (op_id), result_code, err_msg); 662 client, GNUNET_ntohll (op_id), result_code, data_size);
626 663
627 GNUNET_SERVER_notification_context_add (nc, client); 664 GNUNET_SERVER_notification_context_add (nc, client);
628 GNUNET_SERVER_notification_context_unicast (nc, client, &res->header, 665 GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
@@ -647,7 +684,8 @@ struct JoinMemTestClosure
647 * Membership test result callback used for join requests. 684 * Membership test result callback used for join requests.
648 */ 685 */
649static void 686static void
650join_mem_test_cb (void *cls, int64_t result, const char *err_msg) 687join_mem_test_cb (void *cls, int64_t result,
688 const char *err_msg, uint16_t err_msg_size)
651{ 689{
652 struct JoinMemTestClosure *jcls = cls; 690 struct JoinMemTestClosure *jcls = cls;
653 691
@@ -663,6 +701,12 @@ join_mem_test_cb (void *cls, int64_t result, const char *err_msg)
663 } 701 }
664 else 702 else
665 { 703 {
704 if (GNUNET_SYSERR == result)
705 {
706 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
707 "Could not perform membership test (%.*s)\n",
708 err_msg_size, err_msg);
709 }
666 // FIXME: add relays 710 // FIXME: add relays
667 GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL); 711 GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
668 } 712 }
@@ -759,12 +803,13 @@ mcast_recv_join_decision (void *cls, int is_admitted,
759 * Received result of GNUNET_PSYCSTORE_membership_test() 803 * Received result of GNUNET_PSYCSTORE_membership_test()
760 */ 804 */
761static void 805static void
762store_recv_membership_test_result (void *cls, int64_t result, const char *err_msg) 806store_recv_membership_test_result (void *cls, int64_t result,
807 const char *err_msg, uint16_t err_msg_size)
763{ 808{
764 struct GNUNET_MULTICAST_MembershipTestHandle *mth = cls; 809 struct GNUNET_MULTICAST_MembershipTestHandle *mth = cls;
765 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 810 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
766 "%p GNUNET_PSYCSTORE_membership_test() returned %" PRId64 " (%s)\n", 811 "%p GNUNET_PSYCSTORE_membership_test() returned %" PRId64 " (%.*s)\n",
767 mth, result, err_msg); 812 mth, result, err_msg_size, err_msg);
768 813
769 GNUNET_MULTICAST_membership_test_result (mth, result); 814 GNUNET_MULTICAST_membership_test_result (mth, result);
770} 815}
@@ -805,12 +850,13 @@ store_recv_fragment_replay (void *cls,
805 * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay. 850 * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
806 */ 851 */
807static void 852static void
808store_recv_fragment_replay_result (void *cls, int64_t result, const char *err_msg) 853store_recv_fragment_replay_result (void *cls, int64_t result,
854 const char *err_msg, uint16_t err_msg_size)
809{ 855{
810 struct GNUNET_MULTICAST_ReplayHandle *rh = cls; 856 struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
811 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 857 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
812 "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%s)\n", 858 "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
813 rh, result, err_msg); 859 rh, result, err_msg_size, err_msg);
814 860
815 switch (result) 861 switch (result)
816 { 862 {
@@ -867,7 +913,7 @@ mcast_recv_replay_message (void *cls,
867{ 913{
868 struct Channel *chn = cls; 914 struct Channel *chn = cls;
869 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key, 915 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key,
870 message_id, message_id, 916 message_id, message_id, NULL,
871 &store_recv_fragment_replay, 917 &store_recv_fragment_replay,
872 &store_recv_fragment_replay_result, rh); 918 &store_recv_fragment_replay_result, rh);
873} 919}
@@ -911,6 +957,42 @@ hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
911 957
912 958
913/** 959/**
960 * Initialize PSYC message header.
961 */
962static inline void
963psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
964 const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
965{
966 uint16_t size = ntohs (mmsg->header.size);
967 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
968
969 pmsg->header.size = htons (psize);
970 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
971 pmsg->message_id = mmsg->message_id;
972 pmsg->fragment_offset = mmsg->fragment_offset;
973 pmsg->flags = htonl (flags);
974
975 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
976}
977
978
979/**
980 * Create a new PSYC message from a multicast message for sending it to clients.
981 */
982static inline struct GNUNET_PSYC_MessageHeader *
983psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
984{
985 struct GNUNET_PSYC_MessageHeader *pmsg;
986 uint16_t size = ntohs (mmsg->header.size);
987 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
988
989 pmsg = GNUNET_malloc (psize);
990 psyc_msg_init (pmsg, mmsg, flags);
991 return pmsg;
992}
993
994
995/**
914 * Send multicast message to all clients connected to the channel. 996 * Send multicast message to all clients connected to the channel.
915 */ 997 */
916static void 998static void
@@ -918,24 +1000,13 @@ client_send_mcast_msg (struct Channel *chn,
918 const struct GNUNET_MULTICAST_MessageHeader *mmsg, 1000 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
919 uint32_t flags) 1001 uint32_t flags)
920{ 1002{
921 struct GNUNET_PSYC_MessageHeader *pmsg;
922 uint16_t size = ntohs (mmsg->header.size);
923 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
924
925 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1003 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
926 "%p Sending multicast message to client. " 1004 "%p Sending multicast message to client. "
927 "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n", 1005 "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
928 chn, GNUNET_ntohll (mmsg->fragment_id), 1006 chn, GNUNET_ntohll (mmsg->fragment_id),
929 GNUNET_ntohll (mmsg->message_id)); 1007 GNUNET_ntohll (mmsg->message_id));
930 1008
931 pmsg = GNUNET_malloc (psize); 1009 struct GNUNET_PSYC_MessageHeader *pmsg = psyc_msg_new (mmsg, flags);
932 pmsg->header.size = htons (psize);
933 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
934 pmsg->message_id = mmsg->message_id;
935 pmsg->fragment_offset = mmsg->fragment_offset;
936 pmsg->flags = htonl (flags);
937
938 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
939 client_send_msg (chn, &pmsg->header); 1010 client_send_msg (chn, &pmsg->header);
940 GNUNET_free (pmsg); 1011 GNUNET_free (pmsg);
941} 1012}
@@ -1327,12 +1398,13 @@ message_queue_drop (struct Channel *chn)
1327 * Received result of GNUNET_PSYCSTORE_fragment_store(). 1398 * Received result of GNUNET_PSYCSTORE_fragment_store().
1328 */ 1399 */
1329static void 1400static void
1330store_recv_fragment_store_result (void *cls, int64_t result, const char *err_msg) 1401store_recv_fragment_store_result (void *cls, int64_t result,
1402 const char *err_msg, uint16_t err_msg_size)
1331{ 1403{
1332 struct Channel *chn = cls; 1404 struct Channel *chn = cls;
1333 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1405 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1334 "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%s)\n", 1406 "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n",
1335 chn, result, err_msg); 1407 chn, result, err_msg_size, err_msg);
1336} 1408}
1337 1409
1338 1410
@@ -1430,7 +1502,7 @@ store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1430 struct GNUNET_PSYC_CountersResultMessage res; 1502 struct GNUNET_PSYC_CountersResultMessage res;
1431 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); 1503 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1432 res.header.size = htons (sizeof (res)); 1504 res.header.size = htons (sizeof (res));
1433 res.result_code = htonl (result - INT32_MIN); 1505 res.result_code = GNUNET_htonl_signed (result);
1434 res.max_message_id = GNUNET_htonll (max_message_id); 1506 res.max_message_id = GNUNET_htonll (max_message_id);
1435 1507
1436 if (GNUNET_OK == result || GNUNET_NO == result) 1508 if (GNUNET_OK == result || GNUNET_NO == result)
@@ -1476,7 +1548,7 @@ store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1476 struct GNUNET_PSYC_CountersResultMessage res; 1548 struct GNUNET_PSYC_CountersResultMessage res;
1477 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); 1549 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1478 res.header.size = htons (sizeof (res)); 1550 res.header.size = htons (sizeof (res));
1479 res.result_code = htonl (result - INT32_MIN); 1551 res.result_code = GNUNET_htonl_signed (result);
1480 res.max_message_id = GNUNET_htonll (max_message_id); 1552 res.max_message_id = GNUNET_htonll (max_message_id);
1481 1553
1482 if (GNUNET_OK == result || GNUNET_NO == result) 1554 if (GNUNET_OK == result || GNUNET_NO == result)
@@ -1566,7 +1638,7 @@ client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1566 struct GNUNET_PSYC_CountersResultMessage res; 1638 struct GNUNET_PSYC_CountersResultMessage res;
1567 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); 1639 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1568 res.header.size = htons (sizeof (res)); 1640 res.header.size = htons (sizeof (res));
1569 res.result_code = htonl ((uint32_t) GNUNET_OK + INT32_MIN); 1641 res.result_code = GNUNET_htonl_signed (GNUNET_OK);
1570 res.max_message_id = GNUNET_htonll (mst->max_message_id); 1642 res.max_message_id = GNUNET_htonll (mst->max_message_id);
1571 1643
1572 GNUNET_SERVER_notification_context_add (nc, client); 1644 GNUNET_SERVER_notification_context_add (nc, client);
@@ -1578,7 +1650,7 @@ client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1578 "%p Client connected as master to channel %s.\n", 1650 "%p Client connected as master to channel %s.\n",
1579 mst, GNUNET_h2s (&chn->pub_key_hash)); 1651 mst, GNUNET_h2s (&chn->pub_key_hash));
1580 1652
1581 struct ClientListItem *cli = GNUNET_new (struct ClientListItem); 1653 struct Client *cli = GNUNET_new (struct Client);
1582 cli->client = client; 1654 cli->client = client;
1583 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli); 1655 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1584 1656
@@ -1677,7 +1749,7 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1677 struct GNUNET_PSYC_CountersResultMessage res; 1749 struct GNUNET_PSYC_CountersResultMessage res;
1678 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); 1750 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1679 res.header.size = htons (sizeof (res)); 1751 res.header.size = htons (sizeof (res));
1680 res.result_code = htonl ((uint32_t) GNUNET_OK - INT32_MIN); 1752 res.result_code = GNUNET_htonl_signed (GNUNET_OK);
1681 res.max_message_id = GNUNET_htonll (chn->max_message_id); 1753 res.max_message_id = GNUNET_htonll (chn->max_message_id);
1682 1754
1683 GNUNET_SERVER_notification_context_add (nc, client); 1755 GNUNET_SERVER_notification_context_add (nc, client);
@@ -1716,7 +1788,7 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1716 "%p Client connected as slave to channel %s.\n", 1788 "%p Client connected as slave to channel %s.\n",
1717 slv, GNUNET_h2s (&chn->pub_key_hash)); 1789 slv, GNUNET_h2s (&chn->pub_key_hash));
1718 1790
1719 struct ClientListItem *cli = GNUNET_new (struct ClientListItem); 1791 struct Client *cli = GNUNET_new (struct Client);
1720 cli->client = client; 1792 cli->client = client;
1721 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli); 1793 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1722 1794
@@ -2119,14 +2191,15 @@ struct MembershipStoreClosure
2119 * Received result of GNUNET_PSYCSTORE_membership_store() 2191 * Received result of GNUNET_PSYCSTORE_membership_store()
2120 */ 2192 */
2121static void 2193static void
2122store_recv_membership_store_result (void *cls, int64_t result, const char *err_msg) 2194store_recv_membership_store_result (void *cls, int64_t result,
2195 const char *err_msg, uint16_t err_msg_size)
2123{ 2196{
2124 struct MembershipStoreClosure *mcls = cls; 2197 struct MembershipStoreClosure *mcls = cls;
2125 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2198 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2126 "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%s)\n", 2199 "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.s)\n",
2127 mcls->chn, result, err_msg); 2200 mcls->chn, result, err_msg_size, err_msg);
2128 2201
2129 client_send_result (mcls->client, mcls->op_id, result, err_msg); 2202 client_send_result (mcls->client, mcls->op_id, result, err_msg, err_msg_size);
2130} 2203}
2131 2204
2132 2205
@@ -2165,36 +2238,73 @@ client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
2165} 2238}
2166 2239
2167 2240
2241/**
2242 * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
2243 * in response to a history request from a client.
2244 */
2168static int 2245static int
2169store_recv_fragment_history (void *cls, 2246store_recv_fragment_history (void *cls,
2170 struct GNUNET_MULTICAST_MessageHeader *msg, 2247 struct GNUNET_MULTICAST_MessageHeader *mmsg,
2171 enum GNUNET_PSYCSTORE_MessageFlags flags) 2248 enum GNUNET_PSYCSTORE_MessageFlags flags)
2172{ 2249{
2173 struct OperationClosure *opcls = cls; 2250 struct Operation *op = cls;
2174 struct Channel *chn = opcls->chn; 2251 if (NULL == op->client)
2175 client_send_mcast_msg (chn, msg, GNUNET_PSYC_MESSAGE_HISTORIC); 2252 { /* Requesting client already disconnected. */
2253 return GNUNET_NO;
2254 }
2255 struct Channel *chn = op->chn;
2256
2257 struct GNUNET_PSYC_MessageHeader *pmsg;
2258 uint16_t msize = ntohs (mmsg->header.size);
2259 uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
2260
2261 struct GNUNET_OperationResultMessage *
2262 res = GNUNET_malloc (sizeof (*res) + psize);
2263 res->header.size = htons (sizeof (*res) + psize);
2264 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
2265 res->op_id = op->op_id;
2266 res->result_code = GNUNET_htonll_signed (GNUNET_OK);
2267
2268 pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
2269 psyc_msg_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
2270 memcpy (&res[1], pmsg, psize);
2271
2272 /** @todo FIXME: send only to requesting client */
2273 client_send_msg (chn, &res->header);
2176 return GNUNET_YES; 2274 return GNUNET_YES;
2177} 2275}
2178 2276
2179 2277
2180/** 2278/**
2181 * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay. 2279 * Received the result of GNUNET_PSYCSTORE_fragment_get(),
2280 * in response to a history request from a client.
2182 */ 2281 */
2183static void 2282static void
2184store_recv_fragment_history_result (void *cls, int64_t result, const char *err_msg) 2283store_recv_fragment_history_result (void *cls, int64_t result,
2284 const char *err_msg, uint16_t err_msg_size)
2185{ 2285{
2186 struct OperationClosure *opcls = cls; 2286 struct Operation *op = cls;
2287 if (NULL == op->client)
2288 { /* Requesting client already disconnected. */
2289 return;
2290 }
2291
2187 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2292 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2188 "%p History replay #%" PRIu64 ": " 2293 "%p History replay #%" PRIu64 ": "
2189 "PSYCSTORE returned %" PRId64 " (%s)\n", 2294 "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2190 opcls->chn, opcls->op_id, result, err_msg); 2295 op->chn, op->op_id, result, err_msg_size, err_msg);
2296
2297 if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
2298 {
2299 /** @todo Multicast replay request for messages not found locally. */
2300 }
2191 2301
2192 client_send_result (opcls->client, opcls->op_id, result, err_msg); 2302 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2193} 2303}
2194 2304
2195 2305
2196/** 2306/**
2197 * Client requests channel history from PSYCstore. 2307 * Client requests channel history.
2198 */ 2308 */
2199static void 2309static void
2200client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client, 2310client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
@@ -2204,26 +2314,39 @@ client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
2204 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); 2314 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2205 GNUNET_assert (NULL != chn); 2315 GNUNET_assert (NULL != chn);
2206 2316
2207 const struct HistoryRequest * 2317 const struct GNUNET_PSYC_HistoryRequestMessage *
2208 req = (const struct HistoryRequest *) msg; 2318 req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg;
2319 uint16_t size = ntohs (msg->size);
2320 const char *method_prefix = (const char *) &req[1];
2209 2321
2210 struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls)); 2322 if (size < sizeof (*req) + 1
2211 opcls->client = client; 2323 || '\0' != method_prefix[size - sizeof (*req) - 1])
2212 opcls->chn = chn; 2324 {
2213 opcls->op_id = req->op_id; 2325 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2326 "%p History replay #%" PRIu64 ": "
2327 "invalid method prefix. size: %u < %u?\n",
2328 chn, GNUNET_ntohll (req->op_id), size, sizeof (*req) + 1);
2329 GNUNET_break (0);
2330 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2331 return;
2332 }
2333
2334 struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
2214 2335
2215 if (0 == req->message_limit) 2336 if (0 == req->message_limit)
2216 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL, 2337 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2217 GNUNET_ntohll (req->start_message_id), 2338 GNUNET_ntohll (req->start_message_id),
2218 GNUNET_ntohll (req->end_message_id), 2339 GNUNET_ntohll (req->end_message_id),
2340 method_prefix,
2219 &store_recv_fragment_history, 2341 &store_recv_fragment_history,
2220 &store_recv_fragment_history_result, opcls); 2342 &store_recv_fragment_history_result, op);
2221 else 2343 else
2222 GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL, 2344 GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2223 GNUNET_ntohll (req->message_limit), 2345 GNUNET_ntohll (req->message_limit),
2346 method_prefix,
2224 &store_recv_fragment_history, 2347 &store_recv_fragment_history,
2225 &store_recv_fragment_history_result, 2348 &store_recv_fragment_history_result,
2226 opcls); 2349 op);
2227 2350
2228 GNUNET_SERVER_receive_done (client, GNUNET_OK); 2351 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2229} 2352}
@@ -2236,19 +2359,19 @@ static int
2236store_recv_state_var (void *cls, const char *name, 2359store_recv_state_var (void *cls, const char *name,
2237 const void *value, size_t value_size) 2360 const void *value, size_t value_size)
2238{ 2361{
2239 struct OperationClosure *opcls = cls; 2362 struct Operation *op = cls;
2240 struct OperationResult *op; 2363 struct GNUNET_OperationResultMessage *res;
2241 2364
2242 if (NULL != name) 2365 if (NULL != name)
2243 { 2366 {
2244 uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1; 2367 uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2245 struct GNUNET_PSYC_MessageModifier *mod; 2368 struct GNUNET_PSYC_MessageModifier *mod;
2246 op = GNUNET_malloc (sizeof (*op) + sizeof (*mod) + name_size + value_size); 2369 res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2247 op->header.size = htons (sizeof (*op) + sizeof (*mod) + name_size + value_size); 2370 res->header.size = htons (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2248 op->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT); 2371 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2249 op->op_id = opcls->op_id; 2372 res->op_id = op->op_id;
2250 2373
2251 mod = (struct GNUNET_PSYC_MessageModifier *) &op[1]; 2374 mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
2252 mod->header.size = htons (sizeof (*mod) + name_size + value_size); 2375 mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2253 mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); 2376 mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2254 mod->name_size = htons (name_size); 2377 mod->name_size = htons (name_size);
@@ -2260,19 +2383,20 @@ store_recv_state_var (void *cls, const char *name,
2260 else 2383 else
2261 { 2384 {
2262 struct GNUNET_MessageHeader *mod; 2385 struct GNUNET_MessageHeader *mod;
2263 op = GNUNET_malloc (sizeof (*op) + sizeof (*mod) + value_size); 2386 res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + value_size);
2264 op->header.size = htons (sizeof (*op) + sizeof (*mod) + value_size); 2387 res->header.size = htons (sizeof (*res) + sizeof (*mod) + value_size);
2265 op->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT); 2388 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2266 op->op_id = opcls->op_id; 2389 res->op_id = op->op_id;
2267 2390
2268 mod = (struct GNUNET_MessageHeader *) &op[1]; 2391 mod = (struct GNUNET_MessageHeader *) &res[1];
2269 mod->size = htons (sizeof (*mod) + value_size); 2392 mod->size = htons (sizeof (*mod) + value_size);
2270 mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); 2393 mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2271 memcpy (&mod[1], value, value_size); 2394 memcpy (&mod[1], value, value_size);
2272 } 2395 }
2273 2396
2274 GNUNET_SERVER_notification_context_add (nc, opcls->client); 2397 // FIXME: client might have been disconnected
2275 GNUNET_SERVER_notification_context_unicast (nc, opcls->client, &op->header, 2398 GNUNET_SERVER_notification_context_add (nc, op->client);
2399 GNUNET_SERVER_notification_context_unicast (nc, op->client, &res->header,
2276 GNUNET_NO); 2400 GNUNET_NO);
2277 GNUNET_free (op); 2401 GNUNET_free (op);
2278 return GNUNET_YES; 2402 return GNUNET_YES;
@@ -2284,15 +2408,17 @@ store_recv_state_var (void *cls, const char *name,
2284 * or GNUNET_PSYCSTORE_state_get_prefix() 2408 * or GNUNET_PSYCSTORE_state_get_prefix()
2285 */ 2409 */
2286static void 2410static void
2287store_recv_state_result (void *cls, int64_t result, const char *err_msg) 2411store_recv_state_result (void *cls, int64_t result,
2412 const char *err_msg, uint16_t err_msg_size)
2288{ 2413{
2289 struct OperationClosure *opcls = cls; 2414 struct Operation *op = cls;
2290 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2415 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2291 "%p History replay #%" PRIu64 ": " 2416 "%p History replay #%" PRIu64 ": "
2292 "PSYCSTORE returned %" PRId64 " (%s)\n", 2417 "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2293 opcls->chn, opcls->op_id, result, err_msg); 2418 op->chn, op->op_id, result, err_msg_size, err_msg);
2294 2419
2295 client_send_result (opcls->client, opcls->op_id, result, err_msg); 2420 // FIXME: client might have been disconnected
2421 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2296} 2422}
2297 2423
2298 2424
@@ -2314,18 +2440,15 @@ client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
2314 const char *name = (const char *) &req[1]; 2440 const char *name = (const char *) &req[1];
2315 if (0 == name_size || '\0' != name[name_size - 1]) 2441 if (0 == name_size || '\0' != name[name_size - 1])
2316 { 2442 {
2443 GNUNET_break (0);
2317 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 2444 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2318 return; 2445 return;
2319 } 2446 }
2320 2447
2321 struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls)); 2448 struct Operation *op = op_add (chn, client, req->op_id, 0);
2322 opcls->client = client;
2323 opcls->chn = chn;
2324 opcls->op_id = req->op_id;
2325
2326 GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name, 2449 GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2327 &store_recv_state_var, 2450 &store_recv_state_var,
2328 &store_recv_state_result, opcls); 2451 &store_recv_state_result, op);
2329 GNUNET_SERVER_receive_done (client, GNUNET_OK); 2452 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2330} 2453}
2331 2454
@@ -2348,20 +2471,16 @@ client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
2348 const char *name = (const char *) &req[1]; 2471 const char *name = (const char *) &req[1];
2349 if (0 == name_size || '\0' != name[name_size - 1]) 2472 if (0 == name_size || '\0' != name[name_size - 1])
2350 { 2473 {
2474 GNUNET_break (0);
2351 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 2475 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2352 return; 2476 return;
2353 } 2477 }
2354 2478
2355 struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls)); 2479 struct Operation *op = op_add (chn, client, req->op_id, 0);
2356 opcls->client = client;
2357 opcls->chn = chn;
2358 opcls->op_id = req->op_id;
2359
2360 GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name, 2480 GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2361 &store_recv_state_var, 2481 &store_recv_state_var,
2362 &store_recv_state_result, opcls); 2482 &store_recv_state_result, op);
2363 GNUNET_SERVER_receive_done (client, GNUNET_OK); 2483 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2364
2365} 2484}
2366 2485
2367 2486
diff --git a/src/psyc/psyc.h b/src/psyc/psyc.h
index 4bc92532f..e85e14c0e 100644
--- a/src/psyc/psyc.h
+++ b/src/psyc/psyc.h
@@ -171,42 +171,6 @@ struct StateRequest
171/**** service -> library ****/ 171/**** service -> library ****/
172 172
173 173
174/**
175 * Answer from service to client about last operation.
176 */
177struct OperationResult
178{
179 /**
180 * Types:
181 * - GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE
182 * - GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_STORY_RESULT
183 * - GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_STATE_RESULT
184 */
185 struct GNUNET_MessageHeader header;
186
187 uint32_t reserved GNUNET_PACKED;
188
189 /**
190 * Operation ID.
191 */
192 uint64_t op_id GNUNET_PACKED;
193
194 /**
195 * Status code for the operation.
196 */
197 uint64_t result_code GNUNET_PACKED;
198
199 /* Followed by:
200 * - on error: NUL-terminated error message
201 * - on success: one of the following message types
202 *
203 * For a STATE_RESULT, one of:
204 * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER
205 * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT
206 * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END
207 */
208};
209
210GNUNET_NETWORK_STRUCT_END 174GNUNET_NETWORK_STRUCT_END
211 175
212#endif 176#endif
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index ce994b272..7839aaf9e 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -43,33 +43,6 @@
43#define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__) 43#define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__)
44 44
45 45
46struct OperationListItem
47{
48 struct OperationListItem *prev;
49 struct OperationListItem *next;
50
51 /**
52 * Operation ID.
53 */
54 uint64_t op_id;
55
56 /**
57 * Continuation to invoke with the result of an operation.
58 */
59 GNUNET_PSYC_ResultCallback result_cb;
60
61 /**
62 * State variable result callback.
63 */
64 GNUNET_PSYC_StateVarCallback state_var_cb;
65
66 /**
67 * Closure for the callbacks.
68 */
69 void *cls;
70};
71
72
73/** 46/**
74 * Handle to access PSYC channel operations for both the master and slaves. 47 * Handle to access PSYC channel operations for both the master and slaves.
75 */ 48 */
@@ -111,21 +84,6 @@ struct GNUNET_PSYC_Channel
111 void *disconnect_cls; 84 void *disconnect_cls;
112 85
113 /** 86 /**
114 * First operation in the linked list.
115 */
116 struct OperationListItem *op_head;
117
118 /**
119 * Last operation in the linked list.
120 */
121 struct OperationListItem *op_tail;
122
123 /**
124 * Last operation ID used.
125 */
126 uint64_t last_op_id;
127
128 /**
129 * Are we polling for incoming messages right now? 87 * Are we polling for incoming messages right now?
130 */ 88 */
131 uint8_t in_receive; 89 uint8_t in_receive;
@@ -204,83 +162,62 @@ struct GNUNET_PSYC_SlaveTransmitHandle
204}; 162};
205 163
206 164
207/** 165struct GNUNET_PSYC_HistoryRequest
208 * Get a fresh operation ID to distinguish between PSYCstore requests.
209 *
210 * @param h Handle to the PSYCstore service.
211 * @return next operation id to use
212 */
213static uint64_t
214op_get_next_id (struct GNUNET_PSYC_Channel *chn)
215{
216 return ++chn->last_op_id;
217}
218
219
220/**
221 * Find operation by ID.
222 *
223 * @return Operation, or NULL if none found.
224 */
225static struct OperationListItem *
226op_find_by_id (struct GNUNET_PSYC_Channel *chn, uint64_t op_id)
227{ 166{
228 struct OperationListItem *op = chn->op_head; 167 /**
229 while (NULL != op) 168 * Channel.
230 { 169 */
231 if (op->op_id == op_id) 170 struct GNUNET_PSYC_Channel *chn;
232 return op;
233 op = op->next;
234 }
235 return NULL;
236}
237 171
172 /**
173 * Operation ID.
174 */
175 uint64_t op_id;
238 176
239static uint64_t 177 /**
240op_add (struct GNUNET_PSYC_Channel *chn, GNUNET_PSYC_ResultCallback result_cb, 178 * Message handler.
241 void *cls) 179 */
242{ 180 struct GNUNET_PSYC_ReceiveHandle *recv;
243 if (NULL == result_cb)
244 return 0;
245 181
246 struct OperationListItem *op = GNUNET_malloc (sizeof (*op)); 182 /**
247 op->op_id = op_get_next_id (chn); 183 * Function to call when the operation finished.
248 op->result_cb = result_cb; 184 */
249 op->cls = cls; 185 GNUNET_ResultCallback result_cb;
250 GNUNET_CONTAINER_DLL_insert_tail (chn->op_head, chn->op_tail, op);
251 186
252 LOG (GNUNET_ERROR_TYPE_DEBUG, 187 /**
253 "%p Added operation #%" PRIu64 "\n", chn, op->op_id); 188 * Closure for @a result_cb.
254 return op->op_id; 189 */
255} 190 void *cls;
191};
256 192
257 193
258static int 194struct GNUNET_PSYC_StateRequest
259op_result (struct GNUNET_PSYC_Channel *chn, uint64_t op_id,
260 int64_t result_code, const char *err_msg)
261{ 195{
262 LOG (GNUNET_ERROR_TYPE_DEBUG, 196 /**
263 "%p Received result for operation #%" PRIu64 ": %" PRId64 " (%s)\n", 197 * Channel.
264 chn, op_id, result_code, err_msg); 198 */
265 if (0 == op_id) 199 struct GNUNET_PSYC_Channel *chn;
266 return GNUNET_NO;
267 200
268 struct OperationListItem *op = op_find_by_id (chn, op_id); 201 /**
269 if (NULL == op) 202 * Operation ID.
270 { 203 */
271 LOG (GNUNET_ERROR_TYPE_WARNING, 204 uint64_t op_id;
272 "Could not find operation #%" PRIu64 "\n", op_id);
273 return GNUNET_NO;
274 }
275 205
276 GNUNET_CONTAINER_DLL_remove (chn->op_head, chn->op_tail, op); 206 /**
207 * State variable result callback.
208 */
209 GNUNET_PSYC_StateVarCallback var_cb;
277 210
278 if (NULL != op->result_cb) 211 /**
279 op->result_cb (op->cls, result_code, err_msg); 212 * Function to call when the operation finished.
213 */
214 GNUNET_ResultCallback result_cb;
280 215
281 GNUNET_free (op); 216 /**
282 return GNUNET_YES; 217 * Closure for @a result_cb.
283} 218 */
219 void *cls;
220};
284 221
285 222
286static void 223static void
@@ -313,22 +250,97 @@ channel_recv_result (void *cls,
313 struct GNUNET_PSYC_Channel * 250 struct GNUNET_PSYC_Channel *
314 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); 251 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn));
315 252
253 const struct GNUNET_OperationResultMessage *
254 res = (const struct GNUNET_OperationResultMessage *) msg;
255
316 uint16_t size = ntohs (msg->size); 256 uint16_t size = ntohs (msg->size);
317 const struct OperationResult *res = (const struct OperationResult *) msg; 257 if (size < sizeof (*res))
318 const char *err_msg = NULL; 258 { /* Error, message too small. */
259 GNUNET_break (0);
260 return;
261 }
319 262
320 if (sizeof (struct OperationResult) < size) 263 uint16_t data_size = size - sizeof (*res);
321 { 264 const char *data = (0 < data_size) ? (void *) &res[1] : NULL;
322 err_msg = (const char *) &res[1]; 265 GNUNET_CLIENT_MANAGER_op_result (chn->client, GNUNET_ntohll (res->op_id),
323 if ('\0' != err_msg[size - sizeof (struct OperationResult) - 1]) 266 GNUNET_ntohll_signed (res->result_code),
324 { 267 data, data_size);
325 GNUNET_break (0); 268}
326 err_msg = NULL; 269
327 } 270
271static void
272op_recv_history_result (void *cls, int64_t result,
273 const void *data, uint16_t data_size)
274{
275 LOG (GNUNET_ERROR_TYPE_DEBUG,
276 "Received history replay result: %" PRId64 ".\n", result);
277
278 struct GNUNET_PSYC_HistoryRequest *hist = cls;
279
280 if (NULL != hist->result_cb)
281 hist->result_cb (hist->cls, result, data, data_size);
282
283 GNUNET_PSYC_receive_destroy (hist->recv);
284 GNUNET_free (hist);
285}
286
287
288static void
289op_recv_state_result (void *cls, int64_t result,
290 const void *data, uint16_t data_size)
291{
292 LOG (GNUNET_ERROR_TYPE_DEBUG,
293 "Received state request result: %" PRId64 ".\n", result);
294
295 struct GNUNET_PSYC_StateRequest *sr = cls;
296
297 if (NULL != sr->result_cb)
298 sr->result_cb (sr->cls, result, data, data_size);
299
300 GNUNET_free (sr);
301}
302
303
304static void
305channel_recv_history_result (void *cls,
306 struct GNUNET_CLIENT_MANAGER_Connection *client,
307 const struct GNUNET_MessageHeader *msg)
308{
309 struct GNUNET_PSYC_Channel *
310 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn));
311
312 const struct GNUNET_OperationResultMessage *
313 res = (const struct GNUNET_OperationResultMessage *) msg;
314 struct GNUNET_PSYC_MessageHeader *
315 pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
316
317 LOG (GNUNET_ERROR_TYPE_DEBUG,
318 "%p Received historic fragment for message #%" PRIu64 ".\n",
319 chn, GNUNET_ntohll (pmsg->message_id));
320
321 GNUNET_ResultCallback result_cb = NULL;
322 struct GNUNET_PSYC_HistoryRequest *hist = NULL;
323
324 if (GNUNET_YES != GNUNET_CLIENT_MANAGER_op_find (chn->client,
325 GNUNET_ntohll (res->op_id),
326 &result_cb, (void *) &hist))
327 { /* Operation not found. */
328 LOG (GNUNET_ERROR_TYPE_WARNING,
329 "%p Replay operation not found for historic fragment of message #%"
330 PRIu64 ".\n",
331 chn, GNUNET_ntohll (pmsg->message_id));
332 return;
328 } 333 }
329 334
330 op_result (chn, GNUNET_ntohll (res->op_id), 335 uint16_t size = ntohs (msg->size);
331 GNUNET_ntohll (res->result_code) + INT64_MIN, err_msg); 336 if (size < sizeof (*res) + sizeof (*pmsg))
337 { /* Error, message too small. */
338 GNUNET_break (0);
339 return;
340 }
341
342 GNUNET_PSYC_receive_message (hist->recv,
343 (const struct GNUNET_PSYC_MessageHeader *) pmsg);
332} 344}
333 345
334 346
@@ -340,12 +352,21 @@ channel_recv_state_result (void *cls,
340 struct GNUNET_PSYC_Channel * 352 struct GNUNET_PSYC_Channel *
341 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); 353 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn));
342 354
343 const struct OperationResult *res = (const struct OperationResult *) msg; 355 const struct GNUNET_OperationResultMessage *
344 struct OperationListItem *op = op_find_by_id (chn, GNUNET_ntohll (res->op_id)); 356 res = (const struct GNUNET_OperationResultMessage *) msg;
345 if (NULL == op || NULL == op->state_var_cb) 357
358 GNUNET_ResultCallback result_cb = NULL;
359 struct GNUNET_PSYC_StateRequest *sr = NULL;
360
361 if (GNUNET_YES != GNUNET_CLIENT_MANAGER_op_find (chn->client,
362 GNUNET_ntohll (res->op_id),
363 &result_cb, (void *) &sr))
364 { /* Operation not found. */
346 return; 365 return;
366 }
347 367
348 const struct GNUNET_MessageHeader *modc = (struct GNUNET_MessageHeader *) &op[1]; 368 const struct GNUNET_MessageHeader *
369 modc = (struct GNUNET_MessageHeader *) &res[1];
349 uint16_t modc_size = ntohs (modc->size); 370 uint16_t modc_size = ntohs (modc->size);
350 if (ntohs (msg->size) - sizeof (*msg) != modc_size) 371 if (ntohs (msg->size) - sizeof (*msg) != modc_size)
351 { 372 {
@@ -366,13 +387,13 @@ channel_recv_state_result (void *cls,
366 GNUNET_break (0); 387 GNUNET_break (0);
367 return; 388 return;
368 } 389 }
369 op->state_var_cb (op->cls, name, name + name_size, ntohs (mod->value_size)); 390 sr->var_cb (sr->cls, name, name + name_size, ntohs (mod->value_size));
370 break; 391 break;
371 } 392 }
372 393
373 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: 394 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
374 op->state_var_cb (op->cls, NULL, (const char *) &modc[1], 395 sr->var_cb (sr->cls, NULL, (const char *) &modc[1],
375 modc_size - sizeof (*modc)); 396 modc_size - sizeof (*modc));
376 break; 397 break;
377 } 398 }
378} 399}
@@ -412,11 +433,12 @@ master_recv_start_ack (void *cls,
412 433
413 struct GNUNET_PSYC_CountersResultMessage * 434 struct GNUNET_PSYC_CountersResultMessage *
414 cres = (struct GNUNET_PSYC_CountersResultMessage *) msg; 435 cres = (struct GNUNET_PSYC_CountersResultMessage *) msg;
415 int32_t result = ntohl (cres->result_code) + INT32_MIN; 436 int32_t result = GNUNET_ntohl_signed (cres->result_code);
416 if (GNUNET_OK != result && GNUNET_NO != result) 437 if (GNUNET_OK != result && GNUNET_NO != result)
417 { 438 {
418 LOG (GNUNET_ERROR_TYPE_ERROR, "Could not start master.\n"); 439 LOG (GNUNET_ERROR_TYPE_ERROR, "Could not start master: %ld\n", result);
419 GNUNET_break (0); 440 GNUNET_break (0);
441 /* FIXME: disconnect */
420 } 442 }
421 if (NULL != mst->start_cb) 443 if (NULL != mst->start_cb)
422 mst->start_cb (mst->cb_cls, result, GNUNET_ntohll (cres->max_message_id)); 444 mst->start_cb (mst->cb_cls, result, GNUNET_ntohll (cres->max_message_id));
@@ -464,11 +486,12 @@ slave_recv_join_ack (void *cls,
464 sizeof (struct GNUNET_PSYC_Channel)); 486 sizeof (struct GNUNET_PSYC_Channel));
465 struct GNUNET_PSYC_CountersResultMessage * 487 struct GNUNET_PSYC_CountersResultMessage *
466 cres = (struct GNUNET_PSYC_CountersResultMessage *) msg; 488 cres = (struct GNUNET_PSYC_CountersResultMessage *) msg;
467 int32_t result = ntohl (cres->result_code) + INT32_MIN; 489 int32_t result = GNUNET_ntohl_signed (cres->result_code);
468 if (GNUNET_YES != result && GNUNET_NO != result) 490 if (GNUNET_YES != result && GNUNET_NO != result)
469 { 491 {
470 LOG (GNUNET_ERROR_TYPE_ERROR, "Could not join slave.\n"); 492 LOG (GNUNET_ERROR_TYPE_ERROR, "Could not join slave.\n");
471 GNUNET_break (0); 493 GNUNET_break (0);
494 /* FIXME: disconnect */
472 } 495 }
473 if (NULL != slv->connect_cb) 496 if (NULL != slv->connect_cb)
474 slv->connect_cb (slv->cb_cls, result, GNUNET_ntohll (cres->max_message_id)); 497 slv->connect_cb (slv->cb_cls, result, GNUNET_ntohll (cres->max_message_id));
@@ -513,13 +536,17 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler master_handlers[] =
513 GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST, 536 GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST,
514 sizeof (struct GNUNET_PSYC_JoinRequestMessage), GNUNET_YES }, 537 sizeof (struct GNUNET_PSYC_JoinRequestMessage), GNUNET_YES },
515 538
539 { &channel_recv_history_result, NULL,
540 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT,
541 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
542
516 { &channel_recv_state_result, NULL, 543 { &channel_recv_state_result, NULL,
517 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT, 544 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT,
518 sizeof (struct OperationResult), GNUNET_YES }, 545 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
519 546
520 { &channel_recv_result, NULL, 547 { &channel_recv_result, NULL,
521 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE, 548 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE,
522 sizeof (struct OperationResult), GNUNET_YES }, 549 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
523 550
524 { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, 551 { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO },
525 552
@@ -545,13 +572,17 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler slave_handlers[] =
545 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 572 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
546 sizeof (struct GNUNET_PSYC_JoinDecisionMessage), GNUNET_YES }, 573 sizeof (struct GNUNET_PSYC_JoinDecisionMessage), GNUNET_YES },
547 574
575 { &channel_recv_history_result, NULL,
576 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT,
577 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
578
548 { &channel_recv_state_result, NULL, 579 { &channel_recv_state_result, NULL,
549 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT, 580 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT,
550 sizeof (struct OperationResult), GNUNET_YES }, 581 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
551 582
552 { &channel_recv_result, NULL, 583 { &channel_recv_result, NULL,
553 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE, 584 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE,
554 sizeof (struct OperationResult), GNUNET_YES }, 585 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
555 586
556 { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, 587 { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO },
557 588
@@ -1011,17 +1042,28 @@ GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slv)
1011 * correctly; not doing so correctly will result in either denying other slaves 1042 * correctly; not doing so correctly will result in either denying other slaves
1012 * access or offering access to channel data to non-members. 1043 * access or offering access to channel data to non-members.
1013 * 1044 *
1014 * @param channel Channel handle. 1045 * @param chn
1015 * @param slave_key Identity of channel slave to add. 1046 * Channel handle.
1016 * @param announced_at ID of the message that announced the membership change. 1047 * @param slave_key
1017 * @param effective_since Addition of slave is in effect since this message ID. 1048 * Identity of channel slave to add.
1049 * @param announced_at
1050 * ID of the message that announced the membership change.
1051 * @param effective_since
1052 * Addition of slave is in effect since this message ID.
1053 * @param result_cb
1054 * Function to call with the result of the operation.
1055 * The @e result_code argument is #GNUNET_OK on success, or
1056 * #GNUNET_SYSERR on error. In case of an error, the @e data argument
1057 * can contain an optional error message.
1058 * @param cls
1059 * Closure for @a result_cb.
1018 */ 1060 */
1019void 1061void
1020GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn, 1062GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn,
1021 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, 1063 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
1022 uint64_t announced_at, 1064 uint64_t announced_at,
1023 uint64_t effective_since, 1065 uint64_t effective_since,
1024 GNUNET_PSYC_ResultCallback result_cb, 1066 GNUNET_ResultCallback result_cb,
1025 void *cls) 1067 void *cls)
1026{ 1068{
1027 struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req)); 1069 struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req));
@@ -1031,7 +1073,8 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn,
1031 req->announced_at = GNUNET_htonll (announced_at); 1073 req->announced_at = GNUNET_htonll (announced_at);
1032 req->effective_since = GNUNET_htonll (effective_since); 1074 req->effective_since = GNUNET_htonll (effective_since);
1033 req->did_join = GNUNET_YES; 1075 req->did_join = GNUNET_YES;
1034 req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls)); 1076 req->op_id = GNUNET_htonll (GNUNET_CLIENT_MANAGER_op_add (chn->client,
1077 result_cb, cls));
1035 1078
1036 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); 1079 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
1037} 1080}
@@ -1054,15 +1097,25 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn,
1054 * denying members access or offering access to channel data to 1097 * denying members access or offering access to channel data to
1055 * non-members. 1098 * non-members.
1056 * 1099 *
1057 * @param channel Channel handle. 1100 * @param chn
1058 * @param slave_key Identity of channel slave to remove. 1101 * Channel handle.
1059 * @param announced_at ID of the message that announced the membership change. 1102 * @param slave_key
1103 * Identity of channel slave to remove.
1104 * @param announced_at
1105 * ID of the message that announced the membership change.
1106 * @param result_cb
1107 * Function to call with the result of the operation.
1108 * The @e result_code argument is #GNUNET_OK on success, or
1109 * #GNUNET_SYSERR on error. In case of an error, the @e data argument
1110 * can contain an optional error message.
1111 * @param cls
1112 * Closure for @a result_cb.
1060 */ 1113 */
1061void 1114void
1062GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn, 1115GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn,
1063 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, 1116 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
1064 uint64_t announced_at, 1117 uint64_t announced_at,
1065 GNUNET_PSYC_ResultCallback result_cb, 1118 GNUNET_ResultCallback result_cb,
1066 void *cls) 1119 void *cls)
1067{ 1120{
1068 struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req)); 1121 struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req));
@@ -1071,17 +1124,62 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn,
1071 req->slave_key = *slave_key; 1124 req->slave_key = *slave_key;
1072 req->announced_at = GNUNET_htonll (announced_at); 1125 req->announced_at = GNUNET_htonll (announced_at);
1073 req->did_join = GNUNET_NO; 1126 req->did_join = GNUNET_NO;
1074 req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls)); 1127 req->op_id = GNUNET_htonll (GNUNET_CLIENT_MANAGER_op_add (chn->client,
1128 result_cb, cls));
1129
1130 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
1131}
1132
1133
1134static struct GNUNET_PSYC_HistoryRequest *
1135channel_history_replay (struct GNUNET_PSYC_Channel *chn,
1136 uint64_t start_message_id,
1137 uint64_t end_message_id,
1138 uint64_t message_limit,
1139 const char *method_prefix,
1140 uint32_t flags,
1141 GNUNET_PSYC_MessageCallback message_cb,
1142 GNUNET_PSYC_MessagePartCallback message_part_cb,
1143 GNUNET_ResultCallback result_cb,
1144 void *cls)
1145{
1146 struct GNUNET_PSYC_HistoryRequestMessage *req;
1147 struct GNUNET_PSYC_HistoryRequest *hist = GNUNET_malloc (sizeof (*hist));
1148 hist->chn = chn;
1149 hist->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls);
1150 hist->result_cb = result_cb;
1151 hist->cls = cls;
1152 hist->op_id = GNUNET_CLIENT_MANAGER_op_add (chn->client,
1153 &op_recv_history_result, hist);
1154
1155 GNUNET_assert (NULL != method_prefix);
1156 uint16_t method_size = strnlen (method_prefix,
1157 GNUNET_SERVER_MAX_MESSAGE_SIZE
1158 - sizeof (*req)) + 1;
1159 GNUNET_assert ('\0' == method_prefix[method_size - 1]);
1160 req = GNUNET_malloc (sizeof (*req) + method_size);
1161 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY);
1162 req->header.size = htons (sizeof (*req) + method_size);
1163 req->start_message_id = GNUNET_htonll (start_message_id);
1164 req->end_message_id = GNUNET_htonll (end_message_id);
1165 req->message_limit = GNUNET_htonll (message_limit);
1166 req->flags = htonl (flags);
1167 req->op_id = GNUNET_htonll (hist->op_id);
1168 memcpy (&req[1], method_prefix, method_size);
1075 1169
1076 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); 1170 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
1171 return hist;
1077} 1172}
1078 1173
1079 1174
1080/** 1175/**
1081 * Request to replay a part of the message history of the channel. 1176 * Request to replay a part of the message history of the channel.
1082 * 1177 *
1083 * Historic messages (but NOT the state at the time) will be replayed (given to 1178 * Historic messages (but NOT the state at the time) will be replayed and given
1084 * the normal method handlers) if available and if access is permitted. 1179 * to the normal method handlers with a #GNUNET_PSYC_MESSAGE_HISTORIC flag set.
1180 *
1181 * Messages are retrieved from the local PSYCstore if available,
1182 * otherwise requested from the network.
1085 * 1183 *
1086 * @param channel 1184 * @param channel
1087 * Which channel should be replayed? 1185 * Which channel should be replayed?
@@ -1089,8 +1187,10 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn,
1089 * Earliest interesting point in history. 1187 * Earliest interesting point in history.
1090 * @param end_message_id 1188 * @param end_message_id
1091 * Last (inclusive) interesting point in history. 1189 * Last (inclusive) interesting point in history.
1092 * FIXME: @param method_prefix 1190 * @param method_prefix
1093 * Retrieve only messages with a matching method prefix. 1191 * Retrieve only messages with a matching method prefix.
1192 * @param flags
1193 * OR'ed enum GNUNET_PSYC_HistoryReplayFlags
1094 * @param result_cb 1194 * @param result_cb
1095 * Function to call when the requested history has been fully replayed. 1195 * Function to call when the requested history has been fully replayed.
1096 * @param cls 1196 * @param cls
@@ -1098,22 +1198,20 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn,
1098 * 1198 *
1099 * @return Handle to cancel history replay operation. 1199 * @return Handle to cancel history replay operation.
1100 */ 1200 */
1101void 1201struct GNUNET_PSYC_HistoryRequest *
1102GNUNET_PSYC_channel_history_replay (struct GNUNET_PSYC_Channel *chn, 1202GNUNET_PSYC_channel_history_replay (struct GNUNET_PSYC_Channel *chn,
1103 uint64_t start_message_id, 1203 uint64_t start_message_id,
1104 uint64_t end_message_id, 1204 uint64_t end_message_id,
1105 /* FIXME: const char *method_prefix, */ 1205 const char *method_prefix,
1106 GNUNET_PSYC_ResultCallback result_cb, 1206 uint32_t flags,
1207 GNUNET_PSYC_MessageCallback message_cb,
1208 GNUNET_PSYC_MessagePartCallback message_part_cb,
1209 GNUNET_ResultCallback result_cb,
1107 void *cls) 1210 void *cls)
1108{ 1211{
1109 struct HistoryRequest *req = GNUNET_malloc (sizeof (*req)); 1212 return channel_history_replay (chn, start_message_id, end_message_id, 0,
1110 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY); 1213 method_prefix, flags,
1111 req->header.size = htons (sizeof (*req)); 1214 message_cb, message_part_cb, result_cb, cls);
1112 req->start_message_id = GNUNET_htonll (start_message_id);
1113 req->end_message_id = GNUNET_htonll (end_message_id);
1114 req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls));
1115
1116 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
1117} 1215}
1118 1216
1119 1217
@@ -1127,8 +1225,11 @@ GNUNET_PSYC_channel_history_replay (struct GNUNET_PSYC_Channel *chn,
1127 * Which channel should be replayed? 1225 * Which channel should be replayed?
1128 * @param message_limit 1226 * @param message_limit
1129 * Maximum number of messages to replay. 1227 * Maximum number of messages to replay.
1130 * FIXME: @param method_prefix 1228 * @param method_prefix
1131 * Retrieve only messages with a matching method prefix. 1229 * Retrieve only messages with a matching method prefix.
1230 * Use NULL or "" to retrieve all.
1231 * @param flags
1232 * OR'ed enum GNUNET_PSYC_HistoryReplayFlags
1132 * @param result_cb 1233 * @param result_cb
1133 * Function to call when the requested history has been fully replayed. 1234 * Function to call when the requested history has been fully replayed.
1134 * @param cls 1235 * @param cls
@@ -1136,20 +1237,78 @@ GNUNET_PSYC_channel_history_replay (struct GNUNET_PSYC_Channel *chn,
1136 * 1237 *
1137 * @return Handle to cancel history replay operation. 1238 * @return Handle to cancel history replay operation.
1138 */ 1239 */
1139void 1240struct GNUNET_PSYC_HistoryRequest *
1140GNUNET_PSYC_channel_history_replay_latest (struct GNUNET_PSYC_Channel *chn, 1241GNUNET_PSYC_channel_history_replay_latest (struct GNUNET_PSYC_Channel *chn,
1141 uint64_t message_limit, 1242 uint64_t message_limit,
1142 /* FIXME: const char *method_prefix, */ 1243 const char *method_prefix,
1143 GNUNET_PSYC_ResultCallback result_cb, 1244 uint32_t flags,
1245 GNUNET_PSYC_MessageCallback message_cb,
1246 GNUNET_PSYC_MessagePartCallback message_part_cb,
1247 GNUNET_ResultCallback result_cb,
1144 void *cls) 1248 void *cls)
1145{ 1249{
1146 struct HistoryRequest *req = GNUNET_malloc (sizeof (*req)); 1250 return channel_history_replay (chn, 0, 0, message_limit, method_prefix, flags,
1147 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY); 1251 message_cb, message_part_cb, result_cb, cls);
1148 req->header.size = htons (sizeof (*req)); 1252}
1149 req->message_limit = GNUNET_htonll (message_limit); 1253
1150 req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls)); 1254
1255void
1256GNUNET_PSYC_channel_history_replay_cancel (struct GNUNET_PSYC_Channel *channel,
1257 struct GNUNET_PSYC_HistoryRequest *hist)
1258{
1259 GNUNET_PSYC_receive_destroy (hist->recv);
1260 GNUNET_CLIENT_MANAGER_op_cancel (hist->chn->client, hist->op_id);
1261 GNUNET_free (hist);
1262}
1263
1264
1265/**
1266 * Retrieve the best matching channel state variable.
1267 *
1268 * If the requested variable name is not present in the state, the nearest
1269 * less-specific name is matched; for example, requesting "_a_b" will match "_a"
1270 * if "_a_b" does not exist.
1271 *
1272 * @param channel
1273 * Channel handle.
1274 * @param full_name
1275 * Full name of the requested variable.
1276 * The actual variable returned might have a shorter name.
1277 * @param var_cb
1278 * Function called once when a matching state variable is found.
1279 * Not called if there's no matching state variable.
1280 * @param result_cb
1281 * Function called after the operation finished.
1282 * (i.e. all state variables have been returned via @a state_cb)
1283 * @param cls
1284 * Closure for the callbacks.
1285 */
1286static struct GNUNET_PSYC_StateRequest *
1287channel_state_get (struct GNUNET_PSYC_Channel *chn,
1288 uint16_t type, const char *name,
1289 GNUNET_PSYC_StateVarCallback var_cb,
1290 GNUNET_ResultCallback result_cb, void *cls)
1291{
1292 struct StateRequest *req;
1293 struct GNUNET_PSYC_StateRequest *sr = GNUNET_malloc (sizeof (*sr));
1294 sr->chn = chn;
1295 sr->var_cb = var_cb;
1296 sr->result_cb = result_cb;
1297 sr->cls = cls;
1298 sr->op_id = GNUNET_CLIENT_MANAGER_op_add (chn->client,
1299 &op_recv_state_result, sr);
1300
1301 GNUNET_assert (NULL != name);
1302 size_t name_size = strnlen (name, GNUNET_SERVER_MAX_MESSAGE_SIZE
1303 - sizeof (*req)) + 1;
1304 req = GNUNET_malloc (sizeof (*req) + name_size);
1305 req->header.type = htons (type);
1306 req->header.size = htons (sizeof (*req) + name_size);
1307 req->op_id = GNUNET_htonll (sr->op_id);
1308 memcpy (&req[1], name, name_size);
1151 1309
1152 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); 1310 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
1311 return sr;
1153} 1312}
1154 1313
1155 1314
@@ -1174,21 +1333,16 @@ GNUNET_PSYC_channel_history_replay_latest (struct GNUNET_PSYC_Channel *chn,
1174 * @param cls 1333 * @param cls
1175 * Closure for the callbacks. 1334 * Closure for the callbacks.
1176 */ 1335 */
1177void 1336struct GNUNET_PSYC_StateRequest *
1178GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *chn, 1337GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *chn,
1179 const char *full_name, 1338 const char *full_name,
1180 GNUNET_PSYC_StateVarCallback var_cb, 1339 GNUNET_PSYC_StateVarCallback var_cb,
1181 GNUNET_PSYC_ResultCallback result_cb, 1340 GNUNET_ResultCallback result_cb,
1182 void *cls) 1341 void *cls)
1183{ 1342{
1184 size_t name_size = strlen (full_name) + 1; 1343 return channel_state_get (chn, GNUNET_MESSAGE_TYPE_PSYC_STATE_GET,
1185 struct StateRequest *req = GNUNET_malloc (sizeof (*req) + name_size); 1344 full_name, var_cb, result_cb, cls);
1186 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_GET);
1187 req->header.size = htons (sizeof (*req) + name_size);
1188 req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls));
1189 memcpy (&req[1], full_name, name_size);
1190 1345
1191 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
1192} 1346}
1193 1347
1194 1348
@@ -1215,21 +1369,29 @@ GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *chn,
1215 * @param cls 1369 * @param cls
1216 * Closure for the callbacks. 1370 * Closure for the callbacks.
1217 */ 1371 */
1218void 1372struct GNUNET_PSYC_StateRequest *
1219GNUNET_PSYC_channel_state_get_prefix (struct GNUNET_PSYC_Channel *chn, 1373GNUNET_PSYC_channel_state_get_prefix (struct GNUNET_PSYC_Channel *chn,
1220 const char *name_prefix, 1374 const char *name_prefix,
1221 GNUNET_PSYC_StateVarCallback var_cb, 1375 GNUNET_PSYC_StateVarCallback var_cb,
1222 GNUNET_PSYC_ResultCallback result_cb, 1376 GNUNET_ResultCallback result_cb,
1223 void *cls) 1377 void *cls)
1224{ 1378{
1225 size_t name_size = strlen (name_prefix) + 1; 1379 return channel_state_get (chn, GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX,
1226 struct StateRequest *req = GNUNET_malloc (sizeof (*req) + name_size); 1380 name_prefix, var_cb, result_cb, cls);
1227 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_GET); 1381}
1228 req->header.size = htons (sizeof (*req) + name_size);
1229 req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls));
1230 memcpy (&req[1], name_prefix, name_size);
1231 1382
1232 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); 1383
1384/**
1385 * Cancel a state request operation.
1386 *
1387 * @param sr
1388 * Handle for the operation to cancel.
1389 */
1390void
1391GNUNET_PSYC_channel_state_get_cancel (struct GNUNET_PSYC_StateRequest *sr)
1392{
1393 GNUNET_CLIENT_MANAGER_op_cancel (sr->chn->client, sr->op_id);
1394 GNUNET_free (sr);
1233} 1395}
1234 1396
1235/* end of psyc_api.c */ 1397/* end of psyc_api.c */
diff --git a/src/psyc/psyc_util_lib.c b/src/psyc/psyc_util_lib.c
index 961922ce4..ebbc2dad8 100644
--- a/src/psyc/psyc_util_lib.c
+++ b/src/psyc/psyc_util_lib.c
@@ -326,9 +326,13 @@ GNUNET_PSYC_transmit_destroy (struct GNUNET_PSYC_TransmitHandle *tmit)
326 * The message part is added to the current message buffer. 326 * The message part is added to the current message buffer.
327 * When this buffer is full, it is added to the transmission queue. 327 * When this buffer is full, it is added to the transmission queue.
328 * 328 *
329 * @param tmit Transmission handle. 329 * @param tmit
330 * @param msg Message part, or NULL. 330 * Transmission handle.
331 * @param end End of message? #GNUNET_YES or #GNUNET_NO. 331 * @param msg
332 * Message part, or NULL.
333 * @param end
334 * End of message?
335 * #GNUNET_YES or #GNUNET_NO.
332 */ 336 */
333static void 337static void
334transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit, 338transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
@@ -632,16 +636,24 @@ transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
632/** 636/**
633 * Transmit a message. 637 * Transmit a message.
634 * 638 *
635 * @param tmit Transmission handle. 639 * @param tmit
636 * @param method_name Which method should be invoked. 640 * Transmission handle.
637 * @param env Environment for the message. 641 * @param method_name
638 * Should stay available until the first call to notify_data. 642 * Which method should be invoked.
639 * Can be NULL if there are no modifiers or @a notify_mod is provided instead. 643 * @param env
640 * @param notify_mod Function to call to obtain modifiers. 644 * Environment for the message.
641 * Can be NULL if there are no modifiers or @a env is provided instead. 645 * Should stay available until the first call to notify_data.
642 * @param notify_data Function to call to obtain fragments of the data. 646 * Can be NULL if there are no modifiers or @a notify_mod is
643 * @param notify_cls Closure for @a notify_mod and @a notify_data. 647 * provided instead.
644 * @param flags Flags for the message being transmitted. 648 * @param notify_mod
649 * Function to call to obtain modifiers.
650 * Can be NULL if there are no modifiers or @a env is provided instead.
651 * @param notify_data
652 * Function to call to obtain fragments of the data.
653 * @param notify_cls
654 * Closure for @a notify_mod and @a notify_data.
655 * @param flags
656 * Flags for the message being transmitted.
645 * 657 *
646 * @return #GNUNET_OK if the transmission was started. 658 * @return #GNUNET_OK if the transmission was started.
647 * #GNUNET_SYSERR if another transmission is already going on. 659 * #GNUNET_SYSERR if another transmission is already going on.
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c
index 7160c13c6..ba31d9329 100644
--- a/src/psyc/test_psyc.c
+++ b/src/psyc/test_psyc.c
@@ -82,7 +82,7 @@ struct TransmitClosure
82 82
83struct TransmitClosure *tmit; 83struct TransmitClosure *tmit;
84 84
85uint8_t join_req_count; 85uint8_t join_req_count, end_count;
86 86
87enum 87enum
88{ 88{
@@ -105,6 +105,9 @@ enum
105void 105void
106master_transmit (); 106master_transmit ();
107 107
108void
109master_history_replay_latest ();
110
108 111
109void master_stopped (void *cls) 112void master_stopped (void *cls)
110{ 113{
@@ -198,6 +201,134 @@ end ()
198 201
199 202
200void 203void
204master_message_cb (void *cls, uint64_t message_id, uint32_t flags,
205 const struct GNUNET_PSYC_MessageHeader *msg)
206{
207 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
208 "Test #%d: Master got PSYC message fragment of size %u "
209 "belonging to message ID %" PRIu64 " with flags %x\n",
210 test, ntohs (msg->header.size), message_id, flags);
211 // FIXME
212}
213
214
215void
216master_message_part_cb (void *cls, uint64_t message_id,
217 uint64_t data_offset, uint32_t flags,
218 const struct GNUNET_MessageHeader *msg)
219{
220 if (NULL == msg)
221 {
222 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
223 "Error while receiving message %" PRIu64 "\n", message_id);
224 return;
225 }
226
227 uint16_t type = ntohs (msg->type);
228 uint16_t size = ntohs (msg->size);
229
230 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
231 "Test #%d: Master got message part of type %u and size %u "
232 "belonging to message ID %" PRIu64 " with flags %x\n",
233 test, type, size, message_id, flags);
234
235 switch (test)
236 {
237 case TEST_SLAVE_TRANSMIT:
238 if (GNUNET_PSYC_MESSAGE_REQUEST != flags)
239 {
240 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
241 "Unexpected request flags: %x" PRIu32 "\n", flags);
242 GNUNET_assert (0);
243 return;
244 }
245 // FIXME: check rest of message
246
247 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type)
248 master_transmit ();
249 break;
250
251 case TEST_MASTER_TRANSMIT:
252 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type && 2 == ++end_count)
253 master_history_replay_latest ();
254 break;
255
256 case TEST_MASTER_HISTORY_REPLAY:
257 case TEST_MASTER_HISTORY_REPLAY_LATEST:
258 if (GNUNET_PSYC_MESSAGE_HISTORIC != flags)
259 {
260 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
261 "Test #%d: Unexpected flags for historic message: %x" PRIu32 "\n",
262 test, flags);
263 GNUNET_assert (0);
264 return;
265 }
266 break;
267
268 default:
269 GNUNET_assert (0);
270 }
271}
272
273
274void
275slave_message_cb (void *cls, uint64_t message_id, uint32_t flags,
276 const struct GNUNET_PSYC_MessageHeader *msg)
277{
278 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
279 "Test #%d: Slave got PSYC message fragment of size %u "
280 "belonging to message ID %" PRIu64 " with flags %x\n",
281 test, ntohs (msg->header.size), message_id, flags);
282 // FIXME
283}
284
285
286void
287slave_message_part_cb (void *cls, uint64_t message_id,
288 uint64_t data_offset, uint32_t flags,
289 const struct GNUNET_MessageHeader *msg)
290{
291 if (NULL == msg)
292 {
293 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
294 "Error while receiving message " PRIu64 "\n", message_id);
295 return;
296 }
297
298 uint16_t type = ntohs (msg->type);
299 uint16_t size = ntohs (msg->size);
300
301 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
302 "Test #%d: Slave got message part of type %u and size %u "
303 "belonging to message ID %" PRIu64 " with flags %x\n",
304 test, type, size, message_id, flags);
305
306 switch (test)
307 {
308 case TEST_MASTER_TRANSMIT:
309 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type && 2 == ++end_count)
310 master_history_replay_latest ();
311 break;
312
313 case TEST_SLAVE_HISTORY_REPLAY:
314 case TEST_SLAVE_HISTORY_REPLAY_LATEST:
315 if (GNUNET_PSYC_MESSAGE_HISTORIC != flags)
316 {
317 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
318 "Test #%d: Unexpected flags for historic message: %x" PRIu32 "\n",
319 flags);
320 GNUNET_assert (0);
321 return;
322 }
323 break;
324
325 default:
326 GNUNET_assert (0);
327 }
328}
329
330
331void
201state_get_var (void *cls, const char *name, const void *value, size_t value_size) 332state_get_var (void *cls, const char *name, const void *value, size_t value_size)
202{ 333{
203 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 334 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -208,10 +339,12 @@ state_get_var (void *cls, const char *name, const void *value, size_t value_size
208/*** Slave state_get_prefix() ***/ 339/*** Slave state_get_prefix() ***/
209 340
210void 341void
211slave_state_get_prefix_result (void *cls, int64_t result, const char *err_msg) 342slave_state_get_prefix_result (void *cls, int64_t result,
343 const void *err_msg, uint16_t err_msg_size)
212{ 344{
213 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 345 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
214 "slave_state_get_prefix:\t%" PRId64 " (%s)\n", result, err_msg); 346 "slave_state_get_prefix:\t%" PRId64 " (%.s)\n",
347 result, err_msg_size, err_msg);
215 // FIXME: GNUNET_assert (2 == result); 348 // FIXME: GNUNET_assert (2 == result);
216 end (); 349 end ();
217} 350}
@@ -230,7 +363,8 @@ slave_state_get_prefix ()
230 363
231 364
232void 365void
233master_state_get_prefix_result (void *cls, int64_t result, const char *err_msg) 366master_state_get_prefix_result (void *cls, int64_t result,
367 const void *err_msg, uint16_t err_msg_size)
234{ 368{
235 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 369 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
236 "master_state_get_prefix:\t%" PRId64 " (%s)\n", result, err_msg); 370 "master_state_get_prefix:\t%" PRId64 " (%s)\n", result, err_msg);
@@ -252,10 +386,12 @@ master_state_get_prefix ()
252 386
253 387
254void 388void
255slave_state_get_result (void *cls, int64_t result, const char *err_msg) 389slave_state_get_result (void *cls, int64_t result,
390 const void *err_msg, uint16_t err_msg_size)
256{ 391{
257 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 392 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
258 "slave_state_get:\t%" PRId64 " (%s)\n", result, err_msg); 393 "slave_state_get:\t%" PRId64 " (%.*s)\n",
394 result, err_msg_size, err_msg);
259 // FIXME: GNUNET_assert (2 == result); 395 // FIXME: GNUNET_assert (2 == result);
260 master_state_get_prefix (); 396 master_state_get_prefix ();
261} 397}
@@ -274,10 +410,12 @@ slave_state_get ()
274 410
275 411
276void 412void
277master_state_get_result (void *cls, int64_t result, const char *err_msg) 413master_state_get_result (void *cls, int64_t result,
414 const void *err_msg, uint16_t err_msg_size)
278{ 415{
279 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 416 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
280 "master_state_get:\t%" PRId64 " (%s)\n", result, err_msg); 417 "master_state_get:\t%" PRId64 " (%.*s)\n",
418 result, err_msg_size, err_msg);
281 // FIXME: GNUNET_assert (1 == result); 419 // FIXME: GNUNET_assert (1 == result);
282 slave_state_get (); 420 slave_state_get ();
283} 421}
@@ -295,10 +433,12 @@ master_state_get ()
295/*** Slave history_replay() ***/ 433/*** Slave history_replay() ***/
296 434
297void 435void
298slave_history_replay_result (void *cls, int64_t result, const char *err_msg) 436slave_history_replay_result (void *cls, int64_t result,
437 const void *err_msg, uint16_t err_msg_size)
299{ 438{
300 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 439 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
301 "slave_history_replay:\t%" PRId64 " (%s)\n", result, err_msg); 440 "slave_history_replay:\t%" PRId64 " (%.*s)\n",
441 result, err_msg_size, err_msg);
302 GNUNET_assert (9 == result); 442 GNUNET_assert (9 == result);
303 443
304 master_state_get (); 444 master_state_get ();
@@ -309,9 +449,11 @@ void
309slave_history_replay () 449slave_history_replay ()
310{ 450{
311 test = TEST_SLAVE_HISTORY_REPLAY; 451 test = TEST_SLAVE_HISTORY_REPLAY;
312 GNUNET_PSYC_channel_history_replay (slv_chn, 1, 1, 452 GNUNET_PSYC_channel_history_replay (slv_chn, 1, 1, "",
313 &slave_history_replay_result, 453 GNUNET_PSYC_HISTORY_REPLAY_LOCAL,
314 NULL); 454 &slave_message_cb,
455 &slave_message_part_cb,
456 &slave_history_replay_result, NULL);
315} 457}
316 458
317 459
@@ -319,10 +461,12 @@ slave_history_replay ()
319 461
320 462
321void 463void
322master_history_replay_result (void *cls, int64_t result, const char *err_msg) 464master_history_replay_result (void *cls, int64_t result,
465 const void *err_msg, uint16_t err_msg_size)
323{ 466{
324 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 467 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
325 "master_history_replay:\t%" PRId64 " (%s)\n", result, err_msg); 468 "master_history_replay:\t%" PRId64 " (%.*s)\n",
469 result, err_msg_size, err_msg);
326 GNUNET_assert (9 == result); 470 GNUNET_assert (9 == result);
327 471
328 slave_history_replay (); 472 slave_history_replay ();
@@ -333,9 +477,11 @@ void
333master_history_replay () 477master_history_replay ()
334{ 478{
335 test = TEST_MASTER_HISTORY_REPLAY; 479 test = TEST_MASTER_HISTORY_REPLAY;
336 GNUNET_PSYC_channel_history_replay (mst_chn, 1, 1, 480 GNUNET_PSYC_channel_history_replay (mst_chn, 1, 1, "",
337 &master_history_replay_result, 481 GNUNET_PSYC_HISTORY_REPLAY_LOCAL,
338 NULL); 482 &master_message_cb,
483 &master_message_part_cb,
484 &master_history_replay_result, NULL);
339} 485}
340 486
341 487
@@ -343,10 +489,12 @@ master_history_replay ()
343 489
344 490
345void 491void
346slave_history_replay_latest_result (void *cls, int64_t result, const char *err_msg) 492slave_history_replay_latest_result (void *cls, int64_t result,
493 const void *err_msg, uint16_t err_msg_size)
347{ 494{
348 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 495 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
349 "slave_history_replay_latest:\t%" PRId64 " (%s)\n", result, err_msg); 496 "slave_history_replay_latest:\t%" PRId64 " (%.*s)\n",
497 result, err_msg_size, err_msg);
350 GNUNET_assert (9 == result); 498 GNUNET_assert (9 == result);
351 499
352 master_history_replay (); 500 master_history_replay ();
@@ -357,7 +505,10 @@ void
357slave_history_replay_latest () 505slave_history_replay_latest ()
358{ 506{
359 test = TEST_SLAVE_HISTORY_REPLAY_LATEST; 507 test = TEST_SLAVE_HISTORY_REPLAY_LATEST;
360 GNUNET_PSYC_channel_history_replay_latest (slv_chn, 1, 508 GNUNET_PSYC_channel_history_replay_latest (slv_chn, 1, "",
509 GNUNET_PSYC_HISTORY_REPLAY_LOCAL,
510 &slave_message_cb,
511 &slave_message_part_cb,
361 &slave_history_replay_latest_result, 512 &slave_history_replay_latest_result,
362 NULL); 513 NULL);
363} 514}
@@ -367,10 +518,12 @@ slave_history_replay_latest ()
367 518
368 519
369void 520void
370master_history_replay_latest_result (void *cls, int64_t result, const char *err_msg) 521master_history_replay_latest_result (void *cls, int64_t result,
522 const void *err_msg, uint16_t err_msg_size)
371{ 523{
372 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 524 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
373 "master_history_replay_latest:\t%" PRId64 " (%s)\n", result, err_msg); 525 "master_history_replay_latest:\t%" PRId64 " (%.*s)\n",
526 result, err_msg_size, err_msg);
374 GNUNET_assert (9 == result); 527 GNUNET_assert (9 == result);
375 528
376 slave_history_replay_latest (); 529 slave_history_replay_latest ();
@@ -381,139 +534,16 @@ void
381master_history_replay_latest () 534master_history_replay_latest ()
382{ 535{
383 test = TEST_MASTER_HISTORY_REPLAY_LATEST; 536 test = TEST_MASTER_HISTORY_REPLAY_LATEST;
384 GNUNET_PSYC_channel_history_replay_latest (mst_chn, 1, 537 GNUNET_PSYC_channel_history_replay_latest (mst_chn, 1, "",
538 GNUNET_PSYC_HISTORY_REPLAY_LOCAL,
539 &master_message_cb,
540 &master_message_part_cb,
385 &master_history_replay_latest_result, 541 &master_history_replay_latest_result,
386 NULL); 542 NULL);
387} 543}
388 544
389 545
390void 546void
391master_message_cb (void *cls, uint64_t message_id, uint32_t flags,
392 const struct GNUNET_PSYC_MessageHeader *msg)
393{
394 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
395 "Test #%d: Master got PSYC message fragment of size %u "
396 "belonging to message ID %" PRIu64 " with flags %x\n",
397 test, ntohs (msg->header.size), message_id, flags);
398 // FIXME
399}
400
401
402void
403master_message_part_cb (void *cls, uint64_t message_id,
404 uint64_t data_offset, uint32_t flags,
405 const struct GNUNET_MessageHeader *msg)
406{
407 if (NULL == msg)
408 {
409 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
410 "Error while receiving message %" PRIu64 "\n", message_id);
411 return;
412 }
413
414 uint16_t type = ntohs (msg->type);
415 uint16_t size = ntohs (msg->size);
416
417 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
418 "Test #%d: Master got message part of type %u and size %u "
419 "belonging to message ID %" PRIu64 " with flags %x\n",
420 test, type, size, message_id, flags);
421
422 switch (test)
423 {
424 case TEST_SLAVE_TRANSMIT:
425 if (GNUNET_PSYC_MESSAGE_REQUEST != flags)
426 {
427 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
428 "Unexpected request flags: %x" PRIu32 "\n", flags);
429 GNUNET_assert (0);
430 return;
431 }
432 // FIXME: check rest of message
433
434 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type)
435 master_transmit ();
436 break;
437
438 case TEST_MASTER_TRANSMIT:
439 break;
440
441 case TEST_MASTER_HISTORY_REPLAY:
442 case TEST_MASTER_HISTORY_REPLAY_LATEST:
443 if (GNUNET_PSYC_MESSAGE_HISTORIC != flags)
444 {
445 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
446 "Test #%d: Unexpected flags for historic message: %x" PRIu32 "\n",
447 flags);
448 GNUNET_assert (0);
449 return;
450 }
451 break;
452
453 default:
454 GNUNET_assert (0);
455 }
456}
457
458
459void
460slave_message_cb (void *cls, uint64_t message_id, uint32_t flags,
461 const struct GNUNET_PSYC_MessageHeader *msg)
462{
463 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
464 "Test #%d: Slave got PSYC message fragment of size %u "
465 "belonging to message ID %" PRIu64 " with flags %x\n",
466 test, ntohs (msg->header.size), message_id, flags);
467 // FIXME
468}
469
470
471void
472slave_message_part_cb (void *cls, uint64_t message_id,
473 uint64_t data_offset, uint32_t flags,
474 const struct GNUNET_MessageHeader *msg)
475{
476 if (NULL == msg)
477 {
478 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
479 "Error while receiving message " PRIu64 "\n", message_id);
480 return;
481 }
482
483 uint16_t type = ntohs (msg->type);
484 uint16_t size = ntohs (msg->size);
485
486 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
487 "Test #%d: Slave got message part of type %u and size %u "
488 "belonging to message ID %" PRIu64 " with flags %x\n",
489 test, type, size, message_id, flags);
490
491 switch (test)
492 {
493 case TEST_MASTER_TRANSMIT:
494 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type)
495 master_history_replay_latest ();
496 break;
497
498 case TEST_SLAVE_HISTORY_REPLAY:
499 case TEST_SLAVE_HISTORY_REPLAY_LATEST:
500 if (GNUNET_PSYC_MESSAGE_HISTORIC != flags)
501 {
502 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
503 "Test #%d: Unexpected flags for historic message: %x" PRIu32 "\n",
504 flags);
505 GNUNET_assert (0);
506 return;
507 }
508 break;
509
510 default:
511 GNUNET_assert (0);
512 }
513}
514
515
516void
517transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 547transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
518{ 548{
519 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission resumed.\n"); 549 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission resumed.\n");
@@ -665,27 +695,31 @@ slave_transmit ()
665 tmit->data[0] = "slave test"; 695 tmit->data[0] = "slave test";
666 tmit->data_count = 1; 696 tmit->data_count = 1;
667 tmit->slv_tmit 697 tmit->slv_tmit
668 = GNUNET_PSYC_slave_transmit (slv, "_request_test", tmit_notify_mod, 698 = GNUNET_PSYC_slave_transmit (slv, "_request_test", &tmit_notify_mod,
669 tmit_notify_data, tmit, 699 &tmit_notify_data, tmit,
670 GNUNET_PSYC_SLAVE_TRANSMIT_NONE); 700 GNUNET_PSYC_SLAVE_TRANSMIT_NONE);
671} 701}
672 702
673 703
674void 704void
675slave_remove_cb (void *cls, int64_t result, const char *err_msg) 705slave_remove_cb (void *cls, int64_t result,
706 const void *err_msg, uint16_t err_msg_size)
676{ 707{
677 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 708 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
678 "slave_remove:\t%" PRId64 " (%s)\n", result, err_msg); 709 "slave_remove:\t%" PRId64 " (%.*s)\n",
710 result, err_msg_size, err_msg);
679 711
680 slave_transmit (); 712 slave_transmit ();
681} 713}
682 714
683 715
684void 716void
685slave_add_cb (void *cls, int64_t result, const char *err_msg) 717slave_add_cb (void *cls, int64_t result,
718 const void *err_msg, uint16_t err_msg_size)
686{ 719{
687 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 720 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
688 "slave_add:\t%" PRId64 " (%s)\n", result, err_msg); 721 "slave_add:\t%" PRId64 " (%.*s)\n",
722 result, err_msg_size, err_msg);
689 723
690 struct GNUNET_PSYC_Channel *chn = cls; 724 struct GNUNET_PSYC_Channel *chn = cls;
691 GNUNET_PSYC_channel_slave_remove (chn, &slave_pub_key, 2, 725 GNUNET_PSYC_channel_slave_remove (chn, &slave_pub_key, 2,
@@ -775,6 +809,8 @@ master_transmit ()
775{ 809{
776 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master sending message to all.\n"); 810 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master sending message to all.\n");
777 test = TEST_MASTER_TRANSMIT; 811 test = TEST_MASTER_TRANSMIT;
812 end_count = 0;
813
778 uint32_t i, j; 814 uint32_t i, j;
779 815
780 char *name_max = "_test_max"; 816 char *name_max = "_test_max";
@@ -816,8 +852,8 @@ master_transmit ()
816 tmit->data_delay[1] = 3; 852 tmit->data_delay[1] = 3;
817 tmit->data_count = 4; 853 tmit->data_count = 4;
818 tmit->mst_tmit 854 tmit->mst_tmit
819 = GNUNET_PSYC_master_transmit (mst, "_notice_test", tmit_notify_mod, 855 = GNUNET_PSYC_master_transmit (mst, "_notice_test", &tmit_notify_mod,
820 tmit_notify_data, tmit, 856 &tmit_notify_data, tmit,
821 GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN); 857 GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN);
822} 858}
823 859