diff options
author | Christian Grothoff <christian@grothoff.org> | 2011-04-26 18:19:15 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2011-04-26 18:19:15 +0000 |
commit | 27ed8fcbc85a361864948edb517d47804c2b5a56 (patch) | |
tree | 01626713ea5b2ead4691f13eb66a1574b1c0c7fd /src/fs/gnunet-service-fs_pr.c | |
parent | b6c71d97d2a4bb3cb0e0e0ac1cd2a4e145748cc6 (diff) | |
download | gnunet-27ed8fcbc85a361864948edb517d47804c2b5a56.tar.gz gnunet-27ed8fcbc85a361864948edb517d47804c2b5a56.zip |
datastore and fs fixes from Easter
Diffstat (limited to 'src/fs/gnunet-service-fs_pr.c')
-rw-r--r-- | src/fs/gnunet-service-fs_pr.c | 147 |
1 files changed, 117 insertions, 30 deletions
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c index 7406bed0f..c1074e8bf 100644 --- a/src/fs/gnunet-service-fs_pr.c +++ b/src/fs/gnunet-service-fs_pr.c | |||
@@ -100,6 +100,20 @@ struct GSF_PendingRequest | |||
100 | GNUNET_PEER_Id sender_pid; | 100 | GNUNET_PEER_Id sender_pid; |
101 | 101 | ||
102 | /** | 102 | /** |
103 | * Current offset for querying our local datastore for results. | ||
104 | * Starts at a random value, incremented until we get the same | ||
105 | * UID again (detected using 'first_uid'), which is then used | ||
106 | * to termiante the iteration. | ||
107 | */ | ||
108 | uint64_t local_result_offset; | ||
109 | |||
110 | /** | ||
111 | * Unique ID of the first result from the local datastore; | ||
112 | * used to detect wrap-around of the offset. | ||
113 | */ | ||
114 | uint64_t first_uid; | ||
115 | |||
116 | /** | ||
103 | * Number of valid entries in the 'replies_seen' array. | 117 | * Number of valid entries in the 'replies_seen' array. |
104 | */ | 118 | */ |
105 | unsigned int replies_seen_count; | 119 | unsigned int replies_seen_count; |
@@ -113,7 +127,7 @@ struct GSF_PendingRequest | |||
113 | * Mingle value we currently use for the bf. | 127 | * Mingle value we currently use for the bf. |
114 | */ | 128 | */ |
115 | uint32_t mingle; | 129 | uint32_t mingle; |
116 | 130 | ||
117 | }; | 131 | }; |
118 | 132 | ||
119 | 133 | ||
@@ -273,6 +287,8 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options, | |||
273 | type); | 287 | type); |
274 | #endif | 288 | #endif |
275 | pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest)); | 289 | pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest)); |
290 | pr->local_result_offset = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, | ||
291 | UINT64_MAX); | ||
276 | pr->public_data.query = *query; | 292 | pr->public_data.query = *query; |
277 | if (GNUNET_BLOCK_TYPE_FS_SBLOCK == type) | 293 | if (GNUNET_BLOCK_TYPE_FS_SBLOCK == type) |
278 | { | 294 | { |
@@ -535,7 +551,20 @@ clean_request (void *cls, | |||
535 | void *value) | 551 | void *value) |
536 | { | 552 | { |
537 | struct GSF_PendingRequest *pr = value; | 553 | struct GSF_PendingRequest *pr = value; |
538 | 554 | GSF_LocalLookupContinuation cont; | |
555 | |||
556 | #if DEBUG_FS | ||
557 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
558 | "Cleaning up pending request for `%s'.\n", | ||
559 | GNUNET_h2s (key)); | ||
560 | #endif | ||
561 | if (NULL != (cont = pr->llc_cont)) | ||
562 | { | ||
563 | pr->llc_cont = NULL; | ||
564 | cont (pr->llc_cont_cls, | ||
565 | pr, | ||
566 | pr->local_result); | ||
567 | } | ||
539 | GSF_plan_notify_request_done_ (pr); | 568 | GSF_plan_notify_request_done_ (pr); |
540 | GNUNET_free_non_null (pr->replies_seen); | 569 | GNUNET_free_non_null (pr->replies_seen); |
541 | if (NULL != pr->bf) | 570 | if (NULL != pr->bf) |
@@ -560,6 +589,7 @@ clean_request (void *cls, | |||
560 | void | 589 | void |
561 | GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr) | 590 | GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr) |
562 | { | 591 | { |
592 | if (NULL == pr_map) return; /* already cleaned up! */ | ||
563 | GNUNET_assert (GNUNET_OK == | 593 | GNUNET_assert (GNUNET_OK == |
564 | GNUNET_CONTAINER_multihashmap_remove (pr_map, | 594 | GNUNET_CONTAINER_multihashmap_remove (pr_map, |
565 | &pr->public_data.query, | 595 | &pr->public_data.query, |
@@ -1023,13 +1053,22 @@ process_local_reply (void *cls, | |||
1023 | GNUNET_HashCode query; | 1053 | GNUNET_HashCode query; |
1024 | unsigned int old_rf; | 1054 | unsigned int old_rf; |
1025 | 1055 | ||
1056 | pr->qe = NULL; | ||
1057 | if (0 == pr->replies_seen_count) | ||
1058 | { | ||
1059 | pr->first_uid = uid; | ||
1060 | } | ||
1061 | else | ||
1062 | { | ||
1063 | if (uid == pr->first_uid) | ||
1064 | key = NULL; /* all replies seen! */ | ||
1065 | } | ||
1026 | if (NULL == key) | 1066 | if (NULL == key) |
1027 | { | 1067 | { |
1028 | #if DEBUG_FS | 1068 | #if DEBUG_FS |
1029 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1069 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1030 | "No further local responses available.\n"); | 1070 | "No further local responses available.\n"); |
1031 | #endif | 1071 | #endif |
1032 | pr->qe = NULL; | ||
1033 | if (NULL != (cont = pr->llc_cont)) | 1072 | if (NULL != (cont = pr->llc_cont)) |
1034 | { | 1073 | { |
1035 | pr->llc_cont = NULL; | 1074 | pr->llc_cont = NULL; |
@@ -1041,9 +1080,10 @@ process_local_reply (void *cls, | |||
1041 | } | 1080 | } |
1042 | #if DEBUG_FS | 1081 | #if DEBUG_FS |
1043 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1082 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1044 | "New local response to `%s' of type %u.\n", | 1083 | "Received reply for `%s' of type %d with UID %llu from datastore.\n", |
1045 | GNUNET_h2s (key), | 1084 | GNUNET_h2s (key), |
1046 | type); | 1085 | type, |
1086 | (unsigned long long) uid); | ||
1047 | #endif | 1087 | #endif |
1048 | if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) | 1088 | if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) |
1049 | { | 1089 | { |
@@ -1061,8 +1101,22 @@ process_local_reply (void *cls, | |||
1061 | &process_local_reply, | 1101 | &process_local_reply, |
1062 | pr)) | 1102 | pr)) |
1063 | { | 1103 | { |
1064 | if (pr->qe != NULL) | 1104 | pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh, |
1065 | GNUNET_DATASTORE_iterate_get_next (GSF_dsh); | 1105 | pr->local_result_offset - 1, |
1106 | &pr->public_data.query, | ||
1107 | pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK | ||
1108 | ? GNUNET_BLOCK_TYPE_ANY | ||
1109 | : pr->public_data.type, | ||
1110 | (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) | ||
1111 | ? UINT_MAX | ||
1112 | : 1 /* queue priority */, | ||
1113 | (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) | ||
1114 | ? UINT_MAX | ||
1115 | : 1 /* max queue size */, | ||
1116 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
1117 | &process_local_reply, | ||
1118 | pr); | ||
1119 | GNUNET_assert (NULL != pr->qe); | ||
1066 | } | 1120 | } |
1067 | return; | 1121 | return; |
1068 | } | 1122 | } |
@@ -1085,7 +1139,22 @@ process_local_reply (void *cls, | |||
1085 | -1, -1, | 1139 | -1, -1, |
1086 | GNUNET_TIME_UNIT_FOREVER_REL, | 1140 | GNUNET_TIME_UNIT_FOREVER_REL, |
1087 | NULL, NULL); | 1141 | NULL, NULL); |
1088 | GNUNET_DATASTORE_iterate_get_next (GSF_dsh); | 1142 | pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh, |
1143 | pr->local_result_offset - 1, | ||
1144 | &pr->public_data.query, | ||
1145 | pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK | ||
1146 | ? GNUNET_BLOCK_TYPE_ANY | ||
1147 | : pr->public_data.type, | ||
1148 | (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) | ||
1149 | ? UINT_MAX | ||
1150 | : 1 /* queue priority */, | ||
1151 | (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) | ||
1152 | ? UINT_MAX | ||
1153 | : 1 /* max queue size */, | ||
1154 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
1155 | &process_local_reply, | ||
1156 | pr); | ||
1157 | GNUNET_assert (NULL != pr->qe); | ||
1089 | return; | 1158 | return; |
1090 | } | 1159 | } |
1091 | prq.type = type; | 1160 | prq.type = type; |
@@ -1097,12 +1166,16 @@ process_local_reply (void *cls, | |||
1097 | GSF_update_datastore_delay_ (pr->public_data.start_time); | 1166 | GSF_update_datastore_delay_ (pr->public_data.start_time); |
1098 | process_reply (&prq, key, pr); | 1167 | process_reply (&prq, key, pr); |
1099 | pr->local_result = prq.eval; | 1168 | pr->local_result = prq.eval; |
1100 | if (pr->qe == NULL) | 1169 | if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST) |
1101 | { | 1170 | { |
1102 | #if DEBUG_FS | 1171 | if (NULL != (cont = pr->llc_cont)) |
1103 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1172 | { |
1104 | "Request cancelled, not asking datastore for more\n"); | 1173 | pr->llc_cont = NULL; |
1105 | #endif | 1174 | cont (pr->llc_cont_cls, |
1175 | pr, | ||
1176 | pr->local_result); | ||
1177 | } | ||
1178 | return; | ||
1106 | } | 1179 | } |
1107 | if ( (0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) && | 1180 | if ( (0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) && |
1108 | ( (GNUNET_YES == GSF_test_get_load_too_high_ (0)) || | 1181 | ( (GNUNET_YES == GSF_test_get_load_too_high_ (0)) || |
@@ -1116,8 +1189,6 @@ process_local_reply (void *cls, | |||
1116 | gettext_noop ("# processing result set cut short due to load"), | 1189 | gettext_noop ("# processing result set cut short due to load"), |
1117 | 1, | 1190 | 1, |
1118 | GNUNET_NO); | 1191 | GNUNET_NO); |
1119 | GNUNET_DATASTORE_cancel (pr->qe); | ||
1120 | pr->qe = NULL; | ||
1121 | if (NULL != (cont = pr->llc_cont)) | 1192 | if (NULL != (cont = pr->llc_cont)) |
1122 | { | 1193 | { |
1123 | pr->llc_cont = NULL; | 1194 | pr->llc_cont = NULL; |
@@ -1127,7 +1198,22 @@ process_local_reply (void *cls, | |||
1127 | } | 1198 | } |
1128 | return; | 1199 | return; |
1129 | } | 1200 | } |
1130 | GNUNET_DATASTORE_iterate_get_next (GSF_dsh); | 1201 | pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh, |
1202 | pr->local_result_offset++, | ||
1203 | &pr->public_data.query, | ||
1204 | pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK | ||
1205 | ? GNUNET_BLOCK_TYPE_ANY | ||
1206 | : pr->public_data.type, | ||
1207 | (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) | ||
1208 | ? UINT_MAX | ||
1209 | : 1 /* queue priority */, | ||
1210 | (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) | ||
1211 | ? UINT_MAX | ||
1212 | : 1 /* max queue size */, | ||
1213 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
1214 | &process_local_reply, | ||
1215 | pr); | ||
1216 | GNUNET_assert (NULL != pr->qe); | ||
1131 | } | 1217 | } |
1132 | 1218 | ||
1133 | 1219 | ||
@@ -1147,20 +1233,21 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr, | |||
1147 | GNUNET_assert (NULL == pr->llc_cont); | 1233 | GNUNET_assert (NULL == pr->llc_cont); |
1148 | pr->llc_cont = cont; | 1234 | pr->llc_cont = cont; |
1149 | pr->llc_cont_cls = cont_cls; | 1235 | pr->llc_cont_cls = cont_cls; |
1150 | pr->qe = GNUNET_DATASTORE_iterate_key (GSF_dsh, | 1236 | pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh, |
1151 | &pr->public_data.query, | 1237 | pr->local_result_offset++, |
1152 | pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK | 1238 | &pr->public_data.query, |
1153 | ? GNUNET_BLOCK_TYPE_ANY | 1239 | pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK |
1154 | : pr->public_data.type, | 1240 | ? GNUNET_BLOCK_TYPE_ANY |
1155 | (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) | 1241 | : pr->public_data.type, |
1156 | ? UINT_MAX | 1242 | (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) |
1157 | : 1 /* queue priority */, | 1243 | ? UINT_MAX |
1158 | (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) | 1244 | : 1 /* queue priority */, |
1159 | ? UINT_MAX | 1245 | (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) |
1160 | : 1 /* max queue size */, | 1246 | ? UINT_MAX |
1161 | GNUNET_TIME_UNIT_FOREVER_REL, | 1247 | : 1 /* max queue size */, |
1162 | &process_local_reply, | 1248 | GNUNET_TIME_UNIT_FOREVER_REL, |
1163 | pr); | 1249 | &process_local_reply, |
1250 | pr); | ||
1164 | } | 1251 | } |
1165 | 1252 | ||
1166 | 1253 | ||