aboutsummaryrefslogtreecommitdiff
path: root/src/psyc
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2014-07-30 21:18:13 +0000
committerGabor X Toth <*@tg-x.net>2014-07-30 21:18:13 +0000
commit40884377f3126bbecbfd3243d47224b3094914f9 (patch)
tree9f32aab9064b199178282a0c9918313e0aa30049 /src/psyc
parent831718fa44b2c56577aa4e36a479fef6debb8cea (diff)
downloadgnunet-40884377f3126bbecbfd3243d47224b3094914f9.tar.gz
gnunet-40884377f3126bbecbfd3243d47224b3094914f9.zip
psyc, psycstore: retrieve state and history
Diffstat (limited to 'src/psyc')
-rw-r--r--src/psyc/Makefile.am2
-rw-r--r--src/psyc/gnunet-service-psyc.c272
-rw-r--r--src/psyc/psyc.h57
-rw-r--r--src/psyc/psyc_api.c414
-rw-r--r--src/psyc/test_psyc.c348
5 files changed, 922 insertions, 171 deletions
diff --git a/src/psyc/Makefile.am b/src/psyc/Makefile.am
index b18605ab9..fb4341751 100644
--- a/src/psyc/Makefile.am
+++ b/src/psyc/Makefile.am
@@ -86,12 +86,14 @@ test_psyc_LDADD = \
86 libgnunetpsyc.la \ 86 libgnunetpsyc.la \
87 libgnunetpsycutil.la \ 87 libgnunetpsycutil.la \
88 $(top_builddir)/src/testing/libgnunettesting.la \ 88 $(top_builddir)/src/testing/libgnunettesting.la \
89 $(top_builddir)/src/core/libgnunetcore.la \
89 $(top_builddir)/src/env/libgnunetenv.la \ 90 $(top_builddir)/src/env/libgnunetenv.la \
90 $(top_builddir)/src/util/libgnunetutil.la 91 $(top_builddir)/src/util/libgnunetutil.la
91test_psyc_DEPENDENCIES = \ 92test_psyc_DEPENDENCIES = \
92 libgnunetpsyc.la \ 93 libgnunetpsyc.la \
93 libgnunetpsycutil.la \ 94 libgnunetpsycutil.la \
94 $(top_builddir)/src/testing/libgnunettesting.la \ 95 $(top_builddir)/src/testing/libgnunettesting.la \
96 $(top_builddir)/src/core/libgnunetcore.la \
95 $(top_builddir)/src/env/libgnunetenv.la \ 97 $(top_builddir)/src/env/libgnunetenv.la \
96 $(top_builddir)/src/util/libgnunetutil.la 98 $(top_builddir)/src/util/libgnunetutil.la
97 99
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
index e7020bc69..3adc34d2a 100644
--- a/src/psyc/gnunet-service-psyc.c
+++ b/src/psyc/gnunet-service-psyc.c
@@ -402,6 +402,14 @@ struct Slave
402}; 402};
403 403
404 404
405struct OperationClosure
406{
407 struct GNUNET_SERVER_Client *client;
408 struct Channel *chn;
409 uint64_t op_id;
410};
411
412
405static void 413static void
406transmit_message (struct Channel *chn); 414transmit_message (struct Channel *chn);
407 415
@@ -585,6 +593,46 @@ client_send_msg (const struct Channel *chn,
585 593
586 594
587/** 595/**
596 * Send a result code back to the client.
597 *
598 * @param client
599 * Client that should receive the result code.
600 * @param result_code
601 * Code to transmit.
602 * @param op_id
603 * Operation ID in network byte order.
604 * @param err_msg
605 * Error message to include (or NULL for none).
606 */
607static void
608client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
609 int64_t result_code, const char *err_msg)
610{
611 struct OperationResult *res;
612 size_t err_len = 0; // FIXME: maximum length
613
614 if (NULL != err_msg)
615 err_len = strlen (err_msg) + 1;
616 res = GNUNET_malloc (sizeof (struct OperationResult) + err_len);
617 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
618 res->header.size = htons (sizeof (struct OperationResult) + err_len);
619 res->result_code = GNUNET_htonll (result_code + INT64_MAX + 1);
620 res->op_id = op_id;
621 if (0 < err_len)
622 memcpy (&res[1], err_msg, err_len);
623 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
624 "%p Sending result to client for operation #%" PRIu64 ": "
625 "%" PRId64 " (%s)\n",
626 client, GNUNET_ntohll (op_id), result_code, err_msg);
627
628 GNUNET_SERVER_notification_context_add (nc, client);
629 GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
630 GNUNET_NO);
631 GNUNET_free (res);
632}
633
634
635/**
588 * Closure for join_mem_test_cb() 636 * Closure for join_mem_test_cb()
589 */ 637 */
590struct JoinMemTestClosure 638struct JoinMemTestClosure
@@ -799,7 +847,8 @@ mcast_recv_replay_fragment (void *cls,
799 847
800{ 848{
801 struct Channel *chn = cls; 849 struct Channel *chn = cls;
802 GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_key, fragment_id, 850 GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_key,
851 fragment_id, fragment_id,
803 &store_recv_fragment_replay, 852 &store_recv_fragment_replay,
804 &store_recv_fragment_replay_result, rh); 853 &store_recv_fragment_replay_result, rh);
805} 854}
@@ -817,7 +866,8 @@ mcast_recv_replay_message (void *cls,
817 struct GNUNET_MULTICAST_ReplayHandle *rh) 866 struct GNUNET_MULTICAST_ReplayHandle *rh)
818{ 867{
819 struct Channel *chn = cls; 868 struct Channel *chn = cls;
820 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key, message_id, 869 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key,
870 message_id, message_id,
821 &store_recv_fragment_replay, 871 &store_recv_fragment_replay,
822 &store_recv_fragment_replay_result, rh); 872 &store_recv_fragment_replay_result, rh);
823} 873}
@@ -865,7 +915,8 @@ hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
865 */ 915 */
866static void 916static void
867client_send_mcast_msg (struct Channel *chn, 917client_send_mcast_msg (struct Channel *chn,
868 const struct GNUNET_MULTICAST_MessageHeader *mmsg) 918 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
919 uint32_t flags)
869{ 920{
870 struct GNUNET_PSYC_MessageHeader *pmsg; 921 struct GNUNET_PSYC_MessageHeader *pmsg;
871 uint16_t size = ntohs (mmsg->header.size); 922 uint16_t size = ntohs (mmsg->header.size);
@@ -882,6 +933,7 @@ client_send_mcast_msg (struct Channel *chn,
882 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); 933 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
883 pmsg->message_id = mmsg->message_id; 934 pmsg->message_id = mmsg->message_id;
884 pmsg->fragment_offset = mmsg->fragment_offset; 935 pmsg->fragment_offset = mmsg->fragment_offset;
936 pmsg->flags = htonl (flags);
885 937
886 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); 938 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
887 client_send_msg (chn, &pmsg->header); 939 client_send_msg (chn, &pmsg->header);
@@ -1111,7 +1163,7 @@ fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1111 { 1163 {
1112 if (GNUNET_NO == drop) 1164 if (GNUNET_NO == drop)
1113 { 1165 {
1114 client_send_mcast_msg (chn, cache_entry->mmsg); 1166 client_send_mcast_msg (chn, cache_entry->mmsg, 0);
1115 } 1167 }
1116 if (cache_entry->ref_count <= 1) 1168 if (cache_entry->ref_count <= 1)
1117 { 1169 {
@@ -1375,10 +1427,10 @@ store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1375 struct Channel *chn = &mst->chn; 1427 struct Channel *chn = &mst->chn;
1376 chn->store_op = NULL; 1428 chn->store_op = NULL;
1377 1429
1378 struct CountersResult res; 1430 struct GNUNET_PSYC_CountersResultMessage res;
1379 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); 1431 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1380 res.header.size = htons (sizeof (res)); 1432 res.header.size = htons (sizeof (res));
1381 res.result_code = htonl (result); 1433 res.result_code = htonl (result - INT32_MIN);
1382 res.max_message_id = GNUNET_htonll (max_message_id); 1434 res.max_message_id = GNUNET_htonll (max_message_id);
1383 1435
1384 if (GNUNET_OK == result || GNUNET_NO == result) 1436 if (GNUNET_OK == result || GNUNET_NO == result)
@@ -1421,10 +1473,10 @@ store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1421 struct Channel *chn = &slv->chn; 1473 struct Channel *chn = &slv->chn;
1422 chn->store_op = NULL; 1474 chn->store_op = NULL;
1423 1475
1424 struct CountersResult res; 1476 struct GNUNET_PSYC_CountersResultMessage res;
1425 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); 1477 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1426 res.header.size = htons (sizeof (res)); 1478 res.header.size = htons (sizeof (res));
1427 res.result_code = htonl (result); 1479 res.result_code = htonl (result - INT32_MIN);
1428 res.max_message_id = GNUNET_htonll (max_message_id); 1480 res.max_message_id = GNUNET_htonll (max_message_id);
1429 1481
1430 if (GNUNET_OK == result || GNUNET_NO == result) 1482 if (GNUNET_OK == result || GNUNET_NO == result)
@@ -1511,10 +1563,10 @@ client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1511 { 1563 {
1512 chn = &mst->chn; 1564 chn = &mst->chn;
1513 1565
1514 struct CountersResult res; 1566 struct GNUNET_PSYC_CountersResultMessage res;
1515 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); 1567 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1516 res.header.size = htons (sizeof (res)); 1568 res.header.size = htons (sizeof (res));
1517 res.result_code = htonl (GNUNET_OK); 1569 res.result_code = htonl ((uint32_t) GNUNET_OK + INT32_MIN);
1518 res.max_message_id = GNUNET_htonll (mst->max_message_id); 1570 res.max_message_id = GNUNET_htonll (mst->max_message_id);
1519 1571
1520 GNUNET_SERVER_notification_context_add (nc, client); 1572 GNUNET_SERVER_notification_context_add (nc, client);
@@ -1621,10 +1673,10 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1621 { 1673 {
1622 chn = &slv->chn; 1674 chn = &slv->chn;
1623 1675
1624 struct CountersResult res; 1676 struct GNUNET_PSYC_CountersResultMessage res;
1625 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); 1677 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1626 res.header.size = htons (sizeof (res)); 1678 res.header.size = htons (sizeof (res));
1627 res.result_code = htonl (GNUNET_OK); 1679 res.result_code = htonl ((uint32_t) GNUNET_OK - INT32_MIN);
1628 res.max_message_id = GNUNET_htonll (chn->max_message_id); 1680 res.max_message_id = GNUNET_htonll (chn->max_message_id);
1629 1681
1630 GNUNET_SERVER_notification_context_add (nc, client); 1682 GNUNET_SERVER_notification_context_add (nc, client);
@@ -2047,17 +2099,26 @@ client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
2047}; 2099};
2048 2100
2049 2101
2102struct MembershipStoreClosure
2103{
2104 struct GNUNET_SERVER_Client *client;
2105 struct Channel *chn;
2106 uint64_t op_id;
2107};
2108
2109
2050/** 2110/**
2051 * Received result of GNUNET_PSYCSTORE_membership_store() 2111 * Received result of GNUNET_PSYCSTORE_membership_store()
2052 */ 2112 */
2053static void 2113static void
2054store_recv_membership_store_result (void *cls, int64_t result, const char *err_msg) 2114store_recv_membership_store_result (void *cls, int64_t result, const char *err_msg)
2055{ 2115{
2056 struct GNUNET_MULTICAST_MembershipTestHandle *mth = cls; 2116 struct MembershipStoreClosure *mcls = cls;
2057 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2117 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2058 "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%s)\n", 2118 "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%s)\n",
2059 mth, result, err_msg); 2119 mcls->chn, result, err_msg);
2060 /* FIXME: send result to client */ 2120
2121 client_send_result (mcls->client, mcls->op_id, result, err_msg);
2061} 2122}
2062 2123
2063 2124
@@ -2075,6 +2136,11 @@ client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
2075 const struct ChannelMembershipStoreRequest * 2136 const struct ChannelMembershipStoreRequest *
2076 req = (const struct ChannelMembershipStoreRequest *) msg; 2137 req = (const struct ChannelMembershipStoreRequest *) msg;
2077 2138
2139 struct MembershipStoreClosure *mcls = GNUNET_malloc (sizeof (*mcls));
2140 mcls->client = client;
2141 mcls->chn = chn;
2142 mcls->op_id = req->op_id;
2143
2078 uint64_t announced_at = GNUNET_ntohll (req->announced_at); 2144 uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2079 uint64_t effective_since = GNUNET_ntohll (req->effective_since); 2145 uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2080 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2146 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -2086,19 +2152,138 @@ client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
2086 GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_key, 2152 GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_key,
2087 req->did_join, announced_at, effective_since, 2153 req->did_join, announced_at, effective_since,
2088 0, /* FIXME: group_generation */ 2154 0, /* FIXME: group_generation */
2089 &store_recv_membership_store_result, chn); 2155 &store_recv_membership_store_result, mcls);
2090 GNUNET_SERVER_receive_done (client, GNUNET_OK); 2156 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2091} 2157}
2092 2158
2093 2159
2160static int
2161store_recv_fragment_history (void *cls,
2162 struct GNUNET_MULTICAST_MessageHeader *msg,
2163 enum GNUNET_PSYCSTORE_MessageFlags flags)
2164{
2165 struct OperationClosure *opcls = cls;
2166 struct Channel *chn = opcls->chn;
2167 client_send_mcast_msg (chn, msg, GNUNET_PSYC_MESSAGE_HISTORIC);
2168 return GNUNET_YES;
2169}
2170
2171
2172/**
2173 * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
2174 */
2175static void
2176store_recv_fragment_history_result (void *cls, int64_t result, const char *err_msg)
2177{
2178 struct OperationClosure *opcls = cls;
2179 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2180 "%p History replay #%" PRIu64 ": "
2181 "PSYCSTORE returned %" PRId64 " (%s)\n",
2182 opcls->chn, opcls->op_id, result, err_msg);
2183
2184 client_send_result (opcls->client, opcls->op_id, result, err_msg);
2185}
2186
2187
2094/** 2188/**
2095 * Client requests channel history from PSYCstore. 2189 * Client requests channel history from PSYCstore.
2096 */ 2190 */
2097static void 2191static void
2098client_recv_story_request (void *cls, struct GNUNET_SERVER_Client *client, 2192client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
2099 const struct GNUNET_MessageHeader *msg) 2193 const struct GNUNET_MessageHeader *msg)
2194{
2195 struct Channel *
2196 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2197 GNUNET_assert (NULL != chn);
2198
2199 const struct HistoryRequest *
2200 req = (const struct HistoryRequest *) msg;
2201
2202 struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls));
2203 opcls->client = client;
2204 opcls->chn = chn;
2205 opcls->op_id = req->op_id;
2206
2207 if (0 == req->message_limit)
2208 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2209 GNUNET_ntohll (req->start_message_id),
2210 GNUNET_ntohll (req->end_message_id),
2211 &store_recv_fragment_history,
2212 &store_recv_fragment_history_result, opcls);
2213 else
2214 GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2215 GNUNET_ntohll (req->message_limit),
2216 &store_recv_fragment_history,
2217 &store_recv_fragment_history_result,
2218 opcls);
2219
2220 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2221}
2222
2223
2224/**
2225 * Received state var from PSYCstore, send it to client.
2226 */
2227static int
2228store_recv_state_var (void *cls, const char *name,
2229 const void *value, size_t value_size)
2230{
2231 struct OperationClosure *opcls = cls;
2232 struct OperationResult *op;
2233
2234 if (NULL != name)
2235 {
2236 uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2237 struct GNUNET_PSYC_MessageModifier *mod;
2238 op = GNUNET_malloc (sizeof (*op) + sizeof (*mod) + name_size + value_size);
2239 op->header.size = htons (sizeof (*op) + sizeof (*mod) + name_size + value_size);
2240 op->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2241 op->op_id = opcls->op_id;
2242
2243 mod = (struct GNUNET_PSYC_MessageModifier *) &op[1];
2244 mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2245 mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2246 mod->name_size = htons (name_size);
2247 mod->value_size = htonl (value_size);
2248 mod->oper = htons (GNUNET_ENV_OP_ASSIGN);
2249 memcpy (&mod[1], name, name_size);
2250 memcpy (((char *) &mod[1]) + name_size, value, value_size);
2251 }
2252 else
2253 {
2254 struct GNUNET_MessageHeader *mod;
2255 op = GNUNET_malloc (sizeof (*op) + sizeof (*mod) + value_size);
2256 op->header.size = htons (sizeof (*op) + sizeof (*mod) + value_size);
2257 op->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2258 op->op_id = opcls->op_id;
2259
2260 mod = (struct GNUNET_MessageHeader *) &op[1];
2261 mod->size = htons (sizeof (*mod) + value_size);
2262 mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2263 memcpy (&mod[1], value, value_size);
2264 }
2265
2266 GNUNET_SERVER_notification_context_add (nc, opcls->client);
2267 GNUNET_SERVER_notification_context_unicast (nc, opcls->client, &op->header,
2268 GNUNET_NO);
2269 return GNUNET_YES;
2270}
2271
2272
2273/**
2274 * Received result of GNUNET_PSYCSTORE_state_get()
2275 * or GNUNET_PSYCSTORE_state_get_prefix()
2276 */
2277static void
2278store_recv_state_result (void *cls, int64_t result, const char *err_msg)
2100{ 2279{
2280 struct OperationClosure *opcls = cls;
2281 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2282 "%p History replay #%" PRIu64 ": "
2283 "PSYCSTORE returned %" PRId64 " (%s)\n",
2284 opcls->chn, opcls->op_id, result, err_msg);
2101 2285
2286 client_send_result (opcls->client, opcls->op_id, result, err_msg);
2102} 2287}
2103 2288
2104 2289
@@ -2109,7 +2294,30 @@ static void
2109client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client, 2294client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
2110 const struct GNUNET_MessageHeader *msg) 2295 const struct GNUNET_MessageHeader *msg)
2111{ 2296{
2297 struct Channel *
2298 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2299 GNUNET_assert (NULL != chn);
2300
2301 const struct StateRequest *
2302 req = (const struct StateRequest *) msg;
2112 2303
2304 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2305 const char *name = (const char *) &req[1];
2306 if (0 == name_size || '\0' != name[name_size - 1])
2307 {
2308 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2309 return;
2310 }
2311
2312 struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls));
2313 opcls->client = client;
2314 opcls->chn = chn;
2315 opcls->op_id = req->op_id;
2316
2317 GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2318 &store_recv_state_var,
2319 &store_recv_state_result, opcls);
2320 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2113} 2321}
2114 2322
2115 2323
@@ -2120,6 +2328,30 @@ static void
2120client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client, 2328client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
2121 const struct GNUNET_MessageHeader *msg) 2329 const struct GNUNET_MessageHeader *msg)
2122{ 2330{
2331 struct Channel *
2332 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2333 GNUNET_assert (NULL != chn);
2334
2335 const struct StateRequest *
2336 req = (const struct StateRequest *) msg;
2337
2338 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2339 const char *name = (const char *) &req[1];
2340 if (0 == name_size || '\0' != name[name_size - 1])
2341 {
2342 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2343 return;
2344 }
2345
2346 struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls));
2347 opcls->client = client;
2348 opcls->chn = chn;
2349 opcls->op_id = req->op_id;
2350
2351 GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2352 &store_recv_state_var,
2353 &store_recv_state_result, opcls);
2354 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2123 2355
2124} 2356}
2125 2357
@@ -2140,8 +2372,8 @@ static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
2140 { &client_recv_membership_store, NULL, 2372 { &client_recv_membership_store, NULL,
2141 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 }, 2373 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 },
2142 2374
2143 { &client_recv_story_request, NULL, 2375 { &client_recv_history_replay, NULL,
2144 GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST, 0 }, 2376 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
2145 2377
2146 { &client_recv_state_get, NULL, 2378 { &client_recv_state_get, NULL,
2147 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 }, 2379 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
diff --git a/src/psyc/psyc.h b/src/psyc/psyc.h
index 21131e7d3..f6d40ddb4 100644
--- a/src/psyc/psyc.h
+++ b/src/psyc/psyc.h
@@ -112,30 +112,39 @@ struct ChannelMembershipStoreRequest
112 */ 112 */
113 struct GNUNET_MessageHeader header; 113 struct GNUNET_MessageHeader header;
114 114
115 uint32_t reserved; 115 uint32_t reserved GNUNET_PACKED;
116
117 uint64_t op_id GNUNET_PACKED;
116 118
117 struct GNUNET_CRYPTO_EcdsaPublicKey slave_key; 119 struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
118 120
119 uint64_t announced_at; 121 uint64_t announced_at GNUNET_PACKED;
120 122
121 uint64_t effective_since; 123 uint64_t effective_since GNUNET_PACKED;
122 124
123 uint8_t did_join; 125 uint8_t did_join;
124}; 126};
125 127
126 128
127struct StoryRequest 129struct HistoryRequest
128{ 130{
129 /** 131 /**
130 * Type: GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_STORY_REQUEST 132 * Type: GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_HISTORY_REQUEST
131 */ 133 */
132 struct GNUNET_MessageHeader header; 134 struct GNUNET_MessageHeader header;
133 135
134 uint64_t op_id; 136 uint32_t reserved GNUNET_PACKED;
137
138 /**
139 * ID for this operation.
140 */
141 uint64_t op_id GNUNET_PACKED;
142
143 uint64_t start_message_id GNUNET_PACKED;
135 144
136 uint64_t start_message_id; 145 uint64_t end_message_id GNUNET_PACKED;
137 146
138 uint64_t end_message_id; 147 uint64_t message_limit GNUNET_PACKED;
139}; 148};
140 149
141 150
@@ -148,10 +157,12 @@ struct StateRequest
148 */ 157 */
149 struct GNUNET_MessageHeader header; 158 struct GNUNET_MessageHeader header;
150 159
160 uint32_t reserved GNUNET_PACKED;
161
151 /** 162 /**
152 * ID for this operation. 163 * ID for this operation.
153 */ 164 */
154 uint64_t op_id; 165 uint64_t op_id GNUNET_PACKED;
155 166
156 /* Followed by NUL-terminated name. */ 167 /* Followed by NUL-terminated name. */
157}; 168};
@@ -160,25 +171,6 @@ struct StateRequest
160/**** service -> library ****/ 171/**** service -> library ****/
161 172
162 173
163struct CountersResult
164{
165 /**
166 * Type: GNUNET_MESSAGE_TYPE_PSYC_RESULT_COUNTERS
167 */
168 struct GNUNET_MessageHeader header;
169
170 /**
171 * Status code for the operation.
172 */
173 int32_t result_code GNUNET_PACKED;
174
175 /**
176 * Last message ID sent to the channel.
177 */
178 uint64_t max_message_id;
179};
180
181
182/** 174/**
183 * Answer from service to client about last operation. 175 * Answer from service to client about last operation.
184 */ 176 */
@@ -192,23 +184,22 @@ struct OperationResult
192 */ 184 */
193 struct GNUNET_MessageHeader header; 185 struct GNUNET_MessageHeader header;
194 186
187 uint32_t reserved GNUNET_PACKED;
188
195 /** 189 /**
196 * Operation ID. 190 * Operation ID.
197 */ 191 */
198 uint32_t op_id GNUNET_PACKED; 192 uint64_t op_id GNUNET_PACKED;
199 193
200 /** 194 /**
201 * Status code for the operation. 195 * Status code for the operation.
202 */ 196 */
203 int64_t result_code GNUNET_PACKED; 197 uint64_t result_code GNUNET_PACKED;
204 198
205 /* Followed by: 199 /* Followed by:
206 * - on error: NUL-terminated error message 200 * - on error: NUL-terminated error message
207 * - on success: one of the following message types 201 * - on success: one of the following message types
208 * 202 *
209 * For a STORY_RESULT:
210 * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE
211 *
212 * For a STATE_RESULT, one of: 203 * For a STATE_RESULT, one of:
213 * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER 204 * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER
214 * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT 205 * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index ca25b1b01..8cce89704 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -43,6 +43,33 @@
43#define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__) 43#define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__)
44 44
45 45
46struct OperationListItem
47{
48 struct OperationListItem *prev;
49 struct OperationListItem *next;
50
51 /**
52 * Operation ID.
53 */
54 uint64_t op_id;
55
56 /**
57 * Continuation to invoke with the result of an operation.
58 */
59 GNUNET_PSYC_ResultCallback result_cb;
60
61 /**
62 * State variable result callback.
63 */
64 GNUNET_PSYC_StateVarCallback state_var_cb;
65
66 /**
67 * Closure for the callbacks.
68 */
69 void *cls;
70};
71
72
46/** 73/**
47 * Handle to access PSYC channel operations for both the master and slaves. 74 * Handle to access PSYC channel operations for both the master and slaves.
48 */ 75 */
@@ -84,6 +111,21 @@ struct GNUNET_PSYC_Channel
84 void *disconnect_cls; 111 void *disconnect_cls;
85 112
86 /** 113 /**
114 * First operation in the linked list.
115 */
116 struct OperationListItem *op_head;
117
118 /**
119 * Last operation in the linked list.
120 */
121 struct OperationListItem *op_tail;
122
123 /**
124 * Last operation ID used.
125 */
126 uint64_t last_op_id;
127
128 /**
87 * Are we polling for incoming messages right now? 129 * Are we polling for incoming messages right now?
88 */ 130 */
89 uint8_t in_receive; 131 uint8_t in_receive;
@@ -163,21 +205,82 @@ struct GNUNET_PSYC_SlaveTransmitHandle
163 205
164 206
165/** 207/**
166 * Handle to a story telling operation. 208 * Get a fresh operation ID to distinguish between PSYCstore requests.
209 *
210 * @param h Handle to the PSYCstore service.
211 * @return next operation id to use
167 */ 212 */
168struct GNUNET_PSYC_Story 213static uint64_t
214op_get_next_id (struct GNUNET_PSYC_Channel *chn)
169{ 215{
170 216 return ++chn->last_op_id;
171}; 217}
172 218
173 219
174/** 220/**
175 * Handle for a state query operation. 221 * Find operation by ID.
222 *
223 * @return Operation, or NULL if none found.
176 */ 224 */
177struct GNUNET_PSYC_StateQuery 225static struct OperationListItem *
226op_find_by_id (struct GNUNET_PSYC_Channel *chn, uint64_t op_id)
178{ 227{
228 struct OperationListItem *op = chn->op_head;
229 while (NULL != op)
230 {
231 if (op->op_id == op_id)
232 return op;
233 op = op->next;
234 }
235 return NULL;
236}
179 237
180}; 238
239static uint64_t
240op_add (struct GNUNET_PSYC_Channel *chn, GNUNET_PSYC_ResultCallback result_cb,
241 void *cls)
242{
243 if (NULL == result_cb)
244 return 0;
245
246 struct OperationListItem *op = GNUNET_malloc (sizeof (*op));
247 op->op_id = op_get_next_id (chn);
248 op->result_cb = result_cb;
249 op->cls = cls;
250 GNUNET_CONTAINER_DLL_insert_tail (chn->op_head, chn->op_tail, op);
251
252 LOG (GNUNET_ERROR_TYPE_DEBUG,
253 "%p Added operation #%" PRIu64 "\n", chn, op->op_id);
254 return op->op_id;
255}
256
257
258static int
259op_result (struct GNUNET_PSYC_Channel *chn, uint64_t op_id,
260 int64_t result_code, const char *err_msg)
261{
262 LOG (GNUNET_ERROR_TYPE_DEBUG,
263 "%p Received result for operation #%" PRIu64 ": %" PRId64 " (%s)\n",
264 chn, op_id, result_code, err_msg);
265 if (0 == op_id)
266 return GNUNET_NO;
267
268 struct OperationListItem *op = op_find_by_id (chn, op_id);
269 if (NULL == op)
270 {
271 LOG (GNUNET_ERROR_TYPE_WARNING,
272 "Could not find operation #%" PRIu64 "\n", op_id);
273 return GNUNET_NO;
274 }
275
276 GNUNET_CONTAINER_DLL_remove (chn->op_head, chn->op_tail, op);
277
278 if (NULL != op->result_cb)
279 op->result_cb (op->cls, result_code, err_msg);
280
281 GNUNET_free (op);
282 return GNUNET_YES;
283}
181 284
182 285
183static void 286static void
@@ -203,6 +306,79 @@ channel_recv_disconnect (void *cls,
203 306
204 307
205static void 308static void
309channel_recv_result (void *cls,
310 struct GNUNET_CLIENT_MANAGER_Connection *client,
311 const struct GNUNET_MessageHeader *msg)
312{
313 struct GNUNET_PSYC_Channel *
314 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn));
315
316 uint16_t size = ntohs (msg->size);
317 const struct OperationResult *res = (const struct OperationResult *) msg;
318 const char *err_msg = NULL;
319
320 if (sizeof (struct OperationResult) < size)
321 {
322 err_msg = (const char *) &res[1];
323 if ('\0' != err_msg[size - sizeof (struct OperationResult) - 1])
324 {
325 GNUNET_break (0);
326 err_msg = NULL;
327 }
328 }
329
330 op_result (chn, GNUNET_ntohll (res->op_id),
331 GNUNET_ntohll (res->result_code) + INT64_MIN, err_msg);
332}
333
334
335static void
336channel_recv_state_result (void *cls,
337 struct GNUNET_CLIENT_MANAGER_Connection *client,
338 const struct GNUNET_MessageHeader *msg)
339{
340 struct GNUNET_PSYC_Channel *
341 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn));
342
343 const struct OperationResult *res = (const struct OperationResult *) msg;
344 struct OperationListItem *op = op_find_by_id (chn, GNUNET_ntohll (res->op_id));
345 if (NULL == op || NULL == op->state_var_cb)
346 return;
347
348 const struct GNUNET_MessageHeader *modc = (struct GNUNET_MessageHeader *) &op[1];
349 uint16_t modc_size = ntohs (modc->size);
350 if (ntohs (msg->size) - sizeof (*msg) != modc_size)
351 {
352 GNUNET_break (0);
353 return;
354 }
355 switch (ntohs (modc->type))
356 {
357 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
358 {
359 const struct GNUNET_PSYC_MessageModifier *
360 mod = (const struct GNUNET_PSYC_MessageModifier *) modc;
361
362 const char *name = (const char *) &mod[1];
363 uint16_t name_size = ntohs (mod->name_size);
364 if ('\0' != name[name_size - 1])
365 {
366 GNUNET_break (0);
367 return;
368 }
369 op->state_var_cb (op->cls, name, name + name_size, ntohs (mod->value_size));
370 break;
371 }
372
373 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
374 op->state_var_cb (op->cls, NULL, (const char *) &modc[1],
375 modc_size - sizeof (*modc));
376 break;
377 }
378}
379
380
381static void
206channel_recv_message (void *cls, 382channel_recv_message (void *cls,
207 struct GNUNET_CLIENT_MANAGER_Connection *client, 383 struct GNUNET_CLIENT_MANAGER_Connection *client,
208 const struct GNUNET_MessageHeader *msg) 384 const struct GNUNET_MessageHeader *msg)
@@ -234,9 +410,16 @@ master_recv_start_ack (void *cls,
234 mst = GNUNET_CLIENT_MANAGER_get_user_context_ (client, 410 mst = GNUNET_CLIENT_MANAGER_get_user_context_ (client,
235 sizeof (struct GNUNET_PSYC_Channel)); 411 sizeof (struct GNUNET_PSYC_Channel));
236 412
237 struct CountersResult *cres = (struct CountersResult *) msg; 413 struct GNUNET_PSYC_CountersResultMessage *
414 cres = (struct GNUNET_PSYC_CountersResultMessage *) msg;
415 int32_t result = ntohl (cres->result_code) + INT32_MIN;
416 if (GNUNET_OK != result && GNUNET_NO != result)
417 {
418 LOG (GNUNET_ERROR_TYPE_ERROR, "Could not start master.\n");
419 GNUNET_break (0);
420 }
238 if (NULL != mst->start_cb) 421 if (NULL != mst->start_cb)
239 mst->start_cb (mst->cb_cls, GNUNET_ntohll (cres->max_message_id)); 422 mst->start_cb (mst->cb_cls, result, GNUNET_ntohll (cres->max_message_id));
240} 423}
241 424
242 425
@@ -279,9 +462,16 @@ slave_recv_join_ack (void *cls,
279 struct GNUNET_PSYC_Slave * 462 struct GNUNET_PSYC_Slave *
280 slv = GNUNET_CLIENT_MANAGER_get_user_context_ (client, 463 slv = GNUNET_CLIENT_MANAGER_get_user_context_ (client,
281 sizeof (struct GNUNET_PSYC_Channel)); 464 sizeof (struct GNUNET_PSYC_Channel));
282 struct CountersResult *cres = (struct CountersResult *) msg; 465 struct GNUNET_PSYC_CountersResultMessage *
466 cres = (struct GNUNET_PSYC_CountersResultMessage *) msg;
467 int32_t result = ntohl (cres->result_code) + INT32_MIN;
468 if (GNUNET_YES != result && GNUNET_NO != result)
469 {
470 LOG (GNUNET_ERROR_TYPE_ERROR, "Could not join slave.\n");
471 GNUNET_break (0);
472 }
283 if (NULL != slv->connect_cb) 473 if (NULL != slv->connect_cb)
284 slv->connect_cb (slv->cb_cls, GNUNET_ntohll (cres->max_message_id)); 474 slv->connect_cb (slv->cb_cls, result, GNUNET_ntohll (cres->max_message_id));
285} 475}
286 476
287 477
@@ -317,12 +507,20 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler master_handlers[] =
317 507
318 { &master_recv_start_ack, NULL, 508 { &master_recv_start_ack, NULL,
319 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK, 509 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK,
320 sizeof (struct CountersResult), GNUNET_NO }, 510 sizeof (struct GNUNET_PSYC_CountersResultMessage), GNUNET_NO },
321 511
322 { &master_recv_join_request, NULL, 512 { &master_recv_join_request, NULL,
323 GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST, 513 GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST,
324 sizeof (struct GNUNET_PSYC_JoinRequestMessage), GNUNET_YES }, 514 sizeof (struct GNUNET_PSYC_JoinRequestMessage), GNUNET_YES },
325 515
516 { &channel_recv_state_result, NULL,
517 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT,
518 sizeof (struct OperationResult), GNUNET_YES },
519
520 { &channel_recv_result, NULL,
521 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE,
522 sizeof (struct OperationResult), GNUNET_YES },
523
326 { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, 524 { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO },
327 525
328 { NULL, NULL, 0, 0, GNUNET_NO } 526 { NULL, NULL, 0, 0, GNUNET_NO }
@@ -341,12 +539,20 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler slave_handlers[] =
341 539
342 { &slave_recv_join_ack, NULL, 540 { &slave_recv_join_ack, NULL,
343 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK, 541 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK,
344 sizeof (struct CountersResult), GNUNET_NO }, 542 sizeof (struct GNUNET_PSYC_CountersResultMessage), GNUNET_NO },
345 543
346 { &slave_recv_join_decision, NULL, 544 { &slave_recv_join_decision, NULL,
347 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 545 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
348 sizeof (struct GNUNET_PSYC_JoinDecisionMessage), GNUNET_YES }, 546 sizeof (struct GNUNET_PSYC_JoinDecisionMessage), GNUNET_YES },
349 547
548 { &channel_recv_state_result, NULL,
549 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT,
550 sizeof (struct OperationResult), GNUNET_YES },
551
552 { &channel_recv_result, NULL,
553 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE,
554 sizeof (struct OperationResult), GNUNET_YES },
555
350 { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, 556 { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO },
351 557
352 { NULL, NULL, 0, 0, GNUNET_NO } 558 { NULL, NULL, 0, 0, GNUNET_NO }
@@ -808,7 +1014,9 @@ void
808GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn, 1014GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn,
809 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, 1015 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
810 uint64_t announced_at, 1016 uint64_t announced_at,
811 uint64_t effective_since) 1017 uint64_t effective_since,
1018 GNUNET_PSYC_ResultCallback result_cb,
1019 void *cls)
812{ 1020{
813 struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req)); 1021 struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req));
814 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE); 1022 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE);
@@ -817,6 +1025,8 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn,
817 req->announced_at = GNUNET_htonll (announced_at); 1025 req->announced_at = GNUNET_htonll (announced_at);
818 req->effective_since = GNUNET_htonll (effective_since); 1026 req->effective_since = GNUNET_htonll (effective_since);
819 req->did_join = GNUNET_YES; 1027 req->did_join = GNUNET_YES;
1028 req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls));
1029
820 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); 1030 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
821} 1031}
822 1032
@@ -845,7 +1055,9 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn,
845void 1055void
846GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn, 1056GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn,
847 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, 1057 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
848 uint64_t announced_at) 1058 uint64_t announced_at,
1059 GNUNET_PSYC_ResultCallback result_cb,
1060 void *cls)
849{ 1061{
850 struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req)); 1062 struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req));
851 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE); 1063 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE);
@@ -853,57 +1065,85 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn,
853 req->slave_key = *slave_key; 1065 req->slave_key = *slave_key;
854 req->announced_at = GNUNET_htonll (announced_at); 1066 req->announced_at = GNUNET_htonll (announced_at);
855 req->did_join = GNUNET_NO; 1067 req->did_join = GNUNET_NO;
1068 req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls));
1069
856 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); 1070 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
857} 1071}
858 1072
859 1073
860/** 1074/**
861 * Request to be told the message history of the channel. 1075 * Request to replay a part of the message history of the channel.
862 * 1076 *
863 * Historic messages (but NOT the state at the time) will be replayed (given to 1077 * Historic messages (but NOT the state at the time) will be replayed (given to
864 * the normal method handlers) if available and if access is permitted. 1078 * the normal method handlers) if available and if access is permitted.
865 * 1079 *
866 * To get the latest message, use 0 for both the start and end message ID. 1080 * @param channel
867 * 1081 * Which channel should be replayed?
868 * @param channel Which channel should be replayed? 1082 * @param start_message_id
869 * @param start_message_id Earliest interesting point in history. 1083 * Earliest interesting point in history.
870 * @param end_message_id Last (exclusive) interesting point in history. 1084 * @param end_message_id
871 * @param message_cb Function to invoke on message parts received from the story. 1085 * Last (inclusive) interesting point in history.
872 * @param finish_cb Function to call when the requested story has been fully 1086 * FIXME: @param method_prefix
873 * told (counting message IDs might not suffice, as some messages 1087 * Retrieve only messages with a matching method prefix.
874 * might be secret and thus the listener would not know the story is 1088 * @param result_cb
875 * finished without being told explicitly) once this function 1089 * Function to call when the requested history has been fully replayed.
876 * has been called, the client must not call 1090 * @param cls
877 * GNUNET_PSYC_channel_story_tell_cancel() anymore. 1091 * Closure for the callbacks.
878 * @param cls Closure for the callbacks. 1092 *
879 * 1093 * @return Handle to cancel history replay operation.
880 * @return Handle to cancel story telling operation.
881 */ 1094 */
882struct GNUNET_PSYC_Story * 1095void
883GNUNET_PSYC_channel_story_tell (struct GNUNET_PSYC_Channel *channel, 1096GNUNET_PSYC_channel_history_replay (struct GNUNET_PSYC_Channel *chn,
884 uint64_t start_message_id, 1097 uint64_t start_message_id,
885 uint64_t end_message_id, 1098 uint64_t end_message_id,
886 GNUNET_PSYC_MessageCallback message_cb, 1099 /* FIXME: const char *method_prefix, */
887 GNUNET_PSYC_MessagePartCallback message_part_cb, 1100 GNUNET_PSYC_ResultCallback result_cb,
888 GNUNET_PSYC_FinishCallback finish_cb, 1101 void *cls)
889 void *cls)
890{ 1102{
891 return NULL; 1103 struct HistoryRequest *req = GNUNET_malloc (sizeof (*req));
1104 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY);
1105 req->header.size = htons (sizeof (*req));
1106 req->start_message_id = GNUNET_htonll (start_message_id);
1107 req->end_message_id = GNUNET_htonll (end_message_id);
1108 req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls));
1109
1110 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
892} 1111}
893 1112
894 1113
895/** 1114/**
896 * Abort story telling. 1115 * Request to replay the latest messages from the message history of the channel.
897 * 1116 *
898 * This function must not be called from within method handlers (as given to 1117 * Historic messages (but NOT the state at the time) will be replayed (given to
899 * GNUNET_PSYC_slave_join()) of the slave. 1118 * the normal method handlers) if available and if access is permitted.
900 * 1119 *
901 * @param story Story telling operation to stop. 1120 * @param channel
1121 * Which channel should be replayed?
1122 * @param message_limit
1123 * Maximum number of messages to replay.
1124 * FIXME: @param method_prefix
1125 * Retrieve only messages with a matching method prefix.
1126 * @param result_cb
1127 * Function to call when the requested history has been fully replayed.
1128 * @param cls
1129 * Closure for the callbacks.
1130 *
1131 * @return Handle to cancel history replay operation.
902 */ 1132 */
903void 1133void
904GNUNET_PSYC_channel_story_tell_cancel (struct GNUNET_PSYC_Story *story) 1134GNUNET_PSYC_channel_history_replay_latest (struct GNUNET_PSYC_Channel *chn,
1135 uint64_t message_limit,
1136 /* FIXME: const char *method_prefix, */
1137 GNUNET_PSYC_ResultCallback result_cb,
1138 void *cls)
905{ 1139{
1140 struct HistoryRequest *req = GNUNET_malloc (sizeof (*req));
1141 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY);
1142 req->header.size = htons (sizeof (*req));
1143 req->message_limit = GNUNET_htonll (message_limit);
1144 req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls));
906 1145
1146 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
907} 1147}
908 1148
909 1149
@@ -914,22 +1154,35 @@ GNUNET_PSYC_channel_story_tell_cancel (struct GNUNET_PSYC_Story *story)
914 * less-specific name is matched; for example, requesting "_a_b" will match "_a" 1154 * less-specific name is matched; for example, requesting "_a_b" will match "_a"
915 * if "_a_b" does not exist. 1155 * if "_a_b" does not exist.
916 * 1156 *
917 * @param channel Channel handle. 1157 * @param channel
918 * @param full_name Full name of the requested variable, the actual variable 1158 * Channel handle.
919 * returned might have a shorter name.. 1159 * @param full_name
920 * @param cb Function called once when a matching state variable is found. 1160 * Full name of the requested variable.
1161 * The actual variable returned might have a shorter name.
1162 * @param var_cb
1163 * Function called once when a matching state variable is found.
921 * Not called if there's no matching state variable. 1164 * Not called if there's no matching state variable.
922 * @param cb_cls Closure for the callbacks. 1165 * @param result_cb
923 * 1166 * Function called after the operation finished.
924 * @return Handle that can be used to cancel the query operation. 1167 * (i.e. all state variables have been returned via @a state_cb)
1168 * @param cls
1169 * Closure for the callbacks.
925 */ 1170 */
926struct GNUNET_PSYC_StateQuery * 1171void
927GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *channel, 1172GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *chn,
928 const char *full_name, 1173 const char *full_name,
929 GNUNET_PSYC_StateCallback cb, 1174 GNUNET_PSYC_StateVarCallback var_cb,
930 void *cb_cls) 1175 GNUNET_PSYC_ResultCallback result_cb,
1176 void *cls)
931{ 1177{
932 return NULL; 1178 size_t name_size = strlen (full_name) + 1;
1179 struct StateRequest *req = GNUNET_malloc (sizeof (*req) + name_size);
1180 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_GET);
1181 req->header.size = htons (sizeof (*req) + name_size);
1182 req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls));
1183 memcpy (&req[1], full_name, name_size);
1184
1185 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
933} 1186}
934 1187
935 1188
@@ -943,33 +1196,34 @@ GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *channel,
943 * The @a state_cb is invoked on all matching state variables asynchronously, as 1196 * The @a state_cb is invoked on all matching state variables asynchronously, as
944 * the state is stored in and retrieved from the PSYCstore, 1197 * the state is stored in and retrieved from the PSYCstore,
945 * 1198 *
946 * @param channel Channel handle. 1199 * @param channel
947 * @param name_prefix Prefix of the state variable name to match. 1200 * Channel handle.
948 * @param cb Function to call with the matching state variables. 1201 * @param name_prefix
949 * @param cb_cls Closure for the callbacks. 1202 * Prefix of the state variable name to match.
950 * 1203 * @param var_cb
951 * @return Handle that can be used to cancel the query operation. 1204 * Function called once when a matching state variable is found.
952 */ 1205 * Not called if there's no matching state variable.
953struct GNUNET_PSYC_StateQuery * 1206 * @param result_cb
954GNUNET_PSYC_channel_state_get_prefix (struct GNUNET_PSYC_Channel *channel, 1207 * Function called after the operation finished.
955 const char *name_prefix, 1208 * (i.e. all state variables have been returned via @a state_cb)
956 GNUNET_PSYC_StateCallback cb, 1209 * @param cls
957 void *cb_cls) 1210 * Closure for the callbacks.
958{
959 return NULL;
960}
961
962
963/**
964 * Cancel a state query operation.
965 *
966 * @param query Handle for the operation to cancel.
967 */ 1211 */
968void 1212void
969GNUNET_PSYC_channel_state_get_cancel (struct GNUNET_PSYC_StateQuery *query) 1213GNUNET_PSYC_channel_state_get_prefix (struct GNUNET_PSYC_Channel *chn,
1214 const char *name_prefix,
1215 GNUNET_PSYC_StateVarCallback var_cb,
1216 GNUNET_PSYC_ResultCallback result_cb,
1217 void *cls)
970{ 1218{
1219 size_t name_size = strlen (name_prefix) + 1;
1220 struct StateRequest *req = GNUNET_malloc (sizeof (*req) + name_size);
1221 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_GET);
1222 req->header.size = htons (sizeof (*req) + name_size);
1223 req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls));
1224 memcpy (&req[1], name_prefix, name_size);
971 1225
1226 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
972} 1227}
973 1228
974
975/* end of psyc_api.c */ 1229/* end of psyc_api.c */
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c
index 5eadef62c..044895809 100644
--- a/src/psyc/test_psyc.c
+++ b/src/psyc/test_psyc.c
@@ -35,6 +35,7 @@
35#include "gnunet_env_lib.h" 35#include "gnunet_env_lib.h"
36#include "gnunet_psyc_util_lib.h" 36#include "gnunet_psyc_util_lib.h"
37#include "gnunet_psyc_service.h" 37#include "gnunet_psyc_service.h"
38#include "gnunet_core_service.h"
38 39
39#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) 40#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
40 41
@@ -45,6 +46,9 @@ int res;
45 46
46const struct GNUNET_CONFIGURATION_Handle *cfg; 47const struct GNUNET_CONFIGURATION_Handle *cfg;
47 48
49struct GNUNET_CORE_Handle *core;
50struct GNUNET_PeerIdentity this_peer;
51
48/** 52/**
49 * Handle for task for timeout termination. 53 * Handle for task for timeout termination.
50 */ 54 */
@@ -53,6 +57,8 @@ GNUNET_SCHEDULER_TaskIdentifier end_badly_task;
53struct GNUNET_PSYC_Master *mst; 57struct GNUNET_PSYC_Master *mst;
54struct GNUNET_PSYC_Slave *slv; 58struct GNUNET_PSYC_Slave *slv;
55 59
60struct GNUNET_PSYC_Channel *mst_chn, *slv_chn;
61
56struct GNUNET_CRYPTO_EddsaPrivateKey *channel_key; 62struct GNUNET_CRYPTO_EddsaPrivateKey *channel_key;
57struct GNUNET_CRYPTO_EcdsaPrivateKey *slave_key; 63struct GNUNET_CRYPTO_EcdsaPrivateKey *slave_key;
58 64
@@ -80,9 +86,19 @@ uint8_t join_req_count;
80 86
81enum 87enum
82{ 88{
83 TEST_NONE, 89 TEST_NONE = 0,
84 TEST_SLAVE_TRANSMIT, 90 TEST_MASTER_START = 1,
85 TEST_MASTER_TRANSMIT, 91 TEST_SLAVE_JOIN = 2,
92 TEST_SLAVE_TRANSMIT = 3,
93 TEST_MASTER_TRANSMIT = 4,
94 TEST_MASTER_HISTORY_REPLAY_LATEST = 5,
95 TEST_SLAVE_HISTORY_REPLAY_LATEST = 6,
96 TEST_MASTER_HISTORY_REPLAY = 7,
97 TEST_SLAVE_HISTORY_REPLAY = 8,
98 TEST_MASTER_STATE_GET = 9,
99 TEST_SLAVE_STATE_GET = 10,
100 TEST_MASTER_STATE_GET_PREFIX = 11,
101 TEST_SLAVE_STATE_GET_PREFIX = 12,
86} test; 102} test;
87 103
88 104
@@ -118,6 +134,11 @@ void slave_parted (void *cls)
118void 134void
119cleanup () 135cleanup ()
120{ 136{
137 if (NULL != core)
138 {
139 GNUNET_CORE_disconnect (core);
140 core = NULL;
141 }
121 if (NULL != slv) 142 if (NULL != slv)
122 { 143 {
123 GNUNET_PSYC_slave_part (slv, GNUNET_NO, &slave_parted, NULL); 144 GNUNET_PSYC_slave_part (slv, GNUNET_NO, &slave_parted, NULL);
@@ -177,13 +198,203 @@ end ()
177 198
178 199
179void 200void
201state_get_var (void *cls, const char *name, const void *value, size_t value_size)
202{
203 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
204 "Got state var: %s\n%.*s\n", name, value_size, value);
205}
206
207
208/*** Slave state_get_prefix() ***/
209
210void
211slave_state_get_prefix_result (void *cls, int64_t result, const char *err_msg)
212{
213 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
214 "slave_state_get_prefix:\t%" PRId64 " (%s)\n", result, err_msg);
215 // FIXME: GNUNET_assert (2 == result);
216 end ();
217}
218
219
220void
221slave_state_get_prefix ()
222{
223 test = TEST_SLAVE_STATE_GET_PREFIX;
224 GNUNET_PSYC_channel_state_get_prefix (slv_chn, "_foo", &state_get_var,
225 &slave_state_get_prefix_result, NULL);
226}
227
228
229/*** Master state_get_prefix() ***/
230
231
232void
233master_state_get_prefix_result (void *cls, int64_t result, const char *err_msg)
234{
235 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
236 "master_state_get_prefix:\t%" PRId64 " (%s)\n", result, err_msg);
237 // FIXME: GNUNET_assert (2 == result);
238 slave_state_get_prefix ();
239}
240
241
242void
243master_state_get_prefix ()
244{
245 test = TEST_MASTER_STATE_GET_PREFIX;
246 GNUNET_PSYC_channel_state_get_prefix (mst_chn, "_foo", &state_get_var,
247 &master_state_get_prefix_result, NULL);
248}
249
250
251/*** Slave state_get() ***/
252
253
254void
255slave_state_get_result (void *cls, int64_t result, const char *err_msg)
256{
257 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
258 "slave_state_get:\t%" PRId64 " (%s)\n", result, err_msg);
259 // FIXME: GNUNET_assert (2 == result);
260 master_state_get_prefix ();
261}
262
263
264void
265slave_state_get ()
266{
267 test = TEST_SLAVE_STATE_GET;
268 GNUNET_PSYC_channel_state_get (slv_chn, "_foo_bar_baz", &state_get_var,
269 &slave_state_get_result, NULL);
270}
271
272
273/*** Master state_get() ***/
274
275
276void
277master_state_get_result (void *cls, int64_t result, const char *err_msg)
278{
279 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
280 "master_state_get:\t%" PRId64 " (%s)\n", result, err_msg);
281 // FIXME: GNUNET_assert (1 == result);
282 slave_state_get ();
283}
284
285
286void
287master_state_get ()
288{
289 test = TEST_MASTER_STATE_GET;
290 GNUNET_PSYC_channel_state_get (mst_chn, "_foo_bar_baz", &state_get_var,
291 &master_state_get_result, NULL);
292}
293
294
295/*** Slave history_replay() ***/
296
297void
298slave_history_replay_result (void *cls, int64_t result, const char *err_msg)
299{
300 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
301 "slave_history_replay:\t%" PRId64 " (%s)\n", result, err_msg);
302 GNUNET_assert (9 == result);
303
304 master_state_get ();
305}
306
307
308void
309slave_history_replay ()
310{
311 test = TEST_SLAVE_HISTORY_REPLAY;
312 GNUNET_PSYC_channel_history_replay (slv_chn, 1, 1,
313 &slave_history_replay_result,
314 NULL);
315}
316
317
318/*** Master history_replay() ***/
319
320
321void
322master_history_replay_result (void *cls, int64_t result, const char *err_msg)
323{
324 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
325 "master_history_replay:\t%" PRId64 " (%s)\n", result, err_msg);
326 GNUNET_assert (9 == result);
327
328 slave_history_replay ();
329}
330
331
332void
333master_history_replay ()
334{
335 test = TEST_MASTER_HISTORY_REPLAY;
336 GNUNET_PSYC_channel_history_replay (mst_chn, 1, 1,
337 &master_history_replay_result,
338 NULL);
339}
340
341
342/*** Slave history_replay_latest() ***/
343
344
345void
346slave_history_replay_latest_result (void *cls, int64_t result, const char *err_msg)
347{
348 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
349 "slave_history_replay_latest:\t%" PRId64 " (%s)\n", result, err_msg);
350 GNUNET_assert (9 == result);
351
352 master_history_replay ();
353}
354
355
356void
357slave_history_replay_latest ()
358{
359 test = TEST_SLAVE_HISTORY_REPLAY_LATEST;
360 GNUNET_PSYC_channel_history_replay_latest (slv_chn, 1,
361 &slave_history_replay_latest_result,
362 NULL);
363}
364
365
366/*** Master history_replay_latest() ***/
367
368
369void
370master_history_replay_latest_result (void *cls, int64_t result, const char *err_msg)
371{
372 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
373 "master_history_replay_latest:\t%" PRId64 " (%s)\n", result, err_msg);
374 GNUNET_assert (9 == result);
375
376 slave_history_replay_latest ();
377}
378
379
380void
381master_history_replay_latest ()
382{
383 test = TEST_MASTER_HISTORY_REPLAY_LATEST;
384 GNUNET_PSYC_channel_history_replay_latest (mst_chn, 1,
385 &master_history_replay_latest_result,
386 NULL);
387}
388
389
390void
180master_message_cb (void *cls, uint64_t message_id, uint32_t flags, 391master_message_cb (void *cls, uint64_t message_id, uint32_t flags,
181 const struct GNUNET_PSYC_MessageHeader *msg) 392 const struct GNUNET_PSYC_MessageHeader *msg)
182{ 393{
183 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 394 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
184 "Master got PSYC message fragment of size %u " 395 "Test #%d: Master got PSYC message fragment of size %u "
185 "belonging to message ID %llu with flags %x\n", 396 "belonging to message ID %" PRIu64 " with flags %x\n",
186 ntohs (msg->header.size), message_id, flags); 397 test, ntohs (msg->header.size), message_id, flags);
187 // FIXME 398 // FIXME
188} 399}
189 400
@@ -196,7 +407,7 @@ master_message_part_cb (void *cls, uint64_t message_id,
196 if (NULL == msg) 407 if (NULL == msg)
197 { 408 {
198 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 409 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
199 "Error while receiving message %llu\n", message_id); 410 "Error while receiving message %" PRIu64 "\n", message_id);
200 return; 411 return;
201 } 412 }
202 413
@@ -204,9 +415,9 @@ master_message_part_cb (void *cls, uint64_t message_id,
204 uint16_t size = ntohs (msg->size); 415 uint16_t size = ntohs (msg->size);
205 416
206 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 417 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
207 "Master got message part of type %u and size %u " 418 "Test #%d: Master got message part of type %u and size %u "
208 "belonging to message ID %llu with flags %x\n", 419 "belonging to message ID %" PRIu64 " with flags %x\n",
209 type, size, message_id, flags); 420 test, type, size, message_id, flags);
210 421
211 switch (test) 422 switch (test)
212 { 423 {
@@ -227,6 +438,18 @@ master_message_part_cb (void *cls, uint64_t message_id,
227 case TEST_MASTER_TRANSMIT: 438 case TEST_MASTER_TRANSMIT:
228 break; 439 break;
229 440
441 case TEST_MASTER_HISTORY_REPLAY:
442 case TEST_MASTER_HISTORY_REPLAY_LATEST:
443 if (GNUNET_PSYC_MESSAGE_HISTORIC != flags)
444 {
445 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
446 "Test #%d: Unexpected flags for historic message: %x" PRIu32 "\n",
447 flags);
448 GNUNET_assert (0);
449 return;
450 }
451 break;
452
230 default: 453 default:
231 GNUNET_assert (0); 454 GNUNET_assert (0);
232 } 455 }
@@ -238,9 +461,9 @@ slave_message_cb (void *cls, uint64_t message_id, uint32_t flags,
238 const struct GNUNET_PSYC_MessageHeader *msg) 461 const struct GNUNET_PSYC_MessageHeader *msg)
239{ 462{
240 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 463 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
241 "Slave got PSYC message fragment of size %u " 464 "Test #%d: Slave got PSYC message fragment of size %u "
242 "belonging to message ID %llu with flags %x\n", 465 "belonging to message ID %" PRIu64 " with flags %x\n",
243 ntohs (msg->header.size), message_id, flags); 466 test, ntohs (msg->header.size), message_id, flags);
244 // FIXME 467 // FIXME
245} 468}
246 469
@@ -253,7 +476,7 @@ slave_message_part_cb (void *cls, uint64_t message_id,
253 if (NULL == msg) 476 if (NULL == msg)
254 { 477 {
255 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 478 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
256 "Error while receiving message %llu\n", message_id); 479 "Error while receiving message " PRIu64 "\n", message_id);
257 return; 480 return;
258 } 481 }
259 482
@@ -261,15 +484,27 @@ slave_message_part_cb (void *cls, uint64_t message_id,
261 uint16_t size = ntohs (msg->size); 484 uint16_t size = ntohs (msg->size);
262 485
263 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 486 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
264 "Slave got message part of type %u and size %u " 487 "Test #%d: Slave got message part of type %u and size %u "
265 "belonging to message ID %llu with flags %x\n", 488 "belonging to message ID %" PRIu64 " with flags %x\n",
266 type, size, message_id, flags); 489 test, type, size, message_id, flags);
267 490
268 switch (test) 491 switch (test)
269 { 492 {
270 case TEST_MASTER_TRANSMIT: 493 case TEST_MASTER_TRANSMIT:
271 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type) 494 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type)
272 end (); 495 master_history_replay_latest ();
496 break;
497
498 case TEST_SLAVE_HISTORY_REPLAY:
499 case TEST_SLAVE_HISTORY_REPLAY_LATEST:
500 if (GNUNET_PSYC_MESSAGE_HISTORIC != flags)
501 {
502 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
503 "Test #%d: Unexpected flags for historic message: %x" PRIu32 "\n",
504 flags);
505 GNUNET_assert (0);
506 return;
507 }
273 break; 508 break;
274 509
275 default: 510 default:
@@ -417,7 +652,6 @@ slave_transmit ()
417{ 652{
418 653
419 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave sending request to master.\n"); 654 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave sending request to master.\n");
420
421 test = TEST_SLAVE_TRANSMIT; 655 test = TEST_SLAVE_TRANSMIT;
422 656
423 tmit = GNUNET_new (struct TransmitClosure); 657 tmit = GNUNET_new (struct TransmitClosure);
@@ -438,6 +672,29 @@ slave_transmit ()
438 672
439 673
440void 674void
675slave_remove_cb (void *cls, int64_t result, const char *err_msg)
676{
677 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
678 "slave_remove:\t%" PRId64 " (%s)\n", result, err_msg);
679
680 slave_transmit ();
681}
682
683
684void
685slave_add_cb (void *cls, int64_t result, const char *err_msg)
686{
687 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
688 "slave_add:\t%" PRId64 " (%s)\n", result, err_msg);
689
690 struct GNUNET_PSYC_Channel *chn = cls;
691 GNUNET_PSYC_channel_slave_remove (chn, &slave_pub_key, 2,
692 &slave_remove_cb, chn);
693
694}
695
696
697void
441join_decision_cb (void *cls, 698join_decision_cb (void *cls,
442 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn, 699 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn,
443 int is_admitted, 700 int is_admitted,
@@ -453,7 +710,8 @@ join_decision_cb (void *cls,
453 return; 710 return;
454 } 711 }
455 712
456 slave_transmit (); 713 struct GNUNET_PSYC_Channel *chn = GNUNET_PSYC_master_get_channel (mst);
714 GNUNET_PSYC_channel_slave_add (chn, &slave_pub_key, 2, 2, &slave_add_cb, chn);
457} 715}
458 716
459 717
@@ -473,19 +731,17 @@ join_request_cb (void *cls,
473 /* Reject first request */ 731 /* Reject first request */
474 int is_admitted = (0 < join_req_count++) ? GNUNET_YES : GNUNET_NO; 732 int is_admitted = (0 < join_req_count++) ? GNUNET_YES : GNUNET_NO;
475 GNUNET_PSYC_join_decision (jh, is_admitted, 0, NULL, NULL); 733 GNUNET_PSYC_join_decision (jh, is_admitted, 0, NULL, NULL);
476
477 /* Membership store */
478 struct GNUNET_PSYC_Channel *chn = GNUNET_PSYC_master_get_channel (mst);
479 GNUNET_PSYC_channel_slave_add (chn, slave_key, 2, 2);
480 GNUNET_PSYC_channel_slave_remove (chn, &slave_pub_key, 2);
481} 734}
482 735
483 736
484void 737void
485slave_connect_cb (void *cls, uint64_t max_message_id) 738slave_connect_cb (void *cls, int result, uint64_t max_message_id)
486{ 739{
487 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 740 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
488 "Slave connected: %lu\n", max_message_id); 741 "Slave connected: %d, max_message_id: %" PRIu64 "\n",
742 result, max_message_id);
743 GNUNET_assert (TEST_SLAVE_JOIN == test);
744 GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result);
489} 745}
490 746
491 747
@@ -493,8 +749,9 @@ void
493slave_join () 749slave_join ()
494{ 750{
495 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Joining slave.\n"); 751 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Joining slave.\n");
752 test = TEST_SLAVE_JOIN;
496 753
497 struct GNUNET_PeerIdentity origin = {}; // FIXME: this peer 754 struct GNUNET_PeerIdentity origin = this_peer;
498 struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create (); 755 struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create ();
499 GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN, 756 GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN,
500 "_foo", "bar baz", 7); 757 "_foo", "bar baz", 7);
@@ -507,6 +764,7 @@ slave_join ()
507 &slave_message_cb, &slave_message_part_cb, 764 &slave_message_cb, &slave_message_part_cb,
508 &slave_connect_cb, &join_decision_cb, NULL, 765 &slave_connect_cb, &join_decision_cb, NULL,
509 join_msg); 766 join_msg);
767 slv_chn = GNUNET_PSYC_slave_get_channel (slv);
510 GNUNET_ENV_environment_destroy (env); 768 GNUNET_ENV_environment_destroy (env);
511} 769}
512 770
@@ -564,10 +822,13 @@ master_transmit ()
564 822
565 823
566void 824void
567master_start_cb (void *cls, uint64_t max_message_id) 825master_start_cb (void *cls, int result, uint64_t max_message_id)
568{ 826{
569 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 827 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
570 "Master started: %" PRIu64 "\n", max_message_id); 828 "Master started: %d, max_message_id: %" PRIu64 "\n",
829 result, max_message_id);
830 GNUNET_assert (TEST_MASTER_START == test);
831 GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result);
571 slave_join (); 832 slave_join ();
572} 833}
573 834
@@ -576,10 +837,12 @@ void
576master_start () 837master_start ()
577{ 838{
578 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Starting master.\n"); 839 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Starting master.\n");
840 test = TEST_MASTER_START;
579 mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE, 841 mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE,
580 &master_start_cb, &join_request_cb, 842 &master_start_cb, &join_request_cb,
581 &master_message_cb, &master_message_part_cb, 843 &master_message_cb, &master_message_part_cb,
582 NULL); 844 NULL);
845 mst_chn = GNUNET_PSYC_master_get_channel (mst);
583} 846}
584 847
585void 848void
@@ -589,6 +852,21 @@ schedule_master_start (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
589} 852}
590 853
591 854
855void
856core_connected (void *cls, const struct GNUNET_PeerIdentity *my_identity)
857{
858 this_peer = *my_identity;
859
860#if DEBUG_TEST_PSYC
861 master_start ();
862#else
863 /* Allow some time for the services to initialize. */
864 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
865 &schedule_master_start, NULL);
866#endif
867
868}
869
592/** 870/**
593 * Main function of the test, run from scheduler. 871 * Main function of the test, run from scheduler.
594 * 872 *
@@ -615,14 +893,8 @@ run (void *cls,
615 GNUNET_CRYPTO_eddsa_key_get_public (channel_key, &channel_pub_key); 893 GNUNET_CRYPTO_eddsa_key_get_public (channel_key, &channel_pub_key);
616 GNUNET_CRYPTO_ecdsa_key_get_public (slave_key, &slave_pub_key); 894 GNUNET_CRYPTO_ecdsa_key_get_public (slave_key, &slave_pub_key);
617 895
618#if DEBUG_TEST_PSYC 896 core = GNUNET_CORE_connect (cfg, NULL, &core_connected, NULL, NULL,
619 master_start (); 897 NULL, GNUNET_NO, NULL, GNUNET_NO, NULL);
620#else
621 /* Allow some time for the services to initialize. */
622 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
623 &schedule_master_start, NULL);
624#endif
625 return;
626} 898}
627 899
628 900