From ce25faf22dabe2ba33ed8bd8bea4d0ed6cd7acf8 Mon Sep 17 00:00:00 2001 From: Christian Fuchs Date: Mon, 18 Nov 2013 17:49:57 +0000 Subject: more work on set intersection --- src/set/gnunet-service-set_intersection.c | 106 +++++++++++++++++++++--------- 1 file changed, 75 insertions(+), 31 deletions(-) (limited to 'src/set') diff --git a/src/set/gnunet-service-set_intersection.c b/src/set/gnunet-service-set_intersection.c index 1c79d3c73..879f65809 100644 --- a/src/set/gnunet-service-set_intersection.c +++ b/src/set/gnunet-service-set_intersection.c @@ -558,38 +558,51 @@ send_element_count (struct Operation *op) GNUNET_MQ_send (op->mq, ev); } +/** + * Send a result message to the client indicating + * that the operation is over. + * After the result done message has been sent to the client, + * destroy the evaluate operation. + * + * @param op intersection operation + */ +static void +finish_and_destroy (struct Operation *op) +{ + GNUNET_assert (GNUNET_NO == op->state->client_done_sent); + + if (GNUNET_SET_RESULT_FULL == op->spec->result_mode) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n"); + op->state->full_result_iter = + GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->my_elements); + send_remaining_elements (op); + return; + } + send_done_and_destroy (op); +} /** * Handle a done message from a remote peer * - * @param cls the intersection operation + * @param cls the union operation * @param mh the message */ static void handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) { - struct OperationState *eo = cls; + struct Operation *op = cls; struct GNUNET_MQ_Envelope *ev; - if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) - { - /* we got all requests, but still have to send our elements as response */ - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n"); - eo->phase = PHASE_FINISHED; - ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); - GNUNET_MQ_send (eo->mq, ev); - return; - } - if (eo->phase == PHASE_EXPECT_ELEMENTS) - { + if ((op->state->phase = PHASE_FINISHED) || (op->state->phase = PHASE_MAYBE_FINISHED)){ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n"); - eo->phase = PHASE_FINISHED; - send_client_done_and_destroy (eo); + + finish_and_destroy (op); return; } - GNUNET_break (0); - fail_intersection_operation (eo); + + GNUNET_break_op (0); + fail_intersection_operation (op); } @@ -766,27 +779,57 @@ send_done_and_destroy (void *cls) } /** - * Send a result message to the client indicating - * that the operation is over. - * After the result done message has been sent to the client, - * destroy the evaluate operation. + * Send all elements in the full result iterator. * - * @param op union operation + * @param cls operation */ static void -finish_and_destroy (struct Operation *op) +send_remaining_elements (void *cls) { - GNUNET_assert (GNUNET_NO == op->state->client_done_sent); + struct Operation *op = cls; + struct KeyEntry *ke; + int res; - if (GNUNET_SET_RESULT_FULL == op->spec->result_mode) + res = GNUNET_CONTAINER_multihashmap32_iterator_next (op->state->full_result_iter, NULL, (const void **) &ke); + res = GNUNET_NO; + if (GNUNET_NO == res) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n"); - GNUNET_assert (NULL == op->state->full_result_iter); - op->state->full_result_iter = - GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->my_elements); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending done and destroy because iterator ran out\n"); + send_done_and_destroy (op); return; } - send_done_and_destroy (op); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending elements from key entry\n"); + + while (1) + { + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_SET_ResultMessage *rm; + struct GNUNET_SET_Element *element; + element = &ke->element->element; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client (full set)\n", element->size); + GNUNET_assert (0 != op->spec->client_request_id); + ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); + if (NULL == ev) + { + GNUNET_MQ_discard (ev); + GNUNET_break (0); + continue; + } + rm->result_status = htons (GNUNET_SET_STATUS_OK); + rm->request_id = htonl (op->spec->client_request_id); + rm->element_type = element->type; + memcpy (&rm[1], element->data, element->size); + if (ke->next_colliding == NULL) + { + GNUNET_MQ_notify_sent (ev, send_remaining_elements, op); + GNUNET_MQ_send (op->spec->set->client_mq, ev); + break; + } + GNUNET_MQ_send (op->spec->set->client_mq, ev); + ke = ke->next_colliding; + } } /** @@ -811,6 +854,7 @@ intersection_peer_disconnect (struct Operation *op) _GSS_operation_destroy (op); return; } + // else: the session has already been concluded GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n"); if (GNUNET_NO == op->state->client_done_sent) finish_and_destroy (op); -- cgit v1.2.3