aboutsummaryrefslogtreecommitdiff
path: root/src/dht/gnunet-service-dht.c
diff options
context:
space:
mode:
authorNathan S. Evans <evans@in.tum.de>2010-04-05 13:52:41 +0000
committerNathan S. Evans <evans@in.tum.de>2010-04-05 13:52:41 +0000
commite4de5bb96aa6560bbd85ebca896e7cb36d426ad5 (patch)
tree658263e857a93978b669cc90438ed1c5521e188d /src/dht/gnunet-service-dht.c
parent2b5542569ff904ad595b40aa150823bfbd7bf39b (diff)
downloadgnunet-e4de5bb96aa6560bbd85ebca896e7cb36d426ad5.tar.gz
gnunet-e4de5bb96aa6560bbd85ebca896e7cb36d426ad5.zip
add gnunet-dht-get and gnunet-dht-put binaries, changes to service
Diffstat (limited to 'src/dht/gnunet-service-dht.c')
-rw-r--r--src/dht/gnunet-service-dht.c344
1 files changed, 288 insertions, 56 deletions
diff --git a/src/dht/gnunet-service-dht.c b/src/dht/gnunet-service-dht.c
index f03cb379b..7e733615c 100644
--- a/src/dht/gnunet-service-dht.c
+++ b/src/dht/gnunet-service-dht.c
@@ -72,6 +72,54 @@ static struct GNUNET_PeerIdentity my_identity;
72 */ 72 */
73static GNUNET_SCHEDULER_TaskIdentifier cleanup_task; 73static GNUNET_SCHEDULER_TaskIdentifier cleanup_task;
74 74
75
76/**
77 * Linked list of messages to send to clients.
78 */
79struct PendingMessage
80{
81 /**
82 * Pointer to next item in the list
83 */
84 struct PendingMessage *next;
85
86 /**
87 * Actual message to be sent
88 */
89 struct GNUNET_MessageHeader *msg;
90
91};
92
93/**
94 * Struct containing information about a client,
95 * handle to connect to it, and any pending messages
96 * that need to be sent to it.
97 */
98struct ClientList
99{
100 /**
101 * Linked list of active clients
102 */
103 struct ClientList *next;
104
105 /**
106 * The handle to this client
107 */
108 struct GNUNET_SERVER_Client *client_handle;
109
110 /**
111 * Handle to the current transmission request, NULL
112 * if none pending.
113 */
114 struct GNUNET_CONNECTION_TransmitHandle *transmit_handle;
115
116 /**
117 * Linked list of pending messages for this client
118 */
119 struct PendingMessage *pending_head;
120
121};
122
75/** 123/**
76 * Context for handling results from a get request. 124 * Context for handling results from a get request.
77 */ 125 */
@@ -80,7 +128,7 @@ struct DatacacheGetContext
80 /** 128 /**
81 * The client to send the result to. 129 * The client to send the result to.
82 */ 130 */
83 struct GNUNET_SERVER_Client *client; 131 struct ClientList *client;
84 132
85 /** 133 /**
86 * The unique id of this request 134 * The unique id of this request
@@ -88,13 +136,15 @@ struct DatacacheGetContext
88 unsigned long long unique_id; 136 unsigned long long unique_id;
89}; 137};
90 138
91 139/**
140 * Context containing information about a DHT message received.
141 */
92struct DHT_MessageContext 142struct DHT_MessageContext
93{ 143{
94 /** 144 /**
95 * The client this request was received from. 145 * The client this request was received from.
96 */ 146 */
97 struct GNUNET_SERVER_Client *client; 147 struct ClientList *client;
98 148
99 /** 149 /**
100 * The key this request was about 150 * The key this request was about
@@ -118,7 +168,13 @@ struct DHT_MessageContext
118}; 168};
119 169
120/** 170/**
121 * Server handler for handling locally received dht requests 171 * List of active clients.
172 */
173static struct ClientList *client_list;
174
175
176/**
177 * Server handlers for handling locally received dht requests
122 */ 178 */
123static void 179static void
124handle_dht_start_message (void *cls, struct GNUNET_SERVER_Client *client, 180handle_dht_start_message (void *cls, struct GNUNET_SERVER_Client *client,
@@ -170,34 +226,157 @@ static struct GNUNET_CORE_MessageHandler core_handlers[] = {
170 {NULL, 0, 0} 226 {NULL, 0, 0}
171}; 227};
172 228
229/**
230 * Forward declaration.
231 */
232static size_t send_generic_reply (void *cls, size_t size, void *buf);
233
234/**
235 * Task run to check for messages that need to be sent to a client.
236 *
237 * @param cls a ClientList, containing the client and any messages to be sent to it
238 * @param tc reason this was called
239 */
240static void
241process_pending_messages (void *cls,
242 const struct GNUNET_SCHEDULER_TaskContext *tc)
243{
244 struct ClientList *client = cls;
245
246 if (client->pending_head == NULL) /* No messages queued */
247 {
248#if DEBUG_DHT
249 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
250 "`%s': Have no pending messages for client.\n", "DHT");
251#endif
252 return;
253 }
254
255 if (client->transmit_handle == NULL) /* No current pending messages, we can try to send! */
256 client->transmit_handle =
257 GNUNET_SERVER_notify_transmit_ready (client->client_handle,
258 ntohs (client->pending_head->msg->
259 size),
260 GNUNET_TIME_relative_multiply
261 (GNUNET_TIME_UNIT_SECONDS, 5),
262 &send_generic_reply, client);
263 else
264 {
265#if DEBUG_DHT
266 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
267 "`%s': Transmit handle is non-null.\n", "DHT");
268#endif
269 }
270}
173 271
272/**
273 * Callback called as a result of issuing a GNUNET_SERVER_notify_transmit_ready
274 * request. A ClientList is passed as closure, take the head of the list
275 * and copy it into buf, which has the result of sending the message to the
276 * client.
277 *
278 * @param cls closure to this call
279 * @param size maximum number of bytes available to send
280 * @param buf where to copy the actual message to
281 *
282 * @return the number of bytes actually copied, 0 indicates failure
283 */
174static size_t 284static size_t
175send_reply (void *cls, size_t size, void *buf) 285send_generic_reply (void *cls, size_t size, void *buf)
176{ 286{
177 struct GNUNET_DHT_Message *reply = cls; 287 struct ClientList *client = cls;
288 struct PendingMessage *reply = client->pending_head;
289 int ret;
178 290
291 client->transmit_handle = NULL;
179 if (buf == NULL) /* Message timed out, that's crappy... */ 292 if (buf == NULL) /* Message timed out, that's crappy... */
180 { 293 {
294#if DEBUG_DHT
295 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': buffer was NULL\n", "DHT");
296#endif
297 client->pending_head = reply->next;
298 GNUNET_free (reply->msg);
181 GNUNET_free (reply); 299 GNUNET_free (reply);
182 return 0; 300 return 0;
183 } 301 }
184 302
185 if (size >= ntohs (reply->header.size)) 303 if (size >= ntohs (reply->msg->size))
186 { 304 {
187 memcpy (buf, reply, ntohs (reply->header.size)); 305#if DEBUG_DHT
188 return ntohs (reply->header.size); 306 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
307 "`%s': Copying reply to buffer, REALLY SENT\n", "DHT");
308#endif
309 memcpy (buf, reply->msg, ntohs (reply->msg->size));
310
311 ret = ntohs (reply->msg->size);
189 } 312 }
190 else 313 else
191 return 0; 314 ret = 0;
315
316 client->pending_head = reply->next;
317 GNUNET_free (reply->msg);
318 GNUNET_free (reply);
319
320 GNUNET_SCHEDULER_add_now (sched, &process_pending_messages, client);
321 return ret;
192} 322}
193 323
324/**
325 * Add a PendingMessage to the clients list of messages to be sent
326 *
327 * @param client the active client to send the message to
328 * @param pending_message the actual message to send
329 */
330static void
331add_pending_message (struct ClientList *client,
332 struct PendingMessage *pending_message)
333{
334 struct PendingMessage *pos;
335 struct PendingMessage *prev;
336
337 pos = client->pending_head;
338
339#if DEBUG_DHT
340 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
341 "`%s': Adding pending message for client.\n", "DHT");
342#endif
343
344 if (pos == NULL)
345 {
346 client->pending_head = pending_message;
347 }
348 else /* This means another request is already queued, rely on send_reply to process all pending messages */
349 {
350 while (pos != NULL) /* Find end of list */
351 {
352 prev = pos;
353 pos = pos->next;
354 }
355
356 GNUNET_assert (prev != NULL);
357 prev->next = pending_message;
358 }
359
360 GNUNET_SCHEDULER_add_now (sched, &process_pending_messages, client);
194 361
362}
363
364/**
365 * Called when a reply needs to be sent to a client, either as
366 * a result it found to a GET or FIND PEER request.
367 *
368 * @param client the client to send the reply to
369 * @param message the encapsulated message to send
370 * @param uid the unique identifier of this request
371 */
195static void 372static void
196send_reply_to_client (struct GNUNET_SERVER_Client *client, 373send_reply_to_client (struct ClientList *client,
197 struct GNUNET_MessageHeader *message, 374 struct GNUNET_MessageHeader *message,
198 unsigned long long uid) 375 unsigned long long uid)
199{ 376{
200 struct GNUNET_DHT_Message *reply; 377 struct GNUNET_DHT_Message *reply;
378 struct PendingMessage *pending_message;
379
201 size_t msize; 380 size_t msize;
202 size_t tsize; 381 size_t tsize;
203#if DEBUG_DHT 382#if DEBUG_DHT
@@ -214,18 +393,25 @@ send_reply_to_client (struct GNUNET_SERVER_Client *client,
214 reply->unique_id = GNUNET_htonll (uid); 393 reply->unique_id = GNUNET_htonll (uid);
215 memcpy (&reply[1], message, msize); 394 memcpy (&reply[1], message, msize);
216 395
217 GNUNET_SERVER_notify_transmit_ready (client, 396 pending_message = GNUNET_malloc (sizeof (struct PendingMessage));
218 tsize, 397 pending_message->msg = &reply->header;
219 GNUNET_TIME_relative_multiply 398 pending_message->next = NULL; /* We insert at the end of the list */
220 (GNUNET_TIME_UNIT_SECONDS, 5),
221 &send_reply, reply);
222 399
400 add_pending_message (client, pending_message);
223} 401}
224 402
225 403
226/** 404/**
227 * Iterator for local get request results, return 405 * Iterator for local get request results,
228 * GNUNET_OK to continue iteration, anything else 406 *
407 * @param cls closure for iterator, a DatacacheGetContext
408 * @param exp when does this value expire?
409 * @param key the key this data is stored under
410 * @param size the size of the data identified by key
411 * @param data the actual data
412 * @param type the type of the data
413 *
414 * @return GNUNET_OK to continue iteration, anything else
229 * to stop iteration. 415 * to stop iteration.
230 */ 416 */
231static int 417static int
@@ -236,7 +422,10 @@ datacache_get_iterator (void *cls,
236{ 422{
237 struct DatacacheGetContext *datacache_get_ctx = cls; 423 struct DatacacheGetContext *datacache_get_ctx = cls;
238 struct GNUNET_DHT_GetResultMessage *get_result; 424 struct GNUNET_DHT_GetResultMessage *get_result;
239 425#if DEBUG_DHT
426 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
427 "`%s': Received `%s' response from datacache\n", "DHT", "GET");
428#endif
240 get_result = 429 get_result =
241 GNUNET_malloc (sizeof (struct GNUNET_DHT_GetResultMessage) + size); 430 GNUNET_malloc (sizeof (struct GNUNET_DHT_GetResultMessage) + size);
242 get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT); 431 get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT);
@@ -257,6 +446,11 @@ datacache_get_iterator (void *cls,
257 446
258/** 447/**
259 * Server handler for initiating local dht get requests 448 * Server handler for initiating local dht get requests
449 *
450 * @param cls closure for service
451 * @param get_msg the actual get message
452 * @param message_context struct containing pertinent information about the get request
453 *
260 */ 454 */
261static void 455static void
262handle_dht_get (void *cls, struct GNUNET_DHT_GetMessage *get_msg, 456handle_dht_get (void *cls, struct GNUNET_DHT_GetMessage *get_msg,
@@ -288,7 +482,7 @@ handle_dht_get (void *cls, struct GNUNET_DHT_GetMessage *get_msg,
288 if (datacache != NULL) 482 if (datacache != NULL)
289 results = 483 results =
290 GNUNET_DATACACHE_get (datacache, message_context->key, get_type, 484 GNUNET_DATACACHE_get (datacache, message_context->key, get_type,
291 datacache_get_iterator, datacache_get_context); 485 &datacache_get_iterator, datacache_get_context);
292 486
293#if DEBUG_DHT 487#if DEBUG_DHT
294 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 488 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -302,6 +496,11 @@ handle_dht_get (void *cls, struct GNUNET_DHT_GetMessage *get_msg,
302 496
303/** 497/**
304 * Server handler for initiating local dht find peer requests 498 * Server handler for initiating local dht find peer requests
499 *
500 * @param cls closure for service
501 * @param find_msg the actual find peer message
502 * @param message_context struct containing pertinent information about the request
503 *
305 */ 504 */
306static void 505static void
307handle_dht_find_peer (void *cls, struct GNUNET_DHT_FindPeerMessage *find_msg, 506handle_dht_find_peer (void *cls, struct GNUNET_DHT_FindPeerMessage *find_msg,
@@ -324,6 +523,10 @@ handle_dht_find_peer (void *cls, struct GNUNET_DHT_FindPeerMessage *find_msg,
324 523
325/** 524/**
326 * Server handler for initiating local dht put requests 525 * Server handler for initiating local dht put requests
526 *
527 * @param cls closure for service
528 * @param put_msg the actual put message
529 * @param message_context struct containing pertinent information about the request
327 */ 530 */
328static void 531static void
329handle_dht_put (void *cls, struct GNUNET_DHT_PutMessage *put_msg, 532handle_dht_put (void *cls, struct GNUNET_DHT_PutMessage *put_msg,
@@ -365,49 +568,51 @@ handle_dht_put (void *cls, struct GNUNET_DHT_PutMessage *put_msg,
365 568
366} 569}
367 570
571
368/** 572/**
369 * Context for sending receipt confirmations. Not used yet. 573 * Find a client if it exists, add it otherwise.
574 *
575 * @param client the server handle to the client
576 *
577 * @return the client if found, a new client otherwise
370 */ 578 */
371struct SendConfirmationContext 579static struct ClientList *
580find_active_client (struct GNUNET_SERVER_Client *client)
372{ 581{
373 /** 582 struct ClientList *pos = client_list;
374 * The message to send. 583 struct ClientList *ret;
375 */
376 struct GNUNET_DHT_StopMessage *message;
377 584
378 /** 585 while (pos != NULL)
379 * Transmit handle.
380 */
381 struct GNUNET_CONNECTION_TransmitHandle *transmit_handle;
382};
383
384static size_t
385send_confirmation (void *cls, size_t size, void *buf)
386{
387 struct GNUNET_DHT_StopMessage *confirmation_message = cls;
388
389 if (buf == NULL) /* Message timed out, that's crappy... */
390 { 586 {
391 GNUNET_free (confirmation_message); 587 if (pos->client_handle == client)
392 return 0; 588 return pos;
589 pos = pos->next;
393 } 590 }
394 591
395 if (size >= ntohs (confirmation_message->header.size)) 592 ret = GNUNET_malloc (sizeof (struct ClientList));
396 { 593 ret->client_handle = client;
397 memcpy (buf, confirmation_message, 594 ret->next = client_list;
398 ntohs (confirmation_message->header.size)); 595 client_list = ret;
399 return ntohs (confirmation_message->header.size); 596 ret->pending_head = NULL;
400 }
401 else
402 return 0;
403}
404 597
598 return ret;
599}
405 600
601/**
602 * Construct a message receipt confirmation for a particular uid.
603 * Receipt confirmations are used for any requests that don't expect
604 * a reply otherwise (i.e. put requests, stop requests).
605 *
606 * @param client the handle for the client
607 * @param uid the unique identifier of this message
608 */
406static void 609static void
407send_client_receipt_confirmation (struct GNUNET_SERVER_Client *client, 610send_client_receipt_confirmation (struct GNUNET_SERVER_Client *client,
408 uint64_t uid) 611 uint64_t uid)
409{ 612{
410 struct GNUNET_DHT_StopMessage *confirm_message; 613 struct GNUNET_DHT_StopMessage *confirm_message;
614 struct ClientList *active_client;
615 struct PendingMessage *pending_message;
411 616
412#if DEBUG_DHT 617#if DEBUG_DHT
413 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 618 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -420,14 +625,23 @@ send_client_receipt_confirmation (struct GNUNET_SERVER_Client *client,
420 htons (sizeof (struct GNUNET_DHT_StopMessage)); 625 htons (sizeof (struct GNUNET_DHT_StopMessage));
421 confirm_message->unique_id = GNUNET_htonll (uid); 626 confirm_message->unique_id = GNUNET_htonll (uid);
422 627
423 GNUNET_SERVER_notify_transmit_ready (client, 628 active_client = find_active_client (client);
424 sizeof (struct GNUNET_DHT_StopMessage), 629 pending_message = GNUNET_malloc (sizeof (struct PendingMessage));
425 GNUNET_TIME_relative_multiply 630 pending_message->msg = &confirm_message->header;
426 (GNUNET_TIME_UNIT_SECONDS, 5), 631
427 &send_confirmation, confirm_message); 632 add_pending_message (active_client, pending_message);
428 633
429} 634}
430 635
636/**
637 * Handler for any generic DHT messages, calls the appropriate handler
638 * depending on message type, sends confirmation if responses aren't otherwise
639 * expected.
640 *
641 * @param cls closure for the service
642 * @param client the client we received this message from
643 * @param message the actual message received
644 */
431static void 645static void
432handle_dht_start_message (void *cls, struct GNUNET_SERVER_Client *client, 646handle_dht_start_message (void *cls, struct GNUNET_SERVER_Client *client,
433 const struct GNUNET_MessageHeader *message) 647 const struct GNUNET_MessageHeader *message)
@@ -435,6 +649,7 @@ handle_dht_start_message (void *cls, struct GNUNET_SERVER_Client *client,
435 struct GNUNET_DHT_Message *dht_msg = (struct GNUNET_DHT_Message *) message; 649 struct GNUNET_DHT_Message *dht_msg = (struct GNUNET_DHT_Message *) message;
436 struct GNUNET_MessageHeader *enc_msg; 650 struct GNUNET_MessageHeader *enc_msg;
437 struct DHT_MessageContext *message_context; 651 struct DHT_MessageContext *message_context;
652
438 size_t enc_type; 653 size_t enc_type;
439 654
440 enc_msg = (struct GNUNET_MessageHeader *) &dht_msg[1]; 655 enc_msg = (struct GNUNET_MessageHeader *) &dht_msg[1];
@@ -449,7 +664,7 @@ handle_dht_start_message (void *cls, struct GNUNET_SERVER_Client *client,
449#endif 664#endif
450 665
451 message_context = GNUNET_malloc (sizeof (struct DHT_MessageContext)); 666 message_context = GNUNET_malloc (sizeof (struct DHT_MessageContext));
452 message_context->client = client; 667 message_context->client = find_active_client (client);
453 message_context->key = &dht_msg->key; 668 message_context->key = &dht_msg->key;
454 message_context->unique_id = GNUNET_ntohll (dht_msg->unique_id); 669 message_context->unique_id = GNUNET_ntohll (dht_msg->unique_id);
455 message_context->replication = ntohs (dht_msg->desired_replication_level); 670 message_context->replication = ntohs (dht_msg->desired_replication_level);
@@ -482,7 +697,17 @@ handle_dht_start_message (void *cls, struct GNUNET_SERVER_Client *client,
482 697
483} 698}
484 699
485 700/**
701 * Handler for any generic DHT stop messages, calls the appropriate handler
702 * depending on message type, sends confirmation by default (stop messages
703 * do not otherwise expect replies)
704 *
705 * @param cls closure for the service
706 * @param client the client we received this message from
707 * @param message the actual message received
708 *
709 * TODO: add demultiplexing for stop message types.
710 */
486static void 711static void
487handle_dht_stop_message (void *cls, struct GNUNET_SERVER_Client *client, 712handle_dht_stop_message (void *cls, struct GNUNET_SERVER_Client *client,
488 const struct GNUNET_MessageHeader *message) 713 const struct GNUNET_MessageHeader *message)
@@ -573,6 +798,11 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
573 798
574/** 799/**
575 * To be called on core init/fail. 800 * To be called on core init/fail.
801 *
802 * @param cls service closure
803 * @param server handle to the server for this service
804 * @param identity the public identity of this peer
805 * @param publicKey the public key of this peer
576 */ 806 */
577void 807void
578core_init (void *cls, 808core_init (void *cls,
@@ -592,7 +822,9 @@ core_init (void *cls,
592 "%s: Core connection initialized, I am peer: %s\n", "dht", 822 "%s: Core connection initialized, I am peer: %s\n", "dht",
593 GNUNET_i2s (identity)); 823 GNUNET_i2s (identity));
594#endif 824#endif
825 /* Copy our identity so we can use it */
595 memcpy (&my_identity, identity, sizeof (struct GNUNET_PeerIdentity)); 826 memcpy (&my_identity, identity, sizeof (struct GNUNET_PeerIdentity));
827 /* Set the server to local variable */
596 coreAPI = server; 828 coreAPI = server;
597} 829}
598 830