From 6f54b50858457dfa2b5f0b519fbf230e1119c6b2 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Wed, 15 May 2013 10:48:55 +0000 Subject: test cases for mq, set works --- src/include/gnunet_protocols.h | 2 - src/set/Makefile.am | 20 ++- src/set/gnunet-service-set.c | 180 +++++++++++++++++++------ src/set/gnunet-service-set.h | 28 ++-- src/set/gnunet-service-set_union.c | 262 ++++++++++++++++++++++++++++++++----- src/set/gnunet-set-bug.c | 2 + src/set/gnunet-set.c | 115 ++++++++++++++-- src/set/ibf.c | 1 + src/set/mq.c | 130 +++++++++++++++--- src/set/mq.h | 3 +- src/set/set.h | 28 ++-- src/set/set_api.c | 44 ++++--- src/set/strata_estimator.c | 2 + src/set/test_mq.c | 115 ++++++++++++++++ src/set/test_mq_client.c | 181 +++++++++++++++++++++++++ src/set/test_set.conf | 23 ++++ src/set/test_set_api.c | 101 +++++++++++++- 17 files changed, 1093 insertions(+), 144 deletions(-) create mode 100644 src/set/test_mq.c create mode 100644 src/set/test_mq_client.c (limited to 'src') diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index 33e97b80c..a80d4afe7 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h @@ -1779,13 +1779,11 @@ extern "C" */ #define GNUNET_MESSAGE_TYPE_SET_ADD 573 - /** * Remove element from set */ #define GNUNET_MESSAGE_TYPE_SET_REMOVE 574 - /** * Listen for operation requests */ diff --git a/src/set/Makefile.am b/src/set/Makefile.am index 2de531ce3..a609840b1 100644 --- a/src/set/Makefile.am +++ b/src/set/Makefile.am @@ -69,7 +69,7 @@ libgnunetset_la_LDFLAGS = \ $(GN_LIB_LDFLAGS) check_PROGRAMS = \ - test_set_api + test_set_api test_mq test_mq_client if ENABLE_TEST_RUN TESTS = $(check_PROGRAMS) @@ -84,6 +84,24 @@ test_set_api_LDADD = \ test_set_api_DEPENDENCIES = \ libgnunetset.la + +test_mq_SOURCES = \ + test_mq.c \ + mq.c +test_mq_LDADD = \ + $(top_builddir)/src/util/libgnunetutil.la \ + $(top_builddir)/src/stream/libgnunetstream.la +test_mq_CFLAGS = $(AM_CFLAGS) + + +test_mq_client_SOURCES = \ + test_mq_client.c \ + mq.c +test_mq_client_LDADD = \ + $(top_builddir)/src/util/libgnunetutil.la \ + $(top_builddir)/src/stream/libgnunetstream.la +test_mq_client_CFLAGS = $(AM_CFLAGS) + EXTRA_DIST = \ test_set.conf diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c index aea198a61..3ed896775 100644 --- a/src/set/gnunet-service-set.c +++ b/src/set/gnunet-service-set.c @@ -72,24 +72,11 @@ static struct Incoming *incoming_head; static struct Incoming *incoming_tail; /** - * Counter for allocating unique request IDs for clients. - * Used to identify incoming requests from remote peers. + * Counter for allocating unique IDs for clients, + * used to identify incoming operation requests from remote peers, + * that the client can choose to accept or refuse. */ -static uint32_t request_id = 1; - - -/** - * Disconnect a client and free all resources - * that the client allocated (e.g. Sets or Listeners) - * - * @param client the client to disconnect - */ -void -_GSS_client_disconnect (struct GNUNET_SERVER_Client *client) -{ - /* FIXME: clean up any data structures belonging to the client */ - GNUNET_SERVER_client_disconnect (client); -} +static uint32_t accept_id = 1; /** @@ -140,12 +127,83 @@ get_incoming (uint32_t id) { struct Incoming *incoming; for (incoming = incoming_head; NULL != incoming; incoming = incoming->next) - if (incoming->request_id == id) + if (incoming->accept_id == id) return incoming; return NULL; } +/** + * Destroy a listener, free all resources associated with it. + * + * @param listener listener to destroy + */ +static void +destroy_listener (struct Listener *listener) +{ + if (NULL != listener->client_mq) + { + GNUNET_MQ_destroy (listener->client_mq); + listener->client_mq = NULL; + } + if (NULL != listener->client) + { + GNUNET_SERVER_client_drop (listener->client); + listener->client = NULL; + } + + GNUNET_CONTAINER_DLL_remove (listeners_head, listeners_tail, listener); + GNUNET_free (listener); +} + + +/** + * Destroy a set, and free all resources associated with it. + * + * @param set the set to destroy + */ +static void +destroy_set (struct Set *set) +{ + switch (set->operation) + { + case GNUNET_SET_OPERATION_INTERSECTION: + GNUNET_assert (0); + break; + case GNUNET_SET_OPERATION_UNION: + _GSS_union_set_destroy (set); + break; + default: + GNUNET_assert (0); + break; + } + GNUNET_CONTAINER_DLL_remove (sets_head, sets_tail, set); + GNUNET_free (set); +} + + +/** + * Clean up after a client after it is + * disconnected (either by us or by itself) + * + * @param cls closure, unused + * @param client the client to clean up after + */ +void +handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) +{ + struct Set *set; + struct Listener *listener; + + set = get_set (client); + if (NULL != set) + destroy_set (set); + listener = get_listener (client); + if (NULL != listener) + destroy_listener (listener); +} + + /** * Destroy an incoming request from a remote peer * @@ -186,16 +244,16 @@ handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh) struct Listener *listener; const struct GNUNET_MessageHeader *context_msg; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got operation request\n"); - if (ntohs (mh->size) < sizeof *msg) { + /* message is to small for its type */ GNUNET_break (0); destroy_incoming (incoming); return; } else if (ntohs (mh->size) == sizeof *msg) { + /* there is no context message */ context_msg = NULL; } else @@ -209,13 +267,17 @@ handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh) return; } } + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received P2P operation request (op %u, app %s)\n", + ntohs (msg->operation), GNUNET_h2s (&msg->app_id)); + /* find the appropriate listener */ for (listener = listeners_head; listener != NULL; listener = listener->next) { if ( (0 != GNUNET_CRYPTO_hash_cmp (&msg->app_id, &listener->app_id)) || - (htons (msg->operation) != listener->operation) ) + (ntohs (msg->operation) != listener->operation) ) continue; mqm = GNUNET_MQ_msg (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST); if (GNUNET_OK != GNUNET_MQ_nest_mh (mqm, context_msg)) @@ -225,11 +287,15 @@ handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh) GNUNET_break (0); return; } - incoming->request_id = request_id++; - cmsg->request_id = htonl (incoming->request_id); + incoming->accept_id = accept_id++; + cmsg->accept_id = htonl (incoming->accept_id); GNUNET_MQ_send (listener->client_mq, mqm); return; } + + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "set operation request from peer failed: " + "no set with matching application ID and operation type\n"); } @@ -248,7 +314,8 @@ handle_client_create (void *cls, struct SetCreateMessage *msg = (struct SetCreateMessage *) m; struct Set *set; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "new set created\n"); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client created new set (operation %u)\n", + ntohs (msg->operation)); if (NULL != get_set (client)) { @@ -276,9 +343,9 @@ handle_client_create (void *cls, } set->client = client; + GNUNET_SERVER_client_keep (client); set->client_mq = GNUNET_MQ_queue_for_server_client (client); GNUNET_CONTAINER_DLL_insert (sets_head, sets_tail, set); - GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -297,19 +364,22 @@ handle_client_listen (void *cls, { struct ListenMessage *msg = (struct ListenMessage *) m; struct Listener *listener; - + if (NULL != get_listener (client)) { GNUNET_break (0); GNUNET_SERVER_client_disconnect (client); return; } - listener = GNUNET_new (struct Listener); + listener->client = client; + GNUNET_SERVER_client_keep (client); + listener->client_mq = GNUNET_MQ_queue_for_server_client (client); listener->app_id = msg->app_id; - listener->operation = msg->operation; + listener->operation = ntohs (msg->operation); GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener); - + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "new listener created (op %u, app %s)\n", + listener->operation, GNUNET_h2s (&listener->app_id)); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -333,13 +403,13 @@ handle_client_remove (void *cls, if (NULL == set) { GNUNET_break (0); - _GSS_client_disconnect (client); + GNUNET_SERVER_client_disconnect (client); return; } switch (set->operation) { case GNUNET_SET_OPERATION_UNION: - _GSS_union_add ((struct ElementMessage *) m, set); + _GSS_union_remove ((struct ElementMessage *) m, set); case GNUNET_SET_OPERATION_INTERSECTION: /* FIXME: cfuchs */ break; @@ -371,13 +441,13 @@ handle_client_add (void *cls, if (NULL == set) { GNUNET_break (0); - _GSS_client_disconnect (client); + GNUNET_SERVER_client_disconnect (client); return; } switch (set->operation) { case GNUNET_SET_OPERATION_UNION: - _GSS_union_remove ((struct ElementMessage *) m, set); + _GSS_union_add ((struct ElementMessage *) m, set); case GNUNET_SET_OPERATION_INTERSECTION: /* FIXME: cfuchs */ break; @@ -408,7 +478,7 @@ handle_client_evaluate (void *cls, if (NULL == set) { GNUNET_break (0); - _GSS_client_disconnect (client); + GNUNET_SERVER_client_disconnect (client); return; } @@ -481,22 +551,30 @@ handle_client_accept (void *cls, struct Incoming *incoming; struct AcceptMessage *msg = (struct AcceptMessage *) mh; - set = get_set (client); - if (NULL == set) + incoming = get_incoming (ntohl (msg->accept_id)); + + if (NULL == incoming) { GNUNET_break (0); - _GSS_client_disconnect (client); + GNUNET_SERVER_client_disconnect (client); return; } - incoming = get_incoming (ntohl (msg->request_id)); + if (0 == ntohl (msg->request_id)) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer request rejected by client\n"); + destroy_incoming (incoming); + GNUNET_SERVER_receive_done (client, GNUNET_OK); + return; + } + + set = get_set (client); - if ( (NULL == incoming) || - (incoming->operation != set->operation) ) + if (NULL == set) { GNUNET_break (0); - _GSS_client_disconnect (client); + GNUNET_SERVER_client_disconnect (client); return; } @@ -513,8 +591,10 @@ handle_client_accept (void *cls, GNUNET_assert (0); break; } - /* FIXME: destroy incoming */ + /* note: _GSS_*_accept has to make sure the socket and mq are set to NULL, + * otherwise they will be destroyed and disconnected */ + destroy_incoming (incoming); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -574,6 +654,21 @@ shutdown_task (void *cls, stream_listen_socket = NULL; } + while (NULL != incoming_head) + { + destroy_incoming (incoming_head); + } + + while (NULL != listeners_head) + { + destroy_listener (listeners_head); + } + + while (NULL != sets_head) + { + destroy_set (sets_head); + } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n"); } @@ -604,6 +699,7 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, configuration = cfg; GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); + GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL); GNUNET_SERVER_add_handlers (server, server_handlers); stream_listen_socket = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_SET, &stream_listen_cb, NULL, diff --git a/src/set/gnunet-service-set.h b/src/set/gnunet-service-set.h index a5a53671c..cc28e9701 100644 --- a/src/set/gnunet-service-set.h +++ b/src/set/gnunet-service-set.h @@ -109,8 +109,8 @@ struct Listener struct Listener *prev; /** - * Client that owns the set. - * Only one client may own a set. + * Client that owns the listener. + * Only one client may own a listener. */ struct GNUNET_SERVER_Client *client; @@ -188,9 +188,10 @@ struct Incoming /** * Unique request id for the request from - * a remote peer. + * a remote peer, sent to the client with will + * accept or reject the request. */ - uint32_t request_id; + uint32_t accept_id; }; @@ -200,16 +201,6 @@ struct Incoming extern const struct GNUNET_CONFIGURATION_Handle *configuration; -/** - * Disconnect a client and free all resources - * that the client allocated (e.g. Sets or Listeners) - * - * @param client the client to disconnect - */ -void -_GSS_client_disconnect (struct GNUNET_SERVER_Client *client); - - /** * Create a new set supporting the union operation * @@ -251,6 +242,15 @@ void _GSS_union_remove (struct ElementMessage *m, struct Set *set); +/** + * Destroy a set that supports the union operation + * + * @param the set to destroy, must be of type GNUNET_SET_OPERATION_UNION + */ +void +_GSS_union_set_destroy (struct Set *set); + + /** * Accept an union operation request from a remote peer * diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c index efedbcef6..694fb6056 100644 --- a/src/set/gnunet-service-set_union.c +++ b/src/set/gnunet-service-set_union.c @@ -134,11 +134,6 @@ struct UnionEvaluateOperation */ struct GNUNET_MQ_MessageQueue *mq; - /** - * Type of this operation - */ - enum GNUNET_SET_OperationType operation; - /** * Request ID to multiplex set operations to * the client inhabiting the set. @@ -330,6 +325,45 @@ struct UnionState }; + +/** + * Iterator over hash map entries. + * + * @param cls closure + * @param key current key code + * @param value value in the hash map + * @return GNUNET_YES if we should continue to + * iterate, + * GNUNET_NO if not. + */ +static int +destroy_elements_iterator (void *cls, + const struct GNUNET_HashCode * key, + void *value) +{ + struct ElementEntry *ee = value; + + GNUNET_free (ee); + return GNUNET_YES; +} + + +/** + * Destroy the elements belonging to a union set. + * + * @param us union state that contains the elements + */ +static void +destroy_elements (struct UnionState *us) +{ + if (NULL == us->elements) + return; + GNUNET_CONTAINER_multihashmap_iterate (us->elements, destroy_elements_iterator, NULL); + GNUNET_CONTAINER_multihashmap_destroy (us->elements); + us->elements = NULL; +} + + /** * Destroy a union operation, and free all resources * associated with it. @@ -339,6 +373,38 @@ struct UnionState static void destroy_union_operation (struct UnionEvaluateOperation *eo) { + if (NULL != eo->mq) + { + GNUNET_MQ_destroy (eo->mq); + eo->mq = NULL; + } + if (NULL != eo->socket) + { + GNUNET_STREAM_close (eo->socket); + eo->socket = NULL; + } + if (NULL != eo->remote_ibf) + { + ibf_destroy (eo->remote_ibf); + eo->remote_ibf = NULL; + } + if (NULL != eo->local_ibf) + { + ibf_destroy (eo->local_ibf); + eo->local_ibf = NULL; + } + if (NULL != eo->se) + { + strata_estimator_destroy (eo->se); + eo->se = NULL; + } + if (NULL != eo->key_to_element) + { + GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element); + eo->key_to_element = NULL; + } + + GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head, eo->set->state.u->ops_tail, eo); @@ -361,7 +427,7 @@ fail_union_operation (struct UnionEvaluateOperation *eo) mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); - msg->request_id = eo->request_id; + msg->request_id = htonl (eo->request_id); GNUNET_MQ_send (eo->set->client_mq, mqm); destroy_union_operation (eo); } @@ -405,12 +471,12 @@ send_operation_request (struct UnionEvaluateOperation *eo) if (GNUNET_OK != GNUNET_MQ_nest (mqm, eo->context_msg, ntohs (eo->context_msg->size))) { /* the context message is too large */ - _GSS_client_disconnect (eo->set->client); - GNUNET_MQ_discard (mqm); GNUNET_break (0); + GNUNET_SERVER_client_disconnect (eo->set->client); + GNUNET_MQ_discard (mqm); return; } - msg->operation = eo->operation; + msg->operation = htons (GNUNET_SET_OPERATION_UNION); msg->app_id = eo->app_id; GNUNET_MQ_send (eo->mq, mqm); @@ -547,7 +613,7 @@ prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size) { unsigned int len; len = GNUNET_CONTAINER_multihashmap_size (eo->set->state.u->elements); - eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len); + eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); GNUNET_CONTAINER_multihashmap_iterate (eo->set->state.u->elements, init_key_to_element_iterator, eo); } @@ -573,6 +639,8 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order) prepare_ibf (eo, 1<local_ibf; while (buckets_sent < (1 << ibf_order)) @@ -588,7 +656,7 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order) mqm = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE, GNUNET_MESSAGE_TYPE_SET_P2P_IBF); - msg->order = htons (ibf_order); + msg->order = ibf_order; msg->offset = htons (buckets_sent); ibf_write_slice (ibf, buckets_sent, buckets_in_message, &msg[1]); @@ -654,7 +722,6 @@ handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh) struct StrataEstimator *remote_se; int diff; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got se\n"); if (eo->phase != PHASE_EXPECT_SE) { @@ -667,6 +734,7 @@ handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh) strata_estimator_read (&mh[1], remote_se); GNUNET_assert (NULL != eo->se); diff = strata_estimator_difference (remote_se, eo->se); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got se, diff=%d\n", diff); strata_estimator_destroy (remote_se); strata_estimator_destroy (eo->se); eo->se = NULL; @@ -708,6 +776,7 @@ send_element_iterator (void *cls, continue; } GNUNET_MQ_send (eo->mq, mqm); + ke = ke->next_colliding; } return GNUNET_NO; } @@ -731,7 +800,6 @@ send_elements_for_key (struct UnionEvaluateOperation *eo, struct IBF_Key ibf_key } - /** * Decode which elements are missing on each side, and * send the appropriate elemens and requests @@ -758,11 +826,22 @@ decode_and_send (struct UnionEvaluateOperation *eo) res = ibf_decode (diff_ibf, &side, &key); if (GNUNET_SYSERR == res) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "decoding failed, sending larger ibf (size %u)\n", - diff_ibf->size * 2); - send_ibf (eo, diff_ibf->size * 2); - ibf_destroy (diff_ibf); - return; + int next_order; + next_order = 0; + while (1<size) + next_order++; + next_order++; + if (next_order <= MAX_IBF_ORDER) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "decoding failed, sending larger ibf (size %u)\n", + 1<mq, mqm); - return; + break; } if (1 == side) { @@ -790,6 +869,7 @@ decode_and_send (struct UnionEvaluateOperation *eo) GNUNET_MQ_send (eo->mq, mqm); } } + ibf_destroy (diff_ibf); } @@ -811,6 +891,7 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) { eo->phase = PHASE_EXPECT_IBF_CONT; GNUNET_assert (NULL == eo->remote_ibf); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "creating new ibf of order %u\n", 1<order); eo->remote_ibf = ibf_create (1<order, SE_IBF_HASH_NUM); if (0 != ntohs (msg->offset)) { @@ -825,6 +906,7 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) { GNUNET_break (0); fail_union_operation (eo); + return; } } @@ -834,13 +916,16 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) { GNUNET_break (0); fail_union_operation (eo); + return; } - + ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf); eo->ibf_buckets_received += buckets_in_message; if (eo->ibf_buckets_received == eo->remote_ibf->size) { + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full strata estimator\n"); eo->phase = PHASE_EXPECT_ELEMENTS; decode_and_send (eo); } @@ -848,7 +933,8 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) /** - * Send an element to the client of the operations's set. + * Send a result message to the client indicating + * that there is a new element. * * @param eo union operation * @param element element to send @@ -862,6 +948,8 @@ send_client_element (struct UnionEvaluateOperation *eo, GNUNET_assert (0 != eo->request_id); mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); + rm->result_status = htons (GNUNET_SET_STATUS_OK); + rm->request_id = htonl (eo->request_id); if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size)) { GNUNET_MQ_discard (mqm); @@ -869,7 +957,46 @@ send_client_element (struct UnionEvaluateOperation *eo, return; } - GNUNET_MQ_send (eo->mq, mqm); + GNUNET_MQ_send (eo->set->client_mq, mqm); +} + + +/** + * Callback used for notifications + * + * @param cls closure + */ +static void +client_done_sent_cb (void *cls) +{ + //struct UnionEvaluateOperation *eo = cls; + /* FIXME: destroy eo */ +} + + +/** + * Send a result message to the client indicating + * that the operation is over. + * After the result done message has been sent to the client, + * destroy the evaluate operation. + * + * @param eo union operation + * @param element element to send + */ +static void +send_client_done_and_destroy (struct UnionEvaluateOperation *eo) +{ + struct GNUNET_MQ_Message *mqm; + struct ResultMessage *rm; + + GNUNET_assert (0 != eo->request_id); + mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); + rm->request_id = htonl (eo->request_id); + rm->result_status = htons (GNUNET_SET_STATUS_DONE); + GNUNET_MQ_notify_sent (mqm, client_done_sent_cb, eo); + GNUNET_MQ_send (eo->set->client_mq, mqm); + + /* FIXME: destroy the eo */ } @@ -886,6 +1013,8 @@ handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh) struct ElementEntry *ee; uint16_t element_size; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got element from peer\n"); + if ( (eo->phase != PHASE_EXPECT_ELEMENTS) && (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ) { @@ -920,8 +1049,8 @@ handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh) /* look up elements and send them */ if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) { - fail_union_operation (eo); GNUNET_break (0); + fail_union_operation (eo); return; } @@ -929,8 +1058,8 @@ handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh) if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key)) { - fail_union_operation (eo); GNUNET_break (0); + fail_union_operation (eo); return; } @@ -943,6 +1072,20 @@ handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh) } +/** + * Callback used for notifications + * + * @param cls closure + */ +static void +peer_done_sent_cb (void *cls) +{ + struct UnionEvaluateOperation *eo = cls; + + send_client_done_and_destroy (eo); +} + + /** * Handle a done message from a remote peer * @@ -959,15 +1102,18 @@ handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) /* we got all requests, but still have to send our elements as response */ struct GNUNET_MQ_Message *mqm; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got DONE, sending final DONE after elements\n"); eo->phase = PHASE_FINISHED; mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); + GNUNET_MQ_notify_sent (mqm, peer_done_sent_cb, eo); GNUNET_MQ_send (eo->mq, mqm); return; } if (eo->phase == PHASE_EXPECT_ELEMENTS) { - /* it's all over! */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got final DONE\n"); eo->phase = PHASE_FINISHED; + send_client_done_and_destroy (eo); return; } GNUNET_break (0); @@ -1026,19 +1172,27 @@ _GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set) { struct UnionEvaluateOperation *eo; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "evaluating union operation\n"); - eo = GNUNET_new (struct UnionEvaluateOperation); eo->peer = m->peer; eo->set = set; - eo->request_id = htons(m->request_id); + eo->request_id = htonl (m->request_id); + GNUNET_assert (0 != eo->request_id); eo->se = strata_estimator_dup (set->state.u->se); eo->salt = ntohs (m->salt); eo->app_id = m->app_id; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "evaluating union operation, (app %s)\n", + GNUNET_h2s (&eo->app_id)); + eo->socket = GNUNET_STREAM_open (configuration, &eo->peer, GNUNET_APPLICATION_TYPE_SET, stream_open_cb, eo, GNUNET_STREAM_OPTION_END); + + + GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head, + eo->set->state.u->ops_tail, + eo); /* the stream open callback will kick off the operation */ } @@ -1056,18 +1210,30 @@ _GSS_union_accept (struct AcceptMessage *m, struct Set *set, { struct UnionEvaluateOperation *eo; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "accepting set union operation\n"); + eo = GNUNET_new (struct UnionEvaluateOperation); eo->generation_created = set->state.u->current_generation++; eo->set = set; eo->peer = incoming->peer; eo->salt = ntohs (incoming->salt); - eo->request_id = m->request_id; + GNUNET_assert (0 != ntohl (m->request_id)); + eo->request_id = ntohl (m->request_id); eo->se = strata_estimator_dup (set->state.u->se); eo->set = set; eo->mq = incoming->mq; + /* transfer ownership of mq and socket from incoming to eo */ + incoming->mq = NULL; + eo->socket = incoming->socket; + incoming->socket = NULL; /* the peer's socket is now ours, we'll receive all messages */ GNUNET_MQ_replace_handlers (eo->mq, union_handlers, eo); - /* kick of the operation */ + + GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head, + eo->set->state.u->ops_tail, + eo); + + /* kick off the operation */ send_strata_estimator (eo); } @@ -1082,7 +1248,7 @@ _GSS_union_set_create (void) { struct Set *set; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set created\n"); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "union set created\n"); set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct UnionState)); set->state.u = (struct UnionState *) &set[1]; @@ -1109,6 +1275,8 @@ _GSS_union_add (struct ElementMessage *m, struct Set *set) struct ElementEntry *ee_dup; uint16_t element_size; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "adding element\n"); + GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation); element_size = ntohs (m->header.size) - sizeof *m; ee = GNUNET_malloc (element_size + sizeof *ee); @@ -1130,6 +1298,38 @@ _GSS_union_add (struct ElementMessage *m, struct Set *set) } +/** + * Destroy a set that supports the union operation + * + * @param the set to destroy, must be of type GNUNET_SET_OPERATION_UNION + */ +void +_GSS_union_set_destroy (struct Set *set) +{ + GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation); + if (NULL != set->client) + { + GNUNET_SERVER_client_drop (set->client); + set->client = NULL; + } + if (NULL != set->client_mq) + { + GNUNET_MQ_destroy (set->client_mq); + set->client_mq = NULL; + } + + if (NULL != set->state.u->se) + { + strata_estimator_destroy (set->state.u->se); + set->state.u->se = NULL; + } + + destroy_elements (set->state.u); + + while (NULL != set->state.u->ops_head) + destroy_union_operation (set->state.u->ops_head); +} + /** * Remove the element given in the element message from the set. * Only marks the element as removed, so that older set operations can still exchange it. diff --git a/src/set/gnunet-set-bug.c b/src/set/gnunet-set-bug.c index edcd8b561..112def7d7 100644 --- a/src/set/gnunet-set-bug.c +++ b/src/set/gnunet-set-bug.c @@ -113,6 +113,8 @@ run (void *cls, char *const *args, cfg = GNUNET_CONFIGURATION_dup (cfg2); GNUNET_CRYPTO_get_host_identity (cfg, &local_id); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "I am Peer %s\n", GNUNET_h2s (&local_id.hashPubKey)); + listen_socket = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_SET, &listen_cb, diff --git a/src/set/gnunet-set.c b/src/set/gnunet-set.c index c49b60dfd..d665fce11 100644 --- a/src/set/gnunet-set.c +++ b/src/set/gnunet-set.c @@ -36,6 +36,53 @@ static struct GNUNET_HashCode app_id; static struct GNUNET_SET_Handle *set1; static struct GNUNET_SET_Handle *set2; static struct GNUNET_SET_ListenHandle *listen_handle; +const static struct GNUNET_CONFIGURATION_Handle *config; + +int num_done; + + +static void +result_cb_set1 (void *cls, struct GNUNET_SET_Element *element, + enum GNUNET_SET_Status status) +{ + switch (status) + { + case GNUNET_SET_STATUS_OK: + printf ("set 1: got element\n"); + break; + case GNUNET_SET_STATUS_FAILURE: + printf ("set 1: failure\n"); + break; + case GNUNET_SET_STATUS_DONE: + printf ("set 1: done\n"); + GNUNET_SET_destroy (set1); + break; + default: + GNUNET_assert (0); + } +} + + +static void +result_cb_set2 (void *cls, struct GNUNET_SET_Element *element, + enum GNUNET_SET_Status status) +{ + switch (status) + { + case GNUNET_SET_STATUS_OK: + printf ("set 2: got element\n"); + break; + case GNUNET_SET_STATUS_FAILURE: + printf ("set 2: failure\n"); + break; + case GNUNET_SET_STATUS_DONE: + printf ("set 2: done\n"); + GNUNET_SET_destroy (set2); + break; + default: + GNUNET_assert (0); + } +} static void @@ -45,13 +92,67 @@ listen_cb (void *cls, struct GNUNET_SET_Request *request) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n"); + GNUNET_SET_listen_cancel (listen_handle); + + GNUNET_SET_accept (request, set2, GNUNET_TIME_UNIT_FOREVER_REL, + GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL); } + +/** + * Start the set operation. + * + * @param cls closure, unused + */ static void -result_cb (void *cls, struct GNUNET_SET_Element *element, - enum GNUNET_SET_Status status) +start (void *cls) +{ + listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION, + &app_id, listen_cb, NULL); + GNUNET_SET_evaluate (set1, &local_id, &app_id, NULL, 42, + GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_SET_RESULT_ADDED, + result_cb_set1, NULL); +} + + +/** + * Initialize the second set, continue + * + * @param cls closure, unused + */ +static void +init_set2 (void *cls) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got result\n"); + struct GNUNET_SET_Element element; + + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initializing set 2\n"); + + element.data = "hello"; + element.size = strlen(element.data); + GNUNET_SET_add_element (set2, &element, NULL, NULL); + element.data = "quux"; + element.size = strlen(element.data); + GNUNET_SET_add_element (set2, &element, start, NULL); +} + + +/** + * Initialize the first set, continue. + */ +static void +init_set1 (void) +{ + struct GNUNET_SET_Element element; + + element.data = "hello"; + element.size = strlen(element.data); + GNUNET_SET_add_element (set1, &element, NULL, NULL); + element.data = "bar"; + element.size = strlen(element.data); + GNUNET_SET_add_element (set1, &element, init_set2, NULL); + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized set 1\n"); } @@ -69,6 +170,8 @@ run (void *cls, char *const *args, const struct GNUNET_CONFIGURATION_Handle *cfg) { static const char* app_str = "gnunet-set"; + + config = cfg; GNUNET_CRYPTO_hash (app_str, strlen (app_str), &app_id); @@ -76,12 +179,8 @@ run (void *cls, char *const *args, set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); - listen_handle = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION, - &app_id, listen_cb, NULL); - GNUNET_SET_evaluate (set1, &local_id, &app_id, NULL, 42, - GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_SET_RESULT_ADDED, - result_cb, NULL); + init_set1 (); } diff --git a/src/set/ibf.c b/src/set/ibf.c index 739b97339..383ce3daf 100644 --- a/src/set/ibf.c +++ b/src/set/ibf.c @@ -280,6 +280,7 @@ ibf_read_slice (const void *buf, uint32_t start, uint32_t count, struct Invertib struct IBF_KeyHash *key_hash_src; struct IBF_Count *count_src; + GNUNET_assert (count > 0); GNUNET_assert (start + count <= ibf->size); /* copy keys */ diff --git a/src/set/mq.c b/src/set/mq.c index 3a9e614e9..0ced014dd 100644 --- a/src/set/mq.c +++ b/src/set/mq.c @@ -192,13 +192,22 @@ static void dispatch_message (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_MessageHeader *mh) { const struct GNUNET_MQ_Handler *handler; + int handled = GNUNET_NO; handler = mq->handlers; if (NULL == handler) return; for (; NULL != handler->cb; handler++) + { if (handler->type == ntohs (mh->type)) + { handler->cb (mq->handlers_cls, mh); + handled = GNUNET_YES; + } + } + + if (GNUNET_NO == handled) + LOG (GNUNET_ERROR_TYPE_WARNING, "no handler for message of type %d\n", ntohs (mh->type)); } @@ -220,6 +229,7 @@ GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm) void GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) { + GNUNET_assert (NULL != mq); mq->send_impl (mq, mqm); } @@ -228,6 +238,7 @@ struct GNUNET_MQ_Message * GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) { struct GNUNET_MQ_Message *mqm; + mqm = GNUNET_malloc (sizeof *mqm + size); mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1]; mqm->mh->size = htons (size); @@ -245,16 +256,18 @@ GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp, size_t new_size; size_t old_size; + GNUNET_assert (NULL != mqmp); + /* there's no data to append => do nothing */ if (NULL == data) return GNUNET_OK; - GNUNET_assert (NULL != mqmp); old_size = ntohs ((*mqmp)->mh->size); /* message too large to concatenate? */ - if (ntohs ((*mqmp)->mh->size) + len < len) + if (((uint16_t) (old_size + len)) < len) return GNUNET_SYSERR; new_size = old_size + len; - *mqmp = GNUNET_realloc (mqmp, sizeof (struct GNUNET_MQ_Message) + new_size); - memcpy ((*mqmp)->mh + old_size, data, new_size - old_size); + *mqmp = GNUNET_realloc (*mqmp, sizeof (struct GNUNET_MQ_Message) + new_size); + (*mqmp)->mh = (struct GNUNET_MessageHeader *) &(*mqmp)[1]; + memcpy (((void *) (*mqmp)->mh) + old_size, data, new_size - old_size); (*mqmp)->mh->size = htons (new_size); return GNUNET_OK; } @@ -286,12 +299,10 @@ stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size) /* call cb for message we finished sending */ mqm = mq->current_msg; - if (NULL != mqm) - { - if (NULL != mqm->sent_cb) - mqm->sent_cb (mqm->sent_cls); - GNUNET_free (mqm); - } + GNUNET_assert (NULL != mq->current_msg); + if (NULL != mqm->sent_cb) + mqm->sent_cb (mqm->sent_cls); + GNUNET_free (mqm); mss->wh = NULL; @@ -384,6 +395,35 @@ stream_data_processor (void *cls, } +static void +stream_socket_destroy_impl (struct GNUNET_MQ_MessageQueue *mq) +{ + struct MessageStreamState *mss = (struct MessageStreamState *) mq->impl_state; + + if (NULL != mss->rh) + { + GNUNET_STREAM_read_cancel (mss->rh); + mss->rh = NULL; + } + + if (NULL != mss->wh) + { + GNUNET_STREAM_write_cancel (mss->wh); + mss->wh = NULL; + } + + if (NULL != mss->mst) + { + GNUNET_SERVER_mst_destroy (mss->mst); + mss->mst = NULL; + } + + GNUNET_free (mss); +} + + + + struct GNUNET_MQ_MessageQueue * GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket, const struct GNUNET_MQ_Handler *handlers, @@ -397,6 +437,7 @@ GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket, mss->socket = socket; mq->impl_state = mss; mq->send_impl = stream_socket_send_impl; + mq->destroy_impl = &stream_socket_destroy_impl; mq->handlers = handlers; mq->handlers_cls = cls; if (NULL != handlers) @@ -425,14 +466,21 @@ transmit_queued (void *cls, size_t size, struct ServerClientSocketState *state = mq->impl_state; size_t msg_size; + GNUNET_assert (NULL != buf); + + if (NULL != mqm->sent_cb) + { + mqm->sent_cb (mqm->sent_cls); + } + mq->current_msg = NULL; GNUNET_assert (NULL != mqm); - GNUNET_assert (NULL != buf); msg_size = ntohs (mqm->mh->size); GNUNET_assert (size >= msg_size); memcpy (buf, mqm->mh, msg_size); GNUNET_free (mqm); state->th = NULL; + if (NULL != mq->msg_head) { mq->current_msg = mq->msg_head; @@ -448,12 +496,27 @@ transmit_queued (void *cls, size_t size, } + +static void +server_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq) +{ + struct ServerClientSocketState *state; + + GNUNET_assert (NULL != mq); + state = mq->impl_state; + GNUNET_assert (NULL != state); + GNUNET_SERVER_client_drop (state->client); + GNUNET_free (state); +} + static void server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) { - struct ServerClientSocketState *state = mq->impl_state; + struct ServerClientSocketState *state; int msize; + GNUNET_assert (NULL != mq); + state = mq->impl_state; GNUNET_assert (NULL != state); if (NULL != state->th) @@ -461,8 +524,9 @@ server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Mes GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm); return; } + GNUNET_assert (NULL == mq->msg_head); GNUNET_assert (NULL == mq->current_msg); - msize = ntohs (mq->msg_head->mh->size); + msize = ntohs (mqm->mh->size); mq->current_msg = mqm; state->th = GNUNET_SERVER_notify_transmit_ready (state->client, msize, @@ -480,7 +544,10 @@ GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client) mq = GNUNET_new (struct GNUNET_MQ_MessageQueue); scss = GNUNET_new (struct ServerClientSocketState); mq->impl_state = scss; + scss->client = client; + GNUNET_SERVER_client_keep (client); mq->send_impl = server_client_send_impl; + mq->destroy_impl = server_client_destroy_impl; return mq; } @@ -502,8 +569,15 @@ connection_client_transmit_queued (void *cls, size_t size, struct ClientConnectionState *state = mq->impl_state; size_t msg_size; - mq->current_msg = NULL; + GNUNET_assert (NULL != mqm); + + if (NULL != mqm->sent_cb) + { + mqm->sent_cb (mqm->sent_cls); + } + + mq->current_msg = NULL; GNUNET_assert (NULL != buf); msg_size = ntohs (mqm->mh->size); GNUNET_assert (size >= msg_size); @@ -515,7 +589,7 @@ connection_client_transmit_queued (void *cls, size_t size, mq->current_msg = mq->msg_head; GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg); state->th = - GNUNET_CLIENT_notify_transmit_ready (state->connection, htons (mq->current_msg->mh->size), + GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (mq->current_msg->mh->size), GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, &connection_client_transmit_queued, mq); } @@ -525,6 +599,13 @@ connection_client_transmit_queued (void *cls, size_t size, } + +static void +connection_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq) +{ + GNUNET_free (mq->impl_state); +} + static void connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) @@ -549,6 +630,7 @@ connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, } + /** * Type of a function to call when we receive a message * from the service. @@ -561,6 +643,9 @@ handle_client_message (void *cls, const struct GNUNET_MessageHeader *msg) { struct GNUNET_MQ_MessageQueue *mq = cls; + struct ClientConnectionState *state; + + state = mq->impl_state; if (NULL == msg) { @@ -569,6 +654,10 @@ handle_client_message (void *cls, mq->read_error_cb (mq->read_error_cls); return; } + + GNUNET_CLIENT_receive (state->connection, handle_client_message, mq, + GNUNET_TIME_UNIT_FOREVER_REL); + dispatch_message (mq, msg); } @@ -590,6 +679,7 @@ GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connecti state->connection = connection; mq->impl_state = state; mq->send_impl = connection_client_send_impl; + mq->destroy_impl = connection_client_destroy_impl; if (NULL != handlers) { @@ -626,7 +716,10 @@ GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq, uint32_t id; if (NULL == mq->assoc_map) + { mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8); + mq->assoc_id = 1; + } id = mq->assoc_id++; GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); @@ -652,6 +745,7 @@ GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id) if (NULL == mq->assoc_map) return NULL; val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id); + GNUNET_assert (NULL != val); GNUNET_CONTAINER_multihashmap32_remove (mq->assoc_map, request_id, val); return val; } @@ -671,6 +765,12 @@ void GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq) { /* FIXME: destroy all pending messages in the queue */ + + if (NULL != mq->destroy_impl) + { + mq->destroy_impl (mq); + } + GNUNET_free (mq); } diff --git a/src/set/mq.h b/src/set/mq.h index e43ce04b4..42b755163 100644 --- a/src/set/mq.h +++ b/src/set/mq.h @@ -106,7 +106,7 @@ * @param esize extra space to allocate after the message header * @param type type of the message */ -#define GNUNET_MQ_msg_header_extra(mh, esize, type) GNUNET_MQ_msg_ (&mh, sizeof (struct GNUNET_MessageHeader), type) +#define GNUNET_MQ_msg_header_extra(mh, esize, type) GNUNET_MQ_msg_ (&mh, (esize) + sizeof (struct GNUNET_MessageHeader), type) /** @@ -166,6 +166,7 @@ struct GNUNET_MQ_Handler */ typedef void (*GNUNET_MQ_NotifyCallback) (void *cls); + /** * Create a new message for MQ. * diff --git a/src/set/set.h b/src/set/set.h index 87fd6efbf..33e0aafdd 100644 --- a/src/set/set.h +++ b/src/set/set.h @@ -80,36 +80,42 @@ struct AcceptMessage struct GNUNET_MessageHeader header; /** - * request id of the request we want to accept + * Request id that will be sent along with + * results for the accepted operation. + * Chosen by the client. + * Must be 0 if the request has been rejected. */ uint32_t request_id GNUNET_PACKED; /** - * Zero if the client has rejected the request, - * non-zero if it has accepted it + * ID of the incoming request we want to accept / reject. */ - uint32_t accepted GNUNET_PACKED; + uint32_t accept_id GNUNET_PACKED; }; +/** + * A request for an operation with another client. + */ struct RequestMessage { /** - * Type: GNUNET_MESSAGE_TYPE_SET_Request + * Type: GNUNET_MESSAGE_TYPE_SET_Request. */ struct GNUNET_MessageHeader header; /** - * requesting peer + * Identity of the requesting peer. */ struct GNUNET_PeerIdentity peer_id; /** - * request id of the request we want to accept + * ID of the request we want to accept, + * chosen by the service. */ - uint32_t request_id GNUNET_PACKED; + uint32_t accept_id GNUNET_PACKED; - /* rest: inner message */ + /* rest: nested context message */ }; @@ -131,7 +137,7 @@ struct EvaluateMessage struct GNUNET_HashCode app_id; /** - * id of our evaluate + * id of our evaluate, chosen by the client */ uint32_t request_id GNUNET_PACKED; @@ -186,6 +192,8 @@ struct ElementMessage uint16_t element_type GNUNET_PACKED; + uint16_t reserved GNUNET_PACKED; + /* rest: the actual element */ }; diff --git a/src/set/set_api.c b/src/set/set_api.c index 2ea002231..775e390de 100644 --- a/src/set/set_api.c +++ b/src/set/set_api.c @@ -30,6 +30,7 @@ #include "gnunet_set_service.h" #include "set.h" #include "mq.h" +#include #define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__) @@ -49,7 +50,7 @@ struct GNUNET_SET_Handle */ struct GNUNET_SET_Request { - uint32_t request_id; + uint32_t accept_id; int accepted; }; @@ -98,20 +99,23 @@ handle_result (void *cls, const struct GNUNET_MessageHeader *mh) } oh = GNUNET_MQ_assoc_get (set->mq, ntohl (msg->request_id)); - GNUNET_break (NULL != oh); - if (GNUNET_SCHEDULER_NO_TASK != oh->timeout_task) - { - GNUNET_SCHEDULER_cancel (oh->timeout_task); - oh->timeout_task = GNUNET_SCHEDULER_NO_TASK; - } + GNUNET_assert (NULL != oh); + /* status is not STATUS_OK => there's no attached element, + * and this is the last result message we get */ if (htons (msg->result_status) != GNUNET_SET_STATUS_OK) { + if (GNUNET_SCHEDULER_NO_TASK != oh->timeout_task) + { + GNUNET_SCHEDULER_cancel (oh->timeout_task); + oh->timeout_task = GNUNET_SCHEDULER_NO_TASK; + } + GNUNET_MQ_assoc_remove (set->mq, ntohl (msg->request_id)); if (NULL != oh->result_cb) oh->result_cb (oh->result_cls, NULL, htons (msg->result_status)); - GNUNET_MQ_assoc_remove (set->mq, ntohl (msg->request_id)); GNUNET_free (oh); return; } + e.data = &msg[1]; e.size = ntohs (mh->size) - sizeof (struct ResultMessage); e.type = msg->element_type; @@ -133,18 +137,25 @@ handle_request (void *cls, const struct GNUNET_MessageHeader *mh) struct GNUNET_SET_Request *req; req = GNUNET_new (struct GNUNET_SET_Request); - req->request_id = ntohl (msg->request_id); + req->accept_id = ntohl (msg->accept_id); + /* calling GNUNET_SET_accept in the listen cb will set req->accepted */ lh->listen_cb (lh->listen_cls, &msg->peer_id, &mh[1], req); + if (GNUNET_NO == req->accepted) { struct GNUNET_MQ_Message *mqm; struct AcceptMessage *amsg; mqm = GNUNET_MQ_msg (amsg, GNUNET_MESSAGE_TYPE_SET_ACCEPT); - amsg->request_id = msg->request_id; + /* no request id, as we refused */ + amsg->request_id = htonl (0); + amsg->accept_id = msg->accept_id; GNUNET_MQ_send (lh->mq, mqm); GNUNET_free (req); } + + /* the accept-case is handled in GNUNET_SET_accept, + * as we have the accept message available there */ } @@ -173,7 +184,7 @@ GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg, set = GNUNET_new (struct GNUNET_SET_Handle); set->client = GNUNET_CLIENT_connect ("set", cfg); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set client created\n"); + LOG (GNUNET_ERROR_TYPE_INFO, "set client created\n"); GNUNET_assert (NULL != set->client); set->mq = GNUNET_MQ_queue_for_connection_client (set->client, mq_handlers, set); mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_CREATE); @@ -377,12 +388,9 @@ GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg, void GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh) { - GNUNET_MQ_destroy (lh->mq); - lh->mq = NULL; GNUNET_CLIENT_disconnect (lh->client); - lh->client = NULL; - lh->listen_cb = NULL; - lh->listen_cls = NULL; + GNUNET_MQ_destroy (lh->mq); + GNUNET_free (lh); } @@ -420,8 +428,8 @@ GNUNET_SET_accept (struct GNUNET_SET_Request *request, oh->set = set; mqm = GNUNET_MQ_msg (msg , GNUNET_MESSAGE_TYPE_SET_ACCEPT); - msg->request_id = htonl (request->request_id); - msg->accepted = 1; + msg->request_id = htonl (GNUNET_MQ_assoc_add (set->mq, NULL, oh)); + msg->accept_id = htonl (request->accept_id); GNUNET_MQ_send (set->mq, mqm); oh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, operation_timeout_task, oh); diff --git a/src/set/strata_estimator.c b/src/set/strata_estimator.c index 60f75f1bc..024bb99c6 100644 --- a/src/set/strata_estimator.c +++ b/src/set/strata_estimator.c @@ -33,6 +33,8 @@ void strata_estimator_write (const struct StrataEstimator *se, void *buf) { int i; + + GNUNET_assert (NULL != se); for (i = 0; i < se->strata_count; i++) { ibf_write_slice (se->strata[i], 0, se->ibf_size, buf); diff --git a/src/set/test_mq.c b/src/set/test_mq.c new file mode 100644 index 000000000..d13c63440 --- /dev/null +++ b/src/set/test_mq.c @@ -0,0 +1,115 @@ +/* + This file is part of GNUnet. + (C) 2012 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file set/test_mq.c + * @brief simple tests for mq + */ +#include "platform.h" +#include "gnunet_util_lib.h" +#include "gnunet_testing_lib.h" +#include "mq.h" + + +GNUNET_NETWORK_STRUCT_BEGIN + +struct MyMessage +{ + struct GNUNET_MessageHeader header; + uint32_t x GNUNET_PACKED; +}; + +GNUNET_NETWORK_STRUCT_END + +void +test1 (void) +{ + struct GNUNET_MQ_Message *mqm; + struct MyMessage *mm; + + mm = NULL; + mqm = NULL; + + mqm = GNUNET_MQ_msg (mm, 42); + GNUNET_assert (NULL != mqm); + GNUNET_assert (NULL != mm); + GNUNET_assert (42 == ntohs (mm->header.type)); + GNUNET_assert (sizeof (struct MyMessage) == ntohs (mm->header.size)); +} + + +void +test2 (void) +{ + struct GNUNET_MQ_Message *mqm; + struct MyMessage *mm; + int res; + char *s = "foo"; + + mqm = GNUNET_MQ_msg (mm, 42); + res = GNUNET_MQ_nest (mqm, s, strlen(s)); + GNUNET_assert (GNUNET_OK == res); + res = GNUNET_MQ_nest (mqm, s, strlen(s)); + GNUNET_assert (GNUNET_OK == res); + res = GNUNET_MQ_nest (mqm, NULL, 0); + GNUNET_assert (GNUNET_OK == res); + + GNUNET_assert (strlen (s) * 2 + sizeof (struct MyMessage) == ntohs (mm->header.size)); + + res = GNUNET_MQ_nest_mh (mqm, &mm->header); + GNUNET_assert (GNUNET_OK == res); + GNUNET_assert (2 * (strlen (s) * 2 + sizeof (struct MyMessage)) == ntohs (mm->header.size)); + + res = GNUNET_MQ_nest (mqm, (void *) 0xF00BA, 0xFFF0); + GNUNET_assert (GNUNET_OK != res); + + GNUNET_MQ_discard (mqm); +} + + +void +test3 (void) +{ + struct GNUNET_MQ_Message *mqm; + struct GNUNET_MessageHeader *mh; + + mqm = GNUNET_MQ_msg_header (42); + /* how could the above be checked? */ + + GNUNET_MQ_discard (mqm); + + mqm = GNUNET_MQ_msg_header_extra (mh, 20, 42); + GNUNET_assert (42 == ntohs (mh->type)); + GNUNET_assert (sizeof (struct GNUNET_MessageHeader) + 20 == ntohs (mh->size)); +} + + +int +main (int argc, char **argv) +{ + + GNUNET_log_setup ("test-mq", "INFO", NULL); + test1 (); + test2 (); + test3 (); + + return 0; +} + diff --git a/src/set/test_mq_client.c b/src/set/test_mq_client.c new file mode 100644 index 000000000..ca615d37e --- /dev/null +++ b/src/set/test_mq_client.c @@ -0,0 +1,181 @@ +/* + This file is part of GNUnet. + (C) 2012 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file set/test_mq.c + * @brief tests for mq with connection client + */ +/** + * @file util/test_server_with_client.c + * @brief tests for server.c and client.c, + * specifically disconnect_notify, + * client_get_address and receive_done (resume processing) + */ +#include "platform.h" +#include "gnunet_common.h" +#include "gnunet_scheduler_lib.h" +#include "gnunet_client_lib.h" +#include "gnunet_server_lib.h" +#include "gnunet_time_lib.h" +#include "mq.h" + +#define PORT 23336 + +#define MY_TYPE 128 + + +static struct GNUNET_SERVER_Handle *server; + +static struct GNUNET_CLIENT_Connection *client; + +static struct GNUNET_CONFIGURATION_Handle *cfg; + +static int ok; + +static int notify = GNUNET_NO; + +static int received = 0; + + +static void +recv_cb (void *cls, struct GNUNET_SERVER_Client *argclient, + const struct GNUNET_MessageHeader *message) +{ + received++; + + printf ("received\n"); + + + if ((received == 2) && (GNUNET_YES == notify)) + { + printf ("done\n"); + GNUNET_SERVER_receive_done (argclient, GNUNET_NO); + return; + } + + GNUNET_SERVER_receive_done (argclient, GNUNET_YES); +} + + +static void +clean_up (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + GNUNET_SERVER_destroy (server); + server = NULL; + GNUNET_CONFIGURATION_destroy (cfg); + cfg = NULL; +} + + +/** + * Functions with this signature are called whenever a client + * is disconnected on the network level. + * + * @param cls closure + * @param client identification of the client + */ +static void +notify_disconnect (void *cls, struct GNUNET_SERVER_Client *client) +{ + if (client == NULL) + return; + ok = 0; + GNUNET_SCHEDULER_add_now (&clean_up, NULL); +} + + +static struct GNUNET_SERVER_MessageHandler handlers[] = { + {&recv_cb, NULL, MY_TYPE, sizeof (struct GNUNET_MessageHeader)}, + {NULL, NULL, 0, 0} +}; + +void send_cb (void *cls) +{ + printf ("notify sent\n"); + notify = GNUNET_YES; +} + +void test_mq (struct GNUNET_CLIENT_Connection *client) +{ + struct GNUNET_MQ_MessageQueue *mq; + struct GNUNET_MQ_Message *mqm; + + /* FIXME: test handling responses */ + mq = GNUNET_MQ_queue_for_connection_client (client, NULL, NULL); + + mqm = GNUNET_MQ_msg_header (MY_TYPE); + GNUNET_MQ_send (mq, mqm); + + mqm = GNUNET_MQ_msg_header (MY_TYPE); + GNUNET_MQ_notify_sent (mqm, send_cb, NULL); + GNUNET_MQ_send (mq, mqm); + + /* FIXME: add a message that will be canceled */ +} + + +static void +task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct sockaddr_in sa; + struct sockaddr *sap[2]; + socklen_t slens[2]; + + sap[0] = (struct sockaddr *) &sa; + slens[0] = sizeof (sa); + sap[1] = NULL; + slens[1] = 0; + memset (&sa, 0, sizeof (sa)); +#if HAVE_SOCKADDR_IN_SIN_LEN + sa.sin_len = sizeof (sa); +#endif + sa.sin_family = AF_INET; + sa.sin_port = htons (PORT); + server = + GNUNET_SERVER_create (NULL, NULL, sap, slens, + GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_MILLISECONDS, 250), GNUNET_NO); + GNUNET_assert (server != NULL); + handlers[0].callback_cls = cls; + GNUNET_SERVER_add_handlers (server, handlers); + GNUNET_SERVER_disconnect_notify (server, ¬ify_disconnect, cls); + cfg = GNUNET_CONFIGURATION_create (); + GNUNET_CONFIGURATION_set_value_number (cfg, "test", "PORT", PORT); + GNUNET_CONFIGURATION_set_value_string (cfg, "test", "HOSTNAME", "localhost"); + GNUNET_CONFIGURATION_set_value_string (cfg, "resolver", "HOSTNAME", + "localhost"); + client = GNUNET_CLIENT_connect ("test", cfg); + GNUNET_assert (client != NULL); + + test_mq (client); +} + + +int +main (int argc, char *argv[]) +{ + GNUNET_log_setup ("test-mq-client", + "INFO", + NULL); + ok = 1; + GNUNET_SCHEDULER_run (&task, NULL); + return ok; +} + diff --git a/src/set/test_set.conf b/src/set/test_set.conf index e69de29bb..c1d5a0f93 100644 --- a/src/set/test_set.conf +++ b/src/set/test_set.conf @@ -0,0 +1,23 @@ +[set] +AUTOSTART = YES +PORT = 2106 +HOSTNAME = localhost +HOME = $SERVICEHOME +BINARY = gnunet-service-set +#PREFIX = gdbserver :12345 +#PREFIX = valgrind --leak-check=full +ACCEPT_FROM = 127.0.0.1; +ACCEPT_FROM6 = ::1; +UNIXPATH = /tmp/gnunet-service-set.sock +UNIX_MATCH_UID = YES +UNIX_MATCH_GID = YES +OPTIONS = -L INFO + + +[transport] +OPTIONS = -LERROR + + +[testbed] +OVERLAY_TOPOLOGY = CLIQUE + diff --git a/src/set/test_set_api.c b/src/set/test_set_api.c index 0753fd139..0ab02cad7 100644 --- a/src/set/test_set_api.c +++ b/src/set/test_set_api.c @@ -28,6 +28,99 @@ #include "gnunet_set_service.h" +static struct GNUNET_PeerIdentity local_id; +static struct GNUNET_HashCode app_id; +static struct GNUNET_SET_Handle *set1; +static struct GNUNET_SET_Handle *set2; +static struct GNUNET_SET_ListenHandle *listen_handle; +const static struct GNUNET_CONFIGURATION_Handle *config; + + +static void +result_cb_set1 (void *cls, struct GNUNET_SET_Element *element, + enum GNUNET_SET_Status status) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got result (set 1)\n"); +} + + +static void +result_cb_set2 (void *cls, struct GNUNET_SET_Element *element, + enum GNUNET_SET_Status status) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got result (set 2)\n"); +} + + +static void +listen_cb (void *cls, + const struct GNUNET_PeerIdentity *other_peer, + const struct GNUNET_MessageHeader *context_msg, + struct GNUNET_SET_Request *request) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n"); + GNUNET_SET_accept (request, set2, GNUNET_TIME_UNIT_FOREVER_REL, + GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL); +} + + +/** + * Start the set operation. + * + * @param cls closure, unused + */ +static void +start (void *cls) +{ + listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION, + &app_id, listen_cb, NULL); + GNUNET_SET_evaluate (set1, &local_id, &app_id, NULL, 42, + GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_SET_RESULT_ADDED, + result_cb_set1, NULL); +} + + +/** + * Initialize the second set, continue + * + * @param cls closure, unused + */ +static void +init_set2 (void *cls) +{ + struct GNUNET_SET_Element element; + + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initializing set 2\n"); + + element.data = "hello"; + element.size = strlen(element.data); + GNUNET_SET_add_element (set2, &element, NULL, NULL); + element.data = "quux"; + element.size = strlen(element.data); + GNUNET_SET_add_element (set2, &element, start, NULL); +} + + +/** + * Initialize the first set, continue. + */ +static void +init_set1 (void) +{ + struct GNUNET_SET_Element element; + + element.data = "hello"; + element.size = strlen(element.data); + GNUNET_SET_add_element (set1, &element, NULL, NULL); + element.data = "bar"; + element.size = strlen(element.data); + GNUNET_SET_add_element (set1, &element, init_set2, NULL); + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized set 1\n"); +} + + /** * Signature of the 'main' function for a (single-peer) testcase that * is run using 'GNUNET_TESTING_peer_run'. @@ -41,11 +134,15 @@ run (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg, struct GNUNET_TESTING_Peer *peer) { - struct GNUNET_SET_Handle *set1; - struct GNUNET_SET_Handle *set2; + static const char* app_str = "gnunet-set"; + + config = cfg; + GNUNET_CRYPTO_hash (app_str, strlen (app_str), &app_id); + GNUNET_CRYPTO_get_host_identity (cfg, &local_id); set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); + init_set1 (); } int -- cgit v1.2.3