diff options
author | Gabor X Toth <*@tg-x.net> | 2015-05-07 12:15:58 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2015-05-07 12:15:58 +0000 |
commit | 4725d59b468f1f30ba2910992333ca157682ce29 (patch) | |
tree | 23715ee20879c94a3363e28ea184370a4a71e44d /src/psyc | |
parent | a5edf8ac9f03a368c87ea6163994d4ac3d62af06 (diff) | |
download | gnunet-4725d59b468f1f30ba2910992333ca157682ce29.tar.gz gnunet-4725d59b468f1f30ba2910992333ca157682ce29.zip |
psyc/social: request history & state from psycstore; more documentation, tests, cleanup
Diffstat (limited to 'src/psyc')
-rw-r--r-- | src/psyc/gnunet-service-psyc.c | 355 | ||||
-rw-r--r-- | src/psyc/psyc.h | 36 | ||||
-rw-r--r-- | src/psyc/psyc_api.c | 532 | ||||
-rw-r--r-- | src/psyc/psyc_util_lib.c | 38 | ||||
-rw-r--r-- | src/psyc/test_psyc.c | 352 |
5 files changed, 803 insertions, 510 deletions
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c index 5ebbe6444..2bc128c4f 100644 --- a/src/psyc/gnunet-service-psyc.c +++ b/src/psyc/gnunet-service-psyc.c | |||
@@ -181,11 +181,24 @@ struct FragmentQueue | |||
181 | /** | 181 | /** |
182 | * List of connected clients. | 182 | * List of connected clients. |
183 | */ | 183 | */ |
184 | struct ClientListItem | 184 | struct Client |
185 | { | 185 | { |
186 | struct ClientListItem *prev; | 186 | struct Client *prev; |
187 | struct ClientListItem *next; | 187 | struct Client *next; |
188 | |||
189 | struct GNUNET_SERVER_Client *client; | ||
190 | }; | ||
191 | |||
192 | |||
193 | struct Operation | ||
194 | { | ||
195 | struct Operation *prev; | ||
196 | struct Operation *next; | ||
197 | |||
188 | struct GNUNET_SERVER_Client *client; | 198 | struct GNUNET_SERVER_Client *client; |
199 | struct Channel *chn; | ||
200 | uint64_t op_id; | ||
201 | uint32_t flags; | ||
189 | }; | 202 | }; |
190 | 203 | ||
191 | 204 | ||
@@ -194,8 +207,11 @@ struct ClientListItem | |||
194 | */ | 207 | */ |
195 | struct Channel | 208 | struct Channel |
196 | { | 209 | { |
197 | struct ClientListItem *clients_head; | 210 | struct Client *clients_head; |
198 | struct ClientListItem *clients_tail; | 211 | struct Client *clients_tail; |
212 | |||
213 | struct Operation *op_head; | ||
214 | struct Operation *op_tail; | ||
199 | 215 | ||
200 | struct TransmitMessage *tmit_head; | 216 | struct TransmitMessage *tmit_head; |
201 | struct TransmitMessage *tmit_tail; | 217 | struct TransmitMessage *tmit_tail; |
@@ -397,14 +413,6 @@ struct Slave | |||
397 | }; | 413 | }; |
398 | 414 | ||
399 | 415 | ||
400 | struct OperationClosure | ||
401 | { | ||
402 | struct GNUNET_SERVER_Client *client; | ||
403 | struct Channel *chn; | ||
404 | uint64_t op_id; | ||
405 | }; | ||
406 | |||
407 | |||
408 | static void | 416 | static void |
409 | transmit_message (struct Channel *chn); | 417 | transmit_message (struct Channel *chn); |
410 | 418 | ||
@@ -435,6 +443,28 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
435 | } | 443 | } |
436 | 444 | ||
437 | 445 | ||
446 | static struct Operation * | ||
447 | op_add (struct Channel *chn, struct GNUNET_SERVER_Client *client, | ||
448 | uint64_t op_id, uint32_t flags) | ||
449 | { | ||
450 | struct Operation *op = GNUNET_malloc (sizeof (*op)); | ||
451 | op->client = client; | ||
452 | op->chn = chn; | ||
453 | op->op_id = op_id; | ||
454 | op->flags = flags; | ||
455 | GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op); | ||
456 | return op; | ||
457 | } | ||
458 | |||
459 | |||
460 | static void | ||
461 | op_remove (struct Channel *chn, struct Operation *op) | ||
462 | { | ||
463 | GNUNET_CONTAINER_DLL_remove (chn->op_head, chn->op_tail, op); | ||
464 | GNUNET_free (op); | ||
465 | } | ||
466 | |||
467 | |||
438 | /** | 468 | /** |
439 | * Clean up master data structures after a client disconnected. | 469 | * Clean up master data structures after a client disconnected. |
440 | */ | 470 | */ |
@@ -541,7 +571,7 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) | |||
541 | chn, (GNUNET_YES == chn->is_master) ? "master" : "slave", | 571 | chn, (GNUNET_YES == chn->is_master) ? "master" : "slave", |
542 | GNUNET_h2s (&chn->pub_key_hash)); | 572 | GNUNET_h2s (&chn->pub_key_hash)); |
543 | 573 | ||
544 | struct ClientListItem *cli = chn->clients_head; | 574 | struct Client *cli = chn->clients_head; |
545 | while (NULL != cli) | 575 | while (NULL != cli) |
546 | { | 576 | { |
547 | if (cli->client == client) | 577 | if (cli->client == client) |
@@ -553,6 +583,17 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) | |||
553 | cli = cli->next; | 583 | cli = cli->next; |
554 | } | 584 | } |
555 | 585 | ||
586 | struct Operation *op = chn->op_head; | ||
587 | while (NULL != op) | ||
588 | { | ||
589 | if (op->client == client) | ||
590 | { | ||
591 | op->client = NULL; | ||
592 | break; | ||
593 | } | ||
594 | op = op->next; | ||
595 | } | ||
596 | |||
556 | if (NULL == chn->clients_head) | 597 | if (NULL == chn->clients_head) |
557 | { /* Last client disconnected. */ | 598 | { /* Last client disconnected. */ |
558 | if (NULL != chn->tmit_head) | 599 | if (NULL != chn->tmit_head) |
@@ -574,10 +615,10 @@ static void | |||
574 | client_send_msg (const struct Channel *chn, | 615 | client_send_msg (const struct Channel *chn, |
575 | const struct GNUNET_MessageHeader *msg) | 616 | const struct GNUNET_MessageHeader *msg) |
576 | { | 617 | { |
577 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 618 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
578 | "%p Sending message to clients.\n", chn); | 619 | "%p Sending message to clients.\n", chn); |
579 | 620 | ||
580 | struct ClientListItem *cli = chn->clients_head; | 621 | struct Client *cli = chn->clients_head; |
581 | while (NULL != cli) | 622 | while (NULL != cli) |
582 | { | 623 | { |
583 | GNUNET_SERVER_notification_context_add (nc, cli->client); | 624 | GNUNET_SERVER_notification_context_add (nc, cli->client); |
@@ -596,33 +637,29 @@ client_send_msg (const struct Channel *chn, | |||
596 | * Code to transmit. | 637 | * Code to transmit. |
597 | * @param op_id | 638 | * @param op_id |
598 | * Operation ID in network byte order. | 639 | * Operation ID in network byte order. |
599 | * @param err_msg | 640 | * @param data |
600 | * Error message to include (or NULL for none). | 641 | * Data payload or NULL. |
642 | * @param data_size | ||
643 | * Size of @a data. | ||
601 | */ | 644 | */ |
602 | static void | 645 | static void |
603 | client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id, | 646 | client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id, |
604 | int64_t result_code, const char *err_msg) | 647 | int64_t result_code, const void *data, uint16_t data_size) |
605 | { | 648 | { |
606 | struct OperationResult *res; | 649 | struct GNUNET_OperationResultMessage *res; |
607 | size_t err_size = 0; | ||
608 | 650 | ||
609 | if (NULL != err_msg) | 651 | res = GNUNET_malloc (sizeof (*res) + data_size); |
610 | err_size = strnlen (err_msg, | ||
611 | GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (*res)) + 1; | ||
612 | res = GNUNET_malloc (sizeof (struct OperationResult) + err_size); | ||
613 | res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE); | 652 | res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE); |
614 | res->header.size = htons (sizeof (struct OperationResult) + err_size); | 653 | res->header.size = htons (sizeof (*res) + data_size); |
615 | res->result_code = GNUNET_htonll (result_code + INT64_MAX + 1); | 654 | res->result_code = GNUNET_htonll_signed (result_code); |
616 | res->op_id = op_id; | 655 | res->op_id = op_id; |
617 | if (0 < err_size) | 656 | if (0 < data_size) |
618 | { | 657 | memcpy (&res[1], data, data_size); |
619 | memcpy (&res[1], err_msg, err_size); | 658 | |
620 | ((char *) &res[1])[err_size - 1] = '\0'; | ||
621 | } | ||
622 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 659 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
623 | "%p Sending result to client for operation #%" PRIu64 ": " | 660 | "%p Sending result to client for operation #%" PRIu64 ": " |
624 | "%" PRId64 " (%s)\n", | 661 | "%" PRId64 " (size: %u)\n", |
625 | client, GNUNET_ntohll (op_id), result_code, err_msg); | 662 | client, GNUNET_ntohll (op_id), result_code, data_size); |
626 | 663 | ||
627 | GNUNET_SERVER_notification_context_add (nc, client); | 664 | GNUNET_SERVER_notification_context_add (nc, client); |
628 | GNUNET_SERVER_notification_context_unicast (nc, client, &res->header, | 665 | GNUNET_SERVER_notification_context_unicast (nc, client, &res->header, |
@@ -647,7 +684,8 @@ struct JoinMemTestClosure | |||
647 | * Membership test result callback used for join requests. | 684 | * Membership test result callback used for join requests. |
648 | */ | 685 | */ |
649 | static void | 686 | static void |
650 | join_mem_test_cb (void *cls, int64_t result, const char *err_msg) | 687 | join_mem_test_cb (void *cls, int64_t result, |
688 | const char *err_msg, uint16_t err_msg_size) | ||
651 | { | 689 | { |
652 | struct JoinMemTestClosure *jcls = cls; | 690 | struct JoinMemTestClosure *jcls = cls; |
653 | 691 | ||
@@ -663,6 +701,12 @@ join_mem_test_cb (void *cls, int64_t result, const char *err_msg) | |||
663 | } | 701 | } |
664 | else | 702 | else |
665 | { | 703 | { |
704 | if (GNUNET_SYSERR == result) | ||
705 | { | ||
706 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
707 | "Could not perform membership test (%.*s)\n", | ||
708 | err_msg_size, err_msg); | ||
709 | } | ||
666 | // FIXME: add relays | 710 | // FIXME: add relays |
667 | GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL); | 711 | GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL); |
668 | } | 712 | } |
@@ -759,12 +803,13 @@ mcast_recv_join_decision (void *cls, int is_admitted, | |||
759 | * Received result of GNUNET_PSYCSTORE_membership_test() | 803 | * Received result of GNUNET_PSYCSTORE_membership_test() |
760 | */ | 804 | */ |
761 | static void | 805 | static void |
762 | store_recv_membership_test_result (void *cls, int64_t result, const char *err_msg) | 806 | store_recv_membership_test_result (void *cls, int64_t result, |
807 | const char *err_msg, uint16_t err_msg_size) | ||
763 | { | 808 | { |
764 | struct GNUNET_MULTICAST_MembershipTestHandle *mth = cls; | 809 | struct GNUNET_MULTICAST_MembershipTestHandle *mth = cls; |
765 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 810 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
766 | "%p GNUNET_PSYCSTORE_membership_test() returned %" PRId64 " (%s)\n", | 811 | "%p GNUNET_PSYCSTORE_membership_test() returned %" PRId64 " (%.*s)\n", |
767 | mth, result, err_msg); | 812 | mth, result, err_msg_size, err_msg); |
768 | 813 | ||
769 | GNUNET_MULTICAST_membership_test_result (mth, result); | 814 | GNUNET_MULTICAST_membership_test_result (mth, result); |
770 | } | 815 | } |
@@ -805,12 +850,13 @@ store_recv_fragment_replay (void *cls, | |||
805 | * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay. | 850 | * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay. |
806 | */ | 851 | */ |
807 | static void | 852 | static void |
808 | store_recv_fragment_replay_result (void *cls, int64_t result, const char *err_msg) | 853 | store_recv_fragment_replay_result (void *cls, int64_t result, |
854 | const char *err_msg, uint16_t err_msg_size) | ||
809 | { | 855 | { |
810 | struct GNUNET_MULTICAST_ReplayHandle *rh = cls; | 856 | struct GNUNET_MULTICAST_ReplayHandle *rh = cls; |
811 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 857 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
812 | "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%s)\n", | 858 | "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n", |
813 | rh, result, err_msg); | 859 | rh, result, err_msg_size, err_msg); |
814 | 860 | ||
815 | switch (result) | 861 | switch (result) |
816 | { | 862 | { |
@@ -867,7 +913,7 @@ mcast_recv_replay_message (void *cls, | |||
867 | { | 913 | { |
868 | struct Channel *chn = cls; | 914 | struct Channel *chn = cls; |
869 | GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key, | 915 | GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key, |
870 | message_id, message_id, | 916 | message_id, message_id, NULL, |
871 | &store_recv_fragment_replay, | 917 | &store_recv_fragment_replay, |
872 | &store_recv_fragment_replay_result, rh); | 918 | &store_recv_fragment_replay_result, rh); |
873 | } | 919 | } |
@@ -911,6 +957,42 @@ hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n) | |||
911 | 957 | ||
912 | 958 | ||
913 | /** | 959 | /** |
960 | * Initialize PSYC message header. | ||
961 | */ | ||
962 | static inline void | ||
963 | psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg, | ||
964 | const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags) | ||
965 | { | ||
966 | uint16_t size = ntohs (mmsg->header.size); | ||
967 | uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg); | ||
968 | |||
969 | pmsg->header.size = htons (psize); | ||
970 | pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | ||
971 | pmsg->message_id = mmsg->message_id; | ||
972 | pmsg->fragment_offset = mmsg->fragment_offset; | ||
973 | pmsg->flags = htonl (flags); | ||
974 | |||
975 | memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); | ||
976 | } | ||
977 | |||
978 | |||
979 | /** | ||
980 | * Create a new PSYC message from a multicast message for sending it to clients. | ||
981 | */ | ||
982 | static inline struct GNUNET_PSYC_MessageHeader * | ||
983 | psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags) | ||
984 | { | ||
985 | struct GNUNET_PSYC_MessageHeader *pmsg; | ||
986 | uint16_t size = ntohs (mmsg->header.size); | ||
987 | uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg); | ||
988 | |||
989 | pmsg = GNUNET_malloc (psize); | ||
990 | psyc_msg_init (pmsg, mmsg, flags); | ||
991 | return pmsg; | ||
992 | } | ||
993 | |||
994 | |||
995 | /** | ||
914 | * Send multicast message to all clients connected to the channel. | 996 | * Send multicast message to all clients connected to the channel. |
915 | */ | 997 | */ |
916 | static void | 998 | static void |
@@ -918,24 +1000,13 @@ client_send_mcast_msg (struct Channel *chn, | |||
918 | const struct GNUNET_MULTICAST_MessageHeader *mmsg, | 1000 | const struct GNUNET_MULTICAST_MessageHeader *mmsg, |
919 | uint32_t flags) | 1001 | uint32_t flags) |
920 | { | 1002 | { |
921 | struct GNUNET_PSYC_MessageHeader *pmsg; | ||
922 | uint16_t size = ntohs (mmsg->header.size); | ||
923 | uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg); | ||
924 | |||
925 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1003 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
926 | "%p Sending multicast message to client. " | 1004 | "%p Sending multicast message to client. " |
927 | "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n", | 1005 | "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n", |
928 | chn, GNUNET_ntohll (mmsg->fragment_id), | 1006 | chn, GNUNET_ntohll (mmsg->fragment_id), |
929 | GNUNET_ntohll (mmsg->message_id)); | 1007 | GNUNET_ntohll (mmsg->message_id)); |
930 | 1008 | ||
931 | pmsg = GNUNET_malloc (psize); | 1009 | struct GNUNET_PSYC_MessageHeader *pmsg = psyc_msg_new (mmsg, flags); |
932 | pmsg->header.size = htons (psize); | ||
933 | pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | ||
934 | pmsg->message_id = mmsg->message_id; | ||
935 | pmsg->fragment_offset = mmsg->fragment_offset; | ||
936 | pmsg->flags = htonl (flags); | ||
937 | |||
938 | memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); | ||
939 | client_send_msg (chn, &pmsg->header); | 1010 | client_send_msg (chn, &pmsg->header); |
940 | GNUNET_free (pmsg); | 1011 | GNUNET_free (pmsg); |
941 | } | 1012 | } |
@@ -1327,12 +1398,13 @@ message_queue_drop (struct Channel *chn) | |||
1327 | * Received result of GNUNET_PSYCSTORE_fragment_store(). | 1398 | * Received result of GNUNET_PSYCSTORE_fragment_store(). |
1328 | */ | 1399 | */ |
1329 | static void | 1400 | static void |
1330 | store_recv_fragment_store_result (void *cls, int64_t result, const char *err_msg) | 1401 | store_recv_fragment_store_result (void *cls, int64_t result, |
1402 | const char *err_msg, uint16_t err_msg_size) | ||
1331 | { | 1403 | { |
1332 | struct Channel *chn = cls; | 1404 | struct Channel *chn = cls; |
1333 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1405 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1334 | "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%s)\n", | 1406 | "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n", |
1335 | chn, result, err_msg); | 1407 | chn, result, err_msg_size, err_msg); |
1336 | } | 1408 | } |
1337 | 1409 | ||
1338 | 1410 | ||
@@ -1430,7 +1502,7 @@ store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id, | |||
1430 | struct GNUNET_PSYC_CountersResultMessage res; | 1502 | struct GNUNET_PSYC_CountersResultMessage res; |
1431 | res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); | 1503 | res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); |
1432 | res.header.size = htons (sizeof (res)); | 1504 | res.header.size = htons (sizeof (res)); |
1433 | res.result_code = htonl (result - INT32_MIN); | 1505 | res.result_code = GNUNET_htonl_signed (result); |
1434 | res.max_message_id = GNUNET_htonll (max_message_id); | 1506 | res.max_message_id = GNUNET_htonll (max_message_id); |
1435 | 1507 | ||
1436 | if (GNUNET_OK == result || GNUNET_NO == result) | 1508 | if (GNUNET_OK == result || GNUNET_NO == result) |
@@ -1476,7 +1548,7 @@ store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id, | |||
1476 | struct GNUNET_PSYC_CountersResultMessage res; | 1548 | struct GNUNET_PSYC_CountersResultMessage res; |
1477 | res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); | 1549 | res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); |
1478 | res.header.size = htons (sizeof (res)); | 1550 | res.header.size = htons (sizeof (res)); |
1479 | res.result_code = htonl (result - INT32_MIN); | 1551 | res.result_code = GNUNET_htonl_signed (result); |
1480 | res.max_message_id = GNUNET_htonll (max_message_id); | 1552 | res.max_message_id = GNUNET_htonll (max_message_id); |
1481 | 1553 | ||
1482 | if (GNUNET_OK == result || GNUNET_NO == result) | 1554 | if (GNUNET_OK == result || GNUNET_NO == result) |
@@ -1566,7 +1638,7 @@ client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client, | |||
1566 | struct GNUNET_PSYC_CountersResultMessage res; | 1638 | struct GNUNET_PSYC_CountersResultMessage res; |
1567 | res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); | 1639 | res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); |
1568 | res.header.size = htons (sizeof (res)); | 1640 | res.header.size = htons (sizeof (res)); |
1569 | res.result_code = htonl ((uint32_t) GNUNET_OK + INT32_MIN); | 1641 | res.result_code = GNUNET_htonl_signed (GNUNET_OK); |
1570 | res.max_message_id = GNUNET_htonll (mst->max_message_id); | 1642 | res.max_message_id = GNUNET_htonll (mst->max_message_id); |
1571 | 1643 | ||
1572 | GNUNET_SERVER_notification_context_add (nc, client); | 1644 | GNUNET_SERVER_notification_context_add (nc, client); |
@@ -1578,7 +1650,7 @@ client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client, | |||
1578 | "%p Client connected as master to channel %s.\n", | 1650 | "%p Client connected as master to channel %s.\n", |
1579 | mst, GNUNET_h2s (&chn->pub_key_hash)); | 1651 | mst, GNUNET_h2s (&chn->pub_key_hash)); |
1580 | 1652 | ||
1581 | struct ClientListItem *cli = GNUNET_new (struct ClientListItem); | 1653 | struct Client *cli = GNUNET_new (struct Client); |
1582 | cli->client = client; | 1654 | cli->client = client; |
1583 | GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli); | 1655 | GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli); |
1584 | 1656 | ||
@@ -1677,7 +1749,7 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client, | |||
1677 | struct GNUNET_PSYC_CountersResultMessage res; | 1749 | struct GNUNET_PSYC_CountersResultMessage res; |
1678 | res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); | 1750 | res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); |
1679 | res.header.size = htons (sizeof (res)); | 1751 | res.header.size = htons (sizeof (res)); |
1680 | res.result_code = htonl ((uint32_t) GNUNET_OK - INT32_MIN); | 1752 | res.result_code = GNUNET_htonl_signed (GNUNET_OK); |
1681 | res.max_message_id = GNUNET_htonll (chn->max_message_id); | 1753 | res.max_message_id = GNUNET_htonll (chn->max_message_id); |
1682 | 1754 | ||
1683 | GNUNET_SERVER_notification_context_add (nc, client); | 1755 | GNUNET_SERVER_notification_context_add (nc, client); |
@@ -1716,7 +1788,7 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client, | |||
1716 | "%p Client connected as slave to channel %s.\n", | 1788 | "%p Client connected as slave to channel %s.\n", |
1717 | slv, GNUNET_h2s (&chn->pub_key_hash)); | 1789 | slv, GNUNET_h2s (&chn->pub_key_hash)); |
1718 | 1790 | ||
1719 | struct ClientListItem *cli = GNUNET_new (struct ClientListItem); | 1791 | struct Client *cli = GNUNET_new (struct Client); |
1720 | cli->client = client; | 1792 | cli->client = client; |
1721 | GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli); | 1793 | GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli); |
1722 | 1794 | ||
@@ -2119,14 +2191,15 @@ struct MembershipStoreClosure | |||
2119 | * Received result of GNUNET_PSYCSTORE_membership_store() | 2191 | * Received result of GNUNET_PSYCSTORE_membership_store() |
2120 | */ | 2192 | */ |
2121 | static void | 2193 | static void |
2122 | store_recv_membership_store_result (void *cls, int64_t result, const char *err_msg) | 2194 | store_recv_membership_store_result (void *cls, int64_t result, |
2195 | const char *err_msg, uint16_t err_msg_size) | ||
2123 | { | 2196 | { |
2124 | struct MembershipStoreClosure *mcls = cls; | 2197 | struct MembershipStoreClosure *mcls = cls; |
2125 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 2198 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
2126 | "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%s)\n", | 2199 | "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.s)\n", |
2127 | mcls->chn, result, err_msg); | 2200 | mcls->chn, result, err_msg_size, err_msg); |
2128 | 2201 | ||
2129 | client_send_result (mcls->client, mcls->op_id, result, err_msg); | 2202 | client_send_result (mcls->client, mcls->op_id, result, err_msg, err_msg_size); |
2130 | } | 2203 | } |
2131 | 2204 | ||
2132 | 2205 | ||
@@ -2165,36 +2238,73 @@ client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client, | |||
2165 | } | 2238 | } |
2166 | 2239 | ||
2167 | 2240 | ||
2241 | /** | ||
2242 | * Received a fragment for GNUNET_PSYCSTORE_fragment_get(), | ||
2243 | * in response to a history request from a client. | ||
2244 | */ | ||
2168 | static int | 2245 | static int |
2169 | store_recv_fragment_history (void *cls, | 2246 | store_recv_fragment_history (void *cls, |
2170 | struct GNUNET_MULTICAST_MessageHeader *msg, | 2247 | struct GNUNET_MULTICAST_MessageHeader *mmsg, |
2171 | enum GNUNET_PSYCSTORE_MessageFlags flags) | 2248 | enum GNUNET_PSYCSTORE_MessageFlags flags) |
2172 | { | 2249 | { |
2173 | struct OperationClosure *opcls = cls; | 2250 | struct Operation *op = cls; |
2174 | struct Channel *chn = opcls->chn; | 2251 | if (NULL == op->client) |
2175 | client_send_mcast_msg (chn, msg, GNUNET_PSYC_MESSAGE_HISTORIC); | 2252 | { /* Requesting client already disconnected. */ |
2253 | return GNUNET_NO; | ||
2254 | } | ||
2255 | struct Channel *chn = op->chn; | ||
2256 | |||
2257 | struct GNUNET_PSYC_MessageHeader *pmsg; | ||
2258 | uint16_t msize = ntohs (mmsg->header.size); | ||
2259 | uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg); | ||
2260 | |||
2261 | struct GNUNET_OperationResultMessage * | ||
2262 | res = GNUNET_malloc (sizeof (*res) + psize); | ||
2263 | res->header.size = htons (sizeof (*res) + psize); | ||
2264 | res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT); | ||
2265 | res->op_id = op->op_id; | ||
2266 | res->result_code = GNUNET_htonll_signed (GNUNET_OK); | ||
2267 | |||
2268 | pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1]; | ||
2269 | psyc_msg_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC); | ||
2270 | memcpy (&res[1], pmsg, psize); | ||
2271 | |||
2272 | /** @todo FIXME: send only to requesting client */ | ||
2273 | client_send_msg (chn, &res->header); | ||
2176 | return GNUNET_YES; | 2274 | return GNUNET_YES; |
2177 | } | 2275 | } |
2178 | 2276 | ||
2179 | 2277 | ||
2180 | /** | 2278 | /** |
2181 | * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay. | 2279 | * Received the result of GNUNET_PSYCSTORE_fragment_get(), |
2280 | * in response to a history request from a client. | ||
2182 | */ | 2281 | */ |
2183 | static void | 2282 | static void |
2184 | store_recv_fragment_history_result (void *cls, int64_t result, const char *err_msg) | 2283 | store_recv_fragment_history_result (void *cls, int64_t result, |
2284 | const char *err_msg, uint16_t err_msg_size) | ||
2185 | { | 2285 | { |
2186 | struct OperationClosure *opcls = cls; | 2286 | struct Operation *op = cls; |
2287 | if (NULL == op->client) | ||
2288 | { /* Requesting client already disconnected. */ | ||
2289 | return; | ||
2290 | } | ||
2291 | |||
2187 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 2292 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
2188 | "%p History replay #%" PRIu64 ": " | 2293 | "%p History replay #%" PRIu64 ": " |
2189 | "PSYCSTORE returned %" PRId64 " (%s)\n", | 2294 | "PSYCSTORE returned %" PRId64 " (%.*s)\n", |
2190 | opcls->chn, opcls->op_id, result, err_msg); | 2295 | op->chn, op->op_id, result, err_msg_size, err_msg); |
2296 | |||
2297 | if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE) | ||
2298 | { | ||
2299 | /** @todo Multicast replay request for messages not found locally. */ | ||
2300 | } | ||
2191 | 2301 | ||
2192 | client_send_result (opcls->client, opcls->op_id, result, err_msg); | 2302 | client_send_result (op->client, op->op_id, result, err_msg, err_msg_size); |
2193 | } | 2303 | } |
2194 | 2304 | ||
2195 | 2305 | ||
2196 | /** | 2306 | /** |
2197 | * Client requests channel history from PSYCstore. | 2307 | * Client requests channel history. |
2198 | */ | 2308 | */ |
2199 | static void | 2309 | static void |
2200 | client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client, | 2310 | client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client, |
@@ -2204,26 +2314,39 @@ client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client, | |||
2204 | chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); | 2314 | chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); |
2205 | GNUNET_assert (NULL != chn); | 2315 | GNUNET_assert (NULL != chn); |
2206 | 2316 | ||
2207 | const struct HistoryRequest * | 2317 | const struct GNUNET_PSYC_HistoryRequestMessage * |
2208 | req = (const struct HistoryRequest *) msg; | 2318 | req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg; |
2319 | uint16_t size = ntohs (msg->size); | ||
2320 | const char *method_prefix = (const char *) &req[1]; | ||
2209 | 2321 | ||
2210 | struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls)); | 2322 | if (size < sizeof (*req) + 1 |
2211 | opcls->client = client; | 2323 | || '\0' != method_prefix[size - sizeof (*req) - 1]) |
2212 | opcls->chn = chn; | 2324 | { |
2213 | opcls->op_id = req->op_id; | 2325 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
2326 | "%p History replay #%" PRIu64 ": " | ||
2327 | "invalid method prefix. size: %u < %u?\n", | ||
2328 | chn, GNUNET_ntohll (req->op_id), size, sizeof (*req) + 1); | ||
2329 | GNUNET_break (0); | ||
2330 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); | ||
2331 | return; | ||
2332 | } | ||
2333 | |||
2334 | struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags)); | ||
2214 | 2335 | ||
2215 | if (0 == req->message_limit) | 2336 | if (0 == req->message_limit) |
2216 | GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL, | 2337 | GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL, |
2217 | GNUNET_ntohll (req->start_message_id), | 2338 | GNUNET_ntohll (req->start_message_id), |
2218 | GNUNET_ntohll (req->end_message_id), | 2339 | GNUNET_ntohll (req->end_message_id), |
2340 | method_prefix, | ||
2219 | &store_recv_fragment_history, | 2341 | &store_recv_fragment_history, |
2220 | &store_recv_fragment_history_result, opcls); | 2342 | &store_recv_fragment_history_result, op); |
2221 | else | 2343 | else |
2222 | GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL, | 2344 | GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL, |
2223 | GNUNET_ntohll (req->message_limit), | 2345 | GNUNET_ntohll (req->message_limit), |
2346 | method_prefix, | ||
2224 | &store_recv_fragment_history, | 2347 | &store_recv_fragment_history, |
2225 | &store_recv_fragment_history_result, | 2348 | &store_recv_fragment_history_result, |
2226 | opcls); | 2349 | op); |
2227 | 2350 | ||
2228 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 2351 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
2229 | } | 2352 | } |
@@ -2236,19 +2359,19 @@ static int | |||
2236 | store_recv_state_var (void *cls, const char *name, | 2359 | store_recv_state_var (void *cls, const char *name, |
2237 | const void *value, size_t value_size) | 2360 | const void *value, size_t value_size) |
2238 | { | 2361 | { |
2239 | struct OperationClosure *opcls = cls; | 2362 | struct Operation *op = cls; |
2240 | struct OperationResult *op; | 2363 | struct GNUNET_OperationResultMessage *res; |
2241 | 2364 | ||
2242 | if (NULL != name) | 2365 | if (NULL != name) |
2243 | { | 2366 | { |
2244 | uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1; | 2367 | uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1; |
2245 | struct GNUNET_PSYC_MessageModifier *mod; | 2368 | struct GNUNET_PSYC_MessageModifier *mod; |
2246 | op = GNUNET_malloc (sizeof (*op) + sizeof (*mod) + name_size + value_size); | 2369 | res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + name_size + value_size); |
2247 | op->header.size = htons (sizeof (*op) + sizeof (*mod) + name_size + value_size); | 2370 | res->header.size = htons (sizeof (*res) + sizeof (*mod) + name_size + value_size); |
2248 | op->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT); | 2371 | res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT); |
2249 | op->op_id = opcls->op_id; | 2372 | res->op_id = op->op_id; |
2250 | 2373 | ||
2251 | mod = (struct GNUNET_PSYC_MessageModifier *) &op[1]; | 2374 | mod = (struct GNUNET_PSYC_MessageModifier *) &res[1]; |
2252 | mod->header.size = htons (sizeof (*mod) + name_size + value_size); | 2375 | mod->header.size = htons (sizeof (*mod) + name_size + value_size); |
2253 | mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); | 2376 | mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); |
2254 | mod->name_size = htons (name_size); | 2377 | mod->name_size = htons (name_size); |
@@ -2260,19 +2383,20 @@ store_recv_state_var (void *cls, const char *name, | |||
2260 | else | 2383 | else |
2261 | { | 2384 | { |
2262 | struct GNUNET_MessageHeader *mod; | 2385 | struct GNUNET_MessageHeader *mod; |
2263 | op = GNUNET_malloc (sizeof (*op) + sizeof (*mod) + value_size); | 2386 | res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + value_size); |
2264 | op->header.size = htons (sizeof (*op) + sizeof (*mod) + value_size); | 2387 | res->header.size = htons (sizeof (*res) + sizeof (*mod) + value_size); |
2265 | op->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT); | 2388 | res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT); |
2266 | op->op_id = opcls->op_id; | 2389 | res->op_id = op->op_id; |
2267 | 2390 | ||
2268 | mod = (struct GNUNET_MessageHeader *) &op[1]; | 2391 | mod = (struct GNUNET_MessageHeader *) &res[1]; |
2269 | mod->size = htons (sizeof (*mod) + value_size); | 2392 | mod->size = htons (sizeof (*mod) + value_size); |
2270 | mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); | 2393 | mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); |
2271 | memcpy (&mod[1], value, value_size); | 2394 | memcpy (&mod[1], value, value_size); |
2272 | } | 2395 | } |
2273 | 2396 | ||
2274 | GNUNET_SERVER_notification_context_add (nc, opcls->client); | 2397 | // FIXME: client might have been disconnected |
2275 | GNUNET_SERVER_notification_context_unicast (nc, opcls->client, &op->header, | 2398 | GNUNET_SERVER_notification_context_add (nc, op->client); |
2399 | GNUNET_SERVER_notification_context_unicast (nc, op->client, &res->header, | ||
2276 | GNUNET_NO); | 2400 | GNUNET_NO); |
2277 | GNUNET_free (op); | 2401 | GNUNET_free (op); |
2278 | return GNUNET_YES; | 2402 | return GNUNET_YES; |
@@ -2284,15 +2408,17 @@ store_recv_state_var (void *cls, const char *name, | |||
2284 | * or GNUNET_PSYCSTORE_state_get_prefix() | 2408 | * or GNUNET_PSYCSTORE_state_get_prefix() |
2285 | */ | 2409 | */ |
2286 | static void | 2410 | static void |
2287 | store_recv_state_result (void *cls, int64_t result, const char *err_msg) | 2411 | store_recv_state_result (void *cls, int64_t result, |
2412 | const char *err_msg, uint16_t err_msg_size) | ||
2288 | { | 2413 | { |
2289 | struct OperationClosure *opcls = cls; | 2414 | struct Operation *op = cls; |
2290 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 2415 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
2291 | "%p History replay #%" PRIu64 ": " | 2416 | "%p History replay #%" PRIu64 ": " |
2292 | "PSYCSTORE returned %" PRId64 " (%s)\n", | 2417 | "PSYCSTORE returned %" PRId64 " (%.*s)\n", |
2293 | opcls->chn, opcls->op_id, result, err_msg); | 2418 | op->chn, op->op_id, result, err_msg_size, err_msg); |
2294 | 2419 | ||
2295 | client_send_result (opcls->client, opcls->op_id, result, err_msg); | 2420 | // FIXME: client might have been disconnected |
2421 | client_send_result (op->client, op->op_id, result, err_msg, err_msg_size); | ||
2296 | } | 2422 | } |
2297 | 2423 | ||
2298 | 2424 | ||
@@ -2314,18 +2440,15 @@ client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client, | |||
2314 | const char *name = (const char *) &req[1]; | 2440 | const char *name = (const char *) &req[1]; |
2315 | if (0 == name_size || '\0' != name[name_size - 1]) | 2441 | if (0 == name_size || '\0' != name[name_size - 1]) |
2316 | { | 2442 | { |
2443 | GNUNET_break (0); | ||
2317 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); | 2444 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); |
2318 | return; | 2445 | return; |
2319 | } | 2446 | } |
2320 | 2447 | ||
2321 | struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls)); | 2448 | struct Operation *op = op_add (chn, client, req->op_id, 0); |
2322 | opcls->client = client; | ||
2323 | opcls->chn = chn; | ||
2324 | opcls->op_id = req->op_id; | ||
2325 | |||
2326 | GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name, | 2449 | GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name, |
2327 | &store_recv_state_var, | 2450 | &store_recv_state_var, |
2328 | &store_recv_state_result, opcls); | 2451 | &store_recv_state_result, op); |
2329 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 2452 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
2330 | } | 2453 | } |
2331 | 2454 | ||
@@ -2348,20 +2471,16 @@ client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client, | |||
2348 | const char *name = (const char *) &req[1]; | 2471 | const char *name = (const char *) &req[1]; |
2349 | if (0 == name_size || '\0' != name[name_size - 1]) | 2472 | if (0 == name_size || '\0' != name[name_size - 1]) |
2350 | { | 2473 | { |
2474 | GNUNET_break (0); | ||
2351 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); | 2475 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); |
2352 | return; | 2476 | return; |
2353 | } | 2477 | } |
2354 | 2478 | ||
2355 | struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls)); | 2479 | struct Operation *op = op_add (chn, client, req->op_id, 0); |
2356 | opcls->client = client; | ||
2357 | opcls->chn = chn; | ||
2358 | opcls->op_id = req->op_id; | ||
2359 | |||
2360 | GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name, | 2480 | GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name, |
2361 | &store_recv_state_var, | 2481 | &store_recv_state_var, |
2362 | &store_recv_state_result, opcls); | 2482 | &store_recv_state_result, op); |
2363 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 2483 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
2364 | |||
2365 | } | 2484 | } |
2366 | 2485 | ||
2367 | 2486 | ||
diff --git a/src/psyc/psyc.h b/src/psyc/psyc.h index 4bc92532f..e85e14c0e 100644 --- a/src/psyc/psyc.h +++ b/src/psyc/psyc.h | |||
@@ -171,42 +171,6 @@ struct StateRequest | |||
171 | /**** service -> library ****/ | 171 | /**** service -> library ****/ |
172 | 172 | ||
173 | 173 | ||
174 | /** | ||
175 | * Answer from service to client about last operation. | ||
176 | */ | ||
177 | struct OperationResult | ||
178 | { | ||
179 | /** | ||
180 | * Types: | ||
181 | * - GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE | ||
182 | * - GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_STORY_RESULT | ||
183 | * - GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_STATE_RESULT | ||
184 | */ | ||
185 | struct GNUNET_MessageHeader header; | ||
186 | |||
187 | uint32_t reserved GNUNET_PACKED; | ||
188 | |||
189 | /** | ||
190 | * Operation ID. | ||
191 | */ | ||
192 | uint64_t op_id GNUNET_PACKED; | ||
193 | |||
194 | /** | ||
195 | * Status code for the operation. | ||
196 | */ | ||
197 | uint64_t result_code GNUNET_PACKED; | ||
198 | |||
199 | /* Followed by: | ||
200 | * - on error: NUL-terminated error message | ||
201 | * - on success: one of the following message types | ||
202 | * | ||
203 | * For a STATE_RESULT, one of: | ||
204 | * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER | ||
205 | * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT | ||
206 | * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END | ||
207 | */ | ||
208 | }; | ||
209 | |||
210 | GNUNET_NETWORK_STRUCT_END | 174 | GNUNET_NETWORK_STRUCT_END |
211 | 175 | ||
212 | #endif | 176 | #endif |
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c index ce994b272..7839aaf9e 100644 --- a/src/psyc/psyc_api.c +++ b/src/psyc/psyc_api.c | |||
@@ -43,33 +43,6 @@ | |||
43 | #define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__) | 43 | #define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__) |
44 | 44 | ||
45 | 45 | ||
46 | struct OperationListItem | ||
47 | { | ||
48 | struct OperationListItem *prev; | ||
49 | struct OperationListItem *next; | ||
50 | |||
51 | /** | ||
52 | * Operation ID. | ||
53 | */ | ||
54 | uint64_t op_id; | ||
55 | |||
56 | /** | ||
57 | * Continuation to invoke with the result of an operation. | ||
58 | */ | ||
59 | GNUNET_PSYC_ResultCallback result_cb; | ||
60 | |||
61 | /** | ||
62 | * State variable result callback. | ||
63 | */ | ||
64 | GNUNET_PSYC_StateVarCallback state_var_cb; | ||
65 | |||
66 | /** | ||
67 | * Closure for the callbacks. | ||
68 | */ | ||
69 | void *cls; | ||
70 | }; | ||
71 | |||
72 | |||
73 | /** | 46 | /** |
74 | * Handle to access PSYC channel operations for both the master and slaves. | 47 | * Handle to access PSYC channel operations for both the master and slaves. |
75 | */ | 48 | */ |
@@ -111,21 +84,6 @@ struct GNUNET_PSYC_Channel | |||
111 | void *disconnect_cls; | 84 | void *disconnect_cls; |
112 | 85 | ||
113 | /** | 86 | /** |
114 | * First operation in the linked list. | ||
115 | */ | ||
116 | struct OperationListItem *op_head; | ||
117 | |||
118 | /** | ||
119 | * Last operation in the linked list. | ||
120 | */ | ||
121 | struct OperationListItem *op_tail; | ||
122 | |||
123 | /** | ||
124 | * Last operation ID used. | ||
125 | */ | ||
126 | uint64_t last_op_id; | ||
127 | |||
128 | /** | ||
129 | * Are we polling for incoming messages right now? | 87 | * Are we polling for incoming messages right now? |
130 | */ | 88 | */ |
131 | uint8_t in_receive; | 89 | uint8_t in_receive; |
@@ -204,83 +162,62 @@ struct GNUNET_PSYC_SlaveTransmitHandle | |||
204 | }; | 162 | }; |
205 | 163 | ||
206 | 164 | ||
207 | /** | 165 | struct GNUNET_PSYC_HistoryRequest |
208 | * Get a fresh operation ID to distinguish between PSYCstore requests. | ||
209 | * | ||
210 | * @param h Handle to the PSYCstore service. | ||
211 | * @return next operation id to use | ||
212 | */ | ||
213 | static uint64_t | ||
214 | op_get_next_id (struct GNUNET_PSYC_Channel *chn) | ||
215 | { | ||
216 | return ++chn->last_op_id; | ||
217 | } | ||
218 | |||
219 | |||
220 | /** | ||
221 | * Find operation by ID. | ||
222 | * | ||
223 | * @return Operation, or NULL if none found. | ||
224 | */ | ||
225 | static struct OperationListItem * | ||
226 | op_find_by_id (struct GNUNET_PSYC_Channel *chn, uint64_t op_id) | ||
227 | { | 166 | { |
228 | struct OperationListItem *op = chn->op_head; | 167 | /** |
229 | while (NULL != op) | 168 | * Channel. |
230 | { | 169 | */ |
231 | if (op->op_id == op_id) | 170 | struct GNUNET_PSYC_Channel *chn; |
232 | return op; | ||
233 | op = op->next; | ||
234 | } | ||
235 | return NULL; | ||
236 | } | ||
237 | 171 | ||
172 | /** | ||
173 | * Operation ID. | ||
174 | */ | ||
175 | uint64_t op_id; | ||
238 | 176 | ||
239 | static uint64_t | 177 | /** |
240 | op_add (struct GNUNET_PSYC_Channel *chn, GNUNET_PSYC_ResultCallback result_cb, | 178 | * Message handler. |
241 | void *cls) | 179 | */ |
242 | { | 180 | struct GNUNET_PSYC_ReceiveHandle *recv; |
243 | if (NULL == result_cb) | ||
244 | return 0; | ||
245 | 181 | ||
246 | struct OperationListItem *op = GNUNET_malloc (sizeof (*op)); | 182 | /** |
247 | op->op_id = op_get_next_id (chn); | 183 | * Function to call when the operation finished. |
248 | op->result_cb = result_cb; | 184 | */ |
249 | op->cls = cls; | 185 | GNUNET_ResultCallback result_cb; |
250 | GNUNET_CONTAINER_DLL_insert_tail (chn->op_head, chn->op_tail, op); | ||
251 | 186 | ||
252 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 187 | /** |
253 | "%p Added operation #%" PRIu64 "\n", chn, op->op_id); | 188 | * Closure for @a result_cb. |
254 | return op->op_id; | 189 | */ |
255 | } | 190 | void *cls; |
191 | }; | ||
256 | 192 | ||
257 | 193 | ||
258 | static int | 194 | struct GNUNET_PSYC_StateRequest |
259 | op_result (struct GNUNET_PSYC_Channel *chn, uint64_t op_id, | ||
260 | int64_t result_code, const char *err_msg) | ||
261 | { | 195 | { |
262 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 196 | /** |
263 | "%p Received result for operation #%" PRIu64 ": %" PRId64 " (%s)\n", | 197 | * Channel. |
264 | chn, op_id, result_code, err_msg); | 198 | */ |
265 | if (0 == op_id) | 199 | struct GNUNET_PSYC_Channel *chn; |
266 | return GNUNET_NO; | ||
267 | 200 | ||
268 | struct OperationListItem *op = op_find_by_id (chn, op_id); | 201 | /** |
269 | if (NULL == op) | 202 | * Operation ID. |
270 | { | 203 | */ |
271 | LOG (GNUNET_ERROR_TYPE_WARNING, | 204 | uint64_t op_id; |
272 | "Could not find operation #%" PRIu64 "\n", op_id); | ||
273 | return GNUNET_NO; | ||
274 | } | ||
275 | 205 | ||
276 | GNUNET_CONTAINER_DLL_remove (chn->op_head, chn->op_tail, op); | 206 | /** |
207 | * State variable result callback. | ||
208 | */ | ||
209 | GNUNET_PSYC_StateVarCallback var_cb; | ||
277 | 210 | ||
278 | if (NULL != op->result_cb) | 211 | /** |
279 | op->result_cb (op->cls, result_code, err_msg); | 212 | * Function to call when the operation finished. |
213 | */ | ||
214 | GNUNET_ResultCallback result_cb; | ||
280 | 215 | ||
281 | GNUNET_free (op); | 216 | /** |
282 | return GNUNET_YES; | 217 | * Closure for @a result_cb. |
283 | } | 218 | */ |
219 | void *cls; | ||
220 | }; | ||
284 | 221 | ||
285 | 222 | ||
286 | static void | 223 | static void |
@@ -313,22 +250,97 @@ channel_recv_result (void *cls, | |||
313 | struct GNUNET_PSYC_Channel * | 250 | struct GNUNET_PSYC_Channel * |
314 | chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); | 251 | chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); |
315 | 252 | ||
253 | const struct GNUNET_OperationResultMessage * | ||
254 | res = (const struct GNUNET_OperationResultMessage *) msg; | ||
255 | |||
316 | uint16_t size = ntohs (msg->size); | 256 | uint16_t size = ntohs (msg->size); |
317 | const struct OperationResult *res = (const struct OperationResult *) msg; | 257 | if (size < sizeof (*res)) |
318 | const char *err_msg = NULL; | 258 | { /* Error, message too small. */ |
259 | GNUNET_break (0); | ||
260 | return; | ||
261 | } | ||
319 | 262 | ||
320 | if (sizeof (struct OperationResult) < size) | 263 | uint16_t data_size = size - sizeof (*res); |
321 | { | 264 | const char *data = (0 < data_size) ? (void *) &res[1] : NULL; |
322 | err_msg = (const char *) &res[1]; | 265 | GNUNET_CLIENT_MANAGER_op_result (chn->client, GNUNET_ntohll (res->op_id), |
323 | if ('\0' != err_msg[size - sizeof (struct OperationResult) - 1]) | 266 | GNUNET_ntohll_signed (res->result_code), |
324 | { | 267 | data, data_size); |
325 | GNUNET_break (0); | 268 | } |
326 | err_msg = NULL; | 269 | |
327 | } | 270 | |
271 | static void | ||
272 | op_recv_history_result (void *cls, int64_t result, | ||
273 | const void *data, uint16_t data_size) | ||
274 | { | ||
275 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
276 | "Received history replay result: %" PRId64 ".\n", result); | ||
277 | |||
278 | struct GNUNET_PSYC_HistoryRequest *hist = cls; | ||
279 | |||
280 | if (NULL != hist->result_cb) | ||
281 | hist->result_cb (hist->cls, result, data, data_size); | ||
282 | |||
283 | GNUNET_PSYC_receive_destroy (hist->recv); | ||
284 | GNUNET_free (hist); | ||
285 | } | ||
286 | |||
287 | |||
288 | static void | ||
289 | op_recv_state_result (void *cls, int64_t result, | ||
290 | const void *data, uint16_t data_size) | ||
291 | { | ||
292 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
293 | "Received state request result: %" PRId64 ".\n", result); | ||
294 | |||
295 | struct GNUNET_PSYC_StateRequest *sr = cls; | ||
296 | |||
297 | if (NULL != sr->result_cb) | ||
298 | sr->result_cb (sr->cls, result, data, data_size); | ||
299 | |||
300 | GNUNET_free (sr); | ||
301 | } | ||
302 | |||
303 | |||
304 | static void | ||
305 | channel_recv_history_result (void *cls, | ||
306 | struct GNUNET_CLIENT_MANAGER_Connection *client, | ||
307 | const struct GNUNET_MessageHeader *msg) | ||
308 | { | ||
309 | struct GNUNET_PSYC_Channel * | ||
310 | chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); | ||
311 | |||
312 | const struct GNUNET_OperationResultMessage * | ||
313 | res = (const struct GNUNET_OperationResultMessage *) msg; | ||
314 | struct GNUNET_PSYC_MessageHeader * | ||
315 | pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1]; | ||
316 | |||
317 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
318 | "%p Received historic fragment for message #%" PRIu64 ".\n", | ||
319 | chn, GNUNET_ntohll (pmsg->message_id)); | ||
320 | |||
321 | GNUNET_ResultCallback result_cb = NULL; | ||
322 | struct GNUNET_PSYC_HistoryRequest *hist = NULL; | ||
323 | |||
324 | if (GNUNET_YES != GNUNET_CLIENT_MANAGER_op_find (chn->client, | ||
325 | GNUNET_ntohll (res->op_id), | ||
326 | &result_cb, (void *) &hist)) | ||
327 | { /* Operation not found. */ | ||
328 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
329 | "%p Replay operation not found for historic fragment of message #%" | ||
330 | PRIu64 ".\n", | ||
331 | chn, GNUNET_ntohll (pmsg->message_id)); | ||
332 | return; | ||
328 | } | 333 | } |
329 | 334 | ||
330 | op_result (chn, GNUNET_ntohll (res->op_id), | 335 | uint16_t size = ntohs (msg->size); |
331 | GNUNET_ntohll (res->result_code) + INT64_MIN, err_msg); | 336 | if (size < sizeof (*res) + sizeof (*pmsg)) |
337 | { /* Error, message too small. */ | ||
338 | GNUNET_break (0); | ||
339 | return; | ||
340 | } | ||
341 | |||
342 | GNUNET_PSYC_receive_message (hist->recv, | ||
343 | (const struct GNUNET_PSYC_MessageHeader *) pmsg); | ||
332 | } | 344 | } |
333 | 345 | ||
334 | 346 | ||
@@ -340,12 +352,21 @@ channel_recv_state_result (void *cls, | |||
340 | struct GNUNET_PSYC_Channel * | 352 | struct GNUNET_PSYC_Channel * |
341 | chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); | 353 | chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); |
342 | 354 | ||
343 | const struct OperationResult *res = (const struct OperationResult *) msg; | 355 | const struct GNUNET_OperationResultMessage * |
344 | struct OperationListItem *op = op_find_by_id (chn, GNUNET_ntohll (res->op_id)); | 356 | res = (const struct GNUNET_OperationResultMessage *) msg; |
345 | if (NULL == op || NULL == op->state_var_cb) | 357 | |
358 | GNUNET_ResultCallback result_cb = NULL; | ||
359 | struct GNUNET_PSYC_StateRequest *sr = NULL; | ||
360 | |||
361 | if (GNUNET_YES != GNUNET_CLIENT_MANAGER_op_find (chn->client, | ||
362 | GNUNET_ntohll (res->op_id), | ||
363 | &result_cb, (void *) &sr)) | ||
364 | { /* Operation not found. */ | ||
346 | return; | 365 | return; |
366 | } | ||
347 | 367 | ||
348 | const struct GNUNET_MessageHeader *modc = (struct GNUNET_MessageHeader *) &op[1]; | 368 | const struct GNUNET_MessageHeader * |
369 | modc = (struct GNUNET_MessageHeader *) &res[1]; | ||
349 | uint16_t modc_size = ntohs (modc->size); | 370 | uint16_t modc_size = ntohs (modc->size); |
350 | if (ntohs (msg->size) - sizeof (*msg) != modc_size) | 371 | if (ntohs (msg->size) - sizeof (*msg) != modc_size) |
351 | { | 372 | { |
@@ -366,13 +387,13 @@ channel_recv_state_result (void *cls, | |||
366 | GNUNET_break (0); | 387 | GNUNET_break (0); |
367 | return; | 388 | return; |
368 | } | 389 | } |
369 | op->state_var_cb (op->cls, name, name + name_size, ntohs (mod->value_size)); | 390 | sr->var_cb (sr->cls, name, name + name_size, ntohs (mod->value_size)); |
370 | break; | 391 | break; |
371 | } | 392 | } |
372 | 393 | ||
373 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: | 394 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: |
374 | op->state_var_cb (op->cls, NULL, (const char *) &modc[1], | 395 | sr->var_cb (sr->cls, NULL, (const char *) &modc[1], |
375 | modc_size - sizeof (*modc)); | 396 | modc_size - sizeof (*modc)); |
376 | break; | 397 | break; |
377 | } | 398 | } |
378 | } | 399 | } |
@@ -412,11 +433,12 @@ master_recv_start_ack (void *cls, | |||
412 | 433 | ||
413 | struct GNUNET_PSYC_CountersResultMessage * | 434 | struct GNUNET_PSYC_CountersResultMessage * |
414 | cres = (struct GNUNET_PSYC_CountersResultMessage *) msg; | 435 | cres = (struct GNUNET_PSYC_CountersResultMessage *) msg; |
415 | int32_t result = ntohl (cres->result_code) + INT32_MIN; | 436 | int32_t result = GNUNET_ntohl_signed (cres->result_code); |
416 | if (GNUNET_OK != result && GNUNET_NO != result) | 437 | if (GNUNET_OK != result && GNUNET_NO != result) |
417 | { | 438 | { |
418 | LOG (GNUNET_ERROR_TYPE_ERROR, "Could not start master.\n"); | 439 | LOG (GNUNET_ERROR_TYPE_ERROR, "Could not start master: %ld\n", result); |
419 | GNUNET_break (0); | 440 | GNUNET_break (0); |
441 | /* FIXME: disconnect */ | ||
420 | } | 442 | } |
421 | if (NULL != mst->start_cb) | 443 | if (NULL != mst->start_cb) |
422 | mst->start_cb (mst->cb_cls, result, GNUNET_ntohll (cres->max_message_id)); | 444 | mst->start_cb (mst->cb_cls, result, GNUNET_ntohll (cres->max_message_id)); |
@@ -464,11 +486,12 @@ slave_recv_join_ack (void *cls, | |||
464 | sizeof (struct GNUNET_PSYC_Channel)); | 486 | sizeof (struct GNUNET_PSYC_Channel)); |
465 | struct GNUNET_PSYC_CountersResultMessage * | 487 | struct GNUNET_PSYC_CountersResultMessage * |
466 | cres = (struct GNUNET_PSYC_CountersResultMessage *) msg; | 488 | cres = (struct GNUNET_PSYC_CountersResultMessage *) msg; |
467 | int32_t result = ntohl (cres->result_code) + INT32_MIN; | 489 | int32_t result = GNUNET_ntohl_signed (cres->result_code); |
468 | if (GNUNET_YES != result && GNUNET_NO != result) | 490 | if (GNUNET_YES != result && GNUNET_NO != result) |
469 | { | 491 | { |
470 | LOG (GNUNET_ERROR_TYPE_ERROR, "Could not join slave.\n"); | 492 | LOG (GNUNET_ERROR_TYPE_ERROR, "Could not join slave.\n"); |
471 | GNUNET_break (0); | 493 | GNUNET_break (0); |
494 | /* FIXME: disconnect */ | ||
472 | } | 495 | } |
473 | if (NULL != slv->connect_cb) | 496 | if (NULL != slv->connect_cb) |
474 | slv->connect_cb (slv->cb_cls, result, GNUNET_ntohll (cres->max_message_id)); | 497 | slv->connect_cb (slv->cb_cls, result, GNUNET_ntohll (cres->max_message_id)); |
@@ -513,13 +536,17 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler master_handlers[] = | |||
513 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST, | 536 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST, |
514 | sizeof (struct GNUNET_PSYC_JoinRequestMessage), GNUNET_YES }, | 537 | sizeof (struct GNUNET_PSYC_JoinRequestMessage), GNUNET_YES }, |
515 | 538 | ||
539 | { &channel_recv_history_result, NULL, | ||
540 | GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT, | ||
541 | sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, | ||
542 | |||
516 | { &channel_recv_state_result, NULL, | 543 | { &channel_recv_state_result, NULL, |
517 | GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT, | 544 | GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT, |
518 | sizeof (struct OperationResult), GNUNET_YES }, | 545 | sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, |
519 | 546 | ||
520 | { &channel_recv_result, NULL, | 547 | { &channel_recv_result, NULL, |
521 | GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE, | 548 | GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE, |
522 | sizeof (struct OperationResult), GNUNET_YES }, | 549 | sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, |
523 | 550 | ||
524 | { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, | 551 | { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, |
525 | 552 | ||
@@ -545,13 +572,17 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler slave_handlers[] = | |||
545 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, | 572 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, |
546 | sizeof (struct GNUNET_PSYC_JoinDecisionMessage), GNUNET_YES }, | 573 | sizeof (struct GNUNET_PSYC_JoinDecisionMessage), GNUNET_YES }, |
547 | 574 | ||
575 | { &channel_recv_history_result, NULL, | ||
576 | GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT, | ||
577 | sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, | ||
578 | |||
548 | { &channel_recv_state_result, NULL, | 579 | { &channel_recv_state_result, NULL, |
549 | GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT, | 580 | GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT, |
550 | sizeof (struct OperationResult), GNUNET_YES }, | 581 | sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, |
551 | 582 | ||
552 | { &channel_recv_result, NULL, | 583 | { &channel_recv_result, NULL, |
553 | GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE, | 584 | GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE, |
554 | sizeof (struct OperationResult), GNUNET_YES }, | 585 | sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, |
555 | 586 | ||
556 | { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, | 587 | { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, |
557 | 588 | ||
@@ -1011,17 +1042,28 @@ GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slv) | |||
1011 | * correctly; not doing so correctly will result in either denying other slaves | 1042 | * correctly; not doing so correctly will result in either denying other slaves |
1012 | * access or offering access to channel data to non-members. | 1043 | * access or offering access to channel data to non-members. |
1013 | * | 1044 | * |
1014 | * @param channel Channel handle. | 1045 | * @param chn |
1015 | * @param slave_key Identity of channel slave to add. | 1046 | * Channel handle. |
1016 | * @param announced_at ID of the message that announced the membership change. | 1047 | * @param slave_key |
1017 | * @param effective_since Addition of slave is in effect since this message ID. | 1048 | * Identity of channel slave to add. |
1049 | * @param announced_at | ||
1050 | * ID of the message that announced the membership change. | ||
1051 | * @param effective_since | ||
1052 | * Addition of slave is in effect since this message ID. | ||
1053 | * @param result_cb | ||
1054 | * Function to call with the result of the operation. | ||
1055 | * The @e result_code argument is #GNUNET_OK on success, or | ||
1056 | * #GNUNET_SYSERR on error. In case of an error, the @e data argument | ||
1057 | * can contain an optional error message. | ||
1058 | * @param cls | ||
1059 | * Closure for @a result_cb. | ||
1018 | */ | 1060 | */ |
1019 | void | 1061 | void |
1020 | GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn, | 1062 | GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn, |
1021 | const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, | 1063 | const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, |
1022 | uint64_t announced_at, | 1064 | uint64_t announced_at, |
1023 | uint64_t effective_since, | 1065 | uint64_t effective_since, |
1024 | GNUNET_PSYC_ResultCallback result_cb, | 1066 | GNUNET_ResultCallback result_cb, |
1025 | void *cls) | 1067 | void *cls) |
1026 | { | 1068 | { |
1027 | struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req)); | 1069 | struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req)); |
@@ -1031,7 +1073,8 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn, | |||
1031 | req->announced_at = GNUNET_htonll (announced_at); | 1073 | req->announced_at = GNUNET_htonll (announced_at); |
1032 | req->effective_since = GNUNET_htonll (effective_since); | 1074 | req->effective_since = GNUNET_htonll (effective_since); |
1033 | req->did_join = GNUNET_YES; | 1075 | req->did_join = GNUNET_YES; |
1034 | req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls)); | 1076 | req->op_id = GNUNET_htonll (GNUNET_CLIENT_MANAGER_op_add (chn->client, |
1077 | result_cb, cls)); | ||
1035 | 1078 | ||
1036 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); | 1079 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); |
1037 | } | 1080 | } |
@@ -1054,15 +1097,25 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn, | |||
1054 | * denying members access or offering access to channel data to | 1097 | * denying members access or offering access to channel data to |
1055 | * non-members. | 1098 | * non-members. |
1056 | * | 1099 | * |
1057 | * @param channel Channel handle. | 1100 | * @param chn |
1058 | * @param slave_key Identity of channel slave to remove. | 1101 | * Channel handle. |
1059 | * @param announced_at ID of the message that announced the membership change. | 1102 | * @param slave_key |
1103 | * Identity of channel slave to remove. | ||
1104 | * @param announced_at | ||
1105 | * ID of the message that announced the membership change. | ||
1106 | * @param result_cb | ||
1107 | * Function to call with the result of the operation. | ||
1108 | * The @e result_code argument is #GNUNET_OK on success, or | ||
1109 | * #GNUNET_SYSERR on error. In case of an error, the @e data argument | ||
1110 | * can contain an optional error message. | ||
1111 | * @param cls | ||
1112 | * Closure for @a result_cb. | ||
1060 | */ | 1113 | */ |
1061 | void | 1114 | void |
1062 | GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn, | 1115 | GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn, |
1063 | const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, | 1116 | const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, |
1064 | uint64_t announced_at, | 1117 | uint64_t announced_at, |
1065 | GNUNET_PSYC_ResultCallback result_cb, | 1118 | GNUNET_ResultCallback result_cb, |
1066 | void *cls) | 1119 | void *cls) |
1067 | { | 1120 | { |
1068 | struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req)); | 1121 | struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req)); |
@@ -1071,17 +1124,62 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn, | |||
1071 | req->slave_key = *slave_key; | 1124 | req->slave_key = *slave_key; |
1072 | req->announced_at = GNUNET_htonll (announced_at); | 1125 | req->announced_at = GNUNET_htonll (announced_at); |
1073 | req->did_join = GNUNET_NO; | 1126 | req->did_join = GNUNET_NO; |
1074 | req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls)); | 1127 | req->op_id = GNUNET_htonll (GNUNET_CLIENT_MANAGER_op_add (chn->client, |
1128 | result_cb, cls)); | ||
1129 | |||
1130 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); | ||
1131 | } | ||
1132 | |||
1133 | |||
1134 | static struct GNUNET_PSYC_HistoryRequest * | ||
1135 | channel_history_replay (struct GNUNET_PSYC_Channel *chn, | ||
1136 | uint64_t start_message_id, | ||
1137 | uint64_t end_message_id, | ||
1138 | uint64_t message_limit, | ||
1139 | const char *method_prefix, | ||
1140 | uint32_t flags, | ||
1141 | GNUNET_PSYC_MessageCallback message_cb, | ||
1142 | GNUNET_PSYC_MessagePartCallback message_part_cb, | ||
1143 | GNUNET_ResultCallback result_cb, | ||
1144 | void *cls) | ||
1145 | { | ||
1146 | struct GNUNET_PSYC_HistoryRequestMessage *req; | ||
1147 | struct GNUNET_PSYC_HistoryRequest *hist = GNUNET_malloc (sizeof (*hist)); | ||
1148 | hist->chn = chn; | ||
1149 | hist->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls); | ||
1150 | hist->result_cb = result_cb; | ||
1151 | hist->cls = cls; | ||
1152 | hist->op_id = GNUNET_CLIENT_MANAGER_op_add (chn->client, | ||
1153 | &op_recv_history_result, hist); | ||
1154 | |||
1155 | GNUNET_assert (NULL != method_prefix); | ||
1156 | uint16_t method_size = strnlen (method_prefix, | ||
1157 | GNUNET_SERVER_MAX_MESSAGE_SIZE | ||
1158 | - sizeof (*req)) + 1; | ||
1159 | GNUNET_assert ('\0' == method_prefix[method_size - 1]); | ||
1160 | req = GNUNET_malloc (sizeof (*req) + method_size); | ||
1161 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY); | ||
1162 | req->header.size = htons (sizeof (*req) + method_size); | ||
1163 | req->start_message_id = GNUNET_htonll (start_message_id); | ||
1164 | req->end_message_id = GNUNET_htonll (end_message_id); | ||
1165 | req->message_limit = GNUNET_htonll (message_limit); | ||
1166 | req->flags = htonl (flags); | ||
1167 | req->op_id = GNUNET_htonll (hist->op_id); | ||
1168 | memcpy (&req[1], method_prefix, method_size); | ||
1075 | 1169 | ||
1076 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); | 1170 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); |
1171 | return hist; | ||
1077 | } | 1172 | } |
1078 | 1173 | ||
1079 | 1174 | ||
1080 | /** | 1175 | /** |
1081 | * Request to replay a part of the message history of the channel. | 1176 | * Request to replay a part of the message history of the channel. |
1082 | * | 1177 | * |
1083 | * Historic messages (but NOT the state at the time) will be replayed (given to | 1178 | * Historic messages (but NOT the state at the time) will be replayed and given |
1084 | * the normal method handlers) if available and if access is permitted. | 1179 | * to the normal method handlers with a #GNUNET_PSYC_MESSAGE_HISTORIC flag set. |
1180 | * | ||
1181 | * Messages are retrieved from the local PSYCstore if available, | ||
1182 | * otherwise requested from the network. | ||
1085 | * | 1183 | * |
1086 | * @param channel | 1184 | * @param channel |
1087 | * Which channel should be replayed? | 1185 | * Which channel should be replayed? |
@@ -1089,8 +1187,10 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn, | |||
1089 | * Earliest interesting point in history. | 1187 | * Earliest interesting point in history. |
1090 | * @param end_message_id | 1188 | * @param end_message_id |
1091 | * Last (inclusive) interesting point in history. | 1189 | * Last (inclusive) interesting point in history. |
1092 | * FIXME: @param method_prefix | 1190 | * @param method_prefix |
1093 | * Retrieve only messages with a matching method prefix. | 1191 | * Retrieve only messages with a matching method prefix. |
1192 | * @param flags | ||
1193 | * OR'ed enum GNUNET_PSYC_HistoryReplayFlags | ||
1094 | * @param result_cb | 1194 | * @param result_cb |
1095 | * Function to call when the requested history has been fully replayed. | 1195 | * Function to call when the requested history has been fully replayed. |
1096 | * @param cls | 1196 | * @param cls |
@@ -1098,22 +1198,20 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn, | |||
1098 | * | 1198 | * |
1099 | * @return Handle to cancel history replay operation. | 1199 | * @return Handle to cancel history replay operation. |
1100 | */ | 1200 | */ |
1101 | void | 1201 | struct GNUNET_PSYC_HistoryRequest * |
1102 | GNUNET_PSYC_channel_history_replay (struct GNUNET_PSYC_Channel *chn, | 1202 | GNUNET_PSYC_channel_history_replay (struct GNUNET_PSYC_Channel *chn, |
1103 | uint64_t start_message_id, | 1203 | uint64_t start_message_id, |
1104 | uint64_t end_message_id, | 1204 | uint64_t end_message_id, |
1105 | /* FIXME: const char *method_prefix, */ | 1205 | const char *method_prefix, |
1106 | GNUNET_PSYC_ResultCallback result_cb, | 1206 | uint32_t flags, |
1207 | GNUNET_PSYC_MessageCallback message_cb, | ||
1208 | GNUNET_PSYC_MessagePartCallback message_part_cb, | ||
1209 | GNUNET_ResultCallback result_cb, | ||
1107 | void *cls) | 1210 | void *cls) |
1108 | { | 1211 | { |
1109 | struct HistoryRequest *req = GNUNET_malloc (sizeof (*req)); | 1212 | return channel_history_replay (chn, start_message_id, end_message_id, 0, |
1110 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY); | 1213 | method_prefix, flags, |
1111 | req->header.size = htons (sizeof (*req)); | 1214 | message_cb, message_part_cb, result_cb, cls); |
1112 | req->start_message_id = GNUNET_htonll (start_message_id); | ||
1113 | req->end_message_id = GNUNET_htonll (end_message_id); | ||
1114 | req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls)); | ||
1115 | |||
1116 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); | ||
1117 | } | 1215 | } |
1118 | 1216 | ||
1119 | 1217 | ||
@@ -1127,8 +1225,11 @@ GNUNET_PSYC_channel_history_replay (struct GNUNET_PSYC_Channel *chn, | |||
1127 | * Which channel should be replayed? | 1225 | * Which channel should be replayed? |
1128 | * @param message_limit | 1226 | * @param message_limit |
1129 | * Maximum number of messages to replay. | 1227 | * Maximum number of messages to replay. |
1130 | * FIXME: @param method_prefix | 1228 | * @param method_prefix |
1131 | * Retrieve only messages with a matching method prefix. | 1229 | * Retrieve only messages with a matching method prefix. |
1230 | * Use NULL or "" to retrieve all. | ||
1231 | * @param flags | ||
1232 | * OR'ed enum GNUNET_PSYC_HistoryReplayFlags | ||
1132 | * @param result_cb | 1233 | * @param result_cb |
1133 | * Function to call when the requested history has been fully replayed. | 1234 | * Function to call when the requested history has been fully replayed. |
1134 | * @param cls | 1235 | * @param cls |
@@ -1136,20 +1237,78 @@ GNUNET_PSYC_channel_history_replay (struct GNUNET_PSYC_Channel *chn, | |||
1136 | * | 1237 | * |
1137 | * @return Handle to cancel history replay operation. | 1238 | * @return Handle to cancel history replay operation. |
1138 | */ | 1239 | */ |
1139 | void | 1240 | struct GNUNET_PSYC_HistoryRequest * |
1140 | GNUNET_PSYC_channel_history_replay_latest (struct GNUNET_PSYC_Channel *chn, | 1241 | GNUNET_PSYC_channel_history_replay_latest (struct GNUNET_PSYC_Channel *chn, |
1141 | uint64_t message_limit, | 1242 | uint64_t message_limit, |
1142 | /* FIXME: const char *method_prefix, */ | 1243 | const char *method_prefix, |
1143 | GNUNET_PSYC_ResultCallback result_cb, | 1244 | uint32_t flags, |
1245 | GNUNET_PSYC_MessageCallback message_cb, | ||
1246 | GNUNET_PSYC_MessagePartCallback message_part_cb, | ||
1247 | GNUNET_ResultCallback result_cb, | ||
1144 | void *cls) | 1248 | void *cls) |
1145 | { | 1249 | { |
1146 | struct HistoryRequest *req = GNUNET_malloc (sizeof (*req)); | 1250 | return channel_history_replay (chn, 0, 0, message_limit, method_prefix, flags, |
1147 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY); | 1251 | message_cb, message_part_cb, result_cb, cls); |
1148 | req->header.size = htons (sizeof (*req)); | 1252 | } |
1149 | req->message_limit = GNUNET_htonll (message_limit); | 1253 | |
1150 | req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls)); | 1254 | |
1255 | void | ||
1256 | GNUNET_PSYC_channel_history_replay_cancel (struct GNUNET_PSYC_Channel *channel, | ||
1257 | struct GNUNET_PSYC_HistoryRequest *hist) | ||
1258 | { | ||
1259 | GNUNET_PSYC_receive_destroy (hist->recv); | ||
1260 | GNUNET_CLIENT_MANAGER_op_cancel (hist->chn->client, hist->op_id); | ||
1261 | GNUNET_free (hist); | ||
1262 | } | ||
1263 | |||
1264 | |||
1265 | /** | ||
1266 | * Retrieve the best matching channel state variable. | ||
1267 | * | ||
1268 | * If the requested variable name is not present in the state, the nearest | ||
1269 | * less-specific name is matched; for example, requesting "_a_b" will match "_a" | ||
1270 | * if "_a_b" does not exist. | ||
1271 | * | ||
1272 | * @param channel | ||
1273 | * Channel handle. | ||
1274 | * @param full_name | ||
1275 | * Full name of the requested variable. | ||
1276 | * The actual variable returned might have a shorter name. | ||
1277 | * @param var_cb | ||
1278 | * Function called once when a matching state variable is found. | ||
1279 | * Not called if there's no matching state variable. | ||
1280 | * @param result_cb | ||
1281 | * Function called after the operation finished. | ||
1282 | * (i.e. all state variables have been returned via @a state_cb) | ||
1283 | * @param cls | ||
1284 | * Closure for the callbacks. | ||
1285 | */ | ||
1286 | static struct GNUNET_PSYC_StateRequest * | ||
1287 | channel_state_get (struct GNUNET_PSYC_Channel *chn, | ||
1288 | uint16_t type, const char *name, | ||
1289 | GNUNET_PSYC_StateVarCallback var_cb, | ||
1290 | GNUNET_ResultCallback result_cb, void *cls) | ||
1291 | { | ||
1292 | struct StateRequest *req; | ||
1293 | struct GNUNET_PSYC_StateRequest *sr = GNUNET_malloc (sizeof (*sr)); | ||
1294 | sr->chn = chn; | ||
1295 | sr->var_cb = var_cb; | ||
1296 | sr->result_cb = result_cb; | ||
1297 | sr->cls = cls; | ||
1298 | sr->op_id = GNUNET_CLIENT_MANAGER_op_add (chn->client, | ||
1299 | &op_recv_state_result, sr); | ||
1300 | |||
1301 | GNUNET_assert (NULL != name); | ||
1302 | size_t name_size = strnlen (name, GNUNET_SERVER_MAX_MESSAGE_SIZE | ||
1303 | - sizeof (*req)) + 1; | ||
1304 | req = GNUNET_malloc (sizeof (*req) + name_size); | ||
1305 | req->header.type = htons (type); | ||
1306 | req->header.size = htons (sizeof (*req) + name_size); | ||
1307 | req->op_id = GNUNET_htonll (sr->op_id); | ||
1308 | memcpy (&req[1], name, name_size); | ||
1151 | 1309 | ||
1152 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); | 1310 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); |
1311 | return sr; | ||
1153 | } | 1312 | } |
1154 | 1313 | ||
1155 | 1314 | ||
@@ -1174,21 +1333,16 @@ GNUNET_PSYC_channel_history_replay_latest (struct GNUNET_PSYC_Channel *chn, | |||
1174 | * @param cls | 1333 | * @param cls |
1175 | * Closure for the callbacks. | 1334 | * Closure for the callbacks. |
1176 | */ | 1335 | */ |
1177 | void | 1336 | struct GNUNET_PSYC_StateRequest * |
1178 | GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *chn, | 1337 | GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *chn, |
1179 | const char *full_name, | 1338 | const char *full_name, |
1180 | GNUNET_PSYC_StateVarCallback var_cb, | 1339 | GNUNET_PSYC_StateVarCallback var_cb, |
1181 | GNUNET_PSYC_ResultCallback result_cb, | 1340 | GNUNET_ResultCallback result_cb, |
1182 | void *cls) | 1341 | void *cls) |
1183 | { | 1342 | { |
1184 | size_t name_size = strlen (full_name) + 1; | 1343 | return channel_state_get (chn, GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, |
1185 | struct StateRequest *req = GNUNET_malloc (sizeof (*req) + name_size); | 1344 | full_name, var_cb, result_cb, cls); |
1186 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_GET); | ||
1187 | req->header.size = htons (sizeof (*req) + name_size); | ||
1188 | req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls)); | ||
1189 | memcpy (&req[1], full_name, name_size); | ||
1190 | 1345 | ||
1191 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); | ||
1192 | } | 1346 | } |
1193 | 1347 | ||
1194 | 1348 | ||
@@ -1215,21 +1369,29 @@ GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *chn, | |||
1215 | * @param cls | 1369 | * @param cls |
1216 | * Closure for the callbacks. | 1370 | * Closure for the callbacks. |
1217 | */ | 1371 | */ |
1218 | void | 1372 | struct GNUNET_PSYC_StateRequest * |
1219 | GNUNET_PSYC_channel_state_get_prefix (struct GNUNET_PSYC_Channel *chn, | 1373 | GNUNET_PSYC_channel_state_get_prefix (struct GNUNET_PSYC_Channel *chn, |
1220 | const char *name_prefix, | 1374 | const char *name_prefix, |
1221 | GNUNET_PSYC_StateVarCallback var_cb, | 1375 | GNUNET_PSYC_StateVarCallback var_cb, |
1222 | GNUNET_PSYC_ResultCallback result_cb, | 1376 | GNUNET_ResultCallback result_cb, |
1223 | void *cls) | 1377 | void *cls) |
1224 | { | 1378 | { |
1225 | size_t name_size = strlen (name_prefix) + 1; | 1379 | return channel_state_get (chn, GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, |
1226 | struct StateRequest *req = GNUNET_malloc (sizeof (*req) + name_size); | 1380 | name_prefix, var_cb, result_cb, cls); |
1227 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_GET); | 1381 | } |
1228 | req->header.size = htons (sizeof (*req) + name_size); | ||
1229 | req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls)); | ||
1230 | memcpy (&req[1], name_prefix, name_size); | ||
1231 | 1382 | ||
1232 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); | 1383 | |
1384 | /** | ||
1385 | * Cancel a state request operation. | ||
1386 | * | ||
1387 | * @param sr | ||
1388 | * Handle for the operation to cancel. | ||
1389 | */ | ||
1390 | void | ||
1391 | GNUNET_PSYC_channel_state_get_cancel (struct GNUNET_PSYC_StateRequest *sr) | ||
1392 | { | ||
1393 | GNUNET_CLIENT_MANAGER_op_cancel (sr->chn->client, sr->op_id); | ||
1394 | GNUNET_free (sr); | ||
1233 | } | 1395 | } |
1234 | 1396 | ||
1235 | /* end of psyc_api.c */ | 1397 | /* end of psyc_api.c */ |
diff --git a/src/psyc/psyc_util_lib.c b/src/psyc/psyc_util_lib.c index 961922ce4..ebbc2dad8 100644 --- a/src/psyc/psyc_util_lib.c +++ b/src/psyc/psyc_util_lib.c | |||
@@ -326,9 +326,13 @@ GNUNET_PSYC_transmit_destroy (struct GNUNET_PSYC_TransmitHandle *tmit) | |||
326 | * The message part is added to the current message buffer. | 326 | * The message part is added to the current message buffer. |
327 | * When this buffer is full, it is added to the transmission queue. | 327 | * When this buffer is full, it is added to the transmission queue. |
328 | * | 328 | * |
329 | * @param tmit Transmission handle. | 329 | * @param tmit |
330 | * @param msg Message part, or NULL. | 330 | * Transmission handle. |
331 | * @param end End of message? #GNUNET_YES or #GNUNET_NO. | 331 | * @param msg |
332 | * Message part, or NULL. | ||
333 | * @param end | ||
334 | * End of message? | ||
335 | * #GNUNET_YES or #GNUNET_NO. | ||
332 | */ | 336 | */ |
333 | static void | 337 | static void |
334 | transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit, | 338 | transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit, |
@@ -632,16 +636,24 @@ transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper, | |||
632 | /** | 636 | /** |
633 | * Transmit a message. | 637 | * Transmit a message. |
634 | * | 638 | * |
635 | * @param tmit Transmission handle. | 639 | * @param tmit |
636 | * @param method_name Which method should be invoked. | 640 | * Transmission handle. |
637 | * @param env Environment for the message. | 641 | * @param method_name |
638 | * Should stay available until the first call to notify_data. | 642 | * Which method should be invoked. |
639 | * Can be NULL if there are no modifiers or @a notify_mod is provided instead. | 643 | * @param env |
640 | * @param notify_mod Function to call to obtain modifiers. | 644 | * Environment for the message. |
641 | * Can be NULL if there are no modifiers or @a env is provided instead. | 645 | * Should stay available until the first call to notify_data. |
642 | * @param notify_data Function to call to obtain fragments of the data. | 646 | * Can be NULL if there are no modifiers or @a notify_mod is |
643 | * @param notify_cls Closure for @a notify_mod and @a notify_data. | 647 | * provided instead. |
644 | * @param flags Flags for the message being transmitted. | 648 | * @param notify_mod |
649 | * Function to call to obtain modifiers. | ||
650 | * Can be NULL if there are no modifiers or @a env is provided instead. | ||
651 | * @param notify_data | ||
652 | * Function to call to obtain fragments of the data. | ||
653 | * @param notify_cls | ||
654 | * Closure for @a notify_mod and @a notify_data. | ||
655 | * @param flags | ||
656 | * Flags for the message being transmitted. | ||
645 | * | 657 | * |
646 | * @return #GNUNET_OK if the transmission was started. | 658 | * @return #GNUNET_OK if the transmission was started. |
647 | * #GNUNET_SYSERR if another transmission is already going on. | 659 | * #GNUNET_SYSERR if another transmission is already going on. |
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c index 7160c13c6..ba31d9329 100644 --- a/src/psyc/test_psyc.c +++ b/src/psyc/test_psyc.c | |||
@@ -82,7 +82,7 @@ struct TransmitClosure | |||
82 | 82 | ||
83 | struct TransmitClosure *tmit; | 83 | struct TransmitClosure *tmit; |
84 | 84 | ||
85 | uint8_t join_req_count; | 85 | uint8_t join_req_count, end_count; |
86 | 86 | ||
87 | enum | 87 | enum |
88 | { | 88 | { |
@@ -105,6 +105,9 @@ enum | |||
105 | void | 105 | void |
106 | master_transmit (); | 106 | master_transmit (); |
107 | 107 | ||
108 | void | ||
109 | master_history_replay_latest (); | ||
110 | |||
108 | 111 | ||
109 | void master_stopped (void *cls) | 112 | void master_stopped (void *cls) |
110 | { | 113 | { |
@@ -198,6 +201,134 @@ end () | |||
198 | 201 | ||
199 | 202 | ||
200 | void | 203 | void |
204 | master_message_cb (void *cls, uint64_t message_id, uint32_t flags, | ||
205 | const struct GNUNET_PSYC_MessageHeader *msg) | ||
206 | { | ||
207 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
208 | "Test #%d: Master got PSYC message fragment of size %u " | ||
209 | "belonging to message ID %" PRIu64 " with flags %x\n", | ||
210 | test, ntohs (msg->header.size), message_id, flags); | ||
211 | // FIXME | ||
212 | } | ||
213 | |||
214 | |||
215 | void | ||
216 | master_message_part_cb (void *cls, uint64_t message_id, | ||
217 | uint64_t data_offset, uint32_t flags, | ||
218 | const struct GNUNET_MessageHeader *msg) | ||
219 | { | ||
220 | if (NULL == msg) | ||
221 | { | ||
222 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
223 | "Error while receiving message %" PRIu64 "\n", message_id); | ||
224 | return; | ||
225 | } | ||
226 | |||
227 | uint16_t type = ntohs (msg->type); | ||
228 | uint16_t size = ntohs (msg->size); | ||
229 | |||
230 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
231 | "Test #%d: Master got message part of type %u and size %u " | ||
232 | "belonging to message ID %" PRIu64 " with flags %x\n", | ||
233 | test, type, size, message_id, flags); | ||
234 | |||
235 | switch (test) | ||
236 | { | ||
237 | case TEST_SLAVE_TRANSMIT: | ||
238 | if (GNUNET_PSYC_MESSAGE_REQUEST != flags) | ||
239 | { | ||
240 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
241 | "Unexpected request flags: %x" PRIu32 "\n", flags); | ||
242 | GNUNET_assert (0); | ||
243 | return; | ||
244 | } | ||
245 | // FIXME: check rest of message | ||
246 | |||
247 | if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type) | ||
248 | master_transmit (); | ||
249 | break; | ||
250 | |||
251 | case TEST_MASTER_TRANSMIT: | ||
252 | if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type && 2 == ++end_count) | ||
253 | master_history_replay_latest (); | ||
254 | break; | ||
255 | |||
256 | case TEST_MASTER_HISTORY_REPLAY: | ||
257 | case TEST_MASTER_HISTORY_REPLAY_LATEST: | ||
258 | if (GNUNET_PSYC_MESSAGE_HISTORIC != flags) | ||
259 | { | ||
260 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
261 | "Test #%d: Unexpected flags for historic message: %x" PRIu32 "\n", | ||
262 | test, flags); | ||
263 | GNUNET_assert (0); | ||
264 | return; | ||
265 | } | ||
266 | break; | ||
267 | |||
268 | default: | ||
269 | GNUNET_assert (0); | ||
270 | } | ||
271 | } | ||
272 | |||
273 | |||
274 | void | ||
275 | slave_message_cb (void *cls, uint64_t message_id, uint32_t flags, | ||
276 | const struct GNUNET_PSYC_MessageHeader *msg) | ||
277 | { | ||
278 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
279 | "Test #%d: Slave got PSYC message fragment of size %u " | ||
280 | "belonging to message ID %" PRIu64 " with flags %x\n", | ||
281 | test, ntohs (msg->header.size), message_id, flags); | ||
282 | // FIXME | ||
283 | } | ||
284 | |||
285 | |||
286 | void | ||
287 | slave_message_part_cb (void *cls, uint64_t message_id, | ||
288 | uint64_t data_offset, uint32_t flags, | ||
289 | const struct GNUNET_MessageHeader *msg) | ||
290 | { | ||
291 | if (NULL == msg) | ||
292 | { | ||
293 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
294 | "Error while receiving message " PRIu64 "\n", message_id); | ||
295 | return; | ||
296 | } | ||
297 | |||
298 | uint16_t type = ntohs (msg->type); | ||
299 | uint16_t size = ntohs (msg->size); | ||
300 | |||
301 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
302 | "Test #%d: Slave got message part of type %u and size %u " | ||
303 | "belonging to message ID %" PRIu64 " with flags %x\n", | ||
304 | test, type, size, message_id, flags); | ||
305 | |||
306 | switch (test) | ||
307 | { | ||
308 | case TEST_MASTER_TRANSMIT: | ||
309 | if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type && 2 == ++end_count) | ||
310 | master_history_replay_latest (); | ||
311 | break; | ||
312 | |||
313 | case TEST_SLAVE_HISTORY_REPLAY: | ||
314 | case TEST_SLAVE_HISTORY_REPLAY_LATEST: | ||
315 | if (GNUNET_PSYC_MESSAGE_HISTORIC != flags) | ||
316 | { | ||
317 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
318 | "Test #%d: Unexpected flags for historic message: %x" PRIu32 "\n", | ||
319 | flags); | ||
320 | GNUNET_assert (0); | ||
321 | return; | ||
322 | } | ||
323 | break; | ||
324 | |||
325 | default: | ||
326 | GNUNET_assert (0); | ||
327 | } | ||
328 | } | ||
329 | |||
330 | |||
331 | void | ||
201 | state_get_var (void *cls, const char *name, const void *value, size_t value_size) | 332 | state_get_var (void *cls, const char *name, const void *value, size_t value_size) |
202 | { | 333 | { |
203 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 334 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
@@ -208,10 +339,12 @@ state_get_var (void *cls, const char *name, const void *value, size_t value_size | |||
208 | /*** Slave state_get_prefix() ***/ | 339 | /*** Slave state_get_prefix() ***/ |
209 | 340 | ||
210 | void | 341 | void |
211 | slave_state_get_prefix_result (void *cls, int64_t result, const char *err_msg) | 342 | slave_state_get_prefix_result (void *cls, int64_t result, |
343 | const void *err_msg, uint16_t err_msg_size) | ||
212 | { | 344 | { |
213 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 345 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
214 | "slave_state_get_prefix:\t%" PRId64 " (%s)\n", result, err_msg); | 346 | "slave_state_get_prefix:\t%" PRId64 " (%.s)\n", |
347 | result, err_msg_size, err_msg); | ||
215 | // FIXME: GNUNET_assert (2 == result); | 348 | // FIXME: GNUNET_assert (2 == result); |
216 | end (); | 349 | end (); |
217 | } | 350 | } |
@@ -230,7 +363,8 @@ slave_state_get_prefix () | |||
230 | 363 | ||
231 | 364 | ||
232 | void | 365 | void |
233 | master_state_get_prefix_result (void *cls, int64_t result, const char *err_msg) | 366 | master_state_get_prefix_result (void *cls, int64_t result, |
367 | const void *err_msg, uint16_t err_msg_size) | ||
234 | { | 368 | { |
235 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 369 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
236 | "master_state_get_prefix:\t%" PRId64 " (%s)\n", result, err_msg); | 370 | "master_state_get_prefix:\t%" PRId64 " (%s)\n", result, err_msg); |
@@ -252,10 +386,12 @@ master_state_get_prefix () | |||
252 | 386 | ||
253 | 387 | ||
254 | void | 388 | void |
255 | slave_state_get_result (void *cls, int64_t result, const char *err_msg) | 389 | slave_state_get_result (void *cls, int64_t result, |
390 | const void *err_msg, uint16_t err_msg_size) | ||
256 | { | 391 | { |
257 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 392 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
258 | "slave_state_get:\t%" PRId64 " (%s)\n", result, err_msg); | 393 | "slave_state_get:\t%" PRId64 " (%.*s)\n", |
394 | result, err_msg_size, err_msg); | ||
259 | // FIXME: GNUNET_assert (2 == result); | 395 | // FIXME: GNUNET_assert (2 == result); |
260 | master_state_get_prefix (); | 396 | master_state_get_prefix (); |
261 | } | 397 | } |
@@ -274,10 +410,12 @@ slave_state_get () | |||
274 | 410 | ||
275 | 411 | ||
276 | void | 412 | void |
277 | master_state_get_result (void *cls, int64_t result, const char *err_msg) | 413 | master_state_get_result (void *cls, int64_t result, |
414 | const void *err_msg, uint16_t err_msg_size) | ||
278 | { | 415 | { |
279 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 416 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
280 | "master_state_get:\t%" PRId64 " (%s)\n", result, err_msg); | 417 | "master_state_get:\t%" PRId64 " (%.*s)\n", |
418 | result, err_msg_size, err_msg); | ||
281 | // FIXME: GNUNET_assert (1 == result); | 419 | // FIXME: GNUNET_assert (1 == result); |
282 | slave_state_get (); | 420 | slave_state_get (); |
283 | } | 421 | } |
@@ -295,10 +433,12 @@ master_state_get () | |||
295 | /*** Slave history_replay() ***/ | 433 | /*** Slave history_replay() ***/ |
296 | 434 | ||
297 | void | 435 | void |
298 | slave_history_replay_result (void *cls, int64_t result, const char *err_msg) | 436 | slave_history_replay_result (void *cls, int64_t result, |
437 | const void *err_msg, uint16_t err_msg_size) | ||
299 | { | 438 | { |
300 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 439 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
301 | "slave_history_replay:\t%" PRId64 " (%s)\n", result, err_msg); | 440 | "slave_history_replay:\t%" PRId64 " (%.*s)\n", |
441 | result, err_msg_size, err_msg); | ||
302 | GNUNET_assert (9 == result); | 442 | GNUNET_assert (9 == result); |
303 | 443 | ||
304 | master_state_get (); | 444 | master_state_get (); |
@@ -309,9 +449,11 @@ void | |||
309 | slave_history_replay () | 449 | slave_history_replay () |
310 | { | 450 | { |
311 | test = TEST_SLAVE_HISTORY_REPLAY; | 451 | test = TEST_SLAVE_HISTORY_REPLAY; |
312 | GNUNET_PSYC_channel_history_replay (slv_chn, 1, 1, | 452 | GNUNET_PSYC_channel_history_replay (slv_chn, 1, 1, "", |
313 | &slave_history_replay_result, | 453 | GNUNET_PSYC_HISTORY_REPLAY_LOCAL, |
314 | NULL); | 454 | &slave_message_cb, |
455 | &slave_message_part_cb, | ||
456 | &slave_history_replay_result, NULL); | ||
315 | } | 457 | } |
316 | 458 | ||
317 | 459 | ||
@@ -319,10 +461,12 @@ slave_history_replay () | |||
319 | 461 | ||
320 | 462 | ||
321 | void | 463 | void |
322 | master_history_replay_result (void *cls, int64_t result, const char *err_msg) | 464 | master_history_replay_result (void *cls, int64_t result, |
465 | const void *err_msg, uint16_t err_msg_size) | ||
323 | { | 466 | { |
324 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 467 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
325 | "master_history_replay:\t%" PRId64 " (%s)\n", result, err_msg); | 468 | "master_history_replay:\t%" PRId64 " (%.*s)\n", |
469 | result, err_msg_size, err_msg); | ||
326 | GNUNET_assert (9 == result); | 470 | GNUNET_assert (9 == result); |
327 | 471 | ||
328 | slave_history_replay (); | 472 | slave_history_replay (); |
@@ -333,9 +477,11 @@ void | |||
333 | master_history_replay () | 477 | master_history_replay () |
334 | { | 478 | { |
335 | test = TEST_MASTER_HISTORY_REPLAY; | 479 | test = TEST_MASTER_HISTORY_REPLAY; |
336 | GNUNET_PSYC_channel_history_replay (mst_chn, 1, 1, | 480 | GNUNET_PSYC_channel_history_replay (mst_chn, 1, 1, "", |
337 | &master_history_replay_result, | 481 | GNUNET_PSYC_HISTORY_REPLAY_LOCAL, |
338 | NULL); | 482 | &master_message_cb, |
483 | &master_message_part_cb, | ||
484 | &master_history_replay_result, NULL); | ||
339 | } | 485 | } |
340 | 486 | ||
341 | 487 | ||
@@ -343,10 +489,12 @@ master_history_replay () | |||
343 | 489 | ||
344 | 490 | ||
345 | void | 491 | void |
346 | slave_history_replay_latest_result (void *cls, int64_t result, const char *err_msg) | 492 | slave_history_replay_latest_result (void *cls, int64_t result, |
493 | const void *err_msg, uint16_t err_msg_size) | ||
347 | { | 494 | { |
348 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 495 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
349 | "slave_history_replay_latest:\t%" PRId64 " (%s)\n", result, err_msg); | 496 | "slave_history_replay_latest:\t%" PRId64 " (%.*s)\n", |
497 | result, err_msg_size, err_msg); | ||
350 | GNUNET_assert (9 == result); | 498 | GNUNET_assert (9 == result); |
351 | 499 | ||
352 | master_history_replay (); | 500 | master_history_replay (); |
@@ -357,7 +505,10 @@ void | |||
357 | slave_history_replay_latest () | 505 | slave_history_replay_latest () |
358 | { | 506 | { |
359 | test = TEST_SLAVE_HISTORY_REPLAY_LATEST; | 507 | test = TEST_SLAVE_HISTORY_REPLAY_LATEST; |
360 | GNUNET_PSYC_channel_history_replay_latest (slv_chn, 1, | 508 | GNUNET_PSYC_channel_history_replay_latest (slv_chn, 1, "", |
509 | GNUNET_PSYC_HISTORY_REPLAY_LOCAL, | ||
510 | &slave_message_cb, | ||
511 | &slave_message_part_cb, | ||
361 | &slave_history_replay_latest_result, | 512 | &slave_history_replay_latest_result, |
362 | NULL); | 513 | NULL); |
363 | } | 514 | } |
@@ -367,10 +518,12 @@ slave_history_replay_latest () | |||
367 | 518 | ||
368 | 519 | ||
369 | void | 520 | void |
370 | master_history_replay_latest_result (void *cls, int64_t result, const char *err_msg) | 521 | master_history_replay_latest_result (void *cls, int64_t result, |
522 | const void *err_msg, uint16_t err_msg_size) | ||
371 | { | 523 | { |
372 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 524 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
373 | "master_history_replay_latest:\t%" PRId64 " (%s)\n", result, err_msg); | 525 | "master_history_replay_latest:\t%" PRId64 " (%.*s)\n", |
526 | result, err_msg_size, err_msg); | ||
374 | GNUNET_assert (9 == result); | 527 | GNUNET_assert (9 == result); |
375 | 528 | ||
376 | slave_history_replay_latest (); | 529 | slave_history_replay_latest (); |
@@ -381,139 +534,16 @@ void | |||
381 | master_history_replay_latest () | 534 | master_history_replay_latest () |
382 | { | 535 | { |
383 | test = TEST_MASTER_HISTORY_REPLAY_LATEST; | 536 | test = TEST_MASTER_HISTORY_REPLAY_LATEST; |
384 | GNUNET_PSYC_channel_history_replay_latest (mst_chn, 1, | 537 | GNUNET_PSYC_channel_history_replay_latest (mst_chn, 1, "", |
538 | GNUNET_PSYC_HISTORY_REPLAY_LOCAL, | ||
539 | &master_message_cb, | ||
540 | &master_message_part_cb, | ||
385 | &master_history_replay_latest_result, | 541 | &master_history_replay_latest_result, |
386 | NULL); | 542 | NULL); |
387 | } | 543 | } |
388 | 544 | ||
389 | 545 | ||
390 | void | 546 | void |
391 | master_message_cb (void *cls, uint64_t message_id, uint32_t flags, | ||
392 | const struct GNUNET_PSYC_MessageHeader *msg) | ||
393 | { | ||
394 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
395 | "Test #%d: Master got PSYC message fragment of size %u " | ||
396 | "belonging to message ID %" PRIu64 " with flags %x\n", | ||
397 | test, ntohs (msg->header.size), message_id, flags); | ||
398 | // FIXME | ||
399 | } | ||
400 | |||
401 | |||
402 | void | ||
403 | master_message_part_cb (void *cls, uint64_t message_id, | ||
404 | uint64_t data_offset, uint32_t flags, | ||
405 | const struct GNUNET_MessageHeader *msg) | ||
406 | { | ||
407 | if (NULL == msg) | ||
408 | { | ||
409 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
410 | "Error while receiving message %" PRIu64 "\n", message_id); | ||
411 | return; | ||
412 | } | ||
413 | |||
414 | uint16_t type = ntohs (msg->type); | ||
415 | uint16_t size = ntohs (msg->size); | ||
416 | |||
417 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
418 | "Test #%d: Master got message part of type %u and size %u " | ||
419 | "belonging to message ID %" PRIu64 " with flags %x\n", | ||
420 | test, type, size, message_id, flags); | ||
421 | |||
422 | switch (test) | ||
423 | { | ||
424 | case TEST_SLAVE_TRANSMIT: | ||
425 | if (GNUNET_PSYC_MESSAGE_REQUEST != flags) | ||
426 | { | ||
427 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
428 | "Unexpected request flags: %x" PRIu32 "\n", flags); | ||
429 | GNUNET_assert (0); | ||
430 | return; | ||
431 | } | ||
432 | // FIXME: check rest of message | ||
433 | |||
434 | if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type) | ||
435 | master_transmit (); | ||
436 | break; | ||
437 | |||
438 | case TEST_MASTER_TRANSMIT: | ||
439 | break; | ||
440 | |||
441 | case TEST_MASTER_HISTORY_REPLAY: | ||
442 | case TEST_MASTER_HISTORY_REPLAY_LATEST: | ||
443 | if (GNUNET_PSYC_MESSAGE_HISTORIC != flags) | ||
444 | { | ||
445 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
446 | "Test #%d: Unexpected flags for historic message: %x" PRIu32 "\n", | ||
447 | flags); | ||
448 | GNUNET_assert (0); | ||
449 | return; | ||
450 | } | ||
451 | break; | ||
452 | |||
453 | default: | ||
454 | GNUNET_assert (0); | ||
455 | } | ||
456 | } | ||
457 | |||
458 | |||
459 | void | ||
460 | slave_message_cb (void *cls, uint64_t message_id, uint32_t flags, | ||
461 | const struct GNUNET_PSYC_MessageHeader *msg) | ||
462 | { | ||
463 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
464 | "Test #%d: Slave got PSYC message fragment of size %u " | ||
465 | "belonging to message ID %" PRIu64 " with flags %x\n", | ||
466 | test, ntohs (msg->header.size), message_id, flags); | ||
467 | // FIXME | ||
468 | } | ||
469 | |||
470 | |||
471 | void | ||
472 | slave_message_part_cb (void *cls, uint64_t message_id, | ||
473 | uint64_t data_offset, uint32_t flags, | ||
474 | const struct GNUNET_MessageHeader *msg) | ||
475 | { | ||
476 | if (NULL == msg) | ||
477 | { | ||
478 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
479 | "Error while receiving message " PRIu64 "\n", message_id); | ||
480 | return; | ||
481 | } | ||
482 | |||
483 | uint16_t type = ntohs (msg->type); | ||
484 | uint16_t size = ntohs (msg->size); | ||
485 | |||
486 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
487 | "Test #%d: Slave got message part of type %u and size %u " | ||
488 | "belonging to message ID %" PRIu64 " with flags %x\n", | ||
489 | test, type, size, message_id, flags); | ||
490 | |||
491 | switch (test) | ||
492 | { | ||
493 | case TEST_MASTER_TRANSMIT: | ||
494 | if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type) | ||
495 | master_history_replay_latest (); | ||
496 | break; | ||
497 | |||
498 | case TEST_SLAVE_HISTORY_REPLAY: | ||
499 | case TEST_SLAVE_HISTORY_REPLAY_LATEST: | ||
500 | if (GNUNET_PSYC_MESSAGE_HISTORIC != flags) | ||
501 | { | ||
502 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
503 | "Test #%d: Unexpected flags for historic message: %x" PRIu32 "\n", | ||
504 | flags); | ||
505 | GNUNET_assert (0); | ||
506 | return; | ||
507 | } | ||
508 | break; | ||
509 | |||
510 | default: | ||
511 | GNUNET_assert (0); | ||
512 | } | ||
513 | } | ||
514 | |||
515 | |||
516 | void | ||
517 | transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | 547 | transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) |
518 | { | 548 | { |
519 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission resumed.\n"); | 549 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission resumed.\n"); |
@@ -665,27 +695,31 @@ slave_transmit () | |||
665 | tmit->data[0] = "slave test"; | 695 | tmit->data[0] = "slave test"; |
666 | tmit->data_count = 1; | 696 | tmit->data_count = 1; |
667 | tmit->slv_tmit | 697 | tmit->slv_tmit |
668 | = GNUNET_PSYC_slave_transmit (slv, "_request_test", tmit_notify_mod, | 698 | = GNUNET_PSYC_slave_transmit (slv, "_request_test", &tmit_notify_mod, |
669 | tmit_notify_data, tmit, | 699 | &tmit_notify_data, tmit, |
670 | GNUNET_PSYC_SLAVE_TRANSMIT_NONE); | 700 | GNUNET_PSYC_SLAVE_TRANSMIT_NONE); |
671 | } | 701 | } |
672 | 702 | ||
673 | 703 | ||
674 | void | 704 | void |
675 | slave_remove_cb (void *cls, int64_t result, const char *err_msg) | 705 | slave_remove_cb (void *cls, int64_t result, |
706 | const void *err_msg, uint16_t err_msg_size) | ||
676 | { | 707 | { |
677 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 708 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
678 | "slave_remove:\t%" PRId64 " (%s)\n", result, err_msg); | 709 | "slave_remove:\t%" PRId64 " (%.*s)\n", |
710 | result, err_msg_size, err_msg); | ||
679 | 711 | ||
680 | slave_transmit (); | 712 | slave_transmit (); |
681 | } | 713 | } |
682 | 714 | ||
683 | 715 | ||
684 | void | 716 | void |
685 | slave_add_cb (void *cls, int64_t result, const char *err_msg) | 717 | slave_add_cb (void *cls, int64_t result, |
718 | const void *err_msg, uint16_t err_msg_size) | ||
686 | { | 719 | { |
687 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 720 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
688 | "slave_add:\t%" PRId64 " (%s)\n", result, err_msg); | 721 | "slave_add:\t%" PRId64 " (%.*s)\n", |
722 | result, err_msg_size, err_msg); | ||
689 | 723 | ||
690 | struct GNUNET_PSYC_Channel *chn = cls; | 724 | struct GNUNET_PSYC_Channel *chn = cls; |
691 | GNUNET_PSYC_channel_slave_remove (chn, &slave_pub_key, 2, | 725 | GNUNET_PSYC_channel_slave_remove (chn, &slave_pub_key, 2, |
@@ -775,6 +809,8 @@ master_transmit () | |||
775 | { | 809 | { |
776 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master sending message to all.\n"); | 810 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master sending message to all.\n"); |
777 | test = TEST_MASTER_TRANSMIT; | 811 | test = TEST_MASTER_TRANSMIT; |
812 | end_count = 0; | ||
813 | |||
778 | uint32_t i, j; | 814 | uint32_t i, j; |
779 | 815 | ||
780 | char *name_max = "_test_max"; | 816 | char *name_max = "_test_max"; |
@@ -816,8 +852,8 @@ master_transmit () | |||
816 | tmit->data_delay[1] = 3; | 852 | tmit->data_delay[1] = 3; |
817 | tmit->data_count = 4; | 853 | tmit->data_count = 4; |
818 | tmit->mst_tmit | 854 | tmit->mst_tmit |
819 | = GNUNET_PSYC_master_transmit (mst, "_notice_test", tmit_notify_mod, | 855 | = GNUNET_PSYC_master_transmit (mst, "_notice_test", &tmit_notify_mod, |
820 | tmit_notify_data, tmit, | 856 | &tmit_notify_data, tmit, |
821 | GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN); | 857 | GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN); |
822 | } | 858 | } |
823 | 859 | ||