diff options
author | Christian Grothoff <christian@grothoff.org> | 2011-03-10 09:36:50 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2011-03-10 09:36:50 +0000 |
commit | faecd2a496b5d356509b0b6b0157db34e8b3188e (patch) | |
tree | e5d54c0cb196a05e4a54031f8498e5ffe82bc3a9 /src/fs/gnunet-service-fs_pr.c | |
parent | 073c4a9ae448041fdc9a0683fed49d55ae61803e (diff) | |
download | gnunet-faecd2a496b5d356509b0b6b0157db34e8b3188e.tar.gz gnunet-faecd2a496b5d356509b0b6b0157db34e8b3188e.zip |
stuff
Diffstat (limited to 'src/fs/gnunet-service-fs_pr.c')
-rw-r--r-- | src/fs/gnunet-service-fs_pr.c | 277 |
1 files changed, 268 insertions, 9 deletions
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c index 45767f204..58af8be65 100644 --- a/src/fs/gnunet-service-fs_pr.c +++ b/src/fs/gnunet-service-fs_pr.c | |||
@@ -65,6 +65,16 @@ struct GSF_PendingRequest | |||
65 | struct GNUNET_CONTAINER_HeapNode *hnode; | 65 | struct GNUNET_CONTAINER_HeapNode *hnode; |
66 | 66 | ||
67 | /** | 67 | /** |
68 | * Datastore queue entry for this request (or NULL for none). | ||
69 | */ | ||
70 | struct GNUNET_DATASTORE_QueueEntry *qe; | ||
71 | |||
72 | /** | ||
73 | * DHT request handle for this request (or NULL for none). | ||
74 | */ | ||
75 | struct GNUNET_DHT_GetHandle *gh; | ||
76 | |||
77 | /** | ||
68 | * Identity of the peer that we should use for the 'sender' | 78 | * Identity of the peer that we should use for the 'sender' |
69 | * (recipient of the response) when forwarding (0 for none). | 79 | * (recipient of the response) when forwarding (0 for none). |
70 | */ | 80 | */ |
@@ -500,6 +510,10 @@ clean_request (void *cls, | |||
500 | if (NULL != pr->hnode) | 510 | if (NULL != pr->hnode) |
501 | GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap, | 511 | GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap, |
502 | pr->hnode); | 512 | pr->hnode); |
513 | if (NULL != pr->qe) | ||
514 | GNUNET_DATASTORE_cancel (pr->qe); | ||
515 | if (NULL != pr->gh) | ||
516 | GNUNET_DHT_get_stop (pr->gh); | ||
503 | GNUNET_free (pr); | 517 | GNUNET_free (pr); |
504 | return GNUNET_YES; | 518 | return GNUNET_YES; |
505 | } | 519 | } |
@@ -713,6 +727,10 @@ process_reply (void *cls, | |||
713 | 1, | 727 | 1, |
714 | GNUNET_NO); | 728 | GNUNET_NO); |
715 | } | 729 | } |
730 | else | ||
731 | { | ||
732 | GSF_dht_lookup_ (pr); | ||
733 | } | ||
716 | prq->priority += pr->public_data.original_priority; | 734 | prq->priority += pr->public_data.original_priority; |
717 | pr->public_data.priority = 0; | 735 | pr->public_data.priority = 0; |
718 | pr->public_data.original_priority = 0; | 736 | pr->public_data.original_priority = 0; |
@@ -799,15 +817,15 @@ test_put_load_too_high (uint32_t priority) | |||
799 | * @param size number of bytes in data | 817 | * @param size number of bytes in data |
800 | * @param data pointer to the result data | 818 | * @param data pointer to the result data |
801 | */ | 819 | */ |
802 | void | 820 | static void |
803 | GSF_handle_dht_reply_ (void *cls, | 821 | handle_dht_reply (void *cls, |
804 | struct GNUNET_TIME_Absolute exp, | 822 | struct GNUNET_TIME_Absolute exp, |
805 | const GNUNET_HashCode *key, | 823 | const GNUNET_HashCode *key, |
806 | const struct GNUNET_PeerIdentity * const *get_path, | 824 | const struct GNUNET_PeerIdentity * const *get_path, |
807 | const struct GNUNET_PeerIdentity * const *put_path, | 825 | const struct GNUNET_PeerIdentity * const *put_path, |
808 | enum GNUNET_BLOCK_Type type, | 826 | enum GNUNET_BLOCK_Type type, |
809 | size_t size, | 827 | size_t size, |
810 | const void *data) | 828 | const void *data) |
811 | { | 829 | { |
812 | struct GSF_PendingRequest *pr = cls; | 830 | struct GSF_PendingRequest *pr = cls; |
813 | struct ProcessReplyClosure prq; | 831 | struct ProcessReplyClosure prq; |
@@ -843,6 +861,247 @@ GSF_handle_dht_reply_ (void *cls, | |||
843 | 861 | ||
844 | 862 | ||
845 | /** | 863 | /** |
864 | * Consider looking up the data in the DHT (anonymity-level permitting). | ||
865 | * | ||
866 | * @param pr the pending request to process | ||
867 | */ | ||
868 | void | ||
869 | GSF_dht_lookup_ (struct GSF_PendingRequest *pr) | ||
870 | { | ||
871 | const void *xquery; | ||
872 | size_t xquery_size; | ||
873 | struct GNUNET_PeerIdentity pi; | ||
874 | char buf[sizeof (GNUNET_HashCode) * 2]; | ||
875 | |||
876 | if (0 != pr->public_data.anonymity_level) | ||
877 | return; | ||
878 | if (NULL != pr->gh) | ||
879 | { | ||
880 | GNUNET_DHT_get_stop (pr->gh); | ||
881 | pr->gh = NULL; | ||
882 | } | ||
883 | xquery = NULL; | ||
884 | xquery_size = 0; | ||
885 | if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type) | ||
886 | { | ||
887 | xquery = buf; | ||
888 | memcpy (buf, &pr->public_data.namespace, sizeof (GNUNET_HashCode)); | ||
889 | xquery_size = sizeof (GNUNET_HashCode); | ||
890 | } | ||
891 | if (0 != (pr->public_data.options & GSF_PRO_FORWARD_ONLY)) | ||
892 | { | ||
893 | GNUNET_PEER_resolve (pr->sender_pid, | ||
894 | &pi); | ||
895 | memcpy (&buf[xquery_size], &pi, sizeof (struct GNUNET_PeerIdentity)); | ||
896 | xquery_size += sizeof (struct GNUNET_PeerIdentity); | ||
897 | } | ||
898 | pr->gh = GNUNET_DHT_get_start (GSF_dht, | ||
899 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
900 | pr->public_data.type, | ||
901 | &pr->public_data.query, | ||
902 | DEFAULT_GET_REPLICATION, | ||
903 | GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, | ||
904 | pr->bf, | ||
905 | pr->mingle, | ||
906 | xquery, | ||
907 | xquery_size, | ||
908 | &handle_dht_reply, | ||
909 | pr); | ||
910 | } | ||
911 | |||
912 | |||
913 | /** | ||
914 | * We're processing (local) results for a search request | ||
915 | * from another peer. Pass applicable results to the | ||
916 | * peer and if we are done either clean up (operation | ||
917 | * complete) or forward to other peers (more results possible). | ||
918 | * | ||
919 | * @param cls our closure (struct LocalGetContext) | ||
920 | * @param key key for the content | ||
921 | * @param size number of bytes in data | ||
922 | * @param data content stored | ||
923 | * @param type type of the content | ||
924 | * @param priority priority of the content | ||
925 | * @param anonymity anonymity-level for the content | ||
926 | * @param expiration expiration time for the content | ||
927 | * @param uid unique identifier for the datum; | ||
928 | * maybe 0 if no unique identifier is available | ||
929 | */ | ||
930 | static void | ||
931 | process_local_reply (void *cls, | ||
932 | const GNUNET_HashCode * key, | ||
933 | size_t size, | ||
934 | const void *data, | ||
935 | enum GNUNET_BLOCK_Type type, | ||
936 | uint32_t priority, | ||
937 | uint32_t anonymity, | ||
938 | struct GNUNET_TIME_Absolute expiration, | ||
939 | uint64_t uid) | ||
940 | { | ||
941 | #if FIXME | ||
942 | struct PendingRequest *pr = cls; | ||
943 | struct ProcessReplyClosure prq; | ||
944 | struct CheckDuplicateRequestClosure cdrc; | ||
945 | GNUNET_HashCode query; | ||
946 | unsigned int old_rf; | ||
947 | |||
948 | if (NULL == key) | ||
949 | { | ||
950 | #if DEBUG_FS > 1 | ||
951 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
952 | "Done processing local replies, forwarding request to other peers.\n"); | ||
953 | #endif | ||
954 | pr->qe = NULL; | ||
955 | if (pr->client_request_list != NULL) | ||
956 | { | ||
957 | GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, | ||
958 | GNUNET_YES); | ||
959 | /* Figure out if this is a duplicate request and possibly | ||
960 | merge 'struct PendingRequest' entries */ | ||
961 | cdrc.have = NULL; | ||
962 | cdrc.pr = pr; | ||
963 | GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map, | ||
964 | &pr->query, | ||
965 | &check_duplicate_request_client, | ||
966 | &cdrc); | ||
967 | if (cdrc.have != NULL) | ||
968 | { | ||
969 | #if DEBUG_FS | ||
970 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
971 | "Received request for block `%s' twice from client, will only request once.\n", | ||
972 | GNUNET_h2s (&pr->query)); | ||
973 | #endif | ||
974 | |||
975 | destroy_pending_request (pr); | ||
976 | return; | ||
977 | } | ||
978 | } | ||
979 | if (pr->local_only == GNUNET_YES) | ||
980 | { | ||
981 | destroy_pending_request (pr); | ||
982 | return; | ||
983 | } | ||
984 | /* no more results */ | ||
985 | if (pr->task == GNUNET_SCHEDULER_NO_TASK) | ||
986 | pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task, | ||
987 | pr); | ||
988 | return; | ||
989 | } | ||
990 | #if DEBUG_FS | ||
991 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
992 | "New local response to `%s' of type %u.\n", | ||
993 | GNUNET_h2s (key), | ||
994 | type); | ||
995 | #endif | ||
996 | if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) | ||
997 | { | ||
998 | #if DEBUG_FS | ||
999 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1000 | "Found ONDEMAND block, performing on-demand encoding\n"); | ||
1001 | #endif | ||
1002 | GNUNET_STATISTICS_update (stats, | ||
1003 | gettext_noop ("# on-demand blocks matched requests"), | ||
1004 | 1, | ||
1005 | GNUNET_NO); | ||
1006 | if (GNUNET_OK != | ||
1007 | GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, | ||
1008 | anonymity, expiration, uid, | ||
1009 | &process_local_reply, | ||
1010 | pr)) | ||
1011 | if (pr->qe != NULL) | ||
1012 | { | ||
1013 | GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); | ||
1014 | } | ||
1015 | return; | ||
1016 | } | ||
1017 | old_rf = pr->results_found; | ||
1018 | memset (&prq, 0, sizeof (prq)); | ||
1019 | prq.data = data; | ||
1020 | prq.expiration = expiration; | ||
1021 | prq.size = size; | ||
1022 | if (GNUNET_OK != | ||
1023 | GNUNET_BLOCK_get_key (block_ctx, | ||
1024 | type, | ||
1025 | data, | ||
1026 | size, | ||
1027 | &query)) | ||
1028 | { | ||
1029 | GNUNET_break (0); | ||
1030 | GNUNET_DATASTORE_remove (dsh, | ||
1031 | key, | ||
1032 | size, data, | ||
1033 | -1, -1, | ||
1034 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
1035 | NULL, NULL); | ||
1036 | GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); | ||
1037 | return; | ||
1038 | } | ||
1039 | prq.type = type; | ||
1040 | prq.priority = priority; | ||
1041 | prq.finished = GNUNET_NO; | ||
1042 | prq.request_found = GNUNET_NO; | ||
1043 | prq.anonymity_level = anonymity; | ||
1044 | if ( (old_rf == 0) && | ||
1045 | (pr->results_found == 0) ) | ||
1046 | update_datastore_delays (pr->start_time); | ||
1047 | process_reply (&prq, key, pr); | ||
1048 | if (prq.finished == GNUNET_YES) | ||
1049 | return; | ||
1050 | if (pr->qe == NULL) | ||
1051 | return; /* done here */ | ||
1052 | if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST) | ||
1053 | { | ||
1054 | pr->local_only = GNUNET_YES; /* do not forward */ | ||
1055 | GNUNET_DATASTORE_get_next (dsh, GNUNET_NO); | ||
1056 | return; | ||
1057 | } | ||
1058 | if ( (pr->client_request_list == NULL) && | ||
1059 | ( (GNUNET_YES == test_get_load_too_high (0)) || | ||
1060 | (pr->results_found > 5 + 2 * pr->priority) ) ) | ||
1061 | { | ||
1062 | #if DEBUG_FS > 2 | ||
1063 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1064 | "Load too high, done with request\n"); | ||
1065 | #endif | ||
1066 | GNUNET_STATISTICS_update (stats, | ||
1067 | gettext_noop ("# processing result set cut short due to load"), | ||
1068 | 1, | ||
1069 | GNUNET_NO); | ||
1070 | GNUNET_DATASTORE_get_next (dsh, GNUNET_NO); | ||
1071 | return; | ||
1072 | } | ||
1073 | GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); | ||
1074 | #endif | ||
1075 | } | ||
1076 | |||
1077 | |||
1078 | /** | ||
1079 | * Look up the request in the local datastore. | ||
1080 | * | ||
1081 | * @param pr the pending request to process | ||
1082 | * @param cont function to call at the end | ||
1083 | * @param cont_cls closure for cont | ||
1084 | */ | ||
1085 | void | ||
1086 | GSF_local_lookup_ (struct GSF_PendingRequest *pr, | ||
1087 | GSF_LocalLookupContinuation cont, | ||
1088 | void *cont_cls) | ||
1089 | { | ||
1090 | // FIXME: fix process_local_reply / cont! | ||
1091 | GNUNET_assert (NULL == pr->gh); | ||
1092 | pr->qe = GNUNET_DATASTORE_get (GSF_dsh, | ||
1093 | &pr->public_data.query, | ||
1094 | pr->public_data.type, | ||
1095 | 1 /* queue priority */, | ||
1096 | 1 /* max queue size */, | ||
1097 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
1098 | &process_local_reply, | ||
1099 | pr); | ||
1100 | } | ||
1101 | |||
1102 | |||
1103 | |||
1104 | /** | ||
846 | * Handle P2P "CONTENT" message. Checks that the message is | 1105 | * Handle P2P "CONTENT" message. Checks that the message is |
847 | * well-formed and then checks if there are any pending requests for | 1106 | * well-formed and then checks if there are any pending requests for |
848 | * this content and possibly passes it on (to local clients or other | 1107 | * this content and possibly passes it on (to local clients or other |