summaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_cadet_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fs/gnunet-service-fs_cadet_client.c')
-rw-r--r--src/fs/gnunet-service-fs_cadet_client.c243
1 files changed, 105 insertions, 138 deletions
diff --git a/src/fs/gnunet-service-fs_cadet_client.c b/src/fs/gnunet-service-fs_cadet_client.c
index 9f654849d..81afe0411 100644
--- a/src/fs/gnunet-service-fs_cadet_client.c
+++ b/src/fs/gnunet-service-fs_cadet_client.c
@@ -41,7 +41,8 @@
/**
* After how long do we reset connections without replies?
*/
-#define CLIENT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
+#define CLIENT_RETRY_TIMEOUT \
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
/**
@@ -143,7 +144,6 @@ struct CadetHandle
* callback from the cadet API).
*/
struct GNUNET_SCHEDULER_Task *reset_task;
-
};
@@ -181,20 +181,15 @@ transmit_pending (void *cls);
* @return #GNUNET_YES (continue to iterate)
*/
static int
-move_to_pending (void *cls,
- const struct GNUNET_HashCode *key,
- void *value)
+move_to_pending (void *cls, const struct GNUNET_HashCode *key, void *value)
{
struct CadetHandle *mh = cls;
struct GSF_CadetRequest *sr = value;
- GNUNET_assert (GNUNET_YES ==
- GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map,
- key,
- value));
- GNUNET_CONTAINER_DLL_insert (mh->pending_head,
- mh->pending_tail,
- sr);
+ GNUNET_assert (
+ GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map, key, value));
+ GNUNET_CONTAINER_DLL_insert (mh->pending_head, mh->pending_tail, sr);
sr->was_transmitted = GNUNET_NO;
return GNUNET_YES;
}
@@ -209,8 +204,7 @@ move_to_pending (void *cls,
* @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
*/
static int
-check_reply (void *cls,
- const struct CadetReplyMessage *srm)
+check_reply (void *cls, const struct CadetReplyMessage *srm)
{
/* We check later... */
return GNUNET_OK;
@@ -237,8 +231,7 @@ reset_cadet_async (struct CadetHandle *mh)
{
if (NULL != mh->reset_task)
GNUNET_SCHEDULER_cancel (mh->reset_task);
- mh->reset_task = GNUNET_SCHEDULER_add_now (&reset_cadet_task,
- mh);
+ mh->reset_task = GNUNET_SCHEDULER_add_now (&reset_cadet_task, mh);
}
@@ -285,18 +278,16 @@ struct HandleReplyClosure
* @return #GNUNET_YES (continue to iterate)
*/
static int
-process_reply (void *cls,
- const struct GNUNET_HashCode *key,
- void *value)
+process_reply (void *cls, const struct GNUNET_HashCode *key, void *value)
{
struct HandleReplyClosure *hrc = cls;
struct GSF_CadetRequest *sr = value;
sr->proc (sr->proc_cls,
- hrc->type,
- hrc->expiration,
- hrc->data_size,
- hrc->data);
+ hrc->type,
+ hrc->expiration,
+ hrc->data_size,
+ hrc->data);
sr->proc = NULL;
GSF_cadet_query_cancel (sr);
hrc->found = GNUNET_YES;
@@ -315,9 +306,7 @@ process_reply (void *cls,
* @return #GNUNET_YES (continue to iterate)
*/
static int
-free_waiting_entry (void *cls,
- const struct GNUNET_HashCode *key,
- void *value)
+free_waiting_entry (void *cls, const struct GNUNET_HashCode *key, void *value)
{
struct GSF_CadetRequest *sr = value;
@@ -334,8 +323,7 @@ free_waiting_entry (void *cls,
* @param srm the actual message
*/
static void
-handle_reply (void *cls,
- const struct CadetReplyMessage *srm)
+handle_reply (void *cls, const struct CadetReplyMessage *srm)
{
struct CadetHandle *mh = cls;
struct HandleReplyClosure hrc;
@@ -346,43 +334,43 @@ handle_reply (void *cls,
msize = ntohs (srm->header.size) - sizeof (struct CadetReplyMessage);
type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
if (GNUNET_YES !=
- GNUNET_BLOCK_get_key (GSF_block_ctx,
- type,
- &srm[1],
- msize,
- &query))
+ GNUNET_BLOCK_get_key (GSF_block_ctx, type, &srm[1], msize, &query))
{
GNUNET_break_op (0);
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Received bogus reply of type %u with %u bytes via cadet from peer %s\n",
- type,
- msize,
- GNUNET_i2s (&mh->target));
+ GNUNET_log (
+ GNUNET_ERROR_TYPE_WARNING,
+ "Received bogus reply of type %u with %u bytes via cadet from peer %s\n",
+ type,
+ msize,
+ GNUNET_i2s (&mh->target));
reset_cadet_async (mh);
return;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received reply `%s' via cadet from peer %s\n",
- GNUNET_h2s (&query),
- GNUNET_i2s (&mh->target));
+ "Received reply `%s' via cadet from peer %s\n",
+ GNUNET_h2s (&query),
+ GNUNET_i2s (&mh->target));
GNUNET_CADET_receive_done (mh->channel);
GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# replies received via cadet"), 1,
- GNUNET_NO);
+ gettext_noop ("# replies received via cadet"),
+ 1,
+ GNUNET_NO);
hrc.data = &srm[1];
hrc.data_size = msize;
hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
hrc.type = type;
hrc.found = GNUNET_NO;
GNUNET_CONTAINER_multihashmap_get_multiple (mh->waiting_map,
- &query,
- &process_reply,
- &hrc);
+ &query,
+ &process_reply,
+ &hrc);
if (GNUNET_NO == hrc.found)
{
GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# replies received via cadet dropped"), 1,
- GNUNET_NO);
+ gettext_noop (
+ "# replies received via cadet dropped"),
+ 1,
+ GNUNET_NO);
}
}
@@ -395,8 +383,7 @@ handle_reply (void *cls,
* @param channel channel of the disconnecting client
*/
static void
-disconnect_cb (void *cls,
- const struct GNUNET_CADET_Channel *channel)
+disconnect_cb (void *cls, const struct GNUNET_CADET_Channel *channel)
{
struct CadetHandle *mh = cls;
struct GSF_CadetRequest *sr;
@@ -411,19 +398,17 @@ disconnect_cb (void *cls,
callback from `free_waiting_entry()` happens to re-issue
the request, we don't immediately have it back in the
`waiting_map`. */
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multipeermap_remove (cadet_map,
- &mh->target,
- mh));
+ GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multipeermap_remove (cadet_map,
+ &mh->target,
+ mh));
GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
- &free_waiting_entry,
- mh);
+ &free_waiting_entry,
+ mh);
if (NULL != mh->timeout_task)
GNUNET_SCHEDULER_cancel (mh->timeout_task);
if (NULL != mh->reset_task)
GNUNET_SCHEDULER_cancel (mh->reset_task);
- GNUNET_assert (0 ==
- GNUNET_CONTAINER_multihashmap_size (mh->waiting_map));
+ GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (mh->waiting_map));
GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map);
GNUNET_free (mh);
}
@@ -467,24 +452,21 @@ static void
reset_cadet (struct CadetHandle *mh)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Resetting cadet channel to %s\n",
- GNUNET_i2s (&mh->target));
+ "Resetting cadet channel to %s\n",
+ GNUNET_i2s (&mh->target));
if (NULL != mh->channel)
{
GNUNET_CADET_channel_destroy (mh->channel);
mh->channel = NULL;
}
- GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
- &move_to_pending,
- mh);
+ GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map, &move_to_pending, mh);
{
- struct GNUNET_MQ_MessageHandler handlers[] = {
- GNUNET_MQ_hd_var_size (reply,
- GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
- struct CadetReplyMessage,
- mh),
- GNUNET_MQ_handler_end ()
- };
+ struct GNUNET_MQ_MessageHandler handlers[] =
+ {GNUNET_MQ_hd_var_size (reply,
+ GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
+ struct CadetReplyMessage,
+ mh),
+ GNUNET_MQ_handler_end ()};
struct GNUNET_HashCode port;
GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
@@ -514,8 +496,8 @@ cadet_timeout (void *cls)
struct GNUNET_CADET_Channel *tun;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Timeout on cadet channel to %s\n",
- GNUNET_i2s (&mh->target));
+ "Timeout on cadet channel to %s\n",
+ GNUNET_i2s (&mh->target));
mh->timeout_task = NULL;
tun = mh->channel;
mh->channel = NULL;
@@ -553,33 +535,28 @@ transmit_pending (void *cls)
struct GNUNET_MQ_Envelope *env;
struct CadetQueryMessage *sqm;
- if ( (0 != GNUNET_MQ_get_length (mq)) ||
- (NULL == (sr = mh->pending_head)) )
+ if ((0 != GNUNET_MQ_get_length (mq)) || (NULL == (sr = mh->pending_head)))
return;
- GNUNET_CONTAINER_DLL_remove (mh->pending_head,
- mh->pending_tail,
- sr);
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multihashmap_put (mh->waiting_map,
- &sr->query,
- sr,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
+ GNUNET_CONTAINER_DLL_remove (mh->pending_head, mh->pending_tail, sr);
+ GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (
+ mh->waiting_map,
+ &sr->query,
+ sr,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
sr->was_transmitted = GNUNET_YES;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Sending query for %s via cadet to %s\n",
- GNUNET_h2s (&sr->query),
- GNUNET_i2s (&mh->target));
- env = GNUNET_MQ_msg (sqm,
- GNUNET_MESSAGE_TYPE_FS_CADET_QUERY);
- GNUNET_MQ_env_set_options(env,
- GNUNET_MQ_PREF_RELIABLE);
+ "Sending query for %s via cadet to %s\n",
+ GNUNET_h2s (&sr->query),
+ GNUNET_i2s (&mh->target));
+ env = GNUNET_MQ_msg (sqm, GNUNET_MESSAGE_TYPE_FS_CADET_QUERY);
+ GNUNET_MQ_env_set_options (env,
+ GNUNET_MQ_PREF_GOODPUT |
+ GNUNET_MQ_PREF_CORK_ALLOWED |
+ GNUNET_MQ_PREF_OUT_OF_ORDER);
sqm->type = htonl (sr->type);
sqm->query = sr->query;
- GNUNET_MQ_notify_sent (env,
- &transmit_pending,
- mh);
- GNUNET_MQ_send (mq,
- env);
+ GNUNET_MQ_notify_sent (env, &transmit_pending, mh);
+ GNUNET_MQ_send (mq, env);
}
@@ -593,8 +570,7 @@ get_cadet (const struct GNUNET_PeerIdentity *target)
{
struct CadetHandle *mh;
- mh = GNUNET_CONTAINER_multipeermap_get (cadet_map,
- target);
+ mh = GNUNET_CONTAINER_multipeermap_get (cadet_map, target);
if (NULL != mh)
{
if (NULL != mh->timeout_task)
@@ -605,27 +581,26 @@ get_cadet (const struct GNUNET_PeerIdentity *target)
return mh;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Creating cadet channel to %s\n",
- GNUNET_i2s (target));
+ "Creating cadet channel to %s\n",
+ GNUNET_i2s (target));
mh = GNUNET_new (struct CadetHandle);
- mh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT,
- &reset_cadet_task,
- mh);
+ mh->reset_task =
+ GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT, &reset_cadet_task, mh);
mh->waiting_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
mh->target = *target;
GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multipeermap_put (cadet_map,
- &mh->target,
- mh,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ GNUNET_CONTAINER_multipeermap_put (
+ cadet_map,
+ &mh->target,
+ mh,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
{
- struct GNUNET_MQ_MessageHandler handlers[] = {
- GNUNET_MQ_hd_var_size (reply,
- GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
- struct CadetReplyMessage,
- mh),
- GNUNET_MQ_handler_end ()
- };
+ struct GNUNET_MQ_MessageHandler handlers[] =
+ {GNUNET_MQ_hd_var_size (reply,
+ GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
+ struct CadetReplyMessage,
+ mh),
+ GNUNET_MQ_handler_end ()};
struct GNUNET_HashCode port;
GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
@@ -664,9 +639,9 @@ GSF_cadet_query (const struct GNUNET_PeerIdentity *target,
struct GSF_CadetRequest *sr;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Preparing to send query for %s via cadet to %s\n",
- GNUNET_h2s (query),
- GNUNET_i2s (target));
+ "Preparing to send query for %s via cadet to %s\n",
+ GNUNET_h2s (query),
+ GNUNET_i2s (target));
mh = get_cadet (target);
sr = GNUNET_new (struct GSF_CadetRequest);
sr->mh = mh;
@@ -674,9 +649,7 @@ GSF_cadet_query (const struct GNUNET_PeerIdentity *target,
sr->proc_cls = proc_cls;
sr->type = type;
sr->query = *query;
- GNUNET_CONTAINER_DLL_insert (mh->pending_head,
- mh->pending_tail,
- sr);
+ GNUNET_CONTAINER_DLL_insert (mh->pending_head, mh->pending_tail, sr);
transmit_pending (mh);
return sr;
}
@@ -699,29 +672,24 @@ GSF_cadet_query_cancel (struct GSF_CadetRequest *sr)
if (NULL != p)
{
/* signal failure / cancellation to callback */
- p (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
- GNUNET_TIME_UNIT_ZERO_ABS,
- 0, NULL);
+ p (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY, GNUNET_TIME_UNIT_ZERO_ABS, 0, NULL);
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Cancelled query for %s via cadet to %s\n",
- GNUNET_h2s (&sr->query),
- GNUNET_i2s (&sr->mh->target));
+ "Cancelled query for %s via cadet to %s\n",
+ GNUNET_h2s (&sr->query),
+ GNUNET_i2s (&sr->mh->target));
if (GNUNET_YES == sr->was_transmitted)
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map,
- &sr->query,
- sr));
+ GNUNET_assert (
+ GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map, &sr->query, sr));
else
- GNUNET_CONTAINER_DLL_remove (mh->pending_head,
- mh->pending_tail,
- sr);
+ GNUNET_CONTAINER_DLL_remove (mh->pending_head, mh->pending_tail, sr);
GNUNET_free (sr);
- if ( (0 == GNUNET_CONTAINER_multihashmap_size (mh->waiting_map)) &&
- (NULL == mh->pending_head) )
+ if ((0 == GNUNET_CONTAINER_multihashmap_size (mh->waiting_map)) &&
+ (NULL == mh->pending_head))
mh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
- &cadet_timeout,
- mh);
+ &cadet_timeout,
+ mh);
}
@@ -741,8 +709,8 @@ GSF_cadet_release_clients (void *cls,
struct CadetHandle *mh = value;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Timeout on cadet channel to %s\n",
- GNUNET_i2s (&mh->target));
+ "Timeout on cadet channel to %s\n",
+ GNUNET_i2s (&mh->target));
if (NULL != mh->channel)
{
struct GNUNET_CADET_Channel *channel = mh->channel;
@@ -759,5 +727,4 @@ GSF_cadet_release_clients (void *cls,
}
-
/* end of gnunet-service-fs_cadet_client.c */