diff options
author | Christian Grothoff <christian@grothoff.org> | 2010-02-01 09:10:08 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2010-02-01 09:10:08 +0000 |
commit | b49dc6cbb5f75fcf9a8168f3366de7ecc2262672 (patch) | |
tree | 07fdabe1d38b2e58d304e3ff34fe3225baba0603 /src/fs | |
parent | 44805bd751283cc7fdc11a4ccb46ed122dbf23ba (diff) | |
download | gnunet-b49dc6cbb5f75fcf9a8168f3366de7ecc2262672.tar.gz gnunet-b49dc6cbb5f75fcf9a8168f3366de7ecc2262672.zip |
fixing drq code
Diffstat (limited to 'src/fs')
-rw-r--r-- | src/fs/gnunet-service-fs.c | 6 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs_drq.c | 195 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs_drq.h | 5 |
3 files changed, 152 insertions, 54 deletions
diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index 740f63624..2b3b44f93 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c | |||
@@ -2168,7 +2168,8 @@ handle_p2p_get (void *cls, | |||
2168 | pr->type, | 2168 | pr->type, |
2169 | &process_local_reply, | 2169 | &process_local_reply, |
2170 | pr, | 2170 | pr, |
2171 | timeout); | 2171 | timeout, |
2172 | GNUNET_NO); | ||
2172 | 2173 | ||
2173 | /* Are multiple results possible? If so, start processing remotely now! */ | 2174 | /* Are multiple results possible? If so, start processing remotely now! */ |
2174 | switch (pr->type) | 2175 | switch (pr->type) |
@@ -2286,7 +2287,8 @@ handle_start_search (void *cls, | |||
2286 | pr->type, | 2287 | pr->type, |
2287 | &process_local_reply, | 2288 | &process_local_reply, |
2288 | pr, | 2289 | pr, |
2289 | GNUNET_TIME_UNIT_FOREVER_REL); | 2290 | GNUNET_TIME_UNIT_FOREVER_REL, |
2291 | GNUNET_YES); | ||
2290 | } | 2292 | } |
2291 | 2293 | ||
2292 | 2294 | ||
diff --git a/src/fs/gnunet-service-fs_drq.c b/src/fs/gnunet-service-fs_drq.c index c15e37a0d..afca9d970 100644 --- a/src/fs/gnunet-service-fs_drq.c +++ b/src/fs/gnunet-service-fs_drq.c | |||
@@ -74,6 +74,12 @@ struct DatastoreRequestQueue | |||
74 | */ | 74 | */ |
75 | GNUNET_SCHEDULER_TaskIdentifier task; | 75 | GNUNET_SCHEDULER_TaskIdentifier task; |
76 | 76 | ||
77 | /** | ||
78 | * Is this request at the head of the queue irrespective of its | ||
79 | * timeout value? | ||
80 | */ | ||
81 | int forced_head; | ||
82 | |||
77 | }; | 83 | }; |
78 | 84 | ||
79 | /** | 85 | /** |
@@ -101,62 +107,81 @@ static struct DatastoreRequestQueue *drq_tail; | |||
101 | */ | 107 | */ |
102 | static struct GNUNET_DATASTORE_Handle *dsh; | 108 | static struct GNUNET_DATASTORE_Handle *dsh; |
103 | 109 | ||
110 | /** | ||
111 | * Pointer to the currently actively running request, | ||
112 | * NULL if none is running. | ||
113 | */ | ||
114 | static struct DatastoreRequestQueue *drq_running; | ||
115 | |||
104 | 116 | ||
105 | /** | 117 | /** |
106 | * Run the next DS request in our | 118 | * A datastore request had to be timed out. |
107 | * queue, we're done with the current one. | 119 | * |
120 | * @param cls closure (unused) | ||
121 | * @param tc task context, unused | ||
108 | */ | 122 | */ |
109 | static void | 123 | static void |
110 | next_ds_request () | 124 | timeout_ds_request (void *cls, |
125 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
111 | { | 126 | { |
112 | struct DatastoreRequestQueue *e; | 127 | struct DatastoreRequestQueue *e = cls; |
113 | 128 | ||
114 | while (NULL != (e = drq_head)) | ||
115 | { | ||
116 | if (0 != GNUNET_TIME_absolute_get_remaining (e->timeout).value) | ||
117 | break; | ||
118 | if (e->task != GNUNET_SCHEDULER_NO_TASK) | ||
119 | GNUNET_SCHEDULER_cancel (sched, e->task); | ||
120 | GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e); | ||
121 | e->req (e->req_cls, GNUNET_NO); | ||
122 | GNUNET_free (e); | ||
123 | } | ||
124 | if (e == NULL) | ||
125 | return; | ||
126 | if (e->task != GNUNET_SCHEDULER_NO_TASK) | ||
127 | GNUNET_SCHEDULER_cancel (sched, e->task); | ||
128 | e->task = GNUNET_SCHEDULER_NO_TASK; | 129 | e->task = GNUNET_SCHEDULER_NO_TASK; |
129 | e->req (e->req_cls, GNUNET_YES); | ||
130 | GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e); | 130 | GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e); |
131 | e->req (e->req_cls, GNUNET_NO); | ||
131 | GNUNET_free (e); | 132 | GNUNET_free (e); |
132 | } | 133 | } |
133 | 134 | ||
134 | 135 | ||
135 | /** | 136 | /** |
136 | * A datastore request had to be timed out. | 137 | * A datastore request can be run right now. Run it. |
137 | * | 138 | * |
138 | * @param cls closure (of type "struct DatastoreRequestQueue*") | 139 | * @param cls closure (of type "struct DatastoreRequestQueue*") |
139 | * @param tc task context, unused | 140 | * @param tc task context, unused |
140 | */ | 141 | */ |
141 | static void | 142 | static void |
142 | timeout_ds_request (void *cls, | 143 | run_next_request (void *cls, |
143 | const struct GNUNET_SCHEDULER_TaskContext *tc) | 144 | const struct GNUNET_SCHEDULER_TaskContext *tc) |
144 | { | 145 | { |
145 | struct DatastoreRequestQueue *e = cls; | 146 | struct DatastoreRequestQueue *e = cls; |
146 | 147 | ||
147 | e->task = GNUNET_SCHEDULER_NO_TASK; | ||
148 | GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e); | 148 | GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e); |
149 | e->req (e->req_cls, GNUNET_NO); | 149 | drq_running = e; |
150 | GNUNET_free (e); | 150 | e->req (e->req_cls, GNUNET_YES); |
151 | } | 151 | } |
152 | 152 | ||
153 | 153 | ||
154 | /** | ||
155 | * Run the next DS request in our queue, we're done with the current | ||
156 | * one. | ||
157 | */ | ||
158 | static void | ||
159 | next_ds_request () | ||
160 | { | ||
161 | struct DatastoreRequestQueue *e; | ||
162 | |||
163 | GNUNET_free_non_null (drq_running); | ||
164 | drq_running = NULL; | ||
165 | e = drq_head; | ||
166 | if (e == NULL) | ||
167 | return; | ||
168 | GNUNET_SCHEDULER_cancel (sched, e->task); | ||
169 | e->task = GNUNET_SCHEDULER_add_now (sched, | ||
170 | &run_next_request, | ||
171 | e); | ||
172 | } | ||
173 | |||
174 | |||
175 | /** | ||
176 | * Remove a pending request from the request queue. | ||
177 | * | ||
178 | * @param req request to remove | ||
179 | */ | ||
154 | static void | 180 | static void |
155 | dequeue_ds_request (struct DatastoreRequestQueue *req) | 181 | dequeue_ds_request (struct DatastoreRequestQueue *req) |
156 | { | 182 | { |
157 | if (req->task != GNUNET_SCHEDULER_NO_TASK) | ||
158 | GNUNET_SCHEDULER_cancel (sched, req->task); | ||
159 | GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, req); | 183 | GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, req); |
184 | GNUNET_SCHEDULER_cancel (sched, req->task); | ||
160 | GNUNET_free (req); | 185 | GNUNET_free (req); |
161 | } | 186 | } |
162 | 187 | ||
@@ -167,29 +192,25 @@ dequeue_ds_request (struct DatastoreRequestQueue *req) | |||
167 | * @param deadline by when the request should run | 192 | * @param deadline by when the request should run |
168 | * @param fun function to call once the request can be run | 193 | * @param fun function to call once the request can be run |
169 | * @param fun_cls closure for fun | 194 | * @param fun_cls closure for fun |
195 | * @param immediate should this be queued immediately at | ||
196 | * the head of the queue (irrespecitive of the deadline)? | ||
170 | * @return handle that can be used to dequeue the request | 197 | * @return handle that can be used to dequeue the request |
171 | */ | 198 | */ |
172 | static struct DatastoreRequestQueue * | 199 | static struct DatastoreRequestQueue * |
173 | queue_ds_request (struct GNUNET_TIME_Relative deadline, | 200 | queue_ds_request (struct GNUNET_TIME_Relative deadline, |
174 | RequestFunction fun, | 201 | RequestFunction fun, |
175 | void *fun_cls) | 202 | void *fun_cls, |
203 | int immediate) | ||
176 | { | 204 | { |
177 | struct DatastoreRequestQueue *e; | 205 | struct DatastoreRequestQueue *e; |
178 | struct DatastoreRequestQueue *bef; | 206 | struct DatastoreRequestQueue *bef; |
179 | 207 | ||
180 | if (drq_head == NULL) | ||
181 | { | ||
182 | /* no other requests pending, run immediately */ | ||
183 | // FIXME: should probably use scheduler nevertheless | ||
184 | // and return non-null! | ||
185 | fun (fun_cls, GNUNET_OK); | ||
186 | return NULL; | ||
187 | } | ||
188 | e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue)); | 208 | e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue)); |
189 | e->timeout = GNUNET_TIME_relative_to_absolute (deadline); | 209 | e->timeout = GNUNET_TIME_relative_to_absolute (deadline); |
190 | e->req = fun; | 210 | e->req = fun; |
191 | e->req_cls = fun_cls; | 211 | e->req_cls = fun_cls; |
192 | if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value) | 212 | e->forced_head = immediate; |
213 | if (GNUNET_YES == immediate) | ||
193 | { | 214 | { |
194 | /* local request, highest prio, put at head of queue | 215 | /* local request, highest prio, put at head of queue |
195 | regardless of deadline */ | 216 | regardless of deadline */ |
@@ -199,16 +220,17 @@ queue_ds_request (struct GNUNET_TIME_Relative deadline, | |||
199 | { | 220 | { |
200 | bef = drq_tail; | 221 | bef = drq_tail; |
201 | while ( (NULL != bef) && | 222 | while ( (NULL != bef) && |
202 | (e->timeout.value < bef->timeout.value) ) | 223 | (e->timeout.value < bef->timeout.value) && |
224 | (GNUNET_YES != e->forced_head) ) | ||
203 | bef = bef->prev; | 225 | bef = bef->prev; |
204 | } | 226 | } |
205 | GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e); | 227 | GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e); |
206 | if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value) | ||
207 | return e; | ||
208 | e->task = GNUNET_SCHEDULER_add_delayed (sched, | 228 | e->task = GNUNET_SCHEDULER_add_delayed (sched, |
209 | deadline, | 229 | deadline, |
210 | &timeout_ds_request, | 230 | &timeout_ds_request, |
211 | e); | 231 | e); |
232 | if (drq_running == NULL) | ||
233 | next_ds_request (); | ||
212 | return e; | 234 | return e; |
213 | } | 235 | } |
214 | 236 | ||
@@ -232,23 +254,53 @@ shutdown_task (void *cls, | |||
232 | while (NULL != (drq = drq_head)) | 254 | while (NULL != (drq = drq_head)) |
233 | { | 255 | { |
234 | drq_head = drq->next; | 256 | drq_head = drq->next; |
257 | GNUNET_SCHEDULER_cancel (sched, drq->task); | ||
235 | drq->req (drq->req_cls, GNUNET_NO); | 258 | drq->req (drq->req_cls, GNUNET_NO); |
236 | dequeue_ds_request (drq); | 259 | GNUNET_free (drq); |
237 | } | 260 | } |
238 | drq_tail = NULL; | 261 | drq_tail = NULL; |
239 | } | 262 | } |
240 | 263 | ||
241 | 264 | ||
265 | /** | ||
266 | * Closure for 'do_get' and 'get_iterator'. | ||
267 | */ | ||
242 | struct GetClosure | 268 | struct GetClosure |
243 | { | 269 | { |
270 | /** | ||
271 | * Key we are doing the 'get' for. | ||
272 | */ | ||
244 | GNUNET_HashCode key; | 273 | GNUNET_HashCode key; |
274 | |||
275 | /** | ||
276 | * Datastore entry type we are doing the 'get' for. | ||
277 | */ | ||
245 | uint32_t type; | 278 | uint32_t type; |
279 | |||
280 | /** | ||
281 | * Function to call for each entry. | ||
282 | */ | ||
246 | GNUNET_DATASTORE_Iterator iter; | 283 | GNUNET_DATASTORE_Iterator iter; |
284 | |||
285 | /** | ||
286 | * Closure for iter. | ||
287 | */ | ||
247 | void *iter_cls; | 288 | void *iter_cls; |
289 | |||
290 | /** | ||
291 | * Timeout for this operation. | ||
292 | */ | ||
248 | struct GNUNET_TIME_Absolute timeout; | 293 | struct GNUNET_TIME_Absolute timeout; |
249 | }; | 294 | }; |
250 | 295 | ||
251 | 296 | ||
297 | /** | ||
298 | * Wrapper for the datastore get operation. Makes sure to trigger the | ||
299 | * next datastore operation in the queue once the operation is | ||
300 | * complete. | ||
301 | * | ||
302 | * @param cls our 'struct GetClosure*' | ||
303 | */ | ||
252 | static void | 304 | static void |
253 | get_iterator (void *cls, | 305 | get_iterator (void *cls, |
254 | const GNUNET_HashCode * key, | 306 | const GNUNET_HashCode * key, |
@@ -263,9 +315,17 @@ get_iterator (void *cls, | |||
263 | { | 315 | { |
264 | struct GetClosure *gc = cls; | 316 | struct GetClosure *gc = cls; |
265 | 317 | ||
266 | gc->iter (gc->iter_cls, | 318 | if (gc->iter == NULL) |
267 | key, size, data, type, | 319 | { |
268 | priority, anonymity, expiration, uid); | 320 | /* stop the iteration */ |
321 | GNUNET_DATASTORE_get_next (dsh, GNUNET_NO); | ||
322 | } | ||
323 | else | ||
324 | { | ||
325 | gc->iter (gc->iter_cls, | ||
326 | key, size, data, type, | ||
327 | priority, anonymity, expiration, uid); | ||
328 | } | ||
269 | if (key == NULL) | 329 | if (key == NULL) |
270 | { | 330 | { |
271 | next_ds_request (); | 331 | next_ds_request (); |
@@ -274,6 +334,14 @@ get_iterator (void *cls, | |||
274 | } | 334 | } |
275 | 335 | ||
276 | 336 | ||
337 | /** | ||
338 | * We're at the head of the reqeust queue, execute the | ||
339 | * get operation (or signal error). | ||
340 | * | ||
341 | * @param cls the 'struct GetClosure' | ||
342 | * @param ok GNUNET_OK if we can run the GET, otherwise | ||
343 | * we need to time out | ||
344 | */ | ||
277 | static void | 345 | static void |
278 | do_get (void *cls, | 346 | do_get (void *cls, |
279 | int ok) | 347 | int ok) |
@@ -282,9 +350,10 @@ do_get (void *cls, | |||
282 | 350 | ||
283 | if (ok != GNUNET_OK) | 351 | if (ok != GNUNET_OK) |
284 | { | 352 | { |
285 | gc->iter (gc->iter_cls, | 353 | if (gc->iter != NULL) |
286 | NULL, 0, NULL, 0, 0, 0, | 354 | gc->iter (gc->iter_cls, |
287 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | 355 | NULL, 0, NULL, 0, 0, 0, |
356 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | ||
288 | GNUNET_free (gc); | 357 | GNUNET_free (gc); |
289 | next_ds_request (); | 358 | next_ds_request (); |
290 | return; | 359 | return; |
@@ -309,13 +378,16 @@ do_get (void *cls, | |||
309 | * will be called once with a NULL value at the end | 378 | * will be called once with a NULL value at the end |
310 | * @param iter_cls closure for iter | 379 | * @param iter_cls closure for iter |
311 | * @param timeout how long to wait at most for a response | 380 | * @param timeout how long to wait at most for a response |
381 | * @param immediate should this be queued immediately at | ||
382 | * the head of the queue (irrespecitive of the timeout)? | ||
312 | */ | 383 | */ |
313 | struct DatastoreRequestQueue * | 384 | struct DatastoreRequestQueue * |
314 | GNUNET_FS_drq_get (const GNUNET_HashCode * key, | 385 | GNUNET_FS_drq_get (const GNUNET_HashCode * key, |
315 | uint32_t type, | 386 | uint32_t type, |
316 | GNUNET_DATASTORE_Iterator iter, | 387 | GNUNET_DATASTORE_Iterator iter, |
317 | void *iter_cls, | 388 | void *iter_cls, |
318 | struct GNUNET_TIME_Relative timeout) | 389 | struct GNUNET_TIME_Relative timeout, |
390 | int immediate) | ||
319 | { | 391 | { |
320 | struct GetClosure *gc; | 392 | struct GetClosure *gc; |
321 | 393 | ||
@@ -327,14 +399,35 @@ GNUNET_FS_drq_get (const GNUNET_HashCode * key, | |||
327 | gc->timeout = GNUNET_TIME_relative_to_absolute (timeout); | 399 | gc->timeout = GNUNET_TIME_relative_to_absolute (timeout); |
328 | return queue_ds_request (timeout, | 400 | return queue_ds_request (timeout, |
329 | &do_get, | 401 | &do_get, |
330 | gc); | 402 | gc, |
403 | immediate); | ||
331 | } | 404 | } |
332 | 405 | ||
333 | 406 | ||
407 | /** | ||
408 | * Cancel the given operation. | ||
409 | * | ||
410 | * @param drq the queued operation (must not have been | ||
411 | * triggered so far) | ||
412 | */ | ||
334 | void | 413 | void |
335 | GNUNET_FS_drq_get_cancel (struct DatastoreRequestQueue *drq) | 414 | GNUNET_FS_drq_get_cancel (struct DatastoreRequestQueue *drq) |
336 | { | 415 | { |
337 | dequeue_ds_request (drq); | 416 | struct GetClosure *gc; |
417 | if (drq == drq_running) | ||
418 | { | ||
419 | /* 'DATASTORE_get' has already been started (and this call might | ||
420 | actually be be legal since it is possible that the client has | ||
421 | not yet received any calls to its the iterator; so we need | ||
422 | to cancel somehow; we do this by getting to the 'GetClosure' | ||
423 | and zeroing the 'iter' field, which stops the iteration */ | ||
424 | gc = drq_running->req_cls; | ||
425 | gc->iter = NULL; | ||
426 | } | ||
427 | else | ||
428 | { | ||
429 | dequeue_ds_request (drq); | ||
430 | } | ||
338 | } | 431 | } |
339 | 432 | ||
340 | 433 | ||
diff --git a/src/fs/gnunet-service-fs_drq.h b/src/fs/gnunet-service-fs_drq.h index 306db9bad..ec86b99be 100644 --- a/src/fs/gnunet-service-fs_drq.h +++ b/src/fs/gnunet-service-fs_drq.h | |||
@@ -49,13 +49,16 @@ struct DatastoreRequestQueue; | |||
49 | * will be called once with a NULL value at the end | 49 | * will be called once with a NULL value at the end |
50 | * @param iter_cls closure for iter | 50 | * @param iter_cls closure for iter |
51 | * @param timeout how long to wait at most for a response | 51 | * @param timeout how long to wait at most for a response |
52 | * @param immediate should this be queued immediately at | ||
53 | * the head of the queue (irrespecitive of the timeout)? | ||
52 | */ | 54 | */ |
53 | struct DatastoreRequestQueue * | 55 | struct DatastoreRequestQueue * |
54 | GNUNET_FS_drq_get (const GNUNET_HashCode * key, | 56 | GNUNET_FS_drq_get (const GNUNET_HashCode * key, |
55 | uint32_t type, | 57 | uint32_t type, |
56 | GNUNET_DATASTORE_Iterator iter, | 58 | GNUNET_DATASTORE_Iterator iter, |
57 | void *iter_cls, | 59 | void *iter_cls, |
58 | struct GNUNET_TIME_Relative timeout); | 60 | struct GNUNET_TIME_Relative timeout, |
61 | int immediate); | ||
59 | 62 | ||
60 | 63 | ||
61 | 64 | ||