diff options
author | Gabor X Toth <*@tg-x.net> | 2014-07-23 16:19:49 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2014-07-23 16:19:49 +0000 |
commit | 3cf8ba0b60f8495892fa76635e9c23555d0a304c (patch) | |
tree | 5f27648bdb3cf3409628e4e5edc26f811cbd03a5 /src/psyc | |
parent | 252b5599987b7ba03b879a8c2d1c455ad4c9834a (diff) | |
download | gnunet-3cf8ba0b60f8495892fa76635e9c23555d0a304c.tar.gz gnunet-3cf8ba0b60f8495892fa76635e9c23555d0a304c.zip |
social: implement enter/leave/messaging; psyc: improvements and fixes
- social: implement enter/leave, send/receive messages, slicer
- psyc, social: add struct GNUNET_PSYC_Message for single-fragment join messages
- psyc: add message callback in addition to message part callback
- client_manager, social, psyc, multicast: add disconnect callback
Diffstat (limited to 'src/psyc')
-rw-r--r-- | src/psyc/gnunet-service-psyc.c | 112 | ||||
-rw-r--r-- | src/psyc/psyc.h | 16 | ||||
-rw-r--r-- | src/psyc/psyc_api.c | 133 | ||||
-rw-r--r-- | src/psyc/psyc_util_lib.c | 268 | ||||
-rw-r--r-- | src/psyc/test_psyc.c | 93 |
5 files changed, 456 insertions, 166 deletions
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c index 866275a79..8fc080baf 100644 --- a/src/psyc/gnunet-service-psyc.c +++ b/src/psyc/gnunet-service-psyc.c | |||
@@ -168,7 +168,7 @@ struct FragmentQueue | |||
168 | * Is the message queued for delivery to the client? | 168 | * Is the message queued for delivery to the client? |
169 | * i.e. added to the recv_msgs queue | 169 | * i.e. added to the recv_msgs queue |
170 | */ | 170 | */ |
171 | uint8_t queued; | 171 | uint8_t is_queued; |
172 | }; | 172 | }; |
173 | 173 | ||
174 | 174 | ||
@@ -382,7 +382,7 @@ struct Slave | |||
382 | /** | 382 | /** |
383 | * Join request to be transmitted to the master on join. | 383 | * Join request to be transmitted to the master on join. |
384 | */ | 384 | */ |
385 | struct GNUNET_MessageHeader *join_req; | 385 | struct GNUNET_PSYC_Message *join_msg; |
386 | 386 | ||
387 | /** | 387 | /** |
388 | * Join decision received from multicast. | 388 | * Join decision received from multicast. |
@@ -435,7 +435,7 @@ cleanup_master (struct Master *mst) | |||
435 | struct Channel *chn = &mst->chn; | 435 | struct Channel *chn = &mst->chn; |
436 | 436 | ||
437 | if (NULL != mst->origin) | 437 | if (NULL != mst->origin) |
438 | GNUNET_MULTICAST_origin_stop (mst->origin); | 438 | GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME |
439 | GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs); | 439 | GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs); |
440 | GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, chn); | 440 | GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, chn); |
441 | } | 441 | } |
@@ -462,12 +462,21 @@ cleanup_slave (struct Slave *slv) | |||
462 | } | 462 | } |
463 | GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv); | 463 | GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv); |
464 | 464 | ||
465 | if (NULL != slv->join_req) | 465 | if (NULL != slv->join_msg) |
466 | GNUNET_free (slv->join_req); | 466 | { |
467 | GNUNET_free (slv->join_msg); | ||
468 | slv->join_msg = NULL; | ||
469 | } | ||
467 | if (NULL != slv->relays) | 470 | if (NULL != slv->relays) |
471 | { | ||
468 | GNUNET_free (slv->relays); | 472 | GNUNET_free (slv->relays); |
473 | slv->relays = NULL; | ||
474 | } | ||
469 | if (NULL != slv->member) | 475 | if (NULL != slv->member) |
470 | GNUNET_MULTICAST_member_part (slv->member); | 476 | { |
477 | GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME | ||
478 | slv->member = NULL; | ||
479 | } | ||
471 | GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, chn); | 480 | GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, chn); |
472 | } | 481 | } |
473 | 482 | ||
@@ -482,7 +491,10 @@ cleanup_channel (struct Channel *chn) | |||
482 | GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &chn->pub_key_hash); | 491 | GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &chn->pub_key_hash); |
483 | 492 | ||
484 | if (NULL != chn->store_op) | 493 | if (NULL != chn->store_op) |
494 | { | ||
485 | GNUNET_PSYCSTORE_operation_cancel (chn->store_op); | 495 | GNUNET_PSYCSTORE_operation_cancel (chn->store_op); |
496 | chn->store_op = NULL; | ||
497 | } | ||
486 | 498 | ||
487 | (GNUNET_YES == chn->is_master) | 499 | (GNUNET_YES == chn->is_master) |
488 | ? cleanup_master ((struct Master *) chn) | 500 | ? cleanup_master ((struct Master *) chn) |
@@ -574,7 +586,7 @@ struct JoinMemTestClosure | |||
574 | struct GNUNET_CRYPTO_EcdsaPublicKey slave_key; | 586 | struct GNUNET_CRYPTO_EcdsaPublicKey slave_key; |
575 | struct Channel *chn; | 587 | struct Channel *chn; |
576 | struct GNUNET_MULTICAST_JoinHandle *jh; | 588 | struct GNUNET_MULTICAST_JoinHandle *jh; |
577 | struct MasterJoinRequest *master_join_req; | 589 | struct GNUNET_PSYC_JoinRequestMessage *join_msg; |
578 | }; | 590 | }; |
579 | 591 | ||
580 | 592 | ||
@@ -594,14 +606,14 @@ join_mem_test_cb (void *cls, int64_t result, const char *err_msg) | |||
594 | &slave_key_hash); | 606 | &slave_key_hash); |
595 | GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh, | 607 | GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh, |
596 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | 608 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); |
597 | client_send_msg (jcls->chn, &jcls->master_join_req->header); | 609 | client_send_msg (jcls->chn, &jcls->join_msg->header); |
598 | } | 610 | } |
599 | else | 611 | else |
600 | { | 612 | { |
601 | // FIXME: add relays | 613 | // FIXME: add relays |
602 | GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL); | 614 | GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL); |
603 | } | 615 | } |
604 | GNUNET_free (jcls->master_join_req); | 616 | GNUNET_free (jcls->join_msg); |
605 | GNUNET_free (jcls); | 617 | GNUNET_free (jcls); |
606 | } | 618 | } |
607 | 619 | ||
@@ -633,7 +645,8 @@ mcast_recv_join_request (void *cls, | |||
633 | } | 645 | } |
634 | } | 646 | } |
635 | 647 | ||
636 | struct MasterJoinRequest *req = GNUNET_malloc (sizeof (*req) + join_msg_size); | 648 | struct GNUNET_PSYC_JoinRequestMessage * |
649 | req = GNUNET_malloc (sizeof (*req) + join_msg_size); | ||
637 | req->header.size = htons (sizeof (*req) + join_msg_size); | 650 | req->header.size = htons (sizeof (*req) + join_msg_size); |
638 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST); | 651 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST); |
639 | req->slave_key = *slave_key; | 652 | req->slave_key = *slave_key; |
@@ -644,7 +657,7 @@ mcast_recv_join_request (void *cls, | |||
644 | jcls->slave_key = *slave_key; | 657 | jcls->slave_key = *slave_key; |
645 | jcls->chn = chn; | 658 | jcls->chn = chn; |
646 | jcls->jh = jh; | 659 | jcls->jh = jh; |
647 | jcls->master_join_req = req; | 660 | jcls->join_msg = req; |
648 | 661 | ||
649 | GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key, | 662 | GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key, |
650 | chn->max_message_id, 0, | 663 | chn->max_message_id, 0, |
@@ -780,6 +793,7 @@ client_send_mcast_msg (struct Channel *chn, | |||
780 | pmsg->header.size = htons (psize); | 793 | pmsg->header.size = htons (psize); |
781 | pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | 794 | pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); |
782 | pmsg->message_id = mmsg->message_id; | 795 | pmsg->message_id = mmsg->message_id; |
796 | pmsg->fragment_offset = mmsg->fragment_offset; | ||
783 | 797 | ||
784 | memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); | 798 | memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); |
785 | client_send_msg (chn, &pmsg->header); | 799 | client_send_msg (chn, &pmsg->header); |
@@ -810,6 +824,7 @@ client_send_mcast_req (struct Master *mst, | |||
810 | pmsg->header.size = htons (psize); | 824 | pmsg->header.size = htons (psize); |
811 | pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | 825 | pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); |
812 | pmsg->message_id = req->request_id; | 826 | pmsg->message_id = req->request_id; |
827 | pmsg->fragment_offset = req->fragment_offset; | ||
813 | pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST); | 828 | pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST); |
814 | 829 | ||
815 | memcpy (&pmsg[1], &req[1], size - sizeof (*req)); | 830 | memcpy (&pmsg[1], &req[1], size - sizeof (*req)); |
@@ -870,11 +885,12 @@ fragment_queue_insert (struct Channel *chn, | |||
870 | { | 885 | { |
871 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 886 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
872 | "%p Adding message fragment to cache. " | 887 | "%p Adding message fragment to cache. " |
873 | "message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", " | 888 | "message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n", |
874 | "header_size: %" PRIu64 " + %u).\n", | ||
875 | chn, GNUNET_ntohll (mmsg->message_id), | 889 | chn, GNUNET_ntohll (mmsg->message_id), |
876 | GNUNET_ntohll (mmsg->fragment_id), | 890 | GNUNET_ntohll (mmsg->fragment_id)); |
877 | fragq->header_size, size); | 891 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
892 | "%p header_size: %" PRIu64 " + %u\n", | ||
893 | chn, fragq->header_size, size); | ||
878 | cache_entry = GNUNET_new (struct RecvCacheEntry); | 894 | cache_entry = GNUNET_new (struct RecvCacheEntry); |
879 | cache_entry->ref_count = 1; | 895 | cache_entry->ref_count = 1; |
880 | cache_entry->mmsg = GNUNET_malloc (size); | 896 | cache_entry->mmsg = GNUNET_malloc (size); |
@@ -955,11 +971,11 @@ fragment_queue_insert (struct Channel *chn, | |||
955 | case MSG_FRAG_STATE_DATA: | 971 | case MSG_FRAG_STATE_DATA: |
956 | case MSG_FRAG_STATE_END: | 972 | case MSG_FRAG_STATE_END: |
957 | case MSG_FRAG_STATE_CANCEL: | 973 | case MSG_FRAG_STATE_CANCEL: |
958 | if (GNUNET_NO == fragq->queued) | 974 | if (GNUNET_NO == fragq->is_queued) |
959 | { | 975 | { |
960 | GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL, | 976 | GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL, |
961 | GNUNET_ntohll (mmsg->message_id)); | 977 | GNUNET_ntohll (mmsg->message_id)); |
962 | fragq->queued = GNUNET_YES; | 978 | fragq->is_queued = GNUNET_YES; |
963 | } | 979 | } |
964 | } | 980 | } |
965 | 981 | ||
@@ -1034,7 +1050,7 @@ fragment_queue_run (struct Channel *chn, uint64_t msg_id, | |||
1034 | if (MSG_FRAG_STATE_END <= fragq->state) | 1050 | if (MSG_FRAG_STATE_END <= fragq->state) |
1035 | { | 1051 | { |
1036 | struct GNUNET_HashCode msg_id_hash; | 1052 | struct GNUNET_HashCode msg_id_hash; |
1037 | hash_key_from_nll (&msg_id_hash, msg_id); | 1053 | hash_key_from_hll (&msg_id_hash, msg_id); |
1038 | 1054 | ||
1039 | GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq); | 1055 | GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq); |
1040 | GNUNET_CONTAINER_heap_destroy (fragq->fragments); | 1056 | GNUNET_CONTAINER_heap_destroy (fragq->fragments); |
@@ -1042,7 +1058,7 @@ fragment_queue_run (struct Channel *chn, uint64_t msg_id, | |||
1042 | } | 1058 | } |
1043 | else | 1059 | else |
1044 | { | 1060 | { |
1045 | fragq->queued = GNUNET_NO; | 1061 | fragq->is_queued = GNUNET_NO; |
1046 | } | 1062 | } |
1047 | } | 1063 | } |
1048 | 1064 | ||
@@ -1331,13 +1347,18 @@ store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id, | |||
1331 | = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key, | 1347 | = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key, |
1332 | &slv->origin, | 1348 | &slv->origin, |
1333 | slv->relay_count, slv->relays, | 1349 | slv->relay_count, slv->relays, |
1334 | slv->join_req, | 1350 | &slv->join_msg->header, |
1335 | &mcast_recv_join_request, | 1351 | &mcast_recv_join_request, |
1336 | &mcast_recv_join_decision, | 1352 | &mcast_recv_join_decision, |
1337 | &mcast_recv_membership_test, | 1353 | &mcast_recv_membership_test, |
1338 | &mcast_recv_replay_fragment, | 1354 | &mcast_recv_replay_fragment, |
1339 | &mcast_recv_replay_message, | 1355 | &mcast_recv_replay_message, |
1340 | &mcast_recv_message, chn); | 1356 | &mcast_recv_message, chn); |
1357 | if (NULL != slv->join_msg) | ||
1358 | { | ||
1359 | GNUNET_free (slv->join_msg); | ||
1360 | slv->join_msg = NULL; | ||
1361 | } | ||
1341 | } | 1362 | } |
1342 | else | 1363 | else |
1343 | { | 1364 | { |
@@ -1435,6 +1456,7 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client, | |||
1435 | { | 1456 | { |
1436 | const struct SlaveJoinRequest *req | 1457 | const struct SlaveJoinRequest *req |
1437 | = (const struct SlaveJoinRequest *) msg; | 1458 | = (const struct SlaveJoinRequest *) msg; |
1459 | uint16_t req_size = ntohs (req->header.size); | ||
1438 | 1460 | ||
1439 | struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key; | 1461 | struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key; |
1440 | struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash; | 1462 | struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash; |
@@ -1460,15 +1482,32 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client, | |||
1460 | slv->pub_key_hash = slv_pub_key_hash; | 1482 | slv->pub_key_hash = slv_pub_key_hash; |
1461 | slv->origin = req->origin; | 1483 | slv->origin = req->origin; |
1462 | slv->relay_count = ntohl (req->relay_count); | 1484 | slv->relay_count = ntohl (req->relay_count); |
1485 | |||
1486 | const struct GNUNET_PeerIdentity * | ||
1487 | relays = (const struct GNUNET_PeerIdentity *) &req[1]; | ||
1488 | uint16_t relay_size = slv->relay_count * sizeof (*relays); | ||
1489 | uint16_t join_msg_size = 0; | ||
1490 | |||
1491 | if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader) | ||
1492 | <= req_size) | ||
1493 | { | ||
1494 | join_msg_size = ntohs (slv->join_msg->header.size); | ||
1495 | slv->join_msg = GNUNET_malloc (join_msg_size); | ||
1496 | memcpy (slv->join_msg, ((char *) &req[1]) + relay_size, join_msg_size); | ||
1497 | } | ||
1498 | if (sizeof (*req) + relay_size + join_msg_size != req_size) | ||
1499 | { | ||
1500 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
1501 | "%u + %u + %u != %u\n", | ||
1502 | sizeof (*req), relay_size, join_msg_size, req_size); | ||
1503 | GNUNET_break (0); | ||
1504 | GNUNET_SERVER_client_disconnect (client); | ||
1505 | return; | ||
1506 | } | ||
1463 | if (0 < slv->relay_count) | 1507 | if (0 < slv->relay_count) |
1464 | { | 1508 | { |
1465 | const struct GNUNET_PeerIdentity *relays | 1509 | slv->relays = GNUNET_malloc (relay_size); |
1466 | = (const struct GNUNET_PeerIdentity *) &req[1]; | 1510 | memcpy (slv->relays, &req[1], relay_size); |
1467 | slv->relays | ||
1468 | = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity)); | ||
1469 | uint32_t i; | ||
1470 | for (i = 0; i < slv->relay_count; i++) | ||
1471 | memcpy (&slv->relays[i], &relays[i], sizeof (*relays)); | ||
1472 | } | 1511 | } |
1473 | 1512 | ||
1474 | chn = &slv->chn; | 1513 | chn = &slv->chn; |
@@ -1510,14 +1549,18 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client, | |||
1510 | = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key, | 1549 | = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key, |
1511 | &slv->origin, | 1550 | &slv->origin, |
1512 | slv->relay_count, slv->relays, | 1551 | slv->relay_count, slv->relays, |
1513 | slv->join_req, | 1552 | &slv->join_msg->header, |
1514 | &mcast_recv_join_request, | 1553 | &mcast_recv_join_request, |
1515 | &mcast_recv_join_decision, | 1554 | &mcast_recv_join_decision, |
1516 | &mcast_recv_membership_test, | 1555 | &mcast_recv_membership_test, |
1517 | &mcast_recv_replay_fragment, | 1556 | &mcast_recv_replay_fragment, |
1518 | &mcast_recv_replay_message, | 1557 | &mcast_recv_replay_message, |
1519 | &mcast_recv_message, chn); | 1558 | &mcast_recv_message, chn); |
1520 | 1559 | if (NULL != slv->join_msg) | |
1560 | { | ||
1561 | GNUNET_free (slv->join_msg); | ||
1562 | slv->join_msg = NULL; | ||
1563 | } | ||
1521 | } | 1564 | } |
1522 | else if (NULL != slv->join_dcsn) | 1565 | else if (NULL != slv->join_dcsn) |
1523 | { | 1566 | { |
@@ -1549,13 +1592,14 @@ struct JoinDecisionClosure | |||
1549 | 1592 | ||
1550 | 1593 | ||
1551 | /** | 1594 | /** |
1552 | * Iterator callback for responding to join requests of a slave. | 1595 | * Iterator callback for sending join decisions to multicast. |
1553 | */ | 1596 | */ |
1554 | static int | 1597 | static int |
1555 | mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash, | 1598 | mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash, |
1556 | void *jh) | 1599 | void *value) |
1557 | { | 1600 | { |
1558 | struct JoinDecisionClosure *jcls = cls; | 1601 | struct JoinDecisionClosure *jcls = cls; |
1602 | struct GNUNET_MULTICAST_JoinHandle *jh = value; | ||
1559 | // FIXME: add relays | 1603 | // FIXME: add relays |
1560 | GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg); | 1604 | GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg); |
1561 | return GNUNET_YES; | 1605 | return GNUNET_YES; |
@@ -1579,8 +1623,7 @@ client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client, | |||
1579 | struct JoinDecisionClosure jcls; | 1623 | struct JoinDecisionClosure jcls; |
1580 | jcls.is_admitted = ntohl (dcsn->is_admitted); | 1624 | jcls.is_admitted = ntohl (dcsn->is_admitted); |
1581 | jcls.msg | 1625 | jcls.msg |
1582 | = (sizeof (*dcsn) + sizeof (struct GNUNET_PSYC_MessageHeader) | 1626 | = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size)) |
1583 | <= ntohs (msg->size)) | ||
1584 | ? (struct GNUNET_MessageHeader *) &dcsn[1] | 1627 | ? (struct GNUNET_MessageHeader *) &dcsn[1] |
1585 | : NULL; | 1628 | : NULL; |
1586 | 1629 | ||
@@ -1901,6 +1944,9 @@ client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, | |||
1901 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); | 1944 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); |
1902 | return; | 1945 | return; |
1903 | } | 1946 | } |
1947 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1948 | "%p Received message with first part type %u and last part type %u.\n", | ||
1949 | chn, first_ptype, last_ptype); | ||
1904 | 1950 | ||
1905 | queue_message (chn, client, size - sizeof (*msg), &msg[1], | 1951 | queue_message (chn, client, size - sizeof (*msg), &msg[1], |
1906 | first_ptype, last_ptype); | 1952 | first_ptype, last_ptype); |
diff --git a/src/psyc/psyc.h b/src/psyc/psyc.h index 995fb1fa4..82800a334 100644 --- a/src/psyc/psyc.h +++ b/src/psyc/psyc.h | |||
@@ -229,22 +229,6 @@ struct OperationResult | |||
229 | */ | 229 | */ |
230 | }; | 230 | }; |
231 | 231 | ||
232 | |||
233 | struct MasterJoinRequest | ||
234 | { | ||
235 | /** | ||
236 | * Type: GNUNET_MESSAGE_TYPE_PSYC_MASTER_JOIN_REQUEST | ||
237 | */ | ||
238 | struct GNUNET_MessageHeader header; | ||
239 | /** | ||
240 | * Public key of the joining slave. | ||
241 | */ | ||
242 | struct GNUNET_CRYPTO_EcdsaPublicKey slave_key; | ||
243 | |||
244 | /* Followed by struct GNUNET_MessageHeader join_request */ | ||
245 | }; | ||
246 | |||
247 | |||
248 | GNUNET_NETWORK_STRUCT_END | 232 | GNUNET_NETWORK_STRUCT_END |
249 | 233 | ||
250 | #endif | 234 | #endif |
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c index 88b007a0f..a43b1ef5f 100644 --- a/src/psyc/psyc_api.c +++ b/src/psyc/psyc_api.c | |||
@@ -74,6 +74,16 @@ struct GNUNET_PSYC_Channel | |||
74 | struct GNUNET_MessageHeader *connect_msg; | 74 | struct GNUNET_MessageHeader *connect_msg; |
75 | 75 | ||
76 | /** | 76 | /** |
77 | * Function called after disconnected from the service. | ||
78 | */ | ||
79 | GNUNET_ContinuationCallback disconnect_cb; | ||
80 | |||
81 | /** | ||
82 | * Closure for @a disconnect_cb. | ||
83 | */ | ||
84 | void *disconnect_cls; | ||
85 | |||
86 | /** | ||
77 | * Are we polling for incoming messages right now? | 87 | * Are we polling for incoming messages right now? |
78 | */ | 88 | */ |
79 | uint8_t in_receive; | 89 | uint8_t in_receive; |
@@ -82,6 +92,12 @@ struct GNUNET_PSYC_Channel | |||
82 | * Is this a master or slave channel? | 92 | * Is this a master or slave channel? |
83 | */ | 93 | */ |
84 | uint8_t is_master; | 94 | uint8_t is_master; |
95 | |||
96 | /** | ||
97 | * Is this channel in the process of disconnecting from the service? | ||
98 | * #GNUNET_YES or #GNUNET_NO | ||
99 | */ | ||
100 | uint8_t is_disconnecting; | ||
85 | }; | 101 | }; |
86 | 102 | ||
87 | 103 | ||
@@ -232,19 +248,26 @@ master_recv_join_request (void *cls, | |||
232 | struct GNUNET_PSYC_Master * | 248 | struct GNUNET_PSYC_Master * |
233 | mst = GNUNET_CLIENT_MANAGER_get_user_context_ (client, | 249 | mst = GNUNET_CLIENT_MANAGER_get_user_context_ (client, |
234 | sizeof (struct GNUNET_PSYC_Channel)); | 250 | sizeof (struct GNUNET_PSYC_Channel)); |
235 | 251 | if (NULL == mst->join_req_cb) | |
236 | const struct MasterJoinRequest *req = (const struct MasterJoinRequest *) msg; | 252 | return; |
237 | 253 | ||
238 | struct GNUNET_PSYC_MessageHeader *pmsg = NULL; | 254 | const struct GNUNET_PSYC_JoinRequestMessage * |
239 | if (ntohs (req->header.size) <= sizeof (*req) + sizeof (*pmsg)) | 255 | req = (const struct GNUNET_PSYC_JoinRequestMessage *) msg; |
240 | pmsg = (struct GNUNET_PSYC_MessageHeader *) &req[1]; | 256 | const struct GNUNET_PSYC_Message *join_msg = NULL; |
257 | if (sizeof (*req) + sizeof (*join_msg) <= ntohs (req->header.size)) | ||
258 | { | ||
259 | join_msg = (struct GNUNET_PSYC_Message *) &req[1]; | ||
260 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
261 | "Received join_msg of type %u and size %u.\n", | ||
262 | ntohs (join_msg->header.type), ntohs (join_msg->header.size)); | ||
263 | } | ||
241 | 264 | ||
242 | struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); | 265 | struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); |
243 | jh->mst = mst; | 266 | jh->mst = mst; |
244 | jh->slave_key = req->slave_key; | 267 | jh->slave_key = req->slave_key; |
245 | 268 | ||
246 | if (NULL != mst->join_req_cb) | 269 | if (NULL != mst->join_req_cb) |
247 | mst->join_req_cb (mst->cb_cls, &req->slave_key, pmsg, jh); | 270 | mst->join_req_cb (mst->cb_cls, req, &req->slave_key, join_msg, jh); |
248 | } | 271 | } |
249 | 272 | ||
250 | 273 | ||
@@ -273,13 +296,12 @@ slave_recv_join_decision (void *cls, | |||
273 | const struct GNUNET_PSYC_JoinDecisionMessage * | 296 | const struct GNUNET_PSYC_JoinDecisionMessage * |
274 | dcsn = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg; | 297 | dcsn = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg; |
275 | 298 | ||
276 | struct GNUNET_PSYC_MessageHeader *pmsg = NULL; | 299 | struct GNUNET_PSYC_Message *pmsg = NULL; |
277 | if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*pmsg)) | 300 | if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*pmsg)) |
278 | pmsg = (struct GNUNET_PSYC_MessageHeader *) &dcsn[1]; | 301 | pmsg = (struct GNUNET_PSYC_Message *) &dcsn[1]; |
279 | 302 | ||
280 | struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); | ||
281 | if (NULL != slv->join_dcsn_cb) | 303 | if (NULL != slv->join_dcsn_cb) |
282 | slv->join_dcsn_cb (slv->cb_cls, ntohl (dcsn->is_admitted), pmsg); | 304 | slv->join_dcsn_cb (slv->cb_cls, dcsn, ntohl (dcsn->is_admitted), pmsg); |
283 | } | 305 | } |
284 | 306 | ||
285 | 307 | ||
@@ -299,7 +321,7 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler master_handlers[] = | |||
299 | 321 | ||
300 | { &master_recv_join_request, NULL, | 322 | { &master_recv_join_request, NULL, |
301 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST, | 323 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST, |
302 | sizeof (struct MasterJoinRequest), GNUNET_YES }, | 324 | sizeof (struct GNUNET_PSYC_JoinRequestMessage), GNUNET_YES }, |
303 | 325 | ||
304 | { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, | 326 | { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, |
305 | 327 | ||
@@ -331,6 +353,35 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler slave_handlers[] = | |||
331 | }; | 353 | }; |
332 | 354 | ||
333 | 355 | ||
356 | static void | ||
357 | channel_cleanup (struct GNUNET_PSYC_Channel *chn) | ||
358 | { | ||
359 | GNUNET_PSYC_transmit_destroy (chn->tmit); | ||
360 | GNUNET_PSYC_receive_destroy (chn->recv); | ||
361 | GNUNET_free (chn->connect_msg); | ||
362 | if (NULL != chn->disconnect_cb) | ||
363 | chn->disconnect_cb (chn->disconnect_cls); | ||
364 | } | ||
365 | |||
366 | |||
367 | static void | ||
368 | master_cleanup (void *cls) | ||
369 | { | ||
370 | struct GNUNET_PSYC_Master *mst = cls; | ||
371 | channel_cleanup (&mst->chn); | ||
372 | GNUNET_free (mst); | ||
373 | } | ||
374 | |||
375 | |||
376 | static void | ||
377 | slave_cleanup (void *cls) | ||
378 | { | ||
379 | struct GNUNET_PSYC_Slave *slv = cls; | ||
380 | channel_cleanup (&slv->chn); | ||
381 | GNUNET_free (slv); | ||
382 | } | ||
383 | |||
384 | |||
334 | /** | 385 | /** |
335 | * Start a PSYC master channel. | 386 | * Start a PSYC master channel. |
336 | * | 387 | * |
@@ -367,6 +418,7 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
367 | GNUNET_PSYC_MasterStartCallback start_cb, | 418 | GNUNET_PSYC_MasterStartCallback start_cb, |
368 | GNUNET_PSYC_JoinRequestCallback join_request_cb, | 419 | GNUNET_PSYC_JoinRequestCallback join_request_cb, |
369 | GNUNET_PSYC_MessageCallback message_cb, | 420 | GNUNET_PSYC_MessageCallback message_cb, |
421 | GNUNET_PSYC_MessagePartCallback message_part_cb, | ||
370 | void *cls) | 422 | void *cls) |
371 | { | 423 | { |
372 | struct GNUNET_PSYC_Master *mst = GNUNET_malloc (sizeof (*mst)); | 424 | struct GNUNET_PSYC_Master *mst = GNUNET_malloc (sizeof (*mst)); |
@@ -390,7 +442,7 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
390 | GNUNET_CLIENT_MANAGER_set_user_context_ (chn->client, mst, sizeof (*chn)); | 442 | GNUNET_CLIENT_MANAGER_set_user_context_ (chn->client, mst, sizeof (*chn)); |
391 | 443 | ||
392 | chn->tmit = GNUNET_PSYC_transmit_create (chn->client); | 444 | chn->tmit = GNUNET_PSYC_transmit_create (chn->client); |
393 | chn->recv = GNUNET_PSYC_receive_create (message_cb, message_cb, cls); | 445 | chn->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls); |
394 | 446 | ||
395 | channel_send_connect_msg (chn); | 447 | channel_send_connect_msg (chn); |
396 | return mst; | 448 | return mst; |
@@ -404,10 +456,21 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
404 | * @param keep_active FIXME | 456 | * @param keep_active FIXME |
405 | */ | 457 | */ |
406 | void | 458 | void |
407 | GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *mst) | 459 | GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *mst, |
460 | int keep_active, | ||
461 | GNUNET_ContinuationCallback stop_cb, | ||
462 | void *stop_cls) | ||
408 | { | 463 | { |
409 | GNUNET_CLIENT_MANAGER_disconnect (mst->chn.client, GNUNET_YES); | 464 | struct GNUNET_PSYC_Channel *chn = &mst->chn; |
410 | GNUNET_free (mst); | 465 | |
466 | /* FIXME: send msg to service */ | ||
467 | |||
468 | chn->is_disconnecting = GNUNET_YES; | ||
469 | chn->disconnect_cb = stop_cb; | ||
470 | chn->disconnect_cls = stop_cls; | ||
471 | |||
472 | GNUNET_CLIENT_MANAGER_disconnect (mst->chn.client, GNUNET_YES, | ||
473 | &master_cleanup, mst); | ||
411 | } | 474 | } |
412 | 475 | ||
413 | 476 | ||
@@ -439,7 +502,7 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, | |||
439 | int is_admitted, | 502 | int is_admitted, |
440 | uint32_t relay_count, | 503 | uint32_t relay_count, |
441 | const struct GNUNET_PeerIdentity *relays, | 504 | const struct GNUNET_PeerIdentity *relays, |
442 | const struct GNUNET_PSYC_MessageHeader *join_resp) | 505 | const struct GNUNET_PSYC_Message *join_resp) |
443 | { | 506 | { |
444 | struct GNUNET_PSYC_Channel *chn = &jh->mst->chn; | 507 | struct GNUNET_PSYC_Channel *chn = &jh->mst->chn; |
445 | struct GNUNET_PSYC_JoinDecisionMessage *dcsn; | 508 | struct GNUNET_PSYC_JoinDecisionMessage *dcsn; |
@@ -461,6 +524,7 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, | |||
461 | memcpy (&dcsn[1], join_resp, join_resp_size); | 524 | memcpy (&dcsn[1], join_resp, join_resp_size); |
462 | 525 | ||
463 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &dcsn->header); | 526 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &dcsn->header); |
527 | GNUNET_free (jh); | ||
464 | return GNUNET_OK; | 528 | return GNUNET_OK; |
465 | } | 529 | } |
466 | 530 | ||
@@ -576,15 +640,19 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
576 | uint32_t relay_count, | 640 | uint32_t relay_count, |
577 | const struct GNUNET_PeerIdentity *relays, | 641 | const struct GNUNET_PeerIdentity *relays, |
578 | GNUNET_PSYC_MessageCallback message_cb, | 642 | GNUNET_PSYC_MessageCallback message_cb, |
643 | GNUNET_PSYC_MessagePartCallback message_part_cb, | ||
579 | GNUNET_PSYC_SlaveConnectCallback connect_cb, | 644 | GNUNET_PSYC_SlaveConnectCallback connect_cb, |
580 | GNUNET_PSYC_JoinDecisionCallback join_decision_cb, | 645 | GNUNET_PSYC_JoinDecisionCallback join_decision_cb, |
581 | void *cls, | 646 | void *cls, |
582 | const struct GNUNET_MessageHeader *join_msg) | 647 | const struct GNUNET_PSYC_Message *join_msg) |
583 | { | 648 | { |
584 | struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv)); | 649 | struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv)); |
585 | struct GNUNET_PSYC_Channel *chn = &slv->chn; | 650 | struct GNUNET_PSYC_Channel *chn = &slv->chn; |
651 | |||
652 | uint16_t relay_size = relay_count * sizeof (*relays); | ||
653 | uint16_t join_msg_size = ntohs (join_msg->header.size); | ||
586 | struct SlaveJoinRequest *req | 654 | struct SlaveJoinRequest *req |
587 | = GNUNET_malloc (sizeof (*req) + relay_count * sizeof (*relays)); | 655 | = GNUNET_malloc (sizeof (*req) + relay_size + join_msg_size); |
588 | req->header.size = htons (sizeof (*req) | 656 | req->header.size = htons (sizeof (*req) |
589 | + relay_count * sizeof (*relays)); | 657 | + relay_count * sizeof (*relays)); |
590 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN); | 658 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN); |
@@ -592,7 +660,12 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
592 | req->slave_key = *slave_key; | 660 | req->slave_key = *slave_key; |
593 | req->origin = *origin; | 661 | req->origin = *origin; |
594 | req->relay_count = htonl (relay_count); | 662 | req->relay_count = htonl (relay_count); |
595 | memcpy (&req[1], relays, relay_count * sizeof (*relays)); | 663 | |
664 | if (0 < relay_size) | ||
665 | memcpy (&req[1], relays, relay_size); | ||
666 | |||
667 | if (0 < join_msg_size) | ||
668 | memcpy ((char *) &req[1] + relay_size, join_msg, join_msg_size); | ||
596 | 669 | ||
597 | chn->connect_msg = (struct GNUNET_MessageHeader *) req; | 670 | chn->connect_msg = (struct GNUNET_MessageHeader *) req; |
598 | chn->cfg = cfg; | 671 | chn->cfg = cfg; |
@@ -605,7 +678,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
605 | chn->client = GNUNET_CLIENT_MANAGER_connect (cfg, "psyc", slave_handlers); | 678 | chn->client = GNUNET_CLIENT_MANAGER_connect (cfg, "psyc", slave_handlers); |
606 | GNUNET_CLIENT_MANAGER_set_user_context_ (chn->client, slv, sizeof (*chn)); | 679 | GNUNET_CLIENT_MANAGER_set_user_context_ (chn->client, slv, sizeof (*chn)); |
607 | 680 | ||
608 | chn->recv = GNUNET_PSYC_receive_create (message_cb, message_cb, cls); | 681 | chn->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls); |
609 | chn->tmit = GNUNET_PSYC_transmit_create (chn->client); | 682 | chn->tmit = GNUNET_PSYC_transmit_create (chn->client); |
610 | 683 | ||
611 | channel_send_connect_msg (chn); | 684 | channel_send_connect_msg (chn); |
@@ -622,10 +695,21 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
622 | * @param slave Slave handle. | 695 | * @param slave Slave handle. |
623 | */ | 696 | */ |
624 | void | 697 | void |
625 | GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slv) | 698 | GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slv, |
699 | int keep_active, | ||
700 | GNUNET_ContinuationCallback part_cb, | ||
701 | void *part_cls) | ||
626 | { | 702 | { |
627 | GNUNET_CLIENT_MANAGER_disconnect (slv->chn.client, GNUNET_YES); | 703 | struct GNUNET_PSYC_Channel *chn = &slv->chn; |
628 | GNUNET_free (slv); | 704 | |
705 | /* FIXME: send msg to service */ | ||
706 | |||
707 | chn->is_disconnecting = GNUNET_YES; | ||
708 | chn->disconnect_cb = part_cb; | ||
709 | chn->disconnect_cls = part_cls; | ||
710 | |||
711 | GNUNET_CLIENT_MANAGER_disconnect (slv->chn.client, GNUNET_YES, | ||
712 | &slave_cleanup, slv); | ||
629 | } | 713 | } |
630 | 714 | ||
631 | 715 | ||
@@ -796,6 +880,7 @@ GNUNET_PSYC_channel_story_tell (struct GNUNET_PSYC_Channel *channel, | |||
796 | uint64_t start_message_id, | 880 | uint64_t start_message_id, |
797 | uint64_t end_message_id, | 881 | uint64_t end_message_id, |
798 | GNUNET_PSYC_MessageCallback message_cb, | 882 | GNUNET_PSYC_MessageCallback message_cb, |
883 | GNUNET_PSYC_MessagePartCallback message_part_cb, | ||
799 | GNUNET_PSYC_FinishCallback finish_cb, | 884 | GNUNET_PSYC_FinishCallback finish_cb, |
800 | void *cls) | 885 | void *cls) |
801 | { | 886 | { |
diff --git a/src/psyc/psyc_util_lib.c b/src/psyc/psyc_util_lib.c index 8ab0bdad2..6177976de 100644 --- a/src/psyc/psyc_util_lib.c +++ b/src/psyc/psyc_util_lib.c | |||
@@ -76,7 +76,11 @@ struct GNUNET_PSYC_TransmitHandle | |||
76 | * | 76 | * |
77 | */ | 77 | */ |
78 | const char *mod_value; | 78 | const char *mod_value; |
79 | size_t mod_value_size; | 79 | |
80 | /** | ||
81 | * Number of bytes remaining to be transmitted from the current modifier value. | ||
82 | */ | ||
83 | uint32_t mod_value_remaining; | ||
80 | 84 | ||
81 | /** | 85 | /** |
82 | * State of the current message being received from client. | 86 | * State of the current message being received from client. |
@@ -104,14 +108,14 @@ struct GNUNET_PSYC_TransmitHandle | |||
104 | struct GNUNET_PSYC_ReceiveHandle | 108 | struct GNUNET_PSYC_ReceiveHandle |
105 | { | 109 | { |
106 | /** | 110 | /** |
107 | * Message part callback. | 111 | * Message callback. |
108 | */ | 112 | */ |
109 | GNUNET_PSYC_MessageCallback message_cb; | 113 | GNUNET_PSYC_MessageCallback message_cb; |
110 | 114 | ||
111 | /** | 115 | /** |
112 | * Message part callback for historic message. | 116 | * Message part callback. |
113 | */ | 117 | */ |
114 | GNUNET_PSYC_MessageCallback hist_message_cb; | 118 | GNUNET_PSYC_MessagePartCallback message_part_cb; |
115 | 119 | ||
116 | /** | 120 | /** |
117 | * Closure for the callbacks. | 121 | * Closure for the callbacks. |
@@ -149,6 +153,7 @@ struct GNUNET_PSYC_ReceiveHandle | |||
149 | uint32_t mod_value_size; | 153 | uint32_t mod_value_size; |
150 | }; | 154 | }; |
151 | 155 | ||
156 | |||
152 | /**** Messages ****/ | 157 | /**** Messages ****/ |
153 | 158 | ||
154 | 159 | ||
@@ -167,7 +172,7 @@ struct GNUNET_PSYC_ReceiveHandle | |||
167 | * @return Message header with size information, | 172 | * @return Message header with size information, |
168 | * followed by the message parts. | 173 | * followed by the message parts. |
169 | */ | 174 | */ |
170 | struct GNUNET_MessageHeader * | 175 | struct GNUNET_PSYC_Message * |
171 | GNUNET_PSYC_message_create (const char *method_name, | 176 | GNUNET_PSYC_message_create (const char *method_name, |
172 | const struct GNUNET_ENV_Environment *env, | 177 | const struct GNUNET_ENV_Environment *env, |
173 | const void *data, | 178 | const void *data, |
@@ -188,7 +193,7 @@ GNUNET_PSYC_message_create (const char *method_name, | |||
188 | } | 193 | } |
189 | } | 194 | } |
190 | 195 | ||
191 | struct GNUNET_MessageHeader *msg; | 196 | struct GNUNET_PSYC_Message *msg; |
192 | uint16_t method_name_size = strlen (method_name) + 1; | 197 | uint16_t method_name_size = strlen (method_name) + 1; |
193 | if (method_name_size == 1) | 198 | if (method_name_size == 1) |
194 | return NULL; | 199 | return NULL; |
@@ -199,12 +204,13 @@ GNUNET_PSYC_message_create (const char *method_name, | |||
199 | + ((0 < data_size) ? sizeof (*pmsg) + data_size : 0)/* data */ | 204 | + ((0 < data_size) ? sizeof (*pmsg) + data_size : 0)/* data */ |
200 | + sizeof (*pmsg); /* end of message */ | 205 | + sizeof (*pmsg); /* end of message */ |
201 | msg = GNUNET_malloc (msg_size); | 206 | msg = GNUNET_malloc (msg_size); |
202 | msg->size = htons (msg_size); | 207 | msg->header.size = htons (msg_size); |
203 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | 208 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); /* FIXME */ |
204 | 209 | ||
205 | pmeth = (struct GNUNET_PSYC_MessageMethod *) &msg[1]; | 210 | pmeth = (struct GNUNET_PSYC_MessageMethod *) &msg[1]; |
211 | pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); | ||
206 | pmeth->header.size = htons (sizeof (*pmeth) + method_name_size); | 212 | pmeth->header.size = htons (sizeof (*pmeth) + method_name_size); |
207 | memcpy (pmeth, method_name, method_name_size); | 213 | memcpy (&pmeth[1], method_name, method_name_size); |
208 | 214 | ||
209 | uint16_t p = sizeof (*msg) + sizeof (*pmeth) + method_name_size; | 215 | uint16_t p = sizeof (*msg) + sizeof (*pmeth) + method_name_size; |
210 | if (NULL != env) | 216 | if (NULL != env) |
@@ -215,7 +221,7 @@ GNUNET_PSYC_message_create (const char *method_name, | |||
215 | uint16_t mod_name_size = strlen (mod->name) + 1; | 221 | uint16_t mod_name_size = strlen (mod->name) + 1; |
216 | pmod = (struct GNUNET_PSYC_MessageModifier *) ((char *) msg + p); | 222 | pmod = (struct GNUNET_PSYC_MessageModifier *) ((char *) msg + p); |
217 | pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); | 223 | pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); |
218 | pmod->header.size = sizeof (*pmod) + mod_name_size + 1 + mod->value_size; | 224 | pmod->header.size = sizeof (*pmod) + mod_name_size + mod->value_size; |
219 | p += pmod->header.size; | 225 | p += pmod->header.size; |
220 | pmod->header.size = htons (pmod->header.size); | 226 | pmod->header.size = htons (pmod->header.size); |
221 | 227 | ||
@@ -241,6 +247,7 @@ GNUNET_PSYC_message_create (const char *method_name, | |||
241 | pmsg->size = htons (sizeof (*pmsg)); | 247 | pmsg->size = htons (sizeof (*pmsg)); |
242 | pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END); | 248 | pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END); |
243 | 249 | ||
250 | GNUNET_assert (p + sizeof (*pmsg) == msg_size); | ||
244 | return msg; | 251 | return msg; |
245 | } | 252 | } |
246 | 253 | ||
@@ -276,8 +283,8 @@ GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind, | |||
276 | uint16_t name_size = ntohs (mod->name_size); | 283 | uint16_t name_size = ntohs (mod->name_size); |
277 | char oper = ' ' < mod->oper ? mod->oper : ' '; | 284 | char oper = ' ' < mod->oper ? mod->oper : ' '; |
278 | GNUNET_log (kind, "\t%c%.*s\t%.*s\n", oper, name_size, &mod[1], | 285 | GNUNET_log (kind, "\t%c%.*s\t%.*s\n", oper, name_size, &mod[1], |
279 | size - sizeof (*mod) - name_size - 1, | 286 | size - sizeof (*mod) - name_size, |
280 | ((char *) &mod[1]) + name_size + 1); | 287 | ((char *) &mod[1]) + name_size); |
281 | break; | 288 | break; |
282 | } | 289 | } |
283 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: | 290 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: |
@@ -331,7 +338,7 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit, | |||
331 | uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0; | 338 | uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0; |
332 | 339 | ||
333 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 340 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
334 | "Queueing message of type %u and size %u (end: %u)).\n", | 341 | "Queueing message part of type %u and size %u (end: %u)).\n", |
335 | ntohs (msg->type), size, end); | 342 | ntohs (msg->type), size, end); |
336 | 343 | ||
337 | if (NULL != tmit->msg) | 344 | if (NULL != tmit->msg) |
@@ -396,6 +403,9 @@ transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit) | |||
396 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); | 403 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); |
397 | 404 | ||
398 | int notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]); | 405 | int notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]); |
406 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
407 | "transmit_data (ret: %d, size: %u): %.*s\n", | ||
408 | notify_ret, data_size, data_size, &msg[1]); | ||
399 | switch (notify_ret) | 409 | switch (notify_ret) |
400 | { | 410 | { |
401 | case GNUNET_NO: | 411 | case GNUNET_NO: |
@@ -463,9 +473,15 @@ transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit) | |||
463 | msg->size = sizeof (struct GNUNET_PSYC_MessageModifier); | 473 | msg->size = sizeof (struct GNUNET_PSYC_MessageModifier); |
464 | notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1], | 474 | notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1], |
465 | &mod->oper, &mod->value_size); | 475 | &mod->oper, &mod->value_size); |
466 | mod->name_size = strnlen ((char *) &mod[1], data_size); | 476 | |
477 | mod->name_size = strnlen ((char *) &mod[1], data_size) + 1; | ||
478 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
479 | "transmit_mod (ret: %d, size: %u + %u): %.*s\n", | ||
480 | notify_ret, mod->name_size, mod->value_size, data_size, &mod[1]); | ||
467 | if (mod->name_size < data_size) | 481 | if (mod->name_size < data_size) |
468 | { | 482 | { |
483 | tmit->mod_value_remaining | ||
484 | = mod->value_size - (data_size - mod->name_size); | ||
469 | mod->value_size = htonl (mod->value_size); | 485 | mod->value_size = htonl (mod->value_size); |
470 | mod->name_size = htons (mod->name_size); | 486 | mod->name_size = htons (mod->name_size); |
471 | } | 487 | } |
@@ -483,6 +499,10 @@ transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit) | |||
483 | msg->size = sizeof (struct GNUNET_MessageHeader); | 499 | msg->size = sizeof (struct GNUNET_MessageHeader); |
484 | notify_ret = tmit->notify_mod (tmit->notify_mod_cls, | 500 | notify_ret = tmit->notify_mod (tmit->notify_mod_cls, |
485 | &data_size, &msg[1], NULL, NULL); | 501 | &data_size, &msg[1], NULL, NULL); |
502 | tmit->mod_value_remaining -= data_size; | ||
503 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
504 | "transmit_mod (ret: %d, size: %u): %.*s\n", | ||
505 | notify_ret, data_size, data_size, &msg[1]); | ||
486 | break; | 506 | break; |
487 | } | 507 | } |
488 | default: | 508 | default: |
@@ -497,26 +517,19 @@ transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit) | |||
497 | tmit->paused = GNUNET_YES; | 517 | tmit->paused = GNUNET_YES; |
498 | return; | 518 | return; |
499 | } | 519 | } |
500 | tmit->state = GNUNET_PSYC_MESSAGE_STATE_MOD_CONT; | 520 | tmit->state |
521 | = (0 == tmit->mod_value_remaining) | ||
522 | ? GNUNET_PSYC_MESSAGE_STATE_MODIFIER | ||
523 | : GNUNET_PSYC_MESSAGE_STATE_MOD_CONT; | ||
501 | break; | 524 | break; |
502 | 525 | ||
503 | case GNUNET_YES: | 526 | case GNUNET_YES: /* End of modifiers. */ |
504 | if (0 == data_size) | 527 | GNUNET_assert (0 == tmit->mod_value_remaining); |
505 | { | ||
506 | /* End of modifiers. */ | ||
507 | tmit->state = GNUNET_PSYC_MESSAGE_STATE_DATA; | ||
508 | if (0 == tmit->acks_pending) | ||
509 | transmit_data (tmit); | ||
510 | |||
511 | return; | ||
512 | } | ||
513 | tmit->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER; | ||
514 | break; | 528 | break; |
515 | 529 | ||
516 | default: | 530 | default: |
517 | LOG (GNUNET_ERROR_TYPE_ERROR, | 531 | LOG (GNUNET_ERROR_TYPE_ERROR, |
518 | "TransmitNotifyModifier callback returned error " | 532 | "TransmitNotifyModifier callback returned with error.\n"); |
519 | "when requesting a modifier.\n"); | ||
520 | 533 | ||
521 | tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL; | 534 | tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL; |
522 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); | 535 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); |
@@ -533,7 +546,16 @@ transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit) | |||
533 | transmit_queue_insert (tmit, msg, GNUNET_NO); | 546 | transmit_queue_insert (tmit, msg, GNUNET_NO); |
534 | } | 547 | } |
535 | 548 | ||
536 | transmit_mod (tmit); | 549 | if (GNUNET_YES == notify_ret) |
550 | { | ||
551 | tmit->state = GNUNET_PSYC_MESSAGE_STATE_DATA; | ||
552 | if (0 == tmit->acks_pending) | ||
553 | transmit_data (tmit); | ||
554 | } | ||
555 | else | ||
556 | { | ||
557 | transmit_mod (tmit); | ||
558 | } | ||
537 | } | 559 | } |
538 | 560 | ||
539 | 561 | ||
@@ -547,9 +569,10 @@ transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper, | |||
547 | size_t value_size = 0; | 569 | size_t value_size = 0; |
548 | const char *value = NULL; | 570 | const char *value = NULL; |
549 | 571 | ||
550 | if (NULL != oper && NULL != tmit->mod) | 572 | if (NULL != oper) |
551 | { /* New modifier */ | 573 | { /* New modifier */ |
552 | tmit->mod = tmit->mod->next; | 574 | if (NULL != tmit->mod) |
575 | tmit->mod = tmit->mod->next; | ||
553 | if (NULL == tmit->mod) | 576 | if (NULL == tmit->mod) |
554 | { /* No more modifiers, continue with data */ | 577 | { /* No more modifiers, continue with data */ |
555 | *data_size = 0; | 578 | *data_size = 0; |
@@ -559,30 +582,29 @@ transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper, | |||
559 | GNUNET_assert (tmit->mod->value_size < UINT32_MAX); | 582 | GNUNET_assert (tmit->mod->value_size < UINT32_MAX); |
560 | *full_value_size = tmit->mod->value_size; | 583 | *full_value_size = tmit->mod->value_size; |
561 | *oper = tmit->mod->oper; | 584 | *oper = tmit->mod->oper; |
562 | name_size = strlen (tmit->mod->name); | 585 | name_size = strlen (tmit->mod->name) + 1; |
563 | 586 | ||
564 | if (name_size + 1 + tmit->mod->value_size <= *data_size) | 587 | if (name_size + tmit->mod->value_size <= *data_size) |
565 | { | 588 | { |
566 | *data_size = name_size + 1 + tmit->mod->value_size; | 589 | *data_size = name_size + tmit->mod->value_size; |
567 | } | 590 | } |
568 | else | 591 | else |
569 | { | 592 | { |
570 | tmit->mod_value_size = tmit->mod->value_size; | 593 | value_size = *data_size - name_size; |
571 | value_size = *data_size - name_size - 1; | ||
572 | tmit->mod_value_size -= value_size; | ||
573 | tmit->mod_value = tmit->mod->value + value_size; | 594 | tmit->mod_value = tmit->mod->value + value_size; |
574 | } | 595 | } |
575 | 596 | ||
576 | memcpy (data, tmit->mod->name, name_size); | 597 | memcpy (data, tmit->mod->name, name_size); |
577 | ((char *)data)[name_size] = '\0'; | 598 | memcpy ((char *)data + name_size, tmit->mod->value, value_size); |
578 | memcpy ((char *)data + name_size + 1, tmit->mod->value, value_size); | 599 | return GNUNET_NO; |
579 | } | 600 | } |
580 | else if (NULL != tmit->mod_value && 0 < tmit->mod_value_size) | 601 | else |
581 | { /* Modifier continuation */ | 602 | { /* Modifier continuation */ |
603 | GNUNET_assert (NULL != tmit->mod_value && 0 < tmit->mod_value_remaining); | ||
582 | value = tmit->mod_value; | 604 | value = tmit->mod_value; |
583 | if (tmit->mod_value_size <= *data_size) | 605 | if (tmit->mod_value_remaining <= *data_size) |
584 | { | 606 | { |
585 | value_size = tmit->mod_value_size; | 607 | value_size = tmit->mod_value_remaining; |
586 | tmit->mod_value = NULL; | 608 | tmit->mod_value = NULL; |
587 | } | 609 | } |
588 | else | 610 | else |
@@ -590,7 +612,6 @@ transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper, | |||
590 | value_size = *data_size; | 612 | value_size = *data_size; |
591 | tmit->mod_value += value_size; | 613 | tmit->mod_value += value_size; |
592 | } | 614 | } |
593 | tmit->mod_value_size -= value_size; | ||
594 | 615 | ||
595 | if (*data_size < value_size) | 616 | if (*data_size < value_size) |
596 | { | 617 | { |
@@ -603,9 +624,8 @@ transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper, | |||
603 | 624 | ||
604 | *data_size = value_size; | 625 | *data_size = value_size; |
605 | memcpy (data, value, value_size); | 626 | memcpy (data, value, value_size); |
627 | return (tmit->mod_value = NULL) ? GNUNET_YES : GNUNET_NO; | ||
606 | } | 628 | } |
607 | |||
608 | return 0 == tmit->mod_value_size ? GNUNET_YES : GNUNET_NO; | ||
609 | } | 629 | } |
610 | 630 | ||
611 | 631 | ||
@@ -663,10 +683,16 @@ GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit, | |||
663 | { | 683 | { |
664 | tmit->notify_mod = &transmit_notify_env; | 684 | tmit->notify_mod = &transmit_notify_env; |
665 | tmit->notify_mod_cls = tmit; | 685 | tmit->notify_mod_cls = tmit; |
666 | tmit->mod | 686 | if (NULL != env) |
667 | = (NULL != env) | 687 | { |
668 | ? GNUNET_ENV_environment_head (env) | 688 | struct GNUNET_ENV_Modifier mod = {}; |
669 | : NULL; | 689 | mod.next = GNUNET_ENV_environment_head (env); |
690 | tmit->mod = &mod; | ||
691 | } | ||
692 | else | ||
693 | { | ||
694 | tmit->mod = NULL; | ||
695 | } | ||
670 | } | 696 | } |
671 | 697 | ||
672 | transmit_mod (tmit); | 698 | transmit_mod (tmit); |
@@ -762,12 +788,12 @@ GNUNET_PSYC_transmit_got_ack (struct GNUNET_PSYC_TransmitHandle *tmit) | |||
762 | */ | 788 | */ |
763 | struct GNUNET_PSYC_ReceiveHandle * | 789 | struct GNUNET_PSYC_ReceiveHandle * |
764 | GNUNET_PSYC_receive_create (GNUNET_PSYC_MessageCallback message_cb, | 790 | GNUNET_PSYC_receive_create (GNUNET_PSYC_MessageCallback message_cb, |
765 | GNUNET_PSYC_MessageCallback hist_message_cb, | 791 | GNUNET_PSYC_MessagePartCallback message_part_cb, |
766 | void *cb_cls) | 792 | void *cb_cls) |
767 | { | 793 | { |
768 | struct GNUNET_PSYC_ReceiveHandle *recv = GNUNET_malloc (sizeof (*recv)); | 794 | struct GNUNET_PSYC_ReceiveHandle *recv = GNUNET_malloc (sizeof (*recv)); |
769 | recv->message_cb = message_cb; | 795 | recv->message_cb = message_cb; |
770 | recv->hist_message_cb = hist_message_cb; | 796 | recv->message_part_cb = message_part_cb; |
771 | recv->cb_cls = cb_cls; | 797 | recv->cb_cls = cb_cls; |
772 | return recv; | 798 | return recv; |
773 | } | 799 | } |
@@ -800,13 +826,11 @@ GNUNET_PSYC_receive_reset (struct GNUNET_PSYC_ReceiveHandle *recv) | |||
800 | static void | 826 | static void |
801 | recv_error (struct GNUNET_PSYC_ReceiveHandle *recv) | 827 | recv_error (struct GNUNET_PSYC_ReceiveHandle *recv) |
802 | { | 828 | { |
803 | GNUNET_PSYC_MessageCallback message_cb | 829 | if (NULL != recv->message_part_cb) |
804 | = recv->flags & GNUNET_PSYC_MESSAGE_HISTORIC | 830 | recv->message_part_cb (recv->cb_cls, recv->message_id, 0, recv->flags, NULL); |
805 | ? recv->hist_message_cb | ||
806 | : recv->message_cb; | ||
807 | 831 | ||
808 | if (NULL != message_cb) | 832 | if (NULL != recv->message_cb) |
809 | message_cb (recv->cb_cls, recv->message_id, recv->flags, NULL); | 833 | recv->message_cb (recv->cb_cls, recv->message_id, recv->flags, NULL); |
810 | 834 | ||
811 | GNUNET_PSYC_receive_reset (recv); | 835 | GNUNET_PSYC_receive_reset (recv); |
812 | } | 836 | } |
@@ -827,6 +851,7 @@ GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv, | |||
827 | { | 851 | { |
828 | uint16_t size = ntohs (msg->header.size); | 852 | uint16_t size = ntohs (msg->header.size); |
829 | uint32_t flags = ntohl (msg->flags); | 853 | uint32_t flags = ntohl (msg->flags); |
854 | uint64_t message_id; | ||
830 | 855 | ||
831 | GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, | 856 | GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, |
832 | (struct GNUNET_MessageHeader *) msg); | 857 | (struct GNUNET_MessageHeader *) msg); |
@@ -858,6 +883,7 @@ GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv, | |||
858 | recv_error (recv); | 883 | recv_error (recv); |
859 | return GNUNET_SYSERR; | 884 | return GNUNET_SYSERR; |
860 | } | 885 | } |
886 | message_id = recv->message_id; | ||
861 | 887 | ||
862 | uint16_t pos = 0, psize = 0, ptype, size_eq, size_min; | 888 | uint16_t pos = 0, psize = 0, ptype, size_eq, size_min; |
863 | 889 | ||
@@ -964,10 +990,10 @@ GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv, | |||
964 | 990 | ||
965 | uint16_t name_size = ntohs (mod->name_size); | 991 | uint16_t name_size = ntohs (mod->name_size); |
966 | recv->mod_value_size_expected = ntohl (mod->value_size); | 992 | recv->mod_value_size_expected = ntohl (mod->value_size); |
967 | recv->mod_value_size = psize - sizeof (*mod) - name_size - 1; | 993 | recv->mod_value_size = psize - sizeof (*mod) - name_size; |
968 | 994 | ||
969 | if (psize < sizeof (*mod) + name_size + 1 | 995 | if (psize < sizeof (*mod) + name_size |
970 | || '\0' != *((char *) &mod[1] + name_size) | 996 | || '\0' != *((char *) &mod[1] + name_size - 1) |
971 | || recv->mod_value_size_expected < recv->mod_value_size) | 997 | || recv->mod_value_size_expected < recv->mod_value_size) |
972 | { | 998 | { |
973 | LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n"); | 999 | LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n"); |
@@ -1018,13 +1044,9 @@ GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv, | |||
1018 | } | 1044 | } |
1019 | } | 1045 | } |
1020 | 1046 | ||
1021 | GNUNET_PSYC_MessageCallback message_cb | 1047 | if (NULL != recv->message_part_cb) |
1022 | = recv->flags & GNUNET_PSYC_MESSAGE_HISTORIC | 1048 | recv->message_part_cb (recv->cb_cls, recv->message_id, 0, // FIXME: data_offset |
1023 | ? recv->hist_message_cb | 1049 | recv->flags, pmsg); |
1024 | : recv->message_cb; | ||
1025 | |||
1026 | if (NULL != message_cb) | ||
1027 | message_cb (recv->cb_cls, recv->message_id, recv->flags, pmsg); | ||
1028 | 1050 | ||
1029 | switch (ptype) | 1051 | switch (ptype) |
1030 | { | 1052 | { |
@@ -1034,6 +1056,9 @@ GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv, | |||
1034 | break; | 1056 | break; |
1035 | } | 1057 | } |
1036 | } | 1058 | } |
1059 | |||
1060 | if (NULL != recv->message_cb) | ||
1061 | recv->message_cb (recv->cb_cls, message_id, flags, msg); | ||
1037 | return GNUNET_OK; | 1062 | return GNUNET_OK; |
1038 | } | 1063 | } |
1039 | 1064 | ||
@@ -1063,6 +1088,7 @@ GNUNET_PSYC_receive_check_parts (uint16_t data_size, const char *data, | |||
1063 | for (pos = 0; pos < data_size; pos += psize, parts++) | 1088 | for (pos = 0; pos < data_size; pos += psize, parts++) |
1064 | { | 1089 | { |
1065 | pmsg = (const struct GNUNET_MessageHeader *) (data + pos); | 1090 | pmsg = (const struct GNUNET_MessageHeader *) (data + pos); |
1091 | GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg); | ||
1066 | psize = ntohs (pmsg->size); | 1092 | psize = ntohs (pmsg->size); |
1067 | ptype = ntohs (pmsg->type); | 1093 | ptype = ntohs (pmsg->type); |
1068 | if (0 == parts && NULL != first_ptype) | 1094 | if (0 == parts && NULL != first_ptype) |
@@ -1084,3 +1110,111 @@ GNUNET_PSYC_receive_check_parts (uint16_t data_size, const char *data, | |||
1084 | } | 1110 | } |
1085 | return parts; | 1111 | return parts; |
1086 | } | 1112 | } |
1113 | |||
1114 | |||
1115 | struct ParseMessageClosure | ||
1116 | { | ||
1117 | struct GNUNET_ENV_Environment *env; | ||
1118 | const char **method_name; | ||
1119 | const void **data; | ||
1120 | uint16_t *data_size; | ||
1121 | enum GNUNET_PSYC_MessageState msg_state; | ||
1122 | }; | ||
1123 | |||
1124 | |||
1125 | static void | ||
1126 | parse_message_part_cb (void *cls, uint64_t message_id, uint64_t data_offset, | ||
1127 | uint32_t flags, const struct GNUNET_MessageHeader *msg) | ||
1128 | { | ||
1129 | struct ParseMessageClosure *pmc = cls; | ||
1130 | if (NULL == msg) | ||
1131 | { | ||
1132 | pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR; | ||
1133 | return; | ||
1134 | } | ||
1135 | |||
1136 | switch (ntohs (msg->type)) | ||
1137 | { | ||
1138 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: | ||
1139 | { | ||
1140 | struct GNUNET_PSYC_MessageMethod * | ||
1141 | pmeth = (struct GNUNET_PSYC_MessageMethod *) msg; | ||
1142 | *pmc->method_name = (const char *) &pmeth[1]; | ||
1143 | pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_METHOD; | ||
1144 | break; | ||
1145 | } | ||
1146 | |||
1147 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | ||
1148 | { | ||
1149 | struct GNUNET_PSYC_MessageModifier * | ||
1150 | pmod = (struct GNUNET_PSYC_MessageModifier *) msg; | ||
1151 | |||
1152 | const char *name = (const char *) &pmod[1]; | ||
1153 | const void *value = name + pmod->name_size; | ||
1154 | GNUNET_ENV_environment_add (pmc->env, pmod->oper, name, value, | ||
1155 | pmod->value_size); | ||
1156 | pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER; | ||
1157 | break; | ||
1158 | } | ||
1159 | |||
1160 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: | ||
1161 | *pmc->data = &msg[1]; | ||
1162 | *pmc->data_size = ntohs (msg->size) - sizeof (*msg); | ||
1163 | pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_DATA; | ||
1164 | break; | ||
1165 | |||
1166 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: | ||
1167 | pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_END; | ||
1168 | break; | ||
1169 | |||
1170 | default: | ||
1171 | pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR; | ||
1172 | } | ||
1173 | } | ||
1174 | |||
1175 | |||
1176 | /** | ||
1177 | * Parse PSYC message. | ||
1178 | * | ||
1179 | * @param msg | ||
1180 | * The PSYC message to parse. | ||
1181 | * @param[out] method_name | ||
1182 | * Pointer to the method name inside @a pmsg. | ||
1183 | * @param env | ||
1184 | * The environment for the message with a list of modifiers. | ||
1185 | * @param[out] data | ||
1186 | * Pointer to data inside @a pmsg. | ||
1187 | * @param[out] data_size | ||
1188 | * Size of @data is written here. | ||
1189 | * | ||
1190 | * @return #GNUNET_OK on success, | ||
1191 | * #GNUNET_SYSERR on parse error. | ||
1192 | */ | ||
1193 | int | ||
1194 | GNUNET_PSYC_message_parse (const struct GNUNET_PSYC_Message *msg, | ||
1195 | const char **method_name, | ||
1196 | struct GNUNET_ENV_Environment *env, | ||
1197 | const void **data, | ||
1198 | uint16_t *data_size) | ||
1199 | { | ||
1200 | struct ParseMessageClosure cls; | ||
1201 | cls.env = env; | ||
1202 | cls.method_name = method_name; | ||
1203 | cls.data = data; | ||
1204 | cls.data_size = data_size; | ||
1205 | |||
1206 | uint16_t msg_size = ntohs (msg->header.size); | ||
1207 | struct GNUNET_PSYC_MessageHeader * | ||
1208 | pmsg = GNUNET_malloc (sizeof (*pmsg) + msg_size - sizeof (*msg)); | ||
1209 | memcpy (&pmsg[1], &msg[1], msg_size - sizeof (*msg)); | ||
1210 | |||
1211 | struct GNUNET_PSYC_ReceiveHandle * | ||
1212 | recv = GNUNET_PSYC_receive_create (NULL, &parse_message_part_cb, &cls); | ||
1213 | GNUNET_PSYC_receive_message (recv, pmsg); | ||
1214 | GNUNET_PSYC_receive_destroy (recv); | ||
1215 | GNUNET_free (pmsg); | ||
1216 | |||
1217 | return (GNUNET_PSYC_MESSAGE_STATE_END == cls.msg_state) | ||
1218 | ? GNUNET_OK | ||
1219 | : GNUNET_SYSERR; | ||
1220 | } | ||
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c index 0077bc9b7..495e3be47 100644 --- a/src/psyc/test_psyc.c +++ b/src/psyc/test_psyc.c | |||
@@ -62,8 +62,6 @@ static struct GNUNET_CRYPTO_EcdsaPrivateKey *slave_key; | |||
62 | static struct GNUNET_CRYPTO_EddsaPublicKey channel_pub_key; | 62 | static struct GNUNET_CRYPTO_EddsaPublicKey channel_pub_key; |
63 | static struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key; | 63 | static struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key; |
64 | 64 | ||
65 | struct GNUNET_PSYC_MasterTransmitHandle *mth; | ||
66 | |||
67 | struct TransmitClosure | 65 | struct TransmitClosure |
68 | { | 66 | { |
69 | struct GNUNET_PSYC_MasterTransmitHandle *mst_tmit; | 67 | struct GNUNET_PSYC_MasterTransmitHandle *mst_tmit; |
@@ -95,6 +93,28 @@ static void | |||
95 | master_transmit (); | 93 | master_transmit (); |
96 | 94 | ||
97 | 95 | ||
96 | void master_stopped (void *cls) | ||
97 | { | ||
98 | if (NULL != tmit) | ||
99 | { | ||
100 | GNUNET_ENV_environment_destroy (tmit->env); | ||
101 | GNUNET_free (tmit); | ||
102 | tmit = NULL; | ||
103 | } | ||
104 | GNUNET_SCHEDULER_shutdown (); | ||
105 | } | ||
106 | |||
107 | void slave_parted (void *cls) | ||
108 | { | ||
109 | if (NULL != mst) | ||
110 | { | ||
111 | GNUNET_PSYC_master_stop (mst, GNUNET_NO, &master_stopped, NULL); | ||
112 | mst = NULL; | ||
113 | } | ||
114 | else | ||
115 | master_stopped (NULL); | ||
116 | } | ||
117 | |||
98 | /** | 118 | /** |
99 | * Clean up all resources used. | 119 | * Clean up all resources used. |
100 | */ | 120 | */ |
@@ -103,21 +123,11 @@ cleanup () | |||
103 | { | 123 | { |
104 | if (NULL != slv) | 124 | if (NULL != slv) |
105 | { | 125 | { |
106 | GNUNET_PSYC_slave_part (slv); | 126 | GNUNET_PSYC_slave_part (slv, GNUNET_NO, &slave_parted, NULL); |
107 | slv = NULL; | 127 | slv = NULL; |
108 | } | 128 | } |
109 | if (NULL != mst) | 129 | else |
110 | { | 130 | slave_parted (NULL); |
111 | GNUNET_PSYC_master_stop (mst); | ||
112 | mst = NULL; | ||
113 | } | ||
114 | if (NULL != tmit) | ||
115 | { | ||
116 | GNUNET_ENV_environment_destroy (tmit->env); | ||
117 | GNUNET_free (tmit); | ||
118 | tmit = NULL; | ||
119 | } | ||
120 | GNUNET_SCHEDULER_shutdown (); | ||
121 | } | 131 | } |
122 | 132 | ||
123 | 133 | ||
@@ -171,7 +181,20 @@ end () | |||
171 | 181 | ||
172 | static void | 182 | static void |
173 | master_message_cb (void *cls, uint64_t message_id, uint32_t flags, | 183 | master_message_cb (void *cls, uint64_t message_id, uint32_t flags, |
174 | const struct GNUNET_MessageHeader *msg) | 184 | const struct GNUNET_PSYC_MessageHeader *msg) |
185 | { | ||
186 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
187 | "Master got PSYC message fragment of size %u " | ||
188 | "belonging to message ID %llu with flags %x\n", | ||
189 | ntohs (msg->header.size), message_id, flags); | ||
190 | // FIXME | ||
191 | } | ||
192 | |||
193 | |||
194 | static void | ||
195 | master_message_part_cb (void *cls, uint64_t message_id, | ||
196 | uint64_t data_offset, uint32_t flags, | ||
197 | const struct GNUNET_MessageHeader *msg) | ||
175 | { | 198 | { |
176 | if (NULL == msg) | 199 | if (NULL == msg) |
177 | { | 200 | { |
@@ -215,7 +238,20 @@ master_message_cb (void *cls, uint64_t message_id, uint32_t flags, | |||
215 | 238 | ||
216 | static void | 239 | static void |
217 | slave_message_cb (void *cls, uint64_t message_id, uint32_t flags, | 240 | slave_message_cb (void *cls, uint64_t message_id, uint32_t flags, |
218 | const struct GNUNET_MessageHeader *msg) | 241 | const struct GNUNET_PSYC_MessageHeader *msg) |
242 | { | ||
243 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
244 | "Slave got PSYC message fragment of size %u " | ||
245 | "belonging to message ID %llu with flags %x\n", | ||
246 | ntohs (msg->header.size), message_id, flags); | ||
247 | // FIXME | ||
248 | } | ||
249 | |||
250 | |||
251 | static void | ||
252 | slave_message_part_cb (void *cls, uint64_t message_id, | ||
253 | uint64_t data_offset, uint32_t flags, | ||
254 | const struct GNUNET_MessageHeader *msg) | ||
219 | { | 255 | { |
220 | if (NULL == msg) | 256 | if (NULL == msg) |
221 | { | 257 | { |
@@ -371,7 +407,7 @@ tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper, | |||
371 | memcpy (data, value, value_size); | 407 | memcpy (data, value, value_size); |
372 | } | 408 | } |
373 | 409 | ||
374 | return 0 == tmit->mod_value_size ? GNUNET_YES : GNUNET_NO; | 410 | return GNUNET_NO; |
375 | } | 411 | } |
376 | 412 | ||
377 | 413 | ||
@@ -380,8 +416,10 @@ slave_join (); | |||
380 | 416 | ||
381 | 417 | ||
382 | static void | 418 | static void |
383 | join_decision_cb (void *cls, int is_admitted, | 419 | join_decision_cb (void *cls, |
384 | const struct GNUNET_PSYC_MessageHeader *join_msg) | 420 | const struct GNUNET_PSYC_JoinDecisionMessage *dcsn, |
421 | int is_admitted, | ||
422 | const struct GNUNET_PSYC_Message *join_msg) | ||
385 | { | 423 | { |
386 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 424 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
387 | "Slave got join decision: %d\n", is_admitted); | 425 | "Slave got join decision: %d\n", is_admitted); |
@@ -415,8 +453,10 @@ join_decision_cb (void *cls, int is_admitted, | |||
415 | 453 | ||
416 | 454 | ||
417 | static void | 455 | static void |
418 | join_request_cb (void *cls, const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, | 456 | join_request_cb (void *cls, |
419 | const struct GNUNET_PSYC_MessageHeader *msg, | 457 | const struct GNUNET_PSYC_JoinRequestMessage *req, |
458 | const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, | ||
459 | const struct GNUNET_PSYC_Message *join_msg, | ||
420 | struct GNUNET_PSYC_JoinHandle *jh) | 460 | struct GNUNET_PSYC_JoinHandle *jh) |
421 | { | 461 | { |
422 | struct GNUNET_HashCode slave_key_hash; | 462 | struct GNUNET_HashCode slave_key_hash; |
@@ -450,11 +490,11 @@ slave_join () | |||
450 | "_foo", "bar baz", 7); | 490 | "_foo", "bar baz", 7); |
451 | GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN, | 491 | GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN, |
452 | "_foo_bar", "foo bar baz", 11); | 492 | "_foo_bar", "foo bar baz", 11); |
453 | struct GNUNET_MessageHeader * | 493 | struct GNUNET_PSYC_Message * |
454 | join_msg = GNUNET_PSYC_message_create ("_request_join", env, "some data", 9); | 494 | join_msg = GNUNET_PSYC_message_create ("_request_join", env, "some data", 9); |
455 | 495 | ||
456 | slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin, | 496 | slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin, 0, NULL, |
457 | 0, NULL, &slave_message_cb, | 497 | &slave_message_cb, &slave_message_part_cb, |
458 | &slave_connect_cb, &join_decision_cb, NULL, | 498 | &slave_connect_cb, &join_decision_cb, NULL, |
459 | join_msg); | 499 | join_msg); |
460 | GNUNET_ENV_environment_destroy (env); | 500 | GNUNET_ENV_environment_destroy (env); |
@@ -551,7 +591,8 @@ run (void *cls, | |||
551 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Starting master.\n"); | 591 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Starting master.\n"); |
552 | mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE, | 592 | mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE, |
553 | &master_start_cb, &join_request_cb, | 593 | &master_start_cb, &join_request_cb, |
554 | &master_message_cb, NULL); | 594 | &master_message_cb, &master_message_part_cb, |
595 | NULL); | ||
555 | } | 596 | } |
556 | 597 | ||
557 | 598 | ||