diff options
author | Gabor X Toth <*@tg-x.net> | 2015-08-28 13:33:43 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2015-08-28 13:33:43 +0000 |
commit | 38963d1e81332032e0ac774f4f2c6b804c38802a (patch) | |
tree | ce33b979e47fe332c7c744744d60077a7e1fefee /src/psyc | |
parent | b4fa14499c64140273850569247abda687803053 (diff) | |
download | gnunet-38963d1e81332032e0ac774f4f2c6b804c38802a.tar.gz gnunet-38963d1e81332032e0ac774f4f2c6b804c38802a.zip |
psyc/social: get state from psycstore
Diffstat (limited to 'src/psyc')
-rw-r--r-- | src/psyc/gnunet-service-psyc.c | 156 | ||||
-rw-r--r-- | src/psyc/psyc_api.c | 22 | ||||
-rw-r--r-- | src/psyc/test_psyc.c | 20 |
3 files changed, 119 insertions, 79 deletions
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c index 4c34f6108..2afc98040 100644 --- a/src/psyc/gnunet-service-psyc.c +++ b/src/psyc/gnunet-service-psyc.c | |||
@@ -1,3 +1,4 @@ | |||
1 | |||
1 | /* | 2 | /* |
2 | * This file is part of GNUnet | 3 | * This file is part of GNUnet |
3 | * Copyright (C) 2013 Christian Grothoff (and other contributing authors) | 4 | * Copyright (C) 2013 Christian Grothoff (and other contributing authors) |
@@ -171,6 +172,11 @@ struct FragmentQueue | |||
171 | uint8_t state; | 172 | uint8_t state; |
172 | 173 | ||
173 | /** | 174 | /** |
175 | * Whether the state is already modified in PSYCstore. | ||
176 | */ | ||
177 | uint8_t state_is_modified; | ||
178 | |||
179 | /** | ||
174 | * Is the message queued for delivery to the client? | 180 | * Is the message queued for delivery to the client? |
175 | * i.e. added to the recv_msgs queue | 181 | * i.e. added to the recv_msgs queue |
176 | */ | 182 | */ |
@@ -460,9 +466,9 @@ op_add (struct Channel *chn, struct GNUNET_SERVER_Client *client, | |||
460 | 466 | ||
461 | 467 | ||
462 | static void | 468 | static void |
463 | op_remove (struct Channel *chn, struct Operation *op) | 469 | op_remove (struct Operation *op) |
464 | { | 470 | { |
465 | GNUNET_CONTAINER_DLL_remove (chn->op_head, chn->op_tail, op); | 471 | GNUNET_CONTAINER_DLL_remove (op->chn->op_head, op->chn->op_tail, op); |
466 | GNUNET_free (op); | 472 | GNUNET_free (op); |
467 | } | 473 | } |
468 | 474 | ||
@@ -1008,7 +1014,8 @@ client_send_mcast_msg (struct Channel *chn, | |||
1008 | chn, GNUNET_ntohll (mmsg->fragment_id), | 1014 | chn, GNUNET_ntohll (mmsg->fragment_id), |
1009 | GNUNET_ntohll (mmsg->message_id)); | 1015 | GNUNET_ntohll (mmsg->message_id)); |
1010 | 1016 | ||
1011 | struct GNUNET_PSYC_MessageHeader *pmsg = psyc_msg_new (mmsg, flags); | 1017 | struct GNUNET_PSYC_MessageHeader * |
1018 | pmsg = GNUNET_PSYC_message_header_create (mmsg, flags); | ||
1012 | client_send_msg (chn, &pmsg->header); | 1019 | client_send_msg (chn, &pmsg->header); |
1013 | GNUNET_free (pmsg); | 1020 | GNUNET_free (pmsg); |
1014 | } | 1021 | } |
@@ -1049,7 +1056,7 @@ client_send_mcast_req (struct Master *mst, | |||
1049 | /** | 1056 | /** |
1050 | * Insert a multicast message fragment into the queue belonging to the message. | 1057 | * Insert a multicast message fragment into the queue belonging to the message. |
1051 | * | 1058 | * |
1052 | * @param chn Channel. | 1059 | * @param chn Channel. |
1053 | * @param mmsg Multicast message fragment. | 1060 | * @param mmsg Multicast message fragment. |
1054 | * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode. | 1061 | * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode. |
1055 | * @param first_ptype First PSYC message part type in @a mmsg. | 1062 | * @param first_ptype First PSYC message part type in @a mmsg. |
@@ -1222,7 +1229,7 @@ fragment_queue_run (struct Channel *chn, uint64_t msg_id, | |||
1222 | struct GNUNET_CONTAINER_MultiHashMap | 1229 | struct GNUNET_CONTAINER_MultiHashMap |
1223 | *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, | 1230 | *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, |
1224 | &chn->pub_key_hash); | 1231 | &chn->pub_key_hash); |
1225 | GNUNET_assert (NULL != chan_msgs); | 1232 | GNUNET_assert (NULL != chan_msgs); // FIXME |
1226 | uint64_t frag_id; | 1233 | uint64_t frag_id; |
1227 | 1234 | ||
1228 | while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL, | 1235 | while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL, |
@@ -1279,8 +1286,8 @@ fragment_queue_run (struct Channel *chn, uint64_t msg_id, | |||
1279 | struct StateModifyClosure | 1286 | struct StateModifyClosure |
1280 | { | 1287 | { |
1281 | struct Channel *chn; | 1288 | struct Channel *chn; |
1282 | struct FragmentQueue *fragq; | 1289 | uint64_t msg_id; |
1283 | uint64_t message_id; | 1290 | struct GNUNET_HashCode msg_id_hash; |
1284 | }; | 1291 | }; |
1285 | 1292 | ||
1286 | 1293 | ||
@@ -1290,21 +1297,37 @@ store_recv_state_modify_result (void *cls, int64_t result, | |||
1290 | { | 1297 | { |
1291 | struct StateModifyClosure *mcls = cls; | 1298 | struct StateModifyClosure *mcls = cls; |
1292 | struct Channel *chn = mcls->chn; | 1299 | struct Channel *chn = mcls->chn; |
1293 | struct FragmentQueue *fragq = mcls->fragq; | 1300 | uint64_t msg_id = mcls->msg_id; |
1294 | uint64_t msg_id = mcls->message_id; | 1301 | |
1302 | struct FragmentQueue * | ||
1303 | fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash); | ||
1295 | 1304 | ||
1296 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1305 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1297 | "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n", | 1306 | "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n", |
1298 | chn, result, err_msg_size, err_msg); | 1307 | chn, result, err_msg_size, err_msg); |
1299 | 1308 | ||
1300 | if (GNUNET_OK == result) | 1309 | switch (result) |
1301 | { | 1310 | { |
1302 | chn->max_state_message_id = msg_id; | 1311 | case GNUNET_OK: |
1303 | chn->max_message_id = msg_id; | 1312 | case GNUNET_NO: |
1313 | if (NULL != fragq) | ||
1314 | fragq->state_is_modified = GNUNET_YES; | ||
1315 | if (chn->max_state_message_id < msg_id) | ||
1316 | chn->max_state_message_id = msg_id; | ||
1317 | if (chn->max_message_id < msg_id) | ||
1318 | chn->max_message_id = msg_id; | ||
1304 | 1319 | ||
1305 | fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state); | 1320 | if (NULL != fragq) |
1321 | fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state); | ||
1306 | GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs); | 1322 | GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs); |
1307 | message_queue_run (chn); | 1323 | message_queue_run (chn); |
1324 | break; | ||
1325 | |||
1326 | default: | ||
1327 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
1328 | "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n", | ||
1329 | chn, result, err_msg_size, err_msg); | ||
1330 | /** @todo FIXME: handle state_modify error */ | ||
1308 | } | 1331 | } |
1309 | } | 1332 | } |
1310 | 1333 | ||
@@ -1349,42 +1372,58 @@ message_queue_run (struct Channel *chn) | |||
1349 | break; | 1372 | break; |
1350 | } | 1373 | } |
1351 | 1374 | ||
1352 | if (MSG_FRAG_STATE_HEADER == fragq->state) | 1375 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1376 | "%p Fragment queue entry: state: %u, state delta: " | ||
1377 | "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n", | ||
1378 | chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id); | ||
1379 | |||
1380 | if (MSG_FRAG_STATE_DATA <= fragq->state) | ||
1353 | { | 1381 | { |
1354 | /* Check if there's a missing message before the current one */ | 1382 | /* Check if there's a missing message before the current one */ |
1355 | if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta) | 1383 | if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta) |
1356 | { | 1384 | { |
1385 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n"); | ||
1386 | |||
1357 | if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY) | 1387 | if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY) |
1358 | && msg_id - 1 != chn->max_message_id) | 1388 | && (chn->max_message_id != msg_id - 1 |
1389 | && chn->max_message_id != msg_id)) | ||
1359 | { | 1390 | { |
1360 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 1391 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
1361 | "%p Out of order message. " | 1392 | "%p Out of order message. " |
1362 | "(%" PRIu64 " - 1 != %" PRIu64 ")\n", | 1393 | "(%" PRIu64 " != %" PRIu64 " - 1)\n", |
1363 | chn, msg_id, chn->max_message_id); | 1394 | chn, chn->max_message_id, msg_id); |
1364 | continue; | 1395 | break; |
1396 | // FIXME: keep track of messages processed in this queue run, | ||
1397 | // and only stop after reaching the end | ||
1365 | } | 1398 | } |
1366 | } | 1399 | } |
1367 | else | 1400 | else |
1368 | { | 1401 | { |
1369 | if (msg_id - fragq->state_delta != chn->max_state_message_id) | 1402 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n"); |
1403 | if (GNUNET_YES != fragq->state_is_modified) | ||
1370 | { | 1404 | { |
1371 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 1405 | if (msg_id - fragq->state_delta != chn->max_state_message_id) |
1372 | "%p Out of order stateful message. " | 1406 | { |
1373 | "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n", | 1407 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
1374 | chn, msg_id, fragq->state_delta, chn->max_state_message_id); | 1408 | "%p Out of order stateful message. " |
1375 | continue; | 1409 | "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n", |
1410 | chn, msg_id, fragq->state_delta, chn->max_state_message_id); | ||
1411 | break; | ||
1412 | // FIXME: keep track of messages processed in this queue run, | ||
1413 | // and only stop after reaching the end | ||
1414 | } | ||
1415 | |||
1416 | struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls)); | ||
1417 | mcls->chn = chn; | ||
1418 | mcls->msg_id = msg_id; | ||
1419 | mcls->msg_id_hash = msg_id_hash; | ||
1420 | |||
1421 | /* Apply modifiers to state in PSYCstore */ | ||
1422 | GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id, | ||
1423 | fragq->state_delta, | ||
1424 | store_recv_state_modify_result, mcls); | ||
1425 | break; // continue after asynchronous state modify result | ||
1376 | } | 1426 | } |
1377 | |||
1378 | struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls)); | ||
1379 | mcls->chn = chn; | ||
1380 | mcls->fragq = fragq; | ||
1381 | mcls->message_id = msg_id; | ||
1382 | |||
1383 | /* Apply modifiers to state in PSYCstore */ | ||
1384 | GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id, | ||
1385 | fragq->state_delta, | ||
1386 | store_recv_state_modify_result, mcls); | ||
1387 | break; | ||
1388 | } | 1427 | } |
1389 | chn->max_message_id = msg_id; | 1428 | chn->max_message_id = msg_id; |
1390 | } | 1429 | } |
@@ -2060,7 +2099,7 @@ static void | |||
2060 | master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg, | 2099 | master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg, |
2061 | uint16_t first_ptype, uint16_t last_ptype) | 2100 | uint16_t first_ptype, uint16_t last_ptype) |
2062 | { | 2101 | { |
2063 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message()\n", mst); | 2102 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message()\n", mst); |
2064 | 2103 | ||
2065 | if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype) | 2104 | if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype) |
2066 | { | 2105 | { |
@@ -2074,11 +2113,13 @@ master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg, | |||
2074 | } | 2113 | } |
2075 | else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY) | 2114 | else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY) |
2076 | { | 2115 | { |
2116 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message: setting state_modify flag\n", mst); | ||
2077 | pmeth->state_delta = GNUNET_htonll (tmit_msg->id | 2117 | pmeth->state_delta = GNUNET_htonll (tmit_msg->id |
2078 | - mst->max_state_message_id); | 2118 | - mst->max_state_message_id); |
2079 | } | 2119 | } |
2080 | else | 2120 | else |
2081 | { | 2121 | { |
2122 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message: setting state_not_modified flag\n", mst); | ||
2082 | pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED); | 2123 | pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED); |
2083 | } | 2124 | } |
2084 | 2125 | ||
@@ -2226,14 +2267,6 @@ client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, | |||
2226 | }; | 2267 | }; |
2227 | 2268 | ||
2228 | 2269 | ||
2229 | struct MembershipStoreClosure | ||
2230 | { | ||
2231 | struct GNUNET_SERVER_Client *client; | ||
2232 | struct Channel *chn; | ||
2233 | uint64_t op_id; | ||
2234 | }; | ||
2235 | |||
2236 | |||
2237 | /** | 2270 | /** |
2238 | * Received result of GNUNET_PSYCSTORE_membership_store() | 2271 | * Received result of GNUNET_PSYCSTORE_membership_store() |
2239 | */ | 2272 | */ |
@@ -2241,12 +2274,13 @@ static void | |||
2241 | store_recv_membership_store_result (void *cls, int64_t result, | 2274 | store_recv_membership_store_result (void *cls, int64_t result, |
2242 | const char *err_msg, uint16_t err_msg_size) | 2275 | const char *err_msg, uint16_t err_msg_size) |
2243 | { | 2276 | { |
2244 | struct MembershipStoreClosure *mcls = cls; | 2277 | struct Operation *op = cls; |
2245 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 2278 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
2246 | "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.s)\n", | 2279 | "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.s)\n", |
2247 | mcls->chn, result, err_msg_size, err_msg); | 2280 | op->chn, result, err_msg_size, err_msg); |
2248 | 2281 | ||
2249 | client_send_result (mcls->client, mcls->op_id, result, err_msg, err_msg_size); | 2282 | client_send_result (op->client, op->op_id, result, err_msg, err_msg_size); |
2283 | op_remove (op); | ||
2250 | } | 2284 | } |
2251 | 2285 | ||
2252 | 2286 | ||
@@ -2264,10 +2298,7 @@ client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client, | |||
2264 | const struct ChannelMembershipStoreRequest * | 2298 | const struct ChannelMembershipStoreRequest * |
2265 | req = (const struct ChannelMembershipStoreRequest *) msg; | 2299 | req = (const struct ChannelMembershipStoreRequest *) msg; |
2266 | 2300 | ||
2267 | struct MembershipStoreClosure *mcls = GNUNET_malloc (sizeof (*mcls)); | 2301 | struct Operation *op = op_add (chn, client, req->op_id, 0); |
2268 | mcls->client = client; | ||
2269 | mcls->chn = chn; | ||
2270 | mcls->op_id = req->op_id; | ||
2271 | 2302 | ||
2272 | uint64_t announced_at = GNUNET_ntohll (req->announced_at); | 2303 | uint64_t announced_at = GNUNET_ntohll (req->announced_at); |
2273 | uint64_t effective_since = GNUNET_ntohll (req->effective_since); | 2304 | uint64_t effective_since = GNUNET_ntohll (req->effective_since); |
@@ -2280,7 +2311,7 @@ client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client, | |||
2280 | GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_key, | 2311 | GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_key, |
2281 | req->did_join, announced_at, effective_since, | 2312 | req->did_join, announced_at, effective_since, |
2282 | 0, /* FIXME: group_generation */ | 2313 | 0, /* FIXME: group_generation */ |
2283 | &store_recv_membership_store_result, mcls); | 2314 | &store_recv_membership_store_result, op); |
2284 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 2315 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
2285 | } | 2316 | } |
2286 | 2317 | ||
@@ -2313,7 +2344,7 @@ store_recv_fragment_history (void *cls, | |||
2313 | res->result_code = GNUNET_htonll (GNUNET_OK); | 2344 | res->result_code = GNUNET_htonll (GNUNET_OK); |
2314 | 2345 | ||
2315 | pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1]; | 2346 | pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1]; |
2316 | psyc_msg_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC); | 2347 | GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC); |
2317 | memcpy (&res[1], pmsg, psize); | 2348 | memcpy (&res[1], pmsg, psize); |
2318 | 2349 | ||
2319 | /** @todo FIXME: send only to requesting client */ | 2350 | /** @todo FIXME: send only to requesting client */ |
@@ -2339,7 +2370,7 @@ store_recv_fragment_history_result (void *cls, int64_t result, | |||
2339 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 2370 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
2340 | "%p History replay #%" PRIu64 ": " | 2371 | "%p History replay #%" PRIu64 ": " |
2341 | "PSYCSTORE returned %" PRId64 " (%.*s)\n", | 2372 | "PSYCSTORE returned %" PRId64 " (%.*s)\n", |
2342 | op->chn, op->op_id, result, err_msg_size, err_msg); | 2373 | op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg); |
2343 | 2374 | ||
2344 | if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE) | 2375 | if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE) |
2345 | { | 2376 | { |
@@ -2347,6 +2378,7 @@ store_recv_fragment_history_result (void *cls, int64_t result, | |||
2347 | } | 2378 | } |
2348 | 2379 | ||
2349 | client_send_result (op->client, op->op_id, result, err_msg, err_msg_size); | 2380 | client_send_result (op->client, op->op_id, result, err_msg, err_msg_size); |
2381 | op_remove (op); | ||
2350 | } | 2382 | } |
2351 | 2383 | ||
2352 | 2384 | ||
@@ -2404,12 +2436,16 @@ client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client, | |||
2404 | */ | 2436 | */ |
2405 | static int | 2437 | static int |
2406 | store_recv_state_var (void *cls, const char *name, | 2438 | store_recv_state_var (void *cls, const char *name, |
2407 | const void *value, size_t value_size) | 2439 | const void *value, uint32_t value_size) |
2408 | { | 2440 | { |
2409 | struct Operation *op = cls; | 2441 | struct Operation *op = cls; |
2410 | struct GNUNET_OperationResultMessage *res; | 2442 | struct GNUNET_OperationResultMessage *res; |
2411 | 2443 | ||
2412 | if (NULL != name) | 2444 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
2445 | "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n", | ||
2446 | op->chn, GNUNET_ntohll (op->op_id), name); | ||
2447 | |||
2448 | if (NULL != name) /* First part */ | ||
2413 | { | 2449 | { |
2414 | uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1; | 2450 | uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1; |
2415 | struct GNUNET_PSYC_MessageModifier *mod; | 2451 | struct GNUNET_PSYC_MessageModifier *mod; |
@@ -2427,7 +2463,7 @@ store_recv_state_var (void *cls, const char *name, | |||
2427 | memcpy (&mod[1], name, name_size); | 2463 | memcpy (&mod[1], name, name_size); |
2428 | memcpy (((char *) &mod[1]) + name_size, value, value_size); | 2464 | memcpy (((char *) &mod[1]) + name_size, value, value_size); |
2429 | } | 2465 | } |
2430 | else | 2466 | else /* Continuation */ |
2431 | { | 2467 | { |
2432 | struct GNUNET_MessageHeader *mod; | 2468 | struct GNUNET_MessageHeader *mod; |
2433 | res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + value_size); | 2469 | res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + value_size); |
@@ -2445,7 +2481,6 @@ store_recv_state_var (void *cls, const char *name, | |||
2445 | GNUNET_SERVER_notification_context_add (nc, op->client); | 2481 | GNUNET_SERVER_notification_context_add (nc, op->client); |
2446 | GNUNET_SERVER_notification_context_unicast (nc, op->client, &res->header, | 2482 | GNUNET_SERVER_notification_context_unicast (nc, op->client, &res->header, |
2447 | GNUNET_NO); | 2483 | GNUNET_NO); |
2448 | GNUNET_free (op); | ||
2449 | return GNUNET_YES; | 2484 | return GNUNET_YES; |
2450 | } | 2485 | } |
2451 | 2486 | ||
@@ -2460,12 +2495,13 @@ store_recv_state_result (void *cls, int64_t result, | |||
2460 | { | 2495 | { |
2461 | struct Operation *op = cls; | 2496 | struct Operation *op = cls; |
2462 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 2497 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
2463 | "%p History replay #%" PRIu64 ": " | 2498 | "%p state_get #%" PRIu64 ": " |
2464 | "PSYCSTORE returned %" PRId64 " (%.*s)\n", | 2499 | "PSYCSTORE returned %" PRId64 " (%.*s)\n", |
2465 | op->chn, op->op_id, result, err_msg_size, err_msg); | 2500 | op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg); |
2466 | 2501 | ||
2467 | // FIXME: client might have been disconnected | 2502 | // FIXME: client might have been disconnected |
2468 | client_send_result (op->client, op->op_id, result, err_msg, err_msg_size); | 2503 | client_send_result (op->client, op->op_id, result, err_msg, err_msg_size); |
2504 | op_remove (op); | ||
2469 | } | 2505 | } |
2470 | 2506 | ||
2471 | 2507 | ||
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c index 5fc5391a0..b862eee9c 100644 --- a/src/psyc/psyc_api.c +++ b/src/psyc/psyc_api.c | |||
@@ -366,34 +366,36 @@ channel_recv_state_result (void *cls, | |||
366 | } | 366 | } |
367 | 367 | ||
368 | const struct GNUNET_MessageHeader * | 368 | const struct GNUNET_MessageHeader * |
369 | modc = (struct GNUNET_MessageHeader *) &res[1]; | 369 | mod = (struct GNUNET_MessageHeader *) &res[1]; |
370 | uint16_t modc_size = ntohs (modc->size); | 370 | uint16_t mod_size = ntohs (mod->size); |
371 | if (ntohs (msg->size) - sizeof (*msg) != modc_size) | 371 | if (ntohs (msg->size) - sizeof (*res) != mod_size) |
372 | { | 372 | { |
373 | GNUNET_break (0); | 373 | GNUNET_break (0); |
374 | return; | 374 | return; |
375 | } | 375 | } |
376 | switch (ntohs (modc->type)) | 376 | switch (ntohs (mod->type)) |
377 | { | 377 | { |
378 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | 378 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: |
379 | { | 379 | { |
380 | const struct GNUNET_PSYC_MessageModifier * | 380 | const struct GNUNET_PSYC_MessageModifier * |
381 | mod = (const struct GNUNET_PSYC_MessageModifier *) modc; | 381 | pmod = (const struct GNUNET_PSYC_MessageModifier *) mod; |
382 | 382 | ||
383 | const char *name = (const char *) &mod[1]; | 383 | const char *name = (const char *) &pmod[1]; |
384 | uint16_t name_size = ntohs (mod->name_size); | 384 | uint16_t name_size = ntohs (pmod->name_size); |
385 | if ('\0' != name[name_size - 1]) | 385 | if ('\0' != name[name_size - 1]) |
386 | { | 386 | { |
387 | GNUNET_break (0); | 387 | GNUNET_break (0); |
388 | return; | 388 | return; |
389 | } | 389 | } |
390 | sr->var_cb (sr->cls, name, name + name_size, ntohs (mod->value_size)); | 390 | sr->var_cb (sr->cls, mod, name, name + name_size, |
391 | ntohs (pmod->header.size) - sizeof (*pmod), | ||
392 | ntohs (pmod->value_size)); | ||
391 | break; | 393 | break; |
392 | } | 394 | } |
393 | 395 | ||
394 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: | 396 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: |
395 | sr->var_cb (sr->cls, NULL, (const char *) &modc[1], | 397 | sr->var_cb (sr->cls, mod, NULL, (const char *) &mod[1], |
396 | modc_size - sizeof (*modc)); | 398 | mod_size - sizeof (*mod), 0); |
397 | break; | 399 | break; |
398 | } | 400 | } |
399 | } | 401 | } |
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c index d62474afb..4e7979a4d 100644 --- a/src/psyc/test_psyc.c +++ b/src/psyc/test_psyc.c | |||
@@ -329,7 +329,9 @@ slave_message_part_cb (void *cls, uint64_t message_id, | |||
329 | 329 | ||
330 | 330 | ||
331 | void | 331 | void |
332 | state_get_var (void *cls, const char *name, const void *value, size_t value_size) | 332 | state_get_var (void *cls, const struct GNUNET_MessageHeader *mod, |
333 | const char *name, const void *value, | ||
334 | uint32_t value_size, uint32_t full_value_size) | ||
333 | { | 335 | { |
334 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 336 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
335 | "Got state var: %s\n%.*s\n", name, value_size, value); | 337 | "Got state var: %s\n%.*s\n", name, value_size, value); |
@@ -354,8 +356,8 @@ void | |||
354 | slave_state_get_prefix () | 356 | slave_state_get_prefix () |
355 | { | 357 | { |
356 | test = TEST_SLAVE_STATE_GET_PREFIX; | 358 | test = TEST_SLAVE_STATE_GET_PREFIX; |
357 | GNUNET_PSYC_channel_state_get_prefix (slv_chn, "_foo", &state_get_var, | 359 | GNUNET_PSYC_channel_state_get_prefix (slv_chn, "_foo", state_get_var, |
358 | &slave_state_get_prefix_result, NULL); | 360 | slave_state_get_prefix_result, NULL); |
359 | } | 361 | } |
360 | 362 | ||
361 | 363 | ||
@@ -377,8 +379,8 @@ void | |||
377 | master_state_get_prefix () | 379 | master_state_get_prefix () |
378 | { | 380 | { |
379 | test = TEST_MASTER_STATE_GET_PREFIX; | 381 | test = TEST_MASTER_STATE_GET_PREFIX; |
380 | GNUNET_PSYC_channel_state_get_prefix (mst_chn, "_foo", &state_get_var, | 382 | GNUNET_PSYC_channel_state_get_prefix (mst_chn, "_foo", state_get_var, |
381 | &master_state_get_prefix_result, NULL); | 383 | master_state_get_prefix_result, NULL); |
382 | } | 384 | } |
383 | 385 | ||
384 | 386 | ||
@@ -401,8 +403,8 @@ void | |||
401 | slave_state_get () | 403 | slave_state_get () |
402 | { | 404 | { |
403 | test = TEST_SLAVE_STATE_GET; | 405 | test = TEST_SLAVE_STATE_GET; |
404 | GNUNET_PSYC_channel_state_get (slv_chn, "_foo_bar_baz", &state_get_var, | 406 | GNUNET_PSYC_channel_state_get (slv_chn, "_foo_bar_baz", state_get_var, |
405 | &slave_state_get_result, NULL); | 407 | slave_state_get_result, NULL); |
406 | } | 408 | } |
407 | 409 | ||
408 | 410 | ||
@@ -425,8 +427,8 @@ void | |||
425 | master_state_get () | 427 | master_state_get () |
426 | { | 428 | { |
427 | test = TEST_MASTER_STATE_GET; | 429 | test = TEST_MASTER_STATE_GET; |
428 | GNUNET_PSYC_channel_state_get (mst_chn, "_foo_bar_baz", &state_get_var, | 430 | GNUNET_PSYC_channel_state_get (mst_chn, "_foo_bar_baz", state_get_var, |
429 | &master_state_get_result, NULL); | 431 | master_state_get_result, NULL); |
430 | } | 432 | } |
431 | 433 | ||
432 | 434 | ||