diff options
Diffstat (limited to 'src/datastore/datastore_api.c')
-rw-r--r-- | src/datastore/datastore_api.c | 1295 |
1 files changed, 651 insertions, 644 deletions
diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c index 7069975e4..b10c43944 100644 --- a/src/datastore/datastore_api.c +++ b/src/datastore/datastore_api.c | |||
@@ -31,7 +31,7 @@ | |||
31 | #include "gnunet_statistics_service.h" | 31 | #include "gnunet_statistics_service.h" |
32 | #include "datastore.h" | 32 | #include "datastore.h" |
33 | 33 | ||
34 | #define LOG(kind, ...) GNUNET_log_from(kind, "datastore-api", __VA_ARGS__) | 34 | #define LOG(kind, ...) GNUNET_log_from (kind, "datastore-api", __VA_ARGS__) |
35 | 35 | ||
36 | #define DELAY_WARN_TIMEOUT GNUNET_TIME_UNIT_MINUTES | 36 | #define DELAY_WARN_TIMEOUT GNUNET_TIME_UNIT_MINUTES |
37 | 37 | ||
@@ -51,7 +51,8 @@ | |||
51 | /** | 51 | /** |
52 | * Context for processing status messages. | 52 | * Context for processing status messages. |
53 | */ | 53 | */ |
54 | struct StatusContext { | 54 | struct StatusContext |
55 | { | ||
55 | /** | 56 | /** |
56 | * Continuation to call with the status. | 57 | * Continuation to call with the status. |
57 | */ | 58 | */ |
@@ -67,7 +68,8 @@ struct StatusContext { | |||
67 | /** | 68 | /** |
68 | * Context for processing result messages. | 69 | * Context for processing result messages. |
69 | */ | 70 | */ |
70 | struct ResultContext { | 71 | struct ResultContext |
72 | { | ||
71 | /** | 73 | /** |
72 | * Function to call with the result. | 74 | * Function to call with the result. |
73 | */ | 75 | */ |
@@ -83,7 +85,8 @@ struct ResultContext { | |||
83 | /** | 85 | /** |
84 | * Context for a queue operation. | 86 | * Context for a queue operation. |
85 | */ | 87 | */ |
86 | union QueueContext { | 88 | union QueueContext |
89 | { | ||
87 | struct StatusContext sc; | 90 | struct StatusContext sc; |
88 | 91 | ||
89 | struct ResultContext rc; | 92 | struct ResultContext rc; |
@@ -93,7 +96,8 @@ union QueueContext { | |||
93 | /** | 96 | /** |
94 | * Entry in our priority queue. | 97 | * Entry in our priority queue. |
95 | */ | 98 | */ |
96 | struct GNUNET_DATASTORE_QueueEntry { | 99 | struct GNUNET_DATASTORE_QueueEntry |
100 | { | ||
97 | /** | 101 | /** |
98 | * This is a linked list. | 102 | * This is a linked list. |
99 | */ | 103 | */ |
@@ -157,7 +161,8 @@ struct GNUNET_DATASTORE_QueueEntry { | |||
157 | /** | 161 | /** |
158 | * Handle to the datastore service. | 162 | * Handle to the datastore service. |
159 | */ | 163 | */ |
160 | struct GNUNET_DATASTORE_Handle { | 164 | struct GNUNET_DATASTORE_Handle |
165 | { | ||
161 | /** | 166 | /** |
162 | * Our configuration. | 167 | * Our configuration. |
163 | */ | 168 | */ |
@@ -219,7 +224,7 @@ struct GNUNET_DATASTORE_Handle { | |||
219 | * @param cls the `struct GNUNET_DATASTORE_Handle` | 224 | * @param cls the `struct GNUNET_DATASTORE_Handle` |
220 | */ | 225 | */ |
221 | static void | 226 | static void |
222 | try_reconnect(void *cls); | 227 | try_reconnect (void *cls); |
223 | 228 | ||
224 | 229 | ||
225 | /** | 230 | /** |
@@ -229,20 +234,20 @@ try_reconnect(void *cls); | |||
229 | * @param h handle to datastore to disconnect and reconnect | 234 | * @param h handle to datastore to disconnect and reconnect |
230 | */ | 235 | */ |
231 | static void | 236 | static void |
232 | do_disconnect(struct GNUNET_DATASTORE_Handle *h) | 237 | do_disconnect (struct GNUNET_DATASTORE_Handle *h) |
233 | { | 238 | { |
234 | if (NULL == h->mq) | 239 | if (NULL == h->mq) |
235 | { | 240 | { |
236 | GNUNET_break(0); | 241 | GNUNET_break (0); |
237 | return; | 242 | return; |
238 | } | 243 | } |
239 | GNUNET_MQ_destroy(h->mq); | 244 | GNUNET_MQ_destroy (h->mq); |
240 | h->mq = NULL; | 245 | h->mq = NULL; |
241 | h->skip_next_messages = 0; | 246 | h->skip_next_messages = 0; |
242 | h->reconnect_task | 247 | h->reconnect_task |
243 | = GNUNET_SCHEDULER_add_delayed(h->retry_time, | 248 | = GNUNET_SCHEDULER_add_delayed (h->retry_time, |
244 | &try_reconnect, | 249 | &try_reconnect, |
245 | h); | 250 | h); |
246 | } | 251 | } |
247 | 252 | ||
248 | 253 | ||
@@ -254,19 +259,19 @@ do_disconnect(struct GNUNET_DATASTORE_Handle *h) | |||
254 | * @param qe entry to free. | 259 | * @param qe entry to free. |
255 | */ | 260 | */ |
256 | static void | 261 | static void |
257 | free_queue_entry(struct GNUNET_DATASTORE_QueueEntry *qe) | 262 | free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe) |
258 | { | 263 | { |
259 | struct GNUNET_DATASTORE_Handle *h = qe->h; | 264 | struct GNUNET_DATASTORE_Handle *h = qe->h; |
260 | 265 | ||
261 | GNUNET_CONTAINER_DLL_remove(h->queue_head, | 266 | GNUNET_CONTAINER_DLL_remove (h->queue_head, |
262 | h->queue_tail, | 267 | h->queue_tail, |
263 | qe); | 268 | qe); |
264 | h->queue_size--; | 269 | h->queue_size--; |
265 | if (NULL != qe->env) | 270 | if (NULL != qe->env) |
266 | GNUNET_MQ_discard(qe->env); | 271 | GNUNET_MQ_discard (qe->env); |
267 | if (NULL != qe->delay_warn_task) | 272 | if (NULL != qe->delay_warn_task) |
268 | GNUNET_SCHEDULER_cancel(qe->delay_warn_task); | 273 | GNUNET_SCHEDULER_cancel (qe->delay_warn_task); |
269 | GNUNET_free(qe); | 274 | GNUNET_free (qe); |
270 | } | 275 | } |
271 | 276 | ||
272 | 277 | ||
@@ -276,20 +281,20 @@ free_queue_entry(struct GNUNET_DATASTORE_QueueEntry *qe) | |||
276 | * @param qe `struct GNUNET_DATASTORE_QueueEntry` about which the error is | 281 | * @param qe `struct GNUNET_DATASTORE_QueueEntry` about which the error is |
277 | */ | 282 | */ |
278 | static void | 283 | static void |
279 | delay_warning(void *cls) | 284 | delay_warning (void *cls) |
280 | { | 285 | { |
281 | struct GNUNET_DATASTORE_QueueEntry *qe = cls; | 286 | struct GNUNET_DATASTORE_QueueEntry *qe = cls; |
282 | 287 | ||
283 | qe->delay_warn_task = NULL; | 288 | qe->delay_warn_task = NULL; |
284 | GNUNET_log(GNUNET_ERROR_TYPE_ERROR, | 289 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
285 | "Request %p of type %u at head of datastore queue for more than %s\n", | 290 | "Request %p of type %u at head of datastore queue for more than %s\n", |
286 | qe, | 291 | qe, |
287 | (unsigned int)qe->response_type, | 292 | (unsigned int) qe->response_type, |
288 | GNUNET_STRINGS_relative_time_to_string(DELAY_WARN_TIMEOUT, | 293 | GNUNET_STRINGS_relative_time_to_string (DELAY_WARN_TIMEOUT, |
289 | GNUNET_YES)); | 294 | GNUNET_YES)); |
290 | qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed(DELAY_WARN_TIMEOUT, | 295 | qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT, |
291 | &delay_warning, | 296 | &delay_warning, |
292 | qe); | 297 | qe); |
293 | } | 298 | } |
294 | 299 | ||
295 | 300 | ||
@@ -300,59 +305,59 @@ delay_warning(void *cls) | |||
300 | * @param error error code | 305 | * @param error error code |
301 | */ | 306 | */ |
302 | static void | 307 | static void |
303 | mq_error_handler(void *cls, | 308 | mq_error_handler (void *cls, |
304 | enum GNUNET_MQ_Error error) | 309 | enum GNUNET_MQ_Error error) |
305 | { | 310 | { |
306 | struct GNUNET_DATASTORE_Handle *h = cls; | 311 | struct GNUNET_DATASTORE_Handle *h = cls; |
307 | struct GNUNET_DATASTORE_QueueEntry *qe; | 312 | struct GNUNET_DATASTORE_QueueEntry *qe; |
308 | 313 | ||
309 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 314 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
310 | "MQ error, reconnecting to DATASTORE\n"); | 315 | "MQ error, reconnecting to DATASTORE\n"); |
311 | do_disconnect(h); | 316 | do_disconnect (h); |
312 | qe = h->queue_head; | 317 | qe = h->queue_head; |
313 | if (NULL == qe) | 318 | if (NULL == qe) |
314 | return; | 319 | return; |
315 | if (NULL != qe->delay_warn_task) | 320 | if (NULL != qe->delay_warn_task) |
316 | { | 321 | { |
317 | GNUNET_SCHEDULER_cancel(qe->delay_warn_task); | 322 | GNUNET_SCHEDULER_cancel (qe->delay_warn_task); |
318 | qe->delay_warn_task = NULL; | 323 | qe->delay_warn_task = NULL; |
319 | } | 324 | } |
320 | if (NULL == qe->env) | 325 | if (NULL == qe->env) |
326 | { | ||
327 | union QueueContext qc = qe->qc; | ||
328 | uint16_t rt = qe->response_type; | ||
329 | |||
330 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
331 | "Failed to receive response from database.\n"); | ||
332 | free_queue_entry (qe); | ||
333 | switch (rt) | ||
321 | { | 334 | { |
322 | union QueueContext qc = qe->qc; | 335 | case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS: |
323 | uint16_t rt = qe->response_type; | 336 | if (NULL != qc.sc.cont) |
324 | 337 | qc.sc.cont (qc.sc.cont_cls, | |
325 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 338 | GNUNET_SYSERR, |
326 | "Failed to receive response from database.\n"); | 339 | GNUNET_TIME_UNIT_ZERO_ABS, |
327 | free_queue_entry(qe); | 340 | _ ("DATASTORE disconnected")); |
328 | switch (rt) | 341 | break; |
329 | { | 342 | |
330 | case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS: | 343 | case GNUNET_MESSAGE_TYPE_DATASTORE_DATA: |
331 | if (NULL != qc.sc.cont) | 344 | if (NULL != qc.rc.proc) |
332 | qc.sc.cont(qc.sc.cont_cls, | 345 | qc.rc.proc (qc.rc.proc_cls, |
333 | GNUNET_SYSERR, | 346 | NULL, |
334 | GNUNET_TIME_UNIT_ZERO_ABS, | 347 | 0, |
335 | _("DATASTORE disconnected")); | 348 | NULL, |
336 | break; | 349 | 0, |
337 | 350 | 0, | |
338 | case GNUNET_MESSAGE_TYPE_DATASTORE_DATA: | 351 | 0, |
339 | if (NULL != qc.rc.proc) | 352 | 0, |
340 | qc.rc.proc(qc.rc.proc_cls, | 353 | GNUNET_TIME_UNIT_ZERO_ABS, |
341 | NULL, | 354 | 0); |
342 | 0, | 355 | break; |
343 | NULL, | 356 | |
344 | 0, | 357 | default: |
345 | 0, | 358 | GNUNET_break (0); |
346 | 0, | ||
347 | 0, | ||
348 | GNUNET_TIME_UNIT_ZERO_ABS, | ||
349 | 0); | ||
350 | break; | ||
351 | |||
352 | default: | ||
353 | GNUNET_break(0); | ||
354 | } | ||
355 | } | 359 | } |
360 | } | ||
356 | } | 361 | } |
357 | 362 | ||
358 | 363 | ||
@@ -363,22 +368,22 @@ mq_error_handler(void *cls, | |||
363 | * @return handle to use to access the service | 368 | * @return handle to use to access the service |
364 | */ | 369 | */ |
365 | struct GNUNET_DATASTORE_Handle * | 370 | struct GNUNET_DATASTORE_Handle * |
366 | GNUNET_DATASTORE_connect(const struct GNUNET_CONFIGURATION_Handle *cfg) | 371 | GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) |
367 | { | 372 | { |
368 | struct GNUNET_DATASTORE_Handle *h; | 373 | struct GNUNET_DATASTORE_Handle *h; |
369 | 374 | ||
370 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 375 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
371 | "Establishing DATASTORE connection!\n"); | 376 | "Establishing DATASTORE connection!\n"); |
372 | h = GNUNET_new(struct GNUNET_DATASTORE_Handle); | 377 | h = GNUNET_new (struct GNUNET_DATASTORE_Handle); |
373 | h->cfg = cfg; | 378 | h->cfg = cfg; |
374 | try_reconnect(h); | 379 | try_reconnect (h); |
375 | if (NULL == h->mq) | 380 | if (NULL == h->mq) |
376 | { | 381 | { |
377 | GNUNET_free(h); | 382 | GNUNET_free (h); |
378 | return NULL; | 383 | return NULL; |
379 | } | 384 | } |
380 | h->stats = GNUNET_STATISTICS_create("datastore-api", | 385 | h->stats = GNUNET_STATISTICS_create ("datastore-api", |
381 | cfg); | 386 | cfg); |
382 | return h; | 387 | return h; |
383 | } | 388 | } |
384 | 389 | ||
@@ -390,14 +395,14 @@ GNUNET_DATASTORE_connect(const struct GNUNET_CONFIGURATION_Handle *cfg) | |||
390 | * @param cls the datastore handle | 395 | * @param cls the datastore handle |
391 | */ | 396 | */ |
392 | static void | 397 | static void |
393 | disconnect_after_drop(void *cls) | 398 | disconnect_after_drop (void *cls) |
394 | { | 399 | { |
395 | struct GNUNET_DATASTORE_Handle *h = cls; | 400 | struct GNUNET_DATASTORE_Handle *h = cls; |
396 | 401 | ||
397 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 402 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
398 | "Drop sent, disconnecting\n"); | 403 | "Drop sent, disconnecting\n"); |
399 | GNUNET_DATASTORE_disconnect(h, | 404 | GNUNET_DATASTORE_disconnect (h, |
400 | GNUNET_NO); | 405 | GNUNET_NO); |
401 | } | 406 | } |
402 | 407 | ||
403 | 408 | ||
@@ -408,15 +413,15 @@ disconnect_after_drop(void *cls) | |||
408 | * @param error error code | 413 | * @param error error code |
409 | */ | 414 | */ |
410 | static void | 415 | static void |
411 | disconnect_on_mq_error(void *cls, | 416 | disconnect_on_mq_error (void *cls, |
412 | enum GNUNET_MQ_Error error) | 417 | enum GNUNET_MQ_Error error) |
413 | { | 418 | { |
414 | struct GNUNET_DATASTORE_Handle *h = cls; | 419 | struct GNUNET_DATASTORE_Handle *h = cls; |
415 | 420 | ||
416 | LOG(GNUNET_ERROR_TYPE_ERROR, | 421 | LOG (GNUNET_ERROR_TYPE_ERROR, |
417 | "Failed to ask datastore to drop tables\n"); | 422 | "Failed to ask datastore to drop tables\n"); |
418 | GNUNET_DATASTORE_disconnect(h, | 423 | GNUNET_DATASTORE_disconnect (h, |
419 | GNUNET_NO); | 424 | GNUNET_NO); |
420 | } | 425 | } |
421 | 426 | ||
422 | 427 | ||
@@ -428,84 +433,84 @@ disconnect_on_mq_error(void *cls, | |||
428 | * @param drop set to #GNUNET_YES to delete all data in datastore (!) | 433 | * @param drop set to #GNUNET_YES to delete all data in datastore (!) |
429 | */ | 434 | */ |
430 | void | 435 | void |
431 | GNUNET_DATASTORE_disconnect(struct GNUNET_DATASTORE_Handle *h, | 436 | GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, |
432 | int drop) | 437 | int drop) |
433 | { | 438 | { |
434 | struct GNUNET_DATASTORE_QueueEntry *qe; | 439 | struct GNUNET_DATASTORE_QueueEntry *qe; |
435 | 440 | ||
436 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 441 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
437 | "Datastore disconnect\n"); | 442 | "Datastore disconnect\n"); |
438 | if (NULL != h->mq) | 443 | if (NULL != h->mq) |
439 | { | 444 | { |
440 | GNUNET_MQ_destroy(h->mq); | 445 | GNUNET_MQ_destroy (h->mq); |
441 | h->mq = NULL; | 446 | h->mq = NULL; |
442 | } | 447 | } |
443 | if (NULL != h->reconnect_task) | 448 | if (NULL != h->reconnect_task) |
444 | { | 449 | { |
445 | GNUNET_SCHEDULER_cancel(h->reconnect_task); | 450 | GNUNET_SCHEDULER_cancel (h->reconnect_task); |
446 | h->reconnect_task = NULL; | 451 | h->reconnect_task = NULL; |
447 | } | 452 | } |
448 | while (NULL != (qe = h->queue_head)) | 453 | while (NULL != (qe = h->queue_head)) |
454 | { | ||
455 | switch (qe->response_type) | ||
449 | { | 456 | { |
450 | switch (qe->response_type) | 457 | case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS: |
451 | { | 458 | if (NULL != qe->qc.sc.cont) |
452 | case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS: | 459 | qe->qc.sc.cont (qe->qc.sc.cont_cls, |
453 | if (NULL != qe->qc.sc.cont) | 460 | GNUNET_SYSERR, |
454 | qe->qc.sc.cont(qe->qc.sc.cont_cls, | 461 | GNUNET_TIME_UNIT_ZERO_ABS, |
455 | GNUNET_SYSERR, | 462 | _ ("Disconnected from DATASTORE")); |
456 | GNUNET_TIME_UNIT_ZERO_ABS, | 463 | break; |
457 | _("Disconnected from DATASTORE")); | 464 | |
458 | break; | 465 | case GNUNET_MESSAGE_TYPE_DATASTORE_DATA: |
459 | 466 | if (NULL != qe->qc.rc.proc) | |
460 | case GNUNET_MESSAGE_TYPE_DATASTORE_DATA: | 467 | qe->qc.rc.proc (qe->qc.rc.proc_cls, |
461 | if (NULL != qe->qc.rc.proc) | 468 | NULL, |
462 | qe->qc.rc.proc(qe->qc.rc.proc_cls, | 469 | 0, |
463 | NULL, | 470 | NULL, |
464 | 0, | 471 | 0, |
465 | NULL, | 472 | 0, |
466 | 0, | 473 | 0, |
467 | 0, | 474 | 0, |
468 | 0, | 475 | GNUNET_TIME_UNIT_ZERO_ABS, |
469 | 0, | 476 | 0); |
470 | GNUNET_TIME_UNIT_ZERO_ABS, | 477 | break; |
471 | 0); | 478 | |
472 | break; | 479 | default: |
473 | 480 | GNUNET_break (0); | |
474 | default: | ||
475 | GNUNET_break(0); | ||
476 | } | ||
477 | free_queue_entry(qe); | ||
478 | } | 481 | } |
482 | free_queue_entry (qe); | ||
483 | } | ||
479 | if (GNUNET_YES == drop) | 484 | if (GNUNET_YES == drop) |
485 | { | ||
486 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
487 | "Re-connecting to issue DROP!\n"); | ||
488 | GNUNET_assert (NULL == h->mq); | ||
489 | h->mq = GNUNET_CLIENT_connect (h->cfg, | ||
490 | "datastore", | ||
491 | NULL, | ||
492 | &disconnect_on_mq_error, | ||
493 | h); | ||
494 | if (NULL != h->mq) | ||
480 | { | 495 | { |
481 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 496 | struct GNUNET_MessageHeader *hdr; |
482 | "Re-connecting to issue DROP!\n"); | 497 | struct GNUNET_MQ_Envelope *env; |
483 | GNUNET_assert(NULL == h->mq); | 498 | |
484 | h->mq = GNUNET_CLIENT_connect(h->cfg, | 499 | env = GNUNET_MQ_msg (hdr, |
485 | "datastore", | 500 | GNUNET_MESSAGE_TYPE_DATASTORE_DROP); |
486 | NULL, | 501 | GNUNET_MQ_notify_sent (env, |
487 | &disconnect_on_mq_error, | 502 | &disconnect_after_drop, |
488 | h); | 503 | h); |
489 | if (NULL != h->mq) | 504 | GNUNET_MQ_send (h->mq, |
490 | { | 505 | env); |
491 | struct GNUNET_MessageHeader *hdr; | 506 | return; |
492 | struct GNUNET_MQ_Envelope *env; | ||
493 | |||
494 | env = GNUNET_MQ_msg(hdr, | ||
495 | GNUNET_MESSAGE_TYPE_DATASTORE_DROP); | ||
496 | GNUNET_MQ_notify_sent(env, | ||
497 | &disconnect_after_drop, | ||
498 | h); | ||
499 | GNUNET_MQ_send(h->mq, | ||
500 | env); | ||
501 | return; | ||
502 | } | ||
503 | GNUNET_break(0); | ||
504 | } | 507 | } |
505 | GNUNET_STATISTICS_destroy(h->stats, | 508 | GNUNET_break (0); |
506 | GNUNET_NO); | 509 | } |
510 | GNUNET_STATISTICS_destroy (h->stats, | ||
511 | GNUNET_NO); | ||
507 | h->stats = NULL; | 512 | h->stats = NULL; |
508 | GNUNET_free(h); | 513 | GNUNET_free (h); |
509 | } | 514 | } |
510 | 515 | ||
511 | 516 | ||
@@ -525,12 +530,12 @@ GNUNET_DATASTORE_disconnect(struct GNUNET_DATASTORE_Handle *h, | |||
525 | * @return NULL if the queue is full | 530 | * @return NULL if the queue is full |
526 | */ | 531 | */ |
527 | static struct GNUNET_DATASTORE_QueueEntry * | 532 | static struct GNUNET_DATASTORE_QueueEntry * |
528 | make_queue_entry(struct GNUNET_DATASTORE_Handle *h, | 533 | make_queue_entry (struct GNUNET_DATASTORE_Handle *h, |
529 | struct GNUNET_MQ_Envelope *env, | 534 | struct GNUNET_MQ_Envelope *env, |
530 | unsigned int queue_priority, | 535 | unsigned int queue_priority, |
531 | unsigned int max_queue_size, | 536 | unsigned int max_queue_size, |
532 | uint16_t expected_type, | 537 | uint16_t expected_type, |
533 | const union QueueContext *qc) | 538 | const union QueueContext *qc) |
534 | { | 539 | { |
535 | struct GNUNET_DATASTORE_QueueEntry *qe; | 540 | struct GNUNET_DATASTORE_QueueEntry *qe; |
536 | struct GNUNET_DATASTORE_QueueEntry *pos; | 541 | struct GNUNET_DATASTORE_QueueEntry *pos; |
@@ -538,32 +543,32 @@ make_queue_entry(struct GNUNET_DATASTORE_Handle *h, | |||
538 | 543 | ||
539 | if ((NULL != h->queue_tail) && | 544 | if ((NULL != h->queue_tail) && |
540 | (h->queue_tail->priority >= queue_priority)) | 545 | (h->queue_tail->priority >= queue_priority)) |
541 | { | 546 | { |
542 | c = h->queue_size; | 547 | c = h->queue_size; |
543 | pos = NULL; | 548 | pos = NULL; |
544 | } | 549 | } |
545 | else | 550 | else |
546 | { | 551 | { |
547 | c = 0; | 552 | c = 0; |
548 | pos = h->queue_head; | 553 | pos = h->queue_head; |
549 | } | 554 | } |
550 | while ((NULL != pos) && | 555 | while ((NULL != pos) && |
551 | (c < max_queue_size) && | 556 | (c < max_queue_size) && |
552 | (pos->priority >= queue_priority)) | 557 | (pos->priority >= queue_priority)) |
553 | { | 558 | { |
554 | c++; | 559 | c++; |
555 | pos = pos->next; | 560 | pos = pos->next; |
556 | } | 561 | } |
557 | if (c >= max_queue_size) | 562 | if (c >= max_queue_size) |
558 | { | 563 | { |
559 | GNUNET_STATISTICS_update(h->stats, | 564 | GNUNET_STATISTICS_update (h->stats, |
560 | gettext_noop("# queue overflows"), | 565 | gettext_noop ("# queue overflows"), |
561 | 1, | 566 | 1, |
562 | GNUNET_NO); | 567 | GNUNET_NO); |
563 | GNUNET_MQ_discard(env); | 568 | GNUNET_MQ_discard (env); |
564 | return NULL; | 569 | return NULL; |
565 | } | 570 | } |
566 | qe = GNUNET_new(struct GNUNET_DATASTORE_QueueEntry); | 571 | qe = GNUNET_new (struct GNUNET_DATASTORE_QueueEntry); |
567 | qe->h = h; | 572 | qe->h = h; |
568 | qe->env = env; | 573 | qe->env = env; |
569 | qe->response_type = expected_type; | 574 | qe->response_type = expected_type; |
@@ -571,30 +576,30 @@ make_queue_entry(struct GNUNET_DATASTORE_Handle *h, | |||
571 | qe->priority = queue_priority; | 576 | qe->priority = queue_priority; |
572 | qe->max_queue = max_queue_size; | 577 | qe->max_queue = max_queue_size; |
573 | if (NULL == pos) | 578 | if (NULL == pos) |
574 | { | 579 | { |
575 | /* append at the tail */ | 580 | /* append at the tail */ |
576 | pos = h->queue_tail; | 581 | pos = h->queue_tail; |
577 | } | 582 | } |
578 | else | 583 | else |
579 | { | 584 | { |
580 | pos = pos->prev; | 585 | pos = pos->prev; |
581 | /* do not insert at HEAD if HEAD query was already | 586 | /* do not insert at HEAD if HEAD query was already |
582 | * transmitted and we are still receiving replies! */ | 587 | * transmitted and we are still receiving replies! */ |
583 | if ((NULL == pos) && | 588 | if ((NULL == pos) && |
584 | (NULL == h->queue_head->env)) | 589 | (NULL == h->queue_head->env)) |
585 | pos = h->queue_head; | 590 | pos = h->queue_head; |
586 | } | 591 | } |
587 | c++; | 592 | c++; |
588 | #if INSANE_STATISTICS | 593 | #if INSANE_STATISTICS |
589 | GNUNET_STATISTICS_update(h->stats, | 594 | GNUNET_STATISTICS_update (h->stats, |
590 | gettext_noop("# queue entries created"), | 595 | gettext_noop ("# queue entries created"), |
591 | 1, | 596 | 1, |
592 | GNUNET_NO); | 597 | GNUNET_NO); |
593 | #endif | 598 | #endif |
594 | GNUNET_CONTAINER_DLL_insert_after(h->queue_head, | 599 | GNUNET_CONTAINER_DLL_insert_after (h->queue_head, |
595 | h->queue_tail, | 600 | h->queue_tail, |
596 | pos, | 601 | pos, |
597 | qe); | 602 | qe); |
598 | h->queue_size++; | 603 | h->queue_size++; |
599 | return qe; | 604 | return qe; |
600 | } | 605 | } |
@@ -607,37 +612,37 @@ make_queue_entry(struct GNUNET_DATASTORE_Handle *h, | |||
607 | * @param h handle to the datastore | 612 | * @param h handle to the datastore |
608 | */ | 613 | */ |
609 | static void | 614 | static void |
610 | process_queue(struct GNUNET_DATASTORE_Handle *h) | 615 | process_queue (struct GNUNET_DATASTORE_Handle *h) |
611 | { | 616 | { |
612 | struct GNUNET_DATASTORE_QueueEntry *qe; | 617 | struct GNUNET_DATASTORE_QueueEntry *qe; |
613 | 618 | ||
614 | if (NULL == (qe = h->queue_head)) | 619 | if (NULL == (qe = h->queue_head)) |
615 | { | 620 | { |
616 | /* no entry in queue */ | 621 | /* no entry in queue */ |
617 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 622 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
618 | "Queue empty\n"); | 623 | "Queue empty\n"); |
619 | return; | 624 | return; |
620 | } | 625 | } |
621 | if (NULL == qe->env) | 626 | if (NULL == qe->env) |
622 | { | 627 | { |
623 | /* waiting for replies */ | 628 | /* waiting for replies */ |
624 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 629 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
625 | "Head request already transmitted\n"); | 630 | "Head request already transmitted\n"); |
626 | return; | 631 | return; |
627 | } | 632 | } |
628 | if (NULL == h->mq) | 633 | if (NULL == h->mq) |
629 | { | 634 | { |
630 | /* waiting for reconnect */ | 635 | /* waiting for reconnect */ |
631 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 636 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
632 | "Not connected\n"); | 637 | "Not connected\n"); |
633 | return; | 638 | return; |
634 | } | 639 | } |
635 | GNUNET_assert(NULL == qe->delay_warn_task); | 640 | GNUNET_assert (NULL == qe->delay_warn_task); |
636 | qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed(DELAY_WARN_TIMEOUT, | 641 | qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT, |
637 | &delay_warning, | 642 | &delay_warning, |
638 | qe); | 643 | qe); |
639 | GNUNET_MQ_send(h->mq, | 644 | GNUNET_MQ_send (h->mq, |
640 | qe->env); | 645 | qe->env); |
641 | qe->env = NULL; | 646 | qe->env = NULL; |
642 | } | 647 | } |
643 | 648 | ||
@@ -650,36 +655,36 @@ process_queue(struct GNUNET_DATASTORE_Handle *h) | |||
650 | * @return the queue entry | 655 | * @return the queue entry |
651 | */ | 656 | */ |
652 | static struct GNUNET_DATASTORE_QueueEntry * | 657 | static struct GNUNET_DATASTORE_QueueEntry * |
653 | get_queue_head(struct GNUNET_DATASTORE_Handle *h, | 658 | get_queue_head (struct GNUNET_DATASTORE_Handle *h, |
654 | uint16_t response_type) | 659 | uint16_t response_type) |
655 | { | 660 | { |
656 | struct GNUNET_DATASTORE_QueueEntry *qe; | 661 | struct GNUNET_DATASTORE_QueueEntry *qe; |
657 | 662 | ||
658 | if (h->skip_next_messages > 0) | 663 | if (h->skip_next_messages > 0) |
659 | { | 664 | { |
660 | h->skip_next_messages--; | 665 | h->skip_next_messages--; |
661 | process_queue(h); | 666 | process_queue (h); |
662 | return NULL; | 667 | return NULL; |
663 | } | 668 | } |
664 | qe = h->queue_head; | 669 | qe = h->queue_head; |
665 | if (NULL == qe) | 670 | if (NULL == qe) |
666 | { | 671 | { |
667 | GNUNET_break(0); | 672 | GNUNET_break (0); |
668 | do_disconnect(h); | 673 | do_disconnect (h); |
669 | return NULL; | 674 | return NULL; |
670 | } | 675 | } |
671 | if (NULL != qe->env) | 676 | if (NULL != qe->env) |
672 | { | 677 | { |
673 | GNUNET_break(0); | 678 | GNUNET_break (0); |
674 | do_disconnect(h); | 679 | do_disconnect (h); |
675 | return NULL; | 680 | return NULL; |
676 | } | 681 | } |
677 | if (response_type != qe->response_type) | 682 | if (response_type != qe->response_type) |
678 | { | 683 | { |
679 | GNUNET_break(0); | 684 | GNUNET_break (0); |
680 | do_disconnect(h); | 685 | do_disconnect (h); |
681 | return NULL; | 686 | return NULL; |
682 | } | 687 | } |
683 | return qe; | 688 | return qe; |
684 | } | 689 | } |
685 | 690 | ||
@@ -692,27 +697,27 @@ get_queue_head(struct GNUNET_DATASTORE_Handle *h, | |||
692 | * @return #GNUNET_OK if the message is well-formed | 697 | * @return #GNUNET_OK if the message is well-formed |
693 | */ | 698 | */ |
694 | static int | 699 | static int |
695 | check_status(void *cls, | 700 | check_status (void *cls, |
696 | const struct StatusMessage *sm) | 701 | const struct StatusMessage *sm) |
697 | { | 702 | { |
698 | uint16_t msize = ntohs(sm->header.size) - sizeof(*sm); | 703 | uint16_t msize = ntohs (sm->header.size) - sizeof(*sm); |
699 | int32_t status = ntohl(sm->status); | 704 | int32_t status = ntohl (sm->status); |
700 | 705 | ||
701 | if (msize > 0) | 706 | if (msize > 0) |
702 | { | 707 | { |
703 | const char *emsg = (const char *)&sm[1]; | 708 | const char *emsg = (const char *) &sm[1]; |
704 | 709 | ||
705 | if ('\0' != emsg[msize - 1]) | 710 | if ('\0' != emsg[msize - 1]) |
706 | { | ||
707 | GNUNET_break(0); | ||
708 | return GNUNET_SYSERR; | ||
709 | } | ||
710 | } | ||
711 | else if (GNUNET_SYSERR == status) | ||
712 | { | 711 | { |
713 | GNUNET_break(0); | 712 | GNUNET_break (0); |
714 | return GNUNET_SYSERR; | 713 | return GNUNET_SYSERR; |
715 | } | 714 | } |
715 | } | ||
716 | else if (GNUNET_SYSERR == status) | ||
717 | { | ||
718 | GNUNET_break (0); | ||
719 | return GNUNET_SYSERR; | ||
720 | } | ||
716 | return GNUNET_OK; | 721 | return GNUNET_OK; |
717 | } | 722 | } |
718 | 723 | ||
@@ -724,40 +729,40 @@ check_status(void *cls, | |||
724 | * @param sm status message received | 729 | * @param sm status message received |
725 | */ | 730 | */ |
726 | static void | 731 | static void |
727 | handle_status(void *cls, | 732 | handle_status (void *cls, |
728 | const struct StatusMessage *sm) | 733 | const struct StatusMessage *sm) |
729 | { | 734 | { |
730 | struct GNUNET_DATASTORE_Handle *h = cls; | 735 | struct GNUNET_DATASTORE_Handle *h = cls; |
731 | struct GNUNET_DATASTORE_QueueEntry *qe; | 736 | struct GNUNET_DATASTORE_QueueEntry *qe; |
732 | struct StatusContext rc; | 737 | struct StatusContext rc; |
733 | const char *emsg; | 738 | const char *emsg; |
734 | int32_t status = ntohl(sm->status); | 739 | int32_t status = ntohl (sm->status); |
735 | 740 | ||
736 | qe = get_queue_head(h, | 741 | qe = get_queue_head (h, |
737 | GNUNET_MESSAGE_TYPE_DATASTORE_STATUS); | 742 | GNUNET_MESSAGE_TYPE_DATASTORE_STATUS); |
738 | if (NULL == qe) | 743 | if (NULL == qe) |
739 | return; | 744 | return; |
740 | rc = qe->qc.sc; | 745 | rc = qe->qc.sc; |
741 | free_queue_entry(qe); | 746 | free_queue_entry (qe); |
742 | if (ntohs(sm->header.size) > sizeof(struct StatusMessage)) | 747 | if (ntohs (sm->header.size) > sizeof(struct StatusMessage)) |
743 | emsg = (const char *)&sm[1]; | 748 | emsg = (const char *) &sm[1]; |
744 | else | 749 | else |
745 | emsg = NULL; | 750 | emsg = NULL; |
746 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 751 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
747 | "Received status %d/%s\n", | 752 | "Received status %d/%s\n", |
748 | (int)status, | 753 | (int) status, |
749 | emsg); | 754 | emsg); |
750 | GNUNET_STATISTICS_update(h->stats, | 755 | GNUNET_STATISTICS_update (h->stats, |
751 | gettext_noop("# status messages received"), | 756 | gettext_noop ("# status messages received"), |
752 | 1, | 757 | 1, |
753 | GNUNET_NO); | 758 | GNUNET_NO); |
754 | h->retry_time = GNUNET_TIME_UNIT_ZERO; | 759 | h->retry_time = GNUNET_TIME_UNIT_ZERO; |
755 | process_queue(h); | 760 | process_queue (h); |
756 | if (NULL != rc.cont) | 761 | if (NULL != rc.cont) |
757 | rc.cont(rc.cont_cls, | 762 | rc.cont (rc.cont_cls, |
758 | status, | 763 | status, |
759 | GNUNET_TIME_absolute_ntoh(sm->min_expiration), | 764 | GNUNET_TIME_absolute_ntoh (sm->min_expiration), |
760 | emsg); | 765 | emsg); |
761 | } | 766 | } |
762 | 767 | ||
763 | 768 | ||
@@ -768,16 +773,16 @@ handle_status(void *cls, | |||
768 | * @param dm message received | 773 | * @param dm message received |
769 | */ | 774 | */ |
770 | static int | 775 | static int |
771 | check_data(void *cls, | 776 | check_data (void *cls, |
772 | const struct DataMessage *dm) | 777 | const struct DataMessage *dm) |
773 | { | 778 | { |
774 | uint16_t msize = ntohs(dm->header.size) - sizeof(*dm); | 779 | uint16_t msize = ntohs (dm->header.size) - sizeof(*dm); |
775 | 780 | ||
776 | if (msize != ntohl(dm->size)) | 781 | if (msize != ntohl (dm->size)) |
777 | { | 782 | { |
778 | GNUNET_break(0); | 783 | GNUNET_break (0); |
779 | return GNUNET_SYSERR; | 784 | return GNUNET_SYSERR; |
780 | } | 785 | } |
781 | return GNUNET_OK; | 786 | return GNUNET_OK; |
782 | } | 787 | } |
783 | 788 | ||
@@ -789,44 +794,44 @@ check_data(void *cls, | |||
789 | * @param dm message received | 794 | * @param dm message received |
790 | */ | 795 | */ |
791 | static void | 796 | static void |
792 | handle_data(void *cls, | 797 | handle_data (void *cls, |
793 | const struct DataMessage *dm) | 798 | const struct DataMessage *dm) |
794 | { | 799 | { |
795 | struct GNUNET_DATASTORE_Handle *h = cls; | 800 | struct GNUNET_DATASTORE_Handle *h = cls; |
796 | struct GNUNET_DATASTORE_QueueEntry *qe; | 801 | struct GNUNET_DATASTORE_QueueEntry *qe; |
797 | struct ResultContext rc; | 802 | struct ResultContext rc; |
798 | 803 | ||
799 | qe = get_queue_head(h, | 804 | qe = get_queue_head (h, |
800 | GNUNET_MESSAGE_TYPE_DATASTORE_DATA); | 805 | GNUNET_MESSAGE_TYPE_DATASTORE_DATA); |
801 | if (NULL == qe) | 806 | if (NULL == qe) |
802 | return; | 807 | return; |
803 | #if INSANE_STATISTICS | 808 | #if INSANE_STATISTICS |
804 | GNUNET_STATISTICS_update(h->stats, | 809 | GNUNET_STATISTICS_update (h->stats, |
805 | gettext_noop("# Results received"), | 810 | gettext_noop ("# Results received"), |
806 | 1, | 811 | 1, |
807 | GNUNET_NO); | 812 | GNUNET_NO); |
808 | #endif | 813 | #endif |
809 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 814 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
810 | "Received result %llu with type %u and size %u with key %s\n", | 815 | "Received result %llu with type %u and size %u with key %s\n", |
811 | (unsigned long long)GNUNET_ntohll(dm->uid), | 816 | (unsigned long long) GNUNET_ntohll (dm->uid), |
812 | ntohl(dm->type), | 817 | ntohl (dm->type), |
813 | ntohl(dm->size), | 818 | ntohl (dm->size), |
814 | GNUNET_h2s(&dm->key)); | 819 | GNUNET_h2s (&dm->key)); |
815 | rc = qe->qc.rc; | 820 | rc = qe->qc.rc; |
816 | free_queue_entry(qe); | 821 | free_queue_entry (qe); |
817 | h->retry_time = GNUNET_TIME_UNIT_ZERO; | 822 | h->retry_time = GNUNET_TIME_UNIT_ZERO; |
818 | process_queue(h); | 823 | process_queue (h); |
819 | if (NULL != rc.proc) | 824 | if (NULL != rc.proc) |
820 | rc.proc(rc.proc_cls, | 825 | rc.proc (rc.proc_cls, |
821 | &dm->key, | 826 | &dm->key, |
822 | ntohl(dm->size), | 827 | ntohl (dm->size), |
823 | &dm[1], | 828 | &dm[1], |
824 | ntohl(dm->type), | 829 | ntohl (dm->type), |
825 | ntohl(dm->priority), | 830 | ntohl (dm->priority), |
826 | ntohl(dm->anonymity), | 831 | ntohl (dm->anonymity), |
827 | ntohl(dm->replication), | 832 | ntohl (dm->replication), |
828 | GNUNET_TIME_absolute_ntoh(dm->expiration), | 833 | GNUNET_TIME_absolute_ntoh (dm->expiration), |
829 | GNUNET_ntohll(dm->uid)); | 834 | GNUNET_ntohll (dm->uid)); |
830 | } | 835 | } |
831 | 836 | ||
832 | 837 | ||
@@ -838,37 +843,37 @@ handle_data(void *cls, | |||
838 | * @param msg message received | 843 | * @param msg message received |
839 | */ | 844 | */ |
840 | static void | 845 | static void |
841 | handle_data_end(void *cls, | 846 | handle_data_end (void *cls, |
842 | const struct GNUNET_MessageHeader *msg) | 847 | const struct GNUNET_MessageHeader *msg) |
843 | { | 848 | { |
844 | struct GNUNET_DATASTORE_Handle *h = cls; | 849 | struct GNUNET_DATASTORE_Handle *h = cls; |
845 | struct GNUNET_DATASTORE_QueueEntry *qe; | 850 | struct GNUNET_DATASTORE_QueueEntry *qe; |
846 | struct ResultContext rc; | 851 | struct ResultContext rc; |
847 | 852 | ||
848 | qe = get_queue_head(h, | 853 | qe = get_queue_head (h, |
849 | GNUNET_MESSAGE_TYPE_DATASTORE_DATA); | 854 | GNUNET_MESSAGE_TYPE_DATASTORE_DATA); |
850 | if (NULL == qe) | 855 | if (NULL == qe) |
851 | return; | 856 | return; |
852 | rc = qe->qc.rc; | 857 | rc = qe->qc.rc; |
853 | free_queue_entry(qe); | 858 | free_queue_entry (qe); |
854 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 859 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
855 | "Received end of result set, new queue size is %u\n", | 860 | "Received end of result set, new queue size is %u\n", |
856 | h->queue_size); | 861 | h->queue_size); |
857 | h->retry_time = GNUNET_TIME_UNIT_ZERO; | 862 | h->retry_time = GNUNET_TIME_UNIT_ZERO; |
858 | h->result_count = 0; | 863 | h->result_count = 0; |
859 | process_queue(h); | 864 | process_queue (h); |
860 | /* signal end of iteration */ | 865 | /* signal end of iteration */ |
861 | if (NULL != rc.proc) | 866 | if (NULL != rc.proc) |
862 | rc.proc(rc.proc_cls, | 867 | rc.proc (rc.proc_cls, |
863 | NULL, | 868 | NULL, |
864 | 0, | 869 | 0, |
865 | NULL, | 870 | NULL, |
866 | 0, | 871 | 0, |
867 | 0, | 872 | 0, |
868 | 0, | 873 | 0, |
869 | 0, | 874 | 0, |
870 | GNUNET_TIME_UNIT_ZERO_ABS, | 875 | GNUNET_TIME_UNIT_ZERO_ABS, |
871 | 0); | 876 | 0); |
872 | } | 877 | } |
873 | 878 | ||
874 | 879 | ||
@@ -878,42 +883,43 @@ handle_data_end(void *cls, | |||
878 | * @param cls the `struct GNUNET_DATASTORE_Handle` | 883 | * @param cls the `struct GNUNET_DATASTORE_Handle` |
879 | */ | 884 | */ |
880 | static void | 885 | static void |
881 | try_reconnect(void *cls) | 886 | try_reconnect (void *cls) |
882 | { | 887 | { |
883 | struct GNUNET_DATASTORE_Handle *h = cls; | 888 | struct GNUNET_DATASTORE_Handle *h = cls; |
884 | struct GNUNET_MQ_MessageHandler handlers[] = { | 889 | struct GNUNET_MQ_MessageHandler handlers[] = { |
885 | GNUNET_MQ_hd_var_size(status, | 890 | GNUNET_MQ_hd_var_size (status, |
886 | GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, | 891 | GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, |
887 | struct StatusMessage, | 892 | struct StatusMessage, |
888 | h), | 893 | h), |
889 | GNUNET_MQ_hd_var_size(data, | 894 | GNUNET_MQ_hd_var_size (data, |
890 | GNUNET_MESSAGE_TYPE_DATASTORE_DATA, | 895 | GNUNET_MESSAGE_TYPE_DATASTORE_DATA, |
891 | struct DataMessage, | 896 | struct DataMessage, |
892 | h), | 897 | h), |
893 | GNUNET_MQ_hd_fixed_size(data_end, | 898 | GNUNET_MQ_hd_fixed_size (data_end, |
894 | GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END, | 899 | GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END, |
895 | struct GNUNET_MessageHeader, | 900 | struct GNUNET_MessageHeader, |
896 | h), | 901 | h), |
897 | GNUNET_MQ_handler_end() | 902 | GNUNET_MQ_handler_end () |
898 | }; | 903 | }; |
899 | 904 | ||
900 | h->retry_time = GNUNET_TIME_STD_BACKOFF(h->retry_time); | 905 | h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time); |
901 | h->reconnect_task = NULL; | 906 | h->reconnect_task = NULL; |
902 | GNUNET_assert(NULL == h->mq); | 907 | GNUNET_assert (NULL == h->mq); |
903 | h->mq = GNUNET_CLIENT_connect(h->cfg, | 908 | h->mq = GNUNET_CLIENT_connect (h->cfg, |
904 | "datastore", | 909 | "datastore", |
905 | handlers, | 910 | handlers, |
906 | &mq_error_handler, | 911 | &mq_error_handler, |
907 | h); | 912 | h); |
908 | if (NULL == h->mq) | 913 | if (NULL == h->mq) |
909 | return; | 914 | return; |
910 | GNUNET_STATISTICS_update(h->stats, | 915 | GNUNET_STATISTICS_update (h->stats, |
911 | gettext_noop("# datastore connections (re)created"), | 916 | gettext_noop ( |
912 | 1, | 917 | "# datastore connections (re)created"), |
913 | GNUNET_NO); | 918 | 1, |
914 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 919 | GNUNET_NO); |
915 | "Reconnected to DATASTORE\n"); | 920 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
916 | process_queue(h); | 921 | "Reconnected to DATASTORE\n"); |
922 | process_queue (h); | ||
917 | } | 923 | } |
918 | 924 | ||
919 | 925 | ||
@@ -926,10 +932,10 @@ try_reconnect(void *cls) | |||
926 | * @param emsg error message | 932 | * @param emsg error message |
927 | */ | 933 | */ |
928 | static void | 934 | static void |
929 | drop_status_cont(void *cls, | 935 | drop_status_cont (void *cls, |
930 | int32_t result, | 936 | int32_t result, |
931 | struct GNUNET_TIME_Absolute min_expiration, | 937 | struct GNUNET_TIME_Absolute min_expiration, |
932 | const char *emsg) | 938 | const char *emsg) |
933 | { | 939 | { |
934 | /* do nothing */ | 940 | /* do nothing */ |
935 | } | 941 | } |
@@ -961,20 +967,20 @@ drop_status_cont(void *cls, | |||
961 | * (or rather, will already have been invoked) | 967 | * (or rather, will already have been invoked) |
962 | */ | 968 | */ |
963 | struct GNUNET_DATASTORE_QueueEntry * | 969 | struct GNUNET_DATASTORE_QueueEntry * |
964 | GNUNET_DATASTORE_put(struct GNUNET_DATASTORE_Handle *h, | 970 | GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, |
965 | uint32_t rid, | 971 | uint32_t rid, |
966 | const struct GNUNET_HashCode *key, | 972 | const struct GNUNET_HashCode *key, |
967 | size_t size, | 973 | size_t size, |
968 | const void *data, | 974 | const void *data, |
969 | enum GNUNET_BLOCK_Type type, | 975 | enum GNUNET_BLOCK_Type type, |
970 | uint32_t priority, | 976 | uint32_t priority, |
971 | uint32_t anonymity, | 977 | uint32_t anonymity, |
972 | uint32_t replication, | 978 | uint32_t replication, |
973 | struct GNUNET_TIME_Absolute expiration, | 979 | struct GNUNET_TIME_Absolute expiration, |
974 | unsigned int queue_priority, | 980 | unsigned int queue_priority, |
975 | unsigned int max_queue_size, | 981 | unsigned int max_queue_size, |
976 | GNUNET_DATASTORE_ContinuationWithStatus cont, | 982 | GNUNET_DATASTORE_ContinuationWithStatus cont, |
977 | void *cont_cls) | 983 | void *cont_cls) |
978 | { | 984 | { |
979 | struct GNUNET_DATASTORE_QueueEntry *qe; | 985 | struct GNUNET_DATASTORE_QueueEntry *qe; |
980 | struct GNUNET_MQ_Envelope *env; | 986 | struct GNUNET_MQ_Envelope *env; |
@@ -982,50 +988,51 @@ GNUNET_DATASTORE_put(struct GNUNET_DATASTORE_Handle *h, | |||
982 | union QueueContext qc; | 988 | union QueueContext qc; |
983 | 989 | ||
984 | if (size + sizeof(*dm) >= GNUNET_MAX_MESSAGE_SIZE) | 990 | if (size + sizeof(*dm) >= GNUNET_MAX_MESSAGE_SIZE) |
985 | { | 991 | { |
986 | GNUNET_break(0); | 992 | GNUNET_break (0); |
987 | return NULL; | 993 | return NULL; |
988 | } | 994 | } |
989 | 995 | ||
990 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 996 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
991 | "Asked to put %u bytes of data under key `%s' for %s\n", | 997 | "Asked to put %u bytes of data under key `%s' for %s\n", |
992 | size, | 998 | size, |
993 | GNUNET_h2s(key), | 999 | GNUNET_h2s (key), |
994 | GNUNET_STRINGS_relative_time_to_string(GNUNET_TIME_absolute_get_remaining(expiration), | 1000 | GNUNET_STRINGS_relative_time_to_string ( |
995 | GNUNET_YES)); | 1001 | GNUNET_TIME_absolute_get_remaining (expiration), |
996 | env = GNUNET_MQ_msg_extra(dm, | 1002 | GNUNET_YES)); |
997 | size, | 1003 | env = GNUNET_MQ_msg_extra (dm, |
998 | GNUNET_MESSAGE_TYPE_DATASTORE_PUT); | 1004 | size, |
999 | dm->rid = htonl(rid); | 1005 | GNUNET_MESSAGE_TYPE_DATASTORE_PUT); |
1000 | dm->size = htonl((uint32_t)size); | 1006 | dm->rid = htonl (rid); |
1001 | dm->type = htonl(type); | 1007 | dm->size = htonl ((uint32_t) size); |
1002 | dm->priority = htonl(priority); | 1008 | dm->type = htonl (type); |
1003 | dm->anonymity = htonl(anonymity); | 1009 | dm->priority = htonl (priority); |
1004 | dm->replication = htonl(replication); | 1010 | dm->anonymity = htonl (anonymity); |
1005 | dm->expiration = GNUNET_TIME_absolute_hton(expiration); | 1011 | dm->replication = htonl (replication); |
1012 | dm->expiration = GNUNET_TIME_absolute_hton (expiration); | ||
1006 | dm->key = *key; | 1013 | dm->key = *key; |
1007 | GNUNET_memcpy(&dm[1], | 1014 | GNUNET_memcpy (&dm[1], |
1008 | data, | 1015 | data, |
1009 | size); | 1016 | size); |
1010 | qc.sc.cont = cont; | 1017 | qc.sc.cont = cont; |
1011 | qc.sc.cont_cls = cont_cls; | 1018 | qc.sc.cont_cls = cont_cls; |
1012 | qe = make_queue_entry(h, | 1019 | qe = make_queue_entry (h, |
1013 | env, | 1020 | env, |
1014 | queue_priority, | 1021 | queue_priority, |
1015 | max_queue_size, | 1022 | max_queue_size, |
1016 | GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, | 1023 | GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, |
1017 | &qc); | 1024 | &qc); |
1018 | if (NULL == qe) | 1025 | if (NULL == qe) |
1019 | { | 1026 | { |
1020 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 1027 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1021 | "Could not create queue entry for PUT\n"); | 1028 | "Could not create queue entry for PUT\n"); |
1022 | return NULL; | 1029 | return NULL; |
1023 | } | 1030 | } |
1024 | GNUNET_STATISTICS_update(h->stats, | 1031 | GNUNET_STATISTICS_update (h->stats, |
1025 | gettext_noop("# PUT requests executed"), | 1032 | gettext_noop ("# PUT requests executed"), |
1026 | 1, | 1033 | 1, |
1027 | GNUNET_NO); | 1034 | GNUNET_NO); |
1028 | process_queue(h); | 1035 | process_queue (h); |
1029 | return qe; | 1036 | return qe; |
1030 | } | 1037 | } |
1031 | 1038 | ||
@@ -1046,11 +1053,11 @@ GNUNET_DATASTORE_put(struct GNUNET_DATASTORE_Handle *h, | |||
1046 | * (or rather, will already have been invoked) | 1053 | * (or rather, will already have been invoked) |
1047 | */ | 1054 | */ |
1048 | struct GNUNET_DATASTORE_QueueEntry * | 1055 | struct GNUNET_DATASTORE_QueueEntry * |
1049 | GNUNET_DATASTORE_reserve(struct GNUNET_DATASTORE_Handle *h, | 1056 | GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, |
1050 | uint64_t amount, | 1057 | uint64_t amount, |
1051 | uint32_t entries, | 1058 | uint32_t entries, |
1052 | GNUNET_DATASTORE_ContinuationWithStatus cont, | 1059 | GNUNET_DATASTORE_ContinuationWithStatus cont, |
1053 | void *cont_cls) | 1060 | void *cont_cls) |
1054 | { | 1061 | { |
1055 | struct GNUNET_DATASTORE_QueueEntry *qe; | 1062 | struct GNUNET_DATASTORE_QueueEntry *qe; |
1056 | struct GNUNET_MQ_Envelope *env; | 1063 | struct GNUNET_MQ_Envelope *env; |
@@ -1059,34 +1066,34 @@ GNUNET_DATASTORE_reserve(struct GNUNET_DATASTORE_Handle *h, | |||
1059 | 1066 | ||
1060 | if (NULL == cont) | 1067 | if (NULL == cont) |
1061 | cont = &drop_status_cont; | 1068 | cont = &drop_status_cont; |
1062 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 1069 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1063 | "Asked to reserve %llu bytes of data and %u entries\n", | 1070 | "Asked to reserve %llu bytes of data and %u entries\n", |
1064 | (unsigned long long)amount, | 1071 | (unsigned long long) amount, |
1065 | (unsigned int)entries); | 1072 | (unsigned int) entries); |
1066 | env = GNUNET_MQ_msg(rm, | 1073 | env = GNUNET_MQ_msg (rm, |
1067 | GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE); | 1074 | GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE); |
1068 | rm->entries = htonl(entries); | 1075 | rm->entries = htonl (entries); |
1069 | rm->amount = GNUNET_htonll(amount); | 1076 | rm->amount = GNUNET_htonll (amount); |
1070 | 1077 | ||
1071 | qc.sc.cont = cont; | 1078 | qc.sc.cont = cont; |
1072 | qc.sc.cont_cls = cont_cls; | 1079 | qc.sc.cont_cls = cont_cls; |
1073 | qe = make_queue_entry(h, | 1080 | qe = make_queue_entry (h, |
1074 | env, | 1081 | env, |
1075 | UINT_MAX, | 1082 | UINT_MAX, |
1076 | UINT_MAX, | 1083 | UINT_MAX, |
1077 | GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, | 1084 | GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, |
1078 | &qc); | 1085 | &qc); |
1079 | if (NULL == qe) | 1086 | if (NULL == qe) |
1080 | { | 1087 | { |
1081 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 1088 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1082 | "Could not create queue entry to reserve\n"); | 1089 | "Could not create queue entry to reserve\n"); |
1083 | return NULL; | 1090 | return NULL; |
1084 | } | 1091 | } |
1085 | GNUNET_STATISTICS_update(h->stats, | 1092 | GNUNET_STATISTICS_update (h->stats, |
1086 | gettext_noop("# RESERVE requests executed"), | 1093 | gettext_noop ("# RESERVE requests executed"), |
1087 | 1, | 1094 | 1, |
1088 | GNUNET_NO); | 1095 | GNUNET_NO); |
1089 | process_queue(h); | 1096 | process_queue (h); |
1090 | return qe; | 1097 | return qe; |
1091 | } | 1098 | } |
1092 | 1099 | ||
@@ -1112,12 +1119,12 @@ GNUNET_DATASTORE_reserve(struct GNUNET_DATASTORE_Handle *h, | |||
1112 | * (or rather, will already have been invoked) | 1119 | * (or rather, will already have been invoked) |
1113 | */ | 1120 | */ |
1114 | struct GNUNET_DATASTORE_QueueEntry * | 1121 | struct GNUNET_DATASTORE_QueueEntry * |
1115 | GNUNET_DATASTORE_release_reserve(struct GNUNET_DATASTORE_Handle *h, | 1122 | GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, |
1116 | uint32_t rid, | 1123 | uint32_t rid, |
1117 | unsigned int queue_priority, | 1124 | unsigned int queue_priority, |
1118 | unsigned int max_queue_size, | 1125 | unsigned int max_queue_size, |
1119 | GNUNET_DATASTORE_ContinuationWithStatus cont, | 1126 | GNUNET_DATASTORE_ContinuationWithStatus cont, |
1120 | void *cont_cls) | 1127 | void *cont_cls) |
1121 | { | 1128 | { |
1122 | struct GNUNET_DATASTORE_QueueEntry *qe; | 1129 | struct GNUNET_DATASTORE_QueueEntry *qe; |
1123 | struct GNUNET_MQ_Envelope *env; | 1130 | struct GNUNET_MQ_Envelope *env; |
@@ -1126,31 +1133,31 @@ GNUNET_DATASTORE_release_reserve(struct GNUNET_DATASTORE_Handle *h, | |||
1126 | 1133 | ||
1127 | if (NULL == cont) | 1134 | if (NULL == cont) |
1128 | cont = &drop_status_cont; | 1135 | cont = &drop_status_cont; |
1129 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 1136 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1130 | "Asked to release reserve %d\n", | 1137 | "Asked to release reserve %d\n", |
1131 | rid); | 1138 | rid); |
1132 | env = GNUNET_MQ_msg(rrm, | 1139 | env = GNUNET_MQ_msg (rrm, |
1133 | GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE); | 1140 | GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE); |
1134 | rrm->rid = htonl(rid); | 1141 | rrm->rid = htonl (rid); |
1135 | qc.sc.cont = cont; | 1142 | qc.sc.cont = cont; |
1136 | qc.sc.cont_cls = cont_cls; | 1143 | qc.sc.cont_cls = cont_cls; |
1137 | qe = make_queue_entry(h, | 1144 | qe = make_queue_entry (h, |
1138 | env, | 1145 | env, |
1139 | queue_priority, | 1146 | queue_priority, |
1140 | max_queue_size, | 1147 | max_queue_size, |
1141 | GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, | 1148 | GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, |
1142 | &qc); | 1149 | &qc); |
1143 | if (NULL == qe) | 1150 | if (NULL == qe) |
1144 | { | 1151 | { |
1145 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 1152 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1146 | "Could not create queue entry to release reserve\n"); | 1153 | "Could not create queue entry to release reserve\n"); |
1147 | return NULL; | 1154 | return NULL; |
1148 | } | 1155 | } |
1149 | GNUNET_STATISTICS_update(h->stats, | 1156 | GNUNET_STATISTICS_update (h->stats, |
1150 | gettext_noop | 1157 | gettext_noop |
1151 | ("# RELEASE RESERVE requests executed"), 1, | 1158 | ("# RELEASE RESERVE requests executed"), 1, |
1152 | GNUNET_NO); | 1159 | GNUNET_NO); |
1153 | process_queue(h); | 1160 | process_queue (h); |
1154 | return qe; | 1161 | return qe; |
1155 | } | 1162 | } |
1156 | 1163 | ||
@@ -1176,14 +1183,14 @@ GNUNET_DATASTORE_release_reserve(struct GNUNET_DATASTORE_Handle *h, | |||
1176 | * (or rather, will already have been invoked) | 1183 | * (or rather, will already have been invoked) |
1177 | */ | 1184 | */ |
1178 | struct GNUNET_DATASTORE_QueueEntry * | 1185 | struct GNUNET_DATASTORE_QueueEntry * |
1179 | GNUNET_DATASTORE_remove(struct GNUNET_DATASTORE_Handle *h, | 1186 | GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, |
1180 | const struct GNUNET_HashCode *key, | 1187 | const struct GNUNET_HashCode *key, |
1181 | size_t size, | 1188 | size_t size, |
1182 | const void *data, | 1189 | const void *data, |
1183 | unsigned int queue_priority, | 1190 | unsigned int queue_priority, |
1184 | unsigned int max_queue_size, | 1191 | unsigned int max_queue_size, |
1185 | GNUNET_DATASTORE_ContinuationWithStatus cont, | 1192 | GNUNET_DATASTORE_ContinuationWithStatus cont, |
1186 | void *cont_cls) | 1193 | void *cont_cls) |
1187 | { | 1194 | { |
1188 | struct GNUNET_DATASTORE_QueueEntry *qe; | 1195 | struct GNUNET_DATASTORE_QueueEntry *qe; |
1189 | struct DataMessage *dm; | 1196 | struct DataMessage *dm; |
@@ -1191,45 +1198,45 @@ GNUNET_DATASTORE_remove(struct GNUNET_DATASTORE_Handle *h, | |||
1191 | union QueueContext qc; | 1198 | union QueueContext qc; |
1192 | 1199 | ||
1193 | if (sizeof(*dm) + size >= GNUNET_MAX_MESSAGE_SIZE) | 1200 | if (sizeof(*dm) + size >= GNUNET_MAX_MESSAGE_SIZE) |
1194 | { | 1201 | { |
1195 | GNUNET_break(0); | 1202 | GNUNET_break (0); |
1196 | return NULL; | 1203 | return NULL; |
1197 | } | 1204 | } |
1198 | if (NULL == cont) | 1205 | if (NULL == cont) |
1199 | cont = &drop_status_cont; | 1206 | cont = &drop_status_cont; |
1200 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 1207 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1201 | "Asked to remove %u bytes under key `%s'\n", | 1208 | "Asked to remove %u bytes under key `%s'\n", |
1202 | size, | 1209 | size, |
1203 | GNUNET_h2s(key)); | 1210 | GNUNET_h2s (key)); |
1204 | env = GNUNET_MQ_msg_extra(dm, | 1211 | env = GNUNET_MQ_msg_extra (dm, |
1205 | size, | 1212 | size, |
1206 | GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE); | 1213 | GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE); |
1207 | dm->size = htonl(size); | 1214 | dm->size = htonl (size); |
1208 | dm->key = *key; | 1215 | dm->key = *key; |
1209 | GNUNET_memcpy(&dm[1], | 1216 | GNUNET_memcpy (&dm[1], |
1210 | data, | 1217 | data, |
1211 | size); | 1218 | size); |
1212 | 1219 | ||
1213 | qc.sc.cont = cont; | 1220 | qc.sc.cont = cont; |
1214 | qc.sc.cont_cls = cont_cls; | 1221 | qc.sc.cont_cls = cont_cls; |
1215 | 1222 | ||
1216 | qe = make_queue_entry(h, | 1223 | qe = make_queue_entry (h, |
1217 | env, | 1224 | env, |
1218 | queue_priority, | 1225 | queue_priority, |
1219 | max_queue_size, | 1226 | max_queue_size, |
1220 | GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, | 1227 | GNUNET_MESSAGE_TYPE_DATASTORE_STATUS, |
1221 | &qc); | 1228 | &qc); |
1222 | if (NULL == qe) | 1229 | if (NULL == qe) |
1223 | { | 1230 | { |
1224 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 1231 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1225 | "Could not create queue entry for REMOVE\n"); | 1232 | "Could not create queue entry for REMOVE\n"); |
1226 | return NULL; | 1233 | return NULL; |
1227 | } | 1234 | } |
1228 | GNUNET_STATISTICS_update(h->stats, | 1235 | GNUNET_STATISTICS_update (h->stats, |
1229 | gettext_noop("# REMOVE requests executed"), | 1236 | gettext_noop ("# REMOVE requests executed"), |
1230 | 1, | 1237 | 1, |
1231 | GNUNET_NO); | 1238 | GNUNET_NO); |
1232 | process_queue(h); | 1239 | process_queue (h); |
1233 | return qe; | 1240 | return qe; |
1234 | } | 1241 | } |
1235 | 1242 | ||
@@ -1254,41 +1261,41 @@ GNUNET_DATASTORE_remove(struct GNUNET_DATASTORE_Handle *h, | |||
1254 | * cancel | 1261 | * cancel |
1255 | */ | 1262 | */ |
1256 | struct GNUNET_DATASTORE_QueueEntry * | 1263 | struct GNUNET_DATASTORE_QueueEntry * |
1257 | GNUNET_DATASTORE_get_for_replication(struct GNUNET_DATASTORE_Handle *h, | 1264 | GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h, |
1258 | unsigned int queue_priority, | 1265 | unsigned int queue_priority, |
1259 | unsigned int max_queue_size, | 1266 | unsigned int max_queue_size, |
1260 | GNUNET_DATASTORE_DatumProcessor proc, | 1267 | GNUNET_DATASTORE_DatumProcessor proc, |
1261 | void *proc_cls) | 1268 | void *proc_cls) |
1262 | { | 1269 | { |
1263 | struct GNUNET_DATASTORE_QueueEntry *qe; | 1270 | struct GNUNET_DATASTORE_QueueEntry *qe; |
1264 | struct GNUNET_MQ_Envelope *env; | 1271 | struct GNUNET_MQ_Envelope *env; |
1265 | struct GNUNET_MessageHeader *m; | 1272 | struct GNUNET_MessageHeader *m; |
1266 | union QueueContext qc; | 1273 | union QueueContext qc; |
1267 | 1274 | ||
1268 | GNUNET_assert(NULL != proc); | 1275 | GNUNET_assert (NULL != proc); |
1269 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 1276 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1270 | "Asked to get replication entry\n"); | 1277 | "Asked to get replication entry\n"); |
1271 | env = GNUNET_MQ_msg(m, | 1278 | env = GNUNET_MQ_msg (m, |
1272 | GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION); | 1279 | GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION); |
1273 | qc.rc.proc = proc; | 1280 | qc.rc.proc = proc; |
1274 | qc.rc.proc_cls = proc_cls; | 1281 | qc.rc.proc_cls = proc_cls; |
1275 | qe = make_queue_entry(h, | 1282 | qe = make_queue_entry (h, |
1276 | env, | 1283 | env, |
1277 | queue_priority, | 1284 | queue_priority, |
1278 | max_queue_size, | 1285 | max_queue_size, |
1279 | GNUNET_MESSAGE_TYPE_DATASTORE_DATA, | 1286 | GNUNET_MESSAGE_TYPE_DATASTORE_DATA, |
1280 | &qc); | 1287 | &qc); |
1281 | if (NULL == qe) | 1288 | if (NULL == qe) |
1282 | { | 1289 | { |
1283 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 1290 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1284 | "Could not create queue entry for GET REPLICATION\n"); | 1291 | "Could not create queue entry for GET REPLICATION\n"); |
1285 | return NULL; | 1292 | return NULL; |
1286 | } | 1293 | } |
1287 | GNUNET_STATISTICS_update(h->stats, | 1294 | GNUNET_STATISTICS_update (h->stats, |
1288 | gettext_noop | 1295 | gettext_noop |
1289 | ("# GET REPLICATION requests executed"), 1, | 1296 | ("# GET REPLICATION requests executed"), 1, |
1290 | GNUNET_NO); | 1297 | GNUNET_NO); |
1291 | process_queue(h); | 1298 | process_queue (h); |
1292 | return qe; | 1299 | return qe; |
1293 | } | 1300 | } |
1294 | 1301 | ||
@@ -1310,47 +1317,47 @@ GNUNET_DATASTORE_get_for_replication(struct GNUNET_DATASTORE_Handle *h, | |||
1310 | * cancel | 1317 | * cancel |
1311 | */ | 1318 | */ |
1312 | struct GNUNET_DATASTORE_QueueEntry * | 1319 | struct GNUNET_DATASTORE_QueueEntry * |
1313 | GNUNET_DATASTORE_get_zero_anonymity(struct GNUNET_DATASTORE_Handle *h, | 1320 | GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, |
1314 | uint64_t next_uid, | 1321 | uint64_t next_uid, |
1315 | unsigned int queue_priority, | 1322 | unsigned int queue_priority, |
1316 | unsigned int max_queue_size, | 1323 | unsigned int max_queue_size, |
1317 | enum GNUNET_BLOCK_Type type, | 1324 | enum GNUNET_BLOCK_Type type, |
1318 | GNUNET_DATASTORE_DatumProcessor proc, | 1325 | GNUNET_DATASTORE_DatumProcessor proc, |
1319 | void *proc_cls) | 1326 | void *proc_cls) |
1320 | { | 1327 | { |
1321 | struct GNUNET_DATASTORE_QueueEntry *qe; | 1328 | struct GNUNET_DATASTORE_QueueEntry *qe; |
1322 | struct GNUNET_MQ_Envelope *env; | 1329 | struct GNUNET_MQ_Envelope *env; |
1323 | struct GetZeroAnonymityMessage *m; | 1330 | struct GetZeroAnonymityMessage *m; |
1324 | union QueueContext qc; | 1331 | union QueueContext qc; |
1325 | 1332 | ||
1326 | GNUNET_assert(NULL != proc); | 1333 | GNUNET_assert (NULL != proc); |
1327 | GNUNET_assert(type != GNUNET_BLOCK_TYPE_ANY); | 1334 | GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY); |
1328 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 1335 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1329 | "Asked to get a zero-anonymity entry of type %d\n", | 1336 | "Asked to get a zero-anonymity entry of type %d\n", |
1330 | type); | 1337 | type); |
1331 | env = GNUNET_MQ_msg(m, | 1338 | env = GNUNET_MQ_msg (m, |
1332 | GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY); | 1339 | GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY); |
1333 | m->type = htonl((uint32_t)type); | 1340 | m->type = htonl ((uint32_t) type); |
1334 | m->next_uid = GNUNET_htonll(next_uid); | 1341 | m->next_uid = GNUNET_htonll (next_uid); |
1335 | qc.rc.proc = proc; | 1342 | qc.rc.proc = proc; |
1336 | qc.rc.proc_cls = proc_cls; | 1343 | qc.rc.proc_cls = proc_cls; |
1337 | qe = make_queue_entry(h, | 1344 | qe = make_queue_entry (h, |
1338 | env, | 1345 | env, |
1339 | queue_priority, | 1346 | queue_priority, |
1340 | max_queue_size, | 1347 | max_queue_size, |
1341 | GNUNET_MESSAGE_TYPE_DATASTORE_DATA, | 1348 | GNUNET_MESSAGE_TYPE_DATASTORE_DATA, |
1342 | &qc); | 1349 | &qc); |
1343 | if (NULL == qe) | 1350 | if (NULL == qe) |
1344 | { | 1351 | { |
1345 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 1352 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1346 | "Could not create queue entry for zero-anonymity procation\n"); | 1353 | "Could not create queue entry for zero-anonymity procation\n"); |
1347 | return NULL; | 1354 | return NULL; |
1348 | } | 1355 | } |
1349 | GNUNET_STATISTICS_update(h->stats, | 1356 | GNUNET_STATISTICS_update (h->stats, |
1350 | gettext_noop | 1357 | gettext_noop |
1351 | ("# GET ZERO ANONYMITY requests executed"), 1, | 1358 | ("# GET ZERO ANONYMITY requests executed"), 1, |
1352 | GNUNET_NO); | 1359 | GNUNET_NO); |
1353 | process_queue(h); | 1360 | process_queue (h); |
1354 | return qe; | 1361 | return qe; |
1355 | } | 1362 | } |
1356 | 1363 | ||
@@ -1374,15 +1381,15 @@ GNUNET_DATASTORE_get_zero_anonymity(struct GNUNET_DATASTORE_Handle *h, | |||
1374 | * cancel | 1381 | * cancel |
1375 | */ | 1382 | */ |
1376 | struct GNUNET_DATASTORE_QueueEntry * | 1383 | struct GNUNET_DATASTORE_QueueEntry * |
1377 | GNUNET_DATASTORE_get_key(struct GNUNET_DATASTORE_Handle *h, | 1384 | GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, |
1378 | uint64_t next_uid, | 1385 | uint64_t next_uid, |
1379 | bool random, | 1386 | bool random, |
1380 | const struct GNUNET_HashCode *key, | 1387 | const struct GNUNET_HashCode *key, |
1381 | enum GNUNET_BLOCK_Type type, | 1388 | enum GNUNET_BLOCK_Type type, |
1382 | unsigned int queue_priority, | 1389 | unsigned int queue_priority, |
1383 | unsigned int max_queue_size, | 1390 | unsigned int max_queue_size, |
1384 | GNUNET_DATASTORE_DatumProcessor proc, | 1391 | GNUNET_DATASTORE_DatumProcessor proc, |
1385 | void *proc_cls) | 1392 | void *proc_cls) |
1386 | { | 1393 | { |
1387 | struct GNUNET_DATASTORE_QueueEntry *qe; | 1394 | struct GNUNET_DATASTORE_QueueEntry *qe; |
1388 | struct GNUNET_MQ_Envelope *env; | 1395 | struct GNUNET_MQ_Envelope *env; |
@@ -1390,50 +1397,50 @@ GNUNET_DATASTORE_get_key(struct GNUNET_DATASTORE_Handle *h, | |||
1390 | struct GetMessage *gm; | 1397 | struct GetMessage *gm; |
1391 | union QueueContext qc; | 1398 | union QueueContext qc; |
1392 | 1399 | ||
1393 | GNUNET_assert(NULL != proc); | 1400 | GNUNET_assert (NULL != proc); |
1394 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 1401 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1395 | "Asked to look for data of type %u under key `%s'\n", | 1402 | "Asked to look for data of type %u under key `%s'\n", |
1396 | (unsigned int)type, | 1403 | (unsigned int) type, |
1397 | GNUNET_h2s(key)); | 1404 | GNUNET_h2s (key)); |
1398 | if (NULL == key) | 1405 | if (NULL == key) |
1399 | { | 1406 | { |
1400 | env = GNUNET_MQ_msg(gm, | 1407 | env = GNUNET_MQ_msg (gm, |
1401 | GNUNET_MESSAGE_TYPE_DATASTORE_GET); | 1408 | GNUNET_MESSAGE_TYPE_DATASTORE_GET); |
1402 | gm->type = htonl(type); | 1409 | gm->type = htonl (type); |
1403 | gm->next_uid = GNUNET_htonll(next_uid); | 1410 | gm->next_uid = GNUNET_htonll (next_uid); |
1404 | gm->random = random; | 1411 | gm->random = random; |
1405 | } | 1412 | } |
1406 | else | 1413 | else |
1407 | { | 1414 | { |
1408 | env = GNUNET_MQ_msg(gkm, | 1415 | env = GNUNET_MQ_msg (gkm, |
1409 | GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY); | 1416 | GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY); |
1410 | gkm->type = htonl(type); | 1417 | gkm->type = htonl (type); |
1411 | gkm->next_uid = GNUNET_htonll(next_uid); | 1418 | gkm->next_uid = GNUNET_htonll (next_uid); |
1412 | gkm->random = random; | 1419 | gkm->random = random; |
1413 | gkm->key = *key; | 1420 | gkm->key = *key; |
1414 | } | 1421 | } |
1415 | qc.rc.proc = proc; | 1422 | qc.rc.proc = proc; |
1416 | qc.rc.proc_cls = proc_cls; | 1423 | qc.rc.proc_cls = proc_cls; |
1417 | qe = make_queue_entry(h, | 1424 | qe = make_queue_entry (h, |
1418 | env, | 1425 | env, |
1419 | queue_priority, | 1426 | queue_priority, |
1420 | max_queue_size, | 1427 | max_queue_size, |
1421 | GNUNET_MESSAGE_TYPE_DATASTORE_DATA, | 1428 | GNUNET_MESSAGE_TYPE_DATASTORE_DATA, |
1422 | &qc); | 1429 | &qc); |
1423 | if (NULL == qe) | 1430 | if (NULL == qe) |
1424 | { | 1431 | { |
1425 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 1432 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1426 | "Could not queue request for `%s'\n", | 1433 | "Could not queue request for `%s'\n", |
1427 | GNUNET_h2s(key)); | 1434 | GNUNET_h2s (key)); |
1428 | return NULL; | 1435 | return NULL; |
1429 | } | 1436 | } |
1430 | #if INSANE_STATISTICS | 1437 | #if INSANE_STATISTICS |
1431 | GNUNET_STATISTICS_update(h->stats, | 1438 | GNUNET_STATISTICS_update (h->stats, |
1432 | gettext_noop("# GET requests executed"), | 1439 | gettext_noop ("# GET requests executed"), |
1433 | 1, | 1440 | 1, |
1434 | GNUNET_NO); | 1441 | GNUNET_NO); |
1435 | #endif | 1442 | #endif |
1436 | process_queue(h); | 1443 | process_queue (h); |
1437 | return qe; | 1444 | return qe; |
1438 | } | 1445 | } |
1439 | 1446 | ||
@@ -1445,23 +1452,23 @@ GNUNET_DATASTORE_get_key(struct GNUNET_DATASTORE_Handle *h, | |||
1445 | * @param qe operation to cancel | 1452 | * @param qe operation to cancel |
1446 | */ | 1453 | */ |
1447 | void | 1454 | void |
1448 | GNUNET_DATASTORE_cancel(struct GNUNET_DATASTORE_QueueEntry *qe) | 1455 | GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe) |
1449 | { | 1456 | { |
1450 | struct GNUNET_DATASTORE_Handle *h = qe->h; | 1457 | struct GNUNET_DATASTORE_Handle *h = qe->h; |
1451 | 1458 | ||
1452 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 1459 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1453 | "Pending DATASTORE request %p cancelled (%d, %d)\n", | 1460 | "Pending DATASTORE request %p cancelled (%d, %d)\n", |
1454 | qe, | 1461 | qe, |
1455 | NULL == qe->env, | 1462 | NULL == qe->env, |
1456 | h->queue_head == qe); | 1463 | h->queue_head == qe); |
1457 | if (NULL == qe->env) | 1464 | if (NULL == qe->env) |
1458 | { | 1465 | { |
1459 | free_queue_entry(qe); | 1466 | free_queue_entry (qe); |
1460 | h->skip_next_messages++; | 1467 | h->skip_next_messages++; |
1461 | return; | 1468 | return; |
1462 | } | 1469 | } |
1463 | free_queue_entry(qe); | 1470 | free_queue_entry (qe); |
1464 | process_queue(h); | 1471 | process_queue (h); |
1465 | } | 1472 | } |
1466 | 1473 | ||
1467 | 1474 | ||