aboutsummaryrefslogtreecommitdiff
path: root/src/dht/dht_api.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2012-04-26 16:56:00 +0000
committerChristian Grothoff <christian@grothoff.org>2012-04-26 16:56:00 +0000
commit4757258de33285fed2aa318c374dcdbf586c29f0 (patch)
tree283fa337cdaa54fbb3bbf08c937fdb6467c2b5d1 /src/dht/dht_api.c
parent506d42b2ba6eb104e64fd0c8889ea7233a9b96b3 (diff)
downloadgnunet-4757258de33285fed2aa318c374dcdbf586c29f0.tar.gz
gnunet-4757258de33285fed2aa318c374dcdbf586c29f0.zip
-fixing #2277
Diffstat (limited to 'src/dht/dht_api.c')
-rw-r--r--src/dht/dht_api.c480
1 files changed, 328 insertions, 152 deletions
diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c
index 9e95155a9..37b65b022 100644
--- a/src/dht/dht_api.c
+++ b/src/dht/dht_api.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 (C) 2009, 2010 Christian Grothoff (and other contributing authors) 3 (C) 2009, 2010, 2011, 2012 Christian Grothoff (and other contributing authors)
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -74,11 +74,6 @@ struct PendingMessage
74 void *cont_cls; 74 void *cont_cls;
75 75
76 /** 76 /**
77 * Timeout task for this message
78 */
79 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
80
81 /**
82 * Unique ID for this request 77 * Unique ID for this request
83 */ 78 */
84 uint64_t unique_id; 79 uint64_t unique_id;
@@ -100,6 +95,56 @@ struct PendingMessage
100 95
101 96
102/** 97/**
98 * Handle to a PUT request.
99 */
100struct GNUNET_DHT_PutHandle
101{
102 /**
103 * Kept in a DLL.
104 */
105 struct GNUNET_DHT_PutHandle *next;
106
107 /**
108 * Kept in a DLL.
109 */
110 struct GNUNET_DHT_PutHandle *prev;
111
112 /**
113 * Continuation to call when done.
114 */
115 GNUNET_DHT_PutContinuation cont;
116
117 /**
118 * Pending message associated with this PUT operation,
119 * NULL after the message has been transmitted to the service.
120 */
121 struct PendingMessage *pending;
122
123 /**
124 * Main handle to this DHT api
125 */
126 struct GNUNET_DHT_Handle *dht_handle;
127
128 /**
129 * Closure for 'cont'.
130 */
131 void *cont_cls;
132
133 /**
134 * Timeout task for this operation.
135 */
136 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
137
138 /**
139 * Unique ID for the PUT operation.
140 */
141 uint64_t unique_id;
142
143};
144
145
146
147/**
103 * Handle to a GET request 148 * Handle to a GET request
104 */ 149 */
105struct GNUNET_DHT_GetHandle 150struct GNUNET_DHT_GetHandle
@@ -235,8 +280,18 @@ struct GNUNET_DHT_Handle
235 struct GNUNET_DHT_MonitorHandle *monitor_tail; 280 struct GNUNET_DHT_MonitorHandle *monitor_tail;
236 281
237 /** 282 /**
238 * Hash map containing the current outstanding unique requests 283 * Head of active PUT requests.
239 * (values are of type 'struct GNUNET_DHT_RouteHandle'). 284 */
285 struct GNUNET_DHT_PutHandle *put_head;
286
287 /**
288 * Tail of active PUT requests.
289 */
290 struct GNUNET_DHT_PutHandle *put_tail;
291
292 /**
293 * Hash map containing the current outstanding unique GET requests
294 * (values are of type 'struct GNUNET_DHT_GetHandle').
240 */ 295 */
241 struct GNUNET_CONTAINER_MultiHashMap *active_requests; 296 struct GNUNET_CONTAINER_MultiHashMap *active_requests;
242 297
@@ -267,6 +322,8 @@ struct GNUNET_DHT_Handle
267 * Handler for messages received from the DHT service 322 * Handler for messages received from the DHT service
268 * a demultiplexer which handles numerous message types 323 * a demultiplexer which handles numerous message types
269 * 324 *
325 * @param cls the 'struct GNUNET_DHT_Handle'
326 * @param msg the incoming message
270 */ 327 */
271static void 328static void
272service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg); 329service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg);
@@ -275,16 +332,17 @@ service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg);
275/** 332/**
276 * Try to (re)connect to the DHT service. 333 * Try to (re)connect to the DHT service.
277 * 334 *
335 * @param handle DHT handle to reconnect
278 * @return GNUNET_YES on success, GNUNET_NO on failure. 336 * @return GNUNET_YES on success, GNUNET_NO on failure.
279 */ 337 */
280static int 338static int
281try_connect (struct GNUNET_DHT_Handle *handle) 339try_connect (struct GNUNET_DHT_Handle *handle)
282{ 340{
283 if (handle->client != NULL) 341 if (NULL != handle->client)
284 return GNUNET_OK; 342 return GNUNET_OK;
285 handle->in_receive = GNUNET_NO; 343 handle->in_receive = GNUNET_NO;
286 handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg); 344 handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg);
287 if (handle->client == NULL) 345 if (NULL == handle->client)
288 { 346 {
289 LOG (GNUNET_ERROR_TYPE_WARNING, 347 LOG (GNUNET_ERROR_TYPE_WARNING,
290 _("Failed to connect to the DHT service!\n")); 348 _("Failed to connect to the DHT service!\n"));
@@ -324,6 +382,7 @@ add_request_to_pending (void *cls, const GNUNET_HashCode * key, void *value)
324 382
325/** 383/**
326 * Try to send messages from list of messages to send 384 * Try to send messages from list of messages to send
385 *
327 * @param handle DHT_Handle 386 * @param handle DHT_Handle
328 */ 387 */
329static void 388static void
@@ -369,9 +428,12 @@ try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
369static void 428static void
370do_disconnect (struct GNUNET_DHT_Handle *handle) 429do_disconnect (struct GNUNET_DHT_Handle *handle)
371{ 430{
372 if (handle->client == NULL) 431 struct GNUNET_DHT_PutHandle *ph;
432 struct GNUNET_DHT_PutHandle *next;
433
434 if (NULL == handle->client)
373 return; 435 return;
374 GNUNET_assert (handle->reconnect_task == GNUNET_SCHEDULER_NO_TASK); 436 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == handle->reconnect_task);
375 if (NULL != handle->th) 437 if (NULL != handle->th)
376 GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th); 438 GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
377 handle->th = NULL; 439 handle->th = NULL;
@@ -380,6 +442,20 @@ do_disconnect (struct GNUNET_DHT_Handle *handle)
380 (unsigned long long) handle->retry_time.rel_value); 442 (unsigned long long) handle->retry_time.rel_value);
381 GNUNET_CLIENT_disconnect (handle->client); 443 GNUNET_CLIENT_disconnect (handle->client);
382 handle->client = NULL; 444 handle->client = NULL;
445
446 /* signal disconnect to all PUT requests that were transmitted but waiting
447 for the put confirmation */
448 next = handle->put_head;
449 while (NULL != (ph = next))
450 {
451 next = ph->next;
452 if (NULL == ph->pending)
453 {
454 if (NULL != ph->cont)
455 ph->cont (ph->cont_cls, GNUNET_SYSERR);
456 GNUNET_DHT_put_cancel (ph);
457 }
458 }
383 handle->reconnect_task = 459 handle->reconnect_task =
384 GNUNET_SCHEDULER_add_delayed (handle->retry_time, &try_reconnect, handle); 460 GNUNET_SCHEDULER_add_delayed (handle->retry_time, &try_reconnect, handle);
385} 461}
@@ -387,6 +463,11 @@ do_disconnect (struct GNUNET_DHT_Handle *handle)
387 463
388/** 464/**
389 * Transmit the next pending message, called by notify_transmit_ready 465 * Transmit the next pending message, called by notify_transmit_ready
466 *
467 * @param cls the DHT handle
468 * @param size number of bytes available in 'buf' for transmission
469 * @param buf where to copy messages for the service
470 * @return number of bytes written to 'buf'
390 */ 471 */
391static size_t 472static size_t
392transmit_pending (void *cls, size_t size, void *buf); 473transmit_pending (void *cls, size_t size, void *buf);
@@ -394,20 +475,22 @@ transmit_pending (void *cls, size_t size, void *buf);
394 475
395/** 476/**
396 * Try to send messages from list of messages to send 477 * Try to send messages from list of messages to send
478 *
479 * @param handle handle to DHT
397 */ 480 */
398static void 481static void
399process_pending_messages (struct GNUNET_DHT_Handle *handle) 482process_pending_messages (struct GNUNET_DHT_Handle *handle)
400{ 483{
401 struct PendingMessage *head; 484 struct PendingMessage *head;
402 485
403 if (handle->client == NULL) 486 if (NULL == handle->client)
404 { 487 {
405 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 488 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
406 "process_pending_messages called, but client is null, reconnecting\n"); 489 "process_pending_messages called, but client is NULL, reconnecting\n");
407 do_disconnect (handle); 490 do_disconnect (handle);
408 return; 491 return;
409 } 492 }
410 if (handle->th != NULL) 493 if (NULL != handle->th)
411 return; 494 return;
412 if (NULL == (head = handle->pending_head)) 495 if (NULL == (head = handle->pending_head))
413 return; 496 return;
@@ -427,6 +510,11 @@ process_pending_messages (struct GNUNET_DHT_Handle *handle)
427 510
428/** 511/**
429 * Transmit the next pending message, called by notify_transmit_ready 512 * Transmit the next pending message, called by notify_transmit_ready
513 *
514 * @param cls the DHT handle
515 * @param size number of bytes available in 'buf' for transmission
516 * @param buf where to copy messages for the service
517 * @return number of bytes written to 'buf'
430 */ 518 */
431static size_t 519static size_t
432transmit_pending (void *cls, size_t size, void *buf) 520transmit_pending (void *cls, size_t size, void *buf)
@@ -436,7 +524,7 @@ transmit_pending (void *cls, size_t size, void *buf)
436 size_t tsize; 524 size_t tsize;
437 525
438 handle->th = NULL; 526 handle->th = NULL;
439 if (buf == NULL) 527 if (NULL == buf)
440 { 528 {
441 LOG (GNUNET_ERROR_TYPE_DEBUG, 529 LOG (GNUNET_ERROR_TYPE_DEBUG,
442 "Transmission to DHT service failed! Reconnecting!\n"); 530 "Transmission to DHT service failed! Reconnecting!\n");
@@ -456,11 +544,6 @@ transmit_pending (void *cls, size_t size, void *buf)
456 GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail, 544 GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
457 head); 545 head);
458 head->in_pending_queue = GNUNET_NO; 546 head->in_pending_queue = GNUNET_NO;
459 if (head->timeout_task != GNUNET_SCHEDULER_NO_TASK)
460 {
461 GNUNET_SCHEDULER_cancel (head->timeout_task);
462 head->timeout_task = GNUNET_SCHEDULER_NO_TASK;
463 }
464 if (NULL != head->cont) 547 if (NULL != head->cont)
465 { 548 {
466 head->cont (head->cont_cls, NULL); 549 head->cont (head->cont_cls, NULL);
@@ -557,33 +640,24 @@ process_monitor_get_message (struct GNUNET_DHT_Handle *handle,
557 const struct GNUNET_DHT_MonitorGetMessage *msg) 640 const struct GNUNET_DHT_MonitorGetMessage *msg)
558{ 641{
559 struct GNUNET_DHT_MonitorHandle *h; 642 struct GNUNET_DHT_MonitorHandle *h;
560 size_t msize;
561 643
562 msize = ntohs (msg->header.size); 644 for (h = handle->monitor_head; NULL != h; h = h->next)
563 if (msize < sizeof (struct GNUNET_DHT_MonitorGetMessage))
564 return GNUNET_SYSERR;
565
566 h = handle->monitor_head;
567 while (NULL != h)
568 { 645 {
569 int type_ok; 646 int type_ok;
570 int key_ok; 647 int key_ok;
571 648
572 type_ok = GNUNET_BLOCK_TYPE_ANY == h->type || h->type == ntohl(msg->type); 649 type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type));
573 key_ok = NULL == h->key || memcmp (h->key, &msg->key, 650 key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key,
574 sizeof (GNUNET_HashCode)) == 0; 651 sizeof (GNUNET_HashCode)));
575 if (type_ok && key_ok && NULL != h->get_cb) 652 if (type_ok && key_ok && (NULL != h->get_cb))
576 {
577 h->get_cb (h->cb_cls, 653 h->get_cb (h->cb_cls,
578 ntohl (msg->options), 654 ntohl (msg->options),
579 (enum GNUNET_BLOCK_Type) ntohl(msg->type), 655 (enum GNUNET_BLOCK_Type) ntohl(msg->type),
580 ntohl (msg->hop_count), 656 ntohl (msg->hop_count),
581 ntohl (msg->desired_replication_level), 657 ntohl (msg->desired_replication_level),
582 ntohl (msg->get_path_length), 658 ntohl (msg->get_path_length),
583 (struct GNUNET_PeerIdentity *) &msg[1], 659 (struct GNUNET_PeerIdentity *) &msg[1],
584 &msg->key); 660 &msg->key);
585 }
586 h = h->next;
587 } 661 }
588 return GNUNET_OK; 662 return GNUNET_OK;
589} 663}
@@ -593,8 +667,7 @@ process_monitor_get_message (struct GNUNET_DHT_Handle *handle,
593 * Process a get response monitor message from the service. 667 * Process a get response monitor message from the service.
594 * 668 *
595 * @param handle The DHT handle. 669 * @param handle The DHT handle.
596 * @param msg Monitor get response message from the service. 670 * @param msg monitor get response message from the service
597 *
598 * @return GNUNET_OK if everything went fine, 671 * @return GNUNET_OK if everything went fine,
599 * GNUNET_SYSERR if the message is malformed. 672 * GNUNET_SYSERR if the message is malformed.
600 */ 673 */
@@ -604,30 +677,30 @@ process_monitor_get_resp_message (struct GNUNET_DHT_Handle *handle,
604 *msg) 677 *msg)
605{ 678{
606 struct GNUNET_DHT_MonitorHandle *h; 679 struct GNUNET_DHT_MonitorHandle *h;
680 struct GNUNET_PeerIdentity *path;
681 uint32_t getl;
682 uint32_t putl;
607 size_t msize; 683 size_t msize;
608 684
609 msize = ntohs (msg->header.size); 685 msize = ntohs (msg->header.size);
610 if (msize < sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) 686 path = (struct GNUNET_PeerIdentity *) &msg[1];
687 getl = ntohl (msg->get_path_length);
688 putl = ntohl (msg->put_path_length);
689 if ( (getl + putl < getl) ||
690 ( ((msize - sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) / sizeof (struct GNUNET_PeerIdentity)) < getl + putl) )
691 {
692 GNUNET_break (0);
611 return GNUNET_SYSERR; 693 return GNUNET_SYSERR;
612 694 }
613 h = handle->monitor_head; 695 for (h = handle->monitor_head; NULL != h; h = h->next)
614 while (NULL != h)
615 { 696 {
616 int type_ok; 697 int type_ok;
617 int key_ok; 698 int key_ok;
618 699
619 type_ok = GNUNET_BLOCK_TYPE_ANY == h->type || h->type == ntohl(msg->type); 700 type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type));
620 key_ok = NULL == h->key || memcmp (h->key, &msg->key, 701 key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key,
621 sizeof (GNUNET_HashCode)) == 0; 702 sizeof (GNUNET_HashCode)));
622 if (type_ok && key_ok && NULL != h->get_resp_cb) 703 if (type_ok && key_ok && (NULL != h->get_resp_cb))
623 {
624 struct GNUNET_PeerIdentity *path;
625 uint32_t getl;
626 uint32_t putl;
627
628 path = (struct GNUNET_PeerIdentity *) &msg[1];
629 getl = ntohl (msg->get_path_length);
630 putl = ntohl (msg->put_path_length);
631 h->get_resp_cb (h->cb_cls, 704 h->get_resp_cb (h->cb_cls,
632 (enum GNUNET_BLOCK_Type) ntohl(msg->type), 705 (enum GNUNET_BLOCK_Type) ntohl(msg->type),
633 path, getl, 706 path, getl,
@@ -638,8 +711,6 @@ process_monitor_get_resp_message (struct GNUNET_DHT_Handle *handle,
638 msize - 711 msize -
639 sizeof (struct GNUNET_DHT_MonitorGetRespMessage) - 712 sizeof (struct GNUNET_DHT_MonitorGetRespMessage) -
640 sizeof (struct GNUNET_PeerIdentity) * (putl + getl)); 713 sizeof (struct GNUNET_PeerIdentity) * (putl + getl));
641 }
642 h = h->next;
643 } 714 }
644 return GNUNET_OK; 715 return GNUNET_OK;
645} 716}
@@ -660,27 +731,26 @@ process_monitor_put_message (struct GNUNET_DHT_Handle *handle,
660{ 731{
661 struct GNUNET_DHT_MonitorHandle *h; 732 struct GNUNET_DHT_MonitorHandle *h;
662 size_t msize; 733 size_t msize;
734 struct GNUNET_PeerIdentity *path;
735 uint32_t putl;
663 736
664 msize = ntohs (msg->header.size); 737 msize = ntohs (msg->header.size);
665 if (msize < sizeof (struct GNUNET_DHT_MonitorPutMessage)) 738 path = (struct GNUNET_PeerIdentity *) &msg[1];
739 putl = ntohl (msg->put_path_length);
740 if (((msize - sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) / sizeof (struct GNUNET_PeerIdentity)) < putl)
741 {
742 GNUNET_break (0);
666 return GNUNET_SYSERR; 743 return GNUNET_SYSERR;
667 744 }
668 h = handle->monitor_head; 745 for (h = handle->monitor_head; NULL != h; h = h->next)
669 while (NULL != h)
670 { 746 {
671 int type_ok; 747 int type_ok;
672 int key_ok; 748 int key_ok;
673 749
674 type_ok = GNUNET_BLOCK_TYPE_ANY == h->type || h->type == ntohl(msg->type); 750 type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type));
675 key_ok = NULL == h->key || memcmp (h->key, &msg->key, 751 key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key,
676 sizeof (GNUNET_HashCode)) == 0; 752 sizeof (GNUNET_HashCode)));
677 if (type_ok && key_ok && NULL != h->put_cb) 753 if (type_ok && key_ok && (NULL != h->put_cb))
678 {
679 struct GNUNET_PeerIdentity *path;
680 uint32_t putl;
681
682 path = (struct GNUNET_PeerIdentity *) &msg[1];
683 putl = ntohl (msg->put_path_length);
684 h->put_cb (h->cb_cls, 754 h->put_cb (h->cb_cls,
685 ntohl (msg->options), 755 ntohl (msg->options),
686 (enum GNUNET_BLOCK_Type) ntohl(msg->type), 756 (enum GNUNET_BLOCK_Type) ntohl(msg->type),
@@ -693,55 +763,41 @@ process_monitor_put_message (struct GNUNET_DHT_Handle *handle,
693 msize - 763 msize -
694 sizeof (struct GNUNET_DHT_MonitorPutMessage) - 764 sizeof (struct GNUNET_DHT_MonitorPutMessage) -
695 sizeof (struct GNUNET_PeerIdentity) * putl); 765 sizeof (struct GNUNET_PeerIdentity) * putl);
696 }
697 h = h->next;
698 } 766 }
699 return GNUNET_OK; 767 return GNUNET_OK;
700} 768}
701 769
702 770
703/** 771/**
704 * Process a monitoring message from the service: demultiplex for proper type. 772 * Process a put confirmation message from the service.
705 * 773 *
706 * @param handle The DHT handle. 774 * @param handle The DHT handle.
707 * @param msg Message from the service. 775 * @param msg confirmation message from the service.
708 *
709 * @return GNUNET_OK if everything went fine, 776 * @return GNUNET_OK if everything went fine,
710 * GNUNET_SYSERR if the message is malformed. 777 * GNUNET_SYSERR if the message is malformed.
711 */ 778 */
712static int 779static int
713process_monitor_message (struct GNUNET_DHT_Handle *handle, 780process_put_confirmation_message (struct GNUNET_DHT_Handle *handle,
714 const struct GNUNET_MessageHeader *msg) 781 const struct GNUNET_DHT_ClientPutConfirmationMessage *msg)
715{ 782{
716 switch (ntohs (msg->type)) 783 struct GNUNET_DHT_PutHandle *ph;
717 { 784 GNUNET_DHT_PutContinuation cont;
718 case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET: 785 void *cont_cls;
719 return process_monitor_get_message(handle,
720 (struct GNUNET_DHT_MonitorGetMessage *)
721 msg);
722 786
723 case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP: 787 for (ph = handle->put_head; NULL != ph; ph = ph->next)
724 { 788 if (ph->unique_id == msg->unique_id)
725 return process_monitor_get_resp_message( 789 break;
726 handle, 790 if (NULL == ph)
727 (struct GNUNET_DHT_MonitorGetRespMessage *) msg); 791 return GNUNET_OK;
728 } 792 cont = ph->cont;
729 case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT: 793 cont_cls = ph->cont_cls;
730 { 794 GNUNET_DHT_put_cancel (ph);
731 return process_monitor_put_message(handle, 795 if (NULL != cont)
732 (struct GNUNET_DHT_MonitorPutMessage *) 796 cont (cont_cls, GNUNET_OK);
733 msg); 797 return GNUNET_OK;
734 }
735 case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT_RESP:
736 /* Not implemented yet */
737 GNUNET_break(0);
738 /* Fall through */
739 default:
740 GNUNET_break(0);
741 return GNUNET_SYSERR;
742 }
743} 798}
744 799
800
745/** 801/**
746 * Handler for messages received from the DHT service 802 * Handler for messages received from the DHT service
747 * a demultiplexer which handles numerous message types 803 * a demultiplexer which handles numerous message types
@@ -754,38 +810,84 @@ service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
754{ 810{
755 struct GNUNET_DHT_Handle *handle = cls; 811 struct GNUNET_DHT_Handle *handle = cls;
756 const struct GNUNET_DHT_ClientResultMessage *dht_msg; 812 const struct GNUNET_DHT_ClientResultMessage *dht_msg;
813 uint16_t msize;
814 int ret;
757 815
758 if (msg == NULL) 816 if (NULL == msg)
759 { 817 {
760 LOG (GNUNET_ERROR_TYPE_DEBUG, 818 LOG (GNUNET_ERROR_TYPE_DEBUG,
761 "Error receiving data from DHT service, reconnecting\n"); 819 "Error receiving data from DHT service, reconnecting\n");
762 do_disconnect (handle); 820 do_disconnect (handle);
763 return; 821 return;
764 } 822 }
765 if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT) 823 ret = GNUNET_SYSERR;
824 msize = ntohs (msg->size);
825 switch (ntohs (msg->type))
766 { 826 {
767 if (process_monitor_message (handle, msg) == GNUNET_OK) 827 case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET:
828 if (msize < sizeof (struct GNUNET_DHT_MonitorGetMessage))
768 { 829 {
769 GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle, 830 GNUNET_break (0);
770 GNUNET_TIME_UNIT_FOREVER_REL); 831 break;
771 return;
772 } 832 }
773 GNUNET_break (0); 833 ret = process_monitor_get_message(handle,
774 do_disconnect (handle); 834 (const struct GNUNET_DHT_MonitorGetMessage *) msg);
775 return; 835 break;
836 case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP:
837 if (msize < sizeof (struct GNUNET_DHT_MonitorGetRespMessage))
838 {
839 GNUNET_break (0);
840 break;
841 }
842 ret = process_monitor_get_resp_message(handle,
843 (const struct GNUNET_DHT_MonitorGetRespMessage *) msg);
844 break;
845 case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT:
846 if (msize < sizeof (struct GNUNET_DHT_MonitorPutMessage))
847 {
848 GNUNET_break (0);
849 break;
850 }
851 ret = process_monitor_put_message(handle,
852 (const struct GNUNET_DHT_MonitorPutMessage *) msg);
853 break;
854 case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT_RESP:
855 /* Not implemented yet */
856 GNUNET_break(0);
857 break;
858 case GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT:
859 if (ntohs (msg->size) < sizeof (struct GNUNET_DHT_ClientResultMessage))
860 {
861 GNUNET_break (0);
862 break;
863 }
864 ret = GNUNET_OK;
865 dht_msg = (const struct GNUNET_DHT_ClientResultMessage *) msg;
866 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received reply for `%s' from DHT service %p\n",
867 GNUNET_h2s (&dht_msg->key), handle);
868 GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests,
869 &dht_msg->key, &process_reply,
870 (void *) dht_msg);
871 break;
872 case GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK:
873 if (ntohs (msg->size) != sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage))
874 {
875 GNUNET_break (0);
876 break;
877 }
878 ret = process_put_confirmation_message (handle,
879 (const struct GNUNET_DHT_ClientPutConfirmationMessage*) msg);
880 break;
881 default:
882 GNUNET_break(0);
883 break;
776 } 884 }
777 if (ntohs (msg->size) < sizeof (struct GNUNET_DHT_ClientResultMessage)) 885 if (GNUNET_OK != ret)
778 { 886 {
779 GNUNET_break (0); 887 GNUNET_break (0);
780 do_disconnect (handle); 888 do_disconnect (handle);
781 return; 889 return;
782 } 890 }
783 dht_msg = (const struct GNUNET_DHT_ClientResultMessage *) msg;
784 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received reply for `%s' from DHT service %p\n",
785 GNUNET_h2s (&dht_msg->key), handle);
786 GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests,
787 &dht_msg->key, &process_reply,
788 (void *) dht_msg);
789 GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle, 891 GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
790 GNUNET_TIME_UNIT_FOREVER_REL); 892 GNUNET_TIME_UNIT_FOREVER_REL);
791} 893}
@@ -829,11 +931,12 @@ void
829GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle) 931GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
830{ 932{
831 struct PendingMessage *pm; 933 struct PendingMessage *pm;
934 struct GNUNET_DHT_PutHandle *ph;
832 935
833 GNUNET_assert (handle != NULL); 936 GNUNET_assert (NULL != handle);
834 GNUNET_assert (0 == 937 GNUNET_assert (0 ==
835 GNUNET_CONTAINER_multihashmap_size (handle->active_requests)); 938 GNUNET_CONTAINER_multihashmap_size (handle->active_requests));
836 if (handle->th != NULL) 939 if (NULL != handle->th)
837 { 940 {
838 GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th); 941 GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
839 handle->th = NULL; 942 handle->th = NULL;
@@ -845,18 +948,24 @@ GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
845 pm); 948 pm);
846 pm->in_pending_queue = GNUNET_NO; 949 pm->in_pending_queue = GNUNET_NO;
847 GNUNET_assert (GNUNET_YES == pm->free_on_send); 950 GNUNET_assert (GNUNET_YES == pm->free_on_send);
848 if (GNUNET_SCHEDULER_NO_TASK != pm->timeout_task)
849 GNUNET_SCHEDULER_cancel (pm->timeout_task);
850 if (NULL != pm->cont) 951 if (NULL != pm->cont)
851 pm->cont (pm->cont_cls, NULL); 952 pm->cont (pm->cont_cls, NULL);
852 GNUNET_free (pm); 953 GNUNET_free (pm);
853 } 954 }
854 if (handle->client != NULL) 955 while (NULL != (ph = handle->put_head))
956 {
957 GNUNET_break (NULL == ph->pending);
958 if (NULL != ph->cont)
959 ph->cont (ph->cont_cls, GNUNET_SYSERR);
960 GNUNET_DHT_put_cancel (ph);
961 }
962
963 if (NULL != handle->client)
855 { 964 {
856 GNUNET_CLIENT_disconnect (handle->client); 965 GNUNET_CLIENT_disconnect (handle->client);
857 handle->client = NULL; 966 handle->client = NULL;
858 } 967 }
859 if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK) 968 if (GNUNET_SCHEDULER_NO_TASK != handle->reconnect_task)
860 GNUNET_SCHEDULER_cancel (handle->reconnect_task); 969 GNUNET_SCHEDULER_cancel (handle->reconnect_task);
861 GNUNET_CONTAINER_multihashmap_destroy (handle->active_requests); 970 GNUNET_CONTAINER_multihashmap_destroy (handle->active_requests);
862 GNUNET_free (handle); 971 GNUNET_free (handle);
@@ -872,17 +981,40 @@ GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
872static void 981static void
873timeout_put_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 982timeout_put_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
874{ 983{
875 struct PendingMessage *pending = cls; 984 struct GNUNET_DHT_PutHandle *ph = cls;
876 struct GNUNET_DHT_Handle *handle; 985 struct GNUNET_DHT_Handle *handle = ph->dht_handle;
877 986
878 handle = pending->handle; 987 ph->timeout_task = GNUNET_SCHEDULER_NO_TASK;
879 GNUNET_assert (GNUNET_YES == pending->in_pending_queue); 988 if (NULL != ph->pending)
880 GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail, 989 {
881 pending); 990 GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
882 pending->in_pending_queue = GNUNET_NO; 991 ph->pending);
883 if (pending->cont != NULL) 992 ph->pending->in_pending_queue = GNUNET_NO;
884 pending->cont (pending->cont_cls, tc); 993 GNUNET_free (ph->pending);
885 GNUNET_free (pending); 994 }
995 if (NULL != ph->cont)
996 ph->cont (ph->cont_cls, GNUNET_NO);
997 GNUNET_CONTAINER_DLL_remove (handle->put_head,
998 handle->put_tail,
999 ph);
1000 GNUNET_free (ph);
1001}
1002
1003
1004/**
1005 * Function called whenever the PUT message leaves the queue. Sets
1006 * the message pointer in the put handle to NULL.
1007 *
1008 * @param cls the 'struct GNUNET_DHT_PutHandle'
1009 * @param tc unused
1010 */
1011static void
1012mark_put_message_gone (void *cls,
1013 const struct GNUNET_SCHEDULER_TaskContext *tc)
1014{
1015 struct GNUNET_DHT_PutHandle *ph = cls;
1016
1017 ph->pending = NULL;
886} 1018}
887 1019
888 1020
@@ -907,49 +1039,94 @@ timeout_put_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
907 * You must not call GNUNET_DHT_DISCONNECT in this continuation 1039 * You must not call GNUNET_DHT_DISCONNECT in this continuation
908 * @param cont_cls closure for cont 1040 * @param cont_cls closure for cont
909 */ 1041 */
910void 1042struct GNUNET_DHT_PutHandle *
911GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const GNUNET_HashCode * key, 1043GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const GNUNET_HashCode * key,
912 uint32_t desired_replication_level, 1044 uint32_t desired_replication_level,
913 enum GNUNET_DHT_RouteOption options, 1045 enum GNUNET_DHT_RouteOption options,
914 enum GNUNET_BLOCK_Type type, size_t size, const char *data, 1046 enum GNUNET_BLOCK_Type type, size_t size, const char *data,
915 struct GNUNET_TIME_Absolute exp, 1047 struct GNUNET_TIME_Absolute exp,
916 struct GNUNET_TIME_Relative timeout, GNUNET_SCHEDULER_Task cont, 1048 struct GNUNET_TIME_Relative timeout, GNUNET_DHT_PutContinuation cont,
917 void *cont_cls) 1049 void *cont_cls)
918{ 1050{
919 struct GNUNET_DHT_ClientPutMessage *put_msg; 1051 struct GNUNET_DHT_ClientPutMessage *put_msg;
920 size_t msize; 1052 size_t msize;
921 struct PendingMessage *pending; 1053 struct PendingMessage *pending;
1054 struct GNUNET_DHT_PutHandle *ph;
922 1055
923 msize = sizeof (struct GNUNET_DHT_ClientPutMessage) + size; 1056 msize = sizeof (struct GNUNET_DHT_ClientPutMessage) + size;
924 if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) || 1057 if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
925 (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)) 1058 (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE))
926 { 1059 {
927 GNUNET_break (0); 1060 GNUNET_break (0);
928 if (NULL != cont) 1061 return NULL;
929 cont (cont_cls, NULL);
930 return;
931 } 1062 }
1063 ph = GNUNET_malloc (sizeof (struct GNUNET_DHT_PutHandle));
1064 ph->dht_handle = handle;
1065 ph->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_put_request, ph);
1066 ph->cont = cont;
1067 ph->cont_cls = cont_cls;
1068 ph->unique_id = ++handle->uid_gen;
932 pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize); 1069 pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
1070 ph->pending = pending;
933 put_msg = (struct GNUNET_DHT_ClientPutMessage *) &pending[1]; 1071 put_msg = (struct GNUNET_DHT_ClientPutMessage *) &pending[1];
934 pending->msg = &put_msg->header; 1072 pending->msg = &put_msg->header;
935 pending->handle = handle; 1073 pending->handle = handle;
936 pending->cont = cont; 1074 pending->cont = &mark_put_message_gone;
937 pending->cont_cls = cont_cls; 1075 pending->cont_cls = ph;
938 pending->free_on_send = GNUNET_YES; 1076 pending->free_on_send = GNUNET_YES;
939 pending->timeout_task =
940 GNUNET_SCHEDULER_add_delayed (timeout, &timeout_put_request, pending);
941 put_msg->header.size = htons (msize); 1077 put_msg->header.size = htons (msize);
942 put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT); 1078 put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT);
943 put_msg->type = htonl (type); 1079 put_msg->type = htonl (type);
944 put_msg->options = htonl ((uint32_t) options); 1080 put_msg->options = htonl ((uint32_t) options);
945 put_msg->desired_replication_level = htonl (desired_replication_level); 1081 put_msg->desired_replication_level = htonl (desired_replication_level);
1082 put_msg->unique_id = ph->unique_id;
946 put_msg->expiration = GNUNET_TIME_absolute_hton (exp); 1083 put_msg->expiration = GNUNET_TIME_absolute_hton (exp);
947 put_msg->key = *key; 1084 put_msg->key = *key;
948 memcpy (&put_msg[1], data, size); 1085 memcpy (&put_msg[1], data, size);
949 GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail, 1086 GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
950 pending); 1087 pending);
951 pending->in_pending_queue = GNUNET_YES; 1088 pending->in_pending_queue = GNUNET_YES;
1089 GNUNET_CONTAINER_DLL_insert_tail (handle->put_head,
1090 handle->put_tail,
1091 ph);
952 process_pending_messages (handle); 1092 process_pending_messages (handle);
1093 return ph;
1094}
1095
1096
1097/**
1098 * Cancels a DHT PUT operation. Note that the PUT request may still
1099 * go out over the network (we can't stop that); However, if the PUT
1100 * has not yet been sent to the service, cancelling the PUT will stop
1101 * this from happening (but there is no way for the user of this API
1102 * to tell if that is the case). The only use for this API is to
1103 * prevent a later call to 'cont' from "GNUNET_DHT_put" (i.e. because
1104 * the system is shutting down).
1105 *
1106 * @param ph put operation to cancel ('cont' will no longer be called)
1107 */
1108void
1109GNUNET_DHT_put_cancel (struct GNUNET_DHT_PutHandle *ph)
1110{
1111 struct GNUNET_DHT_Handle *handle = ph->dht_handle;
1112
1113 if (NULL != ph->pending)
1114 {
1115 GNUNET_CONTAINER_DLL_remove (handle->pending_head,
1116 handle->pending_tail,
1117 ph->pending);
1118 GNUNET_free (ph->pending);
1119 ph->pending = NULL;
1120 }
1121 if (ph->timeout_task != GNUNET_SCHEDULER_NO_TASK)
1122 {
1123 GNUNET_SCHEDULER_cancel (ph->timeout_task);
1124 ph->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1125 }
1126 GNUNET_CONTAINER_DLL_remove (handle->put_head,
1127 handle->put_tail,
1128 ph);
1129 GNUNET_free (ph);
953} 1130}
954 1131
955 1132
@@ -1004,8 +1181,7 @@ GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
1004 get_msg->desired_replication_level = htonl (desired_replication_level); 1181 get_msg->desired_replication_level = htonl (desired_replication_level);
1005 get_msg->type = htonl (type); 1182 get_msg->type = htonl (type);
1006 get_msg->key = *key; 1183 get_msg->key = *key;
1007 handle->uid_gen++; 1184 get_msg->unique_id = ++handle->uid_gen;
1008 get_msg->unique_id = handle->uid_gen;
1009 memcpy (&get_msg[1], xquery, xquery_size); 1185 memcpy (&get_msg[1], xquery, xquery_size);
1010 GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail, 1186 GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
1011 pending); 1187 pending);