diff options
author | Christian Grothoff <christian@grothoff.org> | 2020-08-18 19:31:39 +0200 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2020-08-18 19:31:39 +0200 |
commit | 3fb3bf908f4977aef6bde6a450954e2704b14bcb (patch) | |
tree | 121dadd228d01da8f777f9e31c3b55ef5684e0ff | |
parent | 4d607f2f2838431cc7a349441f8f018ab99633a2 (diff) | |
download | gnunet-3fb3bf908f4977aef6bde6a450954e2704b14bcb.tar.gz gnunet-3fb3bf908f4977aef6bde6a450954e2704b14bcb.zip |
-splitting of set intersection functionality from set service (not yet finished, FTBFS)
-rw-r--r-- | src/seti/gnunet-service-seti.c | 997 |
1 files changed, 244 insertions, 753 deletions
diff --git a/src/seti/gnunet-service-seti.c b/src/seti/gnunet-service-seti.c index 3b8da01cd..037181bde 100644 --- a/src/seti/gnunet-service-seti.c +++ b/src/seti/gnunet-service-seti.c | |||
@@ -108,28 +108,6 @@ struct Operation; | |||
108 | 108 | ||
109 | 109 | ||
110 | /** | 110 | /** |
111 | * MutationEvent gives information about changes | ||
112 | * to an element (removal / addition) in a set content. | ||
113 | */ | ||
114 | struct MutationEvent | ||
115 | { | ||
116 | /** | ||
117 | * First generation affected by this mutation event. | ||
118 | * | ||
119 | * If @a generation is 0, this mutation event is a list | ||
120 | * sentinel element. | ||
121 | */ | ||
122 | unsigned int generation; | ||
123 | |||
124 | /** | ||
125 | * If @a added is #GNUNET_YES, then this is a | ||
126 | * `remove` event, otherwise it is an `add` event. | ||
127 | */ | ||
128 | int added; | ||
129 | }; | ||
130 | |||
131 | |||
132 | /** | ||
133 | * Information about an element element in the set. All elements are | 111 | * Information about an element element in the set. All elements are |
134 | * stored in a hash-table from their hash-code to their `struct | 112 | * stored in a hash-table from their hash-code to their `struct |
135 | * Element`, so that the remove and add operations are reasonably | 113 | * Element`, so that the remove and add operations are reasonably |
@@ -150,20 +128,9 @@ struct ElementEntry | |||
150 | struct GNUNET_HashCode element_hash; | 128 | struct GNUNET_HashCode element_hash; |
151 | 129 | ||
152 | /** | 130 | /** |
153 | * If @a mutations is not NULL, it contains | 131 | * Generation in which the element was added. |
154 | * a list of mutations, ordered by increasing generation. | ||
155 | * The list is terminated by a sentinel event with `generation` | ||
156 | * set to 0. | ||
157 | * | ||
158 | * If @a mutations is NULL, then this element exists in all generations | ||
159 | * of the respective set content this element belongs to. | ||
160 | */ | ||
161 | struct MutationEvent *mutations; | ||
162 | |||
163 | /** | ||
164 | * Number of elements in the array @a mutations. | ||
165 | */ | 132 | */ |
166 | unsigned int mutations_size; | 133 | unsigned int generation_added; |
167 | 134 | ||
168 | /** | 135 | /** |
169 | * #GNUNET_YES if the element is a remote element, and does not belong | 136 | * #GNUNET_YES if the element is a remote element, and does not belong |
@@ -285,7 +252,13 @@ struct Operation | |||
285 | /** | 252 | /** |
286 | * When are elements sent to the client, and which elements are sent? | 253 | * When are elements sent to the client, and which elements are sent? |
287 | */ | 254 | */ |
288 | enum GNUNET_SET_ResultMode result_mode; | 255 | int return_intersection; |
256 | |||
257 | /** | ||
258 | * Lower bound for the set size, used only when | ||
259 | * byzantine mode is enabled. | ||
260 | */ | ||
261 | int byzantine_lower_bound; | ||
289 | 262 | ||
290 | /** | 263 | /** |
291 | * Always use delta operation instead of sending full sets, | 264 | * Always use delta operation instead of sending full sets, |
@@ -306,12 +279,6 @@ struct Operation | |||
306 | int byzantine; | 279 | int byzantine; |
307 | 280 | ||
308 | /** | 281 | /** |
309 | * Lower bound for the set size, used only when | ||
310 | * byzantine mode is enabled. | ||
311 | */ | ||
312 | int byzantine_lower_bound; | ||
313 | |||
314 | /** | ||
315 | * Unique request id for the request from a remote peer, sent to the | 282 | * Unique request id for the request from a remote peer, sent to the |
316 | * client, which will accept or reject the request. Set to '0' iff | 283 | * client, which will accept or reject the request. Set to '0' iff |
317 | * the request has not been suggested yet. | 284 | * the request has not been suggested yet. |
@@ -319,8 +286,7 @@ struct Operation | |||
319 | uint32_t suggest_id; | 286 | uint32_t suggest_id; |
320 | 287 | ||
321 | /** | 288 | /** |
322 | * Generation in which the operation handle | 289 | * Generation in which the operation handle was created. |
323 | * was created. | ||
324 | */ | 290 | */ |
325 | unsigned int generation_created; | 291 | unsigned int generation_created; |
326 | }; | 292 | }; |
@@ -338,20 +304,6 @@ struct SetContent | |||
338 | struct GNUNET_CONTAINER_MultiHashMap *elements; | 304 | struct GNUNET_CONTAINER_MultiHashMap *elements; |
339 | 305 | ||
340 | /** | 306 | /** |
341 | * Mutations requested by the client that we're | ||
342 | * unable to execute right now because we're iterating | ||
343 | * over the underlying hash map of elements. | ||
344 | */ | ||
345 | struct PendingMutation *pending_mutations_head; | ||
346 | |||
347 | /** | ||
348 | * Mutations requested by the client that we're | ||
349 | * unable to execute right now because we're iterating | ||
350 | * over the underlying hash map of elements. | ||
351 | */ | ||
352 | struct PendingMutation *pending_mutations_tail; | ||
353 | |||
354 | /** | ||
355 | * Number of references to the content. | 307 | * Number of references to the content. |
356 | */ | 308 | */ |
357 | unsigned int refcount; | 309 | unsigned int refcount; |
@@ -368,49 +320,6 @@ struct SetContent | |||
368 | }; | 320 | }; |
369 | 321 | ||
370 | 322 | ||
371 | struct GenerationRange | ||
372 | { | ||
373 | /** | ||
374 | * First generation that is excluded. | ||
375 | */ | ||
376 | unsigned int start; | ||
377 | |||
378 | /** | ||
379 | * Generation after the last excluded generation. | ||
380 | */ | ||
381 | unsigned int end; | ||
382 | }; | ||
383 | |||
384 | |||
385 | /** | ||
386 | * Information about a mutation to apply to a set. | ||
387 | */ | ||
388 | struct PendingMutation | ||
389 | { | ||
390 | /** | ||
391 | * Mutations are kept in a DLL. | ||
392 | */ | ||
393 | struct PendingMutation *prev; | ||
394 | |||
395 | /** | ||
396 | * Mutations are kept in a DLL. | ||
397 | */ | ||
398 | struct PendingMutation *next; | ||
399 | |||
400 | /** | ||
401 | * Set this mutation is about. | ||
402 | */ | ||
403 | struct Set *set; | ||
404 | |||
405 | /** | ||
406 | * Message that describes the desired mutation. | ||
407 | * May only be a #GNUNET_MESSAGE_TYPE_SET_ADD or | ||
408 | * #GNUNET_MESSAGE_TYPE_SET_REMOVE. | ||
409 | */ | ||
410 | struct GNUNET_SET_ElementMessage *msg; | ||
411 | }; | ||
412 | |||
413 | |||
414 | /** | 323 | /** |
415 | * A set that supports a specific operation with other peers. | 324 | * A set that supports a specific operation with other peers. |
416 | */ | 325 | */ |
@@ -444,12 +353,6 @@ struct Set | |||
444 | struct SetState *state; | 353 | struct SetState *state; |
445 | 354 | ||
446 | /** | 355 | /** |
447 | * Current state of iterating elements for the client. | ||
448 | * NULL if we are not currently iterating. | ||
449 | */ | ||
450 | struct GNUNET_CONTAINER_MultiHashMapIterator *iter; | ||
451 | |||
452 | /** | ||
453 | * Evaluate operations are held in a linked list. | 356 | * Evaluate operations are held in a linked list. |
454 | */ | 357 | */ |
455 | struct Operation *ops_head; | 358 | struct Operation *ops_head; |
@@ -460,36 +363,11 @@ struct Set | |||
460 | struct Operation *ops_tail; | 363 | struct Operation *ops_tail; |
461 | 364 | ||
462 | /** | 365 | /** |
463 | * List of generations we have to exclude, due to lazy copies. | ||
464 | */ | ||
465 | struct GenerationRange *excluded_generations; | ||
466 | |||
467 | /** | ||
468 | * Current generation, that is, number of previously executed | 366 | * Current generation, that is, number of previously executed |
469 | * operations and lazy copies on the underlying set content. | 367 | * operations and lazy copies on the underlying set content. |
470 | */ | 368 | */ |
471 | unsigned int current_generation; | 369 | unsigned int current_generation; |
472 | 370 | ||
473 | /** | ||
474 | * Number of elements in array @a excluded_generations. | ||
475 | */ | ||
476 | unsigned int excluded_generations_size; | ||
477 | |||
478 | /** | ||
479 | * Type of operation supported for this set | ||
480 | */ | ||
481 | enum GNUNET_SET_OperationType operation; | ||
482 | |||
483 | /** | ||
484 | * Generation we're currently iteration over. | ||
485 | */ | ||
486 | unsigned int iter_generation; | ||
487 | |||
488 | /** | ||
489 | * Each @e iter is assigned a unique number, so that the client | ||
490 | * can distinguish iterations. | ||
491 | */ | ||
492 | uint16_t iteration_id; | ||
493 | }; | 371 | }; |
494 | 372 | ||
495 | 373 | ||
@@ -724,7 +602,7 @@ send_client_removed_element (struct Operation *op, | |||
724 | struct GNUNET_MQ_Envelope *ev; | 602 | struct GNUNET_MQ_Envelope *ev; |
725 | struct GNUNET_SET_ResultMessage *rm; | 603 | struct GNUNET_SET_ResultMessage *rm; |
726 | 604 | ||
727 | if (GNUNET_SET_RESULT_REMOVED != op->result_mode) | 605 | if (GNUNET_NO != op->return_intersection) |
728 | return; /* Wrong mode for transmitting removed elements */ | 606 | return; /* Wrong mode for transmitting removed elements */ |
729 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 607 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
730 | "Sending removed element (size %u) to client\n", | 608 | "Sending removed element (size %u) to client\n", |
@@ -736,13 +614,13 @@ send_client_removed_element (struct Operation *op, | |||
736 | GNUNET_assert (0 != op->client_request_id); | 614 | GNUNET_assert (0 != op->client_request_id); |
737 | ev = GNUNET_MQ_msg_extra (rm, | 615 | ev = GNUNET_MQ_msg_extra (rm, |
738 | element->size, | 616 | element->size, |
739 | GNUNET_MESSAGE_TYPE_SET_RESULT); | 617 | GNUNET_MESSAGE_TYPE_SETI_RESULT); |
740 | if (NULL == ev) | 618 | if (NULL == ev) |
741 | { | 619 | { |
742 | GNUNET_break (0); | 620 | GNUNET_break (0); |
743 | return; | 621 | return; |
744 | } | 622 | } |
745 | rm->result_status = htons (GNUNET_SET_STATUS_OK); | 623 | rm->result_status = htons (GNUNET_SET_STATUS_DEL_LOCAL); |
746 | rm->request_id = htonl (op->client_request_id); | 624 | rm->request_id = htonl (op->client_request_id); |
747 | rm->element_type = element->element_type; | 625 | rm->element_type = element->element_type; |
748 | GNUNET_memcpy (&rm[1], | 626 | GNUNET_memcpy (&rm[1], |
@@ -770,7 +648,6 @@ filtered_map_initialization (void *cls, | |||
770 | struct ElementEntry *ee = value; | 648 | struct ElementEntry *ee = value; |
771 | struct GNUNET_HashCode mutated_hash; | 649 | struct GNUNET_HashCode mutated_hash; |
772 | 650 | ||
773 | |||
774 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 651 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
775 | "FIMA called for %s:%u\n", | 652 | "FIMA called for %s:%u\n", |
776 | GNUNET_h2s (&ee->element_hash), | 653 | GNUNET_h2s (&ee->element_hash), |
@@ -934,8 +811,8 @@ fail_intersection_operation (struct Operation *op) | |||
934 | op->state->my_elements = NULL; | 811 | op->state->my_elements = NULL; |
935 | } | 812 | } |
936 | ev = GNUNET_MQ_msg (msg, | 813 | ev = GNUNET_MQ_msg (msg, |
937 | GNUNET_MESSAGE_TYPE_SET_RESULT); | 814 | GNUNET_MESSAGE_TYPE_SETI_RESULT); |
938 | msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); | 815 | msg->result_status = htons (GNUNET_SETI_STATUS_FAILURE); |
939 | msg->request_id = htonl (op->client_request_id); | 816 | msg->request_id = htonl (op->client_request_id); |
940 | msg->element_type = htons (0); | 817 | msg->element_type = htons (0); |
941 | GNUNET_MQ_send (op->set->cs->mq, | 818 | GNUNET_MQ_send (op->set->cs->mq, |
@@ -999,7 +876,7 @@ send_bloomfilter (struct Operation *op) | |||
999 | chunk_size = bf_size; | 876 | chunk_size = bf_size; |
1000 | ev = GNUNET_MQ_msg_extra (msg, | 877 | ev = GNUNET_MQ_msg_extra (msg, |
1001 | chunk_size, | 878 | chunk_size, |
1002 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); | 879 | GNUNET_MESSAGE_TYPE_SETI_INTERSECTION_P2P_BF); |
1003 | GNUNET_assert (GNUNET_SYSERR != | 880 | GNUNET_assert (GNUNET_SYSERR != |
1004 | GNUNET_CONTAINER_bloomfilter_get_raw_data ( | 881 | GNUNET_CONTAINER_bloomfilter_get_raw_data ( |
1005 | op->state->local_bf, | 882 | op->state->local_bf, |
@@ -1028,7 +905,7 @@ send_bloomfilter (struct Operation *op) | |||
1028 | chunk_size = bf_size - offset; | 905 | chunk_size = bf_size - offset; |
1029 | ev = GNUNET_MQ_msg_extra (msg, | 906 | ev = GNUNET_MQ_msg_extra (msg, |
1030 | chunk_size, | 907 | chunk_size, |
1031 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); | 908 | GNUNET_MESSAGE_TYPE_SETI_INTERSECTION_P2P_BF); |
1032 | GNUNET_memcpy (&msg[1], | 909 | GNUNET_memcpy (&msg[1], |
1033 | &bf_data[offset], | 910 | &bf_data[offset], |
1034 | chunk_size); | 911 | chunk_size); |
@@ -1067,7 +944,7 @@ send_client_done_and_destroy (void *cls) | |||
1067 | 1, | 944 | 1, |
1068 | GNUNET_NO); | 945 | GNUNET_NO); |
1069 | ev = GNUNET_MQ_msg (rm, | 946 | ev = GNUNET_MQ_msg (rm, |
1070 | GNUNET_MESSAGE_TYPE_SET_RESULT); | 947 | GNUNET_MESSAGE_TYPE_SETI_RESULT); |
1071 | rm->request_id = htonl (op->client_request_id); | 948 | rm->request_id = htonl (op->client_request_id); |
1072 | rm->result_status = htons (GNUNET_SET_STATUS_DONE); | 949 | rm->result_status = htons (GNUNET_SET_STATUS_DONE); |
1073 | rm->element_type = htons (0); | 950 | rm->element_type = htons (0); |
@@ -1114,7 +991,7 @@ send_p2p_done (struct Operation *op) | |||
1114 | GNUNET_assert (PHASE_MUST_SEND_DONE == op->state->phase); | 991 | GNUNET_assert (PHASE_MUST_SEND_DONE == op->state->phase); |
1115 | GNUNET_assert (GNUNET_NO == op->state->channel_death_expected); | 992 | GNUNET_assert (GNUNET_NO == op->state->channel_death_expected); |
1116 | ev = GNUNET_MQ_msg (idm, | 993 | ev = GNUNET_MQ_msg (idm, |
1117 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE); | 994 | GNUNET_MESSAGE_TYPE_SETI_INTERSECTION_P2P_DONE); |
1118 | idm->final_element_count = htonl (op->state->my_element_count); | 995 | idm->final_element_count = htonl (op->state->my_element_count); |
1119 | idm->element_xor_hash = op->state->my_xor; | 996 | idm->element_xor_hash = op->state->my_xor; |
1120 | GNUNET_MQ_notify_sent (ev, | 997 | GNUNET_MQ_notify_sent (ev, |
@@ -1176,7 +1053,7 @@ send_remaining_elements (void *cls) | |||
1176 | GNUNET_assert (0 != op->client_request_id); | 1053 | GNUNET_assert (0 != op->client_request_id); |
1177 | ev = GNUNET_MQ_msg_extra (rm, | 1054 | ev = GNUNET_MQ_msg_extra (rm, |
1178 | element->size, | 1055 | element->size, |
1179 | GNUNET_MESSAGE_TYPE_SET_RESULT); | 1056 | GNUNET_MESSAGE_TYPE_SETI_RESULT); |
1180 | GNUNET_assert (NULL != ev); | 1057 | GNUNET_assert (NULL != ev); |
1181 | rm->result_status = htons (GNUNET_SET_STATUS_OK); | 1058 | rm->result_status = htons (GNUNET_SET_STATUS_OK); |
1182 | rm->request_id = htonl (op->client_request_id); | 1059 | rm->request_id = htonl (op->client_request_id); |
@@ -1243,7 +1120,7 @@ send_element_count (struct Operation *op) | |||
1243 | "Sending our element count (%u)\n", | 1120 | "Sending our element count (%u)\n", |
1244 | op->state->my_element_count); | 1121 | op->state->my_element_count); |
1245 | ev = GNUNET_MQ_msg (msg, | 1122 | ev = GNUNET_MQ_msg (msg, |
1246 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO); | 1123 | GNUNET_MESSAGE_TYPE_SETI_INTERSECTION_P2P_ELEMENT_INFO); |
1247 | msg->sender_element_count = htonl (op->state->my_element_count); | 1124 | msg->sender_element_count = htonl (op->state->my_element_count); |
1248 | GNUNET_MQ_send (op->mq, ev); | 1125 | GNUNET_MQ_send (op->mq, ev); |
1249 | } | 1126 | } |
@@ -1273,7 +1150,7 @@ begin_bf_exchange (struct Operation *op) | |||
1273 | * @param cls the intersection operation | 1150 | * @param cls the intersection operation |
1274 | * @param mh the header of the message | 1151 | * @param mh the header of the message |
1275 | */ | 1152 | */ |
1276 | void | 1153 | static void |
1277 | handle_intersection_p2p_element_info (void *cls, | 1154 | handle_intersection_p2p_element_info (void *cls, |
1278 | const struct | 1155 | const struct |
1279 | IntersectionElementInfoMessage *msg) | 1156 | IntersectionElementInfoMessage *msg) |
@@ -1327,7 +1204,6 @@ process_bf (struct Operation *op) | |||
1327 | GNUNET_break_op (0); | 1204 | GNUNET_break_op (0); |
1328 | fail_intersection_operation (op); | 1205 | fail_intersection_operation (op); |
1329 | return; | 1206 | return; |
1330 | |||
1331 | case PHASE_COUNT_SENT: | 1207 | case PHASE_COUNT_SENT: |
1332 | /* This is the first BF being sent, build our initial map with | 1208 | /* This is the first BF being sent, build our initial map with |
1333 | filtering in place */ | 1209 | filtering in place */ |
@@ -1336,24 +1212,20 @@ process_bf (struct Operation *op) | |||
1336 | &filtered_map_initialization, | 1212 | &filtered_map_initialization, |
1337 | op); | 1213 | op); |
1338 | break; | 1214 | break; |
1339 | |||
1340 | case PHASE_BF_EXCHANGE: | 1215 | case PHASE_BF_EXCHANGE: |
1341 | /* Update our set by reduction */ | 1216 | /* Update our set by reduction */ |
1342 | GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements, | 1217 | GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements, |
1343 | &iterator_bf_reduce, | 1218 | &iterator_bf_reduce, |
1344 | op); | 1219 | op); |
1345 | break; | 1220 | break; |
1346 | |||
1347 | case PHASE_MUST_SEND_DONE: | 1221 | case PHASE_MUST_SEND_DONE: |
1348 | GNUNET_break_op (0); | 1222 | GNUNET_break_op (0); |
1349 | fail_intersection_operation (op); | 1223 | fail_intersection_operation (op); |
1350 | return; | 1224 | return; |
1351 | |||
1352 | case PHASE_DONE_RECEIVED: | 1225 | case PHASE_DONE_RECEIVED: |
1353 | GNUNET_break_op (0); | 1226 | GNUNET_break_op (0); |
1354 | fail_intersection_operation (op); | 1227 | fail_intersection_operation (op); |
1355 | return; | 1228 | return; |
1356 | |||
1357 | case PHASE_FINISHED: | 1229 | case PHASE_FINISHED: |
1358 | GNUNET_break_op (0); | 1230 | GNUNET_break_op (0); |
1359 | fail_intersection_operation (op); | 1231 | fail_intersection_operation (op); |
@@ -1420,7 +1292,7 @@ check_intersection_p2p_bf (void *cls, | |||
1420 | * @param cls the intersection operation | 1292 | * @param cls the intersection operation |
1421 | * @param msg the header of the message | 1293 | * @param msg the header of the message |
1422 | */ | 1294 | */ |
1423 | static | 1295 | static void |
1424 | handle_intersection_p2p_bf (void *cls, | 1296 | handle_intersection_p2p_bf (void *cls, |
1425 | const struct BFMessage *msg) | 1297 | const struct BFMessage *msg) |
1426 | { | 1298 | { |
@@ -1613,214 +1485,6 @@ handle_intersection_p2p_done (void *cls, | |||
1613 | 1485 | ||
1614 | 1486 | ||
1615 | /** | 1487 | /** |
1616 | * Initiate a set intersection operation with a remote peer. | ||
1617 | * | ||
1618 | * @param op operation that is created, should be initialized to | ||
1619 | * begin the evaluation | ||
1620 | * @param opaque_context message to be transmitted to the listener | ||
1621 | * to convince it to accept, may be NULL | ||
1622 | * @return operation-specific state to keep in @a op | ||
1623 | */ | ||
1624 | static struct OperationState * | ||
1625 | intersection_evaluate (struct Operation *op, | ||
1626 | const struct GNUNET_MessageHeader *opaque_context) | ||
1627 | { | ||
1628 | struct OperationState *state; | ||
1629 | struct GNUNET_MQ_Envelope *ev; | ||
1630 | struct OperationRequestMessage *msg; | ||
1631 | |||
1632 | ev = GNUNET_MQ_msg_nested_mh (msg, | ||
1633 | GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, | ||
1634 | opaque_context); | ||
1635 | if (NULL == ev) | ||
1636 | { | ||
1637 | /* the context message is too large!? */ | ||
1638 | GNUNET_break (0); | ||
1639 | return NULL; | ||
1640 | } | ||
1641 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1642 | "Initiating intersection operation evaluation\n"); | ||
1643 | state = GNUNET_new (struct OperationState); | ||
1644 | /* we started the operation, thus we have to send the operation request */ | ||
1645 | state->phase = PHASE_INITIAL; | ||
1646 | state->my_element_count = op->set->state->current_set_element_count; | ||
1647 | state->my_elements | ||
1648 | = GNUNET_CONTAINER_multihashmap_create (state->my_element_count, | ||
1649 | GNUNET_YES); | ||
1650 | |||
1651 | msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION); | ||
1652 | msg->element_count = htonl (state->my_element_count); | ||
1653 | GNUNET_MQ_send (op->mq, | ||
1654 | ev); | ||
1655 | state->phase = PHASE_COUNT_SENT; | ||
1656 | if (NULL != opaque_context) | ||
1657 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1658 | "Sent op request with context message\n"); | ||
1659 | else | ||
1660 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1661 | "Sent op request without context message\n"); | ||
1662 | return state; | ||
1663 | } | ||
1664 | |||
1665 | |||
1666 | /** | ||
1667 | * Accept an intersection operation request from a remote peer. Only | ||
1668 | * initializes the private operation state. | ||
1669 | * | ||
1670 | * @param op operation that will be accepted as an intersection operation | ||
1671 | */ | ||
1672 | static struct OperationState * | ||
1673 | intersection_accept (struct Operation *op) | ||
1674 | { | ||
1675 | struct OperationState *state; | ||
1676 | |||
1677 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1678 | "Accepting set intersection operation\n"); | ||
1679 | state = GNUNET_new (struct OperationState); | ||
1680 | state->phase = PHASE_INITIAL; | ||
1681 | state->my_element_count | ||
1682 | = op->set->state->current_set_element_count; | ||
1683 | state->my_elements | ||
1684 | = GNUNET_CONTAINER_multihashmap_create (GNUNET_MIN (state->my_element_count, | ||
1685 | op->remote_element_count), | ||
1686 | GNUNET_YES); | ||
1687 | op->state = state; | ||
1688 | if (op->remote_element_count < state->my_element_count) | ||
1689 | { | ||
1690 | /* If the other peer (Alice) has fewer elements than us (Bob), | ||
1691 | we just send the count as Alice should send the first BF */ | ||
1692 | send_element_count (op); | ||
1693 | state->phase = PHASE_COUNT_SENT; | ||
1694 | return state; | ||
1695 | } | ||
1696 | /* We have fewer elements, so we start with the BF */ | ||
1697 | begin_bf_exchange (op); | ||
1698 | return state; | ||
1699 | } | ||
1700 | |||
1701 | |||
1702 | /** | ||
1703 | * Destroy the intersection operation. Only things specific to the | ||
1704 | * intersection operation are destroyed. | ||
1705 | * | ||
1706 | * @param op intersection operation to destroy | ||
1707 | */ | ||
1708 | static void | ||
1709 | intersection_op_cancel (struct Operation *op) | ||
1710 | { | ||
1711 | /* check if the op was canceled twice */ | ||
1712 | GNUNET_assert (NULL != op->state); | ||
1713 | if (NULL != op->state->remote_bf) | ||
1714 | { | ||
1715 | GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf); | ||
1716 | op->state->remote_bf = NULL; | ||
1717 | } | ||
1718 | if (NULL != op->state->local_bf) | ||
1719 | { | ||
1720 | GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); | ||
1721 | op->state->local_bf = NULL; | ||
1722 | } | ||
1723 | if (NULL != op->state->my_elements) | ||
1724 | { | ||
1725 | GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements); | ||
1726 | op->state->my_elements = NULL; | ||
1727 | } | ||
1728 | if (NULL != op->state->full_result_iter) | ||
1729 | { | ||
1730 | GNUNET_CONTAINER_multihashmap_iterator_destroy ( | ||
1731 | op->state->full_result_iter); | ||
1732 | op->state->full_result_iter = NULL; | ||
1733 | } | ||
1734 | GNUNET_free (op->state); | ||
1735 | op->state = NULL; | ||
1736 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1737 | "Destroying intersection op state done\n"); | ||
1738 | } | ||
1739 | |||
1740 | |||
1741 | /** | ||
1742 | * Create a new set supporting the intersection operation. | ||
1743 | * | ||
1744 | * @return the newly created set | ||
1745 | */ | ||
1746 | static struct SetState * | ||
1747 | intersection_set_create () | ||
1748 | { | ||
1749 | struct SetState *set_state; | ||
1750 | |||
1751 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1752 | "Intersection set created\n"); | ||
1753 | set_state = GNUNET_new (struct SetState); | ||
1754 | set_state->current_set_element_count = 0; | ||
1755 | |||
1756 | return set_state; | ||
1757 | } | ||
1758 | |||
1759 | |||
1760 | /** | ||
1761 | * Add the element from the given element message to the set. | ||
1762 | * | ||
1763 | * @param set_state state of the set want to add to | ||
1764 | * @param ee the element to add to the set | ||
1765 | */ | ||
1766 | static void | ||
1767 | intersection_add (struct SetState *set_state, | ||
1768 | struct ElementEntry *ee) | ||
1769 | { | ||
1770 | set_state->current_set_element_count++; | ||
1771 | } | ||
1772 | |||
1773 | |||
1774 | /** | ||
1775 | * Destroy a set that supports the intersection operation | ||
1776 | * | ||
1777 | * @param set_state the set to destroy | ||
1778 | */ | ||
1779 | static void | ||
1780 | intersection_set_destroy (struct SetState *set_state) | ||
1781 | { | ||
1782 | GNUNET_free (set_state); | ||
1783 | } | ||
1784 | |||
1785 | |||
1786 | /** | ||
1787 | * Remove the element given in the element message from the set. | ||
1788 | * | ||
1789 | * @param set_state state of the set to remove from | ||
1790 | * @param element set element to remove | ||
1791 | */ | ||
1792 | static void | ||
1793 | intersection_remove (struct SetState *set_state, | ||
1794 | struct ElementEntry *element) | ||
1795 | { | ||
1796 | GNUNET_assert (0 < set_state->current_set_element_count); | ||
1797 | set_state->current_set_element_count--; | ||
1798 | } | ||
1799 | |||
1800 | |||
1801 | /** | ||
1802 | * Callback for channel death for the intersection operation. | ||
1803 | * | ||
1804 | * @param op operation that lost the channel | ||
1805 | */ | ||
1806 | static void | ||
1807 | intersection_channel_death (struct Operation *op) | ||
1808 | { | ||
1809 | if (GNUNET_YES == op->state->channel_death_expected) | ||
1810 | { | ||
1811 | /* oh goodie, we are done! */ | ||
1812 | send_client_done_and_destroy (op); | ||
1813 | } | ||
1814 | else | ||
1815 | { | ||
1816 | /* sorry, channel went down early, too bad. */ | ||
1817 | _GSS_operation_destroy (op, | ||
1818 | GNUNET_YES); | ||
1819 | } | ||
1820 | } | ||
1821 | |||
1822 | |||
1823 | /** | ||
1824 | * Get the incoming socket associated with the given id. | 1488 | * Get the incoming socket associated with the given id. |
1825 | * | 1489 | * |
1826 | * @param listener the listener to look in | 1490 | * @param listener the listener to look in |
@@ -1870,184 +1534,17 @@ incoming_destroy (struct Operation *op) | |||
1870 | 1534 | ||
1871 | 1535 | ||
1872 | /** | 1536 | /** |
1873 | * Context for the #garbage_collect_cb(). | ||
1874 | */ | ||
1875 | struct GarbageContext | ||
1876 | { | ||
1877 | /** | ||
1878 | * Map for which we are garbage collecting removed elements. | ||
1879 | */ | ||
1880 | struct GNUNET_CONTAINER_MultiHashMap *map; | ||
1881 | |||
1882 | /** | ||
1883 | * Lowest generation for which an operation is still pending. | ||
1884 | */ | ||
1885 | unsigned int min_op_generation; | ||
1886 | |||
1887 | /** | ||
1888 | * Largest generation for which an operation is still pending. | ||
1889 | */ | ||
1890 | unsigned int max_op_generation; | ||
1891 | }; | ||
1892 | |||
1893 | |||
1894 | /** | ||
1895 | * Function invoked to check if an element can be removed from | ||
1896 | * the set's history because it is no longer needed. | ||
1897 | * | ||
1898 | * @param cls the `struct GarbageContext *` | ||
1899 | * @param key key of the element in the map | ||
1900 | * @param value the `struct ElementEntry *` | ||
1901 | * @return #GNUNET_OK (continue to iterate) | ||
1902 | */ | ||
1903 | static int | ||
1904 | garbage_collect_cb (void *cls, const struct GNUNET_HashCode *key, void *value) | ||
1905 | { | ||
1906 | // struct GarbageContext *gc = cls; | ||
1907 | // struct ElementEntry *ee = value; | ||
1908 | |||
1909 | // if (GNUNET_YES != ee->removed) | ||
1910 | // return GNUNET_OK; | ||
1911 | // if ( (gc->max_op_generation < ee->generation_added) || | ||
1912 | // (ee->generation_removed > gc->min_op_generation) ) | ||
1913 | // { | ||
1914 | // GNUNET_assert (GNUNET_YES == | ||
1915 | // GNUNET_CONTAINER_multihashmap_remove (gc->map, | ||
1916 | // key, | ||
1917 | // ee)); | ||
1918 | // GNUNET_free (ee); | ||
1919 | // } | ||
1920 | return GNUNET_OK; | ||
1921 | } | ||
1922 | |||
1923 | |||
1924 | /** | ||
1925 | * Collect and destroy elements that are not needed anymore, because | ||
1926 | * their lifetime (as determined by their generation) does not overlap | ||
1927 | * with any active set operation. | ||
1928 | * | ||
1929 | * @param set set to garbage collect | ||
1930 | */ | ||
1931 | static void | ||
1932 | collect_generation_garbage (struct Set *set) | ||
1933 | { | ||
1934 | struct GarbageContext gc; | ||
1935 | |||
1936 | gc.min_op_generation = UINT_MAX; | ||
1937 | gc.max_op_generation = 0; | ||
1938 | for (struct Operation *op = set->ops_head; NULL != op; op = op->next) | ||
1939 | { | ||
1940 | gc.min_op_generation = | ||
1941 | GNUNET_MIN (gc.min_op_generation, op->generation_created); | ||
1942 | gc.max_op_generation = | ||
1943 | GNUNET_MAX (gc.max_op_generation, op->generation_created); | ||
1944 | } | ||
1945 | gc.map = set->content->elements; | ||
1946 | GNUNET_CONTAINER_multihashmap_iterate (set->content->elements, | ||
1947 | &garbage_collect_cb, | ||
1948 | &gc); | ||
1949 | } | ||
1950 | |||
1951 | |||
1952 | /** | ||
1953 | * Is @a generation in the range of exclusions? | ||
1954 | * | ||
1955 | * @param generation generation to query | ||
1956 | * @param excluded array of generations where the element is excluded | ||
1957 | * @param excluded_size length of the @a excluded array | ||
1958 | * @return #GNUNET_YES if @a generation is in any of the ranges | ||
1959 | */ | ||
1960 | static int | ||
1961 | is_excluded_generation (unsigned int generation, | ||
1962 | struct GenerationRange *excluded, | ||
1963 | unsigned int excluded_size) | ||
1964 | { | ||
1965 | for (unsigned int i = 0; i < excluded_size; i++) | ||
1966 | if ((generation >= excluded[i].start) && (generation < excluded[i].end)) | ||
1967 | return GNUNET_YES; | ||
1968 | return GNUNET_NO; | ||
1969 | } | ||
1970 | |||
1971 | |||
1972 | /** | ||
1973 | * Is element @a ee part of the set during @a query_generation? | ||
1974 | * | ||
1975 | * @param ee element to test | ||
1976 | * @param query_generation generation to query | ||
1977 | * @param excluded array of generations where the element is excluded | ||
1978 | * @param excluded_size length of the @a excluded array | ||
1979 | * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not | ||
1980 | */ | ||
1981 | static int | ||
1982 | is_element_of_generation (struct ElementEntry *ee, | ||
1983 | unsigned int query_generation, | ||
1984 | struct GenerationRange *excluded, | ||
1985 | unsigned int excluded_size) | ||
1986 | { | ||
1987 | struct MutationEvent *mut; | ||
1988 | int is_present; | ||
1989 | |||
1990 | GNUNET_assert (NULL != ee->mutations); | ||
1991 | if (GNUNET_YES == | ||
1992 | is_excluded_generation (query_generation, excluded, excluded_size)) | ||
1993 | { | ||
1994 | GNUNET_break (0); | ||
1995 | return GNUNET_NO; | ||
1996 | } | ||
1997 | |||
1998 | is_present = GNUNET_NO; | ||
1999 | |||
2000 | /* Could be made faster with binary search, but lists | ||
2001 | are small, so why bother. */ | ||
2002 | for (unsigned int i = 0; i < ee->mutations_size; i++) | ||
2003 | { | ||
2004 | mut = &ee->mutations[i]; | ||
2005 | |||
2006 | if (mut->generation > query_generation) | ||
2007 | { | ||
2008 | /* The mutation doesn't apply to our generation | ||
2009 | anymore. We can'b break here, since mutations aren't | ||
2010 | sorted by generation. */ | ||
2011 | continue; | ||
2012 | } | ||
2013 | |||
2014 | if (GNUNET_YES == | ||
2015 | is_excluded_generation (mut->generation, excluded, excluded_size)) | ||
2016 | { | ||
2017 | /* The generation is excluded (because it belongs to another | ||
2018 | fork via a lazy copy) and thus mutations aren't considered | ||
2019 | for membership testing. */ | ||
2020 | continue; | ||
2021 | } | ||
2022 | |||
2023 | /* This would be an inconsistency in how we manage mutations. */ | ||
2024 | if ((GNUNET_YES == is_present) && (GNUNET_YES == mut->added)) | ||
2025 | GNUNET_assert (0); | ||
2026 | /* Likewise. */ | ||
2027 | if ((GNUNET_NO == is_present) && (GNUNET_NO == mut->added)) | ||
2028 | GNUNET_assert (0); | ||
2029 | |||
2030 | is_present = mut->added; | ||
2031 | } | ||
2032 | |||
2033 | return is_present; | ||
2034 | } | ||
2035 | |||
2036 | |||
2037 | /** | ||
2038 | * Is element @a ee part of the set used by @a op? | 1537 | * Is element @a ee part of the set used by @a op? |
2039 | * | 1538 | * |
2040 | * @param ee element to test | 1539 | * @param ee element to test |
2041 | * @param op operation the defines the set and its generation | 1540 | * @param op operation the defines the set and its generation |
2042 | * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not | 1541 | * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not |
2043 | */ | 1542 | */ |
2044 | int | 1543 | static int |
2045 | _GSS_is_element_of_operation (struct ElementEntry *ee, struct Operation *op) | 1544 | _GSS_is_element_of_operation (struct ElementEntry *ee, |
1545 | struct Operation *op) | ||
2046 | { | 1546 | { |
2047 | return is_element_of_generation (ee, | 1547 | return op->generation_created >= ee->generation_added; |
2048 | op->generation_created, | ||
2049 | op->set->excluded_generations, | ||
2050 | op->set->excluded_generations_size); | ||
2051 | } | 1548 | } |
2052 | 1549 | ||
2053 | 1550 | ||
@@ -2062,10 +1559,9 @@ _GSS_is_element_of_operation (struct ElementEntry *ee, struct Operation *op) | |||
2062 | * as there may be multiple operations per set. | 1559 | * as there may be multiple operations per set. |
2063 | * | 1560 | * |
2064 | * @param op operation to destroy | 1561 | * @param op operation to destroy |
2065 | * @param gc #GNUNET_YES to perform garbage collection on the set | ||
2066 | */ | 1562 | */ |
2067 | void | 1563 | static void |
2068 | _GSS_operation_destroy (struct Operation *op, int gc) | 1564 | _GSS_operation_destroy (struct Operation *op) |
2069 | { | 1565 | { |
2070 | struct Set *set = op->set; | 1566 | struct Set *set = op->set; |
2071 | struct GNUNET_CADET_Channel *channel; | 1567 | struct GNUNET_CADET_Channel *channel; |
@@ -2074,12 +1570,39 @@ _GSS_operation_destroy (struct Operation *op, int gc) | |||
2074 | GNUNET_assert (NULL == op->listener); | 1570 | GNUNET_assert (NULL == op->listener); |
2075 | if (NULL != op->state) | 1571 | if (NULL != op->state) |
2076 | { | 1572 | { |
2077 | intersection_cancel (op); // FIXME: inline | 1573 | /* check if the op was canceled twice */ |
1574 | GNUNET_assert (NULL != op->state); | ||
1575 | if (NULL != op->state->remote_bf) | ||
1576 | { | ||
1577 | GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf); | ||
1578 | op->state->remote_bf = NULL; | ||
1579 | } | ||
1580 | if (NULL != op->state->local_bf) | ||
1581 | { | ||
1582 | GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); | ||
1583 | op->state->local_bf = NULL; | ||
1584 | } | ||
1585 | if (NULL != op->state->my_elements) | ||
1586 | { | ||
1587 | GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements); | ||
1588 | op->state->my_elements = NULL; | ||
1589 | } | ||
1590 | if (NULL != op->state->full_result_iter) | ||
1591 | { | ||
1592 | GNUNET_CONTAINER_multihashmap_iterator_destroy ( | ||
1593 | op->state->full_result_iter); | ||
1594 | op->state->full_result_iter = NULL; | ||
1595 | } | ||
1596 | GNUNET_free (op->state); | ||
2078 | op->state = NULL; | 1597 | op->state = NULL; |
1598 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1599 | "Destroying intersection op state done\n"); | ||
2079 | } | 1600 | } |
2080 | if (NULL != set) | 1601 | if (NULL != set) |
2081 | { | 1602 | { |
2082 | GNUNET_CONTAINER_DLL_remove (set->ops_head, set->ops_tail, op); | 1603 | GNUNET_CONTAINER_DLL_remove (set->ops_head, |
1604 | set->ops_tail, | ||
1605 | op); | ||
2083 | op->set = NULL; | 1606 | op->set = NULL; |
2084 | } | 1607 | } |
2085 | if (NULL != op->context_msg) | 1608 | if (NULL != op->context_msg) |
@@ -2094,8 +1617,6 @@ _GSS_operation_destroy (struct Operation *op, int gc) | |||
2094 | op->channel = NULL; | 1617 | op->channel = NULL; |
2095 | GNUNET_CADET_channel_destroy (channel); | 1618 | GNUNET_CADET_channel_destroy (channel); |
2096 | } | 1619 | } |
2097 | if ((NULL != set) && (GNUNET_YES == gc)) | ||
2098 | collect_generation_garbage (set); | ||
2099 | /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL, | 1620 | /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL, |
2100 | * there was a channel end handler that will free 'op' on the call stack. */ | 1621 | * there was a channel end handler that will free 'op' on the call stack. */ |
2101 | } | 1622 | } |
@@ -2166,8 +1687,6 @@ client_disconnect_cb (void *cls, | |||
2166 | if (NULL != (set = cs->set)) | 1687 | if (NULL != (set = cs->set)) |
2167 | { | 1688 | { |
2168 | struct SetContent *content = set->content; | 1689 | struct SetContent *content = set->content; |
2169 | struct PendingMutation *pm; | ||
2170 | struct PendingMutation *pm_current; | ||
2171 | 1690 | ||
2172 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's set\n"); | 1691 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's set\n"); |
2173 | /* Destroy pending set operations */ | 1692 | /* Destroy pending set operations */ |
@@ -2176,8 +1695,7 @@ client_disconnect_cb (void *cls, | |||
2176 | 1695 | ||
2177 | /* Destroy operation-specific state */ | 1696 | /* Destroy operation-specific state */ |
2178 | GNUNET_assert (NULL != set->state); | 1697 | GNUNET_assert (NULL != set->state); |
2179 | intersection_set_destroy (set->state); // FIXME: inline | 1698 | GNUNET_free (set->state); |
2180 | set->state = NULL; | ||
2181 | 1699 | ||
2182 | /* Clean up ongoing iterations */ | 1700 | /* Clean up ongoing iterations */ |
2183 | if (NULL != set->iter) | 1701 | if (NULL != set->iter) |
@@ -2187,21 +1705,6 @@ client_disconnect_cb (void *cls, | |||
2187 | set->iteration_id++; | 1705 | set->iteration_id++; |
2188 | } | 1706 | } |
2189 | 1707 | ||
2190 | /* discard any pending mutations that reference this set */ | ||
2191 | pm = content->pending_mutations_head; | ||
2192 | while (NULL != pm) | ||
2193 | { | ||
2194 | pm_current = pm; | ||
2195 | pm = pm->next; | ||
2196 | if (pm_current->set == set) | ||
2197 | { | ||
2198 | GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head, | ||
2199 | content->pending_mutations_tail, | ||
2200 | pm_current); | ||
2201 | GNUNET_free (pm_current); | ||
2202 | } | ||
2203 | } | ||
2204 | |||
2205 | /* free set content (or at least decrement RC) */ | 1708 | /* free set content (or at least decrement RC) */ |
2206 | set->content = NULL; | 1709 | set->content = NULL; |
2207 | GNUNET_assert (0 != content->refcount); | 1710 | GNUNET_assert (0 != content->refcount); |
@@ -2260,7 +1763,8 @@ client_disconnect_cb (void *cls, | |||
2260 | * #GNUNET_SYSERR to destroy the channel | 1763 | * #GNUNET_SYSERR to destroy the channel |
2261 | */ | 1764 | */ |
2262 | static int | 1765 | static int |
2263 | check_incoming_msg (void *cls, const struct OperationRequestMessage *msg) | 1766 | check_incoming_msg (void *cls, |
1767 | const struct OperationRequestMessage *msg) | ||
2264 | { | 1768 | { |
2265 | struct Operation *op = cls; | 1769 | struct Operation *op = cls; |
2266 | struct Listener *listener = op->listener; | 1770 | struct Listener *listener = op->listener; |
@@ -2313,7 +1817,8 @@ check_incoming_msg (void *cls, const struct OperationRequestMessage *msg) | |||
2313 | * #GNUNET_SYSERR to destroy the channel | 1817 | * #GNUNET_SYSERR to destroy the channel |
2314 | */ | 1818 | */ |
2315 | static void | 1819 | static void |
2316 | handle_incoming_msg (void *cls, const struct OperationRequestMessage *msg) | 1820 | handle_incoming_msg (void *cls, |
1821 | const struct OperationRequestMessage *msg) | ||
2317 | { | 1822 | { |
2318 | struct Operation *op = cls; | 1823 | struct Operation *op = cls; |
2319 | struct Listener *listener = op->listener; | 1824 | struct Listener *listener = op->listener; |
@@ -2358,121 +1863,6 @@ handle_incoming_msg (void *cls, const struct OperationRequestMessage *msg) | |||
2358 | 1863 | ||
2359 | 1864 | ||
2360 | /** | 1865 | /** |
2361 | * Add an element to @a set as specified by @a msg | ||
2362 | * | ||
2363 | * @param set set to manipulate | ||
2364 | * @param msg message specifying the change | ||
2365 | */ | ||
2366 | static void | ||
2367 | execute_add (struct Set *set, const struct GNUNET_SET_ElementMessage *msg) | ||
2368 | { | ||
2369 | struct GNUNET_SET_Element el; | ||
2370 | struct ElementEntry *ee; | ||
2371 | struct GNUNET_HashCode hash; | ||
2372 | |||
2373 | GNUNET_assert (GNUNET_MESSAGE_TYPE_SETI_ADD == ntohs (msg->header.type)); | ||
2374 | el.size = ntohs (msg->header.size) - sizeof(*msg); | ||
2375 | el.data = &msg[1]; | ||
2376 | el.element_type = ntohs (msg->element_type); | ||
2377 | GNUNET_SET_element_hash (&el, &hash); | ||
2378 | ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements, &hash); | ||
2379 | if (NULL == ee) | ||
2380 | { | ||
2381 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2382 | "Client inserts element %s of size %u\n", | ||
2383 | GNUNET_h2s (&hash), | ||
2384 | el.size); | ||
2385 | ee = GNUNET_malloc (el.size + sizeof(*ee)); | ||
2386 | ee->element.size = el.size; | ||
2387 | GNUNET_memcpy (&ee[1], el.data, el.size); | ||
2388 | ee->element.data = &ee[1]; | ||
2389 | ee->element.element_type = el.element_type; | ||
2390 | ee->remote = GNUNET_NO; | ||
2391 | ee->mutations = NULL; | ||
2392 | ee->mutations_size = 0; | ||
2393 | ee->element_hash = hash; | ||
2394 | GNUNET_break (GNUNET_YES == | ||
2395 | GNUNET_CONTAINER_multihashmap_put ( | ||
2396 | set->content->elements, | ||
2397 | &ee->element_hash, | ||
2398 | ee, | ||
2399 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | ||
2400 | } | ||
2401 | else if (GNUNET_YES == | ||
2402 | is_element_of_generation (ee, | ||
2403 | set->current_generation, | ||
2404 | set->excluded_generations, | ||
2405 | set->excluded_generations_size)) | ||
2406 | { | ||
2407 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2408 | "Client inserted element %s of size %u twice (ignored)\n", | ||
2409 | GNUNET_h2s (&hash), | ||
2410 | el.size); | ||
2411 | |||
2412 | /* same element inserted twice */ | ||
2413 | return; | ||
2414 | } | ||
2415 | |||
2416 | { | ||
2417 | struct MutationEvent mut = { .generation = set->current_generation, | ||
2418 | .added = GNUNET_YES }; | ||
2419 | GNUNET_array_append (ee->mutations, ee->mutations_size, mut); | ||
2420 | } | ||
2421 | // FIXME: inline | ||
2422 | intersection_add (set->state, | ||
2423 | ee); | ||
2424 | } | ||
2425 | |||
2426 | |||
2427 | /** | ||
2428 | * Perform a mutation on a set as specified by the @a msg | ||
2429 | * | ||
2430 | * @param set the set to mutate | ||
2431 | * @param msg specification of what to change | ||
2432 | */ | ||
2433 | static void | ||
2434 | execute_mutation (struct Set *set, const struct GNUNET_SET_ElementMessage *msg) | ||
2435 | { | ||
2436 | switch (ntohs (msg->header.type)) | ||
2437 | { | ||
2438 | case GNUNET_MESSAGE_TYPE_SETI_ADD: // FIXME: inline! | ||
2439 | execute_add (set, msg); | ||
2440 | break; | ||
2441 | default: | ||
2442 | GNUNET_break (0); | ||
2443 | } | ||
2444 | } | ||
2445 | |||
2446 | |||
2447 | /** | ||
2448 | * Execute mutations that were delayed on a set because of | ||
2449 | * pending operations. | ||
2450 | * | ||
2451 | * @param set the set to execute mutations on | ||
2452 | */ | ||
2453 | static void | ||
2454 | execute_delayed_mutations (struct Set *set) | ||
2455 | { | ||
2456 | struct PendingMutation *pm; | ||
2457 | |||
2458 | if (0 != set->content->iterator_count) | ||
2459 | return; /* still cannot do this */ | ||
2460 | while (NULL != (pm = set->content->pending_mutations_head)) | ||
2461 | { | ||
2462 | GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head, | ||
2463 | set->content->pending_mutations_tail, | ||
2464 | pm); | ||
2465 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2466 | "Executing pending mutation on %p.\n", | ||
2467 | pm->set); | ||
2468 | execute_mutation (pm->set, pm->msg); | ||
2469 | GNUNET_free (pm->msg); | ||
2470 | GNUNET_free (pm); | ||
2471 | } | ||
2472 | } | ||
2473 | |||
2474 | |||
2475 | /** | ||
2476 | * Send the next element of a set to the set's client. The next element is given by | 1866 | * Send the next element of a set to the set's client. The next element is given by |
2477 | * the set's current hashmap iterator. The set's iterator will be set to NULL if there | 1867 | * the set's current hashmap iterator. The set's iterator will be set to NULL if there |
2478 | * are no more elements in the set. The caller must ensure that the set's iterator is | 1868 | * are no more elements in the set. The caller must ensure that the set's iterator is |
@@ -2542,7 +1932,8 @@ send_client_element (struct Set *set) | |||
2542 | * @param m message sent by the client | 1932 | * @param m message sent by the client |
2543 | */ | 1933 | */ |
2544 | static void | 1934 | static void |
2545 | handle_client_iterate (void *cls, const struct GNUNET_MessageHeader *m) | 1935 | handle_client_iterate (void *cls, |
1936 | const struct GNUNET_MessageHeader *m) | ||
2546 | { | 1937 | { |
2547 | struct ClientState *cs = cls; | 1938 | struct ClientState *cs = cls; |
2548 | struct Set *set; | 1939 | struct Set *set; |
@@ -2584,7 +1975,8 @@ handle_client_iterate (void *cls, const struct GNUNET_MessageHeader *m) | |||
2584 | * @param m message sent by the client | 1975 | * @param m message sent by the client |
2585 | */ | 1976 | */ |
2586 | static void | 1977 | static void |
2587 | handle_client_create_set (void *cls, const struct GNUNET_SET_CreateMessage *msg) | 1978 | handle_client_create_set (void *cls, |
1979 | const struct GNUNET_SET_CreateMessage *msg) | ||
2588 | { | 1980 | { |
2589 | struct ClientState *cs = cls; | 1981 | struct ClientState *cs = cls; |
2590 | struct Set *set; | 1982 | struct Set *set; |
@@ -2600,24 +1992,18 @@ handle_client_create_set (void *cls, const struct GNUNET_SET_CreateMessage *msg) | |||
2600 | return; | 1992 | return; |
2601 | } | 1993 | } |
2602 | set = GNUNET_new (struct Set); | 1994 | set = GNUNET_new (struct Set); |
2603 | switch (ntohl (msg->operation)) | ||
2604 | { | 1995 | { |
2605 | case GNUNET_SET_OPERATION_INTERSECTION: | 1996 | struct SetState *set_state; |
2606 | set->vt = _GSS_intersection_vt (); | ||
2607 | break; | ||
2608 | 1997 | ||
2609 | case GNUNET_SET_OPERATION_UNION: | 1998 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
2610 | set->vt = _GSS_union_vt (); | 1999 | "Intersection set created\n"); |
2611 | break; | 2000 | set_state = GNUNET_new (struct SetState); |
2001 | set_state->current_set_element_count = 0; | ||
2612 | 2002 | ||
2613 | default: | 2003 | set->state = set_state; |
2614 | GNUNET_free (set); | ||
2615 | GNUNET_break (0); | ||
2616 | GNUNET_SERVICE_client_drop (cs->client); | ||
2617 | return; | ||
2618 | } | 2004 | } |
2619 | set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation); | 2005 | |
2620 | set->state = intersection_set_create (); // FIXME: inline | 2006 | |
2621 | if (NULL == set->state) | 2007 | if (NULL == set->state) |
2622 | { | 2008 | { |
2623 | /* initialization failed (i.e. out of memory) */ | 2009 | /* initialization failed (i.e. out of memory) */ |
@@ -2627,7 +2013,8 @@ handle_client_create_set (void *cls, const struct GNUNET_SET_CreateMessage *msg) | |||
2627 | } | 2013 | } |
2628 | set->content = GNUNET_new (struct SetContent); | 2014 | set->content = GNUNET_new (struct SetContent); |
2629 | set->content->refcount = 1; | 2015 | set->content->refcount = 1; |
2630 | set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); | 2016 | set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, |
2017 | GNUNET_YES); | ||
2631 | set->cs = cs; | 2018 | set->cs = cs; |
2632 | cs->set = set; | 2019 | cs->set = set; |
2633 | GNUNET_SERVICE_client_continue (cs->client); | 2020 | GNUNET_SERVICE_client_continue (cs->client); |
@@ -2679,17 +2066,21 @@ channel_new_cb (void *cls, | |||
2679 | struct Listener *listener = cls; | 2066 | struct Listener *listener = cls; |
2680 | struct Operation *op; | 2067 | struct Operation *op; |
2681 | 2068 | ||
2682 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "New incoming channel\n"); | 2069 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
2070 | "New incoming channel\n"); | ||
2683 | op = GNUNET_new (struct Operation); | 2071 | op = GNUNET_new (struct Operation); |
2684 | op->listener = listener; | 2072 | op->listener = listener; |
2685 | op->peer = *source; | 2073 | op->peer = *source; |
2686 | op->channel = channel; | 2074 | op->channel = channel; |
2687 | op->mq = GNUNET_CADET_get_mq (op->channel); | 2075 | op->mq = GNUNET_CADET_get_mq (op->channel); |
2688 | op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); | 2076 | op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, |
2077 | UINT32_MAX); | ||
2689 | op->timeout_task = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT, | 2078 | op->timeout_task = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT, |
2690 | &incoming_timeout_cb, | 2079 | &incoming_timeout_cb, |
2691 | op); | 2080 | op); |
2692 | GNUNET_CONTAINER_DLL_insert (listener->op_head, listener->op_tail, op); | 2081 | GNUNET_CONTAINER_DLL_insert (listener->op_head, |
2082 | listener->op_tail, | ||
2083 | op); | ||
2693 | return op; | 2084 | return op; |
2694 | } | 2085 | } |
2695 | 2086 | ||
@@ -2711,7 +2102,8 @@ channel_new_cb (void *cls, | |||
2711 | * @param channel connection to the other end (henceforth invalid) | 2102 | * @param channel connection to the other end (henceforth invalid) |
2712 | */ | 2103 | */ |
2713 | static void | 2104 | static void |
2714 | channel_end_cb (void *channel_ctx, const struct GNUNET_CADET_Channel *channel) | 2105 | channel_end_cb (void *channel_ctx, |
2106 | const struct GNUNET_CADET_Channel *channel) | ||
2715 | { | 2107 | { |
2716 | struct Operation *op = channel_ctx; | 2108 | struct Operation *op = channel_ctx; |
2717 | 2109 | ||
@@ -2725,12 +2117,13 @@ channel_end_cb (void *channel_ctx, const struct GNUNET_CADET_Channel *channel) | |||
2725 | * and be replaced by inlining more specific | 2117 | * and be replaced by inlining more specific |
2726 | * logic in the various places where it is called. | 2118 | * logic in the various places where it is called. |
2727 | */ | 2119 | */ |
2728 | void | 2120 | static void |
2729 | _GSS_operation_destroy2 (struct Operation *op) | 2121 | _GSS_operation_destroy2 (struct Operation *op) |
2730 | { | 2122 | { |
2731 | struct GNUNET_CADET_Channel *channel; | 2123 | struct GNUNET_CADET_Channel *channel; |
2732 | 2124 | ||
2733 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "channel_end_cb called\n"); | 2125 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
2126 | "channel_end_cb called\n"); | ||
2734 | if (NULL != (channel = op->channel)) | 2127 | if (NULL != (channel = op->channel)) |
2735 | { | 2128 | { |
2736 | /* This will free op; called conditionally as this helper function | 2129 | /* This will free op; called conditionally as this helper function |
@@ -2744,9 +2137,22 @@ _GSS_operation_destroy2 (struct Operation *op) | |||
2744 | return; | 2137 | return; |
2745 | } | 2138 | } |
2746 | if (NULL != op->set) | 2139 | if (NULL != op->set) |
2747 | intersection_channel_death (op); // FIXME: inline | 2140 | { |
2141 | if (GNUNET_YES == op->state->channel_death_expected) | ||
2142 | { | ||
2143 | /* oh goodie, we are done! */ | ||
2144 | send_client_done_and_destroy (op); | ||
2145 | } | ||
2146 | else | ||
2147 | { | ||
2148 | /* sorry, channel went down early, too bad. */ | ||
2149 | _GSS_operation_destroy (op, | ||
2150 | GNUNET_YES); | ||
2151 | } | ||
2152 | } | ||
2748 | else | 2153 | else |
2749 | _GSS_operation_destroy (op, GNUNET_YES); | 2154 | _GSS_operation_destroy (op, |
2155 | GNUNET_YES); | ||
2750 | GNUNET_free (op); | 2156 | GNUNET_free (op); |
2751 | } | 2157 | } |
2752 | 2158 | ||
@@ -2781,7 +2187,8 @@ channel_window_cb (void *cls, | |||
2781 | * @param msg message sent by the client | 2187 | * @param msg message sent by the client |
2782 | */ | 2188 | */ |
2783 | static void | 2189 | static void |
2784 | handle_client_listen (void *cls, const struct GNUNET_SET_ListenMessage *msg) | 2190 | handle_client_listen (void *cls, |
2191 | const struct GNUNET_SET_ListenMessage *msg) | ||
2785 | { | 2192 | { |
2786 | struct ClientState *cs = cls; | 2193 | struct ClientState *cs = cls; |
2787 | struct GNUNET_MQ_MessageHandler cadet_handlers[] = | 2194 | struct GNUNET_MQ_MessageHandler cadet_handlers[] = |
@@ -2816,7 +2223,9 @@ handle_client_listen (void *cls, const struct GNUNET_SET_ListenMessage *msg) | |||
2816 | cs->listener = listener; | 2223 | cs->listener = listener; |
2817 | listener->app_id = msg->app_id; | 2224 | listener->app_id = msg->app_id; |
2818 | listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation); | 2225 | listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation); |
2819 | GNUNET_CONTAINER_DLL_insert (listener_head, listener_tail, listener); | 2226 | GNUNET_CONTAINER_DLL_insert (listener_head, |
2227 | listener_tail, | ||
2228 | listener); | ||
2820 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 2229 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
2821 | "New listener created (op %u, port %s)\n", | 2230 | "New listener created (op %u, port %s)\n", |
2822 | listener->operation, | 2231 | listener->operation, |
@@ -2840,7 +2249,8 @@ handle_client_listen (void *cls, const struct GNUNET_SET_ListenMessage *msg) | |||
2840 | * @param msg message sent by the client | 2249 | * @param msg message sent by the client |
2841 | */ | 2250 | */ |
2842 | static void | 2251 | static void |
2843 | handle_client_reject (void *cls, const struct GNUNET_SET_RejectMessage *msg) | 2252 | handle_client_reject (void *cls, |
2253 | const struct GNUNET_SET_RejectMessage *msg) | ||
2844 | { | 2254 | { |
2845 | struct ClientState *cs = cls; | 2255 | struct ClientState *cs = cls; |
2846 | struct Operation *op; | 2256 | struct Operation *op; |
@@ -2872,7 +2282,8 @@ handle_client_reject (void *cls, const struct GNUNET_SET_RejectMessage *msg) | |||
2872 | * @param msg message sent by the client | 2282 | * @param msg message sent by the client |
2873 | */ | 2283 | */ |
2874 | static int | 2284 | static int |
2875 | check_client_mutation (void *cls, const struct GNUNET_SET_ElementMessage *msg) | 2285 | check_client_mutation (void *cls, |
2286 | const struct GNUNET_SET_ElementMessage *msg) | ||
2876 | { | 2287 | { |
2877 | /* NOTE: Technically, we should probably check with the | 2288 | /* NOTE: Technically, we should probably check with the |
2878 | block library whether the element we are given is well-formed */ | 2289 | block library whether the element we are given is well-formed */ |
@@ -2881,16 +2292,20 @@ check_client_mutation (void *cls, const struct GNUNET_SET_ElementMessage *msg) | |||
2881 | 2292 | ||
2882 | 2293 | ||
2883 | /** | 2294 | /** |
2884 | * Called when a client wants to add or remove an element to a set it inhabits. | 2295 | * Called when a client wants to add an element to a set it inhabits. |
2885 | * | 2296 | * |
2886 | * @param cls client that sent the message | 2297 | * @param cls client that sent the message |
2887 | * @param msg message sent by the client | 2298 | * @param msg message sent by the client |
2888 | */ | 2299 | */ |
2889 | static void | 2300 | static void |
2890 | handle_client_mutation (void *cls, const struct GNUNET_SET_ElementMessage *msg) | 2301 | handle_client_set_add (void *cls, |
2302 | const struct GNUNET_SET_ElementMessage *msg) | ||
2891 | { | 2303 | { |
2892 | struct ClientState *cs = cls; | 2304 | struct ClientState *cs = cls; |
2893 | struct Set *set; | 2305 | struct Set *set; |
2306 | struct GNUNET_SET_Element el; | ||
2307 | struct ElementEntry *ee; | ||
2308 | struct GNUNET_HashCode hash; | ||
2894 | 2309 | ||
2895 | if (NULL == (set = cs->set)) | 2310 | if (NULL == (set = cs->set)) |
2896 | { | 2311 | { |
@@ -2900,23 +2315,45 @@ handle_client_mutation (void *cls, const struct GNUNET_SET_ElementMessage *msg) | |||
2900 | return; | 2315 | return; |
2901 | } | 2316 | } |
2902 | GNUNET_SERVICE_client_continue (cs->client); | 2317 | GNUNET_SERVICE_client_continue (cs->client); |
2903 | 2318 | el.size = ntohs (msg->header.size) - sizeof(*msg); | |
2904 | if (0 != set->content->iterator_count) | 2319 | el.data = &msg[1]; |
2320 | el.element_type = ntohs (msg->element_type); | ||
2321 | GNUNET_ISET_element_hash (&el, | ||
2322 | &hash); | ||
2323 | ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements, | ||
2324 | &hash); | ||
2325 | if (NULL == ee) | ||
2905 | { | 2326 | { |
2906 | struct PendingMutation *pm; | 2327 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
2907 | 2328 | "Client inserts element %s of size %u\n", | |
2908 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Scheduling mutation on set\n"); | 2329 | GNUNET_h2s (&hash), |
2909 | pm = GNUNET_new (struct PendingMutation); | 2330 | el.size); |
2910 | pm->msg = | 2331 | ee = GNUNET_malloc (el.size + sizeof(*ee)); |
2911 | (struct GNUNET_SET_ElementMessage *) GNUNET_copy_message (&msg->header); | 2332 | ee->element.size = el.size; |
2912 | pm->set = set; | 2333 | GNUNET_memcpy (&ee[1], el.data, el.size); |
2913 | GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head, | 2334 | ee->element.data = &ee[1]; |
2914 | set->content->pending_mutations_tail, | 2335 | ee->element.element_type = el.element_type; |
2915 | pm); | 2336 | ee->remote = GNUNET_NO; |
2337 | ee->mutations = NULL; | ||
2338 | ee->mutations_size = 0; | ||
2339 | ee->element_hash = hash; | ||
2340 | GNUNET_break (GNUNET_YES == | ||
2341 | GNUNET_CONTAINER_multihashmap_put ( | ||
2342 | set->content->elements, | ||
2343 | &ee->element_hash, | ||
2344 | ee, | ||
2345 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | ||
2346 | } | ||
2347 | else | ||
2348 | { | ||
2349 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2350 | "Client inserted element %s of size %u twice (ignored)\n", | ||
2351 | GNUNET_h2s (&hash), | ||
2352 | el.size); | ||
2353 | /* same element inserted twice */ | ||
2916 | return; | 2354 | return; |
2917 | } | 2355 | } |
2918 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing mutation on set\n"); | 2356 | set->state->current_set_element_count++; |
2919 | execute_mutation (set, msg); | ||
2920 | } | 2357 | } |
2921 | 2358 | ||
2922 | 2359 | ||
@@ -2929,24 +2366,13 @@ handle_client_mutation (void *cls, const struct GNUNET_SET_ElementMessage *msg) | |||
2929 | static void | 2366 | static void |
2930 | advance_generation (struct Set *set) | 2367 | advance_generation (struct Set *set) |
2931 | { | 2368 | { |
2932 | struct GenerationRange r; | ||
2933 | |||
2934 | if (set->current_generation == set->content->latest_generation) | 2369 | if (set->current_generation == set->content->latest_generation) |
2935 | { | 2370 | { |
2936 | set->content->latest_generation++; | 2371 | set->content->latest_generation++; |
2937 | set->current_generation++; | 2372 | set->current_generation++; |
2938 | return; | 2373 | return; |
2939 | } | 2374 | } |
2940 | |||
2941 | GNUNET_assert (set->current_generation < set->content->latest_generation); | 2375 | GNUNET_assert (set->current_generation < set->content->latest_generation); |
2942 | |||
2943 | r.start = set->current_generation + 1; | ||
2944 | r.end = set->content->latest_generation + 1; | ||
2945 | set->content->latest_generation = r.end; | ||
2946 | set->current_generation = r.end; | ||
2947 | GNUNET_array_append (set->excluded_generations, | ||
2948 | set->excluded_generations_size, | ||
2949 | r); | ||
2950 | } | 2376 | } |
2951 | 2377 | ||
2952 | 2378 | ||
@@ -2960,7 +2386,8 @@ advance_generation (struct Set *set) | |||
2960 | * @return #GNUNET_OK if the message is well-formed | 2386 | * @return #GNUNET_OK if the message is well-formed |
2961 | */ | 2387 | */ |
2962 | static int | 2388 | static int |
2963 | check_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg) | 2389 | check_client_evaluate (void *cls, |
2390 | const struct GNUNET_SET_EvaluateMessage *msg) | ||
2964 | { | 2391 | { |
2965 | /* FIXME: suboptimal, even if the context below could be NULL, | 2392 | /* FIXME: suboptimal, even if the context below could be NULL, |
2966 | there are malformed messages this does not check for... */ | 2393 | there are malformed messages this does not check for... */ |
@@ -2977,7 +2404,8 @@ check_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg) | |||
2977 | * @param msg message sent by the client | 2404 | * @param msg message sent by the client |
2978 | */ | 2405 | */ |
2979 | static void | 2406 | static void |
2980 | handle_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg) | 2407 | handle_client_evaluate (void *cls, |
2408 | const struct GNUNET_SET_EvaluateMessage *msg) | ||
2981 | { | 2409 | { |
2982 | struct ClientState *cs = cls; | 2410 | struct ClientState *cs = cls; |
2983 | struct Operation *op = GNUNET_new (struct Operation); | 2411 | struct Operation *op = GNUNET_new (struct Operation); |
@@ -3010,7 +2438,8 @@ handle_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg) | |||
3010 | GNUNET_SERVICE_client_drop (cs->client); | 2438 | GNUNET_SERVICE_client_drop (cs->client); |
3011 | return; | 2439 | return; |
3012 | } | 2440 | } |
3013 | op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); | 2441 | op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, |
2442 | UINT32_MAX); | ||
3014 | op->peer = msg->target_peer; | 2443 | op->peer = msg->target_peer; |
3015 | op->result_mode = ntohl (msg->result_mode); | 2444 | op->result_mode = ntohl (msg->result_mode); |
3016 | op->client_request_id = ntohl (msg->request_id); | 2445 | op->client_request_id = ntohl (msg->request_id); |
@@ -3025,7 +2454,9 @@ handle_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg) | |||
3025 | op->set = set; | 2454 | op->set = set; |
3026 | op->generation_created = set->current_generation; | 2455 | op->generation_created = set->current_generation; |
3027 | advance_generation (set); | 2456 | advance_generation (set); |
3028 | GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, op); | 2457 | GNUNET_CONTAINER_DLL_insert (set->ops_head, |
2458 | set->ops_tail, | ||
2459 | op); | ||
3029 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 2460 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
3030 | "Creating new CADET channel to port %s for set operation type %u\n", | 2461 | "Creating new CADET channel to port %s for set operation type %u\n", |
3031 | GNUNET_h2s (&msg->app_id), | 2462 | GNUNET_h2s (&msg->app_id), |
@@ -3038,12 +2469,42 @@ handle_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg) | |||
3038 | &channel_end_cb, | 2469 | &channel_end_cb, |
3039 | cadet_handlers); | 2470 | cadet_handlers); |
3040 | op->mq = GNUNET_CADET_get_mq (op->channel); | 2471 | op->mq = GNUNET_CADET_get_mq (op->channel); |
3041 | op->state = intersection_evaluate (op, context); // FIXME: inline! | ||
3042 | if (NULL == op->state) | ||
3043 | { | 2472 | { |
3044 | GNUNET_break (0); | 2473 | struct OperationState *state; |
3045 | GNUNET_SERVICE_client_drop (cs->client); | 2474 | struct GNUNET_MQ_Envelope *ev; |
3046 | return; | 2475 | struct OperationRequestMessage *msg; |
2476 | |||
2477 | ev = GNUNET_MQ_msg_nested_mh (msg, | ||
2478 | GNUNET_MESSAGE_TYPE_SETI_P2P_OPERATION_REQUEST, | ||
2479 | context); | ||
2480 | if (NULL == ev) | ||
2481 | { | ||
2482 | /* the context message is too large!? */ | ||
2483 | GNUNET_break (0); | ||
2484 | GNUNET_SERVICE_client_drop (cs->client); | ||
2485 | return; | ||
2486 | } | ||
2487 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2488 | "Initiating intersection operation evaluation\n"); | ||
2489 | state = GNUNET_new (struct OperationState); | ||
2490 | /* we started the operation, thus we have to send the operation request */ | ||
2491 | state->phase = PHASE_INITIAL; | ||
2492 | state->my_element_count = op->set->state->current_set_element_count; | ||
2493 | state->my_elements | ||
2494 | = GNUNET_CONTAINER_multihashmap_create (state->my_element_count, | ||
2495 | GNUNET_YES); | ||
2496 | |||
2497 | msg->element_count = htonl (state->my_element_count); | ||
2498 | GNUNET_MQ_send (op->mq, | ||
2499 | ev); | ||
2500 | state->phase = PHASE_COUNT_SENT; | ||
2501 | if (NULL != opaque_context) | ||
2502 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2503 | "Sent op request with context message\n"); | ||
2504 | else | ||
2505 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2506 | "Sent op request without context message\n"); | ||
2507 | op->state = state; | ||
3047 | } | 2508 | } |
3048 | GNUNET_SERVICE_client_continue (cs->client); | 2509 | GNUNET_SERVICE_client_continue (cs->client); |
3049 | } | 2510 | } |
@@ -3056,7 +2517,8 @@ handle_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg) | |||
3056 | * @param msg the message | 2517 | * @param msg the message |
3057 | */ | 2518 | */ |
3058 | static void | 2519 | static void |
3059 | handle_client_cancel (void *cls, const struct GNUNET_SET_CancelMessage *msg) | 2520 | handle_client_cancel (void *cls, |
2521 | const struct GNUNET_SET_CancelMessage *msg) | ||
3060 | { | 2522 | { |
3061 | struct ClientState *cs = cls; | 2523 | struct ClientState *cs = cls; |
3062 | struct Set *set; | 2524 | struct Set *set; |
@@ -3085,7 +2547,8 @@ handle_client_cancel (void *cls, const struct GNUNET_SET_CancelMessage *msg) | |||
3085 | * the other peer disconnecting. The client may not know about this | 2547 | * the other peer disconnecting. The client may not know about this |
3086 | * yet and try to cancel the (just barely non-existent) operation. | 2548 | * yet and try to cancel the (just barely non-existent) operation. |
3087 | * So this is not a hard error. | 2549 | * So this is not a hard error. |
3088 | */GNUNET_log (GNUNET_ERROR_TYPE_INFO, | 2550 | */// |
2551 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
3089 | "Client canceled non-existent op %u\n", | 2552 | "Client canceled non-existent op %u\n", |
3090 | (uint32_t) ntohl (msg->request_id)); | 2553 | (uint32_t) ntohl (msg->request_id)); |
3091 | } | 2554 | } |
@@ -3109,7 +2572,8 @@ handle_client_cancel (void *cls, const struct GNUNET_SET_CancelMessage *msg) | |||
3109 | * @param msg the message | 2572 | * @param msg the message |
3110 | */ | 2573 | */ |
3111 | static void | 2574 | static void |
3112 | handle_client_accept (void *cls, const struct GNUNET_SET_AcceptMessage *msg) | 2575 | handle_client_accept (void *cls, |
2576 | const struct GNUNET_SET_AcceptMessage *msg) | ||
3113 | { | 2577 | { |
3114 | struct ClientState *cs = cls; | 2578 | struct ClientState *cs = cls; |
3115 | struct Set *set; | 2579 | struct Set *set; |
@@ -3136,7 +2600,8 @@ handle_client_accept (void *cls, const struct GNUNET_SET_AcceptMessage *msg) | |||
3136 | cs, | 2600 | cs, |
3137 | ntohl (msg->accept_reject_id), | 2601 | ntohl (msg->accept_reject_id), |
3138 | cs->listener); | 2602 | cs->listener); |
3139 | ev = GNUNET_MQ_msg (result_message, GNUNET_MESSAGE_TYPE_SETI_RESULT); | 2603 | ev = GNUNET_MQ_msg (result_message, |
2604 | GNUNET_MESSAGE_TYPE_SETI_RESULT); | ||
3140 | result_message->request_id = msg->request_id; | 2605 | result_message->request_id = msg->request_id; |
3141 | result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE); | 2606 | result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE); |
3142 | GNUNET_MQ_send (set->cs->mq, ev); | 2607 | GNUNET_MQ_send (set->cs->mq, ev); |
@@ -3163,12 +2628,34 @@ handle_client_accept (void *cls, const struct GNUNET_SET_AcceptMessage *msg) | |||
3163 | op->generation_created = set->current_generation; | 2628 | op->generation_created = set->current_generation; |
3164 | advance_generation (set); | 2629 | advance_generation (set); |
3165 | GNUNET_assert (NULL == op->state); | 2630 | GNUNET_assert (NULL == op->state); |
3166 | op->state = intersection_accept (op); // FIXME: inline | ||
3167 | if (NULL == op->state) | ||
3168 | { | 2631 | { |
3169 | GNUNET_break (0); | 2632 | struct OperationState *state; |
3170 | GNUNET_SERVICE_client_drop (cs->client); | 2633 | |
3171 | return; | 2634 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
2635 | "Accepting set intersection operation\n"); | ||
2636 | state = GNUNET_new (struct OperationState); | ||
2637 | state->phase = PHASE_INITIAL; | ||
2638 | state->my_element_count | ||
2639 | = op->set->state->current_set_element_count; | ||
2640 | state->my_elements | ||
2641 | = GNUNET_CONTAINER_multihashmap_create ( | ||
2642 | GNUNET_MIN (state->my_element_count, | ||
2643 | op->remote_element_count), | ||
2644 | GNUNET_YES); | ||
2645 | op->state = state; | ||
2646 | if (op->remote_element_count < state->my_element_count) | ||
2647 | { | ||
2648 | /* If the other peer (Alice) has fewer elements than us (Bob), | ||
2649 | we just send the count as Alice should send the first BF */ | ||
2650 | send_element_count (op); | ||
2651 | state->phase = PHASE_COUNT_SENT; | ||
2652 | } | ||
2653 | else | ||
2654 | { | ||
2655 | /* We have fewer elements, so we start with the BF */ | ||
2656 | begin_bf_exchange (op); | ||
2657 | } | ||
2658 | op->state = state; | ||
3172 | } | 2659 | } |
3173 | /* Now allow CADET to continue, as we did not do this in | 2660 | /* Now allow CADET to continue, as we did not do this in |
3174 | #handle_incoming_msg (as we wanted to first see if the | 2661 | #handle_incoming_msg (as we wanted to first see if the |
@@ -3196,8 +2683,10 @@ shutdown_task (void *cls) | |||
3196 | cadet = NULL; | 2683 | cadet = NULL; |
3197 | } | 2684 | } |
3198 | } | 2685 | } |
3199 | GNUNET_STATISTICS_destroy (_GSS_statistics, GNUNET_YES); | 2686 | GNUNET_STATISTICS_destroy (_GSS_statistics, |
3200 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n"); | 2687 | GNUNET_YES); |
2688 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
2689 | "handled shutdown request\n"); | ||
3201 | } | 2690 | } |
3202 | 2691 | ||
3203 | 2692 | ||
@@ -3217,8 +2706,10 @@ run (void *cls, | |||
3217 | /* FIXME: need to modify SERVICE (!) API to allow | 2706 | /* FIXME: need to modify SERVICE (!) API to allow |
3218 | us to run a shutdown task *after* clients were | 2707 | us to run a shutdown task *after* clients were |
3219 | forcefully disconnected! */ | 2708 | forcefully disconnected! */ |
3220 | GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); | 2709 | GNUNET_SCHEDULER_add_shutdown (&shutdown_task, |
3221 | _GSS_statistics = GNUNET_STATISTICS_create ("set", cfg); | 2710 | NULL); |
2711 | _GSS_statistics = GNUNET_STATISTICS_create ("seti", | ||
2712 | cfg); | ||
3222 | cadet = GNUNET_CADET_connect (cfg); | 2713 | cadet = GNUNET_CADET_connect (cfg); |
3223 | if (NULL == cadet) | 2714 | if (NULL == cadet) |
3224 | { | 2715 | { |
@@ -3234,7 +2725,7 @@ run (void *cls, | |||
3234 | * Define "main" method using service macro. | 2725 | * Define "main" method using service macro. |
3235 | */ | 2726 | */ |
3236 | GNUNET_SERVICE_MAIN ( | 2727 | GNUNET_SERVICE_MAIN ( |
3237 | "set", | 2728 | "seti", |
3238 | GNUNET_SERVICE_OPTION_NONE, | 2729 | GNUNET_SERVICE_OPTION_NONE, |
3239 | &run, | 2730 | &run, |
3240 | &client_connect_cb, | 2731 | &client_connect_cb, |
@@ -3244,7 +2735,7 @@ GNUNET_SERVICE_MAIN ( | |||
3244 | GNUNET_MESSAGE_TYPE_SETI_ACCEPT, | 2735 | GNUNET_MESSAGE_TYPE_SETI_ACCEPT, |
3245 | struct GNUNET_SET_AcceptMessage, | 2736 | struct GNUNET_SET_AcceptMessage, |
3246 | NULL), | 2737 | NULL), |
3247 | GNUNET_MQ_hd_var_size (client_mutation, | 2738 | GNUNET_MQ_hd_var_size (client_set_add, |
3248 | GNUNET_MESSAGE_TYPE_SETI_ADD, | 2739 | GNUNET_MESSAGE_TYPE_SETI_ADD, |
3249 | struct GNUNET_SET_ElementMessage, | 2740 | struct GNUNET_SET_ElementMessage, |
3250 | NULL), | 2741 | NULL), |