diff options
-rw-r--r-- | src/include/gnunet_psyc_service.h | 32 | ||||
-rw-r--r-- | src/multicast/gnunet-service-multicast.c | 8 | ||||
-rw-r--r-- | src/multicast/multicast_api.c | 4 | ||||
-rw-r--r-- | src/psyc/gnunet-service-psyc.c | 54 | ||||
-rw-r--r-- | src/psyc/test_psyc.c | 22 | ||||
-rw-r--r-- | src/psycstore/gnunet-service-psycstore.c | 10 | ||||
-rw-r--r-- | src/psycstore/plugin_psycstore_sqlite.c | 11 | ||||
-rw-r--r-- | src/psycstore/psyc_util_lib.c | 53 | ||||
-rw-r--r-- | src/util/client_manager.c | 4 |
9 files changed, 125 insertions, 73 deletions
diff --git a/src/include/gnunet_psyc_service.h b/src/include/gnunet_psyc_service.h index 1346e77c7..80404f837 100644 --- a/src/include/gnunet_psyc_service.h +++ b/src/include/gnunet_psyc_service.h | |||
@@ -537,11 +537,18 @@ typedef void | |||
537 | /** | 537 | /** |
538 | * Method called from PSYC upon receiving part of a message. | 538 | * Method called from PSYC upon receiving part of a message. |
539 | * | 539 | * |
540 | * @param cls Closure. | 540 | * @param cls |
541 | * @param message_id Sequence number of the message. | 541 | * Closure. |
542 | * @param data_offset Byte offset of data, only set if @a msg has a type | 542 | * @param slave_key |
543 | * #GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA | 543 | * Public key of the slave sending the message. |
544 | * @param flags OR'ed GNUNET_PSYC_MessageFlags | 544 | * Only set for channel master. |
545 | * @param message_id | ||
546 | * Sequence number of the message. | ||
547 | * @param flags | ||
548 | * OR'ed GNUNET_PSYC_MessageFlags | ||
549 | * @param data_offset | ||
550 | * Byte offset of data, only set if @a msg has a type | ||
551 | * #GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA | ||
545 | * @param msg Message part, one of the following types: | 552 | * @param msg Message part, one of the following types: |
546 | * - #GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_HEADER | 553 | * - #GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_HEADER |
547 | * - #GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD | 554 | * - #GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD |
@@ -552,19 +559,24 @@ typedef void | |||
552 | */ | 559 | */ |
553 | typedef void | 560 | typedef void |
554 | (*GNUNET_PSYC_MessagePartCallback) (void *cls, | 561 | (*GNUNET_PSYC_MessagePartCallback) (void *cls, |
562 | const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, | ||
555 | uint64_t message_id, | 563 | uint64_t message_id, |
556 | uint64_t data_offset, | ||
557 | uint32_t flags, | 564 | uint32_t flags, |
565 | uint64_t data_offset, | ||
558 | const struct GNUNET_MessageHeader *msg); | 566 | const struct GNUNET_MessageHeader *msg); |
559 | 567 | ||
560 | 568 | ||
561 | /** | 569 | /** |
562 | * Method called from PSYC upon receiving a join request. | 570 | * Method called from PSYC upon receiving a join request. |
563 | * | 571 | * |
564 | * @param cls Closure. | 572 | * @param cls |
565 | * @param slave_key Public key of the slave requesting join. | 573 | * Closure. |
566 | * @param join_msg Join message sent along with the request. | 574 | * @param slave_key |
567 | * @param jh Join handle to use with GNUNET_PSYC_join_decision() | 575 | * Public key of the slave requesting join. |
576 | * @param join_msg | ||
577 | * Join message sent along with the request. | ||
578 | * @param jh | ||
579 | * Join handle to use with GNUNET_PSYC_join_decision() | ||
568 | */ | 580 | */ |
569 | typedef void | 581 | typedef void |
570 | (*GNUNET_PSYC_JoinRequestCallback) (void *cls, | 582 | (*GNUNET_PSYC_JoinRequestCallback) (void *cls, |
diff --git a/src/multicast/gnunet-service-multicast.c b/src/multicast/gnunet-service-multicast.c index afd5102ec..a9b15cef1 100644 --- a/src/multicast/gnunet-service-multicast.c +++ b/src/multicast/gnunet-service-multicast.c | |||
@@ -788,9 +788,11 @@ client_recv_member_join (void *cls, struct GNUNET_SERVER_Client *client, | |||
788 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 788 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
789 | "%p Client connected to group %s..\n", | 789 | "%p Client connected to group %s..\n", |
790 | mem, GNUNET_h2s (&grp->pub_key_hash)); | 790 | mem, GNUNET_h2s (&grp->pub_key_hash)); |
791 | char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&mem->pub_key); | ||
791 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 792 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
792 | "%p ..as member %s.\n", | 793 | "%p ..as member %s (%s).\n", |
793 | mem, GNUNET_h2s (&mem_pub_key_hash)); | 794 | mem, GNUNET_h2s (&mem->pub_key_hash), str); |
795 | GNUNET_free (str); | ||
794 | 796 | ||
795 | GNUNET_SERVER_client_set_user_context (client, grp); | 797 | GNUNET_SERVER_client_set_user_context (client, grp); |
796 | 798 | ||
@@ -833,6 +835,7 @@ client_recv_member_join (void *cls, struct GNUNET_SERVER_Client *client, | |||
833 | if (0 < join_msg_size) | 835 | if (0 < join_msg_size) |
834 | memcpy (&req[1], join_msg, join_msg_size); | 836 | memcpy (&req[1], join_msg, join_msg_size); |
835 | 837 | ||
838 | req->member_key = mem->pub_key; | ||
836 | req->purpose.size = htonl (msg_size | 839 | req->purpose.size = htonl (msg_size |
837 | - sizeof (req->header) | 840 | - sizeof (req->header) |
838 | - sizeof (req->reserved) | 841 | - sizeof (req->reserved) |
@@ -1000,6 +1003,7 @@ client_recv_multicast_request (void *cls, struct GNUNET_SERVER_Client *client, | |||
1000 | /* FIXME: yucky, should use separate message structs for P2P and CS! */ | 1003 | /* FIXME: yucky, should use separate message structs for P2P and CS! */ |
1001 | out = (struct GNUNET_MULTICAST_RequestHeader *) GNUNET_copy_message (m); | 1004 | out = (struct GNUNET_MULTICAST_RequestHeader *) GNUNET_copy_message (m); |
1002 | 1005 | ||
1006 | out->member_key = mem->pub_key; | ||
1003 | out->fragment_id = GNUNET_ntohll (++mem->max_fragment_id); | 1007 | out->fragment_id = GNUNET_ntohll (++mem->max_fragment_id); |
1004 | out->purpose.size = htonl (ntohs (out->header.size) | 1008 | out->purpose.size = htonl (ntohs (out->header.size) |
1005 | - sizeof (out->header) | 1009 | - sizeof (out->header) |
diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c index ce663542d..5cd729203 100644 --- a/src/multicast/multicast_api.c +++ b/src/multicast/multicast_api.c | |||
@@ -267,7 +267,7 @@ group_recv_message (void *cls, | |||
267 | "Calling message callback with a message of size %u.\n", | 267 | "Calling message callback with a message of size %u.\n", |
268 | ntohs (mmsg->header.size)); | 268 | ntohs (mmsg->header.size)); |
269 | 269 | ||
270 | if (NULL != grp->message_cb) | 270 | if (GNUNET_YES != grp->is_disconnecting && NULL != grp->message_cb) |
271 | grp->message_cb (grp->cb_cls, mmsg); | 271 | grp->message_cb (grp->cb_cls, mmsg); |
272 | } | 272 | } |
273 | 273 | ||
@@ -848,7 +848,7 @@ GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem, | |||
848 | grp->disconnect_cls = part_cls; | 848 | grp->disconnect_cls = part_cls; |
849 | 849 | ||
850 | GNUNET_CLIENT_MANAGER_disconnect (mem->grp.client, GNUNET_YES, | 850 | GNUNET_CLIENT_MANAGER_disconnect (mem->grp.client, GNUNET_YES, |
851 | &member_cleanup, mem); | 851 | member_cleanup, mem); |
852 | } | 852 | } |
853 | 853 | ||
854 | 854 | ||
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c index 2afc98040..e20b2280e 100644 --- a/src/psyc/gnunet-service-psyc.c +++ b/src/psyc/gnunet-service-psyc.c | |||
@@ -534,8 +534,12 @@ cleanup_slave (struct Slave *slv) | |||
534 | static void | 534 | static void |
535 | cleanup_channel (struct Channel *chn) | 535 | cleanup_channel (struct Channel *chn) |
536 | { | 536 | { |
537 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
538 | "%p Cleaning up channel %s. master? %u\n", | ||
539 | chn, GNUNET_h2s (&chn->pub_key_hash), chn->is_master); | ||
537 | message_queue_drop (chn); | 540 | message_queue_drop (chn); |
538 | GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &chn->pub_key_hash); | 541 | GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags); |
542 | chn->recv_frags = NULL; | ||
539 | 543 | ||
540 | if (NULL != chn->store_op) | 544 | if (NULL != chn->store_op) |
541 | { | 545 | { |
@@ -565,6 +569,7 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) | |||
565 | 569 | ||
566 | struct Channel * | 570 | struct Channel * |
567 | chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); | 571 | chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); |
572 | chn->is_disconnected = GNUNET_YES; | ||
568 | 573 | ||
569 | if (NULL == chn) | 574 | if (NULL == chn) |
570 | { | 575 | { |
@@ -1046,6 +1051,7 @@ client_send_mcast_req (struct Master *mst, | |||
1046 | pmsg->message_id = req->request_id; | 1051 | pmsg->message_id = req->request_id; |
1047 | pmsg->fragment_offset = req->fragment_offset; | 1052 | pmsg->fragment_offset = req->fragment_offset; |
1048 | pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST); | 1053 | pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST); |
1054 | pmsg->slave_key = req->member_key; | ||
1049 | 1055 | ||
1050 | memcpy (&pmsg[1], &req[1], size - sizeof (*req)); | 1056 | memcpy (&pmsg[1], &req[1], size - sizeof (*req)); |
1051 | client_send_msg (chn, &pmsg->header); | 1057 | client_send_msg (chn, &pmsg->header); |
@@ -1229,7 +1235,7 @@ fragment_queue_run (struct Channel *chn, uint64_t msg_id, | |||
1229 | struct GNUNET_CONTAINER_MultiHashMap | 1235 | struct GNUNET_CONTAINER_MultiHashMap |
1230 | *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, | 1236 | *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, |
1231 | &chn->pub_key_hash); | 1237 | &chn->pub_key_hash); |
1232 | GNUNET_assert (NULL != chan_msgs); // FIXME | 1238 | GNUNET_assert (NULL != chan_msgs); |
1233 | uint64_t frag_id; | 1239 | uint64_t frag_id; |
1234 | 1240 | ||
1235 | while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL, | 1241 | while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL, |
@@ -1382,7 +1388,7 @@ message_queue_run (struct Channel *chn) | |||
1382 | /* Check if there's a missing message before the current one */ | 1388 | /* Check if there's a missing message before the current one */ |
1383 | if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta) | 1389 | if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta) |
1384 | { | 1390 | { |
1385 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n"); | 1391 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn); |
1386 | 1392 | ||
1387 | if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY) | 1393 | if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY) |
1388 | && (chn->max_message_id != msg_id - 1 | 1394 | && (chn->max_message_id != msg_id - 1 |
@@ -1399,7 +1405,7 @@ message_queue_run (struct Channel *chn) | |||
1399 | } | 1405 | } |
1400 | else | 1406 | else |
1401 | { | 1407 | { |
1402 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n"); | 1408 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn); |
1403 | if (GNUNET_YES != fragq->state_is_modified) | 1409 | if (GNUNET_YES != fragq->state_is_modified) |
1404 | { | 1410 | { |
1405 | if (msg_id - fragq->state_delta != chn->max_state_message_id) | 1411 | if (msg_id - fragq->state_delta != chn->max_state_message_id) |
@@ -1594,12 +1600,12 @@ store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id, | |||
1594 | mst->max_group_generation = max_group_generation; | 1600 | mst->max_group_generation = max_group_generation; |
1595 | mst->origin | 1601 | mst->origin |
1596 | = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id, | 1602 | = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id, |
1597 | &mcast_recv_join_request, | 1603 | mcast_recv_join_request, |
1598 | &mcast_recv_membership_test, | 1604 | mcast_recv_membership_test, |
1599 | &mcast_recv_replay_fragment, | 1605 | mcast_recv_replay_fragment, |
1600 | &mcast_recv_replay_message, | 1606 | mcast_recv_replay_message, |
1601 | &mcast_recv_request, | 1607 | mcast_recv_request, |
1602 | &mcast_recv_message, chn); | 1608 | mcast_recv_message, chn); |
1603 | chn->is_ready = GNUNET_YES; | 1609 | chn->is_ready = GNUNET_YES; |
1604 | } | 1610 | } |
1605 | else | 1611 | else |
@@ -1641,12 +1647,12 @@ store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id, | |||
1641 | &slv->origin, | 1647 | &slv->origin, |
1642 | slv->relay_count, slv->relays, | 1648 | slv->relay_count, slv->relays, |
1643 | &slv->join_msg->header, | 1649 | &slv->join_msg->header, |
1644 | &mcast_recv_join_request, | 1650 | mcast_recv_join_request, |
1645 | &mcast_recv_join_decision, | 1651 | mcast_recv_join_decision, |
1646 | &mcast_recv_membership_test, | 1652 | mcast_recv_membership_test, |
1647 | &mcast_recv_replay_fragment, | 1653 | mcast_recv_replay_fragment, |
1648 | &mcast_recv_replay_message, | 1654 | mcast_recv_replay_message, |
1649 | &mcast_recv_message, chn); | 1655 | mcast_recv_message, chn); |
1650 | if (NULL != slv->join_msg) | 1656 | if (NULL != slv->join_msg) |
1651 | { | 1657 | { |
1652 | GNUNET_free (slv->join_msg); | 1658 | GNUNET_free (slv->join_msg); |
@@ -2004,7 +2010,7 @@ transmit_notify (void *cls, size_t *data_size, void *data) | |||
2004 | else if (GNUNET_YES == chn->is_disconnected) | 2010 | else if (GNUNET_YES == chn->is_disconnected) |
2005 | { | 2011 | { |
2006 | /* FIXME: handle partial message (when still in_transmit) */ | 2012 | /* FIXME: handle partial message (when still in_transmit) */ |
2007 | cleanup_channel (chn); | 2013 | return GNUNET_SYSERR; |
2008 | } | 2014 | } |
2009 | return ret; | 2015 | return ret; |
2010 | } | 2016 | } |
@@ -2113,13 +2119,17 @@ master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg, | |||
2113 | } | 2119 | } |
2114 | else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY) | 2120 | else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY) |
2115 | { | 2121 | { |
2116 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message: setting state_modify flag\n", mst); | 2122 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
2123 | "%p master_queue_message: state_delta=%" PRIu64 "\n", | ||
2124 | mst, tmit_msg->id - mst->max_state_message_id); | ||
2117 | pmeth->state_delta = GNUNET_htonll (tmit_msg->id | 2125 | pmeth->state_delta = GNUNET_htonll (tmit_msg->id |
2118 | - mst->max_state_message_id); | 2126 | - mst->max_state_message_id); |
2127 | mst->max_state_message_id = tmit_msg->id; | ||
2119 | } | 2128 | } |
2120 | else | 2129 | else |
2121 | { | 2130 | { |
2122 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message: setting state_not_modified flag\n", mst); | 2131 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
2132 | "%p master_queue_message: state not modified\n", mst); | ||
2123 | pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED); | 2133 | pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED); |
2124 | } | 2134 | } |
2125 | 2135 | ||
@@ -2234,7 +2244,9 @@ client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, | |||
2234 | uint16_t size = ntohs (msg->size); | 2244 | uint16_t size = ntohs (msg->size); |
2235 | if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg)) | 2245 | if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg)) |
2236 | { | 2246 | { |
2237 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large.\n", chn); | 2247 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
2248 | "%p Message payload too large: %u < %u.\n", | ||
2249 | chn, GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD, size - sizeof (*msg)); | ||
2238 | GNUNET_break (0); | 2250 | GNUNET_break (0); |
2239 | transmit_cancel (chn, client); | 2251 | transmit_cancel (chn, client); |
2240 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); | 2252 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); |
@@ -2613,7 +2625,7 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, | |||
2613 | masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); | 2625 | masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); |
2614 | slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); | 2626 | slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); |
2615 | channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); | 2627 | channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); |
2616 | recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); | 2628 | recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); |
2617 | nc = GNUNET_SERVER_notification_context_create (server, 1); | 2629 | nc = GNUNET_SERVER_notification_context_create (server, 1); |
2618 | GNUNET_SERVER_add_handlers (server, server_handlers); | 2630 | GNUNET_SERVER_add_handlers (server, server_handlers); |
2619 | GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL); | 2631 | GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL); |
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c index 4e7979a4d..863eaab88 100644 --- a/src/psyc/test_psyc.c +++ b/src/psyc/test_psyc.c | |||
@@ -213,8 +213,9 @@ master_message_cb (void *cls, uint64_t message_id, uint32_t flags, | |||
213 | 213 | ||
214 | 214 | ||
215 | void | 215 | void |
216 | master_message_part_cb (void *cls, uint64_t message_id, | 216 | master_message_part_cb (void *cls, |
217 | uint64_t data_offset, uint32_t flags, | 217 | const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, |
218 | uint64_t message_id, uint32_t flags, uint64_t data_offset, | ||
218 | const struct GNUNET_MessageHeader *msg) | 219 | const struct GNUNET_MessageHeader *msg) |
219 | { | 220 | { |
220 | if (NULL == msg) | 221 | if (NULL == msg) |
@@ -284,8 +285,9 @@ slave_message_cb (void *cls, uint64_t message_id, uint32_t flags, | |||
284 | 285 | ||
285 | 286 | ||
286 | void | 287 | void |
287 | slave_message_part_cb (void *cls, uint64_t message_id, | 288 | slave_message_part_cb (void *cls, |
288 | uint64_t data_offset, uint32_t flags, | 289 | const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, |
290 | uint64_t message_id, uint32_t flags, uint64_t data_offset, | ||
289 | const struct GNUNET_MessageHeader *msg) | 291 | const struct GNUNET_MessageHeader *msg) |
290 | { | 292 | { |
291 | if (NULL == msg) | 293 | if (NULL == msg) |
@@ -453,9 +455,9 @@ slave_history_replay () | |||
453 | test = TEST_SLAVE_HISTORY_REPLAY; | 455 | test = TEST_SLAVE_HISTORY_REPLAY; |
454 | GNUNET_PSYC_channel_history_replay (slv_chn, 1, 1, "", | 456 | GNUNET_PSYC_channel_history_replay (slv_chn, 1, 1, "", |
455 | GNUNET_PSYC_HISTORY_REPLAY_LOCAL, | 457 | GNUNET_PSYC_HISTORY_REPLAY_LOCAL, |
456 | &slave_message_cb, | 458 | slave_message_cb, |
457 | &slave_message_part_cb, | 459 | slave_message_part_cb, |
458 | &slave_history_replay_result, NULL); | 460 | slave_history_replay_result, NULL); |
459 | } | 461 | } |
460 | 462 | ||
461 | 463 | ||
@@ -481,9 +483,9 @@ master_history_replay () | |||
481 | test = TEST_MASTER_HISTORY_REPLAY; | 483 | test = TEST_MASTER_HISTORY_REPLAY; |
482 | GNUNET_PSYC_channel_history_replay (mst_chn, 1, 1, "", | 484 | GNUNET_PSYC_channel_history_replay (mst_chn, 1, 1, "", |
483 | GNUNET_PSYC_HISTORY_REPLAY_LOCAL, | 485 | GNUNET_PSYC_HISTORY_REPLAY_LOCAL, |
484 | &master_message_cb, | 486 | master_message_cb, |
485 | &master_message_part_cb, | 487 | master_message_part_cb, |
486 | &master_history_replay_result, NULL); | 488 | master_history_replay_result, NULL); |
487 | } | 489 | } |
488 | 490 | ||
489 | 491 | ||
diff --git a/src/psycstore/gnunet-service-psycstore.c b/src/psycstore/gnunet-service-psycstore.c index 1f9de54f8..01eb5d707 100644 --- a/src/psycstore/gnunet-service-psycstore.c +++ b/src/psycstore/gnunet-service-psycstore.c | |||
@@ -508,8 +508,10 @@ struct StateModifyClosure | |||
508 | 508 | ||
509 | 509 | ||
510 | static void | 510 | static void |
511 | recv_state_message_part (void *cls, uint64_t message_id, uint64_t data_offset, | 511 | recv_state_message_part (void *cls, |
512 | uint32_t flags, const struct GNUNET_MessageHeader *msg) | 512 | const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, |
513 | uint64_t message_id, uint32_t flags, uint64_t data_offset, | ||
514 | const struct GNUNET_MessageHeader *msg) | ||
513 | { | 515 | { |
514 | struct StateModifyClosure *scls = cls; | 516 | struct StateModifyClosure *scls = cls; |
515 | uint16_t psize; | 517 | uint16_t psize; |
@@ -618,7 +620,7 @@ recv_state_fragment (void *cls, struct GNUNET_MULTICAST_MessageHeader *msg, | |||
618 | 620 | ||
619 | if (NULL == scls->recv) | 621 | if (NULL == scls->recv) |
620 | { | 622 | { |
621 | scls->recv = GNUNET_PSYC_receive_create (NULL, &recv_state_message_part, | 623 | scls->recv = GNUNET_PSYC_receive_create (NULL, recv_state_message_part, |
622 | scls); | 624 | scls); |
623 | } | 625 | } |
624 | 626 | ||
@@ -660,7 +662,7 @@ handle_state_modify (void *cls, | |||
660 | { | 662 | { |
661 | ret = db->message_get (db->cls, &req->channel_key, | 663 | ret = db->message_get (db->cls, &req->channel_key, |
662 | message_id, message_id, | 664 | message_id, message_id, |
663 | &ret_frags, &recv_state_fragment, &scls); | 665 | &ret_frags, recv_state_fragment, &scls); |
664 | if (GNUNET_OK != ret) | 666 | if (GNUNET_OK != ret) |
665 | { | 667 | { |
666 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 668 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
diff --git a/src/psycstore/plugin_psycstore_sqlite.c b/src/psycstore/plugin_psycstore_sqlite.c index 1bf14644b..8f9a3ef63 100644 --- a/src/psycstore/plugin_psycstore_sqlite.c +++ b/src/psycstore/plugin_psycstore_sqlite.c | |||
@@ -598,7 +598,7 @@ database_setup (struct Plugin *plugin) | |||
598 | "SELECT name, value_current\n" | 598 | "SELECT name, value_current\n" |
599 | "FROM state\n" | 599 | "FROM state\n" |
600 | "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" | 600 | "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" |
601 | " AND (name = ? OR name LIKE ?);", | 601 | " AND (name = ? OR substr(name, 1, ?) = ? || '_');", |
602 | &plugin->select_state_prefix); | 602 | &plugin->select_state_prefix); |
603 | 603 | ||
604 | sql_prepare (plugin->dbh, | 604 | sql_prepare (plugin->dbh, |
@@ -1763,16 +1763,12 @@ state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_ | |||
1763 | int ret = GNUNET_SYSERR; | 1763 | int ret = GNUNET_SYSERR; |
1764 | sqlite3_stmt *stmt = plugin->select_state_prefix; | 1764 | sqlite3_stmt *stmt = plugin->select_state_prefix; |
1765 | size_t name_len = strlen (name); | 1765 | size_t name_len = strlen (name); |
1766 | char *name_prefix; | ||
1767 | 1766 | ||
1768 | GNUNET_asprintf (&name_prefix, | ||
1769 | "%s_%%", | ||
1770 | name); | ||
1771 | if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, | 1767 | if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, |
1772 | sizeof (*channel_key), SQLITE_STATIC) | 1768 | sizeof (*channel_key), SQLITE_STATIC) |
1773 | || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, name_len, SQLITE_STATIC) | 1769 | || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, name_len, SQLITE_STATIC) |
1774 | || SQLITE_OK != sqlite3_bind_text (stmt, 3, name_prefix, name_len + 2, | 1770 | || SQLITE_OK != sqlite3_bind_int (stmt, 3, name_len + 1) |
1775 | SQLITE_STATIC)) | 1771 | || SQLITE_OK != sqlite3_bind_text (stmt, 4, name, name_len, SQLITE_STATIC)) |
1776 | { | 1772 | { |
1777 | LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | 1773 | LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, |
1778 | "sqlite3_bind"); | 1774 | "sqlite3_bind"); |
@@ -1808,7 +1804,6 @@ state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_ | |||
1808 | LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, | 1804 | LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, |
1809 | "sqlite3_reset"); | 1805 | "sqlite3_reset"); |
1810 | } | 1806 | } |
1811 | GNUNET_free (name_prefix); | ||
1812 | return ret; | 1807 | return ret; |
1813 | } | 1808 | } |
1814 | 1809 | ||
diff --git a/src/psycstore/psyc_util_lib.c b/src/psycstore/psyc_util_lib.c index 7b13ac57f..e45bcafb3 100644 --- a/src/psycstore/psyc_util_lib.c +++ b/src/psycstore/psyc_util_lib.c | |||
@@ -401,12 +401,17 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit, | |||
401 | static void | 401 | static void |
402 | transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit) | 402 | transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit) |
403 | { | 403 | { |
404 | uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD; | 404 | int notify_ret = GNUNET_YES; |
405 | uint16_t data_size = 0; | ||
405 | char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; | 406 | char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; |
406 | struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; | 407 | struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; |
407 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); | 408 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); |
408 | 409 | ||
409 | int notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]); | 410 | if (NULL != tmit->notify_data) |
411 | { | ||
412 | data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD; | ||
413 | notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]); | ||
414 | } | ||
410 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 415 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
411 | "transmit_data (ret: %d, size: %u): %.*s\n", | 416 | "transmit_data (ret: %d, size: %u): %.*s\n", |
412 | notify_ret, data_size, data_size, &msg[1]); | 417 | notify_ret, data_size, data_size, &msg[1]); |
@@ -461,10 +466,11 @@ transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit) | |||
461 | static void | 466 | static void |
462 | transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit) | 467 | transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit) |
463 | { | 468 | { |
464 | uint16_t max_data_size, data_size; | 469 | uint16_t max_data_size = 0; |
470 | uint16_t data_size = 0; | ||
465 | char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; | 471 | char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; |
466 | struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; | 472 | struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; |
467 | int notify_ret; | 473 | int notify_ret = GNUNET_YES; |
468 | 474 | ||
469 | switch (tmit->state) | 475 | switch (tmit->state) |
470 | { | 476 | { |
@@ -472,11 +478,16 @@ transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit) | |||
472 | { | 478 | { |
473 | struct GNUNET_PSYC_MessageModifier *mod | 479 | struct GNUNET_PSYC_MessageModifier *mod |
474 | = (struct GNUNET_PSYC_MessageModifier *) msg; | 480 | = (struct GNUNET_PSYC_MessageModifier *) msg; |
475 | max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; | ||
476 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); | 481 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); |
477 | msg->size = sizeof (struct GNUNET_PSYC_MessageModifier); | 482 | msg->size = sizeof (struct GNUNET_PSYC_MessageModifier); |
478 | notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1], | 483 | |
479 | &mod->oper, &mod->value_size); | 484 | if (NULL != tmit->notify_mod) |
485 | { | ||
486 | max_data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; | ||
487 | data_size = max_data_size; | ||
488 | notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1], | ||
489 | &mod->oper, &mod->value_size); | ||
490 | } | ||
480 | 491 | ||
481 | mod->name_size = strnlen ((char *) &mod[1], data_size) + 1; | 492 | mod->name_size = strnlen ((char *) &mod[1], data_size) + 1; |
482 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 493 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
@@ -498,11 +509,16 @@ transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit) | |||
498 | } | 509 | } |
499 | case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT: | 510 | case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT: |
500 | { | 511 | { |
501 | max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; | ||
502 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); | 512 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); |
503 | msg->size = sizeof (struct GNUNET_MessageHeader); | 513 | msg->size = sizeof (struct GNUNET_MessageHeader); |
504 | notify_ret = tmit->notify_mod (tmit->notify_mod_cls, | 514 | |
505 | &data_size, &msg[1], NULL, NULL); | 515 | if (NULL != tmit->notify_mod) |
516 | { | ||
517 | max_data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; | ||
518 | data_size = max_data_size; | ||
519 | notify_ret = tmit->notify_mod (tmit->notify_mod_cls, | ||
520 | &data_size, &msg[1], NULL, NULL); | ||
521 | } | ||
506 | tmit->mod_value_remaining -= data_size; | 522 | tmit->mod_value_remaining -= data_size; |
507 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 523 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
508 | "transmit_mod (ret: %d, size: %u): %.*s\n", | 524 | "transmit_mod (ret: %d, size: %u): %.*s\n", |
@@ -847,7 +863,8 @@ static void | |||
847 | recv_error (struct GNUNET_PSYC_ReceiveHandle *recv) | 863 | recv_error (struct GNUNET_PSYC_ReceiveHandle *recv) |
848 | { | 864 | { |
849 | if (NULL != recv->message_part_cb) | 865 | if (NULL != recv->message_part_cb) |
850 | recv->message_part_cb (recv->cb_cls, recv->message_id, 0, recv->flags, NULL); | 866 | recv->message_part_cb (recv->cb_cls, NULL, recv->message_id, recv->flags, |
867 | 0, NULL); | ||
851 | 868 | ||
852 | if (NULL != recv->message_cb) | 869 | if (NULL != recv->message_cb) |
853 | recv->message_cb (recv->cb_cls, recv->message_id, recv->flags, NULL); | 870 | recv->message_cb (recv->cb_cls, recv->message_id, recv->flags, NULL); |
@@ -1066,8 +1083,10 @@ GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv, | |||
1066 | } | 1083 | } |
1067 | 1084 | ||
1068 | if (NULL != recv->message_part_cb) | 1085 | if (NULL != recv->message_part_cb) |
1069 | recv->message_part_cb (recv->cb_cls, recv->message_id, 0, // FIXME: data_offset | 1086 | recv->message_part_cb (recv->cb_cls, &recv->slave_key, |
1070 | recv->flags, pmsg); | 1087 | recv->message_id, recv->flags, |
1088 | 0, // FIXME: data_offset | ||
1089 | pmsg); | ||
1071 | 1090 | ||
1072 | switch (ptype) | 1091 | switch (ptype) |
1073 | { | 1092 | { |
@@ -1144,8 +1163,10 @@ struct ParseMessageClosure | |||
1144 | 1163 | ||
1145 | 1164 | ||
1146 | static void | 1165 | static void |
1147 | parse_message_part_cb (void *cls, uint64_t message_id, uint64_t data_offset, | 1166 | parse_message_part_cb (void *cls, |
1148 | uint32_t flags, const struct GNUNET_MessageHeader *msg) | 1167 | const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, |
1168 | uint64_t message_id, uint32_t flags, uint64_t data_offset, | ||
1169 | const struct GNUNET_MessageHeader *msg) | ||
1149 | { | 1170 | { |
1150 | struct ParseMessageClosure *pmc = cls; | 1171 | struct ParseMessageClosure *pmc = cls; |
1151 | if (NULL == msg) | 1172 | if (NULL == msg) |
@@ -1230,7 +1251,7 @@ GNUNET_PSYC_message_parse (const struct GNUNET_PSYC_Message *msg, | |||
1230 | memcpy (&pmsg[1], &msg[1], msg_size - sizeof (*msg)); | 1251 | memcpy (&pmsg[1], &msg[1], msg_size - sizeof (*msg)); |
1231 | 1252 | ||
1232 | struct GNUNET_PSYC_ReceiveHandle * | 1253 | struct GNUNET_PSYC_ReceiveHandle * |
1233 | recv = GNUNET_PSYC_receive_create (NULL, &parse_message_part_cb, &cls); | 1254 | recv = GNUNET_PSYC_receive_create (NULL, parse_message_part_cb, &cls); |
1234 | GNUNET_PSYC_receive_message (recv, pmsg); | 1255 | GNUNET_PSYC_receive_message (recv, pmsg); |
1235 | GNUNET_PSYC_receive_destroy (recv); | 1256 | GNUNET_PSYC_receive_destroy (recv); |
1236 | GNUNET_free (pmsg); | 1257 | GNUNET_free (pmsg); |
diff --git a/src/util/client_manager.c b/src/util/client_manager.c index d6e5f155b..98cf5b1ad 100644 --- a/src/util/client_manager.c +++ b/src/util/client_manager.c | |||
@@ -191,6 +191,10 @@ recv_message (void *cls, const struct GNUNET_MessageHeader *msg) | |||
191 | size = ntohs (msg->size); | 191 | size = ntohs (msg->size); |
192 | /* FIXME: decrease reconnect_delay gradually after a successful reconnection */ | 192 | /* FIXME: decrease reconnect_delay gradually after a successful reconnection */ |
193 | } | 193 | } |
194 | else /* disconnected */ | ||
195 | { | ||
196 | mgr->client_tmit = NULL; | ||
197 | } | ||
194 | 198 | ||
195 | size_t i = 0; | 199 | size_t i = 0; |
196 | while (NULL != mgr->handlers[i].callback) | 200 | while (NULL != mgr->handlers[i].callback) |