aboutsummaryrefslogtreecommitdiff
path: root/src/seti/gnunet-service-seti.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/seti/gnunet-service-seti.c')
-rw-r--r--src/seti/gnunet-service-seti.c997
1 files changed, 244 insertions, 753 deletions
diff --git a/src/seti/gnunet-service-seti.c b/src/seti/gnunet-service-seti.c
index 3b8da01cd..037181bde 100644
--- a/src/seti/gnunet-service-seti.c
+++ b/src/seti/gnunet-service-seti.c
@@ -108,28 +108,6 @@ struct Operation;
108 108
109 109
110/** 110/**
111 * MutationEvent gives information about changes
112 * to an element (removal / addition) in a set content.
113 */
114struct MutationEvent
115{
116 /**
117 * First generation affected by this mutation event.
118 *
119 * If @a generation is 0, this mutation event is a list
120 * sentinel element.
121 */
122 unsigned int generation;
123
124 /**
125 * If @a added is #GNUNET_YES, then this is a
126 * `remove` event, otherwise it is an `add` event.
127 */
128 int added;
129};
130
131
132/**
133 * Information about an element element in the set. All elements are 111 * Information about an element element in the set. All elements are
134 * stored in a hash-table from their hash-code to their `struct 112 * stored in a hash-table from their hash-code to their `struct
135 * Element`, so that the remove and add operations are reasonably 113 * Element`, so that the remove and add operations are reasonably
@@ -150,20 +128,9 @@ struct ElementEntry
150 struct GNUNET_HashCode element_hash; 128 struct GNUNET_HashCode element_hash;
151 129
152 /** 130 /**
153 * If @a mutations is not NULL, it contains 131 * Generation in which the element was added.
154 * a list of mutations, ordered by increasing generation.
155 * The list is terminated by a sentinel event with `generation`
156 * set to 0.
157 *
158 * If @a mutations is NULL, then this element exists in all generations
159 * of the respective set content this element belongs to.
160 */
161 struct MutationEvent *mutations;
162
163 /**
164 * Number of elements in the array @a mutations.
165 */ 132 */
166 unsigned int mutations_size; 133 unsigned int generation_added;
167 134
168 /** 135 /**
169 * #GNUNET_YES if the element is a remote element, and does not belong 136 * #GNUNET_YES if the element is a remote element, and does not belong
@@ -285,7 +252,13 @@ struct Operation
285 /** 252 /**
286 * When are elements sent to the client, and which elements are sent? 253 * When are elements sent to the client, and which elements are sent?
287 */ 254 */
288 enum GNUNET_SET_ResultMode result_mode; 255 int return_intersection;
256
257 /**
258 * Lower bound for the set size, used only when
259 * byzantine mode is enabled.
260 */
261 int byzantine_lower_bound;
289 262
290 /** 263 /**
291 * Always use delta operation instead of sending full sets, 264 * Always use delta operation instead of sending full sets,
@@ -306,12 +279,6 @@ struct Operation
306 int byzantine; 279 int byzantine;
307 280
308 /** 281 /**
309 * Lower bound for the set size, used only when
310 * byzantine mode is enabled.
311 */
312 int byzantine_lower_bound;
313
314 /**
315 * Unique request id for the request from a remote peer, sent to the 282 * Unique request id for the request from a remote peer, sent to the
316 * client, which will accept or reject the request. Set to '0' iff 283 * client, which will accept or reject the request. Set to '0' iff
317 * the request has not been suggested yet. 284 * the request has not been suggested yet.
@@ -319,8 +286,7 @@ struct Operation
319 uint32_t suggest_id; 286 uint32_t suggest_id;
320 287
321 /** 288 /**
322 * Generation in which the operation handle 289 * Generation in which the operation handle was created.
323 * was created.
324 */ 290 */
325 unsigned int generation_created; 291 unsigned int generation_created;
326}; 292};
@@ -338,20 +304,6 @@ struct SetContent
338 struct GNUNET_CONTAINER_MultiHashMap *elements; 304 struct GNUNET_CONTAINER_MultiHashMap *elements;
339 305
340 /** 306 /**
341 * Mutations requested by the client that we're
342 * unable to execute right now because we're iterating
343 * over the underlying hash map of elements.
344 */
345 struct PendingMutation *pending_mutations_head;
346
347 /**
348 * Mutations requested by the client that we're
349 * unable to execute right now because we're iterating
350 * over the underlying hash map of elements.
351 */
352 struct PendingMutation *pending_mutations_tail;
353
354 /**
355 * Number of references to the content. 307 * Number of references to the content.
356 */ 308 */
357 unsigned int refcount; 309 unsigned int refcount;
@@ -368,49 +320,6 @@ struct SetContent
368}; 320};
369 321
370 322
371struct GenerationRange
372{
373 /**
374 * First generation that is excluded.
375 */
376 unsigned int start;
377
378 /**
379 * Generation after the last excluded generation.
380 */
381 unsigned int end;
382};
383
384
385/**
386 * Information about a mutation to apply to a set.
387 */
388struct PendingMutation
389{
390 /**
391 * Mutations are kept in a DLL.
392 */
393 struct PendingMutation *prev;
394
395 /**
396 * Mutations are kept in a DLL.
397 */
398 struct PendingMutation *next;
399
400 /**
401 * Set this mutation is about.
402 */
403 struct Set *set;
404
405 /**
406 * Message that describes the desired mutation.
407 * May only be a #GNUNET_MESSAGE_TYPE_SET_ADD or
408 * #GNUNET_MESSAGE_TYPE_SET_REMOVE.
409 */
410 struct GNUNET_SET_ElementMessage *msg;
411};
412
413
414/** 323/**
415 * A set that supports a specific operation with other peers. 324 * A set that supports a specific operation with other peers.
416 */ 325 */
@@ -444,12 +353,6 @@ struct Set
444 struct SetState *state; 353 struct SetState *state;
445 354
446 /** 355 /**
447 * Current state of iterating elements for the client.
448 * NULL if we are not currently iterating.
449 */
450 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
451
452 /**
453 * Evaluate operations are held in a linked list. 356 * Evaluate operations are held in a linked list.
454 */ 357 */
455 struct Operation *ops_head; 358 struct Operation *ops_head;
@@ -460,36 +363,11 @@ struct Set
460 struct Operation *ops_tail; 363 struct Operation *ops_tail;
461 364
462 /** 365 /**
463 * List of generations we have to exclude, due to lazy copies.
464 */
465 struct GenerationRange *excluded_generations;
466
467 /**
468 * Current generation, that is, number of previously executed 366 * Current generation, that is, number of previously executed
469 * operations and lazy copies on the underlying set content. 367 * operations and lazy copies on the underlying set content.
470 */ 368 */
471 unsigned int current_generation; 369 unsigned int current_generation;
472 370
473 /**
474 * Number of elements in array @a excluded_generations.
475 */
476 unsigned int excluded_generations_size;
477
478 /**
479 * Type of operation supported for this set
480 */
481 enum GNUNET_SET_OperationType operation;
482
483 /**
484 * Generation we're currently iteration over.
485 */
486 unsigned int iter_generation;
487
488 /**
489 * Each @e iter is assigned a unique number, so that the client
490 * can distinguish iterations.
491 */
492 uint16_t iteration_id;
493}; 371};
494 372
495 373
@@ -724,7 +602,7 @@ send_client_removed_element (struct Operation *op,
724 struct GNUNET_MQ_Envelope *ev; 602 struct GNUNET_MQ_Envelope *ev;
725 struct GNUNET_SET_ResultMessage *rm; 603 struct GNUNET_SET_ResultMessage *rm;
726 604
727 if (GNUNET_SET_RESULT_REMOVED != op->result_mode) 605 if (GNUNET_NO != op->return_intersection)
728 return; /* Wrong mode for transmitting removed elements */ 606 return; /* Wrong mode for transmitting removed elements */
729 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 607 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
730 "Sending removed element (size %u) to client\n", 608 "Sending removed element (size %u) to client\n",
@@ -736,13 +614,13 @@ send_client_removed_element (struct Operation *op,
736 GNUNET_assert (0 != op->client_request_id); 614 GNUNET_assert (0 != op->client_request_id);
737 ev = GNUNET_MQ_msg_extra (rm, 615 ev = GNUNET_MQ_msg_extra (rm,
738 element->size, 616 element->size,
739 GNUNET_MESSAGE_TYPE_SET_RESULT); 617 GNUNET_MESSAGE_TYPE_SETI_RESULT);
740 if (NULL == ev) 618 if (NULL == ev)
741 { 619 {
742 GNUNET_break (0); 620 GNUNET_break (0);
743 return; 621 return;
744 } 622 }
745 rm->result_status = htons (GNUNET_SET_STATUS_OK); 623 rm->result_status = htons (GNUNET_SET_STATUS_DEL_LOCAL);
746 rm->request_id = htonl (op->client_request_id); 624 rm->request_id = htonl (op->client_request_id);
747 rm->element_type = element->element_type; 625 rm->element_type = element->element_type;
748 GNUNET_memcpy (&rm[1], 626 GNUNET_memcpy (&rm[1],
@@ -770,7 +648,6 @@ filtered_map_initialization (void *cls,
770 struct ElementEntry *ee = value; 648 struct ElementEntry *ee = value;
771 struct GNUNET_HashCode mutated_hash; 649 struct GNUNET_HashCode mutated_hash;
772 650
773
774 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 651 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
775 "FIMA called for %s:%u\n", 652 "FIMA called for %s:%u\n",
776 GNUNET_h2s (&ee->element_hash), 653 GNUNET_h2s (&ee->element_hash),
@@ -934,8 +811,8 @@ fail_intersection_operation (struct Operation *op)
934 op->state->my_elements = NULL; 811 op->state->my_elements = NULL;
935 } 812 }
936 ev = GNUNET_MQ_msg (msg, 813 ev = GNUNET_MQ_msg (msg,
937 GNUNET_MESSAGE_TYPE_SET_RESULT); 814 GNUNET_MESSAGE_TYPE_SETI_RESULT);
938 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); 815 msg->result_status = htons (GNUNET_SETI_STATUS_FAILURE);
939 msg->request_id = htonl (op->client_request_id); 816 msg->request_id = htonl (op->client_request_id);
940 msg->element_type = htons (0); 817 msg->element_type = htons (0);
941 GNUNET_MQ_send (op->set->cs->mq, 818 GNUNET_MQ_send (op->set->cs->mq,
@@ -999,7 +876,7 @@ send_bloomfilter (struct Operation *op)
999 chunk_size = bf_size; 876 chunk_size = bf_size;
1000 ev = GNUNET_MQ_msg_extra (msg, 877 ev = GNUNET_MQ_msg_extra (msg,
1001 chunk_size, 878 chunk_size,
1002 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); 879 GNUNET_MESSAGE_TYPE_SETI_INTERSECTION_P2P_BF);
1003 GNUNET_assert (GNUNET_SYSERR != 880 GNUNET_assert (GNUNET_SYSERR !=
1004 GNUNET_CONTAINER_bloomfilter_get_raw_data ( 881 GNUNET_CONTAINER_bloomfilter_get_raw_data (
1005 op->state->local_bf, 882 op->state->local_bf,
@@ -1028,7 +905,7 @@ send_bloomfilter (struct Operation *op)
1028 chunk_size = bf_size - offset; 905 chunk_size = bf_size - offset;
1029 ev = GNUNET_MQ_msg_extra (msg, 906 ev = GNUNET_MQ_msg_extra (msg,
1030 chunk_size, 907 chunk_size,
1031 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); 908 GNUNET_MESSAGE_TYPE_SETI_INTERSECTION_P2P_BF);
1032 GNUNET_memcpy (&msg[1], 909 GNUNET_memcpy (&msg[1],
1033 &bf_data[offset], 910 &bf_data[offset],
1034 chunk_size); 911 chunk_size);
@@ -1067,7 +944,7 @@ send_client_done_and_destroy (void *cls)
1067 1, 944 1,
1068 GNUNET_NO); 945 GNUNET_NO);
1069 ev = GNUNET_MQ_msg (rm, 946 ev = GNUNET_MQ_msg (rm,
1070 GNUNET_MESSAGE_TYPE_SET_RESULT); 947 GNUNET_MESSAGE_TYPE_SETI_RESULT);
1071 rm->request_id = htonl (op->client_request_id); 948 rm->request_id = htonl (op->client_request_id);
1072 rm->result_status = htons (GNUNET_SET_STATUS_DONE); 949 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1073 rm->element_type = htons (0); 950 rm->element_type = htons (0);
@@ -1114,7 +991,7 @@ send_p2p_done (struct Operation *op)
1114 GNUNET_assert (PHASE_MUST_SEND_DONE == op->state->phase); 991 GNUNET_assert (PHASE_MUST_SEND_DONE == op->state->phase);
1115 GNUNET_assert (GNUNET_NO == op->state->channel_death_expected); 992 GNUNET_assert (GNUNET_NO == op->state->channel_death_expected);
1116 ev = GNUNET_MQ_msg (idm, 993 ev = GNUNET_MQ_msg (idm,
1117 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE); 994 GNUNET_MESSAGE_TYPE_SETI_INTERSECTION_P2P_DONE);
1118 idm->final_element_count = htonl (op->state->my_element_count); 995 idm->final_element_count = htonl (op->state->my_element_count);
1119 idm->element_xor_hash = op->state->my_xor; 996 idm->element_xor_hash = op->state->my_xor;
1120 GNUNET_MQ_notify_sent (ev, 997 GNUNET_MQ_notify_sent (ev,
@@ -1176,7 +1053,7 @@ send_remaining_elements (void *cls)
1176 GNUNET_assert (0 != op->client_request_id); 1053 GNUNET_assert (0 != op->client_request_id);
1177 ev = GNUNET_MQ_msg_extra (rm, 1054 ev = GNUNET_MQ_msg_extra (rm,
1178 element->size, 1055 element->size,
1179 GNUNET_MESSAGE_TYPE_SET_RESULT); 1056 GNUNET_MESSAGE_TYPE_SETI_RESULT);
1180 GNUNET_assert (NULL != ev); 1057 GNUNET_assert (NULL != ev);
1181 rm->result_status = htons (GNUNET_SET_STATUS_OK); 1058 rm->result_status = htons (GNUNET_SET_STATUS_OK);
1182 rm->request_id = htonl (op->client_request_id); 1059 rm->request_id = htonl (op->client_request_id);
@@ -1243,7 +1120,7 @@ send_element_count (struct Operation *op)
1243 "Sending our element count (%u)\n", 1120 "Sending our element count (%u)\n",
1244 op->state->my_element_count); 1121 op->state->my_element_count);
1245 ev = GNUNET_MQ_msg (msg, 1122 ev = GNUNET_MQ_msg (msg,
1246 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO); 1123 GNUNET_MESSAGE_TYPE_SETI_INTERSECTION_P2P_ELEMENT_INFO);
1247 msg->sender_element_count = htonl (op->state->my_element_count); 1124 msg->sender_element_count = htonl (op->state->my_element_count);
1248 GNUNET_MQ_send (op->mq, ev); 1125 GNUNET_MQ_send (op->mq, ev);
1249} 1126}
@@ -1273,7 +1150,7 @@ begin_bf_exchange (struct Operation *op)
1273 * @param cls the intersection operation 1150 * @param cls the intersection operation
1274 * @param mh the header of the message 1151 * @param mh the header of the message
1275 */ 1152 */
1276void 1153static void
1277handle_intersection_p2p_element_info (void *cls, 1154handle_intersection_p2p_element_info (void *cls,
1278 const struct 1155 const struct
1279 IntersectionElementInfoMessage *msg) 1156 IntersectionElementInfoMessage *msg)
@@ -1327,7 +1204,6 @@ process_bf (struct Operation *op)
1327 GNUNET_break_op (0); 1204 GNUNET_break_op (0);
1328 fail_intersection_operation (op); 1205 fail_intersection_operation (op);
1329 return; 1206 return;
1330
1331 case PHASE_COUNT_SENT: 1207 case PHASE_COUNT_SENT:
1332 /* This is the first BF being sent, build our initial map with 1208 /* This is the first BF being sent, build our initial map with
1333 filtering in place */ 1209 filtering in place */
@@ -1336,24 +1212,20 @@ process_bf (struct Operation *op)
1336 &filtered_map_initialization, 1212 &filtered_map_initialization,
1337 op); 1213 op);
1338 break; 1214 break;
1339
1340 case PHASE_BF_EXCHANGE: 1215 case PHASE_BF_EXCHANGE:
1341 /* Update our set by reduction */ 1216 /* Update our set by reduction */
1342 GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements, 1217 GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
1343 &iterator_bf_reduce, 1218 &iterator_bf_reduce,
1344 op); 1219 op);
1345 break; 1220 break;
1346
1347 case PHASE_MUST_SEND_DONE: 1221 case PHASE_MUST_SEND_DONE:
1348 GNUNET_break_op (0); 1222 GNUNET_break_op (0);
1349 fail_intersection_operation (op); 1223 fail_intersection_operation (op);
1350 return; 1224 return;
1351
1352 case PHASE_DONE_RECEIVED: 1225 case PHASE_DONE_RECEIVED:
1353 GNUNET_break_op (0); 1226 GNUNET_break_op (0);
1354 fail_intersection_operation (op); 1227 fail_intersection_operation (op);
1355 return; 1228 return;
1356
1357 case PHASE_FINISHED: 1229 case PHASE_FINISHED:
1358 GNUNET_break_op (0); 1230 GNUNET_break_op (0);
1359 fail_intersection_operation (op); 1231 fail_intersection_operation (op);
@@ -1420,7 +1292,7 @@ check_intersection_p2p_bf (void *cls,
1420 * @param cls the intersection operation 1292 * @param cls the intersection operation
1421 * @param msg the header of the message 1293 * @param msg the header of the message
1422 */ 1294 */
1423static 1295static void
1424handle_intersection_p2p_bf (void *cls, 1296handle_intersection_p2p_bf (void *cls,
1425 const struct BFMessage *msg) 1297 const struct BFMessage *msg)
1426{ 1298{
@@ -1613,214 +1485,6 @@ handle_intersection_p2p_done (void *cls,
1613 1485
1614 1486
1615/** 1487/**
1616 * Initiate a set intersection operation with a remote peer.
1617 *
1618 * @param op operation that is created, should be initialized to
1619 * begin the evaluation
1620 * @param opaque_context message to be transmitted to the listener
1621 * to convince it to accept, may be NULL
1622 * @return operation-specific state to keep in @a op
1623 */
1624static struct OperationState *
1625intersection_evaluate (struct Operation *op,
1626 const struct GNUNET_MessageHeader *opaque_context)
1627{
1628 struct OperationState *state;
1629 struct GNUNET_MQ_Envelope *ev;
1630 struct OperationRequestMessage *msg;
1631
1632 ev = GNUNET_MQ_msg_nested_mh (msg,
1633 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1634 opaque_context);
1635 if (NULL == ev)
1636 {
1637 /* the context message is too large!? */
1638 GNUNET_break (0);
1639 return NULL;
1640 }
1641 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1642 "Initiating intersection operation evaluation\n");
1643 state = GNUNET_new (struct OperationState);
1644 /* we started the operation, thus we have to send the operation request */
1645 state->phase = PHASE_INITIAL;
1646 state->my_element_count = op->set->state->current_set_element_count;
1647 state->my_elements
1648 = GNUNET_CONTAINER_multihashmap_create (state->my_element_count,
1649 GNUNET_YES);
1650
1651 msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
1652 msg->element_count = htonl (state->my_element_count);
1653 GNUNET_MQ_send (op->mq,
1654 ev);
1655 state->phase = PHASE_COUNT_SENT;
1656 if (NULL != opaque_context)
1657 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1658 "Sent op request with context message\n");
1659 else
1660 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1661 "Sent op request without context message\n");
1662 return state;
1663}
1664
1665
1666/**
1667 * Accept an intersection operation request from a remote peer. Only
1668 * initializes the private operation state.
1669 *
1670 * @param op operation that will be accepted as an intersection operation
1671 */
1672static struct OperationState *
1673intersection_accept (struct Operation *op)
1674{
1675 struct OperationState *state;
1676
1677 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1678 "Accepting set intersection operation\n");
1679 state = GNUNET_new (struct OperationState);
1680 state->phase = PHASE_INITIAL;
1681 state->my_element_count
1682 = op->set->state->current_set_element_count;
1683 state->my_elements
1684 = GNUNET_CONTAINER_multihashmap_create (GNUNET_MIN (state->my_element_count,
1685 op->remote_element_count),
1686 GNUNET_YES);
1687 op->state = state;
1688 if (op->remote_element_count < state->my_element_count)
1689 {
1690 /* If the other peer (Alice) has fewer elements than us (Bob),
1691 we just send the count as Alice should send the first BF */
1692 send_element_count (op);
1693 state->phase = PHASE_COUNT_SENT;
1694 return state;
1695 }
1696 /* We have fewer elements, so we start with the BF */
1697 begin_bf_exchange (op);
1698 return state;
1699}
1700
1701
1702/**
1703 * Destroy the intersection operation. Only things specific to the
1704 * intersection operation are destroyed.
1705 *
1706 * @param op intersection operation to destroy
1707 */
1708static void
1709intersection_op_cancel (struct Operation *op)
1710{
1711 /* check if the op was canceled twice */
1712 GNUNET_assert (NULL != op->state);
1713 if (NULL != op->state->remote_bf)
1714 {
1715 GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
1716 op->state->remote_bf = NULL;
1717 }
1718 if (NULL != op->state->local_bf)
1719 {
1720 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
1721 op->state->local_bf = NULL;
1722 }
1723 if (NULL != op->state->my_elements)
1724 {
1725 GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
1726 op->state->my_elements = NULL;
1727 }
1728 if (NULL != op->state->full_result_iter)
1729 {
1730 GNUNET_CONTAINER_multihashmap_iterator_destroy (
1731 op->state->full_result_iter);
1732 op->state->full_result_iter = NULL;
1733 }
1734 GNUNET_free (op->state);
1735 op->state = NULL;
1736 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1737 "Destroying intersection op state done\n");
1738}
1739
1740
1741/**
1742 * Create a new set supporting the intersection operation.
1743 *
1744 * @return the newly created set
1745 */
1746static struct SetState *
1747intersection_set_create ()
1748{
1749 struct SetState *set_state;
1750
1751 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1752 "Intersection set created\n");
1753 set_state = GNUNET_new (struct SetState);
1754 set_state->current_set_element_count = 0;
1755
1756 return set_state;
1757}
1758
1759
1760/**
1761 * Add the element from the given element message to the set.
1762 *
1763 * @param set_state state of the set want to add to
1764 * @param ee the element to add to the set
1765 */
1766static void
1767intersection_add (struct SetState *set_state,
1768 struct ElementEntry *ee)
1769{
1770 set_state->current_set_element_count++;
1771}
1772
1773
1774/**
1775 * Destroy a set that supports the intersection operation
1776 *
1777 * @param set_state the set to destroy
1778 */
1779static void
1780intersection_set_destroy (struct SetState *set_state)
1781{
1782 GNUNET_free (set_state);
1783}
1784
1785
1786/**
1787 * Remove the element given in the element message from the set.
1788 *
1789 * @param set_state state of the set to remove from
1790 * @param element set element to remove
1791 */
1792static void
1793intersection_remove (struct SetState *set_state,
1794 struct ElementEntry *element)
1795{
1796 GNUNET_assert (0 < set_state->current_set_element_count);
1797 set_state->current_set_element_count--;
1798}
1799
1800
1801/**
1802 * Callback for channel death for the intersection operation.
1803 *
1804 * @param op operation that lost the channel
1805 */
1806static void
1807intersection_channel_death (struct Operation *op)
1808{
1809 if (GNUNET_YES == op->state->channel_death_expected)
1810 {
1811 /* oh goodie, we are done! */
1812 send_client_done_and_destroy (op);
1813 }
1814 else
1815 {
1816 /* sorry, channel went down early, too bad. */
1817 _GSS_operation_destroy (op,
1818 GNUNET_YES);
1819 }
1820}
1821
1822
1823/**
1824 * Get the incoming socket associated with the given id. 1488 * Get the incoming socket associated with the given id.
1825 * 1489 *
1826 * @param listener the listener to look in 1490 * @param listener the listener to look in
@@ -1870,184 +1534,17 @@ incoming_destroy (struct Operation *op)
1870 1534
1871 1535
1872/** 1536/**
1873 * Context for the #garbage_collect_cb().
1874 */
1875struct GarbageContext
1876{
1877 /**
1878 * Map for which we are garbage collecting removed elements.
1879 */
1880 struct GNUNET_CONTAINER_MultiHashMap *map;
1881
1882 /**
1883 * Lowest generation for which an operation is still pending.
1884 */
1885 unsigned int min_op_generation;
1886
1887 /**
1888 * Largest generation for which an operation is still pending.
1889 */
1890 unsigned int max_op_generation;
1891};
1892
1893
1894/**
1895 * Function invoked to check if an element can be removed from
1896 * the set's history because it is no longer needed.
1897 *
1898 * @param cls the `struct GarbageContext *`
1899 * @param key key of the element in the map
1900 * @param value the `struct ElementEntry *`
1901 * @return #GNUNET_OK (continue to iterate)
1902 */
1903static int
1904garbage_collect_cb (void *cls, const struct GNUNET_HashCode *key, void *value)
1905{
1906 // struct GarbageContext *gc = cls;
1907 // struct ElementEntry *ee = value;
1908
1909 // if (GNUNET_YES != ee->removed)
1910 // return GNUNET_OK;
1911 // if ( (gc->max_op_generation < ee->generation_added) ||
1912 // (ee->generation_removed > gc->min_op_generation) )
1913 // {
1914 // GNUNET_assert (GNUNET_YES ==
1915 // GNUNET_CONTAINER_multihashmap_remove (gc->map,
1916 // key,
1917 // ee));
1918 // GNUNET_free (ee);
1919 // }
1920 return GNUNET_OK;
1921}
1922
1923
1924/**
1925 * Collect and destroy elements that are not needed anymore, because
1926 * their lifetime (as determined by their generation) does not overlap
1927 * with any active set operation.
1928 *
1929 * @param set set to garbage collect
1930 */
1931static void
1932collect_generation_garbage (struct Set *set)
1933{
1934 struct GarbageContext gc;
1935
1936 gc.min_op_generation = UINT_MAX;
1937 gc.max_op_generation = 0;
1938 for (struct Operation *op = set->ops_head; NULL != op; op = op->next)
1939 {
1940 gc.min_op_generation =
1941 GNUNET_MIN (gc.min_op_generation, op->generation_created);
1942 gc.max_op_generation =
1943 GNUNET_MAX (gc.max_op_generation, op->generation_created);
1944 }
1945 gc.map = set->content->elements;
1946 GNUNET_CONTAINER_multihashmap_iterate (set->content->elements,
1947 &garbage_collect_cb,
1948 &gc);
1949}
1950
1951
1952/**
1953 * Is @a generation in the range of exclusions?
1954 *
1955 * @param generation generation to query
1956 * @param excluded array of generations where the element is excluded
1957 * @param excluded_size length of the @a excluded array
1958 * @return #GNUNET_YES if @a generation is in any of the ranges
1959 */
1960static int
1961is_excluded_generation (unsigned int generation,
1962 struct GenerationRange *excluded,
1963 unsigned int excluded_size)
1964{
1965 for (unsigned int i = 0; i < excluded_size; i++)
1966 if ((generation >= excluded[i].start) && (generation < excluded[i].end))
1967 return GNUNET_YES;
1968 return GNUNET_NO;
1969}
1970
1971
1972/**
1973 * Is element @a ee part of the set during @a query_generation?
1974 *
1975 * @param ee element to test
1976 * @param query_generation generation to query
1977 * @param excluded array of generations where the element is excluded
1978 * @param excluded_size length of the @a excluded array
1979 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
1980 */
1981static int
1982is_element_of_generation (struct ElementEntry *ee,
1983 unsigned int query_generation,
1984 struct GenerationRange *excluded,
1985 unsigned int excluded_size)
1986{
1987 struct MutationEvent *mut;
1988 int is_present;
1989
1990 GNUNET_assert (NULL != ee->mutations);
1991 if (GNUNET_YES ==
1992 is_excluded_generation (query_generation, excluded, excluded_size))
1993 {
1994 GNUNET_break (0);
1995 return GNUNET_NO;
1996 }
1997
1998 is_present = GNUNET_NO;
1999
2000 /* Could be made faster with binary search, but lists
2001 are small, so why bother. */
2002 for (unsigned int i = 0; i < ee->mutations_size; i++)
2003 {
2004 mut = &ee->mutations[i];
2005
2006 if (mut->generation > query_generation)
2007 {
2008 /* The mutation doesn't apply to our generation
2009 anymore. We can'b break here, since mutations aren't
2010 sorted by generation. */
2011 continue;
2012 }
2013
2014 if (GNUNET_YES ==
2015 is_excluded_generation (mut->generation, excluded, excluded_size))
2016 {
2017 /* The generation is excluded (because it belongs to another
2018 fork via a lazy copy) and thus mutations aren't considered
2019 for membership testing. */
2020 continue;
2021 }
2022
2023 /* This would be an inconsistency in how we manage mutations. */
2024 if ((GNUNET_YES == is_present) && (GNUNET_YES == mut->added))
2025 GNUNET_assert (0);
2026 /* Likewise. */
2027 if ((GNUNET_NO == is_present) && (GNUNET_NO == mut->added))
2028 GNUNET_assert (0);
2029
2030 is_present = mut->added;
2031 }
2032
2033 return is_present;
2034}
2035
2036
2037/**
2038 * Is element @a ee part of the set used by @a op? 1537 * Is element @a ee part of the set used by @a op?
2039 * 1538 *
2040 * @param ee element to test 1539 * @param ee element to test
2041 * @param op operation the defines the set and its generation 1540 * @param op operation the defines the set and its generation
2042 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not 1541 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
2043 */ 1542 */
2044int 1543static int
2045_GSS_is_element_of_operation (struct ElementEntry *ee, struct Operation *op) 1544_GSS_is_element_of_operation (struct ElementEntry *ee,
1545 struct Operation *op)
2046{ 1546{
2047 return is_element_of_generation (ee, 1547 return op->generation_created >= ee->generation_added;
2048 op->generation_created,
2049 op->set->excluded_generations,
2050 op->set->excluded_generations_size);
2051} 1548}
2052 1549
2053 1550
@@ -2062,10 +1559,9 @@ _GSS_is_element_of_operation (struct ElementEntry *ee, struct Operation *op)
2062 * as there may be multiple operations per set. 1559 * as there may be multiple operations per set.
2063 * 1560 *
2064 * @param op operation to destroy 1561 * @param op operation to destroy
2065 * @param gc #GNUNET_YES to perform garbage collection on the set
2066 */ 1562 */
2067void 1563static void
2068_GSS_operation_destroy (struct Operation *op, int gc) 1564_GSS_operation_destroy (struct Operation *op)
2069{ 1565{
2070 struct Set *set = op->set; 1566 struct Set *set = op->set;
2071 struct GNUNET_CADET_Channel *channel; 1567 struct GNUNET_CADET_Channel *channel;
@@ -2074,12 +1570,39 @@ _GSS_operation_destroy (struct Operation *op, int gc)
2074 GNUNET_assert (NULL == op->listener); 1570 GNUNET_assert (NULL == op->listener);
2075 if (NULL != op->state) 1571 if (NULL != op->state)
2076 { 1572 {
2077 intersection_cancel (op); // FIXME: inline 1573 /* check if the op was canceled twice */
1574 GNUNET_assert (NULL != op->state);
1575 if (NULL != op->state->remote_bf)
1576 {
1577 GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
1578 op->state->remote_bf = NULL;
1579 }
1580 if (NULL != op->state->local_bf)
1581 {
1582 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
1583 op->state->local_bf = NULL;
1584 }
1585 if (NULL != op->state->my_elements)
1586 {
1587 GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
1588 op->state->my_elements = NULL;
1589 }
1590 if (NULL != op->state->full_result_iter)
1591 {
1592 GNUNET_CONTAINER_multihashmap_iterator_destroy (
1593 op->state->full_result_iter);
1594 op->state->full_result_iter = NULL;
1595 }
1596 GNUNET_free (op->state);
2078 op->state = NULL; 1597 op->state = NULL;
1598 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1599 "Destroying intersection op state done\n");
2079 } 1600 }
2080 if (NULL != set) 1601 if (NULL != set)
2081 { 1602 {
2082 GNUNET_CONTAINER_DLL_remove (set->ops_head, set->ops_tail, op); 1603 GNUNET_CONTAINER_DLL_remove (set->ops_head,
1604 set->ops_tail,
1605 op);
2083 op->set = NULL; 1606 op->set = NULL;
2084 } 1607 }
2085 if (NULL != op->context_msg) 1608 if (NULL != op->context_msg)
@@ -2094,8 +1617,6 @@ _GSS_operation_destroy (struct Operation *op, int gc)
2094 op->channel = NULL; 1617 op->channel = NULL;
2095 GNUNET_CADET_channel_destroy (channel); 1618 GNUNET_CADET_channel_destroy (channel);
2096 } 1619 }
2097 if ((NULL != set) && (GNUNET_YES == gc))
2098 collect_generation_garbage (set);
2099 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL, 1620 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
2100 * there was a channel end handler that will free 'op' on the call stack. */ 1621 * there was a channel end handler that will free 'op' on the call stack. */
2101} 1622}
@@ -2166,8 +1687,6 @@ client_disconnect_cb (void *cls,
2166 if (NULL != (set = cs->set)) 1687 if (NULL != (set = cs->set))
2167 { 1688 {
2168 struct SetContent *content = set->content; 1689 struct SetContent *content = set->content;
2169 struct PendingMutation *pm;
2170 struct PendingMutation *pm_current;
2171 1690
2172 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's set\n"); 1691 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's set\n");
2173 /* Destroy pending set operations */ 1692 /* Destroy pending set operations */
@@ -2176,8 +1695,7 @@ client_disconnect_cb (void *cls,
2176 1695
2177 /* Destroy operation-specific state */ 1696 /* Destroy operation-specific state */
2178 GNUNET_assert (NULL != set->state); 1697 GNUNET_assert (NULL != set->state);
2179 intersection_set_destroy (set->state); // FIXME: inline 1698 GNUNET_free (set->state);
2180 set->state = NULL;
2181 1699
2182 /* Clean up ongoing iterations */ 1700 /* Clean up ongoing iterations */
2183 if (NULL != set->iter) 1701 if (NULL != set->iter)
@@ -2187,21 +1705,6 @@ client_disconnect_cb (void *cls,
2187 set->iteration_id++; 1705 set->iteration_id++;
2188 } 1706 }
2189 1707
2190 /* discard any pending mutations that reference this set */
2191 pm = content->pending_mutations_head;
2192 while (NULL != pm)
2193 {
2194 pm_current = pm;
2195 pm = pm->next;
2196 if (pm_current->set == set)
2197 {
2198 GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head,
2199 content->pending_mutations_tail,
2200 pm_current);
2201 GNUNET_free (pm_current);
2202 }
2203 }
2204
2205 /* free set content (or at least decrement RC) */ 1708 /* free set content (or at least decrement RC) */
2206 set->content = NULL; 1709 set->content = NULL;
2207 GNUNET_assert (0 != content->refcount); 1710 GNUNET_assert (0 != content->refcount);
@@ -2260,7 +1763,8 @@ client_disconnect_cb (void *cls,
2260 * #GNUNET_SYSERR to destroy the channel 1763 * #GNUNET_SYSERR to destroy the channel
2261 */ 1764 */
2262static int 1765static int
2263check_incoming_msg (void *cls, const struct OperationRequestMessage *msg) 1766check_incoming_msg (void *cls,
1767 const struct OperationRequestMessage *msg)
2264{ 1768{
2265 struct Operation *op = cls; 1769 struct Operation *op = cls;
2266 struct Listener *listener = op->listener; 1770 struct Listener *listener = op->listener;
@@ -2313,7 +1817,8 @@ check_incoming_msg (void *cls, const struct OperationRequestMessage *msg)
2313 * #GNUNET_SYSERR to destroy the channel 1817 * #GNUNET_SYSERR to destroy the channel
2314 */ 1818 */
2315static void 1819static void
2316handle_incoming_msg (void *cls, const struct OperationRequestMessage *msg) 1820handle_incoming_msg (void *cls,
1821 const struct OperationRequestMessage *msg)
2317{ 1822{
2318 struct Operation *op = cls; 1823 struct Operation *op = cls;
2319 struct Listener *listener = op->listener; 1824 struct Listener *listener = op->listener;
@@ -2358,121 +1863,6 @@ handle_incoming_msg (void *cls, const struct OperationRequestMessage *msg)
2358 1863
2359 1864
2360/** 1865/**
2361 * Add an element to @a set as specified by @a msg
2362 *
2363 * @param set set to manipulate
2364 * @param msg message specifying the change
2365 */
2366static void
2367execute_add (struct Set *set, const struct GNUNET_SET_ElementMessage *msg)
2368{
2369 struct GNUNET_SET_Element el;
2370 struct ElementEntry *ee;
2371 struct GNUNET_HashCode hash;
2372
2373 GNUNET_assert (GNUNET_MESSAGE_TYPE_SETI_ADD == ntohs (msg->header.type));
2374 el.size = ntohs (msg->header.size) - sizeof(*msg);
2375 el.data = &msg[1];
2376 el.element_type = ntohs (msg->element_type);
2377 GNUNET_SET_element_hash (&el, &hash);
2378 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements, &hash);
2379 if (NULL == ee)
2380 {
2381 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2382 "Client inserts element %s of size %u\n",
2383 GNUNET_h2s (&hash),
2384 el.size);
2385 ee = GNUNET_malloc (el.size + sizeof(*ee));
2386 ee->element.size = el.size;
2387 GNUNET_memcpy (&ee[1], el.data, el.size);
2388 ee->element.data = &ee[1];
2389 ee->element.element_type = el.element_type;
2390 ee->remote = GNUNET_NO;
2391 ee->mutations = NULL;
2392 ee->mutations_size = 0;
2393 ee->element_hash = hash;
2394 GNUNET_break (GNUNET_YES ==
2395 GNUNET_CONTAINER_multihashmap_put (
2396 set->content->elements,
2397 &ee->element_hash,
2398 ee,
2399 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2400 }
2401 else if (GNUNET_YES ==
2402 is_element_of_generation (ee,
2403 set->current_generation,
2404 set->excluded_generations,
2405 set->excluded_generations_size))
2406 {
2407 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2408 "Client inserted element %s of size %u twice (ignored)\n",
2409 GNUNET_h2s (&hash),
2410 el.size);
2411
2412 /* same element inserted twice */
2413 return;
2414 }
2415
2416 {
2417 struct MutationEvent mut = { .generation = set->current_generation,
2418 .added = GNUNET_YES };
2419 GNUNET_array_append (ee->mutations, ee->mutations_size, mut);
2420 }
2421 // FIXME: inline
2422 intersection_add (set->state,
2423 ee);
2424}
2425
2426
2427/**
2428 * Perform a mutation on a set as specified by the @a msg
2429 *
2430 * @param set the set to mutate
2431 * @param msg specification of what to change
2432 */
2433static void
2434execute_mutation (struct Set *set, const struct GNUNET_SET_ElementMessage *msg)
2435{
2436 switch (ntohs (msg->header.type))
2437 {
2438 case GNUNET_MESSAGE_TYPE_SETI_ADD: // FIXME: inline!
2439 execute_add (set, msg);
2440 break;
2441 default:
2442 GNUNET_break (0);
2443 }
2444}
2445
2446
2447/**
2448 * Execute mutations that were delayed on a set because of
2449 * pending operations.
2450 *
2451 * @param set the set to execute mutations on
2452 */
2453static void
2454execute_delayed_mutations (struct Set *set)
2455{
2456 struct PendingMutation *pm;
2457
2458 if (0 != set->content->iterator_count)
2459 return; /* still cannot do this */
2460 while (NULL != (pm = set->content->pending_mutations_head))
2461 {
2462 GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head,
2463 set->content->pending_mutations_tail,
2464 pm);
2465 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2466 "Executing pending mutation on %p.\n",
2467 pm->set);
2468 execute_mutation (pm->set, pm->msg);
2469 GNUNET_free (pm->msg);
2470 GNUNET_free (pm);
2471 }
2472}
2473
2474
2475/**
2476 * Send the next element of a set to the set's client. The next element is given by 1866 * Send the next element of a set to the set's client. The next element is given by
2477 * the set's current hashmap iterator. The set's iterator will be set to NULL if there 1867 * the set's current hashmap iterator. The set's iterator will be set to NULL if there
2478 * are no more elements in the set. The caller must ensure that the set's iterator is 1868 * are no more elements in the set. The caller must ensure that the set's iterator is
@@ -2542,7 +1932,8 @@ send_client_element (struct Set *set)
2542 * @param m message sent by the client 1932 * @param m message sent by the client
2543 */ 1933 */
2544static void 1934static void
2545handle_client_iterate (void *cls, const struct GNUNET_MessageHeader *m) 1935handle_client_iterate (void *cls,
1936 const struct GNUNET_MessageHeader *m)
2546{ 1937{
2547 struct ClientState *cs = cls; 1938 struct ClientState *cs = cls;
2548 struct Set *set; 1939 struct Set *set;
@@ -2584,7 +1975,8 @@ handle_client_iterate (void *cls, const struct GNUNET_MessageHeader *m)
2584 * @param m message sent by the client 1975 * @param m message sent by the client
2585 */ 1976 */
2586static void 1977static void
2587handle_client_create_set (void *cls, const struct GNUNET_SET_CreateMessage *msg) 1978handle_client_create_set (void *cls,
1979 const struct GNUNET_SET_CreateMessage *msg)
2588{ 1980{
2589 struct ClientState *cs = cls; 1981 struct ClientState *cs = cls;
2590 struct Set *set; 1982 struct Set *set;
@@ -2600,24 +1992,18 @@ handle_client_create_set (void *cls, const struct GNUNET_SET_CreateMessage *msg)
2600 return; 1992 return;
2601 } 1993 }
2602 set = GNUNET_new (struct Set); 1994 set = GNUNET_new (struct Set);
2603 switch (ntohl (msg->operation))
2604 { 1995 {
2605 case GNUNET_SET_OPERATION_INTERSECTION: 1996 struct SetState *set_state;
2606 set->vt = _GSS_intersection_vt ();
2607 break;
2608 1997
2609 case GNUNET_SET_OPERATION_UNION: 1998 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2610 set->vt = _GSS_union_vt (); 1999 "Intersection set created\n");
2611 break; 2000 set_state = GNUNET_new (struct SetState);
2001 set_state->current_set_element_count = 0;
2612 2002
2613 default: 2003 set->state = set_state;
2614 GNUNET_free (set);
2615 GNUNET_break (0);
2616 GNUNET_SERVICE_client_drop (cs->client);
2617 return;
2618 } 2004 }
2619 set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation); 2005
2620 set->state = intersection_set_create (); // FIXME: inline 2006
2621 if (NULL == set->state) 2007 if (NULL == set->state)
2622 { 2008 {
2623 /* initialization failed (i.e. out of memory) */ 2009 /* initialization failed (i.e. out of memory) */
@@ -2627,7 +2013,8 @@ handle_client_create_set (void *cls, const struct GNUNET_SET_CreateMessage *msg)
2627 } 2013 }
2628 set->content = GNUNET_new (struct SetContent); 2014 set->content = GNUNET_new (struct SetContent);
2629 set->content->refcount = 1; 2015 set->content->refcount = 1;
2630 set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); 2016 set->content->elements = GNUNET_CONTAINER_multihashmap_create (1,
2017 GNUNET_YES);
2631 set->cs = cs; 2018 set->cs = cs;
2632 cs->set = set; 2019 cs->set = set;
2633 GNUNET_SERVICE_client_continue (cs->client); 2020 GNUNET_SERVICE_client_continue (cs->client);
@@ -2679,17 +2066,21 @@ channel_new_cb (void *cls,
2679 struct Listener *listener = cls; 2066 struct Listener *listener = cls;
2680 struct Operation *op; 2067 struct Operation *op;
2681 2068
2682 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "New incoming channel\n"); 2069 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2070 "New incoming channel\n");
2683 op = GNUNET_new (struct Operation); 2071 op = GNUNET_new (struct Operation);
2684 op->listener = listener; 2072 op->listener = listener;
2685 op->peer = *source; 2073 op->peer = *source;
2686 op->channel = channel; 2074 op->channel = channel;
2687 op->mq = GNUNET_CADET_get_mq (op->channel); 2075 op->mq = GNUNET_CADET_get_mq (op->channel);
2688 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); 2076 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
2077 UINT32_MAX);
2689 op->timeout_task = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT, 2078 op->timeout_task = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
2690 &incoming_timeout_cb, 2079 &incoming_timeout_cb,
2691 op); 2080 op);
2692 GNUNET_CONTAINER_DLL_insert (listener->op_head, listener->op_tail, op); 2081 GNUNET_CONTAINER_DLL_insert (listener->op_head,
2082 listener->op_tail,
2083 op);
2693 return op; 2084 return op;
2694} 2085}
2695 2086
@@ -2711,7 +2102,8 @@ channel_new_cb (void *cls,
2711 * @param channel connection to the other end (henceforth invalid) 2102 * @param channel connection to the other end (henceforth invalid)
2712 */ 2103 */
2713static void 2104static void
2714channel_end_cb (void *channel_ctx, const struct GNUNET_CADET_Channel *channel) 2105channel_end_cb (void *channel_ctx,
2106 const struct GNUNET_CADET_Channel *channel)
2715{ 2107{
2716 struct Operation *op = channel_ctx; 2108 struct Operation *op = channel_ctx;
2717 2109
@@ -2725,12 +2117,13 @@ channel_end_cb (void *channel_ctx, const struct GNUNET_CADET_Channel *channel)
2725 * and be replaced by inlining more specific 2117 * and be replaced by inlining more specific
2726 * logic in the various places where it is called. 2118 * logic in the various places where it is called.
2727 */ 2119 */
2728void 2120static void
2729_GSS_operation_destroy2 (struct Operation *op) 2121_GSS_operation_destroy2 (struct Operation *op)
2730{ 2122{
2731 struct GNUNET_CADET_Channel *channel; 2123 struct GNUNET_CADET_Channel *channel;
2732 2124
2733 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "channel_end_cb called\n"); 2125 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2126 "channel_end_cb called\n");
2734 if (NULL != (channel = op->channel)) 2127 if (NULL != (channel = op->channel))
2735 { 2128 {
2736 /* This will free op; called conditionally as this helper function 2129 /* This will free op; called conditionally as this helper function
@@ -2744,9 +2137,22 @@ _GSS_operation_destroy2 (struct Operation *op)
2744 return; 2137 return;
2745 } 2138 }
2746 if (NULL != op->set) 2139 if (NULL != op->set)
2747 intersection_channel_death (op); // FIXME: inline 2140 {
2141 if (GNUNET_YES == op->state->channel_death_expected)
2142 {
2143 /* oh goodie, we are done! */
2144 send_client_done_and_destroy (op);
2145 }
2146 else
2147 {
2148 /* sorry, channel went down early, too bad. */
2149 _GSS_operation_destroy (op,
2150 GNUNET_YES);
2151 }
2152 }
2748 else 2153 else
2749 _GSS_operation_destroy (op, GNUNET_YES); 2154 _GSS_operation_destroy (op,
2155 GNUNET_YES);
2750 GNUNET_free (op); 2156 GNUNET_free (op);
2751} 2157}
2752 2158
@@ -2781,7 +2187,8 @@ channel_window_cb (void *cls,
2781 * @param msg message sent by the client 2187 * @param msg message sent by the client
2782 */ 2188 */
2783static void 2189static void
2784handle_client_listen (void *cls, const struct GNUNET_SET_ListenMessage *msg) 2190handle_client_listen (void *cls,
2191 const struct GNUNET_SET_ListenMessage *msg)
2785{ 2192{
2786 struct ClientState *cs = cls; 2193 struct ClientState *cs = cls;
2787 struct GNUNET_MQ_MessageHandler cadet_handlers[] = 2194 struct GNUNET_MQ_MessageHandler cadet_handlers[] =
@@ -2816,7 +2223,9 @@ handle_client_listen (void *cls, const struct GNUNET_SET_ListenMessage *msg)
2816 cs->listener = listener; 2223 cs->listener = listener;
2817 listener->app_id = msg->app_id; 2224 listener->app_id = msg->app_id;
2818 listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation); 2225 listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
2819 GNUNET_CONTAINER_DLL_insert (listener_head, listener_tail, listener); 2226 GNUNET_CONTAINER_DLL_insert (listener_head,
2227 listener_tail,
2228 listener);
2820 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2229 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2821 "New listener created (op %u, port %s)\n", 2230 "New listener created (op %u, port %s)\n",
2822 listener->operation, 2231 listener->operation,
@@ -2840,7 +2249,8 @@ handle_client_listen (void *cls, const struct GNUNET_SET_ListenMessage *msg)
2840 * @param msg message sent by the client 2249 * @param msg message sent by the client
2841 */ 2250 */
2842static void 2251static void
2843handle_client_reject (void *cls, const struct GNUNET_SET_RejectMessage *msg) 2252handle_client_reject (void *cls,
2253 const struct GNUNET_SET_RejectMessage *msg)
2844{ 2254{
2845 struct ClientState *cs = cls; 2255 struct ClientState *cs = cls;
2846 struct Operation *op; 2256 struct Operation *op;
@@ -2872,7 +2282,8 @@ handle_client_reject (void *cls, const struct GNUNET_SET_RejectMessage *msg)
2872 * @param msg message sent by the client 2282 * @param msg message sent by the client
2873 */ 2283 */
2874static int 2284static int
2875check_client_mutation (void *cls, const struct GNUNET_SET_ElementMessage *msg) 2285check_client_mutation (void *cls,
2286 const struct GNUNET_SET_ElementMessage *msg)
2876{ 2287{
2877 /* NOTE: Technically, we should probably check with the 2288 /* NOTE: Technically, we should probably check with the
2878 block library whether the element we are given is well-formed */ 2289 block library whether the element we are given is well-formed */
@@ -2881,16 +2292,20 @@ check_client_mutation (void *cls, const struct GNUNET_SET_ElementMessage *msg)
2881 2292
2882 2293
2883/** 2294/**
2884 * Called when a client wants to add or remove an element to a set it inhabits. 2295 * Called when a client wants to add an element to a set it inhabits.
2885 * 2296 *
2886 * @param cls client that sent the message 2297 * @param cls client that sent the message
2887 * @param msg message sent by the client 2298 * @param msg message sent by the client
2888 */ 2299 */
2889static void 2300static void
2890handle_client_mutation (void *cls, const struct GNUNET_SET_ElementMessage *msg) 2301handle_client_set_add (void *cls,
2302 const struct GNUNET_SET_ElementMessage *msg)
2891{ 2303{
2892 struct ClientState *cs = cls; 2304 struct ClientState *cs = cls;
2893 struct Set *set; 2305 struct Set *set;
2306 struct GNUNET_SET_Element el;
2307 struct ElementEntry *ee;
2308 struct GNUNET_HashCode hash;
2894 2309
2895 if (NULL == (set = cs->set)) 2310 if (NULL == (set = cs->set))
2896 { 2311 {
@@ -2900,23 +2315,45 @@ handle_client_mutation (void *cls, const struct GNUNET_SET_ElementMessage *msg)
2900 return; 2315 return;
2901 } 2316 }
2902 GNUNET_SERVICE_client_continue (cs->client); 2317 GNUNET_SERVICE_client_continue (cs->client);
2903 2318 el.size = ntohs (msg->header.size) - sizeof(*msg);
2904 if (0 != set->content->iterator_count) 2319 el.data = &msg[1];
2320 el.element_type = ntohs (msg->element_type);
2321 GNUNET_ISET_element_hash (&el,
2322 &hash);
2323 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
2324 &hash);
2325 if (NULL == ee)
2905 { 2326 {
2906 struct PendingMutation *pm; 2327 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2907 2328 "Client inserts element %s of size %u\n",
2908 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Scheduling mutation on set\n"); 2329 GNUNET_h2s (&hash),
2909 pm = GNUNET_new (struct PendingMutation); 2330 el.size);
2910 pm->msg = 2331 ee = GNUNET_malloc (el.size + sizeof(*ee));
2911 (struct GNUNET_SET_ElementMessage *) GNUNET_copy_message (&msg->header); 2332 ee->element.size = el.size;
2912 pm->set = set; 2333 GNUNET_memcpy (&ee[1], el.data, el.size);
2913 GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head, 2334 ee->element.data = &ee[1];
2914 set->content->pending_mutations_tail, 2335 ee->element.element_type = el.element_type;
2915 pm); 2336 ee->remote = GNUNET_NO;
2337 ee->mutations = NULL;
2338 ee->mutations_size = 0;
2339 ee->element_hash = hash;
2340 GNUNET_break (GNUNET_YES ==
2341 GNUNET_CONTAINER_multihashmap_put (
2342 set->content->elements,
2343 &ee->element_hash,
2344 ee,
2345 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2346 }
2347 else
2348 {
2349 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2350 "Client inserted element %s of size %u twice (ignored)\n",
2351 GNUNET_h2s (&hash),
2352 el.size);
2353 /* same element inserted twice */
2916 return; 2354 return;
2917 } 2355 }
2918 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing mutation on set\n"); 2356 set->state->current_set_element_count++;
2919 execute_mutation (set, msg);
2920} 2357}
2921 2358
2922 2359
@@ -2929,24 +2366,13 @@ handle_client_mutation (void *cls, const struct GNUNET_SET_ElementMessage *msg)
2929static void 2366static void
2930advance_generation (struct Set *set) 2367advance_generation (struct Set *set)
2931{ 2368{
2932 struct GenerationRange r;
2933
2934 if (set->current_generation == set->content->latest_generation) 2369 if (set->current_generation == set->content->latest_generation)
2935 { 2370 {
2936 set->content->latest_generation++; 2371 set->content->latest_generation++;
2937 set->current_generation++; 2372 set->current_generation++;
2938 return; 2373 return;
2939 } 2374 }
2940
2941 GNUNET_assert (set->current_generation < set->content->latest_generation); 2375 GNUNET_assert (set->current_generation < set->content->latest_generation);
2942
2943 r.start = set->current_generation + 1;
2944 r.end = set->content->latest_generation + 1;
2945 set->content->latest_generation = r.end;
2946 set->current_generation = r.end;
2947 GNUNET_array_append (set->excluded_generations,
2948 set->excluded_generations_size,
2949 r);
2950} 2376}
2951 2377
2952 2378
@@ -2960,7 +2386,8 @@ advance_generation (struct Set *set)
2960 * @return #GNUNET_OK if the message is well-formed 2386 * @return #GNUNET_OK if the message is well-formed
2961 */ 2387 */
2962static int 2388static int
2963check_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg) 2389check_client_evaluate (void *cls,
2390 const struct GNUNET_SET_EvaluateMessage *msg)
2964{ 2391{
2965 /* FIXME: suboptimal, even if the context below could be NULL, 2392 /* FIXME: suboptimal, even if the context below could be NULL,
2966 there are malformed messages this does not check for... */ 2393 there are malformed messages this does not check for... */
@@ -2977,7 +2404,8 @@ check_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg)
2977 * @param msg message sent by the client 2404 * @param msg message sent by the client
2978 */ 2405 */
2979static void 2406static void
2980handle_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg) 2407handle_client_evaluate (void *cls,
2408 const struct GNUNET_SET_EvaluateMessage *msg)
2981{ 2409{
2982 struct ClientState *cs = cls; 2410 struct ClientState *cs = cls;
2983 struct Operation *op = GNUNET_new (struct Operation); 2411 struct Operation *op = GNUNET_new (struct Operation);
@@ -3010,7 +2438,8 @@ handle_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg)
3010 GNUNET_SERVICE_client_drop (cs->client); 2438 GNUNET_SERVICE_client_drop (cs->client);
3011 return; 2439 return;
3012 } 2440 }
3013 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); 2441 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
2442 UINT32_MAX);
3014 op->peer = msg->target_peer; 2443 op->peer = msg->target_peer;
3015 op->result_mode = ntohl (msg->result_mode); 2444 op->result_mode = ntohl (msg->result_mode);
3016 op->client_request_id = ntohl (msg->request_id); 2445 op->client_request_id = ntohl (msg->request_id);
@@ -3025,7 +2454,9 @@ handle_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg)
3025 op->set = set; 2454 op->set = set;
3026 op->generation_created = set->current_generation; 2455 op->generation_created = set->current_generation;
3027 advance_generation (set); 2456 advance_generation (set);
3028 GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, op); 2457 GNUNET_CONTAINER_DLL_insert (set->ops_head,
2458 set->ops_tail,
2459 op);
3029 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2460 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3030 "Creating new CADET channel to port %s for set operation type %u\n", 2461 "Creating new CADET channel to port %s for set operation type %u\n",
3031 GNUNET_h2s (&msg->app_id), 2462 GNUNET_h2s (&msg->app_id),
@@ -3038,12 +2469,42 @@ handle_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg)
3038 &channel_end_cb, 2469 &channel_end_cb,
3039 cadet_handlers); 2470 cadet_handlers);
3040 op->mq = GNUNET_CADET_get_mq (op->channel); 2471 op->mq = GNUNET_CADET_get_mq (op->channel);
3041 op->state = intersection_evaluate (op, context); // FIXME: inline!
3042 if (NULL == op->state)
3043 { 2472 {
3044 GNUNET_break (0); 2473 struct OperationState *state;
3045 GNUNET_SERVICE_client_drop (cs->client); 2474 struct GNUNET_MQ_Envelope *ev;
3046 return; 2475 struct OperationRequestMessage *msg;
2476
2477 ev = GNUNET_MQ_msg_nested_mh (msg,
2478 GNUNET_MESSAGE_TYPE_SETI_P2P_OPERATION_REQUEST,
2479 context);
2480 if (NULL == ev)
2481 {
2482 /* the context message is too large!? */
2483 GNUNET_break (0);
2484 GNUNET_SERVICE_client_drop (cs->client);
2485 return;
2486 }
2487 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2488 "Initiating intersection operation evaluation\n");
2489 state = GNUNET_new (struct OperationState);
2490 /* we started the operation, thus we have to send the operation request */
2491 state->phase = PHASE_INITIAL;
2492 state->my_element_count = op->set->state->current_set_element_count;
2493 state->my_elements
2494 = GNUNET_CONTAINER_multihashmap_create (state->my_element_count,
2495 GNUNET_YES);
2496
2497 msg->element_count = htonl (state->my_element_count);
2498 GNUNET_MQ_send (op->mq,
2499 ev);
2500 state->phase = PHASE_COUNT_SENT;
2501 if (NULL != opaque_context)
2502 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2503 "Sent op request with context message\n");
2504 else
2505 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2506 "Sent op request without context message\n");
2507 op->state = state;
3047 } 2508 }
3048 GNUNET_SERVICE_client_continue (cs->client); 2509 GNUNET_SERVICE_client_continue (cs->client);
3049} 2510}
@@ -3056,7 +2517,8 @@ handle_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg)
3056 * @param msg the message 2517 * @param msg the message
3057 */ 2518 */
3058static void 2519static void
3059handle_client_cancel (void *cls, const struct GNUNET_SET_CancelMessage *msg) 2520handle_client_cancel (void *cls,
2521 const struct GNUNET_SET_CancelMessage *msg)
3060{ 2522{
3061 struct ClientState *cs = cls; 2523 struct ClientState *cs = cls;
3062 struct Set *set; 2524 struct Set *set;
@@ -3085,7 +2547,8 @@ handle_client_cancel (void *cls, const struct GNUNET_SET_CancelMessage *msg)
3085 * the other peer disconnecting. The client may not know about this 2547 * the other peer disconnecting. The client may not know about this
3086 * yet and try to cancel the (just barely non-existent) operation. 2548 * yet and try to cancel the (just barely non-existent) operation.
3087 * So this is not a hard error. 2549 * So this is not a hard error.
3088 */GNUNET_log (GNUNET_ERROR_TYPE_INFO, 2550 *///
2551 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3089 "Client canceled non-existent op %u\n", 2552 "Client canceled non-existent op %u\n",
3090 (uint32_t) ntohl (msg->request_id)); 2553 (uint32_t) ntohl (msg->request_id));
3091 } 2554 }
@@ -3109,7 +2572,8 @@ handle_client_cancel (void *cls, const struct GNUNET_SET_CancelMessage *msg)
3109 * @param msg the message 2572 * @param msg the message
3110 */ 2573 */
3111static void 2574static void
3112handle_client_accept (void *cls, const struct GNUNET_SET_AcceptMessage *msg) 2575handle_client_accept (void *cls,
2576 const struct GNUNET_SET_AcceptMessage *msg)
3113{ 2577{
3114 struct ClientState *cs = cls; 2578 struct ClientState *cs = cls;
3115 struct Set *set; 2579 struct Set *set;
@@ -3136,7 +2600,8 @@ handle_client_accept (void *cls, const struct GNUNET_SET_AcceptMessage *msg)
3136 cs, 2600 cs,
3137 ntohl (msg->accept_reject_id), 2601 ntohl (msg->accept_reject_id),
3138 cs->listener); 2602 cs->listener);
3139 ev = GNUNET_MQ_msg (result_message, GNUNET_MESSAGE_TYPE_SETI_RESULT); 2603 ev = GNUNET_MQ_msg (result_message,
2604 GNUNET_MESSAGE_TYPE_SETI_RESULT);
3140 result_message->request_id = msg->request_id; 2605 result_message->request_id = msg->request_id;
3141 result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE); 2606 result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE);
3142 GNUNET_MQ_send (set->cs->mq, ev); 2607 GNUNET_MQ_send (set->cs->mq, ev);
@@ -3163,12 +2628,34 @@ handle_client_accept (void *cls, const struct GNUNET_SET_AcceptMessage *msg)
3163 op->generation_created = set->current_generation; 2628 op->generation_created = set->current_generation;
3164 advance_generation (set); 2629 advance_generation (set);
3165 GNUNET_assert (NULL == op->state); 2630 GNUNET_assert (NULL == op->state);
3166 op->state = intersection_accept (op); // FIXME: inline
3167 if (NULL == op->state)
3168 { 2631 {
3169 GNUNET_break (0); 2632 struct OperationState *state;
3170 GNUNET_SERVICE_client_drop (cs->client); 2633
3171 return; 2634 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2635 "Accepting set intersection operation\n");
2636 state = GNUNET_new (struct OperationState);
2637 state->phase = PHASE_INITIAL;
2638 state->my_element_count
2639 = op->set->state->current_set_element_count;
2640 state->my_elements
2641 = GNUNET_CONTAINER_multihashmap_create (
2642 GNUNET_MIN (state->my_element_count,
2643 op->remote_element_count),
2644 GNUNET_YES);
2645 op->state = state;
2646 if (op->remote_element_count < state->my_element_count)
2647 {
2648 /* If the other peer (Alice) has fewer elements than us (Bob),
2649 we just send the count as Alice should send the first BF */
2650 send_element_count (op);
2651 state->phase = PHASE_COUNT_SENT;
2652 }
2653 else
2654 {
2655 /* We have fewer elements, so we start with the BF */
2656 begin_bf_exchange (op);
2657 }
2658 op->state = state;
3172 } 2659 }
3173 /* Now allow CADET to continue, as we did not do this in 2660 /* Now allow CADET to continue, as we did not do this in
3174 #handle_incoming_msg (as we wanted to first see if the 2661 #handle_incoming_msg (as we wanted to first see if the
@@ -3196,8 +2683,10 @@ shutdown_task (void *cls)
3196 cadet = NULL; 2683 cadet = NULL;
3197 } 2684 }
3198 } 2685 }
3199 GNUNET_STATISTICS_destroy (_GSS_statistics, GNUNET_YES); 2686 GNUNET_STATISTICS_destroy (_GSS_statistics,
3200 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n"); 2687 GNUNET_YES);
2688 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2689 "handled shutdown request\n");
3201} 2690}
3202 2691
3203 2692
@@ -3217,8 +2706,10 @@ run (void *cls,
3217 /* FIXME: need to modify SERVICE (!) API to allow 2706 /* FIXME: need to modify SERVICE (!) API to allow
3218 us to run a shutdown task *after* clients were 2707 us to run a shutdown task *after* clients were
3219 forcefully disconnected! */ 2708 forcefully disconnected! */
3220 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); 2709 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
3221 _GSS_statistics = GNUNET_STATISTICS_create ("set", cfg); 2710 NULL);
2711 _GSS_statistics = GNUNET_STATISTICS_create ("seti",
2712 cfg);
3222 cadet = GNUNET_CADET_connect (cfg); 2713 cadet = GNUNET_CADET_connect (cfg);
3223 if (NULL == cadet) 2714 if (NULL == cadet)
3224 { 2715 {
@@ -3234,7 +2725,7 @@ run (void *cls,
3234 * Define "main" method using service macro. 2725 * Define "main" method using service macro.
3235 */ 2726 */
3236GNUNET_SERVICE_MAIN ( 2727GNUNET_SERVICE_MAIN (
3237 "set", 2728 "seti",
3238 GNUNET_SERVICE_OPTION_NONE, 2729 GNUNET_SERVICE_OPTION_NONE,
3239 &run, 2730 &run,
3240 &client_connect_cb, 2731 &client_connect_cb,
@@ -3244,7 +2735,7 @@ GNUNET_SERVICE_MAIN (
3244 GNUNET_MESSAGE_TYPE_SETI_ACCEPT, 2735 GNUNET_MESSAGE_TYPE_SETI_ACCEPT,
3245 struct GNUNET_SET_AcceptMessage, 2736 struct GNUNET_SET_AcceptMessage,
3246 NULL), 2737 NULL),
3247 GNUNET_MQ_hd_var_size (client_mutation, 2738 GNUNET_MQ_hd_var_size (client_set_add,
3248 GNUNET_MESSAGE_TYPE_SETI_ADD, 2739 GNUNET_MESSAGE_TYPE_SETI_ADD,
3249 struct GNUNET_SET_ElementMessage, 2740 struct GNUNET_SET_ElementMessage,
3250 NULL), 2741 NULL),