aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/include/gnunet_applications.h6
-rw-r--r--src/include/gnunet_protocols.h33
-rw-r--r--src/include/gnunet_set_service.h21
-rw-r--r--src/set/Makefile.am9
-rw-r--r--src/set/gnunet-service-set.c506
-rw-r--r--src/set/gnunet-set.c30
-rw-r--r--src/set/mq.c584
-rw-r--r--src/set/mq.h202
-rw-r--r--src/set/set.conf.in11
-rw-r--r--src/set/set.h33
-rw-r--r--src/set/set_api.c23
-rw-r--r--src/set/strata_estimator.c8
-rw-r--r--src/set/strata_estimator.h2
-rw-r--r--src/set/test_set_api.c4
14 files changed, 1402 insertions, 70 deletions
diff --git a/src/include/gnunet_applications.h b/src/include/gnunet_applications.h
index 5710a8838..4e86e332e 100644
--- a/src/include/gnunet_applications.h
+++ b/src/include/gnunet_applications.h
@@ -77,6 +77,12 @@ extern "C"
77#define GNUNET_APPLICATION_TYPE_CONSENSUS 18 77#define GNUNET_APPLICATION_TYPE_CONSENSUS 18
78 78
79 79
80/**
81 * Set. Used for two-peer set operations implemented using stream.
82 */
83#define GNUNET_APPLICATION_TYPE_SET 19
84
85
80#if 0 /* keep Emacsens' auto-indent happy */ 86#if 0 /* keep Emacsens' auto-indent happy */
81{ 87{
82#endif 88#endif
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index ae28da7b1..33e97b80c 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -1807,10 +1807,41 @@ extern "C"
1807#define GNUNET_MESSAGE_TYPE_SET_REQUEST 578 1807#define GNUNET_MESSAGE_TYPE_SET_REQUEST 578
1808 1808
1809/** 1809/**
1810 * Evaluate a set operation 1810 * Evaluate a set operation.
1811 */ 1811 */
1812#define GNUNET_MESSAGE_TYPE_SET_CREATE 579 1812#define GNUNET_MESSAGE_TYPE_SET_CREATE 579
1813 1813
1814/**
1815 * Evaluate a set operation.
1816 */
1817#define GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST 580
1818
1819/**
1820 * Strata estimator.
1821 */
1822#define GNUNET_MESSAGE_TYPE_SET_P2P_SE 581
1823
1824/**
1825 * Invertible bloom filter.
1826 */
1827#define GNUNET_MESSAGE_TYPE_SET_P2P_IBF 582
1828
1829/**
1830 * Actual set elements.
1831 */
1832#define GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS 583
1833
1834/**
1835 * Requests for the elements with the given hashes.
1836 */
1837#define GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS 584
1838
1839/**
1840 * Operation is done.
1841 */
1842#define GNUNET_MESSAGE_TYPE_SET_P2P_DONE 585
1843
1844
1814 1845
1815/******************************************************************************* 1846/*******************************************************************************
1816 * TESTBED LOGGER message types 1847 * TESTBED LOGGER message types
diff --git a/src/include/gnunet_set_service.h b/src/include/gnunet_set_service.h
index cf782c841..87d766c68 100644
--- a/src/include/gnunet_set_service.h
+++ b/src/include/gnunet_set_service.h
@@ -63,11 +63,6 @@ struct GNUNET_SET_OperationHandle;
63 63
64 64
65/** 65/**
66 * Opaque handle to a listen operation.
67 */
68struct GNUNET_SET_ListenHandle;
69
70/**
71 * The operation that a set set supports. 66 * The operation that a set set supports.
72 */ 67 */
73enum GNUNET_SET_OperationType 68enum GNUNET_SET_OperationType
@@ -135,10 +130,12 @@ struct GNUNET_SET_Element
135 * Number of bytes in the buffer pointed to by data. 130 * Number of bytes in the buffer pointed to by data.
136 */ 131 */
137 uint16_t size; 132 uint16_t size;
133
138 /** 134 /**
139 * Application-specific element type. 135 * Application-specific element type.
140 */ 136 */
141 uint16_t type; 137 uint16_t type;
138
142 /** 139 /**
143 * Actual data of the element 140 * Actual data of the element
144 */ 141 */
@@ -153,6 +150,7 @@ struct GNUNET_SET_Element
153 */ 150 */
154typedef void (*GNUNET_SET_Continuation) (void *cls); 151typedef void (*GNUNET_SET_Continuation) (void *cls);
155 152
153
156/** 154/**
157 * Callback for set operation results. Called for each element 155 * Callback for set operation results. Called for each element
158 * in the result set. 156 * in the result set.
@@ -161,10 +159,9 @@ typedef void (*GNUNET_SET_Continuation) (void *cls);
161 * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK 159 * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
162 * @param status see enum GNUNET_SET_Status 160 * @param status see enum GNUNET_SET_Status
163 */ 161 */
164typedef void 162typedef void (*GNUNET_SET_ResultIterator) (void *cls,
165(*GNUNET_SET_ResultIterator) (void *cls, 163 struct GNUNET_SET_Element *element,
166 struct GNUNET_SET_Element *element, 164 enum GNUNET_SET_Status status);
167 enum GNUNET_SET_Status status);
168 165
169 166
170/** 167/**
@@ -201,7 +198,7 @@ typedef void
201 * @return a handle to the set 198 * @return a handle to the set
202 */ 199 */
203struct GNUNET_SET_Handle * 200struct GNUNET_SET_Handle *
204GNUNET_SET_create (struct GNUNET_CONFIGURATION_Handle *cfg, 201GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
205 enum GNUNET_SET_OperationType op); 202 enum GNUNET_SET_OperationType op);
206 203
207 204
@@ -270,8 +267,6 @@ GNUNET_SET_evaluate (struct GNUNET_SET_Handle *set,
270 void *result_cls); 267 void *result_cls);
271 268
272 269
273
274
275/** 270/**
276 * Wait for set operation requests for the given application id 271 * Wait for set operation requests for the given application id
277 * 272 *
@@ -285,7 +280,7 @@ GNUNET_SET_evaluate (struct GNUNET_SET_Handle *set,
285 * @return a handle that can be used to cancel the listen operation 280 * @return a handle that can be used to cancel the listen operation
286 */ 281 */
287struct GNUNET_SET_ListenHandle * 282struct GNUNET_SET_ListenHandle *
288GNUNET_SET_listen (struct GNUNET_CONFIGURATION_Handle *cfg, 283GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
289 enum GNUNET_SET_OperationType op_type, 284 enum GNUNET_SET_OperationType op_type,
290 const struct GNUNET_HashCode *app_id, 285 const struct GNUNET_HashCode *app_id,
291 GNUNET_SET_ListenCallback listen_cb, 286 GNUNET_SET_ListenCallback listen_cb,
diff --git a/src/set/Makefile.am b/src/set/Makefile.am
index 06f418465..c1639823e 100644
--- a/src/set/Makefile.am
+++ b/src/set/Makefile.am
@@ -29,6 +29,7 @@ gnunet_set_SOURCES = \
29gnunet_set_LDADD = \ 29gnunet_set_LDADD = \
30 $(top_builddir)/src/util/libgnunetutil.la \ 30 $(top_builddir)/src/util/libgnunetutil.la \
31 $(top_builddir)/src/set/libgnunetset.la \ 31 $(top_builddir)/src/set/libgnunetset.la \
32 $(top_builddir)/src/stream/libgnunetstream.la \
32 $(top_builddir)/src/testbed/libgnunettestbed.la \ 33 $(top_builddir)/src/testbed/libgnunettestbed.la \
33 $(GN_LIBINTL) 34 $(GN_LIBINTL)
34gnunet_set_DEPENDENCIES = \ 35gnunet_set_DEPENDENCIES = \
@@ -36,6 +37,8 @@ gnunet_set_DEPENDENCIES = \
36 37
37gnunet_service_set_SOURCES = \ 38gnunet_service_set_SOURCES = \
38 gnunet-service-set.c \ 39 gnunet-service-set.c \
40 gnunet-service-set_union.c \
41 mq.c \
39 ibf.c \ 42 ibf.c \
40 strata_estimator.c 43 strata_estimator.c
41gnunet_service_set_LDADD = \ 44gnunet_service_set_LDADD = \
@@ -44,12 +47,16 @@ gnunet_service_set_LDADD = \
44 $(top_builddir)/src/stream/libgnunetstream.la \ 47 $(top_builddir)/src/stream/libgnunetstream.la \
45 $(top_builddir)/src/mesh/libgnunetmesh.la \ 48 $(top_builddir)/src/mesh/libgnunetmesh.la \
46 $(GN_LIBINTL) 49 $(GN_LIBINTL)
50# hack for mq.c, see automake Objects ‘created with both libtool and without’
51# remove once GNUNET_MQ is in util/
52gnunet_service_set_CFLAGS = $(AM_CFLAGS)
47 53
48libgnunetset_la_SOURCES = \ 54libgnunetset_la_SOURCES = \
49 set_api.c \ 55 set_api.c \
50 mq.c 56 mq.c
51libgnunetset_la_LIBADD = \ 57libgnunetset_la_LIBADD = \
52 $(top_builddir)/src/util/libgnunetutil.la \ 58 $(top_builddir)/src/util/libgnunetutil.la \
59 $(top_builddir)/src/stream/libgnunetstream.la \
53 $(LTLIBINTL) 60 $(LTLIBINTL)
54libgnunetset_la_LDFLAGS = \ 61libgnunetset_la_LDFLAGS = \
55 $(GN_LIB_LDFLAGS) 62 $(GN_LIB_LDFLAGS)
@@ -67,6 +74,8 @@ test_set_api_LDADD = \
67 $(top_builddir)/src/util/libgnunetutil.la \ 74 $(top_builddir)/src/util/libgnunetutil.la \
68 $(top_builddir)/src/testing/libgnunettesting.la \ 75 $(top_builddir)/src/testing/libgnunettesting.la \
69 $(top_builddir)/src/set/libgnunetset.la 76 $(top_builddir)/src/set/libgnunetset.la
77test_set_api_DEPENDENCIES = \
78 libgnunetset.la
70 79
71EXTRA_DIST = \ 80EXTRA_DIST = \
72 test_set.conf 81 test_set.conf
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c
index 69296fe79..6663e3fae 100644
--- a/src/set/gnunet-service-set.c
+++ b/src/set/gnunet-service-set.c
@@ -24,31 +24,194 @@
24 * @author Florian Dold 24 * @author Florian Dold
25 */ 25 */
26 26
27#include "platform.h"
28#include "gnunet_common.h"
29#include "gnunet_protocols.h"
30#include "gnunet_applications.h"
31#include "gnunet_util_lib.h"
32#include "gnunet_core_service.h"
33#include "gnunet_stream_lib.h"
34 27
35struct Set 28#include "gnunet-service-set.h"
36{ 29#include "set_protocol.h"
37 30
38};
39 31
40struct Listener 32/**
41{ 33 * Configuration of our local peer.
34 */
35const struct GNUNET_CONFIGURATION_Handle *configuration;
36
37/**
38 * Socket listening for other peers via stream.
39 */
40static struct GNUNET_STREAM_ListenSocket *stream_listen_socket;
42 41
43}; 42/**
43 * Sets are held in a doubly linked list.
44 */
45static struct Set *sets_head;
44 46
45/* 47/**
46static struct Listener *sets_head; 48 * Sets are held in a doubly linked list.
47static struct Listener *sets_tail; 49 */
50static struct Set *sets_tail;
48 51
52/**
53 * Listeners are held in a doubly linked list.
54 */
49static struct Listener *listeners_head; 55static struct Listener *listeners_head;
56
57/**
58 * Listeners are held in a doubly linked list.
59 */
50static struct Listener *listeners_tail; 60static struct Listener *listeners_tail;
51*/ 61
62/**
63 * Incoming sockets from remote peers are
64 * held in a doubly linked list.
65 */
66static struct Incoming *incoming_head;
67
68/**
69 * Incoming sockets from remote peers are
70 * held in a doubly linked list.
71 */
72static struct Incoming *incoming_tail;
73
74/**
75 * Counter for allocating unique request IDs for clients.
76 */
77static uint32_t request_id = 1;
78
79
80/**
81 * Disconnect a client and free all resources
82 * that the client allocated (e.g. Sets or Listeners)
83 *
84 * @param client the client to disconnect
85 */
86void
87client_disconnect (struct GNUNET_SERVER_Client *client)
88{
89 /* FIXME: clean up any data structures belonging to the client */
90 GNUNET_SERVER_client_disconnect (client);
91}
92
93
94/**
95 * Get set that is owned by the client, if any.
96 *
97 * @param client client to look for
98 * @return set that the client owns, NULL if the client
99 * does not own a set
100 */
101static struct Set *
102get_set (struct GNUNET_SERVER_Client *client)
103{
104 struct Set *set;
105 for (set = sets_head; NULL != set; set = set->next)
106 if (set->client == client)
107 return set;
108 return NULL;
109}
110
111
112/**
113 * Get the listener associated to a client, if any.
114 *
115 * @param client the client
116 * @return listener associated with the client, NULL
117 * if there isn't any
118 */
119static struct Listener *
120get_listener (struct GNUNET_SERVER_Client *client)
121{
122 struct Listener *listener;
123 for (listener = listeners_head; NULL != listener; listener = listener->next)
124 if (listener->client == client)
125 return listener;
126 return NULL;
127}
128
129/**
130 * Get the incoming socket associated with the given id
131 *
132 * @param id id to look for
133 * @return the incoming socket associated with the id,
134 * or NULL if there is none
135 */
136static struct Incoming *
137get_incoming (uint32_t id)
138{
139 struct Incoming *incoming;
140 for (incoming = incoming_head; NULL != incoming; incoming = incoming->next)
141 if (incoming->request_id == id)
142 return incoming;
143 return NULL;
144}
145
146static void
147destroy_incoming (struct Incoming *incoming)
148{
149 if (NULL != incoming->mq)
150 {
151 GNUNET_MQ_destroy (incoming->mq);
152 incoming->mq = NULL;
153 }
154 if (NULL != incoming->socket)
155 {
156 GNUNET_STREAM_close (incoming->socket);
157 incoming->socket = NULL;
158 }
159 GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming);
160 GNUNET_free (incoming);
161}
162
163
164/**
165 * Handle a request for a set operation for
166 * another peer.
167 *
168 * @param cls the incoming socket
169 * @param mh the message
170 */
171static void
172handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh)
173{
174 struct Incoming *incoming = cls;
175 const struct OperationRequestMessage *msg = (const struct OperationRequestMessage *) mh;
176 struct GNUNET_MQ_Message *mqm;
177 struct RequestMessage *cmsg;
178 struct Listener *listener;
179 const struct GNUNET_MessageHeader *context_msg;
180
181 if (ntohs (mh->size) < sizeof *msg)
182 {
183 GNUNET_break (0);
184 destroy_incoming (incoming);
185 return;
186 }
187 else if (ntohs (mh->size) == sizeof *msg)
188 {
189 context_msg = NULL;
190 }
191 else
192 {
193 context_msg = &msg[1].header;
194 if ((ntohs (context_msg->size) + sizeof *msg) != ntohs (msg->header.size))
195 {
196 /* size of context message is invalid */
197 GNUNET_break (0);
198 destroy_incoming (incoming);
199 return;
200 }
201 }
202
203 for (listener = listeners_head; listener != NULL; listener = listener->next)
204 {
205 if ( (0 != GNUNET_CRYPTO_hash_cmp (&msg->app_id, &listener->app_id)) ||
206 (htons (msg->operation) != listener->operation) )
207 continue;
208 mqm = GNUNET_MQ_msg_concat (cmsg, context_msg, GNUNET_MESSAGE_TYPE_SET_REQUEST);
209 incoming->request_id = request_id++;
210 cmsg->request_id = htonl (incoming->request_id);
211 GNUNET_MQ_send (listener->client_mq, mqm);
212 return;
213 }
214}
52 215
53 216
54/** 217/**
@@ -63,7 +226,303 @@ handle_client_create (void *cls,
63 struct GNUNET_SERVER_Client *client, 226 struct GNUNET_SERVER_Client *client,
64 const struct GNUNET_MessageHeader *m) 227 const struct GNUNET_MessageHeader *m)
65{ 228{
229 struct SetCreateMessage *msg = (struct SetCreateMessage *) m;
230 struct Set *set;
231
232 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "new set created\n");
233
234 if (NULL != get_set (client))
235 {
236 GNUNET_break (0);
237 GNUNET_SERVER_client_disconnect (client);
238 return;
239 }
240
241 set = GNUNET_new (struct Set);
242
243 switch (ntohs (msg->operation))
244 {
245 case GNUNET_SET_OPERATION_INTERSECTION:
246 /* FIXME: cfuchs */
247 GNUNET_assert (0);
248 break;
249 case GNUNET_SET_OPERATION_UNION:
250 set = union_set_create ();
251 break;
252 default:
253 GNUNET_free (set);
254 GNUNET_break (0);
255 GNUNET_SERVER_client_disconnect (client);
256 return;
257 }
258
259 set->client = client;
260 set->client_mq = GNUNET_MQ_queue_for_server_client (client);
261 GNUNET_CONTAINER_DLL_insert (sets_head, sets_tail, set);
262
263 GNUNET_SERVER_receive_done (client, GNUNET_OK);
264}
265
266
267/**
268 * Called when a client wants to create a new set.
269 *
270 * @param cls unused
271 * @param client client that sent the message
272 * @param m message sent by the client
273 */
274static void
275handle_client_listen (void *cls,
276 struct GNUNET_SERVER_Client *client,
277 const struct GNUNET_MessageHeader *m)
278{
279 struct ListenMessage *msg = (struct ListenMessage *) m;
280 struct Listener *listener;
281
282 if (NULL != get_listener (client))
283 {
284 GNUNET_break (0);
285 GNUNET_SERVER_client_disconnect (client);
286 return;
287 }
66 288
289 listener = GNUNET_new (struct Listener);
290 listener->app_id = msg->app_id;
291 listener->operation = msg->operation;
292 GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener);
293}
294
295
296/**
297 * Called when a client wants to create a new set.
298 *
299 * @param cls unused
300 * @param client client that sent the message
301 * @param m message sent by the client
302 */
303static void
304handle_client_add (void *cls,
305 struct GNUNET_SERVER_Client *client,
306 const struct GNUNET_MessageHeader *m)
307{
308 struct Set *set;
309
310 set = get_set (client);
311 if (NULL == set)
312 {
313 GNUNET_break (0);
314 client_disconnect (client);
315 return;
316 }
317 switch (set->operation)
318 {
319 case GNUNET_SET_OPERATION_UNION:
320 union_add (set, (struct ElementMessage *) m);
321 break;
322 case GNUNET_SET_OPERATION_INTERSECTION:
323 /* FIXME: cfuchs */
324 break;
325 default:
326 GNUNET_assert (0);
327 break;
328 }
329}
330
331
332/**
333 * Called when a client wants to evaluate a set operation with another peer.
334 *
335 * @param cls unused
336 * @param client client that sent the message
337 * @param m message sent by the client
338 */
339static void
340handle_client_evaluate (void *cls,
341 struct GNUNET_SERVER_Client *client,
342 const struct GNUNET_MessageHeader *m)
343{
344 struct Set *set;
345 struct EvaluateMessage *msg = (struct EvaluateMessage *) m;
346 struct EvaluateOperation *eo;
347
348 set = get_set (client);
349
350 if (NULL == set)
351 {
352 GNUNET_break (0);
353 client_disconnect (client);
354 return;
355 }
356
357 eo = GNUNET_new (struct EvaluateOperation);
358 eo->peer = msg->peer;
359 eo->app_id = msg->app_id;
360 eo->request_id = msg->request_id;
361 eo->context_msg = GNUNET_copy_message (&msg[1].header);
362 eo->set = set;
363
364 switch (set->operation)
365 {
366 case GNUNET_SET_OPERATION_INTERSECTION:
367 /* FIXME: cfuchs */
368 break;
369 case GNUNET_SET_OPERATION_UNION:
370 union_evaluate (eo);
371 break;
372 default:
373 GNUNET_assert (0);
374 break;
375 }
376}
377
378
379/**
380 * Handle a cancel request from a client.
381 *
382 * @param cls unused
383 * @param client the client
384 * @param m the cancel message
385 */
386static void
387handle_client_cancel (void *cls,
388 struct GNUNET_SERVER_Client *client,
389 const struct GNUNET_MessageHeader *m)
390{
391 /* FIXME: implement */
392}
393
394
395/**
396 * Handle an ack from a client.
397 *
398 * @param cls unused
399 * @param client the client
400 * @param m the message
401 */
402static void
403handle_client_ack (void *cls,
404 struct GNUNET_SERVER_Client *client,
405 const struct GNUNET_MessageHeader *m)
406{
407 /* FIXME: implement */
408}
409
410
411/**
412 * Handle a request from the client to accept
413 * a set operation.
414 *
415 * @param cls unused
416 * @param client the client
417 * @param m the message
418 */
419static void
420handle_client_accept (void *cls,
421 struct GNUNET_SERVER_Client *client,
422 const struct GNUNET_MessageHeader *m)
423{
424 struct AcceptMessage *msg = (struct AcceptMessage *) m;
425 struct Set *set;
426 struct Incoming *incoming;
427 struct EvaluateOperation *eo;
428
429 set = get_set (client);
430
431 if (NULL == set)
432 {
433 GNUNET_break (0);
434 client_disconnect (client);
435 return;
436 }
437
438 incoming = get_incoming (ntohl (msg->request_id));
439
440 if ( (NULL == incoming) ||
441 (incoming->operation != set->operation) )
442 {
443 GNUNET_break (0);
444 client_disconnect (client);
445 return;
446 }
447
448 eo = GNUNET_new (struct EvaluateOperation);
449 eo->peer = incoming->peer;
450 eo->app_id = incoming->app_id;
451 eo->request_id = msg->request_id;
452 eo->set = set;
453
454 switch (set->operation)
455 {
456 case GNUNET_SET_OPERATION_INTERSECTION:
457 /* FIXME: cfuchs*/
458 GNUNET_assert (0);
459 break;
460 case GNUNET_SET_OPERATION_UNION:
461 union_accept (eo, incoming);
462 break;
463 default:
464 GNUNET_assert (0);
465 break;
466 }
467}
468
469
470/**
471 * Functions of this type are called upon new stream connection from other peers
472 * or upon binding error which happen when the app_port given in
473 * GNUNET_STREAM_listen() is already taken.
474 *
475 * @param cls the closure from GNUNET_STREAM_listen
476 * @param socket the socket representing the stream; NULL on binding error
477 * @param initiator the identity of the peer who wants to establish a stream
478 * with us; NULL on binding error
479 * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
480 * stream (the socket will be invalid after the call)
481 */
482static int
483stream_listen_cb (void *cls,
484 struct GNUNET_STREAM_Socket *socket,
485 const struct GNUNET_PeerIdentity *initiator)
486{
487 struct Incoming *incoming;
488 static const struct GNUNET_MQ_Handler handlers[] = {
489 {handle_p2p_operation_request, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST},
490 GNUNET_MQ_HANDLERS_END
491 };
492
493 if (NULL == socket)
494 {
495 GNUNET_break (0);
496 return GNUNET_SYSERR;
497 }
498
499 incoming = GNUNET_new (struct Incoming);
500 incoming->peer = *initiator;
501 incoming->socket = socket;
502 incoming->mq = GNUNET_MQ_queue_for_stream_socket (incoming->socket, handlers, incoming);
503 /* FIXME: timeout for peers that only connect but don't send anything */
504 GNUNET_CONTAINER_DLL_insert_tail (incoming_head, incoming_tail, incoming);
505 return GNUNET_OK;
506}
507
508
509/**
510 * Called to clean up, after a shutdown has been requested.
511 *
512 * @param cls closure
513 * @param tc context information (why was this task triggered now)
514 */
515static void
516shutdown_task (void *cls,
517 const struct GNUNET_SCHEDULER_TaskContext *tc)
518{
519 if (NULL != stream_listen_socket)
520 {
521 GNUNET_STREAM_listen_close (stream_listen_socket);
522 stream_listen_socket = NULL;
523 }
524
525 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n");
67} 526}
68 527
69 528
@@ -77,15 +536,24 @@ handle_client_create (void *cls,
77 */ 536 */
78static void 537static void
79run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *cfg) 538run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *cfg)
80
81{ 539{
82 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { 540 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
83 {handle_client_create, NULL, GNUNET_MESSAGE_TYPE_SET_CREATE, 0}, 541 {handle_client_create, NULL, GNUNET_MESSAGE_TYPE_SET_CREATE, 0},
542 {handle_client_listen, NULL, GNUNET_MESSAGE_TYPE_SET_LISTEN, 0},
543 {handle_client_add, NULL, GNUNET_MESSAGE_TYPE_SET_ADD, 0},
544 {handle_client_cancel, NULL, GNUNET_MESSAGE_TYPE_SET_CANCEL, 0},
545 {handle_client_evaluate, NULL, GNUNET_MESSAGE_TYPE_SET_EVALUATE, 0},
546 {handle_client_ack, NULL, GNUNET_MESSAGE_TYPE_SET_ACK, 0},
547 {handle_client_accept, NULL, GNUNET_MESSAGE_TYPE_SET_ACCEPT, 0},
84 {NULL, NULL, 0, 0} 548 {NULL, NULL, 0, 0}
85 }; 549 };
86 550
87 551 configuration = cfg;
552 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
88 GNUNET_SERVER_add_handlers (server, server_handlers); 553 GNUNET_SERVER_add_handlers (server, server_handlers);
554 stream_listen_socket = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_SET,
555 &stream_listen_cb, NULL,
556 GNUNET_STREAM_OPTION_END);
89} 557}
90 558
91 559
diff --git a/src/set/gnunet-set.c b/src/set/gnunet-set.c
index bb494ffe7..7de687611 100644
--- a/src/set/gnunet-set.c
+++ b/src/set/gnunet-set.c
@@ -27,6 +27,23 @@
27#include "gnunet_common.h" 27#include "gnunet_common.h"
28#include "gnunet_util_lib.h" 28#include "gnunet_util_lib.h"
29#include "gnunet_testbed_service.h" 29#include "gnunet_testbed_service.h"
30#include "gnunet_set_service.h"
31
32
33static struct GNUNET_HashCode app_id;
34static struct GNUNET_SET_Handle *set1;
35static struct GNUNET_SET_Handle *set2;
36static struct GNUNET_SET_ListenHandle *listen_handle;
37
38
39static void
40listen_cb (void *cls,
41 const struct GNUNET_PeerIdentity *other_peer,
42 const struct GNUNET_MessageHeader *context_msg,
43 struct GNUNET_SET_Request *request)
44{
45 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n");
46}
30 47
31 48
32/** 49/**
@@ -40,10 +57,15 @@
40static void 57static void
41run (void *cls, char *const *args, 58run (void *cls, char *const *args,
42 const char *cfgfile, 59 const char *cfgfile,
43 const struct GNUNET_CONFIGURATION_Handle * 60 const struct GNUNET_CONFIGURATION_Handle *cfg)
44 cfg)
45{ 61{
46 /* FIXME */ 62 static const char* app_str = "gnunet-set";
63 GNUNET_CRYPTO_hash (app_str, strlen (app_str), &app_id);
64
65 set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
66 set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
67 listen_handle = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION, &app_id,
68 listen_cb, NULL);
47} 69}
48 70
49 71
@@ -56,7 +78,7 @@ main (int argc, char **argv)
56 }; 78 };
57 GNUNET_PROGRAM_run2 (argc, argv, "gnunet-set", 79 GNUNET_PROGRAM_run2 (argc, argv, "gnunet-set",
58 "help", 80 "help",
59 options, &run, NULL, GNUNET_YES); 81 options, &run, NULL, GNUNET_NO);
60 return 0; 82 return 0;
61} 83}
62 84
diff --git a/src/set/mq.c b/src/set/mq.c
index 313e9ce0c..236a692d4 100644
--- a/src/set/mq.c
+++ b/src/set/mq.c
@@ -26,13 +26,181 @@
26 26
27#include "mq.h" 27#include "mq.h"
28 28
29/**
30 * Signature of functions implementing the
31 * sending part of a message queue
32 *
33 * @param q the message queue
34 * @param m the message
35 */
36typedef void (*SendImpl) (struct GNUNET_MQ_MessageQueue *q, struct GNUNET_MQ_Message *m);
37
38
39typedef void (*DestroyImpl) (struct GNUNET_MQ_MessageQueue *q);
40
41
42/**
43 * Collection of the state necessary to read and write gnunet messages
44 * to a stream socket. Should be used as closure for stream_data_processor.
45 */
46struct MessageStreamState
47{
48 struct GNUNET_SERVER_MessageStreamTokenizer *mst;
49 struct MessageQueue *mq;
50 struct GNUNET_STREAM_Socket *socket;
51 struct GNUNET_STREAM_ReadHandle *rh;
52 struct GNUNET_STREAM_WriteHandle *wh;
53};
54
55
56struct ServerClientSocketState
57{
58 struct GNUNET_SERVER_Client *client;
59 struct GNUNET_SERVER_TransmitHandle* th;
60};
61
62
63struct ClientConnectionState
64{
65 struct GNUNET_CLIENT_Connection *connection;
66 struct GNUNET_CLIENT_TransmitHandle *th;
67};
68
69
70struct GNUNET_MQ_MessageQueue
71{
72 /**
73 * Handlers array, or NULL if the queue should not receive messages
74 */
75 const struct GNUNET_MQ_Handler *handlers;
76
77 /**
78 * Closure for the handler callbacks
79 */
80 void *handlers_cls;
81
82 /**
83 * Actual implementation of message sending,
84 * called when a message is added
85 */
86 SendImpl send_impl;
87
88 /**
89 * Implementation-dependent queue destruction function
90 */
91 DestroyImpl destroy_impl;
92
93 /**
94 * Implementation-specific state
95 */
96 void *impl_state;
97
98 /**
99 * Linked list of messages pending to be sent
100 */
101 struct GNUNET_MQ_Message *msg_head;
102
103 /**
104 * Linked list of messages pending to be sent
105 */
106 struct GNUNET_MQ_Message *msg_tail;
107
108 /**
109 * Message that is currently scheduled to be
110 * sent. Not the head of the message queue, as the implementation
111 * needs to know if sending has been already scheduled or not.
112 */
113 struct GNUNET_MQ_Message *current_msg;
114
115 /**
116 * Map of associations, lazily allocated
117 */
118 struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
119
120 /**
121 * Next id that should be used for the assoc_map,
122 * initialized lazily to a random value together with
123 * assoc_map
124 */
125 uint32_t assoc_id;
126};
127
29 128
30struct GNUNET_MQ_Message 129struct GNUNET_MQ_Message
31{ 130{
131 /**
132 * Messages are stored in a linked list
133 */
134 struct GNUNET_MQ_Message *next;
135
136 /**
137 * Messages are stored in a linked list
138 */
139 struct GNUNET_MQ_Message *prev;
140
141 /**
142 * Actual allocated message header,
143 * usually points to the end of the containing GNUNET_MQ_Message
144 */
32 struct GNUNET_MessageHeader *mh; 145 struct GNUNET_MessageHeader *mh;
146
147 /**
148 * Queue the message is queued in, NULL if message is not queued.
149 */
150 struct GNUNET_MQ_MessageQueue *parent_queue;
151
152 /**
153 * Called after the message was sent irrevokably
154 */
155 GNUNET_MQ_NotifyCallback sent_cb;
156
157 /**
158 * Closure for send_cb
159 */
160 void *sent_cls;
33}; 161};
34 162
35 163
164/**
165 * Call the right callback for a message received
166 * by a queue
167 */
168static void
169dispatch_message (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_MessageHeader *mh)
170{
171 const struct GNUNET_MQ_Handler *handler;
172
173 handler = mq->handlers;
174 if (NULL == handler)
175 return;
176 for (; NULL != handler->cb; handler++)
177 if (handler->type == ntohs (mh->type))
178 handler->cb (mq->handlers_cls, mh);
179}
180
181
182void
183GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm)
184{
185 GNUNET_assert (NULL == mqm->parent_queue);
186 GNUNET_free (mqm);
187}
188
189
190/**
191 * Send a message with the give message queue.
192 * May only be called once per message.
193 *
194 * @param mq message queue
195 * @param mqm the message to send.
196 */
197void
198GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
199{
200 mq->send_impl (mq, mqm);
201}
202
203
36struct GNUNET_MQ_Message * 204struct GNUNET_MQ_Message *
37GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) 205GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
38{ 206{
@@ -40,8 +208,422 @@ GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
40 mqm = GNUNET_malloc (sizeof *mqm + size); 208 mqm = GNUNET_malloc (sizeof *mqm + size);
41 mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1]; 209 mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
42 mqm->mh->size = htons (size); 210 mqm->mh->size = htons (size);
43 mqm->mh->type = htons(type); 211 mqm->mh->type = htons (type);
44 if (NULL != mhp) 212 if (NULL != mhp)
45 *mhp = mqm->mh; 213 *mhp = mqm->mh;
46 return mqm; 214 return mqm;
47} 215}
216
217
218struct GNUNET_MQ_Message *
219GNUNET_MQ_msg_concat_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, struct GNUNET_MessageHeader *m, uint16_t type)
220{
221 struct GNUNET_MQ_Message *mq;
222
223 GNUNET_assert (NULL != mhp);
224 if (NULL == m)
225 return GNUNET_MQ_msg_ (mhp, base_size, type);
226 GNUNET_assert (ntohs (m->size >= sizeof (struct GNUNET_MessageHeader)));
227 /* check for overflow */
228 if (base_size + ntohs (m->size) <= base_size)
229 return NULL;
230 mq = GNUNET_MQ_msg_ (mhp, base_size + ntohs (m->size), type);
231 memcpy (((void *) *mhp) + base_size, m, ntohs (m->size));
232 return mq;
233}
234
235
236/**
237 * Functions of this signature are called whenever writing operations
238 * on a stream are executed
239 *
240 * @param cls the closure from GNUNET_STREAM_write
241 * @param status the status of the stream at the time this function is called;
242 * GNUNET_STREAM_OK if writing to stream was completed successfully;
243 * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
244 * (this doesn't mean that the data is never sent, the receiver may
245 * have read the data but its ACKs may have been lost);
246 * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
247 * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
248 * be processed.
249 * @param size the number of bytes written
250 */
251static void
252stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size)
253{
254 struct GNUNET_MQ_MessageQueue *mq = cls;
255 struct MessageStreamState *mss = (struct MessageStreamState *) mq->impl_state;
256 struct GNUNET_MQ_Message *mqm;
257
258 GNUNET_assert (GNUNET_STREAM_OK == status);
259
260 /* call cb for message we finished sending */
261 mqm = mq->current_msg;
262 if (NULL != mqm)
263 {
264 if (NULL != mqm->sent_cb)
265 mqm->sent_cb (mqm->sent_cls);
266 GNUNET_free (mqm);
267 }
268
269 mss->wh = NULL;
270
271 mqm = mq->msg_head;
272 mq->current_msg = mqm;
273 if (NULL == mqm)
274 return;
275 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm);
276 mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
277 GNUNET_TIME_UNIT_FOREVER_REL, stream_write_queued, cls);
278 GNUNET_assert (NULL != mss->wh);
279}
280
281
282static void
283stream_socket_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
284{
285 if (NULL != mq->current_msg)
286 {
287 GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
288 return;
289 }
290 stream_write_queued (mq, GNUNET_STREAM_OK, 0);
291}
292
293
294/**
295 * Functions with this signature are called whenever a
296 * complete message is received by the tokenizer.
297 *
298 * Do not call GNUNET_SERVER_mst_destroy in callback
299 *
300 * @param cls closure
301 * @param client identification of the client
302 * @param message the actual message
303 *
304 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
305 */
306static int
307stream_mst_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message)
308{
309 struct GNUNET_MQ_MessageQueue *mq = cls;
310
311 GNUNET_assert (NULL != message);
312 dispatch_message (mq, message);
313 return GNUNET_OK;
314}
315
316
317/**
318 * Functions of this signature are called whenever data is available from the
319 * stream.
320 *
321 * @param cls the closure from GNUNET_STREAM_read
322 * @param status the status of the stream at the time this function is called
323 * @param data traffic from the other side
324 * @param size the number of bytes available in data read; will be 0 on timeout
325 * @return number of bytes of processed from 'data' (any data remaining should be
326 * given to the next time the read processor is called).
327 */
328static size_t
329stream_data_processor (void *cls,
330 enum GNUNET_STREAM_Status status,
331 const void *data,
332 size_t size)
333{
334 struct GNUNET_MQ_MessageQueue *mq = cls;
335 struct MessageStreamState *mss;
336 int ret;
337 mss = (struct MessageStreamState *) mq->impl_state;
338
339 GNUNET_assert (GNUNET_STREAM_OK == status);
340 ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO);
341 GNUNET_assert (GNUNET_OK == ret);
342 /* we always read all data */
343 return size;
344}
345
346
347struct GNUNET_MQ_MessageQueue *
348GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket,
349 const struct GNUNET_MQ_Handler *handlers,
350 void *cls)
351{
352 struct GNUNET_MQ_MessageQueue *mq;
353 struct MessageStreamState *mss;
354
355 mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
356 mss = GNUNET_new (struct MessageStreamState);
357 mss->socket = socket;
358 mq->impl_state = mss;
359 mq->send_impl = stream_socket_send_impl;
360 mq->handlers = handlers;
361 mq->handlers_cls = cls;
362 if (NULL != handlers)
363 {
364 mss->mst = GNUNET_SERVER_mst_create (stream_mst_callback, mq);
365 mss->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
366 stream_data_processor, mq);
367 }
368 return mq;
369}
370
371
372/**
373 * Transmit a queued message to the session's client.
374 *
375 * @param cls consensus session
376 * @param size number of bytes available in buf
377 * @param buf where the callee should write the message
378 * @return number of bytes written to buf
379 */
380static size_t
381transmit_queued (void *cls, size_t size,
382 void *buf)
383{
384 struct GNUNET_MQ_MessageQueue *mq = cls;
385 struct GNUNET_MQ_Message *mqm = mq->current_msg;
386 struct ServerClientSocketState *state = mq->impl_state;
387 size_t msg_size;
388
389 mq->current_msg = NULL;
390 GNUNET_assert (NULL != mqm);
391 GNUNET_assert (NULL != buf);
392 msg_size = ntohs (mqm->mh->size);
393 GNUNET_assert (size >= msg_size);
394 memcpy (buf, mqm->mh, msg_size);
395 GNUNET_free (mqm);
396 state->th = NULL;
397 if (NULL != mq->msg_head)
398 {
399 mq->current_msg = mq->msg_head;
400 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
401 state->th =
402 GNUNET_SERVER_notify_transmit_ready (state->client, msg_size,
403 GNUNET_TIME_UNIT_FOREVER_REL,
404 &transmit_queued, mq);
405 }
406 return msg_size;
407}
408
409
410static void
411server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
412{
413 struct ServerClientSocketState *state = mq->impl_state;
414 int msize;
415
416 GNUNET_assert (NULL != state);
417
418 if (NULL != state->th)
419 {
420 GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
421 return;
422 }
423 GNUNET_assert (NULL == mq->current_msg);
424 msize = ntohs (mq->msg_head->mh->size);
425 mq->current_msg = mqm;
426 state->th =
427 GNUNET_SERVER_notify_transmit_ready (state->client, msize,
428 GNUNET_TIME_UNIT_FOREVER_REL,
429 &transmit_queued, mq);
430}
431
432
433struct GNUNET_MQ_MessageQueue *
434GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
435{
436 struct GNUNET_MQ_MessageQueue *mq;
437 struct ServerClientSocketState *scss;
438
439 mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
440 scss = GNUNET_new (struct ServerClientSocketState);
441 mq->impl_state = scss;
442 mq->send_impl = server_client_send_impl;
443 return mq;
444}
445
446
447/**
448 * Transmit a queued message to the session's client.
449 *
450 * @param cls consensus session
451 * @param size number of bytes available in buf
452 * @param buf where the callee should write the message
453 * @return number of bytes written to buf
454 */
455static size_t
456connection_client_transmit_queued (void *cls, size_t size,
457 void *buf)
458{
459 struct GNUNET_MQ_MessageQueue *mq = cls;
460 struct GNUNET_MQ_Message *mqm = mq->current_msg;
461 struct ClientConnectionState *state = mq->impl_state;
462 size_t msg_size;
463
464 mq->current_msg = NULL;
465 GNUNET_assert (NULL != mqm);
466 GNUNET_assert (NULL != buf);
467 msg_size = ntohs (mqm->mh->size);
468 GNUNET_assert (size >= msg_size);
469 memcpy (buf, mqm->mh, msg_size);
470 GNUNET_free (mqm);
471 state->th = NULL;
472 if (NULL != mq->msg_head)
473 {
474 mq->current_msg = mq->msg_head;
475 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
476 state->th =
477 GNUNET_CLIENT_notify_transmit_ready (state->connection, msg_size,
478 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
479 &connection_client_transmit_queued, mq);
480 }
481 return msg_size;
482}
483
484
485static void
486connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
487{
488 struct ClientConnectionState *state = mq->impl_state;
489 int msize;
490
491 GNUNET_assert (NULL != state);
492
493 if (NULL != state->th)
494 {
495 GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
496 return;
497 }
498 GNUNET_assert (NULL == mq->current_msg);
499 mq->current_msg = mqm;
500 msize = ntohs (mqm->mh->size);
501 state->th =
502 GNUNET_CLIENT_notify_transmit_ready (state->connection, msize,
503 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
504 &connection_client_transmit_queued, mq);
505}
506
507
508/**
509 * Type of a function to call when we receive a message
510 * from the service.
511 *
512 * @param cls closure
513 * @param msg message received, NULL on timeout or fatal error
514 */
515static void
516handle_client_message (void *cls,
517 const struct GNUNET_MessageHeader *msg)
518{
519 struct GNUNET_MQ_MessageQueue *mq = cls;
520
521 GNUNET_assert (NULL != msg);
522
523 dispatch_message (mq, msg);
524}
525
526
527struct GNUNET_MQ_MessageQueue *
528GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
529 const struct GNUNET_MQ_Handler *handlers,
530 void *cls)
531{
532 struct GNUNET_MQ_MessageQueue *mq;
533 struct ClientConnectionState *state;
534
535 GNUNET_assert (NULL != connection);
536
537 mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
538 mq->handlers = handlers;
539 mq->handlers_cls = cls;
540 state = GNUNET_new (struct ClientConnectionState);
541 state->connection = connection;
542 mq->impl_state = state;
543 mq->send_impl = connection_client_send_impl;
544
545 if (NULL != handlers)
546 {
547 GNUNET_CLIENT_receive (connection, handle_client_message, mq,
548 GNUNET_TIME_UNIT_FOREVER_REL);
549 }
550
551 return mq;
552}
553
554
555
556void
557GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq,
558 const struct GNUNET_MQ_Handler *new_handlers,
559 void *cls)
560{
561 mq->handlers = new_handlers;
562 mq->handlers_cls = cls;
563}
564
565
566
567/**
568 * Associate the assoc_data in mq with a unique request id.
569 *
570 * @param mq message queue, id will be unique for the queue
571 * @param mqm message to associate
572 * @param data to associate
573 */
574uint32_t
575GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq,
576 struct GNUNET_MQ_Message *mqm,
577 void *assoc_data)
578{
579 uint32_t id;
580
581 if (NULL == mq->assoc_map)
582 mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
583 id = mq->assoc_id++;
584 GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data,
585 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
586 return id;
587}
588
589
590
591void *
592GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
593{
594 if (NULL == mq->assoc_map)
595 return NULL;
596 return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
597}
598
599
600void *
601GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
602{
603 void *val;
604
605 if (NULL == mq->assoc_map)
606 return NULL;
607 val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
608 GNUNET_CONTAINER_multihashmap32_remove (mq->assoc_map, request_id, val);
609 return val;
610}
611
612
613void
614GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm,
615 GNUNET_MQ_NotifyCallback cb,
616 void *cls)
617{
618 mqm->sent_cb = cb;
619 mqm->sent_cls = cls;
620}
621
622
623void
624GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq)
625{
626 /* FIXME: destroy all pending messages in the queue */
627 GNUNET_free (mq);
628}
629
diff --git a/src/set/mq.h b/src/set/mq.h
index 960025885..27ab0469e 100644
--- a/src/set/mq.h
+++ b/src/set/mq.h
@@ -30,23 +30,109 @@
30#include "gnunet_common.h" 30#include "gnunet_common.h"
31#include "gnunet_util_lib.h" 31#include "gnunet_util_lib.h"
32#include "gnunet_connection_lib.h" 32#include "gnunet_connection_lib.h"
33#include "gnunet_server_lib.h"
34#include "gnunet_stream_lib.h"
33 35
34 36
35#define GNUNET_MQ_msg_extra(mvar, esize, type) GNUNET_MQ_msg_(((void) mvar->header, (struct GNUNET_MessageHeader**) &mvar), (esize) + sizeof *mvar, type) 37/**
38 * Allocate a GNUNET_MQ_Message, with extra space allocated after the space needed
39 * by the message struct.
40 * The allocated message will already have the type and size field set.
41 *
42 * @param mvar variable to store the allocated message in;
43 * must have a header field
44 * @param esize extra space to allocate after the message
45 * @param type type of the message
46 * @return the MQ message
47 */
48#define GNUNET_MQ_msg_extra(mvar, esize, type) GNUNET_MQ_msg_((((void)(mvar)->header), (struct GNUNET_MessageHeader**) &(mvar)), (esize) + sizeof *(mvar), (type))
36 49
50/**
51 * Allocate a GNUNET_MQ_Message.
52 * The allocated message will already have the type and size field set.
53 *
54 * @param mvar variable to store the allocated message in;
55 * must have a header field
56 * @param type type of the message
57 * @return the MQ message
58 */
37#define GNUNET_MQ_msg(mvar, type) GNUNET_MQ_msg_extra(mvar, 0, type) 59#define GNUNET_MQ_msg(mvar, type) GNUNET_MQ_msg_extra(mvar, 0, type)
38 60
39#define GNUNET_MQ_msg_raw(type) GNUNET_MQ_msg_ (NULL, sizeof (struct GNUNET_MessageHeader), type) 61/**
62 * Allocate a GNUNET_MQ_Message, and concatenate another message
63 * after the space needed by the message struct.
64 *
65 * @param mvar variable to store the allocated message in;
66 * must have a header field
67 * @param mc message to concatenate, can be NULL
68 * @param type type of the message
69 * @return the MQ message, NULL if mc is to large to be concatenated
70 */
71#define GNUNET_MQ_msg_concat(mvar, mc, t) GNUNET_MQ_msg_concat_(((void) mvar->header, (struct GNUNET_MessageHeader **) &(mvar)), \
72 sizeof *mvar, (struct GNUNET_MessageHeader *) mc, t)
73
74
75/**
76 * Allocate a GNUNET_MQ_Message, where the message only consists of a header.
77 * The allocated message will already have the type and size field set.
78 *
79 * @param mvar variable to store the allocated message in;
80 * must have a header field
81 * @param type type of the message
82 */
83#define GNUNET_MQ_msg_header(type) GNUNET_MQ_msg_ (NULL, sizeof (struct GNUNET_MessageHeader), type)
40 84
85
86/**
87 * Allocate a GNUNET_MQ_Message, where the message only consists of a header and extra space.
88 * The allocated message will already have the type and size field set.
89 *
90 * @param mvar variable to store the allocated message in;
91 * must have a header field
92 * @param esize extra space to allocate after the message header
93 * @param type type of the message
94 */
95#define GNUNET_MQ_msg_header_extra(mh, esize, type) GNUNET_MQ_msg_ (&mh, sizeof (struct GNUNET_MessageHeader), type)
96
97
98/**
99 * End-marker for the handlers array
100 */
41#define GNUNET_MQ_HANDLERS_END {NULL, 0} 101#define GNUNET_MQ_HANDLERS_END {NULL, 0}
42 102
103/**
104 * Opaque handle to a message queue
105 */
43struct GNUNET_MQ_MessageQueue; 106struct GNUNET_MQ_MessageQueue;
44 107
108/**
109 * Opaque handle to an allocated message
110 */
45struct GNUNET_MQ_Message; 111struct GNUNET_MQ_Message;
46 112
113/**
114 * Called when a message has been received.
115 *
116 * @param cls closure
117 * @param msg the received message
118 */
119typedef void (*GNUNET_MQ_MessageCallback) (void *cls, const struct GNUNET_MessageHeader *msg);
120
121
122/**
123 * Message handler for a specific message type.
124 */
47struct GNUNET_MQ_Handler 125struct GNUNET_MQ_Handler
48{ 126{
49 void *cb; 127 /**
128 * Callback, called every time a new message of
129 * the specified type has been receied.
130 */
131 GNUNET_MQ_MessageCallback cb;
132
133 /**
134 * Type of the message we are interested in
135 */
50 uint16_t type; 136 uint16_t type;
51}; 137};
52 138
@@ -63,11 +149,33 @@ typedef void (*GNUNET_MQ_NotifyCallback) (void *cls);
63 * @param mhp message header to store the allocated message header in, can be NULL 149 * @param mhp message header to store the allocated message header in, can be NULL
64 * @param size size of the message to allocate 150 * @param size size of the message to allocate
65 * @param type type of the message, will be set in the allocated message 151 * @param type type of the message, will be set in the allocated message
66 * @param return the allocated MQ message 152 * @return the allocated MQ message
67 */ 153 */
68struct GNUNET_MQ_Message * 154struct GNUNET_MQ_Message *
69GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type); 155GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type);
70 156
157
158/**
159 * Create a new message for MQ, by concatenating another message
160 * after a message of the specified type.
161 *
162 * @retrn the allocated MQ message
163 */
164struct GNUNET_MQ_Message *
165GNUNET_MQ_msg_concat_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, struct GNUNET_MessageHeader *m, uint16_t type);
166
167
168/**
169 * Discard the message queue message, free all
170 * allocated resources. Must be called in the event
171 * that a message is created but should not actually be sent.
172 *
173 * @param mqm the message to discard
174 */
175void
176GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm);
177
178
71/** 179/**
72 * Send a message with the give message queue. 180 * Send a message with the give message queue.
73 * May only be called once per message. 181 * May only be called once per message.
@@ -78,6 +186,7 @@ GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
78void 186void
79GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm); 187GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm);
80 188
189
81/** 190/**
82 * Cancel sending the message. Message must have been sent with GNUNET_MQ_send before. 191 * Cancel sending the message. Message must have been sent with GNUNET_MQ_send before.
83 * May not be called after the notify sent callback has been called 192 * May not be called after the notify sent callback has been called
@@ -100,32 +209,105 @@ GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq,
100 struct GNUNET_MQ_Message *mqm, 209 struct GNUNET_MQ_Message *mqm,
101 void *assoc_data); 210 void *assoc_data);
102 211
212/**
213 * Get the data associated with a request id in a queue
214 *
215 * @param mq the message queue with the association
216 * @param request_id the request id we are interested in
217 * @return the associated data
218 */
103void * 219void *
104GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id); 220GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id);
105 221
222
223/**
224 * Remove the association for a request id
225 *
226 * @param mq the message queue with the association
227 * @param request_id the request id we want to remove
228 * @return the associated data
229 */
106void * 230void *
107GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id); 231GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id);
108 232
109 233
110 234
235/**
236 * Create a message queue for a GNUNET_CLIENT_Connection.
237 * If handlers are specfied, receive messages from the connection.
238 *
239 * @param connection the client connection
240 * @param handlers handlers for receiving messages
241 * @param cls closure for the handlers
242 * @return the message queue
243 */
111struct GNUNET_MQ_MessageQueue * 244struct GNUNET_MQ_MessageQueue *
112GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection, 245GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
113 const struct GNUNET_MQ_Handler *handlers, 246 const struct GNUNET_MQ_Handler *handlers,
114 void *cls); 247 void *cls);
115 248
116 249
250/**
251 * Create a message queue for a GNUNET_STREAM_Socket.
252 *
253 * @param param client the client
254 * @return the message queue
255 */
256struct GNUNET_MQ_MessageQueue *
257GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client);
258
259
260
261/**
262 * Create a message queue for a GNUNET_STREAM_Socket.
263 * If handlers are specfied, receive messages from the stream socket.
264 *
265 * @param socket the stream socket
266 * @param handlers handlers for receiving messages
267 * @param cls closure for the handlers
268 * @return the message queue
269 */
270struct GNUNET_MQ_MessageQueue *
271GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket,
272 const struct GNUNET_MQ_Handler *handlers,
273 void *cls);
274
275/**
276 * Replace the handlers of a message queue with new handlers.
277 * Takes effect immediately, even for messages that already have been received, but for
278 * with the handler has not been called.
279 *
280 * @param mq message queue
281 * @param new_handlers new handlers
282 * @param cls new closure for the handlers
283 */
117void 284void
118GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm, 285GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq,
119 GNUNET_MQ_NotifyCallback cb, 286 const struct GNUNET_MQ_Handler *new_handlers,
120 void *cls); 287 void *cls);
288
121 289
122 290
291/**
292 * Call a callback once the message has been sent, that is, the message
293 * can not be canceled anymore.
294 * There can be only one notify sent callback per message.
295 *
296 * @param mqm message to call the notify callback for
297 * @param cb the notify callback
298 * @param cls closure for the callback
299 */
123void 300void
124GNUNET_MQ_notify_timeout (struct GNUNET_MQ_Message *mqm, 301GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm,
125 GNUNET_MQ_NotifyCallback cb, 302 GNUNET_MQ_NotifyCallback cb,
126 void *cls); 303 void *cls);
127 304
128 305
306/**
307 * Destroy the message queue.
308 *
309 * @param mq message queue to destroy
310 */
129void 311void
130GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq); 312GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq);
131 313
diff --git a/src/set/set.conf.in b/src/set/set.conf.in
index e69de29bb..708b94b50 100644
--- a/src/set/set.conf.in
+++ b/src/set/set.conf.in
@@ -0,0 +1,11 @@
1[set]
2AUTOSTART = NO
3# PORT = 2106
4HOSTNAME = localhost
5HOME = $SERVICEHOME
6BINARY = gnunet-service-set
7ACCEPT_FROM = 127.0.0.1;
8ACCEPT_FROM6 = ::1;
9UNIXPATH = /tmp/gnunet-service-set.sock
10UNIX_MATCH_UID = YES
11UNIX_MATCH_GID = YES
diff --git a/src/set/set.h b/src/set/set.h
index ef11e5abc..10e607982 100644
--- a/src/set/set.h
+++ b/src/set/set.h
@@ -20,12 +20,13 @@
20 20
21/** 21/**
22 * @author Florian Dold 22 * @author Florian Dold
23 * @file consensus/consensus.h 23 * @file set/set.h
24 * @brief 24 * @brief messages used for the set api
25 */ 25 */
26#ifndef SET_H 26#ifndef SET_H
27#define SET_H 27#define SET_H
28 28
29#include "platform.h"
29#include "gnunet_common.h" 30#include "gnunet_common.h"
30 31
31 32
@@ -68,12 +69,6 @@ struct ListenMessage
68 * Operation type, values of enum GNUNET_SET_OperationType 69 * Operation type, values of enum GNUNET_SET_OperationType
69 */ 70 */
70 uint16_t operation GNUNET_PACKED; 71 uint16_t operation GNUNET_PACKED;
71
72 /**
73 * Operation type, values of enum GNUNET_SET_OperationType
74 */
75 uint16_t op GNUNET_PACKED;
76
77}; 72};
78 73
79 74
@@ -88,6 +83,12 @@ struct AcceptMessage
88 * request id of the request we want to accept 83 * request id of the request we want to accept
89 */ 84 */
90 uint32_t request_id GNUNET_PACKED; 85 uint32_t request_id GNUNET_PACKED;
86
87 /**
88 * Zero if the client has rejected the request,
89 * non-zero if it has accepted it
90 */
91 uint32_t accepted GNUNET_PACKED;
91}; 92};
92 93
93 94
@@ -119,8 +120,14 @@ struct EvaluateMessage
119 */ 120 */
120 struct GNUNET_MessageHeader header; 121 struct GNUNET_MessageHeader header;
121 122
122 struct GNUNET_PeerIdentity other_peer; 123 /**
124 * Peer to evaluate the operation with
125 */
126 struct GNUNET_PeerIdentity peer;
123 127
128 /**
129 * Application id
130 */
124 struct GNUNET_HashCode app_id; 131 struct GNUNET_HashCode app_id;
125 132
126 /** 133 /**
@@ -144,8 +151,15 @@ struct ResultMessage
144 */ 151 */
145 uint32_t request_id GNUNET_PACKED; 152 uint32_t request_id GNUNET_PACKED;
146 153
154 /**
155 * Was the evaluation successful?
156 */
147 uint16_t result_status GNUNET_PACKED; 157 uint16_t result_status GNUNET_PACKED;
148 158
159 /**
160 * Type of the element attachted to the message,
161 * if any.
162 */
149 uint16_t element_type GNUNET_PACKED; 163 uint16_t element_type GNUNET_PACKED;
150 164
151 /* rest: the actual element */ 165 /* rest: the actual element */
@@ -179,6 +193,7 @@ struct CancelMessage
179 uint32_t request_id GNUNET_PACKED; 193 uint32_t request_id GNUNET_PACKED;
180}; 194};
181 195
196
182GNUNET_NETWORK_STRUCT_END 197GNUNET_NETWORK_STRUCT_END
183 198
184#endif 199#endif
diff --git a/src/set/set_api.c b/src/set/set_api.c
index 45c4e47bd..b2491afe7 100644
--- a/src/set/set_api.c
+++ b/src/set/set_api.c
@@ -93,7 +93,7 @@ handle_result (void *cls, const struct GNUNET_MessageHeader *mh)
93 if (set->messages_since_ack >= GNUNET_SET_ACK_WINDOW/2) 93 if (set->messages_since_ack >= GNUNET_SET_ACK_WINDOW/2)
94 { 94 {
95 struct GNUNET_MQ_Message *mqm; 95 struct GNUNET_MQ_Message *mqm;
96 mqm = GNUNET_MQ_msg_raw (GNUNET_MESSAGE_TYPE_SET_ACK); 96 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ACK);
97 GNUNET_MQ_send (set->mq, mqm); 97 GNUNET_MQ_send (set->mq, mqm);
98 } 98 }
99 99
@@ -136,7 +136,15 @@ handle_request (void *cls, const struct GNUNET_MessageHeader *mh)
136 req->request_id = ntohl (msg->request_id); 136 req->request_id = ntohl (msg->request_id);
137 lh->listen_cb (lh->listen_cls, &msg->peer_id, &mh[1], req); 137 lh->listen_cb (lh->listen_cls, &msg->peer_id, &mh[1], req);
138 if (GNUNET_NO == req->accepted) 138 if (GNUNET_NO == req->accepted)
139 {
140 struct GNUNET_MQ_Message *mqm;
141 struct AcceptMessage *amsg;
142
143 mqm = GNUNET_MQ_msg (amsg, GNUNET_MESSAGE_TYPE_SET_ACCEPT);
144 amsg->request_id = msg->request_id;
145 GNUNET_MQ_send (lh->mq, mqm);
139 GNUNET_free (req); 146 GNUNET_free (req);
147 }
140} 148}
141 149
142 150
@@ -152,7 +160,7 @@ handle_request (void *cls, const struct GNUNET_MessageHeader *mh)
152 * @return a handle to the set 160 * @return a handle to the set
153 */ 161 */
154struct GNUNET_SET_Handle * 162struct GNUNET_SET_Handle *
155GNUNET_SET_create (struct GNUNET_CONFIGURATION_Handle *cfg, 163GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
156 enum GNUNET_SET_OperationType op) 164 enum GNUNET_SET_OperationType op)
157{ 165{
158 struct GNUNET_SET_Handle *set; 166 struct GNUNET_SET_Handle *set;
@@ -165,6 +173,7 @@ GNUNET_SET_create (struct GNUNET_CONFIGURATION_Handle *cfg,
165 173
166 set = GNUNET_new (struct GNUNET_SET_Handle); 174 set = GNUNET_new (struct GNUNET_SET_Handle);
167 set->client = GNUNET_CLIENT_connect ("set", cfg); 175 set->client = GNUNET_CLIENT_connect ("set", cfg);
176 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set client created\n");
168 GNUNET_assert (NULL != set->client); 177 GNUNET_assert (NULL != set->client);
169 set->mq = GNUNET_MQ_queue_for_connection_client (set->client, mq_handlers, set); 178 set->mq = GNUNET_MQ_queue_for_connection_client (set->client, mq_handlers, set);
170 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_CREATE); 179 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_CREATE);
@@ -295,7 +304,7 @@ GNUNET_SET_evaluate (struct GNUNET_SET_Handle *set,
295 304
296 mqm = GNUNET_MQ_msg_extra (msg, htons(context_msg->size), GNUNET_MESSAGE_TYPE_SET_EVALUATE); 305 mqm = GNUNET_MQ_msg_extra (msg, htons(context_msg->size), GNUNET_MESSAGE_TYPE_SET_EVALUATE);
297 msg->request_id = htonl (GNUNET_MQ_assoc_add (set->mq, mqm, oh)); 306 msg->request_id = htonl (GNUNET_MQ_assoc_add (set->mq, mqm, oh));
298 msg->other_peer = *other_peer; 307 msg->peer = *other_peer;
299 msg->app_id = *app_id; 308 msg->app_id = *app_id;
300 memcpy (&msg[1], context_msg, htons (context_msg->size)); 309 memcpy (&msg[1], context_msg, htons (context_msg->size));
301 oh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, operation_timeout_task, oh); 310 oh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, operation_timeout_task, oh);
@@ -318,7 +327,7 @@ GNUNET_SET_evaluate (struct GNUNET_SET_Handle *set,
318 * @return a handle that can be used to cancel the listen operation 327 * @return a handle that can be used to cancel the listen operation
319 */ 328 */
320struct GNUNET_SET_ListenHandle * 329struct GNUNET_SET_ListenHandle *
321GNUNET_SET_listen (struct GNUNET_CONFIGURATION_Handle *cfg, 330GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
322 enum GNUNET_SET_OperationType operation, 331 enum GNUNET_SET_OperationType operation,
323 const struct GNUNET_HashCode *app_id, 332 const struct GNUNET_HashCode *app_id,
324 GNUNET_SET_ListenCallback listen_cb, 333 GNUNET_SET_ListenCallback listen_cb,
@@ -396,9 +405,11 @@ GNUNET_SET_accept (struct GNUNET_SET_Request *request,
396 405
397 mqm = GNUNET_MQ_msg (msg , GNUNET_MESSAGE_TYPE_SET_ACCEPT); 406 mqm = GNUNET_MQ_msg (msg , GNUNET_MESSAGE_TYPE_SET_ACCEPT);
398 msg->request_id = htonl (request->request_id); 407 msg->request_id = htonl (request->request_id);
399 oh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, operation_timeout_task, oh); 408 msg->accepted = 1;
400 GNUNET_MQ_send (set->mq, mqm); 409 GNUNET_MQ_send (set->mq, mqm);
401 410
411 oh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, operation_timeout_task, oh);
412
402 return oh; 413 return oh;
403} 414}
404 415
@@ -416,7 +427,7 @@ GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *h)
416 427
417 h_assoc = GNUNET_MQ_assoc_remove (h->set->mq, h->request_id); 428 h_assoc = GNUNET_MQ_assoc_remove (h->set->mq, h->request_id);
418 GNUNET_assert (h_assoc == h); 429 GNUNET_assert (h_assoc == h);
419 mqm = GNUNET_MQ_msg_raw (GNUNET_MESSAGE_TYPE_SET_CANCEL); 430 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_CANCEL);
420 GNUNET_MQ_send (h->set->mq, mqm); 431 GNUNET_MQ_send (h->set->mq, mqm);
421 GNUNET_free (h); 432 GNUNET_free (h);
422} 433}
diff --git a/src/set/strata_estimator.c b/src/set/strata_estimator.c
index 685c50f0f..f42c827e2 100644
--- a/src/set/strata_estimator.c
+++ b/src/set/strata_estimator.c
@@ -53,15 +53,15 @@ strata_estimator_read (const void *buf, struct StrataEstimator *se)
53 53
54 54
55void 55void
56strata_estimator_insert (struct StrataEstimator *se, struct GNUNET_HashCode *key) 56strata_estimator_insert (struct StrataEstimator *se, struct IBF_Key key)
57{ 57{
58 uint32_t v; 58 uint64_t v;
59 int i; 59 int i;
60 v = key->bits[0]; 60 v = key.key_val;
61 /* count trailing '1'-bits of v */ 61 /* count trailing '1'-bits of v */
62 for (i = 0; v & 1; v>>=1, i++) 62 for (i = 0; v & 1; v>>=1, i++)
63 /* empty */; 63 /* empty */;
64 ibf_insert (se->strata[i], ibf_key_from_hashcode (key)); 64 ibf_insert (se->strata[i], key);
65} 65}
66 66
67 67
diff --git a/src/set/strata_estimator.h b/src/set/strata_estimator.h
index cb5bd3d0a..57f0961a9 100644
--- a/src/set/strata_estimator.h
+++ b/src/set/strata_estimator.h
@@ -66,7 +66,7 @@ strata_estimator_difference (const struct StrataEstimator *se1,
66 66
67 67
68void 68void
69strata_estimator_insert (struct StrataEstimator *se, struct GNUNET_HashCode *key); 69strata_estimator_insert (struct StrataEstimator *se, struct IBF_Key key);
70 70
71 71
72void 72void
diff --git a/src/set/test_set_api.c b/src/set/test_set_api.c
index 7e2ed2b41..0753fd139 100644
--- a/src/set/test_set_api.c
+++ b/src/set/test_set_api.c
@@ -44,8 +44,8 @@ run (void *cls,
44 struct GNUNET_SET_Handle *set1; 44 struct GNUNET_SET_Handle *set1;
45 struct GNUNET_SET_Handle *set2; 45 struct GNUNET_SET_Handle *set2;
46 46
47 set1 = GNUNET_SET_create (GNUNET_SET_OPERATION_UNION); 47 set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
48 set2 = GNUNET_SET_create (GNUNET_SET_OPERATION_UNION); 48 set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
49} 49}
50 50
51int 51int