diff options
Diffstat (limited to 'src/fs/gnunet-service-fs_put.c')
-rw-r--r-- | src/fs/gnunet-service-fs_put.c | 174 |
1 files changed, 97 insertions, 77 deletions
diff --git a/src/fs/gnunet-service-fs_put.c b/src/fs/gnunet-service-fs_put.c index 121a90bcd..b15207ce8 100644 --- a/src/fs/gnunet-service-fs_put.c +++ b/src/fs/gnunet-service-fs_put.c | |||
@@ -35,25 +35,50 @@ | |||
35 | 35 | ||
36 | 36 | ||
37 | /** | 37 | /** |
38 | * Request to datastore for DHT PUTs (or NULL). | 38 | * Context for each zero-anonymity iterator. |
39 | */ | 39 | */ |
40 | static struct GNUNET_DATASTORE_QueueEntry *dht_qe; | 40 | struct PutOperator |
41 | { | ||
41 | 42 | ||
42 | /** | 43 | /** |
43 | * Type we will request for the next DHT PUT round from the datastore. | 44 | * Request to datastore for DHT PUTs (or NULL). |
44 | */ | 45 | */ |
45 | static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK; | 46 | struct GNUNET_DATASTORE_QueueEntry *dht_qe; |
47 | |||
48 | /** | ||
49 | * Type we request from the datastore. | ||
50 | */ | ||
51 | enum GNUNET_BLOCK_Type dht_put_type; | ||
52 | |||
53 | /** | ||
54 | * ID of task that collects blocks for DHT PUTs. | ||
55 | */ | ||
56 | GNUNET_SCHEDULER_TaskIdentifier dht_task; | ||
57 | |||
58 | /** | ||
59 | * How many entires with zero anonymity of our type do we currently | ||
60 | * estimate to have in the database? | ||
61 | */ | ||
62 | uint64_t zero_anonymity_count_estimate; | ||
63 | |||
64 | /** | ||
65 | * Current offset when iterating the database. | ||
66 | */ | ||
67 | uint64_t current_offset; | ||
68 | }; | ||
46 | 69 | ||
47 | /** | ||
48 | * ID of task that collects blocks for DHT PUTs. | ||
49 | */ | ||
50 | static GNUNET_SCHEDULER_TaskIdentifier dht_task; | ||
51 | 70 | ||
52 | /** | 71 | /** |
53 | * How many entires with zero anonymity do we currently estimate | 72 | * ANY-terminated list of our operators (one per type |
54 | * to have in the database? | 73 | * of block that we're putting into the DHT). |
55 | */ | 74 | */ |
56 | static unsigned int zero_anonymity_count_estimate; | 75 | static struct PutOperator operators[] = |
76 | { | ||
77 | { NULL, GNUNET_BLOCK_TYPE_FS_KBLOCK, 0, 0, 0 }, | ||
78 | { NULL, GNUNET_BLOCK_TYPE_FS_SBLOCK, 0, 0, 0 }, | ||
79 | { NULL, GNUNET_BLOCK_TYPE_FS_NBLOCK, 0, 0, 0 }, | ||
80 | { NULL, GNUNET_BLOCK_TYPE_ANY, 0, 0, 0 } | ||
81 | }; | ||
57 | 82 | ||
58 | 83 | ||
59 | /** | 84 | /** |
@@ -67,26 +92,26 @@ gather_dht_put_blocks (void *cls, | |||
67 | const struct GNUNET_SCHEDULER_TaskContext *tc); | 92 | const struct GNUNET_SCHEDULER_TaskContext *tc); |
68 | 93 | ||
69 | 94 | ||
70 | |||
71 | /** | 95 | /** |
72 | * If the DHT PUT gathering task is not currently running, consider | 96 | * Task that is run periodically to obtain blocks for DHT PUTs. |
73 | * (re)scheduling it with the appropriate delay. | 97 | * |
98 | * @param cls type of blocks to gather | ||
99 | * @param tc scheduler context (unused) | ||
74 | */ | 100 | */ |
75 | static void | 101 | static void |
76 | consider_dht_put_gathering (void *cls) | 102 | delay_dht_put_blocks (void *cls, |
103 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
77 | { | 104 | { |
105 | struct PutOperator *po = cls; | ||
78 | struct GNUNET_TIME_Relative delay; | 106 | struct GNUNET_TIME_Relative delay; |
79 | 107 | ||
80 | if (GSF_dsh == NULL) | 108 | po->dht_task = GNUNET_SCHEDULER_NO_TASK; |
81 | return; | 109 | if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) |
82 | if (dht_qe != NULL) | ||
83 | return; | 110 | return; |
84 | if (dht_task != GNUNET_SCHEDULER_NO_TASK) | 111 | if (po->zero_anonymity_count_estimate > 0) |
85 | return; | ||
86 | if (zero_anonymity_count_estimate > 0) | ||
87 | { | 112 | { |
88 | delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY, | 113 | delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY, |
89 | zero_anonymity_count_estimate); | 114 | po->zero_anonymity_count_estimate); |
90 | delay = GNUNET_TIME_relative_min (delay, | 115 | delay = GNUNET_TIME_relative_min (delay, |
91 | MAX_DHT_PUT_FREQ); | 116 | MAX_DHT_PUT_FREQ); |
92 | } | 117 | } |
@@ -96,20 +121,9 @@ consider_dht_put_gathering (void *cls) | |||
96 | (hopefully) appear */ | 121 | (hopefully) appear */ |
97 | delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5); | 122 | delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5); |
98 | } | 123 | } |
99 | dht_task = GNUNET_SCHEDULER_add_delayed (delay, | 124 | po->dht_task = GNUNET_SCHEDULER_add_delayed (delay, |
100 | &gather_dht_put_blocks, | 125 | &gather_dht_put_blocks, |
101 | cls); | 126 | po); |
102 | } | ||
103 | |||
104 | |||
105 | /** | ||
106 | * Function called upon completion of the DHT PUT operation. | ||
107 | */ | ||
108 | static void | ||
109 | dht_put_continuation (void *cls, | ||
110 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
111 | { | ||
112 | GNUNET_DATASTORE_iterate_get_next (GSF_dsh); | ||
113 | } | 127 | } |
114 | 128 | ||
115 | 129 | ||
@@ -138,31 +152,19 @@ process_dht_put_content (void *cls, | |||
138 | struct GNUNET_TIME_Absolute | 152 | struct GNUNET_TIME_Absolute |
139 | expiration, uint64_t uid) | 153 | expiration, uint64_t uid) |
140 | { | 154 | { |
141 | static unsigned int counter; | 155 | struct PutOperator *po = cls; |
142 | static GNUNET_HashCode last_vhash; | ||
143 | static GNUNET_HashCode vhash; | ||
144 | 156 | ||
157 | po->dht_qe = NULL; | ||
145 | if (key == NULL) | 158 | if (key == NULL) |
146 | { | 159 | { |
147 | dht_qe = NULL; | 160 | po->zero_anonymity_count_estimate = po->current_offset - 1; |
148 | consider_dht_put_gathering (cls); | 161 | po->current_offset = 0; |
162 | po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_blocks, | ||
163 | po); | ||
149 | return; | 164 | return; |
150 | } | 165 | } |
151 | /* slightly funky code to estimate the total number of values with zero | 166 | po->zero_anonymity_count_estimate = GNUNET_MAX (po->current_offset, |
152 | anonymity from the maximum observed length of a monotonically increasing | 167 | po->zero_anonymity_count_estimate); |
153 | sequence of hashes over the contents */ | ||
154 | GNUNET_CRYPTO_hash (data, size, &vhash); | ||
155 | if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0) | ||
156 | { | ||
157 | if (zero_anonymity_count_estimate > 0) | ||
158 | zero_anonymity_count_estimate /= 2; | ||
159 | counter = 0; | ||
160 | } | ||
161 | last_vhash = vhash; | ||
162 | if (counter < 31) | ||
163 | counter++; | ||
164 | if (zero_anonymity_count_estimate < (1 << counter)) | ||
165 | zero_anonymity_count_estimate = (1 << counter); | ||
166 | #if DEBUG_FS | 168 | #if DEBUG_FS |
167 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 169 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
168 | "Retrieved block `%s' of type %u for DHT PUT\n", | 170 | "Retrieved block `%s' of type %u for DHT PUT\n", |
@@ -178,8 +180,8 @@ process_dht_put_content (void *cls, | |||
178 | data, | 180 | data, |
179 | expiration, | 181 | expiration, |
180 | GNUNET_TIME_UNIT_FOREVER_REL, | 182 | GNUNET_TIME_UNIT_FOREVER_REL, |
181 | &dht_put_continuation, | 183 | &delay_dht_put_blocks, |
182 | cls); | 184 | po); |
183 | } | 185 | } |
184 | 186 | ||
185 | 187 | ||
@@ -193,17 +195,20 @@ static void | |||
193 | gather_dht_put_blocks (void *cls, | 195 | gather_dht_put_blocks (void *cls, |
194 | const struct GNUNET_SCHEDULER_TaskContext *tc) | 196 | const struct GNUNET_SCHEDULER_TaskContext *tc) |
195 | { | 197 | { |
196 | dht_task = GNUNET_SCHEDULER_NO_TASK; | 198 | struct PutOperator *po = cls; |
197 | if (GSF_dsh == NULL) | 199 | |
200 | po->dht_task = GNUNET_SCHEDULER_NO_TASK; | ||
201 | if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) | ||
198 | return; | 202 | return; |
199 | if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) | 203 | po->dht_qe = GNUNET_DATASTORE_get_zero_anonymity (GSF_dsh, |
200 | dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK; | 204 | po->current_offset++, |
201 | dht_qe = GNUNET_DATASTORE_iterate_zero_anonymity (GSF_dsh, | ||
202 | 0, UINT_MAX, | 205 | 0, UINT_MAX, |
203 | GNUNET_TIME_UNIT_FOREVER_REL, | 206 | GNUNET_TIME_UNIT_FOREVER_REL, |
204 | dht_put_type++, | 207 | po->dht_put_type, |
205 | &process_dht_put_content, NULL); | 208 | &process_dht_put_content, po); |
206 | GNUNET_assert (dht_qe != NULL); | 209 | if (NULL == po->dht_qe) |
210 | po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_blocks, | ||
211 | po); | ||
207 | } | 212 | } |
208 | 213 | ||
209 | 214 | ||
@@ -213,7 +218,14 @@ gather_dht_put_blocks (void *cls, | |||
213 | void | 218 | void |
214 | GSF_put_init_ () | 219 | GSF_put_init_ () |
215 | { | 220 | { |
216 | dht_task = GNUNET_SCHEDULER_add_now (&gather_dht_put_blocks, NULL); | 221 | unsigned int i; |
222 | |||
223 | i = 0; | ||
224 | while (operators[i].dht_put_type != GNUNET_BLOCK_TYPE_ANY) | ||
225 | { | ||
226 | operators[i].dht_task = GNUNET_SCHEDULER_add_now (&gather_dht_put_blocks, &operators[i]); | ||
227 | i++; | ||
228 | } | ||
217 | } | 229 | } |
218 | 230 | ||
219 | 231 | ||
@@ -223,15 +235,23 @@ GSF_put_init_ () | |||
223 | void | 235 | void |
224 | GSF_put_done_ () | 236 | GSF_put_done_ () |
225 | { | 237 | { |
226 | if (GNUNET_SCHEDULER_NO_TASK != dht_task) | 238 | struct PutOperator *po; |
227 | { | 239 | unsigned int i; |
228 | GNUNET_SCHEDULER_cancel (dht_task); | 240 | |
229 | dht_task = GNUNET_SCHEDULER_NO_TASK; | 241 | i = 0; |
230 | } | 242 | while ((po = &operators[i])->dht_put_type != GNUNET_BLOCK_TYPE_ANY) |
231 | if (NULL != dht_qe) | ||
232 | { | 243 | { |
233 | GNUNET_DATASTORE_cancel (dht_qe); | 244 | if (GNUNET_SCHEDULER_NO_TASK != po->dht_task) |
234 | dht_qe = NULL; | 245 | { |
246 | GNUNET_SCHEDULER_cancel (po->dht_task); | ||
247 | po->dht_task = GNUNET_SCHEDULER_NO_TASK; | ||
248 | } | ||
249 | if (NULL != po->dht_qe) | ||
250 | { | ||
251 | GNUNET_DATASTORE_cancel (po->dht_qe); | ||
252 | po->dht_qe = NULL; | ||
253 | } | ||
254 | i++; | ||
235 | } | 255 | } |
236 | } | 256 | } |
237 | 257 | ||