aboutsummaryrefslogtreecommitdiff
path: root/src/dht
diff options
context:
space:
mode:
Diffstat (limited to 'src/dht')
-rw-r--r--src/dht/Makefile.am8
-rw-r--r--src/dht/dht.h5
-rw-r--r--src/dht/dht_api.c1468
-rw-r--r--src/dht/gnunet-dht-get-peer.c85
-rw-r--r--src/dht/gnunet-dht-get.c69
-rw-r--r--src/dht/gnunet-dht-put.c20
6 files changed, 533 insertions, 1122 deletions
diff --git a/src/dht/Makefile.am b/src/dht/Makefile.am
index ccfc5a041..c5ae96f02 100644
--- a/src/dht/Makefile.am
+++ b/src/dht/Makefile.am
@@ -52,7 +52,7 @@ libgnunet_plugin_dhtlog_mysql_dump_la_LIBADD = \
52 $(XLIB) 52 $(XLIB)
53libgnunet_plugin_dhtlog_mysql_dump_la_LDFLAGS = \ 53libgnunet_plugin_dhtlog_mysql_dump_la_LDFLAGS = \
54 $(GN_PLUGIN_LDFLAGS) 54 $(GN_PLUGIN_LDFLAGS)
55 55
56libgnunet_plugin_dhtlog_mysql_dump_load_la_SOURCES = \ 56libgnunet_plugin_dhtlog_mysql_dump_load_la_SOURCES = \
57 plugin_dhtlog_mysql_dump_load.c 57 plugin_dhtlog_mysql_dump_load.c
58libgnunet_plugin_dhtlog_mysql_dump_load_la_LIBADD = \ 58libgnunet_plugin_dhtlog_mysql_dump_load_la_LIBADD = \
@@ -70,7 +70,9 @@ libgnunetdhtlog_la_LDFLAGS = \
70 -version-info 0:0:0 70 -version-info 0:0:0
71 71
72libgnunetdht_la_SOURCES = \ 72libgnunetdht_la_SOURCES = \
73 dht_api.c dht.h 73 dht_api.c dht.h \
74 dht_api_get_put.c \
75 dht_api_find_peer.c
74libgnunetdht_la_LIBADD = \ 76libgnunetdht_la_LIBADD = \
75 $(top_builddir)/src/util/libgnunetutil.la \ 77 $(top_builddir)/src/util/libgnunetutil.la \
76 $(XLIB) 78 $(XLIB)
@@ -92,8 +94,10 @@ bin_PROGRAMS = $(STUD_PROGS) \
92 gnunet-dht-get-peer \ 94 gnunet-dht-get-peer \
93 gnunet-dht-put 95 gnunet-dht-put
94 96
97if HAVE_MALICIOUS
95noinst_PROGRAMS = \ 98noinst_PROGRAMS = \
96 gnunet-dht-driver 99 gnunet-dht-driver
100endif
97 101
98gnunet_service_dht_SOURCES = \ 102gnunet_service_dht_SOURCES = \
99 gnunet-service-dht.c 103 gnunet-service-dht.c
diff --git a/src/dht/dht.h b/src/dht/dht.h
index 56bb29934..237b91f64 100644
--- a/src/dht/dht.h
+++ b/src/dht/dht.h
@@ -320,11 +320,6 @@ struct GNUNET_DHT_PutMessage
320 */ 320 */
321 struct GNUNET_TIME_AbsoluteNBO expiration; 321 struct GNUNET_TIME_AbsoluteNBO expiration;
322 322
323 /**
324 * The size of the data, appended to the end of this message.
325 */
326 size_t data_size GNUNET_PACKED;
327
328}; 323};
329 324
330 325
diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c
index 15faba6c9..c89a92dd5 100644
--- a/src/dht/dht_api.c
+++ b/src/dht/dht_api.c
@@ -23,12 +23,6 @@
23 * @brief library to access the DHT service 23 * @brief library to access the DHT service
24 * @author Christian Grothoff 24 * @author Christian Grothoff
25 * @author Nathan Evans 25 * @author Nathan Evans
26 *
27 * TODO: retransmission of pending requests maybe happens now, at least
28 * the code is in place to do so. Need to add checks when api calls
29 * happen to check if retransmission is in progress, and if so set
30 * the single pending message for transmission once the list of
31 * retries are done.
32 */ 26 */
33 27
34#include "platform.h" 28#include "platform.h"
@@ -46,83 +40,68 @@
46 40
47#define DEBUG_DHT_API GNUNET_NO 41#define DEBUG_DHT_API GNUNET_NO
48 42
43/**
44 * Entry in our list of messages to be (re-)transmitted.
45 */
49struct PendingMessage 46struct PendingMessage
50{ 47{
51 /** 48 /**
52 * Message that is pending 49 * This is a doubly-linked list.
53 */ 50 */
54 struct GNUNET_MessageHeader *msg; 51 struct PendingMessage *prev;
55 52
56 /** 53 /**
57 * Timeout for this message 54 * This is a doubly-linked list.
58 */ 55 */
59 struct GNUNET_TIME_Relative timeout; 56 struct PendingMessage *next;
60 57
61 /** 58 /**
62 * Continuation to call on message send 59 * Message that is pending, allocated at the end
63 * or message receipt confirmation 60 * of this struct.
64 */ 61 */
65 GNUNET_SCHEDULER_Task cont; 62 const struct GNUNET_MessageHeader *msg;
66 63
67 /** 64 /**
68 * Continuation closure 65 * Handle to the DHT API context.
69 */ 66 */
70 void *cont_cls; 67 struct GNUNET_DHT_Handle *handle;
71 68
72 /** 69 /**
73 * Unique ID for this request 70 * Continuation to call when the request has been
71 * transmitted (for the first time) to the service; can be NULL.
74 */ 72 */
75 uint64_t unique_id; 73 GNUNET_SCHEDULER_Task cont;
76 74
77 /** 75 /**
78 * Free the saved message once sent, set 76 * Closure for 'cont'.
79 * to GNUNET_YES for messages that don't
80 * receive responses!
81 */ 77 */
82 int free_on_send; 78 void *cont_cls;
83
84};
85 79
86struct PendingMessageList
87{
88 /** 80 /**
89 * This is a singly linked list. 81 * Timeout task for this message
90 */ 82 */
91 struct PendingMessageList *next; 83 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
92 84
93 /** 85 /**
94 * The pending message. 86 * Unique ID for this request
95 */ 87 */
96 struct PendingMessage *message; 88 uint64_t unique_id;
97};
98 89
99struct GNUNET_DHT_GetContext
100{
101 /** 90 /**
102 * Iterator to call on data receipt 91 * Free the saved message once sent, set to GNUNET_YES for messages
92 * that do not receive responses; GNUNET_NO if this pending message
93 * is aliased from a 'struct GNUNET_DHT_RouteHandle' and will be freed
94 * from there.
103 */ 95 */
104 GNUNET_DHT_GetIterator iter; 96 int free_on_send;
105 97
106 /** 98 /**
107 * Closure for the iterator callback 99 * GNUNET_YES if this message is in our pending queue right now.
108 */ 100 */
109 void *iter_cls; 101 int in_pending_queue;
110 102
111}; 103};
112 104
113struct GNUNET_DHT_FindPeerContext
114{
115 /**
116 * Iterator to call on data receipt
117 */
118 GNUNET_DHT_FindPeerProcessor proc;
119
120 /**
121 * Closure for the iterator callback
122 */
123 void *proc_cls;
124
125};
126 105
127/** 106/**
128 * Handle to a route request 107 * Handle to a route request
@@ -131,16 +110,6 @@ struct GNUNET_DHT_RouteHandle
131{ 110{
132 111
133 /** 112 /**
134 * Unique identifier for this request (for key collisions)
135 */
136 uint64_t uid;
137
138 /**
139 * Key that this get request is for
140 */
141 GNUNET_HashCode key;
142
143 /**
144 * Iterator to call on data receipt 113 * Iterator to call on data receipt
145 */ 114 */
146 GNUNET_DHT_ReplyProcessor iter; 115 GNUNET_DHT_ReplyProcessor iter;
@@ -160,62 +129,18 @@ struct GNUNET_DHT_RouteHandle
160 * used for retransmitting requests on service 129 * used for retransmitting requests on service
161 * failure/reconnect. Freed on route_stop. 130 * failure/reconnect. Freed on route_stop.
162 */ 131 */
163 struct GNUNET_DHT_RouteMessage *message; 132 struct PendingMessage *message;
164};
165
166
167/**
168 * Handle to control a get operation.
169 */
170struct GNUNET_DHT_GetHandle
171{
172 /**
173 * Handle to the actual route operation for the get
174 */
175 struct GNUNET_DHT_RouteHandle *route_handle;
176
177 /**
178 * The context of the get request
179 */
180 struct GNUNET_DHT_GetContext get_context;
181};
182
183
184/**
185 * Handle to control a find peer operation.
186 */
187struct GNUNET_DHT_FindPeerHandle
188{
189 /**
190 * Handle to the actual route operation for the request
191 */
192 struct GNUNET_DHT_RouteHandle *route_handle;
193
194 /**
195 * The context of the find peer request
196 */
197 struct GNUNET_DHT_FindPeerContext find_peer_context;
198};
199
200 133
201enum DHT_Retransmit_Stage
202{
203 /** 134 /**
204 * The API is not retransmitting anything at this time. 135 * Key that this get request is for
205 */ 136 */
206 DHT_NOT_RETRANSMITTING, 137 GNUNET_HashCode key;
207 138
208 /** 139 /**
209 * The API is retransmitting, and nothing has been single 140 * Unique identifier for this request (for key collisions)
210 * queued for sending.
211 */ 141 */
212 DHT_RETRANSMITTING, 142 uint64_t uid;
213 143
214 /**
215 * The API is retransmitting, and a single message has been
216 * queued for transmission once finished.
217 */
218 DHT_RETRANSMITTING_MESSAGE_QUEUED
219}; 144};
220 145
221 146
@@ -240,95 +165,57 @@ struct GNUNET_DHT_Handle
240 struct GNUNET_CLIENT_Connection *client; 165 struct GNUNET_CLIENT_Connection *client;
241 166
242 /** 167 /**
243 * Currently pending transmission request. 168 * Currently pending transmission request (or NULL).
244 */ 169 */
245 struct GNUNET_CLIENT_TransmitHandle *th; 170 struct GNUNET_CLIENT_TransmitHandle *th;
246 171
247 /** 172 /**
248 * Message we are currently sending, only allow 173 * Head of linked list of messages we would like to transmit.
249 * a single message to be queued. If not unique
250 * (typically a put request), await a confirmation
251 * from the service that the message was received.
252 * If unique, just fire and forget.
253 */ 174 */
254 struct PendingMessage *current; 175 struct PendingMessage *pending_head;
255 176
256 /** 177 /**
257 * Hash map containing the current outstanding unique requests 178 * Tail of linked list of messages we would like to transmit.
258 */
259 struct GNUNET_CONTAINER_MultiHashMap *outstanding_requests;
260
261 /**
262 * Generator for unique ids.
263 */ 179 */
264 uint64_t uid_gen; 180 struct PendingMessage *pending_tail;
265 181
266 /** 182 /**
267 * Are we currently retransmitting requests? If so queue a _single_ 183 * Hash map containing the current outstanding unique requests
268 * new request when received. 184 * (values are of type 'struct GNUNET_DHT_RouteHandle').
269 */ 185 */
270 enum DHT_Retransmit_Stage retransmit_stage; 186 struct GNUNET_CONTAINER_MultiHashMap *active_requests;
271 187
272 /** 188 /**
273 * Linked list of retranmissions, to be used in the event 189 * Generator for unique ids.
274 * of a dht service disconnect/reconnect.
275 */ 190 */
276 struct PendingMessageList *retransmissions; 191 uint64_t uid_gen;
277 192
278 /**
279 * A single pending message allowed to be scheduled
280 * during retransmission phase.
281 */
282 struct PendingMessage *retransmission_buffer;
283}; 193};
284 194
285 195
286/** 196/**
287 * Convert unique ID to hash code. 197 * Transmit the next pending message, called by notify_transmit_ready
288 *
289 * @param uid unique ID to convert
290 * @param hash set to uid (extended with zeros)
291 */ 198 */
292static void 199static size_t
293hash_from_uid (uint64_t uid, 200transmit_pending (void *cls,
294 GNUNET_HashCode *hash) 201 size_t size,
295{ 202 void *buf);
296 memset (hash, 0, sizeof(GNUNET_HashCode)); 203
297 *((uint64_t*)hash) = uid;
298}
299 204
300#if RETRANSMIT
301/** 205/**
302 * Iterator callback to retransmit each outstanding request 206 * Handler for messages received from the DHT service
303 * because the connection to the DHT service went down (and 207 * a demultiplexer which handles numerous message types
304 * came back).
305 *
306 * 208 *
307 */ 209 */
308static int retransmit_iterator (void *cls, 210static void
309 const GNUNET_HashCode * key, 211service_message_handler (void *cls,
310 void *value) 212 const struct GNUNET_MessageHeader *msg);
311{ 213
312 struct GNUNET_DHT_RouteHandle *route_handle = value; 214
313 struct PendingMessageList *pending_message_list; 215
314
315 pending_message_list = GNUNET_malloc(sizeof(struct PendingMessageList) + sizeof(struct PendingMessage));
316 pending_message_list->message = (struct PendingMessage *)&pending_message_list[1];
317 pending_message_list->message->msg = &route_handle->message->header;
318 pending_message_list->message->timeout = GNUNET_TIME_relative_get_forever();
319 pending_message_list->message->cont = NULL;
320 pending_message_list->message->cont_cls = NULL;
321 pending_message_list->message->unique_id = route_handle->uid;
322 /* Add the new pending message to the front of the retransmission list */
323 pending_message_list->next = route_handle->dht_handle->retransmissions;
324 route_handle->dht_handle->retransmissions = pending_message_list;
325
326 return GNUNET_OK;
327}
328#endif
329 216
330/** 217/**
331 * Try to (re)connect to the dht service. 218 * Try to (re)connect to the DHT service.
332 * 219 *
333 * @return GNUNET_YES on success, GNUNET_NO on failure. 220 * @return GNUNET_YES on success, GNUNET_NO on failure.
334 */ 221 */
@@ -338,304 +225,233 @@ try_connect (struct GNUNET_DHT_Handle *handle)
338 if (handle->client != NULL) 225 if (handle->client != NULL)
339 return GNUNET_OK; 226 return GNUNET_OK;
340 handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg); 227 handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg);
341 if (handle->client != NULL) 228 if (handle->client == NULL)
342 return GNUNET_YES; 229 {
343#if DEBUG_STATISTICS 230 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
344 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 231 _("Failed to connect to the DHT service!\n"));
345 _("Failed to connect to the dht service!\n")); 232 return GNUNET_NO;
346#endif 233 }
347 return GNUNET_NO; 234 GNUNET_CLIENT_receive (handle->client,
235 &service_message_handler,
236 handle,
237 GNUNET_TIME_UNIT_FOREVER_REL);
238 return GNUNET_YES;
348} 239}
349 240
241
350/** 242/**
351 * Send complete (or failed), call continuation if we have one. 243 * Add the request corresponding to the given route handle
244 * to the pending queue (if it is not already in there).
245 *
246 * @param cls the 'struct GNUNET_DHT_Handle*'
247 * @param key key for the request (not used)
248 * @param value the 'struct GNUNET_DHT_RouteHandle*'
249 * @return GNUNET_YES (always)
352 */ 250 */
353static void 251static int
354finish (struct GNUNET_DHT_Handle *handle, int code) 252add_request_to_pending (void *cls,
253 const GNUNET_HashCode *key,
254 void *value)
355{ 255{
356 struct PendingMessage *pos = handle->current; 256 struct GNUNET_DHT_Handle *handle = cls;
357 GNUNET_HashCode uid_hash; 257 struct GNUNET_DHT_RouteHandle *rh = value;
358#if DEBUG_DHT_API 258
359 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish called!\n", "DHT API"); 259 if (GNUNET_NO == rh->message->in_pending_queue)
360#endif
361 GNUNET_assert (pos != NULL);
362 hash_from_uid (pos->unique_id, &uid_hash);
363 if (pos->cont != NULL)
364 { 260 {
365 if (code == GNUNET_SYSERR) 261 GNUNET_CONTAINER_DLL_insert (handle->pending_head,
366 GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont, 262 handle->pending_tail,
367 pos->cont_cls, 263 rh->message);
368 GNUNET_SCHEDULER_REASON_TIMEOUT); 264 rh->message->in_pending_queue = GNUNET_YES;
369 else
370 GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
371 pos->cont_cls,
372 GNUNET_SCHEDULER_REASON_PREREQ_DONE);
373 } 265 }
374 266 return GNUNET_YES;
375 GNUNET_assert(handle->th == NULL);
376 if (pos->free_on_send == GNUNET_YES)
377 GNUNET_free(pos->msg);
378 GNUNET_free (pos);
379 handle->current = NULL;
380} 267}
381 268
269
382/** 270/**
383 * Transmit the next pending message, called by notify_transmit_ready 271 * Re-connect to the DHT, re-issue all pending requests if needed.
384 */ 272 */
385static size_t 273static void
386transmit_pending (void *cls, size_t size, void *buf) 274reconnect (struct GNUNET_DHT_Handle *handle)
387{ 275{
388 struct GNUNET_DHT_Handle *handle = cls; 276 if (handle->client != NULL)
389 size_t tsize;
390
391#if DEBUG_DHT_API
392 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
393 "`%s': In transmit_pending\n", "DHT API");
394#endif
395 handle->th = NULL;
396
397 if (buf == NULL)
398 {
399#if DEBUG_DHT_API
400 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
401 "`%s': In transmit_pending buf is NULL\n", "DHT API");
402#endif
403 finish (handle, GNUNET_SYSERR);
404 return 0;
405 }
406
407 if (handle->current != NULL)
408 { 277 {
409 tsize = ntohs (handle->current->msg->size); 278 GNUNET_CLIENT_disconnect (handle->client,
410 if (size >= tsize) 279 GNUNET_NO);
411 { 280 handle->client = NULL;
412#if DEBUG_DHT_API
413 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
414 "`%s': Sending message size %d\n", "DHT API", tsize);
415#endif
416 memcpy (buf, handle->current->msg, tsize);
417 finish (handle, GNUNET_OK);
418 return tsize;
419 }
420 else
421 {
422 return 0;
423 }
424 } 281 }
425 /* Have no pending request */ 282 if (GNUNET_YES != try_connect (handle))
426 return 0; 283 return;
284 GNUNET_CONTAINER_multihashmap_iterate (handle->active_requests,
285 &add_request_to_pending,
286 handle);
287 if (handle->pending_head == NULL)
288 return;
289 GNUNET_CLIENT_notify_transmit_ready (handle->client,
290 ntohs(handle->pending_head->msg->size),
291 GNUNET_TIME_UNIT_FOREVER_REL,
292 GNUNET_NO,
293 &transmit_pending,
294 handle);
295
427} 296}
428 297
298
429/** 299/**
430 * Try to send messages from list of messages to send 300 * Try to send messages from list of messages to send
431 */ 301 */
432static void 302static void
433process_pending_message (struct GNUNET_DHT_Handle *handle) 303process_pending_messages (struct GNUNET_DHT_Handle *handle)
434{ 304{
305 struct PendingMessage *head;
435 306
436 if (handle->current == NULL)
437 return; /* action already pending */
438 if (GNUNET_YES != try_connect (handle)) 307 if (GNUNET_YES != try_connect (handle))
439 { 308 return;
440 handle->th = NULL; 309 if (handle->th != NULL)
441 finish (handle, GNUNET_SYSERR); 310 return;
442 return; 311 if (NULL == (head = handle->pending_head))
443 } 312 return;
444 313 handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
445 if (NULL == 314 ntohs (head->msg->size),
446 (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client, 315 GNUNET_TIME_UNIT_FOREVER_REL,
447 ntohs (handle-> 316 GNUNET_YES,
448 current->msg-> 317 &transmit_pending,
449 size), 318 handle);
450 handle->current-> 319 if (NULL == handle->th)
451 timeout, GNUNET_YES, 320 {
452 &transmit_pending, 321 reconnect (handle);
453 handle)))
454 {
455#if DEBUG_DHT_API
456 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
457 "Failed to transmit request to dht service.\n");
458#endif
459 finish (handle, GNUNET_SYSERR);
460 return; 322 return;
461 } 323 }
462#if DEBUG_DHT_API
463 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
464 "`%s': Scheduled sending message of size %d to service\n",
465 "DHT API", ntohs (handle->current->msg->size));
466#endif
467} 324}
468 325
469/**
470 * Send complete (or failed), call continuation if we have one.
471 * Forward declaration.
472 */
473static void
474finish_retransmission (struct GNUNET_DHT_Handle *handle, int code);
475
476/* Forward declaration */
477static size_t
478transmit_pending_retransmission (void *cls, size_t size, void *buf);
479 326
480/** 327/**
481 * Try to send messages from list of messages to send 328 * Transmit the next pending message, called by notify_transmit_ready
482 */ 329 */
483static void 330static size_t
484process_pending_retransmissions (struct GNUNET_DHT_Handle *handle) 331transmit_pending (void *cls,
332 size_t size,
333 void *buf)
485{ 334{
335 struct GNUNET_DHT_Handle *handle = cls;
336 struct PendingMessage *head;
337 size_t tsize;
486 338
487 if (handle->current == NULL) 339 handle->th = NULL;
488 return; /* action already pending */ 340 if (buf == NULL)
489 if (GNUNET_YES != try_connect (handle))
490 { 341 {
491 finish_retransmission (handle, GNUNET_SYSERR); 342 reconnect (handle);
492 return; 343 return 0;
493 } 344 }
494 345 if (NULL == (head = handle->pending_head))
495 if (NULL == 346 return 0;
496 (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client, 347
497 ntohs (handle-> 348 tsize = ntohs (head->msg->size);
498 current->msg-> 349 if (size < tsize)
499 size),
500 handle->current->
501 timeout, GNUNET_YES,
502 &transmit_pending_retransmission,
503 handle)))
504 { 350 {
505#if DEBUG_DHT_API 351 process_pending_messages (handle);
506 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 352 return 0;
507 "Failed to transmit request to dht service.\n");
508#endif
509 finish_retransmission (handle, GNUNET_SYSERR);
510 return;
511 } 353 }
512#if DEBUG_DHT_API 354 memcpy (buf, head->msg, tsize);
513 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 355 GNUNET_CONTAINER_DLL_remove (handle->pending_head,
514 "`%s': Scheduled sending message of size %d to service\n", 356 handle->pending_tail,
515 "DHT API", ntohs (handle->current->msg->size)); 357 head);
516#endif 358 if (head->timeout_task != GNUNET_SCHEDULER_NO_TASK)
359 {
360 GNUNET_SCHEDULER_cancel (handle->sched,
361 head->timeout_task);
362 head->timeout_task = GNUNET_SCHEDULER_NO_TASK;
363 }
364 if (NULL != head->cont)
365 {
366 GNUNET_SCHEDULER_add_now (handle->sched,
367 head->cont,
368 head->cont_cls);
369 head->cont = NULL;
370 head->cont_cls = NULL;
371 }
372 head->in_pending_queue = GNUNET_NO;
373 if (GNUNET_YES == head->free_on_send)
374 GNUNET_free (head);
375 process_pending_messages (handle);
376 return tsize;
517} 377}
518 378
379
380
381
519/** 382/**
520 * Send complete (or failed), call continuation if we have one. 383 * Process a given reply that might match the given
384 * request.
521 */ 385 */
522static void 386static int
523finish_retransmission (struct GNUNET_DHT_Handle *handle, int code) 387process_reply (void *cls,
388 const GNUNET_HashCode *key,
389 void *value)
524{ 390{
525 struct PendingMessage *pos = handle->current; 391 const struct GNUNET_DHT_RouteResultMessage *dht_msg = cls;
526 struct PendingMessageList *pending_list; 392 struct GNUNET_DHT_RouteHandle *rh = value;
527#if DEBUG_DHT_API 393 const struct GNUNET_MessageHeader *enc_msg;
528 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish (retransmission) called!\n", "DHT API"); 394 size_t enc_size;
529#endif 395 uint64_t uid;
530 GNUNET_assert (pos == handle->retransmissions->message);
531 pending_list = handle->retransmissions;
532 handle->retransmissions = handle->retransmissions->next;
533 GNUNET_free (pending_list);
534
535 if (handle->retransmissions == NULL)
536 {
537 handle->retransmit_stage = DHT_NOT_RETRANSMITTING;
538 }
539 396
540 if (handle->retransmissions != NULL) 397 uid = GNUNET_ntohll (dht_msg->unique_id);
398 if (uid != rh->uid)
399 return GNUNET_YES;
400 enc_size = ntohs (dht_msg->header.size) - sizeof (struct GNUNET_DHT_RouteResultMessage);
401 if (enc_size < sizeof (struct GNUNET_MessageHeader))
541 { 402 {
542 handle->current = handle->retransmissions->message; 403 GNUNET_break (0);
543 process_pending_retransmissions(handle); 404 return GNUNET_NO;
544 } 405 }
545 else if (handle->retransmission_buffer != NULL) 406 enc_msg = (const struct GNUNET_MessageHeader *) &dht_msg[1];
407 if (enc_size != ntohs (enc_msg->size))
546 { 408 {
547 handle->current = handle->retransmission_buffer; 409 GNUNET_break (0);
548 process_pending_message(handle); 410 return GNUNET_NO;
549 } 411 }
412 rh->iter (rh->iter_cls,
413 &rh->key,
414 enc_msg);
415 return GNUNET_YES;
550} 416}
551 417
418
552/** 419/**
553 * Handler for messages received from the DHT service 420 * Handler for messages received from the DHT service
554 * a demultiplexer which handles numerous message types 421 * a demultiplexer which handles numerous message types
555 * 422 *
423 * @param cls the 'struct GNUNET_DHT_Handle'
424 * @param msg the incoming message
556 */ 425 */
557void 426static void
558service_message_handler (void *cls, 427service_message_handler (void *cls,
559 const struct GNUNET_MessageHeader *msg) 428 const struct GNUNET_MessageHeader *msg)
560{ 429{
561 struct GNUNET_DHT_Handle *handle = cls; 430 struct GNUNET_DHT_Handle *handle = cls;
562 struct GNUNET_DHT_RouteResultMessage *dht_msg; 431 const struct GNUNET_DHT_RouteResultMessage *dht_msg;
563 struct GNUNET_MessageHeader *enc_msg; 432
564 struct GNUNET_DHT_RouteHandle *route_handle;
565 uint64_t uid;
566 GNUNET_HashCode uid_hash;
567 size_t enc_size;
568
569 if (msg == NULL) 433 if (msg == NULL)
570 { 434 {
571#if DEBUG_DHT_API 435 reconnect (handle);
572 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
573 "`%s': Received NULL from server, connection down!\n",
574 "DHT API");
575#endif
576 GNUNET_CLIENT_disconnect (handle->client, GNUNET_YES);
577 handle->client = GNUNET_CLIENT_connect (handle->sched,
578 "dht",
579 handle->cfg);
580 if (handle->current != NULL)
581 {
582 handle->th = NULL;
583 finish(handle, GNUNET_SYSERR); /* If there was a current message, kill it! */
584 }
585#if RETRANSMIT
586 if ((handle->retransmit_stage != DHT_RETRANSMITTING) && (GNUNET_CONTAINER_multihashmap_iterate(handle->outstanding_requests, &retransmit_iterator, handle) > 0))
587 {
588 handle->retransmit_stage = DHT_RETRANSMITTING;
589 handle->current = handle->retransmissions->message;
590 process_pending_retransmissions(handle);
591 }
592#endif
593 return; 436 return;
594 } 437 }
595 438 if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_RESULT)
596 switch (ntohs (msg->type))
597 { 439 {
598 case GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_RESULT: 440 GNUNET_break (0);
599 { 441 reconnect (handle);
600 dht_msg = (struct GNUNET_DHT_RouteResultMessage *) msg; 442 return;
601 uid = GNUNET_ntohll (dht_msg->unique_id);
602#if DEBUG_DHT_API
603 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
604 "`%s': Received response to message (uid %llu)\n",
605 "DHT API", uid);
606#endif
607
608 hash_from_uid (uid, &uid_hash);
609 route_handle =
610 GNUNET_CONTAINER_multihashmap_get (handle->outstanding_requests,
611 &uid_hash);
612 if (route_handle == NULL) /* We have no recollection of this request */
613 {
614#if DEBUG_DHT_API
615 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
616 "`%s': Received response to message (uid %llu), but have no recollection of it!\n",
617 "DHT API", uid);
618#endif
619 }
620 else
621 {
622 enc_size =
623 ntohs (dht_msg->header.size) -
624 sizeof (struct GNUNET_DHT_RouteResultMessage);
625 GNUNET_assert (enc_size > 0);
626 enc_msg = (struct GNUNET_MessageHeader *) &dht_msg[1];
627 route_handle->iter (route_handle->iter_cls, enc_msg);
628 }
629
630 break;
631 }
632 default:
633 {
634 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
635 "`%s': Received unknown message type %d\n", "DHT API",
636 ntohs (msg->type));
637 }
638 } 443 }
444 if (ntohs (msg->size) < sizeof (struct GNUNET_DHT_RouteResultMessage))
445 {
446 GNUNET_break (0);
447 reconnect (handle);
448 return;
449 }
450 dht_msg = (const struct GNUNET_DHT_RouteResultMessage *) msg;
451 GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests,
452 &dht_msg->key,
453 &process_reply,
454 (void*) dht_msg);
639 GNUNET_CLIENT_receive (handle->client, 455 GNUNET_CLIENT_receive (handle->client,
640 &service_message_handler, 456 &service_message_handler,
641 handle, GNUNET_TIME_UNIT_FOREVER_REL); 457 handle, GNUNET_TIME_UNIT_FOREVER_REL);
@@ -663,22 +479,13 @@ GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched,
663 handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle)); 479 handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle));
664 handle->cfg = cfg; 480 handle->cfg = cfg;
665 handle->sched = sched; 481 handle->sched = sched;
666 handle->client = GNUNET_CLIENT_connect (sched, "dht", cfg); 482 handle->uid_gen = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
667 handle->uid_gen = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, -1); 483 handle->active_requests = GNUNET_CONTAINER_multihashmap_create (ht_len);
668 if (handle->client == NULL) 484 if (GNUNET_NO == try_connect (handle))
669 { 485 {
670 GNUNET_free (handle); 486 GNUNET_DHT_disconnect (handle);
671 return NULL; 487 return NULL;
672 } 488 }
673 handle->outstanding_requests =
674 GNUNET_CONTAINER_multihashmap_create (ht_len);
675#if DEBUG_DHT_API
676 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
677 "`%s': Connection to service in progress\n", "DHT API");
678#endif
679 GNUNET_CLIENT_receive (handle->client,
680 &service_message_handler,
681 handle, GNUNET_TIME_UNIT_FOREVER_REL);
682 return handle; 489 return handle;
683} 490}
684 491
@@ -691,327 +498,73 @@ GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched,
691void 498void
692GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle) 499GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
693{ 500{
694#if DEBUG_DHT_API 501 struct PendingMessage *pm;
695 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 502
696 "`%s': Called GNUNET_DHT_disconnect\n", "DHT API"); 503 GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size(handle->active_requests));
697#endif 504 if (handle->th != NULL)
698 GNUNET_assert (handle != NULL);
699 if (handle->th != NULL) /* We have a live transmit request */
700 { 505 {
701 GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th); 506 GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
702 handle->th = NULL; 507 handle->th = NULL;
703 } 508 }
704 if (handle->current != NULL) /* We are trying to send something now, clean it up */ 509 while (NULL != (pm = handle->pending_head))
705 GNUNET_free (handle->current); 510 {
706 511 GNUNET_CONTAINER_DLL_remove (handle->pending_head,
707 if (handle->client != NULL) /* Finally, disconnect from the service */ 512 handle->pending_tail,
513 pm);
514 GNUNET_assert (GNUNET_YES == pm->free_on_send);
515 if (GNUNET_SCHEDULER_NO_TASK != pm->timeout_task)
516 GNUNET_SCHEDULER_cancel (handle->sched,
517 pm->timeout_task);
518 if (NULL != pm->cont)
519 GNUNET_SCHEDULER_add_continuation (handle->sched,
520 pm->cont,
521 pm->cont_cls,
522 GNUNET_SCHEDULER_REASON_TIMEOUT);
523 pm->in_pending_queue = GNUNET_NO;
524 GNUNET_free (pm);
525 }
526 if (handle->client != NULL)
708 { 527 {
709 GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO); 528 GNUNET_CLIENT_disconnect (handle->client, GNUNET_YES);
710 handle->client = NULL; 529 handle->client = NULL;
711 } 530 }
712 531 GNUNET_CONTAINER_multihashmap_destroy(handle->active_requests);
713 GNUNET_assert(GNUNET_CONTAINER_multihashmap_size(handle->outstanding_requests) == 0);
714 GNUNET_CONTAINER_multihashmap_destroy(handle->outstanding_requests);
715 GNUNET_free (handle); 532 GNUNET_free (handle);
716} 533}
717 534
718 535
719/**
720 * Transmit the next pending message, called by notify_transmit_ready
721 */
722static size_t
723transmit_pending_retransmission (void *cls, size_t size, void *buf)
724{
725 struct GNUNET_DHT_Handle *handle = cls;
726 size_t tsize;
727
728#if DEBUG_DHT_API
729 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
730 "`%s': In transmit_pending\n", "DHT API");
731#endif
732 if (buf == NULL)
733 {
734#if DEBUG_DHT_API
735 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
736 "`%s': In transmit_pending buf is NULL\n", "DHT API");
737#endif
738 finish_retransmission (handle, GNUNET_SYSERR);
739 return 0;
740 }
741
742 handle->th = NULL;
743
744 if (handle->current != NULL)
745 {
746 tsize = ntohs (handle->current->msg->size);
747 if (size >= tsize)
748 {
749#if DEBUG_DHT_API
750 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
751 "`%s': Sending message size %d\n", "DHT API", tsize);
752#endif
753 memcpy (buf, handle->current->msg, tsize);
754 finish_retransmission (handle, GNUNET_OK);
755 return tsize;
756 }
757 else
758 {
759 return 0;
760 }
761 }
762 /* Have no pending request */
763 return 0;
764}
765
766
767/**
768 * Iterator called on each result obtained from a generic route
769 * operation
770 */
771void
772get_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
773{
774 struct GNUNET_DHT_GetHandle *get_handle = cls;
775 struct GNUNET_DHT_GetResultMessage *result;
776 size_t data_size;
777 char *result_data;
778
779 if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_GET_RESULT)
780 return;
781
782 GNUNET_assert (ntohs (reply->size) >=
783 sizeof (struct GNUNET_DHT_GetResultMessage));
784 result = (struct GNUNET_DHT_GetResultMessage *) reply;
785 data_size = ntohs (reply->size) - sizeof(struct GNUNET_DHT_GetResultMessage);
786
787 result_data = (char *) &result[1]; /* Set data pointer to end of message */
788
789 get_handle->get_context.iter (get_handle->get_context.iter_cls,
790 GNUNET_TIME_absolute_ntoh (result->expiration), &get_handle->route_handle->key,
791 ntohs (result->type), data_size, result_data);
792}
793
794
795/**
796 * Iterator called on each result obtained from a generic route
797 * operation
798 */
799void
800find_peer_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
801{
802 struct GNUNET_DHT_FindPeerHandle *find_peer_handle = cls;
803 struct GNUNET_MessageHeader *hello;
804
805 if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)
806 {
807 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
808 "Received wrong type of response to a find peer request...\n");
809 return;
810 }
811
812
813 GNUNET_assert (ntohs (reply->size) >=
814 sizeof (struct GNUNET_MessageHeader));
815 hello = (struct GNUNET_MessageHeader *)&reply[1];
816
817 if (ntohs(hello->type) != GNUNET_MESSAGE_TYPE_HELLO)
818 {
819 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
820 "Encapsulated message of type %d, is not a `%s' message!\n", ntohs(hello->type), "HELLO");
821 return;
822 }
823 find_peer_handle->find_peer_context.proc (find_peer_handle->
824 find_peer_context.proc_cls,
825 (struct GNUNET_HELLO_Message *)hello);
826}
827
828/**
829 * Send a message to the DHT telling it to start issuing random GET
830 * requests every 'frequency' milliseconds.
831 *
832 * @param handle handle to the DHT service
833 * @param frequency delay (in milliseconds) between sending malicious messages
834 * @param cont continuation to call once the message is sent
835 * @param cont_cls closure for continuation
836 *
837 * @return GNUNET_YES if the control message was sent, GNUNET_NO if not
838 */
839int GNUNET_DHT_set_malicious_getter (struct GNUNET_DHT_Handle *handle, int frequency, GNUNET_SCHEDULER_Task cont, void *cont_cls)
840{
841 struct GNUNET_DHT_ControlMessage *msg;
842 struct PendingMessage *pending;
843
844 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
845 return GNUNET_NO;
846
847 msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage));
848 msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage));
849 msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL);
850 msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_GET);
851 msg->variable = htons(frequency);
852
853 pending = GNUNET_malloc (sizeof (struct PendingMessage));
854 pending->msg = &msg->header;
855 pending->timeout = GNUNET_TIME_relative_get_forever();
856 pending->free_on_send = GNUNET_YES;
857 pending->cont = cont;
858 pending->cont_cls = cont_cls;
859 pending->unique_id = 0;
860
861 if (handle->current == NULL)
862 {
863 handle->current = pending;
864 process_pending_message (handle);
865 }
866 else
867 {
868 handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
869 handle->retransmission_buffer = pending;
870 }
871
872 return GNUNET_YES;
873}
874
875/**
876 * Send a message to the DHT telling it to issue a single find
877 * peer request using the peers unique identifier as key. This
878 * is used to fill the routing table, and is normally controlled
879 * by the DHT itself. However, for testing and perhaps more
880 * close control over the DHT, this can be explicitly managed.
881 *
882 * @param handle handle to the DHT service
883 * @param cont continuation to call once the message is sent
884 * @param cont_cls closure for continuation
885 *
886 * @return GNUNET_YES if the control message was sent, GNUNET_NO if not
887 */
888int GNUNET_DHT_find_peers (struct GNUNET_DHT_Handle *handle,
889 GNUNET_SCHEDULER_Task cont, void *cont_cls)
890{
891 struct GNUNET_DHT_ControlMessage *msg;
892 struct PendingMessage *pending;
893
894 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
895 return GNUNET_NO;
896
897 msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage));
898 msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage));
899 msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL);
900 msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
901
902 pending = GNUNET_malloc (sizeof (struct PendingMessage));
903 pending->msg = &msg->header;
904 pending->timeout = GNUNET_TIME_relative_get_forever();
905 pending->free_on_send = GNUNET_YES;
906 pending->cont = cont;
907 pending->cont_cls = cont_cls;
908 pending->unique_id = 0;
909
910 if (handle->current == NULL)
911 {
912 handle->current = pending;
913 process_pending_message (handle);
914 }
915 else
916 {
917 handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
918 handle->retransmission_buffer = pending;
919 }
920
921 return GNUNET_YES;
922}
923
924/**
925 * Send a message to the DHT telling it to start issuing random PUT
926 * requests every 'frequency' milliseconds.
927 *
928 * @param handle handle to the DHT service
929 * @param frequency delay (in milliseconds) between sending malicious messages
930 * @param cont continuation to call once the message is sent
931 * @param cont_cls closure for continuation
932 *
933 * @return GNUNET_YES if the control message was sent, GNUNET_NO if not
934 */
935int GNUNET_DHT_set_malicious_putter (struct GNUNET_DHT_Handle *handle, int frequency, GNUNET_SCHEDULER_Task cont, void *cont_cls)
936{
937 struct GNUNET_DHT_ControlMessage *msg;
938 struct PendingMessage *pending;
939
940 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
941 return GNUNET_NO;
942 536
943 msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage));
944 msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage));
945 msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL);
946 msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_PUT);
947 msg->variable = htons(frequency);
948 537
949 pending = GNUNET_malloc (sizeof (struct PendingMessage)); 538/* ***** Special low-level API providing generic routeing abstraction ***** */
950 pending->msg = &msg->header;
951 pending->timeout = GNUNET_TIME_relative_get_forever();
952 pending->free_on_send = GNUNET_YES;
953 pending->cont = cont;
954 pending->cont_cls = cont_cls;
955 pending->unique_id = 0;
956 539
957 if (handle->current == NULL)
958 {
959 handle->current = pending;
960 process_pending_message (handle);
961 }
962 else
963 {
964 handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
965 handle->retransmission_buffer = pending;
966 }
967
968 return GNUNET_YES;
969}
970 540
971/** 541/**
972 * Send a message to the DHT telling it to start dropping 542 * Timeout for the transmission of a fire&forget-request. Clean it up.
973 * all requests received.
974 *
975 * @param handle handle to the DHT service
976 * @param cont continuation to call once the message is sent
977 * @param cont_cls closure for continuation
978 * 543 *
979 * @return GNUNET_YES if the control message was sent, GNUNET_NO if not 544 * @param cls the 'struct PendingMessage'
545 * @param tc scheduler context
980 */ 546 */
981int GNUNET_DHT_set_malicious_dropper (struct GNUNET_DHT_Handle *handle, GNUNET_SCHEDULER_Task cont, void *cont_cls) 547static void
548timeout_route_request (void *cls,
549 const struct GNUNET_SCHEDULER_TaskContext *tc)
982{ 550{
983 struct GNUNET_DHT_ControlMessage *msg; 551 struct PendingMessage *pending = cls;
984 struct PendingMessage *pending; 552 struct GNUNET_DHT_Handle *handle;
985
986 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
987 return GNUNET_NO;
988
989 msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_ControlMessage));
990 msg->header.size = htons(sizeof(struct GNUNET_DHT_ControlMessage));
991 msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_CONTROL);
992 msg->command = htons(GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_DROP);
993 msg->variable = htons(0);
994
995 pending = GNUNET_malloc (sizeof (struct PendingMessage));
996 pending->msg = &msg->header;
997 pending->timeout = GNUNET_TIME_relative_get_forever();
998 pending->free_on_send = GNUNET_YES;
999 pending->cont = cont;
1000 pending->cont_cls = cont_cls;
1001 pending->unique_id = 0;
1002 553
1003 if (handle->current == NULL) 554 if (pending->free_on_send != GNUNET_YES)
1004 { 555 {
1005 handle->current = pending; 556 /* timeouts should only apply to fire & forget requests! */
1006 process_pending_message (handle); 557 GNUNET_break (0);
558 return;
1007 } 559 }
1008 else 560 handle = pending->handle;
1009 { 561 GNUNET_CONTAINER_DLL_remove (handle->pending_head,
1010 handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED; 562 handle->pending_tail,
1011 handle->retransmission_buffer = pending; 563 pending);
1012 } 564 if (pending->cont != NULL)
1013 565 pending->cont (pending->cont_cls,
1014 return GNUNET_YES; 566 tc);
567 GNUNET_free (pending);
1015} 568}
1016 569
1017 570
@@ -1029,398 +582,259 @@ int GNUNET_DHT_set_malicious_dropper (struct GNUNET_DHT_Handle *handle, GNUNET_S
1029 * @param iter_cls closure for iter 582 * @param iter_cls closure for iter
1030 * @param timeout when to abort with an error if we fail to get 583 * @param timeout when to abort with an error if we fail to get
1031 * a confirmation for the request (when necessary) or how long 584 * a confirmation for the request (when necessary) or how long
1032 * to wait for tramission to the service 585 * to wait for tramission to the service; only applies
1033 * @param cont continuation to call when done; 586 * if 'iter' is NULL
1034 * reason will be TIMEOUT on error, 587 * @param cont continuation to call when the request has been transmitted
1035 * reason will be PREREQ_DONE on success 588 * the first time to the service
1036 * @param cont_cls closure for cont 589 * @param cont_cls closure for cont
1037 *
1038 * @return handle to stop the request, NULL if the request is "fire and forget" 590 * @return handle to stop the request, NULL if the request is "fire and forget"
1039 */ 591 */
1040struct GNUNET_DHT_RouteHandle * 592struct GNUNET_DHT_RouteHandle *
1041GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle, 593GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle,
1042 const GNUNET_HashCode * key, 594 const GNUNET_HashCode * key,
1043 unsigned int desired_replication_level, 595 uint32_t desired_replication_level,
1044 enum GNUNET_DHT_RouteOption options, 596 enum GNUNET_DHT_RouteOption options,
1045 const struct GNUNET_MessageHeader *enc, 597 const struct GNUNET_MessageHeader *enc,
1046 struct GNUNET_TIME_Relative timeout, 598 struct GNUNET_TIME_Relative timeout,
1047 GNUNET_DHT_ReplyProcessor iter, 599 GNUNET_DHT_ReplyProcessor iter,
1048 void *iter_cls, 600 void *iter_cls,
1049 GNUNET_SCHEDULER_Task cont, void *cont_cls) 601 GNUNET_SCHEDULER_Task cont,
602 void *cont_cls)
1050{ 603{
1051 struct GNUNET_DHT_RouteHandle *route_handle;
1052 struct PendingMessage *pending; 604 struct PendingMessage *pending;
1053 struct GNUNET_DHT_RouteMessage *message; 605 struct GNUNET_DHT_RouteMessage *message;
606 struct GNUNET_DHT_RouteHandle *route_handle;
1054 uint16_t msize; 607 uint16_t msize;
1055 GNUNET_HashCode uid_key; 608 uint16_t esize;
1056
1057 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
1058 return NULL;
1059 609
1060 if (sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) 610 esize = ntohs (enc->size);
611 if (sizeof (struct GNUNET_DHT_RouteMessage) + esize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1061 { 612 {
1062 GNUNET_break (0); 613 GNUNET_break (0);
1063 return NULL; 614 return NULL;
1064 } 615 }
1065 616 msize = sizeof (struct GNUNET_DHT_RouteMessage) + esize;
1066 route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle)); 617 pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
1067 memcpy (&route_handle->key, key, sizeof (GNUNET_HashCode)); 618 message = (struct GNUNET_DHT_RouteMessage*) &pending[1];
1068 route_handle->iter = iter;
1069 route_handle->iter_cls = iter_cls;
1070 route_handle->dht_handle = handle;
1071 route_handle->uid = handle->uid_gen++;
1072 if (iter != NULL)
1073 {
1074 hash_from_uid (route_handle->uid, &uid_key);
1075 GNUNET_CONTAINER_multihashmap_put (handle->outstanding_requests,
1076 &uid_key, route_handle,
1077 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1078 }
1079
1080#if DEBUG_DHT_API
1081 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1082 "`%s': Unique ID is %llu\n", "DHT API", route_handle->uid);
1083#endif
1084
1085 msize = sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size);
1086 message = GNUNET_malloc (msize);
1087 message->header.size = htons (msize);
1088 message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE);
1089 memcpy (&message->key, key, sizeof (GNUNET_HashCode));
1090 message->options = htonl (options);
1091 message->desired_replication_level = htonl (desired_replication_level);
1092 message->unique_id = GNUNET_htonll (route_handle->uid);
1093 memcpy (&message[1], enc, ntohs (enc->size));
1094 pending = GNUNET_malloc (sizeof (struct PendingMessage));
1095 pending->msg = &message->header; 619 pending->msg = &message->header;
1096 pending->timeout = timeout; 620 pending->handle = handle;
1097 if (iter == NULL)
1098 pending->free_on_send = GNUNET_YES;
1099 pending->cont = cont; 621 pending->cont = cont;
1100 pending->cont_cls = cont_cls; 622 pending->cont_cls = cont_cls;
1101 pending->unique_id = route_handle->uid; 623
1102 if (handle->current == NULL) 624 message->header.size = htons (msize);
625 message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE);
626 message->key = *key;
627 message->options = htonl ((uint32_t) options);
628 message->desired_replication_level = htonl (desired_replication_level);
629 memcpy (&message[1], enc, esize);
630 if (iter != NULL)
1103 { 631 {
1104 handle->current = pending; 632 route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle));
1105 process_pending_message (handle); 633 route_handle->key = *key;
634 route_handle->iter = iter;
635 route_handle->iter_cls = iter_cls;
636 route_handle->dht_handle = handle;
637 route_handle->uid = handle->uid_gen++;
638 route_handle->message = pending;
639 message->unique_id = GNUNET_htonll (route_handle->uid);
640 GNUNET_CONTAINER_multihashmap_put (handle->active_requests,
641 key,
642 route_handle,
643 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1106 } 644 }
1107 else 645 else
1108 { 646 {
1109 handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED; 647 route_handle = NULL;
1110 handle->retransmission_buffer = pending; 648 pending->free_on_send = GNUNET_YES;
1111 } 649 pending->timeout_task = GNUNET_SCHEDULER_add_delayed (handle->sched,
1112 650 timeout,
1113 route_handle->message = message; 651 &timeout_route_request,
652 pending);
653 }
654 GNUNET_CONTAINER_DLL_insert (handle->pending_head,
655 handle->pending_tail,
656 pending);
657 pending->in_pending_queue = GNUNET_YES;
658 process_pending_messages (handle);
1114 return route_handle; 659 return route_handle;
1115} 660}
1116 661
1117 662
1118/** 663/**
1119 * Perform an asynchronous GET operation on the DHT identified.
1120 *
1121 * @param handle handle to the DHT service
1122 * @param timeout how long to wait for transmission of this request to the service
1123 * @param type expected type of the response object
1124 * @param key the key to look up
1125 * @param iter function to call on each result
1126 * @param iter_cls closure for iter
1127 * @param cont continuation to call once message sent
1128 * @param cont_cls closure for continuation
1129 *
1130 * @return handle to stop the async get
1131 */
1132struct GNUNET_DHT_GetHandle *
1133GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
1134 struct GNUNET_TIME_Relative timeout,
1135 enum GNUNET_BLOCK_Type type,
1136 const GNUNET_HashCode * key,
1137 GNUNET_DHT_GetIterator iter,
1138 void *iter_cls,
1139 GNUNET_SCHEDULER_Task cont, void *cont_cls)
1140{
1141 struct GNUNET_DHT_GetHandle *get_handle;
1142 struct GNUNET_DHT_GetMessage get_msg;
1143
1144 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) /* Can't send right now, we have a pending message... */
1145 return NULL;
1146
1147 get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle));
1148 get_handle->get_context.iter = iter;
1149 get_handle->get_context.iter_cls = iter_cls;
1150
1151#if DEBUG_DHT_API
1152 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1153 "`%s': Inserting pending get request with key %s\n", "DHT API",
1154 GNUNET_h2s (key));
1155#endif
1156
1157 get_msg.header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET);
1158 get_msg.header.size = htons (sizeof (struct GNUNET_DHT_GetMessage));
1159 get_msg.type = htons (type);
1160
1161 get_handle->route_handle =
1162 GNUNET_DHT_route_start (handle, key, DEFAULT_GET_REPLICATION, 0, &get_msg.header, timeout,
1163 &get_reply_iterator, get_handle, cont, cont_cls);
1164
1165 return get_handle;
1166}
1167
1168
1169/**
1170 * Stop a previously issued routing request 664 * Stop a previously issued routing request
1171 * 665 *
1172 * @param route_handle handle to the request to stop 666 * @param route_handle handle to the request to stop
1173 * @param cont continuation to call once this message is sent to the service or times out
1174 * @param cont_cls closure for the continuation
1175 */ 667 */
1176void 668void
1177GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle, 669GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle)
1178 GNUNET_SCHEDULER_Task cont, void *cont_cls)
1179{ 670{
671 struct GNUNET_DHT_Handle *handle;
1180 struct PendingMessage *pending; 672 struct PendingMessage *pending;
1181 struct GNUNET_DHT_StopMessage *message; 673 struct GNUNET_DHT_StopMessage *message;
1182 size_t msize; 674 size_t msize;
1183 GNUNET_HashCode uid_key;
1184
1185 msize = sizeof (struct GNUNET_DHT_StopMessage);
1186 message = GNUNET_malloc (msize);
1187 message->header.size = htons (msize);
1188 message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_STOP);
1189#if DEBUG_DHT_API
1190 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1191 "`%s': Remove outstanding request for uid %llu\n", "DHT API",
1192 route_handle->uid);
1193#endif
1194 message->unique_id = GNUNET_htonll (route_handle->uid);
1195 memcpy(&message->key, &route_handle->key, sizeof(GNUNET_HashCode));
1196 pending = GNUNET_malloc (sizeof (struct PendingMessage));
1197 pending->msg = (struct GNUNET_MessageHeader *) message;
1198 pending->timeout = GNUNET_TIME_relative_get_forever();
1199 pending->cont = cont;
1200 pending->cont_cls = cont_cls;
1201 pending->free_on_send = GNUNET_YES;
1202 pending->unique_id = 0; /* When finished is called, free pending->msg */
1203 675
1204 if (route_handle->dht_handle->current == NULL) 676 handle = route_handle->dht_handle;
1205 { 677 if (GNUNET_NO == route_handle->message->in_pending_queue)
1206 route_handle->dht_handle->current = pending; 678 {
1207 process_pending_message (route_handle->dht_handle); 679 /* need to send stop message */
1208 } 680 msize = sizeof (struct GNUNET_DHT_StopMessage);
1209 else if (route_handle->dht_handle->retransmit_stage == DHT_RETRANSMITTING) 681 pending = GNUNET_malloc (sizeof (struct PendingMessage) +
1210 { 682 msize);
1211 route_handle->dht_handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED; 683 message = (struct GNUNET_DHT_StopMessage*) &pending[1];
1212 route_handle->dht_handle->retransmission_buffer = pending; 684 pending->msg = &message->header;
685 message->header.size = htons (msize);
686 message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_STOP);
687 message->unique_id = GNUNET_htonll (route_handle->uid);
688 message->key = route_handle->key;
689 pending->handle = handle;
690 pending->free_on_send = GNUNET_YES;
691 pending->in_pending_queue = GNUNET_YES;
692 GNUNET_CONTAINER_DLL_insert (handle->pending_head,
693 handle->pending_tail,
694 pending);
695 process_pending_messages (handle);
1213 } 696 }
1214 else 697 else
1215 { 698 {
1216 GNUNET_free(pending); 699 /* simply remove pending request from message queue before
1217 GNUNET_break(0); 700 transmission, no need to transmit STOP request! */
701 GNUNET_CONTAINER_DLL_remove (handle->pending_head,
702 handle->pending_tail,
703 route_handle->message);
1218 } 704 }
1219 705 GNUNET_assert (GNUNET_YES ==
1220 hash_from_uid (route_handle->uid, &uid_key); 706 GNUNET_CONTAINER_multihashmap_remove (route_handle->dht_handle->active_requests,
1221 GNUNET_assert (GNUNET_CONTAINER_multihashmap_remove 707 &route_handle->key,
1222 (route_handle->dht_handle->outstanding_requests, &uid_key, 708 route_handle));
1223 route_handle) == GNUNET_YES);
1224
1225 GNUNET_free(route_handle->message); 709 GNUNET_free(route_handle->message);
1226 GNUNET_free(route_handle); 710 GNUNET_free(route_handle);
1227} 711}
1228 712
1229 713
714
715/* ***** Special API for controlling DHT routing maintenance ******* */
716
717
1230/** 718/**
1231 * Stop async DHT-get. 719 * Send a control message to the DHT.
1232 * 720 *
1233 * @param get_handle handle to the GET operation to stop 721 * @param handle handle to the DHT service
1234 * @param cont continuation to call once this message is sent to the service or times out 722 * @param command command
1235 * @param cont_cls closure for the continuation 723 * @param variable variable to the command
724 * @param cont continuation to call when done (transmitting request to service)
725 * @param cont_cls closure for cont
1236 */ 726 */
1237void 727static void
1238GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle, 728send_control_message (struct GNUNET_DHT_Handle *handle,
1239 GNUNET_SCHEDULER_Task cont, void *cont_cls) 729 uint16_t command,
730 uint16_t variable,
731 GNUNET_SCHEDULER_Task cont,
732 void *cont_cls)
1240{ 733{
1241 if ((get_handle->route_handle->dht_handle->current != NULL) && 734 struct GNUNET_DHT_ControlMessage *msg;
1242 (get_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING)) 735 struct PendingMessage *pending;
1243 {
1244 if (cont != NULL)
1245 {
1246 GNUNET_SCHEDULER_add_continuation (get_handle->route_handle->dht_handle->sched, cont, cont_cls,
1247 GNUNET_SCHEDULER_REASON_TIMEOUT);
1248 }
1249 return;
1250 }
1251 736
1252#if DEBUG_DHT_API 737 pending = GNUNET_malloc (sizeof (struct PendingMessage) +
1253 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 738 sizeof(struct GNUNET_DHT_ControlMessage));
1254 "`%s': Removing pending get request with key %s, uid %llu\n", 739 msg = (struct GNUNET_DHT_ControlMessage*) &pending[1];
1255 "DHT API", GNUNET_h2s (&get_handle->route_handle->key), 740 pending->msg = &msg->header;
1256 get_handle->route_handle->uid); 741 msg->header.size = htons (sizeof(struct GNUNET_DHT_ControlMessage));
1257#endif 742 msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CONTROL);
1258 GNUNET_DHT_route_stop (get_handle->route_handle, cont, cont_cls); 743 msg->command = htons (command);
1259 GNUNET_free (get_handle); 744 msg->variable = htons (variable);
745 pending->free_on_send = GNUNET_YES;
746 pending->cont = cont;
747 pending->cont_cls = cont_cls;
748 pending->in_pending_queue = GNUNET_YES;
749 GNUNET_CONTAINER_DLL_insert (handle->pending_head,
750 handle->pending_tail,
751 pending);
752 process_pending_messages (handle);
1260} 753}
1261 754
1262 755
1263/** 756/**
1264 * Perform an asynchronous FIND PEER operation on the DHT. 757 * Send a message to the DHT telling it to issue a single find
758 * peer request using the peers unique identifier as key. This
759 * is used to fill the routing table, and is normally controlled
760 * by the DHT itself. However, for testing and perhaps more
761 * close control over the DHT, this can be explicitly managed.
1265 * 762 *
1266 * @param handle handle to the DHT service 763 * @param handle handle to the DHT service
1267 * @param timeout timeout for this request to be sent to the 764 * @param cont continuation to call when done (transmitting request to service)
1268 * service 765 * @param cont_cls closure for cont
1269 * @param options routing options for this message
1270 * @param key the key to look up
1271 * @param proc function to call on each result
1272 * @param proc_cls closure for proc
1273 * @param cont continuation to call once message sent
1274 * @param cont_cls closure for continuation
1275 *
1276 * @return handle to stop the async get, NULL on error
1277 */ 766 */
1278struct GNUNET_DHT_FindPeerHandle * 767void
1279GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle, 768GNUNET_DHT_find_peers (struct GNUNET_DHT_Handle *handle,
1280 struct GNUNET_TIME_Relative timeout, 769 GNUNET_SCHEDULER_Task cont,
1281 enum GNUNET_DHT_RouteOption options, 770 void *cont_cls)
1282 const GNUNET_HashCode * key,
1283 GNUNET_DHT_FindPeerProcessor proc,
1284 void *proc_cls,
1285 GNUNET_SCHEDULER_Task cont,
1286 void *cont_cls)
1287{ 771{
1288 struct GNUNET_DHT_FindPeerHandle *find_peer_handle; 772 send_control_message (handle,
1289 struct GNUNET_DHT_FindPeerMessage find_peer_msg; 773 GNUNET_MESSAGE_TYPE_DHT_FIND_PEER, 0,
774 cont, cont_cls);
775}
1290 776
1291 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING)) /* Can't send right now, we have a pending message... */
1292 return NULL;
1293 777
1294 find_peer_handle =
1295 GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerHandle));
1296 find_peer_handle->find_peer_context.proc = proc;
1297 find_peer_handle->find_peer_context.proc_cls = proc_cls;
1298 778
1299#if DEBUG_DHT_API 779#if HAVE_MALICIOUS
1300 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1301 "`%s': Inserting pending `%s' request with key %s\n", "DHT API",
1302 "FIND PEER", GNUNET_h2s (key));
1303#endif
1304
1305 find_peer_msg.header.size = htons(sizeof(struct GNUNET_DHT_FindPeerMessage));
1306 find_peer_msg.header.type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
1307 find_peer_handle->route_handle =
1308 GNUNET_DHT_route_start (handle, key, 0, options, &find_peer_msg.header,
1309 timeout, &find_peer_reply_iterator,
1310 find_peer_handle, cont, cont_cls);
1311 return find_peer_handle;
1312}
1313 780
1314/** 781/**
1315 * Stop async find peer. Frees associated resources. 782 * Send a message to the DHT telling it to start issuing random GET
783 * requests every 'frequency' milliseconds.
1316 * 784 *
1317 * @param find_peer_handle GET operation to stop. 785 * @param handle handle to the DHT service
1318 * @param cont continuation to call once this message is sent to the service or times out 786 * @param frequency delay between sending malicious messages
1319 * @param cont_cls closure for the continuation
1320 */ 787 */
1321void 788void
1322GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle, 789GNUNET_DHT_set_malicious_getter (struct GNUNET_DHT_Handle *handle,
1323 GNUNET_SCHEDULER_Task cont, void *cont_cls) 790 struct GNUNET_TIME_Relative frequency)
1324{ 791{
1325 if ((find_peer_handle->route_handle->dht_handle->current != NULL) && 792 if (frequency.value > UINT16_MAX)
1326 (find_peer_handle->route_handle->dht_handle->retransmit_stage != DHT_RETRANSMITTING))
1327 { 793 {
1328 if (cont != NULL) 794 GNUNET_break (0);
1329 {
1330 GNUNET_SCHEDULER_add_continuation (find_peer_handle->route_handle->dht_handle->sched, cont, cont_cls,
1331 GNUNET_SCHEDULER_REASON_TIMEOUT);
1332 }
1333 return; 795 return;
1334 } 796 }
1335 797 send_control_message (handle,
1336#if DEBUG_DHT_API 798 GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_GET, frequency.value,
1337 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 799 NULL, NULL);
1338 "`%s': Removing pending `%s' request with key %s, uid %llu\n",
1339 "DHT API", "FIND PEER",
1340 GNUNET_h2s (&find_peer_handle->route_handle->key),
1341 find_peer_handle->route_handle->uid);
1342#endif
1343 GNUNET_DHT_route_stop (find_peer_handle->route_handle, cont, cont_cls);
1344 GNUNET_free (find_peer_handle);
1345
1346} 800}
1347 801
1348
1349/** 802/**
1350 * Perform a PUT operation storing data in the DHT. 803 * Send a message to the DHT telling it to start issuing random PUT
1351 * 804 * requests every 'frequency' milliseconds.
1352 * @param handle handle to DHT service
1353 * @param key the key to store under
1354 * @param type type of the value
1355 * @param size number of bytes in data; must be less than 64k
1356 * @param data the data to store
1357 * @param exp desired expiration time for the value
1358 * @param timeout how long to wait for transmission of this request
1359 * @param cont continuation to call when done;
1360 * reason will be TIMEOUT on error,
1361 * reason will be PREREQ_DONE on success
1362 * @param cont_cls closure for cont
1363 * 805 *
1364 * @return GNUNET_YES if put message is queued for transmission 806 * @param handle handle to the DHT service
807 * @param frequency delay between sending malicious messages
1365 */ 808 */
1366void 809void
1367GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, 810GNUNET_DHT_set_malicious_putter (struct GNUNET_DHT_Handle *handle,
1368 const GNUNET_HashCode * key, 811 struct GNUNET_TIME_Relative frequency)
1369 enum GNUNET_BLOCK_Type type,
1370 uint32_t size,
1371 const char *data,
1372 struct GNUNET_TIME_Absolute exp,
1373 struct GNUNET_TIME_Relative timeout,
1374 GNUNET_SCHEDULER_Task cont, void *cont_cls)
1375{ 812{
1376 struct GNUNET_DHT_PutMessage *put_msg; 813 if (frequency.value > UINT16_MAX)
1377 struct GNUNET_DHT_RouteHandle *put_route;
1378 size_t msize;
1379
1380 if ((handle->current != NULL) && (handle->retransmit_stage != DHT_RETRANSMITTING))
1381 { 814 {
1382 GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "handle->current is not null!\n"); 815 GNUNET_break (0);
1383 if (cont != NULL)
1384 {
1385 GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
1386 GNUNET_SCHEDULER_REASON_TIMEOUT);
1387 }
1388 return; 816 return;
1389 } 817 }
818 send_control_message (handle,
819 GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_PUT, frequency.value,
820 NULL, NULL);
821}
1390 822
1391#if DEBUG_DHT_API
1392 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1393 "`%s': Inserting pending put request with key %s\n", "DHT API",
1394 GNUNET_h2s (key));
1395#endif
1396
1397 msize = sizeof (struct GNUNET_DHT_PutMessage) + size;
1398 put_msg = GNUNET_malloc (msize);
1399 put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_PUT);
1400 put_msg->header.size = htons (msize);
1401 put_msg->type = htons (type);
1402 put_msg->data_size = htons (size);
1403 put_msg->expiration = GNUNET_TIME_absolute_hton(exp);
1404 memcpy (&put_msg[1], data, size);
1405
1406 put_route = GNUNET_DHT_route_start (handle, key, DEFAULT_PUT_REPLICATION, 0, &put_msg->header, timeout, NULL,
1407 NULL, cont, cont_cls);
1408
1409 if (put_route == NULL) /* Route start failed! */
1410 {
1411 GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "route start for PUT failed!\n");
1412 if (cont != NULL)
1413 {
1414 GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
1415 GNUNET_SCHEDULER_REASON_TIMEOUT);
1416 }
1417 }
1418 else
1419 {
1420 GNUNET_free(put_route);
1421 }
1422 823
1423 GNUNET_free (put_msg); 824/**
825 * Send a message to the DHT telling it to start dropping
826 * all requests received.
827 *
828 * @param handle handle to the DHT service
829 */
830void
831GNUNET_DHT_set_malicious_dropper (struct GNUNET_DHT_Handle *handle)
832{
833 send_control_message (handle,
834 GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_DROP, 0,
835 NULL, NULL);
1424} 836}
1425 837
838#endif
839
1426/* end of dht_api.c */ 840/* end of dht_api.c */
diff --git a/src/dht/gnunet-dht-get-peer.c b/src/dht/gnunet-dht-get-peer.c
index 04959792b..f6974cddb 100644
--- a/src/dht/gnunet-dht-get-peer.c
+++ b/src/dht/gnunet-dht-get-peer.c
@@ -78,22 +78,30 @@ static unsigned int result_count;
78static int ret; 78static int ret;
79 79
80static void 80static void
81shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 81shutdown_task (void *cls,
82 const struct GNUNET_SCHEDULER_TaskContext *tc)
82{ 83{
83
84 if (dht_handle != NULL) 84 if (dht_handle != NULL)
85 GNUNET_DHT_disconnect (dht_handle); 85 {
86 86 GNUNET_DHT_disconnect (dht_handle);
87 dht_handle = NULL; 87 dht_handle = NULL;
88 }
89 fprintf (stderr,
90 _("Found %u peers\n"),
91 result_count);
88} 92}
89 93
94
90static void 95static void
91cleanup_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 96cleanup_task (void *cls,
97 const struct GNUNET_SCHEDULER_TaskContext *tc)
92{ 98{
93 if (find_peer_handle != NULL) 99 if (find_peer_handle != NULL)
94 GNUNET_DHT_find_peer_stop (find_peer_handle, &shutdown_task, NULL); 100 {
95 else 101 GNUNET_DHT_find_peer_stop (find_peer_handle);
96 GNUNET_SCHEDULER_add_now (sched, &shutdown_task, NULL); 102 find_peer_handle = NULL;
103 }
104 GNUNET_SCHEDULER_add_now (sched, &shutdown_task, NULL);
97} 105}
98 106
99/** 107/**
@@ -103,46 +111,23 @@ cleanup_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
103 * @param cls closure (NULL) 111 * @param cls closure (NULL)
104 * @param hello the response message, a HELLO 112 * @param hello the response message, a HELLO
105 */ 113 */
106void find_peer_processor (void *cls, 114static void
107 const struct GNUNET_HELLO_Message *hello) 115find_peer_processor (void *cls,
116 const struct GNUNET_HELLO_Message *hello)
108{ 117{
109 struct GNUNET_PeerIdentity peer; 118 struct GNUNET_PeerIdentity peer;
119
110 if (GNUNET_OK == GNUNET_HELLO_get_id(hello, &peer)) 120 if (GNUNET_OK == GNUNET_HELLO_get_id(hello, &peer))
111 { 121 {
112 result_count++; 122 result_count++;
113 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
114 "test_find_peer_processor called (peer `%s'), total results %d!\n", GNUNET_i2s(&peer), result_count);
115 }
116}
117
118
119/**
120 * Signature of the main function of a task.
121 *
122 * @param cls closure
123 * @param tc context information (why was this task triggered now)
124 */
125void
126message_sent_cont (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
127{
128 if (tc->reason == GNUNET_SCHEDULER_REASON_TIMEOUT)
129 {
130 if (verbose)
131 fprintf (stderr,
132 "Failed to send FIND PEER request to service, quitting.\n");
133 ret = 1;
134 GNUNET_SCHEDULER_add_now (sched, &shutdown_task, NULL);
135 }
136 else
137 {
138 if (verbose) 123 if (verbose)
139 fprintf (stderr, "FIND PEER request sent, awaiting results!\n"); 124 fprintf (stderr,
140 GNUNET_SCHEDULER_add_delayed (sched, 125 _("Found peer `%s'\n"),
141 GNUNET_TIME_absolute_get_remaining 126 GNUNET_i2s (&peer));
142 (absolute_timeout), &cleanup_task, NULL);
143 } 127 }
144} 128}
145 129
130
146/** 131/**
147 * Main function that will be run by the scheduler. 132 * Main function that will be run by the scheduler.
148 * 133 *
@@ -194,13 +179,19 @@ run (void *cls,
194 179
195 find_peer_handle = GNUNET_DHT_find_peer_start (dht_handle, 180 find_peer_handle = GNUNET_DHT_find_peer_start (dht_handle,
196 timeout, 181 timeout,
197 0,
198 &key, 182 &key,
183 GNUNET_DHT_RO_NONE,
199 &find_peer_processor, 184 &find_peer_processor,
200 NULL,
201 &message_sent_cont,
202 NULL); 185 NULL);
203 186 if (NULL == find_peer_handle)
187 {
188 GNUNET_SCHEDULER_add_now (sched, &shutdown_task, NULL);
189 return;
190 }
191 GNUNET_SCHEDULER_add_delayed (sched,
192 GNUNET_TIME_absolute_get_remaining
193 (absolute_timeout),
194 &cleanup_task, NULL);
204} 195}
205 196
206 197
@@ -234,8 +225,10 @@ main (int argc, char *const *argv)
234 return (GNUNET_OK == 225 return (GNUNET_OK ==
235 GNUNET_PROGRAM_run (argc, 226 GNUNET_PROGRAM_run (argc,
236 argv, 227 argv,
237 "gnunet-dht-get", 228 "gnunet-dht-get-peer",
238 gettext_noop 229 gettext_noop
239 ("Issue a GET request to the GNUnet DHT, prints results."), 230 ("Issue a GET PEER request to the GNUnet DHT, print results."),
240 options, &run, NULL)) ? ret : 1; 231 options, &run, NULL)) ? ret : 1;
241} 232}
233
234/* end of gnunet-dht-get-peer */
diff --git a/src/dht/gnunet-dht-get.c b/src/dht/gnunet-dht-get.c
index 023188f35..fabe4aeb9 100644
--- a/src/dht/gnunet-dht-get.c
+++ b/src/dht/gnunet-dht-get.c
@@ -81,23 +81,27 @@ static unsigned int result_count;
81 */ 81 */
82static int ret; 82static int ret;
83 83
84
84static void 85static void
85shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 86shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
86{ 87{
87
88 if (dht_handle != NULL) 88 if (dht_handle != NULL)
89 GNUNET_DHT_disconnect (dht_handle); 89 {
90 90 GNUNET_DHT_disconnect (dht_handle);
91 dht_handle = NULL; 91 dht_handle = NULL;
92 }
92} 93}
93 94
95
94static void 96static void
95cleanup_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 97cleanup_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
96{ 98{
97 if (get_handle != NULL) 99 if (get_handle != NULL)
98 GNUNET_DHT_get_stop (get_handle, &shutdown_task, NULL); 100 {
99 else 101 GNUNET_DHT_get_stop (get_handle);
100 GNUNET_SCHEDULER_add_now (sched, &shutdown_task, NULL); 102 get_handle = NULL;
103 }
104 GNUNET_SCHEDULER_add_now (sched, &shutdown_task, NULL);
101} 105}
102 106
103 107
@@ -108,6 +112,10 @@ cleanup_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
108 * @param cls closure 112 * @param cls closure
109 * @param exp when will this value expire 113 * @param exp when will this value expire
110 * @param key key of the result 114 * @param key key of the result
115 * @param get_path NULL-terminated array of pointers
116 * to the peers on reverse GET path (or NULL if not recorded)
117 * @param put_path NULL-terminated array of pointers
118 * to the peers on the PUT path (or NULL if not recorded)
111 * @param type type of the result 119 * @param type type of the result
112 * @param size number of bytes in data 120 * @param size number of bytes in data
113 * @param data pointer to the result data 121 * @param data pointer to the result data
@@ -116,39 +124,17 @@ void
116get_result_iterator (void *cls, 124get_result_iterator (void *cls,
117 struct GNUNET_TIME_Absolute exp, 125 struct GNUNET_TIME_Absolute exp,
118 const GNUNET_HashCode * key, 126 const GNUNET_HashCode * key,
119 uint32_t type, uint32_t size, const void *data) 127 const struct GNUNET_PeerIdentity * const *get_path,
128 const struct GNUNET_PeerIdentity * const *put_path,
129 enum GNUNET_BLOCK_Type type,
130 size_t size,
131 const void *data)
120{ 132{
121 fprintf (stdout, "Result %d, type %d:\n%.*s\n", result_count, type, size, 133 fprintf (stdout, "Result %d, type %d:\n%.*s\n", result_count, type, size,
122 (char *) data); 134 (char *) data);
123 result_count++; 135 result_count++;
124} 136}
125 137
126/**
127 * Signature of the main function of a task.
128 *
129 * @param cls closure
130 * @param tc context information (why was this task triggered now)
131 */
132void
133message_sent_cont (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
134{
135 if (tc->reason == GNUNET_SCHEDULER_REASON_TIMEOUT)
136 {
137 if (verbose)
138 fprintf (stderr,
139 "Failed to send GET request to service, quitting.\n");
140 ret = 1;
141 GNUNET_SCHEDULER_add_now (sched, &shutdown_task, NULL);
142 }
143 else
144 {
145 if (verbose)
146 fprintf (stderr, "GET request sent, awaiting results!\n");
147 GNUNET_SCHEDULER_add_delayed (sched,
148 GNUNET_TIME_absolute_get_remaining
149 (absolute_timeout), &cleanup_task, NULL);
150 }
151}
152 138
153/** 139/**
154 * Main function that will be run by the scheduler. 140 * Main function that will be run by the scheduler.
@@ -198,8 +184,17 @@ run (void *cls,
198 184
199 if (verbose) 185 if (verbose)
200 fprintf (stderr, "Issuing GET request for %s!\n", query_key); 186 fprintf (stderr, "Issuing GET request for %s!\n", query_key);
201 get_handle = GNUNET_DHT_get_start (dht_handle, timeout, query_type, &key, 187 GNUNET_SCHEDULER_add_delayed (sched,
202 &get_result_iterator, NULL, &message_sent_cont, NULL); 188 GNUNET_TIME_absolute_get_remaining
189 (absolute_timeout), &cleanup_task, NULL);
190 get_handle = GNUNET_DHT_get_start (dht_handle,
191 timeout,
192 query_type,
193 &key,
194 GNUNET_DHT_RO_NONE,
195 NULL, 0,
196 NULL, 0,
197 &get_result_iterator, NULL);
203 198
204} 199}
205 200
@@ -242,3 +237,5 @@ main (int argc, char *const *argv)
242 ("Issue a GET request to the GNUnet DHT, prints results."), 237 ("Issue a GET request to the GNUnet DHT, prints results."),
243 options, &run, NULL)) ? ret : 1; 238 options, &run, NULL)) ? ret : 1;
244} 239}
240
241/* end of gnunet-dht-get.c */
diff --git a/src/dht/gnunet-dht-put.c b/src/dht/gnunet-dht-put.c
index 9d3b2707b..9f7091b84 100644
--- a/src/dht/gnunet-dht-put.c
+++ b/src/dht/gnunet-dht-put.c
@@ -79,11 +79,11 @@ static char *data;
79static void 79static void
80shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 80shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
81{ 81{
82
83 if (dht_handle != NULL) 82 if (dht_handle != NULL)
84 GNUNET_DHT_disconnect (dht_handle); 83 {
85 84 GNUNET_DHT_disconnect (dht_handle);
86 dht_handle = NULL; 85 dht_handle = NULL;
86 }
87} 87}
88 88
89/** 89/**
@@ -165,8 +165,14 @@ run (void *cls,
165 fprintf (stderr, "Issuing put request for `%s' with data `%s'!\n", 165 fprintf (stderr, "Issuing put request for `%s' with data `%s'!\n",
166 query_key, data); 166 query_key, data);
167 167
168 GNUNET_DHT_put (dht_handle, &key, query_type, strlen (data), data, 168 GNUNET_DHT_put (dht_handle,
169 expiration, timeout, &message_sent_cont, NULL); 169 &key,
170 GNUNET_DHT_RO_NONE,
171 query_type,
172 strlen (data), data,
173 expiration,
174 timeout,
175 &message_sent_cont, NULL);
170 176
171} 177}
172 178
@@ -215,3 +221,5 @@ main (int argc, char *const *argv)
215 ("Issue a PUT request to the GNUnet DHT insert DATA under KEY."), 221 ("Issue a PUT request to the GNUnet DHT insert DATA under KEY."),
216 options, &run, NULL)) ? ret : 1; 222 options, &run, NULL)) ? ret : 1;
217} 223}
224
225/* end of gnunet-dht-put.c */