diff options
author | Christian Grothoff <christian@grothoff.org> | 2016-06-23 16:30:45 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2016-06-23 16:30:45 +0000 |
commit | 021e5d47b4ac2fd2088cee65e551fd7e6114e99b (patch) | |
tree | 703829ff172d75f5417e42041ed4694a53d10e71 /src/dht/dht_api.c | |
parent | c1bac2613d2e7915ab23c3675918aa839514bd97 (diff) | |
download | gnunet-021e5d47b4ac2fd2088cee65e551fd7e6114e99b.tar.gz gnunet-021e5d47b4ac2fd2088cee65e551fd7e6114e99b.zip |
convert DHT API to new MQ API
Diffstat (limited to 'src/dht/dht_api.c')
-rw-r--r-- | src/dht/dht_api.c | 1481 |
1 files changed, 567 insertions, 914 deletions
diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c index b5ae2f8ce..de1043ca9 100644 --- a/src/dht/dht_api.c +++ b/src/dht/dht_api.c | |||
@@ -1,6 +1,6 @@ | |||
1 | /* | 1 | /* |
2 | This file is part of GNUnet. | 2 | This file is part of GNUnet. |
3 | Copyright (C) 2009, 2010, 2011, 2012 GNUnet e.V. | 3 | Copyright (C) 2009, 2010, 2011, 2012, 2016 GNUnet e.V. |
4 | 4 | ||
5 | GNUnet is free software; you can redistribute it and/or modify | 5 | GNUnet is free software; you can redistribute it and/or modify |
6 | it under the terms of the GNU General Public License as published | 6 | it under the terms of the GNU General Public License as published |
@@ -36,85 +36,6 @@ | |||
36 | 36 | ||
37 | #define LOG(kind,...) GNUNET_log_from (kind, "dht-api",__VA_ARGS__) | 37 | #define LOG(kind,...) GNUNET_log_from (kind, "dht-api",__VA_ARGS__) |
38 | 38 | ||
39 | /** | ||
40 | * Entry in our list of messages to be (re-)transmitted. | ||
41 | */ | ||
42 | struct PendingMessage | ||
43 | { | ||
44 | /** | ||
45 | * This is a doubly-linked list. | ||
46 | */ | ||
47 | struct PendingMessage *prev; | ||
48 | |||
49 | /** | ||
50 | * This is a doubly-linked list. | ||
51 | */ | ||
52 | struct PendingMessage *next; | ||
53 | |||
54 | /** | ||
55 | * Message that is pending, allocated at the end | ||
56 | * of this struct. | ||
57 | */ | ||
58 | const struct GNUNET_MessageHeader *msg; | ||
59 | |||
60 | /** | ||
61 | * Handle to the DHT API context. | ||
62 | */ | ||
63 | struct GNUNET_DHT_Handle *handle; | ||
64 | |||
65 | /** | ||
66 | * Continuation to call when the request has been | ||
67 | * transmitted (for the first time) to the service; can be NULL. | ||
68 | */ | ||
69 | GNUNET_SCHEDULER_TaskCallback cont; | ||
70 | |||
71 | /** | ||
72 | * Closure for 'cont'. | ||
73 | */ | ||
74 | void *cont_cls; | ||
75 | |||
76 | /** | ||
77 | * Unique ID for this request | ||
78 | */ | ||
79 | uint64_t unique_id; | ||
80 | |||
81 | /** | ||
82 | * Free the saved message once sent, set to GNUNET_YES for messages | ||
83 | * that do not receive responses; GNUNET_NO if this pending message | ||
84 | * is aliased from a 'struct GNUNET_DHT_RouteHandle' and will be freed | ||
85 | * from there. | ||
86 | */ | ||
87 | int free_on_send; | ||
88 | |||
89 | /** | ||
90 | * GNUNET_YES if this message is in our pending queue right now. | ||
91 | */ | ||
92 | int in_pending_queue; | ||
93 | |||
94 | }; | ||
95 | |||
96 | #if ENABLE_MALICIOUS | ||
97 | /** | ||
98 | * Handle to act malicious message | ||
99 | */ | ||
100 | struct GNUNET_DHT_ActMaliciousHandle | ||
101 | { | ||
102 | /** | ||
103 | * Continuation to call when done. | ||
104 | */ | ||
105 | GNUNET_DHT_ActMaliciousContinuation cont; | ||
106 | |||
107 | /** | ||
108 | * Main handle to this DHT api | ||
109 | */ | ||
110 | struct GNUNET_DHT_Handle *dht_handle; | ||
111 | |||
112 | /** | ||
113 | * Closure for 'cont'. | ||
114 | */ | ||
115 | void *cont_cls; | ||
116 | }; | ||
117 | #endif | ||
118 | 39 | ||
119 | /** | 40 | /** |
120 | * Handle to a PUT request. | 41 | * Handle to a PUT request. |
@@ -137,27 +58,16 @@ struct GNUNET_DHT_PutHandle | |||
137 | GNUNET_DHT_PutContinuation cont; | 58 | GNUNET_DHT_PutContinuation cont; |
138 | 59 | ||
139 | /** | 60 | /** |
140 | * Pending message associated with this PUT operation, | ||
141 | * NULL after the message has been transmitted to the service. | ||
142 | */ | ||
143 | struct PendingMessage *pending; | ||
144 | |||
145 | /** | ||
146 | * Main handle to this DHT api | 61 | * Main handle to this DHT api |
147 | */ | 62 | */ |
148 | struct GNUNET_DHT_Handle *dht_handle; | 63 | struct GNUNET_DHT_Handle *dht_handle; |
149 | 64 | ||
150 | /** | 65 | /** |
151 | * Closure for 'cont'. | 66 | * Closure for @e cont. |
152 | */ | 67 | */ |
153 | void *cont_cls; | 68 | void *cont_cls; |
154 | 69 | ||
155 | /** | 70 | /** |
156 | * Timeout task for this operation. | ||
157 | */ | ||
158 | struct GNUNET_SCHEDULER_Task * timeout_task; | ||
159 | |||
160 | /** | ||
161 | * Unique ID for the PUT operation. | 71 | * Unique ID for the PUT operation. |
162 | */ | 72 | */ |
163 | uint64_t unique_id; | 73 | uint64_t unique_id; |
@@ -176,7 +86,7 @@ struct GNUNET_DHT_GetHandle | |||
176 | GNUNET_DHT_GetIterator iter; | 86 | GNUNET_DHT_GetIterator iter; |
177 | 87 | ||
178 | /** | 88 | /** |
179 | * Closure for the iterator callback | 89 | * Closure for @a iter. |
180 | */ | 90 | */ |
181 | void *iter_cls; | 91 | void *iter_cls; |
182 | 92 | ||
@@ -186,13 +96,6 @@ struct GNUNET_DHT_GetHandle | |||
186 | struct GNUNET_DHT_Handle *dht_handle; | 96 | struct GNUNET_DHT_Handle *dht_handle; |
187 | 97 | ||
188 | /** | 98 | /** |
189 | * The actual message sent for this request, | ||
190 | * used for retransmitting requests on service | ||
191 | * failure/reconnect. Freed on route_stop. | ||
192 | */ | ||
193 | struct PendingMessage *message; | ||
194 | |||
195 | /** | ||
196 | * Array of hash codes over the results that we have already | 99 | * Array of hash codes over the results that we have already |
197 | * seen. | 100 | * seen. |
198 | */ | 101 | */ |
@@ -209,25 +112,36 @@ struct GNUNET_DHT_GetHandle | |||
209 | uint64_t unique_id; | 112 | uint64_t unique_id; |
210 | 113 | ||
211 | /** | 114 | /** |
212 | * Size of the 'seen_results' array. Note that not | 115 | * Size of the extended query, allocated at the end of this struct. |
213 | * all positions might be used (as we over-allocate). | ||
214 | */ | 116 | */ |
215 | unsigned int seen_results_size; | 117 | size_t xquery_size; |
216 | 118 | ||
217 | /** | 119 | /** |
218 | * Offset into the 'seen_results' array marking the | 120 | * Desired replication level. |
219 | * end of the positions that are actually used. | ||
220 | */ | 121 | */ |
221 | unsigned int seen_results_end; | 122 | uint32_t desired_replication_level; |
123 | |||
124 | /** | ||
125 | * Type of the block we are looking for. | ||
126 | */ | ||
127 | enum GNUNET_BLOCK_Type type; | ||
128 | |||
129 | /** | ||
130 | * Routing options. | ||
131 | */ | ||
132 | enum GNUNET_DHT_RouteOption options; | ||
222 | 133 | ||
223 | /** | 134 | /** |
224 | * Offset into the 'seen_results' array marking the | 135 | * Size of the @e seen_results array. Note that not |
225 | * position up to where we've send the hash codes to | 136 | * all positions might be used (as we over-allocate). |
226 | * the DHT for blocking (needed as we might not be | ||
227 | * able to send all hash codes at once). | ||
228 | */ | 137 | */ |
229 | unsigned int seen_results_transmission_offset; | 138 | unsigned int seen_results_size; |
230 | 139 | ||
140 | /** | ||
141 | * Offset into the @e seen_results array marking the | ||
142 | * end of the positions that are actually used. | ||
143 | */ | ||
144 | unsigned int seen_results_end; | ||
231 | 145 | ||
232 | }; | 146 | }; |
233 | 147 | ||
@@ -278,7 +192,7 @@ struct GNUNET_DHT_MonitorHandle | |||
278 | GNUNET_DHT_MonitorPutCB put_cb; | 192 | GNUNET_DHT_MonitorPutCB put_cb; |
279 | 193 | ||
280 | /** | 194 | /** |
281 | * Closure for cb. | 195 | * Closure for @e get_cb, @e put_cb and @e get_resp_cb. |
282 | */ | 196 | */ |
283 | void *cb_cls; | 197 | void *cb_cls; |
284 | 198 | ||
@@ -297,24 +211,9 @@ struct GNUNET_DHT_Handle | |||
297 | const struct GNUNET_CONFIGURATION_Handle *cfg; | 211 | const struct GNUNET_CONFIGURATION_Handle *cfg; |
298 | 212 | ||
299 | /** | 213 | /** |
300 | * Socket (if available). | 214 | * Connection to DHT service. |
301 | */ | ||
302 | struct GNUNET_CLIENT_Connection *client; | ||
303 | |||
304 | /** | ||
305 | * Currently pending transmission request (or NULL). | ||
306 | */ | ||
307 | struct GNUNET_CLIENT_TransmitHandle *th; | ||
308 | |||
309 | /** | ||
310 | * Head of linked list of messages we would like to transmit. | ||
311 | */ | ||
312 | struct PendingMessage *pending_head; | ||
313 | |||
314 | /** | ||
315 | * Tail of linked list of messages we would like to transmit. | ||
316 | */ | 215 | */ |
317 | struct PendingMessage *pending_tail; | 216 | struct GNUNET_MQ_Handle *mq; |
318 | 217 | ||
319 | /** | 218 | /** |
320 | * Head of linked list of messages we would like to monitor. | 219 | * Head of linked list of messages we would like to monitor. |
@@ -345,7 +244,7 @@ struct GNUNET_DHT_Handle | |||
345 | /** | 244 | /** |
346 | * Task for trying to reconnect. | 245 | * Task for trying to reconnect. |
347 | */ | 246 | */ |
348 | struct GNUNET_SCHEDULER_Task * reconnect_task; | 247 | struct GNUNET_SCHEDULER_Task *reconnect_task; |
349 | 248 | ||
350 | /** | 249 | /** |
351 | * How quickly should we retry? Used for exponential back-off on | 250 | * How quickly should we retry? Used for exponential back-off on |
@@ -358,101 +257,93 @@ struct GNUNET_DHT_Handle | |||
358 | */ | 257 | */ |
359 | uint64_t uid_gen; | 258 | uint64_t uid_gen; |
360 | 259 | ||
361 | /** | ||
362 | * Did we start our receive loop yet? | ||
363 | */ | ||
364 | int in_receive; | ||
365 | 260 | ||
366 | #if ENABLE_MALICIOUS | ||
367 | /** | ||
368 | * Handle of act malicious request. | ||
369 | */ | ||
370 | struct GNUNET_DHT_ActMaliciousHandle *mh; | ||
371 | #endif | ||
372 | }; | 261 | }; |
373 | 262 | ||
374 | 263 | ||
375 | /** | 264 | /** |
376 | * Handler for messages received from the DHT service | 265 | * Try to (re)connect to the DHT service. |
377 | * a demultiplexer which handles numerous message types | ||
378 | * | 266 | * |
379 | * @param cls the `struct GNUNET_DHT_Handle` | 267 | * @param h DHT handle to reconnect |
380 | * @param msg the incoming message | 268 | * @return #GNUNET_YES on success, #GNUNET_NO on failure. |
381 | */ | 269 | */ |
382 | static void | 270 | static int |
383 | service_message_handler (void *cls, | 271 | try_connect (struct GNUNET_DHT_Handle *h); |
384 | const struct GNUNET_MessageHeader *msg); | ||
385 | 272 | ||
386 | 273 | ||
387 | /** | 274 | /** |
388 | * Try to (re)connect to the DHT service. | 275 | * Send GET message for a @a get_handle to DHT. |
389 | * | 276 | * |
390 | * @param handle DHT handle to reconnect | 277 | * @param gh GET to generate messages for. |
391 | * @return #GNUNET_YES on success, #GNUNET_NO on failure. | ||
392 | */ | 278 | */ |
393 | static int | 279 | static void |
394 | try_connect (struct GNUNET_DHT_Handle *handle) | 280 | send_get (struct GNUNET_DHT_GetHandle *gh) |
395 | { | 281 | { |
396 | if (NULL != handle->client) | 282 | struct GNUNET_DHT_Handle *h = gh->dht_handle; |
397 | return GNUNET_OK; | 283 | struct GNUNET_MQ_Envelope *env; |
398 | handle->in_receive = GNUNET_NO; | 284 | struct GNUNET_DHT_ClientGetMessage *get_msg; |
399 | handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg); | 285 | |
400 | if (NULL == handle->client) | 286 | env = GNUNET_MQ_msg_extra (get_msg, |
401 | { | 287 | gh->xquery_size, |
402 | LOG (GNUNET_ERROR_TYPE_WARNING, | 288 | GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET); |
403 | _("Failed to connect to the DHT service!\n")); | 289 | get_msg->options = htonl ((uint32_t) gh->options); |
404 | return GNUNET_NO; | 290 | get_msg->desired_replication_level = htonl (gh->desired_replication_level); |
405 | } | 291 | get_msg->type = htonl (gh->type); |
406 | return GNUNET_YES; | 292 | get_msg->key = gh->key; |
293 | get_msg->unique_id = gh->unique_id; | ||
294 | memcpy (&get_msg[1], | ||
295 | &gh[1], | ||
296 | gh->xquery_size); | ||
297 | GNUNET_MQ_send (h->mq, | ||
298 | env); | ||
407 | } | 299 | } |
408 | 300 | ||
409 | 301 | ||
410 | /** | 302 | /** |
411 | * Queue messages to DHT to block certain results from the result set. | 303 | * Send GET message(s) for indicating which results are already known |
304 | * for a @a get_handle to DHT. Complex as we need to send the list of | ||
305 | * known results, which means we may need mulitple messages to block | ||
306 | * known results from the result set. | ||
412 | * | 307 | * |
413 | * @param get_handle GET to generate messages for. | 308 | * @param gh GET to generate messages for |
309 | * @param transmission_offset_start at which offset should we start? | ||
414 | */ | 310 | */ |
415 | static void | 311 | static void |
416 | queue_filter_messages (struct GNUNET_DHT_GetHandle *get_handle) | 312 | send_get_known_results (struct GNUNET_DHT_GetHandle *gh, |
313 | unsigned int transmission_offset_start) | ||
417 | { | 314 | { |
418 | struct PendingMessage *pm; | 315 | struct GNUNET_DHT_Handle *h = gh->dht_handle; |
316 | struct GNUNET_MQ_Envelope *env; | ||
419 | struct GNUNET_DHT_ClientGetResultSeenMessage *msg; | 317 | struct GNUNET_DHT_ClientGetResultSeenMessage *msg; |
420 | uint16_t msize; | ||
421 | unsigned int delta; | 318 | unsigned int delta; |
422 | unsigned int max; | 319 | unsigned int max; |
320 | unsigned int transmission_offset; | ||
423 | 321 | ||
424 | while (get_handle->seen_results_transmission_offset < get_handle->seen_results_end) | 322 | max = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (*msg)) |
323 | / sizeof (struct GNUNET_HashCode); | ||
324 | transmission_offset = transmission_offset_start; | ||
325 | while (transmission_offset < gh->seen_results_end) | ||
425 | { | 326 | { |
426 | delta = get_handle->seen_results_end - get_handle->seen_results_transmission_offset; | 327 | delta = gh->seen_results_end - transmission_offset; |
427 | max = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode); | ||
428 | if (delta > max) | 328 | if (delta > max) |
429 | delta = max; | 329 | delta = max; |
430 | msize = sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage) + delta * sizeof (struct GNUNET_HashCode); | 330 | env = GNUNET_MQ_msg_extra (msg, |
431 | 331 | delta * sizeof (struct GNUNET_HashCode), | |
432 | pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize); | 332 | GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN); |
433 | msg = (struct GNUNET_DHT_ClientGetResultSeenMessage *) &pm[1]; | 333 | msg->key = gh->key; |
434 | pm->msg = &msg->header; | 334 | msg->unique_id = gh->unique_id; |
435 | pm->handle = get_handle->dht_handle; | ||
436 | pm->unique_id = get_handle->unique_id; | ||
437 | pm->free_on_send = GNUNET_YES; | ||
438 | pm->in_pending_queue = GNUNET_YES; | ||
439 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN); | ||
440 | msg->header.size = htons (msize); | ||
441 | msg->key = get_handle->key; | ||
442 | msg->unique_id = get_handle->unique_id; | ||
443 | memcpy (&msg[1], | 335 | memcpy (&msg[1], |
444 | &get_handle->seen_results[get_handle->seen_results_transmission_offset], | 336 | &gh->seen_results[transmission_offset], |
445 | sizeof (struct GNUNET_HashCode) * delta); | 337 | sizeof (struct GNUNET_HashCode) * delta); |
446 | get_handle->seen_results_transmission_offset += delta; | 338 | GNUNET_MQ_send (h->mq, |
447 | GNUNET_CONTAINER_DLL_insert_tail (get_handle->dht_handle->pending_head, | 339 | env); |
448 | get_handle->dht_handle->pending_tail, | 340 | transmission_offset += delta; |
449 | pm); | ||
450 | } | 341 | } |
451 | } | 342 | } |
452 | 343 | ||
453 | 344 | ||
454 | /** | 345 | /** |
455 | * Add the request corresponding to the given route handle | 346 | * Add the GET request corresponding to the given route handle |
456 | * to the pending queue (if it is not already in there). | 347 | * to the pending queue (if it is not already in there). |
457 | * | 348 | * |
458 | * @param cls the `struct GNUNET_DHT_Handle *` | 349 | * @param cls the `struct GNUNET_DHT_Handle *` |
@@ -461,35 +352,49 @@ queue_filter_messages (struct GNUNET_DHT_GetHandle *get_handle) | |||
461 | * @return #GNUNET_YES (always) | 352 | * @return #GNUNET_YES (always) |
462 | */ | 353 | */ |
463 | static int | 354 | static int |
464 | add_request_to_pending (void *cls, | 355 | add_get_request_to_pending (void *cls, |
465 | const struct GNUNET_HashCode *key, | 356 | const struct GNUNET_HashCode *key, |
466 | void *value) | 357 | void *value) |
467 | { | 358 | { |
468 | struct GNUNET_DHT_Handle *handle = cls; | 359 | struct GNUNET_DHT_Handle *handle = cls; |
469 | struct GNUNET_DHT_GetHandle *get_handle = value; | 360 | struct GNUNET_DHT_GetHandle *gh = value; |
470 | 361 | ||
471 | if (GNUNET_NO == get_handle->message->in_pending_queue) | 362 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
472 | { | 363 | "Retransmitting request related to %s to DHT %p\n", |
473 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 364 | GNUNET_h2s (key), |
474 | "Retransmitting request related to %s to DHT %p\n", GNUNET_h2s (key), | 365 | handle); |
475 | handle); | 366 | send_get (gh); |
476 | get_handle->seen_results_transmission_offset = 0; | 367 | send_get_known_results (gh, 0); |
477 | GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail, | ||
478 | get_handle->message); | ||
479 | queue_filter_messages (get_handle); | ||
480 | get_handle->message->in_pending_queue = GNUNET_YES; | ||
481 | } | ||
482 | return GNUNET_YES; | 368 | return GNUNET_YES; |
483 | } | 369 | } |
484 | 370 | ||
485 | 371 | ||
486 | /** | 372 | /** |
487 | * Try to send messages from list of messages to send | 373 | * Send #GNUNET_MESSAGE_TYPE_DHT_MONITOR_START message. |
488 | * | 374 | * |
489 | * @param handle DHT_Handle | 375 | * @param mh monitor handle to generate start message for |
490 | */ | 376 | */ |
491 | static void | 377 | static void |
492 | process_pending_messages (struct GNUNET_DHT_Handle *handle); | 378 | send_monitor_start (struct GNUNET_DHT_MonitorHandle *mh) |
379 | { | ||
380 | struct GNUNET_DHT_Handle *h = mh->dht_handle; | ||
381 | struct GNUNET_MQ_Envelope *env; | ||
382 | struct GNUNET_DHT_MonitorStartStopMessage *m; | ||
383 | |||
384 | env = GNUNET_MQ_msg (m, | ||
385 | GNUNET_MESSAGE_TYPE_DHT_MONITOR_START); | ||
386 | m->type = htonl (mh->type); | ||
387 | m->get = htons (NULL != mh->get_cb); | ||
388 | m->get_resp = htons (NULL != mh->get_resp_cb); | ||
389 | m->put = htons (NULL != mh->put_cb); | ||
390 | if (NULL != mh->key) | ||
391 | { | ||
392 | m->filter_key = htons(1); | ||
393 | m->key = *mh->key; | ||
394 | } | ||
395 | GNUNET_MQ_send (h->mq, | ||
396 | env); | ||
397 | } | ||
493 | 398 | ||
494 | 399 | ||
495 | /** | 400 | /** |
@@ -500,333 +405,238 @@ process_pending_messages (struct GNUNET_DHT_Handle *handle); | |||
500 | static void | 405 | static void |
501 | try_reconnect (void *cls) | 406 | try_reconnect (void *cls) |
502 | { | 407 | { |
503 | struct GNUNET_DHT_Handle *handle = cls; | 408 | struct GNUNET_DHT_Handle *h = cls; |
409 | struct GNUNET_DHT_MonitorHandle *mh; | ||
504 | 410 | ||
505 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting with DHT %p\n", handle); | 411 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
506 | handle->retry_time = GNUNET_TIME_STD_BACKOFF (handle->retry_time); | 412 | "Reconnecting with DHT %p\n", |
507 | handle->reconnect_task = NULL; | 413 | h); |
508 | if (GNUNET_YES != try_connect (handle)) | 414 | h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time); |
415 | h->reconnect_task = NULL; | ||
416 | if (GNUNET_YES != try_connect (h)) | ||
509 | { | 417 | { |
510 | LOG (GNUNET_ERROR_TYPE_DEBUG, "dht reconnect failed(!)\n"); | 418 | LOG (GNUNET_ERROR_TYPE_WARNING, |
419 | "DHT reconnect failed!\n"); | ||
420 | h->reconnect_task | ||
421 | = GNUNET_SCHEDULER_add_delayed (h->retry_time, | ||
422 | &try_reconnect, | ||
423 | h); | ||
511 | return; | 424 | return; |
512 | } | 425 | } |
513 | GNUNET_CONTAINER_multihashmap_iterate (handle->active_requests, | 426 | GNUNET_CONTAINER_multihashmap_iterate (h->active_requests, |
514 | &add_request_to_pending, handle); | 427 | &add_get_request_to_pending, |
515 | process_pending_messages (handle); | 428 | h); |
429 | for (mh = h->monitor_head; NULL != mh; mh = mh->next) | ||
430 | send_monitor_start (mh); | ||
516 | } | 431 | } |
517 | 432 | ||
518 | 433 | ||
519 | /** | 434 | /** |
520 | * Try reconnecting to the DHT service. | 435 | * Try reconnecting to the DHT service. |
521 | * | 436 | * |
522 | * @param handle handle to dht to (possibly) disconnect and reconnect | 437 | * @param h handle to dht to (possibly) disconnect and reconnect |
523 | */ | 438 | */ |
524 | static void | 439 | static void |
525 | do_disconnect (struct GNUNET_DHT_Handle *handle) | 440 | do_disconnect (struct GNUNET_DHT_Handle *h) |
526 | { | 441 | { |
527 | struct GNUNET_DHT_PutHandle *ph; | 442 | struct GNUNET_DHT_PutHandle *ph; |
528 | struct GNUNET_DHT_PutHandle *next; | 443 | GNUNET_DHT_PutContinuation cont; |
444 | void *cont_cls; | ||
529 | 445 | ||
530 | if (NULL == handle->client) | 446 | if (NULL == h->mq) |
531 | return; | 447 | return; |
532 | GNUNET_assert (NULL == handle->reconnect_task); | 448 | GNUNET_MQ_destroy (h->mq); |
533 | if (NULL != handle->th) | 449 | h->mq = NULL; |
534 | GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th); | ||
535 | handle->th = NULL; | ||
536 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 450 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
537 | "Disconnecting from DHT service, will try to reconnect in %s\n", | 451 | "Disconnecting from DHT service, will try to reconnect in %s\n", |
538 | GNUNET_STRINGS_relative_time_to_string (handle->retry_time, | 452 | GNUNET_STRINGS_relative_time_to_string (h->retry_time, |
539 | GNUNET_YES)); | 453 | GNUNET_YES)); |
540 | GNUNET_CLIENT_disconnect (handle->client); | 454 | /* notify client about all PUTs that (may) have failed due to disconnect */ |
541 | handle->client = NULL; | 455 | while (NULL != (ph = h->put_head)) |
542 | |||
543 | /* signal disconnect to all PUT requests that were transmitted but waiting | ||
544 | for the put confirmation */ | ||
545 | next = handle->put_head; | ||
546 | while (NULL != (ph = next)) | ||
547 | { | 456 | { |
548 | next = ph->next; | 457 | cont = ph->cont; |
549 | if (NULL == ph->pending) | 458 | cont_cls = ph->cont_cls; |
550 | { | 459 | GNUNET_DHT_put_cancel (ph); |
551 | if (NULL != ph->cont) | 460 | if (NULL != cont) |
552 | ph->cont (ph->cont_cls, GNUNET_SYSERR); | 461 | cont (cont_cls, |
553 | GNUNET_DHT_put_cancel (ph); | 462 | GNUNET_SYSERR); |
554 | } | ||
555 | } | 463 | } |
556 | handle->reconnect_task = | 464 | GNUNET_assert (NULL == h->reconnect_task); |
557 | GNUNET_SCHEDULER_add_delayed (handle->retry_time, &try_reconnect, handle); | 465 | h->reconnect_task |
466 | = GNUNET_SCHEDULER_add_delayed (h->retry_time, | ||
467 | &try_reconnect, | ||
468 | h); | ||
558 | } | 469 | } |
559 | 470 | ||
560 | 471 | ||
561 | /** | 472 | /** |
562 | * Transmit the next pending message, called by notify_transmit_ready | 473 | * Generic error handler, called with the appropriate error code and |
474 | * the same closure specified at the creation of the message queue. | ||
475 | * Not every message queue implementation supports an error handler. | ||
563 | * | 476 | * |
564 | * @param cls the DHT handle | 477 | * @param cls closure with the `struct GNUNET_DHT_Handle *` |
565 | * @param size number of bytes available in @a buf for transmission | 478 | * @param error error code |
566 | * @param buf where to copy messages for the service | ||
567 | * @return number of bytes written to @a buf | ||
568 | */ | 479 | */ |
569 | static size_t | 480 | static void |
570 | transmit_pending (void *cls, | 481 | mq_error_handler (void *cls, |
571 | size_t size, | 482 | enum GNUNET_MQ_Error error) |
572 | void *buf); | 483 | { |
484 | struct GNUNET_DHT_Handle *h = cls; | ||
485 | |||
486 | do_disconnect (h); | ||
487 | } | ||
573 | 488 | ||
574 | 489 | ||
575 | /** | 490 | /** |
576 | * Try to send messages from list of messages to send | 491 | * Verify integrity of a get monitor message from the service. |
577 | * | 492 | * |
578 | * @param handle handle to DHT | 493 | * @param cls The DHT handle. |
494 | * @param msg Monitor get message from the service. | ||
495 | * @return #GNUNET_OK if everything went fine, | ||
496 | * #GNUNET_SYSERR if the message is malformed. | ||
579 | */ | 497 | */ |
580 | static void | 498 | static int |
581 | process_pending_messages (struct GNUNET_DHT_Handle *handle) | 499 | check_monitor_get (void *cls, |
500 | const struct GNUNET_DHT_MonitorGetMessage *msg) | ||
582 | { | 501 | { |
583 | struct PendingMessage *head; | 502 | uint32_t plen = ntohl (msg->get_path_length); |
503 | uint16_t msize = ntohs (msg->header.size) - sizeof (*msg); | ||
584 | 504 | ||
585 | if (NULL == handle->client) | 505 | if ( (plen > UINT16_MAX) || |
506 | (plen * sizeof (struct GNUNET_HashCode) != msize) ) | ||
586 | { | 507 | { |
587 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 508 | GNUNET_break (0); |
588 | "process_pending_messages called, but client is NULL, reconnecting\n"); | 509 | return GNUNET_SYSERR; |
589 | do_disconnect (handle); | ||
590 | return; | ||
591 | } | 510 | } |
592 | if (NULL != handle->th) | 511 | return GNUNET_OK; |
593 | return; | ||
594 | if (NULL == (head = handle->pending_head)) | ||
595 | return; | ||
596 | handle->th = | ||
597 | GNUNET_CLIENT_notify_transmit_ready (handle->client, | ||
598 | ntohs (head->msg->size), | ||
599 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
600 | GNUNET_YES, &transmit_pending, | ||
601 | handle); | ||
602 | if (NULL != handle->th) | ||
603 | return; | ||
604 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
605 | "notify_transmit_ready returned NULL, reconnecting\n"); | ||
606 | do_disconnect (handle); | ||
607 | } | 512 | } |
608 | 513 | ||
609 | 514 | ||
610 | /** | 515 | /** |
611 | * Transmit the next pending message, called by notify_transmit_ready | 516 | * Process a get monitor message from the service. |
612 | * | 517 | * |
613 | * @param cls the DHT handle | 518 | * @param cls The DHT handle. |
614 | * @param size number of bytes available in @a buf for transmission | 519 | * @param msg Monitor get message from the service. |
615 | * @param buf where to copy messages for the service | ||
616 | * @return number of bytes written to @a buf | ||
617 | */ | 520 | */ |
618 | static size_t | 521 | static void |
619 | transmit_pending (void *cls, | 522 | handle_monitor_get (void *cls, |
620 | size_t size, | 523 | const struct GNUNET_DHT_MonitorGetMessage *msg) |
621 | void *buf) | ||
622 | { | 524 | { |
623 | struct GNUNET_DHT_Handle *handle = cls; | 525 | struct GNUNET_DHT_Handle *handle = cls; |
624 | struct PendingMessage *head; | 526 | struct GNUNET_DHT_MonitorHandle *mh; |
625 | size_t tsize; | ||
626 | 527 | ||
627 | 528 | for (mh = handle->monitor_head; NULL != mh; mh = mh->next) | |
628 | handle->th = NULL; | ||
629 | if (NULL == buf) | ||
630 | { | ||
631 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
632 | "Transmission to DHT service failed! Reconnecting!\n"); | ||
633 | do_disconnect (handle); | ||
634 | return 0; | ||
635 | } | ||
636 | if (NULL == (head = handle->pending_head)) | ||
637 | return 0; | ||
638 | |||
639 | tsize = ntohs (head->msg->size); | ||
640 | if (size < tsize) | ||
641 | { | ||
642 | process_pending_messages (handle); | ||
643 | return 0; | ||
644 | } | ||
645 | memcpy (buf, head->msg, tsize); | ||
646 | GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail, | ||
647 | head); | ||
648 | head->in_pending_queue = GNUNET_NO; | ||
649 | if (NULL != head->cont) | ||
650 | { | ||
651 | head->cont (head->cont_cls); | ||
652 | head->cont = NULL; | ||
653 | head->cont_cls = NULL; | ||
654 | } | ||
655 | if (GNUNET_YES == head->free_on_send) | ||
656 | GNUNET_free (head); | ||
657 | process_pending_messages (handle); | ||
658 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
659 | "Forwarded request of %u bytes to DHT service\n", (unsigned int) tsize); | ||
660 | if (GNUNET_NO == handle->in_receive) | ||
661 | { | 529 | { |
662 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting to process replies from DHT\n"); | 530 | if (NULL == mh->get_cb) |
663 | handle->in_receive = GNUNET_YES; | 531 | continue; |
664 | 532 | if ( ( (GNUNET_BLOCK_TYPE_ANY == mh->type) || | |
665 | GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle, | 533 | (mh->type == ntohl (msg->type)) ) && |
666 | GNUNET_TIME_UNIT_FOREVER_REL); | 534 | ( (NULL == mh->key) || |
535 | (0 == memcmp (mh->key, | ||
536 | &msg->key, | ||
537 | sizeof (struct GNUNET_HashCode))) ) ) | ||
538 | mh->get_cb (mh->cb_cls, | ||
539 | ntohl (msg->options), | ||
540 | (enum GNUNET_BLOCK_Type) ntohl(msg->type), | ||
541 | ntohl (msg->hop_count), | ||
542 | ntohl (msg->desired_replication_level), | ||
543 | ntohl (msg->get_path_length), | ||
544 | (struct GNUNET_PeerIdentity *) &msg[1], | ||
545 | &msg->key); | ||
667 | } | 546 | } |
668 | return tsize; | ||
669 | } | 547 | } |
670 | 548 | ||
671 | 549 | ||
672 | /** | 550 | /** |
673 | * Process a given reply that might match the given | 551 | * Validate a get response monitor message from the service. |
674 | * request. | ||
675 | * | 552 | * |
676 | * @param cls the `struct GNUNET_DHT_ClientResultMessage` | 553 | * @param cls The DHT handle. |
677 | * @param key query of the request | 554 | * @param msg monitor get response message from the service |
678 | * @param value the `struct GNUNET_DHT_RouteHandle` of a request matching the same key | 555 | * @return #GNUNET_OK if everything went fine, |
679 | * @return #GNUNET_YES to continue to iterate over all results, | 556 | * #GNUNET_SYSERR if the message is malformed. |
680 | * #GNUNET_NO if the reply is malformed or we found a matching request | ||
681 | */ | 557 | */ |
682 | static int | 558 | static int |
683 | process_reply (void *cls, | 559 | check_monitor_get_resp (void *cls, |
684 | const struct GNUNET_HashCode *key, | 560 | const struct GNUNET_DHT_MonitorGetRespMessage *msg) |
685 | void *value) | ||
686 | { | 561 | { |
687 | const struct GNUNET_DHT_ClientResultMessage *dht_msg = cls; | 562 | size_t msize = ntohs (msg->header.size) - sizeof (*msg); |
688 | struct GNUNET_DHT_GetHandle *get_handle = value; | 563 | uint32_t getl = ntohl (msg->get_path_length); |
689 | const struct GNUNET_PeerIdentity *put_path; | 564 | uint32_t putl = ntohl (msg->put_path_length); |
690 | const struct GNUNET_PeerIdentity *get_path; | ||
691 | struct GNUNET_HashCode hc; | ||
692 | uint32_t put_path_length; | ||
693 | uint32_t get_path_length; | ||
694 | size_t data_length; | ||
695 | size_t msize; | ||
696 | size_t meta_length; | ||
697 | const void *data; | ||
698 | 565 | ||
699 | if (dht_msg->unique_id != get_handle->unique_id) | 566 | if ( (getl + putl < getl) || |
700 | { | 567 | ( (msize / sizeof (struct GNUNET_PeerIdentity)) < getl + putl) ) |
701 | /* UID mismatch */ | ||
702 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
703 | "Ignoring reply for %s: UID mismatch: %llu/%llu\n", GNUNET_h2s (key), | ||
704 | dht_msg->unique_id, get_handle->unique_id); | ||
705 | return GNUNET_YES; | ||
706 | } | ||
707 | msize = ntohs (dht_msg->header.size); | ||
708 | put_path_length = ntohl (dht_msg->put_path_length); | ||
709 | get_path_length = ntohl (dht_msg->get_path_length); | ||
710 | meta_length = | ||
711 | sizeof (struct GNUNET_DHT_ClientResultMessage) + | ||
712 | sizeof (struct GNUNET_PeerIdentity) * (get_path_length + put_path_length); | ||
713 | if ((msize < meta_length) || | ||
714 | (get_path_length > | ||
715 | GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) || | ||
716 | (put_path_length > | ||
717 | GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity))) | ||
718 | { | 568 | { |
719 | GNUNET_break (0); | 569 | GNUNET_break (0); |
720 | return GNUNET_NO; | 570 | return GNUNET_SYSERR; |
721 | } | 571 | } |
722 | data_length = msize - meta_length; | 572 | return GNUNET_OK; |
723 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Giving %u byte reply for %s to application\n", | ||
724 | (unsigned int) data_length, GNUNET_h2s (key)); | ||
725 | put_path = (const struct GNUNET_PeerIdentity *) &dht_msg[1]; | ||
726 | get_path = &put_path[put_path_length]; | ||
727 | data = &get_path[get_path_length]; | ||
728 | /* remember that we've seen this result */ | ||
729 | GNUNET_CRYPTO_hash (data, data_length, &hc); | ||
730 | if (get_handle->seen_results_size == get_handle->seen_results_end) | ||
731 | GNUNET_array_grow (get_handle->seen_results, | ||
732 | get_handle->seen_results_size, | ||
733 | get_handle->seen_results_size * 2 + 1); | ||
734 | GNUNET_assert (get_handle->seen_results_end == get_handle->seen_results_transmission_offset); | ||
735 | get_handle->seen_results[get_handle->seen_results_end++] = hc; | ||
736 | /* no need to block it explicitly, service already knows about it! */ | ||
737 | get_handle->seen_results_transmission_offset++; | ||
738 | get_handle->iter (get_handle->iter_cls, | ||
739 | GNUNET_TIME_absolute_ntoh (dht_msg->expiration), key, | ||
740 | get_path, get_path_length, put_path, put_path_length, | ||
741 | ntohl (dht_msg->type), data_length, data); | ||
742 | return GNUNET_NO; | ||
743 | } | 573 | } |
744 | 574 | ||
745 | 575 | ||
746 | /** | 576 | /** |
747 | * Process a get monitor message from the service. | 577 | * Process a get response monitor message from the service. |
748 | * | 578 | * |
749 | * @param handle The DHT handle. | 579 | * @param cls The DHT handle. |
750 | * @param msg Monitor get message from the service. | 580 | * @param msg monitor get response message from the service |
751 | * @return #GNUNET_OK if everything went fine, | ||
752 | * #GNUNET_SYSERR if the message is malformed. | ||
753 | */ | 581 | */ |
754 | static int | 582 | static void |
755 | process_monitor_get_message (struct GNUNET_DHT_Handle *handle, | 583 | handle_monitor_get_resp (void *cls, |
756 | const struct GNUNET_DHT_MonitorGetMessage *msg) | 584 | const struct GNUNET_DHT_MonitorGetRespMessage *msg) |
757 | { | 585 | { |
758 | struct GNUNET_DHT_MonitorHandle *h; | 586 | struct GNUNET_DHT_Handle *handle = cls; |
759 | 587 | size_t msize = ntohs (msg->header.size) - sizeof (*msg); | |
760 | for (h = handle->monitor_head; NULL != h; h = h->next) | 588 | const struct GNUNET_PeerIdentity *path; |
589 | uint32_t getl = ntohl (msg->get_path_length); | ||
590 | uint32_t putl = ntohl (msg->put_path_length); | ||
591 | struct GNUNET_DHT_MonitorHandle *mh; | ||
592 | |||
593 | path = (const struct GNUNET_PeerIdentity *) &msg[1]; | ||
594 | for (mh = handle->monitor_head; NULL != mh; mh = mh->next) | ||
761 | { | 595 | { |
762 | int type_ok; | 596 | if (NULL == mh->get_resp_cb) |
763 | int key_ok; | 597 | continue; |
764 | 598 | if ( ( (GNUNET_BLOCK_TYPE_ANY == mh->type) || | |
765 | type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type)); | 599 | (mh->type == ntohl(msg->type)) ) && |
766 | key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key, | 600 | ( (NULL == mh->key) || |
767 | sizeof (struct GNUNET_HashCode))); | 601 | (0 == memcmp (mh->key, |
768 | if (type_ok && key_ok && (NULL != h->get_cb)) | 602 | &msg->key, |
769 | h->get_cb (h->cb_cls, | 603 | sizeof (struct GNUNET_HashCode))) ) ) |
770 | ntohl (msg->options), | 604 | mh->get_resp_cb (mh->cb_cls, |
771 | (enum GNUNET_BLOCK_Type) ntohl(msg->type), | 605 | (enum GNUNET_BLOCK_Type) ntohl (msg->type), |
772 | ntohl (msg->hop_count), | 606 | path, |
773 | ntohl (msg->desired_replication_level), | 607 | getl, |
774 | ntohl (msg->get_path_length), | 608 | &path[getl], |
775 | (struct GNUNET_PeerIdentity *) &msg[1], | 609 | putl, |
776 | &msg->key); | 610 | GNUNET_TIME_absolute_ntoh(msg->expiration_time), |
611 | &msg->key, | ||
612 | (const void *) &path[getl + putl], | ||
613 | msize - sizeof (struct GNUNET_PeerIdentity) * (putl + getl)); | ||
777 | } | 614 | } |
778 | return GNUNET_OK; | ||
779 | } | 615 | } |
780 | 616 | ||
781 | 617 | ||
782 | /** | 618 | /** |
783 | * Process a get response monitor message from the service. | 619 | * Check validity of a put monitor message from the service. |
784 | * | 620 | * |
785 | * @param handle The DHT handle. | 621 | * @param cls The DHT handle. |
786 | * @param msg monitor get response message from the service | 622 | * @param msg Monitor put message from the service. |
787 | * @return #GNUNET_OK if everything went fine, | 623 | * @return #GNUNET_OK if everything went fine, |
788 | * #GNUNET_SYSERR if the message is malformed. | 624 | * #GNUNET_SYSERR if the message is malformed. |
789 | */ | 625 | */ |
790 | static int | 626 | static int |
791 | process_monitor_get_resp_message (struct GNUNET_DHT_Handle *handle, | 627 | check_monitor_put (void *cls, |
792 | const struct GNUNET_DHT_MonitorGetRespMessage *msg) | 628 | const struct GNUNET_DHT_MonitorPutMessage *msg) |
793 | { | 629 | { |
794 | struct GNUNET_DHT_MonitorHandle *h; | ||
795 | struct GNUNET_PeerIdentity *path; | ||
796 | uint32_t getl; | ||
797 | uint32_t putl; | ||
798 | size_t msize; | 630 | size_t msize; |
631 | uint32_t putl; | ||
799 | 632 | ||
800 | msize = ntohs (msg->header.size); | 633 | msize = ntohs (msg->header.size) - sizeof (*msg); |
801 | path = (struct GNUNET_PeerIdentity *) &msg[1]; | ||
802 | getl = ntohl (msg->get_path_length); | ||
803 | putl = ntohl (msg->put_path_length); | 634 | putl = ntohl (msg->put_path_length); |
804 | if ( (getl + putl < getl) || | 635 | if ((msize / sizeof (struct GNUNET_PeerIdentity)) < putl) |
805 | ( ((msize - sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) / sizeof (struct GNUNET_PeerIdentity)) < getl + putl) ) | ||
806 | { | 636 | { |
807 | GNUNET_break (0); | 637 | GNUNET_break (0); |
808 | return GNUNET_SYSERR; | 638 | return GNUNET_SYSERR; |
809 | } | 639 | } |
810 | for (h = handle->monitor_head; NULL != h; h = h->next) | ||
811 | { | ||
812 | int type_ok; | ||
813 | int key_ok; | ||
814 | |||
815 | type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type)); | ||
816 | key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key, | ||
817 | sizeof (struct GNUNET_HashCode))); | ||
818 | if (type_ok && key_ok && (NULL != h->get_resp_cb)) | ||
819 | h->get_resp_cb (h->cb_cls, | ||
820 | (enum GNUNET_BLOCK_Type) ntohl(msg->type), | ||
821 | path, getl, | ||
822 | &path[getl], putl, | ||
823 | GNUNET_TIME_absolute_ntoh(msg->expiration_time), | ||
824 | &msg->key, | ||
825 | (void *) &path[getl + putl], | ||
826 | msize - | ||
827 | sizeof (struct GNUNET_DHT_MonitorGetRespMessage) - | ||
828 | sizeof (struct GNUNET_PeerIdentity) * (putl + getl)); | ||
829 | } | ||
830 | return GNUNET_OK; | 640 | return GNUNET_OK; |
831 | } | 641 | } |
832 | 642 | ||
@@ -834,95 +644,177 @@ process_monitor_get_resp_message (struct GNUNET_DHT_Handle *handle, | |||
834 | /** | 644 | /** |
835 | * Process a put monitor message from the service. | 645 | * Process a put monitor message from the service. |
836 | * | 646 | * |
837 | * @param handle The DHT handle. | 647 | * @param cls The DHT handle. |
648 | * @param msg Monitor put message from the service. | ||
649 | */ | ||
650 | static void | ||
651 | handle_monitor_put (void *cls, | ||
652 | const struct GNUNET_DHT_MonitorPutMessage *msg) | ||
653 | { | ||
654 | struct GNUNET_DHT_Handle *handle = cls; | ||
655 | size_t msize = ntohs (msg->header.size) - sizeof (*msg); | ||
656 | uint32_t putl = ntohl (msg->put_path_length); | ||
657 | const struct GNUNET_PeerIdentity *path; | ||
658 | struct GNUNET_DHT_MonitorHandle *mh; | ||
659 | |||
660 | path = (const struct GNUNET_PeerIdentity *) &msg[1]; | ||
661 | for (mh = handle->monitor_head; NULL != mh; mh = mh->next) | ||
662 | { | ||
663 | if (NULL == mh->put_cb) | ||
664 | continue; | ||
665 | if ( ( (GNUNET_BLOCK_TYPE_ANY == mh->type) || | ||
666 | (mh->type == ntohl(msg->type)) ) && | ||
667 | ( (NULL == mh->key) || | ||
668 | (0 == memcmp (mh->key, | ||
669 | &msg->key, | ||
670 | sizeof (struct GNUNET_HashCode))) ) ) | ||
671 | mh->put_cb (mh->cb_cls, | ||
672 | ntohl (msg->options), | ||
673 | (enum GNUNET_BLOCK_Type) ntohl(msg->type), | ||
674 | ntohl (msg->hop_count), | ||
675 | ntohl (msg->desired_replication_level), | ||
676 | putl, | ||
677 | path, | ||
678 | GNUNET_TIME_absolute_ntoh(msg->expiration_time), | ||
679 | &msg->key, | ||
680 | (const void *) &path[putl], | ||
681 | msize - sizeof (struct GNUNET_PeerIdentity) * putl); | ||
682 | } | ||
683 | } | ||
684 | |||
685 | |||
686 | /** | ||
687 | * Verify that client result message received from the service is well-formed. | ||
688 | * | ||
689 | * @param cls The DHT handle. | ||
838 | * @param msg Monitor put message from the service. | 690 | * @param msg Monitor put message from the service. |
839 | * @return #GNUNET_OK if everything went fine, | 691 | * @return #GNUNET_OK if everything went fine, |
840 | * #GNUNET_SYSERR if the message is malformed. | 692 | * #GNUNET_SYSERR if the message is malformed. |
841 | */ | 693 | */ |
842 | static int | 694 | static int |
843 | process_monitor_put_message (struct GNUNET_DHT_Handle *handle, | 695 | check_client_result (void *cls, |
844 | const struct GNUNET_DHT_MonitorPutMessage *msg) | 696 | const struct GNUNET_DHT_ClientResultMessage *msg) |
845 | { | 697 | { |
846 | struct GNUNET_DHT_MonitorHandle *h; | 698 | size_t msize = ntohs (msg->header.size) - sizeof (*msg); |
847 | size_t msize; | 699 | uint32_t put_path_length = ntohl (msg->put_path_length); |
848 | struct GNUNET_PeerIdentity *path; | 700 | uint32_t get_path_length = ntohl (msg->get_path_length); |
849 | uint32_t putl; | 701 | size_t meta_length; |
850 | 702 | ||
851 | msize = ntohs (msg->header.size); | 703 | meta_length = |
852 | path = (struct GNUNET_PeerIdentity *) &msg[1]; | 704 | sizeof (struct GNUNET_PeerIdentity) * (get_path_length + put_path_length); |
853 | putl = ntohl (msg->put_path_length); | 705 | if ( (msize < meta_length) || |
854 | if (((msize - sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) / sizeof (struct GNUNET_PeerIdentity)) < putl) | 706 | (get_path_length > |
707 | GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) || | ||
708 | (put_path_length > | ||
709 | GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ) | ||
855 | { | 710 | { |
856 | GNUNET_break (0); | 711 | GNUNET_break (0); |
857 | return GNUNET_SYSERR; | 712 | return GNUNET_SYSERR; |
858 | } | 713 | } |
859 | for (h = handle->monitor_head; NULL != h; h = h->next) | ||
860 | { | ||
861 | int type_ok; | ||
862 | int key_ok; | ||
863 | |||
864 | type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type)); | ||
865 | key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key, | ||
866 | sizeof (struct GNUNET_HashCode))); | ||
867 | if (type_ok && key_ok && (NULL != h->put_cb)) | ||
868 | h->put_cb (h->cb_cls, | ||
869 | ntohl (msg->options), | ||
870 | (enum GNUNET_BLOCK_Type) ntohl(msg->type), | ||
871 | ntohl (msg->hop_count), | ||
872 | ntohl (msg->desired_replication_level), | ||
873 | putl, path, | ||
874 | GNUNET_TIME_absolute_ntoh(msg->expiration_time), | ||
875 | &msg->key, | ||
876 | (void *) &path[putl], | ||
877 | msize - | ||
878 | sizeof (struct GNUNET_DHT_MonitorPutMessage) - | ||
879 | sizeof (struct GNUNET_PeerIdentity) * putl); | ||
880 | } | ||
881 | return GNUNET_OK; | 714 | return GNUNET_OK; |
882 | } | 715 | } |
883 | 716 | ||
884 | 717 | ||
885 | #if ENABLE_MALICIOUS | ||
886 | /** | 718 | /** |
887 | * Process a act malicious confirmation from service. | 719 | * Process a given reply that might match the given request. |
888 | * @param handle The DHT handle. | 720 | * |
889 | * @param msg confirmation message from the service. | 721 | * @param cls the `struct GNUNET_DHT_ClientResultMessage` |
890 | * @return #GNUNET_OK if everything went fine, | 722 | * @param key query of the request |
891 | * #GNUNET_SYSERR if the message is malformed. | 723 | * @param value the `struct GNUNET_DHT_GetHandle` of a request matching the same key |
724 | * @return #GNUNET_YES to continue to iterate over all results | ||
892 | */ | 725 | */ |
893 | static int | 726 | static int |
894 | process_act_malicious_confirmation_message (struct GNUNET_DHT_Handle *handle, | 727 | process_client_result (void *cls, |
895 | const struct GNUNET_DHT_ClientActMaliciousConfirmationMessage *msg) | 728 | const struct GNUNET_HashCode *key, |
729 | void *value) | ||
896 | { | 730 | { |
897 | struct GNUNET_DHT_ActMaliciousHandle *mh; | 731 | const struct GNUNET_DHT_ClientResultMessage *crm = cls; |
898 | GNUNET_DHT_PutContinuation cont; | 732 | struct GNUNET_DHT_GetHandle *get_handle = value; |
899 | void *cont_cls; | 733 | size_t msize = ntohs (crm->header.size) - sizeof (*crm); |
734 | uint32_t put_path_length = ntohl (crm->put_path_length); | ||
735 | uint32_t get_path_length = ntohl (crm->get_path_length); | ||
736 | const struct GNUNET_PeerIdentity *put_path; | ||
737 | const struct GNUNET_PeerIdentity *get_path; | ||
738 | struct GNUNET_HashCode hc; | ||
739 | size_t data_length; | ||
740 | size_t meta_length; | ||
741 | const void *data; | ||
900 | 742 | ||
901 | mh = handle->mh; | 743 | if (crm->unique_id != get_handle->unique_id) |
902 | if (NULL == mh) | 744 | { |
903 | return GNUNET_OK; | 745 | /* UID mismatch */ |
904 | cont = mh->cont; | 746 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
905 | cont_cls = mh->cont_cls; | 747 | "Ignoring reply for %s: UID mismatch: %llu/%llu\n", |
906 | if (NULL != cont) | 748 | GNUNET_h2s (key), |
907 | cont (cont_cls, GNUNET_OK); | 749 | crm->unique_id, |
750 | get_handle->unique_id); | ||
751 | return GNUNET_YES; | ||
752 | } | ||
753 | /* FIXME: might want to check that type matches */ | ||
754 | meta_length = | ||
755 | sizeof (struct GNUNET_PeerIdentity) * (get_path_length + put_path_length); | ||
756 | data_length = msize - meta_length; | ||
757 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
758 | "Giving %u byte reply for %s to application\n", | ||
759 | (unsigned int) data_length, | ||
760 | GNUNET_h2s (key)); | ||
761 | put_path = (const struct GNUNET_PeerIdentity *) &crm[1]; | ||
762 | get_path = &put_path[put_path_length]; | ||
763 | data = &get_path[get_path_length]; | ||
764 | /* remember that we've seen this result */ | ||
765 | GNUNET_CRYPTO_hash (data, | ||
766 | data_length, | ||
767 | &hc); | ||
768 | if (get_handle->seen_results_size == get_handle->seen_results_end) | ||
769 | GNUNET_array_grow (get_handle->seen_results, | ||
770 | get_handle->seen_results_size, | ||
771 | get_handle->seen_results_size * 2 + 1); | ||
772 | get_handle->seen_results[get_handle->seen_results_end++] = hc; | ||
773 | /* no need to block it explicitly, service already knows about it! */ | ||
774 | get_handle->iter (get_handle->iter_cls, | ||
775 | GNUNET_TIME_absolute_ntoh (crm->expiration), | ||
776 | key, | ||
777 | get_path, | ||
778 | get_path_length, | ||
779 | put_path, | ||
780 | put_path_length, | ||
781 | ntohl (crm->type), | ||
782 | data_length, | ||
783 | data); | ||
784 | return GNUNET_YES; | ||
785 | } | ||
908 | 786 | ||
909 | return GNUNET_OK; | 787 | |
788 | /** | ||
789 | * Process a client result message received from the service. | ||
790 | * | ||
791 | * @param cls The DHT handle. | ||
792 | * @param msg Monitor put message from the service. | ||
793 | */ | ||
794 | static void | ||
795 | handle_client_result (void *cls, | ||
796 | const struct GNUNET_DHT_ClientResultMessage *msg) | ||
797 | { | ||
798 | struct GNUNET_DHT_Handle *handle = cls; | ||
799 | |||
800 | GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests, | ||
801 | &msg->key, | ||
802 | &process_client_result, | ||
803 | (void *) msg); | ||
910 | } | 804 | } |
911 | #endif | ||
912 | 805 | ||
913 | 806 | ||
914 | /** | 807 | /** |
915 | * Process a put confirmation message from the service. | 808 | * Process a put confirmation message from the service. |
916 | * | 809 | * |
917 | * @param handle The DHT handle. | 810 | * @param cls The DHT handle. |
918 | * @param msg confirmation message from the service. | 811 | * @param msg confirmation message from the service. |
919 | * @return #GNUNET_OK if everything went fine, | ||
920 | * #GNUNET_SYSERR if the message is malformed. | ||
921 | */ | 812 | */ |
922 | static int | 813 | static void |
923 | process_put_confirmation_message (struct GNUNET_DHT_Handle *handle, | 814 | handle_put_confirmation (void *cls, |
924 | const struct GNUNET_DHT_ClientPutConfirmationMessage *msg) | 815 | const struct GNUNET_DHT_ClientPutConfirmationMessage *msg) |
925 | { | 816 | { |
817 | struct GNUNET_DHT_Handle *handle = cls; | ||
926 | struct GNUNET_DHT_PutHandle *ph; | 818 | struct GNUNET_DHT_PutHandle *ph; |
927 | GNUNET_DHT_PutContinuation cont; | 819 | GNUNET_DHT_PutContinuation cont; |
928 | void *cont_cls; | 820 | void *cont_cls; |
@@ -931,125 +823,62 @@ process_put_confirmation_message (struct GNUNET_DHT_Handle *handle, | |||
931 | if (ph->unique_id == msg->unique_id) | 823 | if (ph->unique_id == msg->unique_id) |
932 | break; | 824 | break; |
933 | if (NULL == ph) | 825 | if (NULL == ph) |
934 | return GNUNET_OK; | 826 | return; |
935 | cont = ph->cont; | 827 | cont = ph->cont; |
936 | cont_cls = ph->cont_cls; | 828 | cont_cls = ph->cont_cls; |
937 | GNUNET_DHT_put_cancel (ph); | 829 | GNUNET_DHT_put_cancel (ph); |
938 | if (NULL != cont) | 830 | if (NULL != cont) |
939 | cont (cont_cls, GNUNET_OK); | 831 | cont (cont_cls, |
940 | return GNUNET_OK; | 832 | GNUNET_OK); |
941 | } | 833 | } |
942 | 834 | ||
943 | 835 | ||
944 | /** | 836 | /** |
945 | * Handler for messages received from the DHT service | 837 | * Try to (re)connect to the DHT service. |
946 | * a demultiplexer which handles numerous message types | ||
947 | * | 838 | * |
948 | * @param cls the `struct GNUNET_DHT_Handle` | 839 | * @param h DHT handle to reconnect |
949 | * @param msg the incoming message | 840 | * @return #GNUNET_YES on success, #GNUNET_NO on failure. |
950 | */ | 841 | */ |
951 | static void | 842 | static int |
952 | service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg) | 843 | try_connect (struct GNUNET_DHT_Handle *h) |
953 | { | 844 | { |
954 | struct GNUNET_DHT_Handle *handle = cls; | 845 | GNUNET_MQ_hd_var_size (monitor_get, |
955 | const struct GNUNET_DHT_ClientResultMessage *dht_msg; | 846 | GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET, |
956 | uint16_t msize; | 847 | struct GNUNET_DHT_MonitorGetMessage); |
957 | int ret; | 848 | GNUNET_MQ_hd_var_size (monitor_get_resp, |
958 | 849 | GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP, | |
959 | 850 | struct GNUNET_DHT_MonitorGetRespMessage); | |
960 | if (NULL == msg) | 851 | GNUNET_MQ_hd_var_size (monitor_put, |
961 | { | 852 | GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT, |
962 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 853 | struct GNUNET_DHT_MonitorPutMessage); |
963 | "Error receiving data from DHT service, reconnecting\n"); | 854 | GNUNET_MQ_hd_var_size (client_result, |
964 | do_disconnect (handle); | 855 | GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT, |
965 | return; | 856 | struct GNUNET_DHT_ClientResultMessage); |
966 | } | 857 | GNUNET_MQ_hd_fixed_size (put_confirmation, |
967 | GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle, | 858 | GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK, |
968 | GNUNET_TIME_UNIT_FOREVER_REL); | 859 | struct GNUNET_DHT_ClientPutConfirmationMessage); |
969 | ret = GNUNET_SYSERR; | 860 | struct GNUNET_MQ_MessageHandler handlers[] = { |
970 | msize = ntohs (msg->size); | 861 | make_monitor_get_handler (h), |
971 | switch (ntohs (msg->type)) | 862 | make_monitor_get_resp_handler (h), |
863 | make_monitor_put_handler (h), | ||
864 | make_client_result_handler (h), | ||
865 | make_put_confirmation_handler (h), | ||
866 | GNUNET_MQ_handler_end () | ||
867 | }; | ||
868 | if (NULL != h->mq) | ||
869 | return GNUNET_OK; | ||
870 | h->mq = GNUNET_CLIENT_connecT (h->cfg, | ||
871 | "dht", | ||
872 | handlers, | ||
873 | &mq_error_handler, | ||
874 | h); | ||
875 | if (NULL == h->mq) | ||
972 | { | 876 | { |
973 | case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET: | ||
974 | if (msize < sizeof (struct GNUNET_DHT_MonitorGetMessage)) | ||
975 | { | ||
976 | GNUNET_break (0); | ||
977 | break; | ||
978 | } | ||
979 | ret = process_monitor_get_message(handle, | ||
980 | (const struct GNUNET_DHT_MonitorGetMessage *) msg); | ||
981 | break; | ||
982 | case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP: | ||
983 | if (msize < sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) | ||
984 | { | ||
985 | GNUNET_break (0); | ||
986 | break; | ||
987 | } | ||
988 | ret = process_monitor_get_resp_message(handle, | ||
989 | (const struct GNUNET_DHT_MonitorGetRespMessage *) msg); | ||
990 | break; | ||
991 | case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT: | ||
992 | if (msize < sizeof (struct GNUNET_DHT_MonitorPutMessage)) | ||
993 | { | ||
994 | GNUNET_break (0); | ||
995 | break; | ||
996 | } | ||
997 | ret = process_monitor_put_message(handle, | ||
998 | (const struct GNUNET_DHT_MonitorPutMessage *) msg); | ||
999 | break; | ||
1000 | case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT_RESP: | ||
1001 | /* Not implemented yet */ | ||
1002 | GNUNET_break(0); | ||
1003 | break; | ||
1004 | case GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT: | ||
1005 | if (ntohs (msg->size) < sizeof (struct GNUNET_DHT_ClientResultMessage)) | ||
1006 | { | ||
1007 | GNUNET_break (0); | ||
1008 | break; | ||
1009 | } | ||
1010 | dht_msg = (const struct GNUNET_DHT_ClientResultMessage *) msg; | ||
1011 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1012 | "Received reply for `%s' from DHT service %p\n", | ||
1013 | GNUNET_h2s (&dht_msg->key), handle); | ||
1014 | GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests, | ||
1015 | &dht_msg->key, | ||
1016 | &process_reply, | ||
1017 | (void *) dht_msg); | ||
1018 | ret = GNUNET_OK; | ||
1019 | break; | ||
1020 | case GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK: | ||
1021 | if (ntohs (msg->size) != sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage)) | ||
1022 | { | ||
1023 | GNUNET_break (0); | ||
1024 | break; | ||
1025 | } | ||
1026 | ret = process_put_confirmation_message (handle, | ||
1027 | (const struct GNUNET_DHT_ClientPutConfirmationMessage*) msg); | ||
1028 | break; | ||
1029 | #if ENABLE_MALICIOUS | ||
1030 | case GNUNET_MESSAGE_TYPE_DHT_CLIENT_ACT_MALICIOUS_OK: | ||
1031 | if(msize != sizeof (struct GNUNET_DHT_ClientActMaliciousConfirmationMessage)) | ||
1032 | { | ||
1033 | GNUNET_break (0); | ||
1034 | break; | ||
1035 | } | ||
1036 | ret = process_act_malicious_confirmation_message (handle, | ||
1037 | (const struct GNUNET_DHT_ClientActMaliciousConfirmationMessage*) msg); | ||
1038 | break; | ||
1039 | #endif | ||
1040 | default: | ||
1041 | GNUNET_break(0); | ||
1042 | LOG (GNUNET_ERROR_TYPE_WARNING, | 877 | LOG (GNUNET_ERROR_TYPE_WARNING, |
1043 | "Unknown DHT message type: %hu (%hu) size: %hu\n", | 878 | "Failed to connect to the DHT service!\n"); |
1044 | ntohs (msg->type), msg->type, msize); | 879 | return GNUNET_NO; |
1045 | break; | ||
1046 | } | ||
1047 | if (GNUNET_OK != ret) | ||
1048 | { | ||
1049 | GNUNET_break (0); | ||
1050 | do_disconnect (handle); | ||
1051 | return; | ||
1052 | } | 880 | } |
881 | return GNUNET_YES; | ||
1053 | } | 882 | } |
1054 | 883 | ||
1055 | 884 | ||
@@ -1069,9 +898,12 @@ GNUNET_DHT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
1069 | 898 | ||
1070 | handle = GNUNET_new (struct GNUNET_DHT_Handle); | 899 | handle = GNUNET_new (struct GNUNET_DHT_Handle); |
1071 | handle->cfg = cfg; | 900 | handle->cfg = cfg; |
1072 | handle->uid_gen = | 901 | handle->uid_gen |
1073 | GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX); | 902 | = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, |
1074 | handle->active_requests = GNUNET_CONTAINER_multihashmap_create (ht_len, GNUNET_YES); | 903 | UINT64_MAX); |
904 | handle->active_requests | ||
905 | = GNUNET_CONTAINER_multihashmap_create (ht_len, | ||
906 | GNUNET_YES); | ||
1075 | if (GNUNET_NO == try_connect (handle)) | 907 | if (GNUNET_NO == try_connect (handle)) |
1076 | { | 908 | { |
1077 | GNUNET_DHT_disconnect (handle); | 909 | GNUNET_DHT_disconnect (handle); |
@@ -1089,92 +921,33 @@ GNUNET_DHT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
1089 | void | 921 | void |
1090 | GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle) | 922 | GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle) |
1091 | { | 923 | { |
1092 | struct PendingMessage *pm; | ||
1093 | struct GNUNET_DHT_PutHandle *ph; | 924 | struct GNUNET_DHT_PutHandle *ph; |
1094 | 925 | ||
1095 | GNUNET_assert (NULL != handle); | ||
1096 | GNUNET_assert (0 == | 926 | GNUNET_assert (0 == |
1097 | GNUNET_CONTAINER_multihashmap_size (handle->active_requests)); | 927 | GNUNET_CONTAINER_multihashmap_size (handle->active_requests)); |
1098 | if (NULL != handle->th) | ||
1099 | { | ||
1100 | GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th); | ||
1101 | handle->th = NULL; | ||
1102 | } | ||
1103 | while (NULL != (pm = handle->pending_head)) | ||
1104 | { | ||
1105 | GNUNET_assert (GNUNET_YES == pm->in_pending_queue); | ||
1106 | GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail, | ||
1107 | pm); | ||
1108 | pm->in_pending_queue = GNUNET_NO; | ||
1109 | GNUNET_assert (GNUNET_YES == pm->free_on_send); | ||
1110 | if (NULL != pm->cont) | ||
1111 | pm->cont (pm->cont_cls); | ||
1112 | GNUNET_free (pm); | ||
1113 | } | ||
1114 | while (NULL != (ph = handle->put_head)) | 928 | while (NULL != (ph = handle->put_head)) |
1115 | { | 929 | { |
1116 | GNUNET_break (NULL == ph->pending); | ||
1117 | if (NULL != ph->cont) | 930 | if (NULL != ph->cont) |
1118 | ph->cont (ph->cont_cls, GNUNET_SYSERR); | 931 | ph->cont (ph->cont_cls, |
932 | GNUNET_SYSERR); | ||
1119 | GNUNET_DHT_put_cancel (ph); | 933 | GNUNET_DHT_put_cancel (ph); |
1120 | } | 934 | } |
1121 | 935 | if (NULL != handle->mq) | |
1122 | if (NULL != handle->client) | ||
1123 | { | 936 | { |
1124 | GNUNET_CLIENT_disconnect (handle->client); | 937 | GNUNET_MQ_destroy (handle->mq); |
1125 | handle->client = NULL; | 938 | handle->mq = NULL; |
1126 | } | 939 | } |
1127 | if (NULL != handle->reconnect_task) | 940 | if (NULL != handle->reconnect_task) |
941 | { | ||
1128 | GNUNET_SCHEDULER_cancel (handle->reconnect_task); | 942 | GNUNET_SCHEDULER_cancel (handle->reconnect_task); |
943 | handle->reconnect_task = NULL; | ||
944 | } | ||
1129 | GNUNET_CONTAINER_multihashmap_destroy (handle->active_requests); | 945 | GNUNET_CONTAINER_multihashmap_destroy (handle->active_requests); |
1130 | GNUNET_free (handle); | 946 | GNUNET_free (handle); |
1131 | } | 947 | } |
1132 | 948 | ||
1133 | 949 | ||
1134 | /** | 950 | /** |
1135 | * Timeout for the transmission of a fire&forget-request. Clean it up. | ||
1136 | * | ||
1137 | * @param cls the `struct GNUNET_DHT_PutHandle *` | ||
1138 | */ | ||
1139 | static void | ||
1140 | timeout_put_request (void *cls) | ||
1141 | { | ||
1142 | struct GNUNET_DHT_PutHandle *ph = cls; | ||
1143 | struct GNUNET_DHT_Handle *handle = ph->dht_handle; | ||
1144 | |||
1145 | ph->timeout_task = NULL; | ||
1146 | if (NULL != ph->pending) | ||
1147 | { | ||
1148 | GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail, | ||
1149 | ph->pending); | ||
1150 | ph->pending->in_pending_queue = GNUNET_NO; | ||
1151 | GNUNET_free (ph->pending); | ||
1152 | } | ||
1153 | if (NULL != ph->cont) | ||
1154 | ph->cont (ph->cont_cls, GNUNET_NO); | ||
1155 | GNUNET_CONTAINER_DLL_remove (handle->put_head, | ||
1156 | handle->put_tail, | ||
1157 | ph); | ||
1158 | GNUNET_free (ph); | ||
1159 | } | ||
1160 | |||
1161 | |||
1162 | /** | ||
1163 | * Function called whenever the PUT message leaves the queue. Sets | ||
1164 | * the message pointer in the put handle to NULL. | ||
1165 | * | ||
1166 | * @param cls the `struct GNUNET_DHT_PutHandle` | ||
1167 | */ | ||
1168 | static void | ||
1169 | mark_put_message_gone (void *cls) | ||
1170 | { | ||
1171 | struct GNUNET_DHT_PutHandle *ph = cls; | ||
1172 | |||
1173 | ph->pending = NULL; | ||
1174 | } | ||
1175 | |||
1176 | |||
1177 | /** | ||
1178 | * Perform a PUT operation storing data in the DHT. FIXME: we should | 951 | * Perform a PUT operation storing data in the DHT. FIXME: we should |
1179 | * change the protocol to get a confirmation for the PUT from the DHT | 952 | * change the protocol to get a confirmation for the PUT from the DHT |
1180 | * and call 'cont' only after getting the confirmation; otherwise, the | 953 | * and call 'cont' only after getting the confirmation; otherwise, the |
@@ -1190,26 +963,25 @@ mark_put_message_gone (void *cls) | |||
1190 | * @param size number of bytes in data; must be less than 64k | 963 | * @param size number of bytes in data; must be less than 64k |
1191 | * @param data the data to store | 964 | * @param data the data to store |
1192 | * @param exp desired expiration time for the value | 965 | * @param exp desired expiration time for the value |
1193 | * @param timeout how long to wait for transmission of this request | ||
1194 | * @param cont continuation to call when done (transmitting request to service) | 966 | * @param cont continuation to call when done (transmitting request to service) |
1195 | * You must not call #GNUNET_DHT_disconnect in this continuation | 967 | * You must not call #GNUNET_DHT_disconnect in this continuation |
1196 | * @param cont_cls closure for @a cont | 968 | * @param cont_cls closure for @a cont |
1197 | */ | 969 | */ |
1198 | struct GNUNET_DHT_PutHandle * | 970 | struct GNUNET_DHT_PutHandle * |
1199 | GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, | 971 | GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, |
1200 | const struct GNUNET_HashCode * key, | 972 | const struct GNUNET_HashCode *key, |
1201 | uint32_t desired_replication_level, | 973 | uint32_t desired_replication_level, |
1202 | enum GNUNET_DHT_RouteOption options, | 974 | enum GNUNET_DHT_RouteOption options, |
1203 | enum GNUNET_BLOCK_Type type, size_t size, | 975 | enum GNUNET_BLOCK_Type type, |
1204 | const void *data, | 976 | size_t size, |
977 | const void *data, | ||
1205 | struct GNUNET_TIME_Absolute exp, | 978 | struct GNUNET_TIME_Absolute exp, |
1206 | struct GNUNET_TIME_Relative timeout, | 979 | GNUNET_DHT_PutContinuation cont, |
1207 | GNUNET_DHT_PutContinuation cont, | ||
1208 | void *cont_cls) | 980 | void *cont_cls) |
1209 | { | 981 | { |
982 | struct GNUNET_MQ_Envelope *env; | ||
1210 | struct GNUNET_DHT_ClientPutMessage *put_msg; | 983 | struct GNUNET_DHT_ClientPutMessage *put_msg; |
1211 | size_t msize; | 984 | size_t msize; |
1212 | struct PendingMessage *pending; | ||
1213 | struct GNUNET_DHT_PutHandle *ph; | 985 | struct GNUNET_DHT_PutHandle *ph; |
1214 | 986 | ||
1215 | msize = sizeof (struct GNUNET_DHT_ClientPutMessage) + size; | 987 | msize = sizeof (struct GNUNET_DHT_ClientPutMessage) + size; |
@@ -1219,36 +991,30 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, | |||
1219 | GNUNET_break (0); | 991 | GNUNET_break (0); |
1220 | return NULL; | 992 | return NULL; |
1221 | } | 993 | } |
994 | if (NULL == handle->mq) | ||
995 | return NULL; | ||
1222 | ph = GNUNET_new (struct GNUNET_DHT_PutHandle); | 996 | ph = GNUNET_new (struct GNUNET_DHT_PutHandle); |
1223 | ph->dht_handle = handle; | 997 | ph->dht_handle = handle; |
1224 | ph->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_put_request, ph); | ||
1225 | ph->cont = cont; | 998 | ph->cont = cont; |
1226 | ph->cont_cls = cont_cls; | 999 | ph->cont_cls = cont_cls; |
1227 | ph->unique_id = ++handle->uid_gen; | 1000 | ph->unique_id = ++handle->uid_gen; |
1228 | pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize); | 1001 | GNUNET_CONTAINER_DLL_insert_tail (handle->put_head, |
1229 | ph->pending = pending; | 1002 | handle->put_tail, |
1230 | put_msg = (struct GNUNET_DHT_ClientPutMessage *) &pending[1]; | 1003 | ph); |
1231 | pending->msg = &put_msg->header; | 1004 | env = GNUNET_MQ_msg_extra (put_msg, |
1232 | pending->handle = handle; | 1005 | size, |
1233 | pending->cont = &mark_put_message_gone; | 1006 | GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT); |
1234 | pending->cont_cls = ph; | 1007 | put_msg->type = htonl ((uint32_t) type); |
1235 | pending->free_on_send = GNUNET_YES; | ||
1236 | put_msg->header.size = htons (msize); | ||
1237 | put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT); | ||
1238 | put_msg->type = htonl (type); | ||
1239 | put_msg->options = htonl ((uint32_t) options); | 1008 | put_msg->options = htonl ((uint32_t) options); |
1240 | put_msg->desired_replication_level = htonl (desired_replication_level); | 1009 | put_msg->desired_replication_level = htonl (desired_replication_level); |
1241 | put_msg->unique_id = ph->unique_id; | 1010 | put_msg->unique_id = ph->unique_id; |
1242 | put_msg->expiration = GNUNET_TIME_absolute_hton (exp); | 1011 | put_msg->expiration = GNUNET_TIME_absolute_hton (exp); |
1243 | put_msg->key = *key; | 1012 | put_msg->key = *key; |
1244 | memcpy (&put_msg[1], data, size); | 1013 | memcpy (&put_msg[1], |
1245 | GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail, | 1014 | data, |
1246 | pending); | 1015 | size); |
1247 | pending->in_pending_queue = GNUNET_YES; | 1016 | GNUNET_MQ_send (handle->mq, |
1248 | GNUNET_CONTAINER_DLL_insert_tail (handle->put_head, | 1017 | env); |
1249 | handle->put_tail, | ||
1250 | ph); | ||
1251 | process_pending_messages (handle); | ||
1252 | return ph; | 1018 | return ph; |
1253 | } | 1019 | } |
1254 | 1020 | ||
@@ -1269,19 +1035,6 @@ GNUNET_DHT_put_cancel (struct GNUNET_DHT_PutHandle *ph) | |||
1269 | { | 1035 | { |
1270 | struct GNUNET_DHT_Handle *handle = ph->dht_handle; | 1036 | struct GNUNET_DHT_Handle *handle = ph->dht_handle; |
1271 | 1037 | ||
1272 | if (NULL != ph->pending) | ||
1273 | { | ||
1274 | GNUNET_CONTAINER_DLL_remove (handle->pending_head, | ||
1275 | handle->pending_tail, | ||
1276 | ph->pending); | ||
1277 | GNUNET_free (ph->pending); | ||
1278 | ph->pending = NULL; | ||
1279 | } | ||
1280 | if (ph->timeout_task != NULL) | ||
1281 | { | ||
1282 | GNUNET_SCHEDULER_cancel (ph->timeout_task); | ||
1283 | ph->timeout_task = NULL; | ||
1284 | } | ||
1285 | GNUNET_CONTAINER_DLL_remove (handle->put_head, | 1038 | GNUNET_CONTAINER_DLL_remove (handle->put_head, |
1286 | handle->put_tail, | 1039 | handle->put_tail, |
1287 | ph); | 1040 | ph); |
@@ -1302,21 +1055,22 @@ GNUNET_DHT_put_cancel (struct GNUNET_DHT_PutHandle *ph) | |||
1302 | * @param xquery extended query data (can be NULL, depending on type) | 1055 | * @param xquery extended query data (can be NULL, depending on type) |
1303 | * @param xquery_size number of bytes in @a xquery | 1056 | * @param xquery_size number of bytes in @a xquery |
1304 | * @param iter function to call on each result | 1057 | * @param iter function to call on each result |
1305 | * @param iter_cls closure for iter | 1058 | * @param iter_cls closure for @a iter |
1306 | * @return handle to stop the async get | 1059 | * @return handle to stop the async get |
1307 | */ | 1060 | */ |
1308 | struct GNUNET_DHT_GetHandle * | 1061 | struct GNUNET_DHT_GetHandle * |
1309 | GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle, | 1062 | GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle, |
1310 | enum GNUNET_BLOCK_Type type, const struct GNUNET_HashCode * key, | 1063 | enum GNUNET_BLOCK_Type type, |
1064 | const struct GNUNET_HashCode *key, | ||
1311 | uint32_t desired_replication_level, | 1065 | uint32_t desired_replication_level, |
1312 | enum GNUNET_DHT_RouteOption options, const void *xquery, | 1066 | enum GNUNET_DHT_RouteOption options, |
1313 | size_t xquery_size, GNUNET_DHT_GetIterator iter, | 1067 | const void *xquery, |
1068 | size_t xquery_size, | ||
1069 | GNUNET_DHT_GetIterator iter, | ||
1314 | void *iter_cls) | 1070 | void *iter_cls) |
1315 | { | 1071 | { |
1316 | struct GNUNET_DHT_ClientGetMessage *get_msg; | 1072 | struct GNUNET_DHT_GetHandle *gh; |
1317 | struct GNUNET_DHT_GetHandle *get_handle; | ||
1318 | size_t msize; | 1073 | size_t msize; |
1319 | struct PendingMessage *pending; | ||
1320 | 1074 | ||
1321 | msize = sizeof (struct GNUNET_DHT_ClientGetMessage) + xquery_size; | 1075 | msize = sizeof (struct GNUNET_DHT_ClientGetMessage) + xquery_size; |
1322 | if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) || | 1076 | if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) || |
@@ -1325,37 +1079,31 @@ GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle, | |||
1325 | GNUNET_break (0); | 1079 | GNUNET_break (0); |
1326 | return NULL; | 1080 | return NULL; |
1327 | } | 1081 | } |
1328 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending query for %s to DHT %p\n", | 1082 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1329 | GNUNET_h2s (key), handle); | 1083 | "Sending query for %s to DHT %p\n", |
1330 | pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize); | 1084 | GNUNET_h2s (key), |
1331 | get_msg = (struct GNUNET_DHT_ClientGetMessage *) &pending[1]; | 1085 | handle); |
1332 | pending->msg = &get_msg->header; | 1086 | gh = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle) + |
1333 | pending->handle = handle; | 1087 | xquery_size); |
1334 | pending->free_on_send = GNUNET_NO; | 1088 | gh->iter = iter; |
1335 | get_msg->header.size = htons (msize); | 1089 | gh->iter_cls = iter_cls; |
1336 | get_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET); | 1090 | gh->dht_handle = handle; |
1337 | get_msg->options = htonl ((uint32_t) options); | 1091 | gh->key = *key; |
1338 | get_msg->desired_replication_level = htonl (desired_replication_level); | 1092 | gh->unique_id = ++handle->uid_gen; |
1339 | get_msg->type = htonl (type); | 1093 | gh->xquery_size = xquery_size; |
1340 | get_msg->key = *key; | 1094 | gh->desired_replication_level = desired_replication_level; |
1341 | get_msg->unique_id = ++handle->uid_gen; | 1095 | gh->type = type; |
1342 | memcpy (&get_msg[1], xquery, xquery_size); | 1096 | gh->options = options; |
1343 | GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail, | 1097 | memcpy (&gh[1], |
1344 | pending); | 1098 | xquery, |
1345 | pending->in_pending_queue = GNUNET_YES; | 1099 | xquery_size); |
1346 | get_handle = GNUNET_new (struct GNUNET_DHT_GetHandle); | ||
1347 | get_handle->key = *key; | ||
1348 | get_handle->dht_handle = handle; | ||
1349 | get_handle->iter = iter; | ||
1350 | get_handle->iter_cls = iter_cls; | ||
1351 | get_handle->message = pending; | ||
1352 | get_handle->unique_id = get_msg->unique_id; | ||
1353 | GNUNET_CONTAINER_multihashmap_put (handle->active_requests, | 1100 | GNUNET_CONTAINER_multihashmap_put (handle->active_requests, |
1354 | &get_handle->key, | 1101 | &gh->key, |
1355 | get_handle, | 1102 | gh, |
1356 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | 1103 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); |
1357 | process_pending_messages (handle); | 1104 | if (NULL != handle->mq) |
1358 | return get_handle; | 1105 | send_get (gh); |
1106 | return gh; | ||
1359 | } | 1107 | } |
1360 | 1108 | ||
1361 | 1109 | ||
@@ -1375,8 +1123,10 @@ GNUNET_DHT_get_filter_known_results (struct GNUNET_DHT_GetHandle *get_handle, | |||
1375 | const struct GNUNET_HashCode *results) | 1123 | const struct GNUNET_HashCode *results) |
1376 | { | 1124 | { |
1377 | unsigned int needed; | 1125 | unsigned int needed; |
1126 | unsigned int had; | ||
1378 | 1127 | ||
1379 | needed = get_handle->seen_results_end + num_results; | 1128 | had = get_handle->seen_results_end; |
1129 | needed = had + num_results; | ||
1380 | if (needed > get_handle->seen_results_size) | 1130 | if (needed > get_handle->seen_results_size) |
1381 | GNUNET_array_grow (get_handle->seen_results, | 1131 | GNUNET_array_grow (get_handle->seen_results, |
1382 | get_handle->seen_results_size, | 1132 | get_handle->seen_results_size, |
@@ -1385,8 +1135,9 @@ GNUNET_DHT_get_filter_known_results (struct GNUNET_DHT_GetHandle *get_handle, | |||
1385 | results, | 1135 | results, |
1386 | num_results * sizeof (struct GNUNET_HashCode)); | 1136 | num_results * sizeof (struct GNUNET_HashCode)); |
1387 | get_handle->seen_results_end += num_results; | 1137 | get_handle->seen_results_end += num_results; |
1388 | queue_filter_messages (get_handle); | 1138 | if (NULL != get_handle->dht_handle->mq) |
1389 | process_pending_messages (get_handle->dht_handle); | 1139 | send_get_known_results (get_handle, |
1140 | had); | ||
1390 | } | 1141 | } |
1391 | 1142 | ||
1392 | 1143 | ||
@@ -1398,52 +1149,33 @@ GNUNET_DHT_get_filter_known_results (struct GNUNET_DHT_GetHandle *get_handle, | |||
1398 | void | 1149 | void |
1399 | GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle) | 1150 | GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle) |
1400 | { | 1151 | { |
1401 | struct GNUNET_DHT_Handle *handle; | 1152 | struct GNUNET_DHT_Handle *handle = get_handle->dht_handle; |
1402 | const struct GNUNET_DHT_ClientGetMessage *get_msg; | ||
1403 | struct GNUNET_DHT_ClientGetStopMessage *stop_msg; | ||
1404 | struct PendingMessage *pending; | ||
1405 | 1153 | ||
1406 | handle = get_handle->message->handle; | ||
1407 | get_msg = | ||
1408 | (const struct GNUNET_DHT_ClientGetMessage *) get_handle->message->msg; | ||
1409 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1154 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1410 | "Sending STOP for %s to DHT via %p\n", | 1155 | "Sending STOP for %s to DHT via %p\n", |
1411 | GNUNET_h2s (&get_msg->key), handle); | 1156 | GNUNET_h2s (&get_handle->key), |
1412 | /* generate STOP */ | 1157 | handle); |
1413 | pending = | 1158 | if (NULL != handle->mq) |
1414 | GNUNET_malloc (sizeof (struct PendingMessage) + | 1159 | { |
1415 | sizeof (struct GNUNET_DHT_ClientGetStopMessage)); | 1160 | struct GNUNET_MQ_Envelope *env; |
1416 | stop_msg = (struct GNUNET_DHT_ClientGetStopMessage *) &pending[1]; | 1161 | struct GNUNET_DHT_ClientGetStopMessage *stop_msg; |
1417 | pending->msg = &stop_msg->header; | 1162 | |
1418 | pending->handle = handle; | 1163 | env = GNUNET_MQ_msg (stop_msg, |
1419 | pending->free_on_send = GNUNET_YES; | 1164 | GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP); |
1420 | stop_msg->header.size = | 1165 | stop_msg->reserved = htonl (0); |
1421 | htons (sizeof (struct GNUNET_DHT_ClientGetStopMessage)); | 1166 | stop_msg->unique_id = get_handle->unique_id; |
1422 | stop_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP); | 1167 | stop_msg->key = get_handle->key; |
1423 | stop_msg->reserved = htonl (0); | 1168 | GNUNET_MQ_send (handle->mq, |
1424 | stop_msg->unique_id = get_msg->unique_id; | 1169 | env); |
1425 | stop_msg->key = get_msg->key; | 1170 | } |
1426 | GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail, | ||
1427 | pending); | ||
1428 | pending->in_pending_queue = GNUNET_YES; | ||
1429 | |||
1430 | /* remove 'GET' from active status */ | ||
1431 | GNUNET_assert (GNUNET_YES == | 1171 | GNUNET_assert (GNUNET_YES == |
1432 | GNUNET_CONTAINER_multihashmap_remove (handle->active_requests, | 1172 | GNUNET_CONTAINER_multihashmap_remove (handle->active_requests, |
1433 | &get_handle->key, | 1173 | &get_handle->key, |
1434 | get_handle)); | 1174 | get_handle)); |
1435 | if (GNUNET_YES == get_handle->message->in_pending_queue) | ||
1436 | { | ||
1437 | GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail, | ||
1438 | get_handle->message); | ||
1439 | get_handle->message->in_pending_queue = GNUNET_NO; | ||
1440 | } | ||
1441 | GNUNET_free (get_handle->message); | ||
1442 | GNUNET_array_grow (get_handle->seen_results, | 1175 | GNUNET_array_grow (get_handle->seen_results, |
1443 | get_handle->seen_results_end, | 1176 | get_handle->seen_results_end, |
1444 | 0); | 1177 | 0); |
1445 | GNUNET_free (get_handle); | 1178 | GNUNET_free (get_handle); |
1446 | process_pending_messages (handle); | ||
1447 | } | 1179 | } |
1448 | 1180 | ||
1449 | 1181 | ||
@@ -1468,141 +1200,62 @@ GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle, | |||
1468 | GNUNET_DHT_MonitorPutCB put_cb, | 1200 | GNUNET_DHT_MonitorPutCB put_cb, |
1469 | void *cb_cls) | 1201 | void *cb_cls) |
1470 | { | 1202 | { |
1471 | struct GNUNET_DHT_MonitorHandle *h; | 1203 | struct GNUNET_DHT_MonitorHandle *mh; |
1472 | struct GNUNET_DHT_MonitorStartStopMessage *m; | 1204 | |
1473 | struct PendingMessage *pending; | 1205 | mh = GNUNET_new (struct GNUNET_DHT_MonitorHandle); |
1474 | 1206 | mh->get_cb = get_cb; | |
1475 | h = GNUNET_new (struct GNUNET_DHT_MonitorHandle); | 1207 | mh->get_resp_cb = get_resp_cb; |
1476 | GNUNET_CONTAINER_DLL_insert(handle->monitor_head, handle->monitor_tail, h); | 1208 | mh->put_cb = put_cb; |
1477 | 1209 | mh->cb_cls = cb_cls; | |
1478 | h->get_cb = get_cb; | 1210 | mh->type = type; |
1479 | h->get_resp_cb = get_resp_cb; | 1211 | mh->dht_handle = handle; |
1480 | h->put_cb = put_cb; | ||
1481 | h->cb_cls = cb_cls; | ||
1482 | h->type = type; | ||
1483 | h->dht_handle = handle; | ||
1484 | if (NULL != key) | 1212 | if (NULL != key) |
1485 | { | 1213 | { |
1486 | h->key = GNUNET_new (struct GNUNET_HashCode); | 1214 | mh->key = GNUNET_new (struct GNUNET_HashCode); |
1487 | *h->key = *key; | 1215 | *mh->key = *key; |
1488 | } | ||
1489 | |||
1490 | pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorStartStopMessage) + | ||
1491 | sizeof (struct PendingMessage)); | ||
1492 | m = (struct GNUNET_DHT_MonitorStartStopMessage *) &pending[1]; | ||
1493 | pending->msg = &m->header; | ||
1494 | pending->handle = handle; | ||
1495 | pending->free_on_send = GNUNET_YES; | ||
1496 | m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_START); | ||
1497 | m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorStartStopMessage)); | ||
1498 | m->type = htonl(type); | ||
1499 | m->get = htons(NULL != get_cb); | ||
1500 | m->get_resp = htons(NULL != get_resp_cb); | ||
1501 | m->put = htons(NULL != put_cb); | ||
1502 | if (NULL != key) { | ||
1503 | m->filter_key = htons(1); | ||
1504 | memcpy (&m->key, key, sizeof(struct GNUNET_HashCode)); | ||
1505 | } | 1216 | } |
1506 | GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail, | 1217 | GNUNET_CONTAINER_DLL_insert (handle->monitor_head, |
1507 | pending); | 1218 | handle->monitor_tail, |
1508 | pending->in_pending_queue = GNUNET_YES; | 1219 | mh); |
1509 | process_pending_messages (handle); | 1220 | if (NULL != handle->mq) |
1510 | 1221 | send_monitor_start (mh); | |
1511 | return h; | 1222 | return mh; |
1512 | } | 1223 | } |
1513 | 1224 | ||
1514 | 1225 | ||
1515 | /** | 1226 | /** |
1516 | * Stop monitoring. | 1227 | * Stop monitoring. |
1517 | * | 1228 | * |
1518 | * @param handle The handle to the monitor request returned by monitor_start. | 1229 | * @param mh The handle to the monitor request returned by monitor_start. |
1519 | * | 1230 | * |
1520 | * On return get_handle will no longer be valid, caller must not use again!!! | 1231 | * On return get_handle will no longer be valid, caller must not use again!!! |
1521 | */ | 1232 | */ |
1522 | void | 1233 | void |
1523 | GNUNET_DHT_monitor_stop (struct GNUNET_DHT_MonitorHandle *handle) | 1234 | GNUNET_DHT_monitor_stop (struct GNUNET_DHT_MonitorHandle *mh) |
1524 | { | 1235 | { |
1236 | struct GNUNET_DHT_Handle *handle = mh->dht_handle; | ||
1525 | struct GNUNET_DHT_MonitorStartStopMessage *m; | 1237 | struct GNUNET_DHT_MonitorStartStopMessage *m; |
1526 | struct PendingMessage *pending; | 1238 | struct GNUNET_MQ_Envelope *env; |
1527 | 1239 | ||
1528 | GNUNET_CONTAINER_DLL_remove (handle->dht_handle->monitor_head, | 1240 | GNUNET_CONTAINER_DLL_remove (handle->monitor_head, |
1529 | handle->dht_handle->monitor_tail, | 1241 | handle->monitor_tail, |
1530 | handle); | 1242 | mh); |
1531 | 1243 | env = GNUNET_MQ_msg (m, | |
1532 | pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorStartStopMessage) + | 1244 | GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP); |
1533 | sizeof (struct PendingMessage)); | 1245 | m->type = htonl (mh->type); |
1534 | m = (struct GNUNET_DHT_MonitorStartStopMessage *) &pending[1]; | 1246 | m->get = htons (NULL != mh->get_cb); |
1535 | pending->msg = &m->header; | 1247 | m->get_resp = htons(NULL != mh->get_resp_cb); |
1536 | pending->handle = handle->dht_handle; | 1248 | m->put = htons (NULL != mh->put_cb); |
1537 | pending->free_on_send = GNUNET_YES; | 1249 | if (NULL != mh->key) |
1538 | m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP); | ||
1539 | m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorStartStopMessage)); | ||
1540 | m->type = htonl(handle->type); | ||
1541 | m->get = htons (NULL != handle->get_cb); | ||
1542 | m->get_resp = htons(NULL != handle->get_resp_cb); | ||
1543 | m->put = htons (NULL != handle->put_cb); | ||
1544 | if (NULL != handle->key) | ||
1545 | { | 1250 | { |
1546 | m->filter_key = htons (1); | 1251 | m->filter_key = htons (1); |
1547 | m->key = *handle->key; | 1252 | m->key = *mh->key; |
1548 | } | ||
1549 | GNUNET_CONTAINER_DLL_insert (handle->dht_handle->pending_head, | ||
1550 | handle->dht_handle->pending_tail, | ||
1551 | pending); | ||
1552 | pending->in_pending_queue = GNUNET_YES; | ||
1553 | process_pending_messages (handle->dht_handle); | ||
1554 | |||
1555 | GNUNET_free_non_null (handle->key); | ||
1556 | GNUNET_free (handle); | ||
1557 | } | ||
1558 | |||
1559 | |||
1560 | #if ENABLE_MALICIOUS | ||
1561 | /** | ||
1562 | * Turn the DHT service to act malicious. | ||
1563 | * | ||
1564 | * @param handle the DHT handle | ||
1565 | * @param action 1 to make the service malicious; 0 to make it benign | ||
1566 | * @param cont continuation to call when done (transmitting request to service) | ||
1567 | * @param cont_cls closure for @a cont | ||
1568 | */ | ||
1569 | struct GNUNET_DHT_ActMaliciousHandle * | ||
1570 | GNUNET_DHT_act_malicious (struct GNUNET_DHT_Handle *handle, | ||
1571 | unsigned int action, | ||
1572 | GNUNET_DHT_PutContinuation cont, | ||
1573 | void *cont_cls) | ||
1574 | { | ||
1575 | struct GNUNET_DHT_ActMaliciousMessage *amm; | ||
1576 | struct GNUNET_DHT_ActMaliciousHandle *mh; | ||
1577 | struct PendingMessage *pending; | ||
1578 | size_t msize; | ||
1579 | |||
1580 | msize = sizeof(struct GNUNET_DHT_ActMaliciousMessage); | ||
1581 | if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) | ||
1582 | { | ||
1583 | GNUNET_break(0); | ||
1584 | return NULL; | ||
1585 | } | 1253 | } |
1586 | mh = GNUNET_new (struct GNUNET_DHT_ActMaliciousHandle); | 1254 | GNUNET_MQ_send (handle->mq, |
1587 | mh->dht_handle = handle; | 1255 | env); |
1588 | mh->cont = cont; | 1256 | GNUNET_free_non_null (mh->key); |
1589 | mh->cont_cls = cont_cls; | 1257 | GNUNET_free (mh); |
1590 | pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize); | ||
1591 | amm = (struct GNUNET_DHT_ActMaliciousMessage *)&pending[1]; | ||
1592 | pending->msg = &amm->header; | ||
1593 | pending->handle = handle; | ||
1594 | pending->free_on_send = GNUNET_YES; | ||
1595 | amm->header.size = htons (msize); | ||
1596 | amm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_ACT_MALICIOUS); | ||
1597 | amm->action = action; | ||
1598 | handle->mh = mh; | ||
1599 | GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail, | ||
1600 | pending); | ||
1601 | pending->in_pending_queue = GNUNET_YES; | ||
1602 | process_pending_messages (handle); | ||
1603 | return mh; | ||
1604 | } | 1258 | } |
1605 | #endif | ||
1606 | 1259 | ||
1607 | 1260 | ||
1608 | /* end of dht_api.c */ | 1261 | /* end of dht_api.c */ |