From 0ad72d359ab3ea5f232f4a8a0eb04700e8c84b49 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Tue, 15 Feb 2011 14:01:44 +0000 Subject: stuff --- src/fs/gnunet-service-fs_cp.c | 136 ++++++++++++++++++++++++------------------ 1 file changed, 78 insertions(+), 58 deletions(-) (limited to 'src/fs/gnunet-service-fs_cp.c') diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c index f9a642199..d88598be7 100644 --- a/src/fs/gnunet-service-fs_cp.c +++ b/src/fs/gnunet-service-fs_cp.c @@ -32,6 +32,11 @@ */ #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5) +/** + * After how long do we discard a reply? + */ +#define REPLY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2) + /** * Handle to cancel a transmission request. @@ -444,6 +449,43 @@ GSF_handle_p2p_migration_stop_ (void *cls, } +/** + * Copy reply and free put message. + * + * @param cls the 'struct PutMessage' + * @param buf_size number of bytes available in buf + * @param buf where to copy the message, NULL on error (peer disconnect) + * @return number of bytes copied to 'buf', can be 0 (without indicating an error) + */ +static size_t +copy_reply (void *cls, + size_t buf_size, + void *buf) +{ + struct PutMessage *pm = cls; + + if (buf != NULL) + { + GNUNET_assert (size >= ntohs (pm->header.size)); + size = ntohs (pm->header.size); + memcpy (buf, pm, size); + GNUNET_STATISTICS_update (stats, + gettext_noop ("# replies transmitted to other peers"), + 1, + GNUNET_NO); + } + else + { + GNUNET_STATISTICS_update (stats, + gettext_noop ("# replies dropped"), + 1, + GNUNET_NO); + } + GNUNET_free (pm); + return size; +} + + /** * Handle a reply to a pending request. Also called if a request * expires (then with data == NULL). The handler may be called @@ -455,84 +497,62 @@ GSF_handle_p2p_migration_stop_ (void *cls, * @param cls 'struct GSF_ConnectedPeer' of the peer that would * have liked an answer to the request * @param pr handle to the original pending request + * @param expiration when does 'data' expire? * @param data response data, NULL on request expiration * @param data_len number of bytes in data + * @param more GNUNET_YES if the request remains active (may call + * this function again), GNUNET_NO if the request is + * finished (client must not call GSF_pending_request_cancel_) */ static void handle_p2p_reply (void *cls, struct GSF_PendingRequest *pr, + struct GNUNET_TIME_Absolute expiration, const void *data, - size_t data_len) + size_t data_len, + int more) { struct GSF_ConnectedPeer *cp = cls; + struct GSF_PendingRequest *prd; + struct PutMessage *pm; + size_t msize; -#if SUPPORT_DELAYS - struct GNUNET_TIME_Relative art_delay; -#endif - - /* FIXME: adapt code fragments below to new API! */ + prd = GSF_pending_request_get_data_ (pr); if (NULL == data) { - /* FIXME: request expired! clean up! */ + GNUNET_assert (GNUNET_NO == more); GNUNET_STATISTICS_update (stats, gettext_noop ("# P2P searches active"), -1, GNUNET_NO); + GNUNET_break (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_remove (cp->request_map, + &prd->query, + pr)); return; } - - /* reply will go over the network, check for cover traffic */ - if ( (prq->anonymity_level > 1) && - (cover_content_count < prq->anonymity_level - 1) ) - { - /* insufficient cover traffic, skip */ - GNUNET_STATISTICS_update (stats, - gettext_noop ("# replies suppressed due to lack of cover traffic"), - 1, - GNUNET_NO); - return GNUNET_YES; - } - if (prq->anonymity_level > 1) - cover_content_count -= prq->anonymity_level - 1; - - - cp = pr->cp; #if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting result for query `%s' to other peer (PID=%u)\n", - GNUNET_h2s (key), - (unsigned int) cp->pid); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Transmitting result for query `%s'\n", + GNUNET_h2s (key)); #endif - GNUNET_STATISTICS_update (stats, - gettext_noop ("# replies received for other peers"), - 1, - GNUNET_NO); - msize = sizeof (struct PutMessage) + prq->size; - reply = GNUNET_malloc (msize + sizeof (struct PendingMessage)); - reply->cont = &transmit_reply_continuation; - reply->cont_cls = pr; -#if SUPPORT_DELAYS - art_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, - GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, - TTL_DECREMENT)); - reply->delay_until - = GNUNET_TIME_relative_to_absolute (art_delay); - GNUNET_STATISTICS_update (stats, - gettext_noop ("cummulative artificial delay introduced (ms)"), - art_delay.abs_value, - GNUNET_NO); -#endif - reply->msize = msize; - reply->priority = UINT32_MAX; /* send replies first! */ - pm = (struct PutMessage*) &reply[1]; - pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); - pm->header.size = htons (msize); - pm->type = htonl (prq->type); - pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration); - memcpy (&pm[1], prq->data, prq->size); - add_to_pending_messages_for_peer (cp, reply, pr); - - + GNUNET_STATISTICS_update (stats, + gettext_noop ("# replies received for other peers"), + 1, + GNUNET_NO); + msize = sizeof (struct PutMessage) + data_len; + pm = GNUNET_malloc (sizeof (msize)); + pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); + pm->header.size = htons (msize); + pm->type = htonl (prd->type); + pm->expiration = GNUNET_TIME_absolute_hton (expiration); + memcpy (&pm[1], data, data_len); + (void) GSF_peer_transmit_ (cp, GNUNET_NO, + UINT32_MAX, + REPLY_TIMEOUT, + msize, + ©_reply, + pm); } -- cgit v1.2.3