aboutsummaryrefslogtreecommitdiff
path: root/src/psyc
diff options
context:
space:
mode:
authorxrs <xrs@mail36.net>2018-01-02 17:12:39 +0100
committerxrs <xrs@mail36.net>2018-01-02 17:12:39 +0100
commit3611a5295f95fad5d9e1fdb3866e7db9ecf8f47e (patch)
treedb191de206319d924f1bb49e65cdf8a9612ba85c /src/psyc
parentcc90f1df695ccee3d35fe00fa179a856e4128009 (diff)
parent61f532f18450e0d7c72f0c17f4a20b5854cf53bf (diff)
downloadgnunet-3611a5295f95fad5d9e1fdb3866e7db9ecf8f47e.tar.gz
gnunet-3611a5295f95fad5d9e1fdb3866e7db9ecf8f47e.zip
Merge branch 'master' of ssh://gnunet.org/gnunet
Diffstat (limited to 'src/psyc')
-rw-r--r--src/psyc/gnunet-service-psyc.c131
-rw-r--r--src/psyc/psyc_api.c103
-rw-r--r--src/psyc/test_psyc.c28
-rw-r--r--src/psyc/test_psyc.conf54
4 files changed, 182 insertions, 134 deletions
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
index 73a3ae4ee..cf161435a 100644
--- a/src/psyc/gnunet-service-psyc.c
+++ b/src/psyc/gnunet-service-psyc.c
@@ -279,7 +279,7 @@ struct Channel
279 * Is the client disconnected? 279 * Is the client disconnected?
280 * #GNUNET_YES or #GNUNET_NO 280 * #GNUNET_YES or #GNUNET_NO
281 */ 281 */
282 uint8_t is_disconnected; 282 uint8_t is_disconnecting;
283 283
284 /** 284 /**
285 * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)? 285 * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
@@ -508,8 +508,6 @@ cleanup_master (struct Master *mst)
508{ 508{
509 struct Channel *chn = &mst->channel; 509 struct Channel *chn = &mst->channel;
510 510
511 if (NULL != mst->origin)
512 GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
513 GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs); 511 GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
514 GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst); 512 GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst);
515} 513}
@@ -546,11 +544,6 @@ cleanup_slave (struct Slave *slv)
546 GNUNET_free (slv->relays); 544 GNUNET_free (slv->relays);
547 slv->relays = NULL; 545 slv->relays = NULL;
548 } 546 }
549 if (NULL != slv->member)
550 {
551 GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
552 slv->member = NULL;
553 }
554 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv); 547 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
555} 548}
556 549
@@ -603,15 +596,16 @@ client_notify_disconnect (void *cls,
603 if (NULL == chn) 596 if (NULL == chn)
604 { 597 {
605 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 598 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
606 "%p User context is NULL in client_disconnect()\n", 599 "%p User context is NULL in client_notify_disconnect ()\n",
607 chn); 600 chn);
608 GNUNET_break (0); 601 GNUNET_break (0);
609 return; 602 return;
610 } 603 }
611 604
612 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 605 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
613 "%p Client (%s) disconnected from channel %s\n", 606 "%p Client %p (%s) disconnected from channel %s\n",
614 chn, 607 chn,
608 client,
615 (GNUNET_YES == chn->is_master) ? "master" : "slave", 609 (GNUNET_YES == chn->is_master) ? "master" : "slave",
616 GNUNET_h2s (&chn->pub_key_hash)); 610 GNUNET_h2s (&chn->pub_key_hash));
617 611
@@ -645,15 +639,8 @@ client_notify_disconnect (void *cls,
645 chn, 639 chn,
646 (GNUNET_YES == chn->is_master) ? "master" : "slave", 640 (GNUNET_YES == chn->is_master) ? "master" : "slave",
647 GNUNET_h2s (&chn->pub_key_hash)); 641 GNUNET_h2s (&chn->pub_key_hash));
648 chn->is_disconnected = GNUNET_YES; 642 chn->is_disconnecting = GNUNET_YES;
649 if (NULL != chn->tmit_head) 643 cleanup_channel (chn);
650 { /* Send pending messages to multicast before cleanup. */
651 transmit_message (chn);
652 }
653 else
654 {
655 cleanup_channel (chn);
656 }
657 } 644 }
658} 645}
659 646
@@ -688,7 +675,7 @@ client_send_msg (const struct Channel *chn,
688 const struct GNUNET_MessageHeader *msg) 675 const struct GNUNET_MessageHeader *msg)
689{ 676{
690 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 677 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
691 "%p Sending message to clients.\n", 678 "Sending message to clients of channel %p.\n",
692 chn); 679 chn);
693 680
694 struct ClientList *cli = chn->clients_head; 681 struct ClientList *cli = chn->clients_head;
@@ -699,7 +686,6 @@ client_send_msg (const struct Channel *chn,
699 686
700 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cli->client), 687 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cli->client),
701 env); 688 env);
702
703 cli = cli->next; 689 cli = cli->next;
704 } 690 }
705} 691}
@@ -734,7 +720,7 @@ client_send_result (struct GNUNET_SERVICE_Client *client, uint64_t op_id,
734 GNUNET_memcpy (&res[1], data, data_size); 720 GNUNET_memcpy (&res[1], data, data_size);
735 721
736 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 722 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
737 "%p Sending result to client for operation #%" PRIu64 ": %" PRId64 " (size: %u)\n", 723 "%p Sending result to client for OP ID %" PRIu64 ": %" PRId64 " (size: %u)\n",
738 client, 724 client,
739 GNUNET_ntohll (op_id), 725 GNUNET_ntohll (op_id),
740 result_code, 726 result_code,
@@ -1202,12 +1188,12 @@ fragment_queue_insert (struct Channel *chn,
1202 else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype 1188 else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
1203 || frag_offset == fragq->header_size) 1189 || frag_offset == fragq->header_size)
1204 { /* header is now complete */ 1190 { /* header is now complete */
1205 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1191 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1206 "%p Header of message %" PRIu64 " is complete.\n", 1192 "%p Header of message %" PRIu64 " is complete.\n",
1207 chn, 1193 chn,
1208 GNUNET_ntohll (mmsg->message_id)); 1194 GNUNET_ntohll (mmsg->message_id));
1209 1195
1210 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1196 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1211 "%p Adding message %" PRIu64 " to queue.\n", 1197 "%p Adding message %" PRIu64 " to queue.\n",
1212 chn, 1198 chn,
1213 GNUNET_ntohll (mmsg->message_id)); 1199 GNUNET_ntohll (mmsg->message_id));
@@ -1215,7 +1201,7 @@ fragment_queue_insert (struct Channel *chn,
1215 } 1201 }
1216 else 1202 else
1217 { 1203 {
1218 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1204 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1219 "%p Header of message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n", 1205 "%p Header of message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n",
1220 chn, 1206 chn,
1221 GNUNET_ntohll (mmsg->message_id), 1207 GNUNET_ntohll (mmsg->message_id),
@@ -1230,7 +1216,7 @@ fragment_queue_insert (struct Channel *chn,
1230 if (frag_offset == fragq->size) 1216 if (frag_offset == fragq->size)
1231 fragq->state = MSG_FRAG_STATE_END; 1217 fragq->state = MSG_FRAG_STATE_END;
1232 else 1218 else
1233 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1219 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1234 "%p Message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n", 1220 "%p Message %" PRIu64 " is NOT complete yet: %" PRIu64 " != %" PRIu64 "\n",
1235 chn, 1221 chn,
1236 GNUNET_ntohll (mmsg->message_id), 1222 GNUNET_ntohll (mmsg->message_id),
@@ -1285,7 +1271,7 @@ static void
1285fragment_queue_run (struct Channel *chn, uint64_t msg_id, 1271fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1286 struct FragmentQueue *fragq, uint8_t drop) 1272 struct FragmentQueue *fragq, uint8_t drop)
1287{ 1273{
1288 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1274 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1289 "%p Running message fragment queue for message %" PRIu64 " (state: %u).\n", 1275 "%p Running message fragment queue for message %" PRIu64 " (state: %u).\n",
1290 chn, 1276 chn,
1291 msg_id, 1277 msg_id,
@@ -1413,7 +1399,7 @@ store_recv_state_modify_result (void *cls, int64_t result,
1413static uint64_t 1399static uint64_t
1414message_queue_run (struct Channel *chn) 1400message_queue_run (struct Channel *chn)
1415{ 1401{
1416 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1402 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1417 "%p Running message queue.\n", chn); 1403 "%p Running message queue.\n", chn);
1418 uint64_t n = 0; 1404 uint64_t n = 0;
1419 uint64_t msg_id; 1405 uint64_t msg_id;
@@ -1421,7 +1407,7 @@ message_queue_run (struct Channel *chn)
1421 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL, 1407 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
1422 &msg_id)) 1408 &msg_id))
1423 { 1409 {
1424 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1410 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1425 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id); 1411 "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
1426 struct GNUNET_HashCode msg_id_hash; 1412 struct GNUNET_HashCode msg_id_hash;
1427 hash_key_from_hll (&msg_id_hash, msg_id); 1413 hash_key_from_hll (&msg_id_hash, msg_id);
@@ -1431,7 +1417,7 @@ message_queue_run (struct Channel *chn)
1431 1417
1432 if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER) 1418 if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
1433 { 1419 {
1434 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1420 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1435 "%p No fragq (%p) or header not complete.\n", 1421 "%p No fragq (%p) or header not complete.\n",
1436 chn, fragq); 1422 chn, fragq);
1437 break; 1423 break;
@@ -1453,7 +1439,7 @@ message_queue_run (struct Channel *chn)
1453 && (chn->max_message_id != msg_id - 1 1439 && (chn->max_message_id != msg_id - 1
1454 && chn->max_message_id != msg_id)) 1440 && chn->max_message_id != msg_id))
1455 { 1441 {
1456 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1442 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1457 "%p Out of order message. " 1443 "%p Out of order message. "
1458 "(%" PRIu64 " != %" PRIu64 " - 1)\n", 1444 "(%" PRIu64 " != %" PRIu64 " - 1)\n",
1459 chn, chn->max_message_id, msg_id); 1445 chn, chn->max_message_id, msg_id);
@@ -1469,7 +1455,7 @@ message_queue_run (struct Channel *chn)
1469 { 1455 {
1470 if (msg_id - fragq->state_delta != chn->max_state_message_id) 1456 if (msg_id - fragq->state_delta != chn->max_state_message_id)
1471 { 1457 {
1472 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1458 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1473 "%p Out of order stateful message. " 1459 "%p Out of order stateful message. "
1474 "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n", 1460 "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
1475 chn, msg_id, fragq->state_delta, chn->max_state_message_id); 1461 chn, msg_id, fragq->state_delta, chn->max_state_message_id);
@@ -1515,8 +1501,6 @@ message_queue_run (struct Channel *chn)
1515static uint64_t 1501static uint64_t
1516message_queue_drop (struct Channel *chn) 1502message_queue_drop (struct Channel *chn)
1517{ 1503{
1518 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1519 "%p Dropping message queue.\n", chn);
1520 uint64_t n = 0; 1504 uint64_t n = 0;
1521 uint64_t msg_id; 1505 uint64_t msg_id;
1522 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL, 1506 while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
@@ -1703,7 +1687,7 @@ store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1703 res.result_code = htonl (result); 1687 res.result_code = htonl (result);
1704 res.max_message_id = GNUNET_htonll (max_message_id); 1688 res.max_message_id = GNUNET_htonll (max_message_id);
1705 1689
1706 if (GNUNET_OK == result || GNUNET_NO == result) 1690 if (GNUNET_YES == result || GNUNET_NO == result)
1707 { 1691 {
1708 chn->max_message_id = max_message_id; 1692 chn->max_message_id = max_message_id;
1709 chn->max_state_message_id = max_state_message_id; 1693 chn->max_state_message_id = max_state_message_id;
@@ -1831,6 +1815,9 @@ handle_client_slave_join (void *cls,
1831 struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key; 1815 struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1832 struct GNUNET_HashCode pub_key_hash, slv_pub_hash; 1816 struct GNUNET_HashCode pub_key_hash, slv_pub_hash;
1833 1817
1818 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1819 "got join request from client %p\n",
1820 client);
1834 GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key); 1821 GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
1835 GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_hash); 1822 GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_hash);
1836 GNUNET_CRYPTO_hash (&req->channel_pub_key, sizeof (req->channel_pub_key), &pub_key_hash); 1823 GNUNET_CRYPTO_hash (&req->channel_pub_key, sizeof (req->channel_pub_key), &pub_key_hash);
@@ -1905,7 +1892,7 @@ handle_client_slave_join (void *cls,
1905 GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn, 1892 GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
1906 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 1893 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1907 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key, 1894 chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
1908 &store_recv_slave_counters, slv); 1895 &store_recv_slave_counters, slv);
1909 } 1896 }
1910 else 1897 else
1911 { 1898 {
@@ -1952,8 +1939,9 @@ handle_client_slave_join (void *cls,
1952 } 1939 }
1953 1940
1954 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1941 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1955 "%p Client connected as slave to channel %s.\n", 1942 "Client %p connected as slave to channel %s.\n",
1956 slv, GNUNET_h2s (&chn->pub_key_hash)); 1943 client,
1944 GNUNET_h2s (&chn->pub_key_hash));
1957 1945
1958 struct ClientList *cli = GNUNET_malloc (sizeof (*cli)); 1946 struct ClientList *cli = GNUNET_malloc (sizeof (*cli));
1959 cli->client = client; 1947 cli->client = client;
@@ -2037,6 +2025,49 @@ handle_client_join_decision (void *cls,
2037} 2025}
2038 2026
2039 2027
2028static void
2029channel_part_cb (void *cls)
2030{
2031 struct GNUNET_SERVICE_Client *client = cls;
2032 struct GNUNET_MQ_Envelope *env;
2033
2034 env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_PSYC_PART_ACK);
2035 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
2036 env);
2037}
2038
2039
2040static void
2041handle_client_part_request (void *cls,
2042 const struct GNUNET_MessageHeader *msg)
2043{
2044 struct Client *c = cls;
2045
2046 c->channel->is_disconnecting = GNUNET_YES;
2047 if (GNUNET_YES == c->channel->is_master)
2048 {
2049 struct Master *mst = (struct Master *) c->channel;
2050
2051 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2052 "Got part request from master %p\n",
2053 mst);
2054 GNUNET_assert (NULL != mst->origin);
2055 GNUNET_MULTICAST_origin_stop (mst->origin, channel_part_cb, c->client);
2056 }
2057 else
2058 {
2059 struct Slave *slv = (struct Slave *) c->channel;
2060
2061 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2062 "Got part request from slave %p\n",
2063 slv);
2064 GNUNET_assert (NULL != slv->member);
2065 GNUNET_MULTICAST_member_part (slv->member, channel_part_cb, c->client);
2066 }
2067 GNUNET_SERVICE_client_continue (c->client);
2068}
2069
2070
2040/** 2071/**
2041 * Send acknowledgement to a client. 2072 * Send acknowledgement to a client.
2042 * 2073 *
@@ -2096,7 +2127,7 @@ transmit_notify (void *cls, size_t *data_size, void *data)
2096 { 2127 {
2097 GNUNET_SCHEDULER_add_now (&schedule_transmit_message, chn); 2128 GNUNET_SCHEDULER_add_now (&schedule_transmit_message, chn);
2098 } 2129 }
2099 else if (GNUNET_YES == chn->is_disconnected 2130 else if (GNUNET_YES == chn->is_disconnecting
2100 && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END) 2131 && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
2101 { 2132 {
2102 /* FIXME: handle partial message (when still in_transmit) */ 2133 /* FIXME: handle partial message (when still in_transmit) */
@@ -2208,12 +2239,10 @@ transmit_message (struct Channel *chn)
2208static void 2239static void
2209master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg) 2240master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg)
2210{ 2241{
2211 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message()\n", mst);
2212
2213 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype) 2242 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
2214 { 2243 {
2215 tmit_msg->id = ++mst->max_message_id; 2244 tmit_msg->id = ++mst->max_message_id;
2216 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 2245 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2217 "%p master_queue_message: message_id=%" PRIu64 "\n", 2246 "%p master_queue_message: message_id=%" PRIu64 "\n",
2218 mst, tmit_msg->id); 2247 mst, tmit_msg->id);
2219 struct GNUNET_PSYC_MessageMethod *pmeth 2248 struct GNUNET_PSYC_MessageMethod *pmeth
@@ -2225,7 +2254,7 @@ master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg)
2225 } 2254 }
2226 else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY) 2255 else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
2227 { 2256 {
2228 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 2257 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2229 "%p master_queue_message: state_delta=%" PRIu64 "\n", 2258 "%p master_queue_message: state_delta=%" PRIu64 "\n",
2230 mst, tmit_msg->id - mst->max_state_message_id); 2259 mst, tmit_msg->id - mst->max_state_message_id);
2231 pmeth->state_delta = GNUNET_htonll (tmit_msg->id 2260 pmeth->state_delta = GNUNET_htonll (tmit_msg->id
@@ -2234,7 +2263,7 @@ master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg)
2234 } 2263 }
2235 else 2264 else
2236 { 2265 {
2237 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 2266 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2238 "%p master_queue_message: state not modified\n", mst); 2267 "%p master_queue_message: state not modified\n", mst);
2239 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED); 2268 pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
2240 } 2269 }
@@ -2359,7 +2388,9 @@ handle_client_psyc_message (void *cls,
2359 if (GNUNET_YES != chn->is_ready) 2388 if (GNUNET_YES != chn->is_ready)
2360 { 2389 {
2361 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 2390 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2362 "%p Channel is not ready yet, disconnecting client.\n", chn); 2391 "%p Channel is not ready yet, disconnecting client %p.\n",
2392 chn,
2393 client);
2363 GNUNET_break (0); 2394 GNUNET_break (0);
2364 GNUNET_SERVICE_client_drop (client); 2395 GNUNET_SERVICE_client_drop (client);
2365 return; 2396 return;
@@ -2789,9 +2820,9 @@ run (void *cls,
2789GNUNET_SERVICE_MAIN 2820GNUNET_SERVICE_MAIN
2790("psyc", 2821("psyc",
2791 GNUNET_SERVICE_OPTION_NONE, 2822 GNUNET_SERVICE_OPTION_NONE,
2792 run, 2823 &run,
2793 client_notify_connect, 2824 &client_notify_connect,
2794 client_notify_disconnect, 2825 &client_notify_disconnect,
2795 NULL, 2826 NULL,
2796 GNUNET_MQ_hd_fixed_size (client_master_start, 2827 GNUNET_MQ_hd_fixed_size (client_master_start,
2797 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 2828 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START,
@@ -2805,6 +2836,10 @@ GNUNET_SERVICE_MAIN
2805 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 2836 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
2806 struct GNUNET_PSYC_JoinDecisionMessage, 2837 struct GNUNET_PSYC_JoinDecisionMessage,
2807 NULL), 2838 NULL),
2839 GNUNET_MQ_hd_fixed_size (client_part_request,
2840 GNUNET_MESSAGE_TYPE_PSYC_PART_REQUEST,
2841 struct GNUNET_MessageHeader,
2842 NULL),
2808 GNUNET_MQ_hd_var_size (client_psyc_message, 2843 GNUNET_MQ_hd_var_size (client_psyc_message,
2809 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 2844 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
2810 struct GNUNET_MessageHeader, 2845 struct GNUNET_MessageHeader,
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index c93d8b383..d8f4c98bc 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -260,6 +260,10 @@ handle_channel_result (void *cls,
260 GNUNET_OP_result (chn->op, GNUNET_ntohll (res->op_id), 260 GNUNET_OP_result (chn->op, GNUNET_ntohll (res->op_id),
261 GNUNET_ntohll (res->result_code), 261 GNUNET_ntohll (res->result_code),
262 data, data_size, NULL); 262 data, data_size, NULL);
263
264 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
265 "handle_channel_result: Received result message with OP ID %" PRIu64 "\n",
266 GNUNET_ntohll (res->op_id));
263} 267}
264 268
265 269
@@ -555,6 +559,9 @@ handle_slave_join_decision (void *cls,
555static void 559static void
556channel_cleanup (struct GNUNET_PSYC_Channel *chn) 560channel_cleanup (struct GNUNET_PSYC_Channel *chn)
557{ 561{
562 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
563 "cleaning up channel %p\n",
564 chn);
558 if (NULL != chn->tmit) 565 if (NULL != chn->tmit)
559 { 566 {
560 GNUNET_PSYC_transmit_destroy (chn->tmit); 567 GNUNET_PSYC_transmit_destroy (chn->tmit);
@@ -562,6 +569,7 @@ channel_cleanup (struct GNUNET_PSYC_Channel *chn)
562 } 569 }
563 if (NULL != chn->recv) 570 if (NULL != chn->recv)
564 { 571 {
572
565 GNUNET_PSYC_receive_destroy (chn->recv); 573 GNUNET_PSYC_receive_destroy (chn->recv);
566 chn->recv = NULL; 574 chn->recv = NULL;
567 } 575 }
@@ -585,30 +593,12 @@ channel_cleanup (struct GNUNET_PSYC_Channel *chn)
585 593
586 594
587static void 595static void
588channel_disconnect (struct GNUNET_PSYC_Channel *chn, 596handle_channel_part_ack (void *cls,
589 GNUNET_ContinuationCallback cb, 597 const struct GNUNET_MessageHeader *msg)
590 void *cls)
591{ 598{
592 chn->is_disconnecting = GNUNET_YES; 599 struct GNUNET_PSYC_Channel *chn = cls;
593 chn->disconnect_cb = cb;
594 chn->disconnect_cls = cls;
595 600
596 if (NULL != chn->mq) 601 channel_cleanup (chn);
597 {
598 struct GNUNET_MQ_Envelope *env = GNUNET_MQ_get_last_envelope (chn->mq);
599 if (NULL != env)
600 {
601 GNUNET_MQ_notify_sent (env, (GNUNET_SCHEDULER_TaskCallback) channel_cleanup, chn);
602 }
603 else
604 {
605 channel_cleanup (chn);
606 }
607 }
608 else
609 {
610 channel_cleanup (chn);
611 }
612} 602}
613 603
614 604
@@ -671,6 +661,10 @@ master_connect (struct GNUNET_PSYC_Master *mst)
671 GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST, 661 GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST,
672 struct GNUNET_PSYC_JoinRequestMessage, 662 struct GNUNET_PSYC_JoinRequestMessage,
673 mst), 663 mst),
664 GNUNET_MQ_hd_fixed_size (channel_part_ack,
665 GNUNET_MESSAGE_TYPE_PSYC_PART_ACK,
666 struct GNUNET_MessageHeader,
667 chn),
674 GNUNET_MQ_hd_var_size (channel_message, 668 GNUNET_MQ_hd_var_size (channel_message,
675 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 669 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
676 struct GNUNET_PSYC_MessageHeader, 670 struct GNUNET_PSYC_MessageHeader,
@@ -694,8 +688,11 @@ master_connect (struct GNUNET_PSYC_Master *mst)
694 GNUNET_MQ_handler_end () 688 GNUNET_MQ_handler_end ()
695 }; 689 };
696 690
697 chn->mq = GNUNET_CLIENT_connect (chn->cfg, "psyc", 691 chn->mq = GNUNET_CLIENT_connect (chn->cfg,
698 handlers, master_disconnected, mst); 692 "psyc",
693 handlers,
694 &master_disconnected,
695 mst);
699 GNUNET_assert (NULL != chn->mq); 696 GNUNET_assert (NULL != chn->mq);
700 chn->tmit = GNUNET_PSYC_transmit_create (chn->mq); 697 chn->tmit = GNUNET_PSYC_transmit_create (chn->mq);
701 698
@@ -780,10 +777,13 @@ GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *mst,
780 void *stop_cls) 777 void *stop_cls)
781{ 778{
782 struct GNUNET_PSYC_Channel *chn = &mst->chn; 779 struct GNUNET_PSYC_Channel *chn = &mst->chn;
780 struct GNUNET_MQ_Envelope *env;
783 781
784 /* FIXME: send msg to service */ 782 chn->is_disconnecting = GNUNET_YES;
785 783 chn->disconnect_cb = stop_cb;
786 channel_disconnect (chn, stop_cb, stop_cls); 784 chn->disconnect_cls = stop_cls;
785 env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_PSYC_PART_REQUEST);
786 GNUNET_MQ_send (chn->mq, env);
787} 787}
788 788
789 789
@@ -931,7 +931,8 @@ slave_reconnect (void *cls)
931 * Reconnect after backoff period. 931 * Reconnect after backoff period.
932 */ 932 */
933static void 933static void
934slave_disconnected (void *cls, enum GNUNET_MQ_Error error) 934slave_disconnected (void *cls,
935 enum GNUNET_MQ_Error error)
935{ 936{
936 struct GNUNET_PSYC_Slave *slv = cls; 937 struct GNUNET_PSYC_Slave *slv = cls;
937 struct GNUNET_PSYC_Channel *chn = &slv->chn; 938 struct GNUNET_PSYC_Channel *chn = &slv->chn;
@@ -950,7 +951,7 @@ slave_disconnected (void *cls, enum GNUNET_MQ_Error error)
950 chn->mq = NULL; 951 chn->mq = NULL;
951 } 952 }
952 chn->reconnect_task = GNUNET_SCHEDULER_add_delayed (chn->reconnect_delay, 953 chn->reconnect_task = GNUNET_SCHEDULER_add_delayed (chn->reconnect_delay,
953 slave_reconnect, 954 &slave_reconnect,
954 slv); 955 slv);
955 chn->reconnect_delay = GNUNET_TIME_STD_BACKOFF (chn->reconnect_delay); 956 chn->reconnect_delay = GNUNET_TIME_STD_BACKOFF (chn->reconnect_delay);
956} 957}
@@ -970,6 +971,10 @@ slave_connect (struct GNUNET_PSYC_Slave *slv)
970 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 971 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
971 struct GNUNET_PSYC_JoinDecisionMessage, 972 struct GNUNET_PSYC_JoinDecisionMessage,
972 slv), 973 slv),
974 GNUNET_MQ_hd_fixed_size (channel_part_ack,
975 GNUNET_MESSAGE_TYPE_PSYC_PART_ACK,
976 struct GNUNET_MessageHeader,
977 chn),
973 GNUNET_MQ_hd_var_size (channel_message, 978 GNUNET_MQ_hd_var_size (channel_message,
974 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 979 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
975 struct GNUNET_PSYC_MessageHeader, 980 struct GNUNET_PSYC_MessageHeader,
@@ -993,9 +998,19 @@ slave_connect (struct GNUNET_PSYC_Slave *slv)
993 GNUNET_MQ_handler_end () 998 GNUNET_MQ_handler_end ()
994 }; 999 };
995 1000
996 chn->mq = GNUNET_CLIENT_connect (chn->cfg, "psyc", 1001 chn->mq = GNUNET_CLIENT_connect (chn->cfg,
997 handlers, slave_disconnected, slv); 1002 "psyc",
998 GNUNET_assert (NULL != chn->mq); 1003 handlers,
1004 &slave_disconnected,
1005 slv);
1006 if (NULL == chn->mq)
1007 {
1008 chn->reconnect_task = GNUNET_SCHEDULER_add_delayed (chn->reconnect_delay,
1009 &slave_reconnect,
1010 slv);
1011 chn->reconnect_delay = GNUNET_TIME_STD_BACKOFF (chn->reconnect_delay);
1012 return;
1013 }
999 chn->tmit = GNUNET_PSYC_transmit_create (chn->mq); 1014 chn->tmit = GNUNET_PSYC_transmit_create (chn->mq);
1000 1015
1001 GNUNET_MQ_send_copy (chn->mq, chn->connect_env); 1016 GNUNET_MQ_send_copy (chn->mq, chn->connect_env);
@@ -1107,10 +1122,13 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slv,
1107 void *part_cls) 1122 void *part_cls)
1108{ 1123{
1109 struct GNUNET_PSYC_Channel *chn = &slv->chn; 1124 struct GNUNET_PSYC_Channel *chn = &slv->chn;
1125 struct GNUNET_MQ_Envelope *env;
1110 1126
1111 /* FIXME: send msg to service */ 1127 chn->is_disconnecting = GNUNET_YES;
1112 1128 chn->disconnect_cb = part_cb;
1113 channel_disconnect (chn, part_cb, part_cls); 1129 chn->disconnect_cls = part_cls;
1130 env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_PSYC_PART_REQUEST);
1131 GNUNET_MQ_send (chn->mq, env);
1114} 1132}
1115 1133
1116 1134
@@ -1233,6 +1251,9 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn,
1233 req->did_join = GNUNET_YES; 1251 req->did_join = GNUNET_YES;
1234 req->op_id = GNUNET_htonll (GNUNET_OP_add (chn->op, result_cb, cls, NULL)); 1252 req->op_id = GNUNET_htonll (GNUNET_OP_add (chn->op, result_cb, cls, NULL));
1235 1253
1254 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1255 "GNUNET_PSYC_channel_slave_add, OP ID: %" PRIu64 "\n",
1256 GNUNET_ntohll (req->op_id));
1236 GNUNET_MQ_send (chn->mq, env); 1257 GNUNET_MQ_send (chn->mq, env);
1237} 1258}
1238 1259
@@ -1283,6 +1304,9 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn,
1283 req->did_join = GNUNET_NO; 1304 req->did_join = GNUNET_NO;
1284 req->op_id = GNUNET_htonll (GNUNET_OP_add (chn->op, result_cb, cls, NULL)); 1305 req->op_id = GNUNET_htonll (GNUNET_OP_add (chn->op, result_cb, cls, NULL));
1285 1306
1307 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1308 "GNUNET_PSYC_channel_slave_remove, OP ID: %" PRIu64 "\n",
1309 GNUNET_ntohll (req->op_id));
1286 GNUNET_MQ_send (chn->mq, env); 1310 GNUNET_MQ_send (chn->mq, env);
1287} 1311}
1288 1312
@@ -1321,6 +1345,10 @@ channel_history_replay (struct GNUNET_PSYC_Channel *chn,
1321 req->message_limit = GNUNET_htonll (message_limit); 1345 req->message_limit = GNUNET_htonll (message_limit);
1322 req->flags = htonl (flags); 1346 req->flags = htonl (flags);
1323 req->op_id = GNUNET_htonll (hist->op_id); 1347 req->op_id = GNUNET_htonll (hist->op_id);
1348
1349 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1350 "channel_history_replay, OP ID: %" PRIu64 "\n",
1351 GNUNET_ntohll (req->op_id));
1324 GNUNET_memcpy (&req[1], method_prefix, method_size); 1352 GNUNET_memcpy (&req[1], method_prefix, method_size);
1325 1353
1326 GNUNET_MQ_send (chn->mq, env); 1354 GNUNET_MQ_send (chn->mq, env);
@@ -1459,6 +1487,11 @@ channel_state_get (struct GNUNET_PSYC_Channel *chn,
1459 struct GNUNET_MQ_Envelope * 1487 struct GNUNET_MQ_Envelope *
1460 env = GNUNET_MQ_msg_extra (req, name_size, type); 1488 env = GNUNET_MQ_msg_extra (req, name_size, type);
1461 req->op_id = GNUNET_htonll (sr->op_id); 1489 req->op_id = GNUNET_htonll (sr->op_id);
1490
1491 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1492 "channel_state_get, OP ID: %" PRIu64 "\n",
1493 GNUNET_ntohll (req->op_id));
1494
1462 GNUNET_memcpy (&req[1], name, name_size); 1495 GNUNET_memcpy (&req[1], name, name_size);
1463 1496
1464 GNUNET_MQ_send (chn->mq, env); 1497 GNUNET_MQ_send (chn->mq, env);
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c
index 03a1890b1..370befb9d 100644
--- a/src/psyc/test_psyc.c
+++ b/src/psyc/test_psyc.c
@@ -755,15 +755,22 @@ slave_add ()
755 755
756 756
757static void 757static void
758schedule_second_slave_join (void *cls)
759{
760 slave_join (TEST_SLAVE_JOIN_ACCEPT);
761}
762
763
764static void
758first_slave_parted (void *cls) 765first_slave_parted (void *cls)
759{ 766{
760 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "First slave parted.\n"); 767 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "First slave parted.\n");
761 slave_join (TEST_SLAVE_JOIN_ACCEPT); 768 GNUNET_SCHEDULER_add_now (&schedule_second_slave_join, NULL);
762} 769}
763 770
764 771
765static void 772static void
766schedule_slave_part (void *cls) 773schedule_first_slave_part (void *cls)
767{ 774{
768 GNUNET_PSYC_slave_part (slv, GNUNET_NO, &first_slave_parted, NULL); 775 GNUNET_PSYC_slave_part (slv, GNUNET_NO, &first_slave_parted, NULL);
769} 776}
@@ -783,7 +790,7 @@ join_decision_cb (void *cls,
783 case TEST_SLAVE_JOIN_REJECT: 790 case TEST_SLAVE_JOIN_REJECT:
784 GNUNET_assert (0 == is_admitted); 791 GNUNET_assert (0 == is_admitted);
785 GNUNET_assert (1 == join_req_count); 792 GNUNET_assert (1 == join_req_count);
786 GNUNET_SCHEDULER_add_now (&schedule_slave_part, NULL); 793 GNUNET_SCHEDULER_add_now (&schedule_first_slave_part, NULL);
787 break; 794 break;
788 795
789 case TEST_SLAVE_JOIN_ACCEPT: 796 case TEST_SLAVE_JOIN_ACCEPT:
@@ -844,11 +851,18 @@ slave_join (int t)
844 struct GNUNET_PSYC_Message * 851 struct GNUNET_PSYC_Message *
845 join_msg = GNUNET_PSYC_message_create ("_request_join", env, "some data", 9); 852 join_msg = GNUNET_PSYC_message_create ("_request_join", env, "some data", 9);
846 853
847 slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, 854 slv = GNUNET_PSYC_slave_join (cfg,
855 &channel_pub_key,
856 slave_key,
848 GNUNET_PSYC_SLAVE_JOIN_NONE, 857 GNUNET_PSYC_SLAVE_JOIN_NONE,
849 &origin, 0, NULL, 858 &origin,
850 &slave_message_cb, &slave_message_part_cb, 859 0,
851 &slave_connect_cb, &join_decision_cb, NULL, 860 NULL,
861 &slave_message_cb,
862 &slave_message_part_cb,
863 &slave_connect_cb,
864 &join_decision_cb,
865 NULL,
852 join_msg); 866 join_msg);
853 GNUNET_free (join_msg); 867 GNUNET_free (join_msg);
854 slv_chn = GNUNET_PSYC_slave_get_channel (slv); 868 slv_chn = GNUNET_PSYC_slave_get_channel (slv);
diff --git a/src/psyc/test_psyc.conf b/src/psyc/test_psyc.conf
index 25a332ea0..e00a614d2 100644
--- a/src/psyc/test_psyc.conf
+++ b/src/psyc/test_psyc.conf
@@ -1,50 +1,16 @@
1[arm] 1@INLINE@ ../../contrib/no_forcestart.conf
2#GLOBAL_POSTFIX=-L ERROR
3 2
4[vpn] 3[PATHS]
5AUTOSTART = NO 4GNUNET_TEST_HOME = /tmp/gnunet-test-psyc/
6 5
7[peerinfo] 6[transport]
8# Do not use shipped gnunet HELLOs 7PLUGINS = tcp
9USE_INCLUDED_HELLOS = NO
10
11# Option to disable all disk IO; only useful for testbed runs
12# (large-scale experiments); disables persistence of HELLOs!
13NO_IO = YES
14
15[hostlist]
16FORCESTART = NO
17AUTOSTART = NO
18 8
19[nat] 9[nat]
20FORCESTART = NO 10DISABLEV6 = YES
21AUTOSTART = NO
22ENABLE_UPNP = NO 11ENABLE_UPNP = NO
12BEHIND_NAT = NO
13ALLOW_NAT = NO
14INTERNAL_ADDRESS = 127.0.0.1
15EXTERNAL_ADDRESS = 127.0.0.1
23 16
24[fs]
25FORCESTART = NO
26AUTOSTART = NO
27
28[vpn]
29FORCESTART = NO
30AUTOSTART = NO
31
32[revocation]
33FORCESTART = NO
34AUTOSTART = NO
35
36[gns]
37FORCESTART = NO
38AUTOSTART = NO
39
40[namestore]
41FORCESTART = NO
42AUTOSTART = NO
43
44[namecache]
45FORCESTART = NO
46AUTOSTART = NO
47
48[topology]
49FORCESTART = NO
50AUTOSTART = NO