aboutsummaryrefslogtreecommitdiff
path: root/src/set
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2013-05-15 10:48:55 +0000
committerFlorian Dold <florian.dold@gmail.com>2013-05-15 10:48:55 +0000
commit6f54b50858457dfa2b5f0b519fbf230e1119c6b2 (patch)
tree5f84bfa599cb50522999cad892344e2fecbfa963 /src/set
parent6625c27a83831b61a80683f4385b6a90b9a45b31 (diff)
downloadgnunet-6f54b50858457dfa2b5f0b519fbf230e1119c6b2.tar.gz
gnunet-6f54b50858457dfa2b5f0b519fbf230e1119c6b2.zip
test cases for mq, set works
Diffstat (limited to 'src/set')
-rw-r--r--src/set/Makefile.am20
-rw-r--r--src/set/gnunet-service-set.c180
-rw-r--r--src/set/gnunet-service-set.h28
-rw-r--r--src/set/gnunet-service-set_union.c262
-rw-r--r--src/set/gnunet-set-bug.c2
-rw-r--r--src/set/gnunet-set.c115
-rw-r--r--src/set/ibf.c1
-rw-r--r--src/set/mq.c130
-rw-r--r--src/set/mq.h3
-rw-r--r--src/set/set.h28
-rw-r--r--src/set/set_api.c44
-rw-r--r--src/set/strata_estimator.c2
-rw-r--r--src/set/test_mq.c115
-rw-r--r--src/set/test_mq_client.c181
-rw-r--r--src/set/test_set.conf23
-rw-r--r--src/set/test_set_api.c101
16 files changed, 1093 insertions, 142 deletions
diff --git a/src/set/Makefile.am b/src/set/Makefile.am
index 2de531ce3..a609840b1 100644
--- a/src/set/Makefile.am
+++ b/src/set/Makefile.am
@@ -69,7 +69,7 @@ libgnunetset_la_LDFLAGS = \
69 $(GN_LIB_LDFLAGS) 69 $(GN_LIB_LDFLAGS)
70 70
71check_PROGRAMS = \ 71check_PROGRAMS = \
72 test_set_api 72 test_set_api test_mq test_mq_client
73 73
74if ENABLE_TEST_RUN 74if ENABLE_TEST_RUN
75TESTS = $(check_PROGRAMS) 75TESTS = $(check_PROGRAMS)
@@ -84,6 +84,24 @@ test_set_api_LDADD = \
84test_set_api_DEPENDENCIES = \ 84test_set_api_DEPENDENCIES = \
85 libgnunetset.la 85 libgnunetset.la
86 86
87
88test_mq_SOURCES = \
89 test_mq.c \
90 mq.c
91test_mq_LDADD = \
92 $(top_builddir)/src/util/libgnunetutil.la \
93 $(top_builddir)/src/stream/libgnunetstream.la
94test_mq_CFLAGS = $(AM_CFLAGS)
95
96
97test_mq_client_SOURCES = \
98 test_mq_client.c \
99 mq.c
100test_mq_client_LDADD = \
101 $(top_builddir)/src/util/libgnunetutil.la \
102 $(top_builddir)/src/stream/libgnunetstream.la
103test_mq_client_CFLAGS = $(AM_CFLAGS)
104
87EXTRA_DIST = \ 105EXTRA_DIST = \
88 test_set.conf 106 test_set.conf
89 107
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c
index aea198a61..3ed896775 100644
--- a/src/set/gnunet-service-set.c
+++ b/src/set/gnunet-service-set.c
@@ -72,24 +72,11 @@ static struct Incoming *incoming_head;
72static struct Incoming *incoming_tail; 72static struct Incoming *incoming_tail;
73 73
74/** 74/**
75 * Counter for allocating unique request IDs for clients. 75 * Counter for allocating unique IDs for clients,
76 * Used to identify incoming requests from remote peers. 76 * used to identify incoming operation requests from remote peers,
77 * that the client can choose to accept or refuse.
77 */ 78 */
78static uint32_t request_id = 1; 79static uint32_t accept_id = 1;
79
80
81/**
82 * Disconnect a client and free all resources
83 * that the client allocated (e.g. Sets or Listeners)
84 *
85 * @param client the client to disconnect
86 */
87void
88_GSS_client_disconnect (struct GNUNET_SERVER_Client *client)
89{
90 /* FIXME: clean up any data structures belonging to the client */
91 GNUNET_SERVER_client_disconnect (client);
92}
93 80
94 81
95/** 82/**
@@ -140,13 +127,84 @@ get_incoming (uint32_t id)
140{ 127{
141 struct Incoming *incoming; 128 struct Incoming *incoming;
142 for (incoming = incoming_head; NULL != incoming; incoming = incoming->next) 129 for (incoming = incoming_head; NULL != incoming; incoming = incoming->next)
143 if (incoming->request_id == id) 130 if (incoming->accept_id == id)
144 return incoming; 131 return incoming;
145 return NULL; 132 return NULL;
146} 133}
147 134
148 135
149/** 136/**
137 * Destroy a listener, free all resources associated with it.
138 *
139 * @param listener listener to destroy
140 */
141static void
142destroy_listener (struct Listener *listener)
143{
144 if (NULL != listener->client_mq)
145 {
146 GNUNET_MQ_destroy (listener->client_mq);
147 listener->client_mq = NULL;
148 }
149 if (NULL != listener->client)
150 {
151 GNUNET_SERVER_client_drop (listener->client);
152 listener->client = NULL;
153 }
154
155 GNUNET_CONTAINER_DLL_remove (listeners_head, listeners_tail, listener);
156 GNUNET_free (listener);
157}
158
159
160/**
161 * Destroy a set, and free all resources associated with it.
162 *
163 * @param set the set to destroy
164 */
165static void
166destroy_set (struct Set *set)
167{
168 switch (set->operation)
169 {
170 case GNUNET_SET_OPERATION_INTERSECTION:
171 GNUNET_assert (0);
172 break;
173 case GNUNET_SET_OPERATION_UNION:
174 _GSS_union_set_destroy (set);
175 break;
176 default:
177 GNUNET_assert (0);
178 break;
179 }
180 GNUNET_CONTAINER_DLL_remove (sets_head, sets_tail, set);
181 GNUNET_free (set);
182}
183
184
185/**
186 * Clean up after a client after it is
187 * disconnected (either by us or by itself)
188 *
189 * @param cls closure, unused
190 * @param client the client to clean up after
191 */
192void
193handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
194{
195 struct Set *set;
196 struct Listener *listener;
197
198 set = get_set (client);
199 if (NULL != set)
200 destroy_set (set);
201 listener = get_listener (client);
202 if (NULL != listener)
203 destroy_listener (listener);
204}
205
206
207/**
150 * Destroy an incoming request from a remote peer 208 * Destroy an incoming request from a remote peer
151 * 209 *
152 * @param incoming remote request to destroy 210 * @param incoming remote request to destroy
@@ -186,16 +244,16 @@ handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh)
186 struct Listener *listener; 244 struct Listener *listener;
187 const struct GNUNET_MessageHeader *context_msg; 245 const struct GNUNET_MessageHeader *context_msg;
188 246
189 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got operation request\n");
190
191 if (ntohs (mh->size) < sizeof *msg) 247 if (ntohs (mh->size) < sizeof *msg)
192 { 248 {
249 /* message is to small for its type */
193 GNUNET_break (0); 250 GNUNET_break (0);
194 destroy_incoming (incoming); 251 destroy_incoming (incoming);
195 return; 252 return;
196 } 253 }
197 else if (ntohs (mh->size) == sizeof *msg) 254 else if (ntohs (mh->size) == sizeof *msg)
198 { 255 {
256 /* there is no context message */
199 context_msg = NULL; 257 context_msg = NULL;
200 } 258 }
201 else 259 else
@@ -209,13 +267,17 @@ handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh)
209 return; 267 return;
210 } 268 }
211 } 269 }
270
271 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received P2P operation request (op %u, app %s)\n",
272 ntohs (msg->operation), GNUNET_h2s (&msg->app_id));
273
212 /* find the appropriate listener */ 274 /* find the appropriate listener */
213 for (listener = listeners_head; 275 for (listener = listeners_head;
214 listener != NULL; 276 listener != NULL;
215 listener = listener->next) 277 listener = listener->next)
216 { 278 {
217 if ( (0 != GNUNET_CRYPTO_hash_cmp (&msg->app_id, &listener->app_id)) || 279 if ( (0 != GNUNET_CRYPTO_hash_cmp (&msg->app_id, &listener->app_id)) ||
218 (htons (msg->operation) != listener->operation) ) 280 (ntohs (msg->operation) != listener->operation) )
219 continue; 281 continue;
220 mqm = GNUNET_MQ_msg (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST); 282 mqm = GNUNET_MQ_msg (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST);
221 if (GNUNET_OK != GNUNET_MQ_nest_mh (mqm, context_msg)) 283 if (GNUNET_OK != GNUNET_MQ_nest_mh (mqm, context_msg))
@@ -225,11 +287,15 @@ handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh)
225 GNUNET_break (0); 287 GNUNET_break (0);
226 return; 288 return;
227 } 289 }
228 incoming->request_id = request_id++; 290 incoming->accept_id = accept_id++;
229 cmsg->request_id = htonl (incoming->request_id); 291 cmsg->accept_id = htonl (incoming->accept_id);
230 GNUNET_MQ_send (listener->client_mq, mqm); 292 GNUNET_MQ_send (listener->client_mq, mqm);
231 return; 293 return;
232 } 294 }
295
296 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
297 "set operation request from peer failed: "
298 "no set with matching application ID and operation type\n");
233} 299}
234 300
235 301
@@ -248,7 +314,8 @@ handle_client_create (void *cls,
248 struct SetCreateMessage *msg = (struct SetCreateMessage *) m; 314 struct SetCreateMessage *msg = (struct SetCreateMessage *) m;
249 struct Set *set; 315 struct Set *set;
250 316
251 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "new set created\n"); 317 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client created new set (operation %u)\n",
318 ntohs (msg->operation));
252 319
253 if (NULL != get_set (client)) 320 if (NULL != get_set (client))
254 { 321 {
@@ -276,9 +343,9 @@ handle_client_create (void *cls,
276 } 343 }
277 344
278 set->client = client; 345 set->client = client;
346 GNUNET_SERVER_client_keep (client);
279 set->client_mq = GNUNET_MQ_queue_for_server_client (client); 347 set->client_mq = GNUNET_MQ_queue_for_server_client (client);
280 GNUNET_CONTAINER_DLL_insert (sets_head, sets_tail, set); 348 GNUNET_CONTAINER_DLL_insert (sets_head, sets_tail, set);
281
282 GNUNET_SERVER_receive_done (client, GNUNET_OK); 349 GNUNET_SERVER_receive_done (client, GNUNET_OK);
283} 350}
284 351
@@ -297,19 +364,22 @@ handle_client_listen (void *cls,
297{ 364{
298 struct ListenMessage *msg = (struct ListenMessage *) m; 365 struct ListenMessage *msg = (struct ListenMessage *) m;
299 struct Listener *listener; 366 struct Listener *listener;
300 367
301 if (NULL != get_listener (client)) 368 if (NULL != get_listener (client))
302 { 369 {
303 GNUNET_break (0); 370 GNUNET_break (0);
304 GNUNET_SERVER_client_disconnect (client); 371 GNUNET_SERVER_client_disconnect (client);
305 return; 372 return;
306 } 373 }
307
308 listener = GNUNET_new (struct Listener); 374 listener = GNUNET_new (struct Listener);
375 listener->client = client;
376 GNUNET_SERVER_client_keep (client);
377 listener->client_mq = GNUNET_MQ_queue_for_server_client (client);
309 listener->app_id = msg->app_id; 378 listener->app_id = msg->app_id;
310 listener->operation = msg->operation; 379 listener->operation = ntohs (msg->operation);
311 GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener); 380 GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener);
312 381 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "new listener created (op %u, app %s)\n",
382 listener->operation, GNUNET_h2s (&listener->app_id));
313 GNUNET_SERVER_receive_done (client, GNUNET_OK); 383 GNUNET_SERVER_receive_done (client, GNUNET_OK);
314} 384}
315 385
@@ -333,13 +403,13 @@ handle_client_remove (void *cls,
333 if (NULL == set) 403 if (NULL == set)
334 { 404 {
335 GNUNET_break (0); 405 GNUNET_break (0);
336 _GSS_client_disconnect (client); 406 GNUNET_SERVER_client_disconnect (client);
337 return; 407 return;
338 } 408 }
339 switch (set->operation) 409 switch (set->operation)
340 { 410 {
341 case GNUNET_SET_OPERATION_UNION: 411 case GNUNET_SET_OPERATION_UNION:
342 _GSS_union_add ((struct ElementMessage *) m, set); 412 _GSS_union_remove ((struct ElementMessage *) m, set);
343 case GNUNET_SET_OPERATION_INTERSECTION: 413 case GNUNET_SET_OPERATION_INTERSECTION:
344 /* FIXME: cfuchs */ 414 /* FIXME: cfuchs */
345 break; 415 break;
@@ -371,13 +441,13 @@ handle_client_add (void *cls,
371 if (NULL == set) 441 if (NULL == set)
372 { 442 {
373 GNUNET_break (0); 443 GNUNET_break (0);
374 _GSS_client_disconnect (client); 444 GNUNET_SERVER_client_disconnect (client);
375 return; 445 return;
376 } 446 }
377 switch (set->operation) 447 switch (set->operation)
378 { 448 {
379 case GNUNET_SET_OPERATION_UNION: 449 case GNUNET_SET_OPERATION_UNION:
380 _GSS_union_remove ((struct ElementMessage *) m, set); 450 _GSS_union_add ((struct ElementMessage *) m, set);
381 case GNUNET_SET_OPERATION_INTERSECTION: 451 case GNUNET_SET_OPERATION_INTERSECTION:
382 /* FIXME: cfuchs */ 452 /* FIXME: cfuchs */
383 break; 453 break;
@@ -408,7 +478,7 @@ handle_client_evaluate (void *cls,
408 if (NULL == set) 478 if (NULL == set)
409 { 479 {
410 GNUNET_break (0); 480 GNUNET_break (0);
411 _GSS_client_disconnect (client); 481 GNUNET_SERVER_client_disconnect (client);
412 return; 482 return;
413 } 483 }
414 484
@@ -481,22 +551,30 @@ handle_client_accept (void *cls,
481 struct Incoming *incoming; 551 struct Incoming *incoming;
482 struct AcceptMessage *msg = (struct AcceptMessage *) mh; 552 struct AcceptMessage *msg = (struct AcceptMessage *) mh;
483 553
484 set = get_set (client);
485 554
486 if (NULL == set) 555 incoming = get_incoming (ntohl (msg->accept_id));
556
557 if (NULL == incoming)
487 { 558 {
488 GNUNET_break (0); 559 GNUNET_break (0);
489 _GSS_client_disconnect (client); 560 GNUNET_SERVER_client_disconnect (client);
490 return; 561 return;
491 } 562 }
492 563
493 incoming = get_incoming (ntohl (msg->request_id)); 564 if (0 == ntohl (msg->request_id))
565 {
566 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer request rejected by client\n");
567 destroy_incoming (incoming);
568 GNUNET_SERVER_receive_done (client, GNUNET_OK);
569 return;
570 }
571
572 set = get_set (client);
494 573
495 if ( (NULL == incoming) || 574 if (NULL == set)
496 (incoming->operation != set->operation) )
497 { 575 {
498 GNUNET_break (0); 576 GNUNET_break (0);
499 _GSS_client_disconnect (client); 577 GNUNET_SERVER_client_disconnect (client);
500 return; 578 return;
501 } 579 }
502 580
@@ -513,8 +591,10 @@ handle_client_accept (void *cls,
513 GNUNET_assert (0); 591 GNUNET_assert (0);
514 break; 592 break;
515 } 593 }
516 /* FIXME: destroy incoming */
517 594
595 /* note: _GSS_*_accept has to make sure the socket and mq are set to NULL,
596 * otherwise they will be destroyed and disconnected */
597 destroy_incoming (incoming);
518 GNUNET_SERVER_receive_done (client, GNUNET_OK); 598 GNUNET_SERVER_receive_done (client, GNUNET_OK);
519} 599}
520 600
@@ -574,6 +654,21 @@ shutdown_task (void *cls,
574 stream_listen_socket = NULL; 654 stream_listen_socket = NULL;
575 } 655 }
576 656
657 while (NULL != incoming_head)
658 {
659 destroy_incoming (incoming_head);
660 }
661
662 while (NULL != listeners_head)
663 {
664 destroy_listener (listeners_head);
665 }
666
667 while (NULL != sets_head)
668 {
669 destroy_set (sets_head);
670 }
671
577 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n"); 672 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n");
578} 673}
579 674
@@ -604,6 +699,7 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
604 699
605 configuration = cfg; 700 configuration = cfg;
606 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); 701 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
702 GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL);
607 GNUNET_SERVER_add_handlers (server, server_handlers); 703 GNUNET_SERVER_add_handlers (server, server_handlers);
608 stream_listen_socket = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_SET, 704 stream_listen_socket = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_SET,
609 &stream_listen_cb, NULL, 705 &stream_listen_cb, NULL,
diff --git a/src/set/gnunet-service-set.h b/src/set/gnunet-service-set.h
index a5a53671c..cc28e9701 100644
--- a/src/set/gnunet-service-set.h
+++ b/src/set/gnunet-service-set.h
@@ -109,8 +109,8 @@ struct Listener
109 struct Listener *prev; 109 struct Listener *prev;
110 110
111 /** 111 /**
112 * Client that owns the set. 112 * Client that owns the listener.
113 * Only one client may own a set. 113 * Only one client may own a listener.
114 */ 114 */
115 struct GNUNET_SERVER_Client *client; 115 struct GNUNET_SERVER_Client *client;
116 116
@@ -188,9 +188,10 @@ struct Incoming
188 188
189 /** 189 /**
190 * Unique request id for the request from 190 * Unique request id for the request from
191 * a remote peer. 191 * a remote peer, sent to the client with will
192 * accept or reject the request.
192 */ 193 */
193 uint32_t request_id; 194 uint32_t accept_id;
194}; 195};
195 196
196 197
@@ -201,16 +202,6 @@ extern const struct GNUNET_CONFIGURATION_Handle *configuration;
201 202
202 203
203/** 204/**
204 * Disconnect a client and free all resources
205 * that the client allocated (e.g. Sets or Listeners)
206 *
207 * @param client the client to disconnect
208 */
209void
210_GSS_client_disconnect (struct GNUNET_SERVER_Client *client);
211
212
213/**
214 * Create a new set supporting the union operation 205 * Create a new set supporting the union operation
215 * 206 *
216 * @return the newly created set 207 * @return the newly created set
@@ -252,6 +243,15 @@ _GSS_union_remove (struct ElementMessage *m, struct Set *set);
252 243
253 244
254/** 245/**
246 * Destroy a set that supports the union operation
247 *
248 * @param the set to destroy, must be of type GNUNET_SET_OPERATION_UNION
249 */
250void
251_GSS_union_set_destroy (struct Set *set);
252
253
254/**
255 * Accept an union operation request from a remote peer 255 * Accept an union operation request from a remote peer
256 * 256 *
257 * @param m the accept message from the client 257 * @param m the accept message from the client
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c
index efedbcef6..694fb6056 100644
--- a/src/set/gnunet-service-set_union.c
+++ b/src/set/gnunet-service-set_union.c
@@ -135,11 +135,6 @@ struct UnionEvaluateOperation
135 struct GNUNET_MQ_MessageQueue *mq; 135 struct GNUNET_MQ_MessageQueue *mq;
136 136
137 /** 137 /**
138 * Type of this operation
139 */
140 enum GNUNET_SET_OperationType operation;
141
142 /**
143 * Request ID to multiplex set operations to 138 * Request ID to multiplex set operations to
144 * the client inhabiting the set. 139 * the client inhabiting the set.
145 */ 140 */
@@ -330,6 +325,45 @@ struct UnionState
330}; 325};
331 326
332 327
328
329/**
330 * Iterator over hash map entries.
331 *
332 * @param cls closure
333 * @param key current key code
334 * @param value value in the hash map
335 * @return GNUNET_YES if we should continue to
336 * iterate,
337 * GNUNET_NO if not.
338 */
339static int
340destroy_elements_iterator (void *cls,
341 const struct GNUNET_HashCode * key,
342 void *value)
343{
344 struct ElementEntry *ee = value;
345
346 GNUNET_free (ee);
347 return GNUNET_YES;
348}
349
350
351/**
352 * Destroy the elements belonging to a union set.
353 *
354 * @param us union state that contains the elements
355 */
356static void
357destroy_elements (struct UnionState *us)
358{
359 if (NULL == us->elements)
360 return;
361 GNUNET_CONTAINER_multihashmap_iterate (us->elements, destroy_elements_iterator, NULL);
362 GNUNET_CONTAINER_multihashmap_destroy (us->elements);
363 us->elements = NULL;
364}
365
366
333/** 367/**
334 * Destroy a union operation, and free all resources 368 * Destroy a union operation, and free all resources
335 * associated with it. 369 * associated with it.
@@ -339,6 +373,38 @@ struct UnionState
339static void 373static void
340destroy_union_operation (struct UnionEvaluateOperation *eo) 374destroy_union_operation (struct UnionEvaluateOperation *eo)
341{ 375{
376 if (NULL != eo->mq)
377 {
378 GNUNET_MQ_destroy (eo->mq);
379 eo->mq = NULL;
380 }
381 if (NULL != eo->socket)
382 {
383 GNUNET_STREAM_close (eo->socket);
384 eo->socket = NULL;
385 }
386 if (NULL != eo->remote_ibf)
387 {
388 ibf_destroy (eo->remote_ibf);
389 eo->remote_ibf = NULL;
390 }
391 if (NULL != eo->local_ibf)
392 {
393 ibf_destroy (eo->local_ibf);
394 eo->local_ibf = NULL;
395 }
396 if (NULL != eo->se)
397 {
398 strata_estimator_destroy (eo->se);
399 eo->se = NULL;
400 }
401 if (NULL != eo->key_to_element)
402 {
403 GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element);
404 eo->key_to_element = NULL;
405 }
406
407
342 GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head, 408 GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head,
343 eo->set->state.u->ops_tail, 409 eo->set->state.u->ops_tail,
344 eo); 410 eo);
@@ -361,7 +427,7 @@ fail_union_operation (struct UnionEvaluateOperation *eo)
361 427
362 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); 428 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
363 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); 429 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
364 msg->request_id = eo->request_id; 430 msg->request_id = htonl (eo->request_id);
365 GNUNET_MQ_send (eo->set->client_mq, mqm); 431 GNUNET_MQ_send (eo->set->client_mq, mqm);
366 destroy_union_operation (eo); 432 destroy_union_operation (eo);
367} 433}
@@ -405,12 +471,12 @@ send_operation_request (struct UnionEvaluateOperation *eo)
405 if (GNUNET_OK != GNUNET_MQ_nest (mqm, eo->context_msg, ntohs (eo->context_msg->size))) 471 if (GNUNET_OK != GNUNET_MQ_nest (mqm, eo->context_msg, ntohs (eo->context_msg->size)))
406 { 472 {
407 /* the context message is too large */ 473 /* the context message is too large */
408 _GSS_client_disconnect (eo->set->client);
409 GNUNET_MQ_discard (mqm);
410 GNUNET_break (0); 474 GNUNET_break (0);
475 GNUNET_SERVER_client_disconnect (eo->set->client);
476 GNUNET_MQ_discard (mqm);
411 return; 477 return;
412 } 478 }
413 msg->operation = eo->operation; 479 msg->operation = htons (GNUNET_SET_OPERATION_UNION);
414 msg->app_id = eo->app_id; 480 msg->app_id = eo->app_id;
415 GNUNET_MQ_send (eo->mq, mqm); 481 GNUNET_MQ_send (eo->mq, mqm);
416 482
@@ -547,7 +613,7 @@ prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size)
547 { 613 {
548 unsigned int len; 614 unsigned int len;
549 len = GNUNET_CONTAINER_multihashmap_size (eo->set->state.u->elements); 615 len = GNUNET_CONTAINER_multihashmap_size (eo->set->state.u->elements);
550 eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len); 616 eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
551 GNUNET_CONTAINER_multihashmap_iterate (eo->set->state.u->elements, 617 GNUNET_CONTAINER_multihashmap_iterate (eo->set->state.u->elements,
552 init_key_to_element_iterator, eo); 618 init_key_to_element_iterator, eo);
553 } 619 }
@@ -573,6 +639,8 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
573 639
574 prepare_ibf (eo, 1<<ibf_order); 640 prepare_ibf (eo, 1<<ibf_order);
575 641
642 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending ibf of size %u\n", 1<<ibf_order);
643
576 ibf = eo->local_ibf; 644 ibf = eo->local_ibf;
577 645
578 while (buckets_sent < (1 << ibf_order)) 646 while (buckets_sent < (1 << ibf_order))
@@ -588,7 +656,7 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
588 656
589 mqm = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE, 657 mqm = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
590 GNUNET_MESSAGE_TYPE_SET_P2P_IBF); 658 GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
591 msg->order = htons (ibf_order); 659 msg->order = ibf_order;
592 msg->offset = htons (buckets_sent); 660 msg->offset = htons (buckets_sent);
593 ibf_write_slice (ibf, buckets_sent, 661 ibf_write_slice (ibf, buckets_sent,
594 buckets_in_message, &msg[1]); 662 buckets_in_message, &msg[1]);
@@ -654,7 +722,6 @@ handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
654 struct StrataEstimator *remote_se; 722 struct StrataEstimator *remote_se;
655 int diff; 723 int diff;
656 724
657 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got se\n");
658 725
659 if (eo->phase != PHASE_EXPECT_SE) 726 if (eo->phase != PHASE_EXPECT_SE)
660 { 727 {
@@ -667,6 +734,7 @@ handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
667 strata_estimator_read (&mh[1], remote_se); 734 strata_estimator_read (&mh[1], remote_se);
668 GNUNET_assert (NULL != eo->se); 735 GNUNET_assert (NULL != eo->se);
669 diff = strata_estimator_difference (remote_se, eo->se); 736 diff = strata_estimator_difference (remote_se, eo->se);
737 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got se, diff=%d\n", diff);
670 strata_estimator_destroy (remote_se); 738 strata_estimator_destroy (remote_se);
671 strata_estimator_destroy (eo->se); 739 strata_estimator_destroy (eo->se);
672 eo->se = NULL; 740 eo->se = NULL;
@@ -708,6 +776,7 @@ send_element_iterator (void *cls,
708 continue; 776 continue;
709 } 777 }
710 GNUNET_MQ_send (eo->mq, mqm); 778 GNUNET_MQ_send (eo->mq, mqm);
779 ke = ke->next_colliding;
711 } 780 }
712 return GNUNET_NO; 781 return GNUNET_NO;
713} 782}
@@ -731,7 +800,6 @@ send_elements_for_key (struct UnionEvaluateOperation *eo, struct IBF_Key ibf_key
731} 800}
732 801
733 802
734
735/** 803/**
736 * Decode which elements are missing on each side, and 804 * Decode which elements are missing on each side, and
737 * send the appropriate elemens and requests 805 * send the appropriate elemens and requests
@@ -758,11 +826,22 @@ decode_and_send (struct UnionEvaluateOperation *eo)
758 res = ibf_decode (diff_ibf, &side, &key); 826 res = ibf_decode (diff_ibf, &side, &key);
759 if (GNUNET_SYSERR == res) 827 if (GNUNET_SYSERR == res)
760 { 828 {
761 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "decoding failed, sending larger ibf (size %u)\n", 829 int next_order;
762 diff_ibf->size * 2); 830 next_order = 0;
763 send_ibf (eo, diff_ibf->size * 2); 831 while (1<<next_order < diff_ibf->size)
764 ibf_destroy (diff_ibf); 832 next_order++;
765 return; 833 next_order++;
834 if (next_order <= MAX_IBF_ORDER)
835 {
836 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "decoding failed, sending larger ibf (size %u)\n",
837 1<<next_order);
838 send_ibf (eo, next_order);
839 }
840 else
841 {
842 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "set union failed: reached ibf limit\n");
843 }
844 break;
766 } 845 }
767 if (GNUNET_NO == res) 846 if (GNUNET_NO == res)
768 { 847 {
@@ -771,7 +850,7 @@ decode_and_send (struct UnionEvaluateOperation *eo)
771 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n"); 850 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n");
772 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); 851 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
773 GNUNET_MQ_send (eo->mq, mqm); 852 GNUNET_MQ_send (eo->mq, mqm);
774 return; 853 break;
775 } 854 }
776 if (1 == side) 855 if (1 == side)
777 { 856 {
@@ -790,6 +869,7 @@ decode_and_send (struct UnionEvaluateOperation *eo)
790 GNUNET_MQ_send (eo->mq, mqm); 869 GNUNET_MQ_send (eo->mq, mqm);
791 } 870 }
792 } 871 }
872 ibf_destroy (diff_ibf);
793} 873}
794 874
795 875
@@ -811,6 +891,7 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
811 { 891 {
812 eo->phase = PHASE_EXPECT_IBF_CONT; 892 eo->phase = PHASE_EXPECT_IBF_CONT;
813 GNUNET_assert (NULL == eo->remote_ibf); 893 GNUNET_assert (NULL == eo->remote_ibf);
894 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "creating new ibf of order %u\n", 1<<msg->order);
814 eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM); 895 eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
815 if (0 != ntohs (msg->offset)) 896 if (0 != ntohs (msg->offset))
816 { 897 {
@@ -825,6 +906,7 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
825 { 906 {
826 GNUNET_break (0); 907 GNUNET_break (0);
827 fail_union_operation (eo); 908 fail_union_operation (eo);
909 return;
828 } 910 }
829 } 911 }
830 912
@@ -834,13 +916,16 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
834 { 916 {
835 GNUNET_break (0); 917 GNUNET_break (0);
836 fail_union_operation (eo); 918 fail_union_operation (eo);
919 return;
837 } 920 }
838 921
839 ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf); 922 ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf);
840 eo->ibf_buckets_received += buckets_in_message; 923 eo->ibf_buckets_received += buckets_in_message;
841 924
842 if (eo->ibf_buckets_received == eo->remote_ibf->size) 925 if (eo->ibf_buckets_received == eo->remote_ibf->size)
843 { 926 {
927
928 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full strata estimator\n");
844 eo->phase = PHASE_EXPECT_ELEMENTS; 929 eo->phase = PHASE_EXPECT_ELEMENTS;
845 decode_and_send (eo); 930 decode_and_send (eo);
846 } 931 }
@@ -848,7 +933,8 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
848 933
849 934
850/** 935/**
851 * Send an element to the client of the operations's set. 936 * Send a result message to the client indicating
937 * that there is a new element.
852 * 938 *
853 * @param eo union operation 939 * @param eo union operation
854 * @param element element to send 940 * @param element element to send
@@ -862,6 +948,8 @@ send_client_element (struct UnionEvaluateOperation *eo,
862 948
863 GNUNET_assert (0 != eo->request_id); 949 GNUNET_assert (0 != eo->request_id);
864 mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); 950 mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
951 rm->result_status = htons (GNUNET_SET_STATUS_OK);
952 rm->request_id = htonl (eo->request_id);
865 if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size)) 953 if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size))
866 { 954 {
867 GNUNET_MQ_discard (mqm); 955 GNUNET_MQ_discard (mqm);
@@ -869,7 +957,46 @@ send_client_element (struct UnionEvaluateOperation *eo,
869 return; 957 return;
870 } 958 }
871 959
872 GNUNET_MQ_send (eo->mq, mqm); 960 GNUNET_MQ_send (eo->set->client_mq, mqm);
961}
962
963
964/**
965 * Callback used for notifications
966 *
967 * @param cls closure
968 */
969static void
970client_done_sent_cb (void *cls)
971{
972 //struct UnionEvaluateOperation *eo = cls;
973 /* FIXME: destroy eo */
974}
975
976
977/**
978 * Send a result message to the client indicating
979 * that the operation is over.
980 * After the result done message has been sent to the client,
981 * destroy the evaluate operation.
982 *
983 * @param eo union operation
984 * @param element element to send
985 */
986static void
987send_client_done_and_destroy (struct UnionEvaluateOperation *eo)
988{
989 struct GNUNET_MQ_Message *mqm;
990 struct ResultMessage *rm;
991
992 GNUNET_assert (0 != eo->request_id);
993 mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
994 rm->request_id = htonl (eo->request_id);
995 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
996 GNUNET_MQ_notify_sent (mqm, client_done_sent_cb, eo);
997 GNUNET_MQ_send (eo->set->client_mq, mqm);
998
999 /* FIXME: destroy the eo */
873} 1000}
874 1001
875 1002
@@ -886,6 +1013,8 @@ handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
886 struct ElementEntry *ee; 1013 struct ElementEntry *ee;
887 uint16_t element_size; 1014 uint16_t element_size;
888 1015
1016 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got element from peer\n");
1017
889 if ( (eo->phase != PHASE_EXPECT_ELEMENTS) && 1018 if ( (eo->phase != PHASE_EXPECT_ELEMENTS) &&
890 (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ) 1019 (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
891 { 1020 {
@@ -920,8 +1049,8 @@ handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
920 /* look up elements and send them */ 1049 /* look up elements and send them */
921 if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) 1050 if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
922 { 1051 {
923 fail_union_operation (eo);
924 GNUNET_break (0); 1052 GNUNET_break (0);
1053 fail_union_operation (eo);
925 return; 1054 return;
926 } 1055 }
927 1056
@@ -929,8 +1058,8 @@ handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
929 1058
930 if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key)) 1059 if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
931 { 1060 {
932 fail_union_operation (eo);
933 GNUNET_break (0); 1061 GNUNET_break (0);
1062 fail_union_operation (eo);
934 return; 1063 return;
935 } 1064 }
936 1065
@@ -944,6 +1073,20 @@ handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
944 1073
945 1074
946/** 1075/**
1076 * Callback used for notifications
1077 *
1078 * @param cls closure
1079 */
1080static void
1081peer_done_sent_cb (void *cls)
1082{
1083 struct UnionEvaluateOperation *eo = cls;
1084
1085 send_client_done_and_destroy (eo);
1086}
1087
1088
1089/**
947 * Handle a done message from a remote peer 1090 * Handle a done message from a remote peer
948 * 1091 *
949 * @param cls the union operation 1092 * @param cls the union operation
@@ -959,15 +1102,18 @@ handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
959 /* we got all requests, but still have to send our elements as response */ 1102 /* we got all requests, but still have to send our elements as response */
960 struct GNUNET_MQ_Message *mqm; 1103 struct GNUNET_MQ_Message *mqm;
961 1104
1105 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got DONE, sending final DONE after elements\n");
962 eo->phase = PHASE_FINISHED; 1106 eo->phase = PHASE_FINISHED;
963 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); 1107 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
1108 GNUNET_MQ_notify_sent (mqm, peer_done_sent_cb, eo);
964 GNUNET_MQ_send (eo->mq, mqm); 1109 GNUNET_MQ_send (eo->mq, mqm);
965 return; 1110 return;
966 } 1111 }
967 if (eo->phase == PHASE_EXPECT_ELEMENTS) 1112 if (eo->phase == PHASE_EXPECT_ELEMENTS)
968 { 1113 {
969 /* it's all over! */ 1114 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got final DONE\n");
970 eo->phase = PHASE_FINISHED; 1115 eo->phase = PHASE_FINISHED;
1116 send_client_done_and_destroy (eo);
971 return; 1117 return;
972 } 1118 }
973 GNUNET_break (0); 1119 GNUNET_break (0);
@@ -1026,19 +1172,27 @@ _GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set)
1026{ 1172{
1027 struct UnionEvaluateOperation *eo; 1173 struct UnionEvaluateOperation *eo;
1028 1174
1029 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "evaluating union operation\n");
1030
1031 eo = GNUNET_new (struct UnionEvaluateOperation); 1175 eo = GNUNET_new (struct UnionEvaluateOperation);
1032 eo->peer = m->peer; 1176 eo->peer = m->peer;
1033 eo->set = set; 1177 eo->set = set;
1034 eo->request_id = htons(m->request_id); 1178 eo->request_id = htonl (m->request_id);
1179 GNUNET_assert (0 != eo->request_id);
1035 eo->se = strata_estimator_dup (set->state.u->se); 1180 eo->se = strata_estimator_dup (set->state.u->se);
1036 eo->salt = ntohs (m->salt); 1181 eo->salt = ntohs (m->salt);
1037 eo->app_id = m->app_id; 1182 eo->app_id = m->app_id;
1183
1184 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "evaluating union operation, (app %s)\n",
1185 GNUNET_h2s (&eo->app_id));
1186
1038 eo->socket = 1187 eo->socket =
1039 GNUNET_STREAM_open (configuration, &eo->peer, GNUNET_APPLICATION_TYPE_SET, 1188 GNUNET_STREAM_open (configuration, &eo->peer, GNUNET_APPLICATION_TYPE_SET,
1040 stream_open_cb, eo, 1189 stream_open_cb, eo,
1041 GNUNET_STREAM_OPTION_END); 1190 GNUNET_STREAM_OPTION_END);
1191
1192
1193 GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head,
1194 eo->set->state.u->ops_tail,
1195 eo);
1042 /* the stream open callback will kick off the operation */ 1196 /* the stream open callback will kick off the operation */
1043} 1197}
1044 1198
@@ -1056,18 +1210,30 @@ _GSS_union_accept (struct AcceptMessage *m, struct Set *set,
1056{ 1210{
1057 struct UnionEvaluateOperation *eo; 1211 struct UnionEvaluateOperation *eo;
1058 1212
1213 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "accepting set union operation\n");
1214
1059 eo = GNUNET_new (struct UnionEvaluateOperation); 1215 eo = GNUNET_new (struct UnionEvaluateOperation);
1060 eo->generation_created = set->state.u->current_generation++; 1216 eo->generation_created = set->state.u->current_generation++;
1061 eo->set = set; 1217 eo->set = set;
1062 eo->peer = incoming->peer; 1218 eo->peer = incoming->peer;
1063 eo->salt = ntohs (incoming->salt); 1219 eo->salt = ntohs (incoming->salt);
1064 eo->request_id = m->request_id; 1220 GNUNET_assert (0 != ntohl (m->request_id));
1221 eo->request_id = ntohl (m->request_id);
1065 eo->se = strata_estimator_dup (set->state.u->se); 1222 eo->se = strata_estimator_dup (set->state.u->se);
1066 eo->set = set; 1223 eo->set = set;
1067 eo->mq = incoming->mq; 1224 eo->mq = incoming->mq;
1225 /* transfer ownership of mq and socket from incoming to eo */
1226 incoming->mq = NULL;
1227 eo->socket = incoming->socket;
1228 incoming->socket = NULL;
1068 /* the peer's socket is now ours, we'll receive all messages */ 1229 /* the peer's socket is now ours, we'll receive all messages */
1069 GNUNET_MQ_replace_handlers (eo->mq, union_handlers, eo); 1230 GNUNET_MQ_replace_handlers (eo->mq, union_handlers, eo);
1070 /* kick of the operation */ 1231
1232 GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head,
1233 eo->set->state.u->ops_tail,
1234 eo);
1235
1236 /* kick off the operation */
1071 send_strata_estimator (eo); 1237 send_strata_estimator (eo);
1072} 1238}
1073 1239
@@ -1082,7 +1248,7 @@ _GSS_union_set_create (void)
1082{ 1248{
1083 struct Set *set; 1249 struct Set *set;
1084 1250
1085 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set created\n"); 1251 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "union set created\n");
1086 1252
1087 set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct UnionState)); 1253 set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct UnionState));
1088 set->state.u = (struct UnionState *) &set[1]; 1254 set->state.u = (struct UnionState *) &set[1];
@@ -1109,6 +1275,8 @@ _GSS_union_add (struct ElementMessage *m, struct Set *set)
1109 struct ElementEntry *ee_dup; 1275 struct ElementEntry *ee_dup;
1110 uint16_t element_size; 1276 uint16_t element_size;
1111 1277
1278 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "adding element\n");
1279
1112 GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation); 1280 GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
1113 element_size = ntohs (m->header.size) - sizeof *m; 1281 element_size = ntohs (m->header.size) - sizeof *m;
1114 ee = GNUNET_malloc (element_size + sizeof *ee); 1282 ee = GNUNET_malloc (element_size + sizeof *ee);
@@ -1131,6 +1299,38 @@ _GSS_union_add (struct ElementMessage *m, struct Set *set)
1131 1299
1132 1300
1133/** 1301/**
1302 * Destroy a set that supports the union operation
1303 *
1304 * @param the set to destroy, must be of type GNUNET_SET_OPERATION_UNION
1305 */
1306void
1307_GSS_union_set_destroy (struct Set *set)
1308{
1309 GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
1310 if (NULL != set->client)
1311 {
1312 GNUNET_SERVER_client_drop (set->client);
1313 set->client = NULL;
1314 }
1315 if (NULL != set->client_mq)
1316 {
1317 GNUNET_MQ_destroy (set->client_mq);
1318 set->client_mq = NULL;
1319 }
1320
1321 if (NULL != set->state.u->se)
1322 {
1323 strata_estimator_destroy (set->state.u->se);
1324 set->state.u->se = NULL;
1325 }
1326
1327 destroy_elements (set->state.u);
1328
1329 while (NULL != set->state.u->ops_head)
1330 destroy_union_operation (set->state.u->ops_head);
1331}
1332
1333/**
1134 * Remove the element given in the element message from the set. 1334 * Remove the element given in the element message from the set.
1135 * Only marks the element as removed, so that older set operations can still exchange it. 1335 * Only marks the element as removed, so that older set operations can still exchange it.
1136 * 1336 *
diff --git a/src/set/gnunet-set-bug.c b/src/set/gnunet-set-bug.c
index edcd8b561..112def7d7 100644
--- a/src/set/gnunet-set-bug.c
+++ b/src/set/gnunet-set-bug.c
@@ -113,6 +113,8 @@ run (void *cls, char *const *args,
113 cfg = GNUNET_CONFIGURATION_dup (cfg2); 113 cfg = GNUNET_CONFIGURATION_dup (cfg2);
114 GNUNET_CRYPTO_get_host_identity (cfg, &local_id); 114 GNUNET_CRYPTO_get_host_identity (cfg, &local_id);
115 115
116 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "I am Peer %s\n", GNUNET_h2s (&local_id.hashPubKey));
117
116 listen_socket = GNUNET_STREAM_listen (cfg, 118 listen_socket = GNUNET_STREAM_listen (cfg,
117 GNUNET_APPLICATION_TYPE_SET, 119 GNUNET_APPLICATION_TYPE_SET,
118 &listen_cb, 120 &listen_cb,
diff --git a/src/set/gnunet-set.c b/src/set/gnunet-set.c
index c49b60dfd..d665fce11 100644
--- a/src/set/gnunet-set.c
+++ b/src/set/gnunet-set.c
@@ -36,6 +36,53 @@ static struct GNUNET_HashCode app_id;
36static struct GNUNET_SET_Handle *set1; 36static struct GNUNET_SET_Handle *set1;
37static struct GNUNET_SET_Handle *set2; 37static struct GNUNET_SET_Handle *set2;
38static struct GNUNET_SET_ListenHandle *listen_handle; 38static struct GNUNET_SET_ListenHandle *listen_handle;
39const static struct GNUNET_CONFIGURATION_Handle *config;
40
41int num_done;
42
43
44static void
45result_cb_set1 (void *cls, struct GNUNET_SET_Element *element,
46 enum GNUNET_SET_Status status)
47{
48 switch (status)
49 {
50 case GNUNET_SET_STATUS_OK:
51 printf ("set 1: got element\n");
52 break;
53 case GNUNET_SET_STATUS_FAILURE:
54 printf ("set 1: failure\n");
55 break;
56 case GNUNET_SET_STATUS_DONE:
57 printf ("set 1: done\n");
58 GNUNET_SET_destroy (set1);
59 break;
60 default:
61 GNUNET_assert (0);
62 }
63}
64
65
66static void
67result_cb_set2 (void *cls, struct GNUNET_SET_Element *element,
68 enum GNUNET_SET_Status status)
69{
70 switch (status)
71 {
72 case GNUNET_SET_STATUS_OK:
73 printf ("set 2: got element\n");
74 break;
75 case GNUNET_SET_STATUS_FAILURE:
76 printf ("set 2: failure\n");
77 break;
78 case GNUNET_SET_STATUS_DONE:
79 printf ("set 2: done\n");
80 GNUNET_SET_destroy (set2);
81 break;
82 default:
83 GNUNET_assert (0);
84 }
85}
39 86
40 87
41static void 88static void
@@ -45,13 +92,67 @@ listen_cb (void *cls,
45 struct GNUNET_SET_Request *request) 92 struct GNUNET_SET_Request *request)
46{ 93{
47 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n"); 94 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n");
95 GNUNET_SET_listen_cancel (listen_handle);
96
97 GNUNET_SET_accept (request, set2, GNUNET_TIME_UNIT_FOREVER_REL,
98 GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL);
48} 99}
49 100
101
102/**
103 * Start the set operation.
104 *
105 * @param cls closure, unused
106 */
50static void 107static void
51result_cb (void *cls, struct GNUNET_SET_Element *element, 108start (void *cls)
52 enum GNUNET_SET_Status status) 109{
110 listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION,
111 &app_id, listen_cb, NULL);
112 GNUNET_SET_evaluate (set1, &local_id, &app_id, NULL, 42,
113 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_SET_RESULT_ADDED,
114 result_cb_set1, NULL);
115}
116
117
118/**
119 * Initialize the second set, continue
120 *
121 * @param cls closure, unused
122 */
123static void
124init_set2 (void *cls)
53{ 125{
54 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got result\n"); 126 struct GNUNET_SET_Element element;
127
128
129 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initializing set 2\n");
130
131 element.data = "hello";
132 element.size = strlen(element.data);
133 GNUNET_SET_add_element (set2, &element, NULL, NULL);
134 element.data = "quux";
135 element.size = strlen(element.data);
136 GNUNET_SET_add_element (set2, &element, start, NULL);
137}
138
139
140/**
141 * Initialize the first set, continue.
142 */
143static void
144init_set1 (void)
145{
146 struct GNUNET_SET_Element element;
147
148 element.data = "hello";
149 element.size = strlen(element.data);
150 GNUNET_SET_add_element (set1, &element, NULL, NULL);
151 element.data = "bar";
152 element.size = strlen(element.data);
153 GNUNET_SET_add_element (set1, &element, init_set2, NULL);
154
155 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized set 1\n");
55} 156}
56 157
57 158
@@ -69,6 +170,8 @@ run (void *cls, char *const *args,
69 const struct GNUNET_CONFIGURATION_Handle *cfg) 170 const struct GNUNET_CONFIGURATION_Handle *cfg)
70{ 171{
71 static const char* app_str = "gnunet-set"; 172 static const char* app_str = "gnunet-set";
173
174 config = cfg;
72 175
73 GNUNET_CRYPTO_hash (app_str, strlen (app_str), &app_id); 176 GNUNET_CRYPTO_hash (app_str, strlen (app_str), &app_id);
74 177
@@ -76,12 +179,8 @@ run (void *cls, char *const *args,
76 179
77 set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); 180 set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
78 set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); 181 set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
79 listen_handle = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION,
80 &app_id, listen_cb, NULL);
81 182
82 GNUNET_SET_evaluate (set1, &local_id, &app_id, NULL, 42, 183 init_set1 ();
83 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_SET_RESULT_ADDED,
84 result_cb, NULL);
85} 184}
86 185
87 186
diff --git a/src/set/ibf.c b/src/set/ibf.c
index 739b97339..383ce3daf 100644
--- a/src/set/ibf.c
+++ b/src/set/ibf.c
@@ -280,6 +280,7 @@ ibf_read_slice (const void *buf, uint32_t start, uint32_t count, struct Invertib
280 struct IBF_KeyHash *key_hash_src; 280 struct IBF_KeyHash *key_hash_src;
281 struct IBF_Count *count_src; 281 struct IBF_Count *count_src;
282 282
283 GNUNET_assert (count > 0);
283 GNUNET_assert (start + count <= ibf->size); 284 GNUNET_assert (start + count <= ibf->size);
284 285
285 /* copy keys */ 286 /* copy keys */
diff --git a/src/set/mq.c b/src/set/mq.c
index 3a9e614e9..0ced014dd 100644
--- a/src/set/mq.c
+++ b/src/set/mq.c
@@ -192,13 +192,22 @@ static void
192dispatch_message (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_MessageHeader *mh) 192dispatch_message (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_MessageHeader *mh)
193{ 193{
194 const struct GNUNET_MQ_Handler *handler; 194 const struct GNUNET_MQ_Handler *handler;
195 int handled = GNUNET_NO;
195 196
196 handler = mq->handlers; 197 handler = mq->handlers;
197 if (NULL == handler) 198 if (NULL == handler)
198 return; 199 return;
199 for (; NULL != handler->cb; handler++) 200 for (; NULL != handler->cb; handler++)
201 {
200 if (handler->type == ntohs (mh->type)) 202 if (handler->type == ntohs (mh->type))
203 {
201 handler->cb (mq->handlers_cls, mh); 204 handler->cb (mq->handlers_cls, mh);
205 handled = GNUNET_YES;
206 }
207 }
208
209 if (GNUNET_NO == handled)
210 LOG (GNUNET_ERROR_TYPE_WARNING, "no handler for message of type %d\n", ntohs (mh->type));
202} 211}
203 212
204 213
@@ -220,6 +229,7 @@ GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm)
220void 229void
221GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) 230GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
222{ 231{
232 GNUNET_assert (NULL != mq);
223 mq->send_impl (mq, mqm); 233 mq->send_impl (mq, mqm);
224} 234}
225 235
@@ -228,6 +238,7 @@ struct GNUNET_MQ_Message *
228GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) 238GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
229{ 239{
230 struct GNUNET_MQ_Message *mqm; 240 struct GNUNET_MQ_Message *mqm;
241
231 mqm = GNUNET_malloc (sizeof *mqm + size); 242 mqm = GNUNET_malloc (sizeof *mqm + size);
232 mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1]; 243 mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
233 mqm->mh->size = htons (size); 244 mqm->mh->size = htons (size);
@@ -245,16 +256,18 @@ GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp,
245 size_t new_size; 256 size_t new_size;
246 size_t old_size; 257 size_t old_size;
247 258
259 GNUNET_assert (NULL != mqmp);
260 /* there's no data to append => do nothing */
248 if (NULL == data) 261 if (NULL == data)
249 return GNUNET_OK; 262 return GNUNET_OK;
250 GNUNET_assert (NULL != mqmp);
251 old_size = ntohs ((*mqmp)->mh->size); 263 old_size = ntohs ((*mqmp)->mh->size);
252 /* message too large to concatenate? */ 264 /* message too large to concatenate? */
253 if (ntohs ((*mqmp)->mh->size) + len < len) 265 if (((uint16_t) (old_size + len)) < len)
254 return GNUNET_SYSERR; 266 return GNUNET_SYSERR;
255 new_size = old_size + len; 267 new_size = old_size + len;
256 *mqmp = GNUNET_realloc (mqmp, sizeof (struct GNUNET_MQ_Message) + new_size); 268 *mqmp = GNUNET_realloc (*mqmp, sizeof (struct GNUNET_MQ_Message) + new_size);
257 memcpy ((*mqmp)->mh + old_size, data, new_size - old_size); 269 (*mqmp)->mh = (struct GNUNET_MessageHeader *) &(*mqmp)[1];
270 memcpy (((void *) (*mqmp)->mh) + old_size, data, new_size - old_size);
258 (*mqmp)->mh->size = htons (new_size); 271 (*mqmp)->mh->size = htons (new_size);
259 return GNUNET_OK; 272 return GNUNET_OK;
260} 273}
@@ -286,12 +299,10 @@ stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size)
286 299
287 /* call cb for message we finished sending */ 300 /* call cb for message we finished sending */
288 mqm = mq->current_msg; 301 mqm = mq->current_msg;
289 if (NULL != mqm) 302 GNUNET_assert (NULL != mq->current_msg);
290 { 303 if (NULL != mqm->sent_cb)
291 if (NULL != mqm->sent_cb) 304 mqm->sent_cb (mqm->sent_cls);
292 mqm->sent_cb (mqm->sent_cls); 305 GNUNET_free (mqm);
293 GNUNET_free (mqm);
294 }
295 306
296 mss->wh = NULL; 307 mss->wh = NULL;
297 308
@@ -384,6 +395,35 @@ stream_data_processor (void *cls,
384} 395}
385 396
386 397
398static void
399stream_socket_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
400{
401 struct MessageStreamState *mss = (struct MessageStreamState *) mq->impl_state;
402
403 if (NULL != mss->rh)
404 {
405 GNUNET_STREAM_read_cancel (mss->rh);
406 mss->rh = NULL;
407 }
408
409 if (NULL != mss->wh)
410 {
411 GNUNET_STREAM_write_cancel (mss->wh);
412 mss->wh = NULL;
413 }
414
415 if (NULL != mss->mst)
416 {
417 GNUNET_SERVER_mst_destroy (mss->mst);
418 mss->mst = NULL;
419 }
420
421 GNUNET_free (mss);
422}
423
424
425
426
387struct GNUNET_MQ_MessageQueue * 427struct GNUNET_MQ_MessageQueue *
388GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket, 428GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket,
389 const struct GNUNET_MQ_Handler *handlers, 429 const struct GNUNET_MQ_Handler *handlers,
@@ -397,6 +437,7 @@ GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket,
397 mss->socket = socket; 437 mss->socket = socket;
398 mq->impl_state = mss; 438 mq->impl_state = mss;
399 mq->send_impl = stream_socket_send_impl; 439 mq->send_impl = stream_socket_send_impl;
440 mq->destroy_impl = &stream_socket_destroy_impl;
400 mq->handlers = handlers; 441 mq->handlers = handlers;
401 mq->handlers_cls = cls; 442 mq->handlers_cls = cls;
402 if (NULL != handlers) 443 if (NULL != handlers)
@@ -425,14 +466,21 @@ transmit_queued (void *cls, size_t size,
425 struct ServerClientSocketState *state = mq->impl_state; 466 struct ServerClientSocketState *state = mq->impl_state;
426 size_t msg_size; 467 size_t msg_size;
427 468
469 GNUNET_assert (NULL != buf);
470
471 if (NULL != mqm->sent_cb)
472 {
473 mqm->sent_cb (mqm->sent_cls);
474 }
475
428 mq->current_msg = NULL; 476 mq->current_msg = NULL;
429 GNUNET_assert (NULL != mqm); 477 GNUNET_assert (NULL != mqm);
430 GNUNET_assert (NULL != buf);
431 msg_size = ntohs (mqm->mh->size); 478 msg_size = ntohs (mqm->mh->size);
432 GNUNET_assert (size >= msg_size); 479 GNUNET_assert (size >= msg_size);
433 memcpy (buf, mqm->mh, msg_size); 480 memcpy (buf, mqm->mh, msg_size);
434 GNUNET_free (mqm); 481 GNUNET_free (mqm);
435 state->th = NULL; 482 state->th = NULL;
483
436 if (NULL != mq->msg_head) 484 if (NULL != mq->msg_head)
437 { 485 {
438 mq->current_msg = mq->msg_head; 486 mq->current_msg = mq->msg_head;
@@ -448,12 +496,27 @@ transmit_queued (void *cls, size_t size,
448} 496}
449 497
450 498
499
500static void
501server_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
502{
503 struct ServerClientSocketState *state;
504
505 GNUNET_assert (NULL != mq);
506 state = mq->impl_state;
507 GNUNET_assert (NULL != state);
508 GNUNET_SERVER_client_drop (state->client);
509 GNUNET_free (state);
510}
511
451static void 512static void
452server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) 513server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
453{ 514{
454 struct ServerClientSocketState *state = mq->impl_state; 515 struct ServerClientSocketState *state;
455 int msize; 516 int msize;
456 517
518 GNUNET_assert (NULL != mq);
519 state = mq->impl_state;
457 GNUNET_assert (NULL != state); 520 GNUNET_assert (NULL != state);
458 521
459 if (NULL != state->th) 522 if (NULL != state->th)
@@ -461,8 +524,9 @@ server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Mes
461 GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm); 524 GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
462 return; 525 return;
463 } 526 }
527 GNUNET_assert (NULL == mq->msg_head);
464 GNUNET_assert (NULL == mq->current_msg); 528 GNUNET_assert (NULL == mq->current_msg);
465 msize = ntohs (mq->msg_head->mh->size); 529 msize = ntohs (mqm->mh->size);
466 mq->current_msg = mqm; 530 mq->current_msg = mqm;
467 state->th = 531 state->th =
468 GNUNET_SERVER_notify_transmit_ready (state->client, msize, 532 GNUNET_SERVER_notify_transmit_ready (state->client, msize,
@@ -480,7 +544,10 @@ GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
480 mq = GNUNET_new (struct GNUNET_MQ_MessageQueue); 544 mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
481 scss = GNUNET_new (struct ServerClientSocketState); 545 scss = GNUNET_new (struct ServerClientSocketState);
482 mq->impl_state = scss; 546 mq->impl_state = scss;
547 scss->client = client;
548 GNUNET_SERVER_client_keep (client);
483 mq->send_impl = server_client_send_impl; 549 mq->send_impl = server_client_send_impl;
550 mq->destroy_impl = server_client_destroy_impl;
484 return mq; 551 return mq;
485} 552}
486 553
@@ -502,8 +569,15 @@ connection_client_transmit_queued (void *cls, size_t size,
502 struct ClientConnectionState *state = mq->impl_state; 569 struct ClientConnectionState *state = mq->impl_state;
503 size_t msg_size; 570 size_t msg_size;
504 571
505 mq->current_msg = NULL; 572
506 GNUNET_assert (NULL != mqm); 573 GNUNET_assert (NULL != mqm);
574
575 if (NULL != mqm->sent_cb)
576 {
577 mqm->sent_cb (mqm->sent_cls);
578 }
579
580 mq->current_msg = NULL;
507 GNUNET_assert (NULL != buf); 581 GNUNET_assert (NULL != buf);
508 msg_size = ntohs (mqm->mh->size); 582 msg_size = ntohs (mqm->mh->size);
509 GNUNET_assert (size >= msg_size); 583 GNUNET_assert (size >= msg_size);
@@ -515,7 +589,7 @@ connection_client_transmit_queued (void *cls, size_t size,
515 mq->current_msg = mq->msg_head; 589 mq->current_msg = mq->msg_head;
516 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg); 590 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
517 state->th = 591 state->th =
518 GNUNET_CLIENT_notify_transmit_ready (state->connection, htons (mq->current_msg->mh->size), 592 GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (mq->current_msg->mh->size),
519 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, 593 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
520 &connection_client_transmit_queued, mq); 594 &connection_client_transmit_queued, mq);
521 } 595 }
@@ -525,6 +599,13 @@ connection_client_transmit_queued (void *cls, size_t size,
525} 599}
526 600
527 601
602
603static void
604connection_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
605{
606 GNUNET_free (mq->impl_state);
607}
608
528static void 609static void
529connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, 610connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq,
530 struct GNUNET_MQ_Message *mqm) 611 struct GNUNET_MQ_Message *mqm)
@@ -549,6 +630,7 @@ connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq,
549} 630}
550 631
551 632
633
552/** 634/**
553 * Type of a function to call when we receive a message 635 * Type of a function to call when we receive a message
554 * from the service. 636 * from the service.
@@ -561,6 +643,9 @@ handle_client_message (void *cls,
561 const struct GNUNET_MessageHeader *msg) 643 const struct GNUNET_MessageHeader *msg)
562{ 644{
563 struct GNUNET_MQ_MessageQueue *mq = cls; 645 struct GNUNET_MQ_MessageQueue *mq = cls;
646 struct ClientConnectionState *state;
647
648 state = mq->impl_state;
564 649
565 if (NULL == msg) 650 if (NULL == msg)
566 { 651 {
@@ -569,6 +654,10 @@ handle_client_message (void *cls,
569 mq->read_error_cb (mq->read_error_cls); 654 mq->read_error_cb (mq->read_error_cls);
570 return; 655 return;
571 } 656 }
657
658 GNUNET_CLIENT_receive (state->connection, handle_client_message, mq,
659 GNUNET_TIME_UNIT_FOREVER_REL);
660
572 dispatch_message (mq, msg); 661 dispatch_message (mq, msg);
573} 662}
574 663
@@ -590,6 +679,7 @@ GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connecti
590 state->connection = connection; 679 state->connection = connection;
591 mq->impl_state = state; 680 mq->impl_state = state;
592 mq->send_impl = connection_client_send_impl; 681 mq->send_impl = connection_client_send_impl;
682 mq->destroy_impl = connection_client_destroy_impl;
593 683
594 if (NULL != handlers) 684 if (NULL != handlers)
595 { 685 {
@@ -626,7 +716,10 @@ GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq,
626 uint32_t id; 716 uint32_t id;
627 717
628 if (NULL == mq->assoc_map) 718 if (NULL == mq->assoc_map)
719 {
629 mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8); 720 mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
721 mq->assoc_id = 1;
722 }
630 id = mq->assoc_id++; 723 id = mq->assoc_id++;
631 GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data, 724 GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data,
632 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 725 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
@@ -652,6 +745,7 @@ GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
652 if (NULL == mq->assoc_map) 745 if (NULL == mq->assoc_map)
653 return NULL; 746 return NULL;
654 val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id); 747 val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
748 GNUNET_assert (NULL != val);
655 GNUNET_CONTAINER_multihashmap32_remove (mq->assoc_map, request_id, val); 749 GNUNET_CONTAINER_multihashmap32_remove (mq->assoc_map, request_id, val);
656 return val; 750 return val;
657} 751}
@@ -671,6 +765,12 @@ void
671GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq) 765GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq)
672{ 766{
673 /* FIXME: destroy all pending messages in the queue */ 767 /* FIXME: destroy all pending messages in the queue */
768
769 if (NULL != mq->destroy_impl)
770 {
771 mq->destroy_impl (mq);
772 }
773
674 GNUNET_free (mq); 774 GNUNET_free (mq);
675} 775}
676 776
diff --git a/src/set/mq.h b/src/set/mq.h
index e43ce04b4..42b755163 100644
--- a/src/set/mq.h
+++ b/src/set/mq.h
@@ -106,7 +106,7 @@
106 * @param esize extra space to allocate after the message header 106 * @param esize extra space to allocate after the message header
107 * @param type type of the message 107 * @param type type of the message
108 */ 108 */
109#define GNUNET_MQ_msg_header_extra(mh, esize, type) GNUNET_MQ_msg_ (&mh, sizeof (struct GNUNET_MessageHeader), type) 109#define GNUNET_MQ_msg_header_extra(mh, esize, type) GNUNET_MQ_msg_ (&mh, (esize) + sizeof (struct GNUNET_MessageHeader), type)
110 110
111 111
112/** 112/**
@@ -166,6 +166,7 @@ struct GNUNET_MQ_Handler
166 */ 166 */
167typedef void (*GNUNET_MQ_NotifyCallback) (void *cls); 167typedef void (*GNUNET_MQ_NotifyCallback) (void *cls);
168 168
169
169/** 170/**
170 * Create a new message for MQ. 171 * Create a new message for MQ.
171 * 172 *
diff --git a/src/set/set.h b/src/set/set.h
index 87fd6efbf..33e0aafdd 100644
--- a/src/set/set.h
+++ b/src/set/set.h
@@ -80,36 +80,42 @@ struct AcceptMessage
80 struct GNUNET_MessageHeader header; 80 struct GNUNET_MessageHeader header;
81 81
82 /** 82 /**
83 * request id of the request we want to accept 83 * Request id that will be sent along with
84 * results for the accepted operation.
85 * Chosen by the client.
86 * Must be 0 if the request has been rejected.
84 */ 87 */
85 uint32_t request_id GNUNET_PACKED; 88 uint32_t request_id GNUNET_PACKED;
86 89
87 /** 90 /**
88 * Zero if the client has rejected the request, 91 * ID of the incoming request we want to accept / reject.
89 * non-zero if it has accepted it
90 */ 92 */
91 uint32_t accepted GNUNET_PACKED; 93 uint32_t accept_id GNUNET_PACKED;
92}; 94};
93 95
94 96
97/**
98 * A request for an operation with another client.
99 */
95struct RequestMessage 100struct RequestMessage
96{ 101{
97 /** 102 /**
98 * Type: GNUNET_MESSAGE_TYPE_SET_Request 103 * Type: GNUNET_MESSAGE_TYPE_SET_Request.
99 */ 104 */
100 struct GNUNET_MessageHeader header; 105 struct GNUNET_MessageHeader header;
101 106
102 /** 107 /**
103 * requesting peer 108 * Identity of the requesting peer.
104 */ 109 */
105 struct GNUNET_PeerIdentity peer_id; 110 struct GNUNET_PeerIdentity peer_id;
106 111
107 /** 112 /**
108 * request id of the request we want to accept 113 * ID of the request we want to accept,
114 * chosen by the service.
109 */ 115 */
110 uint32_t request_id GNUNET_PACKED; 116 uint32_t accept_id GNUNET_PACKED;
111 117
112 /* rest: inner message */ 118 /* rest: nested context message */
113}; 119};
114 120
115 121
@@ -131,7 +137,7 @@ struct EvaluateMessage
131 struct GNUNET_HashCode app_id; 137 struct GNUNET_HashCode app_id;
132 138
133 /** 139 /**
134 * id of our evaluate 140 * id of our evaluate, chosen by the client
135 */ 141 */
136 uint32_t request_id GNUNET_PACKED; 142 uint32_t request_id GNUNET_PACKED;
137 143
@@ -186,6 +192,8 @@ struct ElementMessage
186 192
187 uint16_t element_type GNUNET_PACKED; 193 uint16_t element_type GNUNET_PACKED;
188 194
195 uint16_t reserved GNUNET_PACKED;
196
189 /* rest: the actual element */ 197 /* rest: the actual element */
190}; 198};
191 199
diff --git a/src/set/set_api.c b/src/set/set_api.c
index 2ea002231..775e390de 100644
--- a/src/set/set_api.c
+++ b/src/set/set_api.c
@@ -30,6 +30,7 @@
30#include "gnunet_set_service.h" 30#include "gnunet_set_service.h"
31#include "set.h" 31#include "set.h"
32#include "mq.h" 32#include "mq.h"
33#include <inttypes.h>
33 34
34 35
35#define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__) 36#define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__)
@@ -49,7 +50,7 @@ struct GNUNET_SET_Handle
49 */ 50 */
50struct GNUNET_SET_Request 51struct GNUNET_SET_Request
51{ 52{
52 uint32_t request_id; 53 uint32_t accept_id;
53 int accepted; 54 int accepted;
54}; 55};
55 56
@@ -98,20 +99,23 @@ handle_result (void *cls, const struct GNUNET_MessageHeader *mh)
98 } 99 }
99 100
100 oh = GNUNET_MQ_assoc_get (set->mq, ntohl (msg->request_id)); 101 oh = GNUNET_MQ_assoc_get (set->mq, ntohl (msg->request_id));
101 GNUNET_break (NULL != oh); 102 GNUNET_assert (NULL != oh);
102 if (GNUNET_SCHEDULER_NO_TASK != oh->timeout_task) 103 /* status is not STATUS_OK => there's no attached element,
103 { 104 * and this is the last result message we get */
104 GNUNET_SCHEDULER_cancel (oh->timeout_task);
105 oh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
106 }
107 if (htons (msg->result_status) != GNUNET_SET_STATUS_OK) 105 if (htons (msg->result_status) != GNUNET_SET_STATUS_OK)
108 { 106 {
107 if (GNUNET_SCHEDULER_NO_TASK != oh->timeout_task)
108 {
109 GNUNET_SCHEDULER_cancel (oh->timeout_task);
110 oh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
111 }
112 GNUNET_MQ_assoc_remove (set->mq, ntohl (msg->request_id));
109 if (NULL != oh->result_cb) 113 if (NULL != oh->result_cb)
110 oh->result_cb (oh->result_cls, NULL, htons (msg->result_status)); 114 oh->result_cb (oh->result_cls, NULL, htons (msg->result_status));
111 GNUNET_MQ_assoc_remove (set->mq, ntohl (msg->request_id));
112 GNUNET_free (oh); 115 GNUNET_free (oh);
113 return; 116 return;
114 } 117 }
118
115 e.data = &msg[1]; 119 e.data = &msg[1];
116 e.size = ntohs (mh->size) - sizeof (struct ResultMessage); 120 e.size = ntohs (mh->size) - sizeof (struct ResultMessage);
117 e.type = msg->element_type; 121 e.type = msg->element_type;
@@ -133,18 +137,25 @@ handle_request (void *cls, const struct GNUNET_MessageHeader *mh)
133 struct GNUNET_SET_Request *req; 137 struct GNUNET_SET_Request *req;
134 138
135 req = GNUNET_new (struct GNUNET_SET_Request); 139 req = GNUNET_new (struct GNUNET_SET_Request);
136 req->request_id = ntohl (msg->request_id); 140 req->accept_id = ntohl (msg->accept_id);
141 /* calling GNUNET_SET_accept in the listen cb will set req->accepted */
137 lh->listen_cb (lh->listen_cls, &msg->peer_id, &mh[1], req); 142 lh->listen_cb (lh->listen_cls, &msg->peer_id, &mh[1], req);
143
138 if (GNUNET_NO == req->accepted) 144 if (GNUNET_NO == req->accepted)
139 { 145 {
140 struct GNUNET_MQ_Message *mqm; 146 struct GNUNET_MQ_Message *mqm;
141 struct AcceptMessage *amsg; 147 struct AcceptMessage *amsg;
142 148
143 mqm = GNUNET_MQ_msg (amsg, GNUNET_MESSAGE_TYPE_SET_ACCEPT); 149 mqm = GNUNET_MQ_msg (amsg, GNUNET_MESSAGE_TYPE_SET_ACCEPT);
144 amsg->request_id = msg->request_id; 150 /* no request id, as we refused */
151 amsg->request_id = htonl (0);
152 amsg->accept_id = msg->accept_id;
145 GNUNET_MQ_send (lh->mq, mqm); 153 GNUNET_MQ_send (lh->mq, mqm);
146 GNUNET_free (req); 154 GNUNET_free (req);
147 } 155 }
156
157 /* the accept-case is handled in GNUNET_SET_accept,
158 * as we have the accept message available there */
148} 159}
149 160
150 161
@@ -173,7 +184,7 @@ GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
173 184
174 set = GNUNET_new (struct GNUNET_SET_Handle); 185 set = GNUNET_new (struct GNUNET_SET_Handle);
175 set->client = GNUNET_CLIENT_connect ("set", cfg); 186 set->client = GNUNET_CLIENT_connect ("set", cfg);
176 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set client created\n"); 187 LOG (GNUNET_ERROR_TYPE_INFO, "set client created\n");
177 GNUNET_assert (NULL != set->client); 188 GNUNET_assert (NULL != set->client);
178 set->mq = GNUNET_MQ_queue_for_connection_client (set->client, mq_handlers, set); 189 set->mq = GNUNET_MQ_queue_for_connection_client (set->client, mq_handlers, set);
179 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_CREATE); 190 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_CREATE);
@@ -377,12 +388,9 @@ GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
377void 388void
378GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh) 389GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh)
379{ 390{
380 GNUNET_MQ_destroy (lh->mq);
381 lh->mq = NULL;
382 GNUNET_CLIENT_disconnect (lh->client); 391 GNUNET_CLIENT_disconnect (lh->client);
383 lh->client = NULL; 392 GNUNET_MQ_destroy (lh->mq);
384 lh->listen_cb = NULL; 393 GNUNET_free (lh);
385 lh->listen_cls = NULL;
386} 394}
387 395
388 396
@@ -420,8 +428,8 @@ GNUNET_SET_accept (struct GNUNET_SET_Request *request,
420 oh->set = set; 428 oh->set = set;
421 429
422 mqm = GNUNET_MQ_msg (msg , GNUNET_MESSAGE_TYPE_SET_ACCEPT); 430 mqm = GNUNET_MQ_msg (msg , GNUNET_MESSAGE_TYPE_SET_ACCEPT);
423 msg->request_id = htonl (request->request_id); 431 msg->request_id = htonl (GNUNET_MQ_assoc_add (set->mq, NULL, oh));
424 msg->accepted = 1; 432 msg->accept_id = htonl (request->accept_id);
425 GNUNET_MQ_send (set->mq, mqm); 433 GNUNET_MQ_send (set->mq, mqm);
426 434
427 oh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, operation_timeout_task, oh); 435 oh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, operation_timeout_task, oh);
diff --git a/src/set/strata_estimator.c b/src/set/strata_estimator.c
index 60f75f1bc..024bb99c6 100644
--- a/src/set/strata_estimator.c
+++ b/src/set/strata_estimator.c
@@ -33,6 +33,8 @@ void
33strata_estimator_write (const struct StrataEstimator *se, void *buf) 33strata_estimator_write (const struct StrataEstimator *se, void *buf)
34{ 34{
35 int i; 35 int i;
36
37 GNUNET_assert (NULL != se);
36 for (i = 0; i < se->strata_count; i++) 38 for (i = 0; i < se->strata_count; i++)
37 { 39 {
38 ibf_write_slice (se->strata[i], 0, se->ibf_size, buf); 40 ibf_write_slice (se->strata[i], 0, se->ibf_size, buf);
diff --git a/src/set/test_mq.c b/src/set/test_mq.c
new file mode 100644
index 000000000..d13c63440
--- /dev/null
+++ b/src/set/test_mq.c
@@ -0,0 +1,115 @@
1/*
2 This file is part of GNUnet.
3 (C) 2012 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file set/test_mq.c
23 * @brief simple tests for mq
24 */
25#include "platform.h"
26#include "gnunet_util_lib.h"
27#include "gnunet_testing_lib.h"
28#include "mq.h"
29
30
31GNUNET_NETWORK_STRUCT_BEGIN
32
33struct MyMessage
34{
35 struct GNUNET_MessageHeader header;
36 uint32_t x GNUNET_PACKED;
37};
38
39GNUNET_NETWORK_STRUCT_END
40
41void
42test1 (void)
43{
44 struct GNUNET_MQ_Message *mqm;
45 struct MyMessage *mm;
46
47 mm = NULL;
48 mqm = NULL;
49
50 mqm = GNUNET_MQ_msg (mm, 42);
51 GNUNET_assert (NULL != mqm);
52 GNUNET_assert (NULL != mm);
53 GNUNET_assert (42 == ntohs (mm->header.type));
54 GNUNET_assert (sizeof (struct MyMessage) == ntohs (mm->header.size));
55}
56
57
58void
59test2 (void)
60{
61 struct GNUNET_MQ_Message *mqm;
62 struct MyMessage *mm;
63 int res;
64 char *s = "foo";
65
66 mqm = GNUNET_MQ_msg (mm, 42);
67 res = GNUNET_MQ_nest (mqm, s, strlen(s));
68 GNUNET_assert (GNUNET_OK == res);
69 res = GNUNET_MQ_nest (mqm, s, strlen(s));
70 GNUNET_assert (GNUNET_OK == res);
71 res = GNUNET_MQ_nest (mqm, NULL, 0);
72 GNUNET_assert (GNUNET_OK == res);
73
74 GNUNET_assert (strlen (s) * 2 + sizeof (struct MyMessage) == ntohs (mm->header.size));
75
76 res = GNUNET_MQ_nest_mh (mqm, &mm->header);
77 GNUNET_assert (GNUNET_OK == res);
78 GNUNET_assert (2 * (strlen (s) * 2 + sizeof (struct MyMessage)) == ntohs (mm->header.size));
79
80 res = GNUNET_MQ_nest (mqm, (void *) 0xF00BA, 0xFFF0);
81 GNUNET_assert (GNUNET_OK != res);
82
83 GNUNET_MQ_discard (mqm);
84}
85
86
87void
88test3 (void)
89{
90 struct GNUNET_MQ_Message *mqm;
91 struct GNUNET_MessageHeader *mh;
92
93 mqm = GNUNET_MQ_msg_header (42);
94 /* how could the above be checked? */
95
96 GNUNET_MQ_discard (mqm);
97
98 mqm = GNUNET_MQ_msg_header_extra (mh, 20, 42);
99 GNUNET_assert (42 == ntohs (mh->type));
100 GNUNET_assert (sizeof (struct GNUNET_MessageHeader) + 20 == ntohs (mh->size));
101}
102
103
104int
105main (int argc, char **argv)
106{
107
108 GNUNET_log_setup ("test-mq", "INFO", NULL);
109 test1 ();
110 test2 ();
111 test3 ();
112
113 return 0;
114}
115
diff --git a/src/set/test_mq_client.c b/src/set/test_mq_client.c
new file mode 100644
index 000000000..ca615d37e
--- /dev/null
+++ b/src/set/test_mq_client.c
@@ -0,0 +1,181 @@
1/*
2 This file is part of GNUnet.
3 (C) 2012 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file set/test_mq.c
23 * @brief tests for mq with connection client
24 */
25/**
26 * @file util/test_server_with_client.c
27 * @brief tests for server.c and client.c,
28 * specifically disconnect_notify,
29 * client_get_address and receive_done (resume processing)
30 */
31#include "platform.h"
32#include "gnunet_common.h"
33#include "gnunet_scheduler_lib.h"
34#include "gnunet_client_lib.h"
35#include "gnunet_server_lib.h"
36#include "gnunet_time_lib.h"
37#include "mq.h"
38
39#define PORT 23336
40
41#define MY_TYPE 128
42
43
44static struct GNUNET_SERVER_Handle *server;
45
46static struct GNUNET_CLIENT_Connection *client;
47
48static struct GNUNET_CONFIGURATION_Handle *cfg;
49
50static int ok;
51
52static int notify = GNUNET_NO;
53
54static int received = 0;
55
56
57static void
58recv_cb (void *cls, struct GNUNET_SERVER_Client *argclient,
59 const struct GNUNET_MessageHeader *message)
60{
61 received++;
62
63 printf ("received\n");
64
65
66 if ((received == 2) && (GNUNET_YES == notify))
67 {
68 printf ("done\n");
69 GNUNET_SERVER_receive_done (argclient, GNUNET_NO);
70 return;
71 }
72
73 GNUNET_SERVER_receive_done (argclient, GNUNET_YES);
74}
75
76
77static void
78clean_up (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
79{
80 GNUNET_SERVER_destroy (server);
81 server = NULL;
82 GNUNET_CONFIGURATION_destroy (cfg);
83 cfg = NULL;
84}
85
86
87/**
88 * Functions with this signature are called whenever a client
89 * is disconnected on the network level.
90 *
91 * @param cls closure
92 * @param client identification of the client
93 */
94static void
95notify_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
96{
97 if (client == NULL)
98 return;
99 ok = 0;
100 GNUNET_SCHEDULER_add_now (&clean_up, NULL);
101}
102
103
104static struct GNUNET_SERVER_MessageHandler handlers[] = {
105 {&recv_cb, NULL, MY_TYPE, sizeof (struct GNUNET_MessageHeader)},
106 {NULL, NULL, 0, 0}
107};
108
109void send_cb (void *cls)
110{
111 printf ("notify sent\n");
112 notify = GNUNET_YES;
113}
114
115void test_mq (struct GNUNET_CLIENT_Connection *client)
116{
117 struct GNUNET_MQ_MessageQueue *mq;
118 struct GNUNET_MQ_Message *mqm;
119
120 /* FIXME: test handling responses */
121 mq = GNUNET_MQ_queue_for_connection_client (client, NULL, NULL);
122
123 mqm = GNUNET_MQ_msg_header (MY_TYPE);
124 GNUNET_MQ_send (mq, mqm);
125
126 mqm = GNUNET_MQ_msg_header (MY_TYPE);
127 GNUNET_MQ_notify_sent (mqm, send_cb, NULL);
128 GNUNET_MQ_send (mq, mqm);
129
130 /* FIXME: add a message that will be canceled */
131}
132
133
134static void
135task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
136{
137 struct sockaddr_in sa;
138 struct sockaddr *sap[2];
139 socklen_t slens[2];
140
141 sap[0] = (struct sockaddr *) &sa;
142 slens[0] = sizeof (sa);
143 sap[1] = NULL;
144 slens[1] = 0;
145 memset (&sa, 0, sizeof (sa));
146#if HAVE_SOCKADDR_IN_SIN_LEN
147 sa.sin_len = sizeof (sa);
148#endif
149 sa.sin_family = AF_INET;
150 sa.sin_port = htons (PORT);
151 server =
152 GNUNET_SERVER_create (NULL, NULL, sap, slens,
153 GNUNET_TIME_relative_multiply
154 (GNUNET_TIME_UNIT_MILLISECONDS, 250), GNUNET_NO);
155 GNUNET_assert (server != NULL);
156 handlers[0].callback_cls = cls;
157 GNUNET_SERVER_add_handlers (server, handlers);
158 GNUNET_SERVER_disconnect_notify (server, &notify_disconnect, cls);
159 cfg = GNUNET_CONFIGURATION_create ();
160 GNUNET_CONFIGURATION_set_value_number (cfg, "test", "PORT", PORT);
161 GNUNET_CONFIGURATION_set_value_string (cfg, "test", "HOSTNAME", "localhost");
162 GNUNET_CONFIGURATION_set_value_string (cfg, "resolver", "HOSTNAME",
163 "localhost");
164 client = GNUNET_CLIENT_connect ("test", cfg);
165 GNUNET_assert (client != NULL);
166
167 test_mq (client);
168}
169
170
171int
172main (int argc, char *argv[])
173{
174 GNUNET_log_setup ("test-mq-client",
175 "INFO",
176 NULL);
177 ok = 1;
178 GNUNET_SCHEDULER_run (&task, NULL);
179 return ok;
180}
181
diff --git a/src/set/test_set.conf b/src/set/test_set.conf
index e69de29bb..c1d5a0f93 100644
--- a/src/set/test_set.conf
+++ b/src/set/test_set.conf
@@ -0,0 +1,23 @@
1[set]
2AUTOSTART = YES
3PORT = 2106
4HOSTNAME = localhost
5HOME = $SERVICEHOME
6BINARY = gnunet-service-set
7#PREFIX = gdbserver :12345
8#PREFIX = valgrind --leak-check=full
9ACCEPT_FROM = 127.0.0.1;
10ACCEPT_FROM6 = ::1;
11UNIXPATH = /tmp/gnunet-service-set.sock
12UNIX_MATCH_UID = YES
13UNIX_MATCH_GID = YES
14OPTIONS = -L INFO
15
16
17[transport]
18OPTIONS = -LERROR
19
20
21[testbed]
22OVERLAY_TOPOLOGY = CLIQUE
23
diff --git a/src/set/test_set_api.c b/src/set/test_set_api.c
index 0753fd139..0ab02cad7 100644
--- a/src/set/test_set_api.c
+++ b/src/set/test_set_api.c
@@ -28,6 +28,99 @@
28#include "gnunet_set_service.h" 28#include "gnunet_set_service.h"
29 29
30 30
31static struct GNUNET_PeerIdentity local_id;
32static struct GNUNET_HashCode app_id;
33static struct GNUNET_SET_Handle *set1;
34static struct GNUNET_SET_Handle *set2;
35static struct GNUNET_SET_ListenHandle *listen_handle;
36const static struct GNUNET_CONFIGURATION_Handle *config;
37
38
39static void
40result_cb_set1 (void *cls, struct GNUNET_SET_Element *element,
41 enum GNUNET_SET_Status status)
42{
43 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got result (set 1)\n");
44}
45
46
47static void
48result_cb_set2 (void *cls, struct GNUNET_SET_Element *element,
49 enum GNUNET_SET_Status status)
50{
51 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got result (set 2)\n");
52}
53
54
55static void
56listen_cb (void *cls,
57 const struct GNUNET_PeerIdentity *other_peer,
58 const struct GNUNET_MessageHeader *context_msg,
59 struct GNUNET_SET_Request *request)
60{
61 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n");
62 GNUNET_SET_accept (request, set2, GNUNET_TIME_UNIT_FOREVER_REL,
63 GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL);
64}
65
66
67/**
68 * Start the set operation.
69 *
70 * @param cls closure, unused
71 */
72static void
73start (void *cls)
74{
75 listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION,
76 &app_id, listen_cb, NULL);
77 GNUNET_SET_evaluate (set1, &local_id, &app_id, NULL, 42,
78 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_SET_RESULT_ADDED,
79 result_cb_set1, NULL);
80}
81
82
83/**
84 * Initialize the second set, continue
85 *
86 * @param cls closure, unused
87 */
88static void
89init_set2 (void *cls)
90{
91 struct GNUNET_SET_Element element;
92
93
94 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initializing set 2\n");
95
96 element.data = "hello";
97 element.size = strlen(element.data);
98 GNUNET_SET_add_element (set2, &element, NULL, NULL);
99 element.data = "quux";
100 element.size = strlen(element.data);
101 GNUNET_SET_add_element (set2, &element, start, NULL);
102}
103
104
105/**
106 * Initialize the first set, continue.
107 */
108static void
109init_set1 (void)
110{
111 struct GNUNET_SET_Element element;
112
113 element.data = "hello";
114 element.size = strlen(element.data);
115 GNUNET_SET_add_element (set1, &element, NULL, NULL);
116 element.data = "bar";
117 element.size = strlen(element.data);
118 GNUNET_SET_add_element (set1, &element, init_set2, NULL);
119
120 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized set 1\n");
121}
122
123
31/** 124/**
32 * Signature of the 'main' function for a (single-peer) testcase that 125 * Signature of the 'main' function for a (single-peer) testcase that
33 * is run using 'GNUNET_TESTING_peer_run'. 126 * is run using 'GNUNET_TESTING_peer_run'.
@@ -41,11 +134,15 @@ run (void *cls,
41 const struct GNUNET_CONFIGURATION_Handle *cfg, 134 const struct GNUNET_CONFIGURATION_Handle *cfg,
42 struct GNUNET_TESTING_Peer *peer) 135 struct GNUNET_TESTING_Peer *peer)
43{ 136{
44 struct GNUNET_SET_Handle *set1;
45 struct GNUNET_SET_Handle *set2;
46 137
138 static const char* app_str = "gnunet-set";
139
140 config = cfg;
141 GNUNET_CRYPTO_hash (app_str, strlen (app_str), &app_id);
142 GNUNET_CRYPTO_get_host_identity (cfg, &local_id);
47 set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); 143 set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
48 set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); 144 set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
145 init_set1 ();
49} 146}
50 147
51int 148int