aboutsummaryrefslogtreecommitdiff
path: root/src/psyc/gnunet-service-psyc.c
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2014-07-23 16:19:49 +0000
committerGabor X Toth <*@tg-x.net>2014-07-23 16:19:49 +0000
commit3cf8ba0b60f8495892fa76635e9c23555d0a304c (patch)
tree5f27648bdb3cf3409628e4e5edc26f811cbd03a5 /src/psyc/gnunet-service-psyc.c
parent252b5599987b7ba03b879a8c2d1c455ad4c9834a (diff)
downloadgnunet-3cf8ba0b60f8495892fa76635e9c23555d0a304c.tar.gz
gnunet-3cf8ba0b60f8495892fa76635e9c23555d0a304c.zip
social: implement enter/leave/messaging; psyc: improvements and fixes
- social: implement enter/leave, send/receive messages, slicer - psyc, social: add struct GNUNET_PSYC_Message for single-fragment join messages - psyc: add message callback in addition to message part callback - client_manager, social, psyc, multicast: add disconnect callback
Diffstat (limited to 'src/psyc/gnunet-service-psyc.c')
-rw-r--r--src/psyc/gnunet-service-psyc.c112
1 files changed, 79 insertions, 33 deletions
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
index 866275a79..8fc080baf 100644
--- a/src/psyc/gnunet-service-psyc.c
+++ b/src/psyc/gnunet-service-psyc.c
@@ -168,7 +168,7 @@ struct FragmentQueue
168 * Is the message queued for delivery to the client? 168 * Is the message queued for delivery to the client?
169 * i.e. added to the recv_msgs queue 169 * i.e. added to the recv_msgs queue
170 */ 170 */
171 uint8_t queued; 171 uint8_t is_queued;
172}; 172};
173 173
174 174
@@ -382,7 +382,7 @@ struct Slave
382 /** 382 /**
383 * Join request to be transmitted to the master on join. 383 * Join request to be transmitted to the master on join.
384 */ 384 */
385 struct GNUNET_MessageHeader *join_req; 385 struct GNUNET_PSYC_Message *join_msg;
386 386
387 /** 387 /**
388 * Join decision received from multicast. 388 * Join decision received from multicast.
@@ -435,7 +435,7 @@ cleanup_master (struct Master *mst)
435 struct Channel *chn = &mst->chn; 435 struct Channel *chn = &mst->chn;
436 436
437 if (NULL != mst->origin) 437 if (NULL != mst->origin)
438 GNUNET_MULTICAST_origin_stop (mst->origin); 438 GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
439 GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs); 439 GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
440 GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, chn); 440 GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, chn);
441} 441}
@@ -462,12 +462,21 @@ cleanup_slave (struct Slave *slv)
462 } 462 }
463 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv); 463 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
464 464
465 if (NULL != slv->join_req) 465 if (NULL != slv->join_msg)
466 GNUNET_free (slv->join_req); 466 {
467 GNUNET_free (slv->join_msg);
468 slv->join_msg = NULL;
469 }
467 if (NULL != slv->relays) 470 if (NULL != slv->relays)
471 {
468 GNUNET_free (slv->relays); 472 GNUNET_free (slv->relays);
473 slv->relays = NULL;
474 }
469 if (NULL != slv->member) 475 if (NULL != slv->member)
470 GNUNET_MULTICAST_member_part (slv->member); 476 {
477 GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
478 slv->member = NULL;
479 }
471 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, chn); 480 GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, chn);
472} 481}
473 482
@@ -482,7 +491,10 @@ cleanup_channel (struct Channel *chn)
482 GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &chn->pub_key_hash); 491 GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &chn->pub_key_hash);
483 492
484 if (NULL != chn->store_op) 493 if (NULL != chn->store_op)
494 {
485 GNUNET_PSYCSTORE_operation_cancel (chn->store_op); 495 GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
496 chn->store_op = NULL;
497 }
486 498
487 (GNUNET_YES == chn->is_master) 499 (GNUNET_YES == chn->is_master)
488 ? cleanup_master ((struct Master *) chn) 500 ? cleanup_master ((struct Master *) chn)
@@ -574,7 +586,7 @@ struct JoinMemTestClosure
574 struct GNUNET_CRYPTO_EcdsaPublicKey slave_key; 586 struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
575 struct Channel *chn; 587 struct Channel *chn;
576 struct GNUNET_MULTICAST_JoinHandle *jh; 588 struct GNUNET_MULTICAST_JoinHandle *jh;
577 struct MasterJoinRequest *master_join_req; 589 struct GNUNET_PSYC_JoinRequestMessage *join_msg;
578}; 590};
579 591
580 592
@@ -594,14 +606,14 @@ join_mem_test_cb (void *cls, int64_t result, const char *err_msg)
594 &slave_key_hash); 606 &slave_key_hash);
595 GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh, 607 GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh,
596 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 608 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
597 client_send_msg (jcls->chn, &jcls->master_join_req->header); 609 client_send_msg (jcls->chn, &jcls->join_msg->header);
598 } 610 }
599 else 611 else
600 { 612 {
601 // FIXME: add relays 613 // FIXME: add relays
602 GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL); 614 GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
603 } 615 }
604 GNUNET_free (jcls->master_join_req); 616 GNUNET_free (jcls->join_msg);
605 GNUNET_free (jcls); 617 GNUNET_free (jcls);
606} 618}
607 619
@@ -633,7 +645,8 @@ mcast_recv_join_request (void *cls,
633 } 645 }
634 } 646 }
635 647
636 struct MasterJoinRequest *req = GNUNET_malloc (sizeof (*req) + join_msg_size); 648 struct GNUNET_PSYC_JoinRequestMessage *
649 req = GNUNET_malloc (sizeof (*req) + join_msg_size);
637 req->header.size = htons (sizeof (*req) + join_msg_size); 650 req->header.size = htons (sizeof (*req) + join_msg_size);
638 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST); 651 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
639 req->slave_key = *slave_key; 652 req->slave_key = *slave_key;
@@ -644,7 +657,7 @@ mcast_recv_join_request (void *cls,
644 jcls->slave_key = *slave_key; 657 jcls->slave_key = *slave_key;
645 jcls->chn = chn; 658 jcls->chn = chn;
646 jcls->jh = jh; 659 jcls->jh = jh;
647 jcls->master_join_req = req; 660 jcls->join_msg = req;
648 661
649 GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key, 662 GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
650 chn->max_message_id, 0, 663 chn->max_message_id, 0,
@@ -780,6 +793,7 @@ client_send_mcast_msg (struct Channel *chn,
780 pmsg->header.size = htons (psize); 793 pmsg->header.size = htons (psize);
781 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); 794 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
782 pmsg->message_id = mmsg->message_id; 795 pmsg->message_id = mmsg->message_id;
796 pmsg->fragment_offset = mmsg->fragment_offset;
783 797
784 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); 798 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
785 client_send_msg (chn, &pmsg->header); 799 client_send_msg (chn, &pmsg->header);
@@ -810,6 +824,7 @@ client_send_mcast_req (struct Master *mst,
810 pmsg->header.size = htons (psize); 824 pmsg->header.size = htons (psize);
811 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); 825 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
812 pmsg->message_id = req->request_id; 826 pmsg->message_id = req->request_id;
827 pmsg->fragment_offset = req->fragment_offset;
813 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST); 828 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
814 829
815 memcpy (&pmsg[1], &req[1], size - sizeof (*req)); 830 memcpy (&pmsg[1], &req[1], size - sizeof (*req));
@@ -870,11 +885,12 @@ fragment_queue_insert (struct Channel *chn,
870 { 885 {
871 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 886 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
872 "%p Adding message fragment to cache. " 887 "%p Adding message fragment to cache. "
873 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", " 888 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
874 "header_size: %" PRIu64 " + %u).\n",
875 chn, GNUNET_ntohll (mmsg->message_id), 889 chn, GNUNET_ntohll (mmsg->message_id),
876 GNUNET_ntohll (mmsg->fragment_id), 890 GNUNET_ntohll (mmsg->fragment_id));
877 fragq->header_size, size); 891 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
892 "%p header_size: %" PRIu64 " + %u\n",
893 chn, fragq->header_size, size);
878 cache_entry = GNUNET_new (struct RecvCacheEntry); 894 cache_entry = GNUNET_new (struct RecvCacheEntry);
879 cache_entry->ref_count = 1; 895 cache_entry->ref_count = 1;
880 cache_entry->mmsg = GNUNET_malloc (size); 896 cache_entry->mmsg = GNUNET_malloc (size);
@@ -955,11 +971,11 @@ fragment_queue_insert (struct Channel *chn,
955 case MSG_FRAG_STATE_DATA: 971 case MSG_FRAG_STATE_DATA:
956 case MSG_FRAG_STATE_END: 972 case MSG_FRAG_STATE_END:
957 case MSG_FRAG_STATE_CANCEL: 973 case MSG_FRAG_STATE_CANCEL:
958 if (GNUNET_NO == fragq->queued) 974 if (GNUNET_NO == fragq->is_queued)
959 { 975 {
960 GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL, 976 GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
961 GNUNET_ntohll (mmsg->message_id)); 977 GNUNET_ntohll (mmsg->message_id));
962 fragq->queued = GNUNET_YES; 978 fragq->is_queued = GNUNET_YES;
963 } 979 }
964 } 980 }
965 981
@@ -1034,7 +1050,7 @@ fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1034 if (MSG_FRAG_STATE_END <= fragq->state) 1050 if (MSG_FRAG_STATE_END <= fragq->state)
1035 { 1051 {
1036 struct GNUNET_HashCode msg_id_hash; 1052 struct GNUNET_HashCode msg_id_hash;
1037 hash_key_from_nll (&msg_id_hash, msg_id); 1053 hash_key_from_hll (&msg_id_hash, msg_id);
1038 1054
1039 GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq); 1055 GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
1040 GNUNET_CONTAINER_heap_destroy (fragq->fragments); 1056 GNUNET_CONTAINER_heap_destroy (fragq->fragments);
@@ -1042,7 +1058,7 @@ fragment_queue_run (struct Channel *chn, uint64_t msg_id,
1042 } 1058 }
1043 else 1059 else
1044 { 1060 {
1045 fragq->queued = GNUNET_NO; 1061 fragq->is_queued = GNUNET_NO;
1046 } 1062 }
1047} 1063}
1048 1064
@@ -1331,13 +1347,18 @@ store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1331 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key, 1347 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1332 &slv->origin, 1348 &slv->origin,
1333 slv->relay_count, slv->relays, 1349 slv->relay_count, slv->relays,
1334 slv->join_req, 1350 &slv->join_msg->header,
1335 &mcast_recv_join_request, 1351 &mcast_recv_join_request,
1336 &mcast_recv_join_decision, 1352 &mcast_recv_join_decision,
1337 &mcast_recv_membership_test, 1353 &mcast_recv_membership_test,
1338 &mcast_recv_replay_fragment, 1354 &mcast_recv_replay_fragment,
1339 &mcast_recv_replay_message, 1355 &mcast_recv_replay_message,
1340 &mcast_recv_message, chn); 1356 &mcast_recv_message, chn);
1357 if (NULL != slv->join_msg)
1358 {
1359 GNUNET_free (slv->join_msg);
1360 slv->join_msg = NULL;
1361 }
1341 } 1362 }
1342 else 1363 else
1343 { 1364 {
@@ -1435,6 +1456,7 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1435{ 1456{
1436 const struct SlaveJoinRequest *req 1457 const struct SlaveJoinRequest *req
1437 = (const struct SlaveJoinRequest *) msg; 1458 = (const struct SlaveJoinRequest *) msg;
1459 uint16_t req_size = ntohs (req->header.size);
1438 1460
1439 struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key; 1461 struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
1440 struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash; 1462 struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash;
@@ -1460,15 +1482,32 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1460 slv->pub_key_hash = slv_pub_key_hash; 1482 slv->pub_key_hash = slv_pub_key_hash;
1461 slv->origin = req->origin; 1483 slv->origin = req->origin;
1462 slv->relay_count = ntohl (req->relay_count); 1484 slv->relay_count = ntohl (req->relay_count);
1485
1486 const struct GNUNET_PeerIdentity *
1487 relays = (const struct GNUNET_PeerIdentity *) &req[1];
1488 uint16_t relay_size = slv->relay_count * sizeof (*relays);
1489 uint16_t join_msg_size = 0;
1490
1491 if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
1492 <= req_size)
1493 {
1494 join_msg_size = ntohs (slv->join_msg->header.size);
1495 slv->join_msg = GNUNET_malloc (join_msg_size);
1496 memcpy (slv->join_msg, ((char *) &req[1]) + relay_size, join_msg_size);
1497 }
1498 if (sizeof (*req) + relay_size + join_msg_size != req_size)
1499 {
1500 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1501 "%u + %u + %u != %u\n",
1502 sizeof (*req), relay_size, join_msg_size, req_size);
1503 GNUNET_break (0);
1504 GNUNET_SERVER_client_disconnect (client);
1505 return;
1506 }
1463 if (0 < slv->relay_count) 1507 if (0 < slv->relay_count)
1464 { 1508 {
1465 const struct GNUNET_PeerIdentity *relays 1509 slv->relays = GNUNET_malloc (relay_size);
1466 = (const struct GNUNET_PeerIdentity *) &req[1]; 1510 memcpy (slv->relays, &req[1], relay_size);
1467 slv->relays
1468 = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
1469 uint32_t i;
1470 for (i = 0; i < slv->relay_count; i++)
1471 memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
1472 } 1511 }
1473 1512
1474 chn = &slv->chn; 1513 chn = &slv->chn;
@@ -1510,14 +1549,18 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1510 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key, 1549 = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
1511 &slv->origin, 1550 &slv->origin,
1512 slv->relay_count, slv->relays, 1551 slv->relay_count, slv->relays,
1513 slv->join_req, 1552 &slv->join_msg->header,
1514 &mcast_recv_join_request, 1553 &mcast_recv_join_request,
1515 &mcast_recv_join_decision, 1554 &mcast_recv_join_decision,
1516 &mcast_recv_membership_test, 1555 &mcast_recv_membership_test,
1517 &mcast_recv_replay_fragment, 1556 &mcast_recv_replay_fragment,
1518 &mcast_recv_replay_message, 1557 &mcast_recv_replay_message,
1519 &mcast_recv_message, chn); 1558 &mcast_recv_message, chn);
1520 1559 if (NULL != slv->join_msg)
1560 {
1561 GNUNET_free (slv->join_msg);
1562 slv->join_msg = NULL;
1563 }
1521 } 1564 }
1522 else if (NULL != slv->join_dcsn) 1565 else if (NULL != slv->join_dcsn)
1523 { 1566 {
@@ -1549,13 +1592,14 @@ struct JoinDecisionClosure
1549 1592
1550 1593
1551/** 1594/**
1552 * Iterator callback for responding to join requests of a slave. 1595 * Iterator callback for sending join decisions to multicast.
1553 */ 1596 */
1554static int 1597static int
1555mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash, 1598mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
1556 void *jh) 1599 void *value)
1557{ 1600{
1558 struct JoinDecisionClosure *jcls = cls; 1601 struct JoinDecisionClosure *jcls = cls;
1602 struct GNUNET_MULTICAST_JoinHandle *jh = value;
1559 // FIXME: add relays 1603 // FIXME: add relays
1560 GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg); 1604 GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
1561 return GNUNET_YES; 1605 return GNUNET_YES;
@@ -1579,8 +1623,7 @@ client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
1579 struct JoinDecisionClosure jcls; 1623 struct JoinDecisionClosure jcls;
1580 jcls.is_admitted = ntohl (dcsn->is_admitted); 1624 jcls.is_admitted = ntohl (dcsn->is_admitted);
1581 jcls.msg 1625 jcls.msg
1582 = (sizeof (*dcsn) + sizeof (struct GNUNET_PSYC_MessageHeader) 1626 = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size))
1583 <= ntohs (msg->size))
1584 ? (struct GNUNET_MessageHeader *) &dcsn[1] 1627 ? (struct GNUNET_MessageHeader *) &dcsn[1]
1585 : NULL; 1628 : NULL;
1586 1629
@@ -1901,6 +1944,9 @@ client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
1901 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 1944 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1902 return; 1945 return;
1903 } 1946 }
1947 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1948 "%p Received message with first part type %u and last part type %u.\n",
1949 chn, first_ptype, last_ptype);
1904 1950
1905 queue_message (chn, client, size - sizeof (*msg), &msg[1], 1951 queue_message (chn, client, size - sizeof (*msg), &msg[1],
1906 first_ptype, last_ptype); 1952 first_ptype, last_ptype);