diff options
26 files changed, 1550 insertions, 1128 deletions
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c index e3ddb4913..684580755 100644 --- a/src/consensus/consensus_api.c +++ b/src/consensus/consensus_api.c | |||
@@ -33,37 +33,6 @@ | |||
33 | 33 | ||
34 | #define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__) | 34 | #define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__) |
35 | 35 | ||
36 | /** | ||
37 | * Actions that can be queued. | ||
38 | */ | ||
39 | struct QueuedMessage | ||
40 | { | ||
41 | /** | ||
42 | * Queued messages are stored in a doubly linked list. | ||
43 | */ | ||
44 | struct QueuedMessage *next; | ||
45 | |||
46 | /** | ||
47 | * Queued messages are stored in a doubly linked list. | ||
48 | */ | ||
49 | struct QueuedMessage *prev; | ||
50 | |||
51 | /** | ||
52 | * The actual queued message. | ||
53 | */ | ||
54 | struct GNUNET_MessageHeader *msg; | ||
55 | |||
56 | /** | ||
57 | * Will be called after transmit, if not NULL | ||
58 | */ | ||
59 | GNUNET_CONSENSUS_InsertDoneCallback idc; | ||
60 | |||
61 | /** | ||
62 | * Closure for idc | ||
63 | */ | ||
64 | void *idc_cls; | ||
65 | }; | ||
66 | |||
67 | 36 | ||
68 | /** | 37 | /** |
69 | * Handle for the service. | 38 | * Handle for the service. |
@@ -106,21 +75,11 @@ struct GNUNET_CONSENSUS_Handle | |||
106 | struct GNUNET_PeerIdentity **peers; | 75 | struct GNUNET_PeerIdentity **peers; |
107 | 76 | ||
108 | /** | 77 | /** |
109 | * Currently active transmit request. | ||
110 | */ | ||
111 | struct GNUNET_CLIENT_TransmitHandle *th; | ||
112 | |||
113 | /** | ||
114 | * GNUNES_YES iff the join message has been sent to the service. | 78 | * GNUNES_YES iff the join message has been sent to the service. |
115 | */ | 79 | */ |
116 | int joined; | 80 | int joined; |
117 | 81 | ||
118 | /** | 82 | /** |
119 | * Closure for the insert done callback. | ||
120 | */ | ||
121 | void *idc_cls; | ||
122 | |||
123 | /** | ||
124 | * Called when the conclude operation finishes or fails. | 83 | * Called when the conclude operation finishes or fails. |
125 | */ | 84 | */ |
126 | GNUNET_CONSENSUS_ConcludeCallback conclude_cb; | 85 | GNUNET_CONSENSUS_ConcludeCallback conclude_cb; |
@@ -135,109 +94,36 @@ struct GNUNET_CONSENSUS_Handle | |||
135 | */ | 94 | */ |
136 | struct GNUNET_TIME_Absolute conclude_deadline; | 95 | struct GNUNET_TIME_Absolute conclude_deadline; |
137 | 96 | ||
138 | unsigned int conclude_min_size; | 97 | /** |
139 | 98 | * Message queue for the client. | |
140 | struct QueuedMessage *messages_head; | 99 | */ |
141 | 100 | struct GNUNET_MQ_Handle *mq; | |
142 | struct QueuedMessage *messages_tail; | ||
143 | |||
144 | }; | 101 | }; |
145 | 102 | ||
146 | |||
147 | |||
148 | /** | 103 | /** |
149 | * Schedule transmitting the next message. | 104 | * FIXME: this should not bee necessary when the API |
150 | * | 105 | * issue has been fixed |
151 | * @param consensus consensus handle | ||
152 | */ | 106 | */ |
153 | static void | 107 | struct InsertDoneInfo |
154 | send_next (struct GNUNET_CONSENSUS_Handle *consensus); | ||
155 | |||
156 | |||
157 | /** | ||
158 | * Function called to notify a client about the connection | ||
159 | * begin ready to queue more data. "buf" will be | ||
160 | * NULL and "size" zero if the connection was closed for | ||
161 | * writing in the meantime. | ||
162 | * | ||
163 | * @param cls closure | ||
164 | * @param size number of bytes available in buf | ||
165 | * @param buf where the callee should write the message | ||
166 | * @return number of bytes written to buf | ||
167 | */ | ||
168 | static size_t | ||
169 | transmit_queued (void *cls, size_t size, | ||
170 | void *buf) | ||
171 | { | 108 | { |
172 | struct GNUNET_CONSENSUS_Handle *consensus; | 109 | GNUNET_CONSENSUS_InsertDoneCallback idc; |
173 | struct QueuedMessage *qmsg; | 110 | void *cls; |
174 | size_t msg_size; | 111 | }; |
175 | |||
176 | consensus = (struct GNUNET_CONSENSUS_Handle *) cls; | ||
177 | consensus->th = NULL; | ||
178 | |||
179 | qmsg = consensus->messages_head; | ||
180 | GNUNET_CONTAINER_DLL_remove (consensus->messages_head, consensus->messages_tail, qmsg); | ||
181 | |||
182 | if (NULL == buf) | ||
183 | { | ||
184 | if (NULL != qmsg->idc) | ||
185 | { | ||
186 | qmsg->idc (qmsg->idc_cls, GNUNET_YES); | ||
187 | } | ||
188 | return 0; | ||
189 | } | ||
190 | |||
191 | msg_size = ntohs (qmsg->msg->size); | ||
192 | |||
193 | GNUNET_assert (size >= msg_size); | ||
194 | |||
195 | memcpy (buf, qmsg->msg, msg_size); | ||
196 | if (NULL != qmsg->idc) | ||
197 | { | ||
198 | qmsg->idc (qmsg->idc_cls, GNUNET_YES); | ||
199 | } | ||
200 | GNUNET_free (qmsg->msg); | ||
201 | GNUNET_free (qmsg); | ||
202 | /* FIXME: free the messages */ | ||
203 | |||
204 | send_next (consensus); | ||
205 | |||
206 | return msg_size; | ||
207 | } | ||
208 | |||
209 | |||
210 | /** | ||
211 | * Schedule transmitting the next message. | ||
212 | * | ||
213 | * @param consensus consensus handle | ||
214 | */ | ||
215 | static void | ||
216 | send_next (struct GNUNET_CONSENSUS_Handle *consensus) | ||
217 | { | ||
218 | if (NULL != consensus->th) | ||
219 | return; | ||
220 | |||
221 | if (NULL != consensus->messages_head) | ||
222 | { | ||
223 | consensus->th = | ||
224 | GNUNET_CLIENT_notify_transmit_ready (consensus->client, ntohs (consensus->messages_head->msg->size), | ||
225 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
226 | GNUNET_NO, &transmit_queued, consensus); | ||
227 | } | ||
228 | } | ||
229 | 112 | ||
230 | 113 | ||
231 | /** | 114 | /** |
232 | * Called when the server has sent is a new element | 115 | * Called when the server has sent is a new element |
233 | * | 116 | * |
234 | * @param consensus consensus handle | 117 | * @param cls consensus handle |
235 | * @param msg element message | 118 | * @param mh element message |
236 | */ | 119 | */ |
237 | static void | 120 | static void |
238 | handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, | 121 | handle_new_element (void *cls, |
239 | struct GNUNET_CONSENSUS_ElementMessage *msg) | 122 | const struct GNUNET_MessageHeader *mh) |
240 | { | 123 | { |
124 | struct GNUNET_CONSENSUS_Handle *consensus = cls; | ||
125 | const struct GNUNET_CONSENSUS_ElementMessage *msg | ||
126 | = (const struct GNUNET_CONSENSUS_ElementMessage *) mh; | ||
241 | struct GNUNET_SET_Element element; | 127 | struct GNUNET_SET_Element element; |
242 | 128 | ||
243 | LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n"); | 129 | LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n"); |
@@ -247,8 +133,6 @@ handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, | |||
247 | element.data = &msg[1]; | 133 | element.data = &msg[1]; |
248 | 134 | ||
249 | consensus->new_element_cb (consensus->new_element_cls, &element); | 135 | consensus->new_element_cb (consensus->new_element_cls, &element); |
250 | |||
251 | send_next (consensus); | ||
252 | } | 136 | } |
253 | 137 | ||
254 | 138 | ||
@@ -256,13 +140,15 @@ handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, | |||
256 | * Called when the server has announced | 140 | * Called when the server has announced |
257 | * that the conclusion is over. | 141 | * that the conclusion is over. |
258 | * | 142 | * |
259 | * @param consensus consensus handle | 143 | * @param cls consensus handle |
260 | * @param msg conclude done message | 144 | * @param mh conclude done message |
261 | */ | 145 | */ |
262 | static void | 146 | static void |
263 | handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus, | 147 | handle_conclude_done (void *cls, |
264 | const struct GNUNET_MessageHeader *msg) | 148 | const struct GNUNET_MessageHeader *msg) |
265 | { | 149 | { |
150 | struct GNUNET_CONSENSUS_Handle *consensus = cls; | ||
151 | |||
266 | GNUNET_CONSENSUS_ConcludeCallback cc; | 152 | GNUNET_CONSENSUS_ConcludeCallback cc; |
267 | 153 | ||
268 | GNUNET_assert (NULL != (cc = consensus->conclude_cb)); | 154 | GNUNET_assert (NULL != (cc = consensus->conclude_cb)); |
@@ -272,89 +158,6 @@ handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus, | |||
272 | 158 | ||
273 | 159 | ||
274 | /** | 160 | /** |
275 | * Type of a function to call when we receive a message | ||
276 | * from the service. | ||
277 | * | ||
278 | * @param cls closure | ||
279 | * @param msg message received, NULL on timeout or fatal error | ||
280 | */ | ||
281 | static void | ||
282 | message_handler (void *cls, const struct GNUNET_MessageHeader *msg) | ||
283 | { | ||
284 | struct GNUNET_CONSENSUS_Handle *consensus = cls; | ||
285 | |||
286 | LOG (GNUNET_ERROR_TYPE_DEBUG, "received message from consensus service\n"); | ||
287 | |||
288 | if (NULL == msg) | ||
289 | { | ||
290 | /* Error, timeout, death */ | ||
291 | LOG (GNUNET_ERROR_TYPE_ERROR, "error receiving\n"); | ||
292 | GNUNET_CLIENT_disconnect (consensus->client); | ||
293 | consensus->client = NULL; | ||
294 | consensus->new_element_cb (consensus->new_element_cls, NULL); | ||
295 | return; | ||
296 | } | ||
297 | GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, | ||
298 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
299 | switch (ntohs (msg->type)) | ||
300 | { | ||
301 | case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT: | ||
302 | handle_new_element (consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg); | ||
303 | break; | ||
304 | case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE: | ||
305 | handle_conclude_done (consensus, msg); | ||
306 | break; | ||
307 | default: | ||
308 | GNUNET_break (0); | ||
309 | } | ||
310 | } | ||
311 | |||
312 | /** | ||
313 | * Function called to notify a client about the connection | ||
314 | * begin ready to queue more data. "buf" will be | ||
315 | * NULL and "size" zero if the connection was closed for | ||
316 | * writing in the meantime. | ||
317 | * | ||
318 | * @param cls closure | ||
319 | * @param size number of bytes available in buf | ||
320 | * @param buf where the callee should write the message | ||
321 | * @return number of bytes written to buf | ||
322 | */ | ||
323 | static size_t | ||
324 | transmit_join (void *cls, size_t size, void *buf) | ||
325 | { | ||
326 | struct GNUNET_CONSENSUS_JoinMessage *msg; | ||
327 | struct GNUNET_CONSENSUS_Handle *consensus; | ||
328 | int msize; | ||
329 | |||
330 | GNUNET_assert (NULL != buf); | ||
331 | |||
332 | LOG (GNUNET_ERROR_TYPE_DEBUG, "transmitting join message\n"); | ||
333 | |||
334 | consensus = cls; | ||
335 | consensus->th = NULL; | ||
336 | consensus->joined = 1; | ||
337 | |||
338 | msg = buf; | ||
339 | |||
340 | msize = sizeof (struct GNUNET_CONSENSUS_JoinMessage) + | ||
341 | consensus->num_peers * sizeof (struct GNUNET_PeerIdentity); | ||
342 | |||
343 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN); | ||
344 | msg->header.size = htons (msize); | ||
345 | msg->session_id = consensus->session_id; | ||
346 | msg->num_peers = htonl (consensus->num_peers); | ||
347 | memcpy(&msg[1], | ||
348 | consensus->peers, | ||
349 | consensus->num_peers * sizeof (struct GNUNET_PeerIdentity)); | ||
350 | send_next (consensus); | ||
351 | GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, | ||
352 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
353 | |||
354 | return msize; | ||
355 | } | ||
356 | |||
357 | /** | ||
358 | * Create a consensus session. | 161 | * Create a consensus session. |
359 | * | 162 | * |
360 | * @param cfg configuration to use for connecting to the consensus service | 163 | * @param cfg configuration to use for connecting to the consensus service |
@@ -377,7 +180,15 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
377 | void *new_element_cls) | 180 | void *new_element_cls) |
378 | { | 181 | { |
379 | struct GNUNET_CONSENSUS_Handle *consensus; | 182 | struct GNUNET_CONSENSUS_Handle *consensus; |
380 | size_t join_message_size; | 183 | struct GNUNET_CONSENSUS_JoinMessage *join_msg; |
184 | struct GNUNET_MQ_Envelope *ev; | ||
185 | const static struct GNUNET_MQ_MessageHandler mq_handlers[] = { | ||
186 | {handle_new_element, | ||
187 | GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT, 0}, | ||
188 | {handle_conclude_done, | ||
189 | GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE, 0}, | ||
190 | GNUNET_MQ_HANDLERS_END | ||
191 | }; | ||
381 | 192 | ||
382 | consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle)); | 193 | consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle)); |
383 | consensus->cfg = cfg; | 194 | consensus->cfg = cfg; |
@@ -393,24 +204,33 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
393 | GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity)); | 204 | GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity)); |
394 | 205 | ||
395 | consensus->client = GNUNET_CLIENT_connect ("consensus", cfg); | 206 | consensus->client = GNUNET_CLIENT_connect ("consensus", cfg); |
207 | consensus->mq = GNUNET_MQ_queue_for_connection_client (consensus->client, | ||
208 | mq_handlers, consensus); | ||
396 | 209 | ||
397 | GNUNET_assert (consensus->client != NULL); | 210 | GNUNET_assert (consensus->client != NULL); |
398 | 211 | ||
399 | join_message_size = (sizeof (struct GNUNET_CONSENSUS_JoinMessage)) + | 212 | ev = GNUNET_MQ_msg_extra (join_msg, |
400 | (num_peers * sizeof (struct GNUNET_PeerIdentity)); | 213 | (num_peers * sizeof (struct GNUNET_PeerIdentity)), |
401 | 214 | GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN); | |
402 | consensus->th = | ||
403 | GNUNET_CLIENT_notify_transmit_ready (consensus->client, | ||
404 | join_message_size, | ||
405 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
406 | GNUNET_NO, &transmit_join, consensus); | ||
407 | 215 | ||
216 | join_msg->session_id = consensus->session_id; | ||
217 | join_msg->num_peers = htonl (consensus->num_peers); | ||
218 | memcpy(&join_msg[1], | ||
219 | consensus->peers, | ||
220 | consensus->num_peers * sizeof (struct GNUNET_PeerIdentity)); | ||
408 | 221 | ||
409 | GNUNET_assert (consensus->th != NULL); | 222 | GNUNET_MQ_send (consensus->mq, ev); |
410 | return consensus; | 223 | return consensus; |
411 | } | 224 | } |
412 | 225 | ||
413 | 226 | ||
227 | static void | ||
228 | idc_adapter (void *cls) | ||
229 | { | ||
230 | struct InsertDoneInfo *i = cls; | ||
231 | i->idc (i->cls, GNUNET_OK); | ||
232 | GNUNET_free (i); | ||
233 | } | ||
414 | 234 | ||
415 | /** | 235 | /** |
416 | * Insert an element in the set being reconsiled. Must not be called after | 236 | * Insert an element in the set being reconsiled. Must not be called after |
@@ -428,28 +248,24 @@ GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, | |||
428 | GNUNET_CONSENSUS_InsertDoneCallback idc, | 248 | GNUNET_CONSENSUS_InsertDoneCallback idc, |
429 | void *idc_cls) | 249 | void *idc_cls) |
430 | { | 250 | { |
431 | struct QueuedMessage *qmsg; | ||
432 | struct GNUNET_CONSENSUS_ElementMessage *element_msg; | 251 | struct GNUNET_CONSENSUS_ElementMessage *element_msg; |
433 | size_t element_msg_size; | 252 | struct GNUNET_MQ_Envelope *ev; |
253 | struct InsertDoneInfo *i; | ||
434 | 254 | ||
435 | LOG (GNUNET_ERROR_TYPE_DEBUG, "inserting, size=%llu\n", element->size); | 255 | LOG (GNUNET_ERROR_TYPE_DEBUG, "inserting, size=%llu\n", element->size); |
436 | 256 | ||
437 | element_msg_size = (sizeof (struct GNUNET_CONSENSUS_ElementMessage) + | 257 | ev = GNUNET_MQ_msg_extra (element_msg, element->size, |
438 | element->size); | 258 | GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT); |
439 | 259 | ||
440 | element_msg = GNUNET_malloc (element_msg_size); | ||
441 | element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT); | ||
442 | element_msg->header.size = htons (element_msg_size); | ||
443 | memcpy (&element_msg[1], element->data, element->size); | 260 | memcpy (&element_msg[1], element->data, element->size); |
444 | 261 | ||
445 | qmsg = GNUNET_malloc (sizeof (struct QueuedMessage)); | 262 | if (NULL != idc) |
446 | qmsg->msg = (struct GNUNET_MessageHeader *) element_msg; | 263 | { |
447 | qmsg->idc = idc; | 264 | i = GNUNET_new (struct InsertDoneInfo); |
448 | qmsg->idc_cls = idc_cls; | 265 | i->idc = idc; |
449 | 266 | i->cls = idc_cls; | |
450 | GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg); | 267 | GNUNET_MQ_notify_sent (ev, idc_adapter, i); |
451 | 268 | } | |
452 | send_next (consensus); | ||
453 | } | 269 | } |
454 | 270 | ||
455 | 271 | ||
@@ -471,7 +287,7 @@ GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, | |||
471 | GNUNET_CONSENSUS_ConcludeCallback conclude, | 287 | GNUNET_CONSENSUS_ConcludeCallback conclude, |
472 | void *conclude_cls) | 288 | void *conclude_cls) |
473 | { | 289 | { |
474 | struct QueuedMessage *qmsg; | 290 | struct GNUNET_MQ_Envelope *ev; |
475 | struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg; | 291 | struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg; |
476 | 292 | ||
477 | GNUNET_assert (NULL != conclude); | 293 | GNUNET_assert (NULL != conclude); |
@@ -480,17 +296,10 @@ GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, | |||
480 | consensus->conclude_cls = conclude_cls; | 296 | consensus->conclude_cls = conclude_cls; |
481 | consensus->conclude_cb = conclude; | 297 | consensus->conclude_cb = conclude; |
482 | 298 | ||
483 | conclude_msg = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)); | 299 | ev = GNUNET_MQ_msg (conclude_msg, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE); |
484 | conclude_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE); | ||
485 | conclude_msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)); | ||
486 | conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout); | 300 | conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout); |
487 | 301 | ||
488 | qmsg = GNUNET_malloc (sizeof (struct QueuedMessage)); | 302 | GNUNET_MQ_send (consensus->mq, ev); |
489 | qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg; | ||
490 | |||
491 | GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg); | ||
492 | |||
493 | send_next (consensus); | ||
494 | } | 303 | } |
495 | 304 | ||
496 | 305 | ||
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c index 5ebff524c..1c2c78422 100644 --- a/src/consensus/gnunet-service-consensus.c +++ b/src/consensus/gnunet-service-consensus.c | |||
@@ -116,7 +116,7 @@ struct ConsensusSession | |||
116 | /** | 116 | /** |
117 | * Queued messages to the client. | 117 | * Queued messages to the client. |
118 | */ | 118 | */ |
119 | struct GNUNET_MQ_MessageQueue *client_mq; | 119 | struct GNUNET_MQ_Handle *client_mq; |
120 | 120 | ||
121 | /** | 121 | /** |
122 | * Timeout for all rounds together, single rounds will schedule a timeout task | 122 | * Timeout for all rounds together, single rounds will schedule a timeout task |
@@ -217,9 +217,9 @@ struct ConsensusPeerInformation | |||
217 | struct GNUNET_SET_OperationHandle *set_op; | 217 | struct GNUNET_SET_OperationHandle *set_op; |
218 | 218 | ||
219 | /** | 219 | /** |
220 | * Has conclude been called on the set_op? | 220 | * Has commit been called on the set_op? |
221 | */ | 221 | */ |
222 | int set_op_concluded; | 222 | int set_op_commited; |
223 | }; | 223 | }; |
224 | 224 | ||
225 | 225 | ||
@@ -548,14 +548,14 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
548 | GNUNET_SET_operation_cancel (session->partner_outgoing->set_op); | 548 | GNUNET_SET_operation_cancel (session->partner_outgoing->set_op); |
549 | } | 549 | } |
550 | session->partner_outgoing->set_op = | 550 | session->partner_outgoing->set_op = |
551 | GNUNET_SET_evaluate (&session->partner_outgoing->peer_id, | 551 | GNUNET_SET_prepare (&session->partner_outgoing->peer_id, |
552 | &session->global_id, | 552 | &session->global_id, |
553 | (struct GNUNET_MessageHeader *) msg, | 553 | (struct GNUNET_MessageHeader *) msg, |
554 | 0, /* FIXME */ | 554 | 0, /* FIXME */ |
555 | GNUNET_SET_RESULT_ADDED, | 555 | GNUNET_SET_RESULT_ADDED, |
556 | set_result_cb, session->partner_outgoing); | 556 | set_result_cb, session->partner_outgoing); |
557 | GNUNET_SET_conclude (session->partner_outgoing->set_op, session->element_set); | 557 | GNUNET_SET_commit (session->partner_outgoing->set_op, session->element_set); |
558 | session->partner_outgoing->set_op_concluded = GNUNET_YES; | 558 | session->partner_outgoing->set_op_commited = GNUNET_YES; |
559 | } | 559 | } |
560 | 560 | ||
561 | #ifdef GNUNET_EXTRA_LOGGING | 561 | #ifdef GNUNET_EXTRA_LOGGING |
@@ -767,12 +767,12 @@ set_listen_cb (void *cls, | |||
767 | set_result_cb, &session->info[index]); | 767 | set_result_cb, &session->info[index]); |
768 | if (ntohl (msg->exp_subround) == session->exp_subround) | 768 | if (ntohl (msg->exp_subround) == session->exp_subround) |
769 | { | 769 | { |
770 | cpi->set_op_concluded = GNUNET_YES; | 770 | cpi->set_op_commited = GNUNET_YES; |
771 | GNUNET_SET_conclude (cpi->set_op, session->element_set); | 771 | GNUNET_SET_commit (cpi->set_op, session->element_set); |
772 | } | 772 | } |
773 | else | 773 | else |
774 | { | 774 | { |
775 | cpi->set_op_concluded = GNUNET_NO; | 775 | cpi->set_op_commited = GNUNET_NO; |
776 | } | 776 | } |
777 | break; | 777 | break; |
778 | default: | 778 | default: |
diff --git a/src/dv/gnunet-service-dv.c b/src/dv/gnunet-service-dv.c index b76fd9ab0..92ca5d9da 100644 --- a/src/dv/gnunet-service-dv.c +++ b/src/dv/gnunet-service-dv.c | |||
@@ -769,7 +769,7 @@ build_set (void *cls) | |||
769 | if (DEFAULT_FISHEYE_DEPTH - 1 == neighbor->consensus_insertion_distance) | 769 | if (DEFAULT_FISHEYE_DEPTH - 1 == neighbor->consensus_insertion_distance) |
770 | { | 770 | { |
771 | /* we have added all elements to the set, run the operation */ | 771 | /* we have added all elements to the set, run the operation */ |
772 | GNUNET_SET_conclude (neighbor->set_op, | 772 | GNUNET_SET_commit (neighbor->set_op, |
773 | neighbor->my_set); | 773 | neighbor->my_set); |
774 | GNUNET_SET_destroy (neighbor->my_set); | 774 | GNUNET_SET_destroy (neighbor->my_set); |
775 | neighbor->my_set = NULL; | 775 | neighbor->my_set = NULL; |
@@ -1425,13 +1425,13 @@ initiate_set_union (void *cls, | |||
1425 | neighbor->initiate_task = GNUNET_SCHEDULER_NO_TASK; | 1425 | neighbor->initiate_task = GNUNET_SCHEDULER_NO_TASK; |
1426 | neighbor->my_set = GNUNET_SET_create (cfg, | 1426 | neighbor->my_set = GNUNET_SET_create (cfg, |
1427 | GNUNET_SET_OPERATION_UNION); | 1427 | GNUNET_SET_OPERATION_UNION); |
1428 | neighbor->set_op = GNUNET_SET_evaluate (&neighbor->peer, | 1428 | neighbor->set_op = GNUNET_SET_prepare (&neighbor->peer, |
1429 | &neighbor->real_session_id, | 1429 | &neighbor->real_session_id, |
1430 | NULL, | 1430 | NULL, |
1431 | 0 /* FIXME: salt */, | 1431 | 0 /* FIXME: salt */, |
1432 | GNUNET_SET_RESULT_ADDED, | 1432 | GNUNET_SET_RESULT_ADDED, |
1433 | &handle_set_union_result, | 1433 | &handle_set_union_result, |
1434 | neighbor); | 1434 | neighbor); |
1435 | build_set (neighbor); | 1435 | build_set (neighbor); |
1436 | } | 1436 | } |
1437 | 1437 | ||
diff --git a/src/include/gnunet_container_lib.h b/src/include/gnunet_container_lib.h index d52591148..1eb55a4c5 100644 --- a/src/include/gnunet_container_lib.h +++ b/src/include/gnunet_container_lib.h | |||
@@ -534,7 +534,7 @@ enum GNUNET_CONTAINER_MultiHashMapOption | |||
534 | * GNUNET_NO if not. | 534 | * GNUNET_NO if not. |
535 | */ | 535 | */ |
536 | typedef int (*GNUNET_CONTAINER_HashMapIterator) (void *cls, | 536 | typedef int (*GNUNET_CONTAINER_HashMapIterator) (void *cls, |
537 | const struct GNUNET_HashCode * key, | 537 | const struct GNUNET_HashCode *key, |
538 | void *value); | 538 | void *value); |
539 | 539 | ||
540 | 540 | ||
diff --git a/src/include/gnunet_mesh2_service.h b/src/include/gnunet_mesh2_service.h index c69ac2bfd..b6593cf99 100644 --- a/src/include/gnunet_mesh2_service.h +++ b/src/include/gnunet_mesh2_service.h | |||
@@ -162,7 +162,8 @@ typedef void (GNUNET_MESH_TunnelEndHandler) (void *cls, | |||
162 | * the tunnel. | 162 | * the tunnel. |
163 | * @param handlers Callbacks for messages we care about, NULL-terminated. Each | 163 | * @param handlers Callbacks for messages we care about, NULL-terminated. Each |
164 | * one must call GNUNET_MESH_receive_done on the tunnel to | 164 | * one must call GNUNET_MESH_receive_done on the tunnel to |
165 | * receive the next message. | 165 | * receive the next message. Messages of a type that is not |
166 | * in the handlers array are ignored if received. | ||
166 | * @param ports NULL or 0-terminated array of port numbers for incoming tunnels. | 167 | * @param ports NULL or 0-terminated array of port numbers for incoming tunnels. |
167 | * | 168 | * |
168 | * @return handle to the mesh service NULL on error | 169 | * @return handle to the mesh service NULL on error |
@@ -325,7 +326,7 @@ typedef void (*GNUNET_MESH_TunnelCB) (void *cls, | |||
325 | /** | 326 | /** |
326 | * Request information about the running mesh peer. | 327 | * Request information about the running mesh peer. |
327 | * The callback will be called for every tunnel known to the service, | 328 | * The callback will be called for every tunnel known to the service, |
328 | * listing all active peers that blong to the tunnel. | 329 | * listing all active peers that belong to the tunnel. |
329 | * | 330 | * |
330 | * If called again on the same handle, it will overwrite the previous | 331 | * If called again on the same handle, it will overwrite the previous |
331 | * callback and cls. To retrieve the cls, monitor_cancel must be | 332 | * callback and cls. To retrieve the cls, monitor_cancel must be |
@@ -375,6 +376,18 @@ void * | |||
375 | GNUNET_MESH_get_tunnels_cancel (struct GNUNET_MESH_Handle *h); | 376 | GNUNET_MESH_get_tunnels_cancel (struct GNUNET_MESH_Handle *h); |
376 | 377 | ||
377 | 378 | ||
379 | /** | ||
380 | * Create a message queue for a mesh tunnel. | ||
381 | * The message queue can only be used to transmit messages, | ||
382 | * not to receive them. | ||
383 | * | ||
384 | * @param tunnel the tunnel to create the message qeue for | ||
385 | * @return a message queue to messages over the tunnel | ||
386 | */ | ||
387 | struct GNUNET_MQ_Handle * | ||
388 | GNUNET_MESH_mq_create (struct GNUNET_MESH_Tunnel *tunnel); | ||
389 | |||
390 | |||
378 | #if 0 /* keep Emacsens' auto-indent happy */ | 391 | #if 0 /* keep Emacsens' auto-indent happy */ |
379 | { | 392 | { |
380 | #endif | 393 | #endif |
diff --git a/src/include/gnunet_mq_lib.h b/src/include/gnunet_mq_lib.h index 54ea806a5..b73cab8d8 100644 --- a/src/include/gnunet_mq_lib.h +++ b/src/include/gnunet_mq_lib.h | |||
@@ -21,7 +21,7 @@ | |||
21 | /** | 21 | /** |
22 | * @author Florian Dold | 22 | * @author Florian Dold |
23 | * @file set/mq.h | 23 | * @file set/mq.h |
24 | * @brief general purpose request queue | 24 | * @brief general purpose message queue |
25 | */ | 25 | */ |
26 | #ifndef GNUNET_MQ_H | 26 | #ifndef GNUNET_MQ_H |
27 | #define GNUNET_MQ_H | 27 | #define GNUNET_MQ_H |
@@ -30,7 +30,7 @@ | |||
30 | 30 | ||
31 | 31 | ||
32 | /** | 32 | /** |
33 | * Allocate a GNUNET_MQ_Message, with extra space allocated after the space needed | 33 | * Allocate an envelope, with extra space allocated after the space needed |
34 | * by the message struct. | 34 | * by the message struct. |
35 | * The allocated message will already have the type and size field set. | 35 | * The allocated message will already have the type and size field set. |
36 | * | 36 | * |
@@ -43,19 +43,19 @@ | |||
43 | #define GNUNET_MQ_msg_extra(mvar, esize, type) GNUNET_MQ_msg_((((void)(mvar)->header), (struct GNUNET_MessageHeader**) &(mvar)), (esize) + sizeof *(mvar), (type)) | 43 | #define GNUNET_MQ_msg_extra(mvar, esize, type) GNUNET_MQ_msg_((((void)(mvar)->header), (struct GNUNET_MessageHeader**) &(mvar)), (esize) + sizeof *(mvar), (type)) |
44 | 44 | ||
45 | /** | 45 | /** |
46 | * Allocate a GNUNET_MQ_Message. | 46 | * Allocate a GNUNET_MQ_Envelope. |
47 | * The allocated message will already have the type and size field set. | 47 | * The contained message will already have the type and size field set. |
48 | * | 48 | * |
49 | * @param mvar variable to store the allocated message in; | 49 | * @param mvar variable to store the allocated message in; |
50 | * must have a header field | 50 | * must have a header field |
51 | * @param type type of the message | 51 | * @param type type of the message |
52 | * @return the MQ message | 52 | * @return the allocated envelope |
53 | */ | 53 | */ |
54 | #define GNUNET_MQ_msg(mvar, type) GNUNET_MQ_msg_extra(mvar, 0, type) | 54 | #define GNUNET_MQ_msg(mvar, type) GNUNET_MQ_msg_extra(mvar, 0, type) |
55 | 55 | ||
56 | 56 | ||
57 | /** | 57 | /** |
58 | * Allocate a GNUNET_MQ_Message, where the message only consists of a header. | 58 | * Allocate a GNUNET_MQ_Envelope, where the message only consists of a header. |
59 | * The allocated message will already have the type and size field set. | 59 | * The allocated message will already have the type and size field set. |
60 | * | 60 | * |
61 | * @param type type of the message | 61 | * @param type type of the message |
@@ -64,7 +64,7 @@ | |||
64 | 64 | ||
65 | 65 | ||
66 | /** | 66 | /** |
67 | * Allocate a GNUNET_MQ_Message, where the message only consists of a header and extra space. | 67 | * Allocate a GNUNET_MQ_Envelope, where the message only consists of a header and extra space. |
68 | * The allocated message will already have the type and size field set. | 68 | * The allocated message will already have the type and size field set. |
69 | * | 69 | * |
70 | * @param mh pointer that will changed to point at to the allocated message header | 70 | * @param mh pointer that will changed to point at to the allocated message header |
@@ -75,14 +75,14 @@ | |||
75 | 75 | ||
76 | 76 | ||
77 | /** | 77 | /** |
78 | * Allocate a GNUNET_MQ_Message, and append a payload message after the given | 78 | * Allocate a GNUNET_MQ_Envelope, and append a payload message after the given |
79 | * message struct. | 79 | * message struct. |
80 | * | 80 | * |
81 | * @param mvar pointer to a message struct, will be changed to point at the newly allocated message, | 81 | * @param mvar pointer to a message struct, will be changed to point at the newly allocated message, |
82 | * whose size is 'sizeof(*mvar) + ntohs (mh->size)' | 82 | * whose size is 'sizeof(*mvar) + ntohs (mh->size)' |
83 | * @param type message type of the allocated message, has no effect on the nested message | 83 | * @param type message type of the allocated message, has no effect on the nested message |
84 | * @param mh message to nest | 84 | * @param mh message to nest |
85 | * @return a newly allocated 'struct GNUNET_MQ_Message *' | 85 | * @return a newly allocated 'struct GNUNET_MQ_Envelope *' |
86 | */ | 86 | */ |
87 | #define GNUNET_MQ_msg_nested_mh(mvar, type, mh) GNUNET_MQ_msg_nested_mh_((((void)(mvar)->header), (struct GNUNET_MessageHeader**) &(mvar)), sizeof (*(mvar)), (type), mh) | 87 | #define GNUNET_MQ_msg_nested_mh(mvar, type, mh) GNUNET_MQ_msg_nested_mh_((((void)(mvar)->header), (struct GNUNET_MessageHeader**) &(mvar)), sizeof (*(mvar)), (type), mh) |
88 | 88 | ||
@@ -98,11 +98,24 @@ | |||
98 | #define GNUNET_MQ_extract_nested_mh(var) GNUNET_MQ_extract_nested_mh_ ((struct GNUNET_MessageHeader *) (var), sizeof (*(var))) | 98 | #define GNUNET_MQ_extract_nested_mh(var) GNUNET_MQ_extract_nested_mh_ ((struct GNUNET_MessageHeader *) (var), sizeof (*(var))) |
99 | 99 | ||
100 | 100 | ||
101 | /** | ||
102 | * Implementation of the GNUNET_MQ_extract_nexted_mh macro. | ||
103 | * | ||
104 | * @param mh message header to extract nested message header from | ||
105 | * @param base_size size of the message before the nested message's header appears | ||
106 | * @return pointer to the nested message, does not copy the message | ||
107 | */ | ||
101 | struct GNUNET_MessageHeader * | 108 | struct GNUNET_MessageHeader * |
102 | GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size); | 109 | GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size); |
103 | 110 | ||
104 | 111 | ||
105 | struct GNUNET_MQ_Message * | 112 | /** |
113 | * Implementation of the GNUNET_MQ_msg_nested_mh macro. | ||
114 | * | ||
115 | * @param mhp pointer to the message header pointer that will be changed to allocate at | ||
116 | * the newly allocated space for the message. | ||
117 | */ | ||
118 | struct GNUNET_MQ_Envelope * | ||
106 | GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type, | 119 | GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type, |
107 | const struct GNUNET_MessageHeader *nested_mh); | 120 | const struct GNUNET_MessageHeader *nested_mh); |
108 | 121 | ||
@@ -114,9 +127,15 @@ GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, | |||
114 | #define GNUNET_MQ_HANDLERS_END {NULL, 0, 0} | 127 | #define GNUNET_MQ_HANDLERS_END {NULL, 0, 0} |
115 | 128 | ||
116 | 129 | ||
117 | struct GNUNET_MQ_MessageQueue; | 130 | /** |
131 | * Opaque handle to a message queue. | ||
132 | */ | ||
133 | struct GNUNET_MQ_Handle; | ||
118 | 134 | ||
119 | struct GNUNET_MQ_Message; | 135 | /** |
136 | * Opaque handle to an envelope. | ||
137 | */ | ||
138 | struct GNUNET_MQ_Envelope; | ||
120 | 139 | ||
121 | enum GNUNET_MQ_Error | 140 | enum GNUNET_MQ_Error |
122 | { | 141 | { |
@@ -133,22 +152,45 @@ enum GNUNET_MQ_Error | |||
133 | * @param msg the received message | 152 | * @param msg the received message |
134 | */ | 153 | */ |
135 | typedef void | 154 | typedef void |
136 | (*GNUNET_MQ_MessageCallback) (void *cls, const struct GNUNET_MessageHeader *msg); | 155 | (*GNUNET_MQ_MessageCallback) (void *cls, |
156 | const struct GNUNET_MessageHeader *msg); | ||
137 | 157 | ||
138 | 158 | ||
139 | /** | 159 | /** |
140 | * Signature of functions implementing the | 160 | * Signature of functions implementing the |
141 | * sending part of a message queue | 161 | * sending functionality of a message queue. |
142 | * | 162 | * |
143 | * @param q the message queue | 163 | * @param mq the message queue |
144 | * @param m the message | 164 | * @param msg the message to send |
165 | * @param impl_state state of the implementation | ||
166 | */ | ||
167 | typedef void | ||
168 | (*GNUNET_MQ_SendImpl) (struct GNUNET_MQ_Handle *mq, | ||
169 | const struct GNUNET_MessageHeader *msg, | ||
170 | void *impl_state); | ||
171 | |||
172 | |||
173 | /** | ||
174 | * Signature of functions implementing the | ||
175 | * destruction of a message queue. | ||
176 | * Implementations must not free 'mq', but should | ||
177 | * take care of 'impl_state'. | ||
178 | * | ||
179 | * @param mq the message queue to destroy | ||
180 | * @param impl_state state of the implementation | ||
145 | */ | 181 | */ |
146 | typedef void | 182 | typedef void |
147 | (*GNUNET_MQ_SendImpl) (struct GNUNET_MQ_MessageQueue *q, struct GNUNET_MQ_Message *m); | 183 | (*GNUNET_MQ_DestroyImpl) (struct GNUNET_MQ_Handle *mq, void *impl_state); |
148 | 184 | ||
149 | 185 | ||
186 | /** | ||
187 | * Implementation function that cancels the currently sent message. | ||
188 | * | ||
189 | * @param mq message queue | ||
190 | * @param impl_state state specific to the implementation | ||
191 | */ | ||
150 | typedef void | 192 | typedef void |
151 | (*GNUNET_MQ_DestroyImpl) (struct GNUNET_MQ_MessageQueue *q); | 193 | (*GNUNET_MQ_CancelImpl) (struct GNUNET_MQ_Handle *mq, void *impl_state); |
152 | 194 | ||
153 | 195 | ||
154 | /** | 196 | /** |
@@ -160,117 +202,23 @@ typedef void | |||
160 | (*GNUNET_MQ_NotifyCallback) (void *cls); | 202 | (*GNUNET_MQ_NotifyCallback) (void *cls); |
161 | 203 | ||
162 | 204 | ||
163 | typedef void | ||
164 | (*GNUNET_MQ_ErrorHandler) (void *cls, enum GNUNET_MQ_Error error); | ||
165 | |||
166 | |||
167 | struct GNUNET_MQ_Message | ||
168 | { | ||
169 | /** | ||
170 | * Messages are stored in a linked list | ||
171 | */ | ||
172 | struct GNUNET_MQ_Message *next; | ||
173 | |||
174 | /** | ||
175 | * Messages are stored in a linked list | ||
176 | */ | ||
177 | struct GNUNET_MQ_Message *prev; | ||
178 | |||
179 | /** | ||
180 | * Actual allocated message header, | ||
181 | * usually points to the end of the containing GNUNET_MQ_Message | ||
182 | */ | ||
183 | struct GNUNET_MessageHeader *mh; | ||
184 | |||
185 | /** | ||
186 | * Queue the message is queued in, NULL if message is not queued. | ||
187 | */ | ||
188 | struct GNUNET_MQ_MessageQueue *parent_queue; | ||
189 | |||
190 | /** | ||
191 | * Called after the message was sent irrevokably | ||
192 | */ | ||
193 | GNUNET_MQ_NotifyCallback sent_cb; | ||
194 | |||
195 | /** | ||
196 | * Closure for send_cb | ||
197 | */ | ||
198 | void *sent_cls; | ||
199 | }; | ||
200 | |||
201 | |||
202 | /** | 205 | /** |
203 | * Handle to a message queue. | 206 | * Generic error handler, called with the appropriate |
207 | * error code and the same closure specified at the creation of | ||
208 | * the message queue. | ||
209 | * Not every message queue implementation supports an error handler. | ||
210 | * | ||
211 | * @param cls closure, same closure as for the message handlers | ||
212 | * @param error error code | ||
204 | */ | 213 | */ |
205 | struct GNUNET_MQ_MessageQueue | 214 | typedef void |
206 | { | 215 | (*GNUNET_MQ_ErrorHandler) (void *cls, enum GNUNET_MQ_Error error); |
207 | /** | ||
208 | * Handlers array, or NULL if the queue should not receive messages | ||
209 | */ | ||
210 | const struct GNUNET_MQ_Handler *handlers; | ||
211 | |||
212 | /** | ||
213 | * Closure for the handler callbacks, | ||
214 | * as well as for the error handler. | ||
215 | */ | ||
216 | void *handlers_cls; | ||
217 | |||
218 | /** | ||
219 | * Actual implementation of message sending, | ||
220 | * called when a message is added | ||
221 | */ | ||
222 | GNUNET_MQ_SendImpl send_impl; | ||
223 | |||
224 | /** | ||
225 | * Implementation-dependent queue destruction function | ||
226 | */ | ||
227 | GNUNET_MQ_DestroyImpl destroy_impl; | ||
228 | |||
229 | /** | ||
230 | * Implementation-specific state | ||
231 | */ | ||
232 | void *impl_state; | ||
233 | |||
234 | /** | ||
235 | * Callback will be called when an error occurs. | ||
236 | */ | ||
237 | GNUNET_MQ_ErrorHandler error_handler; | ||
238 | |||
239 | /** | ||
240 | * Linked list of messages pending to be sent | ||
241 | */ | ||
242 | struct GNUNET_MQ_Message *msg_head; | ||
243 | |||
244 | /** | ||
245 | * Linked list of messages pending to be sent | ||
246 | */ | ||
247 | struct GNUNET_MQ_Message *msg_tail; | ||
248 | |||
249 | /** | ||
250 | * Message that is currently scheduled to be | ||
251 | * sent. Not the head of the message queue, as the implementation | ||
252 | * needs to know if sending has been already scheduled or not. | ||
253 | */ | ||
254 | struct GNUNET_MQ_Message *current_msg; | ||
255 | |||
256 | /** | ||
257 | * Map of associations, lazily allocated | ||
258 | */ | ||
259 | struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map; | ||
260 | |||
261 | /** | ||
262 | * Next id that should be used for the assoc_map, | ||
263 | * initialized lazily to a random value together with | ||
264 | * assoc_map | ||
265 | */ | ||
266 | uint32_t assoc_id; | ||
267 | }; | ||
268 | 216 | ||
269 | 217 | ||
270 | /** | 218 | /** |
271 | * Message handler for a specific message type. | 219 | * Message handler for a specific message type. |
272 | */ | 220 | */ |
273 | struct GNUNET_MQ_Handler | 221 | struct GNUNET_MQ_MessageHandler |
274 | { | 222 | { |
275 | /** | 223 | /** |
276 | * Callback, called every time a new message of | 224 | * Callback, called every time a new message of |
@@ -296,14 +244,14 @@ struct GNUNET_MQ_Handler | |||
296 | 244 | ||
297 | 245 | ||
298 | /** | 246 | /** |
299 | * Create a new message for MQ. | 247 | * Create a new envelope. |
300 | * | 248 | * |
301 | * @param mhp message header to store the allocated message header in, can be NULL | 249 | * @param mhp message header to store the allocated message header in, can be NULL |
302 | * @param size size of the message to allocate | 250 | * @param size size of the message to allocate |
303 | * @param type type of the message, will be set in the allocated message | 251 | * @param type type of the message, will be set in the allocated message |
304 | * @return the allocated MQ message | 252 | * @return the allocated MQ message |
305 | */ | 253 | */ |
306 | struct GNUNET_MQ_Message * | 254 | struct GNUNET_MQ_Envelope * |
307 | GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type); | 255 | GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type); |
308 | 256 | ||
309 | 257 | ||
@@ -315,7 +263,7 @@ GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) | |||
315 | * @param mqm the message to discard | 263 | * @param mqm the message to discard |
316 | */ | 264 | */ |
317 | void | 265 | void |
318 | GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm); | 266 | GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm); |
319 | 267 | ||
320 | 268 | ||
321 | /** | 269 | /** |
@@ -326,7 +274,7 @@ GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm); | |||
326 | * @param mqm the message to send. | 274 | * @param mqm the message to send. |
327 | */ | 275 | */ |
328 | void | 276 | void |
329 | GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm); | 277 | GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev); |
330 | 278 | ||
331 | 279 | ||
332 | /** | 280 | /** |
@@ -336,7 +284,7 @@ GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm | |||
336 | * @param mqm queued message to cancel | 284 | * @param mqm queued message to cancel |
337 | */ | 285 | */ |
338 | void | 286 | void |
339 | GNUNET_MQ_send_cancel (struct GNUNET_MQ_Message *mqm); | 287 | GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev); |
340 | 288 | ||
341 | 289 | ||
342 | /** | 290 | /** |
@@ -347,9 +295,7 @@ GNUNET_MQ_send_cancel (struct GNUNET_MQ_Message *mqm); | |||
347 | * @param assoc_data to associate | 295 | * @param assoc_data to associate |
348 | */ | 296 | */ |
349 | uint32_t | 297 | uint32_t |
350 | GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq, | 298 | GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq, void *assoc_data); |
351 | struct GNUNET_MQ_Message *mqm, | ||
352 | void *assoc_data); | ||
353 | 299 | ||
354 | /** | 300 | /** |
355 | * Get the data associated with a request id in a queue | 301 | * Get the data associated with a request id in a queue |
@@ -359,7 +305,7 @@ GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq, | |||
359 | * @return the associated data | 305 | * @return the associated data |
360 | */ | 306 | */ |
361 | void * | 307 | void * |
362 | GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id); | 308 | GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id); |
363 | 309 | ||
364 | 310 | ||
365 | /** | 311 | /** |
@@ -370,7 +316,7 @@ GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id); | |||
370 | * @return the associated data | 316 | * @return the associated data |
371 | */ | 317 | */ |
372 | void * | 318 | void * |
373 | GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id); | 319 | GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id); |
374 | 320 | ||
375 | 321 | ||
376 | 322 | ||
@@ -383,9 +329,9 @@ GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id); | |||
383 | * @param cls closure for the handlers | 329 | * @param cls closure for the handlers |
384 | * @return the message queue | 330 | * @return the message queue |
385 | */ | 331 | */ |
386 | struct GNUNET_MQ_MessageQueue * | 332 | struct GNUNET_MQ_Handle * |
387 | GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection, | 333 | GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection, |
388 | const struct GNUNET_MQ_Handler *handlers, | 334 | const struct GNUNET_MQ_MessageHandler *handlers, |
389 | void *cls); | 335 | void *cls); |
390 | 336 | ||
391 | 337 | ||
@@ -395,7 +341,7 @@ GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connecti | |||
395 | * @param client the client | 341 | * @param client the client |
396 | * @return the message queue | 342 | * @return the message queue |
397 | */ | 343 | */ |
398 | struct GNUNET_MQ_MessageQueue * | 344 | struct GNUNET_MQ_Handle * |
399 | GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client); | 345 | GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client); |
400 | 346 | ||
401 | 347 | ||
@@ -404,16 +350,19 @@ GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client); | |||
404 | * | 350 | * |
405 | * @param send function the implements sending messages | 351 | * @param send function the implements sending messages |
406 | * @param destroy function that implements destroying the queue | 352 | * @param destroy function that implements destroying the queue |
353 | * @param destroy function that implements canceling a message | ||
407 | * @param state for the queue, passed to 'send' and 'destroy' | 354 | * @param state for the queue, passed to 'send' and 'destroy' |
408 | * @param handlers array of message handlers | 355 | * @param handlers array of message handlers |
409 | * @param error_handler handler for read and write errors | 356 | * @param error_handler handler for read and write errors |
357 | * @param cls closure for handlers | ||
410 | * @return a new message queue | 358 | * @return a new message queue |
411 | */ | 359 | */ |
412 | struct GNUNET_MQ_MessageQueue * | 360 | struct GNUNET_MQ_Handle * |
413 | GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send, | 361 | GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send, |
414 | GNUNET_MQ_DestroyImpl destroy, | 362 | GNUNET_MQ_DestroyImpl destroy, |
363 | GNUNET_MQ_CancelImpl cancel, | ||
415 | void *impl_state, | 364 | void *impl_state, |
416 | struct GNUNET_MQ_Handler *handlers, | 365 | const struct GNUNET_MQ_MessageHandler *handlers, |
417 | GNUNET_MQ_ErrorHandler error_handler, | 366 | GNUNET_MQ_ErrorHandler error_handler, |
418 | void *cls); | 367 | void *cls); |
419 | 368 | ||
@@ -424,27 +373,30 @@ GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send, | |||
424 | * Takes effect immediately, even for messages that already have been received, but for | 373 | * Takes effect immediately, even for messages that already have been received, but for |
425 | * with the handler has not been called. | 374 | * with the handler has not been called. |
426 | * | 375 | * |
376 | * If the message queue does not support receiving messages, | ||
377 | * this function has no effect. | ||
378 | * | ||
427 | * @param mq message queue | 379 | * @param mq message queue |
428 | * @param new_handlers new handlers | 380 | * @param new_handlers new handlers |
429 | * @param cls new closure for the handlers | 381 | * @param cls new closure for the handlers |
430 | */ | 382 | */ |
431 | void | 383 | void |
432 | GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq, | 384 | GNUNET_MQ_replace_handlers (struct GNUNET_MQ_Handle *mq, |
433 | const struct GNUNET_MQ_Handler *new_handlers, | 385 | const struct GNUNET_MQ_MessageHandler *new_handlers, |
434 | void *cls); | 386 | void *cls); |
435 | 387 | ||
436 | 388 | ||
437 | /** | 389 | /** |
438 | * Call a callback once the message has been sent, that is, the message | 390 | * Call a callback once the envelope has been sent, that is, |
439 | * can not be canceled anymore. | 391 | * sending it can not be canceled anymore. |
440 | * There can be only one notify sent callback per message. | 392 | * There can be only one notify sent callback per envelope. |
441 | * | 393 | * |
442 | * @param mqm message to call the notify callback for | 394 | * @param ev message to call the notify callback for |
443 | * @param cb the notify callback | 395 | * @param cb the notify callback |
444 | * @param cls closure for the callback | 396 | * @param cls closure for the callback |
445 | */ | 397 | */ |
446 | void | 398 | void |
447 | GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm, | 399 | GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *ev, |
448 | GNUNET_MQ_NotifyCallback cb, | 400 | GNUNET_MQ_NotifyCallback cb, |
449 | void *cls); | 401 | void *cls); |
450 | 402 | ||
@@ -455,7 +407,7 @@ GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm, | |||
455 | * @param mq message queue to destroy | 407 | * @param mq message queue to destroy |
456 | */ | 408 | */ |
457 | void | 409 | void |
458 | GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq); | 410 | GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq); |
459 | 411 | ||
460 | 412 | ||
461 | /** | 413 | /** |
@@ -465,7 +417,70 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq); | |||
465 | * @param mh message to dispatch | 417 | * @param mh message to dispatch |
466 | */ | 418 | */ |
467 | void | 419 | void |
468 | GNUNET_MQ_dispatch (struct GNUNET_MQ_MessageQueue *mq, | 420 | GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, |
469 | const struct GNUNET_MessageHeader *mh); | 421 | const struct GNUNET_MessageHeader *mh); |
422 | |||
423 | |||
424 | /** | ||
425 | * Call the right callback for an error condition. | ||
426 | * | ||
427 | * @param mq message queue | ||
428 | */ | ||
429 | void | ||
430 | GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq, | ||
431 | enum GNUNET_MQ_Error error); | ||
432 | |||
433 | |||
434 | /** | ||
435 | * Call the send implementation for the next queued message, | ||
436 | * if any. | ||
437 | * Only useful for implementing message queues, | ||
438 | * results in undefined behavior if not used carefully. | ||
439 | * | ||
440 | * @param mq message queue to send the next message with | ||
441 | */ | ||
442 | void | ||
443 | GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq); | ||
444 | |||
445 | |||
446 | /** | ||
447 | * Get the message that should currently be sent. | ||
448 | * Fails if there is no current message. | ||
449 | * Only useful for implementing message queues, | ||
450 | * results in undefined behavior if not used carefully. | ||
451 | * | ||
452 | * @param mq message queue with the current message | ||
453 | * @return message to send, never NULL | ||
454 | */ | ||
455 | const struct GNUNET_MessageHeader * | ||
456 | GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq); | ||
457 | |||
458 | |||
459 | /** | ||
460 | * Get the implementation state associated with the | ||
461 | * message queue. | ||
462 | * | ||
463 | * While the GNUNET_MQ_Impl* callbacks receive the | ||
464 | * implementation state, continuations that are scheduled | ||
465 | * by the implementation function often only have one closure | ||
466 | * argument, with this function it is possible to get at the | ||
467 | * implementation state when only passing the GNUNET_MQ_Handle | ||
468 | * as closure. | ||
469 | * | ||
470 | * @param mq message queue with the current message | ||
471 | * @return message to send, never NULL | ||
472 | */ | ||
473 | void * | ||
474 | GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq); | ||
475 | |||
476 | /** | ||
477 | * Mark the current message as irrevocably sent, but do not | ||
478 | * proceed with sending the next message. | ||
479 | * Will call the appropriate GNUNET_MQ_NotifyCallback, if any. | ||
480 | * | ||
481 | * @param mq message queue | ||
482 | */ | ||
483 | void | ||
484 | GNUNET_MQ_impl_send_commit (struct GNUNET_MQ_Handle *mq); | ||
470 | 485 | ||
471 | #endif | 486 | #endif |
diff --git a/src/include/gnunet_set_service.h b/src/include/gnunet_set_service.h index 34c9312d1..e08ed5d69 100644 --- a/src/include/gnunet_set_service.h +++ b/src/include/gnunet_set_service.h | |||
@@ -257,9 +257,9 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set); | |||
257 | 257 | ||
258 | 258 | ||
259 | /** | 259 | /** |
260 | * Create a set operation for evaluation with another peer. | 260 | * Prepare a set operation to be evaluated with another peer. |
261 | * The evaluation will not start until the client provides | 261 | * The evaluation will not start until the client provides |
262 | * a local set with GNUNET_SET_conclude. | 262 | * a local set with GNUNET_SET_commit. |
263 | * | 263 | * |
264 | * @param other_peer peer with the other set | 264 | * @param other_peer peer with the other set |
265 | * @param app_id hash for the application using the set | 265 | * @param app_id hash for the application using the set |
@@ -273,14 +273,14 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set); | |||
273 | * @param result_cls closure for result_cb | 273 | * @param result_cls closure for result_cb |
274 | * @return a handle to cancel the operation | 274 | * @return a handle to cancel the operation |
275 | */ | 275 | */ |
276 | struct GNUNET_SET_OperationHandle * // FIXME: rename to _connect? | 276 | struct GNUNET_SET_OperationHandle * |
277 | GNUNET_SET_evaluate (const struct GNUNET_PeerIdentity *other_peer, | 277 | GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer, |
278 | const struct GNUNET_HashCode *app_id, | 278 | const struct GNUNET_HashCode *app_id, |
279 | const struct GNUNET_MessageHeader *context_msg, | 279 | const struct GNUNET_MessageHeader *context_msg, |
280 | uint16_t salt, | 280 | uint16_t salt, |
281 | enum GNUNET_SET_ResultMode result_mode, | 281 | enum GNUNET_SET_ResultMode result_mode, |
282 | GNUNET_SET_ResultIterator result_cb, | 282 | GNUNET_SET_ResultIterator result_cb, |
283 | void *result_cls); | 283 | void *result_cls); |
284 | 284 | ||
285 | 285 | ||
286 | /** | 286 | /** |
@@ -316,7 +316,7 @@ GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh); | |||
316 | * Accept a request we got via GNUNET_SET_listen. Must be called during | 316 | * Accept a request we got via GNUNET_SET_listen. Must be called during |
317 | * GNUNET_SET_listen, as the 'struct GNUNET_SET_Request' becomes invalid | 317 | * GNUNET_SET_listen, as the 'struct GNUNET_SET_Request' becomes invalid |
318 | * afterwards. | 318 | * afterwards. |
319 | * Call GNUNET_SET_conclude to provide the local set to use for the operation, | 319 | * Call GNUNET_SET_commit to provide the local set to use for the operation, |
320 | * and to begin the exchange with the remote peer. | 320 | * and to begin the exchange with the remote peer. |
321 | * | 321 | * |
322 | * @param request request to accept | 322 | * @param request request to accept |
@@ -334,7 +334,7 @@ GNUNET_SET_accept (struct GNUNET_SET_Request *request, | |||
334 | 334 | ||
335 | 335 | ||
336 | /** | 336 | /** |
337 | * Conclude the given set operation using the given set. | 337 | * Commit a set to be used with a set operation. |
338 | * This function is called once we have fully constructed | 338 | * This function is called once we have fully constructed |
339 | * the set that we want to use for the operation. At this | 339 | * the set that we want to use for the operation. At this |
340 | * time, the P2P protocol can then begin to exchange the | 340 | * time, the P2P protocol can then begin to exchange the |
@@ -344,9 +344,9 @@ GNUNET_SET_accept (struct GNUNET_SET_Request *request, | |||
344 | * @param oh handle to the set operation | 344 | * @param oh handle to the set operation |
345 | * @param set the set to use for the operation | 345 | * @param set the set to use for the operation |
346 | */ | 346 | */ |
347 | void // FIXME: rename to _commit | 347 | void |
348 | GNUNET_SET_conclude (struct GNUNET_SET_OperationHandle *oh, | 348 | GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh, |
349 | struct GNUNET_SET_Handle *set); | 349 | struct GNUNET_SET_Handle *set); |
350 | 350 | ||
351 | 351 | ||
352 | /** | 352 | /** |
diff --git a/src/include/gnunet_stream_lib.h b/src/include/gnunet_stream_lib.h index ece60c033..65e247ece 100644 --- a/src/include/gnunet_stream_lib.h +++ b/src/include/gnunet_stream_lib.h | |||
@@ -403,9 +403,9 @@ GNUNET_STREAM_read_cancel (struct GNUNET_STREAM_ReadHandle *rh); | |||
403 | * @param error_handler callback for errors | 403 | * @param error_handler callback for errors |
404 | * @return the message queue for the socket | 404 | * @return the message queue for the socket |
405 | */ | 405 | */ |
406 | struct GNUNET_MQ_MessageQueue * | 406 | struct GNUNET_MQ_Handle * |
407 | GNUNET_STREAM_mq_create (struct GNUNET_STREAM_Socket *socket, | 407 | GNUNET_STREAM_mq_create (struct GNUNET_STREAM_Socket *socket, |
408 | const struct GNUNET_MQ_Handler *msg_handlers, | 408 | const struct GNUNET_MQ_MessageHandler *msg_handlers, |
409 | GNUNET_MQ_ErrorHandler error_handler, | 409 | GNUNET_MQ_ErrorHandler error_handler, |
410 | void *cls); | 410 | void *cls); |
411 | 411 | ||
diff --git a/src/mesh/mesh2_api.c b/src/mesh/mesh2_api.c index 0e4c3b8e6..11ff743ca 100644 --- a/src/mesh/mesh2_api.c +++ b/src/mesh/mesh2_api.c | |||
@@ -320,6 +320,24 @@ struct GNUNET_MESH_Tunnel | |||
320 | }; | 320 | }; |
321 | 321 | ||
322 | 322 | ||
323 | /** | ||
324 | * Implementation state for mesh's message queue. | ||
325 | */ | ||
326 | struct MeshMQState | ||
327 | { | ||
328 | /** | ||
329 | * The current transmit handle, or NULL | ||
330 | * if no transmit is active. | ||
331 | */ | ||
332 | struct GNUNET_MESH_TransmitHandle *th; | ||
333 | |||
334 | /** | ||
335 | * Tunnel to send the data over. | ||
336 | */ | ||
337 | struct GNUNET_MESH_Tunnel *tunnel; | ||
338 | }; | ||
339 | |||
340 | |||
323 | /******************************************************************************/ | 341 | /******************************************************************************/ |
324 | /*********************** DECLARATIONS *************************/ | 342 | /*********************** DECLARATIONS *************************/ |
325 | /******************************************************************************/ | 343 | /******************************************************************************/ |
@@ -1685,4 +1703,115 @@ GNUNET_MESH_show_tunnel (struct GNUNET_MESH_Handle *h, | |||
1685 | h->tunnel_cls = callback_cls; | 1703 | h->tunnel_cls = callback_cls; |
1686 | 1704 | ||
1687 | return; | 1705 | return; |
1688 | } \ No newline at end of file | 1706 | } |
1707 | |||
1708 | |||
1709 | /** | ||
1710 | * Function called to notify a client about the connection | ||
1711 | * begin ready to queue more data. "buf" will be | ||
1712 | * NULL and "size" zero if the connection was closed for | ||
1713 | * writing in the meantime. | ||
1714 | * | ||
1715 | * @param cls closure | ||
1716 | * @param size number of bytes available in buf | ||
1717 | * @param buf where the callee should write the message | ||
1718 | * @return number of bytes written to buf | ||
1719 | */ | ||
1720 | static size_t | ||
1721 | mesh_mq_ntr (void *cls, size_t size, | ||
1722 | void *buf) | ||
1723 | { | ||
1724 | struct GNUNET_MQ_Handle *mq = cls; | ||
1725 | struct MeshMQState *state = GNUNET_MQ_impl_state (mq); | ||
1726 | const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq); | ||
1727 | uint16_t msize; | ||
1728 | |||
1729 | state->th = NULL; | ||
1730 | if (NULL == buf) | ||
1731 | { | ||
1732 | GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE); | ||
1733 | return 0; | ||
1734 | } | ||
1735 | msize = ntohs (msg->size); | ||
1736 | GNUNET_assert (msize <= size); | ||
1737 | memcpy (buf, msg, msize); | ||
1738 | GNUNET_MQ_impl_send_continue (mq); | ||
1739 | return msize; | ||
1740 | } | ||
1741 | |||
1742 | |||
1743 | /** | ||
1744 | * Signature of functions implementing the | ||
1745 | * sending functionality of a message queue. | ||
1746 | * | ||
1747 | * @param mq the message queue | ||
1748 | * @param msg the message to send | ||
1749 | * @param impl_state state of the implementation | ||
1750 | */ | ||
1751 | static void | ||
1752 | mesh_mq_send_impl (struct GNUNET_MQ_Handle *mq, | ||
1753 | const struct GNUNET_MessageHeader *msg, void *impl_state) | ||
1754 | { | ||
1755 | struct MeshMQState *state = impl_state; | ||
1756 | |||
1757 | GNUNET_assert (NULL == state->th); | ||
1758 | GNUNET_MQ_impl_send_commit (mq); | ||
1759 | state->th = | ||
1760 | GNUNET_MESH_notify_transmit_ready (state->tunnel, | ||
1761 | /* FIXME: add option for corking */ | ||
1762 | GNUNET_NO, | ||
1763 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
1764 | ntohs (msg->size), | ||
1765 | mesh_mq_ntr, mq); | ||
1766 | |||
1767 | } | ||
1768 | |||
1769 | |||
1770 | /** | ||
1771 | * Signature of functions implementing the | ||
1772 | * destruction of a message queue. | ||
1773 | * Implementations must not free 'mq', but should | ||
1774 | * take care of 'impl_state'. | ||
1775 | * | ||
1776 | * @param mq the message queue to destroy | ||
1777 | * @param impl_state state of the implementation | ||
1778 | */ | ||
1779 | static void | ||
1780 | mesh_mq_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state) | ||
1781 | { | ||
1782 | struct MeshMQState *state = impl_state; | ||
1783 | |||
1784 | if (NULL != state->th) | ||
1785 | GNUNET_MESH_notify_transmit_ready_cancel (state->th); | ||
1786 | |||
1787 | GNUNET_free (state); | ||
1788 | } | ||
1789 | |||
1790 | |||
1791 | /** | ||
1792 | * Create a message queue for a mesh tunnel. | ||
1793 | * The message queue can only be used to transmit messages, | ||
1794 | * not to receive them. | ||
1795 | * | ||
1796 | * @param tunnel the tunnel to create the message qeue for | ||
1797 | * @return a message queue to messages over the tunnel | ||
1798 | */ | ||
1799 | struct GNUNET_MQ_Handle * | ||
1800 | GNUNET_MESH_mq_create (struct GNUNET_MESH_Tunnel *tunnel) | ||
1801 | { | ||
1802 | struct GNUNET_MQ_Handle *mq; | ||
1803 | struct MeshMQState *state; | ||
1804 | |||
1805 | state = GNUNET_new (struct MeshMQState); | ||
1806 | state->tunnel = tunnel; | ||
1807 | |||
1808 | mq = GNUNET_MQ_queue_for_callbacks (mesh_mq_send_impl, | ||
1809 | mesh_mq_destroy_impl, | ||
1810 | NULL, /* FIXME: cancel impl. */ | ||
1811 | state, | ||
1812 | NULL, /* no msg handlers */ | ||
1813 | NULL, /* no err handlers */ | ||
1814 | NULL); /* no handler cls */ | ||
1815 | return mq; | ||
1816 | } | ||
1817 | |||
diff --git a/src/set/Makefile.am b/src/set/Makefile.am index 71e71c867..c2449e0ea 100644 --- a/src/set/Makefile.am +++ b/src/set/Makefile.am | |||
@@ -16,7 +16,7 @@ if USE_COVERAGE | |||
16 | endif | 16 | endif |
17 | 17 | ||
18 | bin_PROGRAMS = \ | 18 | bin_PROGRAMS = \ |
19 | gnunet-set | 19 | gnunet-set-profiler gnunet-set-ibf-profiler |
20 | 20 | ||
21 | libexec_PROGRAMS = \ | 21 | libexec_PROGRAMS = \ |
22 | gnunet-service-set | 22 | gnunet-service-set |
@@ -24,17 +24,24 @@ libexec_PROGRAMS = \ | |||
24 | lib_LTLIBRARIES = \ | 24 | lib_LTLIBRARIES = \ |
25 | libgnunetset.la | 25 | libgnunetset.la |
26 | 26 | ||
27 | gnunet_set_SOURCES = \ | 27 | gnunet_set_profiler_SOURCES = \ |
28 | gnunet-set.c | 28 | gnunet-set-profiler.c |
29 | gnunet_set_LDADD = \ | 29 | gnunet_set_profiler_LDADD = \ |
30 | $(top_builddir)/src/util/libgnunetutil.la \ | 30 | $(top_builddir)/src/util/libgnunetutil.la \ |
31 | $(top_builddir)/src/set/libgnunetset.la \ | 31 | $(top_builddir)/src/set/libgnunetset.la \ |
32 | $(top_builddir)/src/stream/libgnunetstream.la \ | 32 | $(top_builddir)/src/testing/libgnunettesting.la \ |
33 | $(top_builddir)/src/testbed/libgnunettestbed.la \ | ||
34 | $(GN_LIBINTL) | 33 | $(GN_LIBINTL) |
35 | gnunet_set_DEPENDENCIES = \ | 34 | gnunet_set_profiler_DEPENDENCIES = \ |
36 | libgnunetset.la | 35 | libgnunetset.la |
37 | 36 | ||
37 | |||
38 | gnunet_set_ibf_profiler_SOURCES = \ | ||
39 | gnunet-set-ibf-profiler.c \ | ||
40 | ibf.c | ||
41 | gnunet_set_ibf_profiler_LDADD = \ | ||
42 | $(top_builddir)/src/util/libgnunetutil.la \ | ||
43 | $(GN_LIBINTL) | ||
44 | |||
38 | gnunet_service_set_SOURCES = \ | 45 | gnunet_service_set_SOURCES = \ |
39 | gnunet-service-set.c \ | 46 | gnunet-service-set.c \ |
40 | gnunet-service-set_union.c \ | 47 | gnunet-service-set_union.c \ |
@@ -43,8 +50,7 @@ gnunet_service_set_SOURCES = \ | |||
43 | gnunet_service_set_LDADD = \ | 50 | gnunet_service_set_LDADD = \ |
44 | $(top_builddir)/src/util/libgnunetutil.la \ | 51 | $(top_builddir)/src/util/libgnunetutil.la \ |
45 | $(top_builddir)/src/core/libgnunetcore.la \ | 52 | $(top_builddir)/src/core/libgnunetcore.la \ |
46 | $(top_builddir)/src/stream/libgnunetstream.la \ | 53 | $(top_builddir)/src/mesh/libgnunetmesh2.la \ |
47 | $(top_builddir)/src/mesh/libgnunetmesh.la \ | ||
48 | $(GN_LIBINTL) | 54 | $(GN_LIBINTL) |
49 | 55 | ||
50 | libgnunetset_la_SOURCES = \ | 56 | libgnunetset_la_SOURCES = \ |
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c index d2f0b48d5..bd934de84 100644 --- a/src/set/gnunet-service-set.c +++ b/src/set/gnunet-service-set.c | |||
@@ -29,13 +29,16 @@ | |||
29 | 29 | ||
30 | /** | 30 | /** |
31 | * Configuration of our local peer. | 31 | * Configuration of our local peer. |
32 | * (Not declared 'static' as also needed in gnunet-service-set_union.c) | ||
32 | */ | 33 | */ |
33 | const struct GNUNET_CONFIGURATION_Handle *configuration; | 34 | const struct GNUNET_CONFIGURATION_Handle *configuration; |
34 | 35 | ||
35 | /** | 36 | /** |
36 | * Socket listening for other peers via stream. | 37 | * Handle to the mesh service, used |
38 | * to listen for and connect to remote peers. | ||
39 | * (Not declared 'static' as also needed in gnunet-service-set_union.c) | ||
37 | */ | 40 | */ |
38 | static struct GNUNET_STREAM_ListenSocket *stream_listen_socket; | 41 | struct GNUNET_MESH_Handle *mesh; |
39 | 42 | ||
40 | /** | 43 | /** |
41 | * Sets are held in a doubly linked list. | 44 | * Sets are held in a doubly linked list. |
@@ -78,14 +81,14 @@ static uint32_t accept_id = 1; | |||
78 | 81 | ||
79 | 82 | ||
80 | /** | 83 | /** |
81 | * Get set that is owned by the client, if any. | 84 | * Get set that is owned by the given client, if any. |
82 | * | 85 | * |
83 | * @param client client to look for | 86 | * @param client client to look for |
84 | * @return set that the client owns, NULL if the client | 87 | * @return set that the client owns, NULL if the client |
85 | * does not own a set | 88 | * does not own a set |
86 | */ | 89 | */ |
87 | static struct Set * | 90 | static struct Set * |
88 | get_set (struct GNUNET_SERVER_Client *client) | 91 | set_get (struct GNUNET_SERVER_Client *client) |
89 | { | 92 | { |
90 | struct Set *set; | 93 | struct Set *set; |
91 | for (set = sets_head; NULL != set; set = set->next) | 94 | for (set = sets_head; NULL != set; set = set->next) |
@@ -137,7 +140,7 @@ get_incoming (uint32_t id) | |||
137 | * @param listener listener to destroy | 140 | * @param listener listener to destroy |
138 | */ | 141 | */ |
139 | static void | 142 | static void |
140 | destroy_listener (struct Listener *listener) | 143 | listener_destroy (struct Listener *listener) |
141 | { | 144 | { |
142 | if (NULL != listener->client_mq) | 145 | if (NULL != listener->client_mq) |
143 | { | 146 | { |
@@ -155,7 +158,7 @@ destroy_listener (struct Listener *listener) | |||
155 | * @param set the set to destroy | 158 | * @param set the set to destroy |
156 | */ | 159 | */ |
157 | static void | 160 | static void |
158 | destroy_set (struct Set *set) | 161 | set_destroy (struct Set *set) |
159 | { | 162 | { |
160 | switch (set->operation) | 163 | switch (set->operation) |
161 | { | 164 | { |
@@ -187,12 +190,12 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) | |||
187 | struct Set *set; | 190 | struct Set *set; |
188 | struct Listener *listener; | 191 | struct Listener *listener; |
189 | 192 | ||
190 | set = get_set (client); | 193 | set = set_get (client); |
191 | if (NULL != set) | 194 | if (NULL != set) |
192 | destroy_set (set); | 195 | set_destroy (set); |
193 | listener = get_listener (client); | 196 | listener = get_listener (client); |
194 | if (NULL != listener) | 197 | if (NULL != listener) |
195 | destroy_listener (listener); | 198 | listener_destroy (listener); |
196 | } | 199 | } |
197 | 200 | ||
198 | 201 | ||
@@ -202,17 +205,14 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) | |||
202 | * @param incoming remote request to destroy | 205 | * @param incoming remote request to destroy |
203 | */ | 206 | */ |
204 | static void | 207 | static void |
205 | destroy_incoming (struct Incoming *incoming) | 208 | incoming_destroy (struct Incoming *incoming) |
206 | { | 209 | { |
207 | if (NULL != incoming->mq) | 210 | if (NULL != incoming->tc) |
208 | { | 211 | { |
209 | GNUNET_MQ_destroy (incoming->mq); | 212 | GNUNET_free (incoming->tc); |
210 | incoming->mq = NULL; | 213 | GNUNET_assert (NULL != incoming->tc->tunnel); |
211 | } | 214 | GNUNET_MESH_tunnel_destroy (incoming->tc->tunnel); |
212 | if (NULL != incoming->socket) | 215 | incoming->tc = NULL; |
213 | { | ||
214 | GNUNET_STREAM_close (incoming->socket); | ||
215 | incoming->socket = NULL; | ||
216 | } | 216 | } |
217 | GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming); | 217 | GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming); |
218 | GNUNET_free (incoming); | 218 | GNUNET_free (incoming); |
@@ -237,6 +237,15 @@ get_listener_by_target (enum GNUNET_SET_OperationType op, | |||
237 | } | 237 | } |
238 | 238 | ||
239 | 239 | ||
240 | |||
241 | static void | ||
242 | tunnel_context_destroy (struct TunnelContext *tc) | ||
243 | { | ||
244 | GNUNET_free (tc); | ||
245 | /* FIXME destroy the rest */ | ||
246 | } | ||
247 | |||
248 | |||
240 | /** | 249 | /** |
241 | * Handle a request for a set operation from | 250 | * Handle a request for a set operation from |
242 | * another peer. | 251 | * another peer. |
@@ -244,16 +253,31 @@ get_listener_by_target (enum GNUNET_SET_OperationType op, | |||
244 | * @param cls the incoming socket | 253 | * @param cls the incoming socket |
245 | * @param mh the message | 254 | * @param mh the message |
246 | */ | 255 | */ |
247 | static void | 256 | static int |
248 | handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh) | 257 | handle_p2p_operation_request (void *cls, |
258 | struct GNUNET_MESH_Tunnel *tunnel, | ||
259 | void **tunnel_ctx, | ||
260 | const struct GNUNET_PeerIdentity *sender, | ||
261 | const struct GNUNET_MessageHeader *mh) | ||
249 | { | 262 | { |
250 | struct Incoming *incoming = cls; | 263 | struct TunnelContext *tc = *tunnel_ctx; |
264 | struct Incoming *incoming; | ||
251 | const struct OperationRequestMessage *msg = (const struct OperationRequestMessage *) mh; | 265 | const struct OperationRequestMessage *msg = (const struct OperationRequestMessage *) mh; |
252 | struct GNUNET_MQ_Message *mqm; | 266 | struct GNUNET_MQ_Envelope *mqm; |
253 | struct GNUNET_SET_RequestMessage *cmsg; | 267 | struct GNUNET_SET_RequestMessage *cmsg; |
254 | struct Listener *listener; | 268 | struct Listener *listener; |
255 | const struct GNUNET_MessageHeader *context_msg; | 269 | const struct GNUNET_MessageHeader *context_msg; |
256 | 270 | ||
271 | if (CONTEXT_INCOMING != tc->type) | ||
272 | { | ||
273 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected operation request\n"); | ||
274 | tunnel_context_destroy (tc); | ||
275 | /* don't kill the whole mesh connection */ | ||
276 | return GNUNET_OK; | ||
277 | } | ||
278 | |||
279 | incoming = tc->data; | ||
280 | |||
257 | context_msg = GNUNET_MQ_extract_nested_mh (msg); | 281 | context_msg = GNUNET_MQ_extract_nested_mh (msg); |
258 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received P2P operation request (op %u, app %s)\n", | 282 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received P2P operation request (op %u, app %s)\n", |
259 | ntohs (msg->operation), GNUNET_h2s (&msg->app_id)); | 283 | ntohs (msg->operation), GNUNET_h2s (&msg->app_id)); |
@@ -263,20 +287,26 @@ handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh) | |||
263 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 287 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
264 | "set operation request from peer failed: " | 288 | "set operation request from peer failed: " |
265 | "no set with matching application ID and operation type\n"); | 289 | "no set with matching application ID and operation type\n"); |
266 | return; | 290 | tunnel_context_destroy (tc); |
291 | /* don't kill the whole mesh connection */ | ||
292 | return GNUNET_OK; | ||
267 | } | 293 | } |
268 | mqm = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST, context_msg); | 294 | mqm = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST, context_msg); |
269 | if (NULL == mqm) | 295 | if (NULL == mqm) |
270 | { | 296 | { |
271 | /* FIXME: disconnect the peer */ | 297 | /* FIXME: disconnect the peer */ |
272 | GNUNET_break_op (0); | 298 | GNUNET_break_op (0); |
273 | return; | 299 | tunnel_context_destroy (tc); |
300 | /* don't kill the whole mesh connection */ | ||
301 | return GNUNET_OK; | ||
274 | } | 302 | } |
275 | incoming->accept_id = accept_id++; | 303 | incoming->accept_id = accept_id++; |
276 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending request with accept id %u\n", incoming->accept_id); | 304 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending request with accept id %u\n", incoming->accept_id); |
277 | cmsg->accept_id = htonl (incoming->accept_id); | 305 | cmsg->accept_id = htonl (incoming->accept_id); |
278 | cmsg->peer_id = incoming->peer; | 306 | cmsg->peer_id = incoming->tc->peer; |
279 | GNUNET_MQ_send (listener->client_mq, mqm); | 307 | GNUNET_MQ_send (listener->client_mq, mqm); |
308 | |||
309 | return GNUNET_OK; | ||
280 | } | 310 | } |
281 | 311 | ||
282 | 312 | ||
@@ -298,7 +328,7 @@ handle_client_create (void *cls, | |||
298 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client created new set (operation %u)\n", | 328 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client created new set (operation %u)\n", |
299 | ntohs (msg->operation)); | 329 | ntohs (msg->operation)); |
300 | 330 | ||
301 | if (NULL != get_set (client)) | 331 | if (NULL != set_get (client)) |
302 | { | 332 | { |
303 | GNUNET_break (0); | 333 | GNUNET_break (0); |
304 | GNUNET_SERVER_client_disconnect (client); | 334 | GNUNET_SERVER_client_disconnect (client); |
@@ -379,7 +409,7 @@ handle_client_remove (void *cls, | |||
379 | { | 409 | { |
380 | struct Set *set; | 410 | struct Set *set; |
381 | 411 | ||
382 | set = get_set (client); | 412 | set = set_get (client); |
383 | if (NULL == set) | 413 | if (NULL == set) |
384 | { | 414 | { |
385 | GNUNET_break (0); | 415 | GNUNET_break (0); |
@@ -428,7 +458,7 @@ handle_client_reject (void *cls, | |||
428 | return; | 458 | return; |
429 | } | 459 | } |
430 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer request rejected by client\n"); | 460 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer request rejected by client\n"); |
431 | destroy_incoming (incoming); | 461 | incoming_destroy (incoming); |
432 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 462 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
433 | } | 463 | } |
434 | 464 | ||
@@ -449,7 +479,7 @@ handle_client_add (void *cls, | |||
449 | { | 479 | { |
450 | struct Set *set; | 480 | struct Set *set; |
451 | 481 | ||
452 | set = get_set (client); | 482 | set = set_get (client); |
453 | if (NULL == set) | 483 | if (NULL == set) |
454 | { | 484 | { |
455 | GNUNET_break (0); | 485 | GNUNET_break (0); |
@@ -486,7 +516,7 @@ handle_client_evaluate (void *cls, | |||
486 | { | 516 | { |
487 | struct Set *set; | 517 | struct Set *set; |
488 | 518 | ||
489 | set = get_set (client); | 519 | set = set_get (client); |
490 | if (NULL == set) | 520 | if (NULL == set) |
491 | { | 521 | { |
492 | GNUNET_break (0); | 522 | GNUNET_break (0); |
@@ -558,8 +588,7 @@ handle_client_accept (void *cls, | |||
558 | return; | 588 | return; |
559 | } | 589 | } |
560 | 590 | ||
561 | 591 | set = set_get (client); | |
562 | set = get_set (client); | ||
563 | 592 | ||
564 | if (NULL == set) | 593 | if (NULL == set) |
565 | { | 594 | { |
@@ -584,51 +613,12 @@ handle_client_accept (void *cls, | |||
584 | 613 | ||
585 | /* note: _GSS_*_accept has to make sure the socket and mq are set to NULL, | 614 | /* note: _GSS_*_accept has to make sure the socket and mq are set to NULL, |
586 | * otherwise they will be destroyed and disconnected */ | 615 | * otherwise they will be destroyed and disconnected */ |
587 | destroy_incoming (incoming); | 616 | incoming_destroy (incoming); |
588 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 617 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
589 | } | 618 | } |
590 | 619 | ||
591 | 620 | ||
592 | /** | 621 | /** |
593 | * Functions of this type are called upon new stream connection from other peers | ||
594 | * or upon binding error which happen when the app_port given in | ||
595 | * GNUNET_STREAM_listen() is already taken. | ||
596 | * | ||
597 | * @param cls the closure from GNUNET_STREAM_listen | ||
598 | * @param socket the socket representing the stream; NULL on binding error | ||
599 | * @param initiator the identity of the peer who wants to establish a stream | ||
600 | * with us; NULL on binding error | ||
601 | * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the | ||
602 | * stream (the socket will be invalid after the call) | ||
603 | */ | ||
604 | static int | ||
605 | stream_listen_cb (void *cls, | ||
606 | struct GNUNET_STREAM_Socket *socket, | ||
607 | const struct GNUNET_PeerIdentity *initiator) | ||
608 | { | ||
609 | struct Incoming *incoming; | ||
610 | static const struct GNUNET_MQ_Handler handlers[] = { | ||
611 | {handle_p2p_operation_request, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST}, | ||
612 | GNUNET_MQ_HANDLERS_END | ||
613 | }; | ||
614 | |||
615 | if (NULL == socket) | ||
616 | { | ||
617 | GNUNET_break (0); | ||
618 | return GNUNET_SYSERR; | ||
619 | } | ||
620 | |||
621 | incoming = GNUNET_new (struct Incoming); | ||
622 | incoming->peer = *initiator; | ||
623 | incoming->socket = socket; | ||
624 | incoming->mq = GNUNET_STREAM_mq_create (incoming->socket, handlers, NULL, incoming); | ||
625 | /* FIXME: timeout for peers that only connect but don't send anything */ | ||
626 | GNUNET_CONTAINER_DLL_insert_tail (incoming_head, incoming_tail, incoming); | ||
627 | return GNUNET_OK; | ||
628 | } | ||
629 | |||
630 | |||
631 | /** | ||
632 | * Called to clean up, after a shutdown has been requested. | 622 | * Called to clean up, after a shutdown has been requested. |
633 | * | 623 | * |
634 | * @param cls closure | 624 | * @param cls closure |
@@ -638,31 +628,126 @@ static void | |||
638 | shutdown_task (void *cls, | 628 | shutdown_task (void *cls, |
639 | const struct GNUNET_SCHEDULER_TaskContext *tc) | 629 | const struct GNUNET_SCHEDULER_TaskContext *tc) |
640 | { | 630 | { |
641 | if (NULL != stream_listen_socket) | 631 | if (NULL != mesh) |
642 | { | 632 | { |
643 | GNUNET_STREAM_listen_close (stream_listen_socket); | 633 | GNUNET_MESH_disconnect (mesh); |
644 | stream_listen_socket = NULL; | 634 | mesh = NULL; |
645 | } | 635 | } |
646 | 636 | ||
647 | while (NULL != incoming_head) | 637 | while (NULL != incoming_head) |
648 | { | 638 | { |
649 | destroy_incoming (incoming_head); | 639 | incoming_destroy (incoming_head); |
650 | } | 640 | } |
651 | 641 | ||
652 | while (NULL != listeners_head) | 642 | while (NULL != listeners_head) |
653 | { | 643 | { |
654 | destroy_listener (listeners_head); | 644 | listener_destroy (listeners_head); |
655 | } | 645 | } |
656 | 646 | ||
657 | while (NULL != sets_head) | 647 | while (NULL != sets_head) |
658 | { | 648 | { |
659 | destroy_set (sets_head); | 649 | set_destroy (sets_head); |
660 | } | 650 | } |
661 | 651 | ||
662 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n"); | 652 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n"); |
663 | } | 653 | } |
664 | 654 | ||
665 | 655 | ||
656 | |||
657 | /** | ||
658 | * Signature of the main function of a task. | ||
659 | * | ||
660 | * @param cls closure | ||
661 | * @param tc context information (why was this task triggered now) | ||
662 | */ | ||
663 | static void | ||
664 | incoming_timeout_cb (void *cls, | ||
665 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
666 | { | ||
667 | struct Incoming *incoming = cls; | ||
668 | |||
669 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "remote peer timed out"); | ||
670 | incoming_destroy (incoming); | ||
671 | } | ||
672 | |||
673 | |||
674 | /** | ||
675 | * Method called whenever another peer has added us to a tunnel | ||
676 | * the other peer initiated. | ||
677 | * Only called (once) upon reception of data with a message type which was | ||
678 | * subscribed to in GNUNET_MESH_connect. A call to GNUNET_MESH_tunnel_destroy | ||
679 | * causes te tunnel to be ignored and no further notifications are sent about | ||
680 | * the same tunnel. | ||
681 | * | ||
682 | * @param cls closure | ||
683 | * @param tunnel new handle to the tunnel | ||
684 | * @param initiator peer that started the tunnel | ||
685 | * @param port Port this tunnel is for. | ||
686 | * @return initial tunnel context for the tunnel | ||
687 | * (can be NULL -- that's not an error) | ||
688 | */ | ||
689 | static void * | ||
690 | tunnel_new_cb (void *cls, | ||
691 | struct GNUNET_MESH_Tunnel *tunnel, | ||
692 | const struct GNUNET_PeerIdentity *initiator, | ||
693 | uint32_t port) | ||
694 | { | ||
695 | struct Incoming *incoming; | ||
696 | struct TunnelContext *tc; | ||
697 | |||
698 | GNUNET_assert (port == GNUNET_APPLICATION_TYPE_SET); | ||
699 | tc = GNUNET_new (struct TunnelContext); | ||
700 | incoming = GNUNET_new (struct Incoming); | ||
701 | incoming->tc = tc; | ||
702 | tc->peer = *initiator; | ||
703 | tc->tunnel = tunnel; | ||
704 | tc->mq = GNUNET_MESH_mq_create (tunnel); | ||
705 | tc->data = incoming; | ||
706 | tc->type = CONTEXT_INCOMING; | ||
707 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, incoming_timeout_cb, incoming); | ||
708 | GNUNET_CONTAINER_DLL_insert_tail (incoming_head, incoming_tail, incoming); | ||
709 | |||
710 | return tc; | ||
711 | } | ||
712 | |||
713 | |||
714 | /** | ||
715 | * Function called whenever a tunnel is destroyed. Should clean up | ||
716 | * any associated state. This function is NOT called if the client has | ||
717 | * explicitly asked for the tunnel to be destroyed using | ||
718 | * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on | ||
719 | * the tunnel. | ||
720 | * | ||
721 | * @param cls closure (set from GNUNET_MESH_connect) | ||
722 | * @param tunnel connection to the other end (henceforth invalid) | ||
723 | * @param tunnel_ctx place where local state associated | ||
724 | * with the tunnel is stored | ||
725 | */ | ||
726 | static void | ||
727 | tunnel_end_cb (void *cls, | ||
728 | const struct GNUNET_MESH_Tunnel *tunnel, void *tunnel_ctx) | ||
729 | { | ||
730 | struct TunnelContext *ctx = tunnel_ctx; | ||
731 | |||
732 | switch (ctx->type) | ||
733 | { | ||
734 | case CONTEXT_INCOMING: | ||
735 | incoming_destroy ((struct Incoming *) ctx->data); | ||
736 | break; | ||
737 | case CONTEXT_OPERATION_UNION: | ||
738 | _GSS_union_operation_destroy ((struct UnionEvaluateOperation *) ctx->data); | ||
739 | break; | ||
740 | case CONTEXT_OPERATION_INTERSECTION: | ||
741 | GNUNET_assert (0); | ||
742 | /* FIXME: cfuchs */ | ||
743 | break; | ||
744 | default: | ||
745 | GNUNET_assert (0); | ||
746 | } | ||
747 | |||
748 | } | ||
749 | |||
750 | |||
666 | /** | 751 | /** |
667 | * Function called by the service's run | 752 | * Function called by the service's run |
668 | * method to run service-specific setup code. | 753 | * method to run service-specific setup code. |
@@ -686,16 +771,40 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, | |||
686 | {handle_client_remove, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE, 0}, | 771 | {handle_client_remove, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE, 0}, |
687 | {NULL, NULL, 0, 0} | 772 | {NULL, NULL, 0, 0} |
688 | }; | 773 | }; |
774 | static const struct GNUNET_MESH_MessageHandler mesh_handlers[] = { | ||
775 | {handle_p2p_operation_request, | ||
776 | GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0}, | ||
777 | /* messages for the union operation */ | ||
778 | {_GSS_union_handle_p2p_message, | ||
779 | GNUNET_MESSAGE_TYPE_SET_P2P_IBF, 0}, | ||
780 | {_GSS_union_handle_p2p_message, | ||
781 | GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0}, | ||
782 | {_GSS_union_handle_p2p_message, | ||
783 | GNUNET_MESSAGE_TYPE_SET_P2P_DONE, 0}, | ||
784 | {_GSS_union_handle_p2p_message, | ||
785 | GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0}, | ||
786 | {_GSS_union_handle_p2p_message, | ||
787 | GNUNET_MESSAGE_TYPE_SET_P2P_SE, 0}, | ||
788 | /* FIXME: messages for intersection operation */ | ||
789 | {NULL, 0, 0} | ||
790 | }; | ||
791 | static const uint32_t mesh_ports[] = {GNUNET_APPLICATION_TYPE_SET, 0}; | ||
689 | 792 | ||
690 | configuration = cfg; | 793 | configuration = cfg; |
691 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); | 794 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, |
795 | &shutdown_task, NULL); | ||
692 | GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL); | 796 | GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL); |
693 | GNUNET_SERVER_add_handlers (server, server_handlers); | 797 | GNUNET_SERVER_add_handlers (server, server_handlers); |
694 | stream_listen_socket = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_SET, | ||
695 | &stream_listen_cb, NULL, | ||
696 | GNUNET_STREAM_OPTION_END); | ||
697 | 798 | ||
698 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set service running\n"); | 799 | mesh = GNUNET_MESH_connect (cfg, NULL, tunnel_new_cb, tunnel_end_cb, |
800 | mesh_handlers, mesh_ports); | ||
801 | if (NULL == mesh) | ||
802 | { | ||
803 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not connect to mesh\n"); | ||
804 | return; | ||
805 | } | ||
806 | |||
807 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "service started\n"); | ||
699 | } | 808 | } |
700 | 809 | ||
701 | 810 | ||
@@ -710,7 +819,8 @@ int | |||
710 | main (int argc, char *const *argv) | 819 | main (int argc, char *const *argv) |
711 | { | 820 | { |
712 | int ret; | 821 | int ret; |
713 | ret = GNUNET_SERVICE_run (argc, argv, "set", GNUNET_SERVICE_OPTION_NONE, &run, NULL); | 822 | ret = GNUNET_SERVICE_run (argc, argv, "set", |
823 | GNUNET_SERVICE_OPTION_NONE, &run, NULL); | ||
714 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n"); | 824 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n"); |
715 | return (GNUNET_OK == ret) ? 0 : 1; | 825 | return (GNUNET_OK == ret) ? 0 : 1; |
716 | } | 826 | } |
diff --git a/src/set/gnunet-service-set.h b/src/set/gnunet-service-set.h index 15199eba4..66bff4ff1 100644 --- a/src/set/gnunet-service-set.h +++ b/src/set/gnunet-service-set.h | |||
@@ -33,7 +33,7 @@ | |||
33 | #include "gnunet_applications.h" | 33 | #include "gnunet_applications.h" |
34 | #include "gnunet_util_lib.h" | 34 | #include "gnunet_util_lib.h" |
35 | #include "gnunet_core_service.h" | 35 | #include "gnunet_core_service.h" |
36 | #include "gnunet_stream_lib.h" | 36 | #include "gnunet_mesh2_service.h" |
37 | #include "gnunet_set_service.h" | 37 | #include "gnunet_set_service.h" |
38 | #include "set.h" | 38 | #include "set.h" |
39 | 39 | ||
@@ -47,6 +47,8 @@ struct IntersectionState; | |||
47 | */ | 47 | */ |
48 | struct UnionState; | 48 | struct UnionState; |
49 | 49 | ||
50 | struct UnionEvaluateOperation; | ||
51 | |||
50 | 52 | ||
51 | /** | 53 | /** |
52 | * A set that supports a specific operation | 54 | * A set that supports a specific operation |
@@ -63,7 +65,7 @@ struct Set | |||
63 | /** | 65 | /** |
64 | * Message queue for the client | 66 | * Message queue for the client |
65 | */ | 67 | */ |
66 | struct GNUNET_MQ_MessageQueue *client_mq; | 68 | struct GNUNET_MQ_Handle *client_mq; |
67 | 69 | ||
68 | /** | 70 | /** |
69 | * Type of operation supported for this set | 71 | * Type of operation supported for this set |
@@ -116,7 +118,7 @@ struct Listener | |||
116 | /** | 118 | /** |
117 | * Message queue for the client | 119 | * Message queue for the client |
118 | */ | 120 | */ |
119 | struct GNUNET_MQ_MessageQueue *client_mq; | 121 | struct GNUNET_MQ_Handle *client_mq; |
120 | 122 | ||
121 | /** | 123 | /** |
122 | * Type of operation supported for this set | 124 | * Type of operation supported for this set |
@@ -148,19 +150,17 @@ struct Incoming | |||
148 | struct Incoming *prev; | 150 | struct Incoming *prev; |
149 | 151 | ||
150 | /** | 152 | /** |
151 | * Identity of the peer that connected to us | 153 | * Tunnel context, stores information about |
154 | * the tunnel and its peer. | ||
152 | */ | 155 | */ |
153 | struct GNUNET_PeerIdentity peer; | 156 | struct TunnelContext *tc; |
154 | 157 | ||
155 | /** | 158 | /** |
156 | * Socket connected to the peer | 159 | * GNUNET_YES if the incoming peer has sent |
160 | * an operation request (and we are waiting | ||
161 | * for the client to ack/nack), GNUNET_NO otherwise. | ||
157 | */ | 162 | */ |
158 | struct GNUNET_STREAM_Socket *socket; | 163 | int received_request; |
159 | |||
160 | /** | ||
161 | * Message queue for the peer | ||
162 | */ | ||
163 | struct GNUNET_MQ_MessageQueue *mq; | ||
164 | 164 | ||
165 | /** | 165 | /** |
166 | * App code, set once the peer has | 166 | * App code, set once the peer has |
@@ -187,18 +187,37 @@ struct Incoming | |||
187 | 187 | ||
188 | /** | 188 | /** |
189 | * Unique request id for the request from | 189 | * Unique request id for the request from |
190 | * a remote peer, sent to the client with will | 190 | * a remote peer, sent to the client, which will |
191 | * accept or reject the request. | 191 | * accept or reject the request. |
192 | */ | 192 | */ |
193 | uint32_t accept_id; | 193 | uint32_t accept_id; |
194 | }; | 194 | }; |
195 | 195 | ||
196 | 196 | ||
197 | enum TunnelContextType { | ||
198 | CONTEXT_INCOMING, | ||
199 | CONTEXT_OPERATION_UNION, | ||
200 | CONTEXT_OPERATION_INTERSECTION, | ||
201 | }; | ||
202 | |||
203 | struct TunnelContext | ||
204 | { | ||
205 | struct GNUNET_MESH_Tunnel *tunnel; | ||
206 | struct GNUNET_PeerIdentity peer; | ||
207 | struct GNUNET_MQ_Handle *mq; | ||
208 | enum TunnelContextType type; | ||
209 | void *data; | ||
210 | }; | ||
211 | |||
212 | |||
213 | |||
197 | /** | 214 | /** |
198 | * Configuration of the local peer | 215 | * Configuration of the local peer |
199 | */ | 216 | */ |
200 | extern const struct GNUNET_CONFIGURATION_Handle *configuration; | 217 | extern const struct GNUNET_CONFIGURATION_Handle *configuration; |
201 | 218 | ||
219 | extern struct GNUNET_MESH_Handle *mesh; | ||
220 | |||
202 | 221 | ||
203 | /** | 222 | /** |
204 | * Create a new set supporting the union operation | 223 | * Create a new set supporting the union operation |
@@ -262,4 +281,32 @@ _GSS_union_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set, | |||
262 | struct Incoming *incoming); | 281 | struct Incoming *incoming); |
263 | 282 | ||
264 | 283 | ||
284 | /** | ||
285 | * Destroy a union operation, and free all resources | ||
286 | * associated with it. | ||
287 | * | ||
288 | * @param eo the union operation to destroy | ||
289 | */ | ||
290 | void | ||
291 | _GSS_union_operation_destroy (struct UnionEvaluateOperation *eo); | ||
292 | |||
293 | |||
294 | /** | ||
295 | * Dispatch messages for a union operation. | ||
296 | * | ||
297 | * @param cls closure | ||
298 | * @param tunnel mesh tunnel | ||
299 | * @param tunnel_ctx tunnel context | ||
300 | * @param sender ??? | ||
301 | * @param mh message to process | ||
302 | * @return ??? | ||
303 | */ | ||
304 | int | ||
305 | _GSS_union_handle_p2p_message (void *cls, | ||
306 | struct GNUNET_MESH_Tunnel *tunnel, | ||
307 | void **tunnel_ctx, | ||
308 | const struct GNUNET_PeerIdentity *sender, | ||
309 | const struct GNUNET_MessageHeader *mh); | ||
310 | |||
311 | |||
265 | #endif | 312 | #endif |
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c index 6d9658ee5..2b7a0ccba 100644 --- a/src/set/gnunet-service-set_union.c +++ b/src/set/gnunet-service-set_union.c | |||
@@ -124,15 +124,10 @@ struct UnionEvaluateOperation | |||
124 | struct GNUNET_MessageHeader *context_msg; | 124 | struct GNUNET_MessageHeader *context_msg; |
125 | 125 | ||
126 | /** | 126 | /** |
127 | * Stream socket connected to the other peer | 127 | * Tunnel context for the peer we |
128 | * evaluate the union operation with. | ||
128 | */ | 129 | */ |
129 | struct GNUNET_STREAM_Socket *socket; | 130 | struct TunnelContext *tc; |
130 | |||
131 | /** | ||
132 | * Message queue for the peer on the other | ||
133 | * end | ||
134 | */ | ||
135 | struct GNUNET_MQ_MessageQueue *mq; | ||
136 | 131 | ||
137 | /** | 132 | /** |
138 | * Request ID to multiplex set operations to | 133 | * Request ID to multiplex set operations to |
@@ -397,22 +392,19 @@ destroy_key_to_element_iter (void *cls, | |||
397 | * | 392 | * |
398 | * @param eo the union operation to destroy | 393 | * @param eo the union operation to destroy |
399 | */ | 394 | */ |
400 | static void | 395 | void |
401 | destroy_union_operation (struct UnionEvaluateOperation *eo) | 396 | _GSS_union_operation_destroy (struct UnionEvaluateOperation *eo) |
402 | { | 397 | { |
403 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n"); | 398 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n"); |
404 | 399 | ||
405 | if (NULL != eo->mq) | 400 | if (NULL != eo->tc) |
406 | { | 401 | { |
407 | GNUNET_MQ_destroy (eo->mq); | 402 | GNUNET_MQ_destroy (eo->tc->mq); |
408 | eo->mq = NULL; | 403 | GNUNET_MESH_tunnel_destroy (eo->tc->tunnel); |
404 | GNUNET_free (eo->tc); | ||
405 | eo->tc = NULL; | ||
409 | } | 406 | } |
410 | 407 | ||
411 | if (NULL != eo->socket) | ||
412 | { | ||
413 | GNUNET_STREAM_close (eo->socket); | ||
414 | eo->socket = NULL; | ||
415 | } | ||
416 | if (NULL != eo->remote_ibf) | 408 | if (NULL != eo->remote_ibf) |
417 | { | 409 | { |
418 | ibf_destroy (eo->remote_ibf); | 410 | ibf_destroy (eo->remote_ibf); |
@@ -457,14 +449,14 @@ destroy_union_operation (struct UnionEvaluateOperation *eo) | |||
457 | static void | 449 | static void |
458 | fail_union_operation (struct UnionEvaluateOperation *eo) | 450 | fail_union_operation (struct UnionEvaluateOperation *eo) |
459 | { | 451 | { |
460 | struct GNUNET_MQ_Message *mqm; | 452 | struct GNUNET_MQ_Envelope *mqm; |
461 | struct GNUNET_SET_ResultMessage *msg; | 453 | struct GNUNET_SET_ResultMessage *msg; |
462 | 454 | ||
463 | mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); | 455 | mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); |
464 | msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); | 456 | msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); |
465 | msg->request_id = htonl (eo->request_id); | 457 | msg->request_id = htonl (eo->request_id); |
466 | GNUNET_MQ_send (eo->set->client_mq, mqm); | 458 | GNUNET_MQ_send (eo->set->client_mq, mqm); |
467 | destroy_union_operation (eo); | 459 | _GSS_union_operation_destroy (eo); |
468 | } | 460 | } |
469 | 461 | ||
470 | 462 | ||
@@ -498,7 +490,7 @@ get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt) | |||
498 | static void | 490 | static void |
499 | send_operation_request (struct UnionEvaluateOperation *eo) | 491 | send_operation_request (struct UnionEvaluateOperation *eo) |
500 | { | 492 | { |
501 | struct GNUNET_MQ_Message *mqm; | 493 | struct GNUNET_MQ_Envelope *mqm; |
502 | struct OperationRequestMessage *msg; | 494 | struct OperationRequestMessage *msg; |
503 | 495 | ||
504 | mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, eo->context_msg); | 496 | mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, eo->context_msg); |
@@ -512,7 +504,7 @@ send_operation_request (struct UnionEvaluateOperation *eo) | |||
512 | } | 504 | } |
513 | msg->operation = htons (GNUNET_SET_OPERATION_UNION); | 505 | msg->operation = htons (GNUNET_SET_OPERATION_UNION); |
514 | msg->app_id = eo->app_id; | 506 | msg->app_id = eo->app_id; |
515 | GNUNET_MQ_send (eo->mq, mqm); | 507 | GNUNET_MQ_send (eo->tc->mq, mqm); |
516 | 508 | ||
517 | if (NULL != eo->context_msg) | 509 | if (NULL != eo->context_msg) |
518 | { | 510 | { |
@@ -562,7 +554,7 @@ insert_element_iterator (void *cls, | |||
562 | * Insert an element into the union operation's | 554 | * Insert an element into the union operation's |
563 | * key-to-element mapping | 555 | * key-to-element mapping |
564 | * | 556 | * |
565 | * @param the union operation | 557 | * @param eo the union operation |
566 | * @param ee the element entry | 558 | * @param ee the element entry |
567 | */ | 559 | */ |
568 | static void | 560 | static void |
@@ -685,7 +677,7 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order) | |||
685 | while (buckets_sent < (1 << ibf_order)) | 677 | while (buckets_sent < (1 << ibf_order)) |
686 | { | 678 | { |
687 | unsigned int buckets_in_message; | 679 | unsigned int buckets_in_message; |
688 | struct GNUNET_MQ_Message *mqm; | 680 | struct GNUNET_MQ_Envelope *mqm; |
689 | struct IBFMessage *msg; | 681 | struct IBFMessage *msg; |
690 | 682 | ||
691 | buckets_in_message = (1 << ibf_order) - buckets_sent; | 683 | buckets_in_message = (1 << ibf_order) - buckets_sent; |
@@ -700,7 +692,7 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order) | |||
700 | ibf_write_slice (ibf, buckets_sent, | 692 | ibf_write_slice (ibf, buckets_sent, |
701 | buckets_in_message, &msg[1]); | 693 | buckets_in_message, &msg[1]); |
702 | buckets_sent += buckets_in_message; | 694 | buckets_sent += buckets_in_message; |
703 | GNUNET_MQ_send (eo->mq, mqm); | 695 | GNUNET_MQ_send (eo->tc->mq, mqm); |
704 | } | 696 | } |
705 | 697 | ||
706 | eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS; | 698 | eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS; |
@@ -715,14 +707,14 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order) | |||
715 | static void | 707 | static void |
716 | send_strata_estimator (struct UnionEvaluateOperation *eo) | 708 | send_strata_estimator (struct UnionEvaluateOperation *eo) |
717 | { | 709 | { |
718 | struct GNUNET_MQ_Message *mqm; | 710 | struct GNUNET_MQ_Envelope *mqm; |
719 | struct GNUNET_MessageHeader *strata_msg; | 711 | struct GNUNET_MessageHeader *strata_msg; |
720 | 712 | ||
721 | mqm = GNUNET_MQ_msg_header_extra (strata_msg, | 713 | mqm = GNUNET_MQ_msg_header_extra (strata_msg, |
722 | SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE, | 714 | SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE, |
723 | GNUNET_MESSAGE_TYPE_SET_P2P_SE); | 715 | GNUNET_MESSAGE_TYPE_SET_P2P_SE); |
724 | strata_estimator_write (eo->set->state.u->se, &strata_msg[1]); | 716 | strata_estimator_write (eo->set->state.u->se, &strata_msg[1]); |
725 | GNUNET_MQ_send (eo->mq, mqm); | 717 | GNUNET_MQ_send (eo->tc->mq, mqm); |
726 | eo->phase = PHASE_EXPECT_IBF; | 718 | eo->phase = PHASE_EXPECT_IBF; |
727 | } | 719 | } |
728 | 720 | ||
@@ -751,7 +743,7 @@ get_order_from_difference (unsigned int diff) | |||
751 | /** | 743 | /** |
752 | * Handle a strata estimator from a remote peer | 744 | * Handle a strata estimator from a remote peer |
753 | * | 745 | * |
754 | * @param the union operation | 746 | * @param cls the union operation |
755 | * @param mh the message | 747 | * @param mh the message |
756 | */ | 748 | */ |
757 | static void | 749 | static void |
@@ -804,7 +796,7 @@ send_element_iterator (void *cls, | |||
804 | while (NULL != ke) | 796 | while (NULL != ke) |
805 | { | 797 | { |
806 | const struct GNUNET_SET_Element *const element = &ke->element->element; | 798 | const struct GNUNET_SET_Element *const element = &ke->element->element; |
807 | struct GNUNET_MQ_Message *mqm; | 799 | struct GNUNET_MQ_Envelope *mqm; |
808 | struct GNUNET_MessageHeader *mh; | 800 | struct GNUNET_MessageHeader *mh; |
809 | 801 | ||
810 | GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val); | 802 | GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val); |
@@ -817,7 +809,7 @@ send_element_iterator (void *cls, | |||
817 | } | 809 | } |
818 | memcpy (&mh[1], element->data, element->size); | 810 | memcpy (&mh[1], element->data, element->size); |
819 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n"); | 811 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n"); |
820 | GNUNET_MQ_send (eo->mq, mqm); | 812 | GNUNET_MQ_send (eo->tc->mq, mqm); |
821 | ke = ke->next_colliding; | 813 | ke = ke->next_colliding; |
822 | } | 814 | } |
823 | return GNUNET_NO; | 815 | return GNUNET_NO; |
@@ -889,11 +881,11 @@ decode_and_send (struct UnionEvaluateOperation *eo) | |||
889 | } | 881 | } |
890 | if (GNUNET_NO == res) | 882 | if (GNUNET_NO == res) |
891 | { | 883 | { |
892 | struct GNUNET_MQ_Message *mqm; | 884 | struct GNUNET_MQ_Envelope *mqm; |
893 | 885 | ||
894 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n"); | 886 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n"); |
895 | mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); | 887 | mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); |
896 | GNUNET_MQ_send (eo->mq, mqm); | 888 | GNUNET_MQ_send (eo->tc->mq, mqm); |
897 | break; | 889 | break; |
898 | } | 890 | } |
899 | if (1 == side) | 891 | if (1 == side) |
@@ -902,7 +894,7 @@ decode_and_send (struct UnionEvaluateOperation *eo) | |||
902 | } | 894 | } |
903 | else | 895 | else |
904 | { | 896 | { |
905 | struct GNUNET_MQ_Message *mqm; | 897 | struct GNUNET_MQ_Envelope *mqm; |
906 | struct GNUNET_MessageHeader *msg; | 898 | struct GNUNET_MessageHeader *msg; |
907 | 899 | ||
908 | /* FIXME: before sending the request, check if we may just have the element */ | 900 | /* FIXME: before sending the request, check if we may just have the element */ |
@@ -910,7 +902,7 @@ decode_and_send (struct UnionEvaluateOperation *eo) | |||
910 | mqm = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key), | 902 | mqm = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key), |
911 | GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS); | 903 | GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS); |
912 | *(struct IBF_Key *) &msg[1] = key; | 904 | *(struct IBF_Key *) &msg[1] = key; |
913 | GNUNET_MQ_send (eo->mq, mqm); | 905 | GNUNET_MQ_send (eo->tc->mq, mqm); |
914 | } | 906 | } |
915 | } | 907 | } |
916 | ibf_destroy (diff_ibf); | 908 | ibf_destroy (diff_ibf); |
@@ -987,7 +979,7 @@ static void | |||
987 | send_client_element (struct UnionEvaluateOperation *eo, | 979 | send_client_element (struct UnionEvaluateOperation *eo, |
988 | struct GNUNET_SET_Element *element) | 980 | struct GNUNET_SET_Element *element) |
989 | { | 981 | { |
990 | struct GNUNET_MQ_Message *mqm; | 982 | struct GNUNET_MQ_Envelope *mqm; |
991 | struct GNUNET_SET_ResultMessage *rm; | 983 | struct GNUNET_SET_ResultMessage *rm; |
992 | 984 | ||
993 | GNUNET_assert (0 != eo->request_id); | 985 | GNUNET_assert (0 != eo->request_id); |
@@ -1006,39 +998,17 @@ send_client_element (struct UnionEvaluateOperation *eo, | |||
1006 | 998 | ||
1007 | 999 | ||
1008 | /** | 1000 | /** |
1009 | * Completion callback for shutdown | ||
1010 | * | ||
1011 | * @param cls the closure from GNUNET_STREAM_shutdown call | ||
1012 | * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR, | ||
1013 | * SHUT_RDWR) | ||
1014 | */ | ||
1015 | /* | ||
1016 | static void | ||
1017 | stream_shutdown_cb (void *cls, | ||
1018 | int operation) | ||
1019 | { | ||
1020 | //struct UnionEvaluateOperation *eo = cls; | ||
1021 | |||
1022 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "stream shutdown\n"); | ||
1023 | |||
1024 | // destroy_union_operation (eo); | ||
1025 | } | ||
1026 | */ | ||
1027 | |||
1028 | |||
1029 | /** | ||
1030 | * Send a result message to the client indicating | 1001 | * Send a result message to the client indicating |
1031 | * that the operation is over. | 1002 | * that the operation is over. |
1032 | * After the result done message has been sent to the client, | 1003 | * After the result done message has been sent to the client, |
1033 | * destroy the evaluate operation. | 1004 | * destroy the evaluate operation. |
1034 | * | 1005 | * |
1035 | * @param eo union operation | 1006 | * @param eo union operation |
1036 | * @param element element to send | ||
1037 | */ | 1007 | */ |
1038 | static void | 1008 | static void |
1039 | send_client_done_and_destroy (struct UnionEvaluateOperation *eo) | 1009 | send_client_done_and_destroy (struct UnionEvaluateOperation *eo) |
1040 | { | 1010 | { |
1041 | struct GNUNET_MQ_Message *mqm; | 1011 | struct GNUNET_MQ_Envelope *mqm; |
1042 | struct GNUNET_SET_ResultMessage *rm; | 1012 | struct GNUNET_SET_ResultMessage *rm; |
1043 | 1013 | ||
1044 | GNUNET_assert (0 != eo->request_id); | 1014 | GNUNET_assert (0 != eo->request_id); |
@@ -1047,7 +1017,6 @@ send_client_done_and_destroy (struct UnionEvaluateOperation *eo) | |||
1047 | rm->result_status = htons (GNUNET_SET_STATUS_DONE); | 1017 | rm->result_status = htons (GNUNET_SET_STATUS_DONE); |
1048 | GNUNET_MQ_send (eo->set->client_mq, mqm); | 1018 | GNUNET_MQ_send (eo->set->client_mq, mqm); |
1049 | 1019 | ||
1050 | // GNUNET_STREAM_shutdown (eo->socket, SHUT_RDWR, stream_shutdown_cb, eo); | ||
1051 | } | 1020 | } |
1052 | 1021 | ||
1053 | 1022 | ||
@@ -1153,13 +1122,13 @@ handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) | |||
1153 | if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) | 1122 | if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) |
1154 | { | 1123 | { |
1155 | /* we got all requests, but still have to send our elements as response */ | 1124 | /* we got all requests, but still have to send our elements as response */ |
1156 | struct GNUNET_MQ_Message *mqm; | 1125 | struct GNUNET_MQ_Envelope *mqm; |
1157 | 1126 | ||
1158 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got DONE, sending final DONE after elements\n"); | 1127 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got DONE, sending final DONE after elements\n"); |
1159 | eo->phase = PHASE_FINISHED; | 1128 | eo->phase = PHASE_FINISHED; |
1160 | mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); | 1129 | mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); |
1161 | GNUNET_MQ_notify_sent (mqm, peer_done_sent_cb, eo); | 1130 | GNUNET_MQ_notify_sent (mqm, peer_done_sent_cb, eo); |
1162 | GNUNET_MQ_send (eo->mq, mqm); | 1131 | GNUNET_MQ_send (eo->tc->mq, mqm); |
1163 | return; | 1132 | return; |
1164 | } | 1133 | } |
1165 | if (eo->phase == PHASE_EXPECT_ELEMENTS) | 1134 | if (eo->phase == PHASE_EXPECT_ELEMENTS) |
@@ -1175,48 +1144,11 @@ handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) | |||
1175 | 1144 | ||
1176 | 1145 | ||
1177 | /** | 1146 | /** |
1178 | * The handlers array, used for both evaluate and accept | ||
1179 | */ | ||
1180 | static const struct GNUNET_MQ_Handler union_handlers[] = { | ||
1181 | {handle_p2p_elements, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS}, | ||
1182 | {handle_p2p_strata_estimator, GNUNET_MESSAGE_TYPE_SET_P2P_SE}, | ||
1183 | {handle_p2p_ibf, GNUNET_MESSAGE_TYPE_SET_P2P_IBF}, | ||
1184 | {handle_p2p_element_requests, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS}, | ||
1185 | {handle_p2p_done, GNUNET_MESSAGE_TYPE_SET_P2P_DONE}, | ||
1186 | GNUNET_MQ_HANDLERS_END | ||
1187 | }; | ||
1188 | |||
1189 | |||
1190 | /** | ||
1191 | * Functions of this type will be called when a stream is established | ||
1192 | * | ||
1193 | * @param cls the closure from GNUNET_STREAM_open | ||
1194 | * @param socket socket to use to communicate with the | ||
1195 | * other side (read/write) | ||
1196 | */ | ||
1197 | static void | ||
1198 | stream_open_cb (void *cls, | ||
1199 | struct GNUNET_STREAM_Socket *socket) | ||
1200 | { | ||
1201 | struct UnionEvaluateOperation *eo = cls; | ||
1202 | |||
1203 | GNUNET_assert (NULL == eo->mq); | ||
1204 | GNUNET_assert (socket == eo->socket); | ||
1205 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
1206 | "open cb successful\n"); | ||
1207 | eo->mq = GNUNET_STREAM_mq_create (eo->socket, union_handlers, NULL, eo); | ||
1208 | /* we started the operation, thus we have to send the operation request */ | ||
1209 | send_operation_request (eo); | ||
1210 | eo->phase = PHASE_EXPECT_SE; | ||
1211 | } | ||
1212 | |||
1213 | |||
1214 | /** | ||
1215 | * Evaluate a union operation with | 1147 | * Evaluate a union operation with |
1216 | * a remote peer. | 1148 | * a remote peer. |
1217 | * | 1149 | * |
1218 | * @param m the evaluate request message from the client | 1150 | * @param m the evaluate request message from the client |
1219 | * @parem set the set to evaluate the operation with | 1151 | * @param set the set to evaluate the operation with |
1220 | */ | 1152 | */ |
1221 | void | 1153 | void |
1222 | _GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set) | 1154 | _GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set) |
@@ -1243,14 +1175,20 @@ _GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set) | |||
1243 | "evaluating union operation, (app %s)\n", | 1175 | "evaluating union operation, (app %s)\n", |
1244 | GNUNET_h2s (&eo->app_id)); | 1176 | GNUNET_h2s (&eo->app_id)); |
1245 | 1177 | ||
1246 | eo->socket = | 1178 | eo->tc = GNUNET_new (struct TunnelContext); |
1247 | GNUNET_STREAM_open (configuration, &eo->peer, GNUNET_APPLICATION_TYPE_SET, | 1179 | eo->tc->tunnel = GNUNET_MESH_tunnel_create (mesh, eo->tc, &eo->peer, |
1248 | &stream_open_cb, eo, | 1180 | GNUNET_APPLICATION_TYPE_SET); |
1249 | GNUNET_STREAM_OPTION_END); | 1181 | GNUNET_assert (NULL != eo->tc->tunnel); |
1182 | eo->tc->peer = eo->peer; | ||
1183 | eo->tc->mq = GNUNET_MESH_mq_create (eo->tc->tunnel); | ||
1184 | /* we started the operation, thus we have to send the operation request */ | ||
1185 | eo->phase = PHASE_EXPECT_SE; | ||
1186 | |||
1250 | GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head, | 1187 | GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head, |
1251 | eo->set->state.u->ops_tail, | 1188 | eo->set->state.u->ops_tail, |
1252 | eo); | 1189 | eo); |
1253 | /* the stream open callback will kick off the operation */ | 1190 | |
1191 | send_operation_request (eo); | ||
1254 | } | 1192 | } |
1255 | 1193 | ||
1256 | 1194 | ||
@@ -1270,25 +1208,17 @@ _GSS_union_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set, | |||
1270 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "accepting set union operation\n"); | 1208 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "accepting set union operation\n"); |
1271 | 1209 | ||
1272 | eo = GNUNET_new (struct UnionEvaluateOperation); | 1210 | eo = GNUNET_new (struct UnionEvaluateOperation); |
1211 | eo->tc = incoming->tc; | ||
1273 | eo->generation_created = set->state.u->current_generation++; | 1212 | eo->generation_created = set->state.u->current_generation++; |
1274 | eo->set = set; | 1213 | eo->set = set; |
1275 | eo->peer = incoming->peer; | ||
1276 | eo->salt = ntohs (incoming->salt); | 1214 | eo->salt = ntohs (incoming->salt); |
1277 | GNUNET_assert (0 != ntohl (m->request_id)); | 1215 | GNUNET_assert (0 != ntohl (m->request_id)); |
1278 | eo->request_id = ntohl (m->request_id); | 1216 | eo->request_id = ntohl (m->request_id); |
1279 | eo->se = strata_estimator_dup (set->state.u->se); | 1217 | eo->se = strata_estimator_dup (set->state.u->se); |
1280 | eo->mq = incoming->mq; | ||
1281 | /* transfer ownership of mq and socket from incoming to eo */ | 1218 | /* transfer ownership of mq and socket from incoming to eo */ |
1282 | incoming->mq = NULL; | ||
1283 | eo->socket = incoming->socket; | ||
1284 | incoming->socket = NULL; | ||
1285 | /* the peer's socket is now ours, we'll receive all messages */ | ||
1286 | GNUNET_MQ_replace_handlers (eo->mq, union_handlers, eo); | ||
1287 | |||
1288 | GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head, | 1219 | GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head, |
1289 | eo->set->state.u->ops_tail, | 1220 | eo->set->state.u->ops_tail, |
1290 | eo); | 1221 | eo); |
1291 | |||
1292 | /* kick off the operation */ | 1222 | /* kick off the operation */ |
1293 | send_strata_estimator (eo); | 1223 | send_strata_estimator (eo); |
1294 | } | 1224 | } |
@@ -1384,7 +1314,7 @@ _GSS_union_set_destroy (struct Set *set) | |||
1384 | 1314 | ||
1385 | while (NULL != set->state.u->ops_head) | 1315 | while (NULL != set->state.u->ops_head) |
1386 | { | 1316 | { |
1387 | destroy_union_operation (set->state.u->ops_head); | 1317 | _GSS_union_operation_destroy (set->state.u->ops_head); |
1388 | } | 1318 | } |
1389 | } | 1319 | } |
1390 | 1320 | ||
@@ -1418,3 +1348,57 @@ _GSS_union_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set) | |||
1418 | ee->generation_removed = set->state.u->current_generation; | 1348 | ee->generation_removed = set->state.u->current_generation; |
1419 | } | 1349 | } |
1420 | 1350 | ||
1351 | |||
1352 | /** | ||
1353 | * Dispatch messages for a union operation. | ||
1354 | * | ||
1355 | * @param cls closure | ||
1356 | * @param tunnel mesh tunnel | ||
1357 | * @param tunnel_ctx tunnel context | ||
1358 | * @param sender ??? | ||
1359 | * @param mh message to process | ||
1360 | * @return ??? | ||
1361 | */ | ||
1362 | int | ||
1363 | _GSS_union_handle_p2p_message (void *cls, | ||
1364 | struct GNUNET_MESH_Tunnel *tunnel, | ||
1365 | void **tunnel_ctx, | ||
1366 | const struct GNUNET_PeerIdentity *sender, | ||
1367 | const struct GNUNET_MessageHeader *mh) | ||
1368 | { | ||
1369 | struct TunnelContext *tc = *tunnel_ctx; | ||
1370 | struct UnionEvaluateOperation *eo; | ||
1371 | |||
1372 | if (CONTEXT_OPERATION_UNION != tc->type) | ||
1373 | { | ||
1374 | /* FIXME: kill the tunnel */ | ||
1375 | /* never kill mesh */ | ||
1376 | return GNUNET_OK; | ||
1377 | } | ||
1378 | |||
1379 | eo = tc->data; | ||
1380 | |||
1381 | switch (ntohs (mh->type)) | ||
1382 | { | ||
1383 | case GNUNET_MESSAGE_TYPE_SET_P2P_IBF: | ||
1384 | handle_p2p_ibf (eo, mh); | ||
1385 | break; | ||
1386 | case GNUNET_MESSAGE_TYPE_SET_P2P_SE: | ||
1387 | handle_p2p_strata_estimator (eo, mh); | ||
1388 | break; | ||
1389 | case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS: | ||
1390 | handle_p2p_elements (eo, mh); | ||
1391 | break; | ||
1392 | case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS: | ||
1393 | handle_p2p_element_requests (eo, mh); | ||
1394 | break; | ||
1395 | case GNUNET_MESSAGE_TYPE_SET_P2P_DONE: | ||
1396 | handle_p2p_done (eo, mh); | ||
1397 | break; | ||
1398 | default: | ||
1399 | /* something wrong with mesh's message handlers? */ | ||
1400 | GNUNET_assert (0); | ||
1401 | } | ||
1402 | /* never kill mesh! */ | ||
1403 | return GNUNET_OK; | ||
1404 | } | ||
diff --git a/src/set/gnunet-set-ibf.c b/src/set/gnunet-set-ibf-profiler.c index d431795f1..92feb3db4 100644 --- a/src/set/gnunet-set-ibf.c +++ b/src/set/gnunet-set-ibf-profiler.c | |||
@@ -19,8 +19,8 @@ | |||
19 | */ | 19 | */ |
20 | 20 | ||
21 | /** | 21 | /** |
22 | * @file consensus/gnunet-consensus-ibf.c | 22 | * @file set/gnunet-set-ibf-profiler.c |
23 | * @brief tool for reconciling data with invertible bloom filters | 23 | * @brief tool for profiling the invertible bloom filter implementation |
24 | * @author Florian Dold | 24 | * @author Florian Dold |
25 | */ | 25 | */ |
26 | 26 | ||
@@ -35,7 +35,7 @@ | |||
35 | static unsigned int asize = 10; | 35 | static unsigned int asize = 10; |
36 | static unsigned int bsize = 10; | 36 | static unsigned int bsize = 10; |
37 | static unsigned int csize = 10; | 37 | static unsigned int csize = 10; |
38 | static unsigned int hash_num = 3; | 38 | static unsigned int hash_num = 4; |
39 | static unsigned int ibf_size = 80; | 39 | static unsigned int ibf_size = 80; |
40 | 40 | ||
41 | /* FIXME: add parameter for this */ | 41 | /* FIXME: add parameter for this */ |
@@ -181,12 +181,14 @@ run (void *cls, char *const *args, const char *cfgfile, | |||
181 | 181 | ||
182 | start_time = GNUNET_TIME_absolute_get (); | 182 | start_time = GNUNET_TIME_absolute_get (); |
183 | 183 | ||
184 | for (;;) | 184 | for (i = 0; i <= asize + bsize; i++) |
185 | { | 185 | { |
186 | res = ibf_decode (ibf_a, &side, &ibf_key); | 186 | res = ibf_decode (ibf_a, &side, &ibf_key); |
187 | if (GNUNET_SYSERR == res) | 187 | if (GNUNET_SYSERR == res) |
188 | { | 188 | { |
189 | printf ("decode failed\n"); | 189 | printf ("decode failed, %u/%u elements left\n", |
190 | GNUNET_CONTAINER_multihashmap_size (set_a) + GNUNET_CONTAINER_multihashmap_size (set_b), | ||
191 | asize + bsize); | ||
190 | return; | 192 | return; |
191 | } | 193 | } |
192 | if (GNUNET_NO == res) | 194 | if (GNUNET_NO == res) |
@@ -198,7 +200,9 @@ run (void *cls, char *const *args, const char *cfgfile, | |||
198 | printf ("decoded successfully in: %s\n", GNUNET_STRINGS_relative_time_to_string (delta_time, GNUNET_NO)); | 200 | printf ("decoded successfully in: %s\n", GNUNET_STRINGS_relative_time_to_string (delta_time, GNUNET_NO)); |
199 | } | 201 | } |
200 | else | 202 | else |
201 | printf ("decode missed elements\n"); | 203 | { |
204 | printf ("decode missed elements (should never happen)\n"); | ||
205 | } | ||
202 | return; | 206 | return; |
203 | } | 207 | } |
204 | 208 | ||
@@ -207,6 +211,9 @@ run (void *cls, char *const *args, const char *cfgfile, | |||
207 | if (side == -1) | 211 | if (side == -1) |
208 | iter_hashcodes (ibf_key, remove_iterator, set_b); | 212 | iter_hashcodes (ibf_key, remove_iterator, set_b); |
209 | } | 213 | } |
214 | printf("cyclic IBF, %u/%u elements left\n", | ||
215 | GNUNET_CONTAINER_multihashmap_size (set_a) + GNUNET_CONTAINER_multihashmap_size (set_b), | ||
216 | asize + bsize); | ||
210 | } | 217 | } |
211 | 218 | ||
212 | int | 219 | int |
diff --git a/src/set/gnunet-set-profiler.c b/src/set/gnunet-set-profiler.c new file mode 100644 index 000000000..bbaef7c43 --- /dev/null +++ b/src/set/gnunet-set-profiler.c | |||
@@ -0,0 +1,320 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet | ||
3 | (C) 2013 Christian Grothoff (and other contributing authors) | ||
4 | |||
5 | GNUnet is free software; you can redistribute it and/or modify | ||
6 | it under the terms of the GNU General Public License as published | ||
7 | by the Free Software Foundation; either version 2, or (at your | ||
8 | option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU General Public License | ||
16 | along with GNUnet; see the file COPYING. If not, write to the | ||
17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, | ||
18 | Boston, MA 02111-1307, USA. | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @file set/gnunet-set-profiler.c | ||
23 | * @brief profiling tool for set | ||
24 | * @author Florian Dold | ||
25 | */ | ||
26 | #include "platform.h" | ||
27 | #include "gnunet_common.h" | ||
28 | #include "gnunet_util_lib.h" | ||
29 | #include "gnunet_set_service.h" | ||
30 | #include "gnunet_testbed_service.h" | ||
31 | |||
32 | |||
33 | static int ret; | ||
34 | |||
35 | static unsigned int num_a = 5; | ||
36 | static unsigned int num_b = 5; | ||
37 | static unsigned int num_c = 20; | ||
38 | |||
39 | static unsigned int salt = 42; | ||
40 | |||
41 | static char* op_str = "union"; | ||
42 | |||
43 | const static struct GNUNET_CONFIGURATION_Handle *config; | ||
44 | |||
45 | struct GNUNET_CONTAINER_MultiHashMap *map_a; | ||
46 | struct GNUNET_CONTAINER_MultiHashMap *map_b; | ||
47 | struct GNUNET_CONTAINER_MultiHashMap *map_c; | ||
48 | |||
49 | |||
50 | /** | ||
51 | * Elements that set a received, should match map_c | ||
52 | * in the end. | ||
53 | */ | ||
54 | struct GNUNET_CONTAINER_MultiHashMap *map_a_received; | ||
55 | |||
56 | /** | ||
57 | * Elements that set b received, should match map_c | ||
58 | * in the end. | ||
59 | */ | ||
60 | struct GNUNET_CONTAINER_MultiHashMap *map_b_received; | ||
61 | |||
62 | struct GNUNET_SET_Handle *set_a; | ||
63 | struct GNUNET_SET_Handle *set_b; | ||
64 | |||
65 | struct GNUNET_HashCode app_id; | ||
66 | |||
67 | struct GNUNET_PeerIdentity local_peer; | ||
68 | |||
69 | struct GNUNET_SET_ListenHandle *set_listener; | ||
70 | |||
71 | struct GNUNET_SET_OperationHandle *set_oh1; | ||
72 | struct GNUNET_SET_OperationHandle *set_oh2; | ||
73 | |||
74 | |||
75 | int a_done; | ||
76 | int b_done; | ||
77 | |||
78 | |||
79 | |||
80 | static int | ||
81 | map_remove_iterator (void *cls, | ||
82 | const struct GNUNET_HashCode *key, | ||
83 | void *value) | ||
84 | { | ||
85 | struct GNUNET_CONTAINER_MultiHashMap *m = cls; | ||
86 | int ret; | ||
87 | |||
88 | ret = GNUNET_CONTAINER_multihashmap_remove (m, key, NULL); | ||
89 | GNUNET_assert (GNUNET_OK == ret); | ||
90 | return GNUNET_YES; | ||
91 | |||
92 | } | ||
93 | |||
94 | |||
95 | static void | ||
96 | set_result_cb_1 (void *cls, | ||
97 | const struct GNUNET_SET_Element *element, | ||
98 | enum GNUNET_SET_Status status) | ||
99 | { | ||
100 | GNUNET_assert (GNUNET_NO == a_done); | ||
101 | GNUNET_assert (element->size == sizeof (struct GNUNET_HashCode)); | ||
102 | switch (status) | ||
103 | { | ||
104 | case GNUNET_SET_STATUS_DONE: | ||
105 | case GNUNET_SET_STATUS_HALF_DONE: | ||
106 | a_done = GNUNET_YES; | ||
107 | GNUNET_CONTAINER_multihashmap_iterate (map_c, map_remove_iterator, map_a_received); | ||
108 | GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (map_a_received)); | ||
109 | return; | ||
110 | case GNUNET_SET_STATUS_FAILURE: | ||
111 | GNUNET_assert (0); | ||
112 | return; | ||
113 | case GNUNET_SET_STATUS_OK: | ||
114 | break; | ||
115 | default: | ||
116 | GNUNET_assert (0); | ||
117 | } | ||
118 | GNUNET_CONTAINER_multihashmap_put (map_a_received, | ||
119 | element->data, NULL, | ||
120 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); | ||
121 | } | ||
122 | |||
123 | |||
124 | static void | ||
125 | set_result_cb_2 (void *cls, | ||
126 | const struct GNUNET_SET_Element *element, | ||
127 | enum GNUNET_SET_Status status) | ||
128 | { | ||
129 | GNUNET_assert (GNUNET_NO == b_done); | ||
130 | GNUNET_assert (element->size == sizeof (struct GNUNET_HashCode)); | ||
131 | switch (status) | ||
132 | { | ||
133 | case GNUNET_SET_STATUS_DONE: | ||
134 | case GNUNET_SET_STATUS_HALF_DONE: | ||
135 | b_done = GNUNET_YES; | ||
136 | GNUNET_CONTAINER_multihashmap_iterate (map_c, map_remove_iterator, map_b_received); | ||
137 | GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (map_b_received)); | ||
138 | return; | ||
139 | case GNUNET_SET_STATUS_FAILURE: | ||
140 | GNUNET_assert (0); | ||
141 | return; | ||
142 | case GNUNET_SET_STATUS_OK: | ||
143 | break; | ||
144 | default: | ||
145 | GNUNET_assert (0); | ||
146 | } | ||
147 | GNUNET_CONTAINER_multihashmap_put (map_b_received, | ||
148 | element->data, NULL, | ||
149 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); | ||
150 | } | ||
151 | |||
152 | |||
153 | static void | ||
154 | set_listen_cb (void *cls, | ||
155 | const struct GNUNET_PeerIdentity *other_peer, | ||
156 | const struct GNUNET_MessageHeader *context_msg, | ||
157 | struct GNUNET_SET_Request *request) | ||
158 | { | ||
159 | GNUNET_assert (NULL == set_oh2); | ||
160 | set_oh2 = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, | ||
161 | set_result_cb_2, NULL); | ||
162 | GNUNET_SET_commit (set_oh2, set_b); | ||
163 | } | ||
164 | |||
165 | |||
166 | |||
167 | static int | ||
168 | set_insert_iterator (void *cls, | ||
169 | const struct GNUNET_HashCode *key, | ||
170 | void *value) | ||
171 | { | ||
172 | struct GNUNET_SET_Handle *set = cls; | ||
173 | struct GNUNET_SET_Element *el; | ||
174 | |||
175 | el = GNUNET_malloc (sizeof *el + sizeof *key); | ||
176 | el->type = 0; | ||
177 | memcpy (&el[1], key, sizeof *key); | ||
178 | el->data = &el[1]; | ||
179 | el->size = sizeof *key; | ||
180 | GNUNET_SET_add_element (set, el, NULL, NULL); | ||
181 | GNUNET_free (el); | ||
182 | return GNUNET_YES; | ||
183 | } | ||
184 | |||
185 | |||
186 | /** | ||
187 | * Signature of the 'main' function for a (single-peer) testcase that | ||
188 | * is run using 'GNUNET_TESTING_peer_run'. | ||
189 | * | ||
190 | * @param cls closure | ||
191 | * @param cfg configuration of the peer that was started | ||
192 | * @param peer identity of the peer that was created | ||
193 | */ | ||
194 | static void | ||
195 | test_main (void *cls, | ||
196 | const struct GNUNET_CONFIGURATION_Handle *cfg, | ||
197 | struct GNUNET_TESTING_Peer *peer) | ||
198 | { | ||
199 | unsigned int i; | ||
200 | struct GNUNET_HashCode hash; | ||
201 | |||
202 | config = cfg; | ||
203 | |||
204 | if (GNUNET_OK != GNUNET_CRYPTO_get_host_identity (cfg, &local_peer)) | ||
205 | { | ||
206 | GNUNET_assert (0); | ||
207 | return; | ||
208 | } | ||
209 | |||
210 | map_a = GNUNET_CONTAINER_multihashmap_create (num_a, GNUNET_NO); | ||
211 | map_b = GNUNET_CONTAINER_multihashmap_create (num_b, GNUNET_NO); | ||
212 | map_c = GNUNET_CONTAINER_multihashmap_create (num_c, GNUNET_NO); | ||
213 | |||
214 | for (i = 0; i < num_a; i++) | ||
215 | { | ||
216 | GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash); | ||
217 | if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_a, &hash)) | ||
218 | { | ||
219 | i--; | ||
220 | continue; | ||
221 | } | ||
222 | GNUNET_CONTAINER_multihashmap_put (map_a, &hash, &hash, | ||
223 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); | ||
224 | } | ||
225 | |||
226 | for (i = 0; i < num_b; i++) | ||
227 | { | ||
228 | GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash); | ||
229 | if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_a, &hash)) | ||
230 | { | ||
231 | i--; | ||
232 | continue; | ||
233 | } | ||
234 | if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_b, &hash)) | ||
235 | { | ||
236 | i--; | ||
237 | continue; | ||
238 | } | ||
239 | GNUNET_CONTAINER_multihashmap_put (map_b, &hash, NULL, | ||
240 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); | ||
241 | } | ||
242 | |||
243 | for (i = 0; i < num_c; i++) | ||
244 | { | ||
245 | GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash); | ||
246 | if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_a, &hash)) | ||
247 | { | ||
248 | i--; | ||
249 | continue; | ||
250 | } | ||
251 | if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_b, &hash)) | ||
252 | { | ||
253 | i--; | ||
254 | continue; | ||
255 | } | ||
256 | if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_c, &hash)) | ||
257 | { | ||
258 | i--; | ||
259 | continue; | ||
260 | } | ||
261 | GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash); | ||
262 | GNUNET_CONTAINER_multihashmap_put (map_c, &hash, NULL, | ||
263 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); | ||
264 | } | ||
265 | |||
266 | /* use last hash for app id */ | ||
267 | app_id = hash; | ||
268 | |||
269 | /* FIXME: also implement intersection etc. */ | ||
270 | set_a = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION); | ||
271 | set_b = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION); | ||
272 | |||
273 | GNUNET_CONTAINER_multihashmap_iterate (map_a, set_insert_iterator, set_a); | ||
274 | GNUNET_CONTAINER_multihashmap_iterate (map_b, set_insert_iterator, set_b); | ||
275 | GNUNET_CONTAINER_multihashmap_iterate (map_c, set_insert_iterator, set_a); | ||
276 | GNUNET_CONTAINER_multihashmap_iterate (map_c, set_insert_iterator, set_b); | ||
277 | |||
278 | set_listener = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION, | ||
279 | &app_id, set_listen_cb, NULL); | ||
280 | |||
281 | set_oh1 = GNUNET_SET_prepare (&local_peer, &app_id, NULL, salt, GNUNET_SET_RESULT_ADDED, | ||
282 | set_result_cb_1, NULL); | ||
283 | GNUNET_SET_commit (set_oh1, set_a); | ||
284 | } | ||
285 | |||
286 | static void | ||
287 | run (void *cls, char *const *args, const char *cfgfile, | ||
288 | const struct GNUNET_CONFIGURATION_Handle *cfg) | ||
289 | { | ||
290 | |||
291 | ret = GNUNET_TESTING_peer_run ("test_set_api", | ||
292 | "test_set.conf", | ||
293 | &test_main, NULL); | ||
294 | } | ||
295 | |||
296 | |||
297 | int | ||
298 | main (int argc, char **argv) | ||
299 | { | ||
300 | static const struct GNUNET_GETOPT_CommandLineOption options[] = { | ||
301 | { 'A', "num-first", NULL, | ||
302 | gettext_noop ("number of values"), | ||
303 | GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_a }, | ||
304 | { 'B', "num-second", NULL, | ||
305 | gettext_noop ("number of values"), | ||
306 | GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_b }, | ||
307 | { 'B', "num-common", NULL, | ||
308 | gettext_noop ("number of values"), | ||
309 | GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_c }, | ||
310 | { 'x', "operation", NULL, | ||
311 | gettext_noop ("oeration to execute"), | ||
312 | GNUNET_YES, &GNUNET_GETOPT_set_string, &op_str }, | ||
313 | GNUNET_GETOPT_OPTION_END | ||
314 | }; | ||
315 | GNUNET_PROGRAM_run2 (argc, argv, "gnunet-consensus", | ||
316 | "help", | ||
317 | options, &run, NULL, GNUNET_YES); | ||
318 | return ret; | ||
319 | } | ||
320 | |||
diff --git a/src/set/gnunet-set.c b/src/set/gnunet-set.c deleted file mode 100644 index ae84610fc..000000000 --- a/src/set/gnunet-set.c +++ /dev/null | |||
@@ -1,203 +0,0 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet | ||
3 | (C) 2012 Christian Grothoff (and other contributing authors) | ||
4 | |||
5 | GNUnet is free software; you can redistribute it and/or modify | ||
6 | it under the terms of the GNU General Public License as published | ||
7 | by the Free Software Foundation; either version 2, or (at your | ||
8 | option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU General Public License | ||
16 | along with GNUnet; see the file COPYING. If not, write to the | ||
17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, | ||
18 | Boston, MA 02111-1307, USA. | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @file set/gnunet-set.c | ||
23 | * @brief profiling tool for the set service | ||
24 | * @author Florian Dold | ||
25 | */ | ||
26 | #include "platform.h" | ||
27 | #include "gnunet_common.h" | ||
28 | #include "gnunet_util_lib.h" | ||
29 | #include "gnunet_testbed_service.h" | ||
30 | #include "gnunet_set_service.h" | ||
31 | |||
32 | |||
33 | static struct GNUNET_PeerIdentity local_id; | ||
34 | |||
35 | static struct GNUNET_HashCode app_id; | ||
36 | static struct GNUNET_SET_Handle *set1; | ||
37 | static struct GNUNET_SET_Handle *set2; | ||
38 | static struct GNUNET_SET_ListenHandle *listen_handle; | ||
39 | const static struct GNUNET_CONFIGURATION_Handle *config; | ||
40 | |||
41 | int num_done; | ||
42 | |||
43 | |||
44 | static void | ||
45 | result_cb_set1 (void *cls, const struct GNUNET_SET_Element *element, | ||
46 | enum GNUNET_SET_Status status) | ||
47 | { | ||
48 | switch (status) | ||
49 | { | ||
50 | case GNUNET_SET_STATUS_OK: | ||
51 | printf ("set 1: got element\n"); | ||
52 | break; | ||
53 | case GNUNET_SET_STATUS_FAILURE: | ||
54 | printf ("set 1: failure\n"); | ||
55 | break; | ||
56 | case GNUNET_SET_STATUS_DONE: | ||
57 | printf ("set 1: done\n"); | ||
58 | GNUNET_SET_destroy (set1); | ||
59 | break; | ||
60 | default: | ||
61 | GNUNET_assert (0); | ||
62 | } | ||
63 | } | ||
64 | |||
65 | |||
66 | static void | ||
67 | result_cb_set2 (void *cls, const struct GNUNET_SET_Element *element, | ||
68 | enum GNUNET_SET_Status status) | ||
69 | { | ||
70 | switch (status) | ||
71 | { | ||
72 | case GNUNET_SET_STATUS_OK: | ||
73 | printf ("set 2: got element\n"); | ||
74 | break; | ||
75 | case GNUNET_SET_STATUS_FAILURE: | ||
76 | printf ("set 2: failure\n"); | ||
77 | break; | ||
78 | case GNUNET_SET_STATUS_DONE: | ||
79 | printf ("set 2: done\n"); | ||
80 | GNUNET_SET_destroy (set2); | ||
81 | break; | ||
82 | default: | ||
83 | GNUNET_assert (0); | ||
84 | } | ||
85 | } | ||
86 | |||
87 | |||
88 | static void | ||
89 | listen_cb (void *cls, | ||
90 | const struct GNUNET_PeerIdentity *other_peer, | ||
91 | const struct GNUNET_MessageHeader *context_msg, | ||
92 | struct GNUNET_SET_Request *request) | ||
93 | { | ||
94 | struct GNUNET_SET_OperationHandle *oh; | ||
95 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n"); | ||
96 | GNUNET_SET_listen_cancel (listen_handle); | ||
97 | |||
98 | oh = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL); | ||
99 | GNUNET_SET_conclude (oh, set2); | ||
100 | } | ||
101 | |||
102 | |||
103 | /** | ||
104 | * Start the set operation. | ||
105 | * | ||
106 | * @param cls closure, unused | ||
107 | */ | ||
108 | static void | ||
109 | start (void *cls) | ||
110 | { | ||
111 | struct GNUNET_SET_OperationHandle *oh; | ||
112 | |||
113 | listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION, | ||
114 | &app_id, listen_cb, NULL); | ||
115 | oh = GNUNET_SET_evaluate (&local_id, &app_id, NULL, 42, | ||
116 | GNUNET_SET_RESULT_ADDED, | ||
117 | result_cb_set1, NULL); | ||
118 | GNUNET_SET_conclude (oh, set1); | ||
119 | } | ||
120 | |||
121 | |||
122 | /** | ||
123 | * Initialize the second set, continue | ||
124 | * | ||
125 | * @param cls closure, unused | ||
126 | */ | ||
127 | static void | ||
128 | init_set2 (void *cls) | ||
129 | { | ||
130 | struct GNUNET_SET_Element element; | ||
131 | |||
132 | |||
133 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initializing set 2\n"); | ||
134 | |||
135 | element.data = "hello"; | ||
136 | element.size = strlen(element.data); | ||
137 | GNUNET_SET_add_element (set2, &element, NULL, NULL); | ||
138 | element.data = "quux"; | ||
139 | element.size = strlen(element.data); | ||
140 | GNUNET_SET_add_element (set2, &element, start, NULL); | ||
141 | } | ||
142 | |||
143 | |||
144 | /** | ||
145 | * Initialize the first set, continue. | ||
146 | */ | ||
147 | static void | ||
148 | init_set1 (void) | ||
149 | { | ||
150 | struct GNUNET_SET_Element element; | ||
151 | |||
152 | element.data = "hello"; | ||
153 | element.size = strlen(element.data); | ||
154 | GNUNET_SET_add_element (set1, &element, NULL, NULL); | ||
155 | element.data = "bar"; | ||
156 | element.size = strlen(element.data); | ||
157 | GNUNET_SET_add_element (set1, &element, init_set2, NULL); | ||
158 | |||
159 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized set 1\n"); | ||
160 | } | ||
161 | |||
162 | |||
163 | /** | ||
164 | * Main function that will be run. | ||
165 | * | ||
166 | * @param cls closure | ||
167 | * @param args remaining command-line arguments | ||
168 | * @param cfgfile name of the configuration file used (for saving, can be NULL!) | ||
169 | * @param cfg configuration | ||
170 | */ | ||
171 | static void | ||
172 | run (void *cls, char *const *args, | ||
173 | const char *cfgfile, | ||
174 | const struct GNUNET_CONFIGURATION_Handle *cfg) | ||
175 | { | ||
176 | static const char* app_str = "gnunet-set"; | ||
177 | |||
178 | config = cfg; | ||
179 | |||
180 | GNUNET_CRYPTO_hash (app_str, strlen (app_str), &app_id); | ||
181 | |||
182 | GNUNET_CRYPTO_get_host_identity (cfg, &local_id); | ||
183 | |||
184 | set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); | ||
185 | set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); | ||
186 | |||
187 | init_set1 (); | ||
188 | } | ||
189 | |||
190 | |||
191 | |||
192 | int | ||
193 | main (int argc, char **argv) | ||
194 | { | ||
195 | static const struct GNUNET_GETOPT_CommandLineOption options[] = { | ||
196 | GNUNET_GETOPT_OPTION_END | ||
197 | }; | ||
198 | GNUNET_PROGRAM_run2 (argc, argv, "gnunet-set", | ||
199 | "help", | ||
200 | options, &run, NULL, GNUNET_NO); | ||
201 | return 0; | ||
202 | } | ||
203 | |||
diff --git a/src/set/ibf.c b/src/set/ibf.c index 383ce3daf..e3c5be59a 100644 --- a/src/set/ibf.c +++ b/src/set/ibf.c | |||
@@ -19,7 +19,7 @@ | |||
19 | */ | 19 | */ |
20 | 20 | ||
21 | /** | 21 | /** |
22 | * @file consensus/ibf.c | 22 | * @file set/ibf.c |
23 | * @brief implementation of the invertible bloom filter | 23 | * @brief implementation of the invertible bloom filter |
24 | * @author Florian Dold | 24 | * @author Florian Dold |
25 | */ | 25 | */ |
@@ -27,6 +27,12 @@ | |||
27 | #include "ibf.h" | 27 | #include "ibf.h" |
28 | 28 | ||
29 | /** | 29 | /** |
30 | * Compute the key's hash from the key. | ||
31 | * Redefine to use a different hash function. | ||
32 | */ | ||
33 | #define IBF_KEY_HASH_VAL(k) (GNUNET_CRYPTO_crc32_n (&(k), sizeof (struct IBF_KeyHash))) | ||
34 | |||
35 | /** | ||
30 | * Create a key from a hashcode. | 36 | * Create a key from a hashcode. |
31 | * | 37 | * |
32 | * @param hash the hashcode | 38 | * @param hash the hashcode |
@@ -89,23 +95,21 @@ static inline void | |||
89 | ibf_get_indices (const struct InvertibleBloomFilter *ibf, | 95 | ibf_get_indices (const struct InvertibleBloomFilter *ibf, |
90 | struct IBF_Key key, int *dst) | 96 | struct IBF_Key key, int *dst) |
91 | { | 97 | { |
92 | struct GNUNET_HashCode bucket_indices; | 98 | uint32_t filled; |
93 | unsigned int filled; | 99 | uint32_t i; |
94 | int i; | 100 | uint32_t bucket = key.key_val & 0xFFFFFFFF; |
95 | GNUNET_CRYPTO_hash (&key, sizeof key, &bucket_indices); | 101 | |
96 | filled = 0; | 102 | for (i = 0, filled=0; filled < ibf->hash_num; i++) |
97 | for (i = 0; filled < ibf->hash_num; i++) | ||
98 | { | 103 | { |
99 | unsigned int bucket; | ||
100 | unsigned int j; | 104 | unsigned int j; |
101 | if ( (0 != i) && (0 == (i % 16)) ) | 105 | uint64_t x; |
102 | GNUNET_CRYPTO_hash (&bucket_indices, sizeof (struct GNUNET_HashCode), &bucket_indices); | ||
103 | bucket = bucket_indices.bits[i % 16] % ibf->size; | ||
104 | for (j = 0; j < filled; j++) | 106 | for (j = 0; j < filled; j++) |
105 | if (dst[j] == bucket) | 107 | if (dst[j] == bucket) |
106 | goto try_next; | 108 | goto try_next; |
107 | dst[filled++] = bucket; | 109 | dst[filled++] = bucket % ibf->size; |
108 | try_next: ; | 110 | try_next: ; |
111 | x = ((uint64_t) bucket << 32) | i; | ||
112 | bucket = GNUNET_CRYPTO_crc32_n (&x, sizeof x); | ||
109 | } | 113 | } |
110 | } | 114 | } |
111 | 115 | ||
@@ -116,16 +120,14 @@ ibf_insert_into (struct InvertibleBloomFilter *ibf, | |||
116 | const int *buckets, int side) | 120 | const int *buckets, int side) |
117 | { | 121 | { |
118 | int i; | 122 | int i; |
119 | struct GNUNET_HashCode key_hash_sha; | 123 | |
120 | struct IBF_KeyHash key_hash; | ||
121 | GNUNET_CRYPTO_hash (&key, sizeof key, &key_hash_sha); | ||
122 | key_hash.key_hash_val = key_hash_sha.bits[0]; | ||
123 | for (i = 0; i < ibf->hash_num; i++) | 124 | for (i = 0; i < ibf->hash_num; i++) |
124 | { | 125 | { |
125 | const int bucket = buckets[i]; | 126 | const int bucket = buckets[i]; |
126 | ibf->count[bucket].count_val += side; | 127 | ibf->count[bucket].count_val += side; |
127 | ibf->key_sum[bucket].key_val ^= key.key_val; | 128 | ibf->key_sum[bucket].key_val ^= key.key_val; |
128 | ibf->key_hash_sum[bucket].key_hash_val ^= key_hash.key_hash_val; | 129 | ibf->key_hash_sum[bucket].key_hash_val |
130 | ^= IBF_KEY_HASH_VAL (key); | ||
129 | } | 131 | } |
130 | } | 132 | } |
131 | 133 | ||
@@ -183,7 +185,6 @@ ibf_decode (struct InvertibleBloomFilter *ibf, | |||
183 | { | 185 | { |
184 | struct IBF_KeyHash hash; | 186 | struct IBF_KeyHash hash; |
185 | int i; | 187 | int i; |
186 | struct GNUNET_HashCode key_hash_sha; | ||
187 | int buckets[ibf->hash_num]; | 188 | int buckets[ibf->hash_num]; |
188 | 189 | ||
189 | GNUNET_assert (NULL != ibf); | 190 | GNUNET_assert (NULL != ibf); |
@@ -197,8 +198,7 @@ ibf_decode (struct InvertibleBloomFilter *ibf, | |||
197 | if ((1 != ibf->count[i].count_val) && (-1 != ibf->count[i].count_val)) | 198 | if ((1 != ibf->count[i].count_val) && (-1 != ibf->count[i].count_val)) |
198 | continue; | 199 | continue; |
199 | 200 | ||
200 | GNUNET_CRYPTO_hash (&ibf->key_sum[i], sizeof (struct IBF_Key), &key_hash_sha); | 201 | hash.key_hash_val = IBF_KEY_HASH_VAL (ibf->key_sum[i]); |
201 | hash.key_hash_val = key_hash_sha.bits[0]; | ||
202 | 202 | ||
203 | /* test if the hash matches the key */ | 203 | /* test if the hash matches the key */ |
204 | if (hash.key_hash_val != ibf->key_hash_sum[i].key_hash_val) | 204 | if (hash.key_hash_val != ibf->key_hash_sum[i].key_hash_val) |
diff --git a/src/set/ibf.h b/src/set/ibf.h index 2bf3ef7c7..90ea231c0 100644 --- a/src/set/ibf.h +++ b/src/set/ibf.h | |||
@@ -19,7 +19,7 @@ | |||
19 | */ | 19 | */ |
20 | 20 | ||
21 | /** | 21 | /** |
22 | * @file consensus/ibf.h | 22 | * @file set/ibf.h |
23 | * @brief invertible bloom filter | 23 | * @brief invertible bloom filter |
24 | * @author Florian Dold | 24 | * @author Florian Dold |
25 | */ | 25 | */ |
diff --git a/src/set/set_api.c b/src/set/set_api.c index c74933aa0..e1b6132cb 100644 --- a/src/set/set_api.c +++ b/src/set/set_api.c | |||
@@ -40,7 +40,7 @@ | |||
40 | struct GNUNET_SET_Handle | 40 | struct GNUNET_SET_Handle |
41 | { | 41 | { |
42 | struct GNUNET_CLIENT_Connection *client; | 42 | struct GNUNET_CLIENT_Connection *client; |
43 | struct GNUNET_MQ_MessageQueue *mq; | 43 | struct GNUNET_MQ_Handle *mq; |
44 | unsigned int messages_since_ack; | 44 | unsigned int messages_since_ack; |
45 | }; | 45 | }; |
46 | 46 | ||
@@ -73,7 +73,7 @@ struct GNUNET_SET_OperationHandle | |||
73 | * Message sent to the server on calling conclude, | 73 | * Message sent to the server on calling conclude, |
74 | * NULL if conclude has been called. | 74 | * NULL if conclude has been called. |
75 | */ | 75 | */ |
76 | struct GNUNET_MQ_Message *conclude_mqm; | 76 | struct GNUNET_MQ_Envelope *conclude_mqm; |
77 | 77 | ||
78 | /** | 78 | /** |
79 | * Address of the request if in the conclude message, | 79 | * Address of the request if in the conclude message, |
@@ -89,7 +89,7 @@ struct GNUNET_SET_OperationHandle | |||
89 | struct GNUNET_SET_ListenHandle | 89 | struct GNUNET_SET_ListenHandle |
90 | { | 90 | { |
91 | struct GNUNET_CLIENT_Connection *client; | 91 | struct GNUNET_CLIENT_Connection *client; |
92 | struct GNUNET_MQ_MessageQueue* mq; | 92 | struct GNUNET_MQ_Handle* mq; |
93 | GNUNET_SET_ListenCallback listen_cb; | 93 | GNUNET_SET_ListenCallback listen_cb; |
94 | void *listen_cls; | 94 | void *listen_cls; |
95 | }; | 95 | }; |
@@ -115,7 +115,7 @@ handle_result (void *cls, const struct GNUNET_MessageHeader *mh) | |||
115 | 115 | ||
116 | if (set->messages_since_ack >= GNUNET_SET_ACK_WINDOW/2) | 116 | if (set->messages_since_ack >= GNUNET_SET_ACK_WINDOW/2) |
117 | { | 117 | { |
118 | struct GNUNET_MQ_Message *mqm; | 118 | struct GNUNET_MQ_Envelope *mqm; |
119 | mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ACK); | 119 | mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ACK); |
120 | GNUNET_MQ_send (set->mq, mqm); | 120 | GNUNET_MQ_send (set->mq, mqm); |
121 | } | 121 | } |
@@ -162,7 +162,7 @@ handle_request (void *cls, const struct GNUNET_MessageHeader *mh) | |||
162 | 162 | ||
163 | if (GNUNET_NO == req->accepted) | 163 | if (GNUNET_NO == req->accepted) |
164 | { | 164 | { |
165 | struct GNUNET_MQ_Message *mqm; | 165 | struct GNUNET_MQ_Envelope *mqm; |
166 | struct GNUNET_SET_AcceptRejectMessage *amsg; | 166 | struct GNUNET_SET_AcceptRejectMessage *amsg; |
167 | 167 | ||
168 | mqm = GNUNET_MQ_msg (amsg, GNUNET_MESSAGE_TYPE_SET_REJECT); | 168 | mqm = GNUNET_MQ_msg (amsg, GNUNET_MESSAGE_TYPE_SET_REJECT); |
@@ -197,9 +197,9 @@ GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
197 | enum GNUNET_SET_OperationType op) | 197 | enum GNUNET_SET_OperationType op) |
198 | { | 198 | { |
199 | struct GNUNET_SET_Handle *set; | 199 | struct GNUNET_SET_Handle *set; |
200 | struct GNUNET_MQ_Message *mqm; | 200 | struct GNUNET_MQ_Envelope *mqm; |
201 | struct GNUNET_SET_CreateMessage *msg; | 201 | struct GNUNET_SET_CreateMessage *msg; |
202 | static const struct GNUNET_MQ_Handler mq_handlers[] = { | 202 | static const struct GNUNET_MQ_MessageHandler mq_handlers[] = { |
203 | {handle_result, GNUNET_MESSAGE_TYPE_SET_RESULT}, | 203 | {handle_result, GNUNET_MESSAGE_TYPE_SET_RESULT}, |
204 | GNUNET_MQ_HANDLERS_END | 204 | GNUNET_MQ_HANDLERS_END |
205 | }; | 205 | }; |
@@ -234,7 +234,7 @@ GNUNET_SET_add_element (struct GNUNET_SET_Handle *set, | |||
234 | GNUNET_SET_Continuation cont, | 234 | GNUNET_SET_Continuation cont, |
235 | void *cont_cls) | 235 | void *cont_cls) |
236 | { | 236 | { |
237 | struct GNUNET_MQ_Message *mqm; | 237 | struct GNUNET_MQ_Envelope *mqm; |
238 | struct GNUNET_SET_ElementMessage *msg; | 238 | struct GNUNET_SET_ElementMessage *msg; |
239 | 239 | ||
240 | mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_ADD); | 240 | mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_ADD); |
@@ -262,7 +262,7 @@ GNUNET_SET_remove_element (struct GNUNET_SET_Handle *set, | |||
262 | GNUNET_SET_Continuation cont, | 262 | GNUNET_SET_Continuation cont, |
263 | void *cont_cls) | 263 | void *cont_cls) |
264 | { | 264 | { |
265 | struct GNUNET_MQ_Message *mqm; | 265 | struct GNUNET_MQ_Envelope *mqm; |
266 | struct GNUNET_SET_ElementMessage *msg; | 266 | struct GNUNET_SET_ElementMessage *msg; |
267 | 267 | ||
268 | mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_REMOVE); | 268 | mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_REMOVE); |
@@ -287,9 +287,9 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set) | |||
287 | 287 | ||
288 | 288 | ||
289 | /** | 289 | /** |
290 | * Create a set operation for evaluation with another peer. | 290 | * Prepare a set operation to be evaluated with another peer. |
291 | * The evaluation will not start until the client provides | 291 | * The evaluation will not start until the client provides |
292 | * a local set with GNUNET_SET_conclude. | 292 | * a local set with GNUNET_SET_commit. |
293 | * | 293 | * |
294 | * @param other_peer peer with the other set | 294 | * @param other_peer peer with the other set |
295 | * @param app_id hash for the application using the set | 295 | * @param app_id hash for the application using the set |
@@ -304,15 +304,15 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set) | |||
304 | * @return a handle to cancel the operation | 304 | * @return a handle to cancel the operation |
305 | */ | 305 | */ |
306 | struct GNUNET_SET_OperationHandle * | 306 | struct GNUNET_SET_OperationHandle * |
307 | GNUNET_SET_evaluate (const struct GNUNET_PeerIdentity *other_peer, | 307 | GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer, |
308 | const struct GNUNET_HashCode *app_id, | 308 | const struct GNUNET_HashCode *app_id, |
309 | const struct GNUNET_MessageHeader *context_msg, | 309 | const struct GNUNET_MessageHeader *context_msg, |
310 | uint16_t salt, | 310 | uint16_t salt, |
311 | enum GNUNET_SET_ResultMode result_mode, | 311 | enum GNUNET_SET_ResultMode result_mode, |
312 | GNUNET_SET_ResultIterator result_cb, | 312 | GNUNET_SET_ResultIterator result_cb, |
313 | void *result_cls) | 313 | void *result_cls) |
314 | { | 314 | { |
315 | struct GNUNET_MQ_Message *mqm; | 315 | struct GNUNET_MQ_Envelope *mqm; |
316 | struct GNUNET_SET_OperationHandle *oh; | 316 | struct GNUNET_SET_OperationHandle *oh; |
317 | struct GNUNET_SET_EvaluateMessage *msg; | 317 | struct GNUNET_SET_EvaluateMessage *msg; |
318 | 318 | ||
@@ -322,9 +322,6 @@ GNUNET_SET_evaluate (const struct GNUNET_PeerIdentity *other_peer, | |||
322 | 322 | ||
323 | mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_EVALUATE, context_msg); | 323 | mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_EVALUATE, context_msg); |
324 | 324 | ||
325 | if (NULL != context_msg) | ||
326 | LOG (GNUNET_ERROR_TYPE_INFO, "passed context msg\n"); | ||
327 | |||
328 | msg->app_id = *app_id; | 325 | msg->app_id = *app_id; |
329 | msg->target_peer = *other_peer; | 326 | msg->target_peer = *other_peer; |
330 | msg->salt = salt; | 327 | msg->salt = salt; |
@@ -356,9 +353,9 @@ GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
356 | void *listen_cls) | 353 | void *listen_cls) |
357 | { | 354 | { |
358 | struct GNUNET_SET_ListenHandle *lh; | 355 | struct GNUNET_SET_ListenHandle *lh; |
359 | struct GNUNET_MQ_Message *mqm; | 356 | struct GNUNET_MQ_Envelope *mqm; |
360 | struct GNUNET_SET_ListenMessage *msg; | 357 | struct GNUNET_SET_ListenMessage *msg; |
361 | static const struct GNUNET_MQ_Handler mq_handlers[] = { | 358 | static const struct GNUNET_MQ_MessageHandler mq_handlers[] = { |
362 | {handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST}, | 359 | {handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST}, |
363 | GNUNET_MQ_HANDLERS_END | 360 | GNUNET_MQ_HANDLERS_END |
364 | }; | 361 | }; |
@@ -403,7 +400,7 @@ GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh) | |||
403 | * @param result_mode specified how results will be returned, | 400 | * @param result_mode specified how results will be returned, |
404 | * see 'GNUNET_SET_ResultMode'. | 401 | * see 'GNUNET_SET_ResultMode'. |
405 | * @param result_cb callback for the results | 402 | * @param result_cb callback for the results |
406 | * @param result_cls closure for result_cb | 403 | * @param cls closure for result_cb |
407 | * @return a handle to cancel the operation | 404 | * @return a handle to cancel the operation |
408 | */ | 405 | */ |
409 | struct GNUNET_SET_OperationHandle * | 406 | struct GNUNET_SET_OperationHandle * |
@@ -412,7 +409,7 @@ GNUNET_SET_accept (struct GNUNET_SET_Request *request, | |||
412 | GNUNET_SET_ResultIterator result_cb, | 409 | GNUNET_SET_ResultIterator result_cb, |
413 | void *cls) | 410 | void *cls) |
414 | { | 411 | { |
415 | struct GNUNET_MQ_Message *mqm; | 412 | struct GNUNET_MQ_Envelope *mqm; |
416 | struct GNUNET_SET_OperationHandle *oh; | 413 | struct GNUNET_SET_OperationHandle *oh; |
417 | struct GNUNET_SET_AcceptRejectMessage *msg; | 414 | struct GNUNET_SET_AcceptRejectMessage *msg; |
418 | 415 | ||
@@ -441,7 +438,7 @@ GNUNET_SET_accept (struct GNUNET_SET_Request *request, | |||
441 | void | 438 | void |
442 | GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh) | 439 | GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh) |
443 | { | 440 | { |
444 | struct GNUNET_MQ_Message *mqm; | 441 | struct GNUNET_MQ_Envelope *mqm; |
445 | struct GNUNET_SET_OperationHandle *h_assoc; | 442 | struct GNUNET_SET_OperationHandle *h_assoc; |
446 | 443 | ||
447 | if (NULL != oh->set) | 444 | if (NULL != oh->set) |
@@ -460,7 +457,7 @@ GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh) | |||
460 | 457 | ||
461 | 458 | ||
462 | /** | 459 | /** |
463 | * Conclude the given set operation using the given set. | 460 | * Commit a set to be used with a set operation. |
464 | * This function is called once we have fully constructed | 461 | * This function is called once we have fully constructed |
465 | * the set that we want to use for the operation. At this | 462 | * the set that we want to use for the operation. At this |
466 | * time, the P2P protocol can then begin to exchange the | 463 | * time, the P2P protocol can then begin to exchange the |
@@ -471,13 +468,13 @@ GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh) | |||
471 | * @param set the set to use for the operation | 468 | * @param set the set to use for the operation |
472 | */ | 469 | */ |
473 | void | 470 | void |
474 | GNUNET_SET_conclude (struct GNUNET_SET_OperationHandle *oh, | 471 | GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh, |
475 | struct GNUNET_SET_Handle *set) | 472 | struct GNUNET_SET_Handle *set) |
476 | { | 473 | { |
477 | GNUNET_assert (NULL == oh->set); | 474 | GNUNET_assert (NULL == oh->set); |
478 | GNUNET_assert (NULL != oh->conclude_mqm); | 475 | GNUNET_assert (NULL != oh->conclude_mqm); |
479 | oh->set = set; | 476 | oh->set = set; |
480 | oh->request_id = GNUNET_MQ_assoc_add (oh->set->mq, NULL, oh); | 477 | oh->request_id = GNUNET_MQ_assoc_add (oh->set->mq, oh); |
481 | *oh->request_id_addr = htonl (oh->request_id); | 478 | *oh->request_id_addr = htonl (oh->request_id); |
482 | GNUNET_MQ_send (oh->set->mq, oh->conclude_mqm); | 479 | GNUNET_MQ_send (oh->set->mq, oh->conclude_mqm); |
483 | oh->conclude_mqm = NULL; | 480 | oh->conclude_mqm = NULL; |
diff --git a/src/set/strata_estimator.c b/src/set/strata_estimator.c index 024bb99c6..18c127cd6 100644 --- a/src/set/strata_estimator.c +++ b/src/set/strata_estimator.c | |||
@@ -19,7 +19,7 @@ | |||
19 | */ | 19 | */ |
20 | 20 | ||
21 | /** | 21 | /** |
22 | * @file consensus/ibf.h | 22 | * @file set/ibf.h |
23 | * @brief invertible bloom filter | 23 | * @brief invertible bloom filter |
24 | * @author Florian Dold | 24 | * @author Florian Dold |
25 | */ | 25 | */ |
diff --git a/src/set/strata_estimator.h b/src/set/strata_estimator.h index b3f050743..718c996d0 100644 --- a/src/set/strata_estimator.h +++ b/src/set/strata_estimator.h | |||
@@ -19,7 +19,7 @@ | |||
19 | */ | 19 | */ |
20 | 20 | ||
21 | /** | 21 | /** |
22 | * @file consensus/strata_estimator.h | 22 | * @file set/strata_estimator.h |
23 | * @brief estimator of set difference | 23 | * @brief estimator of set difference |
24 | * @author Florian Dold | 24 | * @author Florian Dold |
25 | */ | 25 | */ |
diff --git a/src/set/test_set_api.c b/src/set/test_set_api.c index f773cebdf..db82b83b4 100644 --- a/src/set/test_set_api.c +++ b/src/set/test_set_api.c | |||
@@ -95,7 +95,7 @@ listen_cb (void *cls, | |||
95 | GNUNET_SET_listen_cancel (listen_handle); | 95 | GNUNET_SET_listen_cancel (listen_handle); |
96 | 96 | ||
97 | oh = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL); | 97 | oh = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL); |
98 | GNUNET_SET_conclude (oh, set2); | 98 | GNUNET_SET_commit (oh, set2); |
99 | } | 99 | } |
100 | 100 | ||
101 | 101 | ||
@@ -111,10 +111,10 @@ start (void *cls) | |||
111 | 111 | ||
112 | listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION, | 112 | listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION, |
113 | &app_id, listen_cb, NULL); | 113 | &app_id, listen_cb, NULL); |
114 | oh = GNUNET_SET_evaluate (&local_id, &app_id, NULL, 42, | 114 | oh = GNUNET_SET_prepare (&local_id, &app_id, NULL, 42, |
115 | GNUNET_SET_RESULT_ADDED, | 115 | GNUNET_SET_RESULT_ADDED, |
116 | result_cb_set1, NULL); | 116 | result_cb_set1, NULL); |
117 | GNUNET_SET_conclude (oh, set1); | 117 | GNUNET_SET_commit (oh, set1); |
118 | } | 118 | } |
119 | 119 | ||
120 | 120 | ||
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 34f1ea0fa..47ed04117 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c | |||
@@ -3779,11 +3779,11 @@ GNUNET_STREAM_read_cancel (struct GNUNET_STREAM_ReadHandle *rh) | |||
3779 | * @param size the number of bytes written | 3779 | * @param size the number of bytes written |
3780 | */ | 3780 | */ |
3781 | static void | 3781 | static void |
3782 | mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size) | 3782 | mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, |
3783 | size_t size) | ||
3783 | { | 3784 | { |
3784 | struct GNUNET_MQ_MessageQueue *mq = cls; | 3785 | struct GNUNET_MQ_Handle *mq = cls; |
3785 | struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state; | 3786 | struct MQStreamState *mss = GNUNET_MQ_impl_state (mq); |
3786 | struct GNUNET_MQ_Message *mqm; | ||
3787 | 3787 | ||
3788 | switch (status) | 3788 | switch (status) |
3789 | { | 3789 | { |
@@ -3793,56 +3793,32 @@ mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size | |||
3793 | /* FIXME: call shutdown handler */ | 3793 | /* FIXME: call shutdown handler */ |
3794 | return; | 3794 | return; |
3795 | case GNUNET_STREAM_TIMEOUT: | 3795 | case GNUNET_STREAM_TIMEOUT: |
3796 | if (NULL == mq->error_handler) | 3796 | GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_TIMEOUT); |
3797 | LOG (GNUNET_ERROR_TYPE_WARNING, "write timeout, but no error handler installed for message queue\n"); | ||
3798 | else | ||
3799 | mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_TIMEOUT); | ||
3800 | return; | 3797 | return; |
3801 | case GNUNET_STREAM_SYSERR: | 3798 | case GNUNET_STREAM_SYSERR: |
3802 | if (NULL == mq->error_handler) | 3799 | GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE); |
3803 | LOG (GNUNET_ERROR_TYPE_WARNING, "write error, but no error handler installed for message queue\n"); | ||
3804 | else | ||
3805 | mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_WRITE); | ||
3806 | return; | 3800 | return; |
3807 | default: | 3801 | default: |
3808 | GNUNET_assert (0); | 3802 | GNUNET_assert (0); |
3809 | return; | 3803 | return; |
3810 | } | 3804 | } |
3811 | |||
3812 | /* call cb for message we finished sending */ | ||
3813 | mqm = mq->current_msg; | ||
3814 | GNUNET_assert (NULL != mq->current_msg); | ||
3815 | if (NULL != mqm->sent_cb) | ||
3816 | mqm->sent_cb (mqm->sent_cls); | ||
3817 | GNUNET_free (mqm); | ||
3818 | 3805 | ||
3819 | mss->wh = NULL; | 3806 | mss->wh = NULL; |
3820 | 3807 | ||
3821 | mqm = mq->msg_head; | 3808 | GNUNET_MQ_impl_send_continue (mq); |
3822 | mq->current_msg = mqm; | ||
3823 | if (NULL == mqm) | ||
3824 | return; | ||
3825 | GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm); | ||
3826 | mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size), | ||
3827 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
3828 | mq_stream_write_queued, mq); | ||
3829 | GNUNET_assert (NULL != mss->wh); | ||
3830 | } | 3809 | } |
3831 | 3810 | ||
3832 | 3811 | ||
3833 | static void | 3812 | static void |
3834 | mq_stream_send_impl (struct GNUNET_MQ_MessageQueue *mq, | 3813 | mq_stream_send_impl (struct GNUNET_MQ_Handle *mq, |
3835 | struct GNUNET_MQ_Message *mqm) | 3814 | const struct GNUNET_MessageHeader *msg, void *impl_state) |
3836 | { | 3815 | { |
3837 | struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state; | 3816 | struct MQStreamState *mss = impl_state; |
3838 | 3817 | ||
3839 | if (NULL != mq->current_msg) | 3818 | /* no way to cancel sending now */ |
3840 | { | 3819 | GNUNET_MQ_impl_send_commit (mq); |
3841 | GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm); | 3820 | |
3842 | return; | 3821 | mss->wh = GNUNET_STREAM_write (mss->socket, msg, ntohs (msg->size), |
3843 | } | ||
3844 | mq->current_msg = mqm; | ||
3845 | mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size), | ||
3846 | GNUNET_TIME_UNIT_FOREVER_REL, | 3822 | GNUNET_TIME_UNIT_FOREVER_REL, |
3847 | mq_stream_write_queued, mq); | 3823 | mq_stream_write_queued, mq); |
3848 | } | 3824 | } |
@@ -3862,12 +3838,12 @@ mq_stream_send_impl (struct GNUNET_MQ_MessageQueue *mq, | |||
3862 | */ | 3838 | */ |
3863 | static int | 3839 | static int |
3864 | mq_stream_mst_callback (void *cls, void *client, | 3840 | mq_stream_mst_callback (void *cls, void *client, |
3865 | const struct GNUNET_MessageHeader *message) | 3841 | const struct GNUNET_MessageHeader *message) |
3866 | { | 3842 | { |
3867 | struct GNUNET_MQ_MessageQueue *mq = cls; | 3843 | struct GNUNET_MQ_Handle *mq = cls; |
3868 | 3844 | ||
3869 | GNUNET_assert (NULL != message); | 3845 | GNUNET_assert (NULL != message); |
3870 | GNUNET_MQ_dispatch (mq, message); | 3846 | GNUNET_MQ_inject_message (mq, message); |
3871 | return GNUNET_OK; | 3847 | return GNUNET_OK; |
3872 | } | 3848 | } |
3873 | 3849 | ||
@@ -3889,8 +3865,8 @@ mq_stream_data_processor (void *cls, | |||
3889 | const void *data, | 3865 | const void *data, |
3890 | size_t size) | 3866 | size_t size) |
3891 | { | 3867 | { |
3892 | struct GNUNET_MQ_MessageQueue *mq = cls; | 3868 | struct GNUNET_MQ_Handle *mq = cls; |
3893 | struct MQStreamState *mss; | 3869 | struct MQStreamState *mss = GNUNET_MQ_impl_state (mq); |
3894 | int ret; | 3870 | int ret; |
3895 | 3871 | ||
3896 | switch (status) | 3872 | switch (status) |
@@ -3901,45 +3877,33 @@ mq_stream_data_processor (void *cls, | |||
3901 | /* FIXME: call shutdown handler */ | 3877 | /* FIXME: call shutdown handler */ |
3902 | return 0; | 3878 | return 0; |
3903 | case GNUNET_STREAM_TIMEOUT: | 3879 | case GNUNET_STREAM_TIMEOUT: |
3904 | if (NULL == mq->error_handler) | 3880 | GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_TIMEOUT); |
3905 | LOG (GNUNET_ERROR_TYPE_WARNING, "read timeout, but no error handler installed for message queue\n"); | ||
3906 | else | ||
3907 | mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_TIMEOUT); | ||
3908 | return 0; | 3881 | return 0; |
3909 | case GNUNET_STREAM_SYSERR: | 3882 | case GNUNET_STREAM_SYSERR: |
3910 | if (NULL == mq->error_handler) | 3883 | GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ); |
3911 | LOG (GNUNET_ERROR_TYPE_WARNING, "read error, but no error handler installed for message queue\n"); | ||
3912 | else | ||
3913 | mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ); | ||
3914 | return 0; | 3884 | return 0; |
3915 | default: | 3885 | default: |
3916 | GNUNET_assert (0); | 3886 | GNUNET_assert (0); |
3917 | return 0; | 3887 | return 0; |
3918 | } | 3888 | } |
3919 | 3889 | ||
3920 | mss = (struct MQStreamState *) mq->impl_state; | ||
3921 | GNUNET_assert (GNUNET_STREAM_OK == status); | ||
3922 | ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO); | 3890 | ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO); |
3923 | if (GNUNET_OK != ret) | 3891 | if (GNUNET_OK != ret) |
3924 | { | 3892 | { |
3925 | if (NULL == mq->error_handler) | 3893 | GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ); |
3926 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
3927 | "read error (message stream malformed), but no error handler installed for message queue\n"); | ||
3928 | else | ||
3929 | mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ); | ||
3930 | return 0; | 3894 | return 0; |
3931 | } | 3895 | } |
3932 | /* we always read all data */ | ||
3933 | mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, | 3896 | mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, |
3934 | mq_stream_data_processor, mq); | 3897 | mq_stream_data_processor, mq); |
3898 | /* we always read all data */ | ||
3935 | return size; | 3899 | return size; |
3936 | } | 3900 | } |
3937 | 3901 | ||
3938 | 3902 | ||
3939 | static void | 3903 | static void |
3940 | mq_stream_destroy_impl (struct GNUNET_MQ_MessageQueue *mq) | 3904 | mq_stream_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state) |
3941 | { | 3905 | { |
3942 | struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state; | 3906 | struct MQStreamState *mss = impl_state; |
3943 | 3907 | ||
3944 | if (NULL != mss->rh) | 3908 | if (NULL != mss->rh) |
3945 | { | 3909 | { |
@@ -3972,24 +3936,21 @@ mq_stream_destroy_impl (struct GNUNET_MQ_MessageQueue *mq) | |||
3972 | * @param error_handler callback for errors | 3936 | * @param error_handler callback for errors |
3973 | * @return the message queue for the socket | 3937 | * @return the message queue for the socket |
3974 | */ | 3938 | */ |
3975 | struct GNUNET_MQ_MessageQueue * | 3939 | struct GNUNET_MQ_Handle * |
3976 | GNUNET_STREAM_mq_create (struct GNUNET_STREAM_Socket *socket, | 3940 | GNUNET_STREAM_mq_create (struct GNUNET_STREAM_Socket *socket, |
3977 | const struct GNUNET_MQ_Handler *msg_handlers, | 3941 | const struct GNUNET_MQ_MessageHandler *msg_handlers, |
3978 | GNUNET_MQ_ErrorHandler error_handler, | 3942 | GNUNET_MQ_ErrorHandler error_handler, |
3979 | void *cls) | 3943 | void *cls) |
3980 | { | 3944 | { |
3981 | struct GNUNET_MQ_MessageQueue *mq; | 3945 | struct GNUNET_MQ_Handle *mq; |
3982 | struct MQStreamState *mss; | 3946 | struct MQStreamState *mss; |
3983 | 3947 | ||
3984 | mq = GNUNET_new (struct GNUNET_MQ_MessageQueue); | ||
3985 | mss = GNUNET_new (struct MQStreamState); | 3948 | mss = GNUNET_new (struct MQStreamState); |
3986 | mss->socket = socket; | 3949 | mss->socket = socket; |
3987 | mq->impl_state = mss; | 3950 | mq = GNUNET_MQ_queue_for_callbacks (mq_stream_send_impl, |
3988 | mq->send_impl = mq_stream_send_impl; | 3951 | mq_stream_destroy_impl, |
3989 | mq->destroy_impl = mq_stream_destroy_impl; | 3952 | NULL, |
3990 | mq->handlers = msg_handlers; | 3953 | mss, msg_handlers, error_handler, cls); |
3991 | mq->handlers_cls = cls; | ||
3992 | mq->error_handler = error_handler; | ||
3993 | if (NULL != msg_handlers) | 3954 | if (NULL != msg_handlers) |
3994 | { | 3955 | { |
3995 | mss->mst = GNUNET_SERVER_mst_create (mq_stream_mst_callback, mq); | 3956 | mss->mst = GNUNET_SERVER_mst_create (mq_stream_mst_callback, mq); |
diff --git a/src/util/mq.c b/src/util/mq.c index dc87b9711..d0253c40f 100644 --- a/src/util/mq.c +++ b/src/util/mq.c | |||
@@ -31,6 +31,118 @@ | |||
31 | #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__) | 31 | #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__) |
32 | 32 | ||
33 | 33 | ||
34 | struct GNUNET_MQ_Envelope | ||
35 | { | ||
36 | /** | ||
37 | * Messages are stored in a linked list. | ||
38 | * Each queue has its own list of envelopes. | ||
39 | */ | ||
40 | struct GNUNET_MQ_Envelope *next; | ||
41 | |||
42 | /** | ||
43 | * Messages are stored in a linked list | ||
44 | * Each queue has its own list of envelopes. | ||
45 | */ | ||
46 | struct GNUNET_MQ_Envelope *prev; | ||
47 | |||
48 | /** | ||
49 | * Actual allocated message header, | ||
50 | * usually points to the end of the containing GNUNET_MQ_Envelope | ||
51 | */ | ||
52 | struct GNUNET_MessageHeader *mh; | ||
53 | |||
54 | /** | ||
55 | * Queue the message is queued in, NULL if message is not queued. | ||
56 | */ | ||
57 | struct GNUNET_MQ_Handle *parent_queue; | ||
58 | |||
59 | /** | ||
60 | * Called after the message was sent irrevocably. | ||
61 | */ | ||
62 | GNUNET_MQ_NotifyCallback sent_cb; | ||
63 | |||
64 | /** | ||
65 | * Closure for send_cb | ||
66 | */ | ||
67 | void *sent_cls; | ||
68 | }; | ||
69 | |||
70 | |||
71 | /** | ||
72 | * Handle to a message queue. | ||
73 | */ | ||
74 | struct GNUNET_MQ_Handle | ||
75 | { | ||
76 | /** | ||
77 | * Handlers array, or NULL if the queue should not receive messages | ||
78 | */ | ||
79 | const struct GNUNET_MQ_MessageHandler *handlers; | ||
80 | |||
81 | /** | ||
82 | * Closure for the handler callbacks, | ||
83 | * as well as for the error handler. | ||
84 | */ | ||
85 | void *handlers_cls; | ||
86 | |||
87 | /** | ||
88 | * Actual implementation of message sending, | ||
89 | * called when a message is added | ||
90 | */ | ||
91 | GNUNET_MQ_SendImpl send_impl; | ||
92 | |||
93 | /** | ||
94 | * Implementation-dependent queue destruction function | ||
95 | */ | ||
96 | GNUNET_MQ_DestroyImpl destroy_impl; | ||
97 | |||
98 | /** | ||
99 | * Implementation-specific state | ||
100 | */ | ||
101 | void *impl_state; | ||
102 | |||
103 | /** | ||
104 | * Callback will be called when an error occurs. | ||
105 | */ | ||
106 | GNUNET_MQ_ErrorHandler error_handler; | ||
107 | |||
108 | /** | ||
109 | * Linked list of messages pending to be sent | ||
110 | */ | ||
111 | struct GNUNET_MQ_Envelope *envelope_head; | ||
112 | |||
113 | /** | ||
114 | * Linked list of messages pending to be sent | ||
115 | */ | ||
116 | struct GNUNET_MQ_Envelope *envelope_tail; | ||
117 | |||
118 | /** | ||
119 | * Message that is currently scheduled to be | ||
120 | * sent. Not the head of the message queue, as the implementation | ||
121 | * needs to know if sending has been already scheduled or not. | ||
122 | */ | ||
123 | struct GNUNET_MQ_Envelope *current_envelope; | ||
124 | |||
125 | /** | ||
126 | * Has the current envelope been commited? | ||
127 | * Either GNUNET_YES or GNUNET_NO. | ||
128 | */ | ||
129 | int commited; | ||
130 | |||
131 | /** | ||
132 | * Map of associations, lazily allocated | ||
133 | */ | ||
134 | struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map; | ||
135 | |||
136 | /** | ||
137 | * Next id that should be used for the assoc_map, | ||
138 | * initialized lazily to a random value together with | ||
139 | * assoc_map | ||
140 | */ | ||
141 | uint32_t assoc_id; | ||
142 | }; | ||
143 | |||
144 | |||
145 | |||
34 | 146 | ||
35 | struct ServerClientSocketState | 147 | struct ServerClientSocketState |
36 | { | 148 | { |
@@ -42,9 +154,14 @@ struct ServerClientSocketState | |||
42 | struct ClientConnectionState | 154 | struct ClientConnectionState |
43 | { | 155 | { |
44 | /** | 156 | /** |
45 | * Did we call receive? | 157 | * Did we call receive alread alreadyy? |
46 | */ | 158 | */ |
47 | int receive_active; | 159 | int receive_active; |
160 | |||
161 | /** | ||
162 | * Do we also want to receive? | ||
163 | */ | ||
164 | int receive_requested; | ||
48 | struct GNUNET_CLIENT_Connection *connection; | 165 | struct GNUNET_CLIENT_Connection *connection; |
49 | struct GNUNET_CLIENT_TransmitHandle *th; | 166 | struct GNUNET_CLIENT_TransmitHandle *th; |
50 | }; | 167 | }; |
@@ -59,9 +176,9 @@ struct ClientConnectionState | |||
59 | * @param mh message to dispatch | 176 | * @param mh message to dispatch |
60 | */ | 177 | */ |
61 | void | 178 | void |
62 | GNUNET_MQ_dispatch (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_MessageHeader *mh) | 179 | GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, const struct GNUNET_MessageHeader *mh) |
63 | { | 180 | { |
64 | const struct GNUNET_MQ_Handler *handler; | 181 | const struct GNUNET_MQ_MessageHandler *handler; |
65 | int handled = GNUNET_NO; | 182 | int handled = GNUNET_NO; |
66 | 183 | ||
67 | handler = mq->handlers; | 184 | handler = mq->handlers; |
@@ -81,8 +198,27 @@ GNUNET_MQ_dispatch (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_Messa | |||
81 | } | 198 | } |
82 | 199 | ||
83 | 200 | ||
201 | /** | ||
202 | * Call the right callback for an error condition. | ||
203 | * | ||
204 | * @param mq message queue | ||
205 | */ | ||
84 | void | 206 | void |
85 | GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm) | 207 | GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq, |
208 | enum GNUNET_MQ_Error error) | ||
209 | { | ||
210 | if (NULL == mq->error_handler) | ||
211 | { | ||
212 | /* FIXME: log what kind of error occured */ | ||
213 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "mq: got error, but no handler installed\n"); | ||
214 | return; | ||
215 | } | ||
216 | mq->error_handler (mq->handlers_cls, error); | ||
217 | } | ||
218 | |||
219 | |||
220 | void | ||
221 | GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm) | ||
86 | { | 222 | { |
87 | GNUNET_assert (NULL == mqm->parent_queue); | 223 | GNUNET_assert (NULL == mqm->parent_queue); |
88 | GNUNET_free (mqm); | 224 | GNUNET_free (mqm); |
@@ -94,20 +230,156 @@ GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm) | |||
94 | * May only be called once per message. | 230 | * May only be called once per message. |
95 | * | 231 | * |
96 | * @param mq message queue | 232 | * @param mq message queue |
97 | * @param mqm the message to send. | 233 | * @param ev the message to send. |
234 | */ | ||
235 | void | ||
236 | GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev) | ||
237 | { | ||
238 | GNUNET_assert (NULL != mq); | ||
239 | GNUNET_assert (NULL == ev->parent_queue); | ||
240 | |||
241 | /* is the implementation busy? queue it! */ | ||
242 | if (NULL != mq->current_envelope) | ||
243 | { | ||
244 | GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, mq->envelope_tail, ev); | ||
245 | return; | ||
246 | } | ||
247 | mq->current_envelope = ev; | ||
248 | mq->send_impl (mq, ev->mh, mq->impl_state); | ||
249 | } | ||
250 | |||
251 | |||
252 | /** | ||
253 | * Call the send implementation for the next queued message, | ||
254 | * if any. | ||
255 | * Only useful for implementing message queues, | ||
256 | * results in undefined behavior if not used carefully. | ||
257 | * | ||
258 | * @param mq message queue to send the next message with | ||
98 | */ | 259 | */ |
99 | void | 260 | void |
100 | GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) | 261 | GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq) |
101 | { | 262 | { |
263 | /* call is only valid if we're actually currently sending | ||
264 | * a message */ | ||
102 | GNUNET_assert (NULL != mq); | 265 | GNUNET_assert (NULL != mq); |
103 | mq->send_impl (mq, mqm); | 266 | GNUNET_assert (NULL != mq->current_envelope); |
267 | GNUNET_assert (GNUNET_YES == mq->commited); | ||
268 | mq->commited = GNUNET_NO; | ||
269 | GNUNET_free (mq->current_envelope); | ||
270 | if (NULL == mq->envelope_head) | ||
271 | { | ||
272 | mq->current_envelope = NULL; | ||
273 | return; | ||
274 | } | ||
275 | |||
276 | |||
277 | GNUNET_assert (NULL != mq->envelope_tail); | ||
278 | GNUNET_assert (NULL != mq->envelope_head); | ||
279 | mq->current_envelope = mq->envelope_head; | ||
280 | GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, | ||
281 | mq->current_envelope); | ||
282 | mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state); | ||
104 | } | 283 | } |
105 | 284 | ||
106 | 285 | ||
107 | struct GNUNET_MQ_Message * | 286 | /** |
287 | * Create a message queue for the specified handlers. | ||
288 | * | ||
289 | * @param send function the implements sending messages | ||
290 | * @param destroy function that implements destroying the queue | ||
291 | * @param destroy function that implements canceling a message | ||
292 | * @param state for the queue, passed to 'send' and 'destroy' | ||
293 | * @param handlers array of message handlers | ||
294 | * @param error_handler handler for read and write errors | ||
295 | * @return a new message queue | ||
296 | */ | ||
297 | struct GNUNET_MQ_Handle * | ||
298 | GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send, | ||
299 | GNUNET_MQ_DestroyImpl destroy, | ||
300 | GNUNET_MQ_CancelImpl cancel, | ||
301 | void *impl_state, | ||
302 | const struct GNUNET_MQ_MessageHandler *handlers, | ||
303 | GNUNET_MQ_ErrorHandler error_handler, | ||
304 | void *cls) | ||
305 | { | ||
306 | struct GNUNET_MQ_Handle *mq; | ||
307 | |||
308 | mq = GNUNET_new (struct GNUNET_MQ_Handle); | ||
309 | mq->send_impl = send; | ||
310 | mq->destroy_impl = destroy; | ||
311 | mq->handlers = handlers; | ||
312 | mq->handlers_cls = cls; | ||
313 | mq->impl_state = impl_state; | ||
314 | |||
315 | return mq; | ||
316 | } | ||
317 | |||
318 | |||
319 | /** | ||
320 | * Get the message that should currently be sent. | ||
321 | * Fails if there is no current message. | ||
322 | * Only useful for implementing message queues, | ||
323 | * results in undefined behavior if not used carefully. | ||
324 | * | ||
325 | * @param mq message queue with the current message | ||
326 | * @return message to send, never NULL | ||
327 | */ | ||
328 | const struct GNUNET_MessageHeader * | ||
329 | GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq) | ||
330 | { | ||
331 | if (NULL == mq->current_envelope) | ||
332 | GNUNET_abort (); | ||
333 | if (NULL == mq->current_envelope->mh) | ||
334 | GNUNET_abort (); | ||
335 | return mq->current_envelope->mh; | ||
336 | } | ||
337 | |||
338 | |||
339 | /** | ||
340 | * Get the implementation state associated with the | ||
341 | * message queue. | ||
342 | * | ||
343 | * While the GNUNET_MQ_Impl* callbacks receive the | ||
344 | * implementation state, continuations that are scheduled | ||
345 | * by the implementation function often only have one closure | ||
346 | * argument, with this function it is possible to get at the | ||
347 | * implementation state when only passing the GNUNET_MQ_Handle | ||
348 | * as closure. | ||
349 | * | ||
350 | * @param mq message queue with the current message | ||
351 | * @return message to send, never NULL | ||
352 | */ | ||
353 | void * | ||
354 | GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq) | ||
355 | { | ||
356 | return mq->impl_state; | ||
357 | } | ||
358 | |||
359 | |||
360 | |||
361 | /** | ||
362 | * Mark the current message as irrevocably sent, but do not | ||
363 | * proceed with sending the next message. | ||
364 | * Will call the appropriate GNUNET_MQ_NotifyCallback, if any. | ||
365 | * | ||
366 | * @param mq message queue | ||
367 | */ | ||
368 | void | ||
369 | GNUNET_MQ_impl_send_commit (struct GNUNET_MQ_Handle *mq) | ||
370 | { | ||
371 | GNUNET_assert (NULL != mq->current_envelope); | ||
372 | GNUNET_assert (GNUNET_NO == mq->commited); | ||
373 | mq->commited = GNUNET_YES; | ||
374 | if (NULL != mq->current_envelope->sent_cb) | ||
375 | mq->current_envelope->sent_cb (mq->current_envelope->sent_cls); | ||
376 | } | ||
377 | |||
378 | |||
379 | struct GNUNET_MQ_Envelope * | ||
108 | GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) | 380 | GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) |
109 | { | 381 | { |
110 | struct GNUNET_MQ_Message *mqm; | 382 | struct GNUNET_MQ_Envelope *mqm; |
111 | 383 | ||
112 | mqm = GNUNET_malloc (sizeof *mqm + size); | 384 | mqm = GNUNET_malloc (sizeof *mqm + size); |
113 | mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1]; | 385 | mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1]; |
@@ -119,11 +391,11 @@ GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) | |||
119 | } | 391 | } |
120 | 392 | ||
121 | 393 | ||
122 | struct GNUNET_MQ_Message * | 394 | struct GNUNET_MQ_Envelope * |
123 | GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type, | 395 | GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type, |
124 | const struct GNUNET_MessageHeader *nested_mh) | 396 | const struct GNUNET_MessageHeader *nested_mh) |
125 | { | 397 | { |
126 | struct GNUNET_MQ_Message *mqm; | 398 | struct GNUNET_MQ_Envelope *mqm; |
127 | uint16_t size; | 399 | uint16_t size; |
128 | 400 | ||
129 | if (NULL == nested_mh) | 401 | if (NULL == nested_mh) |
@@ -154,85 +426,62 @@ static size_t | |||
154 | transmit_queued (void *cls, size_t size, | 426 | transmit_queued (void *cls, size_t size, |
155 | void *buf) | 427 | void *buf) |
156 | { | 428 | { |
157 | struct GNUNET_MQ_MessageQueue *mq = cls; | 429 | struct GNUNET_MQ_Handle *mq = cls; |
158 | struct GNUNET_MQ_Message *mqm = mq->current_msg; | 430 | struct ServerClientSocketState *state = GNUNET_MQ_impl_state (mq); |
159 | struct ServerClientSocketState *state = mq->impl_state; | 431 | const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq); |
160 | size_t msg_size; | 432 | size_t msg_size; |
161 | 433 | ||
162 | GNUNET_assert (NULL != buf); | 434 | GNUNET_assert (NULL != buf); |
163 | 435 | ||
164 | if (NULL != mqm->sent_cb) | 436 | msg_size = ntohs (msg->size); |
165 | { | ||
166 | mqm->sent_cb (mqm->sent_cls); | ||
167 | } | ||
168 | |||
169 | mq->current_msg = NULL; | ||
170 | GNUNET_assert (NULL != mqm); | ||
171 | msg_size = ntohs (mqm->mh->size); | ||
172 | GNUNET_assert (size >= msg_size); | 437 | GNUNET_assert (size >= msg_size); |
173 | memcpy (buf, mqm->mh, msg_size); | 438 | memcpy (buf, msg, msg_size); |
174 | GNUNET_free (mqm); | ||
175 | state->th = NULL; | 439 | state->th = NULL; |
176 | 440 | ||
177 | if (NULL != mq->msg_head) | 441 | GNUNET_MQ_impl_send_continue (mq); |
178 | { | 442 | |
179 | mq->current_msg = mq->msg_head; | ||
180 | GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg); | ||
181 | state->th = | ||
182 | GNUNET_SERVER_notify_transmit_ready (state->client, msg_size, | ||
183 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
184 | &transmit_queued, mq); | ||
185 | } | ||
186 | return msg_size; | 443 | return msg_size; |
187 | } | 444 | } |
188 | 445 | ||
189 | 446 | ||
190 | 447 | ||
191 | static void | 448 | static void |
192 | server_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq) | 449 | server_client_destroy_impl (struct GNUNET_MQ_Handle *mq, |
450 | void *impl_state) | ||
193 | { | 451 | { |
194 | struct ServerClientSocketState *state; | 452 | struct ServerClientSocketState *state = impl_state; |
195 | 453 | ||
196 | GNUNET_assert (NULL != mq); | 454 | GNUNET_assert (NULL != mq); |
197 | state = mq->impl_state; | ||
198 | GNUNET_assert (NULL != state); | 455 | GNUNET_assert (NULL != state); |
199 | GNUNET_SERVER_client_drop (state->client); | 456 | GNUNET_SERVER_client_drop (state->client); |
200 | GNUNET_free (state); | 457 | GNUNET_free (state); |
201 | } | 458 | } |
202 | 459 | ||
203 | static void | 460 | static void |
204 | server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) | 461 | server_client_send_impl (struct GNUNET_MQ_Handle *mq, |
462 | const struct GNUNET_MessageHeader *msg, void *impl_state) | ||
205 | { | 463 | { |
206 | struct ServerClientSocketState *state; | 464 | struct ServerClientSocketState *state = impl_state; |
207 | int msize; | ||
208 | 465 | ||
209 | GNUNET_assert (NULL != mq); | 466 | GNUNET_assert (NULL != mq); |
210 | state = mq->impl_state; | ||
211 | GNUNET_assert (NULL != state); | 467 | GNUNET_assert (NULL != state); |
212 | 468 | ||
213 | if (NULL != state->th) | 469 | GNUNET_MQ_impl_send_commit (mq); |
214 | { | 470 | |
215 | GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm); | ||
216 | return; | ||
217 | } | ||
218 | GNUNET_assert (NULL == mq->msg_head); | ||
219 | GNUNET_assert (NULL == mq->current_msg); | ||
220 | msize = ntohs (mqm->mh->size); | ||
221 | mq->current_msg = mqm; | ||
222 | state->th = | 471 | state->th = |
223 | GNUNET_SERVER_notify_transmit_ready (state->client, msize, | 472 | GNUNET_SERVER_notify_transmit_ready (state->client, ntohs (msg->size), |
224 | GNUNET_TIME_UNIT_FOREVER_REL, | 473 | GNUNET_TIME_UNIT_FOREVER_REL, |
225 | &transmit_queued, mq); | 474 | &transmit_queued, mq); |
226 | } | 475 | } |
227 | 476 | ||
228 | 477 | ||
229 | struct GNUNET_MQ_MessageQueue * | 478 | struct GNUNET_MQ_Handle * |
230 | GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client) | 479 | GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client) |
231 | { | 480 | { |
232 | struct GNUNET_MQ_MessageQueue *mq; | 481 | struct GNUNET_MQ_Handle *mq; |
233 | struct ServerClientSocketState *scss; | 482 | struct ServerClientSocketState *scss; |
234 | 483 | ||
235 | mq = GNUNET_new (struct GNUNET_MQ_MessageQueue); | 484 | mq = GNUNET_new (struct GNUNET_MQ_Handle); |
236 | scss = GNUNET_new (struct ServerClientSocketState); | 485 | scss = GNUNET_new (struct ServerClientSocketState); |
237 | mq->impl_state = scss; | 486 | mq->impl_state = scss; |
238 | scss->client = client; | 487 | scss->client = client; |
@@ -254,24 +503,21 @@ static void | |||
254 | handle_client_message (void *cls, | 503 | handle_client_message (void *cls, |
255 | const struct GNUNET_MessageHeader *msg) | 504 | const struct GNUNET_MessageHeader *msg) |
256 | { | 505 | { |
257 | struct GNUNET_MQ_MessageQueue *mq = cls; | 506 | struct GNUNET_MQ_Handle *mq = cls; |
258 | struct ClientConnectionState *state; | 507 | struct ClientConnectionState *state; |
259 | 508 | ||
260 | state = mq->impl_state; | 509 | state = mq->impl_state; |
261 | 510 | ||
262 | if (NULL == msg) | 511 | if (NULL == msg) |
263 | { | 512 | { |
264 | if (NULL == mq->error_handler) | 513 | GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ); |
265 | LOG (GNUNET_ERROR_TYPE_WARNING, "ignoring read error (no handler installed)\n"); | ||
266 | else | ||
267 | mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ); | ||
268 | return; | 514 | return; |
269 | } | 515 | } |
270 | 516 | ||
271 | GNUNET_CLIENT_receive (state->connection, handle_client_message, mq, | 517 | GNUNET_CLIENT_receive (state->connection, handle_client_message, mq, |
272 | GNUNET_TIME_UNIT_FOREVER_REL); | 518 | GNUNET_TIME_UNIT_FOREVER_REL); |
273 | 519 | ||
274 | GNUNET_MQ_dispatch (mq, msg); | 520 | GNUNET_MQ_inject_message (mq, msg); |
275 | } | 521 | } |
276 | 522 | ||
277 | 523 | ||
@@ -287,23 +533,22 @@ static size_t | |||
287 | connection_client_transmit_queued (void *cls, size_t size, | 533 | connection_client_transmit_queued (void *cls, size_t size, |
288 | void *buf) | 534 | void *buf) |
289 | { | 535 | { |
290 | struct GNUNET_MQ_MessageQueue *mq = cls; | 536 | struct GNUNET_MQ_Handle *mq = cls; |
291 | struct GNUNET_MQ_Message *mqm = mq->current_msg; | 537 | const struct GNUNET_MessageHeader *msg; |
292 | struct ClientConnectionState *state = mq->impl_state; | 538 | struct ClientConnectionState *state = mq->impl_state; |
293 | size_t msg_size; | 539 | size_t msg_size; |
294 | 540 | ||
541 | GNUNET_assert (NULL != mq); | ||
542 | msg = GNUNET_MQ_impl_current (mq); | ||
543 | |||
295 | if (NULL == buf) | 544 | if (NULL == buf) |
296 | { | 545 | { |
297 | if (NULL == mq->error_handler) | 546 | GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ); |
298 | { | ||
299 | LOG (GNUNET_ERROR_TYPE_WARNING, "read error, but no error handler installed\n"); | ||
300 | return 0; | ||
301 | } | ||
302 | mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ); | ||
303 | return 0; | 547 | return 0; |
304 | } | 548 | } |
305 | 549 | ||
306 | if ((NULL != mq->handlers) && (GNUNET_NO == state->receive_active)) | 550 | if ( (GNUNET_YES == state->receive_requested) && |
551 | (GNUNET_NO == state->receive_active) ) | ||
307 | { | 552 | { |
308 | state->receive_active = GNUNET_YES; | 553 | state->receive_active = GNUNET_YES; |
309 | GNUNET_CLIENT_receive (state->connection, handle_client_message, mq, | 554 | GNUNET_CLIENT_receive (state->connection, handle_client_message, mq, |
@@ -311,78 +556,53 @@ connection_client_transmit_queued (void *cls, size_t size, | |||
311 | } | 556 | } |
312 | 557 | ||
313 | 558 | ||
314 | GNUNET_assert (NULL != mqm); | 559 | msg_size = ntohs (msg->size); |
315 | |||
316 | if (NULL != mqm->sent_cb) | ||
317 | { | ||
318 | mqm->sent_cb (mqm->sent_cls); | ||
319 | } | ||
320 | |||
321 | mq->current_msg = NULL; | ||
322 | GNUNET_assert (NULL != buf); | ||
323 | msg_size = ntohs (mqm->mh->size); | ||
324 | GNUNET_assert (size >= msg_size); | 560 | GNUNET_assert (size >= msg_size); |
325 | memcpy (buf, mqm->mh, msg_size); | 561 | memcpy (buf, msg, msg_size); |
326 | GNUNET_free (mqm); | ||
327 | state->th = NULL; | 562 | state->th = NULL; |
328 | if (NULL != mq->msg_head) | 563 | |
329 | { | 564 | GNUNET_MQ_impl_send_continue (mq); |
330 | mq->current_msg = mq->msg_head; | 565 | |
331 | GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg); | ||
332 | state->th = | ||
333 | GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (mq->current_msg->mh->size), | ||
334 | GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, | ||
335 | &connection_client_transmit_queued, mq); | ||
336 | } | ||
337 | return msg_size; | 566 | return msg_size; |
338 | } | 567 | } |
339 | 568 | ||
340 | 569 | ||
341 | 570 | ||
342 | static void | 571 | static void |
343 | connection_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq) | 572 | connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state) |
344 | { | 573 | { |
345 | GNUNET_free (mq->impl_state); | 574 | GNUNET_free (impl_state); |
346 | } | 575 | } |
347 | 576 | ||
348 | static void | 577 | static void |
349 | connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, | 578 | connection_client_send_impl (struct GNUNET_MQ_Handle *mq, |
350 | struct GNUNET_MQ_Message *mqm) | 579 | const struct GNUNET_MessageHeader *msg, void *impl_state) |
351 | { | 580 | { |
352 | struct ClientConnectionState *state = mq->impl_state; | 581 | struct ClientConnectionState *state = impl_state; |
353 | int msize; | ||
354 | 582 | ||
355 | GNUNET_assert (NULL != state); | 583 | GNUNET_assert (NULL != state); |
584 | GNUNET_assert (NULL == state->th); | ||
585 | |||
586 | GNUNET_MQ_impl_send_commit (mq); | ||
356 | 587 | ||
357 | if (NULL != state->th) | ||
358 | { | ||
359 | GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm); | ||
360 | return; | ||
361 | } | ||
362 | GNUNET_assert (NULL == mq->current_msg); | ||
363 | mq->current_msg = mqm; | ||
364 | msize = ntohs (mqm->mh->size); | ||
365 | state->th = | 588 | state->th = |
366 | GNUNET_CLIENT_notify_transmit_ready (state->connection, msize, | 589 | GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (msg->size), |
367 | GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, | 590 | GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, |
368 | &connection_client_transmit_queued, mq); | 591 | &connection_client_transmit_queued, mq); |
369 | } | 592 | } |
370 | 593 | ||
371 | 594 | ||
372 | 595 | struct GNUNET_MQ_Handle * | |
373 | |||
374 | |||
375 | struct GNUNET_MQ_MessageQueue * | ||
376 | GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection, | 596 | GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection, |
377 | const struct GNUNET_MQ_Handler *handlers, | 597 | const struct GNUNET_MQ_MessageHandler *handlers, |
378 | void *cls) | 598 | void *cls) |
379 | { | 599 | { |
380 | struct GNUNET_MQ_MessageQueue *mq; | 600 | struct GNUNET_MQ_Handle *mq; |
381 | struct ClientConnectionState *state; | 601 | struct ClientConnectionState *state; |
382 | 602 | ||
383 | GNUNET_assert (NULL != connection); | 603 | GNUNET_assert (NULL != connection); |
384 | 604 | ||
385 | mq = GNUNET_new (struct GNUNET_MQ_MessageQueue); | 605 | mq = GNUNET_new (struct GNUNET_MQ_Handle); |
386 | mq->handlers = handlers; | 606 | mq->handlers = handlers; |
387 | mq->handlers_cls = cls; | 607 | mq->handlers_cls = cls; |
388 | state = GNUNET_new (struct ClientConnectionState); | 608 | state = GNUNET_new (struct ClientConnectionState); |
@@ -390,16 +610,20 @@ GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connecti | |||
390 | mq->impl_state = state; | 610 | mq->impl_state = state; |
391 | mq->send_impl = connection_client_send_impl; | 611 | mq->send_impl = connection_client_send_impl; |
392 | mq->destroy_impl = connection_client_destroy_impl; | 612 | mq->destroy_impl = connection_client_destroy_impl; |
613 | if (NULL != handlers) | ||
614 | state->receive_requested = GNUNET_YES; | ||
393 | 615 | ||
394 | return mq; | 616 | return mq; |
395 | } | 617 | } |
396 | 618 | ||
397 | 619 | ||
398 | void | 620 | void |
399 | GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq, | 621 | GNUNET_MQ_replace_handlers (struct GNUNET_MQ_Handle *mq, |
400 | const struct GNUNET_MQ_Handler *new_handlers, | 622 | const struct GNUNET_MQ_MessageHandler *new_handlers, |
401 | void *cls) | 623 | void *cls) |
402 | { | 624 | { |
625 | /* FIXME: notify implementation? */ | ||
626 | /* FIXME: what about NULL handlers? abort receive? */ | ||
403 | mq->handlers = new_handlers; | 627 | mq->handlers = new_handlers; |
404 | mq->handlers_cls = cls; | 628 | mq->handlers_cls = cls; |
405 | } | 629 | } |
@@ -413,8 +637,7 @@ GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq, | |||
413 | * @param assoc_data to associate | 637 | * @param assoc_data to associate |
414 | */ | 638 | */ |
415 | uint32_t | 639 | uint32_t |
416 | GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq, | 640 | GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq, |
417 | struct GNUNET_MQ_Message *mqm, | ||
418 | void *assoc_data) | 641 | void *assoc_data) |
419 | { | 642 | { |
420 | uint32_t id; | 643 | uint32_t id; |
@@ -433,7 +656,7 @@ GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq, | |||
433 | 656 | ||
434 | 657 | ||
435 | void * | 658 | void * |
436 | GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id) | 659 | GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id) |
437 | { | 660 | { |
438 | if (NULL == mq->assoc_map) | 661 | if (NULL == mq->assoc_map) |
439 | return NULL; | 662 | return NULL; |
@@ -442,7 +665,7 @@ GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id) | |||
442 | 665 | ||
443 | 666 | ||
444 | void * | 667 | void * |
445 | GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id) | 668 | GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id) |
446 | { | 669 | { |
447 | void *val; | 670 | void *val; |
448 | 671 | ||
@@ -456,7 +679,7 @@ GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id) | |||
456 | 679 | ||
457 | 680 | ||
458 | void | 681 | void |
459 | GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm, | 682 | GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *mqm, |
460 | GNUNET_MQ_NotifyCallback cb, | 683 | GNUNET_MQ_NotifyCallback cb, |
461 | void *cls) | 684 | void *cls) |
462 | { | 685 | { |
@@ -466,13 +689,13 @@ GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm, | |||
466 | 689 | ||
467 | 690 | ||
468 | void | 691 | void |
469 | GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq) | 692 | GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq) |
470 | { | 693 | { |
471 | /* FIXME: destroy all pending messages in the queue */ | 694 | /* FIXME: destroy all pending messages in the queue */ |
472 | 695 | ||
473 | if (NULL != mq->destroy_impl) | 696 | if (NULL != mq->destroy_impl) |
474 | { | 697 | { |
475 | mq->destroy_impl (mq); | 698 | mq->destroy_impl (mq, mq->impl_state); |
476 | } | 699 | } |
477 | 700 | ||
478 | GNUNET_free (mq); | 701 | GNUNET_free (mq); |
@@ -480,7 +703,6 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq) | |||
480 | 703 | ||
481 | 704 | ||
482 | 705 | ||
483 | |||
484 | struct GNUNET_MessageHeader * | 706 | struct GNUNET_MessageHeader * |
485 | GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size) | 707 | GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size) |
486 | { | 708 | { |
diff --git a/src/util/test_mq.c b/src/util/test_mq.c index 55cd80ef1..45bba0a6b 100644 --- a/src/util/test_mq.c +++ b/src/util/test_mq.c | |||
@@ -40,7 +40,7 @@ GNUNET_NETWORK_STRUCT_END | |||
40 | void | 40 | void |
41 | test1 (void) | 41 | test1 (void) |
42 | { | 42 | { |
43 | struct GNUNET_MQ_Message *mqm; | 43 | struct GNUNET_MQ_Envelope *mqm; |
44 | struct MyMessage *mm; | 44 | struct MyMessage *mm; |
45 | 45 | ||
46 | mm = NULL; | 46 | mm = NULL; |
@@ -57,7 +57,7 @@ test1 (void) | |||
57 | void | 57 | void |
58 | test2 (void) | 58 | test2 (void) |
59 | { | 59 | { |
60 | struct GNUNET_MQ_Message *mqm; | 60 | struct GNUNET_MQ_Envelope *mqm; |
61 | struct GNUNET_MessageHeader *mh; | 61 | struct GNUNET_MessageHeader *mh; |
62 | 62 | ||
63 | mqm = GNUNET_MQ_msg_header (42); | 63 | mqm = GNUNET_MQ_msg_header (42); |
diff --git a/src/util/test_mq_client.c b/src/util/test_mq_client.c index b7eb1516a..30e498fcc 100644 --- a/src/util/test_mq_client.c +++ b/src/util/test_mq_client.c | |||
@@ -60,6 +60,9 @@ recv_cb (void *cls, struct GNUNET_SERVER_Client *argclient, | |||
60 | return; | 60 | return; |
61 | } | 61 | } |
62 | 62 | ||
63 | /* can happen if notify does not work */ | ||
64 | GNUNET_assert (received < 2); | ||
65 | |||
63 | GNUNET_SERVER_receive_done (argclient, GNUNET_YES); | 66 | GNUNET_SERVER_receive_done (argclient, GNUNET_YES); |
64 | } | 67 | } |
65 | 68 | ||
@@ -98,14 +101,16 @@ static struct GNUNET_SERVER_MessageHandler handlers[] = { | |||
98 | 101 | ||
99 | void send_cb (void *cls) | 102 | void send_cb (void *cls) |
100 | { | 103 | { |
104 | /* the notify should only be called once */ | ||
105 | GNUNET_assert (GNUNET_NO == notify); | ||
101 | printf ("notify sent\n"); | 106 | printf ("notify sent\n"); |
102 | notify = GNUNET_YES; | 107 | notify = GNUNET_YES; |
103 | } | 108 | } |
104 | 109 | ||
105 | void test_mq (struct GNUNET_CLIENT_Connection *client) | 110 | void test_mq (struct GNUNET_CLIENT_Connection *client) |
106 | { | 111 | { |
107 | struct GNUNET_MQ_MessageQueue *mq; | 112 | struct GNUNET_MQ_Handle *mq; |
108 | struct GNUNET_MQ_Message *mqm; | 113 | struct GNUNET_MQ_Envelope *mqm; |
109 | 114 | ||
110 | /* FIXME: test handling responses */ | 115 | /* FIXME: test handling responses */ |
111 | mq = GNUNET_MQ_queue_for_connection_client (client, NULL, NULL); | 116 | mq = GNUNET_MQ_queue_for_connection_client (client, NULL, NULL); |