aboutsummaryrefslogtreecommitdiff
path: root/src/fs
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2010-02-01 09:10:08 +0000
committerChristian Grothoff <christian@grothoff.org>2010-02-01 09:10:08 +0000
commitb49dc6cbb5f75fcf9a8168f3366de7ecc2262672 (patch)
tree07fdabe1d38b2e58d304e3ff34fe3225baba0603 /src/fs
parent44805bd751283cc7fdc11a4ccb46ed122dbf23ba (diff)
downloadgnunet-b49dc6cbb5f75fcf9a8168f3366de7ecc2262672.tar.gz
gnunet-b49dc6cbb5f75fcf9a8168f3366de7ecc2262672.zip
fixing drq code
Diffstat (limited to 'src/fs')
-rw-r--r--src/fs/gnunet-service-fs.c6
-rw-r--r--src/fs/gnunet-service-fs_drq.c195
-rw-r--r--src/fs/gnunet-service-fs_drq.h5
3 files changed, 152 insertions, 54 deletions
diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c
index 740f63624..2b3b44f93 100644
--- a/src/fs/gnunet-service-fs.c
+++ b/src/fs/gnunet-service-fs.c
@@ -2168,7 +2168,8 @@ handle_p2p_get (void *cls,
2168 pr->type, 2168 pr->type,
2169 &process_local_reply, 2169 &process_local_reply,
2170 pr, 2170 pr,
2171 timeout); 2171 timeout,
2172 GNUNET_NO);
2172 2173
2173 /* Are multiple results possible? If so, start processing remotely now! */ 2174 /* Are multiple results possible? If so, start processing remotely now! */
2174 switch (pr->type) 2175 switch (pr->type)
@@ -2286,7 +2287,8 @@ handle_start_search (void *cls,
2286 pr->type, 2287 pr->type,
2287 &process_local_reply, 2288 &process_local_reply,
2288 pr, 2289 pr,
2289 GNUNET_TIME_UNIT_FOREVER_REL); 2290 GNUNET_TIME_UNIT_FOREVER_REL,
2291 GNUNET_YES);
2290} 2292}
2291 2293
2292 2294
diff --git a/src/fs/gnunet-service-fs_drq.c b/src/fs/gnunet-service-fs_drq.c
index c15e37a0d..afca9d970 100644
--- a/src/fs/gnunet-service-fs_drq.c
+++ b/src/fs/gnunet-service-fs_drq.c
@@ -74,6 +74,12 @@ struct DatastoreRequestQueue
74 */ 74 */
75 GNUNET_SCHEDULER_TaskIdentifier task; 75 GNUNET_SCHEDULER_TaskIdentifier task;
76 76
77 /**
78 * Is this request at the head of the queue irrespective of its
79 * timeout value?
80 */
81 int forced_head;
82
77}; 83};
78 84
79/** 85/**
@@ -101,62 +107,81 @@ static struct DatastoreRequestQueue *drq_tail;
101 */ 107 */
102static struct GNUNET_DATASTORE_Handle *dsh; 108static struct GNUNET_DATASTORE_Handle *dsh;
103 109
110/**
111 * Pointer to the currently actively running request,
112 * NULL if none is running.
113 */
114static struct DatastoreRequestQueue *drq_running;
115
104 116
105/** 117/**
106 * Run the next DS request in our 118 * A datastore request had to be timed out.
107 * queue, we're done with the current one. 119 *
120 * @param cls closure (unused)
121 * @param tc task context, unused
108 */ 122 */
109static void 123static void
110next_ds_request () 124timeout_ds_request (void *cls,
125 const struct GNUNET_SCHEDULER_TaskContext *tc)
111{ 126{
112 struct DatastoreRequestQueue *e; 127 struct DatastoreRequestQueue *e = cls;
113 128
114 while (NULL != (e = drq_head))
115 {
116 if (0 != GNUNET_TIME_absolute_get_remaining (e->timeout).value)
117 break;
118 if (e->task != GNUNET_SCHEDULER_NO_TASK)
119 GNUNET_SCHEDULER_cancel (sched, e->task);
120 GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
121 e->req (e->req_cls, GNUNET_NO);
122 GNUNET_free (e);
123 }
124 if (e == NULL)
125 return;
126 if (e->task != GNUNET_SCHEDULER_NO_TASK)
127 GNUNET_SCHEDULER_cancel (sched, e->task);
128 e->task = GNUNET_SCHEDULER_NO_TASK; 129 e->task = GNUNET_SCHEDULER_NO_TASK;
129 e->req (e->req_cls, GNUNET_YES);
130 GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e); 130 GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
131 e->req (e->req_cls, GNUNET_NO);
131 GNUNET_free (e); 132 GNUNET_free (e);
132} 133}
133 134
134 135
135/** 136/**
136 * A datastore request had to be timed out. 137 * A datastore request can be run right now. Run it.
137 * 138 *
138 * @param cls closure (of type "struct DatastoreRequestQueue*") 139 * @param cls closure (of type "struct DatastoreRequestQueue*")
139 * @param tc task context, unused 140 * @param tc task context, unused
140 */ 141 */
141static void 142static void
142timeout_ds_request (void *cls, 143run_next_request (void *cls,
143 const struct GNUNET_SCHEDULER_TaskContext *tc) 144 const struct GNUNET_SCHEDULER_TaskContext *tc)
144{ 145{
145 struct DatastoreRequestQueue *e = cls; 146 struct DatastoreRequestQueue *e = cls;
146 147
147 e->task = GNUNET_SCHEDULER_NO_TASK;
148 GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e); 148 GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
149 e->req (e->req_cls, GNUNET_NO); 149 drq_running = e;
150 GNUNET_free (e); 150 e->req (e->req_cls, GNUNET_YES);
151} 151}
152 152
153 153
154/**
155 * Run the next DS request in our queue, we're done with the current
156 * one.
157 */
158static void
159next_ds_request ()
160{
161 struct DatastoreRequestQueue *e;
162
163 GNUNET_free_non_null (drq_running);
164 drq_running = NULL;
165 e = drq_head;
166 if (e == NULL)
167 return;
168 GNUNET_SCHEDULER_cancel (sched, e->task);
169 e->task = GNUNET_SCHEDULER_add_now (sched,
170 &run_next_request,
171 e);
172}
173
174
175/**
176 * Remove a pending request from the request queue.
177 *
178 * @param req request to remove
179 */
154static void 180static void
155dequeue_ds_request (struct DatastoreRequestQueue *req) 181dequeue_ds_request (struct DatastoreRequestQueue *req)
156{ 182{
157 if (req->task != GNUNET_SCHEDULER_NO_TASK)
158 GNUNET_SCHEDULER_cancel (sched, req->task);
159 GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, req); 183 GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, req);
184 GNUNET_SCHEDULER_cancel (sched, req->task);
160 GNUNET_free (req); 185 GNUNET_free (req);
161} 186}
162 187
@@ -167,29 +192,25 @@ dequeue_ds_request (struct DatastoreRequestQueue *req)
167 * @param deadline by when the request should run 192 * @param deadline by when the request should run
168 * @param fun function to call once the request can be run 193 * @param fun function to call once the request can be run
169 * @param fun_cls closure for fun 194 * @param fun_cls closure for fun
195 * @param immediate should this be queued immediately at
196 * the head of the queue (irrespecitive of the deadline)?
170 * @return handle that can be used to dequeue the request 197 * @return handle that can be used to dequeue the request
171 */ 198 */
172static struct DatastoreRequestQueue * 199static struct DatastoreRequestQueue *
173queue_ds_request (struct GNUNET_TIME_Relative deadline, 200queue_ds_request (struct GNUNET_TIME_Relative deadline,
174 RequestFunction fun, 201 RequestFunction fun,
175 void *fun_cls) 202 void *fun_cls,
203 int immediate)
176{ 204{
177 struct DatastoreRequestQueue *e; 205 struct DatastoreRequestQueue *e;
178 struct DatastoreRequestQueue *bef; 206 struct DatastoreRequestQueue *bef;
179 207
180 if (drq_head == NULL)
181 {
182 /* no other requests pending, run immediately */
183 // FIXME: should probably use scheduler nevertheless
184 // and return non-null!
185 fun (fun_cls, GNUNET_OK);
186 return NULL;
187 }
188 e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue)); 208 e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue));
189 e->timeout = GNUNET_TIME_relative_to_absolute (deadline); 209 e->timeout = GNUNET_TIME_relative_to_absolute (deadline);
190 e->req = fun; 210 e->req = fun;
191 e->req_cls = fun_cls; 211 e->req_cls = fun_cls;
192 if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value) 212 e->forced_head = immediate;
213 if (GNUNET_YES == immediate)
193 { 214 {
194 /* local request, highest prio, put at head of queue 215 /* local request, highest prio, put at head of queue
195 regardless of deadline */ 216 regardless of deadline */
@@ -199,16 +220,17 @@ queue_ds_request (struct GNUNET_TIME_Relative deadline,
199 { 220 {
200 bef = drq_tail; 221 bef = drq_tail;
201 while ( (NULL != bef) && 222 while ( (NULL != bef) &&
202 (e->timeout.value < bef->timeout.value) ) 223 (e->timeout.value < bef->timeout.value) &&
224 (GNUNET_YES != e->forced_head) )
203 bef = bef->prev; 225 bef = bef->prev;
204 } 226 }
205 GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e); 227 GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e);
206 if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
207 return e;
208 e->task = GNUNET_SCHEDULER_add_delayed (sched, 228 e->task = GNUNET_SCHEDULER_add_delayed (sched,
209 deadline, 229 deadline,
210 &timeout_ds_request, 230 &timeout_ds_request,
211 e); 231 e);
232 if (drq_running == NULL)
233 next_ds_request ();
212 return e; 234 return e;
213} 235}
214 236
@@ -232,23 +254,53 @@ shutdown_task (void *cls,
232 while (NULL != (drq = drq_head)) 254 while (NULL != (drq = drq_head))
233 { 255 {
234 drq_head = drq->next; 256 drq_head = drq->next;
257 GNUNET_SCHEDULER_cancel (sched, drq->task);
235 drq->req (drq->req_cls, GNUNET_NO); 258 drq->req (drq->req_cls, GNUNET_NO);
236 dequeue_ds_request (drq); 259 GNUNET_free (drq);
237 } 260 }
238 drq_tail = NULL; 261 drq_tail = NULL;
239} 262}
240 263
241 264
265/**
266 * Closure for 'do_get' and 'get_iterator'.
267 */
242struct GetClosure 268struct GetClosure
243{ 269{
270 /**
271 * Key we are doing the 'get' for.
272 */
244 GNUNET_HashCode key; 273 GNUNET_HashCode key;
274
275 /**
276 * Datastore entry type we are doing the 'get' for.
277 */
245 uint32_t type; 278 uint32_t type;
279
280 /**
281 * Function to call for each entry.
282 */
246 GNUNET_DATASTORE_Iterator iter; 283 GNUNET_DATASTORE_Iterator iter;
284
285 /**
286 * Closure for iter.
287 */
247 void *iter_cls; 288 void *iter_cls;
289
290 /**
291 * Timeout for this operation.
292 */
248 struct GNUNET_TIME_Absolute timeout; 293 struct GNUNET_TIME_Absolute timeout;
249}; 294};
250 295
251 296
297/**
298 * Wrapper for the datastore get operation. Makes sure to trigger the
299 * next datastore operation in the queue once the operation is
300 * complete.
301 *
302 * @param cls our 'struct GetClosure*'
303 */
252static void 304static void
253get_iterator (void *cls, 305get_iterator (void *cls,
254 const GNUNET_HashCode * key, 306 const GNUNET_HashCode * key,
@@ -263,9 +315,17 @@ get_iterator (void *cls,
263{ 315{
264 struct GetClosure *gc = cls; 316 struct GetClosure *gc = cls;
265 317
266 gc->iter (gc->iter_cls, 318 if (gc->iter == NULL)
267 key, size, data, type, 319 {
268 priority, anonymity, expiration, uid); 320 /* stop the iteration */
321 GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
322 }
323 else
324 {
325 gc->iter (gc->iter_cls,
326 key, size, data, type,
327 priority, anonymity, expiration, uid);
328 }
269 if (key == NULL) 329 if (key == NULL)
270 { 330 {
271 next_ds_request (); 331 next_ds_request ();
@@ -274,6 +334,14 @@ get_iterator (void *cls,
274} 334}
275 335
276 336
337/**
338 * We're at the head of the reqeust queue, execute the
339 * get operation (or signal error).
340 *
341 * @param cls the 'struct GetClosure'
342 * @param ok GNUNET_OK if we can run the GET, otherwise
343 * we need to time out
344 */
277static void 345static void
278do_get (void *cls, 346do_get (void *cls,
279 int ok) 347 int ok)
@@ -282,9 +350,10 @@ do_get (void *cls,
282 350
283 if (ok != GNUNET_OK) 351 if (ok != GNUNET_OK)
284 { 352 {
285 gc->iter (gc->iter_cls, 353 if (gc->iter != NULL)
286 NULL, 0, NULL, 0, 0, 0, 354 gc->iter (gc->iter_cls,
287 GNUNET_TIME_UNIT_ZERO_ABS, 0); 355 NULL, 0, NULL, 0, 0, 0,
356 GNUNET_TIME_UNIT_ZERO_ABS, 0);
288 GNUNET_free (gc); 357 GNUNET_free (gc);
289 next_ds_request (); 358 next_ds_request ();
290 return; 359 return;
@@ -309,13 +378,16 @@ do_get (void *cls,
309 * will be called once with a NULL value at the end 378 * will be called once with a NULL value at the end
310 * @param iter_cls closure for iter 379 * @param iter_cls closure for iter
311 * @param timeout how long to wait at most for a response 380 * @param timeout how long to wait at most for a response
381 * @param immediate should this be queued immediately at
382 * the head of the queue (irrespecitive of the timeout)?
312 */ 383 */
313struct DatastoreRequestQueue * 384struct DatastoreRequestQueue *
314GNUNET_FS_drq_get (const GNUNET_HashCode * key, 385GNUNET_FS_drq_get (const GNUNET_HashCode * key,
315 uint32_t type, 386 uint32_t type,
316 GNUNET_DATASTORE_Iterator iter, 387 GNUNET_DATASTORE_Iterator iter,
317 void *iter_cls, 388 void *iter_cls,
318 struct GNUNET_TIME_Relative timeout) 389 struct GNUNET_TIME_Relative timeout,
390 int immediate)
319{ 391{
320 struct GetClosure *gc; 392 struct GetClosure *gc;
321 393
@@ -327,14 +399,35 @@ GNUNET_FS_drq_get (const GNUNET_HashCode * key,
327 gc->timeout = GNUNET_TIME_relative_to_absolute (timeout); 399 gc->timeout = GNUNET_TIME_relative_to_absolute (timeout);
328 return queue_ds_request (timeout, 400 return queue_ds_request (timeout,
329 &do_get, 401 &do_get,
330 gc); 402 gc,
403 immediate);
331} 404}
332 405
333 406
407/**
408 * Cancel the given operation.
409 *
410 * @param drq the queued operation (must not have been
411 * triggered so far)
412 */
334void 413void
335GNUNET_FS_drq_get_cancel (struct DatastoreRequestQueue *drq) 414GNUNET_FS_drq_get_cancel (struct DatastoreRequestQueue *drq)
336{ 415{
337 dequeue_ds_request (drq); 416 struct GetClosure *gc;
417 if (drq == drq_running)
418 {
419 /* 'DATASTORE_get' has already been started (and this call might
420 actually be be legal since it is possible that the client has
421 not yet received any calls to its the iterator; so we need
422 to cancel somehow; we do this by getting to the 'GetClosure'
423 and zeroing the 'iter' field, which stops the iteration */
424 gc = drq_running->req_cls;
425 gc->iter = NULL;
426 }
427 else
428 {
429 dequeue_ds_request (drq);
430 }
338} 431}
339 432
340 433
diff --git a/src/fs/gnunet-service-fs_drq.h b/src/fs/gnunet-service-fs_drq.h
index 306db9bad..ec86b99be 100644
--- a/src/fs/gnunet-service-fs_drq.h
+++ b/src/fs/gnunet-service-fs_drq.h
@@ -49,13 +49,16 @@ struct DatastoreRequestQueue;
49 * will be called once with a NULL value at the end 49 * will be called once with a NULL value at the end
50 * @param iter_cls closure for iter 50 * @param iter_cls closure for iter
51 * @param timeout how long to wait at most for a response 51 * @param timeout how long to wait at most for a response
52 * @param immediate should this be queued immediately at
53 * the head of the queue (irrespecitive of the timeout)?
52 */ 54 */
53struct DatastoreRequestQueue * 55struct DatastoreRequestQueue *
54GNUNET_FS_drq_get (const GNUNET_HashCode * key, 56GNUNET_FS_drq_get (const GNUNET_HashCode * key,
55 uint32_t type, 57 uint32_t type,
56 GNUNET_DATASTORE_Iterator iter, 58 GNUNET_DATASTORE_Iterator iter,
57 void *iter_cls, 59 void *iter_cls,
58 struct GNUNET_TIME_Relative timeout); 60 struct GNUNET_TIME_Relative timeout,
61 int immediate);
59 62
60 63
61 64