From caf375948ecc718bac6d75f415cc1c8324a9199c Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Thu, 23 Feb 2017 17:13:39 +0100 Subject: implement union via sending whole set --- src/set/gnunet-service-set_union.c | 351 +++++++++++++++++++++++++++++++++---- 1 file changed, 319 insertions(+), 32 deletions(-) (limited to 'src/set/gnunet-service-set_union.c') diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c index 137216ed7..d2dfe049b 100644 --- a/src/set/gnunet-service-set_union.c +++ b/src/set/gnunet-service-set_union.c @@ -115,14 +115,22 @@ enum UnionOperationPhase * In the penultimate phase, * we wait until all our demands * are satisfied. Then we send a done - * message, and wait for another done message.*/ + * message, and wait for another done message. + */ PHASE_FINISH_WAITING, /** * In the ultimate phase, we wait until * our demands are satisfied and then - * quit (sending another DONE message). */ - PHASE_DONE + * quit (sending another DONE message). + */ + PHASE_DONE, + + /** + * After sending the full set, wait for responses with the elements + * that the local peer is missing. + */ + PHASE_FULL_SENDING, }; @@ -214,6 +222,14 @@ struct KeyEntry * is #GNUNET_YES. */ struct ElementEntry *element; + + /** + * Did we receive this element? + * Even if element->is_foreign is false, we might + * have received the element, so this indicates that + * the other peer has it. + */ + int received; }; @@ -372,6 +388,16 @@ get_ibf_key (const struct GNUNET_HashCode *src) } +/** + * Context for #op_get_element_iterator + */ +struct GetElementContext +{ + struct GNUNET_HashCode hash; + struct KeyEntry *k; +}; + + /** * Iterator over the mapping from IBF keys to element entries. Checks if we * have an element with a given GNUNET_HashCode. @@ -383,17 +409,20 @@ get_ibf_key (const struct GNUNET_HashCode *src) * #GNUNET_NO if we've found the element. */ static int -op_has_element_iterator (void *cls, +op_get_element_iterator (void *cls, uint32_t key, void *value) { - struct GNUNET_HashCode *element_hash = cls; + struct GetElementContext *ctx = cls; struct KeyEntry *k = value; GNUNET_assert (NULL != k); if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, - element_hash)) + &ctx->hash)) + { + ctx->k = k; return GNUNET_NO; + } return GNUNET_YES; } @@ -406,23 +435,29 @@ op_has_element_iterator (void *cls, * @param element_hash hash of the element to look for * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise */ -static int -op_has_element (struct Operation *op, +static struct KeyEntry * +op_get_element (struct Operation *op, const struct GNUNET_HashCode *element_hash) { int ret; struct IBF_Key ibf_key; + struct GetElementContext ctx = { 0 }; + + ctx.hash = *element_hash; ibf_key = get_ibf_key (element_hash); ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, (uint32_t) ibf_key.key_val, - op_has_element_iterator, - (void *) element_hash); + op_get_element_iterator, + &ctx); /* was the iteration aborted because we found the element? */ if (GNUNET_SYSERR == ret) - return GNUNET_YES; - return GNUNET_NO; + { + GNUNET_assert (NULL != ctx.k); + return ctx.k; + } + return NULL; } @@ -438,10 +473,12 @@ op_has_element (struct Operation *op, * * @param op the union operation * @param ee the element entry + * @parem received was this element received from the remote peer? */ static void op_register_element (struct Operation *op, - struct ElementEntry *ee) + struct ElementEntry *ee, + int received) { struct IBF_Key ibf_key; struct KeyEntry *k; @@ -450,6 +487,7 @@ op_register_element (struct Operation *op, k = GNUNET_new (struct KeyEntry); k->element = ee; k->ibf_key = ibf_key; + k->received = received; GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, (uint32_t) ibf_key.key_val, @@ -535,11 +573,29 @@ init_key_to_element_iterator (void *cls, GNUNET_assert (GNUNET_NO == ee->remote); - op_register_element (op, ee); + op_register_element (op, ee, GNUNET_NO); return GNUNET_YES; } +/** + * Initialize the IBF key to element mapping local to this set + * operation. + * + * @param op the set union operation + */ +static void +initialize_key_to_element (struct Operation *op) +{ + unsigned int len; + + GNUNET_assert (NULL == op->state->key_to_element); + len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements); + op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); + GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, init_key_to_element_iterator, op); +} + + /** * Create an ibf with the operation's elements * of the specified size @@ -552,15 +608,8 @@ static int prepare_ibf (struct Operation *op, uint32_t size) { - if (NULL == op->state->key_to_element) - { - unsigned int len; + GNUNET_assert (NULL != op->state->key_to_element); - len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements); - op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); - GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, - init_key_to_element_iterator, op); - } if (NULL != op->state->local_ibf) ibf_destroy (op->state->local_ibf); op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); @@ -708,6 +757,47 @@ get_order_from_difference (unsigned int diff) } +/** + * Send a set element. + * + * @param cls the union operation `struct Operation *` + * @param key unused + * @param value the `struct ElementEntry *` to insert + * into the key-to-element mapping + * @return #GNUNET_YES (to continue iterating) + */ +static int +send_element_iterator (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct Operation *op = cls; + struct GNUNET_SET_ElementMessage *emsg; + struct GNUNET_SET_Element *el = value; + struct GNUNET_MQ_Envelope *ev; + + ev = GNUNET_MQ_msg_extra (emsg, el->size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT); + emsg->element_type = htonl (el->element_type); + GNUNET_memcpy (&emsg[1], el->data, el->size); + GNUNET_MQ_send (op->mq, ev); + return GNUNET_YES; +} + + +static void +send_full_set (struct Operation *op) +{ + struct GNUNET_MQ_Envelope *ev; + + op->state->phase = PHASE_FULL_SENDING; + + (void) GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, + &send_element_iterator, op); + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); + GNUNET_MQ_send (op->mq, ev); +} + + /** * Handle a strata estimator from a remote peer * @@ -776,16 +866,29 @@ handle_p2p_strata_estimator (void *cls, "got se diff=%d, using ibf size %d\n", diff, 1< GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements) / 2) { - /* Internal error, best we can do is shut the connection */ - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to send IBF, closing connection\n"); - fail_union_operation (op); - return GNUNET_SYSERR; + LOG (GNUNET_ERROR_TYPE_INFO, + "Sending full set (diff=%d, own set=%u)\n", + diff, + GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements)); + send_full_set (op); + } + else + { + if (GNUNET_OK != + send_ibf (op, + get_order_from_difference (diff))) + { + /* Internal error, best we can do is shut the connection */ + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to send IBF, closing connection\n"); + fail_union_operation (op); + return GNUNET_SYSERR; + } } + return GNUNET_OK; } @@ -1288,7 +1391,9 @@ handle_p2p_elements (void *cls, op->state->received_total += 1; - if (GNUNET_YES == op_has_element (op, &ee->element_hash)) + struct KeyEntry *ke = op_get_element (op, &ee->element_hash); + + if (NULL != ke) { /* Got repeated element. Should not happen since * we track demands. */ @@ -1296,6 +1401,7 @@ handle_p2p_elements (void *cls, "# repeated elements", 1, GNUNET_NO); + ke->received = GNUNET_YES; GNUNET_free (ee); } else @@ -1303,7 +1409,7 @@ handle_p2p_elements (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, "Registering new element from remote peer\n"); op->state->received_fresh += 1; - op_register_element (op, ee); + op_register_element (op, ee, GNUNET_YES); /* only send results immediately if the client wants it */ switch (op->spec->result_mode) { @@ -1332,6 +1438,99 @@ handle_p2p_elements (void *cls, } +/** + * Handle an element message from a remote peer. + * + * @param cls the union operation + * @param mh the message + */ +static void +handle_p2p_full_element (void *cls, + const struct GNUNET_MessageHeader *mh) +{ + struct Operation *op = cls; + struct ElementEntry *ee; + const struct GNUNET_SET_ElementMessage *emsg; + uint16_t element_size; + + if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage)) + { + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + + emsg = (const struct GNUNET_SET_ElementMessage *) mh; + + element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage); + ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size); + GNUNET_memcpy (&ee[1], &emsg[1], element_size); + ee->element.size = element_size; + ee->element.data = &ee[1]; + ee->element.element_type = ntohs (emsg->element_type); + ee->remote = GNUNET_YES; + GNUNET_SET_element_hash (&ee->element, &ee->element_hash); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got element (full diff, size %u, hash %s) from peer\n", + (unsigned int) element_size, + GNUNET_h2s (&ee->element_hash)); + + GNUNET_STATISTICS_update (_GSS_statistics, + "# received elements", + 1, + GNUNET_NO); + GNUNET_STATISTICS_update (_GSS_statistics, + "# exchanged elements", + 1, + GNUNET_NO); + + op->state->received_total += 1; + + struct KeyEntry *ke = op_get_element (op, &ee->element_hash); + + if (NULL != ke) + { + /* Got repeated element. Should not happen since + * we track demands. */ + GNUNET_STATISTICS_update (_GSS_statistics, + "# repeated elements", + 1, + GNUNET_NO); + ke->received = GNUNET_YES; + GNUNET_free (ee); + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Registering new element from remote peer\n"); + op->state->received_fresh += 1; + op_register_element (op, ee, GNUNET_YES); + /* only send results immediately if the client wants it */ + switch (op->spec->result_mode) + { + case GNUNET_SET_RESULT_ADDED: + send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK); + break; + case GNUNET_SET_RESULT_SYMMETRIC: + send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL); + break; + default: + /* Result mode not supported, should have been caught earlier. */ + GNUNET_break (0); + break; + } + } + + if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3) + { + /* The other peer gave us lots of old elements, there's something wrong. */ + GNUNET_break_op (0); + fail_union_operation (op); + return; + } +} + /** * Send offers (for GNUNET_Hash-es) in response * to inquiries (for IBF_Key-s). @@ -1378,6 +1577,85 @@ handle_p2p_inquiry (void *cls, } +/** + * Iterator over hash map entries, called to + * destroy the linked list of colliding ibf key entries. + * + * @param cls closure + * @param key current key code + * @param value value in the hash map + * @return #GNUNET_YES if we should continue to iterate, + * #GNUNET_NO if not. + */ +static int +send_missing_elements_iter (void *cls, + uint32_t key, + void *value) +{ + struct Operation *op = cls; + struct KeyEntry *ke = value; + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_SET_ElementMessage *emsg; + struct ElementEntry *ee = ke->element; + + if (GNUNET_YES == ke->received) + return GNUNET_YES; + + ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT); + GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size); + emsg->reserved = htons (0); + emsg->element_type = htons (ee->element.element_type); + GNUNET_MQ_send (op->mq, ev); + + return GNUNET_YES; +} + +/** + * Handle a "full done" message. + * + * @parem cls closure, a set union operation + * @param mh the demand message + */ +static void +handle_p2p_full_done (void *cls, + const struct GNUNET_MessageHeader *mh) +{ + struct Operation *op = cls; + + if (PHASE_EXPECT_IBF == op->state->phase) + { + struct GNUNET_MQ_Envelope *ev; + + LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other peer is missing\n"); + + /* send all the elements that did not come from the remote peer */ + GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, + &send_missing_elements_iter, + op); + + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); + GNUNET_MQ_send (op->mq, ev); + op->state->phase = PHASE_DONE; + + /* we now wait until the other peer shuts the tunnel down*/ + } + else if (PHASE_FULL_SENDING == op->state->phase) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, finishing\n"); + /* We sent the full set, and got the response for that. We're done. */ + op->state->phase = PHASE_DONE; + send_done_and_destroy (op); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handle full done phase is %u\n", (unsigned) op->state->phase); + GNUNET_break_op (0); + fail_union_operation (op); + return; + } +} + + /** * Handle a demand by the other peer for elements based on a list * of GNUNET_HashCode-s. @@ -1635,6 +1913,8 @@ union_evaluate (struct Operation *op, else LOG (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n"); + + initialize_key_to_element (op); } @@ -1664,6 +1944,7 @@ union_accept (struct Operation *op) op->state->se = strata_estimator_dup (op->spec->set->state->se); op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); op->state->salt_receive = op->state->salt_send = 42; + initialize_key_to_element (op); /* kick off the operation */ send_strata_estimator (op); } @@ -1771,6 +2052,9 @@ union_handle_p2p_message (struct Operation *op, case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS: handle_p2p_elements (op, mh); break; + case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT: + handle_p2p_full_element (op, mh); + break; case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY: handle_p2p_inquiry (op, mh); break; @@ -1783,6 +2067,9 @@ union_handle_p2p_message (struct Operation *op, case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND: handle_p2p_demand (op, mh); break; + case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE: + handle_p2p_full_done (op, mh); + break; default: /* Something wrong with cadet's message handlers? */ GNUNET_assert (0); -- cgit v1.2.3