aboutsummaryrefslogtreecommitdiff
path: root/src/dht/dht_api.c
diff options
context:
space:
mode:
authorNathan S. Evans <evans@in.tum.de>2010-03-16 19:41:11 +0000
committerNathan S. Evans <evans@in.tum.de>2010-03-16 19:41:11 +0000
commit48db873e37ec4127d4d4b86658dea5907bcc1606 (patch)
tree2f892a7e36a270c327c5df77a35cff200b2019a4 /src/dht/dht_api.c
parentf2e9dfa6d34c5f213b4ee4d4d38c59104d37a74b (diff)
downloadgnunet-48db873e37ec4127d4d4b86658dea5907bcc1606.tar.gz
gnunet-48db873e37ec4127d4d4b86658dea5907bcc1606.zip
dht api, shell dht service, base of future test case.... not yet working
Diffstat (limited to 'src/dht/dht_api.c')
-rw-r--r--src/dht/dht_api.c583
1 files changed, 583 insertions, 0 deletions
diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c
new file mode 100644
index 000000000..db25379fa
--- /dev/null
+++ b/src/dht/dht_api.c
@@ -0,0 +1,583 @@
1/*
2 This file is part of GNUnet.
3 (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file dht/dht_api.c
23 * @brief library to access the DHT service
24 * @author Christian Grothoff
25 * @author Nathan Evans
26 */
27#include "platform.h"
28#include "gnunet_bandwidth_lib.h"
29#include "gnunet_client_lib.h"
30#include "gnunet_constants.h"
31#include "gnunet_container_lib.h"
32#include "gnunet_arm_service.h"
33#include "gnunet_hello_lib.h"
34#include "gnunet_protocols.h"
35#include "gnunet_server_lib.h"
36#include "gnunet_time_lib.h"
37#include "gnunet_dht_service.h"
38#include "dht.h"
39
40#define DEBUG_DHT_API GNUNET_YES
41
42struct PendingMessages
43{
44 /**
45 * Linked list of pending messages
46 */
47 struct PendingMessages *next;
48
49 /**
50 * Message that is pending
51 */
52 struct GNUNET_MessageHeader *msg;
53
54 /**
55 * Timeout for this message
56 */
57 struct GNUNET_TIME_Relative timeout;
58
59};
60
61/**
62 * Connection to the DHT service.
63 */
64struct GNUNET_DHT_Handle
65{
66 /**
67 * Our scheduler.
68 */
69 struct GNUNET_SCHEDULER_Handle *sched;
70
71 /**
72 * Configuration to use.
73 */
74 const struct GNUNET_CONFIGURATION_Handle *cfg;
75
76 /**
77 * Socket (if available).
78 */
79 struct GNUNET_CLIENT_Connection *client;
80
81 /**
82 * Currently pending transmission request.
83 */
84 struct GNUNET_CLIENT_TransmitHandle *th;
85
86 /**
87 * List of the currently pending messages for the DHT service.
88 */
89 struct PendingMessages *pending_list;
90
91 /**
92 * Message we are currently sending.
93 */
94 struct PendingMessages *current;
95
96 /**
97 * Hash map containing the current outstanding get requests
98 */
99 struct GNUNET_CONTAINER_MultiHashMap *outstanding_get_requests;
100
101 /**
102 * Hash map containing the current outstanding put requests, awaiting
103 * a response
104 */
105 struct GNUNET_CONTAINER_MultiHashMap *outstanding_put_requests;
106
107 /**
108 * Kill off the connection and any pending messages.
109 */
110 int do_destroy;
111
112};
113
114static struct GNUNET_TIME_Relative default_request_timeout;
115
116/* Forward declaration */
117static void process_pending_message(struct GNUNET_DHT_Handle *handle);
118
119/**
120 * Handler for messages received from the DHT service
121 * a demultiplexer which handles numerous message types
122 *
123 */
124void service_message_handler (void *cls,
125 const struct GNUNET_MessageHeader *msg)
126{
127
128 /* TODO: find out message type, handle callbacks for different types of messages.
129 * Should be a put acknowledgment, get data or find node result. */
130}
131
132
133/**
134 * Initialize the connection with the DHT service.
135 *
136 * @param cfg configuration to use
137 * @param sched scheduler to use
138 * @return NULL on error
139 */
140struct GNUNET_DHT_Handle *
141GNUNET_DHT_connect (struct GNUNET_SCHEDULER_Handle *sched,
142 const struct GNUNET_CONFIGURATION_Handle *cfg)
143{
144 struct GNUNET_DHT_Handle *handle;
145
146 handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_Handle));
147
148 default_request_timeout = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5);
149 handle->cfg = cfg;
150 handle->sched = sched;
151 handle->pending_list = NULL;
152 handle->current = NULL;
153 handle->do_destroy = GNUNET_NO;
154 handle->th = NULL;
155
156 handle->client = GNUNET_CLIENT_connect(sched, "dht", cfg);
157 handle->outstanding_get_requests = GNUNET_CONTAINER_multihashmap_create(100); /* FIXME: better number */
158 handle->outstanding_put_requests = GNUNET_CONTAINER_multihashmap_create(100); /* FIXME: better number */
159 if (handle->client == NULL)
160 return NULL;
161#if DEBUG_DHT_API
162 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
163 "`%s': Connection to service in progress\n", "DHT API");
164#endif
165 GNUNET_CLIENT_receive (handle->client,
166 &service_message_handler,
167 handle, GNUNET_TIME_UNIT_FOREVER_REL);
168
169 return handle;
170}
171
172
173/**
174 * Shutdown connection with the DHT service.
175 *
176 * @param h connection to shut down
177 */
178void
179GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
180{
181 struct PendingMessages *pos;
182#if DEBUG_DHT_API
183 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
184 "`%s': Called GNUNET_DHT_disconnect\n", "DHT API");
185#endif
186 GNUNET_assert(handle != NULL);
187
188 if (handle->th != NULL) /* We have a live transmit request in the Aether */
189 {
190 GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
191 handle->th = NULL;
192 }
193 if (handle->current != NULL) /* We are trying to send something now, clean it up */
194 GNUNET_free(handle->current);
195
196 while (NULL != (pos = handle->pending_list)) /* Remove all pending sends from the list */
197 {
198 handle->pending_list = pos->next;
199 GNUNET_free(pos);
200 }
201 if (handle->client != NULL) /* Finally, disconnect from the service */
202 {
203 GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
204 handle->client = NULL;
205 }
206
207 GNUNET_free (handle);
208}
209
210
211/**
212 * Handle to control a GET operation.
213 */
214struct GNUNET_DHT_GetHandle
215{
216
217 /**
218 * Key that this get request is for
219 */
220 GNUNET_HashCode key;
221
222 /**
223 * Type of data get request was for
224 */
225 uint32_t type;
226
227 /**
228 * Iterator to call on data receipt
229 */
230 GNUNET_DHT_Iterator iter;
231
232 /**
233 * Closure for the iterator callback
234 */
235 void *iter_cls;
236};
237
238/**
239 * Handle for a PUT request, holds callback
240 */
241struct GNUNET_DHT_PutHandle
242{
243 /**
244 * Key that this get request is for
245 */
246 GNUNET_HashCode key;
247
248 /**
249 * Type of data get request was for
250 */
251 uint32_t type;
252
253 /**
254 * Continuation to call on put send
255 */
256 GNUNET_SCHEDULER_Task cont;
257
258 /**
259 * Send continuation cls
260 */
261 void *cont_cls;
262};
263
264/**
265 * Send complete (or failed), schedule next (or don't)
266 */
267static void
268finish (struct GNUNET_DHT_Handle *handle, int code)
269{
270 /* TODO: if code is not GNUNET_OK, do something! */
271 struct PendingMessages *pos = handle->current;
272 struct GNUNET_DHT_GetMessage *get;
273 struct GNUNET_DHT_PutMessage *put;
274
275 GNUNET_assert(pos != NULL);
276
277 switch (ntohs(pos->msg->type))
278 {
279 case GNUNET_MESSAGE_TYPE_DHT_GET:
280 get = (struct GNUNET_DHT_GetMessage *)pos->msg;
281 GNUNET_free(get);
282 break;
283 case GNUNET_MESSAGE_TYPE_DHT_PUT:
284 put = (struct GNUNET_DHT_PutMessage *)pos->msg;
285 GNUNET_free(put);
286 break;
287 default:
288 GNUNET_break(0);
289 }
290
291 handle->current = NULL;
292
293 if (code != GNUNET_SYSERR)
294 process_pending_message (handle);
295
296 GNUNET_free(pos);
297}
298
299/**
300 * Transmit the next pending message, called by notify_transmit_ready
301 */
302static size_t
303transmit_pending (void *cls, size_t size, void *buf)
304{
305 struct GNUNET_DHT_Handle *handle = cls;
306 size_t ret;
307 size_t tsize;
308
309 if (buf == NULL)
310 {
311#if DEBUG_DHT_API
312 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
313 "`%s': In transmit_pending buf is NULL\n", "DHT API");
314#endif
315 /* FIXME: free associated resources or summat */
316 finish(handle, GNUNET_SYSERR);
317 return 0;
318 }
319
320 handle->th = NULL;
321 ret = 0;
322
323 if (handle->current != NULL)
324 {
325 tsize = ntohs(handle->current->msg->size);
326 if (size >= tsize)
327 {
328#if DEBUG_DHT_API
329 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
330 "`%s': Sending message size %d\n", "DHT API", tsize);
331#endif
332 memcpy(buf, handle->current->msg, tsize);
333 }
334 else
335 {
336 return ret;
337 }
338 }
339
340 return ret;
341}
342
343
344/**
345 * Try to (re)connect to the dht service.
346 *
347 * @return GNUNET_YES on success, GNUNET_NO on failure.
348 */
349static int
350try_connect (struct GNUNET_DHT_Handle *ret)
351{
352 if (ret->client != NULL)
353 return GNUNET_OK;
354 ret->client = GNUNET_CLIENT_connect (ret->sched, "dht", ret->cfg);
355 if (ret->client != NULL)
356 return GNUNET_YES;
357#if DEBUG_STATISTICS
358 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
359 _("Failed to connect to the dht service!\n"));
360#endif
361 return GNUNET_NO;
362}
363
364
365/**
366 * Try to send messages from list of messages to send
367 */
368static void process_pending_message(struct GNUNET_DHT_Handle *handle)
369{
370
371 if (handle->current != NULL)
372 return; /* action already pending */
373 if (GNUNET_YES != try_connect (handle))
374 {
375 finish (handle, GNUNET_SYSERR);
376 return;
377 }
378
379 /* TODO: set do_destroy somewhere's, see what needs to happen in that case! */
380 if (handle->do_destroy)
381 {
382 //GNUNET_DHT_disconnect (handle); /* FIXME: replace with proper disconnect stuffs */
383 }
384
385 /* schedule next action */
386 handle->current = handle->pending_list;
387 if (NULL == handle->current)
388 {
389 return;
390 }
391 handle->pending_list = handle->pending_list->next;
392 handle->current->next = NULL;
393
394 if (NULL ==
395 (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
396 ntohs(handle->current->msg->size),
397 handle->current->timeout,
398 GNUNET_YES,
399 &transmit_pending, handle)))
400 {
401#if DEBUG_DHT_API
402 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
403 "Failed to transmit request to dht service.\n");
404#endif
405 finish (handle, GNUNET_SYSERR);
406 }
407#if DEBUG_DHT_API
408 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
409 "`%s': Scheduled sending message of size %d to service\n", "DHT API", ntohs(handle->current->msg->size));
410#endif
411}
412
413/**
414 * Add a pending message to the linked list of messages which need to be sent
415 *
416 * @param handle handle to the specified DHT api
417 * @param msg the message to add to the list
418 */
419static void add_pending(struct GNUNET_DHT_Handle *handle, struct GNUNET_MessageHeader *msg)
420{
421 struct PendingMessages *new_message;
422 struct PendingMessages *pos;
423 struct PendingMessages *last;
424
425 new_message = GNUNET_malloc(sizeof(struct PendingMessages));
426 new_message->msg = msg;
427 new_message->timeout = default_request_timeout;
428
429 if (handle->pending_list != NULL)
430 {
431 pos = handle->pending_list;
432 while(pos != NULL)
433 {
434 last = pos;
435 pos = pos->next;
436 }
437 new_message->next = last->next; /* Should always be null */
438 last->next = new_message;
439 }
440 else
441 {
442 new_message->next = handle->pending_list; /* Will always be null */
443 handle->pending_list = new_message;
444 }
445
446 process_pending_message(handle);
447}
448
449/**
450 * Perform an asynchronous GET operation on the DHT identified.
451 *
452 * @param h handle to the DHT service
453 * @param type expected type of the response object
454 * @param key the key to look up
455 * @param iter function to call on each result
456 * @param iter_cls closure for iter
457 * @return handle to stop the async get
458 */
459struct GNUNET_DHT_GetHandle *
460GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
461 uint32_t type,
462 const GNUNET_HashCode * key,
463 GNUNET_DHT_Iterator iter,
464 void *iter_cls)
465{
466 struct GNUNET_DHT_GetMessage *get_msg;
467 struct GNUNET_DHT_GetHandle *get_handle;
468
469 get_handle = GNUNET_CONTAINER_multihashmap_get(handle->outstanding_get_requests, key);
470
471 if (get_handle != NULL)
472 {
473 /*
474 * A get has been previously sent, return existing handle.
475 * FIXME: should we re-transmit the request to the DHT service?
476 */
477 return get_handle;
478 }
479
480 get_handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetHandle));
481 get_handle->type = type;
482 memcpy(&get_handle->key, key, sizeof(GNUNET_HashCode));
483 get_handle->iter = iter;
484 get_handle->iter_cls = iter_cls;
485
486 GNUNET_CONTAINER_multihashmap_put(handle->outstanding_get_requests, key, get_handle, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
487
488 get_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetMessage));
489 get_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET);
490 get_msg->header.size = htons(sizeof(struct GNUNET_DHT_GetMessage));
491 get_msg->type = htonl(type);
492 memcpy(&get_msg->key, key, sizeof(GNUNET_HashCode));
493
494 add_pending(handle, &get_msg->header);
495
496 return get_handle;
497}
498
499
500/**
501 * Stop async DHT-get. Frees associated resources.
502 *
503 * @param record GET operation to stop.
504 */
505void
506GNUNET_DHT_get_stop (struct GNUNET_DHT_Handle *handle, struct GNUNET_DHT_GetHandle *get_handle)
507{
508 struct GNUNET_DHT_GetMessage *get_msg;
509
510 if (handle->do_destroy == GNUNET_NO)
511 {
512 get_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetMessage));
513 get_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET_STOP);
514 get_msg->header.size = htons(sizeof(struct GNUNET_DHT_GetMessage));
515 get_msg->type = htonl(get_handle->type);
516 memcpy(&get_msg->key, &get_handle->key, sizeof(GNUNET_HashCode));
517
518 add_pending(handle, &get_msg->header);
519 }
520
521 GNUNET_assert(GNUNET_CONTAINER_multihashmap_remove(handle->outstanding_get_requests, &get_handle->key, get_handle) == GNUNET_YES);
522 GNUNET_free(get_handle);
523}
524
525
526/**
527 * Perform a PUT operation storing data in the DHT.
528 *
529 * @param h handle to DHT service
530 * @param key the key to store under
531 * @param type type of the value
532 * @param size number of bytes in data; must be less than 64k
533 * @param data the data to store
534 * @param exp desired expiration time for the value
535 * @param cont continuation to call when done;
536 * reason will be TIMEOUT on error,
537 * reason will be PREREQ_DONE on success
538 * @param cont_cls closure for cont
539 *
540 * @return GNUNET_YES if put message is queued for transmission
541 */
542int GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
543 const GNUNET_HashCode * key,
544 uint32_t type,
545 uint32_t size,
546 const char *data,
547 struct GNUNET_TIME_Relative exp,
548 GNUNET_SCHEDULER_Task cont,
549 void *cont_cls)
550{
551 struct GNUNET_DHT_PutMessage *put_msg;
552 struct GNUNET_DHT_PutHandle *put_handle;
553 size_t msize;
554
555 put_handle = GNUNET_CONTAINER_multihashmap_get(handle->outstanding_put_requests, key);
556
557 if (put_handle != NULL)
558 {
559 /*
560 * A put has been previously queued, but not yet sent.
561 * FIXME: change the continuation function and callback or something?
562 */
563 return GNUNET_NO;
564 }
565
566 put_handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_PutHandle));
567 put_handle->type = type;
568 memcpy(&put_handle->key, key, sizeof(GNUNET_HashCode));
569
570 GNUNET_CONTAINER_multihashmap_put(handle->outstanding_put_requests, key, put_handle, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
571
572 msize = sizeof(struct GNUNET_DHT_PutMessage) + size;
573 put_msg = GNUNET_malloc(msize);
574 put_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_PUT);
575 put_msg->header.size = htons(msize);
576 put_msg->type = htonl(type);
577 memcpy(&put_msg->key, key, sizeof(GNUNET_HashCode));
578 memcpy(&put_msg[1], data, size);
579
580 add_pending(handle, &put_msg->header);
581
582 return GNUNET_YES;
583}