aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/include/gnunet_psyc_service.h32
-rw-r--r--src/multicast/gnunet-service-multicast.c8
-rw-r--r--src/multicast/multicast_api.c4
-rw-r--r--src/psyc/gnunet-service-psyc.c54
-rw-r--r--src/psyc/test_psyc.c22
-rw-r--r--src/psycstore/gnunet-service-psycstore.c10
-rw-r--r--src/psycstore/plugin_psycstore_sqlite.c11
-rw-r--r--src/psycstore/psyc_util_lib.c53
-rw-r--r--src/util/client_manager.c4
9 files changed, 125 insertions, 73 deletions
diff --git a/src/include/gnunet_psyc_service.h b/src/include/gnunet_psyc_service.h
index 1346e77c7..80404f837 100644
--- a/src/include/gnunet_psyc_service.h
+++ b/src/include/gnunet_psyc_service.h
@@ -537,11 +537,18 @@ typedef void
537/** 537/**
538 * Method called from PSYC upon receiving part of a message. 538 * Method called from PSYC upon receiving part of a message.
539 * 539 *
540 * @param cls Closure. 540 * @param cls
541 * @param message_id Sequence number of the message. 541 * Closure.
542 * @param data_offset Byte offset of data, only set if @a msg has a type 542 * @param slave_key
543 * #GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA 543 * Public key of the slave sending the message.
544 * @param flags OR'ed GNUNET_PSYC_MessageFlags 544 * Only set for channel master.
545 * @param message_id
546 * Sequence number of the message.
547 * @param flags
548 * OR'ed GNUNET_PSYC_MessageFlags
549 * @param data_offset
550 * Byte offset of data, only set if @a msg has a type
551 * #GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA
545 * @param msg Message part, one of the following types: 552 * @param msg Message part, one of the following types:
546 * - #GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_HEADER 553 * - #GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_HEADER
547 * - #GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD 554 * - #GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD
@@ -552,19 +559,24 @@ typedef void
552 */ 559 */
553typedef void 560typedef void
554(*GNUNET_PSYC_MessagePartCallback) (void *cls, 561(*GNUNET_PSYC_MessagePartCallback) (void *cls,
562 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
555 uint64_t message_id, 563 uint64_t message_id,
556 uint64_t data_offset,
557 uint32_t flags, 564 uint32_t flags,
565 uint64_t data_offset,
558 const struct GNUNET_MessageHeader *msg); 566 const struct GNUNET_MessageHeader *msg);
559 567
560 568
561/** 569/**
562 * Method called from PSYC upon receiving a join request. 570 * Method called from PSYC upon receiving a join request.
563 * 571 *
564 * @param cls Closure. 572 * @param cls
565 * @param slave_key Public key of the slave requesting join. 573 * Closure.
566 * @param join_msg Join message sent along with the request. 574 * @param slave_key
567 * @param jh Join handle to use with GNUNET_PSYC_join_decision() 575 * Public key of the slave requesting join.
576 * @param join_msg
577 * Join message sent along with the request.
578 * @param jh
579 * Join handle to use with GNUNET_PSYC_join_decision()
568 */ 580 */
569typedef void 581typedef void
570(*GNUNET_PSYC_JoinRequestCallback) (void *cls, 582(*GNUNET_PSYC_JoinRequestCallback) (void *cls,
diff --git a/src/multicast/gnunet-service-multicast.c b/src/multicast/gnunet-service-multicast.c
index afd5102ec..a9b15cef1 100644
--- a/src/multicast/gnunet-service-multicast.c
+++ b/src/multicast/gnunet-service-multicast.c
@@ -788,9 +788,11 @@ client_recv_member_join (void *cls, struct GNUNET_SERVER_Client *client,
788 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 788 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
789 "%p Client connected to group %s..\n", 789 "%p Client connected to group %s..\n",
790 mem, GNUNET_h2s (&grp->pub_key_hash)); 790 mem, GNUNET_h2s (&grp->pub_key_hash));
791 char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&mem->pub_key);
791 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 792 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
792 "%p ..as member %s.\n", 793 "%p ..as member %s (%s).\n",
793 mem, GNUNET_h2s (&mem_pub_key_hash)); 794 mem, GNUNET_h2s (&mem->pub_key_hash), str);
795 GNUNET_free (str);
794 796
795 GNUNET_SERVER_client_set_user_context (client, grp); 797 GNUNET_SERVER_client_set_user_context (client, grp);
796 798
@@ -833,6 +835,7 @@ client_recv_member_join (void *cls, struct GNUNET_SERVER_Client *client,
833 if (0 < join_msg_size) 835 if (0 < join_msg_size)
834 memcpy (&req[1], join_msg, join_msg_size); 836 memcpy (&req[1], join_msg, join_msg_size);
835 837
838 req->member_key = mem->pub_key;
836 req->purpose.size = htonl (msg_size 839 req->purpose.size = htonl (msg_size
837 - sizeof (req->header) 840 - sizeof (req->header)
838 - sizeof (req->reserved) 841 - sizeof (req->reserved)
@@ -1000,6 +1003,7 @@ client_recv_multicast_request (void *cls, struct GNUNET_SERVER_Client *client,
1000 /* FIXME: yucky, should use separate message structs for P2P and CS! */ 1003 /* FIXME: yucky, should use separate message structs for P2P and CS! */
1001 out = (struct GNUNET_MULTICAST_RequestHeader *) GNUNET_copy_message (m); 1004 out = (struct GNUNET_MULTICAST_RequestHeader *) GNUNET_copy_message (m);
1002 1005
1006 out->member_key = mem->pub_key;
1003 out->fragment_id = GNUNET_ntohll (++mem->max_fragment_id); 1007 out->fragment_id = GNUNET_ntohll (++mem->max_fragment_id);
1004 out->purpose.size = htonl (ntohs (out->header.size) 1008 out->purpose.size = htonl (ntohs (out->header.size)
1005 - sizeof (out->header) 1009 - sizeof (out->header)
diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c
index ce663542d..5cd729203 100644
--- a/src/multicast/multicast_api.c
+++ b/src/multicast/multicast_api.c
@@ -267,7 +267,7 @@ group_recv_message (void *cls,
267 "Calling message callback with a message of size %u.\n", 267 "Calling message callback with a message of size %u.\n",
268 ntohs (mmsg->header.size)); 268 ntohs (mmsg->header.size));
269 269
270 if (NULL != grp->message_cb) 270 if (GNUNET_YES != grp->is_disconnecting && NULL != grp->message_cb)
271 grp->message_cb (grp->cb_cls, mmsg); 271 grp->message_cb (grp->cb_cls, mmsg);
272} 272}
273 273
@@ -848,7 +848,7 @@ GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem,
848 grp->disconnect_cls = part_cls; 848 grp->disconnect_cls = part_cls;
849 849
850 GNUNET_CLIENT_MANAGER_disconnect (mem->grp.client, GNUNET_YES, 850 GNUNET_CLIENT_MANAGER_disconnect (mem->grp.client, GNUNET_YES,
851 &member_cleanup, mem); 851 member_cleanup, mem);
852} 852}
853 853
854 854
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
index 2afc98040..e20b2280e 100644
--- a/src/psyc/gnunet-service-psyc.c
+++ b/src/psyc/gnunet-service-psyc.c
@@ -534,8 +534,12 @@ cleanup_slave (struct Slave *slv)
534static void 534static void
535cleanup_channel (struct Channel *chn) 535cleanup_channel (struct Channel *chn)
536{ 536{
537 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
538 "%p Cleaning up channel %s. master? %u\n",
539 chn, GNUNET_h2s (&chn->pub_key_hash), chn->is_master);
537 message_queue_drop (chn); 540 message_queue_drop (chn);
538 GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &chn->pub_key_hash); 541 GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags);
542 chn->recv_frags = NULL;
539 543
540 if (NULL != chn->store_op) 544 if (NULL != chn->store_op)
541 { 545 {
@@ -565,6 +569,7 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
565 569
566 struct Channel * 570 struct Channel *
567 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); 571 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
572 chn->is_disconnected = GNUNET_YES;
568 573
569 if (NULL == chn) 574 if (NULL == chn)
570 { 575 {
@@ -1046,6 +1051,7 @@ client_send_mcast_req (struct Master *mst,
1046 pmsg->message_id = req->request_id; 1051 pmsg->message_id = req->request_id;
1047 pmsg->fragment_offset = req->fragment_offset; 1052 pmsg->fragment_offset = req->fragment_offset;
1048 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST); 1053 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1054 pmsg->slave_key = req->member_key;
1049 1055
1050 memcpy (&pmsg[1], &req[1], size - sizeof (*req)); 1056 memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1051 client_send_msg (chn, &pmsg->header); 1057 client_send_msg (chn, &pmsg->header);
@@ -1229,7 +1235,7 @@ fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1229 struct GNUNET_CONTAINER_MultiHashMap 1235 struct GNUNET_CONTAINER_MultiHashMap
1230 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, 1236 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1231 &chn->pub_key_hash); 1237 &chn->pub_key_hash);
1232 GNUNET_assert (NULL != chan_msgs); // FIXME 1238 GNUNET_assert (NULL != chan_msgs);
1233 uint64_t frag_id; 1239 uint64_t frag_id;
1234 1240
1235 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL, 1241 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
@@ -1382,7 +1388,7 @@ message_queue_run (struct Channel *chn)
1382 /* Check if there's a missing message before the current one */ 1388 /* Check if there's a missing message before the current one */
1383 if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta) 1389 if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1384 { 1390 {
1385 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n"); 1391 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn);
1386 1392
1387 if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY) 1393 if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1388 && (chn->max_message_id != msg_id - 1 1394 && (chn->max_message_id != msg_id - 1
@@ -1399,7 +1405,7 @@ message_queue_run (struct Channel *chn)
1399 } 1405 }
1400 else 1406 else
1401 { 1407 {
1402 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n"); 1408 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn);
1403 if (GNUNET_YES != fragq->state_is_modified) 1409 if (GNUNET_YES != fragq->state_is_modified)
1404 { 1410 {
1405 if (msg_id - fragq->state_delta != chn->max_state_message_id) 1411 if (msg_id - fragq->state_delta != chn->max_state_message_id)
@@ -1594,12 +1600,12 @@ store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1594 mst->max_group_generation = max_group_generation; 1600 mst->max_group_generation = max_group_generation;
1595 mst->origin 1601 mst->origin
1596 = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id, 1602 = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1597 &mcast_recv_join_request, 1603 mcast_recv_join_request,
1598 &mcast_recv_membership_test, 1604 mcast_recv_membership_test,
1599 &mcast_recv_replay_fragment, 1605 mcast_recv_replay_fragment,
1600 &mcast_recv_replay_message, 1606 mcast_recv_replay_message,
1601 &mcast_recv_request, 1607 mcast_recv_request,
1602 &mcast_recv_message, chn); 1608 mcast_recv_message, chn);
1603 chn->is_ready = GNUNET_YES; 1609 chn->is_ready = GNUNET_YES;
1604 } 1610 }
1605 else 1611 else
@@ -1641,12 +1647,12 @@ store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1641 &slv->origin, 1647 &slv->origin,
1642 slv->relay_count, slv->relays, 1648 slv->relay_count, slv->relays,
1643 &slv->join_msg->header, 1649 &slv->join_msg->header,
1644 &mcast_recv_join_request, 1650 mcast_recv_join_request,
1645 &mcast_recv_join_decision, 1651 mcast_recv_join_decision,
1646 &mcast_recv_membership_test, 1652 mcast_recv_membership_test,
1647 &mcast_recv_replay_fragment, 1653 mcast_recv_replay_fragment,
1648 &mcast_recv_replay_message, 1654 mcast_recv_replay_message,
1649 &mcast_recv_message, chn); 1655 mcast_recv_message, chn);
1650 if (NULL != slv->join_msg) 1656 if (NULL != slv->join_msg)
1651 { 1657 {
1652 GNUNET_free (slv->join_msg); 1658 GNUNET_free (slv->join_msg);
@@ -2004,7 +2010,7 @@ transmit_notify (void *cls, size_t *data_size, void *data)
2004 else if (GNUNET_YES == chn->is_disconnected) 2010 else if (GNUNET_YES == chn->is_disconnected)
2005 { 2011 {
2006 /* FIXME: handle partial message (when still in_transmit) */ 2012 /* FIXME: handle partial message (when still in_transmit) */
2007 cleanup_channel (chn); 2013 return GNUNET_SYSERR;
2008 } 2014 }
2009 return ret; 2015 return ret;
2010} 2016}
@@ -2113,13 +2119,17 @@ master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
2113 } 2119 }
2114 else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY) 2120 else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
2115 { 2121 {
2116 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message: setting state_modify flag\n", mst); 2122 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2123 "%p master_queue_message: state_delta=%" PRIu64 "\n",
2124 mst, tmit_msg->id - mst->max_state_message_id);
2117 pmeth->state_delta = GNUNET_htonll (tmit_msg->id 2125 pmeth->state_delta = GNUNET_htonll (tmit_msg->id
2118 - mst->max_state_message_id); 2126 - mst->max_state_message_id);
2127 mst->max_state_message_id = tmit_msg->id;
2119 } 2128 }
2120 else 2129 else
2121 { 2130 {
2122 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message: setting state_not_modified flag\n", mst); 2131 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2132 "%p master_queue_message: state not modified\n", mst);
2123 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED); 2133 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2124 } 2134 }
2125 2135
@@ -2234,7 +2244,9 @@ client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
2234 uint16_t size = ntohs (msg->size); 2244 uint16_t size = ntohs (msg->size);
2235 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg)) 2245 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
2236 { 2246 {
2237 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large.\n", chn); 2247 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2248 "%p Message payload too large: %u < %u.\n",
2249 chn, GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD, size - sizeof (*msg));
2238 GNUNET_break (0); 2250 GNUNET_break (0);
2239 transmit_cancel (chn, client); 2251 transmit_cancel (chn, client);
2240 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 2252 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
@@ -2613,7 +2625,7 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
2613 masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); 2625 masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2614 slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); 2626 slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2615 channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); 2627 channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2616 recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); 2628 recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2617 nc = GNUNET_SERVER_notification_context_create (server, 1); 2629 nc = GNUNET_SERVER_notification_context_create (server, 1);
2618 GNUNET_SERVER_add_handlers (server, server_handlers); 2630 GNUNET_SERVER_add_handlers (server, server_handlers);
2619 GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL); 2631 GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c
index 4e7979a4d..863eaab88 100644
--- a/src/psyc/test_psyc.c
+++ b/src/psyc/test_psyc.c
@@ -213,8 +213,9 @@ master_message_cb (void *cls, uint64_t message_id, uint32_t flags,
213 213
214 214
215void 215void
216master_message_part_cb (void *cls, uint64_t message_id, 216master_message_part_cb (void *cls,
217 uint64_t data_offset, uint32_t flags, 217 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
218 uint64_t message_id, uint32_t flags, uint64_t data_offset,
218 const struct GNUNET_MessageHeader *msg) 219 const struct GNUNET_MessageHeader *msg)
219{ 220{
220 if (NULL == msg) 221 if (NULL == msg)
@@ -284,8 +285,9 @@ slave_message_cb (void *cls, uint64_t message_id, uint32_t flags,
284 285
285 286
286void 287void
287slave_message_part_cb (void *cls, uint64_t message_id, 288slave_message_part_cb (void *cls,
288 uint64_t data_offset, uint32_t flags, 289 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
290 uint64_t message_id, uint32_t flags, uint64_t data_offset,
289 const struct GNUNET_MessageHeader *msg) 291 const struct GNUNET_MessageHeader *msg)
290{ 292{
291 if (NULL == msg) 293 if (NULL == msg)
@@ -453,9 +455,9 @@ slave_history_replay ()
453 test = TEST_SLAVE_HISTORY_REPLAY; 455 test = TEST_SLAVE_HISTORY_REPLAY;
454 GNUNET_PSYC_channel_history_replay (slv_chn, 1, 1, "", 456 GNUNET_PSYC_channel_history_replay (slv_chn, 1, 1, "",
455 GNUNET_PSYC_HISTORY_REPLAY_LOCAL, 457 GNUNET_PSYC_HISTORY_REPLAY_LOCAL,
456 &slave_message_cb, 458 slave_message_cb,
457 &slave_message_part_cb, 459 slave_message_part_cb,
458 &slave_history_replay_result, NULL); 460 slave_history_replay_result, NULL);
459} 461}
460 462
461 463
@@ -481,9 +483,9 @@ master_history_replay ()
481 test = TEST_MASTER_HISTORY_REPLAY; 483 test = TEST_MASTER_HISTORY_REPLAY;
482 GNUNET_PSYC_channel_history_replay (mst_chn, 1, 1, "", 484 GNUNET_PSYC_channel_history_replay (mst_chn, 1, 1, "",
483 GNUNET_PSYC_HISTORY_REPLAY_LOCAL, 485 GNUNET_PSYC_HISTORY_REPLAY_LOCAL,
484 &master_message_cb, 486 master_message_cb,
485 &master_message_part_cb, 487 master_message_part_cb,
486 &master_history_replay_result, NULL); 488 master_history_replay_result, NULL);
487} 489}
488 490
489 491
diff --git a/src/psycstore/gnunet-service-psycstore.c b/src/psycstore/gnunet-service-psycstore.c
index 1f9de54f8..01eb5d707 100644
--- a/src/psycstore/gnunet-service-psycstore.c
+++ b/src/psycstore/gnunet-service-psycstore.c
@@ -508,8 +508,10 @@ struct StateModifyClosure
508 508
509 509
510static void 510static void
511recv_state_message_part (void *cls, uint64_t message_id, uint64_t data_offset, 511recv_state_message_part (void *cls,
512 uint32_t flags, const struct GNUNET_MessageHeader *msg) 512 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
513 uint64_t message_id, uint32_t flags, uint64_t data_offset,
514 const struct GNUNET_MessageHeader *msg)
513{ 515{
514 struct StateModifyClosure *scls = cls; 516 struct StateModifyClosure *scls = cls;
515 uint16_t psize; 517 uint16_t psize;
@@ -618,7 +620,7 @@ recv_state_fragment (void *cls, struct GNUNET_MULTICAST_MessageHeader *msg,
618 620
619 if (NULL == scls->recv) 621 if (NULL == scls->recv)
620 { 622 {
621 scls->recv = GNUNET_PSYC_receive_create (NULL, &recv_state_message_part, 623 scls->recv = GNUNET_PSYC_receive_create (NULL, recv_state_message_part,
622 scls); 624 scls);
623 } 625 }
624 626
@@ -660,7 +662,7 @@ handle_state_modify (void *cls,
660 { 662 {
661 ret = db->message_get (db->cls, &req->channel_key, 663 ret = db->message_get (db->cls, &req->channel_key,
662 message_id, message_id, 664 message_id, message_id,
663 &ret_frags, &recv_state_fragment, &scls); 665 &ret_frags, recv_state_fragment, &scls);
664 if (GNUNET_OK != ret) 666 if (GNUNET_OK != ret)
665 { 667 {
666 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 668 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
diff --git a/src/psycstore/plugin_psycstore_sqlite.c b/src/psycstore/plugin_psycstore_sqlite.c
index 1bf14644b..8f9a3ef63 100644
--- a/src/psycstore/plugin_psycstore_sqlite.c
+++ b/src/psycstore/plugin_psycstore_sqlite.c
@@ -598,7 +598,7 @@ database_setup (struct Plugin *plugin)
598 "SELECT name, value_current\n" 598 "SELECT name, value_current\n"
599 "FROM state\n" 599 "FROM state\n"
600 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" 600 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
601 " AND (name = ? OR name LIKE ?);", 601 " AND (name = ? OR substr(name, 1, ?) = ? || '_');",
602 &plugin->select_state_prefix); 602 &plugin->select_state_prefix);
603 603
604 sql_prepare (plugin->dbh, 604 sql_prepare (plugin->dbh,
@@ -1763,16 +1763,12 @@ state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_
1763 int ret = GNUNET_SYSERR; 1763 int ret = GNUNET_SYSERR;
1764 sqlite3_stmt *stmt = plugin->select_state_prefix; 1764 sqlite3_stmt *stmt = plugin->select_state_prefix;
1765 size_t name_len = strlen (name); 1765 size_t name_len = strlen (name);
1766 char *name_prefix;
1767 1766
1768 GNUNET_asprintf (&name_prefix,
1769 "%s_%%",
1770 name);
1771 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, 1767 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1772 sizeof (*channel_key), SQLITE_STATIC) 1768 sizeof (*channel_key), SQLITE_STATIC)
1773 || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, name_len, SQLITE_STATIC) 1769 || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, name_len, SQLITE_STATIC)
1774 || SQLITE_OK != sqlite3_bind_text (stmt, 3, name_prefix, name_len + 2, 1770 || SQLITE_OK != sqlite3_bind_int (stmt, 3, name_len + 1)
1775 SQLITE_STATIC)) 1771 || SQLITE_OK != sqlite3_bind_text (stmt, 4, name, name_len, SQLITE_STATIC))
1776 { 1772 {
1777 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1773 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1778 "sqlite3_bind"); 1774 "sqlite3_bind");
@@ -1808,7 +1804,6 @@ state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_
1808 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1804 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1809 "sqlite3_reset"); 1805 "sqlite3_reset");
1810 } 1806 }
1811 GNUNET_free (name_prefix);
1812 return ret; 1807 return ret;
1813} 1808}
1814 1809
diff --git a/src/psycstore/psyc_util_lib.c b/src/psycstore/psyc_util_lib.c
index 7b13ac57f..e45bcafb3 100644
--- a/src/psycstore/psyc_util_lib.c
+++ b/src/psycstore/psyc_util_lib.c
@@ -401,12 +401,17 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
401static void 401static void
402transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit) 402transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit)
403{ 403{
404 uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD; 404 int notify_ret = GNUNET_YES;
405 uint16_t data_size = 0;
405 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; 406 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
406 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; 407 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
407 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); 408 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
408 409
409 int notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]); 410 if (NULL != tmit->notify_data)
411 {
412 data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
413 notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]);
414 }
410 LOG (GNUNET_ERROR_TYPE_DEBUG, 415 LOG (GNUNET_ERROR_TYPE_DEBUG,
411 "transmit_data (ret: %d, size: %u): %.*s\n", 416 "transmit_data (ret: %d, size: %u): %.*s\n",
412 notify_ret, data_size, data_size, &msg[1]); 417 notify_ret, data_size, data_size, &msg[1]);
@@ -461,10 +466,11 @@ transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit)
461static void 466static void
462transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit) 467transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
463{ 468{
464 uint16_t max_data_size, data_size; 469 uint16_t max_data_size = 0;
470 uint16_t data_size = 0;
465 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; 471 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
466 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; 472 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
467 int notify_ret; 473 int notify_ret = GNUNET_YES;
468 474
469 switch (tmit->state) 475 switch (tmit->state)
470 { 476 {
@@ -472,11 +478,16 @@ transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
472 { 478 {
473 struct GNUNET_PSYC_MessageModifier *mod 479 struct GNUNET_PSYC_MessageModifier *mod
474 = (struct GNUNET_PSYC_MessageModifier *) msg; 480 = (struct GNUNET_PSYC_MessageModifier *) msg;
475 max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
476 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); 481 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
477 msg->size = sizeof (struct GNUNET_PSYC_MessageModifier); 482 msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
478 notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1], 483
479 &mod->oper, &mod->value_size); 484 if (NULL != tmit->notify_mod)
485 {
486 max_data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
487 data_size = max_data_size;
488 notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1],
489 &mod->oper, &mod->value_size);
490 }
480 491
481 mod->name_size = strnlen ((char *) &mod[1], data_size) + 1; 492 mod->name_size = strnlen ((char *) &mod[1], data_size) + 1;
482 LOG (GNUNET_ERROR_TYPE_DEBUG, 493 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -498,11 +509,16 @@ transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
498 } 509 }
499 case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT: 510 case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
500 { 511 {
501 max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
502 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); 512 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
503 msg->size = sizeof (struct GNUNET_MessageHeader); 513 msg->size = sizeof (struct GNUNET_MessageHeader);
504 notify_ret = tmit->notify_mod (tmit->notify_mod_cls, 514
505 &data_size, &msg[1], NULL, NULL); 515 if (NULL != tmit->notify_mod)
516 {
517 max_data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
518 data_size = max_data_size;
519 notify_ret = tmit->notify_mod (tmit->notify_mod_cls,
520 &data_size, &msg[1], NULL, NULL);
521 }
506 tmit->mod_value_remaining -= data_size; 522 tmit->mod_value_remaining -= data_size;
507 LOG (GNUNET_ERROR_TYPE_DEBUG, 523 LOG (GNUNET_ERROR_TYPE_DEBUG,
508 "transmit_mod (ret: %d, size: %u): %.*s\n", 524 "transmit_mod (ret: %d, size: %u): %.*s\n",
@@ -847,7 +863,8 @@ static void
847recv_error (struct GNUNET_PSYC_ReceiveHandle *recv) 863recv_error (struct GNUNET_PSYC_ReceiveHandle *recv)
848{ 864{
849 if (NULL != recv->message_part_cb) 865 if (NULL != recv->message_part_cb)
850 recv->message_part_cb (recv->cb_cls, recv->message_id, 0, recv->flags, NULL); 866 recv->message_part_cb (recv->cb_cls, NULL, recv->message_id, recv->flags,
867 0, NULL);
851 868
852 if (NULL != recv->message_cb) 869 if (NULL != recv->message_cb)
853 recv->message_cb (recv->cb_cls, recv->message_id, recv->flags, NULL); 870 recv->message_cb (recv->cb_cls, recv->message_id, recv->flags, NULL);
@@ -1066,8 +1083,10 @@ GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
1066 } 1083 }
1067 1084
1068 if (NULL != recv->message_part_cb) 1085 if (NULL != recv->message_part_cb)
1069 recv->message_part_cb (recv->cb_cls, recv->message_id, 0, // FIXME: data_offset 1086 recv->message_part_cb (recv->cb_cls, &recv->slave_key,
1070 recv->flags, pmsg); 1087 recv->message_id, recv->flags,
1088 0, // FIXME: data_offset
1089 pmsg);
1071 1090
1072 switch (ptype) 1091 switch (ptype)
1073 { 1092 {
@@ -1144,8 +1163,10 @@ struct ParseMessageClosure
1144 1163
1145 1164
1146static void 1165static void
1147parse_message_part_cb (void *cls, uint64_t message_id, uint64_t data_offset, 1166parse_message_part_cb (void *cls,
1148 uint32_t flags, const struct GNUNET_MessageHeader *msg) 1167 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
1168 uint64_t message_id, uint32_t flags, uint64_t data_offset,
1169 const struct GNUNET_MessageHeader *msg)
1149{ 1170{
1150 struct ParseMessageClosure *pmc = cls; 1171 struct ParseMessageClosure *pmc = cls;
1151 if (NULL == msg) 1172 if (NULL == msg)
@@ -1230,7 +1251,7 @@ GNUNET_PSYC_message_parse (const struct GNUNET_PSYC_Message *msg,
1230 memcpy (&pmsg[1], &msg[1], msg_size - sizeof (*msg)); 1251 memcpy (&pmsg[1], &msg[1], msg_size - sizeof (*msg));
1231 1252
1232 struct GNUNET_PSYC_ReceiveHandle * 1253 struct GNUNET_PSYC_ReceiveHandle *
1233 recv = GNUNET_PSYC_receive_create (NULL, &parse_message_part_cb, &cls); 1254 recv = GNUNET_PSYC_receive_create (NULL, parse_message_part_cb, &cls);
1234 GNUNET_PSYC_receive_message (recv, pmsg); 1255 GNUNET_PSYC_receive_message (recv, pmsg);
1235 GNUNET_PSYC_receive_destroy (recv); 1256 GNUNET_PSYC_receive_destroy (recv);
1236 GNUNET_free (pmsg); 1257 GNUNET_free (pmsg);
diff --git a/src/util/client_manager.c b/src/util/client_manager.c
index d6e5f155b..98cf5b1ad 100644
--- a/src/util/client_manager.c
+++ b/src/util/client_manager.c
@@ -191,6 +191,10 @@ recv_message (void *cls, const struct GNUNET_MessageHeader *msg)
191 size = ntohs (msg->size); 191 size = ntohs (msg->size);
192 /* FIXME: decrease reconnect_delay gradually after a successful reconnection */ 192 /* FIXME: decrease reconnect_delay gradually after a successful reconnection */
193 } 193 }
194 else /* disconnected */
195 {
196 mgr->client_tmit = NULL;
197 }
194 198
195 size_t i = 0; 199 size_t i = 0;
196 while (NULL != mgr->handlers[i].callback) 200 while (NULL != mgr->handlers[i].callback)