aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_put.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fs/gnunet-service-fs_put.c')
-rw-r--r--src/fs/gnunet-service-fs_put.c174
1 files changed, 97 insertions, 77 deletions
diff --git a/src/fs/gnunet-service-fs_put.c b/src/fs/gnunet-service-fs_put.c
index 121a90bcd..b15207ce8 100644
--- a/src/fs/gnunet-service-fs_put.c
+++ b/src/fs/gnunet-service-fs_put.c
@@ -35,25 +35,50 @@
35 35
36 36
37/** 37/**
38 * Request to datastore for DHT PUTs (or NULL). 38 * Context for each zero-anonymity iterator.
39 */ 39 */
40static struct GNUNET_DATASTORE_QueueEntry *dht_qe; 40struct PutOperator
41{
41 42
42/** 43 /**
43 * Type we will request for the next DHT PUT round from the datastore. 44 * Request to datastore for DHT PUTs (or NULL).
44 */ 45 */
45static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK; 46 struct GNUNET_DATASTORE_QueueEntry *dht_qe;
47
48 /**
49 * Type we request from the datastore.
50 */
51 enum GNUNET_BLOCK_Type dht_put_type;
52
53 /**
54 * ID of task that collects blocks for DHT PUTs.
55 */
56 GNUNET_SCHEDULER_TaskIdentifier dht_task;
57
58 /**
59 * How many entires with zero anonymity of our type do we currently
60 * estimate to have in the database?
61 */
62 uint64_t zero_anonymity_count_estimate;
63
64 /**
65 * Current offset when iterating the database.
66 */
67 uint64_t current_offset;
68};
46 69
47/**
48 * ID of task that collects blocks for DHT PUTs.
49 */
50static GNUNET_SCHEDULER_TaskIdentifier dht_task;
51 70
52/** 71/**
53 * How many entires with zero anonymity do we currently estimate 72 * ANY-terminated list of our operators (one per type
54 * to have in the database? 73 * of block that we're putting into the DHT).
55 */ 74 */
56static unsigned int zero_anonymity_count_estimate; 75static struct PutOperator operators[] =
76 {
77 { NULL, GNUNET_BLOCK_TYPE_FS_KBLOCK, 0, 0, 0 },
78 { NULL, GNUNET_BLOCK_TYPE_FS_SBLOCK, 0, 0, 0 },
79 { NULL, GNUNET_BLOCK_TYPE_FS_NBLOCK, 0, 0, 0 },
80 { NULL, GNUNET_BLOCK_TYPE_ANY, 0, 0, 0 }
81 };
57 82
58 83
59/** 84/**
@@ -67,26 +92,26 @@ gather_dht_put_blocks (void *cls,
67 const struct GNUNET_SCHEDULER_TaskContext *tc); 92 const struct GNUNET_SCHEDULER_TaskContext *tc);
68 93
69 94
70
71/** 95/**
72 * If the DHT PUT gathering task is not currently running, consider 96 * Task that is run periodically to obtain blocks for DHT PUTs.
73 * (re)scheduling it with the appropriate delay. 97 *
98 * @param cls type of blocks to gather
99 * @param tc scheduler context (unused)
74 */ 100 */
75static void 101static void
76consider_dht_put_gathering (void *cls) 102delay_dht_put_blocks (void *cls,
103 const struct GNUNET_SCHEDULER_TaskContext *tc)
77{ 104{
105 struct PutOperator *po = cls;
78 struct GNUNET_TIME_Relative delay; 106 struct GNUNET_TIME_Relative delay;
79 107
80 if (GSF_dsh == NULL) 108 po->dht_task = GNUNET_SCHEDULER_NO_TASK;
81 return; 109 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
82 if (dht_qe != NULL)
83 return; 110 return;
84 if (dht_task != GNUNET_SCHEDULER_NO_TASK) 111 if (po->zero_anonymity_count_estimate > 0)
85 return;
86 if (zero_anonymity_count_estimate > 0)
87 { 112 {
88 delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY, 113 delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
89 zero_anonymity_count_estimate); 114 po->zero_anonymity_count_estimate);
90 delay = GNUNET_TIME_relative_min (delay, 115 delay = GNUNET_TIME_relative_min (delay,
91 MAX_DHT_PUT_FREQ); 116 MAX_DHT_PUT_FREQ);
92 } 117 }
@@ -96,20 +121,9 @@ consider_dht_put_gathering (void *cls)
96 (hopefully) appear */ 121 (hopefully) appear */
97 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5); 122 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
98 } 123 }
99 dht_task = GNUNET_SCHEDULER_add_delayed (delay, 124 po->dht_task = GNUNET_SCHEDULER_add_delayed (delay,
100 &gather_dht_put_blocks, 125 &gather_dht_put_blocks,
101 cls); 126 po);
102}
103
104
105/**
106 * Function called upon completion of the DHT PUT operation.
107 */
108static void
109dht_put_continuation (void *cls,
110 const struct GNUNET_SCHEDULER_TaskContext *tc)
111{
112 GNUNET_DATASTORE_iterate_get_next (GSF_dsh);
113} 127}
114 128
115 129
@@ -138,31 +152,19 @@ process_dht_put_content (void *cls,
138 struct GNUNET_TIME_Absolute 152 struct GNUNET_TIME_Absolute
139 expiration, uint64_t uid) 153 expiration, uint64_t uid)
140{ 154{
141 static unsigned int counter; 155 struct PutOperator *po = cls;
142 static GNUNET_HashCode last_vhash;
143 static GNUNET_HashCode vhash;
144 156
157 po->dht_qe = NULL;
145 if (key == NULL) 158 if (key == NULL)
146 { 159 {
147 dht_qe = NULL; 160 po->zero_anonymity_count_estimate = po->current_offset - 1;
148 consider_dht_put_gathering (cls); 161 po->current_offset = 0;
162 po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_blocks,
163 po);
149 return; 164 return;
150 } 165 }
151 /* slightly funky code to estimate the total number of values with zero 166 po->zero_anonymity_count_estimate = GNUNET_MAX (po->current_offset,
152 anonymity from the maximum observed length of a monotonically increasing 167 po->zero_anonymity_count_estimate);
153 sequence of hashes over the contents */
154 GNUNET_CRYPTO_hash (data, size, &vhash);
155 if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
156 {
157 if (zero_anonymity_count_estimate > 0)
158 zero_anonymity_count_estimate /= 2;
159 counter = 0;
160 }
161 last_vhash = vhash;
162 if (counter < 31)
163 counter++;
164 if (zero_anonymity_count_estimate < (1 << counter))
165 zero_anonymity_count_estimate = (1 << counter);
166#if DEBUG_FS 168#if DEBUG_FS
167 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 169 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
168 "Retrieved block `%s' of type %u for DHT PUT\n", 170 "Retrieved block `%s' of type %u for DHT PUT\n",
@@ -178,8 +180,8 @@ process_dht_put_content (void *cls,
178 data, 180 data,
179 expiration, 181 expiration,
180 GNUNET_TIME_UNIT_FOREVER_REL, 182 GNUNET_TIME_UNIT_FOREVER_REL,
181 &dht_put_continuation, 183 &delay_dht_put_blocks,
182 cls); 184 po);
183} 185}
184 186
185 187
@@ -193,17 +195,20 @@ static void
193gather_dht_put_blocks (void *cls, 195gather_dht_put_blocks (void *cls,
194 const struct GNUNET_SCHEDULER_TaskContext *tc) 196 const struct GNUNET_SCHEDULER_TaskContext *tc)
195{ 197{
196 dht_task = GNUNET_SCHEDULER_NO_TASK; 198 struct PutOperator *po = cls;
197 if (GSF_dsh == NULL) 199
200 po->dht_task = GNUNET_SCHEDULER_NO_TASK;
201 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
198 return; 202 return;
199 if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) 203 po->dht_qe = GNUNET_DATASTORE_get_zero_anonymity (GSF_dsh,
200 dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK; 204 po->current_offset++,
201 dht_qe = GNUNET_DATASTORE_iterate_zero_anonymity (GSF_dsh,
202 0, UINT_MAX, 205 0, UINT_MAX,
203 GNUNET_TIME_UNIT_FOREVER_REL, 206 GNUNET_TIME_UNIT_FOREVER_REL,
204 dht_put_type++, 207 po->dht_put_type,
205 &process_dht_put_content, NULL); 208 &process_dht_put_content, po);
206 GNUNET_assert (dht_qe != NULL); 209 if (NULL == po->dht_qe)
210 po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_blocks,
211 po);
207} 212}
208 213
209 214
@@ -213,7 +218,14 @@ gather_dht_put_blocks (void *cls,
213void 218void
214GSF_put_init_ () 219GSF_put_init_ ()
215{ 220{
216 dht_task = GNUNET_SCHEDULER_add_now (&gather_dht_put_blocks, NULL); 221 unsigned int i;
222
223 i = 0;
224 while (operators[i].dht_put_type != GNUNET_BLOCK_TYPE_ANY)
225 {
226 operators[i].dht_task = GNUNET_SCHEDULER_add_now (&gather_dht_put_blocks, &operators[i]);
227 i++;
228 }
217} 229}
218 230
219 231
@@ -223,15 +235,23 @@ GSF_put_init_ ()
223void 235void
224GSF_put_done_ () 236GSF_put_done_ ()
225{ 237{
226 if (GNUNET_SCHEDULER_NO_TASK != dht_task) 238 struct PutOperator *po;
227 { 239 unsigned int i;
228 GNUNET_SCHEDULER_cancel (dht_task); 240
229 dht_task = GNUNET_SCHEDULER_NO_TASK; 241 i = 0;
230 } 242 while ((po = &operators[i])->dht_put_type != GNUNET_BLOCK_TYPE_ANY)
231 if (NULL != dht_qe)
232 { 243 {
233 GNUNET_DATASTORE_cancel (dht_qe); 244 if (GNUNET_SCHEDULER_NO_TASK != po->dht_task)
234 dht_qe = NULL; 245 {
246 GNUNET_SCHEDULER_cancel (po->dht_task);
247 po->dht_task = GNUNET_SCHEDULER_NO_TASK;
248 }
249 if (NULL != po->dht_qe)
250 {
251 GNUNET_DATASTORE_cancel (po->dht_qe);
252 po->dht_qe = NULL;
253 }
254 i++;
235 } 255 }
236} 256}
237 257