aboutsummaryrefslogtreecommitdiff
path: root/src/psyc
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2015-08-28 13:33:43 +0000
committerGabor X Toth <*@tg-x.net>2015-08-28 13:33:43 +0000
commit38963d1e81332032e0ac774f4f2c6b804c38802a (patch)
treece33b979e47fe332c7c744744d60077a7e1fefee /src/psyc
parentb4fa14499c64140273850569247abda687803053 (diff)
downloadgnunet-38963d1e81332032e0ac774f4f2c6b804c38802a.tar.gz
gnunet-38963d1e81332032e0ac774f4f2c6b804c38802a.zip
psyc/social: get state from psycstore
Diffstat (limited to 'src/psyc')
-rw-r--r--src/psyc/gnunet-service-psyc.c156
-rw-r--r--src/psyc/psyc_api.c22
-rw-r--r--src/psyc/test_psyc.c20
3 files changed, 119 insertions, 79 deletions
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
index 4c34f6108..2afc98040 100644
--- a/src/psyc/gnunet-service-psyc.c
+++ b/src/psyc/gnunet-service-psyc.c
@@ -1,3 +1,4 @@
1
1/* 2/*
2 * This file is part of GNUnet 3 * This file is part of GNUnet
3 * Copyright (C) 2013 Christian Grothoff (and other contributing authors) 4 * Copyright (C) 2013 Christian Grothoff (and other contributing authors)
@@ -171,6 +172,11 @@ struct FragmentQueue
171 uint8_t state; 172 uint8_t state;
172 173
173 /** 174 /**
175 * Whether the state is already modified in PSYCstore.
176 */
177 uint8_t state_is_modified;
178
179 /**
174 * Is the message queued for delivery to the client? 180 * Is the message queued for delivery to the client?
175 * i.e. added to the recv_msgs queue 181 * i.e. added to the recv_msgs queue
176 */ 182 */
@@ -460,9 +466,9 @@ op_add (struct Channel *chn, struct GNUNET_SERVER_Client *client,
460 466
461 467
462static void 468static void
463op_remove (struct Channel *chn, struct Operation *op) 469op_remove (struct Operation *op)
464{ 470{
465 GNUNET_CONTAINER_DLL_remove (chn->op_head, chn->op_tail, op); 471 GNUNET_CONTAINER_DLL_remove (op->chn->op_head, op->chn->op_tail, op);
466 GNUNET_free (op); 472 GNUNET_free (op);
467} 473}
468 474
@@ -1008,7 +1014,8 @@ client_send_mcast_msg (struct Channel *chn,
1008 chn, GNUNET_ntohll (mmsg->fragment_id), 1014 chn, GNUNET_ntohll (mmsg->fragment_id),
1009 GNUNET_ntohll (mmsg->message_id)); 1015 GNUNET_ntohll (mmsg->message_id));
1010 1016
1011 struct GNUNET_PSYC_MessageHeader *pmsg = psyc_msg_new (mmsg, flags); 1017 struct GNUNET_PSYC_MessageHeader *
1018 pmsg = GNUNET_PSYC_message_header_create (mmsg, flags);
1012 client_send_msg (chn, &pmsg->header); 1019 client_send_msg (chn, &pmsg->header);
1013 GNUNET_free (pmsg); 1020 GNUNET_free (pmsg);
1014} 1021}
@@ -1049,7 +1056,7 @@ client_send_mcast_req (struct Master *mst,
1049/** 1056/**
1050 * Insert a multicast message fragment into the queue belonging to the message. 1057 * Insert a multicast message fragment into the queue belonging to the message.
1051 * 1058 *
1052 * @param chn Channel. 1059 * @param chn Channel.
1053 * @param mmsg Multicast message fragment. 1060 * @param mmsg Multicast message fragment.
1054 * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode. 1061 * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode.
1055 * @param first_ptype First PSYC message part type in @a mmsg. 1062 * @param first_ptype First PSYC message part type in @a mmsg.
@@ -1222,7 +1229,7 @@ fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1222 struct GNUNET_CONTAINER_MultiHashMap 1229 struct GNUNET_CONTAINER_MultiHashMap
1223 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, 1230 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1224 &chn->pub_key_hash); 1231 &chn->pub_key_hash);
1225 GNUNET_assert (NULL != chan_msgs); 1232 GNUNET_assert (NULL != chan_msgs); // FIXME
1226 uint64_t frag_id; 1233 uint64_t frag_id;
1227 1234
1228 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL, 1235 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
@@ -1279,8 +1286,8 @@ fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1279struct StateModifyClosure 1286struct StateModifyClosure
1280{ 1287{
1281 struct Channel *chn; 1288 struct Channel *chn;
1282 struct FragmentQueue *fragq; 1289 uint64_t msg_id;
1283 uint64_t message_id; 1290 struct GNUNET_HashCode msg_id_hash;
1284}; 1291};
1285 1292
1286 1293
@@ -1290,21 +1297,37 @@ store_recv_state_modify_result (void *cls, int64_t result,
1290{ 1297{
1291 struct StateModifyClosure *mcls = cls; 1298 struct StateModifyClosure *mcls = cls;
1292 struct Channel *chn = mcls->chn; 1299 struct Channel *chn = mcls->chn;
1293 struct FragmentQueue *fragq = mcls->fragq; 1300 uint64_t msg_id = mcls->msg_id;
1294 uint64_t msg_id = mcls->message_id; 1301
1302 struct FragmentQueue *
1303 fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &mcls->msg_id_hash);
1295 1304
1296 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1305 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1297 "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n", 1306 "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n",
1298 chn, result, err_msg_size, err_msg); 1307 chn, result, err_msg_size, err_msg);
1299 1308
1300 if (GNUNET_OK == result) 1309 switch (result)
1301 { 1310 {
1302 chn->max_state_message_id = msg_id; 1311 case GNUNET_OK:
1303 chn->max_message_id = msg_id; 1312 case GNUNET_NO:
1313 if (NULL != fragq)
1314 fragq->state_is_modified = GNUNET_YES;
1315 if (chn->max_state_message_id < msg_id)
1316 chn->max_state_message_id = msg_id;
1317 if (chn->max_message_id < msg_id)
1318 chn->max_message_id = msg_id;
1304 1319
1305 fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state); 1320 if (NULL != fragq)
1321 fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
1306 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs); 1322 GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
1307 message_queue_run (chn); 1323 message_queue_run (chn);
1324 break;
1325
1326 default:
1327 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1328 "%p GNUNET_PSYCSTORE_state_modify() failed with error %" PRId64 " (%.*s)\n",
1329 chn, result, err_msg_size, err_msg);
1330 /** @todo FIXME: handle state_modify error */
1308 } 1331 }
1309} 1332}
1310 1333
@@ -1349,42 +1372,58 @@ message_queue_run (struct Channel *chn)
1349 break; 1372 break;
1350 } 1373 }
1351 1374
1352 if (MSG_FRAG_STATE_HEADER == fragq->state) 1375 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1376 "%p Fragment queue entry: state: %u, state delta: "
1377 "%" PRIu64 " - %" PRIu64 " ?= %" PRIu64 "\n",
1378 chn, fragq->state, msg_id, fragq->state_delta, chn->max_state_message_id);
1379
1380 if (MSG_FRAG_STATE_DATA <= fragq->state)
1353 { 1381 {
1354 /* Check if there's a missing message before the current one */ 1382 /* Check if there's a missing message before the current one */
1355 if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta) 1383 if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1356 { 1384 {
1385 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n");
1386
1357 if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY) 1387 if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1358 && msg_id - 1 != chn->max_message_id) 1388 && (chn->max_message_id != msg_id - 1
1389 && chn->max_message_id != msg_id))
1359 { 1390 {
1360 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1391 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1361 "%p Out of order message. " 1392 "%p Out of order message. "
1362 "(%" PRIu64 " - 1 != %" PRIu64 ")\n", 1393 "(%" PRIu64 " != %" PRIu64 " - 1)\n",
1363 chn, msg_id, chn->max_message_id); 1394 chn, chn->max_message_id, msg_id);
1364 continue; 1395 break;
1396 // FIXME: keep track of messages processed in this queue run,
1397 // and only stop after reaching the end
1365 } 1398 }
1366 } 1399 }
1367 else 1400 else
1368 { 1401 {
1369 if (msg_id - fragq->state_delta != chn->max_state_message_id) 1402 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n");
1403 if (GNUNET_YES != fragq->state_is_modified)
1370 { 1404 {
1371 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1405 if (msg_id - fragq->state_delta != chn->max_state_message_id)
1372 "%p Out of order stateful message. " 1406 {
1373 "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n", 1407 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1374 chn, msg_id, fragq->state_delta, chn->max_state_message_id); 1408 "%p Out of order stateful message. "
1375 continue; 1409 "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1410 chn, msg_id, fragq->state_delta, chn->max_state_message_id);
1411 break;
1412 // FIXME: keep track of messages processed in this queue run,
1413 // and only stop after reaching the end
1414 }
1415
1416 struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
1417 mcls->chn = chn;
1418 mcls->msg_id = msg_id;
1419 mcls->msg_id_hash = msg_id_hash;
1420
1421 /* Apply modifiers to state in PSYCstore */
1422 GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
1423 fragq->state_delta,
1424 store_recv_state_modify_result, mcls);
1425 break; // continue after asynchronous state modify result
1376 } 1426 }
1377
1378 struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
1379 mcls->chn = chn;
1380 mcls->fragq = fragq;
1381 mcls->message_id = msg_id;
1382
1383 /* Apply modifiers to state in PSYCstore */
1384 GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
1385 fragq->state_delta,
1386 store_recv_state_modify_result, mcls);
1387 break;
1388 } 1427 }
1389 chn->max_message_id = msg_id; 1428 chn->max_message_id = msg_id;
1390 } 1429 }
@@ -2060,7 +2099,7 @@ static void
2060master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg, 2099master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
2061 uint16_t first_ptype, uint16_t last_ptype) 2100 uint16_t first_ptype, uint16_t last_ptype)
2062{ 2101{
2063 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message()\n", mst); 2102 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message()\n", mst);
2064 2103
2065 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype) 2104 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
2066 { 2105 {
@@ -2074,11 +2113,13 @@ master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
2074 } 2113 }
2075 else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY) 2114 else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
2076 { 2115 {
2116 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message: setting state_modify flag\n", mst);
2077 pmeth->state_delta = GNUNET_htonll (tmit_msg->id 2117 pmeth->state_delta = GNUNET_htonll (tmit_msg->id
2078 - mst->max_state_message_id); 2118 - mst->max_state_message_id);
2079 } 2119 }
2080 else 2120 else
2081 { 2121 {
2122 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message: setting state_not_modified flag\n", mst);
2082 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED); 2123 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2083 } 2124 }
2084 2125
@@ -2226,14 +2267,6 @@ client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
2226}; 2267};
2227 2268
2228 2269
2229struct MembershipStoreClosure
2230{
2231 struct GNUNET_SERVER_Client *client;
2232 struct Channel *chn;
2233 uint64_t op_id;
2234};
2235
2236
2237/** 2270/**
2238 * Received result of GNUNET_PSYCSTORE_membership_store() 2271 * Received result of GNUNET_PSYCSTORE_membership_store()
2239 */ 2272 */
@@ -2241,12 +2274,13 @@ static void
2241store_recv_membership_store_result (void *cls, int64_t result, 2274store_recv_membership_store_result (void *cls, int64_t result,
2242 const char *err_msg, uint16_t err_msg_size) 2275 const char *err_msg, uint16_t err_msg_size)
2243{ 2276{
2244 struct MembershipStoreClosure *mcls = cls; 2277 struct Operation *op = cls;
2245 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2278 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2246 "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.s)\n", 2279 "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.s)\n",
2247 mcls->chn, result, err_msg_size, err_msg); 2280 op->chn, result, err_msg_size, err_msg);
2248 2281
2249 client_send_result (mcls->client, mcls->op_id, result, err_msg, err_msg_size); 2282 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2283 op_remove (op);
2250} 2284}
2251 2285
2252 2286
@@ -2264,10 +2298,7 @@ client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
2264 const struct ChannelMembershipStoreRequest * 2298 const struct ChannelMembershipStoreRequest *
2265 req = (const struct ChannelMembershipStoreRequest *) msg; 2299 req = (const struct ChannelMembershipStoreRequest *) msg;
2266 2300
2267 struct MembershipStoreClosure *mcls = GNUNET_malloc (sizeof (*mcls)); 2301 struct Operation *op = op_add (chn, client, req->op_id, 0);
2268 mcls->client = client;
2269 mcls->chn = chn;
2270 mcls->op_id = req->op_id;
2271 2302
2272 uint64_t announced_at = GNUNET_ntohll (req->announced_at); 2303 uint64_t announced_at = GNUNET_ntohll (req->announced_at);
2273 uint64_t effective_since = GNUNET_ntohll (req->effective_since); 2304 uint64_t effective_since = GNUNET_ntohll (req->effective_since);
@@ -2280,7 +2311,7 @@ client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
2280 GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_key, 2311 GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_key,
2281 req->did_join, announced_at, effective_since, 2312 req->did_join, announced_at, effective_since,
2282 0, /* FIXME: group_generation */ 2313 0, /* FIXME: group_generation */
2283 &store_recv_membership_store_result, mcls); 2314 &store_recv_membership_store_result, op);
2284 GNUNET_SERVER_receive_done (client, GNUNET_OK); 2315 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2285} 2316}
2286 2317
@@ -2313,7 +2344,7 @@ store_recv_fragment_history (void *cls,
2313 res->result_code = GNUNET_htonll (GNUNET_OK); 2344 res->result_code = GNUNET_htonll (GNUNET_OK);
2314 2345
2315 pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1]; 2346 pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
2316 psyc_msg_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC); 2347 GNUNET_PSYC_message_header_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
2317 memcpy (&res[1], pmsg, psize); 2348 memcpy (&res[1], pmsg, psize);
2318 2349
2319 /** @todo FIXME: send only to requesting client */ 2350 /** @todo FIXME: send only to requesting client */
@@ -2339,7 +2370,7 @@ store_recv_fragment_history_result (void *cls, int64_t result,
2339 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2370 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2340 "%p History replay #%" PRIu64 ": " 2371 "%p History replay #%" PRIu64 ": "
2341 "PSYCSTORE returned %" PRId64 " (%.*s)\n", 2372 "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2342 op->chn, op->op_id, result, err_msg_size, err_msg); 2373 op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2343 2374
2344 if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE) 2375 if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
2345 { 2376 {
@@ -2347,6 +2378,7 @@ store_recv_fragment_history_result (void *cls, int64_t result,
2347 } 2378 }
2348 2379
2349 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size); 2380 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2381 op_remove (op);
2350} 2382}
2351 2383
2352 2384
@@ -2404,12 +2436,16 @@ client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
2404 */ 2436 */
2405static int 2437static int
2406store_recv_state_var (void *cls, const char *name, 2438store_recv_state_var (void *cls, const char *name,
2407 const void *value, size_t value_size) 2439 const void *value, uint32_t value_size)
2408{ 2440{
2409 struct Operation *op = cls; 2441 struct Operation *op = cls;
2410 struct GNUNET_OperationResultMessage *res; 2442 struct GNUNET_OperationResultMessage *res;
2411 2443
2412 if (NULL != name) 2444 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2445 "%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n",
2446 op->chn, GNUNET_ntohll (op->op_id), name);
2447
2448 if (NULL != name) /* First part */
2413 { 2449 {
2414 uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1; 2450 uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2415 struct GNUNET_PSYC_MessageModifier *mod; 2451 struct GNUNET_PSYC_MessageModifier *mod;
@@ -2427,7 +2463,7 @@ store_recv_state_var (void *cls, const char *name,
2427 memcpy (&mod[1], name, name_size); 2463 memcpy (&mod[1], name, name_size);
2428 memcpy (((char *) &mod[1]) + name_size, value, value_size); 2464 memcpy (((char *) &mod[1]) + name_size, value, value_size);
2429 } 2465 }
2430 else 2466 else /* Continuation */
2431 { 2467 {
2432 struct GNUNET_MessageHeader *mod; 2468 struct GNUNET_MessageHeader *mod;
2433 res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + value_size); 2469 res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + value_size);
@@ -2445,7 +2481,6 @@ store_recv_state_var (void *cls, const char *name,
2445 GNUNET_SERVER_notification_context_add (nc, op->client); 2481 GNUNET_SERVER_notification_context_add (nc, op->client);
2446 GNUNET_SERVER_notification_context_unicast (nc, op->client, &res->header, 2482 GNUNET_SERVER_notification_context_unicast (nc, op->client, &res->header,
2447 GNUNET_NO); 2483 GNUNET_NO);
2448 GNUNET_free (op);
2449 return GNUNET_YES; 2484 return GNUNET_YES;
2450} 2485}
2451 2486
@@ -2460,12 +2495,13 @@ store_recv_state_result (void *cls, int64_t result,
2460{ 2495{
2461 struct Operation *op = cls; 2496 struct Operation *op = cls;
2462 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2497 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2463 "%p History replay #%" PRIu64 ": " 2498 "%p state_get #%" PRIu64 ": "
2464 "PSYCSTORE returned %" PRId64 " (%.*s)\n", 2499 "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2465 op->chn, op->op_id, result, err_msg_size, err_msg); 2500 op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
2466 2501
2467 // FIXME: client might have been disconnected 2502 // FIXME: client might have been disconnected
2468 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size); 2503 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2504 op_remove (op);
2469} 2505}
2470 2506
2471 2507
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index 5fc5391a0..b862eee9c 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -366,34 +366,36 @@ channel_recv_state_result (void *cls,
366 } 366 }
367 367
368 const struct GNUNET_MessageHeader * 368 const struct GNUNET_MessageHeader *
369 modc = (struct GNUNET_MessageHeader *) &res[1]; 369 mod = (struct GNUNET_MessageHeader *) &res[1];
370 uint16_t modc_size = ntohs (modc->size); 370 uint16_t mod_size = ntohs (mod->size);
371 if (ntohs (msg->size) - sizeof (*msg) != modc_size) 371 if (ntohs (msg->size) - sizeof (*res) != mod_size)
372 { 372 {
373 GNUNET_break (0); 373 GNUNET_break (0);
374 return; 374 return;
375 } 375 }
376 switch (ntohs (modc->type)) 376 switch (ntohs (mod->type))
377 { 377 {
378 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: 378 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
379 { 379 {
380 const struct GNUNET_PSYC_MessageModifier * 380 const struct GNUNET_PSYC_MessageModifier *
381 mod = (const struct GNUNET_PSYC_MessageModifier *) modc; 381 pmod = (const struct GNUNET_PSYC_MessageModifier *) mod;
382 382
383 const char *name = (const char *) &mod[1]; 383 const char *name = (const char *) &pmod[1];
384 uint16_t name_size = ntohs (mod->name_size); 384 uint16_t name_size = ntohs (pmod->name_size);
385 if ('\0' != name[name_size - 1]) 385 if ('\0' != name[name_size - 1])
386 { 386 {
387 GNUNET_break (0); 387 GNUNET_break (0);
388 return; 388 return;
389 } 389 }
390 sr->var_cb (sr->cls, name, name + name_size, ntohs (mod->value_size)); 390 sr->var_cb (sr->cls, mod, name, name + name_size,
391 ntohs (pmod->header.size) - sizeof (*pmod),
392 ntohs (pmod->value_size));
391 break; 393 break;
392 } 394 }
393 395
394 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: 396 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
395 sr->var_cb (sr->cls, NULL, (const char *) &modc[1], 397 sr->var_cb (sr->cls, mod, NULL, (const char *) &mod[1],
396 modc_size - sizeof (*modc)); 398 mod_size - sizeof (*mod), 0);
397 break; 399 break;
398 } 400 }
399} 401}
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c
index d62474afb..4e7979a4d 100644
--- a/src/psyc/test_psyc.c
+++ b/src/psyc/test_psyc.c
@@ -329,7 +329,9 @@ slave_message_part_cb (void *cls, uint64_t message_id,
329 329
330 330
331void 331void
332state_get_var (void *cls, const char *name, const void *value, size_t value_size) 332state_get_var (void *cls, const struct GNUNET_MessageHeader *mod,
333 const char *name, const void *value,
334 uint32_t value_size, uint32_t full_value_size)
333{ 335{
334 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 336 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
335 "Got state var: %s\n%.*s\n", name, value_size, value); 337 "Got state var: %s\n%.*s\n", name, value_size, value);
@@ -354,8 +356,8 @@ void
354slave_state_get_prefix () 356slave_state_get_prefix ()
355{ 357{
356 test = TEST_SLAVE_STATE_GET_PREFIX; 358 test = TEST_SLAVE_STATE_GET_PREFIX;
357 GNUNET_PSYC_channel_state_get_prefix (slv_chn, "_foo", &state_get_var, 359 GNUNET_PSYC_channel_state_get_prefix (slv_chn, "_foo", state_get_var,
358 &slave_state_get_prefix_result, NULL); 360 slave_state_get_prefix_result, NULL);
359} 361}
360 362
361 363
@@ -377,8 +379,8 @@ void
377master_state_get_prefix () 379master_state_get_prefix ()
378{ 380{
379 test = TEST_MASTER_STATE_GET_PREFIX; 381 test = TEST_MASTER_STATE_GET_PREFIX;
380 GNUNET_PSYC_channel_state_get_prefix (mst_chn, "_foo", &state_get_var, 382 GNUNET_PSYC_channel_state_get_prefix (mst_chn, "_foo", state_get_var,
381 &master_state_get_prefix_result, NULL); 383 master_state_get_prefix_result, NULL);
382} 384}
383 385
384 386
@@ -401,8 +403,8 @@ void
401slave_state_get () 403slave_state_get ()
402{ 404{
403 test = TEST_SLAVE_STATE_GET; 405 test = TEST_SLAVE_STATE_GET;
404 GNUNET_PSYC_channel_state_get (slv_chn, "_foo_bar_baz", &state_get_var, 406 GNUNET_PSYC_channel_state_get (slv_chn, "_foo_bar_baz", state_get_var,
405 &slave_state_get_result, NULL); 407 slave_state_get_result, NULL);
406} 408}
407 409
408 410
@@ -425,8 +427,8 @@ void
425master_state_get () 427master_state_get ()
426{ 428{
427 test = TEST_MASTER_STATE_GET; 429 test = TEST_MASTER_STATE_GET;
428 GNUNET_PSYC_channel_state_get (mst_chn, "_foo_bar_baz", &state_get_var, 430 GNUNET_PSYC_channel_state_get (mst_chn, "_foo_bar_baz", state_get_var,
429 &master_state_get_result, NULL); 431 master_state_get_result, NULL);
430} 432}
431 433
432 434