From a78990b412db2c0ead2da8061c4f454f068991d1 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 31 Jul 2016 21:23:23 +0000 Subject: converting FS to new MQ-based core API --- src/fs/gnunet-service-fs_pe.c | 165 ++++++++++++++---------------------------- 1 file changed, 55 insertions(+), 110 deletions(-) (limited to 'src/fs/gnunet-service-fs_pe.c') diff --git a/src/fs/gnunet-service-fs_pe.c b/src/fs/gnunet-service-fs_pe.c index b338c1a13..098c3d180 100644 --- a/src/fs/gnunet-service-fs_pe.c +++ b/src/fs/gnunet-service-fs_pe.c @@ -188,11 +188,6 @@ struct PeerPlan */ struct GNUNET_CONTAINER_MultiHashMap *plan_map; - /** - * Current transmission request handle. - */ - struct GSF_PeerTransmitHandle *pth; - /** * Peer for which this is the plan. */ @@ -202,6 +197,12 @@ struct PeerPlan * Current task for executing the plan. */ struct GNUNET_SCHEDULER_Task *task; + + /** + * Current message under transmission for the plan. + */ + struct GNUNET_MQ_Envelope *env; + }; @@ -240,15 +241,6 @@ get_rp_key (struct GSF_RequestPlan *rp) } -/** - * Figure out when and how to transmit to the given peer. - * - * @param cls the `struct GSF_ConnectedPeer` for transmission - */ -static void -schedule_peer_transmission (void *cls); - - /** * Insert the given request plan into the heap with the appropriate weight. * @@ -329,21 +321,22 @@ plan (struct PeerPlan *pp, rp->earliest_transmission = GNUNET_TIME_relative_to_absolute (delay); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Earliest (re)transmission for `%s' in %us\n", - GNUNET_h2s (&prd->query), rp->transmission_counter); + GNUNET_h2s (&prd->query), + rp->transmission_counter); GNUNET_assert (rp->hn == NULL); if (0 == GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value_us) - rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, rp, rp->priority); + rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, + rp, + rp->priority); else rp->hn = - GNUNET_CONTAINER_heap_insert (pp->delay_heap, rp, + GNUNET_CONTAINER_heap_insert (pp->delay_heap, + rp, rp->earliest_transmission.abs_value_us); GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains_value (pp->plan_map, get_rp_key (rp), rp)); - if (NULL != pp->task) - GNUNET_SCHEDULER_cancel (pp->task); - pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp); #undef N } @@ -382,75 +375,6 @@ get_latest (const struct GSF_RequestPlan *rp) } -/** - * Function called to get a message for transmission. - * - * @param cls closure - * @param buf_size number of bytes available in @a buf - * @param buf where to copy the message, NULL on error (peer disconnect) - * @return number of bytes copied to @a buf, can be 0 (without indicating an error) - */ -static size_t -transmit_message_callback (void *cls, - size_t buf_size, - void *buf) -{ - struct PeerPlan *pp = cls; - struct GSF_RequestPlan *rp; - size_t msize; - - pp->pth = NULL; - if (NULL == buf) - { - /* failed, try again... */ - if (NULL != pp->task) - GNUNET_SCHEDULER_cancel (pp->task); - - pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp); - GNUNET_STATISTICS_update (GSF_stats, - gettext_noop - ("# transmission failed (core has no bandwidth)"), - 1, GNUNET_NO); - return 0; - } - rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap); - if (NULL == rp) - { - if (NULL != pp->task) - GNUNET_SCHEDULER_cancel (pp->task); - pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp); - return 0; - } - msize = GSF_pending_request_get_message_ (get_latest (rp), - buf_size, - buf); - if (msize > buf_size) - { - if (NULL != pp->task) - GNUNET_SCHEDULER_cancel (pp->task); - /* buffer to small (message changed), try again */ - pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp); - return 0; - } - /* remove from root, add again elsewhere... */ - GNUNET_assert (rp == - GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)); - rp->hn = NULL; - rp->last_transmission = GNUNET_TIME_absolute_get (); - rp->transmission_counter++; - total_delay++; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Executing plan %p executed %u times, planning retransmission\n", - rp, rp->transmission_counter); - plan (pp, rp); - GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# query messages sent to other peers"), - 1, - GNUNET_NO); - return msize; -} - - /** * Figure out when and how to transmit to the given peer. * @@ -461,14 +385,16 @@ schedule_peer_transmission (void *cls) { struct PeerPlan *pp = cls; struct GSF_RequestPlan *rp; - size_t msize; struct GNUNET_TIME_Relative delay; - pp->task = NULL; - if (NULL != pp->pth) + if (NULL != pp->task) + { + pp->task = NULL; + } + else { - GSF_peer_transmit_cancel_ (pp->pth); - pp->pth = NULL; + GNUNET_assert (NULL != pp->env); + pp->env = NULL; } /* move ready requests to priority queue */ while ((NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) && @@ -508,23 +434,40 @@ schedule_peer_transmission (void *cls) return; } #if INSANE_STATISTICS - GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# query plans executed"), - 1, GNUNET_NO); + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# query plans executed"), + 1, + GNUNET_NO); #endif /* process from priority heap */ - rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap); + rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing query plan %p\n", rp); GNUNET_assert (NULL != rp); - msize = GSF_pending_request_get_message_ (get_latest (rp), 0, NULL); - pp->pth = - GSF_peer_transmit_ (pp->cp, GNUNET_YES, - rp->priority, - GNUNET_TIME_UNIT_FOREVER_REL, - msize, - &transmit_message_callback, pp); - GNUNET_assert (NULL != pp->pth); + rp->hn = NULL; + rp->last_transmission = GNUNET_TIME_absolute_get (); + rp->transmission_counter++; + total_delay++; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Executing plan %p executed %u times, planning retransmission\n", + rp, + rp->transmission_counter); + GNUNET_assert (NULL == pp->env); + pp->env = GSF_pending_request_get_message_ (get_latest (rp)); + GNUNET_MQ_notify_sent (pp->env, + &schedule_peer_transmission, + pp); + GSF_peer_transmit_ (pp->cp, + GNUNET_YES, + rp->priority, + pp->env); + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# query messages sent to other peers"), + 1, + GNUNET_NO); + plan (pp, + rp); } @@ -646,6 +589,8 @@ GSF_plan_add_ (struct GSF_ConnectedPeer *cp, id, pp, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, + pp); } mpc.merged = GNUNET_NO; mpc.pr = pr; @@ -710,16 +655,16 @@ GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp) GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multipeermap_remove (plans, id, pp)); - if (NULL != pp->pth) - { - GSF_peer_transmit_cancel_ (pp->pth); - pp->pth = NULL; - } if (NULL != pp->task) { GNUNET_SCHEDULER_cancel (pp->task); pp->task = NULL; } + if (NULL != pp->env) + { + GNUNET_MQ_send_cancel (pp->env); + pp->env = NULL; + } while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap))) { GNUNET_break (GNUNET_YES == -- cgit v1.2.3