aboutsummaryrefslogtreecommitdiff
path: root/src/fs
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-04-26 18:19:15 +0000
committerChristian Grothoff <christian@grothoff.org>2011-04-26 18:19:15 +0000
commit27ed8fcbc85a361864948edb517d47804c2b5a56 (patch)
tree01626713ea5b2ead4691f13eb66a1574b1c0c7fd /src/fs
parentb6c71d97d2a4bb3cb0e0e0ac1cd2a4e145748cc6 (diff)
downloadgnunet-27ed8fcbc85a361864948edb517d47804c2b5a56.tar.gz
gnunet-27ed8fcbc85a361864948edb517d47804c2b5a56.zip
datastore and fs fixes from Easter
Diffstat (limited to 'src/fs')
-rw-r--r--src/fs/Makefile.am4
-rw-r--r--src/fs/fs_download.c19
-rw-r--r--src/fs/fs_test_lib_data.conf6
-rw-r--r--src/fs/gnunet-pseudonym.c2
-rw-r--r--src/fs/gnunet-service-fs_cp.c7
-rw-r--r--src/fs/gnunet-service-fs_indexing.c2
-rw-r--r--src/fs/gnunet-service-fs_indexing.h2
-rw-r--r--src/fs/gnunet-service-fs_pe.c2
-rw-r--r--src/fs/gnunet-service-fs_pr.c147
-rw-r--r--src/fs/gnunet-service-fs_put.c174
-rw-r--r--src/fs/test_fs_download_data.conf3
-rwxr-xr-xsrc/fs/test_gnunet_fs_idx.py.in2
-rw-r--r--src/fs/test_gnunet_fs_ns_data.conf2
-rw-r--r--src/fs/test_gnunet_service_fs_migration_data.conf2
14 files changed, 242 insertions, 132 deletions
diff --git a/src/fs/Makefile.am b/src/fs/Makefile.am
index f980f4206..20aa652ae 100644
--- a/src/fs/Makefile.am
+++ b/src/fs/Makefile.am
@@ -1,4 +1,3 @@
1
2INCLUDES = -I$(top_srcdir)/src/include 1INCLUDES = -I$(top_srcdir)/src/include
3 2
4if MINGW 3if MINGW
@@ -173,8 +172,7 @@ check_SCRIPTS = \
173 test_gnunet_fs_idx.py 172 test_gnunet_fs_idx.py
174endif 173endif
175 174
176#if !DISABLE_TEST_RUN 175if !DISABLE_TEST_RUN
177if 0
178TESTS = \ 176TESTS = \
179 test_fs_directory \ 177 test_fs_directory \
180 test_fs_download \ 178 test_fs_download \
diff --git a/src/fs/fs_download.c b/src/fs/fs_download.c
index 8192b8c1f..8eb2b4331 100644
--- a/src/fs/fs_download.c
+++ b/src/fs/fs_download.c
@@ -756,10 +756,12 @@ try_top_down_reconstruction (struct GNUNET_FS_DownloadContext *dc,
756 child_block_size = GNUNET_FS_tree_compute_tree_size (drc->depth); 756 child_block_size = GNUNET_FS_tree_compute_tree_size (drc->depth);
757 GNUNET_assert (0 == (drc->offset - dr->offset) % child_block_size); 757 GNUNET_assert (0 == (drc->offset - dr->offset) % child_block_size);
758 chk_off = (drc->offset - dr->offset) / child_block_size; 758 chk_off = (drc->offset - dr->offset) / child_block_size;
759 GNUNET_assert (drc->state == BRS_INIT); 759 if (drc->state == BRS_INIT)
760 drc->state = BRS_CHK_SET; 760 {
761 drc->chk = chks[chk_off]; 761 drc->state = BRS_CHK_SET;
762 try_top_down_reconstruction (dc, drc); 762 drc->chk = chks[chk_off];
763 try_top_down_reconstruction (dc, drc);
764 }
763 if (drc->state != BRS_DOWNLOAD_UP) 765 if (drc->state != BRS_DOWNLOAD_UP)
764 up_done = GNUNET_NO; /* children not all done */ 766 up_done = GNUNET_NO; /* children not all done */
765 } 767 }
@@ -815,10 +817,11 @@ schedule_block_download (struct GNUNET_FS_DownloadContext *dc,
815 dr->depth, 817 dr->depth,
816 GNUNET_h2s (&dr->chk.query)); 818 GNUNET_h2s (&dr->chk.query));
817#endif 819#endif
818 GNUNET_assert (GNUNET_NO == 820 if (GNUNET_NO !=
819 GNUNET_CONTAINER_multihashmap_contains_value (dc->active, 821 GNUNET_CONTAINER_multihashmap_contains_value (dc->active,
820 &dr->chk.query, 822 &dr->chk.query,
821 dr)); 823 dr))
824 return; /* already active */
822 GNUNET_CONTAINER_multihashmap_put (dc->active, 825 GNUNET_CONTAINER_multihashmap_put (dc->active,
823 &dr->chk.query, 826 &dr->chk.query,
824 dr, 827 dr,
diff --git a/src/fs/fs_test_lib_data.conf b/src/fs/fs_test_lib_data.conf
index 68c5166b3..204bb90cf 100644
--- a/src/fs/fs_test_lib_data.conf
+++ b/src/fs/fs_test_lib_data.conf
@@ -43,7 +43,7 @@ HOSTNAME = localhost
43#TOTAL_QUOTA_OUT = 9321 43#TOTAL_QUOTA_OUT = 9321
44TOTAL_QUOTA_IN = 3932160 44TOTAL_QUOTA_IN = 3932160
45TOTAL_QUOTA_OUT = 3932160 45TOTAL_QUOTA_OUT = 3932160
46DEBUG = YES 46#DEBUG = YES
47#PREFIX = valgrind --tool=memcheck --leak-check=yes 47#PREFIX = valgrind --tool=memcheck --leak-check=yes
48#BINARY = /home/grothoff/bin/gnunet-service-core 48#BINARY = /home/grothoff/bin/gnunet-service-core
49 49
@@ -53,8 +53,8 @@ HOSTNAME = localhost
53#OPTIONS = -L DEBUG 53#OPTIONS = -L DEBUG
54CONTENT_CACHING = NO 54CONTENT_CACHING = NO
55CONTENT_PUSHING = NO 55CONTENT_PUSHING = NO
56DEBUG = YES 56# DEBUG = YES
57#PREFIX = valgrind --tool=memcheck --leak-check=yes 57# PREFIX = valgrind --tool=memcheck --leak-check=yes --trace-children=yes
58#BINARY = /home/grothoff/gn9/bin/gnunet-service-fs 58#BINARY = /home/grothoff/gn9/bin/gnunet-service-fs
59#PREFIX = xterm -e gdb -x cmd --args 59#PREFIX = xterm -e gdb -x cmd --args
60 60
diff --git a/src/fs/gnunet-pseudonym.c b/src/fs/gnunet-pseudonym.c
index 769b4239d..68a760867 100644
--- a/src/fs/gnunet-pseudonym.c
+++ b/src/fs/gnunet-pseudonym.c
@@ -341,7 +341,7 @@ main (int argc, char *const *argv)
341 0, &GNUNET_GETOPT_set_one, &no_remote_printing}, 341 0, &GNUNET_GETOPT_set_one, &no_remote_printing},
342 {'r', "replication", "LEVEL", 342 {'r', "replication", "LEVEL",
343 gettext_noop ("set the desired replication LEVEL"), 343 gettext_noop ("set the desired replication LEVEL"),
344 0, &GNUNET_GETOPT_set_uint, &bo.replication_level}, 344 1, &GNUNET_GETOPT_set_uint, &bo.replication_level},
345 {'R', "root", "ID", 345 {'R', "root", "ID",
346 gettext_noop 346 gettext_noop
347 ("specify ID of the root of the namespace"), 347 ("specify ID of the root of the namespace"),
diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c
index 2522cbe7b..acad54501 100644
--- a/src/fs/gnunet-service-fs_cp.c
+++ b/src/fs/gnunet-service-fs_cp.c
@@ -704,9 +704,9 @@ copy_reply (void *cls,
704 704
705 705
706/** 706/**
707 * Free the given client request. 707 * Free the given request.
708 * 708 *
709 * @param cls the client request to free 709 * @param cls the request to free
710 * @param tc task context 710 * @param tc task context
711 */ 711 */
712static void 712static void
@@ -1182,6 +1182,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1182 NULL, 0, /* replies_seen */ 1182 NULL, 0, /* replies_seen */
1183 &handle_p2p_reply, 1183 &handle_p2p_reply,
1184 peerreq); 1184 peerreq);
1185 GNUNET_assert (NULL != pr);
1185 peerreq->pr = pr; 1186 peerreq->pr = pr;
1186 GNUNET_break (GNUNET_OK == 1187 GNUNET_break (GNUNET_OK ==
1187 GNUNET_CONTAINER_multihashmap_put (cp->request_map, 1188 GNUNET_CONTAINER_multihashmap_put (cp->request_map,
@@ -1427,7 +1428,7 @@ cancel_pending_request (void *cls,
1427 const GNUNET_HashCode *query, 1428 const GNUNET_HashCode *query,
1428 void *value) 1429 void *value)
1429{ 1430{
1430 struct PeerRequest *peerreq = cls; 1431 struct PeerRequest *peerreq = value;
1431 struct GSF_PendingRequest *pr = peerreq->pr; 1432 struct GSF_PendingRequest *pr = peerreq->pr;
1432 1433
1433 GSF_pending_request_cancel_ (pr); 1434 GSF_pending_request_cancel_ (pr);
diff --git a/src/fs/gnunet-service-fs_indexing.c b/src/fs/gnunet-service-fs_indexing.c
index cc99d3962..dc6b82952 100644
--- a/src/fs/gnunet-service-fs_indexing.c
+++ b/src/fs/gnunet-service-fs_indexing.c
@@ -566,7 +566,7 @@ GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key,
566 uint32_t anonymity, 566 uint32_t anonymity,
567 struct GNUNET_TIME_Absolute 567 struct GNUNET_TIME_Absolute
568 expiration, uint64_t uid, 568 expiration, uint64_t uid,
569 GNUNET_DATASTORE_Iterator cont, 569 GNUNET_DATASTORE_DatumProcessor cont,
570 void *cont_cls) 570 void *cont_cls)
571{ 571{
572 const struct OnDemandBlock *odb; 572 const struct OnDemandBlock *odb;
diff --git a/src/fs/gnunet-service-fs_indexing.h b/src/fs/gnunet-service-fs_indexing.h
index 6a2c3d4a0..e1154830b 100644
--- a/src/fs/gnunet-service-fs_indexing.h
+++ b/src/fs/gnunet-service-fs_indexing.h
@@ -63,7 +63,7 @@ GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key,
63 uint32_t anonymity, 63 uint32_t anonymity,
64 struct GNUNET_TIME_Absolute 64 struct GNUNET_TIME_Absolute
65 expiration, uint64_t uid, 65 expiration, uint64_t uid,
66 GNUNET_DATASTORE_Iterator cont, 66 GNUNET_DATASTORE_DatumProcessor cont,
67 void *cont_cls); 67 void *cont_cls);
68 68
69/** 69/**
diff --git a/src/fs/gnunet-service-fs_pe.c b/src/fs/gnunet-service-fs_pe.c
index 28036150f..4dc9de1b8 100644
--- a/src/fs/gnunet-service-fs_pe.c
+++ b/src/fs/gnunet-service-fs_pe.c
@@ -158,7 +158,7 @@ plan (struct PeerPlan *pp,
158 rp->transmission_counter); 158 rp->transmission_counter);
159#endif 159#endif
160 160
161 161 GNUNET_assert (rp->hn == NULL);
162 if (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0) 162 if (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value == 0)
163 rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, 163 rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
164 rp, 164 rp,
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c
index 7406bed0f..c1074e8bf 100644
--- a/src/fs/gnunet-service-fs_pr.c
+++ b/src/fs/gnunet-service-fs_pr.c
@@ -100,6 +100,20 @@ struct GSF_PendingRequest
100 GNUNET_PEER_Id sender_pid; 100 GNUNET_PEER_Id sender_pid;
101 101
102 /** 102 /**
103 * Current offset for querying our local datastore for results.
104 * Starts at a random value, incremented until we get the same
105 * UID again (detected using 'first_uid'), which is then used
106 * to termiante the iteration.
107 */
108 uint64_t local_result_offset;
109
110 /**
111 * Unique ID of the first result from the local datastore;
112 * used to detect wrap-around of the offset.
113 */
114 uint64_t first_uid;
115
116 /**
103 * Number of valid entries in the 'replies_seen' array. 117 * Number of valid entries in the 'replies_seen' array.
104 */ 118 */
105 unsigned int replies_seen_count; 119 unsigned int replies_seen_count;
@@ -113,7 +127,7 @@ struct GSF_PendingRequest
113 * Mingle value we currently use for the bf. 127 * Mingle value we currently use for the bf.
114 */ 128 */
115 uint32_t mingle; 129 uint32_t mingle;
116 130
117}; 131};
118 132
119 133
@@ -273,6 +287,8 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
273 type); 287 type);
274#endif 288#endif
275 pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest)); 289 pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest));
290 pr->local_result_offset = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
291 UINT64_MAX);
276 pr->public_data.query = *query; 292 pr->public_data.query = *query;
277 if (GNUNET_BLOCK_TYPE_FS_SBLOCK == type) 293 if (GNUNET_BLOCK_TYPE_FS_SBLOCK == type)
278 { 294 {
@@ -535,7 +551,20 @@ clean_request (void *cls,
535 void *value) 551 void *value)
536{ 552{
537 struct GSF_PendingRequest *pr = value; 553 struct GSF_PendingRequest *pr = value;
538 554 GSF_LocalLookupContinuation cont;
555
556#if DEBUG_FS
557 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
558 "Cleaning up pending request for `%s'.\n",
559 GNUNET_h2s (key));
560#endif
561 if (NULL != (cont = pr->llc_cont))
562 {
563 pr->llc_cont = NULL;
564 cont (pr->llc_cont_cls,
565 pr,
566 pr->local_result);
567 }
539 GSF_plan_notify_request_done_ (pr); 568 GSF_plan_notify_request_done_ (pr);
540 GNUNET_free_non_null (pr->replies_seen); 569 GNUNET_free_non_null (pr->replies_seen);
541 if (NULL != pr->bf) 570 if (NULL != pr->bf)
@@ -560,6 +589,7 @@ clean_request (void *cls,
560void 589void
561GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr) 590GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr)
562{ 591{
592 if (NULL == pr_map) return; /* already cleaned up! */
563 GNUNET_assert (GNUNET_OK == 593 GNUNET_assert (GNUNET_OK ==
564 GNUNET_CONTAINER_multihashmap_remove (pr_map, 594 GNUNET_CONTAINER_multihashmap_remove (pr_map,
565 &pr->public_data.query, 595 &pr->public_data.query,
@@ -1023,13 +1053,22 @@ process_local_reply (void *cls,
1023 GNUNET_HashCode query; 1053 GNUNET_HashCode query;
1024 unsigned int old_rf; 1054 unsigned int old_rf;
1025 1055
1056 pr->qe = NULL;
1057 if (0 == pr->replies_seen_count)
1058 {
1059 pr->first_uid = uid;
1060 }
1061 else
1062 {
1063 if (uid == pr->first_uid)
1064 key = NULL; /* all replies seen! */
1065 }
1026 if (NULL == key) 1066 if (NULL == key)
1027 { 1067 {
1028#if DEBUG_FS 1068#if DEBUG_FS
1029 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1069 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1030 "No further local responses available.\n"); 1070 "No further local responses available.\n");
1031#endif 1071#endif
1032 pr->qe = NULL;
1033 if (NULL != (cont = pr->llc_cont)) 1072 if (NULL != (cont = pr->llc_cont))
1034 { 1073 {
1035 pr->llc_cont = NULL; 1074 pr->llc_cont = NULL;
@@ -1041,9 +1080,10 @@ process_local_reply (void *cls,
1041 } 1080 }
1042#if DEBUG_FS 1081#if DEBUG_FS
1043 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1082 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1044 "New local response to `%s' of type %u.\n", 1083 "Received reply for `%s' of type %d with UID %llu from datastore.\n",
1045 GNUNET_h2s (key), 1084 GNUNET_h2s (key),
1046 type); 1085 type,
1086 (unsigned long long) uid);
1047#endif 1087#endif
1048 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) 1088 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1049 { 1089 {
@@ -1061,8 +1101,22 @@ process_local_reply (void *cls,
1061 &process_local_reply, 1101 &process_local_reply,
1062 pr)) 1102 pr))
1063 { 1103 {
1064 if (pr->qe != NULL) 1104 pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1065 GNUNET_DATASTORE_iterate_get_next (GSF_dsh); 1105 pr->local_result_offset - 1,
1106 &pr->public_data.query,
1107 pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK
1108 ? GNUNET_BLOCK_TYPE_ANY
1109 : pr->public_data.type,
1110 (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1111 ? UINT_MAX
1112 : 1 /* queue priority */,
1113 (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1114 ? UINT_MAX
1115 : 1 /* max queue size */,
1116 GNUNET_TIME_UNIT_FOREVER_REL,
1117 &process_local_reply,
1118 pr);
1119 GNUNET_assert (NULL != pr->qe);
1066 } 1120 }
1067 return; 1121 return;
1068 } 1122 }
@@ -1085,7 +1139,22 @@ process_local_reply (void *cls,
1085 -1, -1, 1139 -1, -1,
1086 GNUNET_TIME_UNIT_FOREVER_REL, 1140 GNUNET_TIME_UNIT_FOREVER_REL,
1087 NULL, NULL); 1141 NULL, NULL);
1088 GNUNET_DATASTORE_iterate_get_next (GSF_dsh); 1142 pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1143 pr->local_result_offset - 1,
1144 &pr->public_data.query,
1145 pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK
1146 ? GNUNET_BLOCK_TYPE_ANY
1147 : pr->public_data.type,
1148 (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1149 ? UINT_MAX
1150 : 1 /* queue priority */,
1151 (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1152 ? UINT_MAX
1153 : 1 /* max queue size */,
1154 GNUNET_TIME_UNIT_FOREVER_REL,
1155 &process_local_reply,
1156 pr);
1157 GNUNET_assert (NULL != pr->qe);
1089 return; 1158 return;
1090 } 1159 }
1091 prq.type = type; 1160 prq.type = type;
@@ -1097,12 +1166,16 @@ process_local_reply (void *cls,
1097 GSF_update_datastore_delay_ (pr->public_data.start_time); 1166 GSF_update_datastore_delay_ (pr->public_data.start_time);
1098 process_reply (&prq, key, pr); 1167 process_reply (&prq, key, pr);
1099 pr->local_result = prq.eval; 1168 pr->local_result = prq.eval;
1100 if (pr->qe == NULL) 1169 if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
1101 { 1170 {
1102#if DEBUG_FS 1171 if (NULL != (cont = pr->llc_cont))
1103 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1172 {
1104 "Request cancelled, not asking datastore for more\n"); 1173 pr->llc_cont = NULL;
1105#endif 1174 cont (pr->llc_cont_cls,
1175 pr,
1176 pr->local_result);
1177 }
1178 return;
1106 } 1179 }
1107 if ( (0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) && 1180 if ( (0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) &&
1108 ( (GNUNET_YES == GSF_test_get_load_too_high_ (0)) || 1181 ( (GNUNET_YES == GSF_test_get_load_too_high_ (0)) ||
@@ -1116,8 +1189,6 @@ process_local_reply (void *cls,
1116 gettext_noop ("# processing result set cut short due to load"), 1189 gettext_noop ("# processing result set cut short due to load"),
1117 1, 1190 1,
1118 GNUNET_NO); 1191 GNUNET_NO);
1119 GNUNET_DATASTORE_cancel (pr->qe);
1120 pr->qe = NULL;
1121 if (NULL != (cont = pr->llc_cont)) 1192 if (NULL != (cont = pr->llc_cont))
1122 { 1193 {
1123 pr->llc_cont = NULL; 1194 pr->llc_cont = NULL;
@@ -1127,7 +1198,22 @@ process_local_reply (void *cls,
1127 } 1198 }
1128 return; 1199 return;
1129 } 1200 }
1130 GNUNET_DATASTORE_iterate_get_next (GSF_dsh); 1201 pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1202 pr->local_result_offset++,
1203 &pr->public_data.query,
1204 pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK
1205 ? GNUNET_BLOCK_TYPE_ANY
1206 : pr->public_data.type,
1207 (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1208 ? UINT_MAX
1209 : 1 /* queue priority */,
1210 (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1211 ? UINT_MAX
1212 : 1 /* max queue size */,
1213 GNUNET_TIME_UNIT_FOREVER_REL,
1214 &process_local_reply,
1215 pr);
1216 GNUNET_assert (NULL != pr->qe);
1131} 1217}
1132 1218
1133 1219
@@ -1147,20 +1233,21 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr,
1147 GNUNET_assert (NULL == pr->llc_cont); 1233 GNUNET_assert (NULL == pr->llc_cont);
1148 pr->llc_cont = cont; 1234 pr->llc_cont = cont;
1149 pr->llc_cont_cls = cont_cls; 1235 pr->llc_cont_cls = cont_cls;
1150 pr->qe = GNUNET_DATASTORE_iterate_key (GSF_dsh, 1236 pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1151 &pr->public_data.query, 1237 pr->local_result_offset++,
1152 pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK 1238 &pr->public_data.query,
1153 ? GNUNET_BLOCK_TYPE_ANY 1239 pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK
1154 : pr->public_data.type, 1240 ? GNUNET_BLOCK_TYPE_ANY
1155 (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) 1241 : pr->public_data.type,
1156 ? UINT_MAX 1242 (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1157 : 1 /* queue priority */, 1243 ? UINT_MAX
1158 (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) 1244 : 1 /* queue priority */,
1159 ? UINT_MAX 1245 (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1160 : 1 /* max queue size */, 1246 ? UINT_MAX
1161 GNUNET_TIME_UNIT_FOREVER_REL, 1247 : 1 /* max queue size */,
1162 &process_local_reply, 1248 GNUNET_TIME_UNIT_FOREVER_REL,
1163 pr); 1249 &process_local_reply,
1250 pr);
1164} 1251}
1165 1252
1166 1253
diff --git a/src/fs/gnunet-service-fs_put.c b/src/fs/gnunet-service-fs_put.c
index 121a90bcd..b15207ce8 100644
--- a/src/fs/gnunet-service-fs_put.c
+++ b/src/fs/gnunet-service-fs_put.c
@@ -35,25 +35,50 @@
35 35
36 36
37/** 37/**
38 * Request to datastore for DHT PUTs (or NULL). 38 * Context for each zero-anonymity iterator.
39 */ 39 */
40static struct GNUNET_DATASTORE_QueueEntry *dht_qe; 40struct PutOperator
41{
41 42
42/** 43 /**
43 * Type we will request for the next DHT PUT round from the datastore. 44 * Request to datastore for DHT PUTs (or NULL).
44 */ 45 */
45static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK; 46 struct GNUNET_DATASTORE_QueueEntry *dht_qe;
47
48 /**
49 * Type we request from the datastore.
50 */
51 enum GNUNET_BLOCK_Type dht_put_type;
52
53 /**
54 * ID of task that collects blocks for DHT PUTs.
55 */
56 GNUNET_SCHEDULER_TaskIdentifier dht_task;
57
58 /**
59 * How many entires with zero anonymity of our type do we currently
60 * estimate to have in the database?
61 */
62 uint64_t zero_anonymity_count_estimate;
63
64 /**
65 * Current offset when iterating the database.
66 */
67 uint64_t current_offset;
68};
46 69
47/**
48 * ID of task that collects blocks for DHT PUTs.
49 */
50static GNUNET_SCHEDULER_TaskIdentifier dht_task;
51 70
52/** 71/**
53 * How many entires with zero anonymity do we currently estimate 72 * ANY-terminated list of our operators (one per type
54 * to have in the database? 73 * of block that we're putting into the DHT).
55 */ 74 */
56static unsigned int zero_anonymity_count_estimate; 75static struct PutOperator operators[] =
76 {
77 { NULL, GNUNET_BLOCK_TYPE_FS_KBLOCK, 0, 0, 0 },
78 { NULL, GNUNET_BLOCK_TYPE_FS_SBLOCK, 0, 0, 0 },
79 { NULL, GNUNET_BLOCK_TYPE_FS_NBLOCK, 0, 0, 0 },
80 { NULL, GNUNET_BLOCK_TYPE_ANY, 0, 0, 0 }
81 };
57 82
58 83
59/** 84/**
@@ -67,26 +92,26 @@ gather_dht_put_blocks (void *cls,
67 const struct GNUNET_SCHEDULER_TaskContext *tc); 92 const struct GNUNET_SCHEDULER_TaskContext *tc);
68 93
69 94
70
71/** 95/**
72 * If the DHT PUT gathering task is not currently running, consider 96 * Task that is run periodically to obtain blocks for DHT PUTs.
73 * (re)scheduling it with the appropriate delay. 97 *
98 * @param cls type of blocks to gather
99 * @param tc scheduler context (unused)
74 */ 100 */
75static void 101static void
76consider_dht_put_gathering (void *cls) 102delay_dht_put_blocks (void *cls,
103 const struct GNUNET_SCHEDULER_TaskContext *tc)
77{ 104{
105 struct PutOperator *po = cls;
78 struct GNUNET_TIME_Relative delay; 106 struct GNUNET_TIME_Relative delay;
79 107
80 if (GSF_dsh == NULL) 108 po->dht_task = GNUNET_SCHEDULER_NO_TASK;
81 return; 109 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
82 if (dht_qe != NULL)
83 return; 110 return;
84 if (dht_task != GNUNET_SCHEDULER_NO_TASK) 111 if (po->zero_anonymity_count_estimate > 0)
85 return;
86 if (zero_anonymity_count_estimate > 0)
87 { 112 {
88 delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY, 113 delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
89 zero_anonymity_count_estimate); 114 po->zero_anonymity_count_estimate);
90 delay = GNUNET_TIME_relative_min (delay, 115 delay = GNUNET_TIME_relative_min (delay,
91 MAX_DHT_PUT_FREQ); 116 MAX_DHT_PUT_FREQ);
92 } 117 }
@@ -96,20 +121,9 @@ consider_dht_put_gathering (void *cls)
96 (hopefully) appear */ 121 (hopefully) appear */
97 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5); 122 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
98 } 123 }
99 dht_task = GNUNET_SCHEDULER_add_delayed (delay, 124 po->dht_task = GNUNET_SCHEDULER_add_delayed (delay,
100 &gather_dht_put_blocks, 125 &gather_dht_put_blocks,
101 cls); 126 po);
102}
103
104
105/**
106 * Function called upon completion of the DHT PUT operation.
107 */
108static void
109dht_put_continuation (void *cls,
110 const struct GNUNET_SCHEDULER_TaskContext *tc)
111{
112 GNUNET_DATASTORE_iterate_get_next (GSF_dsh);
113} 127}
114 128
115 129
@@ -138,31 +152,19 @@ process_dht_put_content (void *cls,
138 struct GNUNET_TIME_Absolute 152 struct GNUNET_TIME_Absolute
139 expiration, uint64_t uid) 153 expiration, uint64_t uid)
140{ 154{
141 static unsigned int counter; 155 struct PutOperator *po = cls;
142 static GNUNET_HashCode last_vhash;
143 static GNUNET_HashCode vhash;
144 156
157 po->dht_qe = NULL;
145 if (key == NULL) 158 if (key == NULL)
146 { 159 {
147 dht_qe = NULL; 160 po->zero_anonymity_count_estimate = po->current_offset - 1;
148 consider_dht_put_gathering (cls); 161 po->current_offset = 0;
162 po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_blocks,
163 po);
149 return; 164 return;
150 } 165 }
151 /* slightly funky code to estimate the total number of values with zero 166 po->zero_anonymity_count_estimate = GNUNET_MAX (po->current_offset,
152 anonymity from the maximum observed length of a monotonically increasing 167 po->zero_anonymity_count_estimate);
153 sequence of hashes over the contents */
154 GNUNET_CRYPTO_hash (data, size, &vhash);
155 if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
156 {
157 if (zero_anonymity_count_estimate > 0)
158 zero_anonymity_count_estimate /= 2;
159 counter = 0;
160 }
161 last_vhash = vhash;
162 if (counter < 31)
163 counter++;
164 if (zero_anonymity_count_estimate < (1 << counter))
165 zero_anonymity_count_estimate = (1 << counter);
166#if DEBUG_FS 168#if DEBUG_FS
167 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 169 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
168 "Retrieved block `%s' of type %u for DHT PUT\n", 170 "Retrieved block `%s' of type %u for DHT PUT\n",
@@ -178,8 +180,8 @@ process_dht_put_content (void *cls,
178 data, 180 data,
179 expiration, 181 expiration,
180 GNUNET_TIME_UNIT_FOREVER_REL, 182 GNUNET_TIME_UNIT_FOREVER_REL,
181 &dht_put_continuation, 183 &delay_dht_put_blocks,
182 cls); 184 po);
183} 185}
184 186
185 187
@@ -193,17 +195,20 @@ static void
193gather_dht_put_blocks (void *cls, 195gather_dht_put_blocks (void *cls,
194 const struct GNUNET_SCHEDULER_TaskContext *tc) 196 const struct GNUNET_SCHEDULER_TaskContext *tc)
195{ 197{
196 dht_task = GNUNET_SCHEDULER_NO_TASK; 198 struct PutOperator *po = cls;
197 if (GSF_dsh == NULL) 199
200 po->dht_task = GNUNET_SCHEDULER_NO_TASK;
201 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
198 return; 202 return;
199 if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) 203 po->dht_qe = GNUNET_DATASTORE_get_zero_anonymity (GSF_dsh,
200 dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK; 204 po->current_offset++,
201 dht_qe = GNUNET_DATASTORE_iterate_zero_anonymity (GSF_dsh,
202 0, UINT_MAX, 205 0, UINT_MAX,
203 GNUNET_TIME_UNIT_FOREVER_REL, 206 GNUNET_TIME_UNIT_FOREVER_REL,
204 dht_put_type++, 207 po->dht_put_type,
205 &process_dht_put_content, NULL); 208 &process_dht_put_content, po);
206 GNUNET_assert (dht_qe != NULL); 209 if (NULL == po->dht_qe)
210 po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_blocks,
211 po);
207} 212}
208 213
209 214
@@ -213,7 +218,14 @@ gather_dht_put_blocks (void *cls,
213void 218void
214GSF_put_init_ () 219GSF_put_init_ ()
215{ 220{
216 dht_task = GNUNET_SCHEDULER_add_now (&gather_dht_put_blocks, NULL); 221 unsigned int i;
222
223 i = 0;
224 while (operators[i].dht_put_type != GNUNET_BLOCK_TYPE_ANY)
225 {
226 operators[i].dht_task = GNUNET_SCHEDULER_add_now (&gather_dht_put_blocks, &operators[i]);
227 i++;
228 }
217} 229}
218 230
219 231
@@ -223,15 +235,23 @@ GSF_put_init_ ()
223void 235void
224GSF_put_done_ () 236GSF_put_done_ ()
225{ 237{
226 if (GNUNET_SCHEDULER_NO_TASK != dht_task) 238 struct PutOperator *po;
227 { 239 unsigned int i;
228 GNUNET_SCHEDULER_cancel (dht_task); 240
229 dht_task = GNUNET_SCHEDULER_NO_TASK; 241 i = 0;
230 } 242 while ((po = &operators[i])->dht_put_type != GNUNET_BLOCK_TYPE_ANY)
231 if (NULL != dht_qe)
232 { 243 {
233 GNUNET_DATASTORE_cancel (dht_qe); 244 if (GNUNET_SCHEDULER_NO_TASK != po->dht_task)
234 dht_qe = NULL; 245 {
246 GNUNET_SCHEDULER_cancel (po->dht_task);
247 po->dht_task = GNUNET_SCHEDULER_NO_TASK;
248 }
249 if (NULL != po->dht_qe)
250 {
251 GNUNET_DATASTORE_cancel (po->dht_qe);
252 po->dht_qe = NULL;
253 }
254 i++;
235 } 255 }
236} 256}
237 257
diff --git a/src/fs/test_fs_download_data.conf b/src/fs/test_fs_download_data.conf
index 0a7eb311a..6bbae9dc9 100644
--- a/src/fs/test_fs_download_data.conf
+++ b/src/fs/test_fs_download_data.conf
@@ -36,7 +36,8 @@ HOSTNAME = localhost
36[fs] 36[fs]
37PORT = 42471 37PORT = 42471
38HOSTNAME = localhost 38HOSTNAME = localhost
39ACTIVEMIGRATION = NO 39CONTENT_CACHING = NO
40CONTENT_PUSHING = NO
40# DEBUG = YES 41# DEBUG = YES
41#PREFIX = valgrind --tool=memcheck --leak-check=yes 42#PREFIX = valgrind --tool=memcheck --leak-check=yes
42#BINARY = /home/grothoff/bin/gnunet-service-fs 43#BINARY = /home/grothoff/bin/gnunet-service-fs
diff --git a/src/fs/test_gnunet_fs_idx.py.in b/src/fs/test_gnunet_fs_idx.py.in
index 3bb3681c6..c97ffd883 100755
--- a/src/fs/test_gnunet_fs_idx.py.in
+++ b/src/fs/test_gnunet_fs_idx.py.in
@@ -31,7 +31,7 @@ try:
31 pub.expect ("URI is `gnunet://fs/chk/PC0M19QMQC0BPSHR6BGA228PP6INER1D610MGEMOMEM87222FN8HVUO7PQGO0O9HD2GVLHF2N5IDHEQUNK6LKE428FPO96SKQEA486O.PG7K85JGQ6N599MD5HEP3CHEVFPKQD9JB6NPSLVA3T1SKDS66CFI499VS6MGQ88B0QUAVT1282TCRD4GGFVUKDLGI8F0SPIANA3J2LG.35147'.\r") 31 pub.expect ("URI is `gnunet://fs/chk/PC0M19QMQC0BPSHR6BGA228PP6INER1D610MGEMOMEM87222FN8HVUO7PQGO0O9HD2GVLHF2N5IDHEQUNK6LKE428FPO96SKQEA486O.PG7K85JGQ6N599MD5HEP3CHEVFPKQD9JB6NPSLVA3T1SKDS66CFI499VS6MGQ88B0QUAVT1282TCRD4GGFVUKDLGI8F0SPIANA3J2LG.35147'.\r")
32 pub.expect (pexpect.EOF) 32 pub.expect (pexpect.EOF)
33 33
34 down = pexpect.spawn ('gnunet-download -c test_gnunet_fs_idx_data.conf -o \"COPYING\" gnunet://fs/chk/PC0M19QMQC0BPSHR6BGA228PP6INER1D610MGEMOMEM87222FN8HVUO7PQGO0O9HD2GVLHF2N5IDHEQUNK6LKE428FPO96SKQEA486O.PG7K85JGQ6N599MD5HEP3CHEVFPKQD9JB6NPSLVA3T1SKDS66CFI499VS6MGQ88B0QUAVT1282TCRD4GGFVUKDLGI8F0SPIANA3J2LG.35147') 34 down = pexpect.spawn ('gnunet-download -c test_gnunet_fs_idx_data.conf -o COPYING gnunet://fs/chk/PC0M19QMQC0BPSHR6BGA228PP6INER1D610MGEMOMEM87222FN8HVUO7PQGO0O9HD2GVLHF2N5IDHEQUNK6LKE428FPO96SKQEA486O.PG7K85JGQ6N599MD5HEP3CHEVFPKQD9JB6NPSLVA3T1SKDS66CFI499VS6MGQ88B0QUAVT1282TCRD4GGFVUKDLGI8F0SPIANA3J2LG.35147')
35 down.expect (re.compile ("Downloading `COPYING\' done \(.*\).\r")); 35 down.expect (re.compile ("Downloading `COPYING\' done \(.*\).\r"));
36 down.expect (pexpect.EOF); 36 down.expect (pexpect.EOF);
37 os.system ('rm COPYING'); 37 os.system ('rm COPYING');
diff --git a/src/fs/test_gnunet_fs_ns_data.conf b/src/fs/test_gnunet_fs_ns_data.conf
index 65bac0a15..2086cd0fd 100644
--- a/src/fs/test_gnunet_fs_ns_data.conf
+++ b/src/fs/test_gnunet_fs_ns_data.conf
@@ -36,7 +36,7 @@ HOSTNAME = localhost
36[fs] 36[fs]
37PORT = 47471 37PORT = 47471
38HOSTNAME = localhost 38HOSTNAME = localhost
39#DEBUG = YES 39DEBUG = YES
40#PREFIX = valgrind --tool=memcheck --leak-check=yes 40#PREFIX = valgrind --tool=memcheck --leak-check=yes
41#BINARY = /home/grothoff/bin/gnunet-service-fs 41#BINARY = /home/grothoff/bin/gnunet-service-fs
42 42
diff --git a/src/fs/test_gnunet_service_fs_migration_data.conf b/src/fs/test_gnunet_service_fs_migration_data.conf
index a72a98e97..3ab61d76c 100644
--- a/src/fs/test_gnunet_service_fs_migration_data.conf
+++ b/src/fs/test_gnunet_service_fs_migration_data.conf
@@ -53,7 +53,7 @@ HOSTNAME = localhost
53ACTIVEMIGRATION = YES 53ACTIVEMIGRATION = YES
54CONTENT_CACHING = YES 54CONTENT_CACHING = YES
55CONTENT_PUSHING = YES 55CONTENT_PUSHING = YES
56DEBUG = YES 56#DEBUG = YES
57#PREFIX = valgrind --tool=memcheck --leak-check=yes 57#PREFIX = valgrind --tool=memcheck --leak-check=yes
58#PREFIX = xterm -e gdb -x cmd --args 58#PREFIX = xterm -e gdb -x cmd --args
59 59