From a7ccf828ae4f7e306ffe3e7efebc0e678615f6c5 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Wed, 17 Jul 2019 10:50:45 +0200 Subject: remove duplication MQ options, make conversation build --- src/fs/gnunet-service-fs_cadet_client.c | 243 ++++++++++++++------------------ 1 file changed, 105 insertions(+), 138 deletions(-) (limited to 'src/fs/gnunet-service-fs_cadet_client.c') diff --git a/src/fs/gnunet-service-fs_cadet_client.c b/src/fs/gnunet-service-fs_cadet_client.c index 9f654849d..81afe0411 100644 --- a/src/fs/gnunet-service-fs_cadet_client.c +++ b/src/fs/gnunet-service-fs_cadet_client.c @@ -41,7 +41,8 @@ /** * After how long do we reset connections without replies? */ -#define CLIENT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) +#define CLIENT_RETRY_TIMEOUT \ + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) /** @@ -143,7 +144,6 @@ struct CadetHandle * callback from the cadet API). */ struct GNUNET_SCHEDULER_Task *reset_task; - }; @@ -181,20 +181,15 @@ transmit_pending (void *cls); * @return #GNUNET_YES (continue to iterate) */ static int -move_to_pending (void *cls, - const struct GNUNET_HashCode *key, - void *value) +move_to_pending (void *cls, const struct GNUNET_HashCode *key, void *value) { struct CadetHandle *mh = cls; struct GSF_CadetRequest *sr = value; - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map, - key, - value)); - GNUNET_CONTAINER_DLL_insert (mh->pending_head, - mh->pending_tail, - sr); + GNUNET_assert ( + GNUNET_YES == + GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map, key, value)); + GNUNET_CONTAINER_DLL_insert (mh->pending_head, mh->pending_tail, sr); sr->was_transmitted = GNUNET_NO; return GNUNET_YES; } @@ -209,8 +204,7 @@ move_to_pending (void *cls, * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing */ static int -check_reply (void *cls, - const struct CadetReplyMessage *srm) +check_reply (void *cls, const struct CadetReplyMessage *srm) { /* We check later... */ return GNUNET_OK; @@ -237,8 +231,7 @@ reset_cadet_async (struct CadetHandle *mh) { if (NULL != mh->reset_task) GNUNET_SCHEDULER_cancel (mh->reset_task); - mh->reset_task = GNUNET_SCHEDULER_add_now (&reset_cadet_task, - mh); + mh->reset_task = GNUNET_SCHEDULER_add_now (&reset_cadet_task, mh); } @@ -285,18 +278,16 @@ struct HandleReplyClosure * @return #GNUNET_YES (continue to iterate) */ static int -process_reply (void *cls, - const struct GNUNET_HashCode *key, - void *value) +process_reply (void *cls, const struct GNUNET_HashCode *key, void *value) { struct HandleReplyClosure *hrc = cls; struct GSF_CadetRequest *sr = value; sr->proc (sr->proc_cls, - hrc->type, - hrc->expiration, - hrc->data_size, - hrc->data); + hrc->type, + hrc->expiration, + hrc->data_size, + hrc->data); sr->proc = NULL; GSF_cadet_query_cancel (sr); hrc->found = GNUNET_YES; @@ -315,9 +306,7 @@ process_reply (void *cls, * @return #GNUNET_YES (continue to iterate) */ static int -free_waiting_entry (void *cls, - const struct GNUNET_HashCode *key, - void *value) +free_waiting_entry (void *cls, const struct GNUNET_HashCode *key, void *value) { struct GSF_CadetRequest *sr = value; @@ -334,8 +323,7 @@ free_waiting_entry (void *cls, * @param srm the actual message */ static void -handle_reply (void *cls, - const struct CadetReplyMessage *srm) +handle_reply (void *cls, const struct CadetReplyMessage *srm) { struct CadetHandle *mh = cls; struct HandleReplyClosure hrc; @@ -346,43 +334,43 @@ handle_reply (void *cls, msize = ntohs (srm->header.size) - sizeof (struct CadetReplyMessage); type = (enum GNUNET_BLOCK_Type) ntohl (srm->type); if (GNUNET_YES != - GNUNET_BLOCK_get_key (GSF_block_ctx, - type, - &srm[1], - msize, - &query)) + GNUNET_BLOCK_get_key (GSF_block_ctx, type, &srm[1], msize, &query)) { GNUNET_break_op (0); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Received bogus reply of type %u with %u bytes via cadet from peer %s\n", - type, - msize, - GNUNET_i2s (&mh->target)); + GNUNET_log ( + GNUNET_ERROR_TYPE_WARNING, + "Received bogus reply of type %u with %u bytes via cadet from peer %s\n", + type, + msize, + GNUNET_i2s (&mh->target)); reset_cadet_async (mh); return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received reply `%s' via cadet from peer %s\n", - GNUNET_h2s (&query), - GNUNET_i2s (&mh->target)); + "Received reply `%s' via cadet from peer %s\n", + GNUNET_h2s (&query), + GNUNET_i2s (&mh->target)); GNUNET_CADET_receive_done (mh->channel); GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# replies received via cadet"), 1, - GNUNET_NO); + gettext_noop ("# replies received via cadet"), + 1, + GNUNET_NO); hrc.data = &srm[1]; hrc.data_size = msize; hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration); hrc.type = type; hrc.found = GNUNET_NO; GNUNET_CONTAINER_multihashmap_get_multiple (mh->waiting_map, - &query, - &process_reply, - &hrc); + &query, + &process_reply, + &hrc); if (GNUNET_NO == hrc.found) { GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# replies received via cadet dropped"), 1, - GNUNET_NO); + gettext_noop ( + "# replies received via cadet dropped"), + 1, + GNUNET_NO); } } @@ -395,8 +383,7 @@ handle_reply (void *cls, * @param channel channel of the disconnecting client */ static void -disconnect_cb (void *cls, - const struct GNUNET_CADET_Channel *channel) +disconnect_cb (void *cls, const struct GNUNET_CADET_Channel *channel) { struct CadetHandle *mh = cls; struct GSF_CadetRequest *sr; @@ -411,19 +398,17 @@ disconnect_cb (void *cls, callback from `free_waiting_entry()` happens to re-issue the request, we don't immediately have it back in the `waiting_map`. */ - GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multipeermap_remove (cadet_map, - &mh->target, - mh)); + GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multipeermap_remove (cadet_map, + &mh->target, + mh)); GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map, - &free_waiting_entry, - mh); + &free_waiting_entry, + mh); if (NULL != mh->timeout_task) GNUNET_SCHEDULER_cancel (mh->timeout_task); if (NULL != mh->reset_task) GNUNET_SCHEDULER_cancel (mh->reset_task); - GNUNET_assert (0 == - GNUNET_CONTAINER_multihashmap_size (mh->waiting_map)); + GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (mh->waiting_map)); GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map); GNUNET_free (mh); } @@ -467,24 +452,21 @@ static void reset_cadet (struct CadetHandle *mh) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Resetting cadet channel to %s\n", - GNUNET_i2s (&mh->target)); + "Resetting cadet channel to %s\n", + GNUNET_i2s (&mh->target)); if (NULL != mh->channel) { GNUNET_CADET_channel_destroy (mh->channel); mh->channel = NULL; } - GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map, - &move_to_pending, - mh); + GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map, &move_to_pending, mh); { - struct GNUNET_MQ_MessageHandler handlers[] = { - GNUNET_MQ_hd_var_size (reply, - GNUNET_MESSAGE_TYPE_FS_CADET_REPLY, - struct CadetReplyMessage, - mh), - GNUNET_MQ_handler_end () - }; + struct GNUNET_MQ_MessageHandler handlers[] = + {GNUNET_MQ_hd_var_size (reply, + GNUNET_MESSAGE_TYPE_FS_CADET_REPLY, + struct CadetReplyMessage, + mh), + GNUNET_MQ_handler_end ()}; struct GNUNET_HashCode port; GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER, @@ -514,8 +496,8 @@ cadet_timeout (void *cls) struct GNUNET_CADET_Channel *tun; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Timeout on cadet channel to %s\n", - GNUNET_i2s (&mh->target)); + "Timeout on cadet channel to %s\n", + GNUNET_i2s (&mh->target)); mh->timeout_task = NULL; tun = mh->channel; mh->channel = NULL; @@ -553,33 +535,28 @@ transmit_pending (void *cls) struct GNUNET_MQ_Envelope *env; struct CadetQueryMessage *sqm; - if ( (0 != GNUNET_MQ_get_length (mq)) || - (NULL == (sr = mh->pending_head)) ) + if ((0 != GNUNET_MQ_get_length (mq)) || (NULL == (sr = mh->pending_head))) return; - GNUNET_CONTAINER_DLL_remove (mh->pending_head, - mh->pending_tail, - sr); - GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multihashmap_put (mh->waiting_map, - &sr->query, - sr, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); + GNUNET_CONTAINER_DLL_remove (mh->pending_head, mh->pending_tail, sr); + GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put ( + mh->waiting_map, + &sr->query, + sr, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); sr->was_transmitted = GNUNET_YES; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending query for %s via cadet to %s\n", - GNUNET_h2s (&sr->query), - GNUNET_i2s (&mh->target)); - env = GNUNET_MQ_msg (sqm, - GNUNET_MESSAGE_TYPE_FS_CADET_QUERY); - GNUNET_MQ_env_set_options(env, - GNUNET_MQ_PREF_RELIABLE); + "Sending query for %s via cadet to %s\n", + GNUNET_h2s (&sr->query), + GNUNET_i2s (&mh->target)); + env = GNUNET_MQ_msg (sqm, GNUNET_MESSAGE_TYPE_FS_CADET_QUERY); + GNUNET_MQ_env_set_options (env, + GNUNET_MQ_PREF_GOODPUT | + GNUNET_MQ_PREF_CORK_ALLOWED | + GNUNET_MQ_PREF_OUT_OF_ORDER); sqm->type = htonl (sr->type); sqm->query = sr->query; - GNUNET_MQ_notify_sent (env, - &transmit_pending, - mh); - GNUNET_MQ_send (mq, - env); + GNUNET_MQ_notify_sent (env, &transmit_pending, mh); + GNUNET_MQ_send (mq, env); } @@ -593,8 +570,7 @@ get_cadet (const struct GNUNET_PeerIdentity *target) { struct CadetHandle *mh; - mh = GNUNET_CONTAINER_multipeermap_get (cadet_map, - target); + mh = GNUNET_CONTAINER_multipeermap_get (cadet_map, target); if (NULL != mh) { if (NULL != mh->timeout_task) @@ -605,27 +581,26 @@ get_cadet (const struct GNUNET_PeerIdentity *target) return mh; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Creating cadet channel to %s\n", - GNUNET_i2s (target)); + "Creating cadet channel to %s\n", + GNUNET_i2s (target)); mh = GNUNET_new (struct CadetHandle); - mh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT, - &reset_cadet_task, - mh); + mh->reset_task = + GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT, &reset_cadet_task, mh); mh->waiting_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES); mh->target = *target; GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multipeermap_put (cadet_map, - &mh->target, - mh, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + GNUNET_CONTAINER_multipeermap_put ( + cadet_map, + &mh->target, + mh, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); { - struct GNUNET_MQ_MessageHandler handlers[] = { - GNUNET_MQ_hd_var_size (reply, - GNUNET_MESSAGE_TYPE_FS_CADET_REPLY, - struct CadetReplyMessage, - mh), - GNUNET_MQ_handler_end () - }; + struct GNUNET_MQ_MessageHandler handlers[] = + {GNUNET_MQ_hd_var_size (reply, + GNUNET_MESSAGE_TYPE_FS_CADET_REPLY, + struct CadetReplyMessage, + mh), + GNUNET_MQ_handler_end ()}; struct GNUNET_HashCode port; GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER, @@ -664,9 +639,9 @@ GSF_cadet_query (const struct GNUNET_PeerIdentity *target, struct GSF_CadetRequest *sr; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Preparing to send query for %s via cadet to %s\n", - GNUNET_h2s (query), - GNUNET_i2s (target)); + "Preparing to send query for %s via cadet to %s\n", + GNUNET_h2s (query), + GNUNET_i2s (target)); mh = get_cadet (target); sr = GNUNET_new (struct GSF_CadetRequest); sr->mh = mh; @@ -674,9 +649,7 @@ GSF_cadet_query (const struct GNUNET_PeerIdentity *target, sr->proc_cls = proc_cls; sr->type = type; sr->query = *query; - GNUNET_CONTAINER_DLL_insert (mh->pending_head, - mh->pending_tail, - sr); + GNUNET_CONTAINER_DLL_insert (mh->pending_head, mh->pending_tail, sr); transmit_pending (mh); return sr; } @@ -699,29 +672,24 @@ GSF_cadet_query_cancel (struct GSF_CadetRequest *sr) if (NULL != p) { /* signal failure / cancellation to callback */ - p (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY, - GNUNET_TIME_UNIT_ZERO_ABS, - 0, NULL); + p (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY, GNUNET_TIME_UNIT_ZERO_ABS, 0, NULL); } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Cancelled query for %s via cadet to %s\n", - GNUNET_h2s (&sr->query), - GNUNET_i2s (&sr->mh->target)); + "Cancelled query for %s via cadet to %s\n", + GNUNET_h2s (&sr->query), + GNUNET_i2s (&sr->mh->target)); if (GNUNET_YES == sr->was_transmitted) - GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map, - &sr->query, - sr)); + GNUNET_assert ( + GNUNET_OK == + GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map, &sr->query, sr)); else - GNUNET_CONTAINER_DLL_remove (mh->pending_head, - mh->pending_tail, - sr); + GNUNET_CONTAINER_DLL_remove (mh->pending_head, mh->pending_tail, sr); GNUNET_free (sr); - if ( (0 == GNUNET_CONTAINER_multihashmap_size (mh->waiting_map)) && - (NULL == mh->pending_head) ) + if ((0 == GNUNET_CONTAINER_multihashmap_size (mh->waiting_map)) && + (NULL == mh->pending_head)) mh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, - &cadet_timeout, - mh); + &cadet_timeout, + mh); } @@ -741,8 +709,8 @@ GSF_cadet_release_clients (void *cls, struct CadetHandle *mh = value; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Timeout on cadet channel to %s\n", - GNUNET_i2s (&mh->target)); + "Timeout on cadet channel to %s\n", + GNUNET_i2s (&mh->target)); if (NULL != mh->channel) { struct GNUNET_CADET_Channel *channel = mh->channel; @@ -759,5 +727,4 @@ GSF_cadet_release_clients (void *cls, } - /* end of gnunet-service-fs_cadet_client.c */ -- cgit v1.2.3