diff options
author | Gabor X Toth <*@tg-x.net> | 2014-07-30 21:18:13 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2014-07-30 21:18:13 +0000 |
commit | 40884377f3126bbecbfd3243d47224b3094914f9 (patch) | |
tree | 9f32aab9064b199178282a0c9918313e0aa30049 /src/psyc | |
parent | 831718fa44b2c56577aa4e36a479fef6debb8cea (diff) | |
download | gnunet-40884377f3126bbecbfd3243d47224b3094914f9.tar.gz gnunet-40884377f3126bbecbfd3243d47224b3094914f9.zip |
psyc, psycstore: retrieve state and history
Diffstat (limited to 'src/psyc')
-rw-r--r-- | src/psyc/Makefile.am | 2 | ||||
-rw-r--r-- | src/psyc/gnunet-service-psyc.c | 272 | ||||
-rw-r--r-- | src/psyc/psyc.h | 57 | ||||
-rw-r--r-- | src/psyc/psyc_api.c | 414 | ||||
-rw-r--r-- | src/psyc/test_psyc.c | 348 |
5 files changed, 922 insertions, 171 deletions
diff --git a/src/psyc/Makefile.am b/src/psyc/Makefile.am index b18605ab9..fb4341751 100644 --- a/src/psyc/Makefile.am +++ b/src/psyc/Makefile.am | |||
@@ -86,12 +86,14 @@ test_psyc_LDADD = \ | |||
86 | libgnunetpsyc.la \ | 86 | libgnunetpsyc.la \ |
87 | libgnunetpsycutil.la \ | 87 | libgnunetpsycutil.la \ |
88 | $(top_builddir)/src/testing/libgnunettesting.la \ | 88 | $(top_builddir)/src/testing/libgnunettesting.la \ |
89 | $(top_builddir)/src/core/libgnunetcore.la \ | ||
89 | $(top_builddir)/src/env/libgnunetenv.la \ | 90 | $(top_builddir)/src/env/libgnunetenv.la \ |
90 | $(top_builddir)/src/util/libgnunetutil.la | 91 | $(top_builddir)/src/util/libgnunetutil.la |
91 | test_psyc_DEPENDENCIES = \ | 92 | test_psyc_DEPENDENCIES = \ |
92 | libgnunetpsyc.la \ | 93 | libgnunetpsyc.la \ |
93 | libgnunetpsycutil.la \ | 94 | libgnunetpsycutil.la \ |
94 | $(top_builddir)/src/testing/libgnunettesting.la \ | 95 | $(top_builddir)/src/testing/libgnunettesting.la \ |
96 | $(top_builddir)/src/core/libgnunetcore.la \ | ||
95 | $(top_builddir)/src/env/libgnunetenv.la \ | 97 | $(top_builddir)/src/env/libgnunetenv.la \ |
96 | $(top_builddir)/src/util/libgnunetutil.la | 98 | $(top_builddir)/src/util/libgnunetutil.la |
97 | 99 | ||
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c index e7020bc69..3adc34d2a 100644 --- a/src/psyc/gnunet-service-psyc.c +++ b/src/psyc/gnunet-service-psyc.c | |||
@@ -402,6 +402,14 @@ struct Slave | |||
402 | }; | 402 | }; |
403 | 403 | ||
404 | 404 | ||
405 | struct OperationClosure | ||
406 | { | ||
407 | struct GNUNET_SERVER_Client *client; | ||
408 | struct Channel *chn; | ||
409 | uint64_t op_id; | ||
410 | }; | ||
411 | |||
412 | |||
405 | static void | 413 | static void |
406 | transmit_message (struct Channel *chn); | 414 | transmit_message (struct Channel *chn); |
407 | 415 | ||
@@ -585,6 +593,46 @@ client_send_msg (const struct Channel *chn, | |||
585 | 593 | ||
586 | 594 | ||
587 | /** | 595 | /** |
596 | * Send a result code back to the client. | ||
597 | * | ||
598 | * @param client | ||
599 | * Client that should receive the result code. | ||
600 | * @param result_code | ||
601 | * Code to transmit. | ||
602 | * @param op_id | ||
603 | * Operation ID in network byte order. | ||
604 | * @param err_msg | ||
605 | * Error message to include (or NULL for none). | ||
606 | */ | ||
607 | static void | ||
608 | client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id, | ||
609 | int64_t result_code, const char *err_msg) | ||
610 | { | ||
611 | struct OperationResult *res; | ||
612 | size_t err_len = 0; // FIXME: maximum length | ||
613 | |||
614 | if (NULL != err_msg) | ||
615 | err_len = strlen (err_msg) + 1; | ||
616 | res = GNUNET_malloc (sizeof (struct OperationResult) + err_len); | ||
617 | res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE); | ||
618 | res->header.size = htons (sizeof (struct OperationResult) + err_len); | ||
619 | res->result_code = GNUNET_htonll (result_code + INT64_MAX + 1); | ||
620 | res->op_id = op_id; | ||
621 | if (0 < err_len) | ||
622 | memcpy (&res[1], err_msg, err_len); | ||
623 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
624 | "%p Sending result to client for operation #%" PRIu64 ": " | ||
625 | "%" PRId64 " (%s)\n", | ||
626 | client, GNUNET_ntohll (op_id), result_code, err_msg); | ||
627 | |||
628 | GNUNET_SERVER_notification_context_add (nc, client); | ||
629 | GNUNET_SERVER_notification_context_unicast (nc, client, &res->header, | ||
630 | GNUNET_NO); | ||
631 | GNUNET_free (res); | ||
632 | } | ||
633 | |||
634 | |||
635 | /** | ||
588 | * Closure for join_mem_test_cb() | 636 | * Closure for join_mem_test_cb() |
589 | */ | 637 | */ |
590 | struct JoinMemTestClosure | 638 | struct JoinMemTestClosure |
@@ -799,7 +847,8 @@ mcast_recv_replay_fragment (void *cls, | |||
799 | 847 | ||
800 | { | 848 | { |
801 | struct Channel *chn = cls; | 849 | struct Channel *chn = cls; |
802 | GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_key, fragment_id, | 850 | GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_key, |
851 | fragment_id, fragment_id, | ||
803 | &store_recv_fragment_replay, | 852 | &store_recv_fragment_replay, |
804 | &store_recv_fragment_replay_result, rh); | 853 | &store_recv_fragment_replay_result, rh); |
805 | } | 854 | } |
@@ -817,7 +866,8 @@ mcast_recv_replay_message (void *cls, | |||
817 | struct GNUNET_MULTICAST_ReplayHandle *rh) | 866 | struct GNUNET_MULTICAST_ReplayHandle *rh) |
818 | { | 867 | { |
819 | struct Channel *chn = cls; | 868 | struct Channel *chn = cls; |
820 | GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key, message_id, | 869 | GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key, |
870 | message_id, message_id, | ||
821 | &store_recv_fragment_replay, | 871 | &store_recv_fragment_replay, |
822 | &store_recv_fragment_replay_result, rh); | 872 | &store_recv_fragment_replay_result, rh); |
823 | } | 873 | } |
@@ -865,7 +915,8 @@ hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n) | |||
865 | */ | 915 | */ |
866 | static void | 916 | static void |
867 | client_send_mcast_msg (struct Channel *chn, | 917 | client_send_mcast_msg (struct Channel *chn, |
868 | const struct GNUNET_MULTICAST_MessageHeader *mmsg) | 918 | const struct GNUNET_MULTICAST_MessageHeader *mmsg, |
919 | uint32_t flags) | ||
869 | { | 920 | { |
870 | struct GNUNET_PSYC_MessageHeader *pmsg; | 921 | struct GNUNET_PSYC_MessageHeader *pmsg; |
871 | uint16_t size = ntohs (mmsg->header.size); | 922 | uint16_t size = ntohs (mmsg->header.size); |
@@ -882,6 +933,7 @@ client_send_mcast_msg (struct Channel *chn, | |||
882 | pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | 933 | pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); |
883 | pmsg->message_id = mmsg->message_id; | 934 | pmsg->message_id = mmsg->message_id; |
884 | pmsg->fragment_offset = mmsg->fragment_offset; | 935 | pmsg->fragment_offset = mmsg->fragment_offset; |
936 | pmsg->flags = htonl (flags); | ||
885 | 937 | ||
886 | memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); | 938 | memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); |
887 | client_send_msg (chn, &pmsg->header); | 939 | client_send_msg (chn, &pmsg->header); |
@@ -1111,7 +1163,7 @@ fragment_queue_run (struct Channel *chn, uint64_t msg_id, | |||
1111 | { | 1163 | { |
1112 | if (GNUNET_NO == drop) | 1164 | if (GNUNET_NO == drop) |
1113 | { | 1165 | { |
1114 | client_send_mcast_msg (chn, cache_entry->mmsg); | 1166 | client_send_mcast_msg (chn, cache_entry->mmsg, 0); |
1115 | } | 1167 | } |
1116 | if (cache_entry->ref_count <= 1) | 1168 | if (cache_entry->ref_count <= 1) |
1117 | { | 1169 | { |
@@ -1375,10 +1427,10 @@ store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id, | |||
1375 | struct Channel *chn = &mst->chn; | 1427 | struct Channel *chn = &mst->chn; |
1376 | chn->store_op = NULL; | 1428 | chn->store_op = NULL; |
1377 | 1429 | ||
1378 | struct CountersResult res; | 1430 | struct GNUNET_PSYC_CountersResultMessage res; |
1379 | res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); | 1431 | res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); |
1380 | res.header.size = htons (sizeof (res)); | 1432 | res.header.size = htons (sizeof (res)); |
1381 | res.result_code = htonl (result); | 1433 | res.result_code = htonl (result - INT32_MIN); |
1382 | res.max_message_id = GNUNET_htonll (max_message_id); | 1434 | res.max_message_id = GNUNET_htonll (max_message_id); |
1383 | 1435 | ||
1384 | if (GNUNET_OK == result || GNUNET_NO == result) | 1436 | if (GNUNET_OK == result || GNUNET_NO == result) |
@@ -1421,10 +1473,10 @@ store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id, | |||
1421 | struct Channel *chn = &slv->chn; | 1473 | struct Channel *chn = &slv->chn; |
1422 | chn->store_op = NULL; | 1474 | chn->store_op = NULL; |
1423 | 1475 | ||
1424 | struct CountersResult res; | 1476 | struct GNUNET_PSYC_CountersResultMessage res; |
1425 | res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); | 1477 | res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); |
1426 | res.header.size = htons (sizeof (res)); | 1478 | res.header.size = htons (sizeof (res)); |
1427 | res.result_code = htonl (result); | 1479 | res.result_code = htonl (result - INT32_MIN); |
1428 | res.max_message_id = GNUNET_htonll (max_message_id); | 1480 | res.max_message_id = GNUNET_htonll (max_message_id); |
1429 | 1481 | ||
1430 | if (GNUNET_OK == result || GNUNET_NO == result) | 1482 | if (GNUNET_OK == result || GNUNET_NO == result) |
@@ -1511,10 +1563,10 @@ client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client, | |||
1511 | { | 1563 | { |
1512 | chn = &mst->chn; | 1564 | chn = &mst->chn; |
1513 | 1565 | ||
1514 | struct CountersResult res; | 1566 | struct GNUNET_PSYC_CountersResultMessage res; |
1515 | res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); | 1567 | res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); |
1516 | res.header.size = htons (sizeof (res)); | 1568 | res.header.size = htons (sizeof (res)); |
1517 | res.result_code = htonl (GNUNET_OK); | 1569 | res.result_code = htonl ((uint32_t) GNUNET_OK + INT32_MIN); |
1518 | res.max_message_id = GNUNET_htonll (mst->max_message_id); | 1570 | res.max_message_id = GNUNET_htonll (mst->max_message_id); |
1519 | 1571 | ||
1520 | GNUNET_SERVER_notification_context_add (nc, client); | 1572 | GNUNET_SERVER_notification_context_add (nc, client); |
@@ -1621,10 +1673,10 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client, | |||
1621 | { | 1673 | { |
1622 | chn = &slv->chn; | 1674 | chn = &slv->chn; |
1623 | 1675 | ||
1624 | struct CountersResult res; | 1676 | struct GNUNET_PSYC_CountersResultMessage res; |
1625 | res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); | 1677 | res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); |
1626 | res.header.size = htons (sizeof (res)); | 1678 | res.header.size = htons (sizeof (res)); |
1627 | res.result_code = htonl (GNUNET_OK); | 1679 | res.result_code = htonl ((uint32_t) GNUNET_OK - INT32_MIN); |
1628 | res.max_message_id = GNUNET_htonll (chn->max_message_id); | 1680 | res.max_message_id = GNUNET_htonll (chn->max_message_id); |
1629 | 1681 | ||
1630 | GNUNET_SERVER_notification_context_add (nc, client); | 1682 | GNUNET_SERVER_notification_context_add (nc, client); |
@@ -2047,17 +2099,26 @@ client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, | |||
2047 | }; | 2099 | }; |
2048 | 2100 | ||
2049 | 2101 | ||
2102 | struct MembershipStoreClosure | ||
2103 | { | ||
2104 | struct GNUNET_SERVER_Client *client; | ||
2105 | struct Channel *chn; | ||
2106 | uint64_t op_id; | ||
2107 | }; | ||
2108 | |||
2109 | |||
2050 | /** | 2110 | /** |
2051 | * Received result of GNUNET_PSYCSTORE_membership_store() | 2111 | * Received result of GNUNET_PSYCSTORE_membership_store() |
2052 | */ | 2112 | */ |
2053 | static void | 2113 | static void |
2054 | store_recv_membership_store_result (void *cls, int64_t result, const char *err_msg) | 2114 | store_recv_membership_store_result (void *cls, int64_t result, const char *err_msg) |
2055 | { | 2115 | { |
2056 | struct GNUNET_MULTICAST_MembershipTestHandle *mth = cls; | 2116 | struct MembershipStoreClosure *mcls = cls; |
2057 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 2117 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
2058 | "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%s)\n", | 2118 | "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%s)\n", |
2059 | mth, result, err_msg); | 2119 | mcls->chn, result, err_msg); |
2060 | /* FIXME: send result to client */ | 2120 | |
2121 | client_send_result (mcls->client, mcls->op_id, result, err_msg); | ||
2061 | } | 2122 | } |
2062 | 2123 | ||
2063 | 2124 | ||
@@ -2075,6 +2136,11 @@ client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client, | |||
2075 | const struct ChannelMembershipStoreRequest * | 2136 | const struct ChannelMembershipStoreRequest * |
2076 | req = (const struct ChannelMembershipStoreRequest *) msg; | 2137 | req = (const struct ChannelMembershipStoreRequest *) msg; |
2077 | 2138 | ||
2139 | struct MembershipStoreClosure *mcls = GNUNET_malloc (sizeof (*mcls)); | ||
2140 | mcls->client = client; | ||
2141 | mcls->chn = chn; | ||
2142 | mcls->op_id = req->op_id; | ||
2143 | |||
2078 | uint64_t announced_at = GNUNET_ntohll (req->announced_at); | 2144 | uint64_t announced_at = GNUNET_ntohll (req->announced_at); |
2079 | uint64_t effective_since = GNUNET_ntohll (req->effective_since); | 2145 | uint64_t effective_since = GNUNET_ntohll (req->effective_since); |
2080 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 2146 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
@@ -2086,19 +2152,138 @@ client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client, | |||
2086 | GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_key, | 2152 | GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_key, |
2087 | req->did_join, announced_at, effective_since, | 2153 | req->did_join, announced_at, effective_since, |
2088 | 0, /* FIXME: group_generation */ | 2154 | 0, /* FIXME: group_generation */ |
2089 | &store_recv_membership_store_result, chn); | 2155 | &store_recv_membership_store_result, mcls); |
2090 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 2156 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
2091 | } | 2157 | } |
2092 | 2158 | ||
2093 | 2159 | ||
2160 | static int | ||
2161 | store_recv_fragment_history (void *cls, | ||
2162 | struct GNUNET_MULTICAST_MessageHeader *msg, | ||
2163 | enum GNUNET_PSYCSTORE_MessageFlags flags) | ||
2164 | { | ||
2165 | struct OperationClosure *opcls = cls; | ||
2166 | struct Channel *chn = opcls->chn; | ||
2167 | client_send_mcast_msg (chn, msg, GNUNET_PSYC_MESSAGE_HISTORIC); | ||
2168 | return GNUNET_YES; | ||
2169 | } | ||
2170 | |||
2171 | |||
2172 | /** | ||
2173 | * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay. | ||
2174 | */ | ||
2175 | static void | ||
2176 | store_recv_fragment_history_result (void *cls, int64_t result, const char *err_msg) | ||
2177 | { | ||
2178 | struct OperationClosure *opcls = cls; | ||
2179 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2180 | "%p History replay #%" PRIu64 ": " | ||
2181 | "PSYCSTORE returned %" PRId64 " (%s)\n", | ||
2182 | opcls->chn, opcls->op_id, result, err_msg); | ||
2183 | |||
2184 | client_send_result (opcls->client, opcls->op_id, result, err_msg); | ||
2185 | } | ||
2186 | |||
2187 | |||
2094 | /** | 2188 | /** |
2095 | * Client requests channel history from PSYCstore. | 2189 | * Client requests channel history from PSYCstore. |
2096 | */ | 2190 | */ |
2097 | static void | 2191 | static void |
2098 | client_recv_story_request (void *cls, struct GNUNET_SERVER_Client *client, | 2192 | client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client, |
2099 | const struct GNUNET_MessageHeader *msg) | 2193 | const struct GNUNET_MessageHeader *msg) |
2194 | { | ||
2195 | struct Channel * | ||
2196 | chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); | ||
2197 | GNUNET_assert (NULL != chn); | ||
2198 | |||
2199 | const struct HistoryRequest * | ||
2200 | req = (const struct HistoryRequest *) msg; | ||
2201 | |||
2202 | struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls)); | ||
2203 | opcls->client = client; | ||
2204 | opcls->chn = chn; | ||
2205 | opcls->op_id = req->op_id; | ||
2206 | |||
2207 | if (0 == req->message_limit) | ||
2208 | GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL, | ||
2209 | GNUNET_ntohll (req->start_message_id), | ||
2210 | GNUNET_ntohll (req->end_message_id), | ||
2211 | &store_recv_fragment_history, | ||
2212 | &store_recv_fragment_history_result, opcls); | ||
2213 | else | ||
2214 | GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL, | ||
2215 | GNUNET_ntohll (req->message_limit), | ||
2216 | &store_recv_fragment_history, | ||
2217 | &store_recv_fragment_history_result, | ||
2218 | opcls); | ||
2219 | |||
2220 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
2221 | } | ||
2222 | |||
2223 | |||
2224 | /** | ||
2225 | * Received state var from PSYCstore, send it to client. | ||
2226 | */ | ||
2227 | static int | ||
2228 | store_recv_state_var (void *cls, const char *name, | ||
2229 | const void *value, size_t value_size) | ||
2230 | { | ||
2231 | struct OperationClosure *opcls = cls; | ||
2232 | struct OperationResult *op; | ||
2233 | |||
2234 | if (NULL != name) | ||
2235 | { | ||
2236 | uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1; | ||
2237 | struct GNUNET_PSYC_MessageModifier *mod; | ||
2238 | op = GNUNET_malloc (sizeof (*op) + sizeof (*mod) + name_size + value_size); | ||
2239 | op->header.size = htons (sizeof (*op) + sizeof (*mod) + name_size + value_size); | ||
2240 | op->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT); | ||
2241 | op->op_id = opcls->op_id; | ||
2242 | |||
2243 | mod = (struct GNUNET_PSYC_MessageModifier *) &op[1]; | ||
2244 | mod->header.size = htons (sizeof (*mod) + name_size + value_size); | ||
2245 | mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); | ||
2246 | mod->name_size = htons (name_size); | ||
2247 | mod->value_size = htonl (value_size); | ||
2248 | mod->oper = htons (GNUNET_ENV_OP_ASSIGN); | ||
2249 | memcpy (&mod[1], name, name_size); | ||
2250 | memcpy (((char *) &mod[1]) + name_size, value, value_size); | ||
2251 | } | ||
2252 | else | ||
2253 | { | ||
2254 | struct GNUNET_MessageHeader *mod; | ||
2255 | op = GNUNET_malloc (sizeof (*op) + sizeof (*mod) + value_size); | ||
2256 | op->header.size = htons (sizeof (*op) + sizeof (*mod) + value_size); | ||
2257 | op->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT); | ||
2258 | op->op_id = opcls->op_id; | ||
2259 | |||
2260 | mod = (struct GNUNET_MessageHeader *) &op[1]; | ||
2261 | mod->size = htons (sizeof (*mod) + value_size); | ||
2262 | mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); | ||
2263 | memcpy (&mod[1], value, value_size); | ||
2264 | } | ||
2265 | |||
2266 | GNUNET_SERVER_notification_context_add (nc, opcls->client); | ||
2267 | GNUNET_SERVER_notification_context_unicast (nc, opcls->client, &op->header, | ||
2268 | GNUNET_NO); | ||
2269 | return GNUNET_YES; | ||
2270 | } | ||
2271 | |||
2272 | |||
2273 | /** | ||
2274 | * Received result of GNUNET_PSYCSTORE_state_get() | ||
2275 | * or GNUNET_PSYCSTORE_state_get_prefix() | ||
2276 | */ | ||
2277 | static void | ||
2278 | store_recv_state_result (void *cls, int64_t result, const char *err_msg) | ||
2100 | { | 2279 | { |
2280 | struct OperationClosure *opcls = cls; | ||
2281 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2282 | "%p History replay #%" PRIu64 ": " | ||
2283 | "PSYCSTORE returned %" PRId64 " (%s)\n", | ||
2284 | opcls->chn, opcls->op_id, result, err_msg); | ||
2101 | 2285 | ||
2286 | client_send_result (opcls->client, opcls->op_id, result, err_msg); | ||
2102 | } | 2287 | } |
2103 | 2288 | ||
2104 | 2289 | ||
@@ -2109,7 +2294,30 @@ static void | |||
2109 | client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client, | 2294 | client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client, |
2110 | const struct GNUNET_MessageHeader *msg) | 2295 | const struct GNUNET_MessageHeader *msg) |
2111 | { | 2296 | { |
2297 | struct Channel * | ||
2298 | chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); | ||
2299 | GNUNET_assert (NULL != chn); | ||
2300 | |||
2301 | const struct StateRequest * | ||
2302 | req = (const struct StateRequest *) msg; | ||
2112 | 2303 | ||
2304 | uint16_t name_size = ntohs (req->header.size) - sizeof (*req); | ||
2305 | const char *name = (const char *) &req[1]; | ||
2306 | if (0 == name_size || '\0' != name[name_size - 1]) | ||
2307 | { | ||
2308 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); | ||
2309 | return; | ||
2310 | } | ||
2311 | |||
2312 | struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls)); | ||
2313 | opcls->client = client; | ||
2314 | opcls->chn = chn; | ||
2315 | opcls->op_id = req->op_id; | ||
2316 | |||
2317 | GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name, | ||
2318 | &store_recv_state_var, | ||
2319 | &store_recv_state_result, opcls); | ||
2320 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
2113 | } | 2321 | } |
2114 | 2322 | ||
2115 | 2323 | ||
@@ -2120,6 +2328,30 @@ static void | |||
2120 | client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client, | 2328 | client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client, |
2121 | const struct GNUNET_MessageHeader *msg) | 2329 | const struct GNUNET_MessageHeader *msg) |
2122 | { | 2330 | { |
2331 | struct Channel * | ||
2332 | chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); | ||
2333 | GNUNET_assert (NULL != chn); | ||
2334 | |||
2335 | const struct StateRequest * | ||
2336 | req = (const struct StateRequest *) msg; | ||
2337 | |||
2338 | uint16_t name_size = ntohs (req->header.size) - sizeof (*req); | ||
2339 | const char *name = (const char *) &req[1]; | ||
2340 | if (0 == name_size || '\0' != name[name_size - 1]) | ||
2341 | { | ||
2342 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); | ||
2343 | return; | ||
2344 | } | ||
2345 | |||
2346 | struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls)); | ||
2347 | opcls->client = client; | ||
2348 | opcls->chn = chn; | ||
2349 | opcls->op_id = req->op_id; | ||
2350 | |||
2351 | GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name, | ||
2352 | &store_recv_state_var, | ||
2353 | &store_recv_state_result, opcls); | ||
2354 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
2123 | 2355 | ||
2124 | } | 2356 | } |
2125 | 2357 | ||
@@ -2140,8 +2372,8 @@ static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { | |||
2140 | { &client_recv_membership_store, NULL, | 2372 | { &client_recv_membership_store, NULL, |
2141 | GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 }, | 2373 | GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 }, |
2142 | 2374 | ||
2143 | { &client_recv_story_request, NULL, | 2375 | { &client_recv_history_replay, NULL, |
2144 | GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST, 0 }, | 2376 | GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 }, |
2145 | 2377 | ||
2146 | { &client_recv_state_get, NULL, | 2378 | { &client_recv_state_get, NULL, |
2147 | GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 }, | 2379 | GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 }, |
diff --git a/src/psyc/psyc.h b/src/psyc/psyc.h index 21131e7d3..f6d40ddb4 100644 --- a/src/psyc/psyc.h +++ b/src/psyc/psyc.h | |||
@@ -112,30 +112,39 @@ struct ChannelMembershipStoreRequest | |||
112 | */ | 112 | */ |
113 | struct GNUNET_MessageHeader header; | 113 | struct GNUNET_MessageHeader header; |
114 | 114 | ||
115 | uint32_t reserved; | 115 | uint32_t reserved GNUNET_PACKED; |
116 | |||
117 | uint64_t op_id GNUNET_PACKED; | ||
116 | 118 | ||
117 | struct GNUNET_CRYPTO_EcdsaPublicKey slave_key; | 119 | struct GNUNET_CRYPTO_EcdsaPublicKey slave_key; |
118 | 120 | ||
119 | uint64_t announced_at; | 121 | uint64_t announced_at GNUNET_PACKED; |
120 | 122 | ||
121 | uint64_t effective_since; | 123 | uint64_t effective_since GNUNET_PACKED; |
122 | 124 | ||
123 | uint8_t did_join; | 125 | uint8_t did_join; |
124 | }; | 126 | }; |
125 | 127 | ||
126 | 128 | ||
127 | struct StoryRequest | 129 | struct HistoryRequest |
128 | { | 130 | { |
129 | /** | 131 | /** |
130 | * Type: GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_STORY_REQUEST | 132 | * Type: GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_HISTORY_REQUEST |
131 | */ | 133 | */ |
132 | struct GNUNET_MessageHeader header; | 134 | struct GNUNET_MessageHeader header; |
133 | 135 | ||
134 | uint64_t op_id; | 136 | uint32_t reserved GNUNET_PACKED; |
137 | |||
138 | /** | ||
139 | * ID for this operation. | ||
140 | */ | ||
141 | uint64_t op_id GNUNET_PACKED; | ||
142 | |||
143 | uint64_t start_message_id GNUNET_PACKED; | ||
135 | 144 | ||
136 | uint64_t start_message_id; | 145 | uint64_t end_message_id GNUNET_PACKED; |
137 | 146 | ||
138 | uint64_t end_message_id; | 147 | uint64_t message_limit GNUNET_PACKED; |
139 | }; | 148 | }; |
140 | 149 | ||
141 | 150 | ||
@@ -148,10 +157,12 @@ struct StateRequest | |||
148 | */ | 157 | */ |
149 | struct GNUNET_MessageHeader header; | 158 | struct GNUNET_MessageHeader header; |
150 | 159 | ||
160 | uint32_t reserved GNUNET_PACKED; | ||
161 | |||
151 | /** | 162 | /** |
152 | * ID for this operation. | 163 | * ID for this operation. |
153 | */ | 164 | */ |
154 | uint64_t op_id; | 165 | uint64_t op_id GNUNET_PACKED; |
155 | 166 | ||
156 | /* Followed by NUL-terminated name. */ | 167 | /* Followed by NUL-terminated name. */ |
157 | }; | 168 | }; |
@@ -160,25 +171,6 @@ struct StateRequest | |||
160 | /**** service -> library ****/ | 171 | /**** service -> library ****/ |
161 | 172 | ||
162 | 173 | ||
163 | struct CountersResult | ||
164 | { | ||
165 | /** | ||
166 | * Type: GNUNET_MESSAGE_TYPE_PSYC_RESULT_COUNTERS | ||
167 | */ | ||
168 | struct GNUNET_MessageHeader header; | ||
169 | |||
170 | /** | ||
171 | * Status code for the operation. | ||
172 | */ | ||
173 | int32_t result_code GNUNET_PACKED; | ||
174 | |||
175 | /** | ||
176 | * Last message ID sent to the channel. | ||
177 | */ | ||
178 | uint64_t max_message_id; | ||
179 | }; | ||
180 | |||
181 | |||
182 | /** | 174 | /** |
183 | * Answer from service to client about last operation. | 175 | * Answer from service to client about last operation. |
184 | */ | 176 | */ |
@@ -192,23 +184,22 @@ struct OperationResult | |||
192 | */ | 184 | */ |
193 | struct GNUNET_MessageHeader header; | 185 | struct GNUNET_MessageHeader header; |
194 | 186 | ||
187 | uint32_t reserved GNUNET_PACKED; | ||
188 | |||
195 | /** | 189 | /** |
196 | * Operation ID. | 190 | * Operation ID. |
197 | */ | 191 | */ |
198 | uint32_t op_id GNUNET_PACKED; | 192 | uint64_t op_id GNUNET_PACKED; |
199 | 193 | ||
200 | /** | 194 | /** |
201 | * Status code for the operation. | 195 | * Status code for the operation. |
202 | */ | 196 | */ |
203 | int64_t result_code GNUNET_PACKED; | 197 | uint64_t result_code GNUNET_PACKED; |
204 | 198 | ||
205 | /* Followed by: | 199 | /* Followed by: |
206 | * - on error: NUL-terminated error message | 200 | * - on error: NUL-terminated error message |
207 | * - on success: one of the following message types | 201 | * - on success: one of the following message types |
208 | * | 202 | * |
209 | * For a STORY_RESULT: | ||
210 | * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE | ||
211 | * | ||
212 | * For a STATE_RESULT, one of: | 203 | * For a STATE_RESULT, one of: |
213 | * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER | 204 | * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER |
214 | * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT | 205 | * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT |
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c index ca25b1b01..8cce89704 100644 --- a/src/psyc/psyc_api.c +++ b/src/psyc/psyc_api.c | |||
@@ -43,6 +43,33 @@ | |||
43 | #define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__) | 43 | #define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__) |
44 | 44 | ||
45 | 45 | ||
46 | struct OperationListItem | ||
47 | { | ||
48 | struct OperationListItem *prev; | ||
49 | struct OperationListItem *next; | ||
50 | |||
51 | /** | ||
52 | * Operation ID. | ||
53 | */ | ||
54 | uint64_t op_id; | ||
55 | |||
56 | /** | ||
57 | * Continuation to invoke with the result of an operation. | ||
58 | */ | ||
59 | GNUNET_PSYC_ResultCallback result_cb; | ||
60 | |||
61 | /** | ||
62 | * State variable result callback. | ||
63 | */ | ||
64 | GNUNET_PSYC_StateVarCallback state_var_cb; | ||
65 | |||
66 | /** | ||
67 | * Closure for the callbacks. | ||
68 | */ | ||
69 | void *cls; | ||
70 | }; | ||
71 | |||
72 | |||
46 | /** | 73 | /** |
47 | * Handle to access PSYC channel operations for both the master and slaves. | 74 | * Handle to access PSYC channel operations for both the master and slaves. |
48 | */ | 75 | */ |
@@ -84,6 +111,21 @@ struct GNUNET_PSYC_Channel | |||
84 | void *disconnect_cls; | 111 | void *disconnect_cls; |
85 | 112 | ||
86 | /** | 113 | /** |
114 | * First operation in the linked list. | ||
115 | */ | ||
116 | struct OperationListItem *op_head; | ||
117 | |||
118 | /** | ||
119 | * Last operation in the linked list. | ||
120 | */ | ||
121 | struct OperationListItem *op_tail; | ||
122 | |||
123 | /** | ||
124 | * Last operation ID used. | ||
125 | */ | ||
126 | uint64_t last_op_id; | ||
127 | |||
128 | /** | ||
87 | * Are we polling for incoming messages right now? | 129 | * Are we polling for incoming messages right now? |
88 | */ | 130 | */ |
89 | uint8_t in_receive; | 131 | uint8_t in_receive; |
@@ -163,21 +205,82 @@ struct GNUNET_PSYC_SlaveTransmitHandle | |||
163 | 205 | ||
164 | 206 | ||
165 | /** | 207 | /** |
166 | * Handle to a story telling operation. | 208 | * Get a fresh operation ID to distinguish between PSYCstore requests. |
209 | * | ||
210 | * @param h Handle to the PSYCstore service. | ||
211 | * @return next operation id to use | ||
167 | */ | 212 | */ |
168 | struct GNUNET_PSYC_Story | 213 | static uint64_t |
214 | op_get_next_id (struct GNUNET_PSYC_Channel *chn) | ||
169 | { | 215 | { |
170 | 216 | return ++chn->last_op_id; | |
171 | }; | 217 | } |
172 | 218 | ||
173 | 219 | ||
174 | /** | 220 | /** |
175 | * Handle for a state query operation. | 221 | * Find operation by ID. |
222 | * | ||
223 | * @return Operation, or NULL if none found. | ||
176 | */ | 224 | */ |
177 | struct GNUNET_PSYC_StateQuery | 225 | static struct OperationListItem * |
226 | op_find_by_id (struct GNUNET_PSYC_Channel *chn, uint64_t op_id) | ||
178 | { | 227 | { |
228 | struct OperationListItem *op = chn->op_head; | ||
229 | while (NULL != op) | ||
230 | { | ||
231 | if (op->op_id == op_id) | ||
232 | return op; | ||
233 | op = op->next; | ||
234 | } | ||
235 | return NULL; | ||
236 | } | ||
179 | 237 | ||
180 | }; | 238 | |
239 | static uint64_t | ||
240 | op_add (struct GNUNET_PSYC_Channel *chn, GNUNET_PSYC_ResultCallback result_cb, | ||
241 | void *cls) | ||
242 | { | ||
243 | if (NULL == result_cb) | ||
244 | return 0; | ||
245 | |||
246 | struct OperationListItem *op = GNUNET_malloc (sizeof (*op)); | ||
247 | op->op_id = op_get_next_id (chn); | ||
248 | op->result_cb = result_cb; | ||
249 | op->cls = cls; | ||
250 | GNUNET_CONTAINER_DLL_insert_tail (chn->op_head, chn->op_tail, op); | ||
251 | |||
252 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
253 | "%p Added operation #%" PRIu64 "\n", chn, op->op_id); | ||
254 | return op->op_id; | ||
255 | } | ||
256 | |||
257 | |||
258 | static int | ||
259 | op_result (struct GNUNET_PSYC_Channel *chn, uint64_t op_id, | ||
260 | int64_t result_code, const char *err_msg) | ||
261 | { | ||
262 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
263 | "%p Received result for operation #%" PRIu64 ": %" PRId64 " (%s)\n", | ||
264 | chn, op_id, result_code, err_msg); | ||
265 | if (0 == op_id) | ||
266 | return GNUNET_NO; | ||
267 | |||
268 | struct OperationListItem *op = op_find_by_id (chn, op_id); | ||
269 | if (NULL == op) | ||
270 | { | ||
271 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
272 | "Could not find operation #%" PRIu64 "\n", op_id); | ||
273 | return GNUNET_NO; | ||
274 | } | ||
275 | |||
276 | GNUNET_CONTAINER_DLL_remove (chn->op_head, chn->op_tail, op); | ||
277 | |||
278 | if (NULL != op->result_cb) | ||
279 | op->result_cb (op->cls, result_code, err_msg); | ||
280 | |||
281 | GNUNET_free (op); | ||
282 | return GNUNET_YES; | ||
283 | } | ||
181 | 284 | ||
182 | 285 | ||
183 | static void | 286 | static void |
@@ -203,6 +306,79 @@ channel_recv_disconnect (void *cls, | |||
203 | 306 | ||
204 | 307 | ||
205 | static void | 308 | static void |
309 | channel_recv_result (void *cls, | ||
310 | struct GNUNET_CLIENT_MANAGER_Connection *client, | ||
311 | const struct GNUNET_MessageHeader *msg) | ||
312 | { | ||
313 | struct GNUNET_PSYC_Channel * | ||
314 | chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); | ||
315 | |||
316 | uint16_t size = ntohs (msg->size); | ||
317 | const struct OperationResult *res = (const struct OperationResult *) msg; | ||
318 | const char *err_msg = NULL; | ||
319 | |||
320 | if (sizeof (struct OperationResult) < size) | ||
321 | { | ||
322 | err_msg = (const char *) &res[1]; | ||
323 | if ('\0' != err_msg[size - sizeof (struct OperationResult) - 1]) | ||
324 | { | ||
325 | GNUNET_break (0); | ||
326 | err_msg = NULL; | ||
327 | } | ||
328 | } | ||
329 | |||
330 | op_result (chn, GNUNET_ntohll (res->op_id), | ||
331 | GNUNET_ntohll (res->result_code) + INT64_MIN, err_msg); | ||
332 | } | ||
333 | |||
334 | |||
335 | static void | ||
336 | channel_recv_state_result (void *cls, | ||
337 | struct GNUNET_CLIENT_MANAGER_Connection *client, | ||
338 | const struct GNUNET_MessageHeader *msg) | ||
339 | { | ||
340 | struct GNUNET_PSYC_Channel * | ||
341 | chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); | ||
342 | |||
343 | const struct OperationResult *res = (const struct OperationResult *) msg; | ||
344 | struct OperationListItem *op = op_find_by_id (chn, GNUNET_ntohll (res->op_id)); | ||
345 | if (NULL == op || NULL == op->state_var_cb) | ||
346 | return; | ||
347 | |||
348 | const struct GNUNET_MessageHeader *modc = (struct GNUNET_MessageHeader *) &op[1]; | ||
349 | uint16_t modc_size = ntohs (modc->size); | ||
350 | if (ntohs (msg->size) - sizeof (*msg) != modc_size) | ||
351 | { | ||
352 | GNUNET_break (0); | ||
353 | return; | ||
354 | } | ||
355 | switch (ntohs (modc->type)) | ||
356 | { | ||
357 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | ||
358 | { | ||
359 | const struct GNUNET_PSYC_MessageModifier * | ||
360 | mod = (const struct GNUNET_PSYC_MessageModifier *) modc; | ||
361 | |||
362 | const char *name = (const char *) &mod[1]; | ||
363 | uint16_t name_size = ntohs (mod->name_size); | ||
364 | if ('\0' != name[name_size - 1]) | ||
365 | { | ||
366 | GNUNET_break (0); | ||
367 | return; | ||
368 | } | ||
369 | op->state_var_cb (op->cls, name, name + name_size, ntohs (mod->value_size)); | ||
370 | break; | ||
371 | } | ||
372 | |||
373 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: | ||
374 | op->state_var_cb (op->cls, NULL, (const char *) &modc[1], | ||
375 | modc_size - sizeof (*modc)); | ||
376 | break; | ||
377 | } | ||
378 | } | ||
379 | |||
380 | |||
381 | static void | ||
206 | channel_recv_message (void *cls, | 382 | channel_recv_message (void *cls, |
207 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 383 | struct GNUNET_CLIENT_MANAGER_Connection *client, |
208 | const struct GNUNET_MessageHeader *msg) | 384 | const struct GNUNET_MessageHeader *msg) |
@@ -234,9 +410,16 @@ master_recv_start_ack (void *cls, | |||
234 | mst = GNUNET_CLIENT_MANAGER_get_user_context_ (client, | 410 | mst = GNUNET_CLIENT_MANAGER_get_user_context_ (client, |
235 | sizeof (struct GNUNET_PSYC_Channel)); | 411 | sizeof (struct GNUNET_PSYC_Channel)); |
236 | 412 | ||
237 | struct CountersResult *cres = (struct CountersResult *) msg; | 413 | struct GNUNET_PSYC_CountersResultMessage * |
414 | cres = (struct GNUNET_PSYC_CountersResultMessage *) msg; | ||
415 | int32_t result = ntohl (cres->result_code) + INT32_MIN; | ||
416 | if (GNUNET_OK != result && GNUNET_NO != result) | ||
417 | { | ||
418 | LOG (GNUNET_ERROR_TYPE_ERROR, "Could not start master.\n"); | ||
419 | GNUNET_break (0); | ||
420 | } | ||
238 | if (NULL != mst->start_cb) | 421 | if (NULL != mst->start_cb) |
239 | mst->start_cb (mst->cb_cls, GNUNET_ntohll (cres->max_message_id)); | 422 | mst->start_cb (mst->cb_cls, result, GNUNET_ntohll (cres->max_message_id)); |
240 | } | 423 | } |
241 | 424 | ||
242 | 425 | ||
@@ -279,9 +462,16 @@ slave_recv_join_ack (void *cls, | |||
279 | struct GNUNET_PSYC_Slave * | 462 | struct GNUNET_PSYC_Slave * |
280 | slv = GNUNET_CLIENT_MANAGER_get_user_context_ (client, | 463 | slv = GNUNET_CLIENT_MANAGER_get_user_context_ (client, |
281 | sizeof (struct GNUNET_PSYC_Channel)); | 464 | sizeof (struct GNUNET_PSYC_Channel)); |
282 | struct CountersResult *cres = (struct CountersResult *) msg; | 465 | struct GNUNET_PSYC_CountersResultMessage * |
466 | cres = (struct GNUNET_PSYC_CountersResultMessage *) msg; | ||
467 | int32_t result = ntohl (cres->result_code) + INT32_MIN; | ||
468 | if (GNUNET_YES != result && GNUNET_NO != result) | ||
469 | { | ||
470 | LOG (GNUNET_ERROR_TYPE_ERROR, "Could not join slave.\n"); | ||
471 | GNUNET_break (0); | ||
472 | } | ||
283 | if (NULL != slv->connect_cb) | 473 | if (NULL != slv->connect_cb) |
284 | slv->connect_cb (slv->cb_cls, GNUNET_ntohll (cres->max_message_id)); | 474 | slv->connect_cb (slv->cb_cls, result, GNUNET_ntohll (cres->max_message_id)); |
285 | } | 475 | } |
286 | 476 | ||
287 | 477 | ||
@@ -317,12 +507,20 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler master_handlers[] = | |||
317 | 507 | ||
318 | { &master_recv_start_ack, NULL, | 508 | { &master_recv_start_ack, NULL, |
319 | GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK, | 509 | GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK, |
320 | sizeof (struct CountersResult), GNUNET_NO }, | 510 | sizeof (struct GNUNET_PSYC_CountersResultMessage), GNUNET_NO }, |
321 | 511 | ||
322 | { &master_recv_join_request, NULL, | 512 | { &master_recv_join_request, NULL, |
323 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST, | 513 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST, |
324 | sizeof (struct GNUNET_PSYC_JoinRequestMessage), GNUNET_YES }, | 514 | sizeof (struct GNUNET_PSYC_JoinRequestMessage), GNUNET_YES }, |
325 | 515 | ||
516 | { &channel_recv_state_result, NULL, | ||
517 | GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT, | ||
518 | sizeof (struct OperationResult), GNUNET_YES }, | ||
519 | |||
520 | { &channel_recv_result, NULL, | ||
521 | GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE, | ||
522 | sizeof (struct OperationResult), GNUNET_YES }, | ||
523 | |||
326 | { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, | 524 | { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, |
327 | 525 | ||
328 | { NULL, NULL, 0, 0, GNUNET_NO } | 526 | { NULL, NULL, 0, 0, GNUNET_NO } |
@@ -341,12 +539,20 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler slave_handlers[] = | |||
341 | 539 | ||
342 | { &slave_recv_join_ack, NULL, | 540 | { &slave_recv_join_ack, NULL, |
343 | GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK, | 541 | GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK, |
344 | sizeof (struct CountersResult), GNUNET_NO }, | 542 | sizeof (struct GNUNET_PSYC_CountersResultMessage), GNUNET_NO }, |
345 | 543 | ||
346 | { &slave_recv_join_decision, NULL, | 544 | { &slave_recv_join_decision, NULL, |
347 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, | 545 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, |
348 | sizeof (struct GNUNET_PSYC_JoinDecisionMessage), GNUNET_YES }, | 546 | sizeof (struct GNUNET_PSYC_JoinDecisionMessage), GNUNET_YES }, |
349 | 547 | ||
548 | { &channel_recv_state_result, NULL, | ||
549 | GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT, | ||
550 | sizeof (struct OperationResult), GNUNET_YES }, | ||
551 | |||
552 | { &channel_recv_result, NULL, | ||
553 | GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE, | ||
554 | sizeof (struct OperationResult), GNUNET_YES }, | ||
555 | |||
350 | { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, | 556 | { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, |
351 | 557 | ||
352 | { NULL, NULL, 0, 0, GNUNET_NO } | 558 | { NULL, NULL, 0, 0, GNUNET_NO } |
@@ -808,7 +1014,9 @@ void | |||
808 | GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn, | 1014 | GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn, |
809 | const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, | 1015 | const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, |
810 | uint64_t announced_at, | 1016 | uint64_t announced_at, |
811 | uint64_t effective_since) | 1017 | uint64_t effective_since, |
1018 | GNUNET_PSYC_ResultCallback result_cb, | ||
1019 | void *cls) | ||
812 | { | 1020 | { |
813 | struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req)); | 1021 | struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req)); |
814 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE); | 1022 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE); |
@@ -817,6 +1025,8 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn, | |||
817 | req->announced_at = GNUNET_htonll (announced_at); | 1025 | req->announced_at = GNUNET_htonll (announced_at); |
818 | req->effective_since = GNUNET_htonll (effective_since); | 1026 | req->effective_since = GNUNET_htonll (effective_since); |
819 | req->did_join = GNUNET_YES; | 1027 | req->did_join = GNUNET_YES; |
1028 | req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls)); | ||
1029 | |||
820 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); | 1030 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); |
821 | } | 1031 | } |
822 | 1032 | ||
@@ -845,7 +1055,9 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn, | |||
845 | void | 1055 | void |
846 | GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn, | 1056 | GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn, |
847 | const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, | 1057 | const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, |
848 | uint64_t announced_at) | 1058 | uint64_t announced_at, |
1059 | GNUNET_PSYC_ResultCallback result_cb, | ||
1060 | void *cls) | ||
849 | { | 1061 | { |
850 | struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req)); | 1062 | struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req)); |
851 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE); | 1063 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE); |
@@ -853,57 +1065,85 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn, | |||
853 | req->slave_key = *slave_key; | 1065 | req->slave_key = *slave_key; |
854 | req->announced_at = GNUNET_htonll (announced_at); | 1066 | req->announced_at = GNUNET_htonll (announced_at); |
855 | req->did_join = GNUNET_NO; | 1067 | req->did_join = GNUNET_NO; |
1068 | req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls)); | ||
1069 | |||
856 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); | 1070 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); |
857 | } | 1071 | } |
858 | 1072 | ||
859 | 1073 | ||
860 | /** | 1074 | /** |
861 | * Request to be told the message history of the channel. | 1075 | * Request to replay a part of the message history of the channel. |
862 | * | 1076 | * |
863 | * Historic messages (but NOT the state at the time) will be replayed (given to | 1077 | * Historic messages (but NOT the state at the time) will be replayed (given to |
864 | * the normal method handlers) if available and if access is permitted. | 1078 | * the normal method handlers) if available and if access is permitted. |
865 | * | 1079 | * |
866 | * To get the latest message, use 0 for both the start and end message ID. | 1080 | * @param channel |
867 | * | 1081 | * Which channel should be replayed? |
868 | * @param channel Which channel should be replayed? | 1082 | * @param start_message_id |
869 | * @param start_message_id Earliest interesting point in history. | 1083 | * Earliest interesting point in history. |
870 | * @param end_message_id Last (exclusive) interesting point in history. | 1084 | * @param end_message_id |
871 | * @param message_cb Function to invoke on message parts received from the story. | 1085 | * Last (inclusive) interesting point in history. |
872 | * @param finish_cb Function to call when the requested story has been fully | 1086 | * FIXME: @param method_prefix |
873 | * told (counting message IDs might not suffice, as some messages | 1087 | * Retrieve only messages with a matching method prefix. |
874 | * might be secret and thus the listener would not know the story is | 1088 | * @param result_cb |
875 | * finished without being told explicitly) once this function | 1089 | * Function to call when the requested history has been fully replayed. |
876 | * has been called, the client must not call | 1090 | * @param cls |
877 | * GNUNET_PSYC_channel_story_tell_cancel() anymore. | 1091 | * Closure for the callbacks. |
878 | * @param cls Closure for the callbacks. | 1092 | * |
879 | * | 1093 | * @return Handle to cancel history replay operation. |
880 | * @return Handle to cancel story telling operation. | ||
881 | */ | 1094 | */ |
882 | struct GNUNET_PSYC_Story * | 1095 | void |
883 | GNUNET_PSYC_channel_story_tell (struct GNUNET_PSYC_Channel *channel, | 1096 | GNUNET_PSYC_channel_history_replay (struct GNUNET_PSYC_Channel *chn, |
884 | uint64_t start_message_id, | 1097 | uint64_t start_message_id, |
885 | uint64_t end_message_id, | 1098 | uint64_t end_message_id, |
886 | GNUNET_PSYC_MessageCallback message_cb, | 1099 | /* FIXME: const char *method_prefix, */ |
887 | GNUNET_PSYC_MessagePartCallback message_part_cb, | 1100 | GNUNET_PSYC_ResultCallback result_cb, |
888 | GNUNET_PSYC_FinishCallback finish_cb, | 1101 | void *cls) |
889 | void *cls) | ||
890 | { | 1102 | { |
891 | return NULL; | 1103 | struct HistoryRequest *req = GNUNET_malloc (sizeof (*req)); |
1104 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY); | ||
1105 | req->header.size = htons (sizeof (*req)); | ||
1106 | req->start_message_id = GNUNET_htonll (start_message_id); | ||
1107 | req->end_message_id = GNUNET_htonll (end_message_id); | ||
1108 | req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls)); | ||
1109 | |||
1110 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); | ||
892 | } | 1111 | } |
893 | 1112 | ||
894 | 1113 | ||
895 | /** | 1114 | /** |
896 | * Abort story telling. | 1115 | * Request to replay the latest messages from the message history of the channel. |
897 | * | 1116 | * |
898 | * This function must not be called from within method handlers (as given to | 1117 | * Historic messages (but NOT the state at the time) will be replayed (given to |
899 | * GNUNET_PSYC_slave_join()) of the slave. | 1118 | * the normal method handlers) if available and if access is permitted. |
900 | * | 1119 | * |
901 | * @param story Story telling operation to stop. | 1120 | * @param channel |
1121 | * Which channel should be replayed? | ||
1122 | * @param message_limit | ||
1123 | * Maximum number of messages to replay. | ||
1124 | * FIXME: @param method_prefix | ||
1125 | * Retrieve only messages with a matching method prefix. | ||
1126 | * @param result_cb | ||
1127 | * Function to call when the requested history has been fully replayed. | ||
1128 | * @param cls | ||
1129 | * Closure for the callbacks. | ||
1130 | * | ||
1131 | * @return Handle to cancel history replay operation. | ||
902 | */ | 1132 | */ |
903 | void | 1133 | void |
904 | GNUNET_PSYC_channel_story_tell_cancel (struct GNUNET_PSYC_Story *story) | 1134 | GNUNET_PSYC_channel_history_replay_latest (struct GNUNET_PSYC_Channel *chn, |
1135 | uint64_t message_limit, | ||
1136 | /* FIXME: const char *method_prefix, */ | ||
1137 | GNUNET_PSYC_ResultCallback result_cb, | ||
1138 | void *cls) | ||
905 | { | 1139 | { |
1140 | struct HistoryRequest *req = GNUNET_malloc (sizeof (*req)); | ||
1141 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY); | ||
1142 | req->header.size = htons (sizeof (*req)); | ||
1143 | req->message_limit = GNUNET_htonll (message_limit); | ||
1144 | req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls)); | ||
906 | 1145 | ||
1146 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); | ||
907 | } | 1147 | } |
908 | 1148 | ||
909 | 1149 | ||
@@ -914,22 +1154,35 @@ GNUNET_PSYC_channel_story_tell_cancel (struct GNUNET_PSYC_Story *story) | |||
914 | * less-specific name is matched; for example, requesting "_a_b" will match "_a" | 1154 | * less-specific name is matched; for example, requesting "_a_b" will match "_a" |
915 | * if "_a_b" does not exist. | 1155 | * if "_a_b" does not exist. |
916 | * | 1156 | * |
917 | * @param channel Channel handle. | 1157 | * @param channel |
918 | * @param full_name Full name of the requested variable, the actual variable | 1158 | * Channel handle. |
919 | * returned might have a shorter name.. | 1159 | * @param full_name |
920 | * @param cb Function called once when a matching state variable is found. | 1160 | * Full name of the requested variable. |
1161 | * The actual variable returned might have a shorter name. | ||
1162 | * @param var_cb | ||
1163 | * Function called once when a matching state variable is found. | ||
921 | * Not called if there's no matching state variable. | 1164 | * Not called if there's no matching state variable. |
922 | * @param cb_cls Closure for the callbacks. | 1165 | * @param result_cb |
923 | * | 1166 | * Function called after the operation finished. |
924 | * @return Handle that can be used to cancel the query operation. | 1167 | * (i.e. all state variables have been returned via @a state_cb) |
1168 | * @param cls | ||
1169 | * Closure for the callbacks. | ||
925 | */ | 1170 | */ |
926 | struct GNUNET_PSYC_StateQuery * | 1171 | void |
927 | GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *channel, | 1172 | GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *chn, |
928 | const char *full_name, | 1173 | const char *full_name, |
929 | GNUNET_PSYC_StateCallback cb, | 1174 | GNUNET_PSYC_StateVarCallback var_cb, |
930 | void *cb_cls) | 1175 | GNUNET_PSYC_ResultCallback result_cb, |
1176 | void *cls) | ||
931 | { | 1177 | { |
932 | return NULL; | 1178 | size_t name_size = strlen (full_name) + 1; |
1179 | struct StateRequest *req = GNUNET_malloc (sizeof (*req) + name_size); | ||
1180 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_GET); | ||
1181 | req->header.size = htons (sizeof (*req) + name_size); | ||
1182 | req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls)); | ||
1183 | memcpy (&req[1], full_name, name_size); | ||
1184 | |||
1185 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); | ||
933 | } | 1186 | } |
934 | 1187 | ||
935 | 1188 | ||
@@ -943,33 +1196,34 @@ GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *channel, | |||
943 | * The @a state_cb is invoked on all matching state variables asynchronously, as | 1196 | * The @a state_cb is invoked on all matching state variables asynchronously, as |
944 | * the state is stored in and retrieved from the PSYCstore, | 1197 | * the state is stored in and retrieved from the PSYCstore, |
945 | * | 1198 | * |
946 | * @param channel Channel handle. | 1199 | * @param channel |
947 | * @param name_prefix Prefix of the state variable name to match. | 1200 | * Channel handle. |
948 | * @param cb Function to call with the matching state variables. | 1201 | * @param name_prefix |
949 | * @param cb_cls Closure for the callbacks. | 1202 | * Prefix of the state variable name to match. |
950 | * | 1203 | * @param var_cb |
951 | * @return Handle that can be used to cancel the query operation. | 1204 | * Function called once when a matching state variable is found. |
952 | */ | 1205 | * Not called if there's no matching state variable. |
953 | struct GNUNET_PSYC_StateQuery * | 1206 | * @param result_cb |
954 | GNUNET_PSYC_channel_state_get_prefix (struct GNUNET_PSYC_Channel *channel, | 1207 | * Function called after the operation finished. |
955 | const char *name_prefix, | 1208 | * (i.e. all state variables have been returned via @a state_cb) |
956 | GNUNET_PSYC_StateCallback cb, | 1209 | * @param cls |
957 | void *cb_cls) | 1210 | * Closure for the callbacks. |
958 | { | ||
959 | return NULL; | ||
960 | } | ||
961 | |||
962 | |||
963 | /** | ||
964 | * Cancel a state query operation. | ||
965 | * | ||
966 | * @param query Handle for the operation to cancel. | ||
967 | */ | 1211 | */ |
968 | void | 1212 | void |
969 | GNUNET_PSYC_channel_state_get_cancel (struct GNUNET_PSYC_StateQuery *query) | 1213 | GNUNET_PSYC_channel_state_get_prefix (struct GNUNET_PSYC_Channel *chn, |
1214 | const char *name_prefix, | ||
1215 | GNUNET_PSYC_StateVarCallback var_cb, | ||
1216 | GNUNET_PSYC_ResultCallback result_cb, | ||
1217 | void *cls) | ||
970 | { | 1218 | { |
1219 | size_t name_size = strlen (name_prefix) + 1; | ||
1220 | struct StateRequest *req = GNUNET_malloc (sizeof (*req) + name_size); | ||
1221 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_GET); | ||
1222 | req->header.size = htons (sizeof (*req) + name_size); | ||
1223 | req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls)); | ||
1224 | memcpy (&req[1], name_prefix, name_size); | ||
971 | 1225 | ||
1226 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); | ||
972 | } | 1227 | } |
973 | 1228 | ||
974 | |||
975 | /* end of psyc_api.c */ | 1229 | /* end of psyc_api.c */ |
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c index 5eadef62c..044895809 100644 --- a/src/psyc/test_psyc.c +++ b/src/psyc/test_psyc.c | |||
@@ -35,6 +35,7 @@ | |||
35 | #include "gnunet_env_lib.h" | 35 | #include "gnunet_env_lib.h" |
36 | #include "gnunet_psyc_util_lib.h" | 36 | #include "gnunet_psyc_util_lib.h" |
37 | #include "gnunet_psyc_service.h" | 37 | #include "gnunet_psyc_service.h" |
38 | #include "gnunet_core_service.h" | ||
38 | 39 | ||
39 | #define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) | 40 | #define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) |
40 | 41 | ||
@@ -45,6 +46,9 @@ int res; | |||
45 | 46 | ||
46 | const struct GNUNET_CONFIGURATION_Handle *cfg; | 47 | const struct GNUNET_CONFIGURATION_Handle *cfg; |
47 | 48 | ||
49 | struct GNUNET_CORE_Handle *core; | ||
50 | struct GNUNET_PeerIdentity this_peer; | ||
51 | |||
48 | /** | 52 | /** |
49 | * Handle for task for timeout termination. | 53 | * Handle for task for timeout termination. |
50 | */ | 54 | */ |
@@ -53,6 +57,8 @@ GNUNET_SCHEDULER_TaskIdentifier end_badly_task; | |||
53 | struct GNUNET_PSYC_Master *mst; | 57 | struct GNUNET_PSYC_Master *mst; |
54 | struct GNUNET_PSYC_Slave *slv; | 58 | struct GNUNET_PSYC_Slave *slv; |
55 | 59 | ||
60 | struct GNUNET_PSYC_Channel *mst_chn, *slv_chn; | ||
61 | |||
56 | struct GNUNET_CRYPTO_EddsaPrivateKey *channel_key; | 62 | struct GNUNET_CRYPTO_EddsaPrivateKey *channel_key; |
57 | struct GNUNET_CRYPTO_EcdsaPrivateKey *slave_key; | 63 | struct GNUNET_CRYPTO_EcdsaPrivateKey *slave_key; |
58 | 64 | ||
@@ -80,9 +86,19 @@ uint8_t join_req_count; | |||
80 | 86 | ||
81 | enum | 87 | enum |
82 | { | 88 | { |
83 | TEST_NONE, | 89 | TEST_NONE = 0, |
84 | TEST_SLAVE_TRANSMIT, | 90 | TEST_MASTER_START = 1, |
85 | TEST_MASTER_TRANSMIT, | 91 | TEST_SLAVE_JOIN = 2, |
92 | TEST_SLAVE_TRANSMIT = 3, | ||
93 | TEST_MASTER_TRANSMIT = 4, | ||
94 | TEST_MASTER_HISTORY_REPLAY_LATEST = 5, | ||
95 | TEST_SLAVE_HISTORY_REPLAY_LATEST = 6, | ||
96 | TEST_MASTER_HISTORY_REPLAY = 7, | ||
97 | TEST_SLAVE_HISTORY_REPLAY = 8, | ||
98 | TEST_MASTER_STATE_GET = 9, | ||
99 | TEST_SLAVE_STATE_GET = 10, | ||
100 | TEST_MASTER_STATE_GET_PREFIX = 11, | ||
101 | TEST_SLAVE_STATE_GET_PREFIX = 12, | ||
86 | } test; | 102 | } test; |
87 | 103 | ||
88 | 104 | ||
@@ -118,6 +134,11 @@ void slave_parted (void *cls) | |||
118 | void | 134 | void |
119 | cleanup () | 135 | cleanup () |
120 | { | 136 | { |
137 | if (NULL != core) | ||
138 | { | ||
139 | GNUNET_CORE_disconnect (core); | ||
140 | core = NULL; | ||
141 | } | ||
121 | if (NULL != slv) | 142 | if (NULL != slv) |
122 | { | 143 | { |
123 | GNUNET_PSYC_slave_part (slv, GNUNET_NO, &slave_parted, NULL); | 144 | GNUNET_PSYC_slave_part (slv, GNUNET_NO, &slave_parted, NULL); |
@@ -177,13 +198,203 @@ end () | |||
177 | 198 | ||
178 | 199 | ||
179 | void | 200 | void |
201 | state_get_var (void *cls, const char *name, const void *value, size_t value_size) | ||
202 | { | ||
203 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
204 | "Got state var: %s\n%.*s\n", name, value_size, value); | ||
205 | } | ||
206 | |||
207 | |||
208 | /*** Slave state_get_prefix() ***/ | ||
209 | |||
210 | void | ||
211 | slave_state_get_prefix_result (void *cls, int64_t result, const char *err_msg) | ||
212 | { | ||
213 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
214 | "slave_state_get_prefix:\t%" PRId64 " (%s)\n", result, err_msg); | ||
215 | // FIXME: GNUNET_assert (2 == result); | ||
216 | end (); | ||
217 | } | ||
218 | |||
219 | |||
220 | void | ||
221 | slave_state_get_prefix () | ||
222 | { | ||
223 | test = TEST_SLAVE_STATE_GET_PREFIX; | ||
224 | GNUNET_PSYC_channel_state_get_prefix (slv_chn, "_foo", &state_get_var, | ||
225 | &slave_state_get_prefix_result, NULL); | ||
226 | } | ||
227 | |||
228 | |||
229 | /*** Master state_get_prefix() ***/ | ||
230 | |||
231 | |||
232 | void | ||
233 | master_state_get_prefix_result (void *cls, int64_t result, const char *err_msg) | ||
234 | { | ||
235 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
236 | "master_state_get_prefix:\t%" PRId64 " (%s)\n", result, err_msg); | ||
237 | // FIXME: GNUNET_assert (2 == result); | ||
238 | slave_state_get_prefix (); | ||
239 | } | ||
240 | |||
241 | |||
242 | void | ||
243 | master_state_get_prefix () | ||
244 | { | ||
245 | test = TEST_MASTER_STATE_GET_PREFIX; | ||
246 | GNUNET_PSYC_channel_state_get_prefix (mst_chn, "_foo", &state_get_var, | ||
247 | &master_state_get_prefix_result, NULL); | ||
248 | } | ||
249 | |||
250 | |||
251 | /*** Slave state_get() ***/ | ||
252 | |||
253 | |||
254 | void | ||
255 | slave_state_get_result (void *cls, int64_t result, const char *err_msg) | ||
256 | { | ||
257 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
258 | "slave_state_get:\t%" PRId64 " (%s)\n", result, err_msg); | ||
259 | // FIXME: GNUNET_assert (2 == result); | ||
260 | master_state_get_prefix (); | ||
261 | } | ||
262 | |||
263 | |||
264 | void | ||
265 | slave_state_get () | ||
266 | { | ||
267 | test = TEST_SLAVE_STATE_GET; | ||
268 | GNUNET_PSYC_channel_state_get (slv_chn, "_foo_bar_baz", &state_get_var, | ||
269 | &slave_state_get_result, NULL); | ||
270 | } | ||
271 | |||
272 | |||
273 | /*** Master state_get() ***/ | ||
274 | |||
275 | |||
276 | void | ||
277 | master_state_get_result (void *cls, int64_t result, const char *err_msg) | ||
278 | { | ||
279 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
280 | "master_state_get:\t%" PRId64 " (%s)\n", result, err_msg); | ||
281 | // FIXME: GNUNET_assert (1 == result); | ||
282 | slave_state_get (); | ||
283 | } | ||
284 | |||
285 | |||
286 | void | ||
287 | master_state_get () | ||
288 | { | ||
289 | test = TEST_MASTER_STATE_GET; | ||
290 | GNUNET_PSYC_channel_state_get (mst_chn, "_foo_bar_baz", &state_get_var, | ||
291 | &master_state_get_result, NULL); | ||
292 | } | ||
293 | |||
294 | |||
295 | /*** Slave history_replay() ***/ | ||
296 | |||
297 | void | ||
298 | slave_history_replay_result (void *cls, int64_t result, const char *err_msg) | ||
299 | { | ||
300 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
301 | "slave_history_replay:\t%" PRId64 " (%s)\n", result, err_msg); | ||
302 | GNUNET_assert (9 == result); | ||
303 | |||
304 | master_state_get (); | ||
305 | } | ||
306 | |||
307 | |||
308 | void | ||
309 | slave_history_replay () | ||
310 | { | ||
311 | test = TEST_SLAVE_HISTORY_REPLAY; | ||
312 | GNUNET_PSYC_channel_history_replay (slv_chn, 1, 1, | ||
313 | &slave_history_replay_result, | ||
314 | NULL); | ||
315 | } | ||
316 | |||
317 | |||
318 | /*** Master history_replay() ***/ | ||
319 | |||
320 | |||
321 | void | ||
322 | master_history_replay_result (void *cls, int64_t result, const char *err_msg) | ||
323 | { | ||
324 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
325 | "master_history_replay:\t%" PRId64 " (%s)\n", result, err_msg); | ||
326 | GNUNET_assert (9 == result); | ||
327 | |||
328 | slave_history_replay (); | ||
329 | } | ||
330 | |||
331 | |||
332 | void | ||
333 | master_history_replay () | ||
334 | { | ||
335 | test = TEST_MASTER_HISTORY_REPLAY; | ||
336 | GNUNET_PSYC_channel_history_replay (mst_chn, 1, 1, | ||
337 | &master_history_replay_result, | ||
338 | NULL); | ||
339 | } | ||
340 | |||
341 | |||
342 | /*** Slave history_replay_latest() ***/ | ||
343 | |||
344 | |||
345 | void | ||
346 | slave_history_replay_latest_result (void *cls, int64_t result, const char *err_msg) | ||
347 | { | ||
348 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
349 | "slave_history_replay_latest:\t%" PRId64 " (%s)\n", result, err_msg); | ||
350 | GNUNET_assert (9 == result); | ||
351 | |||
352 | master_history_replay (); | ||
353 | } | ||
354 | |||
355 | |||
356 | void | ||
357 | slave_history_replay_latest () | ||
358 | { | ||
359 | test = TEST_SLAVE_HISTORY_REPLAY_LATEST; | ||
360 | GNUNET_PSYC_channel_history_replay_latest (slv_chn, 1, | ||
361 | &slave_history_replay_latest_result, | ||
362 | NULL); | ||
363 | } | ||
364 | |||
365 | |||
366 | /*** Master history_replay_latest() ***/ | ||
367 | |||
368 | |||
369 | void | ||
370 | master_history_replay_latest_result (void *cls, int64_t result, const char *err_msg) | ||
371 | { | ||
372 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
373 | "master_history_replay_latest:\t%" PRId64 " (%s)\n", result, err_msg); | ||
374 | GNUNET_assert (9 == result); | ||
375 | |||
376 | slave_history_replay_latest (); | ||
377 | } | ||
378 | |||
379 | |||
380 | void | ||
381 | master_history_replay_latest () | ||
382 | { | ||
383 | test = TEST_MASTER_HISTORY_REPLAY_LATEST; | ||
384 | GNUNET_PSYC_channel_history_replay_latest (mst_chn, 1, | ||
385 | &master_history_replay_latest_result, | ||
386 | NULL); | ||
387 | } | ||
388 | |||
389 | |||
390 | void | ||
180 | master_message_cb (void *cls, uint64_t message_id, uint32_t flags, | 391 | master_message_cb (void *cls, uint64_t message_id, uint32_t flags, |
181 | const struct GNUNET_PSYC_MessageHeader *msg) | 392 | const struct GNUNET_PSYC_MessageHeader *msg) |
182 | { | 393 | { |
183 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 394 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
184 | "Master got PSYC message fragment of size %u " | 395 | "Test #%d: Master got PSYC message fragment of size %u " |
185 | "belonging to message ID %llu with flags %x\n", | 396 | "belonging to message ID %" PRIu64 " with flags %x\n", |
186 | ntohs (msg->header.size), message_id, flags); | 397 | test, ntohs (msg->header.size), message_id, flags); |
187 | // FIXME | 398 | // FIXME |
188 | } | 399 | } |
189 | 400 | ||
@@ -196,7 +407,7 @@ master_message_part_cb (void *cls, uint64_t message_id, | |||
196 | if (NULL == msg) | 407 | if (NULL == msg) |
197 | { | 408 | { |
198 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 409 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
199 | "Error while receiving message %llu\n", message_id); | 410 | "Error while receiving message %" PRIu64 "\n", message_id); |
200 | return; | 411 | return; |
201 | } | 412 | } |
202 | 413 | ||
@@ -204,9 +415,9 @@ master_message_part_cb (void *cls, uint64_t message_id, | |||
204 | uint16_t size = ntohs (msg->size); | 415 | uint16_t size = ntohs (msg->size); |
205 | 416 | ||
206 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 417 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
207 | "Master got message part of type %u and size %u " | 418 | "Test #%d: Master got message part of type %u and size %u " |
208 | "belonging to message ID %llu with flags %x\n", | 419 | "belonging to message ID %" PRIu64 " with flags %x\n", |
209 | type, size, message_id, flags); | 420 | test, type, size, message_id, flags); |
210 | 421 | ||
211 | switch (test) | 422 | switch (test) |
212 | { | 423 | { |
@@ -227,6 +438,18 @@ master_message_part_cb (void *cls, uint64_t message_id, | |||
227 | case TEST_MASTER_TRANSMIT: | 438 | case TEST_MASTER_TRANSMIT: |
228 | break; | 439 | break; |
229 | 440 | ||
441 | case TEST_MASTER_HISTORY_REPLAY: | ||
442 | case TEST_MASTER_HISTORY_REPLAY_LATEST: | ||
443 | if (GNUNET_PSYC_MESSAGE_HISTORIC != flags) | ||
444 | { | ||
445 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
446 | "Test #%d: Unexpected flags for historic message: %x" PRIu32 "\n", | ||
447 | flags); | ||
448 | GNUNET_assert (0); | ||
449 | return; | ||
450 | } | ||
451 | break; | ||
452 | |||
230 | default: | 453 | default: |
231 | GNUNET_assert (0); | 454 | GNUNET_assert (0); |
232 | } | 455 | } |
@@ -238,9 +461,9 @@ slave_message_cb (void *cls, uint64_t message_id, uint32_t flags, | |||
238 | const struct GNUNET_PSYC_MessageHeader *msg) | 461 | const struct GNUNET_PSYC_MessageHeader *msg) |
239 | { | 462 | { |
240 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 463 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
241 | "Slave got PSYC message fragment of size %u " | 464 | "Test #%d: Slave got PSYC message fragment of size %u " |
242 | "belonging to message ID %llu with flags %x\n", | 465 | "belonging to message ID %" PRIu64 " with flags %x\n", |
243 | ntohs (msg->header.size), message_id, flags); | 466 | test, ntohs (msg->header.size), message_id, flags); |
244 | // FIXME | 467 | // FIXME |
245 | } | 468 | } |
246 | 469 | ||
@@ -253,7 +476,7 @@ slave_message_part_cb (void *cls, uint64_t message_id, | |||
253 | if (NULL == msg) | 476 | if (NULL == msg) |
254 | { | 477 | { |
255 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 478 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
256 | "Error while receiving message %llu\n", message_id); | 479 | "Error while receiving message " PRIu64 "\n", message_id); |
257 | return; | 480 | return; |
258 | } | 481 | } |
259 | 482 | ||
@@ -261,15 +484,27 @@ slave_message_part_cb (void *cls, uint64_t message_id, | |||
261 | uint16_t size = ntohs (msg->size); | 484 | uint16_t size = ntohs (msg->size); |
262 | 485 | ||
263 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 486 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
264 | "Slave got message part of type %u and size %u " | 487 | "Test #%d: Slave got message part of type %u and size %u " |
265 | "belonging to message ID %llu with flags %x\n", | 488 | "belonging to message ID %" PRIu64 " with flags %x\n", |
266 | type, size, message_id, flags); | 489 | test, type, size, message_id, flags); |
267 | 490 | ||
268 | switch (test) | 491 | switch (test) |
269 | { | 492 | { |
270 | case TEST_MASTER_TRANSMIT: | 493 | case TEST_MASTER_TRANSMIT: |
271 | if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type) | 494 | if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type) |
272 | end (); | 495 | master_history_replay_latest (); |
496 | break; | ||
497 | |||
498 | case TEST_SLAVE_HISTORY_REPLAY: | ||
499 | case TEST_SLAVE_HISTORY_REPLAY_LATEST: | ||
500 | if (GNUNET_PSYC_MESSAGE_HISTORIC != flags) | ||
501 | { | ||
502 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
503 | "Test #%d: Unexpected flags for historic message: %x" PRIu32 "\n", | ||
504 | flags); | ||
505 | GNUNET_assert (0); | ||
506 | return; | ||
507 | } | ||
273 | break; | 508 | break; |
274 | 509 | ||
275 | default: | 510 | default: |
@@ -417,7 +652,6 @@ slave_transmit () | |||
417 | { | 652 | { |
418 | 653 | ||
419 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave sending request to master.\n"); | 654 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave sending request to master.\n"); |
420 | |||
421 | test = TEST_SLAVE_TRANSMIT; | 655 | test = TEST_SLAVE_TRANSMIT; |
422 | 656 | ||
423 | tmit = GNUNET_new (struct TransmitClosure); | 657 | tmit = GNUNET_new (struct TransmitClosure); |
@@ -438,6 +672,29 @@ slave_transmit () | |||
438 | 672 | ||
439 | 673 | ||
440 | void | 674 | void |
675 | slave_remove_cb (void *cls, int64_t result, const char *err_msg) | ||
676 | { | ||
677 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
678 | "slave_remove:\t%" PRId64 " (%s)\n", result, err_msg); | ||
679 | |||
680 | slave_transmit (); | ||
681 | } | ||
682 | |||
683 | |||
684 | void | ||
685 | slave_add_cb (void *cls, int64_t result, const char *err_msg) | ||
686 | { | ||
687 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
688 | "slave_add:\t%" PRId64 " (%s)\n", result, err_msg); | ||
689 | |||
690 | struct GNUNET_PSYC_Channel *chn = cls; | ||
691 | GNUNET_PSYC_channel_slave_remove (chn, &slave_pub_key, 2, | ||
692 | &slave_remove_cb, chn); | ||
693 | |||
694 | } | ||
695 | |||
696 | |||
697 | void | ||
441 | join_decision_cb (void *cls, | 698 | join_decision_cb (void *cls, |
442 | const struct GNUNET_PSYC_JoinDecisionMessage *dcsn, | 699 | const struct GNUNET_PSYC_JoinDecisionMessage *dcsn, |
443 | int is_admitted, | 700 | int is_admitted, |
@@ -453,7 +710,8 @@ join_decision_cb (void *cls, | |||
453 | return; | 710 | return; |
454 | } | 711 | } |
455 | 712 | ||
456 | slave_transmit (); | 713 | struct GNUNET_PSYC_Channel *chn = GNUNET_PSYC_master_get_channel (mst); |
714 | GNUNET_PSYC_channel_slave_add (chn, &slave_pub_key, 2, 2, &slave_add_cb, chn); | ||
457 | } | 715 | } |
458 | 716 | ||
459 | 717 | ||
@@ -473,19 +731,17 @@ join_request_cb (void *cls, | |||
473 | /* Reject first request */ | 731 | /* Reject first request */ |
474 | int is_admitted = (0 < join_req_count++) ? GNUNET_YES : GNUNET_NO; | 732 | int is_admitted = (0 < join_req_count++) ? GNUNET_YES : GNUNET_NO; |
475 | GNUNET_PSYC_join_decision (jh, is_admitted, 0, NULL, NULL); | 733 | GNUNET_PSYC_join_decision (jh, is_admitted, 0, NULL, NULL); |
476 | |||
477 | /* Membership store */ | ||
478 | struct GNUNET_PSYC_Channel *chn = GNUNET_PSYC_master_get_channel (mst); | ||
479 | GNUNET_PSYC_channel_slave_add (chn, slave_key, 2, 2); | ||
480 | GNUNET_PSYC_channel_slave_remove (chn, &slave_pub_key, 2); | ||
481 | } | 734 | } |
482 | 735 | ||
483 | 736 | ||
484 | void | 737 | void |
485 | slave_connect_cb (void *cls, uint64_t max_message_id) | 738 | slave_connect_cb (void *cls, int result, uint64_t max_message_id) |
486 | { | 739 | { |
487 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 740 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
488 | "Slave connected: %lu\n", max_message_id); | 741 | "Slave connected: %d, max_message_id: %" PRIu64 "\n", |
742 | result, max_message_id); | ||
743 | GNUNET_assert (TEST_SLAVE_JOIN == test); | ||
744 | GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result); | ||
489 | } | 745 | } |
490 | 746 | ||
491 | 747 | ||
@@ -493,8 +749,9 @@ void | |||
493 | slave_join () | 749 | slave_join () |
494 | { | 750 | { |
495 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Joining slave.\n"); | 751 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Joining slave.\n"); |
752 | test = TEST_SLAVE_JOIN; | ||
496 | 753 | ||
497 | struct GNUNET_PeerIdentity origin = {}; // FIXME: this peer | 754 | struct GNUNET_PeerIdentity origin = this_peer; |
498 | struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create (); | 755 | struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create (); |
499 | GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN, | 756 | GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN, |
500 | "_foo", "bar baz", 7); | 757 | "_foo", "bar baz", 7); |
@@ -507,6 +764,7 @@ slave_join () | |||
507 | &slave_message_cb, &slave_message_part_cb, | 764 | &slave_message_cb, &slave_message_part_cb, |
508 | &slave_connect_cb, &join_decision_cb, NULL, | 765 | &slave_connect_cb, &join_decision_cb, NULL, |
509 | join_msg); | 766 | join_msg); |
767 | slv_chn = GNUNET_PSYC_slave_get_channel (slv); | ||
510 | GNUNET_ENV_environment_destroy (env); | 768 | GNUNET_ENV_environment_destroy (env); |
511 | } | 769 | } |
512 | 770 | ||
@@ -564,10 +822,13 @@ master_transmit () | |||
564 | 822 | ||
565 | 823 | ||
566 | void | 824 | void |
567 | master_start_cb (void *cls, uint64_t max_message_id) | 825 | master_start_cb (void *cls, int result, uint64_t max_message_id) |
568 | { | 826 | { |
569 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 827 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
570 | "Master started: %" PRIu64 "\n", max_message_id); | 828 | "Master started: %d, max_message_id: %" PRIu64 "\n", |
829 | result, max_message_id); | ||
830 | GNUNET_assert (TEST_MASTER_START == test); | ||
831 | GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result); | ||
571 | slave_join (); | 832 | slave_join (); |
572 | } | 833 | } |
573 | 834 | ||
@@ -576,10 +837,12 @@ void | |||
576 | master_start () | 837 | master_start () |
577 | { | 838 | { |
578 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Starting master.\n"); | 839 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Starting master.\n"); |
840 | test = TEST_MASTER_START; | ||
579 | mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE, | 841 | mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE, |
580 | &master_start_cb, &join_request_cb, | 842 | &master_start_cb, &join_request_cb, |
581 | &master_message_cb, &master_message_part_cb, | 843 | &master_message_cb, &master_message_part_cb, |
582 | NULL); | 844 | NULL); |
845 | mst_chn = GNUNET_PSYC_master_get_channel (mst); | ||
583 | } | 846 | } |
584 | 847 | ||
585 | void | 848 | void |
@@ -589,6 +852,21 @@ schedule_master_start (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
589 | } | 852 | } |
590 | 853 | ||
591 | 854 | ||
855 | void | ||
856 | core_connected (void *cls, const struct GNUNET_PeerIdentity *my_identity) | ||
857 | { | ||
858 | this_peer = *my_identity; | ||
859 | |||
860 | #if DEBUG_TEST_PSYC | ||
861 | master_start (); | ||
862 | #else | ||
863 | /* Allow some time for the services to initialize. */ | ||
864 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, | ||
865 | &schedule_master_start, NULL); | ||
866 | #endif | ||
867 | |||
868 | } | ||
869 | |||
592 | /** | 870 | /** |
593 | * Main function of the test, run from scheduler. | 871 | * Main function of the test, run from scheduler. |
594 | * | 872 | * |
@@ -615,14 +893,8 @@ run (void *cls, | |||
615 | GNUNET_CRYPTO_eddsa_key_get_public (channel_key, &channel_pub_key); | 893 | GNUNET_CRYPTO_eddsa_key_get_public (channel_key, &channel_pub_key); |
616 | GNUNET_CRYPTO_ecdsa_key_get_public (slave_key, &slave_pub_key); | 894 | GNUNET_CRYPTO_ecdsa_key_get_public (slave_key, &slave_pub_key); |
617 | 895 | ||
618 | #if DEBUG_TEST_PSYC | 896 | core = GNUNET_CORE_connect (cfg, NULL, &core_connected, NULL, NULL, |
619 | master_start (); | 897 | NULL, GNUNET_NO, NULL, GNUNET_NO, NULL); |
620 | #else | ||
621 | /* Allow some time for the services to initialize. */ | ||
622 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, | ||
623 | &schedule_master_start, NULL); | ||
624 | #endif | ||
625 | return; | ||
626 | } | 898 | } |
627 | 899 | ||
628 | 900 | ||