diff options
author | Gabor X Toth <*@tg-x.net> | 2015-07-18 00:03:06 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2015-07-18 00:03:06 +0000 |
commit | d38544730123a1f365ef287a7e88060d97b266b7 (patch) | |
tree | e66f5ee143adde1a5206f50bbbdd00a8a179fb81 /src/psyc | |
parent | 2275976cf61565bde4f17e8c2c0bc0d359541ac4 (diff) | |
download | gnunet-d38544730123a1f365ef287a7e88060d97b266b7.tar.gz gnunet-d38544730123a1f365ef287a7e88060d97b266b7.zip |
psyc/store: apply state modifiers
Diffstat (limited to 'src/psyc')
-rw-r--r-- | src/psyc/gnunet-service-psyc.c | 63 | ||||
-rw-r--r-- | src/psyc/psyc_util_lib.c | 7 |
2 files changed, 59 insertions, 11 deletions
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c index 72377680d..29ef07f10 100644 --- a/src/psyc/gnunet-service-psyc.c +++ b/src/psyc/gnunet-service-psyc.c | |||
@@ -416,6 +416,8 @@ struct Slave | |||
416 | static void | 416 | static void |
417 | transmit_message (struct Channel *chn); | 417 | transmit_message (struct Channel *chn); |
418 | 418 | ||
419 | static uint64_t | ||
420 | message_queue_run (struct Channel *chn); | ||
419 | 421 | ||
420 | static uint64_t | 422 | static uint64_t |
421 | message_queue_drop (struct Channel *chn); | 423 | message_queue_drop (struct Channel *chn); |
@@ -1274,6 +1276,39 @@ fragment_queue_run (struct Channel *chn, uint64_t msg_id, | |||
1274 | } | 1276 | } |
1275 | 1277 | ||
1276 | 1278 | ||
1279 | struct StateModifyClosure | ||
1280 | { | ||
1281 | struct Channel *chn; | ||
1282 | struct FragmentQueue *fragq; | ||
1283 | uint64_t message_id; | ||
1284 | }; | ||
1285 | |||
1286 | |||
1287 | void | ||
1288 | store_recv_state_modify_result (void *cls, int64_t result, | ||
1289 | const char *err_msg, uint16_t err_msg_size) | ||
1290 | { | ||
1291 | struct StateModifyClosure *mcls = cls; | ||
1292 | struct Channel *chn = mcls->chn; | ||
1293 | struct FragmentQueue *fragq = mcls->fragq; | ||
1294 | uint64_t msg_id = mcls->message_id; | ||
1295 | |||
1296 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1297 | "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n", | ||
1298 | chn, result, err_msg_size, err_msg); | ||
1299 | |||
1300 | if (GNUNET_OK == result) | ||
1301 | { | ||
1302 | chn->max_state_message_id = msg_id; | ||
1303 | chn->max_message_id = msg_id; | ||
1304 | |||
1305 | fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state); | ||
1306 | GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs); | ||
1307 | message_queue_run (chn); | ||
1308 | } | ||
1309 | } | ||
1310 | |||
1311 | |||
1277 | /** | 1312 | /** |
1278 | * Run message queue. | 1313 | * Run message queue. |
1279 | * | 1314 | * |
@@ -1294,6 +1329,7 @@ message_queue_run (struct Channel *chn) | |||
1294 | "%p Running message queue.\n", chn); | 1329 | "%p Running message queue.\n", chn); |
1295 | uint64_t n = 0; | 1330 | uint64_t n = 0; |
1296 | uint64_t msg_id; | 1331 | uint64_t msg_id; |
1332 | |||
1297 | while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL, | 1333 | while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL, |
1298 | &msg_id)) | 1334 | &msg_id)) |
1299 | { | 1335 | { |
@@ -1325,7 +1361,7 @@ message_queue_run (struct Channel *chn) | |||
1325 | "%p Out of order message. " | 1361 | "%p Out of order message. " |
1326 | "(%" PRIu64 " - 1 != %" PRIu64 ")\n", | 1362 | "(%" PRIu64 " - 1 != %" PRIu64 ")\n", |
1327 | chn, msg_id, chn->max_message_id); | 1363 | chn, msg_id, chn->max_message_id); |
1328 | break; | 1364 | continue; |
1329 | } | 1365 | } |
1330 | } | 1366 | } |
1331 | else | 1367 | else |
@@ -1336,14 +1372,19 @@ message_queue_run (struct Channel *chn) | |||
1336 | "%p Out of order stateful message. " | 1372 | "%p Out of order stateful message. " |
1337 | "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n", | 1373 | "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n", |
1338 | chn, msg_id, fragq->state_delta, chn->max_state_message_id); | 1374 | chn, msg_id, fragq->state_delta, chn->max_state_message_id); |
1339 | break; | 1375 | continue; |
1340 | } | 1376 | } |
1341 | #if TODO | 1377 | |
1342 | /* FIXME: apply modifiers to state in PSYCstore */ | 1378 | struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls)); |
1343 | GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, message_id, | 1379 | mcls->chn = chn; |
1344 | store_recv_state_modify_result, cls); | 1380 | mcls->fragq = fragq; |
1345 | #endif | 1381 | mcls->message_id = msg_id; |
1346 | chn->max_state_message_id = msg_id; | 1382 | |
1383 | /* Apply modifiers to state in PSYCstore */ | ||
1384 | GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id, | ||
1385 | fragq->state_delta, | ||
1386 | store_recv_state_modify_result, mcls); | ||
1387 | break; | ||
1347 | } | 1388 | } |
1348 | chn->max_message_id = msg_id; | 1389 | chn->max_message_id = msg_id; |
1349 | } | 1390 | } |
@@ -1351,6 +1392,7 @@ message_queue_run (struct Channel *chn) | |||
1351 | GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs); | 1392 | GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs); |
1352 | n++; | 1393 | n++; |
1353 | } | 1394 | } |
1395 | |||
1354 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1396 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1355 | "%p Removed %" PRIu64 " messages from queue.\n", chn, n); | 1397 | "%p Removed %" PRIu64 " messages from queue.\n", chn, n); |
1356 | return n; | 1398 | return n; |
@@ -2039,6 +2081,11 @@ master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg, | |||
2039 | { | 2081 | { |
2040 | pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED); | 2082 | pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED); |
2041 | } | 2083 | } |
2084 | |||
2085 | if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH) | ||
2086 | { | ||
2087 | /// @todo add state_hash to PSYC header | ||
2088 | } | ||
2042 | } | 2089 | } |
2043 | } | 2090 | } |
2044 | 2091 | ||
diff --git a/src/psyc/psyc_util_lib.c b/src/psyc/psyc_util_lib.c index 13d66e6d6..4ad7a914b 100644 --- a/src/psyc/psyc_util_lib.c +++ b/src/psyc/psyc_util_lib.c | |||
@@ -343,7 +343,7 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit, | |||
343 | 343 | ||
344 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 344 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
345 | "Queueing message part of type %u and size %u (end: %u)).\n", | 345 | "Queueing message part of type %u and size %u (end: %u)).\n", |
346 | ntohs (msg->type), size, end); | 346 | NULL != msg ? ntohs (msg->type) : 0, size, end); |
347 | 347 | ||
348 | if (NULL != tmit->msg) | 348 | if (NULL != tmit->msg) |
349 | { | 349 | { |
@@ -917,7 +917,8 @@ GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv, | |||
917 | } | 917 | } |
918 | 918 | ||
919 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 919 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
920 | "Received message part from PSYC.\n"); | 920 | "Received message part of type %u and size %u from PSYC.\n", |
921 | ptype, psize); | ||
921 | GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg); | 922 | GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg); |
922 | 923 | ||
923 | switch (ptype) | 924 | switch (ptype) |
@@ -1118,7 +1119,7 @@ GNUNET_PSYC_receive_check_parts (uint16_t data_size, const char *data, | |||
1118 | ptype, psize); | 1119 | ptype, psize); |
1119 | return GNUNET_SYSERR; | 1120 | return GNUNET_SYSERR; |
1120 | } | 1121 | } |
1121 | /* FIXME: check message part order */ | 1122 | /** @todo FIXME: check message part order */ |
1122 | } | 1123 | } |
1123 | return parts; | 1124 | return parts; |
1124 | } | 1125 | } |