aboutsummaryrefslogtreecommitdiff
path: root/src/dht
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2012-04-26 16:56:00 +0000
committerChristian Grothoff <christian@grothoff.org>2012-04-26 16:56:00 +0000
commit4757258de33285fed2aa318c374dcdbf586c29f0 (patch)
tree283fa337cdaa54fbb3bbf08c937fdb6467c2b5d1 /src/dht
parent506d42b2ba6eb104e64fd0c8889ea7233a9b96b3 (diff)
downloadgnunet-4757258de33285fed2aa318c374dcdbf586c29f0.tar.gz
gnunet-4757258de33285fed2aa318c374dcdbf586c29f0.zip
-fixing #2277
Diffstat (limited to 'src/dht')
-rw-r--r--src/dht/dht.h29
-rw-r--r--src/dht/dht_api.c480
-rw-r--r--src/dht/gnunet-dht-put.c30
-rw-r--r--src/dht/gnunet-service-dht_clients.c91
-rw-r--r--src/dht/test_dht_api.c4
-rw-r--r--src/dht/test_dht_multipeer.c2
-rw-r--r--src/dht/test_dht_twopeer_path_tracking.c2
-rw-r--r--src/dht/test_dht_twopeer_put_get.c23
8 files changed, 440 insertions, 221 deletions
diff --git a/src/dht/dht.h b/src/dht/dht.h
index 373bc7b82..07cd18296 100644
--- a/src/dht/dht.h
+++ b/src/dht/dht.h
@@ -181,6 +181,11 @@ struct GNUNET_DHT_ClientPutMessage
181 uint32_t desired_replication_level GNUNET_PACKED; 181 uint32_t desired_replication_level GNUNET_PACKED;
182 182
183 /** 183 /**
184 * Unique ID for the PUT message.
185 */
186 uint64_t unique_id GNUNET_PACKED;
187
188 /**
184 * How long should this data persist? 189 * How long should this data persist?
185 */ 190 */
186 struct GNUNET_TIME_AbsoluteNBO expiration; 191 struct GNUNET_TIME_AbsoluteNBO expiration;
@@ -196,6 +201,30 @@ struct GNUNET_DHT_ClientPutMessage
196 201
197 202
198/** 203/**
204 * Message to confirming receipt of PUT, sent from DHT service to clients.
205 */
206struct GNUNET_DHT_ClientPutConfirmationMessage
207{
208 /**
209 * Type: GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK
210 */
211 struct GNUNET_MessageHeader header;
212
213 /**
214 * Always zero.
215 */
216 uint32_t reserved GNUNET_PACKED;
217
218 /**
219 * Unique ID from the PUT message that is being confirmed.
220 */
221 uint64_t unique_id GNUNET_PACKED;
222
223};
224
225
226
227/**
199 * Message to monitor put requests going through peer, DHT service -> clients. 228 * Message to monitor put requests going through peer, DHT service -> clients.
200 */ 229 */
201struct GNUNET_DHT_MonitorPutMessage 230struct GNUNET_DHT_MonitorPutMessage
diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c
index 9e95155a9..37b65b022 100644
--- a/src/dht/dht_api.c
+++ b/src/dht/dht_api.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 (C) 2009, 2010 Christian Grothoff (and other contributing authors) 3 (C) 2009, 2010, 2011, 2012 Christian Grothoff (and other contributing authors)
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -74,11 +74,6 @@ struct PendingMessage
74 void *cont_cls; 74 void *cont_cls;
75 75
76 /** 76 /**
77 * Timeout task for this message
78 */
79 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
80
81 /**
82 * Unique ID for this request 77 * Unique ID for this request
83 */ 78 */
84 uint64_t unique_id; 79 uint64_t unique_id;
@@ -100,6 +95,56 @@ struct PendingMessage
100 95
101 96
102/** 97/**
98 * Handle to a PUT request.
99 */
100struct GNUNET_DHT_PutHandle
101{
102 /**
103 * Kept in a DLL.
104 */
105 struct GNUNET_DHT_PutHandle *next;
106
107 /**
108 * Kept in a DLL.
109 */
110 struct GNUNET_DHT_PutHandle *prev;
111
112 /**
113 * Continuation to call when done.
114 */
115 GNUNET_DHT_PutContinuation cont;
116
117 /**
118 * Pending message associated with this PUT operation,
119 * NULL after the message has been transmitted to the service.
120 */
121 struct PendingMessage *pending;
122
123 /**
124 * Main handle to this DHT api
125 */
126 struct GNUNET_DHT_Handle *dht_handle;
127
128 /**
129 * Closure for 'cont'.
130 */
131 void *cont_cls;
132
133 /**
134 * Timeout task for this operation.
135 */
136 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
137
138 /**
139 * Unique ID for the PUT operation.
140 */
141 uint64_t unique_id;
142
143};
144
145
146
147/**
103 * Handle to a GET request 148 * Handle to a GET request
104 */ 149 */
105struct GNUNET_DHT_GetHandle 150struct GNUNET_DHT_GetHandle
@@ -235,8 +280,18 @@ struct GNUNET_DHT_Handle
235 struct GNUNET_DHT_MonitorHandle *monitor_tail; 280 struct GNUNET_DHT_MonitorHandle *monitor_tail;
236 281
237 /** 282 /**
238 * Hash map containing the current outstanding unique requests 283 * Head of active PUT requests.
239 * (values are of type 'struct GNUNET_DHT_RouteHandle'). 284 */
285 struct GNUNET_DHT_PutHandle *put_head;
286
287 /**
288 * Tail of active PUT requests.
289 */
290 struct GNUNET_DHT_PutHandle *put_tail;
291
292 /**
293 * Hash map containing the current outstanding unique GET requests
294 * (values are of type 'struct GNUNET_DHT_GetHandle').
240 */ 295 */
241 struct GNUNET_CONTAINER_MultiHashMap *active_requests; 296 struct GNUNET_CONTAINER_MultiHashMap *active_requests;
242 297
@@ -267,6 +322,8 @@ struct GNUNET_DHT_Handle
267 * Handler for messages received from the DHT service 322 * Handler for messages received from the DHT service
268 * a demultiplexer which handles numerous message types 323 * a demultiplexer which handles numerous message types
269 * 324 *
325 * @param cls the 'struct GNUNET_DHT_Handle'
326 * @param msg the incoming message
270 */ 327 */
271static void 328static void
272service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg); 329service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg);
@@ -275,16 +332,17 @@ service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg);
275/** 332/**
276 * Try to (re)connect to the DHT service. 333 * Try to (re)connect to the DHT service.
277 * 334 *
335 * @param handle DHT handle to reconnect
278 * @return GNUNET_YES on success, GNUNET_NO on failure. 336 * @return GNUNET_YES on success, GNUNET_NO on failure.
279 */ 337 */
280static int 338static int
281try_connect (struct GNUNET_DHT_Handle *handle) 339try_connect (struct GNUNET_DHT_Handle *handle)
282{ 340{
283 if (handle->client != NULL) 341 if (NULL != handle->client)
284 return GNUNET_OK; 342 return GNUNET_OK;
285 handle->in_receive = GNUNET_NO; 343 handle->in_receive = GNUNET_NO;
286 handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg); 344 handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg);
287 if (handle->client == NULL) 345 if (NULL == handle->client)
288 { 346 {
289 LOG (GNUNET_ERROR_TYPE_WARNING, 347 LOG (GNUNET_ERROR_TYPE_WARNING,
290 _("Failed to connect to the DHT service!\n")); 348 _("Failed to connect to the DHT service!\n"));
@@ -324,6 +382,7 @@ add_request_to_pending (void *cls, const GNUNET_HashCode * key, void *value)
324 382
325/** 383/**
326 * Try to send messages from list of messages to send 384 * Try to send messages from list of messages to send
385 *
327 * @param handle DHT_Handle 386 * @param handle DHT_Handle
328 */ 387 */
329static void 388static void
@@ -369,9 +428,12 @@ try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
369static void 428static void
370do_disconnect (struct GNUNET_DHT_Handle *handle) 429do_disconnect (struct GNUNET_DHT_Handle *handle)
371{ 430{
372 if (handle->client == NULL) 431 struct GNUNET_DHT_PutHandle *ph;
432 struct GNUNET_DHT_PutHandle *next;
433
434 if (NULL == handle->client)
373 return; 435 return;
374 GNUNET_assert (handle->reconnect_task == GNUNET_SCHEDULER_NO_TASK); 436 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == handle->reconnect_task);
375 if (NULL != handle->th) 437 if (NULL != handle->th)
376 GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th); 438 GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
377 handle->th = NULL; 439 handle->th = NULL;
@@ -380,6 +442,20 @@ do_disconnect (struct GNUNET_DHT_Handle *handle)
380 (unsigned long long) handle->retry_time.rel_value); 442 (unsigned long long) handle->retry_time.rel_value);
381 GNUNET_CLIENT_disconnect (handle->client); 443 GNUNET_CLIENT_disconnect (handle->client);
382 handle->client = NULL; 444 handle->client = NULL;
445
446 /* signal disconnect to all PUT requests that were transmitted but waiting
447 for the put confirmation */
448 next = handle->put_head;
449 while (NULL != (ph = next))
450 {
451 next = ph->next;
452 if (NULL == ph->pending)
453 {
454 if (NULL != ph->cont)
455 ph->cont (ph->cont_cls, GNUNET_SYSERR);
456 GNUNET_DHT_put_cancel (ph);
457 }
458 }
383 handle->reconnect_task = 459 handle->reconnect_task =
384 GNUNET_SCHEDULER_add_delayed (handle->retry_time, &try_reconnect, handle); 460 GNUNET_SCHEDULER_add_delayed (handle->retry_time, &try_reconnect, handle);
385} 461}
@@ -387,6 +463,11 @@ do_disconnect (struct GNUNET_DHT_Handle *handle)
387 463
388/** 464/**
389 * Transmit the next pending message, called by notify_transmit_ready 465 * Transmit the next pending message, called by notify_transmit_ready
466 *
467 * @param cls the DHT handle
468 * @param size number of bytes available in 'buf' for transmission
469 * @param buf where to copy messages for the service
470 * @return number of bytes written to 'buf'
390 */ 471 */
391static size_t 472static size_t
392transmit_pending (void *cls, size_t size, void *buf); 473transmit_pending (void *cls, size_t size, void *buf);
@@ -394,20 +475,22 @@ transmit_pending (void *cls, size_t size, void *buf);
394 475
395/** 476/**
396 * Try to send messages from list of messages to send 477 * Try to send messages from list of messages to send
478 *
479 * @param handle handle to DHT
397 */ 480 */
398static void 481static void
399process_pending_messages (struct GNUNET_DHT_Handle *handle) 482process_pending_messages (struct GNUNET_DHT_Handle *handle)
400{ 483{
401 struct PendingMessage *head; 484 struct PendingMessage *head;
402 485
403 if (handle->client == NULL) 486 if (NULL == handle->client)
404 { 487 {
405 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 488 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
406 "process_pending_messages called, but client is null, reconnecting\n"); 489 "process_pending_messages called, but client is NULL, reconnecting\n");
407 do_disconnect (handle); 490 do_disconnect (handle);
408 return; 491 return;
409 } 492 }
410 if (handle->th != NULL) 493 if (NULL != handle->th)
411 return; 494 return;
412 if (NULL == (head = handle->pending_head)) 495 if (NULL == (head = handle->pending_head))
413 return; 496 return;
@@ -427,6 +510,11 @@ process_pending_messages (struct GNUNET_DHT_Handle *handle)
427 510
428/** 511/**
429 * Transmit the next pending message, called by notify_transmit_ready 512 * Transmit the next pending message, called by notify_transmit_ready
513 *
514 * @param cls the DHT handle
515 * @param size number of bytes available in 'buf' for transmission
516 * @param buf where to copy messages for the service
517 * @return number of bytes written to 'buf'
430 */ 518 */
431static size_t 519static size_t
432transmit_pending (void *cls, size_t size, void *buf) 520transmit_pending (void *cls, size_t size, void *buf)
@@ -436,7 +524,7 @@ transmit_pending (void *cls, size_t size, void *buf)
436 size_t tsize; 524 size_t tsize;
437 525
438 handle->th = NULL; 526 handle->th = NULL;
439 if (buf == NULL) 527 if (NULL == buf)
440 { 528 {
441 LOG (GNUNET_ERROR_TYPE_DEBUG, 529 LOG (GNUNET_ERROR_TYPE_DEBUG,
442 "Transmission to DHT service failed! Reconnecting!\n"); 530 "Transmission to DHT service failed! Reconnecting!\n");
@@ -456,11 +544,6 @@ transmit_pending (void *cls, size_t size, void *buf)
456 GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail, 544 GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
457 head); 545 head);
458 head->in_pending_queue = GNUNET_NO; 546 head->in_pending_queue = GNUNET_NO;
459 if (head->timeout_task != GNUNET_SCHEDULER_NO_TASK)
460 {
461 GNUNET_SCHEDULER_cancel (head->timeout_task);
462 head->timeout_task = GNUNET_SCHEDULER_NO_TASK;
463 }
464 if (NULL != head->cont) 547 if (NULL != head->cont)
465 { 548 {
466 head->cont (head->cont_cls, NULL); 549 head->cont (head->cont_cls, NULL);
@@ -557,33 +640,24 @@ process_monitor_get_message (struct GNUNET_DHT_Handle *handle,
557 const struct GNUNET_DHT_MonitorGetMessage *msg) 640 const struct GNUNET_DHT_MonitorGetMessage *msg)
558{ 641{
559 struct GNUNET_DHT_MonitorHandle *h; 642 struct GNUNET_DHT_MonitorHandle *h;
560 size_t msize;
561 643
562 msize = ntohs (msg->header.size); 644 for (h = handle->monitor_head; NULL != h; h = h->next)
563 if (msize < sizeof (struct GNUNET_DHT_MonitorGetMessage))
564 return GNUNET_SYSERR;
565
566 h = handle->monitor_head;
567 while (NULL != h)
568 { 645 {
569 int type_ok; 646 int type_ok;
570 int key_ok; 647 int key_ok;
571 648
572 type_ok = GNUNET_BLOCK_TYPE_ANY == h->type || h->type == ntohl(msg->type); 649 type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type));
573 key_ok = NULL == h->key || memcmp (h->key, &msg->key, 650 key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key,
574 sizeof (GNUNET_HashCode)) == 0; 651 sizeof (GNUNET_HashCode)));
575 if (type_ok && key_ok && NULL != h->get_cb) 652 if (type_ok && key_ok && (NULL != h->get_cb))
576 {
577 h->get_cb (h->cb_cls, 653 h->get_cb (h->cb_cls,
578 ntohl (msg->options), 654 ntohl (msg->options),
579 (enum GNUNET_BLOCK_Type) ntohl(msg->type), 655 (enum GNUNET_BLOCK_Type) ntohl(msg->type),
580 ntohl (msg->hop_count), 656 ntohl (msg->hop_count),
581 ntohl (msg->desired_replication_level), 657 ntohl (msg->desired_replication_level),
582 ntohl (msg->get_path_length), 658 ntohl (msg->get_path_length),
583 (struct GNUNET_PeerIdentity *) &msg[1], 659 (struct GNUNET_PeerIdentity *) &msg[1],
584 &msg->key); 660 &msg->key);
585 }
586 h = h->next;
587 } 661 }
588 return GNUNET_OK; 662 return GNUNET_OK;
589} 663}
@@ -593,8 +667,7 @@ process_monitor_get_message (struct GNUNET_DHT_Handle *handle,
593 * Process a get response monitor message from the service. 667 * Process a get response monitor message from the service.
594 * 668 *
595 * @param handle The DHT handle. 669 * @param handle The DHT handle.
596 * @param msg Monitor get response message from the service. 670 * @param msg monitor get response message from the service
597 *
598 * @return GNUNET_OK if everything went fine, 671 * @return GNUNET_OK if everything went fine,
599 * GNUNET_SYSERR if the message is malformed. 672 * GNUNET_SYSERR if the message is malformed.
600 */ 673 */
@@ -604,30 +677,30 @@ process_monitor_get_resp_message (struct GNUNET_DHT_Handle *handle,
604 *msg) 677 *msg)
605{ 678{
606 struct GNUNET_DHT_MonitorHandle *h; 679 struct GNUNET_DHT_MonitorHandle *h;
680 struct GNUNET_PeerIdentity *path;
681 uint32_t getl;
682 uint32_t putl;
607 size_t msize; 683 size_t msize;
608 684
609 msize = ntohs (msg->header.size); 685 msize = ntohs (msg->header.size);
610 if (msize < sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) 686 path = (struct GNUNET_PeerIdentity *) &msg[1];
687 getl = ntohl (msg->get_path_length);
688 putl = ntohl (msg->put_path_length);
689 if ( (getl + putl < getl) ||
690 ( ((msize - sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) / sizeof (struct GNUNET_PeerIdentity)) < getl + putl) )
691 {
692 GNUNET_break (0);
611 return GNUNET_SYSERR; 693 return GNUNET_SYSERR;
612 694 }
613 h = handle->monitor_head; 695 for (h = handle->monitor_head; NULL != h; h = h->next)
614 while (NULL != h)
615 { 696 {
616 int type_ok; 697 int type_ok;
617 int key_ok; 698 int key_ok;
618 699
619 type_ok = GNUNET_BLOCK_TYPE_ANY == h->type || h->type == ntohl(msg->type); 700 type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type));
620 key_ok = NULL == h->key || memcmp (h->key, &msg->key, 701 key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key,
621 sizeof (GNUNET_HashCode)) == 0; 702 sizeof (GNUNET_HashCode)));
622 if (type_ok && key_ok && NULL != h->get_resp_cb) 703 if (type_ok && key_ok && (NULL != h->get_resp_cb))
623 {
624 struct GNUNET_PeerIdentity *path;
625 uint32_t getl;
626 uint32_t putl;
627
628 path = (struct GNUNET_PeerIdentity *) &msg[1];
629 getl = ntohl (msg->get_path_length);
630 putl = ntohl (msg->put_path_length);
631 h->get_resp_cb (h->cb_cls, 704 h->get_resp_cb (h->cb_cls,
632 (enum GNUNET_BLOCK_Type) ntohl(msg->type), 705 (enum GNUNET_BLOCK_Type) ntohl(msg->type),
633 path, getl, 706 path, getl,
@@ -638,8 +711,6 @@ process_monitor_get_resp_message (struct GNUNET_DHT_Handle *handle,
638 msize - 711 msize -
639 sizeof (struct GNUNET_DHT_MonitorGetRespMessage) - 712 sizeof (struct GNUNET_DHT_MonitorGetRespMessage) -
640 sizeof (struct GNUNET_PeerIdentity) * (putl + getl)); 713 sizeof (struct GNUNET_PeerIdentity) * (putl + getl));
641 }
642 h = h->next;
643 } 714 }
644 return GNUNET_OK; 715 return GNUNET_OK;
645} 716}
@@ -660,27 +731,26 @@ process_monitor_put_message (struct GNUNET_DHT_Handle *handle,
660{ 731{
661 struct GNUNET_DHT_MonitorHandle *h; 732 struct GNUNET_DHT_MonitorHandle *h;
662 size_t msize; 733 size_t msize;
734 struct GNUNET_PeerIdentity *path;
735 uint32_t putl;
663 736
664 msize = ntohs (msg->header.size); 737 msize = ntohs (msg->header.size);
665 if (msize < sizeof (struct GNUNET_DHT_MonitorPutMessage)) 738 path = (struct GNUNET_PeerIdentity *) &msg[1];
739 putl = ntohl (msg->put_path_length);
740 if (((msize - sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) / sizeof (struct GNUNET_PeerIdentity)) < putl)
741 {
742 GNUNET_break (0);
666 return GNUNET_SYSERR; 743 return GNUNET_SYSERR;
667 744 }
668 h = handle->monitor_head; 745 for (h = handle->monitor_head; NULL != h; h = h->next)
669 while (NULL != h)
670 { 746 {
671 int type_ok; 747 int type_ok;
672 int key_ok; 748 int key_ok;
673 749
674 type_ok = GNUNET_BLOCK_TYPE_ANY == h->type || h->type == ntohl(msg->type); 750 type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type));
675 key_ok = NULL == h->key || memcmp (h->key, &msg->key, 751 key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key,
676 sizeof (GNUNET_HashCode)) == 0; 752 sizeof (GNUNET_HashCode)));
677 if (type_ok && key_ok && NULL != h->put_cb) 753 if (type_ok && key_ok && (NULL != h->put_cb))
678 {
679 struct GNUNET_PeerIdentity *path;
680 uint32_t putl;
681
682 path = (struct GNUNET_PeerIdentity *) &msg[1];
683 putl = ntohl (msg->put_path_length);
684 h->put_cb (h->cb_cls, 754 h->put_cb (h->cb_cls,
685 ntohl (msg->options), 755 ntohl (msg->options),
686 (enum GNUNET_BLOCK_Type) ntohl(msg->type), 756 (enum GNUNET_BLOCK_Type) ntohl(msg->type),
@@ -693,55 +763,41 @@ process_monitor_put_message (struct GNUNET_DHT_Handle *handle,
693 msize - 763 msize -
694 sizeof (struct GNUNET_DHT_MonitorPutMessage) - 764 sizeof (struct GNUNET_DHT_MonitorPutMessage) -
695 sizeof (struct GNUNET_PeerIdentity) * putl); 765 sizeof (struct GNUNET_PeerIdentity) * putl);
696 }
697 h = h->next;
698 } 766 }
699 return GNUNET_OK; 767 return GNUNET_OK;
700} 768}
701 769
702 770
703/** 771/**
704 * Process a monitoring message from the service: demultiplex for proper type. 772 * Process a put confirmation message from the service.
705 * 773 *
706 * @param handle The DHT handle. 774 * @param handle The DHT handle.
707 * @param msg Message from the service. 775 * @param msg confirmation message from the service.
708 *
709 * @return GNUNET_OK if everything went fine, 776 * @return GNUNET_OK if everything went fine,
710 * GNUNET_SYSERR if the message is malformed. 777 * GNUNET_SYSERR if the message is malformed.
711 */ 778 */
712static int 779static int
713process_monitor_message (struct GNUNET_DHT_Handle *handle, 780process_put_confirmation_message (struct GNUNET_DHT_Handle *handle,
714 const struct GNUNET_MessageHeader *msg) 781 const struct GNUNET_DHT_ClientPutConfirmationMessage *msg)
715{ 782{
716 switch (ntohs (msg->type)) 783 struct GNUNET_DHT_PutHandle *ph;
717 { 784 GNUNET_DHT_PutContinuation cont;
718 case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET: 785 void *cont_cls;
719 return process_monitor_get_message(handle,
720 (struct GNUNET_DHT_MonitorGetMessage *)
721 msg);
722 786
723 case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP: 787 for (ph = handle->put_head; NULL != ph; ph = ph->next)
724 { 788 if (ph->unique_id == msg->unique_id)
725 return process_monitor_get_resp_message( 789 break;
726 handle, 790 if (NULL == ph)
727 (struct GNUNET_DHT_MonitorGetRespMessage *) msg); 791 return GNUNET_OK;
728 } 792 cont = ph->cont;
729 case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT: 793 cont_cls = ph->cont_cls;
730 { 794 GNUNET_DHT_put_cancel (ph);
731 return process_monitor_put_message(handle, 795 if (NULL != cont)
732 (struct GNUNET_DHT_MonitorPutMessage *) 796 cont (cont_cls, GNUNET_OK);
733 msg); 797 return GNUNET_OK;
734 }
735 case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT_RESP:
736 /* Not implemented yet */
737 GNUNET_break(0);
738 /* Fall through */
739 default:
740 GNUNET_break(0);
741 return GNUNET_SYSERR;
742 }
743} 798}
744 799
800
745/** 801/**
746 * Handler for messages received from the DHT service 802 * Handler for messages received from the DHT service
747 * a demultiplexer which handles numerous message types 803 * a demultiplexer which handles numerous message types
@@ -754,38 +810,84 @@ service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
754{ 810{
755 struct GNUNET_DHT_Handle *handle = cls; 811 struct GNUNET_DHT_Handle *handle = cls;
756 const struct GNUNET_DHT_ClientResultMessage *dht_msg; 812 const struct GNUNET_DHT_ClientResultMessage *dht_msg;
813 uint16_t msize;
814 int ret;
757 815
758 if (msg == NULL) 816 if (NULL == msg)
759 { 817 {
760 LOG (GNUNET_ERROR_TYPE_DEBUG, 818 LOG (GNUNET_ERROR_TYPE_DEBUG,
761 "Error receiving data from DHT service, reconnecting\n"); 819 "Error receiving data from DHT service, reconnecting\n");
762 do_disconnect (handle); 820 do_disconnect (handle);
763 return; 821 return;
764 } 822 }
765 if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT) 823 ret = GNUNET_SYSERR;
824 msize = ntohs (msg->size);
825 switch (ntohs (msg->type))
766 { 826 {
767 if (process_monitor_message (handle, msg) == GNUNET_OK) 827 case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET:
828 if (msize < sizeof (struct GNUNET_DHT_MonitorGetMessage))
768 { 829 {
769 GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle, 830 GNUNET_break (0);
770 GNUNET_TIME_UNIT_FOREVER_REL); 831 break;
771 return;
772 } 832 }
773 GNUNET_break (0); 833 ret = process_monitor_get_message(handle,
774 do_disconnect (handle); 834 (const struct GNUNET_DHT_MonitorGetMessage *) msg);
775 return; 835 break;
836 case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP:
837 if (msize < sizeof (struct GNUNET_DHT_MonitorGetRespMessage))
838 {
839 GNUNET_break (0);
840 break;
841 }
842 ret = process_monitor_get_resp_message(handle,
843 (const struct GNUNET_DHT_MonitorGetRespMessage *) msg);
844 break;
845 case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT:
846 if (msize < sizeof (struct GNUNET_DHT_MonitorPutMessage))
847 {
848 GNUNET_break (0);
849 break;
850 }
851 ret = process_monitor_put_message(handle,
852 (const struct GNUNET_DHT_MonitorPutMessage *) msg);
853 break;
854 case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT_RESP:
855 /* Not implemented yet */
856 GNUNET_break(0);
857 break;
858 case GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT:
859 if (ntohs (msg->size) < sizeof (struct GNUNET_DHT_ClientResultMessage))
860 {
861 GNUNET_break (0);
862 break;
863 }
864 ret = GNUNET_OK;
865 dht_msg = (const struct GNUNET_DHT_ClientResultMessage *) msg;
866 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received reply for `%s' from DHT service %p\n",
867 GNUNET_h2s (&dht_msg->key), handle);
868 GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests,
869 &dht_msg->key, &process_reply,
870 (void *) dht_msg);
871 break;
872 case GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK:
873 if (ntohs (msg->size) != sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage))
874 {
875 GNUNET_break (0);
876 break;
877 }
878 ret = process_put_confirmation_message (handle,
879 (const struct GNUNET_DHT_ClientPutConfirmationMessage*) msg);
880 break;
881 default:
882 GNUNET_break(0);
883 break;
776 } 884 }
777 if (ntohs (msg->size) < sizeof (struct GNUNET_DHT_ClientResultMessage)) 885 if (GNUNET_OK != ret)
778 { 886 {
779 GNUNET_break (0); 887 GNUNET_break (0);
780 do_disconnect (handle); 888 do_disconnect (handle);
781 return; 889 return;
782 } 890 }
783 dht_msg = (const struct GNUNET_DHT_ClientResultMessage *) msg;
784 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received reply for `%s' from DHT service %p\n",
785 GNUNET_h2s (&dht_msg->key), handle);
786 GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests,
787 &dht_msg->key, &process_reply,
788 (void *) dht_msg);
789 GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle, 891 GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
790 GNUNET_TIME_UNIT_FOREVER_REL); 892 GNUNET_TIME_UNIT_FOREVER_REL);
791} 893}
@@ -829,11 +931,12 @@ void
829GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle) 931GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
830{ 932{
831 struct PendingMessage *pm; 933 struct PendingMessage *pm;
934 struct GNUNET_DHT_PutHandle *ph;
832 935
833 GNUNET_assert (handle != NULL); 936 GNUNET_assert (NULL != handle);
834 GNUNET_assert (0 == 937 GNUNET_assert (0 ==
835 GNUNET_CONTAINER_multihashmap_size (handle->active_requests)); 938 GNUNET_CONTAINER_multihashmap_size (handle->active_requests));
836 if (handle->th != NULL) 939 if (NULL != handle->th)
837 { 940 {
838 GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th); 941 GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
839 handle->th = NULL; 942 handle->th = NULL;
@@ -845,18 +948,24 @@ GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
845 pm); 948 pm);
846 pm->in_pending_queue = GNUNET_NO; 949 pm->in_pending_queue = GNUNET_NO;
847 GNUNET_assert (GNUNET_YES == pm->free_on_send); 950 GNUNET_assert (GNUNET_YES == pm->free_on_send);
848 if (GNUNET_SCHEDULER_NO_TASK != pm->timeout_task)
849 GNUNET_SCHEDULER_cancel (pm->timeout_task);
850 if (NULL != pm->cont) 951 if (NULL != pm->cont)
851 pm->cont (pm->cont_cls, NULL); 952 pm->cont (pm->cont_cls, NULL);
852 GNUNET_free (pm); 953 GNUNET_free (pm);
853 } 954 }
854 if (handle->client != NULL) 955 while (NULL != (ph = handle->put_head))
956 {
957 GNUNET_break (NULL == ph->pending);
958 if (NULL != ph->cont)
959 ph->cont (ph->cont_cls, GNUNET_SYSERR);
960 GNUNET_DHT_put_cancel (ph);
961 }
962
963 if (NULL != handle->client)
855 { 964 {
856 GNUNET_CLIENT_disconnect (handle->client); 965 GNUNET_CLIENT_disconnect (handle->client);
857 handle->client = NULL; 966 handle->client = NULL;
858 } 967 }
859 if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK) 968 if (GNUNET_SCHEDULER_NO_TASK != handle->reconnect_task)
860 GNUNET_SCHEDULER_cancel (handle->reconnect_task); 969 GNUNET_SCHEDULER_cancel (handle->reconnect_task);
861 GNUNET_CONTAINER_multihashmap_destroy (handle->active_requests); 970 GNUNET_CONTAINER_multihashmap_destroy (handle->active_requests);
862 GNUNET_free (handle); 971 GNUNET_free (handle);
@@ -872,17 +981,40 @@ GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
872static void 981static void
873timeout_put_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 982timeout_put_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
874{ 983{
875 struct PendingMessage *pending = cls; 984 struct GNUNET_DHT_PutHandle *ph = cls;
876 struct GNUNET_DHT_Handle *handle; 985 struct GNUNET_DHT_Handle *handle = ph->dht_handle;
877 986
878 handle = pending->handle; 987 ph->timeout_task = GNUNET_SCHEDULER_NO_TASK;
879 GNUNET_assert (GNUNET_YES == pending->in_pending_queue); 988 if (NULL != ph->pending)
880 GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail, 989 {
881 pending); 990 GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
882 pending->in_pending_queue = GNUNET_NO; 991 ph->pending);
883 if (pending->cont != NULL) 992 ph->pending->in_pending_queue = GNUNET_NO;
884 pending->cont (pending->cont_cls, tc); 993 GNUNET_free (ph->pending);
885 GNUNET_free (pending); 994 }
995 if (NULL != ph->cont)
996 ph->cont (ph->cont_cls, GNUNET_NO);
997 GNUNET_CONTAINER_DLL_remove (handle->put_head,
998 handle->put_tail,
999 ph);
1000 GNUNET_free (ph);
1001}
1002
1003
1004/**
1005 * Function called whenever the PUT message leaves the queue. Sets
1006 * the message pointer in the put handle to NULL.
1007 *
1008 * @param cls the 'struct GNUNET_DHT_PutHandle'
1009 * @param tc unused
1010 */
1011static void
1012mark_put_message_gone (void *cls,
1013 const struct GNUNET_SCHEDULER_TaskContext *tc)
1014{
1015 struct GNUNET_DHT_PutHandle *ph = cls;
1016
1017 ph->pending = NULL;
886} 1018}
887 1019
888 1020
@@ -907,49 +1039,94 @@ timeout_put_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
907 * You must not call GNUNET_DHT_DISCONNECT in this continuation 1039 * You must not call GNUNET_DHT_DISCONNECT in this continuation
908 * @param cont_cls closure for cont 1040 * @param cont_cls closure for cont
909 */ 1041 */
910void 1042struct GNUNET_DHT_PutHandle *
911GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const GNUNET_HashCode * key, 1043GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const GNUNET_HashCode * key,
912 uint32_t desired_replication_level, 1044 uint32_t desired_replication_level,
913 enum GNUNET_DHT_RouteOption options, 1045 enum GNUNET_DHT_RouteOption options,
914 enum GNUNET_BLOCK_Type type, size_t size, const char *data, 1046 enum GNUNET_BLOCK_Type type, size_t size, const char *data,
915 struct GNUNET_TIME_Absolute exp, 1047 struct GNUNET_TIME_Absolute exp,
916 struct GNUNET_TIME_Relative timeout, GNUNET_SCHEDULER_Task cont, 1048 struct GNUNET_TIME_Relative timeout, GNUNET_DHT_PutContinuation cont,
917 void *cont_cls) 1049 void *cont_cls)
918{ 1050{
919 struct GNUNET_DHT_ClientPutMessage *put_msg; 1051 struct GNUNET_DHT_ClientPutMessage *put_msg;
920 size_t msize; 1052 size_t msize;
921 struct PendingMessage *pending; 1053 struct PendingMessage *pending;
1054 struct GNUNET_DHT_PutHandle *ph;
922 1055
923 msize = sizeof (struct GNUNET_DHT_ClientPutMessage) + size; 1056 msize = sizeof (struct GNUNET_DHT_ClientPutMessage) + size;
924 if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) || 1057 if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
925 (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)) 1058 (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE))
926 { 1059 {
927 GNUNET_break (0); 1060 GNUNET_break (0);
928 if (NULL != cont) 1061 return NULL;
929 cont (cont_cls, NULL);
930 return;
931 } 1062 }
1063 ph = GNUNET_malloc (sizeof (struct GNUNET_DHT_PutHandle));
1064 ph->dht_handle = handle;
1065 ph->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_put_request, ph);
1066 ph->cont = cont;
1067 ph->cont_cls = cont_cls;
1068 ph->unique_id = ++handle->uid_gen;
932 pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize); 1069 pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
1070 ph->pending = pending;
933 put_msg = (struct GNUNET_DHT_ClientPutMessage *) &pending[1]; 1071 put_msg = (struct GNUNET_DHT_ClientPutMessage *) &pending[1];
934 pending->msg = &put_msg->header; 1072 pending->msg = &put_msg->header;
935 pending->handle = handle; 1073 pending->handle = handle;
936 pending->cont = cont; 1074 pending->cont = &mark_put_message_gone;
937 pending->cont_cls = cont_cls; 1075 pending->cont_cls = ph;
938 pending->free_on_send = GNUNET_YES; 1076 pending->free_on_send = GNUNET_YES;
939 pending->timeout_task =
940 GNUNET_SCHEDULER_add_delayed (timeout, &timeout_put_request, pending);
941 put_msg->header.size = htons (msize); 1077 put_msg->header.size = htons (msize);
942 put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT); 1078 put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT);
943 put_msg->type = htonl (type); 1079 put_msg->type = htonl (type);
944 put_msg->options = htonl ((uint32_t) options); 1080 put_msg->options = htonl ((uint32_t) options);
945 put_msg->desired_replication_level = htonl (desired_replication_level); 1081 put_msg->desired_replication_level = htonl (desired_replication_level);
1082 put_msg->unique_id = ph->unique_id;
946 put_msg->expiration = GNUNET_TIME_absolute_hton (exp); 1083 put_msg->expiration = GNUNET_TIME_absolute_hton (exp);
947 put_msg->key = *key; 1084 put_msg->key = *key;
948 memcpy (&put_msg[1], data, size); 1085 memcpy (&put_msg[1], data, size);
949 GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail, 1086 GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
950 pending); 1087 pending);
951 pending->in_pending_queue = GNUNET_YES; 1088 pending->in_pending_queue = GNUNET_YES;
1089 GNUNET_CONTAINER_DLL_insert_tail (handle->put_head,
1090 handle->put_tail,
1091 ph);
952 process_pending_messages (handle); 1092 process_pending_messages (handle);
1093 return ph;
1094}
1095
1096
1097/**
1098 * Cancels a DHT PUT operation. Note that the PUT request may still
1099 * go out over the network (we can't stop that); However, if the PUT
1100 * has not yet been sent to the service, cancelling the PUT will stop
1101 * this from happening (but there is no way for the user of this API
1102 * to tell if that is the case). The only use for this API is to
1103 * prevent a later call to 'cont' from "GNUNET_DHT_put" (i.e. because
1104 * the system is shutting down).
1105 *
1106 * @param ph put operation to cancel ('cont' will no longer be called)
1107 */
1108void
1109GNUNET_DHT_put_cancel (struct GNUNET_DHT_PutHandle *ph)
1110{
1111 struct GNUNET_DHT_Handle *handle = ph->dht_handle;
1112
1113 if (NULL != ph->pending)
1114 {
1115 GNUNET_CONTAINER_DLL_remove (handle->pending_head,
1116 handle->pending_tail,
1117 ph->pending);
1118 GNUNET_free (ph->pending);
1119 ph->pending = NULL;
1120 }
1121 if (ph->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1122 {
1123 GNUNET_SCHEDULER_cancel (ph->timeout_task);
1124 ph->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1125 }
1126 GNUNET_CONTAINER_DLL_remove (handle->put_head,
1127 handle->put_tail,
1128 ph);
1129 GNUNET_free (ph);
953} 1130}
954 1131
955 1132
@@ -1004,8 +1181,7 @@ GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
1004 get_msg->desired_replication_level = htonl (desired_replication_level); 1181 get_msg->desired_replication_level = htonl (desired_replication_level);
1005 get_msg->type = htonl (type); 1182 get_msg->type = htonl (type);
1006 get_msg->key = *key; 1183 get_msg->key = *key;
1007 handle->uid_gen++; 1184 get_msg->unique_id = ++handle->uid_gen;
1008 get_msg->unique_id = handle->uid_gen;
1009 memcpy (&get_msg[1], xquery, xquery_size); 1185 memcpy (&get_msg[1], xquery, xquery_size);
1010 GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail, 1186 GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
1011 pending); 1187 pending);
diff --git a/src/dht/gnunet-dht-put.c b/src/dht/gnunet-dht-put.c
index ef5ae5ea7..59acc792b 100644
--- a/src/dht/gnunet-dht-put.c
+++ b/src/dht/gnunet-dht-put.c
@@ -80,7 +80,7 @@ static char *data;
80static void 80static void
81shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 81shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
82{ 82{
83 if (dht_handle != NULL) 83 if (NULL != dht_handle)
84 { 84 {
85 GNUNET_DHT_disconnect (dht_handle); 85 GNUNET_DHT_disconnect (dht_handle);
86 dht_handle = NULL; 86 dht_handle = NULL;
@@ -91,13 +91,33 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
91 * Signature of the main function of a task. 91 * Signature of the main function of a task.
92 * 92 *
93 * @param cls closure 93 * @param cls closure
94 * @param tc context information (why was this task triggered now) 94 * @param success GNUNET_OK if the PUT was transmitted,
95 * GNUNET_NO on timeout,
96 * GNUNET_SYSERR on disconnect from service
97 * after the PUT message was transmitted
98 * (so we don't know if it was received or not)
95 */ 99 */
96void 100static void
97message_sent_cont (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 101message_sent_cont (void *cls, int success)
98{ 102{
99 if (verbose) 103 if (verbose)
100 FPRINTF (stderr, "%s", _("PUT request sent!\n")); 104 {
105 switch (success)
106 {
107 case GNUNET_OK:
108 FPRINTF (stderr, "%s", _("PUT request sent!\n"));
109 break;
110 case GNUNET_NO:
111 FPRINTF (stderr, "%s", _("Timeout sending PUT request!\n"));
112 break;
113 case GNUNET_SYSERR:
114 FPRINTF (stderr, "%s", _("PUT request not confirmed!\n"));
115 break;
116 default:
117 GNUNET_break (0);
118 break;
119 }
120 }
101 GNUNET_SCHEDULER_add_now (&shutdown_task, NULL); 121 GNUNET_SCHEDULER_add_now (&shutdown_task, NULL);
102} 122}
103 123
diff --git a/src/dht/gnunet-service-dht_clients.c b/src/dht/gnunet-service-dht_clients.c
index f26d77792..9eb1ef497 100644
--- a/src/dht/gnunet-service-dht_clients.c
+++ b/src/dht/gnunet-service-dht_clients.c
@@ -262,6 +262,31 @@ static GNUNET_SCHEDULER_TaskIdentifier retry_task;
262 262
263 263
264/** 264/**
265 * Task run to check for messages that need to be sent to a client.
266 *
267 * @param client a ClientList, containing the client and any messages to be sent to it
268 */
269static void
270process_pending_messages (struct ClientList *client);
271
272
273/**
274 * Add a PendingMessage to the clients list of messages to be sent
275 *
276 * @param client the active client to send the message to
277 * @param pending_message the actual message to send
278 */
279static void
280add_pending_message (struct ClientList *client,
281 struct PendingMessage *pending_message)
282{
283 GNUNET_CONTAINER_DLL_insert_tail (client->pending_head, client->pending_tail,
284 pending_message);
285 process_pending_messages (client);
286}
287
288
289/**
265 * Find a client if it exists, add it otherwise. 290 * Find a client if it exists, add it otherwise.
266 * 291 *
267 * @param client the server handle to the client 292 * @param client the server handle to the client
@@ -304,11 +329,9 @@ remove_client_records (void *cls, const GNUNET_HashCode * key, void *value)
304 329
305 if (record->client != client) 330 if (record->client != client)
306 return GNUNET_YES; 331 return GNUNET_YES;
307#if DEBUG_DHT
308 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 332 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
309 "Removing client %p's record for key %s\n", client, 333 "Removing client %p's record for key %s\n", client,
310 GNUNET_h2s (key)); 334 GNUNET_h2s (key));
311#endif
312 GNUNET_assert (GNUNET_YES == 335 GNUNET_assert (GNUNET_YES ==
313 GNUNET_CONTAINER_multihashmap_remove (forward_map, key, 336 GNUNET_CONTAINER_multihashmap_remove (forward_map, key,
314 record)); 337 record));
@@ -335,9 +358,7 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
335 struct PendingMessage *reply; 358 struct PendingMessage *reply;
336 struct ClientMonitorRecord *monitor; 359 struct ClientMonitorRecord *monitor;
337 360
338#if DEBUG_DHT
339 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Local client %p disconnects\n", client); 361 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Local client %p disconnects\n", client);
340#endif
341 pos = find_active_client (client); 362 pos = find_active_client (client);
342 GNUNET_CONTAINER_DLL_remove (client_head, client_tail, pos); 363 GNUNET_CONTAINER_DLL_remove (client_head, client_tail, pos);
343 if (pos->transmit_handle != NULL) 364 if (pos->transmit_handle != NULL)
@@ -464,6 +485,8 @@ handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client,
464 const struct GNUNET_DHT_ClientPutMessage *dht_msg; 485 const struct GNUNET_DHT_ClientPutMessage *dht_msg;
465 struct GNUNET_CONTAINER_BloomFilter *peer_bf; 486 struct GNUNET_CONTAINER_BloomFilter *peer_bf;
466 uint16_t size; 487 uint16_t size;
488 struct PendingMessage *pm;
489 struct GNUNET_DHT_ClientPutConfirmationMessage *conf;
467 490
468 size = ntohs (message->size); 491 size = ntohs (message->size);
469 if (size < sizeof (struct GNUNET_DHT_ClientPutMessage)) 492 if (size < sizeof (struct GNUNET_DHT_ClientPutMessage))
@@ -478,12 +501,10 @@ handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client,
478 GNUNET_NO); 501 GNUNET_NO);
479 dht_msg = (const struct GNUNET_DHT_ClientPutMessage *) message; 502 dht_msg = (const struct GNUNET_DHT_ClientPutMessage *) message;
480 /* give to local clients */ 503 /* give to local clients */
481#if DEBUG_DHT
482 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 504 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
483 "Handling local PUT of %u-bytes for query %s\n", 505 "Handling local PUT of %u-bytes for query %s\n",
484 size - sizeof (struct GNUNET_DHT_ClientPutMessage), 506 size - sizeof (struct GNUNET_DHT_ClientPutMessage),
485 GNUNET_h2s (&dht_msg->key)); 507 GNUNET_h2s (&dht_msg->key));
486#endif
487 GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration), 508 GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
488 &dht_msg->key, 0, NULL, 0, NULL, 509 &dht_msg->key, 0, NULL, 0, NULL,
489 ntohl (dht_msg->type), 510 ntohl (dht_msg->type),
@@ -516,6 +537,15 @@ handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client,
516 &dht_msg[1], 537 &dht_msg[1],
517 size - sizeof (struct GNUNET_DHT_ClientPutMessage)); 538 size - sizeof (struct GNUNET_DHT_ClientPutMessage));
518 GNUNET_CONTAINER_bloomfilter_free (peer_bf); 539 GNUNET_CONTAINER_bloomfilter_free (peer_bf);
540 pm = GNUNET_malloc (sizeof (struct PendingMessage) +
541 sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage));
542 conf = (struct GNUNET_DHT_ClientPutConfirmationMessage *) &pm[1];
543 conf->header.size = htons (sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage));
544 conf->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK);
545 conf->reserved = htonl (0);
546 conf->unique_id = dht_msg->unique_id;
547 pm->msg = &conf->header;
548 add_pending_message (find_active_client (client), pm);
519 GNUNET_SERVER_receive_done (client, GNUNET_OK); 549 GNUNET_SERVER_receive_done (client, GNUNET_OK);
520} 550}
521 551
@@ -553,11 +583,9 @@ handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client,
553 gettext_noop 583 gettext_noop
554 ("# GET requests received from clients"), 1, 584 ("# GET requests received from clients"), 1,
555 GNUNET_NO); 585 GNUNET_NO);
556#if DEBUG_DHT
557 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 586 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
558 "Received request for %s from local client %p\n", 587 "Received request for %s from local client %p\n",
559 GNUNET_h2s (&get->key), client); 588 GNUNET_h2s (&get->key), client);
560#endif
561 cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size); 589 cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size);
562 cqr->key = get->key; 590 cqr->key = get->key;
563 cqr->client = find_active_client (client); 591 cqr->client = find_active_client (client);
@@ -625,11 +653,9 @@ remove_by_unique_id (void *cls, const GNUNET_HashCode * key, void *value)
625 653
626 if (record->unique_id != ctx->unique_id) 654 if (record->unique_id != ctx->unique_id)
627 return GNUNET_YES; 655 return GNUNET_YES;
628#if DEBUG_DHT
629 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 656 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
630 "Removing client %p's record for key %s (by unique id)\n", 657 "Removing client %p's record for key %s (by unique id)\n",
631 ctx->client->client_handle, GNUNET_h2s (key)); 658 ctx->client->client_handle, GNUNET_h2s (key));
632#endif
633 return remove_client_records (ctx->client, key, record); 659 return remove_client_records (ctx->client, key, record);
634} 660}
635 661
@@ -655,10 +681,8 @@ handle_dht_local_get_stop (void *cls, struct GNUNET_SERVER_Client *client,
655 gettext_noop 681 gettext_noop
656 ("# GET STOP requests received from clients"), 1, 682 ("# GET STOP requests received from clients"), 1,
657 GNUNET_NO); 683 GNUNET_NO);
658#if DEBUG_DHT
659 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p stopped request for key %s\n", 684 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p stopped request for key %s\n",
660 client, GNUNET_h2s (&dht_stop_msg->key)); 685 client, GNUNET_h2s (&dht_stop_msg->key));
661#endif
662 ctx.client = find_active_client (client); 686 ctx.client = find_active_client (client);
663 ctx.unique_id = dht_stop_msg->unique_id; 687 ctx.unique_id = dht_stop_msg->unique_id;
664 GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, &dht_stop_msg->key, 688 GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, &dht_stop_msg->key,
@@ -708,15 +732,6 @@ handle_dht_local_monitor (void *cls, struct GNUNET_SERVER_Client *client,
708 732
709 733
710/** 734/**
711 * Task run to check for messages that need to be sent to a client.
712 *
713 * @param client a ClientList, containing the client and any messages to be sent to it
714 */
715static void
716process_pending_messages (struct ClientList *client);
717
718
719/**
720 * Callback called as a result of issuing a GNUNET_SERVER_notify_transmit_ready 735 * Callback called as a result of issuing a GNUNET_SERVER_notify_transmit_ready
721 * request. A ClientList is passed as closure, take the head of the list 736 * request. A ClientList is passed as closure, take the head of the list
722 * and copy it into buf, which has the result of sending the message to the 737 * and copy it into buf, which has the result of sending the message to the
@@ -741,11 +756,9 @@ send_reply_to_client (void *cls, size_t size, void *buf)
741 if (buf == NULL) 756 if (buf == NULL)
742 { 757 {
743 /* client disconnected */ 758 /* client disconnected */
744#if DEBUG_DHT
745 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 759 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
746 "Client %p disconnected, pending messages will be discarded\n", 760 "Client %p disconnected, pending messages will be discarded\n",
747 client->client_handle); 761 client->client_handle);
748#endif
749 return 0; 762 return 0;
750 } 763 }
751 off = 0; 764 off = 0;
@@ -756,17 +769,13 @@ send_reply_to_client (void *cls, size_t size, void *buf)
756 reply); 769 reply);
757 memcpy (&cbuf[off], reply->msg, msize); 770 memcpy (&cbuf[off], reply->msg, msize);
758 GNUNET_free (reply); 771 GNUNET_free (reply);
759#if DEBUG_DHT
760 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u bytes to client %p\n", 772 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u bytes to client %p\n",
761 msize, client->client_handle); 773 msize, client->client_handle);
762#endif
763 off += msize; 774 off += msize;
764 } 775 }
765 process_pending_messages (client); 776 process_pending_messages (client);
766#if DEBUG_DHT
767 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitted %u/%u bytes to client %p\n", 777 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitted %u/%u bytes to client %p\n",
768 (unsigned int) off, (unsigned int) size, client->client_handle); 778 (unsigned int) off, (unsigned int) size, client->client_handle);
769#endif
770 return off; 779 return off;
771} 780}
772 781
@@ -781,20 +790,16 @@ process_pending_messages (struct ClientList *client)
781{ 790{
782 if ((client->pending_head == NULL) || (client->transmit_handle != NULL)) 791 if ((client->pending_head == NULL) || (client->transmit_handle != NULL))
783 { 792 {
784#if DEBUG_DHT
785 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 793 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
786 "Not asking for transmission to %p now: %s\n", 794 "Not asking for transmission to %p now: %s\n",
787 client->client_handle, 795 client->client_handle,
788 client->pending_head == 796 client->pending_head ==
789 NULL ? "no more messages" : "request already pending"); 797 NULL ? "no more messages" : "request already pending");
790#endif
791 return; 798 return;
792 } 799 }
793#if DEBUG_DHT
794 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 800 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
795 "Asking for transmission of %u bytes to client %p\n", 801 "Asking for transmission of %u bytes to client %p\n",
796 ntohs (client->pending_head->msg->size), client->client_handle); 802 ntohs (client->pending_head->msg->size), client->client_handle);
797#endif
798 client->transmit_handle = 803 client->transmit_handle =
799 GNUNET_SERVER_notify_transmit_ready (client->client_handle, 804 GNUNET_SERVER_notify_transmit_ready (client->client_handle,
800 ntohs (client->pending_head-> 805 ntohs (client->pending_head->
@@ -805,22 +810,6 @@ process_pending_messages (struct ClientList *client)
805 810
806 811
807/** 812/**
808 * Add a PendingMessage to the clients list of messages to be sent
809 *
810 * @param client the active client to send the message to
811 * @param pending_message the actual message to send
812 */
813static void
814add_pending_message (struct ClientList *client,
815 struct PendingMessage *pending_message)
816{
817 GNUNET_CONTAINER_DLL_insert_tail (client->pending_head, client->pending_tail,
818 pending_message);
819 process_pending_messages (client);
820}
821
822
823/**
824 * Closure for 'forward_reply' 813 * Closure for 'forward_reply'
825 */ 814 */
826struct ForwardReplyContext 815struct ForwardReplyContext
@@ -879,11 +868,9 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value)
879 868
880 if ((record->type != GNUNET_BLOCK_TYPE_ANY) && (record->type != frc->type)) 869 if ((record->type != GNUNET_BLOCK_TYPE_ANY) && (record->type != frc->type))
881 { 870 {
882#if DEBUG_DHT
883 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 871 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
884 "Record type missmatch, not passing request for key %s to local client\n", 872 "Record type missmatch, not passing request for key %s to local client\n",
885 GNUNET_h2s (key)); 873 GNUNET_h2s (key));
886#endif
887 GNUNET_STATISTICS_update (GDS_stats, 874 GNUNET_STATISTICS_update (GDS_stats,
888 gettext_noop 875 gettext_noop
889 ("# Key match, type mismatches in REPLY to CLIENT"), 876 ("# Key match, type mismatches in REPLY to CLIENT"),
@@ -894,11 +881,9 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value)
894 for (i = 0; i < record->seen_replies_count; i++) 881 for (i = 0; i < record->seen_replies_count; i++)
895 if (0 == memcmp (&record->seen_replies[i], &ch, sizeof (GNUNET_HashCode))) 882 if (0 == memcmp (&record->seen_replies[i], &ch, sizeof (GNUNET_HashCode)))
896 { 883 {
897#if DEBUG_DHT
898 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 884 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
899 "Duplicate reply, not passing request for key %s to local client\n", 885 "Duplicate reply, not passing request for key %s to local client\n",
900 GNUNET_h2s (key)); 886 GNUNET_h2s (key));
901#endif
902 GNUNET_STATISTICS_update (GDS_stats, 887 GNUNET_STATISTICS_update (GDS_stats,
903 gettext_noop 888 gettext_noop
904 ("# Duplicate REPLIES to CLIENT request dropped"), 889 ("# Duplicate REPLIES to CLIENT request dropped"),
@@ -909,11 +894,9 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value)
909 GNUNET_BLOCK_evaluate (GDS_block_context, record->type, key, NULL, 0, 894 GNUNET_BLOCK_evaluate (GDS_block_context, record->type, key, NULL, 0,
910 record->xquery, record->xquery_size, frc->data, 895 record->xquery, record->xquery_size, frc->data,
911 frc->data_size); 896 frc->data_size);
912#if DEBUG_DHT
913 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 897 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
914 "Evaluation result is %d for key %s for local client's query\n", 898 "Evaluation result is %d for key %s for local client's query\n",
915 (int) eval, GNUNET_h2s (key)); 899 (int) eval, GNUNET_h2s (key));
916#endif
917 switch (eval) 900 switch (eval)
918 { 901 {
919 case GNUNET_BLOCK_EVALUATION_OK_LAST: 902 case GNUNET_BLOCK_EVALUATION_OK_LAST:
@@ -964,11 +947,9 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value)
964 GNUNET_NO); 947 GNUNET_NO);
965 reply = (struct GNUNET_DHT_ClientResultMessage *) &pm[1]; 948 reply = (struct GNUNET_DHT_ClientResultMessage *) &pm[1];
966 reply->unique_id = record->unique_id; 949 reply->unique_id = record->unique_id;
967#if DEBUG_DHT
968 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 950 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
969 "Queueing reply to query %s for client %p\n", GNUNET_h2s (key), 951 "Queueing reply to query %s for client %p\n", GNUNET_h2s (key),
970 record->client->client_handle); 952 record->client->client_handle);
971#endif
972 add_pending_message (record->client, pm); 953 add_pending_message (record->client, pm);
973 if (GNUNET_YES == do_free) 954 if (GNUNET_YES == do_free)
974 remove_client_records (record->client, key, record); 955 remove_client_records (record->client, key, record);
diff --git a/src/dht/test_dht_api.c b/src/dht/test_dht_api.c
index 182856a8e..4f96cb82b 100644
--- a/src/dht/test_dht_api.c
+++ b/src/dht/test_dht_api.c
@@ -195,10 +195,10 @@ test_get_iterator (void *cls, struct GNUNET_TIME_Absolute exp,
195 * Signature of the main function of a task. 195 * Signature of the main function of a task.
196 * 196 *
197 * @param cls closure 197 * @param cls closure
198 * @param tc context information (why was this task triggered now) 198 * @param success result of PUT
199 */ 199 */
200static void 200static void
201test_get (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 201test_get (void *cls, int success)
202{ 202{
203 struct PeerContext *peer = cls; 203 struct PeerContext *peer = cls;
204 GNUNET_HashCode hash; 204 GNUNET_HashCode hash;
diff --git a/src/dht/test_dht_multipeer.c b/src/dht/test_dht_multipeer.c
index 94e39d259..a821b303c 100644
--- a/src/dht/test_dht_multipeer.c
+++ b/src/dht/test_dht_multipeer.c
@@ -678,7 +678,7 @@ start_gets (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
678 * Called when the PUT request has been transmitted to the DHT service. 678 * Called when the PUT request has been transmitted to the DHT service.
679 */ 679 */
680static void 680static void
681put_finished (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 681put_finished (void *cls, int success)
682{ 682{
683 struct TestPutContext *test_put = cls; 683 struct TestPutContext *test_put = cls;
684 684
diff --git a/src/dht/test_dht_twopeer_path_tracking.c b/src/dht/test_dht_twopeer_path_tracking.c
index 6e764a387..31d4f92ae 100644
--- a/src/dht/test_dht_twopeer_path_tracking.c
+++ b/src/dht/test_dht_twopeer_path_tracking.c
@@ -249,7 +249,7 @@ get_result_iterator (void *cls, struct GNUNET_TIME_Absolute exp,
249 * Schedule the GET request for some time in the future. 249 * Schedule the GET request for some time in the future.
250 */ 250 */
251static void 251static void
252put_finished (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 252put_finished (void *cls, int success)
253{ 253{
254 GNUNET_HashCode key; /* Key for data lookup */ 254 GNUNET_HashCode key; /* Key for data lookup */
255 255
diff --git a/src/dht/test_dht_twopeer_put_get.c b/src/dht/test_dht_twopeer_put_get.c
index 48bf9f823..b2dd5a70b 100644
--- a/src/dht/test_dht_twopeer_put_get.c
+++ b/src/dht/test_dht_twopeer_put_get.c
@@ -129,9 +129,15 @@ static struct GNUNET_DHT_Handle *peer1dht;
129static struct GNUNET_DHT_Handle *peer2dht; 129static struct GNUNET_DHT_Handle *peer2dht;
130 130
131/** 131/**
132 * Handle for our PUT operation.
133 */
134static struct GNUNET_DHT_PutHandle *put_op;
135
136
137/**
132 * Check whether peers successfully shut down. 138 * Check whether peers successfully shut down.
133 */ 139 */
134void 140static void
135shutdown_callback (void *cls, const char *emsg) 141shutdown_callback (void *cls, const char *emsg)
136{ 142{
137 if (emsg != NULL) 143 if (emsg != NULL)
@@ -164,6 +170,11 @@ finish_testing (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
164static void 170static void
165end_badly_cont (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 171end_badly_cont (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
166{ 172{
173 if (NULL != put_op)
174 {
175 GNUNET_DHT_put_cancel (put_op);
176 put_op = NULL;
177 }
167 if (peer1dht != NULL) 178 if (peer1dht != NULL)
168 GNUNET_DHT_disconnect (peer1dht); 179 GNUNET_DHT_disconnect (peer1dht);
169 180
@@ -174,6 +185,7 @@ end_badly_cont (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
174 GNUNET_TESTING_daemons_stop (pg, TIMEOUT, &shutdown_callback, NULL); 185 GNUNET_TESTING_daemons_stop (pg, TIMEOUT, &shutdown_callback, NULL);
175} 186}
176 187
188
177/** 189/**
178 * Check if the get_handle is being used, if so stop the request. Either 190 * Check if the get_handle is being used, if so stop the request. Either
179 * way, schedule the end_badly_cont function which actually shuts down the 191 * way, schedule the end_badly_cont function which actually shuts down the
@@ -242,10 +254,11 @@ get_result_iterator (void *cls, struct GNUNET_TIME_Absolute exp,
242 * Schedule the GET request for some time in the future. 254 * Schedule the GET request for some time in the future.
243 */ 255 */
244static void 256static void
245put_finished (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 257put_finished (void *cls, int success)
246{ 258{
247 GNUNET_HashCode key; /* Key for data lookup */ 259 GNUNET_HashCode key; /* Key for data lookup */
248 260
261 put_op = NULL;
249 GNUNET_SCHEDULER_cancel (die_task); 262 GNUNET_SCHEDULER_cancel (die_task);
250 die_task = 263 die_task =
251 GNUNET_SCHEDULER_add_delayed (GET_TIMEOUT, &end_badly, 264 GNUNET_SCHEDULER_add_delayed (GET_TIMEOUT, &end_badly,
@@ -272,9 +285,9 @@ do_put (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
272 memset (data, 43, sizeof (data)); 285 memset (data, 43, sizeof (data));
273 286
274 /* Insert the data at the first peer */ 287 /* Insert the data at the first peer */
275 GNUNET_DHT_put (peer1dht, &key, 1, GNUNET_DHT_RO_NONE, GNUNET_BLOCK_TYPE_TEST, 288 put_op = GNUNET_DHT_put (peer1dht, &key, 1, GNUNET_DHT_RO_NONE, GNUNET_BLOCK_TYPE_TEST,
276 sizeof (data), data, GNUNET_TIME_UNIT_FOREVER_ABS, 289 sizeof (data), data, GNUNET_TIME_UNIT_FOREVER_ABS,
277 GNUNET_TIME_UNIT_FOREVER_REL, &put_finished, NULL); 290 GNUNET_TIME_UNIT_FOREVER_REL, &put_finished, NULL);
278} 291}
279 292
280 293