aboutsummaryrefslogtreecommitdiff
path: root/src/set
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2017-03-11 18:15:38 +0100
committerChristian Grothoff <christian@grothoff.org>2017-03-11 18:15:38 +0100
commitabdec5e11ff11bb10d32c013e11344a54786f80f (patch)
treec2b8eb6705efa8ac8278a6024d8ab19222471f0e /src/set
parent4e981fb2bd74f21c33adf05d7999b05704d6909b (diff)
downloadgnunet-abdec5e11ff11bb10d32c013e11344a54786f80f.tar.gz
gnunet-abdec5e11ff11bb10d32c013e11344a54786f80f.zip
cleaning up set handlers, eliminating 2nd level demultiplexing and improving use of types
Diffstat (limited to 'src/set')
-rw-r--r--src/set/Makefile.am2
-rw-r--r--src/set/gnunet-service-set.c284
-rw-r--r--src/set/gnunet-service-set.h50
-rw-r--r--src/set/gnunet-service-set_intersection.c139
-rw-r--r--src/set/gnunet-service-set_union.c645
-rw-r--r--src/set/set_api.c12
-rw-r--r--src/set/test_set_union_copy.c1
7 files changed, 598 insertions, 535 deletions
diff --git a/src/set/Makefile.am b/src/set/Makefile.am
index cfe95bc1a..03c258352 100644
--- a/src/set/Makefile.am
+++ b/src/set/Makefile.am
@@ -51,7 +51,7 @@ gnunet_set_ibf_profiler_LDADD = \
51 51
52gnunet_service_set_SOURCES = \ 52gnunet_service_set_SOURCES = \
53 gnunet-service-set.c gnunet-service-set.h \ 53 gnunet-service-set.c gnunet-service-set.h \
54 gnunet-service-set_union.c \ 54 gnunet-service-set_union.c gnunet-service-set_union.h \
55 gnunet-service-set_intersection.c \ 55 gnunet-service-set_intersection.c \
56 ibf.c ibf.h \ 56 ibf.c ibf.h \
57 gnunet-service-set_union_strata_estimator.c gnunet-service-set_union_strata_estimator.h \ 57 gnunet-service-set_union_strata_estimator.c gnunet-service-set_union_strata_estimator.h \
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c
index 454ad9784..8f1506c6a 100644
--- a/src/set/gnunet-service-set.c
+++ b/src/set/gnunet-service-set.c
@@ -24,6 +24,8 @@
24 * @author Christian Grothoff 24 * @author Christian Grothoff
25 */ 25 */
26#include "gnunet-service-set.h" 26#include "gnunet-service-set.h"
27#include "gnunet-service-set_union.h"
28#include "gnunet-service-set_intersection.h"
27#include "gnunet-service-set_protocol.h" 29#include "gnunet-service-set_protocol.h"
28#include "gnunet_statistics_service.h" 30#include "gnunet_statistics_service.h"
29 31
@@ -476,6 +478,7 @@ _GSS_operation_destroy (struct Operation *op,
476 op->channel = NULL; 478 op->channel = NULL;
477 GNUNET_CADET_channel_destroy (channel); 479 GNUNET_CADET_channel_destroy (channel);
478 } 480 }
481
479 if (GNUNET_YES == gc) 482 if (GNUNET_YES == gc)
480 collect_generation_garbage (set); 483 collect_generation_garbage (set);
481 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL, 484 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
@@ -682,7 +685,7 @@ client_disconnect_cb (void *cls,
682 { 685 {
683 struct Operation *curr = op; 686 struct Operation *curr = op;
684 op = op->next; 687 op = op->next;
685 if ( (GNUNET_YES == curr->is_incoming) && 688 if ( (GNUNET_YES == curr->is_incoming) &&
686 (curr->listener == listener) ) 689 (curr->listener == listener) )
687 incoming_destroy (curr); 690 incoming_destroy (curr);
688 } 691 }
@@ -733,6 +736,38 @@ incoming_suggest (struct Operation *incoming,
733 736
734 737
735/** 738/**
739 * Check a request for a set operation from another peer.
740 *
741 * @param cls the operation state
742 * @param msg the received message
743 * @return #GNUNET_OK if the channel should be kept alive,
744 * #GNUNET_SYSERR to destroy the channel
745 */
746static int
747check_incoming_msg (void *cls,
748 const struct OperationRequestMessage *msg)
749{
750 struct Operation *op = cls;
751 const struct GNUNET_MessageHeader *nested_context;
752
753 /* double operation request */
754 if (NULL != op->spec)
755 {
756 GNUNET_break_op (0);
757 return GNUNET_SYSERR;
758 }
759 nested_context = GNUNET_MQ_extract_nested_mh (msg);
760 if ( (NULL != nested_context) &&
761 (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
762 {
763 GNUNET_break_op (0);
764 return GNUNET_SYSERR;
765 }
766 return GNUNET_OK;
767}
768
769
770/**
736 * Handle a request for a set operation from another peer. Checks if we 771 * Handle a request for a set operation from another peer. Checks if we
737 * have a listener waiting for such a request (and in that case initiates 772 * have a listener waiting for such a request (and in that case initiates
738 * asking the listener about accepting the connection). If no listener 773 * asking the listener about accepting the connection). If no listener
@@ -744,42 +779,23 @@ incoming_suggest (struct Operation *incoming,
744 * our virtual table and subsequent msgs would be routed differently (as 779 * our virtual table and subsequent msgs would be routed differently (as
745 * we then know what type of operation this is). 780 * we then know what type of operation this is).
746 * 781 *
747 * @param op the operation state 782 * @param cls the operation state
748 * @param mh the received message 783 * @param msg the received message
749 * @return #GNUNET_OK if the channel should be kept alive, 784 * @return #GNUNET_OK if the channel should be kept alive,
750 * #GNUNET_SYSERR to destroy the channel 785 * #GNUNET_SYSERR to destroy the channel
751 */ 786 */
752static int 787static void
753handle_incoming_msg (struct Operation *op, 788handle_incoming_msg (void *cls,
754 const struct GNUNET_MessageHeader *mh) 789 const struct OperationRequestMessage *msg)
755{ 790{
756 const struct OperationRequestMessage *msg; 791 struct Operation *op = cls;
757 struct Listener *listener = op->listener; 792 struct Listener *listener = op->listener;
758 struct OperationSpecification *spec; 793 struct OperationSpecification *spec;
759 const struct GNUNET_MessageHeader *nested_context; 794 const struct GNUNET_MessageHeader *nested_context;
760 795
761 msg = (const struct OperationRequestMessage *) mh;
762 GNUNET_assert (GNUNET_YES == op->is_incoming); 796 GNUNET_assert (GNUNET_YES == op->is_incoming);
763 if (GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST != ntohs (mh->type))
764 {
765 GNUNET_break_op (0);
766 return GNUNET_SYSERR;
767 }
768 /* double operation request */
769 if (NULL != op->spec)
770 {
771 GNUNET_break_op (0);
772 return GNUNET_SYSERR;
773 }
774 spec = GNUNET_new (struct OperationSpecification); 797 spec = GNUNET_new (struct OperationSpecification);
775 nested_context = GNUNET_MQ_extract_nested_mh (msg); 798 nested_context = GNUNET_MQ_extract_nested_mh (msg);
776 if ( (NULL != nested_context) &&
777 (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
778 {
779 GNUNET_break_op (0);
780 GNUNET_free (spec);
781 return GNUNET_SYSERR;
782 }
783 /* Make a copy of the nested_context (application-specific context 799 /* Make a copy of the nested_context (application-specific context
784 information that is opaque to set) so we can pass it to the 800 information that is opaque to set) so we can pass it to the
785 listener later on */ 801 listener later on */
@@ -792,7 +808,6 @@ handle_incoming_msg (struct Operation *op,
792 spec->peer = op->peer; 808 spec->peer = op->peer;
793 spec->remote_element_count = ntohl (msg->element_count); 809 spec->remote_element_count = ntohl (msg->element_count);
794 op->spec = spec; 810 op->spec = spec;
795
796 listener = op->listener; 811 listener = op->listener;
797 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 812 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
798 "Received P2P operation request (op %u, port %s) for active listener\n", 813 "Received P2P operation request (op %u, port %s) for active listener\n",
@@ -800,7 +815,6 @@ handle_incoming_msg (struct Operation *op,
800 GNUNET_h2s (&listener->app_id)); 815 GNUNET_h2s (&listener->app_id));
801 incoming_suggest (op, 816 incoming_suggest (op,
802 listener); 817 listener);
803 return GNUNET_OK;
804} 818}
805 819
806 820
@@ -1103,9 +1117,11 @@ handle_client_create_set (void *cls,
1103 { 1117 {
1104 case GNUNET_SET_OPERATION_INTERSECTION: 1118 case GNUNET_SET_OPERATION_INTERSECTION:
1105 set->vt = _GSS_intersection_vt (); 1119 set->vt = _GSS_intersection_vt ();
1120 set->type = OT_INTERSECTION;
1106 break; 1121 break;
1107 case GNUNET_SET_OPERATION_UNION: 1122 case GNUNET_SET_OPERATION_UNION:
1108 set->vt = _GSS_union_vt (); 1123 set->vt = _GSS_union_vt ();
1124 set->type = OT_UNION;
1109 break; 1125 break;
1110 default: 1126 default:
1111 GNUNET_free (set); 1127 GNUNET_free (set);
@@ -1196,7 +1212,6 @@ channel_new_cb (void *cls,
1196 const struct GNUNET_PeerIdentity *source) 1212 const struct GNUNET_PeerIdentity *source)
1197{ 1213{
1198 static const struct SetVT incoming_vt = { 1214 static const struct SetVT incoming_vt = {
1199 .msg_handler = &handle_incoming_msg,
1200 .peer_disconnect = &handle_incoming_disconnect 1215 .peer_disconnect = &handle_incoming_disconnect
1201 }; 1216 };
1202 struct Listener *listener = cls; 1217 struct Listener *listener = cls;
@@ -1290,60 +1305,6 @@ channel_window_cb (void *cls,
1290 /* FIXME: not implemented, we could do flow control here... */ 1305 /* FIXME: not implemented, we could do flow control here... */
1291} 1306}
1292 1307
1293/**
1294 * FIXME: hack-job. Migrate to proper handler array use!
1295 *
1296 * @param cls local state associated with the channel.
1297 * @param message The actual message.
1298 */
1299static int
1300check_p2p_message (void *cls,
1301 const struct GNUNET_MessageHeader *message)
1302{
1303 return GNUNET_OK;
1304}
1305
1306
1307/**
1308 * FIXME: hack-job. Migrate to proper handler array use!
1309 *
1310 * Functions with this signature are called whenever a message is
1311 * received via a cadet channel.
1312 *
1313 * The msg_handler is a virtual table set in initially either when a peer
1314 * creates a new channel with us, or once we create a new channel
1315 * ourselves (evaluate).
1316 *
1317 * Once we know the exact type of operation (union/intersection), the vt is
1318 * replaced with an operation specific instance (_GSS_[op]_vt).
1319 *
1320 * @param cls local state associated with the channel.
1321 * @param message The actual message.
1322 */
1323static void
1324handle_p2p_message (void *cls,
1325 const struct GNUNET_MessageHeader *message)
1326{
1327 struct Operation *op = cls;
1328 int ret;
1329
1330 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1331 "Dispatching cadet message (type: %u)\n",
1332 ntohs (message->type));
1333 /* do this before the handler, as the handler might kill the channel */
1334 GNUNET_CADET_receive_done (op->channel);
1335 if (NULL != op->vt)
1336 ret = op->vt->msg_handler (op,
1337 message);
1338 else
1339 ret = GNUNET_SYSERR;
1340 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1341 "Handled cadet message (type: %u)\n",
1342 ntohs (message->type));
1343 if (GNUNET_OK != ret)
1344 GNUNET_CADET_channel_destroy (op->channel);
1345}
1346
1347 1308
1348/** 1309/**
1349 * Called when a client wants to create a new listener. 1310 * Called when a client wants to create a new listener.
@@ -1357,66 +1318,66 @@ handle_client_listen (void *cls,
1357{ 1318{
1358 struct GNUNET_SERVICE_Client *client = cls; 1319 struct GNUNET_SERVICE_Client *client = cls;
1359 struct GNUNET_MQ_MessageHandler cadet_handlers[] = { 1320 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1360 GNUNET_MQ_hd_var_size (p2p_message, 1321 GNUNET_MQ_hd_var_size (incoming_msg,
1361 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 1322 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1362 struct GNUNET_MessageHeader, 1323 struct OperationRequestMessage,
1363 NULL), 1324 NULL),
1364 GNUNET_MQ_hd_var_size (p2p_message, 1325 GNUNET_MQ_hd_var_size (union_p2p_ibf,
1365 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, 1326 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1366 struct GNUNET_MessageHeader, 1327 struct IBFMessage,
1367 NULL), 1328 NULL),
1368 GNUNET_MQ_hd_var_size (p2p_message, 1329 GNUNET_MQ_hd_var_size (union_p2p_elements,
1369 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 1330 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1370 struct GNUNET_MessageHeader, 1331 struct GNUNET_SET_ElementMessage,
1371 NULL), 1332 NULL),
1372 GNUNET_MQ_hd_var_size (p2p_message, 1333 GNUNET_MQ_hd_var_size (union_p2p_offer,
1373 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER, 1334 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1374 struct GNUNET_MessageHeader, 1335 struct GNUNET_MessageHeader,
1375 NULL), 1336 NULL),
1376 GNUNET_MQ_hd_var_size (p2p_message, 1337 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1377 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY, 1338 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1378 struct GNUNET_MessageHeader, 1339 struct InquiryMessage,
1379 NULL), 1340 NULL),
1380 GNUNET_MQ_hd_var_size (p2p_message, 1341 GNUNET_MQ_hd_var_size (union_p2p_demand,
1381 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND, 1342 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1382 struct GNUNET_MessageHeader, 1343 struct GNUNET_MessageHeader,
1383 NULL), 1344 NULL),
1384 GNUNET_MQ_hd_var_size (p2p_message, 1345 GNUNET_MQ_hd_fixed_size (union_p2p_done,
1385 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE, 1346 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1386 struct GNUNET_MessageHeader, 1347 struct GNUNET_MessageHeader,
1387 NULL), 1348 NULL),
1388 GNUNET_MQ_hd_var_size (p2p_message, 1349 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1389 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE, 1350 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1390 struct GNUNET_MessageHeader, 1351 struct GNUNET_MessageHeader,
1391 NULL), 1352 NULL),
1392 GNUNET_MQ_hd_var_size (p2p_message, 1353 GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1393 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL, 1354 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1394 struct GNUNET_MessageHeader, 1355 struct GNUNET_MessageHeader,
1395 NULL), 1356 NULL),
1396 GNUNET_MQ_hd_var_size (p2p_message, 1357 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1397 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, 1358 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1398 struct GNUNET_MessageHeader, 1359 struct StrataEstimatorMessage,
1399 NULL), 1360 NULL),
1400 GNUNET_MQ_hd_var_size (p2p_message, 1361 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1401 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC, 1362 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1402 struct GNUNET_MessageHeader, 1363 struct StrataEstimatorMessage,
1403 NULL), 1364 NULL),
1404 GNUNET_MQ_hd_var_size (p2p_message, 1365 GNUNET_MQ_hd_var_size (union_p2p_full_element,
1405 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT, 1366 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1406 struct GNUNET_MessageHeader, 1367 struct GNUNET_SET_ElementMessage,
1407 NULL), 1368 NULL),
1408 GNUNET_MQ_hd_var_size (p2p_message, 1369 GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1409 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, 1370 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1410 struct GNUNET_MessageHeader, 1371 struct IntersectionElementInfoMessage,
1411 NULL), 1372 NULL),
1412 GNUNET_MQ_hd_var_size (p2p_message, 1373 GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1413 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, 1374 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1414 struct GNUNET_MessageHeader, 1375 struct BFMessage,
1415 NULL),
1416 GNUNET_MQ_hd_var_size (p2p_message,
1417 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1418 struct GNUNET_MessageHeader,
1419 NULL), 1376 NULL),
1377 GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1378 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1379 struct IntersectionDoneMessage,
1380 NULL),
1420 GNUNET_MQ_handler_end () 1381 GNUNET_MQ_handler_end ()
1421 }; 1382 };
1422 struct Listener *listener; 1383 struct Listener *listener;
@@ -1623,66 +1584,66 @@ handle_client_evaluate (void *cls,
1623 struct GNUNET_SERVICE_Client *client = cls; 1584 struct GNUNET_SERVICE_Client *client = cls;
1624 struct Operation *op = GNUNET_new (struct Operation); 1585 struct Operation *op = GNUNET_new (struct Operation);
1625 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = { 1586 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1626 GNUNET_MQ_hd_var_size (p2p_message, 1587 GNUNET_MQ_hd_var_size (incoming_msg,
1627 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 1588 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1628 struct GNUNET_MessageHeader, 1589 struct OperationRequestMessage,
1629 op), 1590 op),
1630 GNUNET_MQ_hd_var_size (p2p_message, 1591 GNUNET_MQ_hd_var_size (union_p2p_ibf,
1631 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, 1592 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
1632 struct GNUNET_MessageHeader, 1593 struct IBFMessage,
1633 op), 1594 op),
1634 GNUNET_MQ_hd_var_size (p2p_message, 1595 GNUNET_MQ_hd_var_size (union_p2p_elements,
1635 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 1596 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
1636 struct GNUNET_MessageHeader, 1597 struct GNUNET_SET_ElementMessage,
1637 op), 1598 op),
1638 GNUNET_MQ_hd_var_size (p2p_message, 1599 GNUNET_MQ_hd_var_size (union_p2p_offer,
1639 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER, 1600 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
1640 struct GNUNET_MessageHeader, 1601 struct GNUNET_MessageHeader,
1641 op), 1602 op),
1642 GNUNET_MQ_hd_var_size (p2p_message, 1603 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1643 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY, 1604 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
1644 struct GNUNET_MessageHeader, 1605 struct InquiryMessage,
1645 op), 1606 op),
1646 GNUNET_MQ_hd_var_size (p2p_message, 1607 GNUNET_MQ_hd_var_size (union_p2p_demand,
1647 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND, 1608 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
1648 struct GNUNET_MessageHeader, 1609 struct GNUNET_MessageHeader,
1649 op), 1610 op),
1650 GNUNET_MQ_hd_var_size (p2p_message, 1611 GNUNET_MQ_hd_fixed_size (union_p2p_done,
1651 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE, 1612 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
1652 struct GNUNET_MessageHeader, 1613 struct GNUNET_MessageHeader,
1653 op), 1614 op),
1654 GNUNET_MQ_hd_var_size (p2p_message, 1615 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1616 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1617 struct GNUNET_MessageHeader,
1618 op),
1619 GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1620 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1621 struct GNUNET_MessageHeader,
1622 op),
1623 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1655 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, 1624 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
1656 struct GNUNET_MessageHeader, 1625 struct StrataEstimatorMessage,
1657 op), 1626 op),
1658 GNUNET_MQ_hd_var_size (p2p_message, 1627 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1659 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC, 1628 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
1660 struct GNUNET_MessageHeader, 1629 struct StrataEstimatorMessage,
1661 op),
1662 GNUNET_MQ_hd_var_size (p2p_message,
1663 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
1664 struct GNUNET_MessageHeader,
1665 op),
1666 GNUNET_MQ_hd_var_size (p2p_message,
1667 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
1668 struct GNUNET_MessageHeader,
1669 op), 1630 op),
1670 GNUNET_MQ_hd_var_size (p2p_message, 1631 GNUNET_MQ_hd_var_size (union_p2p_full_element,
1671 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT, 1632 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
1672 struct GNUNET_MessageHeader, 1633 struct GNUNET_SET_ElementMessage,
1673 op),
1674 GNUNET_MQ_hd_var_size (p2p_message,
1675 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1676 struct GNUNET_MessageHeader,
1677 op), 1634 op),
1678 GNUNET_MQ_hd_var_size (p2p_message, 1635 GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1636 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
1637 struct IntersectionElementInfoMessage,
1638 op),
1639 GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1679 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, 1640 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
1680 struct GNUNET_MessageHeader, 1641 struct BFMessage,
1681 op),
1682 GNUNET_MQ_hd_var_size (p2p_message,
1683 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1684 struct GNUNET_MessageHeader,
1685 op), 1642 op),
1643 GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1644 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
1645 struct IntersectionDoneMessage,
1646 op),
1686 GNUNET_MQ_handler_end () 1647 GNUNET_MQ_handler_end ()
1687 }; 1648 };
1688 struct Set *set; 1649 struct Set *set;
@@ -1717,7 +1678,7 @@ handle_client_evaluate (void *cls,
1717 // mutations won't interfer with the running operation. 1678 // mutations won't interfer with the running operation.
1718 op->generation_created = set->current_generation; 1679 op->generation_created = set->current_generation;
1719 advance_generation (set); 1680 advance_generation (set);
1720 1681 op->type = set->type;
1721 op->vt = set->vt; 1682 op->vt = set->vt;
1722 GNUNET_CONTAINER_DLL_insert (set->ops_head, 1683 GNUNET_CONTAINER_DLL_insert (set->ops_head,
1723 set->ops_tail, 1684 set->ops_tail,
@@ -1886,9 +1847,11 @@ handle_client_copy_lazy_connect (void *cls,
1886 { 1847 {
1887 case GNUNET_SET_OPERATION_INTERSECTION: 1848 case GNUNET_SET_OPERATION_INTERSECTION:
1888 set->vt = _GSS_intersection_vt (); 1849 set->vt = _GSS_intersection_vt ();
1850 set->type = OT_INTERSECTION;
1889 break; 1851 break;
1890 case GNUNET_SET_OPERATION_UNION: 1852 case GNUNET_SET_OPERATION_UNION:
1891 set->vt = _GSS_union_vt (); 1853 set->vt = _GSS_union_vt ();
1854 set->type = OT_UNION;
1892 break; 1855 break;
1893 default: 1856 default:
1894 GNUNET_assert (0); 1857 GNUNET_assert (0);
@@ -2057,6 +2020,7 @@ handle_client_accept (void *cls,
2057 advance_generation (set); 2020 advance_generation (set);
2058 2021
2059 op->vt = set->vt; 2022 op->vt = set->vt;
2023 op->type = set->type;
2060 op->vt->accept (op); 2024 op->vt->accept (op);
2061 GNUNET_SERVICE_client_continue (client); 2025 GNUNET_SERVICE_client_continue (client);
2062} 2026}
diff --git a/src/set/gnunet-service-set.h b/src/set/gnunet-service-set.h
index 68d8fe81f..c981430ef 100644
--- a/src/set/gnunet-service-set.h
+++ b/src/set/gnunet-service-set.h
@@ -213,20 +213,6 @@ typedef void
213 213
214 214
215/** 215/**
216 * Signature of functions that implement the message handling for
217 * the different set operations.
218 *
219 * @param op operation state
220 * @param msg received message
221 * @return #GNUNET_OK on success, #GNUNET_SYSERR to
222 * destroy the operation and the tunnel
223 */
224typedef int
225(*MsgHandlerImpl) (struct Operation *op,
226 const struct GNUNET_MessageHeader *msg);
227
228
229/**
230 * Signature of functions that implement operation cancellation 216 * Signature of functions that implement operation cancellation
231 * 217 *
232 * @param op operation state 218 * @param op operation state
@@ -276,11 +262,6 @@ struct SetVT
276 DestroySetImpl destroy_set; 262 DestroySetImpl destroy_set;
277 263
278 /** 264 /**
279 * Callback for handling operation-specific messages.
280 */
281 MsgHandlerImpl msg_handler;
282
283 /**
284 * Callback for handling the remote peer's disconnect. 265 * Callback for handling the remote peer's disconnect.
285 */ 266 */
286 PeerDisconnectImpl peer_disconnect; 267 PeerDisconnectImpl peer_disconnect;
@@ -364,6 +345,27 @@ struct Listener;
364 345
365 346
366/** 347/**
348 * Possible set operations.
349 */
350enum OperationType {
351 /**
352 * Operation type unknown.
353 */
354 OT_UNKNOWN = 0,
355
356 /**
357 * We are performing a union.
358 */
359 OT_UNION,
360
361 /**
362 * We are performing an intersection.
363 */
364 OT_INTERSECTION
365};
366
367
368/**
367 * Operation context used to execute a set operation. 369 * Operation context used to execute a set operation.
368 */ 370 */
369struct Operation 371struct Operation
@@ -427,6 +429,11 @@ struct Operation
427 struct GNUNET_SCHEDULER_Task *timeout_task; 429 struct GNUNET_SCHEDULER_Task *timeout_task;
428 430
429 /** 431 /**
432 * What type of operation is this?
433 */
434 enum OperationType type;
435
436 /**
430 * Unique request id for the request from a remote peer, sent to the 437 * Unique request id for the request from a remote peer, sent to the
431 * client, which will accept or reject the request. Set to '0' iff 438 * client, which will accept or reject the request. Set to '0' iff
432 * the request has not been suggested yet. 439 * the request has not been suggested yet.
@@ -582,6 +589,11 @@ struct Set
582 struct Operation *ops_tail; 589 struct Operation *ops_tail;
583 590
584 /** 591 /**
592 * What type of operation is this set for?
593 */
594 enum OperationType type;
595
596 /**
585 * Current generation, that is, number of previously executed 597 * Current generation, that is, number of previously executed
586 * operations and lazy copies on the underlying set content. 598 * operations and lazy copies on the underlying set content.
587 */ 599 */
diff --git a/src/set/gnunet-service-set_intersection.c b/src/set/gnunet-service-set_intersection.c
index 9fe1eabe6..b298f7b41 100644
--- a/src/set/gnunet-service-set_intersection.c
+++ b/src/set/gnunet-service-set_intersection.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet 2 This file is part of GNUnet
3 Copyright (C) 2013, 2014 GNUnet e.V. 3 Copyright (C) 2013-2017 GNUnet e.V.
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -28,6 +28,7 @@
28#include "gnunet-service-set.h" 28#include "gnunet-service-set.h"
29#include "gnunet_block_lib.h" 29#include "gnunet_block_lib.h"
30#include "gnunet-service-set_protocol.h" 30#include "gnunet-service-set_protocol.h"
31#include "gnunet-service-set_intersection.h"
31#include <gcrypt.h> 32#include <gcrypt.h>
32 33
33 34
@@ -550,6 +551,8 @@ send_remaining_elements (void *cls)
550 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 551 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
551 "Sending done and destroy because iterator ran out\n"); 552 "Sending done and destroy because iterator ran out\n");
552 op->keep--; 553 op->keep--;
554 GNUNET_CONTAINER_multihashmap_iterator_destroy (op->state->full_result_iter);
555 op->state->full_result_iter = NULL;
553 send_client_done_and_destroy (op); 556 send_client_done_and_destroy (op);
554 return; 557 return;
555 } 558 }
@@ -627,9 +630,6 @@ process_bf (struct Operation *op)
627 case PHASE_COUNT_SENT: 630 case PHASE_COUNT_SENT:
628 /* This is the first BF being sent, build our initial map with 631 /* This is the first BF being sent, build our initial map with
629 filtering in place */ 632 filtering in place */
630 op->state->my_elements
631 = GNUNET_CONTAINER_multihashmap_create (op->spec->remote_element_count,
632 GNUNET_YES);
633 op->state->my_element_count = 0; 633 op->state->my_element_count = 0;
634 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, 634 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
635 &filtered_map_initialization, 635 &filtered_map_initialization,
@@ -665,41 +665,53 @@ process_bf (struct Operation *op)
665 665
666 666
667/** 667/**
668 * Check an BF message from a remote peer.
669 *
670 * @param cls the intersection operation
671 * @param msg the header of the message
672 * @return #GNUNET_OK if @a msg is well-formed
673 */
674int
675check_intersection_p2p_bf (void *cls,
676 const struct BFMessage *msg)
677{
678 struct Operation *op = cls;
679
680 if (OT_INTERSECTION != op->type)
681 {
682 GNUNET_break_op (0);
683 return GNUNET_SYSERR;
684 }
685 return GNUNET_OK;
686}
687
688
689/**
668 * Handle an BF message from a remote peer. 690 * Handle an BF message from a remote peer.
669 * 691 *
670 * @param cls the intersection operation 692 * @param cls the intersection operation
671 * @param mh the header of the message 693 * @param msg the header of the message
672 */ 694 */
673static void 695void
674handle_p2p_bf (void *cls, 696handle_intersection_p2p_bf (void *cls,
675 const struct GNUNET_MessageHeader *mh) 697 const struct BFMessage *msg)
676{ 698{
677 struct Operation *op = cls; 699 struct Operation *op = cls;
678 const struct BFMessage *msg;
679 uint32_t bf_size; 700 uint32_t bf_size;
680 uint32_t chunk_size; 701 uint32_t chunk_size;
681 uint32_t bf_bits_per_element; 702 uint32_t bf_bits_per_element;
682 uint16_t msize;
683 703
684 msize = htons (mh->size);
685 if (msize < sizeof (struct BFMessage))
686 {
687 GNUNET_break_op (0);
688 fail_intersection_operation (op);
689 return;
690 }
691 msg = (const struct BFMessage *) mh;
692 switch (op->state->phase) 704 switch (op->state->phase)
693 { 705 {
694 case PHASE_INITIAL: 706 case PHASE_INITIAL:
695 GNUNET_break_op (0); 707 GNUNET_break_op (0);
696 fail_intersection_operation (op); 708 fail_intersection_operation (op);
697 break; 709 return;
698 case PHASE_COUNT_SENT: 710 case PHASE_COUNT_SENT:
699 case PHASE_BF_EXCHANGE: 711 case PHASE_BF_EXCHANGE:
700 bf_size = ntohl (msg->bloomfilter_total_length); 712 bf_size = ntohl (msg->bloomfilter_total_length);
701 bf_bits_per_element = ntohl (msg->bits_per_element); 713 bf_bits_per_element = ntohl (msg->bits_per_element);
702 chunk_size = msize - sizeof (struct BFMessage); 714 chunk_size = htons (msg->header.size) - sizeof (struct BFMessage);
703 op->state->other_xor = msg->element_xor_hash; 715 op->state->other_xor = msg->element_xor_hash;
704 if (bf_size == chunk_size) 716 if (bf_size == chunk_size)
705 { 717 {
@@ -717,7 +729,7 @@ handle_p2p_bf (void *cls,
717 op->state->salt = ntohl (msg->sender_mutator); 729 op->state->salt = ntohl (msg->sender_mutator);
718 op->spec->remote_element_count = ntohl (msg->sender_element_count); 730 op->spec->remote_element_count = ntohl (msg->sender_element_count);
719 process_bf (op); 731 process_bf (op);
720 return; 732 break;
721 } 733 }
722 /* multipart chunk */ 734 /* multipart chunk */
723 if (NULL == op->state->bf_data) 735 if (NULL == op->state->bf_data)
@@ -764,8 +776,9 @@ handle_p2p_bf (void *cls,
764 default: 776 default:
765 GNUNET_break_op (0); 777 GNUNET_break_op (0);
766 fail_intersection_operation (op); 778 fail_intersection_operation (op);
767 break; 779 return;
768 } 780 }
781 GNUNET_CADET_receive_done (op->channel);
769} 782}
770 783
771 784
@@ -836,6 +849,7 @@ static void
836begin_bf_exchange (struct Operation *op) 849begin_bf_exchange (struct Operation *op)
837{ 850{
838 op->state->phase = PHASE_BF_EXCHANGE; 851 op->state->phase = PHASE_BF_EXCHANGE;
852 GNUNET_assert (NULL == op->state->my_elements);
839 op->state->my_elements 853 op->state->my_elements
840 = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count, 854 = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count,
841 GNUNET_YES); 855 GNUNET_YES);
@@ -853,20 +867,18 @@ begin_bf_exchange (struct Operation *op)
853 * @param cls the intersection operation 867 * @param cls the intersection operation
854 * @param mh the header of the message 868 * @param mh the header of the message
855 */ 869 */
856static void 870void
857handle_p2p_element_info (void *cls, 871handle_intersection_p2p_element_info (void *cls,
858 const struct GNUNET_MessageHeader *mh) 872 const struct IntersectionElementInfoMessage *msg)
859{ 873{
860 struct Operation *op = cls; 874 struct Operation *op = cls;
861 const struct IntersectionElementInfoMessage *msg;
862 875
863 if (ntohs (mh->size) != sizeof (struct IntersectionElementInfoMessage)) 876 if (OT_INTERSECTION != op->type)
864 { 877 {
865 GNUNET_break_op (0); 878 GNUNET_break_op (0);
866 fail_intersection_operation(op); 879 fail_intersection_operation(op);
867 return; 880 return;
868 } 881 }
869 msg = (const struct IntersectionElementInfoMessage *) mh;
870 op->spec->remote_element_count = ntohl (msg->sender_element_count); 882 op->spec->remote_element_count = ntohl (msg->sender_element_count);
871 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 883 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
872 "Received remote element count (%u), I have %u\n", 884 "Received remote element count (%u), I have %u\n",
@@ -884,6 +896,7 @@ handle_p2p_element_info (void *cls,
884 } 896 }
885 GNUNET_break (NULL == op->state->remote_bf); 897 GNUNET_break (NULL == op->state->remote_bf);
886 begin_bf_exchange (op); 898 begin_bf_exchange (op);
899 GNUNET_CADET_receive_done (op->channel);
887} 900}
888 901
889 902
@@ -955,28 +968,26 @@ filter_all (void *cls,
955 * @param cls the intersection operation 968 * @param cls the intersection operation
956 * @param mh the message 969 * @param mh the message
957 */ 970 */
958static void 971void
959handle_p2p_done (void *cls, 972handle_intersection_p2p_done (void *cls,
960 const struct GNUNET_MessageHeader *mh) 973 const struct IntersectionDoneMessage *idm)
961{ 974{
962 struct Operation *op = cls; 975 struct Operation *op = cls;
963 const struct IntersectionDoneMessage *idm;
964 976
965 if (PHASE_BF_EXCHANGE != op->state->phase) 977 if (OT_INTERSECTION != op->type)
966 { 978 {
967 /* wrong phase to conclude? FIXME: Or should we allow this
968 if the other peer has _initially_ already an empty set? */
969 GNUNET_break_op (0); 979 GNUNET_break_op (0);
970 fail_intersection_operation (op); 980 fail_intersection_operation(op);
971 return; 981 return;
972 } 982 }
973 if (ntohs (mh->size) != sizeof (struct IntersectionDoneMessage)) 983 if (PHASE_BF_EXCHANGE != op->state->phase)
974 { 984 {
985 /* wrong phase to conclude? FIXME: Or should we allow this
986 if the other peer has _initially_ already an empty set? */
975 GNUNET_break_op (0); 987 GNUNET_break_op (0);
976 fail_intersection_operation (op); 988 fail_intersection_operation (op);
977 return; 989 return;
978 } 990 }
979 idm = (const struct IntersectionDoneMessage *) mh;
980 if (0 == ntohl (idm->final_element_count)) 991 if (0 == ntohl (idm->final_element_count))
981 { 992 {
982 /* other peer determined empty set is the intersection, 993 /* other peer determined empty set is the intersection,
@@ -1000,6 +1011,7 @@ handle_p2p_done (void *cls,
1000 op->state->my_element_count); 1011 op->state->my_element_count);
1001 op->state->phase = PHASE_FINISHED; 1012 op->state->phase = PHASE_FINISHED;
1002 finish_and_destroy (op); 1013 finish_and_destroy (op);
1014 GNUNET_CADET_receive_done (op->channel);
1003} 1015}
1004 1016
1005 1017
@@ -1064,11 +1076,11 @@ intersection_accept (struct Operation *op)
1064 op->state->phase = PHASE_INITIAL; 1076 op->state->phase = PHASE_INITIAL;
1065 op->state->my_element_count 1077 op->state->my_element_count
1066 = op->spec->set->state->current_set_element_count; 1078 = op->spec->set->state->current_set_element_count;
1079 GNUNET_assert (NULL == op->state->my_elements);
1067 op->state->my_elements 1080 op->state->my_elements
1068 = GNUNET_CONTAINER_multihashmap_create 1081 = GNUNET_CONTAINER_multihashmap_create (GNUNET_MIN (op->state->my_element_count,
1069 (GNUNET_MIN (op->state->my_element_count, 1082 op->spec->remote_element_count),
1070 op->spec->remote_element_count), 1083 GNUNET_YES);
1071 GNUNET_YES);
1072 if (op->spec->remote_element_count < op->state->my_element_count) 1084 if (op->spec->remote_element_count < op->state->my_element_count)
1073 { 1085 {
1074 /* If the other peer (Alice) has fewer elements than us (Bob), 1086 /* If the other peer (Alice) has fewer elements than us (Bob),
@@ -1083,43 +1095,6 @@ intersection_accept (struct Operation *op)
1083 1095
1084 1096
1085/** 1097/**
1086 * Dispatch messages for a intersection operation.
1087 *
1088 * @param op the state of the intersection evaluate operation
1089 * @param mh the received message
1090 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1091 * #GNUNET_OK otherwise
1092 */
1093static int
1094intersection_handle_p2p_message (struct Operation *op,
1095 const struct GNUNET_MessageHeader *mh)
1096{
1097 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1098 "Received p2p message (t: %u, s: %u)\n",
1099 ntohs (mh->type), ntohs (mh->size));
1100 switch (ntohs (mh->type))
1101 {
1102 /* this message handler is not active until after we received an
1103 * operation request message, thus the ops request is not handled here
1104 */
1105 case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO:
1106 handle_p2p_element_info (op, mh);
1107 break;
1108 case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
1109 handle_p2p_bf (op, mh);
1110 break;
1111 case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE:
1112 handle_p2p_done (op, mh);
1113 break;
1114 default:
1115 /* something wrong with cadet's message handlers? */
1116 GNUNET_assert (0);
1117 }
1118 return GNUNET_OK;
1119}
1120
1121
1122/**
1123 * Handler for peer-disconnects, notifies the client about the aborted 1098 * Handler for peer-disconnects, notifies the client about the aborted
1124 * operation. If we did not expect anything from the other peer, we 1099 * operation. If we did not expect anything from the other peer, we
1125 * gracefully terminate the operation. 1100 * gracefully terminate the operation.
@@ -1168,6 +1143,11 @@ intersection_op_cancel (struct Operation *op)
1168 GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements); 1143 GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
1169 op->state->my_elements = NULL; 1144 op->state->my_elements = NULL;
1170 } 1145 }
1146 if (NULL != op->state->full_result_iter)
1147 {
1148 GNUNET_CONTAINER_multihashmap_iterator_destroy (op->state->full_result_iter);
1149 op->state->full_result_iter = NULL;
1150 }
1171 GNUNET_free (op->state); 1151 GNUNET_free (op->state);
1172 op->state = NULL; 1152 op->state = NULL;
1173 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1153 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1245,7 +1225,6 @@ _GSS_intersection_vt ()
1245{ 1225{
1246 static const struct SetVT intersection_vt = { 1226 static const struct SetVT intersection_vt = {
1247 .create = &intersection_set_create, 1227 .create = &intersection_set_create,
1248 .msg_handler = &intersection_handle_p2p_message,
1249 .add = &intersection_add, 1228 .add = &intersection_add,
1250 .remove = &intersection_remove, 1229 .remove = &intersection_remove,
1251 .destroy_set = &intersection_set_destroy, 1230 .destroy_set = &intersection_set_destroy,
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c
index b5b602074..200bd4b8e 100644
--- a/src/set/gnunet-service-set_union.c
+++ b/src/set/gnunet-service-set_union.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet 2 This file is part of GNUnet
3 Copyright (C) 2013-2016 GNUnet e.V. 3 Copyright (C) 2013-2017 GNUnet e.V.
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -19,15 +19,16 @@
19*/ 19*/
20/** 20/**
21 * @file set/gnunet-service-set_union.c 21 * @file set/gnunet-service-set_union.c
22
23 * @brief two-peer set operations 22 * @brief two-peer set operations
24 * @author Florian Dold 23 * @author Florian Dold
24 * @author Christian Grothoff
25 */ 25 */
26#include "platform.h" 26#include "platform.h"
27#include "gnunet_util_lib.h" 27#include "gnunet_util_lib.h"
28#include "gnunet_statistics_service.h" 28#include "gnunet_statistics_service.h"
29#include "gnunet-service-set.h" 29#include "gnunet-service-set.h"
30#include "ibf.h" 30#include "ibf.h"
31#include "gnunet-service-set_union.h"
31#include "gnunet-service-set_union_strata_estimator.h" 32#include "gnunet-service-set_union_strata_estimator.h"
32#include "gnunet-service-set_protocol.h" 33#include "gnunet-service-set_protocol.h"
33#include <gcrypt.h> 34#include <gcrypt.h>
@@ -813,42 +814,56 @@ send_full_set (struct Operation *op)
813 * Handle a strata estimator from a remote peer 814 * Handle a strata estimator from a remote peer
814 * 815 *
815 * @param cls the union operation 816 * @param cls the union operation
816 * @param mh the message 817 * @param msg the message
817 * @param is_compressed #GNUNET_YES if the estimator is compressed
818 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
819 * #GNUNET_OK otherwise
820 */ 818 */
821static int 819int
822handle_p2p_strata_estimator (void *cls, 820check_union_p2p_strata_estimator (void *cls,
823 const struct GNUNET_MessageHeader *mh, 821 const struct StrataEstimatorMessage *msg)
824 int is_compressed)
825{ 822{
826 struct Operation *op = cls; 823 struct Operation *op = cls;
827 struct StrataEstimator *remote_se; 824 int is_compressed;
828 struct StrataEstimatorMessage *msg = (void *) mh;
829 unsigned int diff;
830 uint64_t other_size;
831 size_t len; 825 size_t len;
832 826
833 GNUNET_STATISTICS_update (_GSS_statistics,
834 "# bytes of SE received",
835 ntohs (mh->size),
836 GNUNET_NO);
837
838 if (op->state->phase != PHASE_EXPECT_SE) 827 if (op->state->phase != PHASE_EXPECT_SE)
839 { 828 {
840 GNUNET_break (0); 829 GNUNET_break (0);
841 fail_union_operation (op);
842 return GNUNET_SYSERR; 830 return GNUNET_SYSERR;
843 } 831 }
844 len = ntohs (mh->size) - sizeof (struct StrataEstimatorMessage); 832 is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type));
833 len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage);
845 if ( (GNUNET_NO == is_compressed) && 834 if ( (GNUNET_NO == is_compressed) &&
846 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) ) 835 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
847 { 836 {
848 fail_union_operation (op);
849 GNUNET_break (0); 837 GNUNET_break (0);
850 return GNUNET_SYSERR; 838 return GNUNET_SYSERR;
851 } 839 }
840 return GNUNET_OK;
841}
842
843
844/**
845 * Handle a strata estimator from a remote peer
846 *
847 * @param cls the union operation
848 * @param msg the message
849 */
850void
851handle_union_p2p_strata_estimator (void *cls,
852 const struct StrataEstimatorMessage *msg)
853{
854 struct Operation *op = cls;
855 struct StrataEstimator *remote_se;
856 unsigned int diff;
857 uint64_t other_size;
858 size_t len;
859 int is_compressed;
860
861 is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type));
862 GNUNET_STATISTICS_update (_GSS_statistics,
863 "# bytes of SE received",
864 ntohs (msg->header.size),
865 GNUNET_NO);
866 len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage);
852 other_size = GNUNET_ntohll (msg->set_size); 867 other_size = GNUNET_ntohll (msg->set_size);
853 remote_se = strata_estimator_create (SE_STRATA_COUNT, 868 remote_se = strata_estimator_create (SE_STRATA_COUNT,
854 SE_IBF_SIZE, 869 SE_IBF_SIZE,
@@ -857,7 +872,7 @@ handle_p2p_strata_estimator (void *cls,
857 { 872 {
858 /* insufficient resources, fail */ 873 /* insufficient resources, fail */
859 fail_union_operation (op); 874 fail_union_operation (op);
860 return GNUNET_SYSERR; 875 return;
861 } 876 }
862 if (GNUNET_OK != 877 if (GNUNET_OK !=
863 strata_estimator_read (&msg[1], 878 strata_estimator_read (&msg[1],
@@ -866,18 +881,16 @@ handle_p2p_strata_estimator (void *cls,
866 remote_se)) 881 remote_se))
867 { 882 {
868 /* decompression failed */ 883 /* decompression failed */
869 fail_union_operation (op);
870 strata_estimator_destroy (remote_se); 884 strata_estimator_destroy (remote_se);
871 return GNUNET_SYSERR; 885 fail_union_operation (op);
886 return;
872 } 887 }
873 GNUNET_assert (NULL != op->state->se); 888 GNUNET_assert (NULL != op->state->se);
874 diff = strata_estimator_difference (remote_se, 889 diff = strata_estimator_difference (remote_se,
875 op->state->se); 890 op->state->se);
876 891
877 if (diff > 200) 892 if (diff > 200)
878 diff = diff * 3 / 2; 893 diff = diff * 3 / 2;
879
880
881 894
882 strata_estimator_destroy (remote_se); 895 strata_estimator_destroy (remote_se);
883 strata_estimator_destroy (op->state->se); 896 strata_estimator_destroy (op->state->se);
@@ -885,12 +898,14 @@ handle_p2p_strata_estimator (void *cls,
885 LOG (GNUNET_ERROR_TYPE_DEBUG, 898 LOG (GNUNET_ERROR_TYPE_DEBUG,
886 "got se diff=%d, using ibf size %d\n", 899 "got se diff=%d, using ibf size %d\n",
887 diff, 900 diff,
888 1<<get_order_from_difference (diff)); 901 1U << get_order_from_difference (diff));
889 902
890 { 903 {
891 char *set_debug; 904 char *set_debug;
905
892 set_debug = getenv ("GNUNET_SET_BENCHMARK"); 906 set_debug = getenv ("GNUNET_SET_BENCHMARK");
893 if ( (NULL != set_debug) && (0 == strcmp (set_debug, "1")) ) 907 if ( (NULL != set_debug) &&
908 (0 == strcmp (set_debug, "1")) )
894 { 909 {
895 FILE *f = fopen ("set.log", "a"); 910 FILE *f = fopen ("set.log", "a");
896 fprintf (f, "%llu\n", (unsigned long long) diff); 911 fprintf (f, "%llu\n", (unsigned long long) diff);
@@ -898,15 +913,16 @@ handle_p2p_strata_estimator (void *cls,
898 } 913 }
899 } 914 }
900 915
901 if ((GNUNET_YES == op->spec->byzantine) && (other_size < op->spec->byzantine_lower_bound)) 916 if ( (GNUNET_YES == op->spec->byzantine) &&
917 (other_size < op->spec->byzantine_lower_bound) )
902 { 918 {
903 GNUNET_break (0); 919 GNUNET_break (0);
904 fail_union_operation (op); 920 fail_union_operation (op);
905 return GNUNET_SYSERR; 921 return;
906 } 922 }
907 923
908 924 if ( (GNUNET_YES == op->spec->force_full) ||
909 if ( (GNUNET_YES == op->spec->force_full) || (diff > op->state->initial_size / 4)) 925 (diff > op->state->initial_size / 4))
910 { 926 {
911 LOG (GNUNET_ERROR_TYPE_INFO, 927 LOG (GNUNET_ERROR_TYPE_INFO,
912 "Sending full set (diff=%d, own set=%u)\n", 928 "Sending full set (diff=%d, own set=%u)\n",
@@ -923,6 +939,7 @@ handle_p2p_strata_estimator (void *cls,
923 else 939 else
924 { 940 {
925 struct GNUNET_MQ_Envelope *ev; 941 struct GNUNET_MQ_Envelope *ev;
942
926 op->state->phase = PHASE_EXPECT_IBF; 943 op->state->phase = PHASE_EXPECT_IBF;
927 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL); 944 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
928 GNUNET_MQ_send (op->mq, ev); 945 GNUNET_MQ_send (op->mq, ev);
@@ -942,11 +959,10 @@ handle_p2p_strata_estimator (void *cls,
942 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 959 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
943 "Failed to send IBF, closing connection\n"); 960 "Failed to send IBF, closing connection\n");
944 fail_union_operation (op); 961 fail_union_operation (op);
945 return GNUNET_SYSERR; 962 return;
946 } 963 }
947 } 964 }
948 965 GNUNET_CADET_receive_done (op->channel);
949 return GNUNET_OK;
950} 966}
951 967
952 968
@@ -1164,99 +1180,116 @@ decode_and_send (struct Operation *op)
1164 1180
1165 1181
1166/** 1182/**
1167 * Handle an IBF message from a remote peer. 1183 * Check an IBF message from a remote peer.
1168 * 1184 *
1169 * Reassemble the IBF from multiple pieces, and 1185 * Reassemble the IBF from multiple pieces, and
1170 * process the whole IBF once possible. 1186 * process the whole IBF once possible.
1171 * 1187 *
1172 * @param cls the union operation 1188 * @param cls the union operation
1173 * @param mh the header of the message 1189 * @param msg the header of the message
1174 * @return #GNUNET_SYSERR if the tunnel should be disconnected, 1190 * @return #GNUNET_OK if @a msg is well-formed
1175 * #GNUNET_OK otherwise
1176 */ 1191 */
1177static int 1192int
1178handle_p2p_ibf (void *cls, 1193check_union_p2p_ibf (void *cls,
1179 const struct GNUNET_MessageHeader *mh) 1194 const struct IBFMessage *msg)
1180{ 1195{
1181 struct Operation *op = cls; 1196 struct Operation *op = cls;
1182 const struct IBFMessage *msg;
1183 unsigned int buckets_in_message; 1197 unsigned int buckets_in_message;
1184 1198
1185 if (ntohs (mh->size) < sizeof (struct IBFMessage)) 1199 if (OT_UNION != op->type)
1186 { 1200 {
1187 GNUNET_break_op (0); 1201 GNUNET_break_op (0);
1188 fail_union_operation (op);
1189 return GNUNET_SYSERR; 1202 return GNUNET_SYSERR;
1190 } 1203 }
1191 msg = (const struct IBFMessage *) mh; 1204 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1192 if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) || 1205 if (0 == buckets_in_message)
1193 (op->state->phase == PHASE_EXPECT_IBF) )
1194 { 1206 {
1195 op->state->phase = PHASE_EXPECT_IBF_CONT; 1207 GNUNET_break_op (0);
1196 GNUNET_assert (NULL == op->state->remote_ibf); 1208 return GNUNET_SYSERR;
1197 LOG (GNUNET_ERROR_TYPE_DEBUG,
1198 "Creating new ibf of size %u\n",
1199 1 << msg->order);
1200 op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
1201 op->state->salt_receive = ntohl (msg->salt);
1202 LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving new IBF with salt %u\n", op->state->salt_receive);
1203 if (NULL == op->state->remote_ibf)
1204 {
1205 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1206 "Failed to parse remote IBF, closing connection\n");
1207 fail_union_operation (op);
1208 return GNUNET_SYSERR;
1209 }
1210 op->state->ibf_buckets_received = 0;
1211 if (0 != ntohl (msg->offset))
1212 {
1213 GNUNET_break_op (0);
1214 fail_union_operation (op);
1215 return GNUNET_SYSERR;
1216 }
1217 } 1209 }
1218 else if (op->state->phase == PHASE_EXPECT_IBF_CONT) 1210 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1211 {
1212 GNUNET_break_op (0);
1213 return GNUNET_SYSERR;
1214 }
1215 if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1219 { 1216 {
1220 if (ntohl (msg->offset) != op->state->ibf_buckets_received) 1217 if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1221 { 1218 {
1222 GNUNET_break_op (0); 1219 GNUNET_break_op (0);
1223 fail_union_operation (op);
1224 return GNUNET_SYSERR; 1220 return GNUNET_SYSERR;
1225 } 1221 }
1226 if (1<<msg->order != op->state->remote_ibf->size) 1222 if (1<<msg->order != op->state->remote_ibf->size)
1227 { 1223 {
1228 GNUNET_break_op (0); 1224 GNUNET_break_op (0);
1229 fail_union_operation (op);
1230 return GNUNET_SYSERR; 1225 return GNUNET_SYSERR;
1231 } 1226 }
1232 if (ntohl (msg->salt) != op->state->salt_receive) 1227 if (ntohl (msg->salt) != op->state->salt_receive)
1233 { 1228 {
1234 GNUNET_break_op (0); 1229 GNUNET_break_op (0);
1235 fail_union_operation (op);
1236 return GNUNET_SYSERR; 1230 return GNUNET_SYSERR;
1237 } 1231 }
1238 } 1232 }
1239 else 1233 else if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1234 (op->state->phase != PHASE_EXPECT_IBF) )
1240 { 1235 {
1241 GNUNET_assert (0); 1236 GNUNET_break_op (0);
1237 return GNUNET_SYSERR;
1242 } 1238 }
1243 1239
1244 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE; 1240 return GNUNET_OK;
1241}
1245 1242
1246 if (0 == buckets_in_message) 1243
1244/**
1245 * Handle an IBF message from a remote peer.
1246 *
1247 * Reassemble the IBF from multiple pieces, and
1248 * process the whole IBF once possible.
1249 *
1250 * @param cls the union operation
1251 * @param msg the header of the message
1252 */
1253void
1254handle_union_p2p_ibf (void *cls,
1255 const struct IBFMessage *msg)
1256{
1257 struct Operation *op = cls;
1258 unsigned int buckets_in_message;
1259
1260 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1261 if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1262 (op->state->phase == PHASE_EXPECT_IBF) )
1247 { 1263 {
1248 GNUNET_break_op (0); 1264 op->state->phase = PHASE_EXPECT_IBF_CONT;
1249 fail_union_operation (op); 1265 GNUNET_assert (NULL == op->state->remote_ibf);
1250 return GNUNET_SYSERR; 1266 LOG (GNUNET_ERROR_TYPE_DEBUG,
1267 "Creating new ibf of size %u\n",
1268 1 << msg->order);
1269 op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
1270 op->state->salt_receive = ntohl (msg->salt);
1271 LOG (GNUNET_ERROR_TYPE_DEBUG,
1272 "Receiving new IBF with salt %u\n",
1273 op->state->salt_receive);
1274 if (NULL == op->state->remote_ibf)
1275 {
1276 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1277 "Failed to parse remote IBF, closing connection\n");
1278 fail_union_operation (op);
1279 return;
1280 }
1281 op->state->ibf_buckets_received = 0;
1282 if (0 != ntohl (msg->offset))
1283 {
1284 GNUNET_break_op (0);
1285 fail_union_operation (op);
1286 return;
1287 }
1251 } 1288 }
1252 1289 else
1253 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1254 { 1290 {
1255 GNUNET_break_op (0); 1291 GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT);
1256 fail_union_operation (op);
1257 return GNUNET_SYSERR;
1258 } 1292 }
1259
1260 GNUNET_assert (NULL != op->state->remote_ibf); 1293 GNUNET_assert (NULL != op->state->remote_ibf);
1261 1294
1262 ibf_read_slice (&msg[1], 1295 ibf_read_slice (&msg[1],
@@ -1276,10 +1309,11 @@ handle_p2p_ibf (void *cls,
1276 /* Internal error, best we can do is shut down */ 1309 /* Internal error, best we can do is shut down */
1277 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1310 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1278 "Failed to decode IBF, closing connection\n"); 1311 "Failed to decode IBF, closing connection\n");
1279 return GNUNET_SYSERR; 1312 fail_union_operation (op);
1313 return;
1280 } 1314 }
1281 } 1315 }
1282 return GNUNET_OK; 1316 GNUNET_CADET_receive_done (op->channel);
1283} 1317}
1284 1318
1285 1319
@@ -1343,6 +1377,11 @@ send_done_and_destroy (void *cls)
1343} 1377}
1344 1378
1345 1379
1380/**
1381 * Tests if the operation is finished, and if so notify.
1382 *
1383 * @param op operation to check
1384 */
1346static void 1385static void
1347maybe_finish (struct Operation *op) 1386maybe_finish (struct Operation *op)
1348{ 1387{
@@ -1382,46 +1421,59 @@ maybe_finish (struct Operation *op)
1382 1421
1383 1422
1384/** 1423/**
1385 * Handle an element message from a remote peer. 1424 * Check an element message from a remote peer.
1386 * Sent by the other peer either because we decoded an IBF and placed a demand,
1387 * or because the other peer switched to full set transmission.
1388 * 1425 *
1389 * @param cls the union operation 1426 * @param cls the union operation
1390 * @param mh the message 1427 * @param emsg the message
1391 */ 1428 */
1392static void 1429int
1393handle_p2p_elements (void *cls, 1430check_union_p2p_elements (void *cls,
1394 const struct GNUNET_MessageHeader *mh) 1431 const struct GNUNET_SET_ElementMessage *emsg)
1395{ 1432{
1396 struct Operation *op = cls; 1433 struct Operation *op = cls;
1397 struct ElementEntry *ee;
1398 const struct GNUNET_SET_ElementMessage *emsg;
1399 uint16_t element_size;
1400 1434
1401 if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes)) 1435 if (OT_UNION != op->type)
1402 { 1436 {
1403 GNUNET_break_op (0); 1437 GNUNET_break_op (0);
1404 fail_union_operation (op); 1438 return GNUNET_SYSERR;
1405 return;
1406 } 1439 }
1407 if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage)) 1440 if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1408 { 1441 {
1409 GNUNET_break_op (0); 1442 GNUNET_break_op (0);
1410 fail_union_operation (op); 1443 return GNUNET_SYSERR;
1411 return;
1412 } 1444 }
1445 return GNUNET_OK;
1446}
1447
1413 1448
1414 emsg = (const struct GNUNET_SET_ElementMessage *) mh; 1449/**
1450 * Handle an element message from a remote peer.
1451 * Sent by the other peer either because we decoded an IBF and placed a demand,
1452 * or because the other peer switched to full set transmission.
1453 *
1454 * @param cls the union operation
1455 * @param emsg the message
1456 */
1457void
1458handle_union_p2p_elements (void *cls,
1459 const struct GNUNET_SET_ElementMessage *emsg)
1460{
1461 struct Operation *op = cls;
1462 struct ElementEntry *ee;
1463 struct KeyEntry *ke;
1464 uint16_t element_size;
1415 1465
1416 element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage); 1466 element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage);
1417 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size); 1467 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1418 GNUNET_memcpy (&ee[1], &emsg[1], element_size); 1468 GNUNET_memcpy (&ee[1],
1469 &emsg[1],
1470 element_size);
1419 ee->element.size = element_size; 1471 ee->element.size = element_size;
1420 ee->element.data = &ee[1]; 1472 ee->element.data = &ee[1];
1421 ee->element.element_type = ntohs (emsg->element_type); 1473 ee->element.element_type = ntohs (emsg->element_type);
1422 ee->remote = GNUNET_YES; 1474 ee->remote = GNUNET_YES;
1423 GNUNET_SET_element_hash (&ee->element, &ee->element_hash); 1475 GNUNET_SET_element_hash (&ee->element,
1424 1476 &ee->element_hash);
1425 if (GNUNET_NO == 1477 if (GNUNET_NO ==
1426 GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes, 1478 GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1427 &ee->element_hash, 1479 &ee->element_hash,
@@ -1429,7 +1481,6 @@ handle_p2p_elements (void *cls,
1429 { 1481 {
1430 /* We got something we didn't demand, since it's not in our map. */ 1482 /* We got something we didn't demand, since it's not in our map. */
1431 GNUNET_break_op (0); 1483 GNUNET_break_op (0);
1432 GNUNET_free (ee);
1433 fail_union_operation (op); 1484 fail_union_operation (op);
1434 return; 1485 return;
1435 } 1486 }
@@ -1448,10 +1499,9 @@ handle_p2p_elements (void *cls,
1448 1, 1499 1,
1449 GNUNET_NO); 1500 GNUNET_NO);
1450 1501
1451 op->state->received_total += 1; 1502 op->state->received_total++;
1452
1453 struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
1454 1503
1504 ke = op_get_element (op, &ee->element_hash);
1455 if (NULL != ke) 1505 if (NULL != ke)
1456 { 1506 {
1457 /* Got repeated element. Should not happen since 1507 /* Got repeated element. Should not happen since
@@ -1467,7 +1517,7 @@ handle_p2p_elements (void *cls,
1467 { 1517 {
1468 LOG (GNUNET_ERROR_TYPE_DEBUG, 1518 LOG (GNUNET_ERROR_TYPE_DEBUG,
1469 "Registering new element from remote peer\n"); 1519 "Registering new element from remote peer\n");
1470 op->state->received_fresh += 1; 1520 op->state->received_fresh++;
1471 op_register_element (op, ee, GNUNET_YES); 1521 op_register_element (op, ee, GNUNET_YES);
1472 /* only send results immediately if the client wants it */ 1522 /* only send results immediately if the client wants it */
1473 switch (op->spec->result_mode) 1523 switch (op->spec->result_mode)
@@ -1485,43 +1535,57 @@ handle_p2p_elements (void *cls,
1485 } 1535 }
1486 } 1536 }
1487 1537
1488 if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3) 1538 if ( (op->state->received_total > 8) &&
1539 (op->state->received_fresh < op->state->received_total / 3) )
1489 { 1540 {
1490 /* The other peer gave us lots of old elements, there's something wrong. */ 1541 /* The other peer gave us lots of old elements, there's something wrong. */
1491 GNUNET_break_op (0); 1542 GNUNET_break_op (0);
1492 fail_union_operation (op); 1543 fail_union_operation (op);
1493 return; 1544 return;
1494 } 1545 }
1495 1546 GNUNET_CADET_receive_done (op->channel);
1496 maybe_finish (op); 1547 maybe_finish (op);
1497} 1548}
1498 1549
1499 1550
1500/** 1551/**
1501 * Handle an element message from a remote peer. 1552 * Check a full element message from a remote peer.
1502 * 1553 *
1503 * @param cls the union operation 1554 * @param cls the union operation
1504 * @param mh the message 1555 * @param emsg the message
1505 */ 1556 */
1506static void 1557int
1507handle_p2p_full_element (void *cls, 1558check_union_p2p_full_element (void *cls,
1508 const struct GNUNET_MessageHeader *mh) 1559 const struct GNUNET_SET_ElementMessage *emsg)
1509{ 1560{
1510 struct Operation *op = cls; 1561 struct Operation *op = cls;
1511 struct ElementEntry *ee;
1512 const struct GNUNET_SET_ElementMessage *emsg;
1513 uint16_t element_size;
1514 1562
1515 if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage)) 1563 if (OT_UNION != op->type)
1516 { 1564 {
1517 GNUNET_break_op (0); 1565 GNUNET_break_op (0);
1518 fail_union_operation (op); 1566 return GNUNET_SYSERR;
1519 return;
1520 } 1567 }
1568 // FIXME: check that we expect full elements here?
1569 return GNUNET_OK;
1570}
1521 1571
1522 emsg = (const struct GNUNET_SET_ElementMessage *) mh;
1523 1572
1524 element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage); 1573/**
1574 * Handle an element message from a remote peer.
1575 *
1576 * @param cls the union operation
1577 * @param emsg the message
1578 */
1579void
1580handle_union_p2p_full_element (void *cls,
1581 const struct GNUNET_SET_ElementMessage *emsg)
1582{
1583 struct Operation *op = cls;
1584 struct ElementEntry *ee;
1585 struct KeyEntry *ke;
1586 uint16_t element_size;
1587
1588 element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage);
1525 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size); 1589 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1526 GNUNET_memcpy (&ee[1], &emsg[1], element_size); 1590 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1527 ee->element.size = element_size; 1591 ee->element.size = element_size;
@@ -1544,10 +1608,9 @@ handle_p2p_full_element (void *cls,
1544 1, 1608 1,
1545 GNUNET_NO); 1609 GNUNET_NO);
1546 1610
1547 op->state->received_total += 1; 1611 op->state->received_total++;
1548
1549 struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
1550 1612
1613 ke = op_get_element (op, &ee->element_hash);
1551 if (NULL != ke) 1614 if (NULL != ke)
1552 { 1615 {
1553 /* Got repeated element. Should not happen since 1616 /* Got repeated element. Should not happen since
@@ -1563,7 +1626,7 @@ handle_p2p_full_element (void *cls,
1563 { 1626 {
1564 LOG (GNUNET_ERROR_TYPE_DEBUG, 1627 LOG (GNUNET_ERROR_TYPE_DEBUG,
1565 "Registering new element from remote peer\n"); 1628 "Registering new element from remote peer\n");
1566 op->state->received_fresh += 1; 1629 op->state->received_fresh++;
1567 op_register_element (op, ee, GNUNET_YES); 1630 op_register_element (op, ee, GNUNET_YES);
1568 /* only send results immediately if the client wants it */ 1631 /* only send results immediately if the client wants it */
1569 switch (op->spec->result_mode) 1632 switch (op->spec->result_mode)
@@ -1581,8 +1644,8 @@ handle_p2p_full_element (void *cls,
1581 } 1644 }
1582 } 1645 }
1583 1646
1584 if ( (GNUNET_YES == op->spec->byzantine) && 1647 if ( (GNUNET_YES == op->spec->byzantine) &&
1585 (op->state->received_total > 384 + op->state->received_fresh * 4) && 1648 (op->state->received_total > 384 + op->state->received_fresh * 4) &&
1586 (op->state->received_fresh < op->state->received_total / 6) ) 1649 (op->state->received_fresh < op->state->received_total / 6) )
1587 { 1650 {
1588 /* The other peer gave us lots of old elements, there's something wrong. */ 1651 /* The other peer gave us lots of old elements, there's something wrong. */
@@ -1594,51 +1657,73 @@ handle_p2p_full_element (void *cls,
1594 fail_union_operation (op); 1657 fail_union_operation (op);
1595 return; 1658 return;
1596 } 1659 }
1660 GNUNET_CADET_receive_done (op->channel);
1597} 1661}
1598 1662
1663
1599/** 1664/**
1600 * Send offers (for GNUNET_Hash-es) in response 1665 * Send offers (for GNUNET_Hash-es) in response
1601 * to inquiries (for IBF_Key-s). 1666 * to inquiries (for IBF_Key-s).
1602 * 1667 *
1603 * @param cls the union operation 1668 * @param cls the union operation
1604 * @param mh the message 1669 * @param msg the message
1605 */ 1670 */
1606static void 1671int
1607handle_p2p_inquiry (void *cls, 1672check_union_p2p_inquiry (void *cls,
1608 const struct GNUNET_MessageHeader *mh) 1673 const struct InquiryMessage *msg)
1609{ 1674{
1610 struct Operation *op = cls; 1675 struct Operation *op = cls;
1611 const struct IBF_Key *ibf_key;
1612 unsigned int num_keys; 1676 unsigned int num_keys;
1613 struct InquiryMessage *msg;
1614 1677
1615 /* look up elements and send them */ 1678 if (OT_UNION != op->type)
1679 {
1680 GNUNET_break_op (0);
1681 return GNUNET_SYSERR;
1682 }
1616 if (op->state->phase != PHASE_INVENTORY_PASSIVE) 1683 if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1617 { 1684 {
1618 GNUNET_break_op (0); 1685 GNUNET_break_op (0);
1619 fail_union_operation (op); 1686 return GNUNET_SYSERR;
1620 return;
1621 } 1687 }
1622 num_keys = (ntohs (mh->size) - sizeof (struct InquiryMessage)) 1688 num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1623 / sizeof (struct IBF_Key); 1689 / sizeof (struct IBF_Key);
1624 if ((ntohs (mh->size) - sizeof (struct InquiryMessage)) 1690 if ((ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1625 != num_keys * sizeof (struct IBF_Key)) 1691 != num_keys * sizeof (struct IBF_Key))
1626 { 1692 {
1627 GNUNET_break_op (0); 1693 GNUNET_break_op (0);
1628 fail_union_operation (op); 1694 return GNUNET_SYSERR;
1629 return;
1630 } 1695 }
1696 return GNUNET_OK;
1697}
1631 1698
1632 msg = (struct InquiryMessage *) mh;
1633 1699
1700/**
1701 * Send offers (for GNUNET_Hash-es) in response
1702 * to inquiries (for IBF_Key-s).
1703 *
1704 * @param cls the union operation
1705 * @param msg the message
1706 */
1707void
1708handle_union_p2p_inquiry (void *cls,
1709 const struct InquiryMessage *msg)
1710{
1711 struct Operation *op = cls;
1712 const struct IBF_Key *ibf_key;
1713 unsigned int num_keys;
1714
1715 num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1716 / sizeof (struct IBF_Key);
1634 ibf_key = (const struct IBF_Key *) &msg[1]; 1717 ibf_key = (const struct IBF_Key *) &msg[1];
1635 while (0 != num_keys--) 1718 while (0 != num_keys--)
1636 { 1719 {
1637 struct IBF_Key unsalted_key; 1720 struct IBF_Key unsalted_key;
1721
1638 unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key); 1722 unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key);
1639 send_offers_for_key (op, unsalted_key); 1723 send_offers_for_key (op, unsalted_key);
1640 ibf_key++; 1724 ibf_key++;
1641 } 1725 }
1726 GNUNET_CADET_receive_done (op->channel);
1642} 1727}
1643 1728
1644 1729
@@ -1677,27 +1762,34 @@ send_missing_elements_iter (void *cls,
1677 1762
1678 1763
1679/** 1764/**
1680 * Handle a 1765 * Handle a request for full set transmission.
1681 * 1766 *
1682 * @parem cls closure, a set union operation 1767 * @parem cls closure, a set union operation
1683 * @param mh the demand message 1768 * @param mh the demand message
1684 */ 1769 */
1685static void 1770void
1686handle_p2p_request_full (void *cls, 1771handle_union_p2p_request_full (void *cls,
1687 const struct GNUNET_MessageHeader *mh) 1772 const struct GNUNET_MessageHeader *mh)
1688{ 1773{
1689 struct Operation *op = cls; 1774 struct Operation *op = cls;
1690 1775
1691 if (PHASE_EXPECT_IBF != op->state->phase) 1776 if (OT_UNION != op->type)
1692 { 1777 {
1778 GNUNET_break_op (0);
1693 fail_union_operation (op); 1779 fail_union_operation (op);
1780 return;
1781 }
1782 if (PHASE_EXPECT_IBF != op->state->phase)
1783 {
1694 GNUNET_break_op (0); 1784 GNUNET_break_op (0);
1785 fail_union_operation (op);
1695 return; 1786 return;
1696 } 1787 }
1697 1788
1698 // FIXME: we need to check that our set is larger than the 1789 // FIXME: we need to check that our set is larger than the
1699 // byzantine_lower_bound by some threshold 1790 // byzantine_lower_bound by some threshold
1700 send_full_set (op); 1791 send_full_set (op);
1792 GNUNET_CADET_receive_done (op->channel);
1701} 1793}
1702 1794
1703 1795
@@ -1707,56 +1799,97 @@ handle_p2p_request_full (void *cls,
1707 * @parem cls closure, a set union operation 1799 * @parem cls closure, a set union operation
1708 * @param mh the demand message 1800 * @param mh the demand message
1709 */ 1801 */
1710static void 1802void
1711handle_p2p_full_done (void *cls, 1803handle_union_p2p_full_done (void *cls,
1712 const struct GNUNET_MessageHeader *mh) 1804 const struct GNUNET_MessageHeader *mh)
1713{ 1805{
1714 struct Operation *op = cls; 1806 struct Operation *op = cls;
1715 1807
1716 if (PHASE_EXPECT_IBF == op->state->phase) 1808 switch (op->state->phase)
1717 { 1809 {
1718 struct GNUNET_MQ_Envelope *ev; 1810 case PHASE_EXPECT_IBF:
1719 1811 {
1720 LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other peer is missing\n"); 1812 struct GNUNET_MQ_Envelope *ev;
1721 1813
1722 /* send all the elements that did not come from the remote peer */ 1814 LOG (GNUNET_ERROR_TYPE_DEBUG,
1723 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, 1815 "got FULL DONE, sending elements that other peer is missing\n");
1724 &send_missing_elements_iter,
1725 op);
1726 1816
1727 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); 1817 /* send all the elements that did not come from the remote peer */
1728 GNUNET_MQ_send (op->mq, ev); 1818 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
1729 op->state->phase = PHASE_DONE; 1819 &send_missing_elements_iter,
1820 op);
1730 1821
1731 /* we now wait until the other peer shuts the tunnel down*/ 1822 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1823 GNUNET_MQ_send (op->mq, ev);
1824 op->state->phase = PHASE_DONE;
1825 /* we now wait until the other peer shuts the tunnel down*/
1826 }
1827 break;
1828 case PHASE_FULL_SENDING:
1829 {
1830 LOG (GNUNET_ERROR_TYPE_DEBUG,
1831 "got FULL DONE, finishing\n");
1832 /* We sent the full set, and got the response for that. We're done. */
1833 op->state->phase = PHASE_DONE;
1834 GNUNET_CADET_receive_done (op->channel);
1835 send_done_and_destroy (op);
1836 return;
1837 }
1838 break;
1839 default:
1840 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1841 "Handle full done phase is %u\n",
1842 (unsigned) op->state->phase);
1843 GNUNET_break_op (0);
1844 fail_union_operation (op);
1845 return;
1732 } 1846 }
1733 else if (PHASE_FULL_SENDING == op->state->phase) 1847 GNUNET_CADET_receive_done (op->channel);
1848}
1849
1850
1851/**
1852 * Check a demand by the other peer for elements based on a list
1853 * of `struct GNUNET_HashCode`s.
1854 *
1855 * @parem cls closure, a set union operation
1856 * @param mh the demand message
1857 * @return #GNUNET_OK if @a mh is well-formed
1858 */
1859int
1860check_union_p2p_demand (void *cls,
1861 const struct GNUNET_MessageHeader *mh)
1862{
1863 struct Operation *op = cls;
1864 unsigned int num_hashes;
1865
1866 if (OT_UNION != op->type)
1734 { 1867 {
1735 LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, finishing\n"); 1868 GNUNET_break_op (0);
1736 /* We sent the full set, and got the response for that. We're done. */ 1869 return GNUNET_SYSERR;
1737 op->state->phase = PHASE_DONE;
1738 send_done_and_destroy (op);
1739 } 1870 }
1740 else 1871 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1872 / sizeof (struct GNUNET_HashCode);
1873 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1874 != num_hashes * sizeof (struct GNUNET_HashCode))
1741 { 1875 {
1742 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handle full done phase is %u\n", (unsigned) op->state->phase);
1743 GNUNET_break_op (0); 1876 GNUNET_break_op (0);
1744 fail_union_operation (op); 1877 return GNUNET_SYSERR;
1745 return;
1746 } 1878 }
1879 return GNUNET_OK;
1747} 1880}
1748 1881
1749 1882
1750/** 1883/**
1751 * Handle a demand by the other peer for elements based on a list 1884 * Handle a demand by the other peer for elements based on a list
1752 * of GNUNET_HashCode-s. 1885 * of `struct GNUNET_HashCode`s.
1753 * 1886 *
1754 * @parem cls closure, a set union operation 1887 * @parem cls closure, a set union operation
1755 * @param mh the demand message 1888 * @param mh the demand message
1756 */ 1889 */
1757static void 1890void
1758handle_p2p_demand (void *cls, 1891handle_union_p2p_demand (void *cls,
1759 const struct GNUNET_MessageHeader *mh) 1892 const struct GNUNET_MessageHeader *mh)
1760{ 1893{
1761 struct Operation *op = cls; 1894 struct Operation *op = cls;
1762 struct ElementEntry *ee; 1895 struct ElementEntry *ee;
@@ -1767,19 +1900,12 @@ handle_p2p_demand (void *cls,
1767 1900
1768 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) 1901 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1769 / sizeof (struct GNUNET_HashCode); 1902 / sizeof (struct GNUNET_HashCode);
1770 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1771 != num_hashes * sizeof (struct GNUNET_HashCode))
1772 {
1773 GNUNET_break_op (0);
1774 fail_union_operation (op);
1775 return;
1776 }
1777
1778 for (hash = (const struct GNUNET_HashCode *) &mh[1]; 1903 for (hash = (const struct GNUNET_HashCode *) &mh[1];
1779 num_hashes > 0; 1904 num_hashes > 0;
1780 hash++, num_hashes--) 1905 hash++, num_hashes--)
1781 { 1906 {
1782 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash); 1907 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements,
1908 hash);
1783 if (NULL == ee) 1909 if (NULL == ee)
1784 { 1910 {
1785 /* Demand for non-existing element. */ 1911 /* Demand for non-existing element. */
@@ -1823,31 +1949,35 @@ handle_p2p_demand (void *cls,
1823 break; 1949 break;
1824 } 1950 }
1825 } 1951 }
1952 GNUNET_CADET_receive_done (op->channel);
1826} 1953}
1827 1954
1828 1955
1829/** 1956/**
1830 * Handle offers (of GNUNET_HashCode-s) and 1957 * Check offer (of `struct GNUNET_HashCode`s).
1831 * respond with demands (of GNUNET_HashCode-s).
1832 * 1958 *
1833 * @param cls the union operation 1959 * @param cls the union operation
1834 * @param mh the message 1960 * @param mh the message
1961 * @return #GNUNET_OK if @a mh is well-formed
1835 */ 1962 */
1836static void 1963int
1837handle_p2p_offer (void *cls, 1964check_union_p2p_offer (void *cls,
1838 const struct GNUNET_MessageHeader *mh) 1965 const struct GNUNET_MessageHeader *mh)
1839{ 1966{
1840 struct Operation *op = cls; 1967 struct Operation *op = cls;
1841 const struct GNUNET_HashCode *hash;
1842 unsigned int num_hashes; 1968 unsigned int num_hashes;
1843 1969
1970 if (OT_UNION != op->type)
1971 {
1972 GNUNET_break_op (0);
1973 return GNUNET_SYSERR;
1974 }
1844 /* look up elements and send them */ 1975 /* look up elements and send them */
1845 if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) && 1976 if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1846 (op->state->phase != PHASE_INVENTORY_ACTIVE)) 1977 (op->state->phase != PHASE_INVENTORY_ACTIVE))
1847 { 1978 {
1848 GNUNET_break_op (0); 1979 GNUNET_break_op (0);
1849 fail_union_operation (op); 1980 return GNUNET_SYSERR;
1850 return;
1851 } 1981 }
1852 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) 1982 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1853 / sizeof (struct GNUNET_HashCode); 1983 / sizeof (struct GNUNET_HashCode);
@@ -1855,10 +1985,29 @@ handle_p2p_offer (void *cls,
1855 != num_hashes * sizeof (struct GNUNET_HashCode)) 1985 != num_hashes * sizeof (struct GNUNET_HashCode))
1856 { 1986 {
1857 GNUNET_break_op (0); 1987 GNUNET_break_op (0);
1858 fail_union_operation (op); 1988 return GNUNET_SYSERR;
1859 return;
1860 } 1989 }
1990 return GNUNET_OK;
1991}
1861 1992
1993
1994/**
1995 * Handle offers (of `struct GNUNET_HashCode`s) and
1996 * respond with demands (of `struct GNUNET_HashCode`s).
1997 *
1998 * @param cls the union operation
1999 * @param mh the message
2000 */
2001void
2002handle_union_p2p_offer (void *cls,
2003 const struct GNUNET_MessageHeader *mh)
2004{
2005 struct Operation *op = cls;
2006 const struct GNUNET_HashCode *hash;
2007 unsigned int num_hashes;
2008
2009 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
2010 / sizeof (struct GNUNET_HashCode);
1862 for (hash = (const struct GNUNET_HashCode *) &mh[1]; 2011 for (hash = (const struct GNUNET_HashCode *) &mh[1];
1863 num_hashes > 0; 2012 num_hashes > 0;
1864 hash++, num_hashes--) 2013 hash++, num_hashes--)
@@ -1897,6 +2046,7 @@ handle_p2p_offer (void *cls,
1897 *(struct GNUNET_HashCode *) &demands[1] = *hash; 2046 *(struct GNUNET_HashCode *) &demands[1] = *hash;
1898 GNUNET_MQ_send (op->mq, ev); 2047 GNUNET_MQ_send (op->mq, ev);
1899 } 2048 }
2049 GNUNET_CADET_receive_done (op->channel);
1900} 2050}
1901 2051
1902 2052
@@ -1906,16 +2056,22 @@ handle_p2p_offer (void *cls,
1906 * @param cls the union operation 2056 * @param cls the union operation
1907 * @param mh the message 2057 * @param mh the message
1908 */ 2058 */
1909static void 2059void
1910handle_p2p_done (void *cls, 2060handle_union_p2p_done (void *cls,
1911 const struct GNUNET_MessageHeader *mh) 2061 const struct GNUNET_MessageHeader *mh)
1912{ 2062{
1913 struct Operation *op = cls; 2063 struct Operation *op = cls;
1914 2064
1915 if (op->state->phase == PHASE_INVENTORY_PASSIVE) 2065 if (OT_UNION != op->type)
1916 { 2066 {
2067 GNUNET_break_op (0);
2068 fail_union_operation (op);
2069 return;
2070 }
2071 switch (op->state->phase)
2072 {
2073 case PHASE_INVENTORY_PASSIVE:
1917 /* We got all requests, but still have to send our elements in response. */ 2074 /* We got all requests, but still have to send our elements in response. */
1918
1919 op->state->phase = PHASE_FINISH_WAITING; 2075 op->state->phase = PHASE_FINISH_WAITING;
1920 2076
1921 LOG (GNUNET_ERROR_TYPE_DEBUG, 2077 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -1929,11 +2085,10 @@ handle_p2p_done (void *cls,
1929 * all our demands are satisfied, so that the active 2085 * all our demands are satisfied, so that the active
1930 * peer can quit if we gave him everything. 2086 * peer can quit if we gave him everything.
1931 */ 2087 */
2088 GNUNET_CADET_receive_done (op->channel);
1932 maybe_finish (op); 2089 maybe_finish (op);
1933 return; 2090 return;
1934 } 2091 case PHASE_INVENTORY_ACTIVE:
1935 if (op->state->phase == PHASE_INVENTORY_ACTIVE)
1936 {
1937 LOG (GNUNET_ERROR_TYPE_DEBUG, 2092 LOG (GNUNET_ERROR_TYPE_DEBUG,
1938 "got DONE (as active partner), waiting to finish\n"); 2093 "got DONE (as active partner), waiting to finish\n");
1939 /* All demands of the other peer are satisfied, 2094 /* All demands of the other peer are satisfied,
@@ -1944,11 +2099,14 @@ handle_p2p_done (void *cls,
1944 * to the other peer once our demands are met. 2099 * to the other peer once our demands are met.
1945 */ 2100 */
1946 op->state->phase = PHASE_FINISH_CLOSING; 2101 op->state->phase = PHASE_FINISH_CLOSING;
2102 GNUNET_CADET_receive_done (op->channel);
1947 maybe_finish (op); 2103 maybe_finish (op);
1948 return; 2104 return;
2105 default:
2106 GNUNET_break_op (0);
2107 fail_union_operation (op);
2108 return;
1949 } 2109 }
1950 GNUNET_break_op (0);
1951 fail_union_operation (op);
1952} 2110}
1953 2111
1954 2112
@@ -2119,62 +2277,6 @@ union_set_destroy (struct SetState *set_state)
2119 2277
2120 2278
2121/** 2279/**
2122 * Dispatch messages for a union operation.
2123 *
2124 * @param op the state of the union evaluate operation
2125 * @param mh the received message
2126 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
2127 * #GNUNET_OK otherwise
2128 */
2129int
2130union_handle_p2p_message (struct Operation *op,
2131 const struct GNUNET_MessageHeader *mh)
2132{
2133 //LOG (GNUNET_ERROR_TYPE_DEBUG,
2134 // "received p2p message (t: %u, s: %u)\n",
2135 // ntohs (mh->type),
2136 // ntohs (mh->size));
2137 switch (ntohs (mh->type))
2138 {
2139 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
2140 return handle_p2p_ibf (op, mh);
2141 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
2142 return handle_p2p_strata_estimator (op, mh, GNUNET_NO);
2143 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC:
2144 return handle_p2p_strata_estimator (op, mh, GNUNET_YES);
2145 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
2146 handle_p2p_elements (op, mh);
2147 break;
2148 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT:
2149 handle_p2p_full_element (op, mh);
2150 break;
2151 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
2152 handle_p2p_inquiry (op, mh);
2153 break;
2154 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
2155 handle_p2p_done (op, mh);
2156 break;
2157 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER:
2158 handle_p2p_offer (op, mh);
2159 break;
2160 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
2161 handle_p2p_demand (op, mh);
2162 break;
2163 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE:
2164 handle_p2p_full_done (op, mh);
2165 break;
2166 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL:
2167 handle_p2p_request_full (op, mh);
2168 break;
2169 default:
2170 /* Something wrong with cadet's message handlers? */
2171 GNUNET_assert (0);
2172 }
2173 return GNUNET_OK;
2174}
2175
2176
2177/**
2178 * Handler for peer-disconnects, notifies the client 2280 * Handler for peer-disconnects, notifies the client
2179 * about the aborted operation in case the op was not concluded. 2281 * about the aborted operation in case the op was not concluded.
2180 * 2282 *
@@ -2240,7 +2342,6 @@ _GSS_union_vt ()
2240{ 2342{
2241 static const struct SetVT union_vt = { 2343 static const struct SetVT union_vt = {
2242 .create = &union_set_create, 2344 .create = &union_set_create,
2243 .msg_handler = &union_handle_p2p_message,
2244 .add = &union_add, 2345 .add = &union_add,
2245 .remove = &union_remove, 2346 .remove = &union_remove,
2246 .destroy_set = &union_set_destroy, 2347 .destroy_set = &union_set_destroy,
diff --git a/src/set/set_api.c b/src/set/set_api.c
index 04a4e4910..bc428f9f6 100644
--- a/src/set/set_api.c
+++ b/src/set/set_api.c
@@ -76,6 +76,8 @@ struct GNUNET_SET_Handle
76 76
77 /** 77 /**
78 * Should the set be destroyed once all operations are gone? 78 * Should the set be destroyed once all operations are gone?
79 * #GNUNET_SYSERR if #GNUNET_SET_destroy() must raise this flag,
80 * #GNUNET_YES if #GNUNET_SET_destroy() did raise this flag.
79 */ 81 */
80 int destroy_requested; 82 int destroy_requested;
81 83
@@ -345,11 +347,13 @@ handle_iter_done (void *cls,
345 347
346 if (NULL == iter) 348 if (NULL == iter)
347 return; 349 return;
350 set->destroy_requested = GNUNET_SYSERR;
348 set->iterator = NULL; 351 set->iterator = NULL;
349 set->iteration_id++; 352 set->iteration_id++;
350 iter (set->iterator_cls, 353 iter (set->iterator_cls,
351 NULL); 354 NULL);
352 355 if (GNUNET_SYSERR == set->destroy_requested)
356 set->destroy_requested = GNUNET_NO;
353 if (GNUNET_YES == set->destroy_requested) 357 if (GNUNET_YES == set->destroy_requested)
354 GNUNET_SET_destroy (set); 358 GNUNET_SET_destroy (set);
355} 359}
@@ -736,7 +740,9 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
736 /* destroying set while iterator is active is currently 740 /* destroying set while iterator is active is currently
737 not supported; we should expand the API to allow 741 not supported; we should expand the API to allow
738 clients to explicitly cancel the iteration! */ 742 clients to explicitly cancel the iteration! */
739 if ( (NULL != set->ops_head) || (NULL != set->iterator) ) 743 if ( (NULL != set->ops_head) ||
744 (NULL != set->iterator) ||
745 (GNUNET_SYSERR == set->destroy_requested) )
740 { 746 {
741 LOG (GNUNET_ERROR_TYPE_DEBUG, 747 LOG (GNUNET_ERROR_TYPE_DEBUG,
742 "Set operations are pending, delaying set destruction\n"); 748 "Set operations are pending, delaying set destruction\n");
@@ -809,7 +815,7 @@ GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer,
809 msg->force_delta = GNUNET_YES; 815 msg->force_delta = GNUNET_YES;
810 break; 816 break;
811 default: 817 default:
812 LOG (GNUNET_ERROR_TYPE_ERROR, 818 LOG (GNUNET_ERROR_TYPE_ERROR,
813 "Option with type %d not recognized\n", (int) opt->type); 819 "Option with type %d not recognized\n", (int) opt->type);
814 } 820 }
815 } 821 }
diff --git a/src/set/test_set_union_copy.c b/src/set/test_set_union_copy.c
index c887a8958..a1eba6311 100644
--- a/src/set/test_set_union_copy.c
+++ b/src/set/test_set_union_copy.c
@@ -122,6 +122,7 @@ check_count_iter (void *cls,
122 return GNUNET_NO; 122 return GNUNET_NO;
123 } 123 }
124 ci_cls->cont (ci_cls->cont_cls); 124 ci_cls->cont (ci_cls->cont_cls);
125 GNUNET_free (ci_cls);
125 return GNUNET_NO; 126 return GNUNET_NO;
126 } 127 }
127 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 128 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,