aboutsummaryrefslogtreecommitdiff
path: root/src/psyc
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2015-07-18 00:03:06 +0000
committerGabor X Toth <*@tg-x.net>2015-07-18 00:03:06 +0000
commitd38544730123a1f365ef287a7e88060d97b266b7 (patch)
treee66f5ee143adde1a5206f50bbbdd00a8a179fb81 /src/psyc
parent2275976cf61565bde4f17e8c2c0bc0d359541ac4 (diff)
downloadgnunet-d38544730123a1f365ef287a7e88060d97b266b7.tar.gz
gnunet-d38544730123a1f365ef287a7e88060d97b266b7.zip
psyc/store: apply state modifiers
Diffstat (limited to 'src/psyc')
-rw-r--r--src/psyc/gnunet-service-psyc.c63
-rw-r--r--src/psyc/psyc_util_lib.c7
2 files changed, 59 insertions, 11 deletions
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
index 72377680d..29ef07f10 100644
--- a/src/psyc/gnunet-service-psyc.c
+++ b/src/psyc/gnunet-service-psyc.c
@@ -416,6 +416,8 @@ struct Slave
416static void 416static void
417transmit_message (struct Channel *chn); 417transmit_message (struct Channel *chn);
418 418
419static uint64_t
420message_queue_run (struct Channel *chn);
419 421
420static uint64_t 422static uint64_t
421message_queue_drop (struct Channel *chn); 423message_queue_drop (struct Channel *chn);
@@ -1274,6 +1276,39 @@ fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1274} 1276}
1275 1277
1276 1278
1279struct StateModifyClosure
1280{
1281 struct Channel *chn;
1282 struct FragmentQueue *fragq;
1283 uint64_t message_id;
1284};
1285
1286
1287void
1288store_recv_state_modify_result (void *cls, int64_t result,
1289 const char *err_msg, uint16_t err_msg_size)
1290{
1291 struct StateModifyClosure *mcls = cls;
1292 struct Channel *chn = mcls->chn;
1293 struct FragmentQueue *fragq = mcls->fragq;
1294 uint64_t msg_id = mcls->message_id;
1295
1296 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1297 "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n",
1298 chn, result, err_msg_size, err_msg);
1299
1300 if (GNUNET_OK == result)
1301 {
1302 chn->max_state_message_id = msg_id;
1303 chn->max_message_id = msg_id;
1304
1305 fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1306 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1307 message_queue_run (chn);
1308 }
1309}
1310
1311
1277/** 1312/**
1278 * Run message queue. 1313 * Run message queue.
1279 * 1314 *
@@ -1294,6 +1329,7 @@ message_queue_run (struct Channel *chn)
1294 "%p Running message queue.\n", chn); 1329 "%p Running message queue.\n", chn);
1295 uint64_t n = 0; 1330 uint64_t n = 0;
1296 uint64_t msg_id; 1331 uint64_t msg_id;
1332
1297 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL, 1333 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1298 &msg_id)) 1334 &msg_id))
1299 { 1335 {
@@ -1325,7 +1361,7 @@ message_queue_run (struct Channel *chn)
1325 "%p Out of order message. " 1361 "%p Out of order message. "
1326 "(%" PRIu64 " - 1 != %" PRIu64 ")\n", 1362 "(%" PRIu64 " - 1 != %" PRIu64 ")\n",
1327 chn, msg_id, chn->max_message_id); 1363 chn, msg_id, chn->max_message_id);
1328 break; 1364 continue;
1329 } 1365 }
1330 } 1366 }
1331 else 1367 else
@@ -1336,14 +1372,19 @@ message_queue_run (struct Channel *chn)
1336 "%p Out of order stateful message. " 1372 "%p Out of order stateful message. "
1337 "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n", 1373 "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1338 chn, msg_id, fragq->state_delta, chn->max_state_message_id); 1374 chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1339 break; 1375 continue;
1340 } 1376 }
1341#if TODO 1377
1342 /* FIXME: apply modifiers to state in PSYCstore */ 1378 struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
1343 GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, message_id, 1379 mcls->chn = chn;
1344 store_recv_state_modify_result, cls); 1380 mcls->fragq = fragq;
1345#endif 1381 mcls->message_id = msg_id;
1346 chn->max_state_message_id = msg_id; 1382
1383 /* Apply modifiers to state in PSYCstore */
1384 GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
1385 fragq->state_delta,
1386 store_recv_state_modify_result, mcls);
1387 break;
1347 } 1388 }
1348 chn->max_message_id = msg_id; 1389 chn->max_message_id = msg_id;
1349 } 1390 }
@@ -1351,6 +1392,7 @@ message_queue_run (struct Channel *chn)
1351 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs); 1392 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1352 n++; 1393 n++;
1353 } 1394 }
1395
1354 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1396 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1355 "%p Removed %" PRIu64 " messages from queue.\n", chn, n); 1397 "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
1356 return n; 1398 return n;
@@ -2039,6 +2081,11 @@ master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
2039 { 2081 {
2040 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED); 2082 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2041 } 2083 }
2084
2085 if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
2086 {
2087 /// @todo add state_hash to PSYC header
2088 }
2042 } 2089 }
2043} 2090}
2044 2091
diff --git a/src/psyc/psyc_util_lib.c b/src/psyc/psyc_util_lib.c
index 13d66e6d6..4ad7a914b 100644
--- a/src/psyc/psyc_util_lib.c
+++ b/src/psyc/psyc_util_lib.c
@@ -343,7 +343,7 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
343 343
344 LOG (GNUNET_ERROR_TYPE_DEBUG, 344 LOG (GNUNET_ERROR_TYPE_DEBUG,
345 "Queueing message part of type %u and size %u (end: %u)).\n", 345 "Queueing message part of type %u and size %u (end: %u)).\n",
346 ntohs (msg->type), size, end); 346 NULL != msg ? ntohs (msg->type) : 0, size, end);
347 347
348 if (NULL != tmit->msg) 348 if (NULL != tmit->msg)
349 { 349 {
@@ -917,7 +917,8 @@ GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
917 } 917 }
918 918
919 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 919 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
920 "Received message part from PSYC.\n"); 920 "Received message part of type %u and size %u from PSYC.\n",
921 ptype, psize);
921 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg); 922 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
922 923
923 switch (ptype) 924 switch (ptype)
@@ -1118,7 +1119,7 @@ GNUNET_PSYC_receive_check_parts (uint16_t data_size, const char *data,
1118 ptype, psize); 1119 ptype, psize);
1119 return GNUNET_SYSERR; 1120 return GNUNET_SYSERR;
1120 } 1121 }
1121 /* FIXME: check message part order */ 1122 /** @todo FIXME: check message part order */
1122 } 1123 }
1123 return parts; 1124 return parts;
1124} 1125}