aboutsummaryrefslogtreecommitdiff
path: root/src/psyc/gnunet-service-psyc.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/psyc/gnunet-service-psyc.c')
-rw-r--r--src/psyc/gnunet-service-psyc.c272
1 files changed, 252 insertions, 20 deletions
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
index e7020bc69..3adc34d2a 100644
--- a/src/psyc/gnunet-service-psyc.c
+++ b/src/psyc/gnunet-service-psyc.c
@@ -402,6 +402,14 @@ struct Slave
402}; 402};
403 403
404 404
405struct OperationClosure
406{
407 struct GNUNET_SERVER_Client *client;
408 struct Channel *chn;
409 uint64_t op_id;
410};
411
412
405static void 413static void
406transmit_message (struct Channel *chn); 414transmit_message (struct Channel *chn);
407 415
@@ -585,6 +593,46 @@ client_send_msg (const struct Channel *chn,
585 593
586 594
587/** 595/**
596 * Send a result code back to the client.
597 *
598 * @param client
599 * Client that should receive the result code.
600 * @param result_code
601 * Code to transmit.
602 * @param op_id
603 * Operation ID in network byte order.
604 * @param err_msg
605 * Error message to include (or NULL for none).
606 */
607static void
608client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
609 int64_t result_code, const char *err_msg)
610{
611 struct OperationResult *res;
612 size_t err_len = 0; // FIXME: maximum length
613
614 if (NULL != err_msg)
615 err_len = strlen (err_msg) + 1;
616 res = GNUNET_malloc (sizeof (struct OperationResult) + err_len);
617 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
618 res->header.size = htons (sizeof (struct OperationResult) + err_len);
619 res->result_code = GNUNET_htonll (result_code + INT64_MAX + 1);
620 res->op_id = op_id;
621 if (0 < err_len)
622 memcpy (&res[1], err_msg, err_len);
623 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
624 "%p Sending result to client for operation #%" PRIu64 ": "
625 "%" PRId64 " (%s)\n",
626 client, GNUNET_ntohll (op_id), result_code, err_msg);
627
628 GNUNET_SERVER_notification_context_add (nc, client);
629 GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
630 GNUNET_NO);
631 GNUNET_free (res);
632}
633
634
635/**
588 * Closure for join_mem_test_cb() 636 * Closure for join_mem_test_cb()
589 */ 637 */
590struct JoinMemTestClosure 638struct JoinMemTestClosure
@@ -799,7 +847,8 @@ mcast_recv_replay_fragment (void *cls,
799 847
800{ 848{
801 struct Channel *chn = cls; 849 struct Channel *chn = cls;
802 GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_key, fragment_id, 850 GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_key,
851 fragment_id, fragment_id,
803 &store_recv_fragment_replay, 852 &store_recv_fragment_replay,
804 &store_recv_fragment_replay_result, rh); 853 &store_recv_fragment_replay_result, rh);
805} 854}
@@ -817,7 +866,8 @@ mcast_recv_replay_message (void *cls,
817 struct GNUNET_MULTICAST_ReplayHandle *rh) 866 struct GNUNET_MULTICAST_ReplayHandle *rh)
818{ 867{
819 struct Channel *chn = cls; 868 struct Channel *chn = cls;
820 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key, message_id, 869 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key,
870 message_id, message_id,
821 &store_recv_fragment_replay, 871 &store_recv_fragment_replay,
822 &store_recv_fragment_replay_result, rh); 872 &store_recv_fragment_replay_result, rh);
823} 873}
@@ -865,7 +915,8 @@ hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
865 */ 915 */
866static void 916static void
867client_send_mcast_msg (struct Channel *chn, 917client_send_mcast_msg (struct Channel *chn,
868 const struct GNUNET_MULTICAST_MessageHeader *mmsg) 918 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
919 uint32_t flags)
869{ 920{
870 struct GNUNET_PSYC_MessageHeader *pmsg; 921 struct GNUNET_PSYC_MessageHeader *pmsg;
871 uint16_t size = ntohs (mmsg->header.size); 922 uint16_t size = ntohs (mmsg->header.size);
@@ -882,6 +933,7 @@ client_send_mcast_msg (struct Channel *chn,
882 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); 933 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
883 pmsg->message_id = mmsg->message_id; 934 pmsg->message_id = mmsg->message_id;
884 pmsg->fragment_offset = mmsg->fragment_offset; 935 pmsg->fragment_offset = mmsg->fragment_offset;
936 pmsg->flags = htonl (flags);
885 937
886 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); 938 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
887 client_send_msg (chn, &pmsg->header); 939 client_send_msg (chn, &pmsg->header);
@@ -1111,7 +1163,7 @@ fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1111 { 1163 {
1112 if (GNUNET_NO == drop) 1164 if (GNUNET_NO == drop)
1113 { 1165 {
1114 client_send_mcast_msg (chn, cache_entry->mmsg); 1166 client_send_mcast_msg (chn, cache_entry->mmsg, 0);
1115 } 1167 }
1116 if (cache_entry->ref_count <= 1) 1168 if (cache_entry->ref_count <= 1)
1117 { 1169 {
@@ -1375,10 +1427,10 @@ store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1375 struct Channel *chn = &mst->chn; 1427 struct Channel *chn = &mst->chn;
1376 chn->store_op = NULL; 1428 chn->store_op = NULL;
1377 1429
1378 struct CountersResult res; 1430 struct GNUNET_PSYC_CountersResultMessage res;
1379 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); 1431 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1380 res.header.size = htons (sizeof (res)); 1432 res.header.size = htons (sizeof (res));
1381 res.result_code = htonl (result); 1433 res.result_code = htonl (result - INT32_MIN);
1382 res.max_message_id = GNUNET_htonll (max_message_id); 1434 res.max_message_id = GNUNET_htonll (max_message_id);
1383 1435
1384 if (GNUNET_OK == result || GNUNET_NO == result) 1436 if (GNUNET_OK == result || GNUNET_NO == result)
@@ -1421,10 +1473,10 @@ store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1421 struct Channel *chn = &slv->chn; 1473 struct Channel *chn = &slv->chn;
1422 chn->store_op = NULL; 1474 chn->store_op = NULL;
1423 1475
1424 struct CountersResult res; 1476 struct GNUNET_PSYC_CountersResultMessage res;
1425 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); 1477 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1426 res.header.size = htons (sizeof (res)); 1478 res.header.size = htons (sizeof (res));
1427 res.result_code = htonl (result); 1479 res.result_code = htonl (result - INT32_MIN);
1428 res.max_message_id = GNUNET_htonll (max_message_id); 1480 res.max_message_id = GNUNET_htonll (max_message_id);
1429 1481
1430 if (GNUNET_OK == result || GNUNET_NO == result) 1482 if (GNUNET_OK == result || GNUNET_NO == result)
@@ -1511,10 +1563,10 @@ client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1511 { 1563 {
1512 chn = &mst->chn; 1564 chn = &mst->chn;
1513 1565
1514 struct CountersResult res; 1566 struct GNUNET_PSYC_CountersResultMessage res;
1515 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); 1567 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1516 res.header.size = htons (sizeof (res)); 1568 res.header.size = htons (sizeof (res));
1517 res.result_code = htonl (GNUNET_OK); 1569 res.result_code = htonl ((uint32_t) GNUNET_OK + INT32_MIN);
1518 res.max_message_id = GNUNET_htonll (mst->max_message_id); 1570 res.max_message_id = GNUNET_htonll (mst->max_message_id);
1519 1571
1520 GNUNET_SERVER_notification_context_add (nc, client); 1572 GNUNET_SERVER_notification_context_add (nc, client);
@@ -1621,10 +1673,10 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1621 { 1673 {
1622 chn = &slv->chn; 1674 chn = &slv->chn;
1623 1675
1624 struct CountersResult res; 1676 struct GNUNET_PSYC_CountersResultMessage res;
1625 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); 1677 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1626 res.header.size = htons (sizeof (res)); 1678 res.header.size = htons (sizeof (res));
1627 res.result_code = htonl (GNUNET_OK); 1679 res.result_code = htonl ((uint32_t) GNUNET_OK - INT32_MIN);
1628 res.max_message_id = GNUNET_htonll (chn->max_message_id); 1680 res.max_message_id = GNUNET_htonll (chn->max_message_id);
1629 1681
1630 GNUNET_SERVER_notification_context_add (nc, client); 1682 GNUNET_SERVER_notification_context_add (nc, client);
@@ -2047,17 +2099,26 @@ client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
2047}; 2099};
2048 2100
2049 2101
2102struct MembershipStoreClosure
2103{
2104 struct GNUNET_SERVER_Client *client;
2105 struct Channel *chn;
2106 uint64_t op_id;
2107};
2108
2109
2050/** 2110/**
2051 * Received result of GNUNET_PSYCSTORE_membership_store() 2111 * Received result of GNUNET_PSYCSTORE_membership_store()
2052 */ 2112 */
2053static void 2113static void
2054store_recv_membership_store_result (void *cls, int64_t result, const char *err_msg) 2114store_recv_membership_store_result (void *cls, int64_t result, const char *err_msg)
2055{ 2115{
2056 struct GNUNET_MULTICAST_MembershipTestHandle *mth = cls; 2116 struct MembershipStoreClosure *mcls = cls;
2057 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2117 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2058 "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%s)\n", 2118 "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%s)\n",
2059 mth, result, err_msg); 2119 mcls->chn, result, err_msg);
2060 /* FIXME: send result to client */ 2120
2121 client_send_result (mcls->client, mcls->op_id, result, err_msg);
2061} 2122}
2062 2123
2063 2124
@@ -2075,6 +2136,11 @@ client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
2075 const struct ChannelMembershipStoreRequest * 2136 const struct ChannelMembershipStoreRequest *
2076 req = (const struct ChannelMembershipStoreRequest *) msg; 2137 req = (const struct ChannelMembershipStoreRequest *) msg;
2077 2138
2139 struct MembershipStoreClosure *mcls = GNUNET_malloc (sizeof (*mcls));
2140 mcls->client = client;
2141 mcls->chn = chn;
2142 mcls->op_id = req->op_id;
2143
2078 uint64_t announced_at = GNUNET_ntohll (req->announced_at); 2144 uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2079 uint64_t effective_since = GNUNET_ntohll (req->effective_since); 2145 uint64_t effective_since = GNUNET_ntohll (req->effective_since);
2080 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2146 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -2086,19 +2152,138 @@ client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
2086 GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_key, 2152 GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_key,
2087 req->did_join, announced_at, effective_since, 2153 req->did_join, announced_at, effective_since,
2088 0, /* FIXME: group_generation */ 2154 0, /* FIXME: group_generation */
2089 &store_recv_membership_store_result, chn); 2155 &store_recv_membership_store_result, mcls);
2090 GNUNET_SERVER_receive_done (client, GNUNET_OK); 2156 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2091} 2157}
2092 2158
2093 2159
2160static int
2161store_recv_fragment_history (void *cls,
2162 struct GNUNET_MULTICAST_MessageHeader *msg,
2163 enum GNUNET_PSYCSTORE_MessageFlags flags)
2164{
2165 struct OperationClosure *opcls = cls;
2166 struct Channel *chn = opcls->chn;
2167 client_send_mcast_msg (chn, msg, GNUNET_PSYC_MESSAGE_HISTORIC);
2168 return GNUNET_YES;
2169}
2170
2171
2172/**
2173 * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
2174 */
2175static void
2176store_recv_fragment_history_result (void *cls, int64_t result, const char *err_msg)
2177{
2178 struct OperationClosure *opcls = cls;
2179 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2180 "%p History replay #%" PRIu64 ": "
2181 "PSYCSTORE returned %" PRId64 " (%s)\n",
2182 opcls->chn, opcls->op_id, result, err_msg);
2183
2184 client_send_result (opcls->client, opcls->op_id, result, err_msg);
2185}
2186
2187
2094/** 2188/**
2095 * Client requests channel history from PSYCstore. 2189 * Client requests channel history from PSYCstore.
2096 */ 2190 */
2097static void 2191static void
2098client_recv_story_request (void *cls, struct GNUNET_SERVER_Client *client, 2192client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
2099 const struct GNUNET_MessageHeader *msg) 2193 const struct GNUNET_MessageHeader *msg)
2194{
2195 struct Channel *
2196 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2197 GNUNET_assert (NULL != chn);
2198
2199 const struct HistoryRequest *
2200 req = (const struct HistoryRequest *) msg;
2201
2202 struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls));
2203 opcls->client = client;
2204 opcls->chn = chn;
2205 opcls->op_id = req->op_id;
2206
2207 if (0 == req->message_limit)
2208 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2209 GNUNET_ntohll (req->start_message_id),
2210 GNUNET_ntohll (req->end_message_id),
2211 &store_recv_fragment_history,
2212 &store_recv_fragment_history_result, opcls);
2213 else
2214 GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2215 GNUNET_ntohll (req->message_limit),
2216 &store_recv_fragment_history,
2217 &store_recv_fragment_history_result,
2218 opcls);
2219
2220 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2221}
2222
2223
2224/**
2225 * Received state var from PSYCstore, send it to client.
2226 */
2227static int
2228store_recv_state_var (void *cls, const char *name,
2229 const void *value, size_t value_size)
2230{
2231 struct OperationClosure *opcls = cls;
2232 struct OperationResult *op;
2233
2234 if (NULL != name)
2235 {
2236 uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2237 struct GNUNET_PSYC_MessageModifier *mod;
2238 op = GNUNET_malloc (sizeof (*op) + sizeof (*mod) + name_size + value_size);
2239 op->header.size = htons (sizeof (*op) + sizeof (*mod) + name_size + value_size);
2240 op->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2241 op->op_id = opcls->op_id;
2242
2243 mod = (struct GNUNET_PSYC_MessageModifier *) &op[1];
2244 mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2245 mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2246 mod->name_size = htons (name_size);
2247 mod->value_size = htonl (value_size);
2248 mod->oper = htons (GNUNET_ENV_OP_ASSIGN);
2249 memcpy (&mod[1], name, name_size);
2250 memcpy (((char *) &mod[1]) + name_size, value, value_size);
2251 }
2252 else
2253 {
2254 struct GNUNET_MessageHeader *mod;
2255 op = GNUNET_malloc (sizeof (*op) + sizeof (*mod) + value_size);
2256 op->header.size = htons (sizeof (*op) + sizeof (*mod) + value_size);
2257 op->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2258 op->op_id = opcls->op_id;
2259
2260 mod = (struct GNUNET_MessageHeader *) &op[1];
2261 mod->size = htons (sizeof (*mod) + value_size);
2262 mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2263 memcpy (&mod[1], value, value_size);
2264 }
2265
2266 GNUNET_SERVER_notification_context_add (nc, opcls->client);
2267 GNUNET_SERVER_notification_context_unicast (nc, opcls->client, &op->header,
2268 GNUNET_NO);
2269 return GNUNET_YES;
2270}
2271
2272
2273/**
2274 * Received result of GNUNET_PSYCSTORE_state_get()
2275 * or GNUNET_PSYCSTORE_state_get_prefix()
2276 */
2277static void
2278store_recv_state_result (void *cls, int64_t result, const char *err_msg)
2100{ 2279{
2280 struct OperationClosure *opcls = cls;
2281 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2282 "%p History replay #%" PRIu64 ": "
2283 "PSYCSTORE returned %" PRId64 " (%s)\n",
2284 opcls->chn, opcls->op_id, result, err_msg);
2101 2285
2286 client_send_result (opcls->client, opcls->op_id, result, err_msg);
2102} 2287}
2103 2288
2104 2289
@@ -2109,7 +2294,30 @@ static void
2109client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client, 2294client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
2110 const struct GNUNET_MessageHeader *msg) 2295 const struct GNUNET_MessageHeader *msg)
2111{ 2296{
2297 struct Channel *
2298 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2299 GNUNET_assert (NULL != chn);
2300
2301 const struct StateRequest *
2302 req = (const struct StateRequest *) msg;
2112 2303
2304 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2305 const char *name = (const char *) &req[1];
2306 if (0 == name_size || '\0' != name[name_size - 1])
2307 {
2308 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2309 return;
2310 }
2311
2312 struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls));
2313 opcls->client = client;
2314 opcls->chn = chn;
2315 opcls->op_id = req->op_id;
2316
2317 GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2318 &store_recv_state_var,
2319 &store_recv_state_result, opcls);
2320 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2113} 2321}
2114 2322
2115 2323
@@ -2120,6 +2328,30 @@ static void
2120client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client, 2328client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
2121 const struct GNUNET_MessageHeader *msg) 2329 const struct GNUNET_MessageHeader *msg)
2122{ 2330{
2331 struct Channel *
2332 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2333 GNUNET_assert (NULL != chn);
2334
2335 const struct StateRequest *
2336 req = (const struct StateRequest *) msg;
2337
2338 uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
2339 const char *name = (const char *) &req[1];
2340 if (0 == name_size || '\0' != name[name_size - 1])
2341 {
2342 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2343 return;
2344 }
2345
2346 struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls));
2347 opcls->client = client;
2348 opcls->chn = chn;
2349 opcls->op_id = req->op_id;
2350
2351 GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2352 &store_recv_state_var,
2353 &store_recv_state_result, opcls);
2354 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2123 2355
2124} 2356}
2125 2357
@@ -2140,8 +2372,8 @@ static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
2140 { &client_recv_membership_store, NULL, 2372 { &client_recv_membership_store, NULL,
2141 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 }, 2373 GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 },
2142 2374
2143 { &client_recv_story_request, NULL, 2375 { &client_recv_history_replay, NULL,
2144 GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST, 0 }, 2376 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
2145 2377
2146 { &client_recv_state_get, NULL, 2378 { &client_recv_state_get, NULL,
2147 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 }, 2379 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },