aboutsummaryrefslogtreecommitdiff
path: root/src/dht
diff options
context:
space:
mode:
Diffstat (limited to 'src/dht')
-rw-r--r--src/dht/dht.h42
-rw-r--r--src/dht/dht_api.c551
-rw-r--r--src/dht/gnunet-service-dht.c179
3 files changed, 480 insertions, 292 deletions
diff --git a/src/dht/dht.h b/src/dht/dht.h
index 82b0df9a2..0cfd9b3bf 100644
--- a/src/dht/dht.h
+++ b/src/dht/dht.h
@@ -35,6 +35,24 @@ typedef void (*GNUNET_DHT_MessageReceivedHandler) (void *cls,
35/** 35/**
36 * Generic DHT message, wrapper for other message types 36 * Generic DHT message, wrapper for other message types
37 */ 37 */
38struct GNUNET_DHT_StopMessage
39{
40 /**
41 * Type: GNUNET_MESSAGE_TYPE_DHT_MESSAGE
42 */
43 struct GNUNET_MessageHeader header;
44
45 /**
46 * Unique ID identifying this request
47 */
48 uint64_t unique_id;
49
50};
51
52
53/**
54 * Generic DHT message, wrapper for other message types
55 */
38struct GNUNET_DHT_Message 56struct GNUNET_DHT_Message
39{ 57{
40 /** 58 /**
@@ -58,12 +76,18 @@ struct GNUNET_DHT_Message
58 uint16_t options; 76 uint16_t options;
59 77
60 /** 78 /**
61 * Is this message uniquely identified? If so it has 79 * Is this message uniquely identified? If so it will
62 * a unique_id appended to it. 80 * be fire and forget, if not we will wait for a receipt
81 * from the service.
63 */ 82 */
64 /* uint16_t unique; I don't think we need this, it should be held in the encapsulated message */ 83 uint16_t unique;
84
85
86 /**
87 * Unique ID identifying this request
88 */
89 uint64_t unique_id;
65 90
66 /* uint64_t unique_id*/
67 /* */ 91 /* */
68 /* GNUNET_MessageHeader *enc actual DHT message, copied to end of this dealy do */ 92 /* GNUNET_MessageHeader *enc actual DHT message, copied to end of this dealy do */
69 93
@@ -112,11 +136,6 @@ struct GNUNET_DHT_GetMessage
112 */ 136 */
113 size_t type; 137 size_t type;
114 138
115 /**
116 * The key to search for
117 */
118 GNUNET_HashCode key;
119
120}; 139};
121 140
122/** 141/**
@@ -156,11 +175,6 @@ struct GNUNET_DHT_FindPeerMessage
156 */ 175 */
157 struct GNUNET_MessageHeader header; 176 struct GNUNET_MessageHeader header;
158 177
159 /**
160 * The key being looked up
161 */
162 GNUNET_HashCode key;
163
164}; 178};
165 179
166/** 180/**
diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c
index a55508b07..7ded088d8 100644
--- a/src/dht/dht_api.c
+++ b/src/dht/dht_api.c
@@ -23,6 +23,13 @@
23 * @brief library to access the DHT service 23 * @brief library to access the DHT service
24 * @author Christian Grothoff 24 * @author Christian Grothoff
25 * @author Nathan Evans 25 * @author Nathan Evans
26 *
27 * TODO: Only allow a single message until confirmed as received by
28 * the service. For put messages call continuation as soon as
29 * receipt acknowledged (then remove), for GET or other messages
30 * only call continuation when data received.
31 * Add unique identifier to message types requesting data to be
32 * returned.
26 */ 33 */
27#include "platform.h" 34#include "platform.h"
28#include "gnunet_bandwidth_lib.h" 35#include "gnunet_bandwidth_lib.h"
@@ -39,13 +46,10 @@
39 46
40#define DEBUG_DHT_API GNUNET_YES 47#define DEBUG_DHT_API GNUNET_YES
41 48
42struct PendingMessages 49#define DEFAULT_DHT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
43{
44 /**
45 * Linked list of pending messages
46 */
47 struct PendingMessages *next;
48 50
51struct PendingMessage
52{
49 /** 53 /**
50 * Message that is pending 54 * Message that is pending
51 */ 55 */
@@ -56,9 +60,110 @@ struct PendingMessages
56 */ 60 */
57 struct GNUNET_TIME_Relative timeout; 61 struct GNUNET_TIME_Relative timeout;
58 62
63 /**
64 * Continuation to call on message send
65 * or message receipt confirmation
66 */
67 GNUNET_DHT_MessageCallback cont;
68
69 /**
70 * Continuation closure
71 */
72 void *cont_cls;
73
74 /**
75 * Whether or not to await verification the message
76 * was received by the service
77 */
78 size_t is_unique;
79
80 /**
81 * Unique ID for this request
82 */
83 uint64_t unique_id;
84
85};
86
87struct GNUNET_DHT_GetContext
88{
89
90
91 /**
92 * Iterator to call on data receipt
93 */
94 GNUNET_DHT_GetIterator iter;
95
96 /**
97 * Closure for the iterator callback
98 */
99 void *iter_cls;
100
59}; 101};
60 102
61/** 103/**
104 * Handle to control a unique operation (one that is
105 * expected to return results)
106 */
107struct GNUNET_DHT_RouteHandle
108{
109
110 /**
111 * Unique identifier for this request (for key collisions)
112 */
113 uint64_t uid;
114
115 /**
116 * Key that this get request is for
117 */
118 GNUNET_HashCode key;
119
120 /**
121 * Iterator to call on data receipt
122 */
123 GNUNET_DHT_ReplyProcessor iter;
124
125 /**
126 * Closure for the iterator callback
127 */
128 void *iter_cls;
129
130 /**
131 * Main handle to this DHT api
132 */
133 struct GNUNET_DHT_Handle *dht_handle;
134};
135
136/**
137 * Handle for a non unique request, holds callback
138 * which needs to be called before we allow other
139 * messages to be processed and sent to the DHT service
140 */
141struct GNUNET_DHT_NonUniqueHandle
142{
143 /**
144 * Key that this get request is for
145 */
146 GNUNET_HashCode key;
147
148 /**
149 * Type of data get request was for
150 */
151 uint32_t type;
152
153 /**
154 * Continuation to call on service
155 * confirmation of message receipt.
156 */
157 GNUNET_SCHEDULER_Task cont;
158
159 /**
160 * Send continuation cls
161 */
162 void *cont_cls;
163};
164
165
166/**
62 * Connection to the DHT service. 167 * Connection to the DHT service.
63 */ 168 */
64struct GNUNET_DHT_Handle 169struct GNUNET_DHT_Handle
@@ -84,25 +189,24 @@ struct GNUNET_DHT_Handle
84 struct GNUNET_CLIENT_TransmitHandle *th; 189 struct GNUNET_CLIENT_TransmitHandle *th;
85 190
86 /** 191 /**
87 * List of the currently pending messages for the DHT service. 192 * Message we are currently sending, only allow
88 */ 193 * a single message to be queued. If not unique
89 struct PendingMessages *pending_list; 194 * (typically a put request), await a confirmation
90 195 * from the service that the message was received.
91 /** 196 * If unique, just fire and forget.
92 * Message we are currently sending.
93 */ 197 */
94 struct PendingMessages *current; 198 struct PendingMessage *current;
95 199
96 /** 200 /**
97 * Hash map containing the current outstanding get requests 201 * Hash map containing the current outstanding unique requests
98 */ 202 */
99 struct GNUNET_CONTAINER_MultiHashMap *outstanding_get_requests; 203 struct GNUNET_CONTAINER_MultiHashMap *outstanding_requests;
100 204
101 /** 205 /**
102 * Hash map containing the current outstanding put requests, awaiting 206 * Non unique handle. If set don't schedule another non
103 * a response 207 * unique request.
104 */ 208 */
105 struct GNUNET_CONTAINER_MultiHashMap *outstanding_put_requests; 209 struct GNUNET_DHT_NonUniqueHandle *non_unique_request;
106 210
107 /** 211 /**
108 * Kill off the connection and any pending messages. 212 * Kill off the connection and any pending messages.
@@ -116,6 +220,27 @@ static struct GNUNET_TIME_Relative default_request_timeout;
116/* Forward declaration */ 220/* Forward declaration */
117static void process_pending_message(struct GNUNET_DHT_Handle *handle); 221static void process_pending_message(struct GNUNET_DHT_Handle *handle);
118 222
223static GNUNET_HashCode * hash_from_uid(uint64_t uid)
224{
225 int count;
226 int remaining;
227 GNUNET_HashCode *hash;
228 hash = GNUNET_malloc(sizeof(GNUNET_HashCode));
229 count = 0;
230
231 while (count < sizeof(GNUNET_HashCode))
232 {
233 remaining = sizeof(GNUNET_HashCode) - count;
234 if (remaining > sizeof(uid))
235 remaining = sizeof(uid);
236
237 memcpy(hash, &uid, remaining);
238 count += remaining;
239 }
240
241 return hash;
242}
243
119/** 244/**
120 * Handler for messages received from the DHT service 245 * Handler for messages received from the DHT service
121 * a demultiplexer which handles numerous message types 246 * a demultiplexer which handles numerous message types
@@ -124,9 +249,52 @@ static void process_pending_message(struct GNUNET_DHT_Handle *handle);
124void service_message_handler (void *cls, 249void service_message_handler (void *cls,
125 const struct GNUNET_MessageHeader *msg) 250 const struct GNUNET_MessageHeader *msg)
126{ 251{
127 252 struct GNUNET_DHT_Handle *handle = cls;
253 struct GNUNET_DHT_Message *dht_msg = (struct GNUNET_DHT_Message *)msg;
254 struct GNUNET_MessageHeader *enc_msg;
255 struct GNUNET_DHT_RouteHandle *route_handle;
256 GNUNET_HashCode *uid_hash;
257 size_t enc_size;
128 /* TODO: find out message type, handle callbacks for different types of messages. 258 /* TODO: find out message type, handle callbacks for different types of messages.
129 * Should be a put acknowledgment, get data or find node result. */ 259 * Should be a non unique acknowledgment, or unique result. */
260
261#if DEBUG_DHT_API
262 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
263 "`%s': Received response to message (uid %llu)\n", "DHT API", ntohl(dht_msg->unique_id));
264#endif
265
266 if (ntohs(dht_msg->unique))
267 {
268 uid_hash = hash_from_uid(ntohl(dht_msg->unique_id));
269
270 route_handle = GNUNET_CONTAINER_multihashmap_get(handle->outstanding_requests, uid_hash);
271 if (route_handle == NULL) /* We have no recollection of this request */
272 {
273#if DEBUG_DHT_API
274 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
275 "`%s': Received response to message (uid %llu), but have no recollection of it!\n", "DHT API", ntohl(dht_msg->unique_id));
276#endif
277 }
278 else
279 {
280 enc_size = ntohs(dht_msg->header.size) - sizeof(struct GNUNET_DHT_Message);
281 GNUNET_assert(enc_size > 0);
282 enc_msg = (struct GNUNET_MessageHeader *)&dht_msg[1];
283 route_handle->iter(route_handle->iter_cls, enc_msg);
284 }
285 }
286 else
287 {
288 if (handle->current->unique_id == ntohl(dht_msg->unique_id))
289 {
290 handle->current->cont(handle->current->cont_cls, GNUNET_OK);
291 GNUNET_free(handle->current->msg);
292 handle->current = NULL;
293 GNUNET_free(handle->current);
294 }
295 }
296
297
130} 298}
131 299
132 300
@@ -151,14 +319,14 @@ GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched,
151 default_request_timeout = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5); 319 default_request_timeout = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5);
152 handle->cfg = cfg; 320 handle->cfg = cfg;
153 handle->sched = sched; 321 handle->sched = sched;
154 handle->pending_list = NULL; 322
155 handle->current = NULL; 323 handle->current = NULL;
156 handle->do_destroy = GNUNET_NO; 324 handle->do_destroy = GNUNET_NO;
157 handle->th = NULL; 325 handle->th = NULL;
158 326
159 handle->client = GNUNET_CLIENT_connect(sched, "dht", cfg); 327 handle->client = GNUNET_CLIENT_connect(sched, "dht", cfg);
160 handle->outstanding_get_requests = GNUNET_CONTAINER_multihashmap_create(100); /* FIXME: better number */ 328 handle->outstanding_requests = GNUNET_CONTAINER_multihashmap_create(ht_len);
161 handle->outstanding_put_requests = GNUNET_CONTAINER_multihashmap_create(100); /* FIXME: better number */ 329
162 if (handle->client == NULL) 330 if (handle->client == NULL)
163 return NULL; 331 return NULL;
164#if DEBUG_DHT_API 332#if DEBUG_DHT_API
@@ -181,7 +349,6 @@ GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched,
181void 349void
182GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle) 350GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
183{ 351{
184 struct PendingMessages *pos;
185#if DEBUG_DHT_API 352#if DEBUG_DHT_API
186 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 353 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
187 "`%s': Called GNUNET_DHT_disconnect\n", "DHT API"); 354 "`%s': Called GNUNET_DHT_disconnect\n", "DHT API");
@@ -196,11 +363,6 @@ GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
196 if (handle->current != NULL) /* We are trying to send something now, clean it up */ 363 if (handle->current != NULL) /* We are trying to send something now, clean it up */
197 GNUNET_free(handle->current); 364 GNUNET_free(handle->current);
198 365
199 while (NULL != (pos = handle->pending_list)) /* Remove all pending sends from the list */
200 {
201 handle->pending_list = pos->next;
202 GNUNET_free(pos);
203 }
204 if (handle->client != NULL) /* Finally, disconnect from the service */ 366 if (handle->client != NULL) /* Finally, disconnect from the service */
205 { 367 {
206 GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO); 368 GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
@@ -212,96 +374,26 @@ GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
212 374
213 375
214/** 376/**
215 * Handle to control a GET operation.
216 */
217struct GNUNET_DHT_GetHandle
218{
219
220 /**
221 * Key that this get request is for
222 */
223 GNUNET_HashCode key;
224
225 /**
226 * Type of data get request was for
227 */
228 uint32_t type;
229
230 /**
231 * Iterator to call on data receipt
232 */
233 GNUNET_DHT_Iterator iter;
234
235 /**
236 * Closure for the iterator callback
237 */
238 void *iter_cls;
239
240 /**
241 * Main handle to this DHT api
242 */
243 struct GNUNET_DHT_Handle *dht_handle;
244};
245
246/**
247 * Handle for a PUT request, holds callback
248 */
249struct GNUNET_DHT_PutHandle
250{
251 /**
252 * Key that this get request is for
253 */
254 GNUNET_HashCode key;
255
256 /**
257 * Type of data get request was for
258 */
259 uint32_t type;
260
261 /**
262 * Continuation to call on put send
263 */
264 GNUNET_SCHEDULER_Task cont;
265
266 /**
267 * Send continuation cls
268 */
269 void *cont_cls;
270};
271
272/**
273 * Send complete (or failed), schedule next (or don't) 377 * Send complete (or failed), schedule next (or don't)
274 */ 378 */
275static void 379static void
276finish (struct GNUNET_DHT_Handle *handle, int code) 380finish (struct GNUNET_DHT_Handle *handle, int code)
277{ 381{
278 /* TODO: if code is not GNUNET_OK, do something! */ 382 /* TODO: if code is not GNUNET_OK, do something! */
279 struct PendingMessages *pos = handle->current; 383 struct PendingMessage *pos = handle->current;
280 struct GNUNET_DHT_GetMessage *get;
281 struct GNUNET_DHT_PutMessage *put;
282 384
283 GNUNET_assert(pos != NULL); 385 GNUNET_assert(pos != NULL);
284 386
285 switch (ntohs(pos->msg->type)) 387 if (pos->is_unique)
286 { 388 {
287 case GNUNET_MESSAGE_TYPE_DHT_GET: 389 if (pos->cont != NULL)
288 get = (struct GNUNET_DHT_GetMessage *)pos->msg; 390 pos->cont(pos->cont_cls, code);
289 GNUNET_free(get);
290 break;
291 case GNUNET_MESSAGE_TYPE_DHT_PUT:
292 put = (struct GNUNET_DHT_PutMessage *)pos->msg;
293 GNUNET_free(put);
294 break;
295 default:
296 GNUNET_break(0);
297 }
298
299 handle->current = NULL;
300
301 if (code != GNUNET_SYSERR)
302 process_pending_message (handle);
303 391
304 GNUNET_free(pos); 392 GNUNET_free(pos->msg);
393 handle->current = NULL;
394 GNUNET_free(pos);
395 }
396 /* Otherwise we need to wait for a response to this message! */
305} 397}
306 398
307/** 399/**
@@ -389,14 +481,6 @@ static void process_pending_message(struct GNUNET_DHT_Handle *handle)
389 //GNUNET_DHT_disconnect (handle); /* FIXME: replace with proper disconnect stuffs */ 481 //GNUNET_DHT_disconnect (handle); /* FIXME: replace with proper disconnect stuffs */
390 } 482 }
391 483
392 /* schedule next action */
393 handle->current = handle->pending_list;
394 if (NULL == handle->current)
395 {
396 return;
397 }
398 handle->pending_list = handle->pending_list->next;
399 handle->current->next = NULL;
400 484
401 if (NULL == 485 if (NULL ==
402 (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client, 486 (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
@@ -418,39 +502,120 @@ static void process_pending_message(struct GNUNET_DHT_Handle *handle)
418} 502}
419 503
420/** 504/**
421 * Add a pending message to the linked list of messages which need to be sent 505 * Iterator called on each result obtained from a generic route
506 * operation
507 */
508void get_reply_iterator (void *cls,
509 const struct GNUNET_MessageHeader *reply)
510{
511
512}
513
514/**
515 * Perform an asynchronous FIND_PEER operation on the DHT.
422 * 516 *
423 * @param handle handle to the specified DHT api 517 * @param handle handle to the DHT service
424 * @param msg the message to add to the list 518 * @param key the key to look up
519 * @param desired_replication_level how many peers should ultimately receive
520 * this message (advisory only, target may be too high for the
521 * given DHT or not hit exactly).
522 * @param options options for routing
523 * @param enc send the encapsulated message to a peer close to the key
524 * @param iter function to call on each result, NULL if no replies are expected
525 * @param iter_cls closure for iter
526 * @param timeout when to abort with an error if we fail to get
527 * a confirmation for the PUT from the local DHT service
528 * @param cont continuation to call when done;
529 * reason will be TIMEOUT on error,
530 * reason will be PREREQ_DONE on success
531 * @param cont_cls closure for cont
532 * @return handle to stop the request
425 */ 533 */
426static void add_pending(struct GNUNET_DHT_Handle *handle, struct GNUNET_MessageHeader *msg) 534struct GNUNET_DHT_RouteHandle *
535GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle,
536 const GNUNET_HashCode *key,
537 unsigned int desired_replication_level,
538 enum GNUNET_DHT_RouteOption options,
539 const struct GNUNET_MessageHeader *enc,
540 struct GNUNET_TIME_Relative timeout,
541 GNUNET_DHT_ReplyProcessor iter,
542 void *iter_cls,
543 GNUNET_DHT_MessageCallback cont,
544 void *cont_cls)
427{ 545{
428 struct PendingMessages *new_message; 546 struct GNUNET_DHT_RouteHandle *route_handle;
429 struct PendingMessages *pos; 547 struct PendingMessage *pending;
430 struct PendingMessages *last; 548 struct GNUNET_DHT_Message *message;
549 size_t is_unique;
550 size_t msize;
551 GNUNET_HashCode *uid_key;
552 int count;
431 553
432 new_message = GNUNET_malloc(sizeof(struct PendingMessages)); 554 is_unique = GNUNET_YES;
433 new_message->msg = msg; 555 if (iter == NULL)
434 new_message->timeout = default_request_timeout; 556 is_unique = GNUNET_NO;
435 557
436 if (handle->pending_list != NULL) 558 route_handle = NULL;
559
560 if (is_unique)
437 { 561 {
438 pos = handle->pending_list; 562 route_handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_RouteHandle));
439 while(pos != NULL) 563 memcpy(&route_handle->key, key, sizeof(GNUNET_HashCode));
564 route_handle->iter = iter;
565 route_handle->iter_cls = iter_cls;
566 route_handle->dht_handle = handle;
567 route_handle->uid = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, -1);
568
569 count = 0;
570 uid_key = hash_from_uid(route_handle->uid);
571 /* While we have an outstanding request with the same identifier! */
572 while (GNUNET_CONTAINER_multihashmap_contains(handle->outstanding_requests, uid_key) == GNUNET_YES)
440 { 573 {
441 last = pos; 574 GNUNET_free(uid_key);
442 pos = pos->next; 575 uid_key = hash_from_uid(route_handle->uid);
443 } 576 }
444 new_message->next = last->next; /* Should always be null */ 577 /**
445 last->next = new_message; 578 * Store based on random identifier!
579 */
580 GNUNET_CONTAINER_multihashmap_put(handle->outstanding_requests, uid_key, route_handle, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
581 msize = sizeof(struct GNUNET_DHT_Message) + ntohs(enc->size) + sizeof(route_handle->uid);
582 GNUNET_free(uid_key);
446 } 583 }
447 else 584 else
448 { 585 {
449 new_message->next = handle->pending_list; /* Will always be null */ 586 msize = sizeof(struct GNUNET_DHT_Message) + ntohs(enc->size);
450 handle->pending_list = new_message;
451 } 587 }
452 588
589 message = GNUNET_malloc(msize);
590 message->header.size = htons(msize);
591 message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT);
592 memcpy(&message->key, key, sizeof(GNUNET_HashCode));
593 message->options = htons(options);
594 message->desired_replication_level = htons(options);
595 message->unique = htons(is_unique);
596
597 pending = GNUNET_malloc(sizeof(struct PendingMessage));
598 pending->msg = &message->header;
599 pending->timeout = timeout;
600 pending->cont = cont;
601 pending->cont_cls = cont_cls;
602 pending->is_unique = is_unique;
603
604 GNUNET_assert(handle->current == NULL);
605
453 process_pending_message(handle); 606 process_pending_message(handle);
607
608 return route_handle;
609}
610
611void
612GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *fph);
613
614
615void dht_get_processor (void *cls,
616 const struct GNUNET_MessageHeader *reply)
617{
618
454} 619}
455 620
456/** 621/**
@@ -463,48 +628,76 @@ static void add_pending(struct GNUNET_DHT_Handle *handle, struct GNUNET_MessageH
463 * @param iter_cls closure for iter 628 * @param iter_cls closure for iter
464 * @return handle to stop the async get 629 * @return handle to stop the async get
465 */ 630 */
466struct GNUNET_DHT_GetHandle * 631struct GNUNET_DHT_RouteHandle *
467GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle, 632GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
633 struct GNUNET_TIME_Relative timeout,
468 uint32_t type, 634 uint32_t type,
469 const GNUNET_HashCode * key, 635 const GNUNET_HashCode * key,
470 GNUNET_DHT_Iterator iter, 636 GNUNET_DHT_GetIterator iter,
471 void *iter_cls) 637 void *iter_cls)
472{ 638{
639 struct GNUNET_DHT_GetContext *get_context;
473 struct GNUNET_DHT_GetMessage *get_msg; 640 struct GNUNET_DHT_GetMessage *get_msg;
474 struct GNUNET_DHT_GetHandle *get_handle;
475
476 get_handle = GNUNET_CONTAINER_multihashmap_get(handle->outstanding_get_requests, key);
477 641
478 if (get_handle != NULL) 642 if (handle->current != NULL) /* Can't send right now, we have a pending message... */
479 { 643 return NULL;
480 /*
481 * A get has been previously sent, return existing handle.
482 * FIXME: should we re-transmit the request to the DHT service?
483 */
484 return get_handle;
485 }
486 644
487 get_handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetHandle)); 645 get_context = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetContext));
488 get_handle->type = type; 646 get_context->iter = iter;
489 memcpy(&get_handle->key, key, sizeof(GNUNET_HashCode)); 647 get_context->iter_cls = iter_cls;
490 get_handle->iter = iter;
491 get_handle->iter_cls = iter_cls;
492 648
493#if DEBUG_DHT_API 649#if DEBUG_DHT_API
494 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 650 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
495 "`%s': Inserting pending get request with key %s\n", "DHT API", GNUNET_h2s(key)); 651 "`%s': Inserting pending get request with key %s\n", "DHT API", GNUNET_h2s(key));
496#endif 652#endif
497 GNUNET_CONTAINER_multihashmap_put(handle->outstanding_get_requests, key, get_handle, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
498 653
499 get_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetMessage)); 654 get_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetMessage));
500 get_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET); 655 get_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET);
501 get_msg->header.size = htons(sizeof(struct GNUNET_DHT_GetMessage)); 656 get_msg->header.size = htons(sizeof(struct GNUNET_DHT_GetMessage));
502 get_msg->type = htonl(type); 657 get_msg->type = htonl(type);
503 memcpy(&get_msg->key, key, sizeof(GNUNET_HashCode));
504 658
505 add_pending(handle, &get_msg->header); 659 return GNUNET_DHT_route_start(handle, key, 0, 0, &get_msg->header, timeout, &get_reply_iterator, get_context, NULL, NULL);
506 660
507 return get_handle; 661}
662
663
664void
665GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle)
666{
667 struct PendingMessage *pending;
668 struct GNUNET_DHT_StopMessage *message;
669 size_t msize;
670 GNUNET_HashCode *uid_key;
671
672 msize = sizeof(struct GNUNET_DHT_StopMessage);
673
674 message = GNUNET_malloc(msize);
675 message->header.size = htons(msize);
676 message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_STOP);
677 message->unique_id = htonl(route_handle->uid);
678
679 pending = GNUNET_malloc(sizeof(struct PendingMessage));
680 pending->msg = (struct GNUNET_MessageHeader *)message;
681 pending->timeout = DEFAULT_DHT_TIMEOUT;
682 pending->cont = NULL;
683 pending->cont_cls = NULL;
684 pending->is_unique = GNUNET_NO;
685
686 GNUNET_assert(route_handle->dht_handle->current == NULL);
687
688 process_pending_message(route_handle->dht_handle);
689
690 uid_key = hash_from_uid(route_handle->uid);
691
692 if (GNUNET_CONTAINER_multihashmap_remove(route_handle->dht_handle->outstanding_requests, uid_key, route_handle) != GNUNET_YES)
693 {
694#if DEBUG_DHT_API
695 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
696 "`%s': Remove outstanding request from hashmap failed for key %s, uid %llu\n", "DHT API", GNUNET_h2s(uid_key), route_handle->uid);
697#endif
698 }
699
700 return;
508} 701}
509 702
510 703
@@ -514,27 +707,33 @@ GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
514 * @param record GET operation to stop. 707 * @param record GET operation to stop.
515 */ 708 */
516void 709void
517GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle) 710GNUNET_DHT_get_stop (struct GNUNET_DHT_RouteHandle *handle)
518{ 711{
712#if OLDREMOVE
519 struct GNUNET_DHT_GetMessage *get_msg; 713 struct GNUNET_DHT_GetMessage *get_msg;
520 struct GNUNET_DHT_Handle *handle; 714 struct GNUNET_DHT_Handle *handle;
715 GNUNET_HashCode *uid_key;
716#endif
717
718 GNUNET_DHT_route_stop(handle);
719
720#if OLDREMOVE
721 uid_key = hash_from_uid(get_handle->uid);
722 GNUNET_assert(GNUNET_CONTAINER_multihashmap_remove(handle->outstanding_requests, uid_key, get_handle) == GNUNET_YES);
521 723
522 if (handle->do_destroy == GNUNET_NO) 724 if (handle->do_destroy == GNUNET_NO)
523 { 725 {
524 get_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetMessage)); 726 get_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetMessage));
525 get_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET_STOP); 727 get_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET_STOP);
526 get_msg->header.size = htons(sizeof(struct GNUNET_DHT_GetMessage)); 728 get_msg->header.size = htons(sizeof(struct GNUNET_DHT_GetMessage));
527 get_msg->type = htonl(get_handle->type);
528 memcpy(&get_msg->key, &get_handle->key, sizeof(GNUNET_HashCode));
529 729
530 add_pending(handle, &get_msg->header); 730
531 } 731 }
732#endif
532#if DEBUG_DHT_API 733#if DEBUG_DHT_API
533 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 734 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
534 "`%s': Removing pending get request with key %s\n", "DHT API", GNUNET_h2s(&get_handle->key)); 735 "`%s': Removing pending get request with key %s, uid %llu\n", "DHT API", GNUNET_h2s(&handle->key), handle->uid);
535#endif 736#endif
536 GNUNET_assert(GNUNET_CONTAINER_multihashmap_remove(handle->outstanding_get_requests, &get_handle->key, get_handle) == GNUNET_YES);
537 GNUNET_free(get_handle);
538} 737}
539 738
540 739
@@ -562,44 +761,30 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
562 const char *data, 761 const char *data,
563 struct GNUNET_TIME_Absolute exp, 762 struct GNUNET_TIME_Absolute exp,
564 struct GNUNET_TIME_Relative timeout, 763 struct GNUNET_TIME_Relative timeout,
565 GNUNET_SCHEDULER_Task cont, 764 GNUNET_DHT_MessageCallback cont,
566 void *cont_cls) 765 void *cont_cls)
567{ 766{
568 struct GNUNET_DHT_PutMessage *put_msg; 767 struct GNUNET_DHT_PutMessage *put_msg;
569 struct GNUNET_DHT_PutHandle *put_handle;
570 size_t msize; 768 size_t msize;
571 769
572 put_handle = GNUNET_CONTAINER_multihashmap_get(handle->outstanding_put_requests, key); 770 if (handle->current != NULL)
573
574 if (put_handle != NULL)
575 { 771 {
576 /* 772 cont(cont_cls, GNUNET_SYSERR);
577 * A put has been previously queued, but not yet sent.
578 * FIXME: change the continuation function and callback or something?
579 */
580 return; 773 return;
581 } 774 }
582 775
583 put_handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_PutHandle));
584 put_handle->type = type;
585 memcpy(&put_handle->key, key, sizeof(GNUNET_HashCode));
586
587#if DEBUG_DHT_API 776#if DEBUG_DHT_API
588 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 777 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
589 "`%s': Inserting pending put request with key %s\n", "DHT API", GNUNET_h2s(key)); 778 "`%s': Inserting pending put request with key %s\n", "DHT API", GNUNET_h2s(key));
590#endif 779#endif
591 780
592 GNUNET_CONTAINER_multihashmap_put(handle->outstanding_put_requests, key, put_handle, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
593
594 msize = sizeof(struct GNUNET_DHT_PutMessage) + size; 781 msize = sizeof(struct GNUNET_DHT_PutMessage) + size;
595 put_msg = GNUNET_malloc(msize); 782 put_msg = GNUNET_malloc(msize);
596 put_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_PUT); 783 put_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_PUT);
597 put_msg->header.size = htons(msize); 784 put_msg->header.size = htons(msize);
598 put_msg->type = htonl(type); 785 put_msg->type = htonl(type);
599 memcpy(&put_msg->key, key, sizeof(GNUNET_HashCode));
600 memcpy(&put_msg[1], data, size); 786 memcpy(&put_msg[1], data, size);
601 787
602 add_pending(handle, &put_msg->header); 788 GNUNET_DHT_route_start(handle, key, 0, 0, &put_msg->header, timeout, NULL, NULL, cont, cont_cls);
603 789
604 return;
605} 790}
diff --git a/src/dht/gnunet-service-dht.c b/src/dht/gnunet-service-dht.c
index 5bbb2e739..215d39c44 100644
--- a/src/dht/gnunet-service-dht.c
+++ b/src/dht/gnunet-service-dht.c
@@ -88,42 +88,15 @@ struct ClientList
88/** 88/**
89 * Server handler for initiating local dht get requests 89 * Server handler for initiating local dht get requests
90 */ 90 */
91static void handle_dht_get (void *cls, struct GNUNET_SERVER_Client * client, 91static void handle_dht_plugin_message (void *cls, struct GNUNET_SERVER_Client * client,
92 const struct GNUNET_MessageHeader *message); 92 const struct GNUNET_MessageHeader *message);
93
94/**
95 * Server handler for stopping local dht get requests
96 */
97static void handle_dht_get_stop (void *cls, struct GNUNET_SERVER_Client * client,
98 const struct GNUNET_MessageHeader *message);
99
100/**
101 * Server handler for initiating local dht find peer requests
102 */
103static void handle_dht_find_peer (void *cls, struct GNUNET_SERVER_Client *
104 client, const struct GNUNET_MessageHeader *
105 message);
106
107/**
108 * Server handler for stopping local dht find peer requests
109 */
110static void handle_dht_find_peer_stop (void *cls, struct GNUNET_SERVER_Client *
111 client, const struct GNUNET_MessageHeader *
112 message);
113
114/**
115 * Server handler for initiating local dht put requests
116 */
117static void handle_dht_put (void *cls, struct GNUNET_SERVER_Client * client,
118 const struct GNUNET_MessageHeader *message);
119
120 93
121static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = { 94static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
122 {&handle_dht_get, NULL, GNUNET_MESSAGE_TYPE_DHT_GET, 0}, 95 {&handle_dht_plugin_message, NULL, GNUNET_MESSAGE_TYPE_DHT, 0},
123 {&handle_dht_get_stop, NULL, GNUNET_MESSAGE_TYPE_DHT_GET_STOP, 0}, 96/* {&handle_dht_get_stop, NULL, GNUNET_MESSAGE_TYPE_DHT_GET_STOP, 0},
124 {&handle_dht_put, NULL, GNUNET_MESSAGE_TYPE_DHT_PUT, 0}, 97 {&handle_dht_put, NULL, GNUNET_MESSAGE_TYPE_DHT_PUT, 0},
125 {&handle_dht_find_peer, NULL, GNUNET_MESSAGE_TYPE_DHT_FIND_PEER, 0}, 98 {&handle_dht_find_peer, NULL, GNUNET_MESSAGE_TYPE_DHT_FIND_PEER, 0},
126 {&handle_dht_find_peer_stop, NULL, GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_STOP, 0}, 99 {&handle_dht_find_peer_stop, NULL, GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_STOP, 0},*/
127 {NULL, NULL, 0, 0} 100 {NULL, NULL, 0, 0}
128}; 101};
129 102
@@ -163,18 +136,16 @@ static struct GNUNET_CORE_MessageHandler core_handlers[] = {
163}; 136};
164 137
165 138
139
166/** 140/**
167 * Server handler for initiating local dht get requests 141 * Server handler for initiating local dht get requests
168 */ 142 */
169static void handle_dht_get (void *cls, struct GNUNET_SERVER_Client * client, 143static void handle_dht_get (void *cls, struct GNUNET_DHT_GetMessage *get_msg, GNUNET_HashCode *key)
170 const struct GNUNET_MessageHeader *message)
171{ 144{
172 struct GNUNET_DHT_GetMessage *get_msg = (struct GNUNET_DHT_GetMessage *)message;
173 GNUNET_HashCode get_key; 145 GNUNET_HashCode get_key;
174 size_t get_type; 146 size_t get_type;
175 147
176 GNUNET_assert(ntohs(get_msg->header.size) >= sizeof(struct GNUNET_DHT_GetMessage)); 148 GNUNET_assert(ntohs(get_msg->header.size) >= sizeof(struct GNUNET_DHT_GetMessage));
177 memcpy(&get_key, &get_msg->key, sizeof(GNUNET_HashCode));
178 get_type = ntohs(get_msg->type); 149 get_type = ntohs(get_msg->type);
179 150
180#if DEBUG_DHT 151#if DEBUG_DHT
@@ -182,92 +153,38 @@ static void handle_dht_get (void *cls, struct GNUNET_SERVER_Client * client,
182 "`%s': Received `%s' request from client, message type %d, key %s\n", "DHT", "GET", get_type, GNUNET_h2s(&get_key)); 153 "`%s': Received `%s' request from client, message type %d, key %s\n", "DHT", "GET", get_type, GNUNET_h2s(&get_key));
183#endif 154#endif
184 155
185 /* FIXME: Implement get stop functionality here */ 156 /* FIXME: Implement get functionality here */
186
187} 157}
188 158
189/**
190 * Server handler for stopping local dht get requests
191 */
192static void handle_dht_get_stop (void *cls, struct GNUNET_SERVER_Client * client,
193 const struct GNUNET_MessageHeader *message)
194{
195 struct GNUNET_DHT_GetMessage *get_msg = (struct GNUNET_DHT_GetMessage *)message; /* Get message and get stop message are the same except for type */
196 GNUNET_HashCode get_key;
197 size_t get_type;
198
199 GNUNET_assert(ntohs(get_msg->header.size) >= sizeof(struct GNUNET_DHT_GetMessage));
200
201 memcpy(&get_key, &get_msg->key, sizeof(GNUNET_HashCode));
202 get_type = ntohs(get_msg->type);
203
204#if DEBUG_DHT
205 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
206 "`%s': Received `%s' request from client, message type %d, key %s\n", "DHT", "GET STOP", get_type, GNUNET_h2s(&get_key));
207#endif
208
209 /* FIXME: Implement get stop functionality here */
210
211}
212 159
213/** 160/**
214 * Server handler for initiating local dht find peer requests 161 * Server handler for initiating local dht find peer requests
215 */ 162 */
216static void handle_dht_find_peer (void *cls, struct GNUNET_SERVER_Client * 163static void handle_dht_find_peer (void *cls, struct GNUNET_DHT_FindPeerMessage *find_msg, GNUNET_HashCode *key)
217 client, const struct GNUNET_MessageHeader *
218 message)
219{ 164{
220 struct GNUNET_DHT_FindPeerMessage *find_msg = (struct GNUNET_DHT_FindPeerMessage *)message;
221 struct GNUNET_PeerIdentity peer;
222 165
223 GNUNET_assert(ntohs(find_msg->header.size) == sizeof(struct GNUNET_DHT_FindPeerMessage)); 166 GNUNET_assert(ntohs(find_msg->header.size) == sizeof(struct GNUNET_DHT_FindPeerMessage));
224 memcpy(&peer, &find_msg->peer, sizeof(struct GNUNET_PeerIdentity));
225 167
226#if DEBUG_DHT 168#if DEBUG_DHT
227 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 169 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
228 "`%s': Received `%s' request from client, peer id %s\n", "DHT", "FIND PEER", GNUNET_i2s(&peer)); 170 "`%s': Received `%s' request from client, key %s\n", "DHT", "FIND PEER", GNUNET_h2s(key));
229#endif 171#endif
230 172
231 /* FIXME: Implement find peer functionality here */ 173 /* FIXME: Implement find peer functionality here */
232} 174}
233 175
234/**
235 * Server handler for stopping local dht find peer requests
236 */
237static void handle_dht_find_peer_stop (void *cls, struct GNUNET_SERVER_Client *
238 client, const struct GNUNET_MessageHeader *
239 message)
240{
241 struct GNUNET_DHT_FindPeerMessage *find_msg = (struct GNUNET_DHT_FindPeerMessage *)message; /* Find peer stop message is identical to find peer message */
242 struct GNUNET_PeerIdentity peer;
243
244 GNUNET_assert(ntohs(find_msg->header.size) == sizeof(struct GNUNET_DHT_FindPeerMessage));
245 memcpy(&peer, &find_msg->peer, sizeof(struct GNUNET_PeerIdentity));
246
247#if DEBUG_DHT
248 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
249 "`%s': Received `%s' request from client, for peer id %s\n", "DHT", "FIND PEER STOP", GNUNET_i2s(&peer));
250#endif
251
252 /* FIXME: Implement find peer stop functionality here */
253
254}
255 176
256/** 177/**
257 * Server handler for initiating local dht put requests 178 * Server handler for initiating local dht put requests
258 */ 179 */
259static void handle_dht_put (void *cls, struct GNUNET_SERVER_Client * client, 180static void handle_dht_put (void *cls, struct GNUNET_DHT_PutMessage *put_msg, GNUNET_HashCode *key)
260 const struct GNUNET_MessageHeader *message)
261{ 181{
262 struct GNUNET_DHT_PutMessage *put_msg = (struct GNUNET_DHT_PutMessage *)message;
263 GNUNET_HashCode put_key;
264 size_t put_type; 182 size_t put_type;
265 size_t data_size; 183 size_t data_size;
266 char *data; 184 char *data;
267 185
268 GNUNET_assert(ntohs(put_msg->header.size) >= sizeof(struct GNUNET_DHT_PutMessage)); 186 GNUNET_assert(ntohs(put_msg->header.size) >= sizeof(struct GNUNET_DHT_PutMessage));
269 187
270 memcpy(&put_key, &put_msg->key, sizeof(GNUNET_HashCode));
271 put_type = ntohs(put_msg->type); 188 put_type = ntohs(put_msg->type);
272 data_size = ntohs(put_msg->data_size); 189 data_size = ntohs(put_msg->data_size);
273 GNUNET_assert(ntohs(put_msg->header.size) == sizeof(struct GNUNET_DHT_PutMessage) + data_size); 190 GNUNET_assert(ntohs(put_msg->header.size) == sizeof(struct GNUNET_DHT_PutMessage) + data_size);
@@ -276,7 +193,7 @@ static void handle_dht_put (void *cls, struct GNUNET_SERVER_Client * client,
276 193
277#if DEBUG_DHT 194#if DEBUG_DHT
278 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 195 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
279 "`%s': Received `%s' request from client, message type %d, key %s\n", "DHT", "PUT", put_type, GNUNET_h2s(&put_key)); 196 "`%s': Received `%s' request from client, message type %d, key %s\n", "DHT", "PUT", put_type, GNUNET_h2s(key));
280#endif 197#endif
281 198
282 /** 199 /**
@@ -284,7 +201,79 @@ static void handle_dht_put (void *cls, struct GNUNET_SERVER_Client * client,
284 */ 201 */
285 202
286 GNUNET_free(data); 203 GNUNET_free(data);
204}
205
206
207static void
208handle_dht_start_message(void *cls, struct GNUNET_DHT_Message *dht_msg)
209{
210 struct GNUNET_MessageHeader *enc_msg;
211 size_t enc_type;
212
213 enc_msg = (struct GNUNET_MessageHeader *)&dht_msg[1];
214 enc_type = ntohs(enc_msg->type);
215
216
217#if DEBUG_DHT
218 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
219 "`%s': Received `%s' request from client, message type %d, key %s, uid %llu\n", "DHT", "GENERIC", enc_type, GNUNET_h2s(&dht_msg->key), ntohl(dht_msg->unique_id));
220#endif
221
222 /* FIXME: Implement demultiplexing functionality here */
223 switch (enc_type)
224 {
225 case GNUNET_MESSAGE_TYPE_DHT_GET:
226 handle_dht_get(cls, (struct GNUNET_DHT_GetMessage *)enc_msg, &dht_msg->key);
227 break;
228 case GNUNET_MESSAGE_TYPE_DHT_PUT:
229 handle_dht_put(cls, (struct GNUNET_DHT_PutMessage *)enc_msg, &dht_msg->key);
230 break;
231 case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER:
232 handle_dht_find_peer(cls, (struct GNUNET_DHT_FindPeerMessage *)enc_msg, &dht_msg->key);
233 break;
234 default:
235#if DEBUG_DHT
236 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
237 "`%s': Message type (%d) not handled\n", "DHT", enc_type);
238#endif
239 }
240
241}
242
243
244static void
245handle_dht_stop_message(void *cls, struct GNUNET_DHT_StopMessage *dht_stop_msg)
246{
247
248#if DEBUG_DHT
249 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
250 "`%s': Received `%s' request from client, uid %llu\n", "DHT", "GENERIC STOP", ntohl(dht_stop_msg->unique_id));
251#endif
252}
253
254
255
256/**
257 * Server handler for initiating local dht get requests
258 */
259static void handle_dht_plugin_message (void *cls, struct GNUNET_SERVER_Client * client,
260 const struct GNUNET_MessageHeader *message)
261{
262
263 #if DEBUG_DHT
264 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
265 "`%s': Received `%s' request from client, message type %d, size %d\n", "DHT", "GENERIC", ntohs(message->type), ntohs(message->size));
266#endif
267
268 switch(ntohs(message->type))
269 {
270 case GNUNET_MESSAGE_TYPE_DHT:
271 handle_dht_start_message(cls, (struct GNUNET_DHT_Message *)message);
272 case GNUNET_MESSAGE_TYPE_DHT_STOP:
273 handle_dht_stop_message(cls, (struct GNUNET_DHT_StopMessage *)message);
274 }
287 275
276 GNUNET_SERVER_receive_done(client, GNUNET_OK);
288} 277}
289 278
290/** 279/**