summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2013-05-15 10:48:55 +0000
committerFlorian Dold <florian.dold@gmail.com>2013-05-15 10:48:55 +0000
commit6f54b50858457dfa2b5f0b519fbf230e1119c6b2 (patch)
tree5f84bfa599cb50522999cad892344e2fecbfa963 /src
parent6625c27a83831b61a80683f4385b6a90b9a45b31 (diff)
test cases for mq, set works
Diffstat (limited to 'src')
-rw-r--r--src/include/gnunet_protocols.h2
-rw-r--r--src/set/Makefile.am20
-rw-r--r--src/set/gnunet-service-set.c180
-rw-r--r--src/set/gnunet-service-set.h28
-rw-r--r--src/set/gnunet-service-set_union.c262
-rw-r--r--src/set/gnunet-set-bug.c2
-rw-r--r--src/set/gnunet-set.c115
-rw-r--r--src/set/ibf.c1
-rw-r--r--src/set/mq.c130
-rw-r--r--src/set/mq.h3
-rw-r--r--src/set/set.h28
-rw-r--r--src/set/set_api.c44
-rw-r--r--src/set/strata_estimator.c2
-rw-r--r--src/set/test_mq.c115
-rw-r--r--src/set/test_mq_client.c181
-rw-r--r--src/set/test_set.conf23
-rw-r--r--src/set/test_set_api.c101
17 files changed, 1093 insertions, 144 deletions
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 33e97b80c..a80d4afe7 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -1779,13 +1779,11 @@ extern "C"
*/
#define GNUNET_MESSAGE_TYPE_SET_ADD 573
-
/**
* Remove element from set
*/
#define GNUNET_MESSAGE_TYPE_SET_REMOVE 574
-
/**
* Listen for operation requests
*/
diff --git a/src/set/Makefile.am b/src/set/Makefile.am
index 2de531ce3..a609840b1 100644
--- a/src/set/Makefile.am
+++ b/src/set/Makefile.am
@@ -69,7 +69,7 @@ libgnunetset_la_LDFLAGS = \
$(GN_LIB_LDFLAGS)
check_PROGRAMS = \
- test_set_api
+ test_set_api test_mq test_mq_client
if ENABLE_TEST_RUN
TESTS = $(check_PROGRAMS)
@@ -84,6 +84,24 @@ test_set_api_LDADD = \
test_set_api_DEPENDENCIES = \
libgnunetset.la
+
+test_mq_SOURCES = \
+ test_mq.c \
+ mq.c
+test_mq_LDADD = \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/stream/libgnunetstream.la
+test_mq_CFLAGS = $(AM_CFLAGS)
+
+
+test_mq_client_SOURCES = \
+ test_mq_client.c \
+ mq.c
+test_mq_client_LDADD = \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/stream/libgnunetstream.la
+test_mq_client_CFLAGS = $(AM_CFLAGS)
+
EXTRA_DIST = \
test_set.conf
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c
index aea198a61..3ed896775 100644
--- a/src/set/gnunet-service-set.c
+++ b/src/set/gnunet-service-set.c
@@ -72,24 +72,11 @@ static struct Incoming *incoming_head;
static struct Incoming *incoming_tail;
/**
- * Counter for allocating unique request IDs for clients.
- * Used to identify incoming requests from remote peers.
+ * Counter for allocating unique IDs for clients,
+ * used to identify incoming operation requests from remote peers,
+ * that the client can choose to accept or refuse.
*/
-static uint32_t request_id = 1;
-
-
-/**
- * Disconnect a client and free all resources
- * that the client allocated (e.g. Sets or Listeners)
- *
- * @param client the client to disconnect
- */
-void
-_GSS_client_disconnect (struct GNUNET_SERVER_Client *client)
-{
- /* FIXME: clean up any data structures belonging to the client */
- GNUNET_SERVER_client_disconnect (client);
-}
+static uint32_t accept_id = 1;
/**
@@ -140,13 +127,84 @@ get_incoming (uint32_t id)
{
struct Incoming *incoming;
for (incoming = incoming_head; NULL != incoming; incoming = incoming->next)
- if (incoming->request_id == id)
+ if (incoming->accept_id == id)
return incoming;
return NULL;
}
/**
+ * Destroy a listener, free all resources associated with it.
+ *
+ * @param listener listener to destroy
+ */
+static void
+destroy_listener (struct Listener *listener)
+{
+ if (NULL != listener->client_mq)
+ {
+ GNUNET_MQ_destroy (listener->client_mq);
+ listener->client_mq = NULL;
+ }
+ if (NULL != listener->client)
+ {
+ GNUNET_SERVER_client_drop (listener->client);
+ listener->client = NULL;
+ }
+
+ GNUNET_CONTAINER_DLL_remove (listeners_head, listeners_tail, listener);
+ GNUNET_free (listener);
+}
+
+
+/**
+ * Destroy a set, and free all resources associated with it.
+ *
+ * @param set the set to destroy
+ */
+static void
+destroy_set (struct Set *set)
+{
+ switch (set->operation)
+ {
+ case GNUNET_SET_OPERATION_INTERSECTION:
+ GNUNET_assert (0);
+ break;
+ case GNUNET_SET_OPERATION_UNION:
+ _GSS_union_set_destroy (set);
+ break;
+ default:
+ GNUNET_assert (0);
+ break;
+ }
+ GNUNET_CONTAINER_DLL_remove (sets_head, sets_tail, set);
+ GNUNET_free (set);
+}
+
+
+/**
+ * Clean up after a client after it is
+ * disconnected (either by us or by itself)
+ *
+ * @param cls closure, unused
+ * @param client the client to clean up after
+ */
+void
+handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
+{
+ struct Set *set;
+ struct Listener *listener;
+
+ set = get_set (client);
+ if (NULL != set)
+ destroy_set (set);
+ listener = get_listener (client);
+ if (NULL != listener)
+ destroy_listener (listener);
+}
+
+
+/**
* Destroy an incoming request from a remote peer
*
* @param incoming remote request to destroy
@@ -186,16 +244,16 @@ handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh)
struct Listener *listener;
const struct GNUNET_MessageHeader *context_msg;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got operation request\n");
-
if (ntohs (mh->size) < sizeof *msg)
{
+ /* message is to small for its type */
GNUNET_break (0);
destroy_incoming (incoming);
return;
}
else if (ntohs (mh->size) == sizeof *msg)
{
+ /* there is no context message */
context_msg = NULL;
}
else
@@ -209,13 +267,17 @@ handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh)
return;
}
}
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received P2P operation request (op %u, app %s)\n",
+ ntohs (msg->operation), GNUNET_h2s (&msg->app_id));
+
/* find the appropriate listener */
for (listener = listeners_head;
listener != NULL;
listener = listener->next)
{
if ( (0 != GNUNET_CRYPTO_hash_cmp (&msg->app_id, &listener->app_id)) ||
- (htons (msg->operation) != listener->operation) )
+ (ntohs (msg->operation) != listener->operation) )
continue;
mqm = GNUNET_MQ_msg (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST);
if (GNUNET_OK != GNUNET_MQ_nest_mh (mqm, context_msg))
@@ -225,11 +287,15 @@ handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh)
GNUNET_break (0);
return;
}
- incoming->request_id = request_id++;
- cmsg->request_id = htonl (incoming->request_id);
+ incoming->accept_id = accept_id++;
+ cmsg->accept_id = htonl (incoming->accept_id);
GNUNET_MQ_send (listener->client_mq, mqm);
return;
}
+
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "set operation request from peer failed: "
+ "no set with matching application ID and operation type\n");
}
@@ -248,7 +314,8 @@ handle_client_create (void *cls,
struct SetCreateMessage *msg = (struct SetCreateMessage *) m;
struct Set *set;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "new set created\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client created new set (operation %u)\n",
+ ntohs (msg->operation));
if (NULL != get_set (client))
{
@@ -276,9 +343,9 @@ handle_client_create (void *cls,
}
set->client = client;
+ GNUNET_SERVER_client_keep (client);
set->client_mq = GNUNET_MQ_queue_for_server_client (client);
GNUNET_CONTAINER_DLL_insert (sets_head, sets_tail, set);
-
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -297,19 +364,22 @@ handle_client_listen (void *cls,
{
struct ListenMessage *msg = (struct ListenMessage *) m;
struct Listener *listener;
-
+
if (NULL != get_listener (client))
{
GNUNET_break (0);
GNUNET_SERVER_client_disconnect (client);
return;
}
-
listener = GNUNET_new (struct Listener);
+ listener->client = client;
+ GNUNET_SERVER_client_keep (client);
+ listener->client_mq = GNUNET_MQ_queue_for_server_client (client);
listener->app_id = msg->app_id;
- listener->operation = msg->operation;
+ listener->operation = ntohs (msg->operation);
GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener);
-
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "new listener created (op %u, app %s)\n",
+ listener->operation, GNUNET_h2s (&listener->app_id));
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -333,13 +403,13 @@ handle_client_remove (void *cls,
if (NULL == set)
{
GNUNET_break (0);
- _GSS_client_disconnect (client);
+ GNUNET_SERVER_client_disconnect (client);
return;
}
switch (set->operation)
{
case GNUNET_SET_OPERATION_UNION:
- _GSS_union_add ((struct ElementMessage *) m, set);
+ _GSS_union_remove ((struct ElementMessage *) m, set);
case GNUNET_SET_OPERATION_INTERSECTION:
/* FIXME: cfuchs */
break;
@@ -371,13 +441,13 @@ handle_client_add (void *cls,
if (NULL == set)
{
GNUNET_break (0);
- _GSS_client_disconnect (client);
+ GNUNET_SERVER_client_disconnect (client);
return;
}
switch (set->operation)
{
case GNUNET_SET_OPERATION_UNION:
- _GSS_union_remove ((struct ElementMessage *) m, set);
+ _GSS_union_add ((struct ElementMessage *) m, set);
case GNUNET_SET_OPERATION_INTERSECTION:
/* FIXME: cfuchs */
break;
@@ -408,7 +478,7 @@ handle_client_evaluate (void *cls,
if (NULL == set)
{
GNUNET_break (0);
- _GSS_client_disconnect (client);
+ GNUNET_SERVER_client_disconnect (client);
return;
}
@@ -481,22 +551,30 @@ handle_client_accept (void *cls,
struct Incoming *incoming;
struct AcceptMessage *msg = (struct AcceptMessage *) mh;
- set = get_set (client);
- if (NULL == set)
+ incoming = get_incoming (ntohl (msg->accept_id));
+
+ if (NULL == incoming)
{
GNUNET_break (0);
- _GSS_client_disconnect (client);
+ GNUNET_SERVER_client_disconnect (client);
return;
}
- incoming = get_incoming (ntohl (msg->request_id));
+ if (0 == ntohl (msg->request_id))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer request rejected by client\n");
+ destroy_incoming (incoming);
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+ return;
+ }
+
+ set = get_set (client);
- if ( (NULL == incoming) ||
- (incoming->operation != set->operation) )
+ if (NULL == set)
{
GNUNET_break (0);
- _GSS_client_disconnect (client);
+ GNUNET_SERVER_client_disconnect (client);
return;
}
@@ -513,8 +591,10 @@ handle_client_accept (void *cls,
GNUNET_assert (0);
break;
}
- /* FIXME: destroy incoming */
+ /* note: _GSS_*_accept has to make sure the socket and mq are set to NULL,
+ * otherwise they will be destroyed and disconnected */
+ destroy_incoming (incoming);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -574,6 +654,21 @@ shutdown_task (void *cls,
stream_listen_socket = NULL;
}
+ while (NULL != incoming_head)
+ {
+ destroy_incoming (incoming_head);
+ }
+
+ while (NULL != listeners_head)
+ {
+ destroy_listener (listeners_head);
+ }
+
+ while (NULL != sets_head)
+ {
+ destroy_set (sets_head);
+ }
+
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n");
}
@@ -604,6 +699,7 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
configuration = cfg;
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
+ GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL);
GNUNET_SERVER_add_handlers (server, server_handlers);
stream_listen_socket = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_SET,
&stream_listen_cb, NULL,
diff --git a/src/set/gnunet-service-set.h b/src/set/gnunet-service-set.h
index a5a53671c..cc28e9701 100644
--- a/src/set/gnunet-service-set.h
+++ b/src/set/gnunet-service-set.h
@@ -109,8 +109,8 @@ struct Listener
struct Listener *prev;
/**
- * Client that owns the set.
- * Only one client may own a set.
+ * Client that owns the listener.
+ * Only one client may own a listener.
*/
struct GNUNET_SERVER_Client *client;
@@ -188,9 +188,10 @@ struct Incoming
/**
* Unique request id for the request from
- * a remote peer.
+ * a remote peer, sent to the client with will
+ * accept or reject the request.
*/
- uint32_t request_id;
+ uint32_t accept_id;
};
@@ -201,16 +202,6 @@ extern const struct GNUNET_CONFIGURATION_Handle *configuration;
/**
- * Disconnect a client and free all resources
- * that the client allocated (e.g. Sets or Listeners)
- *
- * @param client the client to disconnect
- */
-void
-_GSS_client_disconnect (struct GNUNET_SERVER_Client *client);
-
-
-/**
* Create a new set supporting the union operation
*
* @return the newly created set
@@ -252,6 +243,15 @@ _GSS_union_remove (struct ElementMessage *m, struct Set *set);
/**
+ * Destroy a set that supports the union operation
+ *
+ * @param the set to destroy, must be of type GNUNET_SET_OPERATION_UNION
+ */
+void
+_GSS_union_set_destroy (struct Set *set);
+
+
+/**
* Accept an union operation request from a remote peer
*
* @param m the accept message from the client
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c
index efedbcef6..694fb6056 100644
--- a/src/set/gnunet-service-set_union.c
+++ b/src/set/gnunet-service-set_union.c
@@ -135,11 +135,6 @@ struct UnionEvaluateOperation
struct GNUNET_MQ_MessageQueue *mq;
/**
- * Type of this operation
- */
- enum GNUNET_SET_OperationType operation;
-
- /**
* Request ID to multiplex set operations to
* the client inhabiting the set.
*/
@@ -330,6 +325,45 @@ struct UnionState
};
+
+/**
+ * Iterator over hash map entries.
+ *
+ * @param cls closure
+ * @param key current key code
+ * @param value value in the hash map
+ * @return GNUNET_YES if we should continue to
+ * iterate,
+ * GNUNET_NO if not.
+ */
+static int
+destroy_elements_iterator (void *cls,
+ const struct GNUNET_HashCode * key,
+ void *value)
+{
+ struct ElementEntry *ee = value;
+
+ GNUNET_free (ee);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Destroy the elements belonging to a union set.
+ *
+ * @param us union state that contains the elements
+ */
+static void
+destroy_elements (struct UnionState *us)
+{
+ if (NULL == us->elements)
+ return;
+ GNUNET_CONTAINER_multihashmap_iterate (us->elements, destroy_elements_iterator, NULL);
+ GNUNET_CONTAINER_multihashmap_destroy (us->elements);
+ us->elements = NULL;
+}
+
+
/**
* Destroy a union operation, and free all resources
* associated with it.
@@ -339,6 +373,38 @@ struct UnionState
static void
destroy_union_operation (struct UnionEvaluateOperation *eo)
{
+ if (NULL != eo->mq)
+ {
+ GNUNET_MQ_destroy (eo->mq);
+ eo->mq = NULL;
+ }
+ if (NULL != eo->socket)
+ {
+ GNUNET_STREAM_close (eo->socket);
+ eo->socket = NULL;
+ }
+ if (NULL != eo->remote_ibf)
+ {
+ ibf_destroy (eo->remote_ibf);
+ eo->remote_ibf = NULL;
+ }
+ if (NULL != eo->local_ibf)
+ {
+ ibf_destroy (eo->local_ibf);
+ eo->local_ibf = NULL;
+ }
+ if (NULL != eo->se)
+ {
+ strata_estimator_destroy (eo->se);
+ eo->se = NULL;
+ }
+ if (NULL != eo->key_to_element)
+ {
+ GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element);
+ eo->key_to_element = NULL;
+ }
+
+
GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head,
eo->set->state.u->ops_tail,
eo);
@@ -361,7 +427,7 @@ fail_union_operation (struct UnionEvaluateOperation *eo)
mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
- msg->request_id = eo->request_id;
+ msg->request_id = htonl (eo->request_id);
GNUNET_MQ_send (eo->set->client_mq, mqm);
destroy_union_operation (eo);
}
@@ -405,12 +471,12 @@ send_operation_request (struct UnionEvaluateOperation *eo)
if (GNUNET_OK != GNUNET_MQ_nest (mqm, eo->context_msg, ntohs (eo->context_msg->size)))
{
/* the context message is too large */
- _GSS_client_disconnect (eo->set->client);
- GNUNET_MQ_discard (mqm);
GNUNET_break (0);
+ GNUNET_SERVER_client_disconnect (eo->set->client);
+ GNUNET_MQ_discard (mqm);
return;
}
- msg->operation = eo->operation;
+ msg->operation = htons (GNUNET_SET_OPERATION_UNION);
msg->app_id = eo->app_id;
GNUNET_MQ_send (eo->mq, mqm);
@@ -547,7 +613,7 @@ prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size)
{
unsigned int len;
len = GNUNET_CONTAINER_multihashmap_size (eo->set->state.u->elements);
- eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len);
+ eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
GNUNET_CONTAINER_multihashmap_iterate (eo->set->state.u->elements,
init_key_to_element_iterator, eo);
}
@@ -573,6 +639,8 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
prepare_ibf (eo, 1<<ibf_order);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending ibf of size %u\n", 1<<ibf_order);
+
ibf = eo->local_ibf;
while (buckets_sent < (1 << ibf_order))
@@ -588,7 +656,7 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
mqm = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
- msg->order = htons (ibf_order);
+ msg->order = ibf_order;
msg->offset = htons (buckets_sent);
ibf_write_slice (ibf, buckets_sent,
buckets_in_message, &msg[1]);
@@ -654,7 +722,6 @@ handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
struct StrataEstimator *remote_se;
int diff;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got se\n");
if (eo->phase != PHASE_EXPECT_SE)
{
@@ -667,6 +734,7 @@ handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
strata_estimator_read (&mh[1], remote_se);
GNUNET_assert (NULL != eo->se);
diff = strata_estimator_difference (remote_se, eo->se);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got se, diff=%d\n", diff);
strata_estimator_destroy (remote_se);
strata_estimator_destroy (eo->se);
eo->se = NULL;
@@ -708,6 +776,7 @@ send_element_iterator (void *cls,
continue;
}
GNUNET_MQ_send (eo->mq, mqm);
+ ke = ke->next_colliding;
}
return GNUNET_NO;
}
@@ -731,7 +800,6 @@ send_elements_for_key (struct UnionEvaluateOperation *eo, struct IBF_Key ibf_key
}
-
/**
* Decode which elements are missing on each side, and
* send the appropriate elemens and requests
@@ -758,11 +826,22 @@ decode_and_send (struct UnionEvaluateOperation *eo)
res = ibf_decode (diff_ibf, &side, &key);
if (GNUNET_SYSERR == res)
{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "decoding failed, sending larger ibf (size %u)\n",
- diff_ibf->size * 2);
- send_ibf (eo, diff_ibf->size * 2);
- ibf_destroy (diff_ibf);
- return;
+ int next_order;
+ next_order = 0;
+ while (1<<next_order < diff_ibf->size)
+ next_order++;
+ next_order++;
+ if (next_order <= MAX_IBF_ORDER)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "decoding failed, sending larger ibf (size %u)\n",
+ 1<<next_order);
+ send_ibf (eo, next_order);
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "set union failed: reached ibf limit\n");
+ }
+ break;
}
if (GNUNET_NO == res)
{
@@ -771,7 +850,7 @@ decode_and_send (struct UnionEvaluateOperation *eo)
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n");
mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
GNUNET_MQ_send (eo->mq, mqm);
- return;
+ break;
}
if (1 == side)
{
@@ -790,6 +869,7 @@ decode_and_send (struct UnionEvaluateOperation *eo)
GNUNET_MQ_send (eo->mq, mqm);
}
}
+ ibf_destroy (diff_ibf);
}
@@ -811,6 +891,7 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
{
eo->phase = PHASE_EXPECT_IBF_CONT;
GNUNET_assert (NULL == eo->remote_ibf);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "creating new ibf of order %u\n", 1<<msg->order);
eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
if (0 != ntohs (msg->offset))
{
@@ -825,6 +906,7 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
{
GNUNET_break (0);
fail_union_operation (eo);
+ return;
}
}
@@ -834,13 +916,16 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
{
GNUNET_break (0);
fail_union_operation (eo);
+ return;
}
-
+
ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf);
eo->ibf_buckets_received += buckets_in_message;
if (eo->ibf_buckets_received == eo->remote_ibf->size)
{
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full strata estimator\n");
eo->phase = PHASE_EXPECT_ELEMENTS;
decode_and_send (eo);
}
@@ -848,7 +933,8 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
/**
- * Send an element to the client of the operations's set.
+ * Send a result message to the client indicating
+ * that there is a new element.
*
* @param eo union operation
* @param element element to send
@@ -862,6 +948,8 @@ send_client_element (struct UnionEvaluateOperation *eo,
GNUNET_assert (0 != eo->request_id);
mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
+ rm->result_status = htons (GNUNET_SET_STATUS_OK);
+ rm->request_id = htonl (eo->request_id);
if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size))
{
GNUNET_MQ_discard (mqm);
@@ -869,7 +957,46 @@ send_client_element (struct UnionEvaluateOperation *eo,
return;
}
- GNUNET_MQ_send (eo->mq, mqm);
+ GNUNET_MQ_send (eo->set->client_mq, mqm);
+}
+
+
+/**
+ * Callback used for notifications
+ *
+ * @param cls closure
+ */
+static void
+client_done_sent_cb (void *cls)
+{
+ //struct UnionEvaluateOperation *eo = cls;
+ /* FIXME: destroy eo */
+}
+
+
+/**
+ * Send a result message to the client indicating
+ * that the operation is over.
+ * After the result done message has been sent to the client,
+ * destroy the evaluate operation.
+ *
+ * @param eo union operation
+ * @param element element to send
+ */
+static void
+send_client_done_and_destroy (struct UnionEvaluateOperation *eo)
+{
+ struct GNUNET_MQ_Message *mqm;
+ struct ResultMessage *rm;
+
+ GNUNET_assert (0 != eo->request_id);
+ mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
+ rm->request_id = htonl (eo->request_id);
+ rm->result_status = htons (GNUNET_SET_STATUS_DONE);
+ GNUNET_MQ_notify_sent (mqm, client_done_sent_cb, eo);
+ GNUNET_MQ_send (eo->set->client_mq, mqm);
+
+ /* FIXME: destroy the eo */
}
@@ -886,6 +1013,8 @@ handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
struct ElementEntry *ee;
uint16_t element_size;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got element from peer\n");
+
if ( (eo->phase != PHASE_EXPECT_ELEMENTS) &&
(eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
{
@@ -920,8 +1049,8 @@ handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
/* look up elements and send them */
if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
{
- fail_union_operation (eo);
GNUNET_break (0);
+ fail_union_operation (eo);
return;
}
@@ -929,8 +1058,8 @@ handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
{
- fail_union_operation (eo);
GNUNET_break (0);
+ fail_union_operation (eo);
return;
}
@@ -944,6 +1073,20 @@ handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
/**
+ * Callback used for notifications
+ *
+ * @param cls closure
+ */
+static void
+peer_done_sent_cb (void *cls)
+{
+ struct UnionEvaluateOperation *eo = cls;
+
+ send_client_done_and_destroy (eo);
+}
+
+
+/**
* Handle a done message from a remote peer
*
* @param cls the union operation
@@ -959,15 +1102,18 @@ handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
/* we got all requests, but still have to send our elements as response */
struct GNUNET_MQ_Message *mqm;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got DONE, sending final DONE after elements\n");
eo->phase = PHASE_FINISHED;
mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
+ GNUNET_MQ_notify_sent (mqm, peer_done_sent_cb, eo);
GNUNET_MQ_send (eo->mq, mqm);
return;
}
if (eo->phase == PHASE_EXPECT_ELEMENTS)
{
- /* it's all over! */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got final DONE\n");
eo->phase = PHASE_FINISHED;
+ send_client_done_and_destroy (eo);
return;
}
GNUNET_break (0);
@@ -1026,19 +1172,27 @@ _GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set)
{
struct UnionEvaluateOperation *eo;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "evaluating union operation\n");
-
eo = GNUNET_new (struct UnionEvaluateOperation);
eo->peer = m->peer;
eo->set = set;
- eo->request_id = htons(m->request_id);
+ eo->request_id = htonl (m->request_id);
+ GNUNET_assert (0 != eo->request_id);
eo->se = strata_estimator_dup (set->state.u->se);
eo->salt = ntohs (m->salt);
eo->app_id = m->app_id;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "evaluating union operation, (app %s)\n",
+ GNUNET_h2s (&eo->app_id));
+
eo->socket =
GNUNET_STREAM_open (configuration, &eo->peer, GNUNET_APPLICATION_TYPE_SET,
stream_open_cb, eo,
GNUNET_STREAM_OPTION_END);
+
+
+ GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head,
+ eo->set->state.u->ops_tail,
+ eo);
/* the stream open callback will kick off the operation */
}
@@ -1056,18 +1210,30 @@ _GSS_union_accept (struct AcceptMessage *m, struct Set *set,
{
struct UnionEvaluateOperation *eo;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "accepting set union operation\n");
+
eo = GNUNET_new (struct UnionEvaluateOperation);
eo->generation_created = set->state.u->current_generation++;
eo->set = set;
eo->peer = incoming->peer;
eo->salt = ntohs (incoming->salt);
- eo->request_id = m->request_id;
+ GNUNET_assert (0 != ntohl (m->request_id));
+ eo->request_id = ntohl (m->request_id);
eo->se = strata_estimator_dup (set->state.u->se);
eo->set = set;
eo->mq = incoming->mq;
+ /* transfer ownership of mq and socket from incoming to eo */
+ incoming->mq = NULL;
+ eo->socket = incoming->socket;
+ incoming->socket = NULL;
/* the peer's socket is now ours, we'll receive all messages */
GNUNET_MQ_replace_handlers (eo->mq, union_handlers, eo);
- /* kick of the operation */
+
+ GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head,
+ eo->set->state.u->ops_tail,
+ eo);
+
+ /* kick off the operation */
send_strata_estimator (eo);
}
@@ -1082,7 +1248,7 @@ _GSS_union_set_create (void)
{
struct Set *set;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set created\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "union set created\n");
set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct UnionState));
set->state.u = (struct UnionState *) &set[1];
@@ -1109,6 +1275,8 @@ _GSS_union_add (struct ElementMessage *m, struct Set *set)
struct ElementEntry *ee_dup;
uint16_t element_size;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "adding element\n");
+
GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
element_size = ntohs (m->header.size) - sizeof *m;
ee = GNUNET_malloc (element_size + sizeof *ee);
@@ -1131,6 +1299,38 @@ _GSS_union_add (struct ElementMessage *m, struct Set *set)
/**
+ * Destroy a set that supports the union operation
+ *
+ * @param the set to destroy, must be of type GNUNET_SET_OPERATION_UNION
+ */
+void
+_GSS_union_set_destroy (struct Set *set)
+{
+ GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
+ if (NULL != set->client)
+ {
+ GNUNET_SERVER_client_drop (set->client);
+ set->client = NULL;
+ }
+ if (NULL != set->client_mq)
+ {
+ GNUNET_MQ_destroy (set->client_mq);
+ set->client_mq = NULL;
+ }
+
+ if (NULL != set->state.u->se)
+ {
+ strata_estimator_destroy (set->state.u->se);
+ set->state.u->se = NULL;
+ }
+
+ destroy_elements (set->state.u);
+
+ while (NULL != set->state.u->ops_head)
+ destroy_union_operation (set->state.u->ops_head);
+}
+
+/**
* Remove the element given in the element message from the set.
* Only marks the element as removed, so that older set operations can still exchange it.
*
diff --git a/src/set/gnunet-set-bug.c b/src/set/gnunet-set-bug.c
index edcd8b561..112def7d7 100644
--- a/src/set/gnunet-set-bug.c
+++ b/src/set/gnunet-set-bug.c
@@ -113,6 +113,8 @@ run (void *cls, char *const *args,
cfg = GNUNET_CONFIGURATION_dup (cfg2);
GNUNET_CRYPTO_get_host_identity (cfg, &local_id);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "I am Peer %s\n", GNUNET_h2s (&local_id.hashPubKey));
+
listen_socket = GNUNET_STREAM_listen (cfg,
GNUNET_APPLICATION_TYPE_SET,
&listen_cb,
diff --git a/src/set/gnunet-set.c b/src/set/gnunet-set.c
index c49b60dfd..d665fce11 100644
--- a/src/set/gnunet-set.c
+++ b/src/set/gnunet-set.c
@@ -36,6 +36,53 @@ static struct GNUNET_HashCode app_id;
static struct GNUNET_SET_Handle *set1;
static struct GNUNET_SET_Handle *set2;
static struct GNUNET_SET_ListenHandle *listen_handle;
+const static struct GNUNET_CONFIGURATION_Handle *config;
+
+int num_done;
+
+
+static void
+result_cb_set1 (void *cls, struct GNUNET_SET_Element *element,
+ enum GNUNET_SET_Status status)
+{
+ switch (status)
+ {
+ case GNUNET_SET_STATUS_OK:
+ printf ("set 1: got element\n");
+ break;
+ case GNUNET_SET_STATUS_FAILURE:
+ printf ("set 1: failure\n");
+ break;
+ case GNUNET_SET_STATUS_DONE:
+ printf ("set 1: done\n");
+ GNUNET_SET_destroy (set1);
+ break;
+ default:
+ GNUNET_assert (0);
+ }
+}
+
+
+static void
+result_cb_set2 (void *cls, struct GNUNET_SET_Element *element,
+ enum GNUNET_SET_Status status)
+{
+ switch (status)
+ {
+ case GNUNET_SET_STATUS_OK:
+ printf ("set 2: got element\n");
+ break;
+ case GNUNET_SET_STATUS_FAILURE:
+ printf ("set 2: failure\n");
+ break;
+ case GNUNET_SET_STATUS_DONE:
+ printf ("set 2: done\n");
+ GNUNET_SET_destroy (set2);
+ break;
+ default:
+ GNUNET_assert (0);
+ }
+}
static void
@@ -45,13 +92,67 @@ listen_cb (void *cls,
struct GNUNET_SET_Request *request)
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n");
+ GNUNET_SET_listen_cancel (listen_handle);
+
+ GNUNET_SET_accept (request, set2, GNUNET_TIME_UNIT_FOREVER_REL,
+ GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL);
}
+
+/**
+ * Start the set operation.
+ *
+ * @param cls closure, unused
+ */
static void
-result_cb (void *cls, struct GNUNET_SET_Element *element,
- enum GNUNET_SET_Status status)
+start (void *cls)
+{
+ listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION,
+ &app_id, listen_cb, NULL);
+ GNUNET_SET_evaluate (set1, &local_id, &app_id, NULL, 42,
+ GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_SET_RESULT_ADDED,
+ result_cb_set1, NULL);
+}
+
+
+/**
+ * Initialize the second set, continue
+ *
+ * @param cls closure, unused
+ */
+static void
+init_set2 (void *cls)
{
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got result\n");
+ struct GNUNET_SET_Element element;
+
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initializing set 2\n");
+
+ element.data = "hello";
+ element.size = strlen(element.data);
+ GNUNET_SET_add_element (set2, &element, NULL, NULL);
+ element.data = "quux";
+ element.size = strlen(element.data);
+ GNUNET_SET_add_element (set2, &element, start, NULL);
+}
+
+
+/**
+ * Initialize the first set, continue.
+ */
+static void
+init_set1 (void)
+{
+ struct GNUNET_SET_Element element;
+
+ element.data = "hello";
+ element.size = strlen(element.data);
+ GNUNET_SET_add_element (set1, &element, NULL, NULL);
+ element.data = "bar";
+ element.size = strlen(element.data);
+ GNUNET_SET_add_element (set1, &element, init_set2, NULL);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized set 1\n");
}
@@ -69,6 +170,8 @@ run (void *cls, char *const *args,
const struct GNUNET_CONFIGURATION_Handle *cfg)
{
static const char* app_str = "gnunet-set";
+
+ config = cfg;
GNUNET_CRYPTO_hash (app_str, strlen (app_str), &app_id);
@@ -76,12 +179,8 @@ run (void *cls, char *const *args,
set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
- listen_handle = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION,
- &app_id, listen_cb, NULL);
- GNUNET_SET_evaluate (set1, &local_id, &app_id, NULL, 42,
- GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_SET_RESULT_ADDED,
- result_cb, NULL);
+ init_set1 ();
}
diff --git a/src/set/ibf.c b/src/set/ibf.c
index 739b97339..383ce3daf 100644
--- a/src/set/ibf.c
+++ b/src/set/ibf.c
@@ -280,6 +280,7 @@ ibf_read_slice (const void *buf, uint32_t start, uint32_t count, struct Invertib
struct IBF_KeyHash *key_hash_src;
struct IBF_Count *count_src;
+ GNUNET_assert (count > 0);
GNUNET_assert (start + count <= ibf->size);
/* copy keys */
diff --git a/src/set/mq.c b/src/set/mq.c
index 3a9e614e9..0ced014dd 100644
--- a/src/set/mq.c
+++ b/src/set/mq.c
@@ -192,13 +192,22 @@ static void
dispatch_message (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_MessageHeader *mh)
{
const struct GNUNET_MQ_Handler *handler;
+ int handled = GNUNET_NO;
handler = mq->handlers;
if (NULL == handler)
return;
for (; NULL != handler->cb; handler++)
+ {
if (handler->type == ntohs (mh->type))
+ {
handler->cb (mq->handlers_cls, mh);
+ handled = GNUNET_YES;
+ }
+ }
+
+ if (GNUNET_NO == handled)
+ LOG (GNUNET_ERROR_TYPE_WARNING, "no handler for message of type %d\n", ntohs (mh->type));
}
@@ -220,6 +229,7 @@ GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm)
void
GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
{
+ GNUNET_assert (NULL != mq);
mq->send_impl (mq, mqm);
}
@@ -228,6 +238,7 @@ struct GNUNET_MQ_Message *
GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
{
struct GNUNET_MQ_Message *mqm;
+
mqm = GNUNET_malloc (sizeof *mqm + size);
mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
mqm->mh->size = htons (size);
@@ -245,16 +256,18 @@ GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp,
size_t new_size;
size_t old_size;
+ GNUNET_assert (NULL != mqmp);
+ /* there's no data to append => do nothing */
if (NULL == data)
return GNUNET_OK;
- GNUNET_assert (NULL != mqmp);
old_size = ntohs ((*mqmp)->mh->size);
/* message too large to concatenate? */
- if (ntohs ((*mqmp)->mh->size) + len < len)
+ if (((uint16_t) (old_size + len)) < len)
return GNUNET_SYSERR;
new_size = old_size + len;
- *mqmp = GNUNET_realloc (mqmp, sizeof (struct GNUNET_MQ_Message) + new_size);
- memcpy ((*mqmp)->mh + old_size, data, new_size - old_size);
+ *mqmp = GNUNET_realloc (*mqmp, sizeof (struct GNUNET_MQ_Message) + new_size);
+ (*mqmp)->mh = (struct GNUNET_MessageHeader *) &(*mqmp)[1];
+ memcpy (((void *) (*mqmp)->mh) + old_size, data, new_size - old_size);
(*mqmp)->mh->size = htons (new_size);
return GNUNET_OK;
}
@@ -286,12 +299,10 @@ stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size)
/* call cb for message we finished sending */
mqm = mq->current_msg;
- if (NULL != mqm)
- {
- if (NULL != mqm->sent_cb)
- mqm->sent_cb (mqm->sent_cls);
- GNUNET_free (mqm);
- }
+ GNUNET_assert (NULL != mq->current_msg);
+ if (NULL != mqm->sent_cb)
+ mqm->sent_cb (mqm->sent_cls);
+ GNUNET_free (mqm);
mss->wh = NULL;
@@ -384,6 +395,35 @@ stream_data_processor (void *cls,
}
+static void
+stream_socket_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
+{
+ struct MessageStreamState *mss = (struct MessageStreamState *) mq->impl_state;
+
+ if (NULL != mss->rh)
+ {
+ GNUNET_STREAM_read_cancel (mss->rh);
+ mss->rh = NULL;
+ }
+
+ if (NULL != mss->wh)
+ {
+ GNUNET_STREAM_write_cancel (mss->wh);
+ mss->wh = NULL;
+ }
+
+ if (NULL != mss->mst)
+ {
+ GNUNET_SERVER_mst_destroy (mss->mst);
+ mss->mst = NULL;
+ }
+
+ GNUNET_free (mss);
+}
+
+
+
+
struct GNUNET_MQ_MessageQueue *
GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket,
const struct GNUNET_MQ_Handler *handlers,
@@ -397,6 +437,7 @@ GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket,
mss->socket = socket;
mq->impl_state = mss;
mq->send_impl = stream_socket_send_impl;
+ mq->destroy_impl = &stream_socket_destroy_impl;
mq->handlers = handlers;
mq->handlers_cls = cls;
if (NULL != handlers)
@@ -425,14 +466,21 @@ transmit_queued (void *cls, size_t size,
struct ServerClientSocketState *state = mq->impl_state;
size_t msg_size;
+ GNUNET_assert (NULL != buf);
+
+ if (NULL != mqm->sent_cb)
+ {
+ mqm->sent_cb (mqm->sent_cls);
+ }
+
mq->current_msg = NULL;
GNUNET_assert (NULL != mqm);
- GNUNET_assert (NULL != buf);
msg_size = ntohs (mqm->mh->size);
GNUNET_assert (size >= msg_size);
memcpy (buf, mqm->mh, msg_size);
GNUNET_free (mqm);
state->th = NULL;
+
if (NULL != mq->msg_head)
{
mq->current_msg = mq->msg_head;
@@ -448,12 +496,27 @@ transmit_queued (void *cls, size_t size,
}
+
+static void
+server_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
+{
+ struct ServerClientSocketState *state;
+
+ GNUNET_assert (NULL != mq);
+ state = mq->impl_state;
+ GNUNET_assert (NULL != state);
+ GNUNET_SERVER_client_drop (state->client);
+ GNUNET_free (state);
+}
+
static void
server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
{
- struct ServerClientSocketState *state = mq->impl_state;
+ struct ServerClientSocketState *state;
int msize;
+ GNUNET_assert (NULL != mq);
+ state = mq->impl_state;
GNUNET_assert (NULL != state);
if (NULL != state->th)
@@ -461,8 +524,9 @@ server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Mes
GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
return;
}
+ GNUNET_assert (NULL == mq->msg_head);
GNUNET_assert (NULL == mq->current_msg);
- msize = ntohs (mq->msg_head->mh->size);
+ msize = ntohs (mqm->mh->size);
mq->current_msg = mqm;
state->th =
GNUNET_SERVER_notify_transmit_ready (state->client, msize,
@@ -480,7 +544,10 @@ GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
scss = GNUNET_new (struct ServerClientSocketState);
mq->impl_state = scss;
+ scss->client = client;
+ GNUNET_SERVER_client_keep (client);
mq->send_impl = server_client_send_impl;
+ mq->destroy_impl = server_client_destroy_impl;
return mq;
}
@@ -502,8 +569,15 @@ connection_client_transmit_queued (void *cls, size_t size,
struct ClientConnectionState *state = mq->impl_state;
size_t msg_size;
- mq->current_msg = NULL;
+
GNUNET_assert (NULL != mqm);
+
+ if (NULL != mqm->sent_cb)
+ {
+ mqm->sent_cb (mqm->sent_cls);
+ }
+
+ mq->current_msg = NULL;
GNUNET_assert (NULL != buf);
msg_size = ntohs (mqm->mh->size);
GNUNET_assert (size >= msg_size);
@@ -515,7 +589,7 @@ connection_client_transmit_queued (void *cls, size_t size,
mq->current_msg = mq->msg_head;
GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
state->th =
- GNUNET_CLIENT_notify_transmit_ready (state->connection, htons (mq->current_msg->mh->size),
+ GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (mq->current_msg->mh->size),
GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
&connection_client_transmit_queued, mq);
}
@@ -525,6 +599,13 @@ connection_client_transmit_queued (void *cls, size_t size,
}
+
+static void
+connection_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
+{
+ GNUNET_free (mq->impl_state);
+}
+
static void
connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq,
struct GNUNET_MQ_Message *mqm)
@@ -549,6 +630,7 @@ connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq,
}
+
/**
* Type of a function to call when we receive a message
* from the service.
@@ -561,6 +643,9 @@ handle_client_message (void *cls,
const struct GNUNET_MessageHeader *msg)
{
struct GNUNET_MQ_MessageQueue *mq = cls;
+ struct ClientConnectionState *state;
+
+ state = mq->impl_state;
if (NULL == msg)
{
@@ -569,6 +654,10 @@ handle_client_message (void *cls,
mq->read_error_cb (mq->read_error_cls);
return;
}
+
+ GNUNET_CLIENT_receive (state->connection, handle_client_message, mq,
+ GNUNET_TIME_UNIT_FOREVER_REL);
+
dispatch_message (mq, msg);
}
@@ -590,6 +679,7 @@ GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connecti
state->connection = connection;
mq->impl_state = state;
mq->send_impl = connection_client_send_impl;
+ mq->destroy_impl = connection_client_destroy_impl;
if (NULL != handlers)
{
@@ -626,7 +716,10 @@ GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq,
uint32_t id;
if (NULL == mq->assoc_map)
+ {
mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
+ mq->assoc_id = 1;
+ }
id = mq->assoc_id++;
GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
@@ -652,6 +745,7 @@ GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
if (NULL == mq->assoc_map)
return NULL;
val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
+ GNUNET_assert (NULL != val);
GNUNET_CONTAINER_multihashmap32_remove (mq->assoc_map, request_id, val);
return val;
}
@@ -671,6 +765,12 @@ void
GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq)
{
/* FIXME: destroy all pending messages in the queue */
+
+ if (NULL != mq->destroy_impl)
+ {
+ mq->destroy_impl (mq);
+ }
+
GNUNET_free (mq);
}
diff --git a/src/set/mq.h b/src/set/mq.h
index e43ce04b4..42b755163 100644
--- a/src/set/mq.h
+++ b/src/set/mq.h
@@ -106,7 +106,7 @@
* @param esize extra space to allocate after the message header
* @param type type of the message
*/
-#define GNUNET_MQ_msg_header_extra(mh, esize, type) GNUNET_MQ_msg_ (&mh, sizeof (struct GNUNET_MessageHeader), type)
+#define GNUNET_MQ_msg_header_extra(mh, esize, type) GNUNET_MQ_msg_ (&mh, (esize) + sizeof (struct GNUNET_MessageHeader), type)
/**
@@ -166,6 +166,7 @@ struct GNUNET_MQ_Handler
*/
typedef void (*GNUNET_MQ_NotifyCallback) (void *cls);
+
/**
* Create a new message for MQ.
*
diff --git a/src/set/set.h b/src/set/set.h
index 87fd6efbf..33e0aafdd 100644
--- a/src/set/set.h
+++ b/src/set/set.h
@@ -80,36 +80,42 @@ struct AcceptMessage
struct GNUNET_MessageHeader header;
/**
- * request id of the request we want to accept
+ * Request id that will be sent along with
+ * results for the accepted operation.
+ * Chosen by the client.
+ * Must be 0 if the request has been rejected.
*/
uint32_t request_id GNUNET_PACKED;
/**
- * Zero if the client has rejected the request,
- * non-zero if it has accepted it
+ * ID of the incoming request we want to accept / reject.
*/
- uint32_t accepted GNUNET_PACKED;
+ uint32_t accept_id GNUNET_PACKED;
};
+/**
+ * A request for an operation with another client.
+ */
struct RequestMessage
{
/**
- * Type: GNUNET_MESSAGE_TYPE_SET_Request
+ * Type: GNUNET_MESSAGE_TYPE_SET_Request.
*/
struct GNUNET_MessageHeader header;
/**
- * requesting peer
+ * Identity of the requesting peer.
*/
struct GNUNET_PeerIdentity peer_id;
/**
- * request id of the request we want to accept
+ * ID of the request we want to accept,
+ * chosen by the service.
*/
- uint32_t request_id GNUNET_PACKED;
+ uint32_t accept_id GNUNET_PACKED;
- /* rest: inner message */
+ /* rest: nested context message */
};
@@ -131,7 +137,7 @@ struct EvaluateMessage
struct GNUNET_HashCode app_id;
/**
- * id of our evaluate
+ * id of our evaluate, chosen by the client
*/
uint32_t request_id GNUNET_PACKED;
@@ -186,6 +192,8 @@ struct ElementMessage
uint16_t element_type GNUNET_PACKED;
+ uint16_t reserved GNUNET_PACKED;
+
/* rest: the actual element */
};
diff --git a/src/set/set_api.c b/src/set/set_api.c
index 2ea002231..775e390de 100644
--- a/src/set/set_api.c
+++ b/src/set/set_api.c
@@ -30,6 +30,7 @@
#include "gnunet_set_service.h"
#include "set.h"
#include "mq.h"
+#include <inttypes.h>
#define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__)
@@ -49,7 +50,7 @@ struct GNUNET_SET_Handle
*/
struct GNUNET_SET_Request
{
- uint32_t request_id;
+ uint32_t accept_id;
int accepted;
};
@@ -98,20 +99,23 @@ handle_result (void *cls, const struct GNUNET_MessageHeader *mh)
}
oh = GNUNET_MQ_assoc_get (set->mq, ntohl (msg->request_id));
- GNUNET_break (NULL != oh);
- if (GNUNET_SCHEDULER_NO_TASK != oh->timeout_task)
- {
- GNUNET_SCHEDULER_cancel (oh->timeout_task);
- oh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
- }
+ GNUNET_assert (NULL != oh);
+ /* status is not STATUS_OK => there's no attached element,
+ * and this is the last result message we get */
if (htons (msg->result_status) != GNUNET_SET_STATUS_OK)
{
+ if (GNUNET_SCHEDULER_NO_TASK != oh->timeout_task)
+ {
+ GNUNET_SCHEDULER_cancel (oh->timeout_task);
+ oh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ GNUNET_MQ_assoc_remove (set->mq, ntohl (msg->request_id));
if (NULL != oh->result_cb)
oh->result_cb (oh->result_cls, NULL, htons (msg->result_status));
- GNUNET_MQ_assoc_remove (set->mq, ntohl (msg->request_id));
GNUNET_free (oh);
return;
}
+
e.data = &msg[1];
e.size = ntohs (mh->size) - sizeof (struct ResultMessage);
e.type = msg->element_type;
@@ -133,18 +137,25 @@ handle_request (void *cls, const struct GNUNET_MessageHeader *mh)
struct GNUNET_SET_Request *req;
req = GNUNET_new (struct GNUNET_SET_Request);
- req->request_id = ntohl (msg->request_id);
+ req->accept_id = ntohl (msg->accept_id);
+ /* calling GNUNET_SET_accept in the listen cb will set req->accepted */
lh->listen_cb (lh->listen_cls, &msg->peer_id, &mh[1], req);
+
if (GNUNET_NO == req->accepted)
{
struct GNUNET_MQ_Message *mqm;
struct AcceptMessage *amsg;
mqm = GNUNET_MQ_msg (amsg, GNUNET_MESSAGE_TYPE_SET_ACCEPT);
- amsg->request_id = msg->request_id;
+ /* no request id, as we refused */
+ amsg->request_id = htonl (0);
+ amsg->accept_id = msg->accept_id;
GNUNET_MQ_send (lh->mq, mqm);
GNUNET_free (req);
}
+
+ /* the accept-case is handled in GNUNET_SET_accept,
+ * as we have the accept message available there */
}
@@ -173,7 +184,7 @@ GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
set = GNUNET_new (struct GNUNET_SET_Handle);
set->client = GNUNET_CLIENT_connect ("set", cfg);
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set client created\n");
+ LOG (GNUNET_ERROR_TYPE_INFO, "set client created\n");
GNUNET_assert (NULL != set->client);
set->mq = GNUNET_MQ_queue_for_connection_client (set->client, mq_handlers, set);
mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_CREATE);
@@ -377,12 +388,9 @@ GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
void
GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh)
{
- GNUNET_MQ_destroy (lh->mq);
- lh->mq = NULL;
GNUNET_CLIENT_disconnect (lh->client);
- lh->client = NULL;
- lh->listen_cb = NULL;
- lh->listen_cls = NULL;
+ GNUNET_MQ_destroy (lh->mq);
+ GNUNET_free (lh);
}
@@ -420,8 +428,8 @@ GNUNET_SET_accept (struct GNUNET_SET_Request *request,
oh->set = set;
mqm = GNUNET_MQ_msg (msg , GNUNET_MESSAGE_TYPE_SET_ACCEPT);
- msg->request_id = htonl (request->request_id);
- msg->accepted = 1;
+ msg->request_id = htonl (GNUNET_MQ_assoc_add (set->mq, NULL, oh));
+ msg->accept_id = htonl (request->accept_id);
GNUNET_MQ_send (set->mq, mqm);
oh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, operation_timeout_task, oh);
diff --git a/src/set/strata_estimator.c b/src/set/strata_estimator.c
index 60f75f1bc..024bb99c6 100644
--- a/src/set/strata_estimator.c
+++ b/src/set/strata_estimator.c
@@ -33,6 +33,8 @@ void
strata_estimator_write (const struct StrataEstimator *se, void *buf)
{
int i;
+
+ GNUNET_assert (NULL != se);
for (i = 0; i < se->strata_count; i++)
{
ibf_write_slice (se->strata[i], 0, se->ibf_size, buf);
diff --git a/src/set/test_mq.c b/src/set/test_mq.c
new file mode 100644
index 000000000..d13c63440
--- /dev/null
+++ b/src/set/test_mq.c
@@ -0,0 +1,115 @@
+/*
+ This file is part of GNUnet.
+ (C) 2012 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file set/test_mq.c
+ * @brief simple tests for mq
+ */
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_testing_lib.h"
+#include "mq.h"
+
+
+GNUNET_NETWORK_STRUCT_BEGIN
+
+struct MyMessage
+{
+ struct GNUNET_MessageHeader header;
+ uint32_t x GNUNET_PACKED;
+};
+
+GNUNET_NETWORK_STRUCT_END
+
+void
+test1 (void)
+{
+ struct GNUNET_MQ_Message *mqm;
+ struct MyMessage *mm;
+
+ mm = NULL;
+ mqm = NULL;
+
+ mqm = GNUNET_MQ_msg (mm, 42);
+ GNUNET_assert (NULL != mqm);
+ GNUNET_assert (NULL != mm);
+ GNUNET_assert (42 == ntohs (mm->header.type));
+ GNUNET_assert (sizeof (struct MyMessage) == ntohs (mm->header.size));
+}
+
+
+void
+test2 (void)
+{
+ struct GNUNET_MQ_Message *mqm;
+ struct MyMessage *mm;
+ int res;
+ char *s = "foo";
+
+ mqm = GNUNET_MQ_msg (mm, 42);
+ res = GNUNET_MQ_nest (mqm, s, strlen(s));
+ GNUNET_assert (GNUNET_OK == res);
+ res = GNUNET_MQ_nest (mqm, s, strlen(s));
+ GNUNET_assert (GNUNET_OK == res);
+ res = GNUNET_MQ_nest (mqm, NULL, 0);
+ GNUNET_assert (GNUNET_OK == res);
+
+ GNUNET_assert (strlen (s) * 2 + sizeof (struct MyMessage) == ntohs (mm->header.size));
+
+ res = GNUNET_MQ_nest_mh (mqm, &mm->header);
+ GNUNET_assert (GNUNET_OK == res);
+ GNUNET_assert (2 * (strlen (s) * 2 + sizeof (struct MyMessage)) == ntohs (mm->header.size));
+
+ res = GNUNET_MQ_nest (mqm, (void *) 0xF00BA, 0xFFF0);
+ GNUNET_assert (GNUNET_OK != res);
+
+ GNUNET_MQ_discard (mqm);
+}
+
+
+void
+test3 (void)
+{
+ struct GNUNET_MQ_Message *mqm;
+ struct GNUNET_MessageHeader *mh;
+
+ mqm = GNUNET_MQ_msg_header (42);
+ /* how could the above be checked? */
+
+ GNUNET_MQ_discard (mqm);
+
+ mqm = GNUNET_MQ_msg_header_extra (mh, 20, 42);
+ GNUNET_assert (42 == ntohs (mh->type));
+ GNUNET_assert (sizeof (struct GNUNET_MessageHeader) + 20 == ntohs (mh->size));
+}
+
+
+int
+main (int argc, char **argv)
+{
+
+ GNUNET_log_setup ("test-mq", "INFO", NULL);
+ test1 ();
+ test2 ();
+ test3 ();
+
+ return 0;
+}
+
diff --git a/src/set/test_mq_client.c b/src/set/test_mq_client.c
new file mode 100644
index 000000000..ca615d37e
--- /dev/null
+++ b/src/set/test_mq_client.c
@@ -0,0 +1,181 @@
+/*
+ This file is part of GNUnet.
+ (C) 2012 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file set/test_mq.c
+ * @brief tests for mq with connection client
+ */
+/**
+ * @file util/test_server_with_client.c
+ * @brief tests for server.c and client.c,
+ * specifically disconnect_notify,
+ * client_get_address and receive_done (resume processing)
+ */
+#include "platform.h"
+#include "gnunet_common.h"
+#include "gnunet_scheduler_lib.h"
+#include "gnunet_client_lib.h"
+#include "gnunet_server_lib.h"
+#include "gnunet_time_lib.h"
+#include "mq.h"
+
+#define PORT 23336
+
+#define MY_TYPE 128
+
+
+static struct GNUNET_SERVER_Handle *server;
+
+static struct GNUNET_CLIENT_Connection *client;
+
+static struct GNUNET_CONFIGURATION_Handle *cfg;
+
+static int ok;
+
+static int notify = GNUNET_NO;
+
+static int received = 0;
+
+
+static void
+recv_cb (void *cls, struct GNUNET_SERVER_Client *argclient,
+ const struct GNUNET_MessageHeader *message)
+{
+ received++;
+
+ printf ("received\n");
+
+
+ if ((received == 2) && (GNUNET_YES == notify))
+ {
+ printf ("done\n");
+ GNUNET_SERVER_receive_done (argclient, GNUNET_NO);
+ return;
+ }
+
+ GNUNET_SERVER_receive_done (argclient, GNUNET_YES);
+}
+
+
+static void
+clean_up (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ GNUNET_SERVER_destroy (server);
+ server = NULL;
+ GNUNET_CONFIGURATION_destroy (cfg);
+ cfg = NULL;
+}
+
+
+/**
+ * Functions with this signature are called whenever a client
+ * is disconnected on the network level.
+ *
+ * @param cls closure
+ * @param client identification of the client
+ */
+static void
+notify_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
+{
+ if (client == NULL)
+ return;
+ ok = 0;
+ GNUNET_SCHEDULER_add_now (&clean_up, NULL);
+}
+
+
+static struct GNUNET_SERVER_MessageHandler handlers[] = {
+ {&recv_cb, NULL, MY_TYPE, sizeof (struct GNUNET_MessageHeader)},
+ {NULL, NULL, 0, 0}
+};
+
+void send_cb (void *cls)
+{
+ printf ("notify sent\n");
+ notify = GNUNET_YES;
+}
+
+void test_mq (struct GNUNET_CLIENT_Connection *client)
+{
+ struct GNUNET_MQ_MessageQueue *mq;
+ struct GNUNET_MQ_Message *mqm;
+
+ /* FIXME: test handling responses */
+ mq = GNUNET_MQ_queue_for_connection_client (client, NULL, NULL);
+
+ mqm = GNUNET_MQ_msg_header (MY_TYPE);
+ GNUNET_MQ_send (mq, mqm);
+
+ mqm = GNUNET_MQ_msg_header (MY_TYPE);
+ GNUNET_MQ_notify_sent (mqm, send_cb, NULL);
+ GNUNET_MQ_send (mq, mqm);
+
+ /* FIXME: add a message that will be canceled */
+}
+
+
+static void
+task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct sockaddr_in sa;
+ struct sockaddr *sap[2];
+ socklen_t slens[2];
+
+ sap[0] = (struct sockaddr *) &sa;
+ slens[0] = sizeof (sa);
+ sap[1] = NULL;
+ slens[1] = 0;
+ memset (&sa, 0, sizeof (sa));
+#if HAVE_SOCKADDR_IN_SIN_LEN
+ sa.sin_len = sizeof (sa);
+#endif
+ sa.sin_family = AF_INET;
+ sa.sin_port = htons (PORT);
+ server =
+ GNUNET_SERVER_create (NULL, NULL, sap, slens,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_MILLISECONDS, 250), GNUNET_NO);
+ GNUNET_assert (server != NULL);
+ handlers[0].callback_cls = cls;
+ GNUNET_SERVER_add_handlers (server, handlers);
+ GNUNET_SERVER_disconnect_notify (server, &notify_disconnect, cls);
+ cfg = GNUNET_CONFIGURATION_create ();
+ GNUNET_CONFIGURATION_set_value_number (cfg, "test", "PORT", PORT);
+ GNUNET_CONFIGURATION_set_value_string (cfg, "test", "HOSTNAME", "localhost");
+ GNUNET_CONFIGURATION_set_value_string (cfg, "resolver", "HOSTNAME",
+ "localhost");
+ client = GNUNET_CLIENT_connect ("test", cfg);
+ GNUNET_assert (client != NULL);
+
+ test_mq (client);
+}
+
+
+int
+main (int argc, char *argv[])
+{
+ GNUNET_log_setup ("test-mq-client",
+ "INFO",
+ NULL);
+ ok = 1;
+ GNUNET_SCHEDULER_run (&task, NULL);
+ return ok;
+}
+
diff --git a/src/set/test_set.conf b/src/set/test_set.conf
index e69de29bb..c1d5a0f93 100644
--- a/src/set/test_set.conf
+++ b/src/set/test_set.conf
@@ -0,0 +1,23 @@
+[set]
+AUTOSTART = YES
+PORT = 2106
+HOSTNAME = localhost
+HOME = $SERVICEHOME
+BINARY = gnunet-service-set
+#PREFIX = gdbserver :12345
+#PREFIX = valgrind --leak-check=full
+ACCEPT_FROM = 127.0.0.1;
+ACCEPT_FROM6 = ::1;
+UNIXPATH = /tmp/gnunet-service-set.sock
+UNIX_MATCH_UID = YES
+UNIX_MATCH_GID = YES
+OPTIONS = -L INFO
+
+
+[transport]
+OPTIONS = -LERROR
+
+
+[testbed]
+OVERLAY_TOPOLOGY = CLIQUE
+
diff --git a/src/set/test_set_api.c b/src/set/test_set_api.c
index 0753fd139..0ab02cad7 100644
--- a/src/set/test_set_api.c
+++ b/src/set/test_set_api.c
@@ -28,6 +28,99 @@
#include "gnunet_set_service.h"
+static struct GNUNET_PeerIdentity local_id;
+static struct GNUNET_HashCode app_id;
+static struct GNUNET_SET_Handle *set1;
+static struct GNUNET_SET_Handle *set2;
+static struct GNUNET_SET_ListenHandle *listen_handle;
+const static struct GNUNET_CONFIGURATION_Handle *config;
+
+
+static void
+result_cb_set1 (void *cls, struct GNUNET_SET_Element *element,
+ enum GNUNET_SET_Status status)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got result (set 1)\n");
+}
+
+
+static void
+result_cb_set2 (void *cls, struct GNUNET_SET_Element *element,
+ enum GNUNET_SET_Status status)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got result (set 2)\n");
+}
+
+
+static void
+listen_cb (void *cls,
+ const struct GNUNET_PeerIdentity *other_peer,
+ const struct GNUNET_MessageHeader *context_msg,
+ struct GNUNET_SET_Request *request)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n");
+ GNUNET_SET_accept (request, set2, GNUNET_TIME_UNIT_FOREVER_REL,
+ GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL);
+}
+
+
+/**
+ * Start the set operation.
+ *
+ * @param cls closure, unused
+ */
+static void
+start (void *cls)
+{
+ listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION,
+ &app_id, listen_cb, NULL);
+ GNUNET_SET_evaluate (set1, &local_id, &app_id, NULL, 42,
+ GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_SET_RESULT_ADDED,
+ result_cb_set1, NULL);
+}
+
+
+/**
+ * Initialize the second set, continue
+ *
+ * @param cls closure, unused
+ */
+static void
+init_set2 (void *cls)
+{
+ struct GNUNET_SET_Element element;
+
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initializing set 2\n");
+
+ element.data = "hello";
+ element.size = strlen(element.data);
+ GNUNET_SET_add_element (set2, &element, NULL, NULL);
+ element.data = "quux";
+ element.size = strlen(element.data);
+ GNUNET_SET_add_element (set2, &element, start, NULL);
+}
+
+
+/**
+ * Initialize the first set, continue.
+ */
+static void
+init_set1 (void)
+{
+ struct GNUNET_SET_Element element;
+
+ element.data = "hello";
+ element.size = strlen(element.data);
+ GNUNET_SET_add_element (set1, &element, NULL, NULL);
+ element.data = "bar";
+ element.size = strlen(element.data);
+ GNUNET_SET_add_element (set1, &element, init_set2, NULL);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized set 1\n");
+}
+
+
/**
* Signature of the 'main' function for a (single-peer) testcase that
* is run using 'GNUNET_TESTING_peer_run'.
@@ -41,11 +134,15 @@ run (void *cls,
const struct GNUNET_CONFIGURATION_Handle *cfg,
struct GNUNET_TESTING_Peer *peer)
{
- struct GNUNET_SET_Handle *set1;
- struct GNUNET_SET_Handle *set2;
+ static const char* app_str = "gnunet-set";
+
+ config = cfg;
+ GNUNET_CRYPTO_hash (app_str, strlen (app_str), &app_id);
+ GNUNET_CRYPTO_get_host_identity (cfg, &local_id);
set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
+ init_set1 ();
}
int