diff options
author | Christian Grothoff <christian@grothoff.org> | 2016-07-31 21:23:23 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2016-07-31 21:23:23 +0000 |
commit | a78990b412db2c0ead2da8061c4f454f068991d1 (patch) | |
tree | 2e87adae62fd1ca3cd5a9f4c69248986f78bb106 /src/fs/gnunet-service-fs_pe.c | |
parent | 406c7d2d2d126c994a1fff13470b1f96439c6f9d (diff) | |
download | gnunet-a78990b412db2c0ead2da8061c4f454f068991d1.tar.gz gnunet-a78990b412db2c0ead2da8061c4f454f068991d1.zip |
converting FS to new MQ-based core API
Diffstat (limited to 'src/fs/gnunet-service-fs_pe.c')
-rw-r--r-- | src/fs/gnunet-service-fs_pe.c | 165 |
1 files changed, 55 insertions, 110 deletions
diff --git a/src/fs/gnunet-service-fs_pe.c b/src/fs/gnunet-service-fs_pe.c index b338c1a13..098c3d180 100644 --- a/src/fs/gnunet-service-fs_pe.c +++ b/src/fs/gnunet-service-fs_pe.c | |||
@@ -189,11 +189,6 @@ struct PeerPlan | |||
189 | struct GNUNET_CONTAINER_MultiHashMap *plan_map; | 189 | struct GNUNET_CONTAINER_MultiHashMap *plan_map; |
190 | 190 | ||
191 | /** | 191 | /** |
192 | * Current transmission request handle. | ||
193 | */ | ||
194 | struct GSF_PeerTransmitHandle *pth; | ||
195 | |||
196 | /** | ||
197 | * Peer for which this is the plan. | 192 | * Peer for which this is the plan. |
198 | */ | 193 | */ |
199 | struct GSF_ConnectedPeer *cp; | 194 | struct GSF_ConnectedPeer *cp; |
@@ -202,6 +197,12 @@ struct PeerPlan | |||
202 | * Current task for executing the plan. | 197 | * Current task for executing the plan. |
203 | */ | 198 | */ |
204 | struct GNUNET_SCHEDULER_Task *task; | 199 | struct GNUNET_SCHEDULER_Task *task; |
200 | |||
201 | /** | ||
202 | * Current message under transmission for the plan. | ||
203 | */ | ||
204 | struct GNUNET_MQ_Envelope *env; | ||
205 | |||
205 | }; | 206 | }; |
206 | 207 | ||
207 | 208 | ||
@@ -241,15 +242,6 @@ get_rp_key (struct GSF_RequestPlan *rp) | |||
241 | 242 | ||
242 | 243 | ||
243 | /** | 244 | /** |
244 | * Figure out when and how to transmit to the given peer. | ||
245 | * | ||
246 | * @param cls the `struct GSF_ConnectedPeer` for transmission | ||
247 | */ | ||
248 | static void | ||
249 | schedule_peer_transmission (void *cls); | ||
250 | |||
251 | |||
252 | /** | ||
253 | * Insert the given request plan into the heap with the appropriate weight. | 245 | * Insert the given request plan into the heap with the appropriate weight. |
254 | * | 246 | * |
255 | * @param pp associated peer's plan | 247 | * @param pp associated peer's plan |
@@ -329,21 +321,22 @@ plan (struct PeerPlan *pp, | |||
329 | rp->earliest_transmission = GNUNET_TIME_relative_to_absolute (delay); | 321 | rp->earliest_transmission = GNUNET_TIME_relative_to_absolute (delay); |
330 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 322 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
331 | "Earliest (re)transmission for `%s' in %us\n", | 323 | "Earliest (re)transmission for `%s' in %us\n", |
332 | GNUNET_h2s (&prd->query), rp->transmission_counter); | 324 | GNUNET_h2s (&prd->query), |
325 | rp->transmission_counter); | ||
333 | GNUNET_assert (rp->hn == NULL); | 326 | GNUNET_assert (rp->hn == NULL); |
334 | if (0 == GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value_us) | 327 | if (0 == GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value_us) |
335 | rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, rp, rp->priority); | 328 | rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, |
329 | rp, | ||
330 | rp->priority); | ||
336 | else | 331 | else |
337 | rp->hn = | 332 | rp->hn = |
338 | GNUNET_CONTAINER_heap_insert (pp->delay_heap, rp, | 333 | GNUNET_CONTAINER_heap_insert (pp->delay_heap, |
334 | rp, | ||
339 | rp->earliest_transmission.abs_value_us); | 335 | rp->earliest_transmission.abs_value_us); |
340 | GNUNET_assert (GNUNET_YES == | 336 | GNUNET_assert (GNUNET_YES == |
341 | GNUNET_CONTAINER_multihashmap_contains_value (pp->plan_map, | 337 | GNUNET_CONTAINER_multihashmap_contains_value (pp->plan_map, |
342 | get_rp_key (rp), | 338 | get_rp_key (rp), |
343 | rp)); | 339 | rp)); |
344 | if (NULL != pp->task) | ||
345 | GNUNET_SCHEDULER_cancel (pp->task); | ||
346 | pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp); | ||
347 | #undef N | 340 | #undef N |
348 | } | 341 | } |
349 | 342 | ||
@@ -383,75 +376,6 @@ get_latest (const struct GSF_RequestPlan *rp) | |||
383 | 376 | ||
384 | 377 | ||
385 | /** | 378 | /** |
386 | * Function called to get a message for transmission. | ||
387 | * | ||
388 | * @param cls closure | ||
389 | * @param buf_size number of bytes available in @a buf | ||
390 | * @param buf where to copy the message, NULL on error (peer disconnect) | ||
391 | * @return number of bytes copied to @a buf, can be 0 (without indicating an error) | ||
392 | */ | ||
393 | static size_t | ||
394 | transmit_message_callback (void *cls, | ||
395 | size_t buf_size, | ||
396 | void *buf) | ||
397 | { | ||
398 | struct PeerPlan *pp = cls; | ||
399 | struct GSF_RequestPlan *rp; | ||
400 | size_t msize; | ||
401 | |||
402 | pp->pth = NULL; | ||
403 | if (NULL == buf) | ||
404 | { | ||
405 | /* failed, try again... */ | ||
406 | if (NULL != pp->task) | ||
407 | GNUNET_SCHEDULER_cancel (pp->task); | ||
408 | |||
409 | pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp); | ||
410 | GNUNET_STATISTICS_update (GSF_stats, | ||
411 | gettext_noop | ||
412 | ("# transmission failed (core has no bandwidth)"), | ||
413 | 1, GNUNET_NO); | ||
414 | return 0; | ||
415 | } | ||
416 | rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap); | ||
417 | if (NULL == rp) | ||
418 | { | ||
419 | if (NULL != pp->task) | ||
420 | GNUNET_SCHEDULER_cancel (pp->task); | ||
421 | pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp); | ||
422 | return 0; | ||
423 | } | ||
424 | msize = GSF_pending_request_get_message_ (get_latest (rp), | ||
425 | buf_size, | ||
426 | buf); | ||
427 | if (msize > buf_size) | ||
428 | { | ||
429 | if (NULL != pp->task) | ||
430 | GNUNET_SCHEDULER_cancel (pp->task); | ||
431 | /* buffer to small (message changed), try again */ | ||
432 | pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp); | ||
433 | return 0; | ||
434 | } | ||
435 | /* remove from root, add again elsewhere... */ | ||
436 | GNUNET_assert (rp == | ||
437 | GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)); | ||
438 | rp->hn = NULL; | ||
439 | rp->last_transmission = GNUNET_TIME_absolute_get (); | ||
440 | rp->transmission_counter++; | ||
441 | total_delay++; | ||
442 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
443 | "Executing plan %p executed %u times, planning retransmission\n", | ||
444 | rp, rp->transmission_counter); | ||
445 | plan (pp, rp); | ||
446 | GNUNET_STATISTICS_update (GSF_stats, | ||
447 | gettext_noop ("# query messages sent to other peers"), | ||
448 | 1, | ||
449 | GNUNET_NO); | ||
450 | return msize; | ||
451 | } | ||
452 | |||
453 | |||
454 | /** | ||
455 | * Figure out when and how to transmit to the given peer. | 379 | * Figure out when and how to transmit to the given peer. |
456 | * | 380 | * |
457 | * @param cls the `struct PeerPlan` | 381 | * @param cls the `struct PeerPlan` |
@@ -461,14 +385,16 @@ schedule_peer_transmission (void *cls) | |||
461 | { | 385 | { |
462 | struct PeerPlan *pp = cls; | 386 | struct PeerPlan *pp = cls; |
463 | struct GSF_RequestPlan *rp; | 387 | struct GSF_RequestPlan *rp; |
464 | size_t msize; | ||
465 | struct GNUNET_TIME_Relative delay; | 388 | struct GNUNET_TIME_Relative delay; |
466 | 389 | ||
467 | pp->task = NULL; | 390 | if (NULL != pp->task) |
468 | if (NULL != pp->pth) | 391 | { |
392 | pp->task = NULL; | ||
393 | } | ||
394 | else | ||
469 | { | 395 | { |
470 | GSF_peer_transmit_cancel_ (pp->pth); | 396 | GNUNET_assert (NULL != pp->env); |
471 | pp->pth = NULL; | 397 | pp->env = NULL; |
472 | } | 398 | } |
473 | /* move ready requests to priority queue */ | 399 | /* move ready requests to priority queue */ |
474 | while ((NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) && | 400 | while ((NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) && |
@@ -508,23 +434,40 @@ schedule_peer_transmission (void *cls) | |||
508 | return; | 434 | return; |
509 | } | 435 | } |
510 | #if INSANE_STATISTICS | 436 | #if INSANE_STATISTICS |
511 | GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# query plans executed"), | 437 | GNUNET_STATISTICS_update (GSF_stats, |
512 | 1, GNUNET_NO); | 438 | gettext_noop ("# query plans executed"), |
439 | 1, | ||
440 | GNUNET_NO); | ||
513 | #endif | 441 | #endif |
514 | /* process from priority heap */ | 442 | /* process from priority heap */ |
515 | rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap); | 443 | rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap); |
516 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 444 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
517 | "Executing query plan %p\n", | 445 | "Executing query plan %p\n", |
518 | rp); | 446 | rp); |
519 | GNUNET_assert (NULL != rp); | 447 | GNUNET_assert (NULL != rp); |
520 | msize = GSF_pending_request_get_message_ (get_latest (rp), 0, NULL); | 448 | rp->hn = NULL; |
521 | pp->pth = | 449 | rp->last_transmission = GNUNET_TIME_absolute_get (); |
522 | GSF_peer_transmit_ (pp->cp, GNUNET_YES, | 450 | rp->transmission_counter++; |
523 | rp->priority, | 451 | total_delay++; |
524 | GNUNET_TIME_UNIT_FOREVER_REL, | 452 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
525 | msize, | 453 | "Executing plan %p executed %u times, planning retransmission\n", |
526 | &transmit_message_callback, pp); | 454 | rp, |
527 | GNUNET_assert (NULL != pp->pth); | 455 | rp->transmission_counter); |
456 | GNUNET_assert (NULL == pp->env); | ||
457 | pp->env = GSF_pending_request_get_message_ (get_latest (rp)); | ||
458 | GNUNET_MQ_notify_sent (pp->env, | ||
459 | &schedule_peer_transmission, | ||
460 | pp); | ||
461 | GSF_peer_transmit_ (pp->cp, | ||
462 | GNUNET_YES, | ||
463 | rp->priority, | ||
464 | pp->env); | ||
465 | GNUNET_STATISTICS_update (GSF_stats, | ||
466 | gettext_noop ("# query messages sent to other peers"), | ||
467 | 1, | ||
468 | GNUNET_NO); | ||
469 | plan (pp, | ||
470 | rp); | ||
528 | } | 471 | } |
529 | 472 | ||
530 | 473 | ||
@@ -646,6 +589,8 @@ GSF_plan_add_ (struct GSF_ConnectedPeer *cp, | |||
646 | id, | 589 | id, |
647 | pp, | 590 | pp, |
648 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | 591 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); |
592 | pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, | ||
593 | pp); | ||
649 | } | 594 | } |
650 | mpc.merged = GNUNET_NO; | 595 | mpc.merged = GNUNET_NO; |
651 | mpc.pr = pr; | 596 | mpc.pr = pr; |
@@ -710,16 +655,16 @@ GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp) | |||
710 | GNUNET_assert (GNUNET_YES == | 655 | GNUNET_assert (GNUNET_YES == |
711 | GNUNET_CONTAINER_multipeermap_remove (plans, id, | 656 | GNUNET_CONTAINER_multipeermap_remove (plans, id, |
712 | pp)); | 657 | pp)); |
713 | if (NULL != pp->pth) | ||
714 | { | ||
715 | GSF_peer_transmit_cancel_ (pp->pth); | ||
716 | pp->pth = NULL; | ||
717 | } | ||
718 | if (NULL != pp->task) | 658 | if (NULL != pp->task) |
719 | { | 659 | { |
720 | GNUNET_SCHEDULER_cancel (pp->task); | 660 | GNUNET_SCHEDULER_cancel (pp->task); |
721 | pp->task = NULL; | 661 | pp->task = NULL; |
722 | } | 662 | } |
663 | if (NULL != pp->env) | ||
664 | { | ||
665 | GNUNET_MQ_send_cancel (pp->env); | ||
666 | pp->env = NULL; | ||
667 | } | ||
723 | while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap))) | 668 | while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap))) |
724 | { | 669 | { |
725 | GNUNET_break (GNUNET_YES == | 670 | GNUNET_break (GNUNET_YES == |