diff options
author | Nathan S. Evans <evans@in.tum.de> | 2010-04-19 15:16:38 +0000 |
---|---|---|
committer | Nathan S. Evans <evans@in.tum.de> | 2010-04-19 15:16:38 +0000 |
commit | c5fee7d95eaa1695999c12d059b0aa4bc566d836 (patch) | |
tree | 062ba8d6e248ccb219d081bfcab6ed3790f95676 /src/dht | |
parent | c268decf67ed3178b8f9beee5b6fff11ee0c0037 (diff) | |
download | gnunet-c5fee7d95eaa1695999c12d059b0aa4bc566d836.tar.gz gnunet-c5fee7d95eaa1695999c12d059b0aa4bc566d836.zip |
dht api fixes, it works again (for me)
Diffstat (limited to 'src/dht')
-rw-r--r-- | src/dht/Makefile.am | 1 | ||||
-rw-r--r-- | src/dht/dht.h | 17 | ||||
-rw-r--r-- | src/dht/dht_api.c | 661 | ||||
-rw-r--r-- | src/dht/gnunet-dht-get-peer.c | 27 | ||||
-rw-r--r-- | src/dht/gnunet-service-dht.c | 172 | ||||
-rw-r--r-- | src/dht/test_dht_api.c | 66 |
6 files changed, 584 insertions, 360 deletions
diff --git a/src/dht/Makefile.am b/src/dht/Makefile.am index 1a50800b6..1677cf465 100644 --- a/src/dht/Makefile.am +++ b/src/dht/Makefile.am | |||
@@ -68,6 +68,7 @@ test_dht_api_SOURCES = \ | |||
68 | test_dht_api.c | 68 | test_dht_api.c |
69 | test_dht_api_LDADD = \ | 69 | test_dht_api_LDADD = \ |
70 | $(top_builddir)/src/util/libgnunetutil.la \ | 70 | $(top_builddir)/src/util/libgnunetutil.la \ |
71 | $(top_builddir)/src/hello/libgnunethello.la \ | ||
71 | $(top_builddir)/src/dht/libgnunetdht.la | 72 | $(top_builddir)/src/dht/libgnunetdht.la |
72 | 73 | ||
73 | EXTRA_DIST = \ | 74 | EXTRA_DIST = \ |
diff --git a/src/dht/dht.h b/src/dht/dht.h index 2bafc3694..93ac9fa69 100644 --- a/src/dht/dht.h +++ b/src/dht/dht.h | |||
@@ -34,12 +34,13 @@ typedef void (*GNUNET_DHT_MessageReceivedHandler) (void *cls, | |||
34 | * msg); | 34 | * msg); |
35 | 35 | ||
36 | /** | 36 | /** |
37 | * FIXME. | 37 | * Message which indicates the DHT should cancel outstanding |
38 | * requests and discard any state. | ||
38 | */ | 39 | */ |
39 | struct GNUNET_DHT_StopMessage | 40 | struct GNUNET_DHT_StopMessage |
40 | { | 41 | { |
41 | /** | 42 | /** |
42 | * Type: GNUNET_MESSAGE_TYPE_DHT_MESSAGE | 43 | * Type: GNUNET_MESSAGE_TYPE_DHT_STOP |
43 | */ | 44 | */ |
44 | struct GNUNET_MessageHeader header; | 45 | struct GNUNET_MessageHeader header; |
45 | 46 | ||
@@ -57,7 +58,8 @@ struct GNUNET_DHT_StopMessage | |||
57 | 58 | ||
58 | 59 | ||
59 | /** | 60 | /** |
60 | * Generic DHT message, wrapper for other message types | 61 | * Generic DHT message, indicates that a route request |
62 | * should be issued. | ||
61 | */ | 63 | */ |
62 | struct GNUNET_DHT_RouteMessage | 64 | struct GNUNET_DHT_RouteMessage |
63 | { | 65 | { |
@@ -77,7 +79,8 @@ struct GNUNET_DHT_RouteMessage | |||
77 | GNUNET_HashCode key; | 79 | GNUNET_HashCode key; |
78 | 80 | ||
79 | /** | 81 | /** |
80 | * Unique ID identifying this request | 82 | * Unique ID identifying this request, if 0 then |
83 | * the client will not expect a response | ||
81 | */ | 84 | */ |
82 | uint64_t unique_id GNUNET_PACKED; | 85 | uint64_t unique_id GNUNET_PACKED; |
83 | 86 | ||
@@ -86,12 +89,6 @@ struct GNUNET_DHT_RouteMessage | |||
86 | */ | 89 | */ |
87 | uint32_t desired_replication_level GNUNET_PACKED; | 90 | uint32_t desired_replication_level GNUNET_PACKED; |
88 | 91 | ||
89 | /** | ||
90 | * Is this message uniquely identified? If so it will | ||
91 | * be fire and forget, if not we will wait for a receipt | ||
92 | * from the service. | ||
93 | */ | ||
94 | uint32_t unique GNUNET_PACKED; | ||
95 | 92 | ||
96 | /* GNUNET_MessageHeader *enc actual DHT message, copied to end of this dealy do */ | 93 | /* GNUNET_MessageHeader *enc actual DHT message, copied to end of this dealy do */ |
97 | 94 | ||
diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c index 5675cef50..fda836d69 100644 --- a/src/dht/dht_api.c +++ b/src/dht/dht_api.c | |||
@@ -24,6 +24,11 @@ | |||
24 | * @author Christian Grothoff | 24 | * @author Christian Grothoff |
25 | * @author Nathan Evans | 25 | * @author Nathan Evans |
26 | * | 26 | * |
27 | * TODO: retransmission of pending requests maybe happens now, at least | ||
28 | * the code is in place to do so. Need to add checks when api calls | ||
29 | * happen to check if retransmission is in progress, and if so set | ||
30 | * the single pending message for transmission once the list of | ||
31 | * retries are done. | ||
27 | */ | 32 | */ |
28 | 33 | ||
29 | #include "platform.h" | 34 | #include "platform.h" |
@@ -67,16 +72,23 @@ struct PendingMessage | |||
67 | void *cont_cls; | 72 | void *cont_cls; |
68 | 73 | ||
69 | /** | 74 | /** |
70 | * Whether or not to await verification the message | 75 | * Unique ID for this request |
71 | * was received by the service | ||
72 | */ | 76 | */ |
73 | size_t is_unique; | 77 | uint64_t unique_id; |
78 | |||
79 | }; | ||
74 | 80 | ||
81 | struct PendingMessageList | ||
82 | { | ||
75 | /** | 83 | /** |
76 | * Unique ID for this request | 84 | * This is a singly linked list. |
77 | */ | 85 | */ |
78 | uint64_t unique_id; | 86 | struct PendingMessageList *next; |
79 | 87 | ||
88 | /** | ||
89 | * The pending message. | ||
90 | */ | ||
91 | struct PendingMessage *message; | ||
80 | }; | 92 | }; |
81 | 93 | ||
82 | struct GNUNET_DHT_GetContext | 94 | struct GNUNET_DHT_GetContext |
@@ -108,8 +120,7 @@ struct GNUNET_DHT_FindPeerContext | |||
108 | }; | 120 | }; |
109 | 121 | ||
110 | /** | 122 | /** |
111 | * Handle to control a unique operation (one that is | 123 | * Handle to a route request |
112 | * expected to return results) | ||
113 | */ | 124 | */ |
114 | struct GNUNET_DHT_RouteHandle | 125 | struct GNUNET_DHT_RouteHandle |
115 | { | 126 | { |
@@ -138,37 +149,16 @@ struct GNUNET_DHT_RouteHandle | |||
138 | * Main handle to this DHT api | 149 | * Main handle to this DHT api |
139 | */ | 150 | */ |
140 | struct GNUNET_DHT_Handle *dht_handle; | 151 | struct GNUNET_DHT_Handle *dht_handle; |
141 | }; | ||
142 | 152 | ||
143 | /** | ||
144 | * Handle for a non unique request, holds callback | ||
145 | * which needs to be called before we allow other | ||
146 | * messages to be processed and sent to the DHT service | ||
147 | */ | ||
148 | struct GNUNET_DHT_NonUniqueHandle | ||
149 | { | ||
150 | /** | 153 | /** |
151 | * Key that this get request is for | 154 | * The actual message sent for this request, |
155 | * used for retransmitting requests on service | ||
156 | * failure/reconnect. Freed on route_stop. | ||
152 | */ | 157 | */ |
153 | GNUNET_HashCode key; | 158 | struct GNUNET_DHT_RouteMessage *message; |
154 | |||
155 | /** | ||
156 | * Type of data get request was for | ||
157 | */ | ||
158 | uint32_t type; | ||
159 | |||
160 | /** | ||
161 | * Continuation to call on service | ||
162 | * confirmation of message receipt. | ||
163 | */ | ||
164 | GNUNET_SCHEDULER_Task cont; | ||
165 | |||
166 | /** | ||
167 | * Send continuation cls | ||
168 | */ | ||
169 | void *cont_cls; | ||
170 | }; | 159 | }; |
171 | 160 | ||
161 | |||
172 | /** | 162 | /** |
173 | * Handle to control a get operation. | 163 | * Handle to control a get operation. |
174 | */ | 164 | */ |
@@ -185,6 +175,7 @@ struct GNUNET_DHT_GetHandle | |||
185 | struct GNUNET_DHT_GetContext get_context; | 175 | struct GNUNET_DHT_GetContext get_context; |
186 | }; | 176 | }; |
187 | 177 | ||
178 | |||
188 | /** | 179 | /** |
189 | * Handle to control a find peer operation. | 180 | * Handle to control a find peer operation. |
190 | */ | 181 | */ |
@@ -202,6 +193,27 @@ struct GNUNET_DHT_FindPeerHandle | |||
202 | }; | 193 | }; |
203 | 194 | ||
204 | 195 | ||
196 | enum DHT_Retransmit_Stage | ||
197 | { | ||
198 | /** | ||
199 | * The API is not retransmitting anything at this time. | ||
200 | */ | ||
201 | DHT_NOT_RETRANSMITTING, | ||
202 | |||
203 | /** | ||
204 | * The API is retransmitting, and nothing has been single | ||
205 | * queued for sending. | ||
206 | */ | ||
207 | DHT_RETRANSMITTING, | ||
208 | |||
209 | /** | ||
210 | * The API is retransmitting, and a single message has been | ||
211 | * queued for transmission once finished. | ||
212 | */ | ||
213 | DHT_RETRANSMITTING_MESSAGE_QUEUED | ||
214 | }; | ||
215 | |||
216 | |||
205 | /** | 217 | /** |
206 | * Connection to the DHT service. | 218 | * Connection to the DHT service. |
207 | */ | 219 | */ |
@@ -242,16 +254,27 @@ struct GNUNET_DHT_Handle | |||
242 | struct GNUNET_CONTAINER_MultiHashMap *outstanding_requests; | 254 | struct GNUNET_CONTAINER_MultiHashMap *outstanding_requests; |
243 | 255 | ||
244 | /** | 256 | /** |
245 | * Non unique handle. If set don't schedule another non | 257 | * Generator for unique ids. |
246 | * unique request. | ||
247 | */ | 258 | */ |
248 | struct GNUNET_DHT_NonUniqueHandle *non_unique_request; | 259 | uint64_t uid_gen; |
249 | 260 | ||
250 | /** | 261 | /** |
251 | * Generator for unique ids. | 262 | * Are we currently retransmitting requests? If so queue a _single_ |
263 | * new request when received. | ||
252 | */ | 264 | */ |
253 | uint64_t uid_gen; | 265 | enum DHT_Retransmit_Stage retransmit_stage; |
254 | 266 | ||
267 | /** | ||
268 | * Linked list of retranmissions, to be used in the event | ||
269 | * of a dht service disconnect/reconnect. | ||
270 | */ | ||
271 | struct PendingMessageList *retransmissions; | ||
272 | |||
273 | /** | ||
274 | * A single pending message allowed to be scheduled | ||
275 | * during retransmission phase. | ||
276 | */ | ||
277 | struct PendingMessage *retransmission_buffer; | ||
255 | }; | 278 | }; |
256 | 279 | ||
257 | 280 | ||
@@ -269,6 +292,253 @@ hash_from_uid (uint64_t uid, | |||
269 | *((uint64_t*)hash) = uid; | 292 | *((uint64_t*)hash) = uid; |
270 | } | 293 | } |
271 | 294 | ||
295 | /** | ||
296 | * Iterator callback to retransmit each outstanding request | ||
297 | * because the connection to the DHT service went down (and | ||
298 | * came back). | ||
299 | * | ||
300 | * | ||
301 | */ | ||
302 | static int retransmit_iterator (void *cls, | ||
303 | const GNUNET_HashCode * key, | ||
304 | void *value) | ||
305 | { | ||
306 | struct GNUNET_DHT_RouteHandle *route_handle = value; | ||
307 | struct PendingMessageList *pending_message_list; | ||
308 | |||
309 | pending_message_list = GNUNET_malloc(sizeof(struct PendingMessageList) + sizeof(struct PendingMessage)); | ||
310 | pending_message_list->message = (struct PendingMessage *)&pending_message_list[1]; | ||
311 | pending_message_list->message->msg = &route_handle->message->header; | ||
312 | pending_message_list->message->timeout = GNUNET_TIME_relative_get_forever(); | ||
313 | pending_message_list->message->cont = NULL; | ||
314 | pending_message_list->message->cont_cls = NULL; | ||
315 | pending_message_list->message->unique_id = route_handle->uid; | ||
316 | /* Add the new pending message to the front of the retransmission list */ | ||
317 | pending_message_list->next = route_handle->dht_handle->retransmissions; | ||
318 | |||
319 | return GNUNET_OK; | ||
320 | } | ||
321 | |||
322 | /** | ||
323 | * Try to (re)connect to the dht service. | ||
324 | * | ||
325 | * @return GNUNET_YES on success, GNUNET_NO on failure. | ||
326 | */ | ||
327 | static int | ||
328 | try_connect (struct GNUNET_DHT_Handle *handle) | ||
329 | { | ||
330 | if (handle->client != NULL) | ||
331 | return GNUNET_OK; | ||
332 | handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg); | ||
333 | if (handle->client != NULL) | ||
334 | return GNUNET_YES; | ||
335 | #if DEBUG_STATISTICS | ||
336 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
337 | _("Failed to connect to the dht service!\n")); | ||
338 | #endif | ||
339 | return GNUNET_NO; | ||
340 | } | ||
341 | |||
342 | /** | ||
343 | * Send complete (or failed), call continuation if we have one. | ||
344 | */ | ||
345 | static void | ||
346 | finish (struct GNUNET_DHT_Handle *handle, int code) | ||
347 | { | ||
348 | struct PendingMessage *pos = handle->current; | ||
349 | GNUNET_HashCode uid_hash; | ||
350 | hash_from_uid (pos->unique_id, &uid_hash); | ||
351 | #if DEBUG_DHT_API | ||
352 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish called!\n", "DHT API"); | ||
353 | #endif | ||
354 | GNUNET_assert (pos != NULL); | ||
355 | |||
356 | if (pos->cont != NULL) | ||
357 | { | ||
358 | if (code == GNUNET_SYSERR) | ||
359 | GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont, | ||
360 | pos->cont_cls, | ||
361 | GNUNET_SCHEDULER_REASON_TIMEOUT); | ||
362 | else | ||
363 | GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont, | ||
364 | pos->cont_cls, | ||
365 | GNUNET_SCHEDULER_REASON_PREREQ_DONE); | ||
366 | } | ||
367 | |||
368 | if (pos->unique_id != 0) | ||
369 | GNUNET_free(pos->msg); | ||
370 | GNUNET_free (pos); | ||
371 | handle->current = NULL; | ||
372 | } | ||
373 | |||
374 | /** | ||
375 | * Transmit the next pending message, called by notify_transmit_ready | ||
376 | */ | ||
377 | static size_t | ||
378 | transmit_pending (void *cls, size_t size, void *buf) | ||
379 | { | ||
380 | struct GNUNET_DHT_Handle *handle = cls; | ||
381 | size_t tsize; | ||
382 | |||
383 | #if DEBUG_DHT_API | ||
384 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
385 | "`%s': In transmit_pending\n", "DHT API"); | ||
386 | #endif | ||
387 | if (buf == NULL) | ||
388 | { | ||
389 | #if DEBUG_DHT_API | ||
390 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
391 | "`%s': In transmit_pending buf is NULL\n", "DHT API"); | ||
392 | #endif | ||
393 | finish (handle, GNUNET_SYSERR); | ||
394 | return 0; | ||
395 | } | ||
396 | |||
397 | handle->th = NULL; | ||
398 | |||
399 | if (handle->current != NULL) | ||
400 | { | ||
401 | tsize = ntohs (handle->current->msg->size); | ||
402 | if (size >= tsize) | ||
403 | { | ||
404 | #if DEBUG_DHT_API | ||
405 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
406 | "`%s': Sending message size %d\n", "DHT API", tsize); | ||
407 | #endif | ||
408 | memcpy (buf, handle->current->msg, tsize); | ||
409 | finish (handle, GNUNET_OK); | ||
410 | return tsize; | ||
411 | } | ||
412 | else | ||
413 | { | ||
414 | return 0; | ||
415 | } | ||
416 | } | ||
417 | /* Have no pending request */ | ||
418 | return 0; | ||
419 | } | ||
420 | |||
421 | /** | ||
422 | * Try to send messages from list of messages to send | ||
423 | */ | ||
424 | static void | ||
425 | process_pending_message (struct GNUNET_DHT_Handle *handle) | ||
426 | { | ||
427 | |||
428 | if (handle->current == NULL) | ||
429 | return; /* action already pending */ | ||
430 | if (GNUNET_YES != try_connect (handle)) | ||
431 | { | ||
432 | finish (handle, GNUNET_SYSERR); | ||
433 | return; | ||
434 | } | ||
435 | |||
436 | if (NULL == | ||
437 | (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client, | ||
438 | ntohs (handle-> | ||
439 | current->msg-> | ||
440 | size), | ||
441 | handle->current-> | ||
442 | timeout, GNUNET_YES, | ||
443 | &transmit_pending, | ||
444 | handle))) | ||
445 | { | ||
446 | #if DEBUG_DHT_API | ||
447 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
448 | "Failed to transmit request to dht service.\n"); | ||
449 | #endif | ||
450 | finish (handle, GNUNET_SYSERR); | ||
451 | return; | ||
452 | } | ||
453 | #if DEBUG_DHT_API | ||
454 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
455 | "`%s': Scheduled sending message of size %d to service\n", | ||
456 | "DHT API", ntohs (handle->current->msg->size)); | ||
457 | #endif | ||
458 | } | ||
459 | |||
460 | /** | ||
461 | * Send complete (or failed), call continuation if we have one. | ||
462 | * Forward declaration. | ||
463 | */ | ||
464 | static void | ||
465 | finish_retransmission (struct GNUNET_DHT_Handle *handle, int code); | ||
466 | |||
467 | /* Forward declaration */ | ||
468 | static size_t | ||
469 | transmit_pending_retransmission (void *cls, size_t size, void *buf); | ||
470 | |||
471 | /** | ||
472 | * Try to send messages from list of messages to send | ||
473 | */ | ||
474 | static void | ||
475 | process_pending_retransmissions (struct GNUNET_DHT_Handle *handle) | ||
476 | { | ||
477 | |||
478 | if (handle->current == NULL) | ||
479 | return; /* action already pending */ | ||
480 | if (GNUNET_YES != try_connect (handle)) | ||
481 | { | ||
482 | finish_retransmission (handle, GNUNET_SYSERR); | ||
483 | return; | ||
484 | } | ||
485 | |||
486 | if (NULL == | ||
487 | (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client, | ||
488 | ntohs (handle-> | ||
489 | current->msg-> | ||
490 | size), | ||
491 | handle->current-> | ||
492 | timeout, GNUNET_YES, | ||
493 | &transmit_pending_retransmission, | ||
494 | handle))) | ||
495 | { | ||
496 | #if DEBUG_DHT_API | ||
497 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
498 | "Failed to transmit request to dht service.\n"); | ||
499 | #endif | ||
500 | finish_retransmission (handle, GNUNET_SYSERR); | ||
501 | return; | ||
502 | } | ||
503 | #if DEBUG_DHT_API | ||
504 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
505 | "`%s': Scheduled sending message of size %d to service\n", | ||
506 | "DHT API", ntohs (handle->current->msg->size)); | ||
507 | #endif | ||
508 | } | ||
509 | |||
510 | /** | ||
511 | * Send complete (or failed), call continuation if we have one. | ||
512 | */ | ||
513 | static void | ||
514 | finish_retransmission (struct GNUNET_DHT_Handle *handle, int code) | ||
515 | { | ||
516 | struct PendingMessage *pos = handle->current; | ||
517 | struct PendingMessageList *pending_list; | ||
518 | #if DEBUG_DHT_API | ||
519 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish (retransmission) called!\n", "DHT API"); | ||
520 | #endif | ||
521 | GNUNET_assert (pos == handle->retransmissions->message); | ||
522 | pending_list = handle->retransmissions; | ||
523 | handle->retransmissions = handle->retransmissions->next; | ||
524 | GNUNET_free (pending_list); | ||
525 | |||
526 | if (handle->retransmissions == NULL) | ||
527 | { | ||
528 | handle->retransmit_stage = DHT_NOT_RETRANSMITTING; | ||
529 | } | ||
530 | |||
531 | if (handle->retransmissions != NULL) | ||
532 | { | ||
533 | handle->current = handle->retransmissions->message; | ||
534 | process_pending_retransmissions(handle); | ||
535 | } | ||
536 | else if (handle->retransmission_buffer != NULL) | ||
537 | { | ||
538 | handle->current = handle->retransmission_buffer; | ||
539 | process_pending_message(handle); | ||
540 | } | ||
541 | } | ||
272 | 542 | ||
273 | /** | 543 | /** |
274 | * Handler for messages received from the DHT service | 544 | * Handler for messages received from the DHT service |
@@ -286,8 +556,6 @@ service_message_handler (void *cls, | |||
286 | uint64_t uid; | 556 | uint64_t uid; |
287 | GNUNET_HashCode uid_hash; | 557 | GNUNET_HashCode uid_hash; |
288 | size_t enc_size; | 558 | size_t enc_size; |
289 | /* TODO: find out message type, handle callbacks for different types of messages. | ||
290 | * Should be a non unique acknowledgment, or unique result. */ | ||
291 | 559 | ||
292 | if (msg == NULL) | 560 | if (msg == NULL) |
293 | { | 561 | { |
@@ -300,8 +568,11 @@ service_message_handler (void *cls, | |||
300 | handle->client = GNUNET_CLIENT_connect (handle->sched, | 568 | handle->client = GNUNET_CLIENT_connect (handle->sched, |
301 | "dht", | 569 | "dht", |
302 | handle->cfg); | 570 | handle->cfg); |
303 | /* FIXME: re-transmit *all* of our GET requests AND re-start | 571 | |
304 | receiving responses! */ | 572 | handle->retransmit_stage = DHT_RETRANSMITTING; |
573 | GNUNET_CONTAINER_multihashmap_iterate(handle->outstanding_requests, &retransmit_iterator, handle); | ||
574 | handle->current = handle->retransmissions->message; | ||
575 | process_pending_retransmissions(handle); | ||
305 | return; | 576 | return; |
306 | } | 577 | } |
307 | 578 | ||
@@ -341,37 +612,6 @@ service_message_handler (void *cls, | |||
341 | 612 | ||
342 | break; | 613 | break; |
343 | } | 614 | } |
344 | /* FIXME: we don't want these anymore, call continuation once message is sent. */ | ||
345 | /* | ||
346 | case GNUNET_MESSAGE_TYPE_DHT_STOP: | ||
347 | { | ||
348 | stop_msg = (struct GNUNET_DHT_StopMessage *) msg; | ||
349 | uid = GNUNET_ntohll (stop_msg->unique_id); | ||
350 | #if DEBUG_DHT_API | ||
351 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
352 | "`%s': Received response to message (uid %llu), current uid %llu\n", | ||
353 | "DHT API", uid, handle->current->unique_id); | ||
354 | #endif | ||
355 | if (handle->current->unique_id == uid) | ||
356 | { | ||
357 | #if DEBUG_DHT_API | ||
358 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
359 | "`%s': Have pending confirmation for this message!\n", | ||
360 | "DHT API", uid); | ||
361 | #endif | ||
362 | if (handle->current->cont != NULL) | ||
363 | GNUNET_SCHEDULER_add_continuation (handle->sched, | ||
364 | handle->current->cont, | ||
365 | handle->current->cont_cls, | ||
366 | GNUNET_SCHEDULER_REASON_PREREQ_DONE); | ||
367 | |||
368 | GNUNET_free (handle->current->msg); | ||
369 | GNUNET_free (handle->current); | ||
370 | handle->current = NULL; | ||
371 | } | ||
372 | break; | ||
373 | } | ||
374 | */ | ||
375 | default: | 615 | default: |
376 | { | 616 | { |
377 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 617 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
@@ -407,6 +647,7 @@ GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched, | |||
407 | handle->cfg = cfg; | 647 | handle->cfg = cfg; |
408 | handle->sched = sched; | 648 | handle->sched = sched; |
409 | handle->client = GNUNET_CLIENT_connect (sched, "dht", cfg); | 649 | handle->client = GNUNET_CLIENT_connect (sched, "dht", cfg); |
650 | handle->uid_gen = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, -1); | ||
410 | if (handle->client == NULL) | 651 | if (handle->client == NULL) |
411 | { | 652 | { |
412 | GNUNET_free (handle); | 653 | GNUNET_free (handle); |
@@ -451,40 +692,10 @@ GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle) | |||
451 | GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO); | 692 | GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO); |
452 | handle->client = NULL; | 693 | handle->client = NULL; |
453 | } | 694 | } |
454 | /* Either assert that outstanding_requests is empty */ | ||
455 | /* FIXME: handle->outstanding_requests not freed! */ | ||
456 | GNUNET_free (handle); | ||
457 | } | ||
458 | |||
459 | |||
460 | /** | ||
461 | * Send complete (or failed), call continuation if we have one. | ||
462 | */ | ||
463 | static void | ||
464 | finish (struct GNUNET_DHT_Handle *handle, int code) | ||
465 | { | ||
466 | struct PendingMessage *pos = handle->current; | ||
467 | #if DEBUG_DHT_API | ||
468 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish called!\n", "DHT API"); | ||
469 | #endif | ||
470 | GNUNET_assert (pos != NULL); | ||
471 | |||
472 | |||
473 | if (pos->cont != NULL) | ||
474 | { | ||
475 | if (code == GNUNET_SYSERR) | ||
476 | GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont, | ||
477 | pos->cont_cls, | ||
478 | GNUNET_SCHEDULER_REASON_TIMEOUT); | ||
479 | else | ||
480 | GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont, | ||
481 | pos->cont_cls, | ||
482 | GNUNET_SCHEDULER_REASON_PREREQ_DONE); | ||
483 | } | ||
484 | 695 | ||
485 | GNUNET_free (pos->msg); | 696 | GNUNET_assert(GNUNET_CONTAINER_multihashmap_size(handle->outstanding_requests) == 0); |
486 | GNUNET_free (pos); | 697 | GNUNET_CONTAINER_multihashmap_destroy(handle->outstanding_requests); |
487 | handle->current = NULL; | 698 | GNUNET_free (handle); |
488 | } | 699 | } |
489 | 700 | ||
490 | 701 | ||
@@ -492,7 +703,7 @@ finish (struct GNUNET_DHT_Handle *handle, int code) | |||
492 | * Transmit the next pending message, called by notify_transmit_ready | 703 | * Transmit the next pending message, called by notify_transmit_ready |
493 | */ | 704 | */ |
494 | static size_t | 705 | static size_t |
495 | transmit_pending (void *cls, size_t size, void *buf) | 706 | transmit_pending_retransmission (void *cls, size_t size, void *buf) |
496 | { | 707 | { |
497 | struct GNUNET_DHT_Handle *handle = cls; | 708 | struct GNUNET_DHT_Handle *handle = cls; |
498 | size_t tsize; | 709 | size_t tsize; |
@@ -507,8 +718,7 @@ transmit_pending (void *cls, size_t size, void *buf) | |||
507 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 718 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
508 | "`%s': In transmit_pending buf is NULL\n", "DHT API"); | 719 | "`%s': In transmit_pending buf is NULL\n", "DHT API"); |
509 | #endif | 720 | #endif |
510 | /* FIXME: free associated resources or summat */ | 721 | finish_retransmission (handle, GNUNET_SYSERR); |
511 | finish (handle, GNUNET_SYSERR); | ||
512 | return 0; | 722 | return 0; |
513 | } | 723 | } |
514 | 724 | ||
@@ -524,7 +734,7 @@ transmit_pending (void *cls, size_t size, void *buf) | |||
524 | "`%s': Sending message size %d\n", "DHT API", tsize); | 734 | "`%s': Sending message size %d\n", "DHT API", tsize); |
525 | #endif | 735 | #endif |
526 | memcpy (buf, handle->current->msg, tsize); | 736 | memcpy (buf, handle->current->msg, tsize); |
527 | finish (handle, GNUNET_OK); | 737 | finish_retransmission (handle, GNUNET_OK); |
528 | return tsize; | 738 | return tsize; |
529 | } | 739 | } |
530 | else | 740 | else |
@@ -538,66 +748,6 @@ transmit_pending (void *cls, size_t size, void *buf) | |||
538 | 748 | ||
539 | 749 | ||
540 | /** | 750 | /** |
541 | * Try to (re)connect to the dht service. | ||
542 | * | ||
543 | * @return GNUNET_YES on success, GNUNET_NO on failure. | ||
544 | */ | ||
545 | static int | ||
546 | try_connect (struct GNUNET_DHT_Handle *handle) | ||
547 | { | ||
548 | if (handle->client != NULL) | ||
549 | return GNUNET_OK; | ||
550 | handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg); | ||
551 | if (handle->client != NULL) | ||
552 | return GNUNET_YES; | ||
553 | #if DEBUG_STATISTICS | ||
554 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
555 | _("Failed to connect to the dht service!\n")); | ||
556 | #endif | ||
557 | return GNUNET_NO; | ||
558 | } | ||
559 | |||
560 | |||
561 | /** | ||
562 | * Try to send messages from list of messages to send | ||
563 | */ | ||
564 | static void | ||
565 | process_pending_message (struct GNUNET_DHT_Handle *handle) | ||
566 | { | ||
567 | |||
568 | if (handle->current == NULL) | ||
569 | return; /* action already pending */ | ||
570 | if (GNUNET_YES != try_connect (handle)) | ||
571 | { | ||
572 | finish (handle, GNUNET_SYSERR); | ||
573 | return; | ||
574 | } | ||
575 | |||
576 | if (NULL == | ||
577 | (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client, | ||
578 | ntohs (handle-> | ||
579 | current->msg-> | ||
580 | size), | ||
581 | handle->current-> | ||
582 | timeout, GNUNET_YES, | ||
583 | &transmit_pending, | ||
584 | handle))) | ||
585 | { | ||
586 | #if DEBUG_DHT_API | ||
587 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
588 | "Failed to transmit request to dht service.\n"); | ||
589 | #endif | ||
590 | finish (handle, GNUNET_SYSERR); | ||
591 | return; | ||
592 | } | ||
593 | #if DEBUG_DHT_API | ||
594 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
595 | "`%s': Scheduled sending message of size %d to service\n", | ||
596 | "DHT API", ntohs (handle->current->msg->size)); | ||
597 | #endif | ||
598 | } | ||
599 | |||
600 | /** | ||
601 | * Iterator called on each result obtained from a generic route | 751 | * Iterator called on each result obtained from a generic route |
602 | * operation | 752 | * operation |
603 | */ | 753 | */ |
@@ -633,20 +783,31 @@ void | |||
633 | find_peer_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply) | 783 | find_peer_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply) |
634 | { | 784 | { |
635 | struct GNUNET_DHT_FindPeerHandle *find_peer_handle = cls; | 785 | struct GNUNET_DHT_FindPeerHandle *find_peer_handle = cls; |
786 | struct GNUNET_MessageHeader *hello; | ||
787 | size_t hello_size; | ||
636 | 788 | ||
637 | #if DEBUG_DHT_API | 789 | if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT) |
790 | { | ||
638 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 791 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
639 | "Find peer iterator called.\n"); | 792 | "Received wrong type of response to a find peer request...\n"); |
640 | #endif | 793 | return; |
641 | if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_HELLO) | 794 | } |
642 | return; | 795 | |
643 | 796 | ||
644 | GNUNET_assert (ntohs (reply->size) >= | 797 | GNUNET_assert (ntohs (reply->size) >= |
645 | sizeof (struct GNUNET_MessageHeader)); | 798 | sizeof (struct GNUNET_MessageHeader)); |
799 | hello_size = ntohs(reply->size) - sizeof(struct GNUNET_MessageHeader); | ||
800 | hello = (struct GNUNET_MessageHeader *)&reply[1]; | ||
646 | 801 | ||
802 | if (ntohs(hello->type) != GNUNET_MESSAGE_TYPE_HELLO) | ||
803 | { | ||
804 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
805 | "Encapsulated message of type %d, is not a `%s' message!\n", ntohs(hello->type), "HELLO"); | ||
806 | return; | ||
807 | } | ||
647 | find_peer_handle->find_peer_context.proc (find_peer_handle-> | 808 | find_peer_handle->find_peer_context.proc (find_peer_handle-> |
648 | find_peer_context.proc_cls, | 809 | find_peer_context.proc_cls, |
649 | (struct GNUNET_HELLO_Message *)reply); | 810 | (struct GNUNET_HELLO_Message *)hello); |
650 | } | 811 | } |
651 | 812 | ||
652 | /** | 813 | /** |
@@ -685,36 +846,38 @@ GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle, | |||
685 | struct GNUNET_DHT_RouteHandle *route_handle; | 846 | struct GNUNET_DHT_RouteHandle *route_handle; |
686 | struct PendingMessage *pending; | 847 | struct PendingMessage *pending; |
687 | struct GNUNET_DHT_RouteMessage *message; | 848 | struct GNUNET_DHT_RouteMessage *message; |
688 | size_t expects_response; | ||
689 | uint16_t msize; | 849 | uint16_t msize; |
690 | GNUNET_HashCode uid_key; | 850 | GNUNET_HashCode uid_key; |
691 | uint64_t uid; | ||
692 | 851 | ||
693 | if (sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) | 852 | if (sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) |
694 | { | 853 | { |
695 | GNUNET_break (0); | 854 | GNUNET_break (0); |
696 | return NULL; | 855 | return NULL; |
697 | } | 856 | } |
698 | expects_response = GNUNET_YES; | 857 | |
699 | if (iter == NULL) | 858 | route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle)); |
700 | expects_response = GNUNET_NO; | 859 | memcpy (&route_handle->key, key, sizeof (GNUNET_HashCode)); |
701 | uid = handle->uid_gen++; | 860 | route_handle->iter = iter; |
702 | if (expects_response) | 861 | route_handle->iter_cls = iter_cls; |
862 | route_handle->dht_handle = handle; | ||
863 | if (iter != NULL) | ||
703 | { | 864 | { |
704 | route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle)); | 865 | route_handle->uid = handle->uid_gen++; |
705 | memcpy (&route_handle->key, key, sizeof (GNUNET_HashCode)); | 866 | hash_from_uid (route_handle->uid, &uid_key); |
706 | route_handle->iter = iter; | ||
707 | route_handle->iter_cls = iter_cls; | ||
708 | route_handle->dht_handle = handle; | ||
709 | route_handle->uid = uid; | ||
710 | #if DEBUG_DHT_API | ||
711 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
712 | "`%s': Unique ID is %llu\n", "DHT API", uid); | ||
713 | #endif | ||
714 | GNUNET_CONTAINER_multihashmap_put (handle->outstanding_requests, | 867 | GNUNET_CONTAINER_multihashmap_put (handle->outstanding_requests, |
715 | &uid_key, route_handle, | 868 | &uid_key, route_handle, |
716 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | 869 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); |
717 | } | 870 | } |
871 | else | ||
872 | { | ||
873 | route_handle->uid = 0; | ||
874 | } | ||
875 | |||
876 | #if DEBUG_DHT_API | ||
877 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
878 | "`%s': Unique ID is %llu\n", "DHT API", route_handle->uid); | ||
879 | #endif | ||
880 | |||
718 | msize = sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size); | 881 | msize = sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size); |
719 | message = GNUNET_malloc (msize); | 882 | message = GNUNET_malloc (msize); |
720 | message->header.size = htons (msize); | 883 | message->header.size = htons (msize); |
@@ -722,18 +885,25 @@ GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle, | |||
722 | memcpy (&message->key, key, sizeof (GNUNET_HashCode)); | 885 | memcpy (&message->key, key, sizeof (GNUNET_HashCode)); |
723 | message->options = htonl (options); | 886 | message->options = htonl (options); |
724 | message->desired_replication_level = htonl (options); | 887 | message->desired_replication_level = htonl (options); |
725 | message->unique = htonl (expects_response); | 888 | message->unique_id = GNUNET_htonll (route_handle->uid); |
726 | message->unique_id = GNUNET_htonll (uid); | ||
727 | memcpy (&message[1], enc, ntohs (enc->size)); | 889 | memcpy (&message[1], enc, ntohs (enc->size)); |
728 | pending = GNUNET_malloc (sizeof (struct PendingMessage)); | 890 | pending = GNUNET_malloc (sizeof (struct PendingMessage)); |
729 | pending->msg = &message->header; | 891 | pending->msg = &message->header; |
730 | pending->timeout = timeout; | 892 | pending->timeout = timeout; |
731 | pending->cont = cont; | 893 | pending->cont = cont; |
732 | pending->cont_cls = cont_cls; | 894 | pending->cont_cls = cont_cls; |
733 | pending->unique_id = uid; | 895 | pending->unique_id = route_handle->uid; |
734 | GNUNET_assert (handle->current == NULL); | 896 | if (handle->current == NULL) |
735 | handle->current = pending; | 897 | { |
736 | process_pending_message (handle); | 898 | handle->current = pending; |
899 | process_pending_message (handle); | ||
900 | } | ||
901 | else if ((handle->current != NULL) && (handle->retransmit_stage == DHT_RETRANSMITTING)) | ||
902 | { | ||
903 | handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED; | ||
904 | handle->retransmission_buffer = pending; | ||
905 | } | ||
906 | route_handle->message = message; | ||
737 | return route_handle; | 907 | return route_handle; |
738 | } | 908 | } |
739 | 909 | ||
@@ -762,9 +932,9 @@ GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle, | |||
762 | GNUNET_SCHEDULER_Task cont, void *cont_cls) | 932 | GNUNET_SCHEDULER_Task cont, void *cont_cls) |
763 | { | 933 | { |
764 | struct GNUNET_DHT_GetHandle *get_handle; | 934 | struct GNUNET_DHT_GetHandle *get_handle; |
765 | struct GNUNET_DHT_GetMessage *get_msg; | 935 | struct GNUNET_DHT_GetMessage get_msg; |
766 | 936 | ||
767 | if (handle->current != NULL) /* Can't send right now, we have a pending message... */ | 937 | if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) /* Can't send right now, we have a pending message... */ |
768 | return NULL; | 938 | return NULL; |
769 | 939 | ||
770 | get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle)); | 940 | get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle)); |
@@ -777,14 +947,14 @@ GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle, | |||
777 | GNUNET_h2s (key)); | 947 | GNUNET_h2s (key)); |
778 | #endif | 948 | #endif |
779 | 949 | ||
780 | get_msg = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetMessage)); | 950 | get_msg.header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET); |
781 | get_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET); | 951 | get_msg.header.size = htons (sizeof (struct GNUNET_DHT_GetMessage)); |
782 | get_msg->header.size = htons (sizeof (struct GNUNET_DHT_GetMessage)); | 952 | get_msg.type = htons (type); |
783 | get_msg->type = htons (type); | ||
784 | 953 | ||
785 | get_handle->route_handle = | 954 | get_handle->route_handle = |
786 | GNUNET_DHT_route_start (handle, key, 0, 0, &get_msg->header, timeout, | 955 | GNUNET_DHT_route_start (handle, key, 0, 0, &get_msg.header, timeout, |
787 | &get_reply_iterator, get_handle, cont, cont_cls); | 956 | &get_reply_iterator, get_handle, cont, cont_cls); |
957 | |||
788 | return get_handle; | 958 | return get_handle; |
789 | } | 959 | } |
790 | 960 | ||
@@ -821,10 +991,23 @@ GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle, | |||
821 | pending->timeout = DEFAULT_DHT_TIMEOUT; | 991 | pending->timeout = DEFAULT_DHT_TIMEOUT; |
822 | pending->cont = cont; | 992 | pending->cont = cont; |
823 | pending->cont_cls = cont_cls; | 993 | pending->cont_cls = cont_cls; |
824 | pending->unique_id = route_handle->uid; | 994 | pending->unique_id = 0; /* When finished is called, free pending->msg */ |
825 | GNUNET_assert (route_handle->dht_handle->current == NULL); | 995 | |
826 | route_handle->dht_handle->current = pending; | 996 | if (route_handle->dht_handle->current == NULL) |
827 | process_pending_message (route_handle->dht_handle); | 997 | { |
998 | route_handle->dht_handle->current = pending; | ||
999 | process_pending_message (route_handle->dht_handle); | ||
1000 | } | ||
1001 | else if ((route_handle->dht_handle->current != NULL) && (route_handle->dht_handle->retransmit_stage == DHT_RETRANSMITTING)) | ||
1002 | { | ||
1003 | route_handle->dht_handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED; | ||
1004 | route_handle->dht_handle->retransmission_buffer = pending; | ||
1005 | } | ||
1006 | else | ||
1007 | { | ||
1008 | GNUNET_break(0); | ||
1009 | } | ||
1010 | |||
828 | hash_from_uid (route_handle->uid, &uid_key); | 1011 | hash_from_uid (route_handle->uid, &uid_key); |
829 | GNUNET_assert (GNUNET_CONTAINER_multihashmap_remove | 1012 | GNUNET_assert (GNUNET_CONTAINER_multihashmap_remove |
830 | (route_handle->dht_handle->outstanding_requests, &uid_key, | 1013 | (route_handle->dht_handle->outstanding_requests, &uid_key, |
@@ -843,6 +1026,17 @@ void | |||
843 | GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle, | 1026 | GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle, |
844 | GNUNET_SCHEDULER_Task cont, void *cont_cls) | 1027 | GNUNET_SCHEDULER_Task cont, void *cont_cls) |
845 | { | 1028 | { |
1029 | if ((get_handle->route_handle->dht_handle->current != NULL) && | ||
1030 | (get_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING)) | ||
1031 | { | ||
1032 | if (cont != NULL) | ||
1033 | { | ||
1034 | GNUNET_SCHEDULER_add_continuation (get_handle->route_handle->dht_handle->sched, cont, cont_cls, | ||
1035 | GNUNET_SCHEDULER_REASON_TIMEOUT); | ||
1036 | } | ||
1037 | return; | ||
1038 | } | ||
1039 | |||
846 | #if DEBUG_DHT_API | 1040 | #if DEBUG_DHT_API |
847 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1041 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
848 | "`%s': Removing pending get request with key %s, uid %llu\n", | 1042 | "`%s': Removing pending get request with key %s, uid %llu\n", |
@@ -880,9 +1074,9 @@ GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle, | |||
880 | void *cont_cls) | 1074 | void *cont_cls) |
881 | { | 1075 | { |
882 | struct GNUNET_DHT_FindPeerHandle *find_peer_handle; | 1076 | struct GNUNET_DHT_FindPeerHandle *find_peer_handle; |
883 | struct GNUNET_MessageHeader *find_peer_msg; | 1077 | struct GNUNET_MessageHeader find_peer_msg; |
884 | 1078 | ||
885 | if (handle->current != NULL) /* Can't send right now, we have a pending message... */ | 1079 | if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) /* Can't send right now, we have a pending message... */ |
886 | return NULL; | 1080 | return NULL; |
887 | 1081 | ||
888 | find_peer_handle = | 1082 | find_peer_handle = |
@@ -896,11 +1090,10 @@ GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle, | |||
896 | "FIND PEER", GNUNET_h2s (key)); | 1090 | "FIND PEER", GNUNET_h2s (key)); |
897 | #endif | 1091 | #endif |
898 | 1092 | ||
899 | find_peer_msg = GNUNET_malloc(sizeof(struct GNUNET_MessageHeader)); | 1093 | find_peer_msg.size = htons(sizeof(struct GNUNET_MessageHeader)); |
900 | find_peer_msg->size = htons(sizeof(struct GNUNET_MessageHeader)); | 1094 | find_peer_msg.type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER); |
901 | find_peer_msg->type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER); | ||
902 | find_peer_handle->route_handle = | 1095 | find_peer_handle->route_handle = |
903 | GNUNET_DHT_route_start (handle, key, 0, options, find_peer_msg, | 1096 | GNUNET_DHT_route_start (handle, key, 0, options, &find_peer_msg, |
904 | timeout, &find_peer_reply_iterator, | 1097 | timeout, &find_peer_reply_iterator, |
905 | find_peer_handle, cont, cont_cls); | 1098 | find_peer_handle, cont, cont_cls); |
906 | return find_peer_handle; | 1099 | return find_peer_handle; |
@@ -917,6 +1110,17 @@ void | |||
917 | GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle, | 1110 | GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle, |
918 | GNUNET_SCHEDULER_Task cont, void *cont_cls) | 1111 | GNUNET_SCHEDULER_Task cont, void *cont_cls) |
919 | { | 1112 | { |
1113 | if ((find_peer_handle->route_handle->dht_handle->current != NULL) && | ||
1114 | (find_peer_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING)) | ||
1115 | { | ||
1116 | if (cont != NULL) | ||
1117 | { | ||
1118 | GNUNET_SCHEDULER_add_continuation (find_peer_handle->route_handle->dht_handle->sched, cont, cont_cls, | ||
1119 | GNUNET_SCHEDULER_REASON_TIMEOUT); | ||
1120 | } | ||
1121 | return; | ||
1122 | } | ||
1123 | |||
920 | #if DEBUG_DHT_API | 1124 | #if DEBUG_DHT_API |
921 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1125 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
922 | "`%s': Removing pending `%s' request with key %s, uid %llu\n", | 1126 | "`%s': Removing pending `%s' request with key %s, uid %llu\n", |
@@ -958,12 +1162,16 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, | |||
958 | GNUNET_SCHEDULER_Task cont, void *cont_cls) | 1162 | GNUNET_SCHEDULER_Task cont, void *cont_cls) |
959 | { | 1163 | { |
960 | struct GNUNET_DHT_PutMessage *put_msg; | 1164 | struct GNUNET_DHT_PutMessage *put_msg; |
1165 | struct GNUNET_DHT_RouteHandle *put_route; | ||
961 | size_t msize; | 1166 | size_t msize; |
962 | 1167 | ||
963 | if (handle->current != NULL) | 1168 | if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) |
964 | { | 1169 | { |
965 | GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls, | 1170 | if (cont != NULL) |
966 | GNUNET_SCHEDULER_REASON_TIMEOUT); | 1171 | { |
1172 | GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls, | ||
1173 | GNUNET_SCHEDULER_REASON_TIMEOUT); | ||
1174 | } | ||
967 | return; | 1175 | return; |
968 | } | 1176 | } |
969 | 1177 | ||
@@ -982,8 +1190,19 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, | |||
982 | put_msg->expiration = GNUNET_TIME_absolute_hton(exp); | 1190 | put_msg->expiration = GNUNET_TIME_absolute_hton(exp); |
983 | memcpy (&put_msg[1], data, size); | 1191 | memcpy (&put_msg[1], data, size); |
984 | 1192 | ||
985 | GNUNET_DHT_route_start (handle, key, 0, 0, &put_msg->header, timeout, NULL, | 1193 | put_route = GNUNET_DHT_route_start (handle, key, 0, 0, &put_msg->header, timeout, NULL, |
986 | NULL, cont, cont_cls); | 1194 | NULL, cont, cont_cls); |
1195 | |||
1196 | if (put_route == NULL) /* Route start failed! */ | ||
1197 | { | ||
1198 | if (cont != NULL) | ||
1199 | { | ||
1200 | GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls, | ||
1201 | GNUNET_SCHEDULER_REASON_TIMEOUT); | ||
1202 | } | ||
1203 | } | ||
1204 | else | ||
1205 | GNUNET_free(put_route); | ||
987 | 1206 | ||
988 | GNUNET_free (put_msg); | 1207 | GNUNET_free (put_msg); |
989 | } | 1208 | } |
diff --git a/src/dht/gnunet-dht-get-peer.c b/src/dht/gnunet-dht-get-peer.c index 3b7f7a4e8..e3f6116cd 100644 --- a/src/dht/gnunet-dht-get-peer.c +++ b/src/dht/gnunet-dht-get-peer.c | |||
@@ -101,17 +101,18 @@ cleanup_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
101 | * operation | 101 | * operation |
102 | * | 102 | * |
103 | * @param cls closure (NULL) | 103 | * @param cls closure (NULL) |
104 | * @param peer the peer we learned about | 104 | * @param hello the response message, a HELLO |
105 | * @param reply the response message, should be a HELLO | ||
106 | */ | 105 | */ |
107 | void find_peer_processor (void *cls, | 106 | void find_peer_processor (void *cls, |
108 | const struct GNUNET_PeerIdentity *peer, | 107 | const struct GNUNET_HELLO_Message *hello) |
109 | const struct GNUNET_MessageHeader *reply) | ||
110 | { | 108 | { |
111 | result_count++; | 109 | struct GNUNET_PeerIdentity peer; |
112 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 110 | if (GNUNET_OK == GNUNET_HELLO_get_id(hello, &peer)) |
113 | "test_find_peer_processor called (peer `%s'), total results %d!\n", GNUNET_i2s(peer), result_count); | 111 | { |
114 | 112 | result_count++; | |
113 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
114 | "test_find_peer_processor called (peer `%s'), total results %d!\n", GNUNET_i2s(&peer), result_count); | ||
115 | } | ||
115 | } | 116 | } |
116 | 117 | ||
117 | 118 | ||
@@ -191,8 +192,14 @@ run (void *cls, | |||
191 | if (verbose) | 192 | if (verbose) |
192 | fprintf (stderr, "Issuing FIND PEER request for %s!\n", query_key); | 193 | fprintf (stderr, "Issuing FIND PEER request for %s!\n", query_key); |
193 | 194 | ||
194 | find_peer_handle = GNUNET_DHT_find_peer_start (dht_handle, timeout, 0, NULL, &key, | 195 | find_peer_handle = GNUNET_DHT_find_peer_start (dht_handle, |
195 | &find_peer_processor, NULL, &message_sent_cont, NULL); | 196 | timeout, |
197 | 0, | ||
198 | &key, | ||
199 | &find_peer_processor, | ||
200 | NULL, | ||
201 | &message_sent_cont, | ||
202 | NULL); | ||
196 | 203 | ||
197 | } | 204 | } |
198 | 205 | ||
diff --git a/src/dht/gnunet-service-dht.c b/src/dht/gnunet-service-dht.c index 8907c3642..27c431bd5 100644 --- a/src/dht/gnunet-service-dht.c +++ b/src/dht/gnunet-service-dht.c | |||
@@ -166,7 +166,7 @@ struct DHT_MessageContext | |||
166 | /** | 166 | /** |
167 | * The key this request was about | 167 | * The key this request was about |
168 | */ | 168 | */ |
169 | GNUNET_HashCode *key; | 169 | const GNUNET_HashCode *key; |
170 | 170 | ||
171 | /** | 171 | /** |
172 | * The unique identifier of this request | 172 | * The unique identifier of this request |
@@ -240,9 +240,6 @@ send_generic_reply (void *cls, size_t size, void *buf) | |||
240 | if (buf == NULL) | 240 | if (buf == NULL) |
241 | { | 241 | { |
242 | /* client disconnected */ | 242 | /* client disconnected */ |
243 | #if DEBUG_DHT | ||
244 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': buffer was NULL\n", "DHT"); | ||
245 | #endif | ||
246 | return 0; | 243 | return 0; |
247 | } | 244 | } |
248 | off = 0; | 245 | off = 0; |
@@ -256,10 +253,6 @@ send_generic_reply (void *cls, size_t size, void *buf) | |||
256 | GNUNET_free (reply); | 253 | GNUNET_free (reply); |
257 | off += msize; | 254 | off += msize; |
258 | } | 255 | } |
259 | #if DEBUG_DHT | ||
260 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
261 | "`%s': Copying reply to buffer, REALLY SENT\n", "DHT"); | ||
262 | #endif | ||
263 | process_pending_messages (client); | 256 | process_pending_messages (client); |
264 | return off; | 257 | return off; |
265 | } | 258 | } |
@@ -284,7 +277,7 @@ add_pending_message (struct ClientList *client, | |||
284 | 277 | ||
285 | 278 | ||
286 | /** | 279 | /** |
287 | * Called when a reply needs to be sent to a client, either as | 280 | * Called when a reply needs to be sent to a client, as |
288 | * a result it found to a GET or FIND PEER request. | 281 | * a result it found to a GET or FIND PEER request. |
289 | * | 282 | * |
290 | * @param client the client to send the reply to | 283 | * @param client the client to send the reply to |
@@ -296,7 +289,7 @@ send_reply_to_client (struct ClientList *client, | |||
296 | const struct GNUNET_MessageHeader *message, | 289 | const struct GNUNET_MessageHeader *message, |
297 | unsigned long long uid) | 290 | unsigned long long uid) |
298 | { | 291 | { |
299 | struct GNUNET_DHT_Message *reply; | 292 | struct GNUNET_DHT_RouteResultMessage *reply; |
300 | struct PendingMessage *pending_message; | 293 | struct PendingMessage *pending_message; |
301 | uint16_t msize; | 294 | uint16_t msize; |
302 | size_t tsize; | 295 | size_t tsize; |
@@ -305,21 +298,21 @@ send_reply_to_client (struct ClientList *client, | |||
305 | "`%s': Sending reply to client.\n", "DHT"); | 298 | "`%s': Sending reply to client.\n", "DHT"); |
306 | #endif | 299 | #endif |
307 | msize = ntohs (message->size); | 300 | msize = ntohs (message->size); |
308 | tsize = sizeof (struct GNUNET_DHT_Message) + msize; | 301 | tsize = sizeof (struct GNUNET_DHT_RouteResultMessage) + msize; |
309 | if (tsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) | 302 | if (tsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) |
310 | { | 303 | { |
311 | GNUNET_BREAK_op (0); | 304 | GNUNET_break_op (0); |
312 | return; | 305 | return; |
313 | } | 306 | } |
314 | reply = GNUNET_malloc (tsize); | 307 | |
308 | pending_message = GNUNET_malloc (sizeof (struct PendingMessage) + tsize); | ||
309 | pending_message->msg = (struct GNUNET_MessageHeader *)&pending_message[1]; | ||
310 | reply = (struct GNUNET_DHT_RouteResultMessage *)&pending_message[1]; | ||
315 | reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_ROUTE_RESULT); | 311 | reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_ROUTE_RESULT); |
316 | reply->header.size = htons (tsize); | 312 | reply->header.size = htons (tsize); |
317 | if (uid != 0) | ||
318 | reply->unique = htonl (GNUNET_YES); // ???? | ||
319 | reply->unique_id = GNUNET_htonll (uid); | 313 | reply->unique_id = GNUNET_htonll (uid); |
320 | memcpy (&reply[1], message, msize); | 314 | memcpy (&reply[1], message, msize); |
321 | pending_message = GNUNET_malloc (sizeof (struct PendingMessage)); // inline | 315 | |
322 | pending_message->msg = &reply->header; | ||
323 | add_pending_message (client, pending_message); | 316 | add_pending_message (client, pending_message); |
324 | } | 317 | } |
325 | 318 | ||
@@ -354,7 +347,6 @@ datacache_get_iterator (void *cls, | |||
354 | get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT); | 347 | get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT); |
355 | get_result->header.size = | 348 | get_result->header.size = |
356 | htons (sizeof (struct GNUNET_DHT_GetResultMessage) + size); | 349 | htons (sizeof (struct GNUNET_DHT_GetResultMessage) + size); |
357 | get_result->data_size = htons (size); | ||
358 | get_result->expiration = exp; | 350 | get_result->expiration = exp; |
359 | memcpy (&get_result->key, key, sizeof (GNUNET_HashCode)); | 351 | memcpy (&get_result->key, key, sizeof (GNUNET_HashCode)); |
360 | get_result->type = htons (type); | 352 | get_result->type = htons (type); |
@@ -383,12 +375,13 @@ handle_dht_get (void *cls, | |||
383 | unsigned int results; | 375 | unsigned int results; |
384 | struct DatacacheGetContext datacache_get_context; | 376 | struct DatacacheGetContext datacache_get_context; |
385 | 377 | ||
386 | if (ntohs (msg->header.size) != sizeof (struct GNUNET_DHT_GetMessage)) | 378 | get_msg = (const struct GNUNET_DHT_GetMessage *) msg; |
379 | if (ntohs (get_msg->header.size) != sizeof (struct GNUNET_DHT_GetMessage)) | ||
387 | { | 380 | { |
388 | GNUNET_break (0); | 381 | GNUNET_break (0); |
389 | return; | 382 | return; |
390 | } | 383 | } |
391 | get_msg = (const struct GNUNET_DHT_GetMessage *) msg; | 384 | |
392 | get_type = ntohs (get_msg->type); | 385 | get_type = ntohs (get_msg->type); |
393 | #if DEBUG_DHT | 386 | #if DEBUG_DHT |
394 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 387 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
@@ -422,7 +415,7 @@ handle_dht_find_peer (void *cls, | |||
422 | const struct GNUNET_MessageHeader *find_msg, | 415 | const struct GNUNET_MessageHeader *find_msg, |
423 | struct DHT_MessageContext *message_context) | 416 | struct DHT_MessageContext *message_context) |
424 | { | 417 | { |
425 | struct GNUNET_DHT_FindPeerResultMessage *find_peer_result; | 418 | struct GNUNET_MessageHeader *find_peer_result; |
426 | size_t hello_size; | 419 | size_t hello_size; |
427 | size_t tsize; | 420 | size_t tsize; |
428 | 421 | ||
@@ -430,8 +423,8 @@ handle_dht_find_peer (void *cls, | |||
430 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 423 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
431 | "`%s': Received `%s' request from client, key %s (msg size %d, we expected %d)\n", | 424 | "`%s': Received `%s' request from client, key %s (msg size %d, we expected %d)\n", |
432 | "DHT", "FIND PEER", GNUNET_h2s (message_context->key), | 425 | "DHT", "FIND PEER", GNUNET_h2s (message_context->key), |
433 | ntohs (find_msg->header.size), | 426 | ntohs (find_msg->size), |
434 | sizeof (struct GNUNET_DHT_FindPeerMessage)); | 427 | sizeof (struct GNUNET_MessageHeader)); |
435 | #endif | 428 | #endif |
436 | if (my_hello == NULL) | 429 | if (my_hello == NULL) |
437 | { | 430 | { |
@@ -444,13 +437,18 @@ handle_dht_find_peer (void *cls, | |||
444 | } | 437 | } |
445 | /* Simplistic find_peer functionality, always return our hello */ | 438 | /* Simplistic find_peer functionality, always return our hello */ |
446 | hello_size = ntohs(my_hello->size); | 439 | hello_size = ntohs(my_hello->size); |
447 | tsize = hello_size + sizeof (struct GNUNET_DHT_FindPeerResultMessage); | 440 | tsize = hello_size + sizeof (struct GNUNET_MessageHeader); |
448 | // check tsize < MAX | 441 | // check tsize < MAX |
449 | find_peer_result = GNUNET_malloc (tsize); | 442 | find_peer_result = GNUNET_malloc (tsize); |
450 | find_peer_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT); | 443 | find_peer_result->type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT); |
451 | find_peer_result->header.size = htons (tsize); | 444 | find_peer_result->size = htons (tsize); |
452 | memcpy (&find_peer_result[1], &my_hello, hello_size); | 445 | memcpy (&find_peer_result[1], my_hello, hello_size); |
453 | send_reply_to_client(message_context->client, &find_peer_result->header, message_context->unique_id); | 446 | #if DEBUG_DHT_HELLO |
447 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
448 | "`%s': Sending hello size %d to client.\n", | ||
449 | "DHT", hello_size); | ||
450 | #endif | ||
451 | send_reply_to_client(message_context->client, find_peer_result, message_context->unique_id); | ||
454 | GNUNET_free(find_peer_result); | 452 | GNUNET_free(find_peer_result); |
455 | } | 453 | } |
456 | 454 | ||
@@ -471,24 +469,24 @@ handle_dht_put (void *cls, | |||
471 | size_t put_type; | 469 | size_t put_type; |
472 | size_t data_size; | 470 | size_t data_size; |
473 | 471 | ||
474 | GNUNET_assert (ntohs (msg->header.size) >= | 472 | GNUNET_assert (ntohs (msg->size) >= |
475 | sizeof (struct GNUNET_DHT_PutMessage)); | 473 | sizeof (struct GNUNET_DHT_PutMessage)); |
476 | put_msg = (struct GNUNET_DHT_PutMessage *)msg; | 474 | put_msg = (struct GNUNET_DHT_PutMessage *)msg; |
477 | put_type = ntohl (put_msg->type); | 475 | put_type = ntohs (put_msg->type); |
478 | data_size = ntohs (put_msg->header.size) - sizeof (struct GNUNET_DHT_PutMessage); | 476 | data_size = ntohs (put_msg->header.size) - sizeof (struct GNUNET_DHT_PutMessage); |
479 | #if DEBUG_DHT | 477 | #if DEBUG_DHT |
480 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 478 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
481 | "`%s': %s msg total size is %d, data size %d, struct size %d\n", | ||
482 | "DHT", "PUT", ntohs (put_msg->header.size), data_size, | ||
483 | sizeof (struct GNUNET_DHT_PutMessage)); | ||
484 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
485 | "`%s': Received `%s' request from client, message type %d, key %s\n", | 479 | "`%s': Received `%s' request from client, message type %d, key %s\n", |
486 | "DHT", "PUT", put_type, GNUNET_h2s (message_context->key)); | 480 | "DHT", "PUT", put_type, GNUNET_h2s (message_context->key)); |
487 | #endif | 481 | #endif |
488 | if (datacache != NULL) | 482 | if (datacache != NULL) |
489 | GNUNET_DATACACHE_put (datacache, message_context->key, data_size, | 483 | GNUNET_DATACACHE_put (datacache, message_context->key, data_size, |
490 | (char *) &put_msg[1], put_type, | 484 | (char *) &put_msg[1], put_type, |
491 | put_msg->expiration); | 485 | GNUNET_TIME_absolute_ntoh(put_msg->expiration)); |
486 | else | ||
487 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
488 | "`%s': %s request received locally, but have no datacache!\n", | ||
489 | "DHT", "PUT"); | ||
492 | } | 490 | } |
493 | 491 | ||
494 | 492 | ||
@@ -520,41 +518,6 @@ find_active_client (struct GNUNET_SERVER_Client *client) | |||
520 | } | 518 | } |
521 | 519 | ||
522 | /** | 520 | /** |
523 | * Construct a message receipt confirmation for a particular uid. | ||
524 | * Receipt confirmations are used for any requests that don't expect | ||
525 | * a reply otherwise (i.e. put requests, stop requests). | ||
526 | * | ||
527 | * @param client the handle for the client | ||
528 | * @param uid the unique identifier of this message | ||
529 | */ | ||
530 | static void | ||
531 | send_client_receipt_confirmation (struct GNUNET_SERVER_Client *client, | ||
532 | uint64_t uid) | ||
533 | { | ||
534 | struct GNUNET_DHT_StopMessage *confirm_message; | ||
535 | struct ClientList *active_client; | ||
536 | struct PendingMessage *pending_message; | ||
537 | |||
538 | #if DEBUG_DHT | ||
539 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
540 | "`%s': Sending receipt confirmation for uid %llu\n", "DHT", | ||
541 | uid); | ||
542 | #endif | ||
543 | confirm_message = GNUNET_malloc (sizeof (struct GNUNET_DHT_StopMessage)); | ||
544 | confirm_message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_STOP); | ||
545 | confirm_message->header.size = | ||
546 | htons (sizeof (struct GNUNET_DHT_StopMessage)); | ||
547 | confirm_message->unique_id = GNUNET_htonll (uid); | ||
548 | |||
549 | active_client = find_active_client (client); | ||
550 | pending_message = GNUNET_malloc (sizeof (struct PendingMessage)); | ||
551 | pending_message->msg = &confirm_message->header; | ||
552 | |||
553 | add_pending_message (active_client, pending_message); | ||
554 | |||
555 | } | ||
556 | |||
557 | /** | ||
558 | * Handler for any generic DHT messages, calls the appropriate handler | 521 | * Handler for any generic DHT messages, calls the appropriate handler |
559 | * depending on message type, sends confirmation if responses aren't otherwise | 522 | * depending on message type, sends confirmation if responses aren't otherwise |
560 | * expected. | 523 | * expected. |
@@ -567,9 +530,10 @@ static void | |||
567 | handle_dht_start_message (void *cls, struct GNUNET_SERVER_Client *client, | 530 | handle_dht_start_message (void *cls, struct GNUNET_SERVER_Client *client, |
568 | const struct GNUNET_MessageHeader *message) | 531 | const struct GNUNET_MessageHeader *message) |
569 | { | 532 | { |
570 | const struct GNUNET_DHT_Message *dht_msg = (const struct GNUNET_DHT_Message *) message; | 533 | const struct GNUNET_DHT_RouteMessage *dht_msg = (const struct GNUNET_DHT_RouteMessage *) message; |
571 | const struct GNUNET_MessageHeader *enc_msg; | 534 | const struct GNUNET_MessageHeader *enc_msg; |
572 | struct DHT_MessageContext *message_context; | 535 | struct DHT_MessageContext *message_context; |
536 | int handle_locally; | ||
573 | size_t enc_type; | 537 | size_t enc_type; |
574 | 538 | ||
575 | enc_msg = (const struct GNUNET_MessageHeader *) &dht_msg[1]; | 539 | enc_msg = (const struct GNUNET_MessageHeader *) &dht_msg[1]; |
@@ -590,28 +554,37 @@ handle_dht_start_message (void *cls, struct GNUNET_SERVER_Client *client, | |||
590 | message_context->replication = ntohl (dht_msg->desired_replication_level); | 554 | message_context->replication = ntohl (dht_msg->desired_replication_level); |
591 | message_context->msg_options = ntohl (dht_msg->options); | 555 | message_context->msg_options = ntohl (dht_msg->options); |
592 | 556 | ||
557 | /* TODO: Steps to be added by students */ | ||
593 | /* FIXME: Implement *remote* DHT operations here (forward request) */ | 558 | /* FIXME: Implement *remote* DHT operations here (forward request) */ |
559 | /* Implement generic route function and call here. */ | ||
594 | /* FIXME: *IF* handling should be local, then do this: */ | 560 | /* FIXME: *IF* handling should be local, then do this: */ |
595 | switch (enc_type) | 561 | /* 1. find if this peer is closest based on whatever metric the DHT uses |
562 | * 2. if this peer is closest _OR_ the message options indicate it should | ||
563 | * be processed everywhere _AND_ we want it processed everywhere, then | ||
564 | * handle it locally. | ||
565 | */ | ||
566 | handle_locally = GNUNET_YES; | ||
567 | if (handle_locally == GNUNET_YES) | ||
596 | { | 568 | { |
597 | case GNUNET_MESSAGE_TYPE_DHT_GET: | 569 | switch (enc_type) |
598 | handle_dht_get (cls, enc_msg, | 570 | { |
599 | message_context); | 571 | case GNUNET_MESSAGE_TYPE_DHT_GET: |
600 | break; | 572 | handle_dht_get (cls, enc_msg, |
601 | case GNUNET_MESSAGE_TYPE_DHT_PUT: | 573 | message_context); |
602 | handle_dht_put (cls, enc_msg, | 574 | break; |
603 | message_context); | 575 | case GNUNET_MESSAGE_TYPE_DHT_PUT: |
604 | send_client_receipt_confirmation (client, | 576 | handle_dht_put (cls, enc_msg, |
605 | GNUNET_ntohll (dht_msg->unique_id)); | 577 | message_context); |
606 | break; | 578 | break; |
607 | case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER: | 579 | case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER: |
608 | handle_dht_find_peer (cls, | 580 | handle_dht_find_peer (cls, |
609 | enc_msg, | 581 | enc_msg, |
610 | message_context); | 582 | message_context); |
611 | break; | 583 | break; |
612 | default: | 584 | default: |
613 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 585 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
614 | "`%s': Message type (%d) not handled\n", "DHT", enc_type); | 586 | "`%s': Message type (%d) not handled\n", "DHT", enc_type); |
587 | } | ||
615 | } | 588 | } |
616 | GNUNET_free (message_context); | 589 | GNUNET_free (message_context); |
617 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 590 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
@@ -620,14 +593,14 @@ handle_dht_start_message (void *cls, struct GNUNET_SERVER_Client *client, | |||
620 | 593 | ||
621 | /** | 594 | /** |
622 | * Handler for any generic DHT stop messages, calls the appropriate handler | 595 | * Handler for any generic DHT stop messages, calls the appropriate handler |
623 | * depending on message type, sends confirmation by default (stop messages | 596 | * depending on message type (if processed locally) |
624 | * do not otherwise expect replies) | ||
625 | * | 597 | * |
626 | * @param cls closure for the service | 598 | * @param cls closure for the service |
627 | * @param client the client we received this message from | 599 | * @param client the client we received this message from |
628 | * @param message the actual message received | 600 | * @param message the actual message received |
629 | * | 601 | * |
630 | * TODO: add demultiplexing for stop message types. | 602 | * TODO: once message are remembered by unique id, add code to |
603 | * forget them here | ||
631 | */ | 604 | */ |
632 | static void | 605 | static void |
633 | handle_dht_stop_message (void *cls, struct GNUNET_SERVER_Client *client, | 606 | handle_dht_stop_message (void *cls, struct GNUNET_SERVER_Client *client, |
@@ -635,13 +608,17 @@ handle_dht_stop_message (void *cls, struct GNUNET_SERVER_Client *client, | |||
635 | { | 608 | { |
636 | const struct GNUNET_DHT_StopMessage *dht_stop_msg = | 609 | const struct GNUNET_DHT_StopMessage *dht_stop_msg = |
637 | (const struct GNUNET_DHT_StopMessage *) message; | 610 | (const struct GNUNET_DHT_StopMessage *) message; |
638 | 611 | uint64_t uid; | |
639 | #if DEBUG_DHT | 612 | #if DEBUG_DHT |
640 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 613 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
641 | "`%s': Received `%s' request from client, uid %llu\n", "DHT", | 614 | "`%s': Received `%s' request from client, uid %llu\n", "DHT", |
642 | "GENERIC STOP", GNUNET_ntohll (dht_stop_msg->unique_id)); | 615 | "GENERIC STOP", GNUNET_ntohll (dht_stop_msg->unique_id)); |
643 | #endif | 616 | #endif |
644 | /* TODO: actually stop... */ | 617 | |
618 | uid = GNUNET_ntohll(dht_stop_msg->unique_id); | ||
619 | /* TODO: actually stop... free associated resources for the request | ||
620 | * lookup request by uid and remove state. */ | ||
621 | |||
645 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 622 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
646 | } | 623 | } |
647 | 624 | ||
@@ -767,14 +744,14 @@ core_init (void *cls, | |||
767 | 744 | ||
768 | 745 | ||
769 | static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = { | 746 | static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = { |
770 | {&handle_dht_start_message, NULL, GNUNET_MESSAGE_TYPE_DHT, 0}, | 747 | {&handle_dht_start_message, NULL, GNUNET_MESSAGE_TYPE_DHT_ROUTE, 0}, |
771 | {&handle_dht_stop_message, NULL, GNUNET_MESSAGE_TYPE_DHT_STOP, 0}, | 748 | {&handle_dht_stop_message, NULL, GNUNET_MESSAGE_TYPE_DHT_STOP, 0}, |
772 | {NULL, NULL, 0, 0} | 749 | {NULL, NULL, 0, 0} |
773 | }; | 750 | }; |
774 | 751 | ||
775 | 752 | ||
776 | static struct GNUNET_CORE_MessageHandler core_handlers[] = { | 753 | static struct GNUNET_CORE_MessageHandler core_handlers[] = { |
777 | {&handle_dht_p2p_route_request, GNUNET_MESSAGE_TYPE_DHT_ROUTE_REQUEST, 0}, | 754 | {&handle_dht_p2p_route_request, GNUNET_MESSAGE_TYPE_DHT_ROUTE, 0}, |
778 | {&handle_dht_p2p_route_result, GNUNET_MESSAGE_TYPE_DHT_ROUTE_RESULT, 0}, | 755 | {&handle_dht_p2p_route_result, GNUNET_MESSAGE_TYPE_DHT_ROUTE_RESULT, 0}, |
779 | {NULL, 0, 0} | 756 | {NULL, 0, 0} |
780 | }; | 757 | }; |
@@ -803,7 +780,6 @@ run (void *cls, | |||
803 | GNUNET_TIME_UNIT_FOREVER_REL, | 780 | GNUNET_TIME_UNIT_FOREVER_REL, |
804 | NULL, /* FIXME: anything we want to pass around? */ | 781 | NULL, /* FIXME: anything we want to pass around? */ |
805 | &core_init, /* Call core_init once connected */ | 782 | &core_init, /* Call core_init once connected */ |
806 | NULL, /* Don't care about pre-connects */ | ||
807 | NULL, /* Don't care about connects */ | 783 | NULL, /* Don't care about connects */ |
808 | NULL, /* Don't care about disconnects */ | 784 | NULL, /* Don't care about disconnects */ |
809 | NULL, /* Don't want notified about all incoming messages */ | 785 | NULL, /* Don't want notified about all incoming messages */ |
diff --git a/src/dht/test_dht_api.c b/src/dht/test_dht_api.c index f99e84269..10db5f45a 100644 --- a/src/dht/test_dht_api.c +++ b/src/dht/test_dht_api.c | |||
@@ -32,6 +32,7 @@ | |||
32 | #include "gnunet_program_lib.h" | 32 | #include "gnunet_program_lib.h" |
33 | #include "gnunet_scheduler_lib.h" | 33 | #include "gnunet_scheduler_lib.h" |
34 | #include "gnunet_dht_service.h" | 34 | #include "gnunet_dht_service.h" |
35 | #include "gnunet_hello_lib.h" | ||
35 | 36 | ||
36 | #define VERBOSE GNUNET_NO | 37 | #define VERBOSE GNUNET_NO |
37 | 38 | ||
@@ -90,6 +91,8 @@ struct PeerContext | |||
90 | 91 | ||
91 | static struct PeerContext p1; | 92 | static struct PeerContext p1; |
92 | 93 | ||
94 | struct RetryContext retry_context; | ||
95 | |||
93 | static struct GNUNET_SCHEDULER_Handle *sched; | 96 | static struct GNUNET_SCHEDULER_Handle *sched; |
94 | 97 | ||
95 | static int ok; | 98 | static int ok; |
@@ -146,7 +149,10 @@ end_badly () | |||
146 | #if VERBOSE | 149 | #if VERBOSE |
147 | fprintf (stderr, "Ending on an unhappy note.\n"); | 150 | fprintf (stderr, "Ending on an unhappy note.\n"); |
148 | #endif | 151 | #endif |
149 | 152 | if (retry_context.peer_ctx->find_peer_handle != NULL) | |
153 | GNUNET_DHT_find_peer_stop(retry_context.peer_ctx->find_peer_handle, NULL, NULL); | ||
154 | if (retry_context.retry_task != GNUNET_SCHEDULER_NO_TASK) | ||
155 | GNUNET_SCHEDULER_cancel(sched, retry_context.retry_task); | ||
150 | GNUNET_DHT_disconnect (p1.dht_handle); | 156 | GNUNET_DHT_disconnect (p1.dht_handle); |
151 | 157 | ||
152 | ok = 1; | 158 | ok = 1; |
@@ -186,21 +192,41 @@ test_find_peer_stop (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
186 | * @param reply response | 192 | * @param reply response |
187 | */ | 193 | */ |
188 | void test_find_peer_processor (void *cls, | 194 | void test_find_peer_processor (void *cls, |
189 | const struct GNUNET_PeerIdentity *peer, | 195 | const struct GNUNET_HELLO_Message *hello) |
190 | const struct GNUNET_MessageHeader *reply) | ||
191 | { | 196 | { |
192 | struct RetryContext *retry_ctx = cls; | 197 | struct RetryContext *retry_ctx = cls; |
198 | struct GNUNET_PeerIdentity peer; | ||
193 | 199 | ||
194 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 200 | if (GNUNET_OK == GNUNET_HELLO_get_id(hello, &peer)) |
195 | "test_find_peer_processor called (peer `%s'), stopping find peer request!\n", GNUNET_i2s(peer)); | 201 | { |
202 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
203 | "test_find_peer_processor called (peer `%s'), stopping find peer request!\n", GNUNET_i2s(&peer)); | ||
196 | 204 | ||
197 | if (retry_ctx->retry_task != GNUNET_SCHEDULER_NO_TASK) | 205 | if (retry_ctx->retry_task != GNUNET_SCHEDULER_NO_TASK) |
198 | GNUNET_SCHEDULER_cancel(sched, retry_ctx->retry_task); | 206 | { |
207 | GNUNET_SCHEDULER_cancel(sched, retry_ctx->retry_task); | ||
208 | retry_ctx->retry_task = GNUNET_SCHEDULER_NO_TASK; | ||
209 | } | ||
210 | |||
211 | GNUNET_SCHEDULER_add_continuation (sched, &test_find_peer_stop, &p1, | ||
212 | GNUNET_SCHEDULER_REASON_PREREQ_DONE); | ||
213 | } | ||
214 | else | ||
215 | { | ||
216 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
217 | "received find peer request, but hello_get_id failed!\n"); | ||
218 | } | ||
199 | 219 | ||
200 | GNUNET_SCHEDULER_add_continuation (sched, &test_find_peer_stop, &p1, | ||
201 | GNUNET_SCHEDULER_REASON_PREREQ_DONE); | ||
202 | } | 220 | } |
203 | 221 | ||
222 | /** | ||
223 | * Retry the find_peer task on timeout. (Forward declaration) | ||
224 | * | ||
225 | * @param cls closure | ||
226 | * @param tc context information (why was this task triggered now?) | ||
227 | */ | ||
228 | void | ||
229 | retry_find_peer_stop (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); | ||
204 | 230 | ||
205 | /** | 231 | /** |
206 | * Retry the find_peer task on timeout. | 232 | * Retry the find_peer task on timeout. |
@@ -219,9 +245,9 @@ retry_find_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
219 | { | 245 | { |
220 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 246 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
221 | "test_find_peer timed out, retrying!\n"); | 247 | "test_find_peer timed out, retrying!\n"); |
222 | 248 | retry_ctx->next_timeout = GNUNET_TIME_relative_multiply(retry_ctx->next_timeout, 2); | |
223 | retry_ctx->peer_ctx->find_peer_handle = | 249 | retry_ctx->peer_ctx->find_peer_handle = |
224 | GNUNET_DHT_find_peer_start (retry_ctx->peer_ctx->dht_handle, retry_ctx->next_timeout, 0, NULL, &hash, | 250 | GNUNET_DHT_find_peer_start (retry_ctx->peer_ctx->dht_handle, retry_ctx->next_timeout, 0, &hash, |
225 | &test_find_peer_processor, retry_ctx, NULL, NULL); | 251 | &test_find_peer_processor, retry_ctx, NULL, NULL); |
226 | } | 252 | } |
227 | else | 253 | else |
@@ -235,14 +261,14 @@ retry_find_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
235 | if (retry_ctx->peer_ctx->find_peer_handle == NULL) | 261 | if (retry_ctx->peer_ctx->find_peer_handle == NULL) |
236 | GNUNET_SCHEDULER_add_now (sched, &end_badly, &p1); | 262 | GNUNET_SCHEDULER_add_now (sched, &end_badly, &p1); |
237 | else | 263 | else |
238 | retry_ctx->retry_task = GNUNET_SCHEDULER_add_delayed(sched, retry_ctx->next_timeout, &retry_find_peer, retry_ctx); | 264 | retry_ctx->retry_task = GNUNET_SCHEDULER_add_delayed(sched, retry_ctx->next_timeout, &retry_find_peer_stop, retry_ctx); |
239 | } | 265 | } |
240 | 266 | ||
241 | /** | 267 | /** |
242 | * Retry the find_peer task on timeout. | 268 | * Retry the find_peer task on timeout. |
243 | * | 269 | * |
244 | * @param cls closure | 270 | * @param cls closure |
245 | * @param tc context information (why was this task triggered now) | 271 | * @param tc context information (why was this task triggered now?) |
246 | */ | 272 | */ |
247 | void | 273 | void |
248 | retry_find_peer_stop (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | 274 | retry_find_peer_stop (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) |
@@ -270,24 +296,22 @@ test_find_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
270 | struct PeerContext *peer = cls; | 296 | struct PeerContext *peer = cls; |
271 | GNUNET_HashCode hash; | 297 | GNUNET_HashCode hash; |
272 | memset (&hash, 42, sizeof (GNUNET_HashCode)); | 298 | memset (&hash, 42, sizeof (GNUNET_HashCode)); |
273 | struct RetryContext *retry_ctx; | ||
274 | 299 | ||
275 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Called test_find_peer!\n"); | 300 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Called test_find_peer!\n"); |
276 | GNUNET_assert (peer->dht_handle != NULL); | 301 | GNUNET_assert (peer->dht_handle != NULL); |
277 | 302 | ||
278 | retry_ctx = GNUNET_malloc(sizeof(struct RetryContext)); | 303 | retry_context.real_timeout = GNUNET_TIME_relative_to_absolute(TOTAL_TIMEOUT); |
279 | retry_ctx->real_timeout = GNUNET_TIME_relative_to_absolute(TOTAL_TIMEOUT); | 304 | retry_context.next_timeout = BASE_TIMEOUT; |
280 | retry_ctx->next_timeout = BASE_TIMEOUT; | 305 | retry_context.peer_ctx = peer; |
281 | retry_ctx->peer_ctx = peer; | ||
282 | 306 | ||
283 | peer->find_peer_handle = | 307 | peer->find_peer_handle = |
284 | GNUNET_DHT_find_peer_start (peer->dht_handle, retry_ctx->next_timeout, 0, NULL, &hash, | 308 | GNUNET_DHT_find_peer_start (peer->dht_handle, retry_context.next_timeout, 0, &hash, |
285 | &test_find_peer_processor, retry_ctx, NULL, NULL); | 309 | &test_find_peer_processor, &retry_context, NULL, NULL); |
286 | 310 | ||
287 | if (peer->find_peer_handle == NULL) | 311 | if (peer->find_peer_handle == NULL) |
288 | GNUNET_SCHEDULER_add_now (sched, &end_badly, &p1); | 312 | GNUNET_SCHEDULER_add_now (sched, &end_badly, &p1); |
289 | else | 313 | else |
290 | retry_ctx->retry_task = GNUNET_SCHEDULER_add_delayed(sched, retry_ctx->next_timeout, &retry_find_peer_stop, retry_ctx); | 314 | retry_context.retry_task = GNUNET_SCHEDULER_add_delayed(sched, retry_context.next_timeout, &retry_find_peer_stop, &retry_context); |
291 | } | 315 | } |
292 | 316 | ||
293 | /** | 317 | /** |