aboutsummaryrefslogtreecommitdiff
path: root/src/psyc
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2014-07-23 16:19:49 +0000
committerGabor X Toth <*@tg-x.net>2014-07-23 16:19:49 +0000
commit3cf8ba0b60f8495892fa76635e9c23555d0a304c (patch)
tree5f27648bdb3cf3409628e4e5edc26f811cbd03a5 /src/psyc
parent252b5599987b7ba03b879a8c2d1c455ad4c9834a (diff)
downloadgnunet-3cf8ba0b60f8495892fa76635e9c23555d0a304c.tar.gz
gnunet-3cf8ba0b60f8495892fa76635e9c23555d0a304c.zip
social: implement enter/leave/messaging; psyc: improvements and fixes
- social: implement enter/leave, send/receive messages, slicer - psyc, social: add struct GNUNET_PSYC_Message for single-fragment join messages - psyc: add message callback in addition to message part callback - client_manager, social, psyc, multicast: add disconnect callback
Diffstat (limited to 'src/psyc')
-rw-r--r--src/psyc/gnunet-service-psyc.c112
-rw-r--r--src/psyc/psyc.h16
-rw-r--r--src/psyc/psyc_api.c133
-rw-r--r--src/psyc/psyc_util_lib.c268
-rw-r--r--src/psyc/test_psyc.c93
5 files changed, 456 insertions, 166 deletions
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
index 866275a79..8fc080baf 100644
--- a/src/psyc/gnunet-service-psyc.c
+++ b/src/psyc/gnunet-service-psyc.c
@@ -168,7 +168,7 @@ struct FragmentQueue
168 * Is the message queued for delivery to the client? 168 * Is the message queued for delivery to the client?
169 * i.e. added to the recv_msgs queue 169 * i.e. added to the recv_msgs queue
170 */ 170 */
171 uint8_t queued; 171 uint8_t is_queued;
172}; 172};
173 173
174 174
@@ -382,7 +382,7 @@ struct Slave
382 /** 382 /**
383 * Join request to be transmitted to the master on join. 383 * Join request to be transmitted to the master on join.
384 */ 384 */
385 struct GNUNET_MessageHeader *join_req; 385 struct GNUNET_PSYC_Message *join_msg;
386 386
387 /** 387 /**
388 * Join decision received from multicast. 388 * Join decision received from multicast.
@@ -435,7 +435,7 @@ cleanup_master (struct Master *mst)
435 struct Channel *chn = &mst->chn; 435 struct Channel *chn = &mst->chn;
436 436
437 if (NULL != mst->origin) 437 if (NULL != mst->origin)
438 GNUNET_MULTICAST_origin_stop (mst->origin); 438 GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
439 GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs); 439 GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
440 GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, chn); 440 GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, chn);
441} 441}
@@ -462,12 +462,21 @@ cleanup_slave (struct Slave *slv)
462 } 462 }
463 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv); 463 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
464 464
465 if (NULL != slv->join_req) 465 if (NULL != slv->join_msg)
466 GNUNET_free (slv->join_req); 466 {
467 GNUNET_free (slv->join_msg);
468 slv->join_msg = NULL;
469 }
467 if (NULL != slv->relays) 470 if (NULL != slv->relays)
471 {
468 GNUNET_free (slv->relays); 472 GNUNET_free (slv->relays);
473 slv->relays = NULL;
474 }
469 if (NULL != slv->member) 475 if (NULL != slv->member)
470 GNUNET_MULTICAST_member_part (slv->member); 476 {
477 GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
478 slv->member = NULL;
479 }
471 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, chn); 480 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, chn);
472} 481}
473 482
@@ -482,7 +491,10 @@ cleanup_channel (struct Channel *chn)
482 GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &chn->pub_key_hash); 491 GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &chn->pub_key_hash);
483 492
484 if (NULL != chn->store_op) 493 if (NULL != chn->store_op)
494 {
485 GNUNET_PSYCSTORE_operation_cancel (chn->store_op); 495 GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
496 chn->store_op = NULL;
497 }
486 498
487 (GNUNET_YES == chn->is_master) 499 (GNUNET_YES == chn->is_master)
488 ? cleanup_master ((struct Master *) chn) 500 ? cleanup_master ((struct Master *) chn)
@@ -574,7 +586,7 @@ struct JoinMemTestClosure
574 struct GNUNET_CRYPTO_EcdsaPublicKey slave_key; 586 struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
575 struct Channel *chn; 587 struct Channel *chn;
576 struct GNUNET_MULTICAST_JoinHandle *jh; 588 struct GNUNET_MULTICAST_JoinHandle *jh;
577 struct MasterJoinRequest *master_join_req; 589 struct GNUNET_PSYC_JoinRequestMessage *join_msg;
578}; 590};
579 591
580 592
@@ -594,14 +606,14 @@ join_mem_test_cb (void *cls, int64_t result, const char *err_msg)
594 &slave_key_hash); 606 &slave_key_hash);
595 GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh, 607 GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh,
596 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 608 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
597 client_send_msg (jcls->chn, &jcls->master_join_req->header); 609 client_send_msg (jcls->chn, &jcls->join_msg->header);
598 } 610 }
599 else 611 else
600 { 612 {
601 // FIXME: add relays 613 // FIXME: add relays
602 GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL); 614 GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
603 } 615 }
604 GNUNET_free (jcls->master_join_req); 616 GNUNET_free (jcls->join_msg);
605 GNUNET_free (jcls); 617 GNUNET_free (jcls);
606} 618}
607 619
@@ -633,7 +645,8 @@ mcast_recv_join_request (void *cls,
633 } 645 }
634 } 646 }
635 647
636 struct MasterJoinRequest *req = GNUNET_malloc (sizeof (*req) + join_msg_size); 648 struct GNUNET_PSYC_JoinRequestMessage *
649 req = GNUNET_malloc (sizeof (*req) + join_msg_size);
637 req->header.size = htons (sizeof (*req) + join_msg_size); 650 req->header.size = htons (sizeof (*req) + join_msg_size);
638 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST); 651 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
639 req->slave_key = *slave_key; 652 req->slave_key = *slave_key;
@@ -644,7 +657,7 @@ mcast_recv_join_request (void *cls,
644 jcls->slave_key = *slave_key; 657 jcls->slave_key = *slave_key;
645 jcls->chn = chn; 658 jcls->chn = chn;
646 jcls->jh = jh; 659 jcls->jh = jh;
647 jcls->master_join_req = req; 660 jcls->join_msg = req;
648 661
649 GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key, 662 GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
650 chn->max_message_id, 0, 663 chn->max_message_id, 0,
@@ -780,6 +793,7 @@ client_send_mcast_msg (struct Channel *chn,
780 pmsg->header.size = htons (psize); 793 pmsg->header.size = htons (psize);
781 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); 794 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
782 pmsg->message_id = mmsg->message_id; 795 pmsg->message_id = mmsg->message_id;
796 pmsg->fragment_offset = mmsg->fragment_offset;
783 797
784 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); 798 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
785 client_send_msg (chn, &pmsg->header); 799 client_send_msg (chn, &pmsg->header);
@@ -810,6 +824,7 @@ client_send_mcast_req (struct Master *mst,
810 pmsg->header.size = htons (psize); 824 pmsg->header.size = htons (psize);
811 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); 825 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
812 pmsg->message_id = req->request_id; 826 pmsg->message_id = req->request_id;
827 pmsg->fragment_offset = req->fragment_offset;
813 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST); 828 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
814 829
815 memcpy (&pmsg[1], &req[1], size - sizeof (*req)); 830 memcpy (&pmsg[1], &req[1], size - sizeof (*req));
@@ -870,11 +885,12 @@ fragment_queue_insert (struct Channel *chn,
870 { 885 {
871 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 886 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
872 "%p Adding message fragment to cache. " 887 "%p Adding message fragment to cache. "
873 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", " 888 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
874 "header_size: %" PRIu64 " + %u).\n",
875 chn, GNUNET_ntohll (mmsg->message_id), 889 chn, GNUNET_ntohll (mmsg->message_id),
876 GNUNET_ntohll (mmsg->fragment_id), 890 GNUNET_ntohll (mmsg->fragment_id));
877 fragq->header_size, size); 891 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
892 "%p header_size: %" PRIu64 " + %u\n",
893 chn, fragq->header_size, size);
878 cache_entry = GNUNET_new (struct RecvCacheEntry); 894 cache_entry = GNUNET_new (struct RecvCacheEntry);
879 cache_entry->ref_count = 1; 895 cache_entry->ref_count = 1;
880 cache_entry->mmsg = GNUNET_malloc (size); 896 cache_entry->mmsg = GNUNET_malloc (size);
@@ -955,11 +971,11 @@ fragment_queue_insert (struct Channel *chn,
955 case MSG_FRAG_STATE_DATA: 971 case MSG_FRAG_STATE_DATA:
956 case MSG_FRAG_STATE_END: 972 case MSG_FRAG_STATE_END:
957 case MSG_FRAG_STATE_CANCEL: 973 case MSG_FRAG_STATE_CANCEL:
958 if (GNUNET_NO == fragq->queued) 974 if (GNUNET_NO == fragq->is_queued)
959 { 975 {
960 GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL, 976 GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
961 GNUNET_ntohll (mmsg->message_id)); 977 GNUNET_ntohll (mmsg->message_id));
962 fragq->queued = GNUNET_YES; 978 fragq->is_queued = GNUNET_YES;
963 } 979 }
964 } 980 }
965 981
@@ -1034,7 +1050,7 @@ fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1034 if (MSG_FRAG_STATE_END <= fragq->state) 1050 if (MSG_FRAG_STATE_END <= fragq->state)
1035 { 1051 {
1036 struct GNUNET_HashCode msg_id_hash; 1052 struct GNUNET_HashCode msg_id_hash;
1037 hash_key_from_nll (&msg_id_hash, msg_id); 1053 hash_key_from_hll (&msg_id_hash, msg_id);
1038 1054
1039 GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq); 1055 GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1040 GNUNET_CONTAINER_heap_destroy (fragq->fragments); 1056 GNUNET_CONTAINER_heap_destroy (fragq->fragments);
@@ -1042,7 +1058,7 @@ fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1042 } 1058 }
1043 else 1059 else
1044 { 1060 {
1045 fragq->queued = GNUNET_NO; 1061 fragq->is_queued = GNUNET_NO;
1046 } 1062 }
1047} 1063}
1048 1064
@@ -1331,13 +1347,18 @@ store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1331 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key, 1347 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1332 &slv->origin, 1348 &slv->origin,
1333 slv->relay_count, slv->relays, 1349 slv->relay_count, slv->relays,
1334 slv->join_req, 1350 &slv->join_msg->header,
1335 &mcast_recv_join_request, 1351 &mcast_recv_join_request,
1336 &mcast_recv_join_decision, 1352 &mcast_recv_join_decision,
1337 &mcast_recv_membership_test, 1353 &mcast_recv_membership_test,
1338 &mcast_recv_replay_fragment, 1354 &mcast_recv_replay_fragment,
1339 &mcast_recv_replay_message, 1355 &mcast_recv_replay_message,
1340 &mcast_recv_message, chn); 1356 &mcast_recv_message, chn);
1357 if (NULL != slv->join_msg)
1358 {
1359 GNUNET_free (slv->join_msg);
1360 slv->join_msg = NULL;
1361 }
1341 } 1362 }
1342 else 1363 else
1343 { 1364 {
@@ -1435,6 +1456,7 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1435{ 1456{
1436 const struct SlaveJoinRequest *req 1457 const struct SlaveJoinRequest *req
1437 = (const struct SlaveJoinRequest *) msg; 1458 = (const struct SlaveJoinRequest *) msg;
1459 uint16_t req_size = ntohs (req->header.size);
1438 1460
1439 struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key; 1461 struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1440 struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash; 1462 struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash;
@@ -1460,15 +1482,32 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1460 slv->pub_key_hash = slv_pub_key_hash; 1482 slv->pub_key_hash = slv_pub_key_hash;
1461 slv->origin = req->origin; 1483 slv->origin = req->origin;
1462 slv->relay_count = ntohl (req->relay_count); 1484 slv->relay_count = ntohl (req->relay_count);
1485
1486 const struct GNUNET_PeerIdentity *
1487 relays = (const struct GNUNET_PeerIdentity *) &req[1];
1488 uint16_t relay_size = slv->relay_count * sizeof (*relays);
1489 uint16_t join_msg_size = 0;
1490
1491 if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
1492 <= req_size)
1493 {
1494 join_msg_size = ntohs (slv->join_msg->header.size);
1495 slv->join_msg = GNUNET_malloc (join_msg_size);
1496 memcpy (slv->join_msg, ((char *) &req[1]) + relay_size, join_msg_size);
1497 }
1498 if (sizeof (*req) + relay_size + join_msg_size != req_size)
1499 {
1500 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1501 "%u + %u + %u != %u\n",
1502 sizeof (*req), relay_size, join_msg_size, req_size);
1503 GNUNET_break (0);
1504 GNUNET_SERVER_client_disconnect (client);
1505 return;
1506 }
1463 if (0 < slv->relay_count) 1507 if (0 < slv->relay_count)
1464 { 1508 {
1465 const struct GNUNET_PeerIdentity *relays 1509 slv->relays = GNUNET_malloc (relay_size);
1466 = (const struct GNUNET_PeerIdentity *) &req[1]; 1510 memcpy (slv->relays, &req[1], relay_size);
1467 slv->relays
1468 = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
1469 uint32_t i;
1470 for (i = 0; i < slv->relay_count; i++)
1471 memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
1472 } 1511 }
1473 1512
1474 chn = &slv->chn; 1513 chn = &slv->chn;
@@ -1510,14 +1549,18 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1510 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key, 1549 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1511 &slv->origin, 1550 &slv->origin,
1512 slv->relay_count, slv->relays, 1551 slv->relay_count, slv->relays,
1513 slv->join_req, 1552 &slv->join_msg->header,
1514 &mcast_recv_join_request, 1553 &mcast_recv_join_request,
1515 &mcast_recv_join_decision, 1554 &mcast_recv_join_decision,
1516 &mcast_recv_membership_test, 1555 &mcast_recv_membership_test,
1517 &mcast_recv_replay_fragment, 1556 &mcast_recv_replay_fragment,
1518 &mcast_recv_replay_message, 1557 &mcast_recv_replay_message,
1519 &mcast_recv_message, chn); 1558 &mcast_recv_message, chn);
1520 1559 if (NULL != slv->join_msg)
1560 {
1561 GNUNET_free (slv->join_msg);
1562 slv->join_msg = NULL;
1563 }
1521 } 1564 }
1522 else if (NULL != slv->join_dcsn) 1565 else if (NULL != slv->join_dcsn)
1523 { 1566 {
@@ -1549,13 +1592,14 @@ struct JoinDecisionClosure
1549 1592
1550 1593
1551/** 1594/**
1552 * Iterator callback for responding to join requests of a slave. 1595 * Iterator callback for sending join decisions to multicast.
1553 */ 1596 */
1554static int 1597static int
1555mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash, 1598mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1556 void *jh) 1599 void *value)
1557{ 1600{
1558 struct JoinDecisionClosure *jcls = cls; 1601 struct JoinDecisionClosure *jcls = cls;
1602 struct GNUNET_MULTICAST_JoinHandle *jh = value;
1559 // FIXME: add relays 1603 // FIXME: add relays
1560 GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg); 1604 GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1561 return GNUNET_YES; 1605 return GNUNET_YES;
@@ -1579,8 +1623,7 @@ client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1579 struct JoinDecisionClosure jcls; 1623 struct JoinDecisionClosure jcls;
1580 jcls.is_admitted = ntohl (dcsn->is_admitted); 1624 jcls.is_admitted = ntohl (dcsn->is_admitted);
1581 jcls.msg 1625 jcls.msg
1582 = (sizeof (*dcsn) + sizeof (struct GNUNET_PSYC_MessageHeader) 1626 = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size))
1583 <= ntohs (msg->size))
1584 ? (struct GNUNET_MessageHeader *) &dcsn[1] 1627 ? (struct GNUNET_MessageHeader *) &dcsn[1]
1585 : NULL; 1628 : NULL;
1586 1629
@@ -1901,6 +1944,9 @@ client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
1901 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 1944 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1902 return; 1945 return;
1903 } 1946 }
1947 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1948 "%p Received message with first part type %u and last part type %u.\n",
1949 chn, first_ptype, last_ptype);
1904 1950
1905 queue_message (chn, client, size - sizeof (*msg), &msg[1], 1951 queue_message (chn, client, size - sizeof (*msg), &msg[1],
1906 first_ptype, last_ptype); 1952 first_ptype, last_ptype);
diff --git a/src/psyc/psyc.h b/src/psyc/psyc.h
index 995fb1fa4..82800a334 100644
--- a/src/psyc/psyc.h
+++ b/src/psyc/psyc.h
@@ -229,22 +229,6 @@ struct OperationResult
229 */ 229 */
230}; 230};
231 231
232
233struct MasterJoinRequest
234{
235 /**
236 * Type: GNUNET_MESSAGE_TYPE_PSYC_MASTER_JOIN_REQUEST
237 */
238 struct GNUNET_MessageHeader header;
239 /**
240 * Public key of the joining slave.
241 */
242 struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
243
244 /* Followed by struct GNUNET_MessageHeader join_request */
245};
246
247
248GNUNET_NETWORK_STRUCT_END 232GNUNET_NETWORK_STRUCT_END
249 233
250#endif 234#endif
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index 88b007a0f..a43b1ef5f 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -74,6 +74,16 @@ struct GNUNET_PSYC_Channel
74 struct GNUNET_MessageHeader *connect_msg; 74 struct GNUNET_MessageHeader *connect_msg;
75 75
76 /** 76 /**
77 * Function called after disconnected from the service.
78 */
79 GNUNET_ContinuationCallback disconnect_cb;
80
81 /**
82 * Closure for @a disconnect_cb.
83 */
84 void *disconnect_cls;
85
86 /**
77 * Are we polling for incoming messages right now? 87 * Are we polling for incoming messages right now?
78 */ 88 */
79 uint8_t in_receive; 89 uint8_t in_receive;
@@ -82,6 +92,12 @@ struct GNUNET_PSYC_Channel
82 * Is this a master or slave channel? 92 * Is this a master or slave channel?
83 */ 93 */
84 uint8_t is_master; 94 uint8_t is_master;
95
96 /**
97 * Is this channel in the process of disconnecting from the service?
98 * #GNUNET_YES or #GNUNET_NO
99 */
100 uint8_t is_disconnecting;
85}; 101};
86 102
87 103
@@ -232,19 +248,26 @@ master_recv_join_request (void *cls,
232 struct GNUNET_PSYC_Master * 248 struct GNUNET_PSYC_Master *
233 mst = GNUNET_CLIENT_MANAGER_get_user_context_ (client, 249 mst = GNUNET_CLIENT_MANAGER_get_user_context_ (client,
234 sizeof (struct GNUNET_PSYC_Channel)); 250 sizeof (struct GNUNET_PSYC_Channel));
235 251 if (NULL == mst->join_req_cb)
236 const struct MasterJoinRequest *req = (const struct MasterJoinRequest *) msg; 252 return;
237 253
238 struct GNUNET_PSYC_MessageHeader *pmsg = NULL; 254 const struct GNUNET_PSYC_JoinRequestMessage *
239 if (ntohs (req->header.size) <= sizeof (*req) + sizeof (*pmsg)) 255 req = (const struct GNUNET_PSYC_JoinRequestMessage *) msg;
240 pmsg = (struct GNUNET_PSYC_MessageHeader *) &req[1]; 256 const struct GNUNET_PSYC_Message *join_msg = NULL;
257 if (sizeof (*req) + sizeof (*join_msg) <= ntohs (req->header.size))
258 {
259 join_msg = (struct GNUNET_PSYC_Message *) &req[1];
260 LOG (GNUNET_ERROR_TYPE_ERROR,
261 "Received join_msg of type %u and size %u.\n",
262 ntohs (join_msg->header.type), ntohs (join_msg->header.size));
263 }
241 264
242 struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); 265 struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh));
243 jh->mst = mst; 266 jh->mst = mst;
244 jh->slave_key = req->slave_key; 267 jh->slave_key = req->slave_key;
245 268
246 if (NULL != mst->join_req_cb) 269 if (NULL != mst->join_req_cb)
247 mst->join_req_cb (mst->cb_cls, &req->slave_key, pmsg, jh); 270 mst->join_req_cb (mst->cb_cls, req, &req->slave_key, join_msg, jh);
248} 271}
249 272
250 273
@@ -273,13 +296,12 @@ slave_recv_join_decision (void *cls,
273 const struct GNUNET_PSYC_JoinDecisionMessage * 296 const struct GNUNET_PSYC_JoinDecisionMessage *
274 dcsn = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg; 297 dcsn = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg;
275 298
276 struct GNUNET_PSYC_MessageHeader *pmsg = NULL; 299 struct GNUNET_PSYC_Message *pmsg = NULL;
277 if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*pmsg)) 300 if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*pmsg))
278 pmsg = (struct GNUNET_PSYC_MessageHeader *) &dcsn[1]; 301 pmsg = (struct GNUNET_PSYC_Message *) &dcsn[1];
279 302
280 struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh));
281 if (NULL != slv->join_dcsn_cb) 303 if (NULL != slv->join_dcsn_cb)
282 slv->join_dcsn_cb (slv->cb_cls, ntohl (dcsn->is_admitted), pmsg); 304 slv->join_dcsn_cb (slv->cb_cls, dcsn, ntohl (dcsn->is_admitted), pmsg);
283} 305}
284 306
285 307
@@ -299,7 +321,7 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler master_handlers[] =
299 321
300 { &master_recv_join_request, NULL, 322 { &master_recv_join_request, NULL,
301 GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST, 323 GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST,
302 sizeof (struct MasterJoinRequest), GNUNET_YES }, 324 sizeof (struct GNUNET_PSYC_JoinRequestMessage), GNUNET_YES },
303 325
304 { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, 326 { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO },
305 327
@@ -331,6 +353,35 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler slave_handlers[] =
331}; 353};
332 354
333 355
356static void
357channel_cleanup (struct GNUNET_PSYC_Channel *chn)
358{
359 GNUNET_PSYC_transmit_destroy (chn->tmit);
360 GNUNET_PSYC_receive_destroy (chn->recv);
361 GNUNET_free (chn->connect_msg);
362 if (NULL != chn->disconnect_cb)
363 chn->disconnect_cb (chn->disconnect_cls);
364}
365
366
367static void
368master_cleanup (void *cls)
369{
370 struct GNUNET_PSYC_Master *mst = cls;
371 channel_cleanup (&mst->chn);
372 GNUNET_free (mst);
373}
374
375
376static void
377slave_cleanup (void *cls)
378{
379 struct GNUNET_PSYC_Slave *slv = cls;
380 channel_cleanup (&slv->chn);
381 GNUNET_free (slv);
382}
383
384
334/** 385/**
335 * Start a PSYC master channel. 386 * Start a PSYC master channel.
336 * 387 *
@@ -367,6 +418,7 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
367 GNUNET_PSYC_MasterStartCallback start_cb, 418 GNUNET_PSYC_MasterStartCallback start_cb,
368 GNUNET_PSYC_JoinRequestCallback join_request_cb, 419 GNUNET_PSYC_JoinRequestCallback join_request_cb,
369 GNUNET_PSYC_MessageCallback message_cb, 420 GNUNET_PSYC_MessageCallback message_cb,
421 GNUNET_PSYC_MessagePartCallback message_part_cb,
370 void *cls) 422 void *cls)
371{ 423{
372 struct GNUNET_PSYC_Master *mst = GNUNET_malloc (sizeof (*mst)); 424 struct GNUNET_PSYC_Master *mst = GNUNET_malloc (sizeof (*mst));
@@ -390,7 +442,7 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
390 GNUNET_CLIENT_MANAGER_set_user_context_ (chn->client, mst, sizeof (*chn)); 442 GNUNET_CLIENT_MANAGER_set_user_context_ (chn->client, mst, sizeof (*chn));
391 443
392 chn->tmit = GNUNET_PSYC_transmit_create (chn->client); 444 chn->tmit = GNUNET_PSYC_transmit_create (chn->client);
393 chn->recv = GNUNET_PSYC_receive_create (message_cb, message_cb, cls); 445 chn->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls);
394 446
395 channel_send_connect_msg (chn); 447 channel_send_connect_msg (chn);
396 return mst; 448 return mst;
@@ -404,10 +456,21 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
404 * @param keep_active FIXME 456 * @param keep_active FIXME
405 */ 457 */
406void 458void
407GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *mst) 459GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *mst,
460 int keep_active,
461 GNUNET_ContinuationCallback stop_cb,
462 void *stop_cls)
408{ 463{
409 GNUNET_CLIENT_MANAGER_disconnect (mst->chn.client, GNUNET_YES); 464 struct GNUNET_PSYC_Channel *chn = &mst->chn;
410 GNUNET_free (mst); 465
466 /* FIXME: send msg to service */
467
468 chn->is_disconnecting = GNUNET_YES;
469 chn->disconnect_cb = stop_cb;
470 chn->disconnect_cls = stop_cls;
471
472 GNUNET_CLIENT_MANAGER_disconnect (mst->chn.client, GNUNET_YES,
473 &master_cleanup, mst);
411} 474}
412 475
413 476
@@ -439,7 +502,7 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
439 int is_admitted, 502 int is_admitted,
440 uint32_t relay_count, 503 uint32_t relay_count,
441 const struct GNUNET_PeerIdentity *relays, 504 const struct GNUNET_PeerIdentity *relays,
442 const struct GNUNET_PSYC_MessageHeader *join_resp) 505 const struct GNUNET_PSYC_Message *join_resp)
443{ 506{
444 struct GNUNET_PSYC_Channel *chn = &jh->mst->chn; 507 struct GNUNET_PSYC_Channel *chn = &jh->mst->chn;
445 struct GNUNET_PSYC_JoinDecisionMessage *dcsn; 508 struct GNUNET_PSYC_JoinDecisionMessage *dcsn;
@@ -461,6 +524,7 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
461 memcpy (&dcsn[1], join_resp, join_resp_size); 524 memcpy (&dcsn[1], join_resp, join_resp_size);
462 525
463 GNUNET_CLIENT_MANAGER_transmit (chn->client, &dcsn->header); 526 GNUNET_CLIENT_MANAGER_transmit (chn->client, &dcsn->header);
527 GNUNET_free (jh);
464 return GNUNET_OK; 528 return GNUNET_OK;
465} 529}
466 530
@@ -576,15 +640,19 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
576 uint32_t relay_count, 640 uint32_t relay_count,
577 const struct GNUNET_PeerIdentity *relays, 641 const struct GNUNET_PeerIdentity *relays,
578 GNUNET_PSYC_MessageCallback message_cb, 642 GNUNET_PSYC_MessageCallback message_cb,
643 GNUNET_PSYC_MessagePartCallback message_part_cb,
579 GNUNET_PSYC_SlaveConnectCallback connect_cb, 644 GNUNET_PSYC_SlaveConnectCallback connect_cb,
580 GNUNET_PSYC_JoinDecisionCallback join_decision_cb, 645 GNUNET_PSYC_JoinDecisionCallback join_decision_cb,
581 void *cls, 646 void *cls,
582 const struct GNUNET_MessageHeader *join_msg) 647 const struct GNUNET_PSYC_Message *join_msg)
583{ 648{
584 struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv)); 649 struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv));
585 struct GNUNET_PSYC_Channel *chn = &slv->chn; 650 struct GNUNET_PSYC_Channel *chn = &slv->chn;
651
652 uint16_t relay_size = relay_count * sizeof (*relays);
653 uint16_t join_msg_size = ntohs (join_msg->header.size);
586 struct SlaveJoinRequest *req 654 struct SlaveJoinRequest *req
587 = GNUNET_malloc (sizeof (*req) + relay_count * sizeof (*relays)); 655 = GNUNET_malloc (sizeof (*req) + relay_size + join_msg_size);
588 req->header.size = htons (sizeof (*req) 656 req->header.size = htons (sizeof (*req)
589 + relay_count * sizeof (*relays)); 657 + relay_count * sizeof (*relays));
590 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN); 658 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN);
@@ -592,7 +660,12 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
592 req->slave_key = *slave_key; 660 req->slave_key = *slave_key;
593 req->origin = *origin; 661 req->origin = *origin;
594 req->relay_count = htonl (relay_count); 662 req->relay_count = htonl (relay_count);
595 memcpy (&req[1], relays, relay_count * sizeof (*relays)); 663
664 if (0 < relay_size)
665 memcpy (&req[1], relays, relay_size);
666
667 if (0 < join_msg_size)
668 memcpy ((char *) &req[1] + relay_size, join_msg, join_msg_size);
596 669
597 chn->connect_msg = (struct GNUNET_MessageHeader *) req; 670 chn->connect_msg = (struct GNUNET_MessageHeader *) req;
598 chn->cfg = cfg; 671 chn->cfg = cfg;
@@ -605,7 +678,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
605 chn->client = GNUNET_CLIENT_MANAGER_connect (cfg, "psyc", slave_handlers); 678 chn->client = GNUNET_CLIENT_MANAGER_connect (cfg, "psyc", slave_handlers);
606 GNUNET_CLIENT_MANAGER_set_user_context_ (chn->client, slv, sizeof (*chn)); 679 GNUNET_CLIENT_MANAGER_set_user_context_ (chn->client, slv, sizeof (*chn));
607 680
608 chn->recv = GNUNET_PSYC_receive_create (message_cb, message_cb, cls); 681 chn->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls);
609 chn->tmit = GNUNET_PSYC_transmit_create (chn->client); 682 chn->tmit = GNUNET_PSYC_transmit_create (chn->client);
610 683
611 channel_send_connect_msg (chn); 684 channel_send_connect_msg (chn);
@@ -622,10 +695,21 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
622 * @param slave Slave handle. 695 * @param slave Slave handle.
623 */ 696 */
624void 697void
625GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slv) 698GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slv,
699 int keep_active,
700 GNUNET_ContinuationCallback part_cb,
701 void *part_cls)
626{ 702{
627 GNUNET_CLIENT_MANAGER_disconnect (slv->chn.client, GNUNET_YES); 703 struct GNUNET_PSYC_Channel *chn = &slv->chn;
628 GNUNET_free (slv); 704
705 /* FIXME: send msg to service */
706
707 chn->is_disconnecting = GNUNET_YES;
708 chn->disconnect_cb = part_cb;
709 chn->disconnect_cls = part_cls;
710
711 GNUNET_CLIENT_MANAGER_disconnect (slv->chn.client, GNUNET_YES,
712 &slave_cleanup, slv);
629} 713}
630 714
631 715
@@ -796,6 +880,7 @@ GNUNET_PSYC_channel_story_tell (struct GNUNET_PSYC_Channel *channel,
796 uint64_t start_message_id, 880 uint64_t start_message_id,
797 uint64_t end_message_id, 881 uint64_t end_message_id,
798 GNUNET_PSYC_MessageCallback message_cb, 882 GNUNET_PSYC_MessageCallback message_cb,
883 GNUNET_PSYC_MessagePartCallback message_part_cb,
799 GNUNET_PSYC_FinishCallback finish_cb, 884 GNUNET_PSYC_FinishCallback finish_cb,
800 void *cls) 885 void *cls)
801{ 886{
diff --git a/src/psyc/psyc_util_lib.c b/src/psyc/psyc_util_lib.c
index 8ab0bdad2..6177976de 100644
--- a/src/psyc/psyc_util_lib.c
+++ b/src/psyc/psyc_util_lib.c
@@ -76,7 +76,11 @@ struct GNUNET_PSYC_TransmitHandle
76 * 76 *
77 */ 77 */
78 const char *mod_value; 78 const char *mod_value;
79 size_t mod_value_size; 79
80 /**
81 * Number of bytes remaining to be transmitted from the current modifier value.
82 */
83 uint32_t mod_value_remaining;
80 84
81 /** 85 /**
82 * State of the current message being received from client. 86 * State of the current message being received from client.
@@ -104,14 +108,14 @@ struct GNUNET_PSYC_TransmitHandle
104struct GNUNET_PSYC_ReceiveHandle 108struct GNUNET_PSYC_ReceiveHandle
105{ 109{
106 /** 110 /**
107 * Message part callback. 111 * Message callback.
108 */ 112 */
109 GNUNET_PSYC_MessageCallback message_cb; 113 GNUNET_PSYC_MessageCallback message_cb;
110 114
111 /** 115 /**
112 * Message part callback for historic message. 116 * Message part callback.
113 */ 117 */
114 GNUNET_PSYC_MessageCallback hist_message_cb; 118 GNUNET_PSYC_MessagePartCallback message_part_cb;
115 119
116 /** 120 /**
117 * Closure for the callbacks. 121 * Closure for the callbacks.
@@ -149,6 +153,7 @@ struct GNUNET_PSYC_ReceiveHandle
149 uint32_t mod_value_size; 153 uint32_t mod_value_size;
150}; 154};
151 155
156
152/**** Messages ****/ 157/**** Messages ****/
153 158
154 159
@@ -167,7 +172,7 @@ struct GNUNET_PSYC_ReceiveHandle
167 * @return Message header with size information, 172 * @return Message header with size information,
168 * followed by the message parts. 173 * followed by the message parts.
169 */ 174 */
170struct GNUNET_MessageHeader * 175struct GNUNET_PSYC_Message *
171GNUNET_PSYC_message_create (const char *method_name, 176GNUNET_PSYC_message_create (const char *method_name,
172 const struct GNUNET_ENV_Environment *env, 177 const struct GNUNET_ENV_Environment *env,
173 const void *data, 178 const void *data,
@@ -188,7 +193,7 @@ GNUNET_PSYC_message_create (const char *method_name,
188 } 193 }
189 } 194 }
190 195
191 struct GNUNET_MessageHeader *msg; 196 struct GNUNET_PSYC_Message *msg;
192 uint16_t method_name_size = strlen (method_name) + 1; 197 uint16_t method_name_size = strlen (method_name) + 1;
193 if (method_name_size == 1) 198 if (method_name_size == 1)
194 return NULL; 199 return NULL;
@@ -199,12 +204,13 @@ GNUNET_PSYC_message_create (const char *method_name,
199 + ((0 < data_size) ? sizeof (*pmsg) + data_size : 0)/* data */ 204 + ((0 < data_size) ? sizeof (*pmsg) + data_size : 0)/* data */
200 + sizeof (*pmsg); /* end of message */ 205 + sizeof (*pmsg); /* end of message */
201 msg = GNUNET_malloc (msg_size); 206 msg = GNUNET_malloc (msg_size);
202 msg->size = htons (msg_size); 207 msg->header.size = htons (msg_size);
203 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); 208 msg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); /* FIXME */
204 209
205 pmeth = (struct GNUNET_PSYC_MessageMethod *) &msg[1]; 210 pmeth = (struct GNUNET_PSYC_MessageMethod *) &msg[1];
211 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
206 pmeth->header.size = htons (sizeof (*pmeth) + method_name_size); 212 pmeth->header.size = htons (sizeof (*pmeth) + method_name_size);
207 memcpy (pmeth, method_name, method_name_size); 213 memcpy (&pmeth[1], method_name, method_name_size);
208 214
209 uint16_t p = sizeof (*msg) + sizeof (*pmeth) + method_name_size; 215 uint16_t p = sizeof (*msg) + sizeof (*pmeth) + method_name_size;
210 if (NULL != env) 216 if (NULL != env)
@@ -215,7 +221,7 @@ GNUNET_PSYC_message_create (const char *method_name,
215 uint16_t mod_name_size = strlen (mod->name) + 1; 221 uint16_t mod_name_size = strlen (mod->name) + 1;
216 pmod = (struct GNUNET_PSYC_MessageModifier *) ((char *) msg + p); 222 pmod = (struct GNUNET_PSYC_MessageModifier *) ((char *) msg + p);
217 pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); 223 pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
218 pmod->header.size = sizeof (*pmod) + mod_name_size + 1 + mod->value_size; 224 pmod->header.size = sizeof (*pmod) + mod_name_size + mod->value_size;
219 p += pmod->header.size; 225 p += pmod->header.size;
220 pmod->header.size = htons (pmod->header.size); 226 pmod->header.size = htons (pmod->header.size);
221 227
@@ -241,6 +247,7 @@ GNUNET_PSYC_message_create (const char *method_name,
241 pmsg->size = htons (sizeof (*pmsg)); 247 pmsg->size = htons (sizeof (*pmsg));
242 pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END); 248 pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
243 249
250 GNUNET_assert (p + sizeof (*pmsg) == msg_size);
244 return msg; 251 return msg;
245} 252}
246 253
@@ -276,8 +283,8 @@ GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
276 uint16_t name_size = ntohs (mod->name_size); 283 uint16_t name_size = ntohs (mod->name_size);
277 char oper = ' ' < mod->oper ? mod->oper : ' '; 284 char oper = ' ' < mod->oper ? mod->oper : ' ';
278 GNUNET_log (kind, "\t%c%.*s\t%.*s\n", oper, name_size, &mod[1], 285 GNUNET_log (kind, "\t%c%.*s\t%.*s\n", oper, name_size, &mod[1],
279 size - sizeof (*mod) - name_size - 1, 286 size - sizeof (*mod) - name_size,
280 ((char *) &mod[1]) + name_size + 1); 287 ((char *) &mod[1]) + name_size);
281 break; 288 break;
282 } 289 }
283 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: 290 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
@@ -331,7 +338,7 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
331 uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0; 338 uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0;
332 339
333 LOG (GNUNET_ERROR_TYPE_DEBUG, 340 LOG (GNUNET_ERROR_TYPE_DEBUG,
334 "Queueing message of type %u and size %u (end: %u)).\n", 341 "Queueing message part of type %u and size %u (end: %u)).\n",
335 ntohs (msg->type), size, end); 342 ntohs (msg->type), size, end);
336 343
337 if (NULL != tmit->msg) 344 if (NULL != tmit->msg)
@@ -396,6 +403,9 @@ transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit)
396 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); 403 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
397 404
398 int notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]); 405 int notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]);
406 LOG (GNUNET_ERROR_TYPE_DEBUG,
407 "transmit_data (ret: %d, size: %u): %.*s\n",
408 notify_ret, data_size, data_size, &msg[1]);
399 switch (notify_ret) 409 switch (notify_ret)
400 { 410 {
401 case GNUNET_NO: 411 case GNUNET_NO:
@@ -463,9 +473,15 @@ transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
463 msg->size = sizeof (struct GNUNET_PSYC_MessageModifier); 473 msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
464 notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1], 474 notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1],
465 &mod->oper, &mod->value_size); 475 &mod->oper, &mod->value_size);
466 mod->name_size = strnlen ((char *) &mod[1], data_size); 476
477 mod->name_size = strnlen ((char *) &mod[1], data_size) + 1;
478 LOG (GNUNET_ERROR_TYPE_DEBUG,
479 "transmit_mod (ret: %d, size: %u + %u): %.*s\n",
480 notify_ret, mod->name_size, mod->value_size, data_size, &mod[1]);
467 if (mod->name_size < data_size) 481 if (mod->name_size < data_size)
468 { 482 {
483 tmit->mod_value_remaining
484 = mod->value_size - (data_size - mod->name_size);
469 mod->value_size = htonl (mod->value_size); 485 mod->value_size = htonl (mod->value_size);
470 mod->name_size = htons (mod->name_size); 486 mod->name_size = htons (mod->name_size);
471 } 487 }
@@ -483,6 +499,10 @@ transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
483 msg->size = sizeof (struct GNUNET_MessageHeader); 499 msg->size = sizeof (struct GNUNET_MessageHeader);
484 notify_ret = tmit->notify_mod (tmit->notify_mod_cls, 500 notify_ret = tmit->notify_mod (tmit->notify_mod_cls,
485 &data_size, &msg[1], NULL, NULL); 501 &data_size, &msg[1], NULL, NULL);
502 tmit->mod_value_remaining -= data_size;
503 LOG (GNUNET_ERROR_TYPE_DEBUG,
504 "transmit_mod (ret: %d, size: %u): %.*s\n",
505 notify_ret, data_size, data_size, &msg[1]);
486 break; 506 break;
487 } 507 }
488 default: 508 default:
@@ -497,26 +517,19 @@ transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
497 tmit->paused = GNUNET_YES; 517 tmit->paused = GNUNET_YES;
498 return; 518 return;
499 } 519 }
500 tmit->state = GNUNET_PSYC_MESSAGE_STATE_MOD_CONT; 520 tmit->state
521 = (0 == tmit->mod_value_remaining)
522 ? GNUNET_PSYC_MESSAGE_STATE_MODIFIER
523 : GNUNET_PSYC_MESSAGE_STATE_MOD_CONT;
501 break; 524 break;
502 525
503 case GNUNET_YES: 526 case GNUNET_YES: /* End of modifiers. */
504 if (0 == data_size) 527 GNUNET_assert (0 == tmit->mod_value_remaining);
505 {
506 /* End of modifiers. */
507 tmit->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
508 if (0 == tmit->acks_pending)
509 transmit_data (tmit);
510
511 return;
512 }
513 tmit->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
514 break; 528 break;
515 529
516 default: 530 default:
517 LOG (GNUNET_ERROR_TYPE_ERROR, 531 LOG (GNUNET_ERROR_TYPE_ERROR,
518 "TransmitNotifyModifier callback returned error " 532 "TransmitNotifyModifier callback returned with error.\n");
519 "when requesting a modifier.\n");
520 533
521 tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL; 534 tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
522 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); 535 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
@@ -533,7 +546,16 @@ transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
533 transmit_queue_insert (tmit, msg, GNUNET_NO); 546 transmit_queue_insert (tmit, msg, GNUNET_NO);
534 } 547 }
535 548
536 transmit_mod (tmit); 549 if (GNUNET_YES == notify_ret)
550 {
551 tmit->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
552 if (0 == tmit->acks_pending)
553 transmit_data (tmit);
554 }
555 else
556 {
557 transmit_mod (tmit);
558 }
537} 559}
538 560
539 561
@@ -547,9 +569,10 @@ transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
547 size_t value_size = 0; 569 size_t value_size = 0;
548 const char *value = NULL; 570 const char *value = NULL;
549 571
550 if (NULL != oper && NULL != tmit->mod) 572 if (NULL != oper)
551 { /* New modifier */ 573 { /* New modifier */
552 tmit->mod = tmit->mod->next; 574 if (NULL != tmit->mod)
575 tmit->mod = tmit->mod->next;
553 if (NULL == tmit->mod) 576 if (NULL == tmit->mod)
554 { /* No more modifiers, continue with data */ 577 { /* No more modifiers, continue with data */
555 *data_size = 0; 578 *data_size = 0;
@@ -559,30 +582,29 @@ transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
559 GNUNET_assert (tmit->mod->value_size < UINT32_MAX); 582 GNUNET_assert (tmit->mod->value_size < UINT32_MAX);
560 *full_value_size = tmit->mod->value_size; 583 *full_value_size = tmit->mod->value_size;
561 *oper = tmit->mod->oper; 584 *oper = tmit->mod->oper;
562 name_size = strlen (tmit->mod->name); 585 name_size = strlen (tmit->mod->name) + 1;
563 586
564 if (name_size + 1 + tmit->mod->value_size <= *data_size) 587 if (name_size + tmit->mod->value_size <= *data_size)
565 { 588 {
566 *data_size = name_size + 1 + tmit->mod->value_size; 589 *data_size = name_size + tmit->mod->value_size;
567 } 590 }
568 else 591 else
569 { 592 {
570 tmit->mod_value_size = tmit->mod->value_size; 593 value_size = *data_size - name_size;
571 value_size = *data_size - name_size - 1;
572 tmit->mod_value_size -= value_size;
573 tmit->mod_value = tmit->mod->value + value_size; 594 tmit->mod_value = tmit->mod->value + value_size;
574 } 595 }
575 596
576 memcpy (data, tmit->mod->name, name_size); 597 memcpy (data, tmit->mod->name, name_size);
577 ((char *)data)[name_size] = '\0'; 598 memcpy ((char *)data + name_size, tmit->mod->value, value_size);
578 memcpy ((char *)data + name_size + 1, tmit->mod->value, value_size); 599 return GNUNET_NO;
579 } 600 }
580 else if (NULL != tmit->mod_value && 0 < tmit->mod_value_size) 601 else
581 { /* Modifier continuation */ 602 { /* Modifier continuation */
603 GNUNET_assert (NULL != tmit->mod_value && 0 < tmit->mod_value_remaining);
582 value = tmit->mod_value; 604 value = tmit->mod_value;
583 if (tmit->mod_value_size <= *data_size) 605 if (tmit->mod_value_remaining <= *data_size)
584 { 606 {
585 value_size = tmit->mod_value_size; 607 value_size = tmit->mod_value_remaining;
586 tmit->mod_value = NULL; 608 tmit->mod_value = NULL;
587 } 609 }
588 else 610 else
@@ -590,7 +612,6 @@ transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
590 value_size = *data_size; 612 value_size = *data_size;
591 tmit->mod_value += value_size; 613 tmit->mod_value += value_size;
592 } 614 }
593 tmit->mod_value_size -= value_size;
594 615
595 if (*data_size < value_size) 616 if (*data_size < value_size)
596 { 617 {
@@ -603,9 +624,8 @@ transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
603 624
604 *data_size = value_size; 625 *data_size = value_size;
605 memcpy (data, value, value_size); 626 memcpy (data, value, value_size);
627 return (tmit->mod_value = NULL) ? GNUNET_YES : GNUNET_NO;
606 } 628 }
607
608 return 0 == tmit->mod_value_size ? GNUNET_YES : GNUNET_NO;
609} 629}
610 630
611 631
@@ -663,10 +683,16 @@ GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit,
663 { 683 {
664 tmit->notify_mod = &transmit_notify_env; 684 tmit->notify_mod = &transmit_notify_env;
665 tmit->notify_mod_cls = tmit; 685 tmit->notify_mod_cls = tmit;
666 tmit->mod 686 if (NULL != env)
667 = (NULL != env) 687 {
668 ? GNUNET_ENV_environment_head (env) 688 struct GNUNET_ENV_Modifier mod = {};
669 : NULL; 689 mod.next = GNUNET_ENV_environment_head (env);
690 tmit->mod = &mod;
691 }
692 else
693 {
694 tmit->mod = NULL;
695 }
670 } 696 }
671 697
672 transmit_mod (tmit); 698 transmit_mod (tmit);
@@ -762,12 +788,12 @@ GNUNET_PSYC_transmit_got_ack (struct GNUNET_PSYC_TransmitHandle *tmit)
762 */ 788 */
763struct GNUNET_PSYC_ReceiveHandle * 789struct GNUNET_PSYC_ReceiveHandle *
764GNUNET_PSYC_receive_create (GNUNET_PSYC_MessageCallback message_cb, 790GNUNET_PSYC_receive_create (GNUNET_PSYC_MessageCallback message_cb,
765 GNUNET_PSYC_MessageCallback hist_message_cb, 791 GNUNET_PSYC_MessagePartCallback message_part_cb,
766 void *cb_cls) 792 void *cb_cls)
767{ 793{
768 struct GNUNET_PSYC_ReceiveHandle *recv = GNUNET_malloc (sizeof (*recv)); 794 struct GNUNET_PSYC_ReceiveHandle *recv = GNUNET_malloc (sizeof (*recv));
769 recv->message_cb = message_cb; 795 recv->message_cb = message_cb;
770 recv->hist_message_cb = hist_message_cb; 796 recv->message_part_cb = message_part_cb;
771 recv->cb_cls = cb_cls; 797 recv->cb_cls = cb_cls;
772 return recv; 798 return recv;
773} 799}
@@ -800,13 +826,11 @@ GNUNET_PSYC_receive_reset (struct GNUNET_PSYC_ReceiveHandle *recv)
800static void 826static void
801recv_error (struct GNUNET_PSYC_ReceiveHandle *recv) 827recv_error (struct GNUNET_PSYC_ReceiveHandle *recv)
802{ 828{
803 GNUNET_PSYC_MessageCallback message_cb 829 if (NULL != recv->message_part_cb)
804 = recv->flags & GNUNET_PSYC_MESSAGE_HISTORIC 830 recv->message_part_cb (recv->cb_cls, recv->message_id, 0, recv->flags, NULL);
805 ? recv->hist_message_cb
806 : recv->message_cb;
807 831
808 if (NULL != message_cb) 832 if (NULL != recv->message_cb)
809 message_cb (recv->cb_cls, recv->message_id, recv->flags, NULL); 833 recv->message_cb (recv->cb_cls, recv->message_id, recv->flags, NULL);
810 834
811 GNUNET_PSYC_receive_reset (recv); 835 GNUNET_PSYC_receive_reset (recv);
812} 836}
@@ -827,6 +851,7 @@ GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
827{ 851{
828 uint16_t size = ntohs (msg->header.size); 852 uint16_t size = ntohs (msg->header.size);
829 uint32_t flags = ntohl (msg->flags); 853 uint32_t flags = ntohl (msg->flags);
854 uint64_t message_id;
830 855
831 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, 856 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
832 (struct GNUNET_MessageHeader *) msg); 857 (struct GNUNET_MessageHeader *) msg);
@@ -858,6 +883,7 @@ GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
858 recv_error (recv); 883 recv_error (recv);
859 return GNUNET_SYSERR; 884 return GNUNET_SYSERR;
860 } 885 }
886 message_id = recv->message_id;
861 887
862 uint16_t pos = 0, psize = 0, ptype, size_eq, size_min; 888 uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
863 889
@@ -964,10 +990,10 @@ GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
964 990
965 uint16_t name_size = ntohs (mod->name_size); 991 uint16_t name_size = ntohs (mod->name_size);
966 recv->mod_value_size_expected = ntohl (mod->value_size); 992 recv->mod_value_size_expected = ntohl (mod->value_size);
967 recv->mod_value_size = psize - sizeof (*mod) - name_size - 1; 993 recv->mod_value_size = psize - sizeof (*mod) - name_size;
968 994
969 if (psize < sizeof (*mod) + name_size + 1 995 if (psize < sizeof (*mod) + name_size
970 || '\0' != *((char *) &mod[1] + name_size) 996 || '\0' != *((char *) &mod[1] + name_size - 1)
971 || recv->mod_value_size_expected < recv->mod_value_size) 997 || recv->mod_value_size_expected < recv->mod_value_size)
972 { 998 {
973 LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n"); 999 LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
@@ -1018,13 +1044,9 @@ GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
1018 } 1044 }
1019 } 1045 }
1020 1046
1021 GNUNET_PSYC_MessageCallback message_cb 1047 if (NULL != recv->message_part_cb)
1022 = recv->flags & GNUNET_PSYC_MESSAGE_HISTORIC 1048 recv->message_part_cb (recv->cb_cls, recv->message_id, 0, // FIXME: data_offset
1023 ? recv->hist_message_cb 1049 recv->flags, pmsg);
1024 : recv->message_cb;
1025
1026 if (NULL != message_cb)
1027 message_cb (recv->cb_cls, recv->message_id, recv->flags, pmsg);
1028 1050
1029 switch (ptype) 1051 switch (ptype)
1030 { 1052 {
@@ -1034,6 +1056,9 @@ GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
1034 break; 1056 break;
1035 } 1057 }
1036 } 1058 }
1059
1060 if (NULL != recv->message_cb)
1061 recv->message_cb (recv->cb_cls, message_id, flags, msg);
1037 return GNUNET_OK; 1062 return GNUNET_OK;
1038} 1063}
1039 1064
@@ -1063,6 +1088,7 @@ GNUNET_PSYC_receive_check_parts (uint16_t data_size, const char *data,
1063 for (pos = 0; pos < data_size; pos += psize, parts++) 1088 for (pos = 0; pos < data_size; pos += psize, parts++)
1064 { 1089 {
1065 pmsg = (const struct GNUNET_MessageHeader *) (data + pos); 1090 pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
1091 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
1066 psize = ntohs (pmsg->size); 1092 psize = ntohs (pmsg->size);
1067 ptype = ntohs (pmsg->type); 1093 ptype = ntohs (pmsg->type);
1068 if (0 == parts && NULL != first_ptype) 1094 if (0 == parts && NULL != first_ptype)
@@ -1084,3 +1110,111 @@ GNUNET_PSYC_receive_check_parts (uint16_t data_size, const char *data,
1084 } 1110 }
1085 return parts; 1111 return parts;
1086} 1112}
1113
1114
1115struct ParseMessageClosure
1116{
1117 struct GNUNET_ENV_Environment *env;
1118 const char **method_name;
1119 const void **data;
1120 uint16_t *data_size;
1121 enum GNUNET_PSYC_MessageState msg_state;
1122};
1123
1124
1125static void
1126parse_message_part_cb (void *cls, uint64_t message_id, uint64_t data_offset,
1127 uint32_t flags, const struct GNUNET_MessageHeader *msg)
1128{
1129 struct ParseMessageClosure *pmc = cls;
1130 if (NULL == msg)
1131 {
1132 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1133 return;
1134 }
1135
1136 switch (ntohs (msg->type))
1137 {
1138 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
1139 {
1140 struct GNUNET_PSYC_MessageMethod *
1141 pmeth = (struct GNUNET_PSYC_MessageMethod *) msg;
1142 *pmc->method_name = (const char *) &pmeth[1];
1143 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
1144 break;
1145 }
1146
1147 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
1148 {
1149 struct GNUNET_PSYC_MessageModifier *
1150 pmod = (struct GNUNET_PSYC_MessageModifier *) msg;
1151
1152 const char *name = (const char *) &pmod[1];
1153 const void *value = name + pmod->name_size;
1154 GNUNET_ENV_environment_add (pmc->env, pmod->oper, name, value,
1155 pmod->value_size);
1156 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
1157 break;
1158 }
1159
1160 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
1161 *pmc->data = &msg[1];
1162 *pmc->data_size = ntohs (msg->size) - sizeof (*msg);
1163 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_DATA;
1164 break;
1165
1166 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
1167 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_END;
1168 break;
1169
1170 default:
1171 pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
1172 }
1173}
1174
1175
1176/**
1177 * Parse PSYC message.
1178 *
1179 * @param msg
1180 * The PSYC message to parse.
1181 * @param[out] method_name
1182 * Pointer to the method name inside @a pmsg.
1183 * @param env
1184 * The environment for the message with a list of modifiers.
1185 * @param[out] data
1186 * Pointer to data inside @a pmsg.
1187 * @param[out] data_size
1188 * Size of @data is written here.
1189 *
1190 * @return #GNUNET_OK on success,
1191 * #GNUNET_SYSERR on parse error.
1192 */
1193int
1194GNUNET_PSYC_message_parse (const struct GNUNET_PSYC_Message *msg,
1195 const char **method_name,
1196 struct GNUNET_ENV_Environment *env,
1197 const void **data,
1198 uint16_t *data_size)
1199{
1200 struct ParseMessageClosure cls;
1201 cls.env = env;
1202 cls.method_name = method_name;
1203 cls.data = data;
1204 cls.data_size = data_size;
1205
1206 uint16_t msg_size = ntohs (msg->header.size);
1207 struct GNUNET_PSYC_MessageHeader *
1208 pmsg = GNUNET_malloc (sizeof (*pmsg) + msg_size - sizeof (*msg));
1209 memcpy (&pmsg[1], &msg[1], msg_size - sizeof (*msg));
1210
1211 struct GNUNET_PSYC_ReceiveHandle *
1212 recv = GNUNET_PSYC_receive_create (NULL, &parse_message_part_cb, &cls);
1213 GNUNET_PSYC_receive_message (recv, pmsg);
1214 GNUNET_PSYC_receive_destroy (recv);
1215 GNUNET_free (pmsg);
1216
1217 return (GNUNET_PSYC_MESSAGE_STATE_END == cls.msg_state)
1218 ? GNUNET_OK
1219 : GNUNET_SYSERR;
1220}
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c
index 0077bc9b7..495e3be47 100644
--- a/src/psyc/test_psyc.c
+++ b/src/psyc/test_psyc.c
@@ -62,8 +62,6 @@ static struct GNUNET_CRYPTO_EcdsaPrivateKey *slave_key;
62static struct GNUNET_CRYPTO_EddsaPublicKey channel_pub_key; 62static struct GNUNET_CRYPTO_EddsaPublicKey channel_pub_key;
63static struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key; 63static struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
64 64
65struct GNUNET_PSYC_MasterTransmitHandle *mth;
66
67struct TransmitClosure 65struct TransmitClosure
68{ 66{
69 struct GNUNET_PSYC_MasterTransmitHandle *mst_tmit; 67 struct GNUNET_PSYC_MasterTransmitHandle *mst_tmit;
@@ -95,6 +93,28 @@ static void
95master_transmit (); 93master_transmit ();
96 94
97 95
96void master_stopped (void *cls)
97{
98 if (NULL != tmit)
99 {
100 GNUNET_ENV_environment_destroy (tmit->env);
101 GNUNET_free (tmit);
102 tmit = NULL;
103 }
104 GNUNET_SCHEDULER_shutdown ();
105}
106
107void slave_parted (void *cls)
108{
109 if (NULL != mst)
110 {
111 GNUNET_PSYC_master_stop (mst, GNUNET_NO, &master_stopped, NULL);
112 mst = NULL;
113 }
114 else
115 master_stopped (NULL);
116}
117
98/** 118/**
99 * Clean up all resources used. 119 * Clean up all resources used.
100 */ 120 */
@@ -103,21 +123,11 @@ cleanup ()
103{ 123{
104 if (NULL != slv) 124 if (NULL != slv)
105 { 125 {
106 GNUNET_PSYC_slave_part (slv); 126 GNUNET_PSYC_slave_part (slv, GNUNET_NO, &slave_parted, NULL);
107 slv = NULL; 127 slv = NULL;
108 } 128 }
109 if (NULL != mst) 129 else
110 { 130 slave_parted (NULL);
111 GNUNET_PSYC_master_stop (mst);
112 mst = NULL;
113 }
114 if (NULL != tmit)
115 {
116 GNUNET_ENV_environment_destroy (tmit->env);
117 GNUNET_free (tmit);
118 tmit = NULL;
119 }
120 GNUNET_SCHEDULER_shutdown ();
121} 131}
122 132
123 133
@@ -171,7 +181,20 @@ end ()
171 181
172static void 182static void
173master_message_cb (void *cls, uint64_t message_id, uint32_t flags, 183master_message_cb (void *cls, uint64_t message_id, uint32_t flags,
174 const struct GNUNET_MessageHeader *msg) 184 const struct GNUNET_PSYC_MessageHeader *msg)
185{
186 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
187 "Master got PSYC message fragment of size %u "
188 "belonging to message ID %llu with flags %x\n",
189 ntohs (msg->header.size), message_id, flags);
190 // FIXME
191}
192
193
194static void
195master_message_part_cb (void *cls, uint64_t message_id,
196 uint64_t data_offset, uint32_t flags,
197 const struct GNUNET_MessageHeader *msg)
175{ 198{
176 if (NULL == msg) 199 if (NULL == msg)
177 { 200 {
@@ -215,7 +238,20 @@ master_message_cb (void *cls, uint64_t message_id, uint32_t flags,
215 238
216static void 239static void
217slave_message_cb (void *cls, uint64_t message_id, uint32_t flags, 240slave_message_cb (void *cls, uint64_t message_id, uint32_t flags,
218 const struct GNUNET_MessageHeader *msg) 241 const struct GNUNET_PSYC_MessageHeader *msg)
242{
243 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
244 "Slave got PSYC message fragment of size %u "
245 "belonging to message ID %llu with flags %x\n",
246 ntohs (msg->header.size), message_id, flags);
247 // FIXME
248}
249
250
251static void
252slave_message_part_cb (void *cls, uint64_t message_id,
253 uint64_t data_offset, uint32_t flags,
254 const struct GNUNET_MessageHeader *msg)
219{ 255{
220 if (NULL == msg) 256 if (NULL == msg)
221 { 257 {
@@ -371,7 +407,7 @@ tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
371 memcpy (data, value, value_size); 407 memcpy (data, value, value_size);
372 } 408 }
373 409
374 return 0 == tmit->mod_value_size ? GNUNET_YES : GNUNET_NO; 410 return GNUNET_NO;
375} 411}
376 412
377 413
@@ -380,8 +416,10 @@ slave_join ();
380 416
381 417
382static void 418static void
383join_decision_cb (void *cls, int is_admitted, 419join_decision_cb (void *cls,
384 const struct GNUNET_PSYC_MessageHeader *join_msg) 420 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn,
421 int is_admitted,
422 const struct GNUNET_PSYC_Message *join_msg)
385{ 423{
386 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 424 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
387 "Slave got join decision: %d\n", is_admitted); 425 "Slave got join decision: %d\n", is_admitted);
@@ -415,8 +453,10 @@ join_decision_cb (void *cls, int is_admitted,
415 453
416 454
417static void 455static void
418join_request_cb (void *cls, const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, 456join_request_cb (void *cls,
419 const struct GNUNET_PSYC_MessageHeader *msg, 457 const struct GNUNET_PSYC_JoinRequestMessage *req,
458 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
459 const struct GNUNET_PSYC_Message *join_msg,
420 struct GNUNET_PSYC_JoinHandle *jh) 460 struct GNUNET_PSYC_JoinHandle *jh)
421{ 461{
422 struct GNUNET_HashCode slave_key_hash; 462 struct GNUNET_HashCode slave_key_hash;
@@ -450,11 +490,11 @@ slave_join ()
450 "_foo", "bar baz", 7); 490 "_foo", "bar baz", 7);
451 GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN, 491 GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN,
452 "_foo_bar", "foo bar baz", 11); 492 "_foo_bar", "foo bar baz", 11);
453 struct GNUNET_MessageHeader * 493 struct GNUNET_PSYC_Message *
454 join_msg = GNUNET_PSYC_message_create ("_request_join", env, "some data", 9); 494 join_msg = GNUNET_PSYC_message_create ("_request_join", env, "some data", 9);
455 495
456 slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin, 496 slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin, 0, NULL,
457 0, NULL, &slave_message_cb, 497 &slave_message_cb, &slave_message_part_cb,
458 &slave_connect_cb, &join_decision_cb, NULL, 498 &slave_connect_cb, &join_decision_cb, NULL,
459 join_msg); 499 join_msg);
460 GNUNET_ENV_environment_destroy (env); 500 GNUNET_ENV_environment_destroy (env);
@@ -551,7 +591,8 @@ run (void *cls,
551 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Starting master.\n"); 591 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Starting master.\n");
552 mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE, 592 mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE,
553 &master_start_cb, &join_request_cb, 593 &master_start_cb, &join_request_cb,
554 &master_message_cb, NULL); 594 &master_message_cb, &master_message_part_cb,
595 NULL);
555} 596}
556 597
557 598