aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/consensus/consensus_api.c319
-rw-r--r--src/consensus/gnunet-service-consensus.c28
-rw-r--r--src/dv/gnunet-service-dv.c16
-rw-r--r--src/include/gnunet_container_lib.h2
-rw-r--r--src/include/gnunet_mesh2_service.h17
-rw-r--r--src/include/gnunet_mq_lib.h309
-rw-r--r--src/include/gnunet_set_service.h30
-rw-r--r--src/include/gnunet_stream_lib.h4
-rw-r--r--src/mesh/mesh2_api.c131
-rw-r--r--src/set/Makefile.am24
-rw-r--r--src/set/gnunet-service-set.c278
-rw-r--r--src/set/gnunet-service-set.h73
-rw-r--r--src/set/gnunet-service-set_union.c212
-rw-r--r--src/set/gnunet-set-ibf-profiler.c (renamed from src/set/gnunet-set-ibf.c)19
-rw-r--r--src/set/gnunet-set-profiler.c320
-rw-r--r--src/set/gnunet-set.c203
-rw-r--r--src/set/ibf.c40
-rw-r--r--src/set/ibf.h2
-rw-r--r--src/set/set_api.c57
-rw-r--r--src/set/strata_estimator.c2
-rw-r--r--src/set/strata_estimator.h2
-rw-r--r--src/set/test_set_api.c10
-rw-r--r--src/stream/stream_api.c103
-rw-r--r--src/util/mq.c464
-rw-r--r--src/util/test_mq.c4
-rw-r--r--src/util/test_mq_client.c9
26 files changed, 1550 insertions, 1128 deletions
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c
index e3ddb4913..684580755 100644
--- a/src/consensus/consensus_api.c
+++ b/src/consensus/consensus_api.c
@@ -33,37 +33,6 @@
33 33
34#define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__) 34#define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__)
35 35
36/**
37 * Actions that can be queued.
38 */
39struct QueuedMessage
40{
41 /**
42 * Queued messages are stored in a doubly linked list.
43 */
44 struct QueuedMessage *next;
45
46 /**
47 * Queued messages are stored in a doubly linked list.
48 */
49 struct QueuedMessage *prev;
50
51 /**
52 * The actual queued message.
53 */
54 struct GNUNET_MessageHeader *msg;
55
56 /**
57 * Will be called after transmit, if not NULL
58 */
59 GNUNET_CONSENSUS_InsertDoneCallback idc;
60
61 /**
62 * Closure for idc
63 */
64 void *idc_cls;
65};
66
67 36
68/** 37/**
69 * Handle for the service. 38 * Handle for the service.
@@ -106,21 +75,11 @@ struct GNUNET_CONSENSUS_Handle
106 struct GNUNET_PeerIdentity **peers; 75 struct GNUNET_PeerIdentity **peers;
107 76
108 /** 77 /**
109 * Currently active transmit request.
110 */
111 struct GNUNET_CLIENT_TransmitHandle *th;
112
113 /**
114 * GNUNES_YES iff the join message has been sent to the service. 78 * GNUNES_YES iff the join message has been sent to the service.
115 */ 79 */
116 int joined; 80 int joined;
117 81
118 /** 82 /**
119 * Closure for the insert done callback.
120 */
121 void *idc_cls;
122
123 /**
124 * Called when the conclude operation finishes or fails. 83 * Called when the conclude operation finishes or fails.
125 */ 84 */
126 GNUNET_CONSENSUS_ConcludeCallback conclude_cb; 85 GNUNET_CONSENSUS_ConcludeCallback conclude_cb;
@@ -135,109 +94,36 @@ struct GNUNET_CONSENSUS_Handle
135 */ 94 */
136 struct GNUNET_TIME_Absolute conclude_deadline; 95 struct GNUNET_TIME_Absolute conclude_deadline;
137 96
138 unsigned int conclude_min_size; 97 /**
139 98 * Message queue for the client.
140 struct QueuedMessage *messages_head; 99 */
141 100 struct GNUNET_MQ_Handle *mq;
142 struct QueuedMessage *messages_tail;
143
144}; 101};
145 102
146
147
148/** 103/**
149 * Schedule transmitting the next message. 104 * FIXME: this should not bee necessary when the API
150 * 105 * issue has been fixed
151 * @param consensus consensus handle
152 */ 106 */
153static void 107struct InsertDoneInfo
154send_next (struct GNUNET_CONSENSUS_Handle *consensus);
155
156
157/**
158 * Function called to notify a client about the connection
159 * begin ready to queue more data. "buf" will be
160 * NULL and "size" zero if the connection was closed for
161 * writing in the meantime.
162 *
163 * @param cls closure
164 * @param size number of bytes available in buf
165 * @param buf where the callee should write the message
166 * @return number of bytes written to buf
167 */
168static size_t
169transmit_queued (void *cls, size_t size,
170 void *buf)
171{ 108{
172 struct GNUNET_CONSENSUS_Handle *consensus; 109 GNUNET_CONSENSUS_InsertDoneCallback idc;
173 struct QueuedMessage *qmsg; 110 void *cls;
174 size_t msg_size; 111};
175
176 consensus = (struct GNUNET_CONSENSUS_Handle *) cls;
177 consensus->th = NULL;
178
179 qmsg = consensus->messages_head;
180 GNUNET_CONTAINER_DLL_remove (consensus->messages_head, consensus->messages_tail, qmsg);
181
182 if (NULL == buf)
183 {
184 if (NULL != qmsg->idc)
185 {
186 qmsg->idc (qmsg->idc_cls, GNUNET_YES);
187 }
188 return 0;
189 }
190
191 msg_size = ntohs (qmsg->msg->size);
192
193 GNUNET_assert (size >= msg_size);
194
195 memcpy (buf, qmsg->msg, msg_size);
196 if (NULL != qmsg->idc)
197 {
198 qmsg->idc (qmsg->idc_cls, GNUNET_YES);
199 }
200 GNUNET_free (qmsg->msg);
201 GNUNET_free (qmsg);
202 /* FIXME: free the messages */
203
204 send_next (consensus);
205
206 return msg_size;
207}
208
209
210/**
211 * Schedule transmitting the next message.
212 *
213 * @param consensus consensus handle
214 */
215static void
216send_next (struct GNUNET_CONSENSUS_Handle *consensus)
217{
218 if (NULL != consensus->th)
219 return;
220
221 if (NULL != consensus->messages_head)
222 {
223 consensus->th =
224 GNUNET_CLIENT_notify_transmit_ready (consensus->client, ntohs (consensus->messages_head->msg->size),
225 GNUNET_TIME_UNIT_FOREVER_REL,
226 GNUNET_NO, &transmit_queued, consensus);
227 }
228}
229 112
230 113
231/** 114/**
232 * Called when the server has sent is a new element 115 * Called when the server has sent is a new element
233 * 116 *
234 * @param consensus consensus handle 117 * @param cls consensus handle
235 * @param msg element message 118 * @param mh element message
236 */ 119 */
237static void 120static void
238handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, 121handle_new_element (void *cls,
239 struct GNUNET_CONSENSUS_ElementMessage *msg) 122 const struct GNUNET_MessageHeader *mh)
240{ 123{
124 struct GNUNET_CONSENSUS_Handle *consensus = cls;
125 const struct GNUNET_CONSENSUS_ElementMessage *msg
126 = (const struct GNUNET_CONSENSUS_ElementMessage *) mh;
241 struct GNUNET_SET_Element element; 127 struct GNUNET_SET_Element element;
242 128
243 LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n"); 129 LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n");
@@ -247,8 +133,6 @@ handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus,
247 element.data = &msg[1]; 133 element.data = &msg[1];
248 134
249 consensus->new_element_cb (consensus->new_element_cls, &element); 135 consensus->new_element_cb (consensus->new_element_cls, &element);
250
251 send_next (consensus);
252} 136}
253 137
254 138
@@ -256,13 +140,15 @@ handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus,
256 * Called when the server has announced 140 * Called when the server has announced
257 * that the conclusion is over. 141 * that the conclusion is over.
258 * 142 *
259 * @param consensus consensus handle 143 * @param cls consensus handle
260 * @param msg conclude done message 144 * @param mh conclude done message
261 */ 145 */
262static void 146static void
263handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus, 147handle_conclude_done (void *cls,
264 const struct GNUNET_MessageHeader *msg) 148 const struct GNUNET_MessageHeader *msg)
265{ 149{
150 struct GNUNET_CONSENSUS_Handle *consensus = cls;
151
266 GNUNET_CONSENSUS_ConcludeCallback cc; 152 GNUNET_CONSENSUS_ConcludeCallback cc;
267 153
268 GNUNET_assert (NULL != (cc = consensus->conclude_cb)); 154 GNUNET_assert (NULL != (cc = consensus->conclude_cb));
@@ -272,89 +158,6 @@ handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus,
272 158
273 159
274/** 160/**
275 * Type of a function to call when we receive a message
276 * from the service.
277 *
278 * @param cls closure
279 * @param msg message received, NULL on timeout or fatal error
280 */
281static void
282message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
283{
284 struct GNUNET_CONSENSUS_Handle *consensus = cls;
285
286 LOG (GNUNET_ERROR_TYPE_DEBUG, "received message from consensus service\n");
287
288 if (NULL == msg)
289 {
290 /* Error, timeout, death */
291 LOG (GNUNET_ERROR_TYPE_ERROR, "error receiving\n");
292 GNUNET_CLIENT_disconnect (consensus->client);
293 consensus->client = NULL;
294 consensus->new_element_cb (consensus->new_element_cls, NULL);
295 return;
296 }
297 GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
298 GNUNET_TIME_UNIT_FOREVER_REL);
299 switch (ntohs (msg->type))
300 {
301 case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT:
302 handle_new_element (consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg);
303 break;
304 case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE:
305 handle_conclude_done (consensus, msg);
306 break;
307 default:
308 GNUNET_break (0);
309 }
310}
311
312/**
313 * Function called to notify a client about the connection
314 * begin ready to queue more data. "buf" will be
315 * NULL and "size" zero if the connection was closed for
316 * writing in the meantime.
317 *
318 * @param cls closure
319 * @param size number of bytes available in buf
320 * @param buf where the callee should write the message
321 * @return number of bytes written to buf
322 */
323static size_t
324transmit_join (void *cls, size_t size, void *buf)
325{
326 struct GNUNET_CONSENSUS_JoinMessage *msg;
327 struct GNUNET_CONSENSUS_Handle *consensus;
328 int msize;
329
330 GNUNET_assert (NULL != buf);
331
332 LOG (GNUNET_ERROR_TYPE_DEBUG, "transmitting join message\n");
333
334 consensus = cls;
335 consensus->th = NULL;
336 consensus->joined = 1;
337
338 msg = buf;
339
340 msize = sizeof (struct GNUNET_CONSENSUS_JoinMessage) +
341 consensus->num_peers * sizeof (struct GNUNET_PeerIdentity);
342
343 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN);
344 msg->header.size = htons (msize);
345 msg->session_id = consensus->session_id;
346 msg->num_peers = htonl (consensus->num_peers);
347 memcpy(&msg[1],
348 consensus->peers,
349 consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
350 send_next (consensus);
351 GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
352 GNUNET_TIME_UNIT_FOREVER_REL);
353
354 return msize;
355}
356
357/**
358 * Create a consensus session. 161 * Create a consensus session.
359 * 162 *
360 * @param cfg configuration to use for connecting to the consensus service 163 * @param cfg configuration to use for connecting to the consensus service
@@ -377,7 +180,15 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
377 void *new_element_cls) 180 void *new_element_cls)
378{ 181{
379 struct GNUNET_CONSENSUS_Handle *consensus; 182 struct GNUNET_CONSENSUS_Handle *consensus;
380 size_t join_message_size; 183 struct GNUNET_CONSENSUS_JoinMessage *join_msg;
184 struct GNUNET_MQ_Envelope *ev;
185 const static struct GNUNET_MQ_MessageHandler mq_handlers[] = {
186 {handle_new_element,
187 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT, 0},
188 {handle_conclude_done,
189 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE, 0},
190 GNUNET_MQ_HANDLERS_END
191 };
381 192
382 consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle)); 193 consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle));
383 consensus->cfg = cfg; 194 consensus->cfg = cfg;
@@ -393,24 +204,33 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
393 GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity)); 204 GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity));
394 205
395 consensus->client = GNUNET_CLIENT_connect ("consensus", cfg); 206 consensus->client = GNUNET_CLIENT_connect ("consensus", cfg);
207 consensus->mq = GNUNET_MQ_queue_for_connection_client (consensus->client,
208 mq_handlers, consensus);
396 209
397 GNUNET_assert (consensus->client != NULL); 210 GNUNET_assert (consensus->client != NULL);
398 211
399 join_message_size = (sizeof (struct GNUNET_CONSENSUS_JoinMessage)) + 212 ev = GNUNET_MQ_msg_extra (join_msg,
400 (num_peers * sizeof (struct GNUNET_PeerIdentity)); 213 (num_peers * sizeof (struct GNUNET_PeerIdentity)),
401 214 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN);
402 consensus->th =
403 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
404 join_message_size,
405 GNUNET_TIME_UNIT_FOREVER_REL,
406 GNUNET_NO, &transmit_join, consensus);
407 215
216 join_msg->session_id = consensus->session_id;
217 join_msg->num_peers = htonl (consensus->num_peers);
218 memcpy(&join_msg[1],
219 consensus->peers,
220 consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
408 221
409 GNUNET_assert (consensus->th != NULL); 222 GNUNET_MQ_send (consensus->mq, ev);
410 return consensus; 223 return consensus;
411} 224}
412 225
413 226
227static void
228idc_adapter (void *cls)
229{
230 struct InsertDoneInfo *i = cls;
231 i->idc (i->cls, GNUNET_OK);
232 GNUNET_free (i);
233}
414 234
415/** 235/**
416 * Insert an element in the set being reconsiled. Must not be called after 236 * Insert an element in the set being reconsiled. Must not be called after
@@ -428,28 +248,24 @@ GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
428 GNUNET_CONSENSUS_InsertDoneCallback idc, 248 GNUNET_CONSENSUS_InsertDoneCallback idc,
429 void *idc_cls) 249 void *idc_cls)
430{ 250{
431 struct QueuedMessage *qmsg;
432 struct GNUNET_CONSENSUS_ElementMessage *element_msg; 251 struct GNUNET_CONSENSUS_ElementMessage *element_msg;
433 size_t element_msg_size; 252 struct GNUNET_MQ_Envelope *ev;
253 struct InsertDoneInfo *i;
434 254
435 LOG (GNUNET_ERROR_TYPE_DEBUG, "inserting, size=%llu\n", element->size); 255 LOG (GNUNET_ERROR_TYPE_DEBUG, "inserting, size=%llu\n", element->size);
436 256
437 element_msg_size = (sizeof (struct GNUNET_CONSENSUS_ElementMessage) + 257 ev = GNUNET_MQ_msg_extra (element_msg, element->size,
438 element->size); 258 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
439 259
440 element_msg = GNUNET_malloc (element_msg_size);
441 element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
442 element_msg->header.size = htons (element_msg_size);
443 memcpy (&element_msg[1], element->data, element->size); 260 memcpy (&element_msg[1], element->data, element->size);
444 261
445 qmsg = GNUNET_malloc (sizeof (struct QueuedMessage)); 262 if (NULL != idc)
446 qmsg->msg = (struct GNUNET_MessageHeader *) element_msg; 263 {
447 qmsg->idc = idc; 264 i = GNUNET_new (struct InsertDoneInfo);
448 qmsg->idc_cls = idc_cls; 265 i->idc = idc;
449 266 i->cls = idc_cls;
450 GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg); 267 GNUNET_MQ_notify_sent (ev, idc_adapter, i);
451 268 }
452 send_next (consensus);
453} 269}
454 270
455 271
@@ -471,7 +287,7 @@ GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
471 GNUNET_CONSENSUS_ConcludeCallback conclude, 287 GNUNET_CONSENSUS_ConcludeCallback conclude,
472 void *conclude_cls) 288 void *conclude_cls)
473{ 289{
474 struct QueuedMessage *qmsg; 290 struct GNUNET_MQ_Envelope *ev;
475 struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg; 291 struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg;
476 292
477 GNUNET_assert (NULL != conclude); 293 GNUNET_assert (NULL != conclude);
@@ -480,17 +296,10 @@ GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
480 consensus->conclude_cls = conclude_cls; 296 consensus->conclude_cls = conclude_cls;
481 consensus->conclude_cb = conclude; 297 consensus->conclude_cb = conclude;
482 298
483 conclude_msg = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)); 299 ev = GNUNET_MQ_msg (conclude_msg, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
484 conclude_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
485 conclude_msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage));
486 conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout); 300 conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout);
487 301
488 qmsg = GNUNET_malloc (sizeof (struct QueuedMessage)); 302 GNUNET_MQ_send (consensus->mq, ev);
489 qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg;
490
491 GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg);
492
493 send_next (consensus);
494} 303}
495 304
496 305
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c
index 5ebff524c..1c2c78422 100644
--- a/src/consensus/gnunet-service-consensus.c
+++ b/src/consensus/gnunet-service-consensus.c
@@ -116,7 +116,7 @@ struct ConsensusSession
116 /** 116 /**
117 * Queued messages to the client. 117 * Queued messages to the client.
118 */ 118 */
119 struct GNUNET_MQ_MessageQueue *client_mq; 119 struct GNUNET_MQ_Handle *client_mq;
120 120
121 /** 121 /**
122 * Timeout for all rounds together, single rounds will schedule a timeout task 122 * Timeout for all rounds together, single rounds will schedule a timeout task
@@ -217,9 +217,9 @@ struct ConsensusPeerInformation
217 struct GNUNET_SET_OperationHandle *set_op; 217 struct GNUNET_SET_OperationHandle *set_op;
218 218
219 /** 219 /**
220 * Has conclude been called on the set_op? 220 * Has commit been called on the set_op?
221 */ 221 */
222 int set_op_concluded; 222 int set_op_commited;
223}; 223};
224 224
225 225
@@ -548,14 +548,14 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
548 GNUNET_SET_operation_cancel (session->partner_outgoing->set_op); 548 GNUNET_SET_operation_cancel (session->partner_outgoing->set_op);
549 } 549 }
550 session->partner_outgoing->set_op = 550 session->partner_outgoing->set_op =
551 GNUNET_SET_evaluate (&session->partner_outgoing->peer_id, 551 GNUNET_SET_prepare (&session->partner_outgoing->peer_id,
552 &session->global_id, 552 &session->global_id,
553 (struct GNUNET_MessageHeader *) msg, 553 (struct GNUNET_MessageHeader *) msg,
554 0, /* FIXME */ 554 0, /* FIXME */
555 GNUNET_SET_RESULT_ADDED, 555 GNUNET_SET_RESULT_ADDED,
556 set_result_cb, session->partner_outgoing); 556 set_result_cb, session->partner_outgoing);
557 GNUNET_SET_conclude (session->partner_outgoing->set_op, session->element_set); 557 GNUNET_SET_commit (session->partner_outgoing->set_op, session->element_set);
558 session->partner_outgoing->set_op_concluded = GNUNET_YES; 558 session->partner_outgoing->set_op_commited = GNUNET_YES;
559 } 559 }
560 560
561#ifdef GNUNET_EXTRA_LOGGING 561#ifdef GNUNET_EXTRA_LOGGING
@@ -767,12 +767,12 @@ set_listen_cb (void *cls,
767 set_result_cb, &session->info[index]); 767 set_result_cb, &session->info[index]);
768 if (ntohl (msg->exp_subround) == session->exp_subround) 768 if (ntohl (msg->exp_subround) == session->exp_subround)
769 { 769 {
770 cpi->set_op_concluded = GNUNET_YES; 770 cpi->set_op_commited = GNUNET_YES;
771 GNUNET_SET_conclude (cpi->set_op, session->element_set); 771 GNUNET_SET_commit (cpi->set_op, session->element_set);
772 } 772 }
773 else 773 else
774 { 774 {
775 cpi->set_op_concluded = GNUNET_NO; 775 cpi->set_op_commited = GNUNET_NO;
776 } 776 }
777 break; 777 break;
778 default: 778 default:
diff --git a/src/dv/gnunet-service-dv.c b/src/dv/gnunet-service-dv.c
index b76fd9ab0..92ca5d9da 100644
--- a/src/dv/gnunet-service-dv.c
+++ b/src/dv/gnunet-service-dv.c
@@ -769,7 +769,7 @@ build_set (void *cls)
769 if (DEFAULT_FISHEYE_DEPTH - 1 == neighbor->consensus_insertion_distance) 769 if (DEFAULT_FISHEYE_DEPTH - 1 == neighbor->consensus_insertion_distance)
770 { 770 {
771 /* we have added all elements to the set, run the operation */ 771 /* we have added all elements to the set, run the operation */
772 GNUNET_SET_conclude (neighbor->set_op, 772 GNUNET_SET_commit (neighbor->set_op,
773 neighbor->my_set); 773 neighbor->my_set);
774 GNUNET_SET_destroy (neighbor->my_set); 774 GNUNET_SET_destroy (neighbor->my_set);
775 neighbor->my_set = NULL; 775 neighbor->my_set = NULL;
@@ -1425,13 +1425,13 @@ initiate_set_union (void *cls,
1425 neighbor->initiate_task = GNUNET_SCHEDULER_NO_TASK; 1425 neighbor->initiate_task = GNUNET_SCHEDULER_NO_TASK;
1426 neighbor->my_set = GNUNET_SET_create (cfg, 1426 neighbor->my_set = GNUNET_SET_create (cfg,
1427 GNUNET_SET_OPERATION_UNION); 1427 GNUNET_SET_OPERATION_UNION);
1428 neighbor->set_op = GNUNET_SET_evaluate (&neighbor->peer, 1428 neighbor->set_op = GNUNET_SET_prepare (&neighbor->peer,
1429 &neighbor->real_session_id, 1429 &neighbor->real_session_id,
1430 NULL, 1430 NULL,
1431 0 /* FIXME: salt */, 1431 0 /* FIXME: salt */,
1432 GNUNET_SET_RESULT_ADDED, 1432 GNUNET_SET_RESULT_ADDED,
1433 &handle_set_union_result, 1433 &handle_set_union_result,
1434 neighbor); 1434 neighbor);
1435 build_set (neighbor); 1435 build_set (neighbor);
1436} 1436}
1437 1437
diff --git a/src/include/gnunet_container_lib.h b/src/include/gnunet_container_lib.h
index d52591148..1eb55a4c5 100644
--- a/src/include/gnunet_container_lib.h
+++ b/src/include/gnunet_container_lib.h
@@ -534,7 +534,7 @@ enum GNUNET_CONTAINER_MultiHashMapOption
534 * GNUNET_NO if not. 534 * GNUNET_NO if not.
535 */ 535 */
536typedef int (*GNUNET_CONTAINER_HashMapIterator) (void *cls, 536typedef int (*GNUNET_CONTAINER_HashMapIterator) (void *cls,
537 const struct GNUNET_HashCode * key, 537 const struct GNUNET_HashCode *key,
538 void *value); 538 void *value);
539 539
540 540
diff --git a/src/include/gnunet_mesh2_service.h b/src/include/gnunet_mesh2_service.h
index c69ac2bfd..b6593cf99 100644
--- a/src/include/gnunet_mesh2_service.h
+++ b/src/include/gnunet_mesh2_service.h
@@ -162,7 +162,8 @@ typedef void (GNUNET_MESH_TunnelEndHandler) (void *cls,
162 * the tunnel. 162 * the tunnel.
163 * @param handlers Callbacks for messages we care about, NULL-terminated. Each 163 * @param handlers Callbacks for messages we care about, NULL-terminated. Each
164 * one must call GNUNET_MESH_receive_done on the tunnel to 164 * one must call GNUNET_MESH_receive_done on the tunnel to
165 * receive the next message. 165 * receive the next message. Messages of a type that is not
166 * in the handlers array are ignored if received.
166 * @param ports NULL or 0-terminated array of port numbers for incoming tunnels. 167 * @param ports NULL or 0-terminated array of port numbers for incoming tunnels.
167 * 168 *
168 * @return handle to the mesh service NULL on error 169 * @return handle to the mesh service NULL on error
@@ -325,7 +326,7 @@ typedef void (*GNUNET_MESH_TunnelCB) (void *cls,
325/** 326/**
326 * Request information about the running mesh peer. 327 * Request information about the running mesh peer.
327 * The callback will be called for every tunnel known to the service, 328 * The callback will be called for every tunnel known to the service,
328 * listing all active peers that blong to the tunnel. 329 * listing all active peers that belong to the tunnel.
329 * 330 *
330 * If called again on the same handle, it will overwrite the previous 331 * If called again on the same handle, it will overwrite the previous
331 * callback and cls. To retrieve the cls, monitor_cancel must be 332 * callback and cls. To retrieve the cls, monitor_cancel must be
@@ -375,6 +376,18 @@ void *
375GNUNET_MESH_get_tunnels_cancel (struct GNUNET_MESH_Handle *h); 376GNUNET_MESH_get_tunnels_cancel (struct GNUNET_MESH_Handle *h);
376 377
377 378
379/**
380 * Create a message queue for a mesh tunnel.
381 * The message queue can only be used to transmit messages,
382 * not to receive them.
383 *
384 * @param tunnel the tunnel to create the message qeue for
385 * @return a message queue to messages over the tunnel
386 */
387struct GNUNET_MQ_Handle *
388GNUNET_MESH_mq_create (struct GNUNET_MESH_Tunnel *tunnel);
389
390
378#if 0 /* keep Emacsens' auto-indent happy */ 391#if 0 /* keep Emacsens' auto-indent happy */
379{ 392{
380#endif 393#endif
diff --git a/src/include/gnunet_mq_lib.h b/src/include/gnunet_mq_lib.h
index 54ea806a5..b73cab8d8 100644
--- a/src/include/gnunet_mq_lib.h
+++ b/src/include/gnunet_mq_lib.h
@@ -21,7 +21,7 @@
21/** 21/**
22 * @author Florian Dold 22 * @author Florian Dold
23 * @file set/mq.h 23 * @file set/mq.h
24 * @brief general purpose request queue 24 * @brief general purpose message queue
25 */ 25 */
26#ifndef GNUNET_MQ_H 26#ifndef GNUNET_MQ_H
27#define GNUNET_MQ_H 27#define GNUNET_MQ_H
@@ -30,7 +30,7 @@
30 30
31 31
32/** 32/**
33 * Allocate a GNUNET_MQ_Message, with extra space allocated after the space needed 33 * Allocate an envelope, with extra space allocated after the space needed
34 * by the message struct. 34 * by the message struct.
35 * The allocated message will already have the type and size field set. 35 * The allocated message will already have the type and size field set.
36 * 36 *
@@ -43,19 +43,19 @@
43#define GNUNET_MQ_msg_extra(mvar, esize, type) GNUNET_MQ_msg_((((void)(mvar)->header), (struct GNUNET_MessageHeader**) &(mvar)), (esize) + sizeof *(mvar), (type)) 43#define GNUNET_MQ_msg_extra(mvar, esize, type) GNUNET_MQ_msg_((((void)(mvar)->header), (struct GNUNET_MessageHeader**) &(mvar)), (esize) + sizeof *(mvar), (type))
44 44
45/** 45/**
46 * Allocate a GNUNET_MQ_Message. 46 * Allocate a GNUNET_MQ_Envelope.
47 * The allocated message will already have the type and size field set. 47 * The contained message will already have the type and size field set.
48 * 48 *
49 * @param mvar variable to store the allocated message in; 49 * @param mvar variable to store the allocated message in;
50 * must have a header field 50 * must have a header field
51 * @param type type of the message 51 * @param type type of the message
52 * @return the MQ message 52 * @return the allocated envelope
53 */ 53 */
54#define GNUNET_MQ_msg(mvar, type) GNUNET_MQ_msg_extra(mvar, 0, type) 54#define GNUNET_MQ_msg(mvar, type) GNUNET_MQ_msg_extra(mvar, 0, type)
55 55
56 56
57/** 57/**
58 * Allocate a GNUNET_MQ_Message, where the message only consists of a header. 58 * Allocate a GNUNET_MQ_Envelope, where the message only consists of a header.
59 * The allocated message will already have the type and size field set. 59 * The allocated message will already have the type and size field set.
60 * 60 *
61 * @param type type of the message 61 * @param type type of the message
@@ -64,7 +64,7 @@
64 64
65 65
66/** 66/**
67 * Allocate a GNUNET_MQ_Message, where the message only consists of a header and extra space. 67 * Allocate a GNUNET_MQ_Envelope, where the message only consists of a header and extra space.
68 * The allocated message will already have the type and size field set. 68 * The allocated message will already have the type and size field set.
69 * 69 *
70 * @param mh pointer that will changed to point at to the allocated message header 70 * @param mh pointer that will changed to point at to the allocated message header
@@ -75,14 +75,14 @@
75 75
76 76
77/** 77/**
78 * Allocate a GNUNET_MQ_Message, and append a payload message after the given 78 * Allocate a GNUNET_MQ_Envelope, and append a payload message after the given
79 * message struct. 79 * message struct.
80 * 80 *
81 * @param mvar pointer to a message struct, will be changed to point at the newly allocated message, 81 * @param mvar pointer to a message struct, will be changed to point at the newly allocated message,
82 * whose size is 'sizeof(*mvar) + ntohs (mh->size)' 82 * whose size is 'sizeof(*mvar) + ntohs (mh->size)'
83 * @param type message type of the allocated message, has no effect on the nested message 83 * @param type message type of the allocated message, has no effect on the nested message
84 * @param mh message to nest 84 * @param mh message to nest
85 * @return a newly allocated 'struct GNUNET_MQ_Message *' 85 * @return a newly allocated 'struct GNUNET_MQ_Envelope *'
86 */ 86 */
87#define GNUNET_MQ_msg_nested_mh(mvar, type, mh) GNUNET_MQ_msg_nested_mh_((((void)(mvar)->header), (struct GNUNET_MessageHeader**) &(mvar)), sizeof (*(mvar)), (type), mh) 87#define GNUNET_MQ_msg_nested_mh(mvar, type, mh) GNUNET_MQ_msg_nested_mh_((((void)(mvar)->header), (struct GNUNET_MessageHeader**) &(mvar)), sizeof (*(mvar)), (type), mh)
88 88
@@ -98,11 +98,24 @@
98#define GNUNET_MQ_extract_nested_mh(var) GNUNET_MQ_extract_nested_mh_ ((struct GNUNET_MessageHeader *) (var), sizeof (*(var))) 98#define GNUNET_MQ_extract_nested_mh(var) GNUNET_MQ_extract_nested_mh_ ((struct GNUNET_MessageHeader *) (var), sizeof (*(var)))
99 99
100 100
101/**
102 * Implementation of the GNUNET_MQ_extract_nexted_mh macro.
103 *
104 * @param mh message header to extract nested message header from
105 * @param base_size size of the message before the nested message's header appears
106 * @return pointer to the nested message, does not copy the message
107 */
101struct GNUNET_MessageHeader * 108struct GNUNET_MessageHeader *
102GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size); 109GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size);
103 110
104 111
105struct GNUNET_MQ_Message * 112/**
113 * Implementation of the GNUNET_MQ_msg_nested_mh macro.
114 *
115 * @param mhp pointer to the message header pointer that will be changed to allocate at
116 * the newly allocated space for the message.
117 */
118struct GNUNET_MQ_Envelope *
106GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type, 119GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type,
107 const struct GNUNET_MessageHeader *nested_mh); 120 const struct GNUNET_MessageHeader *nested_mh);
108 121
@@ -114,9 +127,15 @@ GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size,
114#define GNUNET_MQ_HANDLERS_END {NULL, 0, 0} 127#define GNUNET_MQ_HANDLERS_END {NULL, 0, 0}
115 128
116 129
117struct GNUNET_MQ_MessageQueue; 130/**
131 * Opaque handle to a message queue.
132 */
133struct GNUNET_MQ_Handle;
118 134
119struct GNUNET_MQ_Message; 135/**
136 * Opaque handle to an envelope.
137 */
138struct GNUNET_MQ_Envelope;
120 139
121enum GNUNET_MQ_Error 140enum GNUNET_MQ_Error
122{ 141{
@@ -133,22 +152,45 @@ enum GNUNET_MQ_Error
133 * @param msg the received message 152 * @param msg the received message
134 */ 153 */
135typedef void 154typedef void
136(*GNUNET_MQ_MessageCallback) (void *cls, const struct GNUNET_MessageHeader *msg); 155(*GNUNET_MQ_MessageCallback) (void *cls,
156 const struct GNUNET_MessageHeader *msg);
137 157
138 158
139/** 159/**
140 * Signature of functions implementing the 160 * Signature of functions implementing the
141 * sending part of a message queue 161 * sending functionality of a message queue.
142 * 162 *
143 * @param q the message queue 163 * @param mq the message queue
144 * @param m the message 164 * @param msg the message to send
165 * @param impl_state state of the implementation
166 */
167typedef void
168(*GNUNET_MQ_SendImpl) (struct GNUNET_MQ_Handle *mq,
169 const struct GNUNET_MessageHeader *msg,
170 void *impl_state);
171
172
173/**
174 * Signature of functions implementing the
175 * destruction of a message queue.
176 * Implementations must not free 'mq', but should
177 * take care of 'impl_state'.
178 *
179 * @param mq the message queue to destroy
180 * @param impl_state state of the implementation
145 */ 181 */
146typedef void 182typedef void
147(*GNUNET_MQ_SendImpl) (struct GNUNET_MQ_MessageQueue *q, struct GNUNET_MQ_Message *m); 183(*GNUNET_MQ_DestroyImpl) (struct GNUNET_MQ_Handle *mq, void *impl_state);
148 184
149 185
186/**
187 * Implementation function that cancels the currently sent message.
188 *
189 * @param mq message queue
190 * @param impl_state state specific to the implementation
191 */
150typedef void 192typedef void
151(*GNUNET_MQ_DestroyImpl) (struct GNUNET_MQ_MessageQueue *q); 193(*GNUNET_MQ_CancelImpl) (struct GNUNET_MQ_Handle *mq, void *impl_state);
152 194
153 195
154/** 196/**
@@ -160,117 +202,23 @@ typedef void
160(*GNUNET_MQ_NotifyCallback) (void *cls); 202(*GNUNET_MQ_NotifyCallback) (void *cls);
161 203
162 204
163typedef void
164(*GNUNET_MQ_ErrorHandler) (void *cls, enum GNUNET_MQ_Error error);
165
166
167struct GNUNET_MQ_Message
168{
169 /**
170 * Messages are stored in a linked list
171 */
172 struct GNUNET_MQ_Message *next;
173
174 /**
175 * Messages are stored in a linked list
176 */
177 struct GNUNET_MQ_Message *prev;
178
179 /**
180 * Actual allocated message header,
181 * usually points to the end of the containing GNUNET_MQ_Message
182 */
183 struct GNUNET_MessageHeader *mh;
184
185 /**
186 * Queue the message is queued in, NULL if message is not queued.
187 */
188 struct GNUNET_MQ_MessageQueue *parent_queue;
189
190 /**
191 * Called after the message was sent irrevokably
192 */
193 GNUNET_MQ_NotifyCallback sent_cb;
194
195 /**
196 * Closure for send_cb
197 */
198 void *sent_cls;
199};
200
201
202/** 205/**
203 * Handle to a message queue. 206 * Generic error handler, called with the appropriate
207 * error code and the same closure specified at the creation of
208 * the message queue.
209 * Not every message queue implementation supports an error handler.
210 *
211 * @param cls closure, same closure as for the message handlers
212 * @param error error code
204 */ 213 */
205struct GNUNET_MQ_MessageQueue 214typedef void
206{ 215(*GNUNET_MQ_ErrorHandler) (void *cls, enum GNUNET_MQ_Error error);
207 /**
208 * Handlers array, or NULL if the queue should not receive messages
209 */
210 const struct GNUNET_MQ_Handler *handlers;
211
212 /**
213 * Closure for the handler callbacks,
214 * as well as for the error handler.
215 */
216 void *handlers_cls;
217
218 /**
219 * Actual implementation of message sending,
220 * called when a message is added
221 */
222 GNUNET_MQ_SendImpl send_impl;
223
224 /**
225 * Implementation-dependent queue destruction function
226 */
227 GNUNET_MQ_DestroyImpl destroy_impl;
228
229 /**
230 * Implementation-specific state
231 */
232 void *impl_state;
233
234 /**
235 * Callback will be called when an error occurs.
236 */
237 GNUNET_MQ_ErrorHandler error_handler;
238
239 /**
240 * Linked list of messages pending to be sent
241 */
242 struct GNUNET_MQ_Message *msg_head;
243
244 /**
245 * Linked list of messages pending to be sent
246 */
247 struct GNUNET_MQ_Message *msg_tail;
248
249 /**
250 * Message that is currently scheduled to be
251 * sent. Not the head of the message queue, as the implementation
252 * needs to know if sending has been already scheduled or not.
253 */
254 struct GNUNET_MQ_Message *current_msg;
255
256 /**
257 * Map of associations, lazily allocated
258 */
259 struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
260
261 /**
262 * Next id that should be used for the assoc_map,
263 * initialized lazily to a random value together with
264 * assoc_map
265 */
266 uint32_t assoc_id;
267};
268 216
269 217
270/** 218/**
271 * Message handler for a specific message type. 219 * Message handler for a specific message type.
272 */ 220 */
273struct GNUNET_MQ_Handler 221struct GNUNET_MQ_MessageHandler
274{ 222{
275 /** 223 /**
276 * Callback, called every time a new message of 224 * Callback, called every time a new message of
@@ -296,14 +244,14 @@ struct GNUNET_MQ_Handler
296 244
297 245
298/** 246/**
299 * Create a new message for MQ. 247 * Create a new envelope.
300 * 248 *
301 * @param mhp message header to store the allocated message header in, can be NULL 249 * @param mhp message header to store the allocated message header in, can be NULL
302 * @param size size of the message to allocate 250 * @param size size of the message to allocate
303 * @param type type of the message, will be set in the allocated message 251 * @param type type of the message, will be set in the allocated message
304 * @return the allocated MQ message 252 * @return the allocated MQ message
305 */ 253 */
306struct GNUNET_MQ_Message * 254struct GNUNET_MQ_Envelope *
307GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type); 255GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type);
308 256
309 257
@@ -315,7 +263,7 @@ GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
315 * @param mqm the message to discard 263 * @param mqm the message to discard
316 */ 264 */
317void 265void
318GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm); 266GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm);
319 267
320 268
321/** 269/**
@@ -326,7 +274,7 @@ GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm);
326 * @param mqm the message to send. 274 * @param mqm the message to send.
327 */ 275 */
328void 276void
329GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm); 277GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev);
330 278
331 279
332/** 280/**
@@ -336,7 +284,7 @@ GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm
336 * @param mqm queued message to cancel 284 * @param mqm queued message to cancel
337 */ 285 */
338void 286void
339GNUNET_MQ_send_cancel (struct GNUNET_MQ_Message *mqm); 287GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev);
340 288
341 289
342/** 290/**
@@ -347,9 +295,7 @@ GNUNET_MQ_send_cancel (struct GNUNET_MQ_Message *mqm);
347 * @param assoc_data to associate 295 * @param assoc_data to associate
348 */ 296 */
349uint32_t 297uint32_t
350GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq, 298GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq, void *assoc_data);
351 struct GNUNET_MQ_Message *mqm,
352 void *assoc_data);
353 299
354/** 300/**
355 * Get the data associated with a request id in a queue 301 * Get the data associated with a request id in a queue
@@ -359,7 +305,7 @@ GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq,
359 * @return the associated data 305 * @return the associated data
360 */ 306 */
361void * 307void *
362GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id); 308GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id);
363 309
364 310
365/** 311/**
@@ -370,7 +316,7 @@ GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id);
370 * @return the associated data 316 * @return the associated data
371 */ 317 */
372void * 318void *
373GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id); 319GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id);
374 320
375 321
376 322
@@ -383,9 +329,9 @@ GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id);
383 * @param cls closure for the handlers 329 * @param cls closure for the handlers
384 * @return the message queue 330 * @return the message queue
385 */ 331 */
386struct GNUNET_MQ_MessageQueue * 332struct GNUNET_MQ_Handle *
387GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection, 333GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
388 const struct GNUNET_MQ_Handler *handlers, 334 const struct GNUNET_MQ_MessageHandler *handlers,
389 void *cls); 335 void *cls);
390 336
391 337
@@ -395,7 +341,7 @@ GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connecti
395 * @param client the client 341 * @param client the client
396 * @return the message queue 342 * @return the message queue
397 */ 343 */
398struct GNUNET_MQ_MessageQueue * 344struct GNUNET_MQ_Handle *
399GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client); 345GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client);
400 346
401 347
@@ -404,16 +350,19 @@ GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client);
404 * 350 *
405 * @param send function the implements sending messages 351 * @param send function the implements sending messages
406 * @param destroy function that implements destroying the queue 352 * @param destroy function that implements destroying the queue
353 * @param destroy function that implements canceling a message
407 * @param state for the queue, passed to 'send' and 'destroy' 354 * @param state for the queue, passed to 'send' and 'destroy'
408 * @param handlers array of message handlers 355 * @param handlers array of message handlers
409 * @param error_handler handler for read and write errors 356 * @param error_handler handler for read and write errors
357 * @param cls closure for handlers
410 * @return a new message queue 358 * @return a new message queue
411 */ 359 */
412struct GNUNET_MQ_MessageQueue * 360struct GNUNET_MQ_Handle *
413GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send, 361GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
414 GNUNET_MQ_DestroyImpl destroy, 362 GNUNET_MQ_DestroyImpl destroy,
363 GNUNET_MQ_CancelImpl cancel,
415 void *impl_state, 364 void *impl_state,
416 struct GNUNET_MQ_Handler *handlers, 365 const struct GNUNET_MQ_MessageHandler *handlers,
417 GNUNET_MQ_ErrorHandler error_handler, 366 GNUNET_MQ_ErrorHandler error_handler,
418 void *cls); 367 void *cls);
419 368
@@ -424,27 +373,30 @@ GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
424 * Takes effect immediately, even for messages that already have been received, but for 373 * Takes effect immediately, even for messages that already have been received, but for
425 * with the handler has not been called. 374 * with the handler has not been called.
426 * 375 *
376 * If the message queue does not support receiving messages,
377 * this function has no effect.
378 *
427 * @param mq message queue 379 * @param mq message queue
428 * @param new_handlers new handlers 380 * @param new_handlers new handlers
429 * @param cls new closure for the handlers 381 * @param cls new closure for the handlers
430 */ 382 */
431void 383void
432GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq, 384GNUNET_MQ_replace_handlers (struct GNUNET_MQ_Handle *mq,
433 const struct GNUNET_MQ_Handler *new_handlers, 385 const struct GNUNET_MQ_MessageHandler *new_handlers,
434 void *cls); 386 void *cls);
435 387
436 388
437/** 389/**
438 * Call a callback once the message has been sent, that is, the message 390 * Call a callback once the envelope has been sent, that is,
439 * can not be canceled anymore. 391 * sending it can not be canceled anymore.
440 * There can be only one notify sent callback per message. 392 * There can be only one notify sent callback per envelope.
441 * 393 *
442 * @param mqm message to call the notify callback for 394 * @param ev message to call the notify callback for
443 * @param cb the notify callback 395 * @param cb the notify callback
444 * @param cls closure for the callback 396 * @param cls closure for the callback
445 */ 397 */
446void 398void
447GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm, 399GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *ev,
448 GNUNET_MQ_NotifyCallback cb, 400 GNUNET_MQ_NotifyCallback cb,
449 void *cls); 401 void *cls);
450 402
@@ -455,7 +407,7 @@ GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm,
455 * @param mq message queue to destroy 407 * @param mq message queue to destroy
456 */ 408 */
457void 409void
458GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq); 410GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq);
459 411
460 412
461/** 413/**
@@ -465,7 +417,70 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq);
465 * @param mh message to dispatch 417 * @param mh message to dispatch
466 */ 418 */
467void 419void
468GNUNET_MQ_dispatch (struct GNUNET_MQ_MessageQueue *mq, 420GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
469 const struct GNUNET_MessageHeader *mh); 421 const struct GNUNET_MessageHeader *mh);
422
423
424/**
425 * Call the right callback for an error condition.
426 *
427 * @param mq message queue
428 */
429void
430GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
431 enum GNUNET_MQ_Error error);
432
433
434/**
435 * Call the send implementation for the next queued message,
436 * if any.
437 * Only useful for implementing message queues,
438 * results in undefined behavior if not used carefully.
439 *
440 * @param mq message queue to send the next message with
441 */
442void
443GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq);
444
445
446/**
447 * Get the message that should currently be sent.
448 * Fails if there is no current message.
449 * Only useful for implementing message queues,
450 * results in undefined behavior if not used carefully.
451 *
452 * @param mq message queue with the current message
453 * @return message to send, never NULL
454 */
455const struct GNUNET_MessageHeader *
456GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq);
457
458
459/**
460 * Get the implementation state associated with the
461 * message queue.
462 *
463 * While the GNUNET_MQ_Impl* callbacks receive the
464 * implementation state, continuations that are scheduled
465 * by the implementation function often only have one closure
466 * argument, with this function it is possible to get at the
467 * implementation state when only passing the GNUNET_MQ_Handle
468 * as closure.
469 *
470 * @param mq message queue with the current message
471 * @return message to send, never NULL
472 */
473void *
474GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq);
475
476/**
477 * Mark the current message as irrevocably sent, but do not
478 * proceed with sending the next message.
479 * Will call the appropriate GNUNET_MQ_NotifyCallback, if any.
480 *
481 * @param mq message queue
482 */
483void
484GNUNET_MQ_impl_send_commit (struct GNUNET_MQ_Handle *mq);
470 485
471#endif 486#endif
diff --git a/src/include/gnunet_set_service.h b/src/include/gnunet_set_service.h
index 34c9312d1..e08ed5d69 100644
--- a/src/include/gnunet_set_service.h
+++ b/src/include/gnunet_set_service.h
@@ -257,9 +257,9 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set);
257 257
258 258
259/** 259/**
260 * Create a set operation for evaluation with another peer. 260 * Prepare a set operation to be evaluated with another peer.
261 * The evaluation will not start until the client provides 261 * The evaluation will not start until the client provides
262 * a local set with GNUNET_SET_conclude. 262 * a local set with GNUNET_SET_commit.
263 * 263 *
264 * @param other_peer peer with the other set 264 * @param other_peer peer with the other set
265 * @param app_id hash for the application using the set 265 * @param app_id hash for the application using the set
@@ -273,14 +273,14 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set);
273 * @param result_cls closure for result_cb 273 * @param result_cls closure for result_cb
274 * @return a handle to cancel the operation 274 * @return a handle to cancel the operation
275 */ 275 */
276struct GNUNET_SET_OperationHandle * // FIXME: rename to _connect? 276struct GNUNET_SET_OperationHandle *
277GNUNET_SET_evaluate (const struct GNUNET_PeerIdentity *other_peer, 277GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer,
278 const struct GNUNET_HashCode *app_id, 278 const struct GNUNET_HashCode *app_id,
279 const struct GNUNET_MessageHeader *context_msg, 279 const struct GNUNET_MessageHeader *context_msg,
280 uint16_t salt, 280 uint16_t salt,
281 enum GNUNET_SET_ResultMode result_mode, 281 enum GNUNET_SET_ResultMode result_mode,
282 GNUNET_SET_ResultIterator result_cb, 282 GNUNET_SET_ResultIterator result_cb,
283 void *result_cls); 283 void *result_cls);
284 284
285 285
286/** 286/**
@@ -316,7 +316,7 @@ GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh);
316 * Accept a request we got via GNUNET_SET_listen. Must be called during 316 * Accept a request we got via GNUNET_SET_listen. Must be called during
317 * GNUNET_SET_listen, as the 'struct GNUNET_SET_Request' becomes invalid 317 * GNUNET_SET_listen, as the 'struct GNUNET_SET_Request' becomes invalid
318 * afterwards. 318 * afterwards.
319 * Call GNUNET_SET_conclude to provide the local set to use for the operation, 319 * Call GNUNET_SET_commit to provide the local set to use for the operation,
320 * and to begin the exchange with the remote peer. 320 * and to begin the exchange with the remote peer.
321 * 321 *
322 * @param request request to accept 322 * @param request request to accept
@@ -334,7 +334,7 @@ GNUNET_SET_accept (struct GNUNET_SET_Request *request,
334 334
335 335
336/** 336/**
337 * Conclude the given set operation using the given set. 337 * Commit a set to be used with a set operation.
338 * This function is called once we have fully constructed 338 * This function is called once we have fully constructed
339 * the set that we want to use for the operation. At this 339 * the set that we want to use for the operation. At this
340 * time, the P2P protocol can then begin to exchange the 340 * time, the P2P protocol can then begin to exchange the
@@ -344,9 +344,9 @@ GNUNET_SET_accept (struct GNUNET_SET_Request *request,
344 * @param oh handle to the set operation 344 * @param oh handle to the set operation
345 * @param set the set to use for the operation 345 * @param set the set to use for the operation
346 */ 346 */
347void // FIXME: rename to _commit 347void
348GNUNET_SET_conclude (struct GNUNET_SET_OperationHandle *oh, 348GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh,
349 struct GNUNET_SET_Handle *set); 349 struct GNUNET_SET_Handle *set);
350 350
351 351
352/** 352/**
diff --git a/src/include/gnunet_stream_lib.h b/src/include/gnunet_stream_lib.h
index ece60c033..65e247ece 100644
--- a/src/include/gnunet_stream_lib.h
+++ b/src/include/gnunet_stream_lib.h
@@ -403,9 +403,9 @@ GNUNET_STREAM_read_cancel (struct GNUNET_STREAM_ReadHandle *rh);
403 * @param error_handler callback for errors 403 * @param error_handler callback for errors
404 * @return the message queue for the socket 404 * @return the message queue for the socket
405 */ 405 */
406struct GNUNET_MQ_MessageQueue * 406struct GNUNET_MQ_Handle *
407GNUNET_STREAM_mq_create (struct GNUNET_STREAM_Socket *socket, 407GNUNET_STREAM_mq_create (struct GNUNET_STREAM_Socket *socket,
408 const struct GNUNET_MQ_Handler *msg_handlers, 408 const struct GNUNET_MQ_MessageHandler *msg_handlers,
409 GNUNET_MQ_ErrorHandler error_handler, 409 GNUNET_MQ_ErrorHandler error_handler,
410 void *cls); 410 void *cls);
411 411
diff --git a/src/mesh/mesh2_api.c b/src/mesh/mesh2_api.c
index 0e4c3b8e6..11ff743ca 100644
--- a/src/mesh/mesh2_api.c
+++ b/src/mesh/mesh2_api.c
@@ -320,6 +320,24 @@ struct GNUNET_MESH_Tunnel
320}; 320};
321 321
322 322
323/**
324 * Implementation state for mesh's message queue.
325 */
326struct MeshMQState
327{
328 /**
329 * The current transmit handle, or NULL
330 * if no transmit is active.
331 */
332 struct GNUNET_MESH_TransmitHandle *th;
333
334 /**
335 * Tunnel to send the data over.
336 */
337 struct GNUNET_MESH_Tunnel *tunnel;
338};
339
340
323/******************************************************************************/ 341/******************************************************************************/
324/*********************** DECLARATIONS *************************/ 342/*********************** DECLARATIONS *************************/
325/******************************************************************************/ 343/******************************************************************************/
@@ -1685,4 +1703,115 @@ GNUNET_MESH_show_tunnel (struct GNUNET_MESH_Handle *h,
1685 h->tunnel_cls = callback_cls; 1703 h->tunnel_cls = callback_cls;
1686 1704
1687 return; 1705 return;
1688} \ No newline at end of file 1706}
1707
1708
1709/**
1710 * Function called to notify a client about the connection
1711 * begin ready to queue more data. "buf" will be
1712 * NULL and "size" zero if the connection was closed for
1713 * writing in the meantime.
1714 *
1715 * @param cls closure
1716 * @param size number of bytes available in buf
1717 * @param buf where the callee should write the message
1718 * @return number of bytes written to buf
1719 */
1720static size_t
1721mesh_mq_ntr (void *cls, size_t size,
1722 void *buf)
1723{
1724 struct GNUNET_MQ_Handle *mq = cls;
1725 struct MeshMQState *state = GNUNET_MQ_impl_state (mq);
1726 const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq);
1727 uint16_t msize;
1728
1729 state->th = NULL;
1730 if (NULL == buf)
1731 {
1732 GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE);
1733 return 0;
1734 }
1735 msize = ntohs (msg->size);
1736 GNUNET_assert (msize <= size);
1737 memcpy (buf, msg, msize);
1738 GNUNET_MQ_impl_send_continue (mq);
1739 return msize;
1740}
1741
1742
1743/**
1744 * Signature of functions implementing the
1745 * sending functionality of a message queue.
1746 *
1747 * @param mq the message queue
1748 * @param msg the message to send
1749 * @param impl_state state of the implementation
1750 */
1751static void
1752mesh_mq_send_impl (struct GNUNET_MQ_Handle *mq,
1753 const struct GNUNET_MessageHeader *msg, void *impl_state)
1754{
1755 struct MeshMQState *state = impl_state;
1756
1757 GNUNET_assert (NULL == state->th);
1758 GNUNET_MQ_impl_send_commit (mq);
1759 state->th =
1760 GNUNET_MESH_notify_transmit_ready (state->tunnel,
1761 /* FIXME: add option for corking */
1762 GNUNET_NO,
1763 GNUNET_TIME_UNIT_FOREVER_REL,
1764 ntohs (msg->size),
1765 mesh_mq_ntr, mq);
1766
1767}
1768
1769
1770/**
1771 * Signature of functions implementing the
1772 * destruction of a message queue.
1773 * Implementations must not free 'mq', but should
1774 * take care of 'impl_state'.
1775 *
1776 * @param mq the message queue to destroy
1777 * @param impl_state state of the implementation
1778 */
1779static void
1780mesh_mq_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
1781{
1782 struct MeshMQState *state = impl_state;
1783
1784 if (NULL != state->th)
1785 GNUNET_MESH_notify_transmit_ready_cancel (state->th);
1786
1787 GNUNET_free (state);
1788}
1789
1790
1791/**
1792 * Create a message queue for a mesh tunnel.
1793 * The message queue can only be used to transmit messages,
1794 * not to receive them.
1795 *
1796 * @param tunnel the tunnel to create the message qeue for
1797 * @return a message queue to messages over the tunnel
1798 */
1799struct GNUNET_MQ_Handle *
1800GNUNET_MESH_mq_create (struct GNUNET_MESH_Tunnel *tunnel)
1801{
1802 struct GNUNET_MQ_Handle *mq;
1803 struct MeshMQState *state;
1804
1805 state = GNUNET_new (struct MeshMQState);
1806 state->tunnel = tunnel;
1807
1808 mq = GNUNET_MQ_queue_for_callbacks (mesh_mq_send_impl,
1809 mesh_mq_destroy_impl,
1810 NULL, /* FIXME: cancel impl. */
1811 state,
1812 NULL, /* no msg handlers */
1813 NULL, /* no err handlers */
1814 NULL); /* no handler cls */
1815 return mq;
1816}
1817
diff --git a/src/set/Makefile.am b/src/set/Makefile.am
index 71e71c867..c2449e0ea 100644
--- a/src/set/Makefile.am
+++ b/src/set/Makefile.am
@@ -16,7 +16,7 @@ if USE_COVERAGE
16endif 16endif
17 17
18bin_PROGRAMS = \ 18bin_PROGRAMS = \
19 gnunet-set 19 gnunet-set-profiler gnunet-set-ibf-profiler
20 20
21libexec_PROGRAMS = \ 21libexec_PROGRAMS = \
22 gnunet-service-set 22 gnunet-service-set
@@ -24,17 +24,24 @@ libexec_PROGRAMS = \
24lib_LTLIBRARIES = \ 24lib_LTLIBRARIES = \
25 libgnunetset.la 25 libgnunetset.la
26 26
27gnunet_set_SOURCES = \ 27gnunet_set_profiler_SOURCES = \
28 gnunet-set.c 28 gnunet-set-profiler.c
29gnunet_set_LDADD = \ 29gnunet_set_profiler_LDADD = \
30 $(top_builddir)/src/util/libgnunetutil.la \ 30 $(top_builddir)/src/util/libgnunetutil.la \
31 $(top_builddir)/src/set/libgnunetset.la \ 31 $(top_builddir)/src/set/libgnunetset.la \
32 $(top_builddir)/src/stream/libgnunetstream.la \ 32 $(top_builddir)/src/testing/libgnunettesting.la \
33 $(top_builddir)/src/testbed/libgnunettestbed.la \
34 $(GN_LIBINTL) 33 $(GN_LIBINTL)
35gnunet_set_DEPENDENCIES = \ 34gnunet_set_profiler_DEPENDENCIES = \
36 libgnunetset.la 35 libgnunetset.la
37 36
37
38gnunet_set_ibf_profiler_SOURCES = \
39 gnunet-set-ibf-profiler.c \
40 ibf.c
41gnunet_set_ibf_profiler_LDADD = \
42 $(top_builddir)/src/util/libgnunetutil.la \
43 $(GN_LIBINTL)
44
38gnunet_service_set_SOURCES = \ 45gnunet_service_set_SOURCES = \
39 gnunet-service-set.c \ 46 gnunet-service-set.c \
40 gnunet-service-set_union.c \ 47 gnunet-service-set_union.c \
@@ -43,8 +50,7 @@ gnunet_service_set_SOURCES = \
43gnunet_service_set_LDADD = \ 50gnunet_service_set_LDADD = \
44 $(top_builddir)/src/util/libgnunetutil.la \ 51 $(top_builddir)/src/util/libgnunetutil.la \
45 $(top_builddir)/src/core/libgnunetcore.la \ 52 $(top_builddir)/src/core/libgnunetcore.la \
46 $(top_builddir)/src/stream/libgnunetstream.la \ 53 $(top_builddir)/src/mesh/libgnunetmesh2.la \
47 $(top_builddir)/src/mesh/libgnunetmesh.la \
48 $(GN_LIBINTL) 54 $(GN_LIBINTL)
49 55
50libgnunetset_la_SOURCES = \ 56libgnunetset_la_SOURCES = \
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c
index d2f0b48d5..bd934de84 100644
--- a/src/set/gnunet-service-set.c
+++ b/src/set/gnunet-service-set.c
@@ -29,13 +29,16 @@
29 29
30/** 30/**
31 * Configuration of our local peer. 31 * Configuration of our local peer.
32 * (Not declared 'static' as also needed in gnunet-service-set_union.c)
32 */ 33 */
33const struct GNUNET_CONFIGURATION_Handle *configuration; 34const struct GNUNET_CONFIGURATION_Handle *configuration;
34 35
35/** 36/**
36 * Socket listening for other peers via stream. 37 * Handle to the mesh service, used
38 * to listen for and connect to remote peers.
39 * (Not declared 'static' as also needed in gnunet-service-set_union.c)
37 */ 40 */
38static struct GNUNET_STREAM_ListenSocket *stream_listen_socket; 41struct GNUNET_MESH_Handle *mesh;
39 42
40/** 43/**
41 * Sets are held in a doubly linked list. 44 * Sets are held in a doubly linked list.
@@ -78,14 +81,14 @@ static uint32_t accept_id = 1;
78 81
79 82
80/** 83/**
81 * Get set that is owned by the client, if any. 84 * Get set that is owned by the given client, if any.
82 * 85 *
83 * @param client client to look for 86 * @param client client to look for
84 * @return set that the client owns, NULL if the client 87 * @return set that the client owns, NULL if the client
85 * does not own a set 88 * does not own a set
86 */ 89 */
87static struct Set * 90static struct Set *
88get_set (struct GNUNET_SERVER_Client *client) 91set_get (struct GNUNET_SERVER_Client *client)
89{ 92{
90 struct Set *set; 93 struct Set *set;
91 for (set = sets_head; NULL != set; set = set->next) 94 for (set = sets_head; NULL != set; set = set->next)
@@ -137,7 +140,7 @@ get_incoming (uint32_t id)
137 * @param listener listener to destroy 140 * @param listener listener to destroy
138 */ 141 */
139static void 142static void
140destroy_listener (struct Listener *listener) 143listener_destroy (struct Listener *listener)
141{ 144{
142 if (NULL != listener->client_mq) 145 if (NULL != listener->client_mq)
143 { 146 {
@@ -155,7 +158,7 @@ destroy_listener (struct Listener *listener)
155 * @param set the set to destroy 158 * @param set the set to destroy
156 */ 159 */
157static void 160static void
158destroy_set (struct Set *set) 161set_destroy (struct Set *set)
159{ 162{
160 switch (set->operation) 163 switch (set->operation)
161 { 164 {
@@ -187,12 +190,12 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
187 struct Set *set; 190 struct Set *set;
188 struct Listener *listener; 191 struct Listener *listener;
189 192
190 set = get_set (client); 193 set = set_get (client);
191 if (NULL != set) 194 if (NULL != set)
192 destroy_set (set); 195 set_destroy (set);
193 listener = get_listener (client); 196 listener = get_listener (client);
194 if (NULL != listener) 197 if (NULL != listener)
195 destroy_listener (listener); 198 listener_destroy (listener);
196} 199}
197 200
198 201
@@ -202,17 +205,14 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
202 * @param incoming remote request to destroy 205 * @param incoming remote request to destroy
203 */ 206 */
204static void 207static void
205destroy_incoming (struct Incoming *incoming) 208incoming_destroy (struct Incoming *incoming)
206{ 209{
207 if (NULL != incoming->mq) 210 if (NULL != incoming->tc)
208 { 211 {
209 GNUNET_MQ_destroy (incoming->mq); 212 GNUNET_free (incoming->tc);
210 incoming->mq = NULL; 213 GNUNET_assert (NULL != incoming->tc->tunnel);
211 } 214 GNUNET_MESH_tunnel_destroy (incoming->tc->tunnel);
212 if (NULL != incoming->socket) 215 incoming->tc = NULL;
213 {
214 GNUNET_STREAM_close (incoming->socket);
215 incoming->socket = NULL;
216 } 216 }
217 GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming); 217 GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming);
218 GNUNET_free (incoming); 218 GNUNET_free (incoming);
@@ -237,6 +237,15 @@ get_listener_by_target (enum GNUNET_SET_OperationType op,
237} 237}
238 238
239 239
240
241static void
242tunnel_context_destroy (struct TunnelContext *tc)
243{
244 GNUNET_free (tc);
245 /* FIXME destroy the rest */
246}
247
248
240/** 249/**
241 * Handle a request for a set operation from 250 * Handle a request for a set operation from
242 * another peer. 251 * another peer.
@@ -244,16 +253,31 @@ get_listener_by_target (enum GNUNET_SET_OperationType op,
244 * @param cls the incoming socket 253 * @param cls the incoming socket
245 * @param mh the message 254 * @param mh the message
246 */ 255 */
247static void 256static int
248handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh) 257handle_p2p_operation_request (void *cls,
258 struct GNUNET_MESH_Tunnel *tunnel,
259 void **tunnel_ctx,
260 const struct GNUNET_PeerIdentity *sender,
261 const struct GNUNET_MessageHeader *mh)
249{ 262{
250 struct Incoming *incoming = cls; 263 struct TunnelContext *tc = *tunnel_ctx;
264 struct Incoming *incoming;
251 const struct OperationRequestMessage *msg = (const struct OperationRequestMessage *) mh; 265 const struct OperationRequestMessage *msg = (const struct OperationRequestMessage *) mh;
252 struct GNUNET_MQ_Message *mqm; 266 struct GNUNET_MQ_Envelope *mqm;
253 struct GNUNET_SET_RequestMessage *cmsg; 267 struct GNUNET_SET_RequestMessage *cmsg;
254 struct Listener *listener; 268 struct Listener *listener;
255 const struct GNUNET_MessageHeader *context_msg; 269 const struct GNUNET_MessageHeader *context_msg;
256 270
271 if (CONTEXT_INCOMING != tc->type)
272 {
273 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected operation request\n");
274 tunnel_context_destroy (tc);
275 /* don't kill the whole mesh connection */
276 return GNUNET_OK;
277 }
278
279 incoming = tc->data;
280
257 context_msg = GNUNET_MQ_extract_nested_mh (msg); 281 context_msg = GNUNET_MQ_extract_nested_mh (msg);
258 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received P2P operation request (op %u, app %s)\n", 282 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received P2P operation request (op %u, app %s)\n",
259 ntohs (msg->operation), GNUNET_h2s (&msg->app_id)); 283 ntohs (msg->operation), GNUNET_h2s (&msg->app_id));
@@ -263,20 +287,26 @@ handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh)
263 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 287 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
264 "set operation request from peer failed: " 288 "set operation request from peer failed: "
265 "no set with matching application ID and operation type\n"); 289 "no set with matching application ID and operation type\n");
266 return; 290 tunnel_context_destroy (tc);
291 /* don't kill the whole mesh connection */
292 return GNUNET_OK;
267 } 293 }
268 mqm = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST, context_msg); 294 mqm = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST, context_msg);
269 if (NULL == mqm) 295 if (NULL == mqm)
270 { 296 {
271 /* FIXME: disconnect the peer */ 297 /* FIXME: disconnect the peer */
272 GNUNET_break_op (0); 298 GNUNET_break_op (0);
273 return; 299 tunnel_context_destroy (tc);
300 /* don't kill the whole mesh connection */
301 return GNUNET_OK;
274 } 302 }
275 incoming->accept_id = accept_id++; 303 incoming->accept_id = accept_id++;
276 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending request with accept id %u\n", incoming->accept_id); 304 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending request with accept id %u\n", incoming->accept_id);
277 cmsg->accept_id = htonl (incoming->accept_id); 305 cmsg->accept_id = htonl (incoming->accept_id);
278 cmsg->peer_id = incoming->peer; 306 cmsg->peer_id = incoming->tc->peer;
279 GNUNET_MQ_send (listener->client_mq, mqm); 307 GNUNET_MQ_send (listener->client_mq, mqm);
308
309 return GNUNET_OK;
280} 310}
281 311
282 312
@@ -298,7 +328,7 @@ handle_client_create (void *cls,
298 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client created new set (operation %u)\n", 328 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client created new set (operation %u)\n",
299 ntohs (msg->operation)); 329 ntohs (msg->operation));
300 330
301 if (NULL != get_set (client)) 331 if (NULL != set_get (client))
302 { 332 {
303 GNUNET_break (0); 333 GNUNET_break (0);
304 GNUNET_SERVER_client_disconnect (client); 334 GNUNET_SERVER_client_disconnect (client);
@@ -379,7 +409,7 @@ handle_client_remove (void *cls,
379{ 409{
380 struct Set *set; 410 struct Set *set;
381 411
382 set = get_set (client); 412 set = set_get (client);
383 if (NULL == set) 413 if (NULL == set)
384 { 414 {
385 GNUNET_break (0); 415 GNUNET_break (0);
@@ -428,7 +458,7 @@ handle_client_reject (void *cls,
428 return; 458 return;
429 } 459 }
430 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer request rejected by client\n"); 460 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer request rejected by client\n");
431 destroy_incoming (incoming); 461 incoming_destroy (incoming);
432 GNUNET_SERVER_receive_done (client, GNUNET_OK); 462 GNUNET_SERVER_receive_done (client, GNUNET_OK);
433} 463}
434 464
@@ -449,7 +479,7 @@ handle_client_add (void *cls,
449{ 479{
450 struct Set *set; 480 struct Set *set;
451 481
452 set = get_set (client); 482 set = set_get (client);
453 if (NULL == set) 483 if (NULL == set)
454 { 484 {
455 GNUNET_break (0); 485 GNUNET_break (0);
@@ -486,7 +516,7 @@ handle_client_evaluate (void *cls,
486{ 516{
487 struct Set *set; 517 struct Set *set;
488 518
489 set = get_set (client); 519 set = set_get (client);
490 if (NULL == set) 520 if (NULL == set)
491 { 521 {
492 GNUNET_break (0); 522 GNUNET_break (0);
@@ -558,8 +588,7 @@ handle_client_accept (void *cls,
558 return; 588 return;
559 } 589 }
560 590
561 591 set = set_get (client);
562 set = get_set (client);
563 592
564 if (NULL == set) 593 if (NULL == set)
565 { 594 {
@@ -584,51 +613,12 @@ handle_client_accept (void *cls,
584 613
585 /* note: _GSS_*_accept has to make sure the socket and mq are set to NULL, 614 /* note: _GSS_*_accept has to make sure the socket and mq are set to NULL,
586 * otherwise they will be destroyed and disconnected */ 615 * otherwise they will be destroyed and disconnected */
587 destroy_incoming (incoming); 616 incoming_destroy (incoming);
588 GNUNET_SERVER_receive_done (client, GNUNET_OK); 617 GNUNET_SERVER_receive_done (client, GNUNET_OK);
589} 618}
590 619
591 620
592/** 621/**
593 * Functions of this type are called upon new stream connection from other peers
594 * or upon binding error which happen when the app_port given in
595 * GNUNET_STREAM_listen() is already taken.
596 *
597 * @param cls the closure from GNUNET_STREAM_listen
598 * @param socket the socket representing the stream; NULL on binding error
599 * @param initiator the identity of the peer who wants to establish a stream
600 * with us; NULL on binding error
601 * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
602 * stream (the socket will be invalid after the call)
603 */
604static int
605stream_listen_cb (void *cls,
606 struct GNUNET_STREAM_Socket *socket,
607 const struct GNUNET_PeerIdentity *initiator)
608{
609 struct Incoming *incoming;
610 static const struct GNUNET_MQ_Handler handlers[] = {
611 {handle_p2p_operation_request, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST},
612 GNUNET_MQ_HANDLERS_END
613 };
614
615 if (NULL == socket)
616 {
617 GNUNET_break (0);
618 return GNUNET_SYSERR;
619 }
620
621 incoming = GNUNET_new (struct Incoming);
622 incoming->peer = *initiator;
623 incoming->socket = socket;
624 incoming->mq = GNUNET_STREAM_mq_create (incoming->socket, handlers, NULL, incoming);
625 /* FIXME: timeout for peers that only connect but don't send anything */
626 GNUNET_CONTAINER_DLL_insert_tail (incoming_head, incoming_tail, incoming);
627 return GNUNET_OK;
628}
629
630
631/**
632 * Called to clean up, after a shutdown has been requested. 622 * Called to clean up, after a shutdown has been requested.
633 * 623 *
634 * @param cls closure 624 * @param cls closure
@@ -638,31 +628,126 @@ static void
638shutdown_task (void *cls, 628shutdown_task (void *cls,
639 const struct GNUNET_SCHEDULER_TaskContext *tc) 629 const struct GNUNET_SCHEDULER_TaskContext *tc)
640{ 630{
641 if (NULL != stream_listen_socket) 631 if (NULL != mesh)
642 { 632 {
643 GNUNET_STREAM_listen_close (stream_listen_socket); 633 GNUNET_MESH_disconnect (mesh);
644 stream_listen_socket = NULL; 634 mesh = NULL;
645 } 635 }
646 636
647 while (NULL != incoming_head) 637 while (NULL != incoming_head)
648 { 638 {
649 destroy_incoming (incoming_head); 639 incoming_destroy (incoming_head);
650 } 640 }
651 641
652 while (NULL != listeners_head) 642 while (NULL != listeners_head)
653 { 643 {
654 destroy_listener (listeners_head); 644 listener_destroy (listeners_head);
655 } 645 }
656 646
657 while (NULL != sets_head) 647 while (NULL != sets_head)
658 { 648 {
659 destroy_set (sets_head); 649 set_destroy (sets_head);
660 } 650 }
661 651
662 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n"); 652 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n");
663} 653}
664 654
665 655
656
657/**
658 * Signature of the main function of a task.
659 *
660 * @param cls closure
661 * @param tc context information (why was this task triggered now)
662 */
663static void
664incoming_timeout_cb (void *cls,
665 const struct GNUNET_SCHEDULER_TaskContext *tc)
666{
667 struct Incoming *incoming = cls;
668
669 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "remote peer timed out");
670 incoming_destroy (incoming);
671}
672
673
674/**
675 * Method called whenever another peer has added us to a tunnel
676 * the other peer initiated.
677 * Only called (once) upon reception of data with a message type which was
678 * subscribed to in GNUNET_MESH_connect. A call to GNUNET_MESH_tunnel_destroy
679 * causes te tunnel to be ignored and no further notifications are sent about
680 * the same tunnel.
681 *
682 * @param cls closure
683 * @param tunnel new handle to the tunnel
684 * @param initiator peer that started the tunnel
685 * @param port Port this tunnel is for.
686 * @return initial tunnel context for the tunnel
687 * (can be NULL -- that's not an error)
688 */
689static void *
690tunnel_new_cb (void *cls,
691 struct GNUNET_MESH_Tunnel *tunnel,
692 const struct GNUNET_PeerIdentity *initiator,
693 uint32_t port)
694{
695 struct Incoming *incoming;
696 struct TunnelContext *tc;
697
698 GNUNET_assert (port == GNUNET_APPLICATION_TYPE_SET);
699 tc = GNUNET_new (struct TunnelContext);
700 incoming = GNUNET_new (struct Incoming);
701 incoming->tc = tc;
702 tc->peer = *initiator;
703 tc->tunnel = tunnel;
704 tc->mq = GNUNET_MESH_mq_create (tunnel);
705 tc->data = incoming;
706 tc->type = CONTEXT_INCOMING;
707 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, incoming_timeout_cb, incoming);
708 GNUNET_CONTAINER_DLL_insert_tail (incoming_head, incoming_tail, incoming);
709
710 return tc;
711}
712
713
714/**
715 * Function called whenever a tunnel is destroyed. Should clean up
716 * any associated state. This function is NOT called if the client has
717 * explicitly asked for the tunnel to be destroyed using
718 * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
719 * the tunnel.
720 *
721 * @param cls closure (set from GNUNET_MESH_connect)
722 * @param tunnel connection to the other end (henceforth invalid)
723 * @param tunnel_ctx place where local state associated
724 * with the tunnel is stored
725 */
726static void
727tunnel_end_cb (void *cls,
728 const struct GNUNET_MESH_Tunnel *tunnel, void *tunnel_ctx)
729{
730 struct TunnelContext *ctx = tunnel_ctx;
731
732 switch (ctx->type)
733 {
734 case CONTEXT_INCOMING:
735 incoming_destroy ((struct Incoming *) ctx->data);
736 break;
737 case CONTEXT_OPERATION_UNION:
738 _GSS_union_operation_destroy ((struct UnionEvaluateOperation *) ctx->data);
739 break;
740 case CONTEXT_OPERATION_INTERSECTION:
741 GNUNET_assert (0);
742 /* FIXME: cfuchs */
743 break;
744 default:
745 GNUNET_assert (0);
746 }
747
748}
749
750
666/** 751/**
667 * Function called by the service's run 752 * Function called by the service's run
668 * method to run service-specific setup code. 753 * method to run service-specific setup code.
@@ -686,16 +771,40 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
686 {handle_client_remove, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE, 0}, 771 {handle_client_remove, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE, 0},
687 {NULL, NULL, 0, 0} 772 {NULL, NULL, 0, 0}
688 }; 773 };
774 static const struct GNUNET_MESH_MessageHandler mesh_handlers[] = {
775 {handle_p2p_operation_request,
776 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0},
777 /* messages for the union operation */
778 {_GSS_union_handle_p2p_message,
779 GNUNET_MESSAGE_TYPE_SET_P2P_IBF, 0},
780 {_GSS_union_handle_p2p_message,
781 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0},
782 {_GSS_union_handle_p2p_message,
783 GNUNET_MESSAGE_TYPE_SET_P2P_DONE, 0},
784 {_GSS_union_handle_p2p_message,
785 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0},
786 {_GSS_union_handle_p2p_message,
787 GNUNET_MESSAGE_TYPE_SET_P2P_SE, 0},
788 /* FIXME: messages for intersection operation */
789 {NULL, 0, 0}
790 };
791 static const uint32_t mesh_ports[] = {GNUNET_APPLICATION_TYPE_SET, 0};
689 792
690 configuration = cfg; 793 configuration = cfg;
691 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); 794 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
795 &shutdown_task, NULL);
692 GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL); 796 GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
693 GNUNET_SERVER_add_handlers (server, server_handlers); 797 GNUNET_SERVER_add_handlers (server, server_handlers);
694 stream_listen_socket = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_SET,
695 &stream_listen_cb, NULL,
696 GNUNET_STREAM_OPTION_END);
697 798
698 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set service running\n"); 799 mesh = GNUNET_MESH_connect (cfg, NULL, tunnel_new_cb, tunnel_end_cb,
800 mesh_handlers, mesh_ports);
801 if (NULL == mesh)
802 {
803 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not connect to mesh\n");
804 return;
805 }
806
807 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "service started\n");
699} 808}
700 809
701 810
@@ -710,7 +819,8 @@ int
710main (int argc, char *const *argv) 819main (int argc, char *const *argv)
711{ 820{
712 int ret; 821 int ret;
713 ret = GNUNET_SERVICE_run (argc, argv, "set", GNUNET_SERVICE_OPTION_NONE, &run, NULL); 822 ret = GNUNET_SERVICE_run (argc, argv, "set",
823 GNUNET_SERVICE_OPTION_NONE, &run, NULL);
714 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n"); 824 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n");
715 return (GNUNET_OK == ret) ? 0 : 1; 825 return (GNUNET_OK == ret) ? 0 : 1;
716} 826}
diff --git a/src/set/gnunet-service-set.h b/src/set/gnunet-service-set.h
index 15199eba4..66bff4ff1 100644
--- a/src/set/gnunet-service-set.h
+++ b/src/set/gnunet-service-set.h
@@ -33,7 +33,7 @@
33#include "gnunet_applications.h" 33#include "gnunet_applications.h"
34#include "gnunet_util_lib.h" 34#include "gnunet_util_lib.h"
35#include "gnunet_core_service.h" 35#include "gnunet_core_service.h"
36#include "gnunet_stream_lib.h" 36#include "gnunet_mesh2_service.h"
37#include "gnunet_set_service.h" 37#include "gnunet_set_service.h"
38#include "set.h" 38#include "set.h"
39 39
@@ -47,6 +47,8 @@ struct IntersectionState;
47 */ 47 */
48struct UnionState; 48struct UnionState;
49 49
50struct UnionEvaluateOperation;
51
50 52
51/** 53/**
52 * A set that supports a specific operation 54 * A set that supports a specific operation
@@ -63,7 +65,7 @@ struct Set
63 /** 65 /**
64 * Message queue for the client 66 * Message queue for the client
65 */ 67 */
66 struct GNUNET_MQ_MessageQueue *client_mq; 68 struct GNUNET_MQ_Handle *client_mq;
67 69
68 /** 70 /**
69 * Type of operation supported for this set 71 * Type of operation supported for this set
@@ -116,7 +118,7 @@ struct Listener
116 /** 118 /**
117 * Message queue for the client 119 * Message queue for the client
118 */ 120 */
119 struct GNUNET_MQ_MessageQueue *client_mq; 121 struct GNUNET_MQ_Handle *client_mq;
120 122
121 /** 123 /**
122 * Type of operation supported for this set 124 * Type of operation supported for this set
@@ -148,19 +150,17 @@ struct Incoming
148 struct Incoming *prev; 150 struct Incoming *prev;
149 151
150 /** 152 /**
151 * Identity of the peer that connected to us 153 * Tunnel context, stores information about
154 * the tunnel and its peer.
152 */ 155 */
153 struct GNUNET_PeerIdentity peer; 156 struct TunnelContext *tc;
154 157
155 /** 158 /**
156 * Socket connected to the peer 159 * GNUNET_YES if the incoming peer has sent
160 * an operation request (and we are waiting
161 * for the client to ack/nack), GNUNET_NO otherwise.
157 */ 162 */
158 struct GNUNET_STREAM_Socket *socket; 163 int received_request;
159
160 /**
161 * Message queue for the peer
162 */
163 struct GNUNET_MQ_MessageQueue *mq;
164 164
165 /** 165 /**
166 * App code, set once the peer has 166 * App code, set once the peer has
@@ -187,18 +187,37 @@ struct Incoming
187 187
188 /** 188 /**
189 * Unique request id for the request from 189 * Unique request id for the request from
190 * a remote peer, sent to the client with will 190 * a remote peer, sent to the client, which will
191 * accept or reject the request. 191 * accept or reject the request.
192 */ 192 */
193 uint32_t accept_id; 193 uint32_t accept_id;
194}; 194};
195 195
196 196
197enum TunnelContextType {
198 CONTEXT_INCOMING,
199 CONTEXT_OPERATION_UNION,
200 CONTEXT_OPERATION_INTERSECTION,
201};
202
203struct TunnelContext
204{
205 struct GNUNET_MESH_Tunnel *tunnel;
206 struct GNUNET_PeerIdentity peer;
207 struct GNUNET_MQ_Handle *mq;
208 enum TunnelContextType type;
209 void *data;
210};
211
212
213
197/** 214/**
198 * Configuration of the local peer 215 * Configuration of the local peer
199 */ 216 */
200extern const struct GNUNET_CONFIGURATION_Handle *configuration; 217extern const struct GNUNET_CONFIGURATION_Handle *configuration;
201 218
219extern struct GNUNET_MESH_Handle *mesh;
220
202 221
203/** 222/**
204 * Create a new set supporting the union operation 223 * Create a new set supporting the union operation
@@ -262,4 +281,32 @@ _GSS_union_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set,
262 struct Incoming *incoming); 281 struct Incoming *incoming);
263 282
264 283
284/**
285 * Destroy a union operation, and free all resources
286 * associated with it.
287 *
288 * @param eo the union operation to destroy
289 */
290void
291_GSS_union_operation_destroy (struct UnionEvaluateOperation *eo);
292
293
294/**
295 * Dispatch messages for a union operation.
296 *
297 * @param cls closure
298 * @param tunnel mesh tunnel
299 * @param tunnel_ctx tunnel context
300 * @param sender ???
301 * @param mh message to process
302 * @return ???
303 */
304int
305_GSS_union_handle_p2p_message (void *cls,
306 struct GNUNET_MESH_Tunnel *tunnel,
307 void **tunnel_ctx,
308 const struct GNUNET_PeerIdentity *sender,
309 const struct GNUNET_MessageHeader *mh);
310
311
265#endif 312#endif
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c
index 6d9658ee5..2b7a0ccba 100644
--- a/src/set/gnunet-service-set_union.c
+++ b/src/set/gnunet-service-set_union.c
@@ -124,15 +124,10 @@ struct UnionEvaluateOperation
124 struct GNUNET_MessageHeader *context_msg; 124 struct GNUNET_MessageHeader *context_msg;
125 125
126 /** 126 /**
127 * Stream socket connected to the other peer 127 * Tunnel context for the peer we
128 * evaluate the union operation with.
128 */ 129 */
129 struct GNUNET_STREAM_Socket *socket; 130 struct TunnelContext *tc;
130
131 /**
132 * Message queue for the peer on the other
133 * end
134 */
135 struct GNUNET_MQ_MessageQueue *mq;
136 131
137 /** 132 /**
138 * Request ID to multiplex set operations to 133 * Request ID to multiplex set operations to
@@ -397,22 +392,19 @@ destroy_key_to_element_iter (void *cls,
397 * 392 *
398 * @param eo the union operation to destroy 393 * @param eo the union operation to destroy
399 */ 394 */
400static void 395void
401destroy_union_operation (struct UnionEvaluateOperation *eo) 396_GSS_union_operation_destroy (struct UnionEvaluateOperation *eo)
402{ 397{
403 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n"); 398 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n");
404 399
405 if (NULL != eo->mq) 400 if (NULL != eo->tc)
406 { 401 {
407 GNUNET_MQ_destroy (eo->mq); 402 GNUNET_MQ_destroy (eo->tc->mq);
408 eo->mq = NULL; 403 GNUNET_MESH_tunnel_destroy (eo->tc->tunnel);
404 GNUNET_free (eo->tc);
405 eo->tc = NULL;
409 } 406 }
410 407
411 if (NULL != eo->socket)
412 {
413 GNUNET_STREAM_close (eo->socket);
414 eo->socket = NULL;
415 }
416 if (NULL != eo->remote_ibf) 408 if (NULL != eo->remote_ibf)
417 { 409 {
418 ibf_destroy (eo->remote_ibf); 410 ibf_destroy (eo->remote_ibf);
@@ -457,14 +449,14 @@ destroy_union_operation (struct UnionEvaluateOperation *eo)
457static void 449static void
458fail_union_operation (struct UnionEvaluateOperation *eo) 450fail_union_operation (struct UnionEvaluateOperation *eo)
459{ 451{
460 struct GNUNET_MQ_Message *mqm; 452 struct GNUNET_MQ_Envelope *mqm;
461 struct GNUNET_SET_ResultMessage *msg; 453 struct GNUNET_SET_ResultMessage *msg;
462 454
463 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); 455 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
464 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); 456 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
465 msg->request_id = htonl (eo->request_id); 457 msg->request_id = htonl (eo->request_id);
466 GNUNET_MQ_send (eo->set->client_mq, mqm); 458 GNUNET_MQ_send (eo->set->client_mq, mqm);
467 destroy_union_operation (eo); 459 _GSS_union_operation_destroy (eo);
468} 460}
469 461
470 462
@@ -498,7 +490,7 @@ get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
498static void 490static void
499send_operation_request (struct UnionEvaluateOperation *eo) 491send_operation_request (struct UnionEvaluateOperation *eo)
500{ 492{
501 struct GNUNET_MQ_Message *mqm; 493 struct GNUNET_MQ_Envelope *mqm;
502 struct OperationRequestMessage *msg; 494 struct OperationRequestMessage *msg;
503 495
504 mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, eo->context_msg); 496 mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, eo->context_msg);
@@ -512,7 +504,7 @@ send_operation_request (struct UnionEvaluateOperation *eo)
512 } 504 }
513 msg->operation = htons (GNUNET_SET_OPERATION_UNION); 505 msg->operation = htons (GNUNET_SET_OPERATION_UNION);
514 msg->app_id = eo->app_id; 506 msg->app_id = eo->app_id;
515 GNUNET_MQ_send (eo->mq, mqm); 507 GNUNET_MQ_send (eo->tc->mq, mqm);
516 508
517 if (NULL != eo->context_msg) 509 if (NULL != eo->context_msg)
518 { 510 {
@@ -562,7 +554,7 @@ insert_element_iterator (void *cls,
562 * Insert an element into the union operation's 554 * Insert an element into the union operation's
563 * key-to-element mapping 555 * key-to-element mapping
564 * 556 *
565 * @param the union operation 557 * @param eo the union operation
566 * @param ee the element entry 558 * @param ee the element entry
567 */ 559 */
568static void 560static void
@@ -685,7 +677,7 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
685 while (buckets_sent < (1 << ibf_order)) 677 while (buckets_sent < (1 << ibf_order))
686 { 678 {
687 unsigned int buckets_in_message; 679 unsigned int buckets_in_message;
688 struct GNUNET_MQ_Message *mqm; 680 struct GNUNET_MQ_Envelope *mqm;
689 struct IBFMessage *msg; 681 struct IBFMessage *msg;
690 682
691 buckets_in_message = (1 << ibf_order) - buckets_sent; 683 buckets_in_message = (1 << ibf_order) - buckets_sent;
@@ -700,7 +692,7 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
700 ibf_write_slice (ibf, buckets_sent, 692 ibf_write_slice (ibf, buckets_sent,
701 buckets_in_message, &msg[1]); 693 buckets_in_message, &msg[1]);
702 buckets_sent += buckets_in_message; 694 buckets_sent += buckets_in_message;
703 GNUNET_MQ_send (eo->mq, mqm); 695 GNUNET_MQ_send (eo->tc->mq, mqm);
704 } 696 }
705 697
706 eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS; 698 eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
@@ -715,14 +707,14 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
715static void 707static void
716send_strata_estimator (struct UnionEvaluateOperation *eo) 708send_strata_estimator (struct UnionEvaluateOperation *eo)
717{ 709{
718 struct GNUNET_MQ_Message *mqm; 710 struct GNUNET_MQ_Envelope *mqm;
719 struct GNUNET_MessageHeader *strata_msg; 711 struct GNUNET_MessageHeader *strata_msg;
720 712
721 mqm = GNUNET_MQ_msg_header_extra (strata_msg, 713 mqm = GNUNET_MQ_msg_header_extra (strata_msg,
722 SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE, 714 SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
723 GNUNET_MESSAGE_TYPE_SET_P2P_SE); 715 GNUNET_MESSAGE_TYPE_SET_P2P_SE);
724 strata_estimator_write (eo->set->state.u->se, &strata_msg[1]); 716 strata_estimator_write (eo->set->state.u->se, &strata_msg[1]);
725 GNUNET_MQ_send (eo->mq, mqm); 717 GNUNET_MQ_send (eo->tc->mq, mqm);
726 eo->phase = PHASE_EXPECT_IBF; 718 eo->phase = PHASE_EXPECT_IBF;
727} 719}
728 720
@@ -751,7 +743,7 @@ get_order_from_difference (unsigned int diff)
751/** 743/**
752 * Handle a strata estimator from a remote peer 744 * Handle a strata estimator from a remote peer
753 * 745 *
754 * @param the union operation 746 * @param cls the union operation
755 * @param mh the message 747 * @param mh the message
756 */ 748 */
757static void 749static void
@@ -804,7 +796,7 @@ send_element_iterator (void *cls,
804 while (NULL != ke) 796 while (NULL != ke)
805 { 797 {
806 const struct GNUNET_SET_Element *const element = &ke->element->element; 798 const struct GNUNET_SET_Element *const element = &ke->element->element;
807 struct GNUNET_MQ_Message *mqm; 799 struct GNUNET_MQ_Envelope *mqm;
808 struct GNUNET_MessageHeader *mh; 800 struct GNUNET_MessageHeader *mh;
809 801
810 GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val); 802 GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
@@ -817,7 +809,7 @@ send_element_iterator (void *cls,
817 } 809 }
818 memcpy (&mh[1], element->data, element->size); 810 memcpy (&mh[1], element->data, element->size);
819 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n"); 811 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n");
820 GNUNET_MQ_send (eo->mq, mqm); 812 GNUNET_MQ_send (eo->tc->mq, mqm);
821 ke = ke->next_colliding; 813 ke = ke->next_colliding;
822 } 814 }
823 return GNUNET_NO; 815 return GNUNET_NO;
@@ -889,11 +881,11 @@ decode_and_send (struct UnionEvaluateOperation *eo)
889 } 881 }
890 if (GNUNET_NO == res) 882 if (GNUNET_NO == res)
891 { 883 {
892 struct GNUNET_MQ_Message *mqm; 884 struct GNUNET_MQ_Envelope *mqm;
893 885
894 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n"); 886 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n");
895 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); 887 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
896 GNUNET_MQ_send (eo->mq, mqm); 888 GNUNET_MQ_send (eo->tc->mq, mqm);
897 break; 889 break;
898 } 890 }
899 if (1 == side) 891 if (1 == side)
@@ -902,7 +894,7 @@ decode_and_send (struct UnionEvaluateOperation *eo)
902 } 894 }
903 else 895 else
904 { 896 {
905 struct GNUNET_MQ_Message *mqm; 897 struct GNUNET_MQ_Envelope *mqm;
906 struct GNUNET_MessageHeader *msg; 898 struct GNUNET_MessageHeader *msg;
907 899
908 /* FIXME: before sending the request, check if we may just have the element */ 900 /* FIXME: before sending the request, check if we may just have the element */
@@ -910,7 +902,7 @@ decode_and_send (struct UnionEvaluateOperation *eo)
910 mqm = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key), 902 mqm = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
911 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS); 903 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
912 *(struct IBF_Key *) &msg[1] = key; 904 *(struct IBF_Key *) &msg[1] = key;
913 GNUNET_MQ_send (eo->mq, mqm); 905 GNUNET_MQ_send (eo->tc->mq, mqm);
914 } 906 }
915 } 907 }
916 ibf_destroy (diff_ibf); 908 ibf_destroy (diff_ibf);
@@ -987,7 +979,7 @@ static void
987send_client_element (struct UnionEvaluateOperation *eo, 979send_client_element (struct UnionEvaluateOperation *eo,
988 struct GNUNET_SET_Element *element) 980 struct GNUNET_SET_Element *element)
989{ 981{
990 struct GNUNET_MQ_Message *mqm; 982 struct GNUNET_MQ_Envelope *mqm;
991 struct GNUNET_SET_ResultMessage *rm; 983 struct GNUNET_SET_ResultMessage *rm;
992 984
993 GNUNET_assert (0 != eo->request_id); 985 GNUNET_assert (0 != eo->request_id);
@@ -1006,39 +998,17 @@ send_client_element (struct UnionEvaluateOperation *eo,
1006 998
1007 999
1008/** 1000/**
1009 * Completion callback for shutdown
1010 *
1011 * @param cls the closure from GNUNET_STREAM_shutdown call
1012 * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR,
1013 * SHUT_RDWR)
1014 */
1015/*
1016static void
1017stream_shutdown_cb (void *cls,
1018 int operation)
1019{
1020 //struct UnionEvaluateOperation *eo = cls;
1021
1022 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "stream shutdown\n");
1023
1024 // destroy_union_operation (eo);
1025}
1026*/
1027
1028
1029/**
1030 * Send a result message to the client indicating 1001 * Send a result message to the client indicating
1031 * that the operation is over. 1002 * that the operation is over.
1032 * After the result done message has been sent to the client, 1003 * After the result done message has been sent to the client,
1033 * destroy the evaluate operation. 1004 * destroy the evaluate operation.
1034 * 1005 *
1035 * @param eo union operation 1006 * @param eo union operation
1036 * @param element element to send
1037 */ 1007 */
1038static void 1008static void
1039send_client_done_and_destroy (struct UnionEvaluateOperation *eo) 1009send_client_done_and_destroy (struct UnionEvaluateOperation *eo)
1040{ 1010{
1041 struct GNUNET_MQ_Message *mqm; 1011 struct GNUNET_MQ_Envelope *mqm;
1042 struct GNUNET_SET_ResultMessage *rm; 1012 struct GNUNET_SET_ResultMessage *rm;
1043 1013
1044 GNUNET_assert (0 != eo->request_id); 1014 GNUNET_assert (0 != eo->request_id);
@@ -1047,7 +1017,6 @@ send_client_done_and_destroy (struct UnionEvaluateOperation *eo)
1047 rm->result_status = htons (GNUNET_SET_STATUS_DONE); 1017 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1048 GNUNET_MQ_send (eo->set->client_mq, mqm); 1018 GNUNET_MQ_send (eo->set->client_mq, mqm);
1049 1019
1050 // GNUNET_STREAM_shutdown (eo->socket, SHUT_RDWR, stream_shutdown_cb, eo);
1051} 1020}
1052 1021
1053 1022
@@ -1153,13 +1122,13 @@ handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1153 if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) 1122 if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1154 { 1123 {
1155 /* we got all requests, but still have to send our elements as response */ 1124 /* we got all requests, but still have to send our elements as response */
1156 struct GNUNET_MQ_Message *mqm; 1125 struct GNUNET_MQ_Envelope *mqm;
1157 1126
1158 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got DONE, sending final DONE after elements\n"); 1127 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got DONE, sending final DONE after elements\n");
1159 eo->phase = PHASE_FINISHED; 1128 eo->phase = PHASE_FINISHED;
1160 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); 1129 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
1161 GNUNET_MQ_notify_sent (mqm, peer_done_sent_cb, eo); 1130 GNUNET_MQ_notify_sent (mqm, peer_done_sent_cb, eo);
1162 GNUNET_MQ_send (eo->mq, mqm); 1131 GNUNET_MQ_send (eo->tc->mq, mqm);
1163 return; 1132 return;
1164 } 1133 }
1165 if (eo->phase == PHASE_EXPECT_ELEMENTS) 1134 if (eo->phase == PHASE_EXPECT_ELEMENTS)
@@ -1175,48 +1144,11 @@ handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1175 1144
1176 1145
1177/** 1146/**
1178 * The handlers array, used for both evaluate and accept
1179 */
1180static const struct GNUNET_MQ_Handler union_handlers[] = {
1181 {handle_p2p_elements, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS},
1182 {handle_p2p_strata_estimator, GNUNET_MESSAGE_TYPE_SET_P2P_SE},
1183 {handle_p2p_ibf, GNUNET_MESSAGE_TYPE_SET_P2P_IBF},
1184 {handle_p2p_element_requests, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS},
1185 {handle_p2p_done, GNUNET_MESSAGE_TYPE_SET_P2P_DONE},
1186 GNUNET_MQ_HANDLERS_END
1187};
1188
1189
1190/**
1191 * Functions of this type will be called when a stream is established
1192 *
1193 * @param cls the closure from GNUNET_STREAM_open
1194 * @param socket socket to use to communicate with the
1195 * other side (read/write)
1196 */
1197static void
1198stream_open_cb (void *cls,
1199 struct GNUNET_STREAM_Socket *socket)
1200{
1201 struct UnionEvaluateOperation *eo = cls;
1202
1203 GNUNET_assert (NULL == eo->mq);
1204 GNUNET_assert (socket == eo->socket);
1205 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1206 "open cb successful\n");
1207 eo->mq = GNUNET_STREAM_mq_create (eo->socket, union_handlers, NULL, eo);
1208 /* we started the operation, thus we have to send the operation request */
1209 send_operation_request (eo);
1210 eo->phase = PHASE_EXPECT_SE;
1211}
1212
1213
1214/**
1215 * Evaluate a union operation with 1147 * Evaluate a union operation with
1216 * a remote peer. 1148 * a remote peer.
1217 * 1149 *
1218 * @param m the evaluate request message from the client 1150 * @param m the evaluate request message from the client
1219 * @parem set the set to evaluate the operation with 1151 * @param set the set to evaluate the operation with
1220 */ 1152 */
1221void 1153void
1222_GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set) 1154_GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set)
@@ -1243,14 +1175,20 @@ _GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set)
1243 "evaluating union operation, (app %s)\n", 1175 "evaluating union operation, (app %s)\n",
1244 GNUNET_h2s (&eo->app_id)); 1176 GNUNET_h2s (&eo->app_id));
1245 1177
1246 eo->socket = 1178 eo->tc = GNUNET_new (struct TunnelContext);
1247 GNUNET_STREAM_open (configuration, &eo->peer, GNUNET_APPLICATION_TYPE_SET, 1179 eo->tc->tunnel = GNUNET_MESH_tunnel_create (mesh, eo->tc, &eo->peer,
1248 &stream_open_cb, eo, 1180 GNUNET_APPLICATION_TYPE_SET);
1249 GNUNET_STREAM_OPTION_END); 1181 GNUNET_assert (NULL != eo->tc->tunnel);
1182 eo->tc->peer = eo->peer;
1183 eo->tc->mq = GNUNET_MESH_mq_create (eo->tc->tunnel);
1184 /* we started the operation, thus we have to send the operation request */
1185 eo->phase = PHASE_EXPECT_SE;
1186
1250 GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head, 1187 GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head,
1251 eo->set->state.u->ops_tail, 1188 eo->set->state.u->ops_tail,
1252 eo); 1189 eo);
1253 /* the stream open callback will kick off the operation */ 1190
1191 send_operation_request (eo);
1254} 1192}
1255 1193
1256 1194
@@ -1270,25 +1208,17 @@ _GSS_union_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set,
1270 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "accepting set union operation\n"); 1208 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "accepting set union operation\n");
1271 1209
1272 eo = GNUNET_new (struct UnionEvaluateOperation); 1210 eo = GNUNET_new (struct UnionEvaluateOperation);
1211 eo->tc = incoming->tc;
1273 eo->generation_created = set->state.u->current_generation++; 1212 eo->generation_created = set->state.u->current_generation++;
1274 eo->set = set; 1213 eo->set = set;
1275 eo->peer = incoming->peer;
1276 eo->salt = ntohs (incoming->salt); 1214 eo->salt = ntohs (incoming->salt);
1277 GNUNET_assert (0 != ntohl (m->request_id)); 1215 GNUNET_assert (0 != ntohl (m->request_id));
1278 eo->request_id = ntohl (m->request_id); 1216 eo->request_id = ntohl (m->request_id);
1279 eo->se = strata_estimator_dup (set->state.u->se); 1217 eo->se = strata_estimator_dup (set->state.u->se);
1280 eo->mq = incoming->mq;
1281 /* transfer ownership of mq and socket from incoming to eo */ 1218 /* transfer ownership of mq and socket from incoming to eo */
1282 incoming->mq = NULL;
1283 eo->socket = incoming->socket;
1284 incoming->socket = NULL;
1285 /* the peer's socket is now ours, we'll receive all messages */
1286 GNUNET_MQ_replace_handlers (eo->mq, union_handlers, eo);
1287
1288 GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head, 1219 GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head,
1289 eo->set->state.u->ops_tail, 1220 eo->set->state.u->ops_tail,
1290 eo); 1221 eo);
1291
1292 /* kick off the operation */ 1222 /* kick off the operation */
1293 send_strata_estimator (eo); 1223 send_strata_estimator (eo);
1294} 1224}
@@ -1384,7 +1314,7 @@ _GSS_union_set_destroy (struct Set *set)
1384 1314
1385 while (NULL != set->state.u->ops_head) 1315 while (NULL != set->state.u->ops_head)
1386 { 1316 {
1387 destroy_union_operation (set->state.u->ops_head); 1317 _GSS_union_operation_destroy (set->state.u->ops_head);
1388 } 1318 }
1389} 1319}
1390 1320
@@ -1418,3 +1348,57 @@ _GSS_union_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set)
1418 ee->generation_removed = set->state.u->current_generation; 1348 ee->generation_removed = set->state.u->current_generation;
1419} 1349}
1420 1350
1351
1352/**
1353 * Dispatch messages for a union operation.
1354 *
1355 * @param cls closure
1356 * @param tunnel mesh tunnel
1357 * @param tunnel_ctx tunnel context
1358 * @param sender ???
1359 * @param mh message to process
1360 * @return ???
1361 */
1362int
1363_GSS_union_handle_p2p_message (void *cls,
1364 struct GNUNET_MESH_Tunnel *tunnel,
1365 void **tunnel_ctx,
1366 const struct GNUNET_PeerIdentity *sender,
1367 const struct GNUNET_MessageHeader *mh)
1368{
1369 struct TunnelContext *tc = *tunnel_ctx;
1370 struct UnionEvaluateOperation *eo;
1371
1372 if (CONTEXT_OPERATION_UNION != tc->type)
1373 {
1374 /* FIXME: kill the tunnel */
1375 /* never kill mesh */
1376 return GNUNET_OK;
1377 }
1378
1379 eo = tc->data;
1380
1381 switch (ntohs (mh->type))
1382 {
1383 case GNUNET_MESSAGE_TYPE_SET_P2P_IBF:
1384 handle_p2p_ibf (eo, mh);
1385 break;
1386 case GNUNET_MESSAGE_TYPE_SET_P2P_SE:
1387 handle_p2p_strata_estimator (eo, mh);
1388 break;
1389 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1390 handle_p2p_elements (eo, mh);
1391 break;
1392 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS:
1393 handle_p2p_element_requests (eo, mh);
1394 break;
1395 case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
1396 handle_p2p_done (eo, mh);
1397 break;
1398 default:
1399 /* something wrong with mesh's message handlers? */
1400 GNUNET_assert (0);
1401 }
1402 /* never kill mesh! */
1403 return GNUNET_OK;
1404}
diff --git a/src/set/gnunet-set-ibf.c b/src/set/gnunet-set-ibf-profiler.c
index d431795f1..92feb3db4 100644
--- a/src/set/gnunet-set-ibf.c
+++ b/src/set/gnunet-set-ibf-profiler.c
@@ -19,8 +19,8 @@
19*/ 19*/
20 20
21/** 21/**
22 * @file consensus/gnunet-consensus-ibf.c 22 * @file set/gnunet-set-ibf-profiler.c
23 * @brief tool for reconciling data with invertible bloom filters 23 * @brief tool for profiling the invertible bloom filter implementation
24 * @author Florian Dold 24 * @author Florian Dold
25 */ 25 */
26 26
@@ -35,7 +35,7 @@
35static unsigned int asize = 10; 35static unsigned int asize = 10;
36static unsigned int bsize = 10; 36static unsigned int bsize = 10;
37static unsigned int csize = 10; 37static unsigned int csize = 10;
38static unsigned int hash_num = 3; 38static unsigned int hash_num = 4;
39static unsigned int ibf_size = 80; 39static unsigned int ibf_size = 80;
40 40
41/* FIXME: add parameter for this */ 41/* FIXME: add parameter for this */
@@ -181,12 +181,14 @@ run (void *cls, char *const *args, const char *cfgfile,
181 181
182 start_time = GNUNET_TIME_absolute_get (); 182 start_time = GNUNET_TIME_absolute_get ();
183 183
184 for (;;) 184 for (i = 0; i <= asize + bsize; i++)
185 { 185 {
186 res = ibf_decode (ibf_a, &side, &ibf_key); 186 res = ibf_decode (ibf_a, &side, &ibf_key);
187 if (GNUNET_SYSERR == res) 187 if (GNUNET_SYSERR == res)
188 { 188 {
189 printf ("decode failed\n"); 189 printf ("decode failed, %u/%u elements left\n",
190 GNUNET_CONTAINER_multihashmap_size (set_a) + GNUNET_CONTAINER_multihashmap_size (set_b),
191 asize + bsize);
190 return; 192 return;
191 } 193 }
192 if (GNUNET_NO == res) 194 if (GNUNET_NO == res)
@@ -198,7 +200,9 @@ run (void *cls, char *const *args, const char *cfgfile,
198 printf ("decoded successfully in: %s\n", GNUNET_STRINGS_relative_time_to_string (delta_time, GNUNET_NO)); 200 printf ("decoded successfully in: %s\n", GNUNET_STRINGS_relative_time_to_string (delta_time, GNUNET_NO));
199 } 201 }
200 else 202 else
201 printf ("decode missed elements\n"); 203 {
204 printf ("decode missed elements (should never happen)\n");
205 }
202 return; 206 return;
203 } 207 }
204 208
@@ -207,6 +211,9 @@ run (void *cls, char *const *args, const char *cfgfile,
207 if (side == -1) 211 if (side == -1)
208 iter_hashcodes (ibf_key, remove_iterator, set_b); 212 iter_hashcodes (ibf_key, remove_iterator, set_b);
209 } 213 }
214 printf("cyclic IBF, %u/%u elements left\n",
215 GNUNET_CONTAINER_multihashmap_size (set_a) + GNUNET_CONTAINER_multihashmap_size (set_b),
216 asize + bsize);
210} 217}
211 218
212int 219int
diff --git a/src/set/gnunet-set-profiler.c b/src/set/gnunet-set-profiler.c
new file mode 100644
index 000000000..bbaef7c43
--- /dev/null
+++ b/src/set/gnunet-set-profiler.c
@@ -0,0 +1,320 @@
1/*
2 This file is part of GNUnet
3 (C) 2013 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19 */
20
21/**
22 * @file set/gnunet-set-profiler.c
23 * @brief profiling tool for set
24 * @author Florian Dold
25 */
26#include "platform.h"
27#include "gnunet_common.h"
28#include "gnunet_util_lib.h"
29#include "gnunet_set_service.h"
30#include "gnunet_testbed_service.h"
31
32
33static int ret;
34
35static unsigned int num_a = 5;
36static unsigned int num_b = 5;
37static unsigned int num_c = 20;
38
39static unsigned int salt = 42;
40
41static char* op_str = "union";
42
43const static struct GNUNET_CONFIGURATION_Handle *config;
44
45struct GNUNET_CONTAINER_MultiHashMap *map_a;
46struct GNUNET_CONTAINER_MultiHashMap *map_b;
47struct GNUNET_CONTAINER_MultiHashMap *map_c;
48
49
50/**
51 * Elements that set a received, should match map_c
52 * in the end.
53 */
54struct GNUNET_CONTAINER_MultiHashMap *map_a_received;
55
56/**
57 * Elements that set b received, should match map_c
58 * in the end.
59 */
60struct GNUNET_CONTAINER_MultiHashMap *map_b_received;
61
62struct GNUNET_SET_Handle *set_a;
63struct GNUNET_SET_Handle *set_b;
64
65struct GNUNET_HashCode app_id;
66
67struct GNUNET_PeerIdentity local_peer;
68
69struct GNUNET_SET_ListenHandle *set_listener;
70
71struct GNUNET_SET_OperationHandle *set_oh1;
72struct GNUNET_SET_OperationHandle *set_oh2;
73
74
75int a_done;
76int b_done;
77
78
79
80static int
81map_remove_iterator (void *cls,
82 const struct GNUNET_HashCode *key,
83 void *value)
84{
85 struct GNUNET_CONTAINER_MultiHashMap *m = cls;
86 int ret;
87
88 ret = GNUNET_CONTAINER_multihashmap_remove (m, key, NULL);
89 GNUNET_assert (GNUNET_OK == ret);
90 return GNUNET_YES;
91
92}
93
94
95static void
96set_result_cb_1 (void *cls,
97 const struct GNUNET_SET_Element *element,
98 enum GNUNET_SET_Status status)
99{
100 GNUNET_assert (GNUNET_NO == a_done);
101 GNUNET_assert (element->size == sizeof (struct GNUNET_HashCode));
102 switch (status)
103 {
104 case GNUNET_SET_STATUS_DONE:
105 case GNUNET_SET_STATUS_HALF_DONE:
106 a_done = GNUNET_YES;
107 GNUNET_CONTAINER_multihashmap_iterate (map_c, map_remove_iterator, map_a_received);
108 GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (map_a_received));
109 return;
110 case GNUNET_SET_STATUS_FAILURE:
111 GNUNET_assert (0);
112 return;
113 case GNUNET_SET_STATUS_OK:
114 break;
115 default:
116 GNUNET_assert (0);
117 }
118 GNUNET_CONTAINER_multihashmap_put (map_a_received,
119 element->data, NULL,
120 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
121}
122
123
124static void
125set_result_cb_2 (void *cls,
126 const struct GNUNET_SET_Element *element,
127 enum GNUNET_SET_Status status)
128{
129 GNUNET_assert (GNUNET_NO == b_done);
130 GNUNET_assert (element->size == sizeof (struct GNUNET_HashCode));
131 switch (status)
132 {
133 case GNUNET_SET_STATUS_DONE:
134 case GNUNET_SET_STATUS_HALF_DONE:
135 b_done = GNUNET_YES;
136 GNUNET_CONTAINER_multihashmap_iterate (map_c, map_remove_iterator, map_b_received);
137 GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (map_b_received));
138 return;
139 case GNUNET_SET_STATUS_FAILURE:
140 GNUNET_assert (0);
141 return;
142 case GNUNET_SET_STATUS_OK:
143 break;
144 default:
145 GNUNET_assert (0);
146 }
147 GNUNET_CONTAINER_multihashmap_put (map_b_received,
148 element->data, NULL,
149 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
150}
151
152
153static void
154set_listen_cb (void *cls,
155 const struct GNUNET_PeerIdentity *other_peer,
156 const struct GNUNET_MessageHeader *context_msg,
157 struct GNUNET_SET_Request *request)
158{
159 GNUNET_assert (NULL == set_oh2);
160 set_oh2 = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED,
161 set_result_cb_2, NULL);
162 GNUNET_SET_commit (set_oh2, set_b);
163}
164
165
166
167static int
168set_insert_iterator (void *cls,
169 const struct GNUNET_HashCode *key,
170 void *value)
171{
172 struct GNUNET_SET_Handle *set = cls;
173 struct GNUNET_SET_Element *el;
174
175 el = GNUNET_malloc (sizeof *el + sizeof *key);
176 el->type = 0;
177 memcpy (&el[1], key, sizeof *key);
178 el->data = &el[1];
179 el->size = sizeof *key;
180 GNUNET_SET_add_element (set, el, NULL, NULL);
181 GNUNET_free (el);
182 return GNUNET_YES;
183}
184
185
186/**
187 * Signature of the 'main' function for a (single-peer) testcase that
188 * is run using 'GNUNET_TESTING_peer_run'.
189 *
190 * @param cls closure
191 * @param cfg configuration of the peer that was started
192 * @param peer identity of the peer that was created
193 */
194static void
195test_main (void *cls,
196 const struct GNUNET_CONFIGURATION_Handle *cfg,
197 struct GNUNET_TESTING_Peer *peer)
198{
199 unsigned int i;
200 struct GNUNET_HashCode hash;
201
202 config = cfg;
203
204 if (GNUNET_OK != GNUNET_CRYPTO_get_host_identity (cfg, &local_peer))
205 {
206 GNUNET_assert (0);
207 return;
208 }
209
210 map_a = GNUNET_CONTAINER_multihashmap_create (num_a, GNUNET_NO);
211 map_b = GNUNET_CONTAINER_multihashmap_create (num_b, GNUNET_NO);
212 map_c = GNUNET_CONTAINER_multihashmap_create (num_c, GNUNET_NO);
213
214 for (i = 0; i < num_a; i++)
215 {
216 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
217 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_a, &hash))
218 {
219 i--;
220 continue;
221 }
222 GNUNET_CONTAINER_multihashmap_put (map_a, &hash, &hash,
223 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
224 }
225
226 for (i = 0; i < num_b; i++)
227 {
228 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
229 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_a, &hash))
230 {
231 i--;
232 continue;
233 }
234 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_b, &hash))
235 {
236 i--;
237 continue;
238 }
239 GNUNET_CONTAINER_multihashmap_put (map_b, &hash, NULL,
240 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
241 }
242
243 for (i = 0; i < num_c; i++)
244 {
245 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
246 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_a, &hash))
247 {
248 i--;
249 continue;
250 }
251 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_b, &hash))
252 {
253 i--;
254 continue;
255 }
256 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_c, &hash))
257 {
258 i--;
259 continue;
260 }
261 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
262 GNUNET_CONTAINER_multihashmap_put (map_c, &hash, NULL,
263 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
264 }
265
266 /* use last hash for app id */
267 app_id = hash;
268
269 /* FIXME: also implement intersection etc. */
270 set_a = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION);
271 set_b = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION);
272
273 GNUNET_CONTAINER_multihashmap_iterate (map_a, set_insert_iterator, set_a);
274 GNUNET_CONTAINER_multihashmap_iterate (map_b, set_insert_iterator, set_b);
275 GNUNET_CONTAINER_multihashmap_iterate (map_c, set_insert_iterator, set_a);
276 GNUNET_CONTAINER_multihashmap_iterate (map_c, set_insert_iterator, set_b);
277
278 set_listener = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION,
279 &app_id, set_listen_cb, NULL);
280
281 set_oh1 = GNUNET_SET_prepare (&local_peer, &app_id, NULL, salt, GNUNET_SET_RESULT_ADDED,
282 set_result_cb_1, NULL);
283 GNUNET_SET_commit (set_oh1, set_a);
284}
285
286static void
287run (void *cls, char *const *args, const char *cfgfile,
288 const struct GNUNET_CONFIGURATION_Handle *cfg)
289{
290
291 ret = GNUNET_TESTING_peer_run ("test_set_api",
292 "test_set.conf",
293 &test_main, NULL);
294}
295
296
297int
298main (int argc, char **argv)
299{
300 static const struct GNUNET_GETOPT_CommandLineOption options[] = {
301 { 'A', "num-first", NULL,
302 gettext_noop ("number of values"),
303 GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_a },
304 { 'B', "num-second", NULL,
305 gettext_noop ("number of values"),
306 GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_b },
307 { 'B', "num-common", NULL,
308 gettext_noop ("number of values"),
309 GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_c },
310 { 'x', "operation", NULL,
311 gettext_noop ("oeration to execute"),
312 GNUNET_YES, &GNUNET_GETOPT_set_string, &op_str },
313 GNUNET_GETOPT_OPTION_END
314 };
315 GNUNET_PROGRAM_run2 (argc, argv, "gnunet-consensus",
316 "help",
317 options, &run, NULL, GNUNET_YES);
318 return ret;
319}
320
diff --git a/src/set/gnunet-set.c b/src/set/gnunet-set.c
deleted file mode 100644
index ae84610fc..000000000
--- a/src/set/gnunet-set.c
+++ /dev/null
@@ -1,203 +0,0 @@
1/*
2 This file is part of GNUnet
3 (C) 2012 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19 */
20
21/**
22 * @file set/gnunet-set.c
23 * @brief profiling tool for the set service
24 * @author Florian Dold
25 */
26#include "platform.h"
27#include "gnunet_common.h"
28#include "gnunet_util_lib.h"
29#include "gnunet_testbed_service.h"
30#include "gnunet_set_service.h"
31
32
33static struct GNUNET_PeerIdentity local_id;
34
35static struct GNUNET_HashCode app_id;
36static struct GNUNET_SET_Handle *set1;
37static struct GNUNET_SET_Handle *set2;
38static struct GNUNET_SET_ListenHandle *listen_handle;
39const static struct GNUNET_CONFIGURATION_Handle *config;
40
41int num_done;
42
43
44static void
45result_cb_set1 (void *cls, const struct GNUNET_SET_Element *element,
46 enum GNUNET_SET_Status status)
47{
48 switch (status)
49 {
50 case GNUNET_SET_STATUS_OK:
51 printf ("set 1: got element\n");
52 break;
53 case GNUNET_SET_STATUS_FAILURE:
54 printf ("set 1: failure\n");
55 break;
56 case GNUNET_SET_STATUS_DONE:
57 printf ("set 1: done\n");
58 GNUNET_SET_destroy (set1);
59 break;
60 default:
61 GNUNET_assert (0);
62 }
63}
64
65
66static void
67result_cb_set2 (void *cls, const struct GNUNET_SET_Element *element,
68 enum GNUNET_SET_Status status)
69{
70 switch (status)
71 {
72 case GNUNET_SET_STATUS_OK:
73 printf ("set 2: got element\n");
74 break;
75 case GNUNET_SET_STATUS_FAILURE:
76 printf ("set 2: failure\n");
77 break;
78 case GNUNET_SET_STATUS_DONE:
79 printf ("set 2: done\n");
80 GNUNET_SET_destroy (set2);
81 break;
82 default:
83 GNUNET_assert (0);
84 }
85}
86
87
88static void
89listen_cb (void *cls,
90 const struct GNUNET_PeerIdentity *other_peer,
91 const struct GNUNET_MessageHeader *context_msg,
92 struct GNUNET_SET_Request *request)
93{
94 struct GNUNET_SET_OperationHandle *oh;
95 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n");
96 GNUNET_SET_listen_cancel (listen_handle);
97
98 oh = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL);
99 GNUNET_SET_conclude (oh, set2);
100}
101
102
103/**
104 * Start the set operation.
105 *
106 * @param cls closure, unused
107 */
108static void
109start (void *cls)
110{
111 struct GNUNET_SET_OperationHandle *oh;
112
113 listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION,
114 &app_id, listen_cb, NULL);
115 oh = GNUNET_SET_evaluate (&local_id, &app_id, NULL, 42,
116 GNUNET_SET_RESULT_ADDED,
117 result_cb_set1, NULL);
118 GNUNET_SET_conclude (oh, set1);
119}
120
121
122/**
123 * Initialize the second set, continue
124 *
125 * @param cls closure, unused
126 */
127static void
128init_set2 (void *cls)
129{
130 struct GNUNET_SET_Element element;
131
132
133 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initializing set 2\n");
134
135 element.data = "hello";
136 element.size = strlen(element.data);
137 GNUNET_SET_add_element (set2, &element, NULL, NULL);
138 element.data = "quux";
139 element.size = strlen(element.data);
140 GNUNET_SET_add_element (set2, &element, start, NULL);
141}
142
143
144/**
145 * Initialize the first set, continue.
146 */
147static void
148init_set1 (void)
149{
150 struct GNUNET_SET_Element element;
151
152 element.data = "hello";
153 element.size = strlen(element.data);
154 GNUNET_SET_add_element (set1, &element, NULL, NULL);
155 element.data = "bar";
156 element.size = strlen(element.data);
157 GNUNET_SET_add_element (set1, &element, init_set2, NULL);
158
159 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized set 1\n");
160}
161
162
163/**
164 * Main function that will be run.
165 *
166 * @param cls closure
167 * @param args remaining command-line arguments
168 * @param cfgfile name of the configuration file used (for saving, can be NULL!)
169 * @param cfg configuration
170 */
171static void
172run (void *cls, char *const *args,
173 const char *cfgfile,
174 const struct GNUNET_CONFIGURATION_Handle *cfg)
175{
176 static const char* app_str = "gnunet-set";
177
178 config = cfg;
179
180 GNUNET_CRYPTO_hash (app_str, strlen (app_str), &app_id);
181
182 GNUNET_CRYPTO_get_host_identity (cfg, &local_id);
183
184 set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
185 set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
186
187 init_set1 ();
188}
189
190
191
192int
193main (int argc, char **argv)
194{
195 static const struct GNUNET_GETOPT_CommandLineOption options[] = {
196 GNUNET_GETOPT_OPTION_END
197 };
198 GNUNET_PROGRAM_run2 (argc, argv, "gnunet-set",
199 "help",
200 options, &run, NULL, GNUNET_NO);
201 return 0;
202}
203
diff --git a/src/set/ibf.c b/src/set/ibf.c
index 383ce3daf..e3c5be59a 100644
--- a/src/set/ibf.c
+++ b/src/set/ibf.c
@@ -19,7 +19,7 @@
19*/ 19*/
20 20
21/** 21/**
22 * @file consensus/ibf.c 22 * @file set/ibf.c
23 * @brief implementation of the invertible bloom filter 23 * @brief implementation of the invertible bloom filter
24 * @author Florian Dold 24 * @author Florian Dold
25 */ 25 */
@@ -27,6 +27,12 @@
27#include "ibf.h" 27#include "ibf.h"
28 28
29/** 29/**
30 * Compute the key's hash from the key.
31 * Redefine to use a different hash function.
32 */
33#define IBF_KEY_HASH_VAL(k) (GNUNET_CRYPTO_crc32_n (&(k), sizeof (struct IBF_KeyHash)))
34
35/**
30 * Create a key from a hashcode. 36 * Create a key from a hashcode.
31 * 37 *
32 * @param hash the hashcode 38 * @param hash the hashcode
@@ -89,23 +95,21 @@ static inline void
89ibf_get_indices (const struct InvertibleBloomFilter *ibf, 95ibf_get_indices (const struct InvertibleBloomFilter *ibf,
90 struct IBF_Key key, int *dst) 96 struct IBF_Key key, int *dst)
91{ 97{
92 struct GNUNET_HashCode bucket_indices; 98 uint32_t filled;
93 unsigned int filled; 99 uint32_t i;
94 int i; 100 uint32_t bucket = key.key_val & 0xFFFFFFFF;
95 GNUNET_CRYPTO_hash (&key, sizeof key, &bucket_indices); 101
96 filled = 0; 102 for (i = 0, filled=0; filled < ibf->hash_num; i++)
97 for (i = 0; filled < ibf->hash_num; i++)
98 { 103 {
99 unsigned int bucket;
100 unsigned int j; 104 unsigned int j;
101 if ( (0 != i) && (0 == (i % 16)) ) 105 uint64_t x;
102 GNUNET_CRYPTO_hash (&bucket_indices, sizeof (struct GNUNET_HashCode), &bucket_indices);
103 bucket = bucket_indices.bits[i % 16] % ibf->size;
104 for (j = 0; j < filled; j++) 106 for (j = 0; j < filled; j++)
105 if (dst[j] == bucket) 107 if (dst[j] == bucket)
106 goto try_next; 108 goto try_next;
107 dst[filled++] = bucket; 109 dst[filled++] = bucket % ibf->size;
108 try_next: ; 110 try_next: ;
111 x = ((uint64_t) bucket << 32) | i;
112 bucket = GNUNET_CRYPTO_crc32_n (&x, sizeof x);
109 } 113 }
110} 114}
111 115
@@ -116,16 +120,14 @@ ibf_insert_into (struct InvertibleBloomFilter *ibf,
116 const int *buckets, int side) 120 const int *buckets, int side)
117{ 121{
118 int i; 122 int i;
119 struct GNUNET_HashCode key_hash_sha; 123
120 struct IBF_KeyHash key_hash;
121 GNUNET_CRYPTO_hash (&key, sizeof key, &key_hash_sha);
122 key_hash.key_hash_val = key_hash_sha.bits[0];
123 for (i = 0; i < ibf->hash_num; i++) 124 for (i = 0; i < ibf->hash_num; i++)
124 { 125 {
125 const int bucket = buckets[i]; 126 const int bucket = buckets[i];
126 ibf->count[bucket].count_val += side; 127 ibf->count[bucket].count_val += side;
127 ibf->key_sum[bucket].key_val ^= key.key_val; 128 ibf->key_sum[bucket].key_val ^= key.key_val;
128 ibf->key_hash_sum[bucket].key_hash_val ^= key_hash.key_hash_val; 129 ibf->key_hash_sum[bucket].key_hash_val
130 ^= IBF_KEY_HASH_VAL (key);
129 } 131 }
130} 132}
131 133
@@ -183,7 +185,6 @@ ibf_decode (struct InvertibleBloomFilter *ibf,
183{ 185{
184 struct IBF_KeyHash hash; 186 struct IBF_KeyHash hash;
185 int i; 187 int i;
186 struct GNUNET_HashCode key_hash_sha;
187 int buckets[ibf->hash_num]; 188 int buckets[ibf->hash_num];
188 189
189 GNUNET_assert (NULL != ibf); 190 GNUNET_assert (NULL != ibf);
@@ -197,8 +198,7 @@ ibf_decode (struct InvertibleBloomFilter *ibf,
197 if ((1 != ibf->count[i].count_val) && (-1 != ibf->count[i].count_val)) 198 if ((1 != ibf->count[i].count_val) && (-1 != ibf->count[i].count_val))
198 continue; 199 continue;
199 200
200 GNUNET_CRYPTO_hash (&ibf->key_sum[i], sizeof (struct IBF_Key), &key_hash_sha); 201 hash.key_hash_val = IBF_KEY_HASH_VAL (ibf->key_sum[i]);
201 hash.key_hash_val = key_hash_sha.bits[0];
202 202
203 /* test if the hash matches the key */ 203 /* test if the hash matches the key */
204 if (hash.key_hash_val != ibf->key_hash_sum[i].key_hash_val) 204 if (hash.key_hash_val != ibf->key_hash_sum[i].key_hash_val)
diff --git a/src/set/ibf.h b/src/set/ibf.h
index 2bf3ef7c7..90ea231c0 100644
--- a/src/set/ibf.h
+++ b/src/set/ibf.h
@@ -19,7 +19,7 @@
19*/ 19*/
20 20
21/** 21/**
22 * @file consensus/ibf.h 22 * @file set/ibf.h
23 * @brief invertible bloom filter 23 * @brief invertible bloom filter
24 * @author Florian Dold 24 * @author Florian Dold
25 */ 25 */
diff --git a/src/set/set_api.c b/src/set/set_api.c
index c74933aa0..e1b6132cb 100644
--- a/src/set/set_api.c
+++ b/src/set/set_api.c
@@ -40,7 +40,7 @@
40struct GNUNET_SET_Handle 40struct GNUNET_SET_Handle
41{ 41{
42 struct GNUNET_CLIENT_Connection *client; 42 struct GNUNET_CLIENT_Connection *client;
43 struct GNUNET_MQ_MessageQueue *mq; 43 struct GNUNET_MQ_Handle *mq;
44 unsigned int messages_since_ack; 44 unsigned int messages_since_ack;
45}; 45};
46 46
@@ -73,7 +73,7 @@ struct GNUNET_SET_OperationHandle
73 * Message sent to the server on calling conclude, 73 * Message sent to the server on calling conclude,
74 * NULL if conclude has been called. 74 * NULL if conclude has been called.
75 */ 75 */
76 struct GNUNET_MQ_Message *conclude_mqm; 76 struct GNUNET_MQ_Envelope *conclude_mqm;
77 77
78 /** 78 /**
79 * Address of the request if in the conclude message, 79 * Address of the request if in the conclude message,
@@ -89,7 +89,7 @@ struct GNUNET_SET_OperationHandle
89struct GNUNET_SET_ListenHandle 89struct GNUNET_SET_ListenHandle
90{ 90{
91 struct GNUNET_CLIENT_Connection *client; 91 struct GNUNET_CLIENT_Connection *client;
92 struct GNUNET_MQ_MessageQueue* mq; 92 struct GNUNET_MQ_Handle* mq;
93 GNUNET_SET_ListenCallback listen_cb; 93 GNUNET_SET_ListenCallback listen_cb;
94 void *listen_cls; 94 void *listen_cls;
95}; 95};
@@ -115,7 +115,7 @@ handle_result (void *cls, const struct GNUNET_MessageHeader *mh)
115 115
116 if (set->messages_since_ack >= GNUNET_SET_ACK_WINDOW/2) 116 if (set->messages_since_ack >= GNUNET_SET_ACK_WINDOW/2)
117 { 117 {
118 struct GNUNET_MQ_Message *mqm; 118 struct GNUNET_MQ_Envelope *mqm;
119 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ACK); 119 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ACK);
120 GNUNET_MQ_send (set->mq, mqm); 120 GNUNET_MQ_send (set->mq, mqm);
121 } 121 }
@@ -162,7 +162,7 @@ handle_request (void *cls, const struct GNUNET_MessageHeader *mh)
162 162
163 if (GNUNET_NO == req->accepted) 163 if (GNUNET_NO == req->accepted)
164 { 164 {
165 struct GNUNET_MQ_Message *mqm; 165 struct GNUNET_MQ_Envelope *mqm;
166 struct GNUNET_SET_AcceptRejectMessage *amsg; 166 struct GNUNET_SET_AcceptRejectMessage *amsg;
167 167
168 mqm = GNUNET_MQ_msg (amsg, GNUNET_MESSAGE_TYPE_SET_REJECT); 168 mqm = GNUNET_MQ_msg (amsg, GNUNET_MESSAGE_TYPE_SET_REJECT);
@@ -197,9 +197,9 @@ GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
197 enum GNUNET_SET_OperationType op) 197 enum GNUNET_SET_OperationType op)
198{ 198{
199 struct GNUNET_SET_Handle *set; 199 struct GNUNET_SET_Handle *set;
200 struct GNUNET_MQ_Message *mqm; 200 struct GNUNET_MQ_Envelope *mqm;
201 struct GNUNET_SET_CreateMessage *msg; 201 struct GNUNET_SET_CreateMessage *msg;
202 static const struct GNUNET_MQ_Handler mq_handlers[] = { 202 static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
203 {handle_result, GNUNET_MESSAGE_TYPE_SET_RESULT}, 203 {handle_result, GNUNET_MESSAGE_TYPE_SET_RESULT},
204 GNUNET_MQ_HANDLERS_END 204 GNUNET_MQ_HANDLERS_END
205 }; 205 };
@@ -234,7 +234,7 @@ GNUNET_SET_add_element (struct GNUNET_SET_Handle *set,
234 GNUNET_SET_Continuation cont, 234 GNUNET_SET_Continuation cont,
235 void *cont_cls) 235 void *cont_cls)
236{ 236{
237 struct GNUNET_MQ_Message *mqm; 237 struct GNUNET_MQ_Envelope *mqm;
238 struct GNUNET_SET_ElementMessage *msg; 238 struct GNUNET_SET_ElementMessage *msg;
239 239
240 mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_ADD); 240 mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_ADD);
@@ -262,7 +262,7 @@ GNUNET_SET_remove_element (struct GNUNET_SET_Handle *set,
262 GNUNET_SET_Continuation cont, 262 GNUNET_SET_Continuation cont,
263 void *cont_cls) 263 void *cont_cls)
264{ 264{
265 struct GNUNET_MQ_Message *mqm; 265 struct GNUNET_MQ_Envelope *mqm;
266 struct GNUNET_SET_ElementMessage *msg; 266 struct GNUNET_SET_ElementMessage *msg;
267 267
268 mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_REMOVE); 268 mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_REMOVE);
@@ -287,9 +287,9 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
287 287
288 288
289/** 289/**
290 * Create a set operation for evaluation with another peer. 290 * Prepare a set operation to be evaluated with another peer.
291 * The evaluation will not start until the client provides 291 * The evaluation will not start until the client provides
292 * a local set with GNUNET_SET_conclude. 292 * a local set with GNUNET_SET_commit.
293 * 293 *
294 * @param other_peer peer with the other set 294 * @param other_peer peer with the other set
295 * @param app_id hash for the application using the set 295 * @param app_id hash for the application using the set
@@ -304,15 +304,15 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
304 * @return a handle to cancel the operation 304 * @return a handle to cancel the operation
305 */ 305 */
306struct GNUNET_SET_OperationHandle * 306struct GNUNET_SET_OperationHandle *
307GNUNET_SET_evaluate (const struct GNUNET_PeerIdentity *other_peer, 307GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer,
308 const struct GNUNET_HashCode *app_id, 308 const struct GNUNET_HashCode *app_id,
309 const struct GNUNET_MessageHeader *context_msg, 309 const struct GNUNET_MessageHeader *context_msg,
310 uint16_t salt, 310 uint16_t salt,
311 enum GNUNET_SET_ResultMode result_mode, 311 enum GNUNET_SET_ResultMode result_mode,
312 GNUNET_SET_ResultIterator result_cb, 312 GNUNET_SET_ResultIterator result_cb,
313 void *result_cls) 313 void *result_cls)
314{ 314{
315 struct GNUNET_MQ_Message *mqm; 315 struct GNUNET_MQ_Envelope *mqm;
316 struct GNUNET_SET_OperationHandle *oh; 316 struct GNUNET_SET_OperationHandle *oh;
317 struct GNUNET_SET_EvaluateMessage *msg; 317 struct GNUNET_SET_EvaluateMessage *msg;
318 318
@@ -322,9 +322,6 @@ GNUNET_SET_evaluate (const struct GNUNET_PeerIdentity *other_peer,
322 322
323 mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_EVALUATE, context_msg); 323 mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_EVALUATE, context_msg);
324 324
325 if (NULL != context_msg)
326 LOG (GNUNET_ERROR_TYPE_INFO, "passed context msg\n");
327
328 msg->app_id = *app_id; 325 msg->app_id = *app_id;
329 msg->target_peer = *other_peer; 326 msg->target_peer = *other_peer;
330 msg->salt = salt; 327 msg->salt = salt;
@@ -356,9 +353,9 @@ GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
356 void *listen_cls) 353 void *listen_cls)
357{ 354{
358 struct GNUNET_SET_ListenHandle *lh; 355 struct GNUNET_SET_ListenHandle *lh;
359 struct GNUNET_MQ_Message *mqm; 356 struct GNUNET_MQ_Envelope *mqm;
360 struct GNUNET_SET_ListenMessage *msg; 357 struct GNUNET_SET_ListenMessage *msg;
361 static const struct GNUNET_MQ_Handler mq_handlers[] = { 358 static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
362 {handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST}, 359 {handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST},
363 GNUNET_MQ_HANDLERS_END 360 GNUNET_MQ_HANDLERS_END
364 }; 361 };
@@ -403,7 +400,7 @@ GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh)
403 * @param result_mode specified how results will be returned, 400 * @param result_mode specified how results will be returned,
404 * see 'GNUNET_SET_ResultMode'. 401 * see 'GNUNET_SET_ResultMode'.
405 * @param result_cb callback for the results 402 * @param result_cb callback for the results
406 * @param result_cls closure for result_cb 403 * @param cls closure for result_cb
407 * @return a handle to cancel the operation 404 * @return a handle to cancel the operation
408 */ 405 */
409struct GNUNET_SET_OperationHandle * 406struct GNUNET_SET_OperationHandle *
@@ -412,7 +409,7 @@ GNUNET_SET_accept (struct GNUNET_SET_Request *request,
412 GNUNET_SET_ResultIterator result_cb, 409 GNUNET_SET_ResultIterator result_cb,
413 void *cls) 410 void *cls)
414{ 411{
415 struct GNUNET_MQ_Message *mqm; 412 struct GNUNET_MQ_Envelope *mqm;
416 struct GNUNET_SET_OperationHandle *oh; 413 struct GNUNET_SET_OperationHandle *oh;
417 struct GNUNET_SET_AcceptRejectMessage *msg; 414 struct GNUNET_SET_AcceptRejectMessage *msg;
418 415
@@ -441,7 +438,7 @@ GNUNET_SET_accept (struct GNUNET_SET_Request *request,
441void 438void
442GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh) 439GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh)
443{ 440{
444 struct GNUNET_MQ_Message *mqm; 441 struct GNUNET_MQ_Envelope *mqm;
445 struct GNUNET_SET_OperationHandle *h_assoc; 442 struct GNUNET_SET_OperationHandle *h_assoc;
446 443
447 if (NULL != oh->set) 444 if (NULL != oh->set)
@@ -460,7 +457,7 @@ GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh)
460 457
461 458
462/** 459/**
463 * Conclude the given set operation using the given set. 460 * Commit a set to be used with a set operation.
464 * This function is called once we have fully constructed 461 * This function is called once we have fully constructed
465 * the set that we want to use for the operation. At this 462 * the set that we want to use for the operation. At this
466 * time, the P2P protocol can then begin to exchange the 463 * time, the P2P protocol can then begin to exchange the
@@ -471,13 +468,13 @@ GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh)
471 * @param set the set to use for the operation 468 * @param set the set to use for the operation
472 */ 469 */
473void 470void
474GNUNET_SET_conclude (struct GNUNET_SET_OperationHandle *oh, 471GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh,
475 struct GNUNET_SET_Handle *set) 472 struct GNUNET_SET_Handle *set)
476{ 473{
477 GNUNET_assert (NULL == oh->set); 474 GNUNET_assert (NULL == oh->set);
478 GNUNET_assert (NULL != oh->conclude_mqm); 475 GNUNET_assert (NULL != oh->conclude_mqm);
479 oh->set = set; 476 oh->set = set;
480 oh->request_id = GNUNET_MQ_assoc_add (oh->set->mq, NULL, oh); 477 oh->request_id = GNUNET_MQ_assoc_add (oh->set->mq, oh);
481 *oh->request_id_addr = htonl (oh->request_id); 478 *oh->request_id_addr = htonl (oh->request_id);
482 GNUNET_MQ_send (oh->set->mq, oh->conclude_mqm); 479 GNUNET_MQ_send (oh->set->mq, oh->conclude_mqm);
483 oh->conclude_mqm = NULL; 480 oh->conclude_mqm = NULL;
diff --git a/src/set/strata_estimator.c b/src/set/strata_estimator.c
index 024bb99c6..18c127cd6 100644
--- a/src/set/strata_estimator.c
+++ b/src/set/strata_estimator.c
@@ -19,7 +19,7 @@
19*/ 19*/
20 20
21/** 21/**
22 * @file consensus/ibf.h 22 * @file set/ibf.h
23 * @brief invertible bloom filter 23 * @brief invertible bloom filter
24 * @author Florian Dold 24 * @author Florian Dold
25 */ 25 */
diff --git a/src/set/strata_estimator.h b/src/set/strata_estimator.h
index b3f050743..718c996d0 100644
--- a/src/set/strata_estimator.h
+++ b/src/set/strata_estimator.h
@@ -19,7 +19,7 @@
19*/ 19*/
20 20
21/** 21/**
22 * @file consensus/strata_estimator.h 22 * @file set/strata_estimator.h
23 * @brief estimator of set difference 23 * @brief estimator of set difference
24 * @author Florian Dold 24 * @author Florian Dold
25 */ 25 */
diff --git a/src/set/test_set_api.c b/src/set/test_set_api.c
index f773cebdf..db82b83b4 100644
--- a/src/set/test_set_api.c
+++ b/src/set/test_set_api.c
@@ -95,7 +95,7 @@ listen_cb (void *cls,
95 GNUNET_SET_listen_cancel (listen_handle); 95 GNUNET_SET_listen_cancel (listen_handle);
96 96
97 oh = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL); 97 oh = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL);
98 GNUNET_SET_conclude (oh, set2); 98 GNUNET_SET_commit (oh, set2);
99} 99}
100 100
101 101
@@ -111,10 +111,10 @@ start (void *cls)
111 111
112 listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION, 112 listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION,
113 &app_id, listen_cb, NULL); 113 &app_id, listen_cb, NULL);
114 oh = GNUNET_SET_evaluate (&local_id, &app_id, NULL, 42, 114 oh = GNUNET_SET_prepare (&local_id, &app_id, NULL, 42,
115 GNUNET_SET_RESULT_ADDED, 115 GNUNET_SET_RESULT_ADDED,
116 result_cb_set1, NULL); 116 result_cb_set1, NULL);
117 GNUNET_SET_conclude (oh, set1); 117 GNUNET_SET_commit (oh, set1);
118} 118}
119 119
120 120
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c
index 34f1ea0fa..47ed04117 100644
--- a/src/stream/stream_api.c
+++ b/src/stream/stream_api.c
@@ -3779,11 +3779,11 @@ GNUNET_STREAM_read_cancel (struct GNUNET_STREAM_ReadHandle *rh)
3779 * @param size the number of bytes written 3779 * @param size the number of bytes written
3780 */ 3780 */
3781static void 3781static void
3782mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size) 3782mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status,
3783 size_t size)
3783{ 3784{
3784 struct GNUNET_MQ_MessageQueue *mq = cls; 3785 struct GNUNET_MQ_Handle *mq = cls;
3785 struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state; 3786 struct MQStreamState *mss = GNUNET_MQ_impl_state (mq);
3786 struct GNUNET_MQ_Message *mqm;
3787 3787
3788 switch (status) 3788 switch (status)
3789 { 3789 {
@@ -3793,56 +3793,32 @@ mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size
3793 /* FIXME: call shutdown handler */ 3793 /* FIXME: call shutdown handler */
3794 return; 3794 return;
3795 case GNUNET_STREAM_TIMEOUT: 3795 case GNUNET_STREAM_TIMEOUT:
3796 if (NULL == mq->error_handler) 3796 GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_TIMEOUT);
3797 LOG (GNUNET_ERROR_TYPE_WARNING, "write timeout, but no error handler installed for message queue\n");
3798 else
3799 mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_TIMEOUT);
3800 return; 3797 return;
3801 case GNUNET_STREAM_SYSERR: 3798 case GNUNET_STREAM_SYSERR:
3802 if (NULL == mq->error_handler) 3799 GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE);
3803 LOG (GNUNET_ERROR_TYPE_WARNING, "write error, but no error handler installed for message queue\n");
3804 else
3805 mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_WRITE);
3806 return; 3800 return;
3807 default: 3801 default:
3808 GNUNET_assert (0); 3802 GNUNET_assert (0);
3809 return; 3803 return;
3810 } 3804 }
3811
3812 /* call cb for message we finished sending */
3813 mqm = mq->current_msg;
3814 GNUNET_assert (NULL != mq->current_msg);
3815 if (NULL != mqm->sent_cb)
3816 mqm->sent_cb (mqm->sent_cls);
3817 GNUNET_free (mqm);
3818 3805
3819 mss->wh = NULL; 3806 mss->wh = NULL;
3820 3807
3821 mqm = mq->msg_head; 3808 GNUNET_MQ_impl_send_continue (mq);
3822 mq->current_msg = mqm;
3823 if (NULL == mqm)
3824 return;
3825 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm);
3826 mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
3827 GNUNET_TIME_UNIT_FOREVER_REL,
3828 mq_stream_write_queued, mq);
3829 GNUNET_assert (NULL != mss->wh);
3830} 3809}
3831 3810
3832 3811
3833static void 3812static void
3834mq_stream_send_impl (struct GNUNET_MQ_MessageQueue *mq, 3813mq_stream_send_impl (struct GNUNET_MQ_Handle *mq,
3835 struct GNUNET_MQ_Message *mqm) 3814 const struct GNUNET_MessageHeader *msg, void *impl_state)
3836{ 3815{
3837 struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state; 3816 struct MQStreamState *mss = impl_state;
3838 3817
3839 if (NULL != mq->current_msg) 3818 /* no way to cancel sending now */
3840 { 3819 GNUNET_MQ_impl_send_commit (mq);
3841 GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm); 3820
3842 return; 3821 mss->wh = GNUNET_STREAM_write (mss->socket, msg, ntohs (msg->size),
3843 }
3844 mq->current_msg = mqm;
3845 mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
3846 GNUNET_TIME_UNIT_FOREVER_REL, 3822 GNUNET_TIME_UNIT_FOREVER_REL,
3847 mq_stream_write_queued, mq); 3823 mq_stream_write_queued, mq);
3848} 3824}
@@ -3862,12 +3838,12 @@ mq_stream_send_impl (struct GNUNET_MQ_MessageQueue *mq,
3862 */ 3838 */
3863static int 3839static int
3864mq_stream_mst_callback (void *cls, void *client, 3840mq_stream_mst_callback (void *cls, void *client,
3865 const struct GNUNET_MessageHeader *message) 3841 const struct GNUNET_MessageHeader *message)
3866{ 3842{
3867 struct GNUNET_MQ_MessageQueue *mq = cls; 3843 struct GNUNET_MQ_Handle *mq = cls;
3868 3844
3869 GNUNET_assert (NULL != message); 3845 GNUNET_assert (NULL != message);
3870 GNUNET_MQ_dispatch (mq, message); 3846 GNUNET_MQ_inject_message (mq, message);
3871 return GNUNET_OK; 3847 return GNUNET_OK;
3872} 3848}
3873 3849
@@ -3889,8 +3865,8 @@ mq_stream_data_processor (void *cls,
3889 const void *data, 3865 const void *data,
3890 size_t size) 3866 size_t size)
3891{ 3867{
3892 struct GNUNET_MQ_MessageQueue *mq = cls; 3868 struct GNUNET_MQ_Handle *mq = cls;
3893 struct MQStreamState *mss; 3869 struct MQStreamState *mss = GNUNET_MQ_impl_state (mq);
3894 int ret; 3870 int ret;
3895 3871
3896 switch (status) 3872 switch (status)
@@ -3901,45 +3877,33 @@ mq_stream_data_processor (void *cls,
3901 /* FIXME: call shutdown handler */ 3877 /* FIXME: call shutdown handler */
3902 return 0; 3878 return 0;
3903 case GNUNET_STREAM_TIMEOUT: 3879 case GNUNET_STREAM_TIMEOUT:
3904 if (NULL == mq->error_handler) 3880 GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_TIMEOUT);
3905 LOG (GNUNET_ERROR_TYPE_WARNING, "read timeout, but no error handler installed for message queue\n");
3906 else
3907 mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_TIMEOUT);
3908 return 0; 3881 return 0;
3909 case GNUNET_STREAM_SYSERR: 3882 case GNUNET_STREAM_SYSERR:
3910 if (NULL == mq->error_handler) 3883 GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
3911 LOG (GNUNET_ERROR_TYPE_WARNING, "read error, but no error handler installed for message queue\n");
3912 else
3913 mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
3914 return 0; 3884 return 0;
3915 default: 3885 default:
3916 GNUNET_assert (0); 3886 GNUNET_assert (0);
3917 return 0; 3887 return 0;
3918 } 3888 }
3919 3889
3920 mss = (struct MQStreamState *) mq->impl_state;
3921 GNUNET_assert (GNUNET_STREAM_OK == status);
3922 ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO); 3890 ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO);
3923 if (GNUNET_OK != ret) 3891 if (GNUNET_OK != ret)
3924 { 3892 {
3925 if (NULL == mq->error_handler) 3893 GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
3926 LOG (GNUNET_ERROR_TYPE_WARNING,
3927 "read error (message stream malformed), but no error handler installed for message queue\n");
3928 else
3929 mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
3930 return 0; 3894 return 0;
3931 } 3895 }
3932 /* we always read all data */
3933 mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, 3896 mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL,
3934 mq_stream_data_processor, mq); 3897 mq_stream_data_processor, mq);
3898 /* we always read all data */
3935 return size; 3899 return size;
3936} 3900}
3937 3901
3938 3902
3939static void 3903static void
3940mq_stream_destroy_impl (struct GNUNET_MQ_MessageQueue *mq) 3904mq_stream_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
3941{ 3905{
3942 struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state; 3906 struct MQStreamState *mss = impl_state;
3943 3907
3944 if (NULL != mss->rh) 3908 if (NULL != mss->rh)
3945 { 3909 {
@@ -3972,24 +3936,21 @@ mq_stream_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
3972 * @param error_handler callback for errors 3936 * @param error_handler callback for errors
3973 * @return the message queue for the socket 3937 * @return the message queue for the socket
3974 */ 3938 */
3975struct GNUNET_MQ_MessageQueue * 3939struct GNUNET_MQ_Handle *
3976GNUNET_STREAM_mq_create (struct GNUNET_STREAM_Socket *socket, 3940GNUNET_STREAM_mq_create (struct GNUNET_STREAM_Socket *socket,
3977 const struct GNUNET_MQ_Handler *msg_handlers, 3941 const struct GNUNET_MQ_MessageHandler *msg_handlers,
3978 GNUNET_MQ_ErrorHandler error_handler, 3942 GNUNET_MQ_ErrorHandler error_handler,
3979 void *cls) 3943 void *cls)
3980{ 3944{
3981 struct GNUNET_MQ_MessageQueue *mq; 3945 struct GNUNET_MQ_Handle *mq;
3982 struct MQStreamState *mss; 3946 struct MQStreamState *mss;
3983 3947
3984 mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
3985 mss = GNUNET_new (struct MQStreamState); 3948 mss = GNUNET_new (struct MQStreamState);
3986 mss->socket = socket; 3949 mss->socket = socket;
3987 mq->impl_state = mss; 3950 mq = GNUNET_MQ_queue_for_callbacks (mq_stream_send_impl,
3988 mq->send_impl = mq_stream_send_impl; 3951 mq_stream_destroy_impl,
3989 mq->destroy_impl = mq_stream_destroy_impl; 3952 NULL,
3990 mq->handlers = msg_handlers; 3953 mss, msg_handlers, error_handler, cls);
3991 mq->handlers_cls = cls;
3992 mq->error_handler = error_handler;
3993 if (NULL != msg_handlers) 3954 if (NULL != msg_handlers)
3994 { 3955 {
3995 mss->mst = GNUNET_SERVER_mst_create (mq_stream_mst_callback, mq); 3956 mss->mst = GNUNET_SERVER_mst_create (mq_stream_mst_callback, mq);
diff --git a/src/util/mq.c b/src/util/mq.c
index dc87b9711..d0253c40f 100644
--- a/src/util/mq.c
+++ b/src/util/mq.c
@@ -31,6 +31,118 @@
31#define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__) 31#define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
32 32
33 33
34struct GNUNET_MQ_Envelope
35{
36 /**
37 * Messages are stored in a linked list.
38 * Each queue has its own list of envelopes.
39 */
40 struct GNUNET_MQ_Envelope *next;
41
42 /**
43 * Messages are stored in a linked list
44 * Each queue has its own list of envelopes.
45 */
46 struct GNUNET_MQ_Envelope *prev;
47
48 /**
49 * Actual allocated message header,
50 * usually points to the end of the containing GNUNET_MQ_Envelope
51 */
52 struct GNUNET_MessageHeader *mh;
53
54 /**
55 * Queue the message is queued in, NULL if message is not queued.
56 */
57 struct GNUNET_MQ_Handle *parent_queue;
58
59 /**
60 * Called after the message was sent irrevocably.
61 */
62 GNUNET_MQ_NotifyCallback sent_cb;
63
64 /**
65 * Closure for send_cb
66 */
67 void *sent_cls;
68};
69
70
71/**
72 * Handle to a message queue.
73 */
74struct GNUNET_MQ_Handle
75{
76 /**
77 * Handlers array, or NULL if the queue should not receive messages
78 */
79 const struct GNUNET_MQ_MessageHandler *handlers;
80
81 /**
82 * Closure for the handler callbacks,
83 * as well as for the error handler.
84 */
85 void *handlers_cls;
86
87 /**
88 * Actual implementation of message sending,
89 * called when a message is added
90 */
91 GNUNET_MQ_SendImpl send_impl;
92
93 /**
94 * Implementation-dependent queue destruction function
95 */
96 GNUNET_MQ_DestroyImpl destroy_impl;
97
98 /**
99 * Implementation-specific state
100 */
101 void *impl_state;
102
103 /**
104 * Callback will be called when an error occurs.
105 */
106 GNUNET_MQ_ErrorHandler error_handler;
107
108 /**
109 * Linked list of messages pending to be sent
110 */
111 struct GNUNET_MQ_Envelope *envelope_head;
112
113 /**
114 * Linked list of messages pending to be sent
115 */
116 struct GNUNET_MQ_Envelope *envelope_tail;
117
118 /**
119 * Message that is currently scheduled to be
120 * sent. Not the head of the message queue, as the implementation
121 * needs to know if sending has been already scheduled or not.
122 */
123 struct GNUNET_MQ_Envelope *current_envelope;
124
125 /**
126 * Has the current envelope been commited?
127 * Either GNUNET_YES or GNUNET_NO.
128 */
129 int commited;
130
131 /**
132 * Map of associations, lazily allocated
133 */
134 struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
135
136 /**
137 * Next id that should be used for the assoc_map,
138 * initialized lazily to a random value together with
139 * assoc_map
140 */
141 uint32_t assoc_id;
142};
143
144
145
34 146
35struct ServerClientSocketState 147struct ServerClientSocketState
36{ 148{
@@ -42,9 +154,14 @@ struct ServerClientSocketState
42struct ClientConnectionState 154struct ClientConnectionState
43{ 155{
44 /** 156 /**
45 * Did we call receive? 157 * Did we call receive alread alreadyy?
46 */ 158 */
47 int receive_active; 159 int receive_active;
160
161 /**
162 * Do we also want to receive?
163 */
164 int receive_requested;
48 struct GNUNET_CLIENT_Connection *connection; 165 struct GNUNET_CLIENT_Connection *connection;
49 struct GNUNET_CLIENT_TransmitHandle *th; 166 struct GNUNET_CLIENT_TransmitHandle *th;
50}; 167};
@@ -59,9 +176,9 @@ struct ClientConnectionState
59 * @param mh message to dispatch 176 * @param mh message to dispatch
60 */ 177 */
61void 178void
62GNUNET_MQ_dispatch (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_MessageHeader *mh) 179GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, const struct GNUNET_MessageHeader *mh)
63{ 180{
64 const struct GNUNET_MQ_Handler *handler; 181 const struct GNUNET_MQ_MessageHandler *handler;
65 int handled = GNUNET_NO; 182 int handled = GNUNET_NO;
66 183
67 handler = mq->handlers; 184 handler = mq->handlers;
@@ -81,8 +198,27 @@ GNUNET_MQ_dispatch (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_Messa
81} 198}
82 199
83 200
201/**
202 * Call the right callback for an error condition.
203 *
204 * @param mq message queue
205 */
84void 206void
85GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm) 207GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
208 enum GNUNET_MQ_Error error)
209{
210 if (NULL == mq->error_handler)
211 {
212 /* FIXME: log what kind of error occured */
213 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "mq: got error, but no handler installed\n");
214 return;
215 }
216 mq->error_handler (mq->handlers_cls, error);
217}
218
219
220void
221GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm)
86{ 222{
87 GNUNET_assert (NULL == mqm->parent_queue); 223 GNUNET_assert (NULL == mqm->parent_queue);
88 GNUNET_free (mqm); 224 GNUNET_free (mqm);
@@ -94,20 +230,156 @@ GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm)
94 * May only be called once per message. 230 * May only be called once per message.
95 * 231 *
96 * @param mq message queue 232 * @param mq message queue
97 * @param mqm the message to send. 233 * @param ev the message to send.
234 */
235void
236GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
237{
238 GNUNET_assert (NULL != mq);
239 GNUNET_assert (NULL == ev->parent_queue);
240
241 /* is the implementation busy? queue it! */
242 if (NULL != mq->current_envelope)
243 {
244 GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, mq->envelope_tail, ev);
245 return;
246 }
247 mq->current_envelope = ev;
248 mq->send_impl (mq, ev->mh, mq->impl_state);
249}
250
251
252/**
253 * Call the send implementation for the next queued message,
254 * if any.
255 * Only useful for implementing message queues,
256 * results in undefined behavior if not used carefully.
257 *
258 * @param mq message queue to send the next message with
98 */ 259 */
99void 260void
100GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) 261GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
101{ 262{
263 /* call is only valid if we're actually currently sending
264 * a message */
102 GNUNET_assert (NULL != mq); 265 GNUNET_assert (NULL != mq);
103 mq->send_impl (mq, mqm); 266 GNUNET_assert (NULL != mq->current_envelope);
267 GNUNET_assert (GNUNET_YES == mq->commited);
268 mq->commited = GNUNET_NO;
269 GNUNET_free (mq->current_envelope);
270 if (NULL == mq->envelope_head)
271 {
272 mq->current_envelope = NULL;
273 return;
274 }
275
276
277 GNUNET_assert (NULL != mq->envelope_tail);
278 GNUNET_assert (NULL != mq->envelope_head);
279 mq->current_envelope = mq->envelope_head;
280 GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail,
281 mq->current_envelope);
282 mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state);
104} 283}
105 284
106 285
107struct GNUNET_MQ_Message * 286/**
287 * Create a message queue for the specified handlers.
288 *
289 * @param send function the implements sending messages
290 * @param destroy function that implements destroying the queue
291 * @param destroy function that implements canceling a message
292 * @param state for the queue, passed to 'send' and 'destroy'
293 * @param handlers array of message handlers
294 * @param error_handler handler for read and write errors
295 * @return a new message queue
296 */
297struct GNUNET_MQ_Handle *
298GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
299 GNUNET_MQ_DestroyImpl destroy,
300 GNUNET_MQ_CancelImpl cancel,
301 void *impl_state,
302 const struct GNUNET_MQ_MessageHandler *handlers,
303 GNUNET_MQ_ErrorHandler error_handler,
304 void *cls)
305{
306 struct GNUNET_MQ_Handle *mq;
307
308 mq = GNUNET_new (struct GNUNET_MQ_Handle);
309 mq->send_impl = send;
310 mq->destroy_impl = destroy;
311 mq->handlers = handlers;
312 mq->handlers_cls = cls;
313 mq->impl_state = impl_state;
314
315 return mq;
316}
317
318
319/**
320 * Get the message that should currently be sent.
321 * Fails if there is no current message.
322 * Only useful for implementing message queues,
323 * results in undefined behavior if not used carefully.
324 *
325 * @param mq message queue with the current message
326 * @return message to send, never NULL
327 */
328const struct GNUNET_MessageHeader *
329GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
330{
331 if (NULL == mq->current_envelope)
332 GNUNET_abort ();
333 if (NULL == mq->current_envelope->mh)
334 GNUNET_abort ();
335 return mq->current_envelope->mh;
336}
337
338
339/**
340 * Get the implementation state associated with the
341 * message queue.
342 *
343 * While the GNUNET_MQ_Impl* callbacks receive the
344 * implementation state, continuations that are scheduled
345 * by the implementation function often only have one closure
346 * argument, with this function it is possible to get at the
347 * implementation state when only passing the GNUNET_MQ_Handle
348 * as closure.
349 *
350 * @param mq message queue with the current message
351 * @return message to send, never NULL
352 */
353void *
354GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
355{
356 return mq->impl_state;
357}
358
359
360
361/**
362 * Mark the current message as irrevocably sent, but do not
363 * proceed with sending the next message.
364 * Will call the appropriate GNUNET_MQ_NotifyCallback, if any.
365 *
366 * @param mq message queue
367 */
368void
369GNUNET_MQ_impl_send_commit (struct GNUNET_MQ_Handle *mq)
370{
371 GNUNET_assert (NULL != mq->current_envelope);
372 GNUNET_assert (GNUNET_NO == mq->commited);
373 mq->commited = GNUNET_YES;
374 if (NULL != mq->current_envelope->sent_cb)
375 mq->current_envelope->sent_cb (mq->current_envelope->sent_cls);
376}
377
378
379struct GNUNET_MQ_Envelope *
108GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) 380GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
109{ 381{
110 struct GNUNET_MQ_Message *mqm; 382 struct GNUNET_MQ_Envelope *mqm;
111 383
112 mqm = GNUNET_malloc (sizeof *mqm + size); 384 mqm = GNUNET_malloc (sizeof *mqm + size);
113 mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1]; 385 mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
@@ -119,11 +391,11 @@ GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
119} 391}
120 392
121 393
122struct GNUNET_MQ_Message * 394struct GNUNET_MQ_Envelope *
123GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type, 395GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type,
124 const struct GNUNET_MessageHeader *nested_mh) 396 const struct GNUNET_MessageHeader *nested_mh)
125{ 397{
126 struct GNUNET_MQ_Message *mqm; 398 struct GNUNET_MQ_Envelope *mqm;
127 uint16_t size; 399 uint16_t size;
128 400
129 if (NULL == nested_mh) 401 if (NULL == nested_mh)
@@ -154,85 +426,62 @@ static size_t
154transmit_queued (void *cls, size_t size, 426transmit_queued (void *cls, size_t size,
155 void *buf) 427 void *buf)
156{ 428{
157 struct GNUNET_MQ_MessageQueue *mq = cls; 429 struct GNUNET_MQ_Handle *mq = cls;
158 struct GNUNET_MQ_Message *mqm = mq->current_msg; 430 struct ServerClientSocketState *state = GNUNET_MQ_impl_state (mq);
159 struct ServerClientSocketState *state = mq->impl_state; 431 const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq);
160 size_t msg_size; 432 size_t msg_size;
161 433
162 GNUNET_assert (NULL != buf); 434 GNUNET_assert (NULL != buf);
163 435
164 if (NULL != mqm->sent_cb) 436 msg_size = ntohs (msg->size);
165 {
166 mqm->sent_cb (mqm->sent_cls);
167 }
168
169 mq->current_msg = NULL;
170 GNUNET_assert (NULL != mqm);
171 msg_size = ntohs (mqm->mh->size);
172 GNUNET_assert (size >= msg_size); 437 GNUNET_assert (size >= msg_size);
173 memcpy (buf, mqm->mh, msg_size); 438 memcpy (buf, msg, msg_size);
174 GNUNET_free (mqm);
175 state->th = NULL; 439 state->th = NULL;
176 440
177 if (NULL != mq->msg_head) 441 GNUNET_MQ_impl_send_continue (mq);
178 { 442
179 mq->current_msg = mq->msg_head;
180 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
181 state->th =
182 GNUNET_SERVER_notify_transmit_ready (state->client, msg_size,
183 GNUNET_TIME_UNIT_FOREVER_REL,
184 &transmit_queued, mq);
185 }
186 return msg_size; 443 return msg_size;
187} 444}
188 445
189 446
190 447
191static void 448static void
192server_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq) 449server_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
450 void *impl_state)
193{ 451{
194 struct ServerClientSocketState *state; 452 struct ServerClientSocketState *state = impl_state;
195 453
196 GNUNET_assert (NULL != mq); 454 GNUNET_assert (NULL != mq);
197 state = mq->impl_state;
198 GNUNET_assert (NULL != state); 455 GNUNET_assert (NULL != state);
199 GNUNET_SERVER_client_drop (state->client); 456 GNUNET_SERVER_client_drop (state->client);
200 GNUNET_free (state); 457 GNUNET_free (state);
201} 458}
202 459
203static void 460static void
204server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) 461server_client_send_impl (struct GNUNET_MQ_Handle *mq,
462 const struct GNUNET_MessageHeader *msg, void *impl_state)
205{ 463{
206 struct ServerClientSocketState *state; 464 struct ServerClientSocketState *state = impl_state;
207 int msize;
208 465
209 GNUNET_assert (NULL != mq); 466 GNUNET_assert (NULL != mq);
210 state = mq->impl_state;
211 GNUNET_assert (NULL != state); 467 GNUNET_assert (NULL != state);
212 468
213 if (NULL != state->th) 469 GNUNET_MQ_impl_send_commit (mq);
214 { 470
215 GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
216 return;
217 }
218 GNUNET_assert (NULL == mq->msg_head);
219 GNUNET_assert (NULL == mq->current_msg);
220 msize = ntohs (mqm->mh->size);
221 mq->current_msg = mqm;
222 state->th = 471 state->th =
223 GNUNET_SERVER_notify_transmit_ready (state->client, msize, 472 GNUNET_SERVER_notify_transmit_ready (state->client, ntohs (msg->size),
224 GNUNET_TIME_UNIT_FOREVER_REL, 473 GNUNET_TIME_UNIT_FOREVER_REL,
225 &transmit_queued, mq); 474 &transmit_queued, mq);
226} 475}
227 476
228 477
229struct GNUNET_MQ_MessageQueue * 478struct GNUNET_MQ_Handle *
230GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client) 479GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
231{ 480{
232 struct GNUNET_MQ_MessageQueue *mq; 481 struct GNUNET_MQ_Handle *mq;
233 struct ServerClientSocketState *scss; 482 struct ServerClientSocketState *scss;
234 483
235 mq = GNUNET_new (struct GNUNET_MQ_MessageQueue); 484 mq = GNUNET_new (struct GNUNET_MQ_Handle);
236 scss = GNUNET_new (struct ServerClientSocketState); 485 scss = GNUNET_new (struct ServerClientSocketState);
237 mq->impl_state = scss; 486 mq->impl_state = scss;
238 scss->client = client; 487 scss->client = client;
@@ -254,24 +503,21 @@ static void
254handle_client_message (void *cls, 503handle_client_message (void *cls,
255 const struct GNUNET_MessageHeader *msg) 504 const struct GNUNET_MessageHeader *msg)
256{ 505{
257 struct GNUNET_MQ_MessageQueue *mq = cls; 506 struct GNUNET_MQ_Handle *mq = cls;
258 struct ClientConnectionState *state; 507 struct ClientConnectionState *state;
259 508
260 state = mq->impl_state; 509 state = mq->impl_state;
261 510
262 if (NULL == msg) 511 if (NULL == msg)
263 { 512 {
264 if (NULL == mq->error_handler) 513 GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
265 LOG (GNUNET_ERROR_TYPE_WARNING, "ignoring read error (no handler installed)\n");
266 else
267 mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
268 return; 514 return;
269 } 515 }
270 516
271 GNUNET_CLIENT_receive (state->connection, handle_client_message, mq, 517 GNUNET_CLIENT_receive (state->connection, handle_client_message, mq,
272 GNUNET_TIME_UNIT_FOREVER_REL); 518 GNUNET_TIME_UNIT_FOREVER_REL);
273 519
274 GNUNET_MQ_dispatch (mq, msg); 520 GNUNET_MQ_inject_message (mq, msg);
275} 521}
276 522
277 523
@@ -287,23 +533,22 @@ static size_t
287connection_client_transmit_queued (void *cls, size_t size, 533connection_client_transmit_queued (void *cls, size_t size,
288 void *buf) 534 void *buf)
289{ 535{
290 struct GNUNET_MQ_MessageQueue *mq = cls; 536 struct GNUNET_MQ_Handle *mq = cls;
291 struct GNUNET_MQ_Message *mqm = mq->current_msg; 537 const struct GNUNET_MessageHeader *msg;
292 struct ClientConnectionState *state = mq->impl_state; 538 struct ClientConnectionState *state = mq->impl_state;
293 size_t msg_size; 539 size_t msg_size;
294 540
541 GNUNET_assert (NULL != mq);
542 msg = GNUNET_MQ_impl_current (mq);
543
295 if (NULL == buf) 544 if (NULL == buf)
296 { 545 {
297 if (NULL == mq->error_handler) 546 GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
298 {
299 LOG (GNUNET_ERROR_TYPE_WARNING, "read error, but no error handler installed\n");
300 return 0;
301 }
302 mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
303 return 0; 547 return 0;
304 } 548 }
305 549
306 if ((NULL != mq->handlers) && (GNUNET_NO == state->receive_active)) 550 if ( (GNUNET_YES == state->receive_requested) &&
551 (GNUNET_NO == state->receive_active) )
307 { 552 {
308 state->receive_active = GNUNET_YES; 553 state->receive_active = GNUNET_YES;
309 GNUNET_CLIENT_receive (state->connection, handle_client_message, mq, 554 GNUNET_CLIENT_receive (state->connection, handle_client_message, mq,
@@ -311,78 +556,53 @@ connection_client_transmit_queued (void *cls, size_t size,
311 } 556 }
312 557
313 558
314 GNUNET_assert (NULL != mqm); 559 msg_size = ntohs (msg->size);
315
316 if (NULL != mqm->sent_cb)
317 {
318 mqm->sent_cb (mqm->sent_cls);
319 }
320
321 mq->current_msg = NULL;
322 GNUNET_assert (NULL != buf);
323 msg_size = ntohs (mqm->mh->size);
324 GNUNET_assert (size >= msg_size); 560 GNUNET_assert (size >= msg_size);
325 memcpy (buf, mqm->mh, msg_size); 561 memcpy (buf, msg, msg_size);
326 GNUNET_free (mqm);
327 state->th = NULL; 562 state->th = NULL;
328 if (NULL != mq->msg_head) 563
329 { 564 GNUNET_MQ_impl_send_continue (mq);
330 mq->current_msg = mq->msg_head; 565
331 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
332 state->th =
333 GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (mq->current_msg->mh->size),
334 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
335 &connection_client_transmit_queued, mq);
336 }
337 return msg_size; 566 return msg_size;
338} 567}
339 568
340 569
341 570
342static void 571static void
343connection_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq) 572connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
344{ 573{
345 GNUNET_free (mq->impl_state); 574 GNUNET_free (impl_state);
346} 575}
347 576
348static void 577static void
349connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, 578connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
350 struct GNUNET_MQ_Message *mqm) 579 const struct GNUNET_MessageHeader *msg, void *impl_state)
351{ 580{
352 struct ClientConnectionState *state = mq->impl_state; 581 struct ClientConnectionState *state = impl_state;
353 int msize;
354 582
355 GNUNET_assert (NULL != state); 583 GNUNET_assert (NULL != state);
584 GNUNET_assert (NULL == state->th);
585
586 GNUNET_MQ_impl_send_commit (mq);
356 587
357 if (NULL != state->th)
358 {
359 GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
360 return;
361 }
362 GNUNET_assert (NULL == mq->current_msg);
363 mq->current_msg = mqm;
364 msize = ntohs (mqm->mh->size);
365 state->th = 588 state->th =
366 GNUNET_CLIENT_notify_transmit_ready (state->connection, msize, 589 GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (msg->size),
367 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, 590 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
368 &connection_client_transmit_queued, mq); 591 &connection_client_transmit_queued, mq);
369} 592}
370 593
371 594
372 595struct GNUNET_MQ_Handle *
373
374
375struct GNUNET_MQ_MessageQueue *
376GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection, 596GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
377 const struct GNUNET_MQ_Handler *handlers, 597 const struct GNUNET_MQ_MessageHandler *handlers,
378 void *cls) 598 void *cls)
379{ 599{
380 struct GNUNET_MQ_MessageQueue *mq; 600 struct GNUNET_MQ_Handle *mq;
381 struct ClientConnectionState *state; 601 struct ClientConnectionState *state;
382 602
383 GNUNET_assert (NULL != connection); 603 GNUNET_assert (NULL != connection);
384 604
385 mq = GNUNET_new (struct GNUNET_MQ_MessageQueue); 605 mq = GNUNET_new (struct GNUNET_MQ_Handle);
386 mq->handlers = handlers; 606 mq->handlers = handlers;
387 mq->handlers_cls = cls; 607 mq->handlers_cls = cls;
388 state = GNUNET_new (struct ClientConnectionState); 608 state = GNUNET_new (struct ClientConnectionState);
@@ -390,16 +610,20 @@ GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connecti
390 mq->impl_state = state; 610 mq->impl_state = state;
391 mq->send_impl = connection_client_send_impl; 611 mq->send_impl = connection_client_send_impl;
392 mq->destroy_impl = connection_client_destroy_impl; 612 mq->destroy_impl = connection_client_destroy_impl;
613 if (NULL != handlers)
614 state->receive_requested = GNUNET_YES;
393 615
394 return mq; 616 return mq;
395} 617}
396 618
397 619
398void 620void
399GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq, 621GNUNET_MQ_replace_handlers (struct GNUNET_MQ_Handle *mq,
400 const struct GNUNET_MQ_Handler *new_handlers, 622 const struct GNUNET_MQ_MessageHandler *new_handlers,
401 void *cls) 623 void *cls)
402{ 624{
625 /* FIXME: notify implementation? */
626 /* FIXME: what about NULL handlers? abort receive? */
403 mq->handlers = new_handlers; 627 mq->handlers = new_handlers;
404 mq->handlers_cls = cls; 628 mq->handlers_cls = cls;
405} 629}
@@ -413,8 +637,7 @@ GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq,
413 * @param assoc_data to associate 637 * @param assoc_data to associate
414 */ 638 */
415uint32_t 639uint32_t
416GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq, 640GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq,
417 struct GNUNET_MQ_Message *mqm,
418 void *assoc_data) 641 void *assoc_data)
419{ 642{
420 uint32_t id; 643 uint32_t id;
@@ -433,7 +656,7 @@ GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq,
433 656
434 657
435void * 658void *
436GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id) 659GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
437{ 660{
438 if (NULL == mq->assoc_map) 661 if (NULL == mq->assoc_map)
439 return NULL; 662 return NULL;
@@ -442,7 +665,7 @@ GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
442 665
443 666
444void * 667void *
445GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id) 668GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
446{ 669{
447 void *val; 670 void *val;
448 671
@@ -456,7 +679,7 @@ GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
456 679
457 680
458void 681void
459GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm, 682GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *mqm,
460 GNUNET_MQ_NotifyCallback cb, 683 GNUNET_MQ_NotifyCallback cb,
461 void *cls) 684 void *cls)
462{ 685{
@@ -466,13 +689,13 @@ GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm,
466 689
467 690
468void 691void
469GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq) 692GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
470{ 693{
471 /* FIXME: destroy all pending messages in the queue */ 694 /* FIXME: destroy all pending messages in the queue */
472 695
473 if (NULL != mq->destroy_impl) 696 if (NULL != mq->destroy_impl)
474 { 697 {
475 mq->destroy_impl (mq); 698 mq->destroy_impl (mq, mq->impl_state);
476 } 699 }
477 700
478 GNUNET_free (mq); 701 GNUNET_free (mq);
@@ -480,7 +703,6 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq)
480 703
481 704
482 705
483
484struct GNUNET_MessageHeader * 706struct GNUNET_MessageHeader *
485GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size) 707GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size)
486{ 708{
diff --git a/src/util/test_mq.c b/src/util/test_mq.c
index 55cd80ef1..45bba0a6b 100644
--- a/src/util/test_mq.c
+++ b/src/util/test_mq.c
@@ -40,7 +40,7 @@ GNUNET_NETWORK_STRUCT_END
40void 40void
41test1 (void) 41test1 (void)
42{ 42{
43 struct GNUNET_MQ_Message *mqm; 43 struct GNUNET_MQ_Envelope *mqm;
44 struct MyMessage *mm; 44 struct MyMessage *mm;
45 45
46 mm = NULL; 46 mm = NULL;
@@ -57,7 +57,7 @@ test1 (void)
57void 57void
58test2 (void) 58test2 (void)
59{ 59{
60 struct GNUNET_MQ_Message *mqm; 60 struct GNUNET_MQ_Envelope *mqm;
61 struct GNUNET_MessageHeader *mh; 61 struct GNUNET_MessageHeader *mh;
62 62
63 mqm = GNUNET_MQ_msg_header (42); 63 mqm = GNUNET_MQ_msg_header (42);
diff --git a/src/util/test_mq_client.c b/src/util/test_mq_client.c
index b7eb1516a..30e498fcc 100644
--- a/src/util/test_mq_client.c
+++ b/src/util/test_mq_client.c
@@ -60,6 +60,9 @@ recv_cb (void *cls, struct GNUNET_SERVER_Client *argclient,
60 return; 60 return;
61 } 61 }
62 62
63 /* can happen if notify does not work */
64 GNUNET_assert (received < 2);
65
63 GNUNET_SERVER_receive_done (argclient, GNUNET_YES); 66 GNUNET_SERVER_receive_done (argclient, GNUNET_YES);
64} 67}
65 68
@@ -98,14 +101,16 @@ static struct GNUNET_SERVER_MessageHandler handlers[] = {
98 101
99void send_cb (void *cls) 102void send_cb (void *cls)
100{ 103{
104 /* the notify should only be called once */
105 GNUNET_assert (GNUNET_NO == notify);
101 printf ("notify sent\n"); 106 printf ("notify sent\n");
102 notify = GNUNET_YES; 107 notify = GNUNET_YES;
103} 108}
104 109
105void test_mq (struct GNUNET_CLIENT_Connection *client) 110void test_mq (struct GNUNET_CLIENT_Connection *client)
106{ 111{
107 struct GNUNET_MQ_MessageQueue *mq; 112 struct GNUNET_MQ_Handle *mq;
108 struct GNUNET_MQ_Message *mqm; 113 struct GNUNET_MQ_Envelope *mqm;
109 114
110 /* FIXME: test handling responses */ 115 /* FIXME: test handling responses */
111 mq = GNUNET_MQ_queue_for_connection_client (client, NULL, NULL); 116 mq = GNUNET_MQ_queue_for_connection_client (client, NULL, NULL);