diff options
author | Christian Grothoff <christian@grothoff.org> | 2012-04-26 16:56:00 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2012-04-26 16:56:00 +0000 |
commit | 4757258de33285fed2aa318c374dcdbf586c29f0 (patch) | |
tree | 283fa337cdaa54fbb3bbf08c937fdb6467c2b5d1 /src/dht/dht_api.c | |
parent | 506d42b2ba6eb104e64fd0c8889ea7233a9b96b3 (diff) | |
download | gnunet-4757258de33285fed2aa318c374dcdbf586c29f0.tar.gz gnunet-4757258de33285fed2aa318c374dcdbf586c29f0.zip |
-fixing #2277
Diffstat (limited to 'src/dht/dht_api.c')
-rw-r--r-- | src/dht/dht_api.c | 480 |
1 files changed, 328 insertions, 152 deletions
diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c index 9e95155a9..37b65b022 100644 --- a/src/dht/dht_api.c +++ b/src/dht/dht_api.c | |||
@@ -1,6 +1,6 @@ | |||
1 | /* | 1 | /* |
2 | This file is part of GNUnet. | 2 | This file is part of GNUnet. |
3 | (C) 2009, 2010 Christian Grothoff (and other contributing authors) | 3 | (C) 2009, 2010, 2011, 2012 Christian Grothoff (and other contributing authors) |
4 | 4 | ||
5 | GNUnet is free software; you can redistribute it and/or modify | 5 | GNUnet is free software; you can redistribute it and/or modify |
6 | it under the terms of the GNU General Public License as published | 6 | it under the terms of the GNU General Public License as published |
@@ -74,11 +74,6 @@ struct PendingMessage | |||
74 | void *cont_cls; | 74 | void *cont_cls; |
75 | 75 | ||
76 | /** | 76 | /** |
77 | * Timeout task for this message | ||
78 | */ | ||
79 | GNUNET_SCHEDULER_TaskIdentifier timeout_task; | ||
80 | |||
81 | /** | ||
82 | * Unique ID for this request | 77 | * Unique ID for this request |
83 | */ | 78 | */ |
84 | uint64_t unique_id; | 79 | uint64_t unique_id; |
@@ -100,6 +95,56 @@ struct PendingMessage | |||
100 | 95 | ||
101 | 96 | ||
102 | /** | 97 | /** |
98 | * Handle to a PUT request. | ||
99 | */ | ||
100 | struct GNUNET_DHT_PutHandle | ||
101 | { | ||
102 | /** | ||
103 | * Kept in a DLL. | ||
104 | */ | ||
105 | struct GNUNET_DHT_PutHandle *next; | ||
106 | |||
107 | /** | ||
108 | * Kept in a DLL. | ||
109 | */ | ||
110 | struct GNUNET_DHT_PutHandle *prev; | ||
111 | |||
112 | /** | ||
113 | * Continuation to call when done. | ||
114 | */ | ||
115 | GNUNET_DHT_PutContinuation cont; | ||
116 | |||
117 | /** | ||
118 | * Pending message associated with this PUT operation, | ||
119 | * NULL after the message has been transmitted to the service. | ||
120 | */ | ||
121 | struct PendingMessage *pending; | ||
122 | |||
123 | /** | ||
124 | * Main handle to this DHT api | ||
125 | */ | ||
126 | struct GNUNET_DHT_Handle *dht_handle; | ||
127 | |||
128 | /** | ||
129 | * Closure for 'cont'. | ||
130 | */ | ||
131 | void *cont_cls; | ||
132 | |||
133 | /** | ||
134 | * Timeout task for this operation. | ||
135 | */ | ||
136 | GNUNET_SCHEDULER_TaskIdentifier timeout_task; | ||
137 | |||
138 | /** | ||
139 | * Unique ID for the PUT operation. | ||
140 | */ | ||
141 | uint64_t unique_id; | ||
142 | |||
143 | }; | ||
144 | |||
145 | |||
146 | |||
147 | /** | ||
103 | * Handle to a GET request | 148 | * Handle to a GET request |
104 | */ | 149 | */ |
105 | struct GNUNET_DHT_GetHandle | 150 | struct GNUNET_DHT_GetHandle |
@@ -235,8 +280,18 @@ struct GNUNET_DHT_Handle | |||
235 | struct GNUNET_DHT_MonitorHandle *monitor_tail; | 280 | struct GNUNET_DHT_MonitorHandle *monitor_tail; |
236 | 281 | ||
237 | /** | 282 | /** |
238 | * Hash map containing the current outstanding unique requests | 283 | * Head of active PUT requests. |
239 | * (values are of type 'struct GNUNET_DHT_RouteHandle'). | 284 | */ |
285 | struct GNUNET_DHT_PutHandle *put_head; | ||
286 | |||
287 | /** | ||
288 | * Tail of active PUT requests. | ||
289 | */ | ||
290 | struct GNUNET_DHT_PutHandle *put_tail; | ||
291 | |||
292 | /** | ||
293 | * Hash map containing the current outstanding unique GET requests | ||
294 | * (values are of type 'struct GNUNET_DHT_GetHandle'). | ||
240 | */ | 295 | */ |
241 | struct GNUNET_CONTAINER_MultiHashMap *active_requests; | 296 | struct GNUNET_CONTAINER_MultiHashMap *active_requests; |
242 | 297 | ||
@@ -267,6 +322,8 @@ struct GNUNET_DHT_Handle | |||
267 | * Handler for messages received from the DHT service | 322 | * Handler for messages received from the DHT service |
268 | * a demultiplexer which handles numerous message types | 323 | * a demultiplexer which handles numerous message types |
269 | * | 324 | * |
325 | * @param cls the 'struct GNUNET_DHT_Handle' | ||
326 | * @param msg the incoming message | ||
270 | */ | 327 | */ |
271 | static void | 328 | static void |
272 | service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg); | 329 | service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg); |
@@ -275,16 +332,17 @@ service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg); | |||
275 | /** | 332 | /** |
276 | * Try to (re)connect to the DHT service. | 333 | * Try to (re)connect to the DHT service. |
277 | * | 334 | * |
335 | * @param handle DHT handle to reconnect | ||
278 | * @return GNUNET_YES on success, GNUNET_NO on failure. | 336 | * @return GNUNET_YES on success, GNUNET_NO on failure. |
279 | */ | 337 | */ |
280 | static int | 338 | static int |
281 | try_connect (struct GNUNET_DHT_Handle *handle) | 339 | try_connect (struct GNUNET_DHT_Handle *handle) |
282 | { | 340 | { |
283 | if (handle->client != NULL) | 341 | if (NULL != handle->client) |
284 | return GNUNET_OK; | 342 | return GNUNET_OK; |
285 | handle->in_receive = GNUNET_NO; | 343 | handle->in_receive = GNUNET_NO; |
286 | handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg); | 344 | handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg); |
287 | if (handle->client == NULL) | 345 | if (NULL == handle->client) |
288 | { | 346 | { |
289 | LOG (GNUNET_ERROR_TYPE_WARNING, | 347 | LOG (GNUNET_ERROR_TYPE_WARNING, |
290 | _("Failed to connect to the DHT service!\n")); | 348 | _("Failed to connect to the DHT service!\n")); |
@@ -324,6 +382,7 @@ add_request_to_pending (void *cls, const GNUNET_HashCode * key, void *value) | |||
324 | 382 | ||
325 | /** | 383 | /** |
326 | * Try to send messages from list of messages to send | 384 | * Try to send messages from list of messages to send |
385 | * | ||
327 | * @param handle DHT_Handle | 386 | * @param handle DHT_Handle |
328 | */ | 387 | */ |
329 | static void | 388 | static void |
@@ -369,9 +428,12 @@ try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
369 | static void | 428 | static void |
370 | do_disconnect (struct GNUNET_DHT_Handle *handle) | 429 | do_disconnect (struct GNUNET_DHT_Handle *handle) |
371 | { | 430 | { |
372 | if (handle->client == NULL) | 431 | struct GNUNET_DHT_PutHandle *ph; |
432 | struct GNUNET_DHT_PutHandle *next; | ||
433 | |||
434 | if (NULL == handle->client) | ||
373 | return; | 435 | return; |
374 | GNUNET_assert (handle->reconnect_task == GNUNET_SCHEDULER_NO_TASK); | 436 | GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == handle->reconnect_task); |
375 | if (NULL != handle->th) | 437 | if (NULL != handle->th) |
376 | GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th); | 438 | GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th); |
377 | handle->th = NULL; | 439 | handle->th = NULL; |
@@ -380,6 +442,20 @@ do_disconnect (struct GNUNET_DHT_Handle *handle) | |||
380 | (unsigned long long) handle->retry_time.rel_value); | 442 | (unsigned long long) handle->retry_time.rel_value); |
381 | GNUNET_CLIENT_disconnect (handle->client); | 443 | GNUNET_CLIENT_disconnect (handle->client); |
382 | handle->client = NULL; | 444 | handle->client = NULL; |
445 | |||
446 | /* signal disconnect to all PUT requests that were transmitted but waiting | ||
447 | for the put confirmation */ | ||
448 | next = handle->put_head; | ||
449 | while (NULL != (ph = next)) | ||
450 | { | ||
451 | next = ph->next; | ||
452 | if (NULL == ph->pending) | ||
453 | { | ||
454 | if (NULL != ph->cont) | ||
455 | ph->cont (ph->cont_cls, GNUNET_SYSERR); | ||
456 | GNUNET_DHT_put_cancel (ph); | ||
457 | } | ||
458 | } | ||
383 | handle->reconnect_task = | 459 | handle->reconnect_task = |
384 | GNUNET_SCHEDULER_add_delayed (handle->retry_time, &try_reconnect, handle); | 460 | GNUNET_SCHEDULER_add_delayed (handle->retry_time, &try_reconnect, handle); |
385 | } | 461 | } |
@@ -387,6 +463,11 @@ do_disconnect (struct GNUNET_DHT_Handle *handle) | |||
387 | 463 | ||
388 | /** | 464 | /** |
389 | * Transmit the next pending message, called by notify_transmit_ready | 465 | * Transmit the next pending message, called by notify_transmit_ready |
466 | * | ||
467 | * @param cls the DHT handle | ||
468 | * @param size number of bytes available in 'buf' for transmission | ||
469 | * @param buf where to copy messages for the service | ||
470 | * @return number of bytes written to 'buf' | ||
390 | */ | 471 | */ |
391 | static size_t | 472 | static size_t |
392 | transmit_pending (void *cls, size_t size, void *buf); | 473 | transmit_pending (void *cls, size_t size, void *buf); |
@@ -394,20 +475,22 @@ transmit_pending (void *cls, size_t size, void *buf); | |||
394 | 475 | ||
395 | /** | 476 | /** |
396 | * Try to send messages from list of messages to send | 477 | * Try to send messages from list of messages to send |
478 | * | ||
479 | * @param handle handle to DHT | ||
397 | */ | 480 | */ |
398 | static void | 481 | static void |
399 | process_pending_messages (struct GNUNET_DHT_Handle *handle) | 482 | process_pending_messages (struct GNUNET_DHT_Handle *handle) |
400 | { | 483 | { |
401 | struct PendingMessage *head; | 484 | struct PendingMessage *head; |
402 | 485 | ||
403 | if (handle->client == NULL) | 486 | if (NULL == handle->client) |
404 | { | 487 | { |
405 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 488 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
406 | "process_pending_messages called, but client is null, reconnecting\n"); | 489 | "process_pending_messages called, but client is NULL, reconnecting\n"); |
407 | do_disconnect (handle); | 490 | do_disconnect (handle); |
408 | return; | 491 | return; |
409 | } | 492 | } |
410 | if (handle->th != NULL) | 493 | if (NULL != handle->th) |
411 | return; | 494 | return; |
412 | if (NULL == (head = handle->pending_head)) | 495 | if (NULL == (head = handle->pending_head)) |
413 | return; | 496 | return; |
@@ -427,6 +510,11 @@ process_pending_messages (struct GNUNET_DHT_Handle *handle) | |||
427 | 510 | ||
428 | /** | 511 | /** |
429 | * Transmit the next pending message, called by notify_transmit_ready | 512 | * Transmit the next pending message, called by notify_transmit_ready |
513 | * | ||
514 | * @param cls the DHT handle | ||
515 | * @param size number of bytes available in 'buf' for transmission | ||
516 | * @param buf where to copy messages for the service | ||
517 | * @return number of bytes written to 'buf' | ||
430 | */ | 518 | */ |
431 | static size_t | 519 | static size_t |
432 | transmit_pending (void *cls, size_t size, void *buf) | 520 | transmit_pending (void *cls, size_t size, void *buf) |
@@ -436,7 +524,7 @@ transmit_pending (void *cls, size_t size, void *buf) | |||
436 | size_t tsize; | 524 | size_t tsize; |
437 | 525 | ||
438 | handle->th = NULL; | 526 | handle->th = NULL; |
439 | if (buf == NULL) | 527 | if (NULL == buf) |
440 | { | 528 | { |
441 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 529 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
442 | "Transmission to DHT service failed! Reconnecting!\n"); | 530 | "Transmission to DHT service failed! Reconnecting!\n"); |
@@ -456,11 +544,6 @@ transmit_pending (void *cls, size_t size, void *buf) | |||
456 | GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail, | 544 | GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail, |
457 | head); | 545 | head); |
458 | head->in_pending_queue = GNUNET_NO; | 546 | head->in_pending_queue = GNUNET_NO; |
459 | if (head->timeout_task != GNUNET_SCHEDULER_NO_TASK) | ||
460 | { | ||
461 | GNUNET_SCHEDULER_cancel (head->timeout_task); | ||
462 | head->timeout_task = GNUNET_SCHEDULER_NO_TASK; | ||
463 | } | ||
464 | if (NULL != head->cont) | 547 | if (NULL != head->cont) |
465 | { | 548 | { |
466 | head->cont (head->cont_cls, NULL); | 549 | head->cont (head->cont_cls, NULL); |
@@ -557,33 +640,24 @@ process_monitor_get_message (struct GNUNET_DHT_Handle *handle, | |||
557 | const struct GNUNET_DHT_MonitorGetMessage *msg) | 640 | const struct GNUNET_DHT_MonitorGetMessage *msg) |
558 | { | 641 | { |
559 | struct GNUNET_DHT_MonitorHandle *h; | 642 | struct GNUNET_DHT_MonitorHandle *h; |
560 | size_t msize; | ||
561 | 643 | ||
562 | msize = ntohs (msg->header.size); | 644 | for (h = handle->monitor_head; NULL != h; h = h->next) |
563 | if (msize < sizeof (struct GNUNET_DHT_MonitorGetMessage)) | ||
564 | return GNUNET_SYSERR; | ||
565 | |||
566 | h = handle->monitor_head; | ||
567 | while (NULL != h) | ||
568 | { | 645 | { |
569 | int type_ok; | 646 | int type_ok; |
570 | int key_ok; | 647 | int key_ok; |
571 | 648 | ||
572 | type_ok = GNUNET_BLOCK_TYPE_ANY == h->type || h->type == ntohl(msg->type); | 649 | type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type)); |
573 | key_ok = NULL == h->key || memcmp (h->key, &msg->key, | 650 | key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key, |
574 | sizeof (GNUNET_HashCode)) == 0; | 651 | sizeof (GNUNET_HashCode))); |
575 | if (type_ok && key_ok && NULL != h->get_cb) | 652 | if (type_ok && key_ok && (NULL != h->get_cb)) |
576 | { | ||
577 | h->get_cb (h->cb_cls, | 653 | h->get_cb (h->cb_cls, |
578 | ntohl (msg->options), | 654 | ntohl (msg->options), |
579 | (enum GNUNET_BLOCK_Type) ntohl(msg->type), | 655 | (enum GNUNET_BLOCK_Type) ntohl(msg->type), |
580 | ntohl (msg->hop_count), | 656 | ntohl (msg->hop_count), |
581 | ntohl (msg->desired_replication_level), | 657 | ntohl (msg->desired_replication_level), |
582 | ntohl (msg->get_path_length), | 658 | ntohl (msg->get_path_length), |
583 | (struct GNUNET_PeerIdentity *) &msg[1], | 659 | (struct GNUNET_PeerIdentity *) &msg[1], |
584 | &msg->key); | 660 | &msg->key); |
585 | } | ||
586 | h = h->next; | ||
587 | } | 661 | } |
588 | return GNUNET_OK; | 662 | return GNUNET_OK; |
589 | } | 663 | } |
@@ -593,8 +667,7 @@ process_monitor_get_message (struct GNUNET_DHT_Handle *handle, | |||
593 | * Process a get response monitor message from the service. | 667 | * Process a get response monitor message from the service. |
594 | * | 668 | * |
595 | * @param handle The DHT handle. | 669 | * @param handle The DHT handle. |
596 | * @param msg Monitor get response message from the service. | 670 | * @param msg monitor get response message from the service |
597 | * | ||
598 | * @return GNUNET_OK if everything went fine, | 671 | * @return GNUNET_OK if everything went fine, |
599 | * GNUNET_SYSERR if the message is malformed. | 672 | * GNUNET_SYSERR if the message is malformed. |
600 | */ | 673 | */ |
@@ -604,30 +677,30 @@ process_monitor_get_resp_message (struct GNUNET_DHT_Handle *handle, | |||
604 | *msg) | 677 | *msg) |
605 | { | 678 | { |
606 | struct GNUNET_DHT_MonitorHandle *h; | 679 | struct GNUNET_DHT_MonitorHandle *h; |
680 | struct GNUNET_PeerIdentity *path; | ||
681 | uint32_t getl; | ||
682 | uint32_t putl; | ||
607 | size_t msize; | 683 | size_t msize; |
608 | 684 | ||
609 | msize = ntohs (msg->header.size); | 685 | msize = ntohs (msg->header.size); |
610 | if (msize < sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) | 686 | path = (struct GNUNET_PeerIdentity *) &msg[1]; |
687 | getl = ntohl (msg->get_path_length); | ||
688 | putl = ntohl (msg->put_path_length); | ||
689 | if ( (getl + putl < getl) || | ||
690 | ( ((msize - sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) / sizeof (struct GNUNET_PeerIdentity)) < getl + putl) ) | ||
691 | { | ||
692 | GNUNET_break (0); | ||
611 | return GNUNET_SYSERR; | 693 | return GNUNET_SYSERR; |
612 | 694 | } | |
613 | h = handle->monitor_head; | 695 | for (h = handle->monitor_head; NULL != h; h = h->next) |
614 | while (NULL != h) | ||
615 | { | 696 | { |
616 | int type_ok; | 697 | int type_ok; |
617 | int key_ok; | 698 | int key_ok; |
618 | 699 | ||
619 | type_ok = GNUNET_BLOCK_TYPE_ANY == h->type || h->type == ntohl(msg->type); | 700 | type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type)); |
620 | key_ok = NULL == h->key || memcmp (h->key, &msg->key, | 701 | key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key, |
621 | sizeof (GNUNET_HashCode)) == 0; | 702 | sizeof (GNUNET_HashCode))); |
622 | if (type_ok && key_ok && NULL != h->get_resp_cb) | 703 | if (type_ok && key_ok && (NULL != h->get_resp_cb)) |
623 | { | ||
624 | struct GNUNET_PeerIdentity *path; | ||
625 | uint32_t getl; | ||
626 | uint32_t putl; | ||
627 | |||
628 | path = (struct GNUNET_PeerIdentity *) &msg[1]; | ||
629 | getl = ntohl (msg->get_path_length); | ||
630 | putl = ntohl (msg->put_path_length); | ||
631 | h->get_resp_cb (h->cb_cls, | 704 | h->get_resp_cb (h->cb_cls, |
632 | (enum GNUNET_BLOCK_Type) ntohl(msg->type), | 705 | (enum GNUNET_BLOCK_Type) ntohl(msg->type), |
633 | path, getl, | 706 | path, getl, |
@@ -638,8 +711,6 @@ process_monitor_get_resp_message (struct GNUNET_DHT_Handle *handle, | |||
638 | msize - | 711 | msize - |
639 | sizeof (struct GNUNET_DHT_MonitorGetRespMessage) - | 712 | sizeof (struct GNUNET_DHT_MonitorGetRespMessage) - |
640 | sizeof (struct GNUNET_PeerIdentity) * (putl + getl)); | 713 | sizeof (struct GNUNET_PeerIdentity) * (putl + getl)); |
641 | } | ||
642 | h = h->next; | ||
643 | } | 714 | } |
644 | return GNUNET_OK; | 715 | return GNUNET_OK; |
645 | } | 716 | } |
@@ -660,27 +731,26 @@ process_monitor_put_message (struct GNUNET_DHT_Handle *handle, | |||
660 | { | 731 | { |
661 | struct GNUNET_DHT_MonitorHandle *h; | 732 | struct GNUNET_DHT_MonitorHandle *h; |
662 | size_t msize; | 733 | size_t msize; |
734 | struct GNUNET_PeerIdentity *path; | ||
735 | uint32_t putl; | ||
663 | 736 | ||
664 | msize = ntohs (msg->header.size); | 737 | msize = ntohs (msg->header.size); |
665 | if (msize < sizeof (struct GNUNET_DHT_MonitorPutMessage)) | 738 | path = (struct GNUNET_PeerIdentity *) &msg[1]; |
739 | putl = ntohl (msg->put_path_length); | ||
740 | if (((msize - sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) / sizeof (struct GNUNET_PeerIdentity)) < putl) | ||
741 | { | ||
742 | GNUNET_break (0); | ||
666 | return GNUNET_SYSERR; | 743 | return GNUNET_SYSERR; |
667 | 744 | } | |
668 | h = handle->monitor_head; | 745 | for (h = handle->monitor_head; NULL != h; h = h->next) |
669 | while (NULL != h) | ||
670 | { | 746 | { |
671 | int type_ok; | 747 | int type_ok; |
672 | int key_ok; | 748 | int key_ok; |
673 | 749 | ||
674 | type_ok = GNUNET_BLOCK_TYPE_ANY == h->type || h->type == ntohl(msg->type); | 750 | type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type)); |
675 | key_ok = NULL == h->key || memcmp (h->key, &msg->key, | 751 | key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key, |
676 | sizeof (GNUNET_HashCode)) == 0; | 752 | sizeof (GNUNET_HashCode))); |
677 | if (type_ok && key_ok && NULL != h->put_cb) | 753 | if (type_ok && key_ok && (NULL != h->put_cb)) |
678 | { | ||
679 | struct GNUNET_PeerIdentity *path; | ||
680 | uint32_t putl; | ||
681 | |||
682 | path = (struct GNUNET_PeerIdentity *) &msg[1]; | ||
683 | putl = ntohl (msg->put_path_length); | ||
684 | h->put_cb (h->cb_cls, | 754 | h->put_cb (h->cb_cls, |
685 | ntohl (msg->options), | 755 | ntohl (msg->options), |
686 | (enum GNUNET_BLOCK_Type) ntohl(msg->type), | 756 | (enum GNUNET_BLOCK_Type) ntohl(msg->type), |
@@ -693,55 +763,41 @@ process_monitor_put_message (struct GNUNET_DHT_Handle *handle, | |||
693 | msize - | 763 | msize - |
694 | sizeof (struct GNUNET_DHT_MonitorPutMessage) - | 764 | sizeof (struct GNUNET_DHT_MonitorPutMessage) - |
695 | sizeof (struct GNUNET_PeerIdentity) * putl); | 765 | sizeof (struct GNUNET_PeerIdentity) * putl); |
696 | } | ||
697 | h = h->next; | ||
698 | } | 766 | } |
699 | return GNUNET_OK; | 767 | return GNUNET_OK; |
700 | } | 768 | } |
701 | 769 | ||
702 | 770 | ||
703 | /** | 771 | /** |
704 | * Process a monitoring message from the service: demultiplex for proper type. | 772 | * Process a put confirmation message from the service. |
705 | * | 773 | * |
706 | * @param handle The DHT handle. | 774 | * @param handle The DHT handle. |
707 | * @param msg Message from the service. | 775 | * @param msg confirmation message from the service. |
708 | * | ||
709 | * @return GNUNET_OK if everything went fine, | 776 | * @return GNUNET_OK if everything went fine, |
710 | * GNUNET_SYSERR if the message is malformed. | 777 | * GNUNET_SYSERR if the message is malformed. |
711 | */ | 778 | */ |
712 | static int | 779 | static int |
713 | process_monitor_message (struct GNUNET_DHT_Handle *handle, | 780 | process_put_confirmation_message (struct GNUNET_DHT_Handle *handle, |
714 | const struct GNUNET_MessageHeader *msg) | 781 | const struct GNUNET_DHT_ClientPutConfirmationMessage *msg) |
715 | { | 782 | { |
716 | switch (ntohs (msg->type)) | 783 | struct GNUNET_DHT_PutHandle *ph; |
717 | { | 784 | GNUNET_DHT_PutContinuation cont; |
718 | case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET: | 785 | void *cont_cls; |
719 | return process_monitor_get_message(handle, | ||
720 | (struct GNUNET_DHT_MonitorGetMessage *) | ||
721 | msg); | ||
722 | 786 | ||
723 | case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP: | 787 | for (ph = handle->put_head; NULL != ph; ph = ph->next) |
724 | { | 788 | if (ph->unique_id == msg->unique_id) |
725 | return process_monitor_get_resp_message( | 789 | break; |
726 | handle, | 790 | if (NULL == ph) |
727 | (struct GNUNET_DHT_MonitorGetRespMessage *) msg); | 791 | return GNUNET_OK; |
728 | } | 792 | cont = ph->cont; |
729 | case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT: | 793 | cont_cls = ph->cont_cls; |
730 | { | 794 | GNUNET_DHT_put_cancel (ph); |
731 | return process_monitor_put_message(handle, | 795 | if (NULL != cont) |
732 | (struct GNUNET_DHT_MonitorPutMessage *) | 796 | cont (cont_cls, GNUNET_OK); |
733 | msg); | 797 | return GNUNET_OK; |
734 | } | ||
735 | case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT_RESP: | ||
736 | /* Not implemented yet */ | ||
737 | GNUNET_break(0); | ||
738 | /* Fall through */ | ||
739 | default: | ||
740 | GNUNET_break(0); | ||
741 | return GNUNET_SYSERR; | ||
742 | } | ||
743 | } | 798 | } |
744 | 799 | ||
800 | |||
745 | /** | 801 | /** |
746 | * Handler for messages received from the DHT service | 802 | * Handler for messages received from the DHT service |
747 | * a demultiplexer which handles numerous message types | 803 | * a demultiplexer which handles numerous message types |
@@ -754,38 +810,84 @@ service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg) | |||
754 | { | 810 | { |
755 | struct GNUNET_DHT_Handle *handle = cls; | 811 | struct GNUNET_DHT_Handle *handle = cls; |
756 | const struct GNUNET_DHT_ClientResultMessage *dht_msg; | 812 | const struct GNUNET_DHT_ClientResultMessage *dht_msg; |
813 | uint16_t msize; | ||
814 | int ret; | ||
757 | 815 | ||
758 | if (msg == NULL) | 816 | if (NULL == msg) |
759 | { | 817 | { |
760 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 818 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
761 | "Error receiving data from DHT service, reconnecting\n"); | 819 | "Error receiving data from DHT service, reconnecting\n"); |
762 | do_disconnect (handle); | 820 | do_disconnect (handle); |
763 | return; | 821 | return; |
764 | } | 822 | } |
765 | if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT) | 823 | ret = GNUNET_SYSERR; |
824 | msize = ntohs (msg->size); | ||
825 | switch (ntohs (msg->type)) | ||
766 | { | 826 | { |
767 | if (process_monitor_message (handle, msg) == GNUNET_OK) | 827 | case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET: |
828 | if (msize < sizeof (struct GNUNET_DHT_MonitorGetMessage)) | ||
768 | { | 829 | { |
769 | GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle, | 830 | GNUNET_break (0); |
770 | GNUNET_TIME_UNIT_FOREVER_REL); | 831 | break; |
771 | return; | ||
772 | } | 832 | } |
773 | GNUNET_break (0); | 833 | ret = process_monitor_get_message(handle, |
774 | do_disconnect (handle); | 834 | (const struct GNUNET_DHT_MonitorGetMessage *) msg); |
775 | return; | 835 | break; |
836 | case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP: | ||
837 | if (msize < sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) | ||
838 | { | ||
839 | GNUNET_break (0); | ||
840 | break; | ||
841 | } | ||
842 | ret = process_monitor_get_resp_message(handle, | ||
843 | (const struct GNUNET_DHT_MonitorGetRespMessage *) msg); | ||
844 | break; | ||
845 | case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT: | ||
846 | if (msize < sizeof (struct GNUNET_DHT_MonitorPutMessage)) | ||
847 | { | ||
848 | GNUNET_break (0); | ||
849 | break; | ||
850 | } | ||
851 | ret = process_monitor_put_message(handle, | ||
852 | (const struct GNUNET_DHT_MonitorPutMessage *) msg); | ||
853 | break; | ||
854 | case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT_RESP: | ||
855 | /* Not implemented yet */ | ||
856 | GNUNET_break(0); | ||
857 | break; | ||
858 | case GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT: | ||
859 | if (ntohs (msg->size) < sizeof (struct GNUNET_DHT_ClientResultMessage)) | ||
860 | { | ||
861 | GNUNET_break (0); | ||
862 | break; | ||
863 | } | ||
864 | ret = GNUNET_OK; | ||
865 | dht_msg = (const struct GNUNET_DHT_ClientResultMessage *) msg; | ||
866 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Received reply for `%s' from DHT service %p\n", | ||
867 | GNUNET_h2s (&dht_msg->key), handle); | ||
868 | GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests, | ||
869 | &dht_msg->key, &process_reply, | ||
870 | (void *) dht_msg); | ||
871 | break; | ||
872 | case GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK: | ||
873 | if (ntohs (msg->size) != sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage)) | ||
874 | { | ||
875 | GNUNET_break (0); | ||
876 | break; | ||
877 | } | ||
878 | ret = process_put_confirmation_message (handle, | ||
879 | (const struct GNUNET_DHT_ClientPutConfirmationMessage*) msg); | ||
880 | break; | ||
881 | default: | ||
882 | GNUNET_break(0); | ||
883 | break; | ||
776 | } | 884 | } |
777 | if (ntohs (msg->size) < sizeof (struct GNUNET_DHT_ClientResultMessage)) | 885 | if (GNUNET_OK != ret) |
778 | { | 886 | { |
779 | GNUNET_break (0); | 887 | GNUNET_break (0); |
780 | do_disconnect (handle); | 888 | do_disconnect (handle); |
781 | return; | 889 | return; |
782 | } | 890 | } |
783 | dht_msg = (const struct GNUNET_DHT_ClientResultMessage *) msg; | ||
784 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Received reply for `%s' from DHT service %p\n", | ||
785 | GNUNET_h2s (&dht_msg->key), handle); | ||
786 | GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests, | ||
787 | &dht_msg->key, &process_reply, | ||
788 | (void *) dht_msg); | ||
789 | GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle, | 891 | GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle, |
790 | GNUNET_TIME_UNIT_FOREVER_REL); | 892 | GNUNET_TIME_UNIT_FOREVER_REL); |
791 | } | 893 | } |
@@ -829,11 +931,12 @@ void | |||
829 | GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle) | 931 | GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle) |
830 | { | 932 | { |
831 | struct PendingMessage *pm; | 933 | struct PendingMessage *pm; |
934 | struct GNUNET_DHT_PutHandle *ph; | ||
832 | 935 | ||
833 | GNUNET_assert (handle != NULL); | 936 | GNUNET_assert (NULL != handle); |
834 | GNUNET_assert (0 == | 937 | GNUNET_assert (0 == |
835 | GNUNET_CONTAINER_multihashmap_size (handle->active_requests)); | 938 | GNUNET_CONTAINER_multihashmap_size (handle->active_requests)); |
836 | if (handle->th != NULL) | 939 | if (NULL != handle->th) |
837 | { | 940 | { |
838 | GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th); | 941 | GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th); |
839 | handle->th = NULL; | 942 | handle->th = NULL; |
@@ -845,18 +948,24 @@ GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle) | |||
845 | pm); | 948 | pm); |
846 | pm->in_pending_queue = GNUNET_NO; | 949 | pm->in_pending_queue = GNUNET_NO; |
847 | GNUNET_assert (GNUNET_YES == pm->free_on_send); | 950 | GNUNET_assert (GNUNET_YES == pm->free_on_send); |
848 | if (GNUNET_SCHEDULER_NO_TASK != pm->timeout_task) | ||
849 | GNUNET_SCHEDULER_cancel (pm->timeout_task); | ||
850 | if (NULL != pm->cont) | 951 | if (NULL != pm->cont) |
851 | pm->cont (pm->cont_cls, NULL); | 952 | pm->cont (pm->cont_cls, NULL); |
852 | GNUNET_free (pm); | 953 | GNUNET_free (pm); |
853 | } | 954 | } |
854 | if (handle->client != NULL) | 955 | while (NULL != (ph = handle->put_head)) |
956 | { | ||
957 | GNUNET_break (NULL == ph->pending); | ||
958 | if (NULL != ph->cont) | ||
959 | ph->cont (ph->cont_cls, GNUNET_SYSERR); | ||
960 | GNUNET_DHT_put_cancel (ph); | ||
961 | } | ||
962 | |||
963 | if (NULL != handle->client) | ||
855 | { | 964 | { |
856 | GNUNET_CLIENT_disconnect (handle->client); | 965 | GNUNET_CLIENT_disconnect (handle->client); |
857 | handle->client = NULL; | 966 | handle->client = NULL; |
858 | } | 967 | } |
859 | if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK) | 968 | if (GNUNET_SCHEDULER_NO_TASK != handle->reconnect_task) |
860 | GNUNET_SCHEDULER_cancel (handle->reconnect_task); | 969 | GNUNET_SCHEDULER_cancel (handle->reconnect_task); |
861 | GNUNET_CONTAINER_multihashmap_destroy (handle->active_requests); | 970 | GNUNET_CONTAINER_multihashmap_destroy (handle->active_requests); |
862 | GNUNET_free (handle); | 971 | GNUNET_free (handle); |
@@ -872,17 +981,40 @@ GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle) | |||
872 | static void | 981 | static void |
873 | timeout_put_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | 982 | timeout_put_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) |
874 | { | 983 | { |
875 | struct PendingMessage *pending = cls; | 984 | struct GNUNET_DHT_PutHandle *ph = cls; |
876 | struct GNUNET_DHT_Handle *handle; | 985 | struct GNUNET_DHT_Handle *handle = ph->dht_handle; |
877 | 986 | ||
878 | handle = pending->handle; | 987 | ph->timeout_task = GNUNET_SCHEDULER_NO_TASK; |
879 | GNUNET_assert (GNUNET_YES == pending->in_pending_queue); | 988 | if (NULL != ph->pending) |
880 | GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail, | 989 | { |
881 | pending); | 990 | GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail, |
882 | pending->in_pending_queue = GNUNET_NO; | 991 | ph->pending); |
883 | if (pending->cont != NULL) | 992 | ph->pending->in_pending_queue = GNUNET_NO; |
884 | pending->cont (pending->cont_cls, tc); | 993 | GNUNET_free (ph->pending); |
885 | GNUNET_free (pending); | 994 | } |
995 | if (NULL != ph->cont) | ||
996 | ph->cont (ph->cont_cls, GNUNET_NO); | ||
997 | GNUNET_CONTAINER_DLL_remove (handle->put_head, | ||
998 | handle->put_tail, | ||
999 | ph); | ||
1000 | GNUNET_free (ph); | ||
1001 | } | ||
1002 | |||
1003 | |||
1004 | /** | ||
1005 | * Function called whenever the PUT message leaves the queue. Sets | ||
1006 | * the message pointer in the put handle to NULL. | ||
1007 | * | ||
1008 | * @param cls the 'struct GNUNET_DHT_PutHandle' | ||
1009 | * @param tc unused | ||
1010 | */ | ||
1011 | static void | ||
1012 | mark_put_message_gone (void *cls, | ||
1013 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
1014 | { | ||
1015 | struct GNUNET_DHT_PutHandle *ph = cls; | ||
1016 | |||
1017 | ph->pending = NULL; | ||
886 | } | 1018 | } |
887 | 1019 | ||
888 | 1020 | ||
@@ -907,49 +1039,94 @@ timeout_put_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
907 | * You must not call GNUNET_DHT_DISCONNECT in this continuation | 1039 | * You must not call GNUNET_DHT_DISCONNECT in this continuation |
908 | * @param cont_cls closure for cont | 1040 | * @param cont_cls closure for cont |
909 | */ | 1041 | */ |
910 | void | 1042 | struct GNUNET_DHT_PutHandle * |
911 | GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const GNUNET_HashCode * key, | 1043 | GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const GNUNET_HashCode * key, |
912 | uint32_t desired_replication_level, | 1044 | uint32_t desired_replication_level, |
913 | enum GNUNET_DHT_RouteOption options, | 1045 | enum GNUNET_DHT_RouteOption options, |
914 | enum GNUNET_BLOCK_Type type, size_t size, const char *data, | 1046 | enum GNUNET_BLOCK_Type type, size_t size, const char *data, |
915 | struct GNUNET_TIME_Absolute exp, | 1047 | struct GNUNET_TIME_Absolute exp, |
916 | struct GNUNET_TIME_Relative timeout, GNUNET_SCHEDULER_Task cont, | 1048 | struct GNUNET_TIME_Relative timeout, GNUNET_DHT_PutContinuation cont, |
917 | void *cont_cls) | 1049 | void *cont_cls) |
918 | { | 1050 | { |
919 | struct GNUNET_DHT_ClientPutMessage *put_msg; | 1051 | struct GNUNET_DHT_ClientPutMessage *put_msg; |
920 | size_t msize; | 1052 | size_t msize; |
921 | struct PendingMessage *pending; | 1053 | struct PendingMessage *pending; |
1054 | struct GNUNET_DHT_PutHandle *ph; | ||
922 | 1055 | ||
923 | msize = sizeof (struct GNUNET_DHT_ClientPutMessage) + size; | 1056 | msize = sizeof (struct GNUNET_DHT_ClientPutMessage) + size; |
924 | if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) || | 1057 | if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) || |
925 | (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)) | 1058 | (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)) |
926 | { | 1059 | { |
927 | GNUNET_break (0); | 1060 | GNUNET_break (0); |
928 | if (NULL != cont) | 1061 | return NULL; |
929 | cont (cont_cls, NULL); | ||
930 | return; | ||
931 | } | 1062 | } |
1063 | ph = GNUNET_malloc (sizeof (struct GNUNET_DHT_PutHandle)); | ||
1064 | ph->dht_handle = handle; | ||
1065 | ph->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_put_request, ph); | ||
1066 | ph->cont = cont; | ||
1067 | ph->cont_cls = cont_cls; | ||
1068 | ph->unique_id = ++handle->uid_gen; | ||
932 | pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize); | 1069 | pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize); |
1070 | ph->pending = pending; | ||
933 | put_msg = (struct GNUNET_DHT_ClientPutMessage *) &pending[1]; | 1071 | put_msg = (struct GNUNET_DHT_ClientPutMessage *) &pending[1]; |
934 | pending->msg = &put_msg->header; | 1072 | pending->msg = &put_msg->header; |
935 | pending->handle = handle; | 1073 | pending->handle = handle; |
936 | pending->cont = cont; | 1074 | pending->cont = &mark_put_message_gone; |
937 | pending->cont_cls = cont_cls; | 1075 | pending->cont_cls = ph; |
938 | pending->free_on_send = GNUNET_YES; | 1076 | pending->free_on_send = GNUNET_YES; |
939 | pending->timeout_task = | ||
940 | GNUNET_SCHEDULER_add_delayed (timeout, &timeout_put_request, pending); | ||
941 | put_msg->header.size = htons (msize); | 1077 | put_msg->header.size = htons (msize); |
942 | put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT); | 1078 | put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT); |
943 | put_msg->type = htonl (type); | 1079 | put_msg->type = htonl (type); |
944 | put_msg->options = htonl ((uint32_t) options); | 1080 | put_msg->options = htonl ((uint32_t) options); |
945 | put_msg->desired_replication_level = htonl (desired_replication_level); | 1081 | put_msg->desired_replication_level = htonl (desired_replication_level); |
1082 | put_msg->unique_id = ph->unique_id; | ||
946 | put_msg->expiration = GNUNET_TIME_absolute_hton (exp); | 1083 | put_msg->expiration = GNUNET_TIME_absolute_hton (exp); |
947 | put_msg->key = *key; | 1084 | put_msg->key = *key; |
948 | memcpy (&put_msg[1], data, size); | 1085 | memcpy (&put_msg[1], data, size); |
949 | GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail, | 1086 | GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail, |
950 | pending); | 1087 | pending); |
951 | pending->in_pending_queue = GNUNET_YES; | 1088 | pending->in_pending_queue = GNUNET_YES; |
1089 | GNUNET_CONTAINER_DLL_insert_tail (handle->put_head, | ||
1090 | handle->put_tail, | ||
1091 | ph); | ||
952 | process_pending_messages (handle); | 1092 | process_pending_messages (handle); |
1093 | return ph; | ||
1094 | } | ||
1095 | |||
1096 | |||
1097 | /** | ||
1098 | * Cancels a DHT PUT operation. Note that the PUT request may still | ||
1099 | * go out over the network (we can't stop that); However, if the PUT | ||
1100 | * has not yet been sent to the service, cancelling the PUT will stop | ||
1101 | * this from happening (but there is no way for the user of this API | ||
1102 | * to tell if that is the case). The only use for this API is to | ||
1103 | * prevent a later call to 'cont' from "GNUNET_DHT_put" (i.e. because | ||
1104 | * the system is shutting down). | ||
1105 | * | ||
1106 | * @param ph put operation to cancel ('cont' will no longer be called) | ||
1107 | */ | ||
1108 | void | ||
1109 | GNUNET_DHT_put_cancel (struct GNUNET_DHT_PutHandle *ph) | ||
1110 | { | ||
1111 | struct GNUNET_DHT_Handle *handle = ph->dht_handle; | ||
1112 | |||
1113 | if (NULL != ph->pending) | ||
1114 | { | ||
1115 | GNUNET_CONTAINER_DLL_remove (handle->pending_head, | ||
1116 | handle->pending_tail, | ||
1117 | ph->pending); | ||
1118 | GNUNET_free (ph->pending); | ||
1119 | ph->pending = NULL; | ||
1120 | } | ||
1121 | if (ph->timeout_task != GNUNET_SCHEDULER_NO_TASK) | ||
1122 | { | ||
1123 | GNUNET_SCHEDULER_cancel (ph->timeout_task); | ||
1124 | ph->timeout_task = GNUNET_SCHEDULER_NO_TASK; | ||
1125 | } | ||
1126 | GNUNET_CONTAINER_DLL_remove (handle->put_head, | ||
1127 | handle->put_tail, | ||
1128 | ph); | ||
1129 | GNUNET_free (ph); | ||
953 | } | 1130 | } |
954 | 1131 | ||
955 | 1132 | ||
@@ -1004,8 +1181,7 @@ GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle, | |||
1004 | get_msg->desired_replication_level = htonl (desired_replication_level); | 1181 | get_msg->desired_replication_level = htonl (desired_replication_level); |
1005 | get_msg->type = htonl (type); | 1182 | get_msg->type = htonl (type); |
1006 | get_msg->key = *key; | 1183 | get_msg->key = *key; |
1007 | handle->uid_gen++; | 1184 | get_msg->unique_id = ++handle->uid_gen; |
1008 | get_msg->unique_id = handle->uid_gen; | ||
1009 | memcpy (&get_msg[1], xquery, xquery_size); | 1185 | memcpy (&get_msg[1], xquery, xquery_size); |
1010 | GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail, | 1186 | GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail, |
1011 | pending); | 1187 | pending); |