aboutsummaryrefslogtreecommitdiff
path: root/src/psyc
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2015-09-05 23:44:08 +0000
committerGabor X Toth <*@tg-x.net>2015-09-05 23:44:08 +0000
commit878b09fe6521df37c2d39f884a6c892b370473a5 (patch)
treeaccd38e3a5fb995546508ffb081d0f9f9fc850a0 /src/psyc
parent5e4dfeb7fb48a506f25af0c206ee65a5f7de7f58 (diff)
downloadgnunet-878b09fe6521df37c2d39f884a6c892b370473a5.tar.gz
gnunet-878b09fe6521df37c2d39f884a6c892b370473a5.zip
multicast, psyc, psycstore, client_manager fixes
Diffstat (limited to 'src/psyc')
-rw-r--r--src/psyc/gnunet-service-psyc.c54
-rw-r--r--src/psyc/test_psyc.c22
2 files changed, 45 insertions, 31 deletions
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
index 2afc98040..e20b2280e 100644
--- a/src/psyc/gnunet-service-psyc.c
+++ b/src/psyc/gnunet-service-psyc.c
@@ -534,8 +534,12 @@ cleanup_slave (struct Slave *slv)
534static void 534static void
535cleanup_channel (struct Channel *chn) 535cleanup_channel (struct Channel *chn)
536{ 536{
537 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
538 "%p Cleaning up channel %s. master? %u\n",
539 chn, GNUNET_h2s (&chn->pub_key_hash), chn->is_master);
537 message_queue_drop (chn); 540 message_queue_drop (chn);
538 GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &chn->pub_key_hash); 541 GNUNET_CONTAINER_multihashmap_destroy (chn->recv_frags);
542 chn->recv_frags = NULL;
539 543
540 if (NULL != chn->store_op) 544 if (NULL != chn->store_op)
541 { 545 {
@@ -565,6 +569,7 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
565 569
566 struct Channel * 570 struct Channel *
567 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); 571 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
572 chn->is_disconnected = GNUNET_YES;
568 573
569 if (NULL == chn) 574 if (NULL == chn)
570 { 575 {
@@ -1046,6 +1051,7 @@ client_send_mcast_req (struct Master *mst,
1046 pmsg->message_id = req->request_id; 1051 pmsg->message_id = req->request_id;
1047 pmsg->fragment_offset = req->fragment_offset; 1052 pmsg->fragment_offset = req->fragment_offset;
1048 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST); 1053 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
1054 pmsg->slave_key = req->member_key;
1049 1055
1050 memcpy (&pmsg[1], &req[1], size - sizeof (*req)); 1056 memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1051 client_send_msg (chn, &pmsg->header); 1057 client_send_msg (chn, &pmsg->header);
@@ -1229,7 +1235,7 @@ fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1229 struct GNUNET_CONTAINER_MultiHashMap 1235 struct GNUNET_CONTAINER_MultiHashMap
1230 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, 1236 *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
1231 &chn->pub_key_hash); 1237 &chn->pub_key_hash);
1232 GNUNET_assert (NULL != chan_msgs); // FIXME 1238 GNUNET_assert (NULL != chan_msgs);
1233 uint64_t frag_id; 1239 uint64_t frag_id;
1234 1240
1235 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL, 1241 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
@@ -1382,7 +1388,7 @@ message_queue_run (struct Channel *chn)
1382 /* Check if there's a missing message before the current one */ 1388 /* Check if there's a missing message before the current one */
1383 if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta) 1389 if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
1384 { 1390 {
1385 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n"); 1391 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state NOT modified\n", chn);
1386 1392
1387 if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY) 1393 if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
1388 && (chn->max_message_id != msg_id - 1 1394 && (chn->max_message_id != msg_id - 1
@@ -1399,7 +1405,7 @@ message_queue_run (struct Channel *chn)
1399 } 1405 }
1400 else 1406 else
1401 { 1407 {
1402 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n"); 1408 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p state modified\n", chn);
1403 if (GNUNET_YES != fragq->state_is_modified) 1409 if (GNUNET_YES != fragq->state_is_modified)
1404 { 1410 {
1405 if (msg_id - fragq->state_delta != chn->max_state_message_id) 1411 if (msg_id - fragq->state_delta != chn->max_state_message_id)
@@ -1594,12 +1600,12 @@ store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1594 mst->max_group_generation = max_group_generation; 1600 mst->max_group_generation = max_group_generation;
1595 mst->origin 1601 mst->origin
1596 = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id, 1602 = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
1597 &mcast_recv_join_request, 1603 mcast_recv_join_request,
1598 &mcast_recv_membership_test, 1604 mcast_recv_membership_test,
1599 &mcast_recv_replay_fragment, 1605 mcast_recv_replay_fragment,
1600 &mcast_recv_replay_message, 1606 mcast_recv_replay_message,
1601 &mcast_recv_request, 1607 mcast_recv_request,
1602 &mcast_recv_message, chn); 1608 mcast_recv_message, chn);
1603 chn->is_ready = GNUNET_YES; 1609 chn->is_ready = GNUNET_YES;
1604 } 1610 }
1605 else 1611 else
@@ -1641,12 +1647,12 @@ store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1641 &slv->origin, 1647 &slv->origin,
1642 slv->relay_count, slv->relays, 1648 slv->relay_count, slv->relays,
1643 &slv->join_msg->header, 1649 &slv->join_msg->header,
1644 &mcast_recv_join_request, 1650 mcast_recv_join_request,
1645 &mcast_recv_join_decision, 1651 mcast_recv_join_decision,
1646 &mcast_recv_membership_test, 1652 mcast_recv_membership_test,
1647 &mcast_recv_replay_fragment, 1653 mcast_recv_replay_fragment,
1648 &mcast_recv_replay_message, 1654 mcast_recv_replay_message,
1649 &mcast_recv_message, chn); 1655 mcast_recv_message, chn);
1650 if (NULL != slv->join_msg) 1656 if (NULL != slv->join_msg)
1651 { 1657 {
1652 GNUNET_free (slv->join_msg); 1658 GNUNET_free (slv->join_msg);
@@ -2004,7 +2010,7 @@ transmit_notify (void *cls, size_t *data_size, void *data)
2004 else if (GNUNET_YES == chn->is_disconnected) 2010 else if (GNUNET_YES == chn->is_disconnected)
2005 { 2011 {
2006 /* FIXME: handle partial message (when still in_transmit) */ 2012 /* FIXME: handle partial message (when still in_transmit) */
2007 cleanup_channel (chn); 2013 return GNUNET_SYSERR;
2008 } 2014 }
2009 return ret; 2015 return ret;
2010} 2016}
@@ -2113,13 +2119,17 @@ master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
2113 } 2119 }
2114 else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY) 2120 else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
2115 { 2121 {
2116 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message: setting state_modify flag\n", mst); 2122 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2123 "%p master_queue_message: state_delta=%" PRIu64 "\n",
2124 mst, tmit_msg->id - mst->max_state_message_id);
2117 pmeth->state_delta = GNUNET_htonll (tmit_msg->id 2125 pmeth->state_delta = GNUNET_htonll (tmit_msg->id
2118 - mst->max_state_message_id); 2126 - mst->max_state_message_id);
2127 mst->max_state_message_id = tmit_msg->id;
2119 } 2128 }
2120 else 2129 else
2121 { 2130 {
2122 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message: setting state_not_modified flag\n", mst); 2131 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2132 "%p master_queue_message: state not modified\n", mst);
2123 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED); 2133 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2124 } 2134 }
2125 2135
@@ -2234,7 +2244,9 @@ client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
2234 uint16_t size = ntohs (msg->size); 2244 uint16_t size = ntohs (msg->size);
2235 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg)) 2245 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
2236 { 2246 {
2237 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large.\n", chn); 2247 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2248 "%p Message payload too large: %u < %u.\n",
2249 chn, GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD, size - sizeof (*msg));
2238 GNUNET_break (0); 2250 GNUNET_break (0);
2239 transmit_cancel (chn, client); 2251 transmit_cancel (chn, client);
2240 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 2252 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
@@ -2613,7 +2625,7 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
2613 masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); 2625 masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2614 slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); 2626 slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2615 channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); 2627 channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2616 recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); 2628 recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2617 nc = GNUNET_SERVER_notification_context_create (server, 1); 2629 nc = GNUNET_SERVER_notification_context_create (server, 1);
2618 GNUNET_SERVER_add_handlers (server, server_handlers); 2630 GNUNET_SERVER_add_handlers (server, server_handlers);
2619 GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL); 2631 GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c
index 4e7979a4d..863eaab88 100644
--- a/src/psyc/test_psyc.c
+++ b/src/psyc/test_psyc.c
@@ -213,8 +213,9 @@ master_message_cb (void *cls, uint64_t message_id, uint32_t flags,
213 213
214 214
215void 215void
216master_message_part_cb (void *cls, uint64_t message_id, 216master_message_part_cb (void *cls,
217 uint64_t data_offset, uint32_t flags, 217 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
218 uint64_t message_id, uint32_t flags, uint64_t data_offset,
218 const struct GNUNET_MessageHeader *msg) 219 const struct GNUNET_MessageHeader *msg)
219{ 220{
220 if (NULL == msg) 221 if (NULL == msg)
@@ -284,8 +285,9 @@ slave_message_cb (void *cls, uint64_t message_id, uint32_t flags,
284 285
285 286
286void 287void
287slave_message_part_cb (void *cls, uint64_t message_id, 288slave_message_part_cb (void *cls,
288 uint64_t data_offset, uint32_t flags, 289 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
290 uint64_t message_id, uint32_t flags, uint64_t data_offset,
289 const struct GNUNET_MessageHeader *msg) 291 const struct GNUNET_MessageHeader *msg)
290{ 292{
291 if (NULL == msg) 293 if (NULL == msg)
@@ -453,9 +455,9 @@ slave_history_replay ()
453 test = TEST_SLAVE_HISTORY_REPLAY; 455 test = TEST_SLAVE_HISTORY_REPLAY;
454 GNUNET_PSYC_channel_history_replay (slv_chn, 1, 1, "", 456 GNUNET_PSYC_channel_history_replay (slv_chn, 1, 1, "",
455 GNUNET_PSYC_HISTORY_REPLAY_LOCAL, 457 GNUNET_PSYC_HISTORY_REPLAY_LOCAL,
456 &slave_message_cb, 458 slave_message_cb,
457 &slave_message_part_cb, 459 slave_message_part_cb,
458 &slave_history_replay_result, NULL); 460 slave_history_replay_result, NULL);
459} 461}
460 462
461 463
@@ -481,9 +483,9 @@ master_history_replay ()
481 test = TEST_MASTER_HISTORY_REPLAY; 483 test = TEST_MASTER_HISTORY_REPLAY;
482 GNUNET_PSYC_channel_history_replay (mst_chn, 1, 1, "", 484 GNUNET_PSYC_channel_history_replay (mst_chn, 1, 1, "",
483 GNUNET_PSYC_HISTORY_REPLAY_LOCAL, 485 GNUNET_PSYC_HISTORY_REPLAY_LOCAL,
484 &master_message_cb, 486 master_message_cb,
485 &master_message_part_cb, 487 master_message_part_cb,
486 &master_history_replay_result, NULL); 488 master_history_replay_result, NULL);
487} 489}
488 490
489 491