aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_pr.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-04-26 18:19:15 +0000
committerChristian Grothoff <christian@grothoff.org>2011-04-26 18:19:15 +0000
commit27ed8fcbc85a361864948edb517d47804c2b5a56 (patch)
tree01626713ea5b2ead4691f13eb66a1574b1c0c7fd /src/fs/gnunet-service-fs_pr.c
parentb6c71d97d2a4bb3cb0e0e0ac1cd2a4e145748cc6 (diff)
downloadgnunet-27ed8fcbc85a361864948edb517d47804c2b5a56.tar.gz
gnunet-27ed8fcbc85a361864948edb517d47804c2b5a56.zip
datastore and fs fixes from Easter
Diffstat (limited to 'src/fs/gnunet-service-fs_pr.c')
-rw-r--r--src/fs/gnunet-service-fs_pr.c147
1 files changed, 117 insertions, 30 deletions
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c
index 7406bed0f..c1074e8bf 100644
--- a/src/fs/gnunet-service-fs_pr.c
+++ b/src/fs/gnunet-service-fs_pr.c
@@ -100,6 +100,20 @@ struct GSF_PendingRequest
100 GNUNET_PEER_Id sender_pid; 100 GNUNET_PEER_Id sender_pid;
101 101
102 /** 102 /**
103 * Current offset for querying our local datastore for results.
104 * Starts at a random value, incremented until we get the same
105 * UID again (detected using 'first_uid'), which is then used
106 * to termiante the iteration.
107 */
108 uint64_t local_result_offset;
109
110 /**
111 * Unique ID of the first result from the local datastore;
112 * used to detect wrap-around of the offset.
113 */
114 uint64_t first_uid;
115
116 /**
103 * Number of valid entries in the 'replies_seen' array. 117 * Number of valid entries in the 'replies_seen' array.
104 */ 118 */
105 unsigned int replies_seen_count; 119 unsigned int replies_seen_count;
@@ -113,7 +127,7 @@ struct GSF_PendingRequest
113 * Mingle value we currently use for the bf. 127 * Mingle value we currently use for the bf.
114 */ 128 */
115 uint32_t mingle; 129 uint32_t mingle;
116 130
117}; 131};
118 132
119 133
@@ -273,6 +287,8 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
273 type); 287 type);
274#endif 288#endif
275 pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest)); 289 pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest));
290 pr->local_result_offset = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
291 UINT64_MAX);
276 pr->public_data.query = *query; 292 pr->public_data.query = *query;
277 if (GNUNET_BLOCK_TYPE_FS_SBLOCK == type) 293 if (GNUNET_BLOCK_TYPE_FS_SBLOCK == type)
278 { 294 {
@@ -535,7 +551,20 @@ clean_request (void *cls,
535 void *value) 551 void *value)
536{ 552{
537 struct GSF_PendingRequest *pr = value; 553 struct GSF_PendingRequest *pr = value;
538 554 GSF_LocalLookupContinuation cont;
555
556#if DEBUG_FS
557 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
558 "Cleaning up pending request for `%s'.\n",
559 GNUNET_h2s (key));
560#endif
561 if (NULL != (cont = pr->llc_cont))
562 {
563 pr->llc_cont = NULL;
564 cont (pr->llc_cont_cls,
565 pr,
566 pr->local_result);
567 }
539 GSF_plan_notify_request_done_ (pr); 568 GSF_plan_notify_request_done_ (pr);
540 GNUNET_free_non_null (pr->replies_seen); 569 GNUNET_free_non_null (pr->replies_seen);
541 if (NULL != pr->bf) 570 if (NULL != pr->bf)
@@ -560,6 +589,7 @@ clean_request (void *cls,
560void 589void
561GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr) 590GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr)
562{ 591{
592 if (NULL == pr_map) return; /* already cleaned up! */
563 GNUNET_assert (GNUNET_OK == 593 GNUNET_assert (GNUNET_OK ==
564 GNUNET_CONTAINER_multihashmap_remove (pr_map, 594 GNUNET_CONTAINER_multihashmap_remove (pr_map,
565 &pr->public_data.query, 595 &pr->public_data.query,
@@ -1023,13 +1053,22 @@ process_local_reply (void *cls,
1023 GNUNET_HashCode query; 1053 GNUNET_HashCode query;
1024 unsigned int old_rf; 1054 unsigned int old_rf;
1025 1055
1056 pr->qe = NULL;
1057 if (0 == pr->replies_seen_count)
1058 {
1059 pr->first_uid = uid;
1060 }
1061 else
1062 {
1063 if (uid == pr->first_uid)
1064 key = NULL; /* all replies seen! */
1065 }
1026 if (NULL == key) 1066 if (NULL == key)
1027 { 1067 {
1028#if DEBUG_FS 1068#if DEBUG_FS
1029 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1069 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1030 "No further local responses available.\n"); 1070 "No further local responses available.\n");
1031#endif 1071#endif
1032 pr->qe = NULL;
1033 if (NULL != (cont = pr->llc_cont)) 1072 if (NULL != (cont = pr->llc_cont))
1034 { 1073 {
1035 pr->llc_cont = NULL; 1074 pr->llc_cont = NULL;
@@ -1041,9 +1080,10 @@ process_local_reply (void *cls,
1041 } 1080 }
1042#if DEBUG_FS 1081#if DEBUG_FS
1043 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1082 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1044 "New local response to `%s' of type %u.\n", 1083 "Received reply for `%s' of type %d with UID %llu from datastore.\n",
1045 GNUNET_h2s (key), 1084 GNUNET_h2s (key),
1046 type); 1085 type,
1086 (unsigned long long) uid);
1047#endif 1087#endif
1048 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) 1088 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1049 { 1089 {
@@ -1061,8 +1101,22 @@ process_local_reply (void *cls,
1061 &process_local_reply, 1101 &process_local_reply,
1062 pr)) 1102 pr))
1063 { 1103 {
1064 if (pr->qe != NULL) 1104 pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1065 GNUNET_DATASTORE_iterate_get_next (GSF_dsh); 1105 pr->local_result_offset - 1,
1106 &pr->public_data.query,
1107 pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK
1108 ? GNUNET_BLOCK_TYPE_ANY
1109 : pr->public_data.type,
1110 (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1111 ? UINT_MAX
1112 : 1 /* queue priority */,
1113 (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1114 ? UINT_MAX
1115 : 1 /* max queue size */,
1116 GNUNET_TIME_UNIT_FOREVER_REL,
1117 &process_local_reply,
1118 pr);
1119 GNUNET_assert (NULL != pr->qe);
1066 } 1120 }
1067 return; 1121 return;
1068 } 1122 }
@@ -1085,7 +1139,22 @@ process_local_reply (void *cls,
1085 -1, -1, 1139 -1, -1,
1086 GNUNET_TIME_UNIT_FOREVER_REL, 1140 GNUNET_TIME_UNIT_FOREVER_REL,
1087 NULL, NULL); 1141 NULL, NULL);
1088 GNUNET_DATASTORE_iterate_get_next (GSF_dsh); 1142 pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1143 pr->local_result_offset - 1,
1144 &pr->public_data.query,
1145 pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK
1146 ? GNUNET_BLOCK_TYPE_ANY
1147 : pr->public_data.type,
1148 (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1149 ? UINT_MAX
1150 : 1 /* queue priority */,
1151 (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1152 ? UINT_MAX
1153 : 1 /* max queue size */,
1154 GNUNET_TIME_UNIT_FOREVER_REL,
1155 &process_local_reply,
1156 pr);
1157 GNUNET_assert (NULL != pr->qe);
1089 return; 1158 return;
1090 } 1159 }
1091 prq.type = type; 1160 prq.type = type;
@@ -1097,12 +1166,16 @@ process_local_reply (void *cls,
1097 GSF_update_datastore_delay_ (pr->public_data.start_time); 1166 GSF_update_datastore_delay_ (pr->public_data.start_time);
1098 process_reply (&prq, key, pr); 1167 process_reply (&prq, key, pr);
1099 pr->local_result = prq.eval; 1168 pr->local_result = prq.eval;
1100 if (pr->qe == NULL) 1169 if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
1101 { 1170 {
1102#if DEBUG_FS 1171 if (NULL != (cont = pr->llc_cont))
1103 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1172 {
1104 "Request cancelled, not asking datastore for more\n"); 1173 pr->llc_cont = NULL;
1105#endif 1174 cont (pr->llc_cont_cls,
1175 pr,
1176 pr->local_result);
1177 }
1178 return;
1106 } 1179 }
1107 if ( (0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) && 1180 if ( (0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) &&
1108 ( (GNUNET_YES == GSF_test_get_load_too_high_ (0)) || 1181 ( (GNUNET_YES == GSF_test_get_load_too_high_ (0)) ||
@@ -1116,8 +1189,6 @@ process_local_reply (void *cls,
1116 gettext_noop ("# processing result set cut short due to load"), 1189 gettext_noop ("# processing result set cut short due to load"),
1117 1, 1190 1,
1118 GNUNET_NO); 1191 GNUNET_NO);
1119 GNUNET_DATASTORE_cancel (pr->qe);
1120 pr->qe = NULL;
1121 if (NULL != (cont = pr->llc_cont)) 1192 if (NULL != (cont = pr->llc_cont))
1122 { 1193 {
1123 pr->llc_cont = NULL; 1194 pr->llc_cont = NULL;
@@ -1127,7 +1198,22 @@ process_local_reply (void *cls,
1127 } 1198 }
1128 return; 1199 return;
1129 } 1200 }
1130 GNUNET_DATASTORE_iterate_get_next (GSF_dsh); 1201 pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1202 pr->local_result_offset++,
1203 &pr->public_data.query,
1204 pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK
1205 ? GNUNET_BLOCK_TYPE_ANY
1206 : pr->public_data.type,
1207 (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1208 ? UINT_MAX
1209 : 1 /* queue priority */,
1210 (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1211 ? UINT_MAX
1212 : 1 /* max queue size */,
1213 GNUNET_TIME_UNIT_FOREVER_REL,
1214 &process_local_reply,
1215 pr);
1216 GNUNET_assert (NULL != pr->qe);
1131} 1217}
1132 1218
1133 1219
@@ -1147,20 +1233,21 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr,
1147 GNUNET_assert (NULL == pr->llc_cont); 1233 GNUNET_assert (NULL == pr->llc_cont);
1148 pr->llc_cont = cont; 1234 pr->llc_cont = cont;
1149 pr->llc_cont_cls = cont_cls; 1235 pr->llc_cont_cls = cont_cls;
1150 pr->qe = GNUNET_DATASTORE_iterate_key (GSF_dsh, 1236 pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1151 &pr->public_data.query, 1237 pr->local_result_offset++,
1152 pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK 1238 &pr->public_data.query,
1153 ? GNUNET_BLOCK_TYPE_ANY 1239 pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK
1154 : pr->public_data.type, 1240 ? GNUNET_BLOCK_TYPE_ANY
1155 (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) 1241 : pr->public_data.type,
1156 ? UINT_MAX 1242 (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1157 : 1 /* queue priority */, 1243 ? UINT_MAX
1158 (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) 1244 : 1 /* queue priority */,
1159 ? UINT_MAX 1245 (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1160 : 1 /* max queue size */, 1246 ? UINT_MAX
1161 GNUNET_TIME_UNIT_FOREVER_REL, 1247 : 1 /* max queue size */,
1162 &process_local_reply, 1248 GNUNET_TIME_UNIT_FOREVER_REL,
1163 pr); 1249 &process_local_reply,
1250 pr);
1164} 1251}
1165 1252
1166 1253