diff options
author | Bart Polot <bart@net.in.tum.de> | 2012-04-20 17:18:14 +0000 |
---|---|---|
committer | Bart Polot <bart@net.in.tum.de> | 2012-04-20 17:18:14 +0000 |
commit | 90c9abc573f95de334a1f61faa089e79046d2f11 (patch) | |
tree | 89b596897cf1a17ea9c048748ec063b063f8d634 /src/dht/dht_api.c | |
parent | 6c889a1786be40c0d023e8971411bc327af352c6 (diff) | |
download | gnunet-90c9abc573f95de334a1f61faa089e79046d2f11.tar.gz gnunet-90c9abc573f95de334a1f61faa089e79046d2f11.zip |
- Rewritten DHT monitoring
Diffstat (limited to 'src/dht/dht_api.c')
-rw-r--r-- | src/dht/dht_api.c | 238 |
1 files changed, 200 insertions, 38 deletions
diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c index 1030b7999..3f850b3e7 100644 --- a/src/dht/dht_api.c +++ b/src/dht/dht_api.c | |||
@@ -171,9 +171,19 @@ struct GNUNET_DHT_MonitorHandle | |||
171 | GNUNET_HashCode *key; | 171 | GNUNET_HashCode *key; |
172 | 172 | ||
173 | /** | 173 | /** |
174 | * Callback for each received message of interest. | 174 | * Callback for each received message of type get. |
175 | */ | 175 | */ |
176 | GNUNET_DHT_MonitorCB cb; | 176 | GNUNET_DHT_MonitorGetCB get_cb; |
177 | |||
178 | /** | ||
179 | * Callback for each received message of type get response. | ||
180 | */ | ||
181 | GNUNET_DHT_MonitorGetRespCB get_resp_cb; | ||
182 | |||
183 | /** | ||
184 | * Callback for each received message of type put. | ||
185 | */ | ||
186 | GNUNET_DHT_MonitorPutCB put_cb; | ||
177 | 187 | ||
178 | /** | 188 | /** |
179 | * Closure for cb. | 189 | * Closure for cb. |
@@ -533,63 +543,205 @@ process_reply (void *cls, const GNUNET_HashCode * key, void *value) | |||
533 | return GNUNET_YES; | 543 | return GNUNET_YES; |
534 | } | 544 | } |
535 | 545 | ||
536 | |||
537 | /** | 546 | /** |
538 | * Process a monitoring message from the service. | 547 | * Process a get monitor message from the service. |
539 | * | 548 | * |
540 | * @param handle The DHT handle. | 549 | * @param handle The DHT handle. |
541 | * @param msg Message from the service. | 550 | * @param msg Monitor get message from the service. |
542 | * | 551 | * |
543 | * @return GNUNET_OK if everything went fine, | 552 | * @return GNUNET_OK if everything went fine, |
544 | * GNUNET_SYSERR if the message is malformed. | 553 | * GNUNET_SYSERR if the message is malformed. |
545 | */ | 554 | */ |
546 | static int | 555 | static int |
547 | process_monitor_message (struct GNUNET_DHT_Handle *handle, | 556 | process_monitor_get_message (struct GNUNET_DHT_Handle *handle, |
548 | const struct GNUNET_MessageHeader *msg) | 557 | const struct GNUNET_DHT_MonitorGetMessage *msg) |
549 | { | 558 | { |
550 | struct GNUNET_DHT_MonitorMessage *m; | ||
551 | struct GNUNET_DHT_MonitorHandle *h; | 559 | struct GNUNET_DHT_MonitorHandle *h; |
552 | size_t msize; | 560 | size_t msize; |
553 | 561 | ||
554 | if (ntohs (msg->type) < GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET || | 562 | msize = ntohs (msg->header.size); |
555 | ntohs (msg->type) > GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT) | 563 | if (msize < sizeof (struct GNUNET_DHT_MonitorGetMessage)) |
556 | return GNUNET_SYSERR; | 564 | return GNUNET_SYSERR; |
557 | msize = ntohs (msg->size); | 565 | |
558 | if (msize < sizeof (struct GNUNET_DHT_MonitorMessage)) | 566 | h = handle->monitor_head; |
567 | while (NULL != h) | ||
568 | { | ||
569 | int type_ok; | ||
570 | int key_ok; | ||
571 | |||
572 | type_ok = GNUNET_BLOCK_TYPE_ANY == h->type || h->type == ntohl(msg->type); | ||
573 | key_ok = NULL == h->key || memcmp (h->key, &msg->key, | ||
574 | sizeof (GNUNET_HashCode)) == 0; | ||
575 | if (type_ok && key_ok && NULL != h->get_cb) | ||
576 | { | ||
577 | h->get_cb (h->cb_cls, | ||
578 | ntohl (msg->options), | ||
579 | (enum GNUNET_BLOCK_Type) ntohl(msg->type), | ||
580 | ntohl (msg->hop_count), | ||
581 | ntohl (msg->desired_replication_level), | ||
582 | ntohl (msg->get_path_length), | ||
583 | (struct GNUNET_PeerIdentity *) &msg[1], | ||
584 | &msg->key); | ||
585 | } | ||
586 | h = h->next; | ||
587 | } | ||
588 | return GNUNET_OK; | ||
589 | } | ||
590 | |||
591 | |||
592 | /** | ||
593 | * Process a get response monitor message from the service. | ||
594 | * | ||
595 | * @param handle The DHT handle. | ||
596 | * @param msg Monitor get response message from the service. | ||
597 | * | ||
598 | * @return GNUNET_OK if everything went fine, | ||
599 | * GNUNET_SYSERR if the message is malformed. | ||
600 | */ | ||
601 | static int | ||
602 | process_monitor_get_resp_message (struct GNUNET_DHT_Handle *handle, | ||
603 | const struct GNUNET_DHT_MonitorGetRespMessage | ||
604 | *msg) | ||
605 | { | ||
606 | struct GNUNET_DHT_MonitorHandle *h; | ||
607 | size_t msize; | ||
608 | |||
609 | msize = ntohs (msg->header.size); | ||
610 | if (msize < sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) | ||
559 | return GNUNET_SYSERR; | 611 | return GNUNET_SYSERR; |
560 | 612 | ||
561 | m = (struct GNUNET_DHT_MonitorMessage *) msg; | ||
562 | h = handle->monitor_head; | 613 | h = handle->monitor_head; |
563 | while (NULL != h) | 614 | while (NULL != h) |
564 | { | 615 | { |
565 | if (h->type == ntohl(m->type) && | 616 | int type_ok; |
566 | (NULL == h->key || | 617 | int key_ok; |
567 | memcmp (h->key, &m->key, sizeof (GNUNET_HashCode)) == 0)) | 618 | |
619 | type_ok = GNUNET_BLOCK_TYPE_ANY == h->type || h->type == ntohl(msg->type); | ||
620 | key_ok = NULL == h->key || memcmp (h->key, &msg->key, | ||
621 | sizeof (GNUNET_HashCode)) == 0; | ||
622 | if (type_ok && key_ok && NULL != h->get_resp_cb) | ||
568 | { | 623 | { |
569 | struct GNUNET_PeerIdentity *path; | 624 | struct GNUNET_PeerIdentity *path; |
570 | uint32_t getl; | 625 | uint32_t getl; |
571 | uint32_t putl; | 626 | uint32_t putl; |
572 | 627 | ||
573 | path = (struct GNUNET_PeerIdentity *) &m[1]; | 628 | path = (struct GNUNET_PeerIdentity *) &msg[1]; |
574 | getl = ntohl (m->get_path_length); | 629 | getl = ntohl (msg->get_path_length); |
575 | putl = ntohl (m->put_path_length); | 630 | putl = ntohl (msg->put_path_length); |
576 | h->cb (h->cb_cls, ntohs(msg->type), | 631 | h->get_resp_cb (h->cb_cls, |
577 | GNUNET_TIME_absolute_ntoh(m->expiration), | 632 | (enum GNUNET_BLOCK_Type) ntohl(msg->type), |
578 | &m->key, | 633 | path, getl, |
579 | &path[getl], putl, path, getl, | 634 | &path[getl], putl, |
580 | ntohl (m->desired_replication_level), | 635 | GNUNET_TIME_absolute_ntoh(msg->expiration_time), |
581 | ntohl (m->options), ntohl (m->type), | 636 | &msg->key, |
582 | (void *) &path[getl + putl], | 637 | (void *) &path[getl + putl], |
583 | ntohs (msg->size) - | 638 | msize - |
584 | sizeof (struct GNUNET_DHT_MonitorMessage) - | 639 | sizeof (struct GNUNET_DHT_MonitorGetRespMessage) - |
585 | sizeof (struct GNUNET_PeerIdentity) * (putl + getl)); | 640 | sizeof (struct GNUNET_PeerIdentity) * (putl + getl)); |
586 | } | 641 | } |
587 | h = h->next; | 642 | h = h->next; |
588 | } | 643 | } |
644 | return GNUNET_OK; | ||
645 | } | ||
646 | |||
647 | |||
648 | /** | ||
649 | * Process a put monitor message from the service. | ||
650 | * | ||
651 | * @param handle The DHT handle. | ||
652 | * @param msg Monitor put message from the service. | ||
653 | * | ||
654 | * @return GNUNET_OK if everything went fine, | ||
655 | * GNUNET_SYSERR if the message is malformed. | ||
656 | */ | ||
657 | static int | ||
658 | process_monitor_put_message (struct GNUNET_DHT_Handle *handle, | ||
659 | const struct GNUNET_DHT_MonitorPutMessage *msg) | ||
660 | { | ||
661 | struct GNUNET_DHT_MonitorHandle *h; | ||
662 | size_t msize; | ||
663 | |||
664 | msize = ntohs (msg->header.size); | ||
665 | if (msize < sizeof (struct GNUNET_DHT_MonitorPutMessage)) | ||
666 | return GNUNET_SYSERR; | ||
667 | |||
668 | h = handle->monitor_head; | ||
669 | while (NULL != h) | ||
670 | { | ||
671 | int type_ok; | ||
672 | int key_ok; | ||
673 | |||
674 | type_ok = GNUNET_BLOCK_TYPE_ANY == h->type || h->type == ntohl(msg->type); | ||
675 | key_ok = NULL == h->key || memcmp (h->key, &msg->key, | ||
676 | sizeof (GNUNET_HashCode)) == 0; | ||
677 | if (type_ok && key_ok && NULL != h->put_cb) | ||
678 | { | ||
679 | struct GNUNET_PeerIdentity *path; | ||
680 | uint32_t putl; | ||
589 | 681 | ||
682 | path = (struct GNUNET_PeerIdentity *) &msg[1]; | ||
683 | putl = ntohl (msg->put_path_length); | ||
684 | h->put_cb (h->cb_cls, | ||
685 | ntohl (msg->options), | ||
686 | (enum GNUNET_BLOCK_Type) ntohl(msg->type), | ||
687 | ntohl (msg->hop_count), | ||
688 | ntohl (msg->desired_replication_level), | ||
689 | putl, path, | ||
690 | GNUNET_TIME_absolute_ntoh(msg->expiration_time), | ||
691 | &msg->key, | ||
692 | (void *) &path[putl], | ||
693 | msize - | ||
694 | sizeof (struct GNUNET_DHT_MonitorPutMessage) - | ||
695 | sizeof (struct GNUNET_PeerIdentity) * putl); | ||
696 | } | ||
697 | h = h->next; | ||
698 | } | ||
590 | return GNUNET_OK; | 699 | return GNUNET_OK; |
591 | } | 700 | } |
592 | 701 | ||
702 | |||
703 | /** | ||
704 | * Process a monitoring message from the service: demultiplex for proper type. | ||
705 | * | ||
706 | * @param handle The DHT handle. | ||
707 | * @param msg Message from the service. | ||
708 | * | ||
709 | * @return GNUNET_OK if everything went fine, | ||
710 | * GNUNET_SYSERR if the message is malformed. | ||
711 | */ | ||
712 | static int | ||
713 | process_monitor_message (struct GNUNET_DHT_Handle *handle, | ||
714 | const struct GNUNET_MessageHeader *msg) | ||
715 | { | ||
716 | switch (ntohs (msg->type)) | ||
717 | { | ||
718 | case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET: | ||
719 | return process_monitor_get_message(handle, | ||
720 | (struct GNUNET_DHT_MonitorGetMessage *) | ||
721 | msg); | ||
722 | |||
723 | case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP: | ||
724 | { | ||
725 | return process_monitor_get_resp_message( | ||
726 | handle, | ||
727 | (struct GNUNET_DHT_MonitorGetRespMessage *) msg); | ||
728 | } | ||
729 | case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT: | ||
730 | { | ||
731 | return process_monitor_put_message(handle, | ||
732 | (struct GNUNET_DHT_MonitorPutMessage *) | ||
733 | msg); | ||
734 | } | ||
735 | case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT_RESP: | ||
736 | /* Not implemented yet */ | ||
737 | GNUNET_break(0); | ||
738 | /* Fall through */ | ||
739 | default: | ||
740 | GNUNET_break(0); | ||
741 | return GNUNET_SYSERR; | ||
742 | } | ||
743 | } | ||
744 | |||
593 | /** | 745 | /** |
594 | * Handler for messages received from the DHT service | 746 | * Handler for messages received from the DHT service |
595 | * a demultiplexer which handles numerous message types | 747 | * a demultiplexer which handles numerous message types |
@@ -930,7 +1082,9 @@ GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle) | |||
930 | * @param handle Handle to the DHT service. | 1082 | * @param handle Handle to the DHT service. |
931 | * @param type Type of blocks that are of interest. | 1083 | * @param type Type of blocks that are of interest. |
932 | * @param key Key of data of interest, NULL for all. | 1084 | * @param key Key of data of interest, NULL for all. |
933 | * @param cb Callback to process all monitored data. | 1085 | * @param get_cb Callback to process monitored get messages. |
1086 | * @param get_resp_cb Callback to process monitored get response messages. | ||
1087 | * @param put_cb Callback to process monitored put messages. | ||
934 | * @param cb_cls Closure for cb. | 1088 | * @param cb_cls Closure for cb. |
935 | * | 1089 | * |
936 | * @return Handle to stop monitoring. | 1090 | * @return Handle to stop monitoring. |
@@ -939,18 +1093,21 @@ struct GNUNET_DHT_MonitorHandle * | |||
939 | GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle, | 1093 | GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle, |
940 | enum GNUNET_BLOCK_Type type, | 1094 | enum GNUNET_BLOCK_Type type, |
941 | const GNUNET_HashCode *key, | 1095 | const GNUNET_HashCode *key, |
942 | GNUNET_DHT_MonitorCB cb, | 1096 | GNUNET_DHT_MonitorGetCB get_cb, |
1097 | GNUNET_DHT_MonitorGetRespCB get_resp_cb, | ||
1098 | GNUNET_DHT_MonitorPutCB put_cb, | ||
943 | void *cb_cls) | 1099 | void *cb_cls) |
944 | { | 1100 | { |
945 | struct GNUNET_DHT_MonitorHandle *h; | 1101 | struct GNUNET_DHT_MonitorHandle *h; |
946 | struct GNUNET_DHT_MonitorMessage *m; | 1102 | struct GNUNET_DHT_MonitorStartMessage *m; |
947 | struct PendingMessage *pending; | 1103 | struct PendingMessage *pending; |
948 | 1104 | ||
949 | h = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorHandle)); | 1105 | h = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorHandle)); |
950 | GNUNET_CONTAINER_DLL_insert(handle->monitor_head, handle->monitor_tail, h); | 1106 | GNUNET_CONTAINER_DLL_insert(handle->monitor_head, handle->monitor_tail, h); |
951 | 1107 | ||
952 | GNUNET_assert (NULL != cb); | 1108 | h->get_cb = get_cb; |
953 | h->cb = cb; | 1109 | h->get_resp_cb = get_resp_cb; |
1110 | h->put_cb = put_cb; | ||
954 | h->cb_cls = cb_cls; | 1111 | h->cb_cls = cb_cls; |
955 | h->type = type; | 1112 | h->type = type; |
956 | h->dht_handle = handle; | 1113 | h->dht_handle = handle; |
@@ -960,17 +1117,22 @@ GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle, | |||
960 | memcpy (h->key, key, sizeof(GNUNET_HashCode)); | 1117 | memcpy (h->key, key, sizeof(GNUNET_HashCode)); |
961 | } | 1118 | } |
962 | 1119 | ||
963 | pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorMessage) + | 1120 | pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorStartMessage) + |
964 | sizeof (struct PendingMessage)); | 1121 | sizeof (struct PendingMessage)); |
965 | m = (struct GNUNET_DHT_MonitorMessage *) &pending[1]; | 1122 | m = (struct GNUNET_DHT_MonitorStartMessage *) &pending[1]; |
966 | pending->msg = &m->header; | 1123 | pending->msg = &m->header; |
967 | pending->handle = handle; | 1124 | pending->handle = handle; |
968 | pending->free_on_send = GNUNET_YES; | 1125 | pending->free_on_send = GNUNET_YES; |
969 | m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET); | 1126 | m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET); |
970 | m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorMessage)); | 1127 | m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorStartMessage)); |
971 | m->type = htonl(type); | 1128 | m->type = htonl(type); |
972 | if (NULL != key) | 1129 | m->get = (NULL != get_cb); |
1130 | m->get_resp = (NULL != get_resp_cb); | ||
1131 | m->put = (NULL != put_cb); | ||
1132 | if (NULL != key) { | ||
1133 | m->filter_key = 1; | ||
973 | memcpy (&m->key, key, sizeof(GNUNET_HashCode)); | 1134 | memcpy (&m->key, key, sizeof(GNUNET_HashCode)); |
1135 | } | ||
974 | GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail, | 1136 | GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail, |
975 | pending); | 1137 | pending); |
976 | pending->in_pending_queue = GNUNET_YES; | 1138 | pending->in_pending_queue = GNUNET_YES; |