From b2c1c35b6344bd03d9a1b07afcd064b7be34094a Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Wed, 13 Jul 2011 11:08:01 +0000 Subject: rename --- src/fragmentation/Makefile.am | 4 +- src/fragmentation/defragmentation.c | 549 ++++++++++++++++++++++++++++++++ src/fragmentation/defragmentation_new.c | 549 -------------------------------- src/fragmentation/fragmentation.c | 374 ++++++++++++++++++++++ src/fragmentation/fragmentation_new.c | 374 ---------------------- 5 files changed, 925 insertions(+), 925 deletions(-) create mode 100644 src/fragmentation/defragmentation.c delete mode 100644 src/fragmentation/defragmentation_new.c create mode 100644 src/fragmentation/fragmentation.c delete mode 100644 src/fragmentation/fragmentation_new.c diff --git a/src/fragmentation/Makefile.am b/src/fragmentation/Makefile.am index e20d3efb5..ab7513cc1 100644 --- a/src/fragmentation/Makefile.am +++ b/src/fragmentation/Makefile.am @@ -11,8 +11,8 @@ endif lib_LTLIBRARIES = libgnunetfragmentation.la libgnunetfragmentation_la_SOURCES = \ - fragmentation_new.c \ - defragmentation_new.c + fragmentation.c \ + defragmentation.c libgnunetfragmentation_la_LIBADD = \ $(top_builddir)/src/statistics/libgnunetstatistics.la \ $(top_builddir)/src/util/libgnunetutil.la diff --git a/src/fragmentation/defragmentation.c b/src/fragmentation/defragmentation.c new file mode 100644 index 000000000..cc42d3e75 --- /dev/null +++ b/src/fragmentation/defragmentation.c @@ -0,0 +1,549 @@ +/* + This file is part of GNUnet + (C) 2009, 2011 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ +/** + * @file src/fragmentation/defragmentation_new.c + * @brief library to help defragment messages + * @author Christian Grothoff + */ +#include "platform.h" +#include "gnunet_fragmentation_lib.h" +#include "fragmentation.h" + +/** + * Timestamps for fragments. + */ +struct FragTimes +{ + /** + * The time the fragment was received. + */ + struct GNUNET_TIME_Absolute time; + + /** + * Number of the bit for the fragment (in [0,..,63]). + */ + unsigned int bit; +}; + + +/** + * Information we keep for one message that is being assembled. Note + * that we keep the context around even after the assembly is done to + * handle 'stray' messages that are received 'late'. A message + * context is ONLY discarded when the queue gets too big. + */ +struct MessageContext +{ + /** + * This is a DLL. + */ + struct MessageContext *next; + + /** + * This is a DLL. + */ + struct MessageContext *prev; + + /** + * Associated defragmentation context. + */ + struct GNUNET_DEFRAGMENT_Context *dc; + + /** + * Pointer to the assembled message, allocated at the + * end of this struct. + */ + const struct GNUNET_MessageHeader *msg; + + /** + * Last time we received any update for this message + * (least-recently updated message will be discarded + * if we hit the queue size). + */ + struct GNUNET_TIME_Absolute last_update; + + /** + * Task scheduled for transmitting the next ACK to the + * other peer. + */ + GNUNET_SCHEDULER_TaskIdentifier ack_task; + + /** + * When did we receive which fragment? Used to calculate + * the time we should send the ACK. + */ + struct FragTimes frag_times[64]; + + /** + * Which fragments have we gotten yet? bits that are 1 + * indicate missing fragments. + */ + uint64_t bits; + + /** + * Unique ID for this message. + */ + uint32_t fragment_id; + + /** + * Which 'bit' did the last fragment we received correspond to? + */ + unsigned int last_bit; + + /** + * For the current ACK round, which is the first relevant + * offset in 'frag_times'? + */ + unsigned int frag_times_start_offset; + + /** + * Which offset whould we write the next frag value into + * in the 'frag_times' array? All smaller entries are valid. + */ + unsigned int frag_times_write_offset; + + /** + * Total size of the message that we are assembling. + */ + uint16_t total_size; + +}; + + +/** + * Defragmentation context (one per connection). + */ +struct GNUNET_DEFRAGMENT_Context +{ + + /** + * For statistics. + */ + struct GNUNET_STATISTICS_Handle *stats; + + /** + * Head of list of messages we're defragmenting. + */ + struct MessageContext *head; + + /** + * Tail of list of messages we're defragmenting. + */ + struct MessageContext *tail; + + /** + * Closure for 'proc' and 'ackp'. + */ + void *cls; + + /** + * Function to call with defragmented messages. + */ + GNUNET_FRAGMENT_MessageProcessor proc; + + /** + * Function to call with acknowledgements. + */ + GNUNET_DEFRAGMENT_AckProcessor ackp; + + /** + * Running average of the latency (delay between messages) for this + * connection. + */ + struct GNUNET_TIME_Relative latency; + + /** + * num_msgs how many fragmented messages + * to we defragment at most at the same time? + */ + unsigned int num_msgs; + + /** + * Current number of messages in the 'struct MessageContext' + * DLL (smaller or equal to 'num_msgs'). + */ + unsigned int list_size; + + /** + * Maximum message size for each fragment. + */ + uint16_t mtu; +}; + + +/** + * Create a defragmentation context. + * + * @param stats statistics context + * @param mtu the maximum message size for each fragment + * @param num_msgs how many fragmented messages + * to we defragment at most at the same time? + * @param cls closure for proc and ackp + * @param proc function to call with defragmented messages + * @param ackp function to call with acknowledgements (to send + * back to the other side) + * @return the defragmentation context + */ +struct GNUNET_DEFRAGMENT_Context * +GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats, + uint16_t mtu, + unsigned int num_msgs, + void *cls, + GNUNET_FRAGMENT_MessageProcessor proc, + GNUNET_DEFRAGMENT_AckProcessor ackp) +{ + struct GNUNET_DEFRAGMENT_Context *dc; + + dc = GNUNET_malloc (sizeof (struct GNUNET_DEFRAGMENT_Context)); + dc->stats = stats; + dc->cls = cls; + dc->proc = proc; + dc->ackp = ackp; + dc->num_msgs = num_msgs; + dc->mtu = mtu; + dc->latency = GNUNET_TIME_UNIT_SECONDS; /* start with likely overestimate */ + return dc; +} + + +/** + * Destroy the given defragmentation context. + * + * @param dc defragmentation context + */ +void +GNUNET_DEFRAGMENT_context_destroy (struct GNUNET_DEFRAGMENT_Context *dc) +{ + struct MessageContext *mc; + + while (NULL != (mc = dc->head)) + { + GNUNET_CONTAINER_DLL_remove (dc->head, + dc->tail, + mc); + dc->list_size--; + if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task) + { + GNUNET_SCHEDULER_cancel (mc->ack_task); + mc->ack_task = GNUNET_SCHEDULER_NO_TASK; + } + GNUNET_free (mc); + } + GNUNET_assert (0 == dc->list_size); + GNUNET_free (dc); +} + + +/** + * Send acknowledgement to the other peer now. + * + * @param cls the message context + * @param tc the scheduler context + */ +static void +send_ack (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct MessageContext *mc = cls; + struct GNUNET_DEFRAGMENT_Context *dc = mc->dc; + struct FragmentAcknowledgement fa; + + mc->ack_task = GNUNET_SCHEDULER_NO_TASK; + fa.header.size = htons (sizeof (struct FragmentAcknowledgement)); + fa.header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT_ACK); + fa.fragment_id = htonl (mc->fragment_id); + fa.bits = GNUNET_htonll (mc->bits); + dc->ackp (dc->cls, mc->fragment_id, &fa.header); +} + + +/** + * This function is from the GNU Scientific Library, linear/fit.c, + * (C) 2000 Brian Gough + */ +static void +gsl_fit_mul (const double *x, const size_t xstride, + const double *y, const size_t ystride, + const size_t n, + double *c1, double *cov_11, double *sumsq) +{ + double m_x = 0, m_y = 0, m_dx2 = 0, m_dxdy = 0; + + size_t i; + + for (i = 0; i < n; i++) + { + m_x += (x[i * xstride] - m_x) / (i + 1.0); + m_y += (y[i * ystride] - m_y) / (i + 1.0); + } + + for (i = 0; i < n; i++) + { + const double dx = x[i * xstride] - m_x; + const double dy = y[i * ystride] - m_y; + + m_dx2 += (dx * dx - m_dx2) / (i + 1.0); + m_dxdy += (dx * dy - m_dxdy) / (i + 1.0); + } + + /* In terms of y = b x */ + + { + double s2 = 0, d2 = 0; + double b = (m_x * m_y + m_dxdy) / (m_x * m_x + m_dx2); + + *c1 = b; + + /* Compute chi^2 = \sum (y_i - b * x_i)^2 */ + + for (i = 0; i < n; i++) + { + const double dx = x[i * xstride] - m_x; + const double dy = y[i * ystride] - m_y; + const double d = (m_y - b * m_x) + dy - b * dx; + d2 += d * d; + } + + s2 = d2 / (n - 1.0); /* chisq per degree of freedom */ + + *cov_11 = s2 * 1.0 / (n * (m_x * m_x + m_dx2)); + + *sumsq = d2; + } +} + + +/** + * Estimate the latency between messages based on the most recent + * message time stamps. + * + * @param mc context with time stamps + * @return average delay between time stamps (based on least-squares fit) + */ +static struct GNUNET_TIME_Relative +estimate_latency (struct MessageContext *mc) +{ + struct FragTimes *first; + size_t total = mc->frag_times_write_offset - mc->frag_times_start_offset; + double x[total]; + double y[total]; + size_t i; + double c1; + double cov11; + double sumsq; + struct GNUNET_TIME_Relative ret; + + first = &mc->frag_times[mc->frag_times_start_offset]; + GNUNET_assert (total > 1); + for (i=0;ihead; + while (NULL != pos) + { + if ( (old == NULL) || + (old->last_update.abs_value > pos->last_update.abs_value) ) + old = pos; + pos = pos->next; + } + GNUNET_assert (NULL != old); + GNUNET_CONTAINER_DLL_remove (dc->head, + dc->tail, + old); + dc->list_size--; + if (GNUNET_SCHEDULER_NO_TASK != old->ack_task) + GNUNET_SCHEDULER_cancel (old->ack_task); + GNUNET_free (old); + fprintf (stderr, "D"); +} + + +/** + * We have received a fragment. Process it. + * + * @param dc the context + * @param msg the message that was received + * @return GNUNET_OK on success, GNUNET_NO if this was a duplicate, GNUNET_SYSERR on error + */ +int +GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc, + const struct GNUNET_MessageHeader *msg) +{ + struct MessageContext *mc; + const struct FragmentHeader *fh; + uint16_t msize; + uint16_t foff; + uint32_t fid; + char *mbuf; + unsigned int bit; + struct GNUNET_TIME_Absolute now; + struct GNUNET_TIME_Relative delay; + unsigned int bc; + unsigned int b; + unsigned int n; + int duplicate; + + if (ntohs(msg->size) < sizeof (struct FragmentHeader)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + if (ntohs (msg->size) > dc->mtu) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + fh = (const struct FragmentHeader*) msg; + msize = ntohs (fh->total_size); + fid = ntohl (fh->fragment_id); + foff = ntohs (fh->offset); + if (foff >= msize) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + if (0 != (foff % (dc->mtu - sizeof (struct FragmentHeader)))) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + GNUNET_STATISTICS_update (dc->stats, + _("Fragments received"), + 1, + GNUNET_NO); + mc = dc->head; + while ( (NULL != mc) && + (fid != mc->fragment_id) ) + mc = mc->next; + bit = foff / (dc->mtu - sizeof (struct FragmentHeader)); + if (bit * (dc->mtu - sizeof (struct FragmentHeader)) + ntohs (msg->size) + - sizeof (struct FragmentHeader) > msize) + { + /* payload extends past total message size */ + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + if ( (NULL != mc) && (msize != mc->total_size) ) + { + /* inconsistent message size */ + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + now = GNUNET_TIME_absolute_get (); + if (NULL == mc) + { + mc = GNUNET_malloc (sizeof (struct MessageContext) + msize); + mc->msg = (const struct GNUNET_MessageHeader*) &mc[1]; + mc->dc = dc; + mc->total_size = msize; + mc->fragment_id = fid; + mc->last_update = now; + n = (msize + dc->mtu - sizeof (struct FragmentHeader) - 1) / (dc->mtu - sizeof (struct FragmentHeader)); + if (n == 64) + mc->bits = UINT64_MAX; /* set all 64 bit */ + else + mc->bits = (1LL << n) - 1; /* set lowest 'bits' bit */ + GNUNET_CONTAINER_DLL_insert (dc->head, + dc->tail, + mc); + dc->list_size++; + if (dc->list_size > dc->num_msgs) + discard_oldest_mc (dc); + } + + /* copy data to 'mc' */ + if (0 != (mc->bits & (1LL << bit))) + { + mc->bits -= 1LL << bit; + mbuf = (char* )&mc[1]; + memcpy (&mbuf[bit * (dc->mtu - sizeof (struct FragmentHeader))], + &fh[1], + ntohs (msg->size) - sizeof (struct FragmentHeader)); + mc->last_update = now; + if (bit < mc->last_bit) + mc->frag_times_start_offset = mc->frag_times_write_offset; + mc->last_bit = bit; + mc->frag_times[mc->frag_times_write_offset].time = now; + mc->frag_times[mc->frag_times_write_offset].bit = bit; + mc->frag_times_write_offset++; + duplicate = GNUNET_NO; + } + else + { + duplicate = GNUNET_YES; + GNUNET_STATISTICS_update (dc->stats, + _("Duplicate fragments received"), + 1, + GNUNET_NO); + } + + /* count number of missing fragments */ + bc = 0; + for (b=0;b<64;b++) + if (0 != (mc->bits & (1LL << b))) bc++; + if (mc->frag_times_write_offset - mc->frag_times_start_offset > 1) + dc->latency = estimate_latency (mc); + delay = GNUNET_TIME_relative_multiply (dc->latency, + bc + 1); + if ( (0 == mc->bits) || (GNUNET_YES == duplicate) ) /* message complete or duplicate, ACK now! */ + delay = GNUNET_TIME_UNIT_ZERO; + if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task) + GNUNET_SCHEDULER_cancel (mc->ack_task); + mc->ack_task = GNUNET_SCHEDULER_add_delayed (delay, + &send_ack, + mc); + if ( (duplicate == GNUNET_NO) && + (0 == mc->bits) ) + { + GNUNET_STATISTICS_update (dc->stats, + _("Messages defragmented"), + 1, + GNUNET_NO); + /* message complete, notify! */ + dc->proc (dc->cls, + mc->msg); + } + if (duplicate == GNUNET_YES) + return GNUNET_NO; + return GNUNET_YES; +} + +/* end of defragmentation_new.c */ + diff --git a/src/fragmentation/defragmentation_new.c b/src/fragmentation/defragmentation_new.c deleted file mode 100644 index cc42d3e75..000000000 --- a/src/fragmentation/defragmentation_new.c +++ /dev/null @@ -1,549 +0,0 @@ -/* - This file is part of GNUnet - (C) 2009, 2011 Christian Grothoff (and other contributing authors) - - GNUnet is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3, or (at your - option) any later version. - - GNUnet is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. -*/ -/** - * @file src/fragmentation/defragmentation_new.c - * @brief library to help defragment messages - * @author Christian Grothoff - */ -#include "platform.h" -#include "gnunet_fragmentation_lib.h" -#include "fragmentation.h" - -/** - * Timestamps for fragments. - */ -struct FragTimes -{ - /** - * The time the fragment was received. - */ - struct GNUNET_TIME_Absolute time; - - /** - * Number of the bit for the fragment (in [0,..,63]). - */ - unsigned int bit; -}; - - -/** - * Information we keep for one message that is being assembled. Note - * that we keep the context around even after the assembly is done to - * handle 'stray' messages that are received 'late'. A message - * context is ONLY discarded when the queue gets too big. - */ -struct MessageContext -{ - /** - * This is a DLL. - */ - struct MessageContext *next; - - /** - * This is a DLL. - */ - struct MessageContext *prev; - - /** - * Associated defragmentation context. - */ - struct GNUNET_DEFRAGMENT_Context *dc; - - /** - * Pointer to the assembled message, allocated at the - * end of this struct. - */ - const struct GNUNET_MessageHeader *msg; - - /** - * Last time we received any update for this message - * (least-recently updated message will be discarded - * if we hit the queue size). - */ - struct GNUNET_TIME_Absolute last_update; - - /** - * Task scheduled for transmitting the next ACK to the - * other peer. - */ - GNUNET_SCHEDULER_TaskIdentifier ack_task; - - /** - * When did we receive which fragment? Used to calculate - * the time we should send the ACK. - */ - struct FragTimes frag_times[64]; - - /** - * Which fragments have we gotten yet? bits that are 1 - * indicate missing fragments. - */ - uint64_t bits; - - /** - * Unique ID for this message. - */ - uint32_t fragment_id; - - /** - * Which 'bit' did the last fragment we received correspond to? - */ - unsigned int last_bit; - - /** - * For the current ACK round, which is the first relevant - * offset in 'frag_times'? - */ - unsigned int frag_times_start_offset; - - /** - * Which offset whould we write the next frag value into - * in the 'frag_times' array? All smaller entries are valid. - */ - unsigned int frag_times_write_offset; - - /** - * Total size of the message that we are assembling. - */ - uint16_t total_size; - -}; - - -/** - * Defragmentation context (one per connection). - */ -struct GNUNET_DEFRAGMENT_Context -{ - - /** - * For statistics. - */ - struct GNUNET_STATISTICS_Handle *stats; - - /** - * Head of list of messages we're defragmenting. - */ - struct MessageContext *head; - - /** - * Tail of list of messages we're defragmenting. - */ - struct MessageContext *tail; - - /** - * Closure for 'proc' and 'ackp'. - */ - void *cls; - - /** - * Function to call with defragmented messages. - */ - GNUNET_FRAGMENT_MessageProcessor proc; - - /** - * Function to call with acknowledgements. - */ - GNUNET_DEFRAGMENT_AckProcessor ackp; - - /** - * Running average of the latency (delay between messages) for this - * connection. - */ - struct GNUNET_TIME_Relative latency; - - /** - * num_msgs how many fragmented messages - * to we defragment at most at the same time? - */ - unsigned int num_msgs; - - /** - * Current number of messages in the 'struct MessageContext' - * DLL (smaller or equal to 'num_msgs'). - */ - unsigned int list_size; - - /** - * Maximum message size for each fragment. - */ - uint16_t mtu; -}; - - -/** - * Create a defragmentation context. - * - * @param stats statistics context - * @param mtu the maximum message size for each fragment - * @param num_msgs how many fragmented messages - * to we defragment at most at the same time? - * @param cls closure for proc and ackp - * @param proc function to call with defragmented messages - * @param ackp function to call with acknowledgements (to send - * back to the other side) - * @return the defragmentation context - */ -struct GNUNET_DEFRAGMENT_Context * -GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats, - uint16_t mtu, - unsigned int num_msgs, - void *cls, - GNUNET_FRAGMENT_MessageProcessor proc, - GNUNET_DEFRAGMENT_AckProcessor ackp) -{ - struct GNUNET_DEFRAGMENT_Context *dc; - - dc = GNUNET_malloc (sizeof (struct GNUNET_DEFRAGMENT_Context)); - dc->stats = stats; - dc->cls = cls; - dc->proc = proc; - dc->ackp = ackp; - dc->num_msgs = num_msgs; - dc->mtu = mtu; - dc->latency = GNUNET_TIME_UNIT_SECONDS; /* start with likely overestimate */ - return dc; -} - - -/** - * Destroy the given defragmentation context. - * - * @param dc defragmentation context - */ -void -GNUNET_DEFRAGMENT_context_destroy (struct GNUNET_DEFRAGMENT_Context *dc) -{ - struct MessageContext *mc; - - while (NULL != (mc = dc->head)) - { - GNUNET_CONTAINER_DLL_remove (dc->head, - dc->tail, - mc); - dc->list_size--; - if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task) - { - GNUNET_SCHEDULER_cancel (mc->ack_task); - mc->ack_task = GNUNET_SCHEDULER_NO_TASK; - } - GNUNET_free (mc); - } - GNUNET_assert (0 == dc->list_size); - GNUNET_free (dc); -} - - -/** - * Send acknowledgement to the other peer now. - * - * @param cls the message context - * @param tc the scheduler context - */ -static void -send_ack (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct MessageContext *mc = cls; - struct GNUNET_DEFRAGMENT_Context *dc = mc->dc; - struct FragmentAcknowledgement fa; - - mc->ack_task = GNUNET_SCHEDULER_NO_TASK; - fa.header.size = htons (sizeof (struct FragmentAcknowledgement)); - fa.header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT_ACK); - fa.fragment_id = htonl (mc->fragment_id); - fa.bits = GNUNET_htonll (mc->bits); - dc->ackp (dc->cls, mc->fragment_id, &fa.header); -} - - -/** - * This function is from the GNU Scientific Library, linear/fit.c, - * (C) 2000 Brian Gough - */ -static void -gsl_fit_mul (const double *x, const size_t xstride, - const double *y, const size_t ystride, - const size_t n, - double *c1, double *cov_11, double *sumsq) -{ - double m_x = 0, m_y = 0, m_dx2 = 0, m_dxdy = 0; - - size_t i; - - for (i = 0; i < n; i++) - { - m_x += (x[i * xstride] - m_x) / (i + 1.0); - m_y += (y[i * ystride] - m_y) / (i + 1.0); - } - - for (i = 0; i < n; i++) - { - const double dx = x[i * xstride] - m_x; - const double dy = y[i * ystride] - m_y; - - m_dx2 += (dx * dx - m_dx2) / (i + 1.0); - m_dxdy += (dx * dy - m_dxdy) / (i + 1.0); - } - - /* In terms of y = b x */ - - { - double s2 = 0, d2 = 0; - double b = (m_x * m_y + m_dxdy) / (m_x * m_x + m_dx2); - - *c1 = b; - - /* Compute chi^2 = \sum (y_i - b * x_i)^2 */ - - for (i = 0; i < n; i++) - { - const double dx = x[i * xstride] - m_x; - const double dy = y[i * ystride] - m_y; - const double d = (m_y - b * m_x) + dy - b * dx; - d2 += d * d; - } - - s2 = d2 / (n - 1.0); /* chisq per degree of freedom */ - - *cov_11 = s2 * 1.0 / (n * (m_x * m_x + m_dx2)); - - *sumsq = d2; - } -} - - -/** - * Estimate the latency between messages based on the most recent - * message time stamps. - * - * @param mc context with time stamps - * @return average delay between time stamps (based on least-squares fit) - */ -static struct GNUNET_TIME_Relative -estimate_latency (struct MessageContext *mc) -{ - struct FragTimes *first; - size_t total = mc->frag_times_write_offset - mc->frag_times_start_offset; - double x[total]; - double y[total]; - size_t i; - double c1; - double cov11; - double sumsq; - struct GNUNET_TIME_Relative ret; - - first = &mc->frag_times[mc->frag_times_start_offset]; - GNUNET_assert (total > 1); - for (i=0;ihead; - while (NULL != pos) - { - if ( (old == NULL) || - (old->last_update.abs_value > pos->last_update.abs_value) ) - old = pos; - pos = pos->next; - } - GNUNET_assert (NULL != old); - GNUNET_CONTAINER_DLL_remove (dc->head, - dc->tail, - old); - dc->list_size--; - if (GNUNET_SCHEDULER_NO_TASK != old->ack_task) - GNUNET_SCHEDULER_cancel (old->ack_task); - GNUNET_free (old); - fprintf (stderr, "D"); -} - - -/** - * We have received a fragment. Process it. - * - * @param dc the context - * @param msg the message that was received - * @return GNUNET_OK on success, GNUNET_NO if this was a duplicate, GNUNET_SYSERR on error - */ -int -GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc, - const struct GNUNET_MessageHeader *msg) -{ - struct MessageContext *mc; - const struct FragmentHeader *fh; - uint16_t msize; - uint16_t foff; - uint32_t fid; - char *mbuf; - unsigned int bit; - struct GNUNET_TIME_Absolute now; - struct GNUNET_TIME_Relative delay; - unsigned int bc; - unsigned int b; - unsigned int n; - int duplicate; - - if (ntohs(msg->size) < sizeof (struct FragmentHeader)) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - if (ntohs (msg->size) > dc->mtu) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - fh = (const struct FragmentHeader*) msg; - msize = ntohs (fh->total_size); - fid = ntohl (fh->fragment_id); - foff = ntohs (fh->offset); - if (foff >= msize) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - if (0 != (foff % (dc->mtu - sizeof (struct FragmentHeader)))) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - GNUNET_STATISTICS_update (dc->stats, - _("Fragments received"), - 1, - GNUNET_NO); - mc = dc->head; - while ( (NULL != mc) && - (fid != mc->fragment_id) ) - mc = mc->next; - bit = foff / (dc->mtu - sizeof (struct FragmentHeader)); - if (bit * (dc->mtu - sizeof (struct FragmentHeader)) + ntohs (msg->size) - - sizeof (struct FragmentHeader) > msize) - { - /* payload extends past total message size */ - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - if ( (NULL != mc) && (msize != mc->total_size) ) - { - /* inconsistent message size */ - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - now = GNUNET_TIME_absolute_get (); - if (NULL == mc) - { - mc = GNUNET_malloc (sizeof (struct MessageContext) + msize); - mc->msg = (const struct GNUNET_MessageHeader*) &mc[1]; - mc->dc = dc; - mc->total_size = msize; - mc->fragment_id = fid; - mc->last_update = now; - n = (msize + dc->mtu - sizeof (struct FragmentHeader) - 1) / (dc->mtu - sizeof (struct FragmentHeader)); - if (n == 64) - mc->bits = UINT64_MAX; /* set all 64 bit */ - else - mc->bits = (1LL << n) - 1; /* set lowest 'bits' bit */ - GNUNET_CONTAINER_DLL_insert (dc->head, - dc->tail, - mc); - dc->list_size++; - if (dc->list_size > dc->num_msgs) - discard_oldest_mc (dc); - } - - /* copy data to 'mc' */ - if (0 != (mc->bits & (1LL << bit))) - { - mc->bits -= 1LL << bit; - mbuf = (char* )&mc[1]; - memcpy (&mbuf[bit * (dc->mtu - sizeof (struct FragmentHeader))], - &fh[1], - ntohs (msg->size) - sizeof (struct FragmentHeader)); - mc->last_update = now; - if (bit < mc->last_bit) - mc->frag_times_start_offset = mc->frag_times_write_offset; - mc->last_bit = bit; - mc->frag_times[mc->frag_times_write_offset].time = now; - mc->frag_times[mc->frag_times_write_offset].bit = bit; - mc->frag_times_write_offset++; - duplicate = GNUNET_NO; - } - else - { - duplicate = GNUNET_YES; - GNUNET_STATISTICS_update (dc->stats, - _("Duplicate fragments received"), - 1, - GNUNET_NO); - } - - /* count number of missing fragments */ - bc = 0; - for (b=0;b<64;b++) - if (0 != (mc->bits & (1LL << b))) bc++; - if (mc->frag_times_write_offset - mc->frag_times_start_offset > 1) - dc->latency = estimate_latency (mc); - delay = GNUNET_TIME_relative_multiply (dc->latency, - bc + 1); - if ( (0 == mc->bits) || (GNUNET_YES == duplicate) ) /* message complete or duplicate, ACK now! */ - delay = GNUNET_TIME_UNIT_ZERO; - if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task) - GNUNET_SCHEDULER_cancel (mc->ack_task); - mc->ack_task = GNUNET_SCHEDULER_add_delayed (delay, - &send_ack, - mc); - if ( (duplicate == GNUNET_NO) && - (0 == mc->bits) ) - { - GNUNET_STATISTICS_update (dc->stats, - _("Messages defragmented"), - 1, - GNUNET_NO); - /* message complete, notify! */ - dc->proc (dc->cls, - mc->msg); - } - if (duplicate == GNUNET_YES) - return GNUNET_NO; - return GNUNET_YES; -} - -/* end of defragmentation_new.c */ - diff --git a/src/fragmentation/fragmentation.c b/src/fragmentation/fragmentation.c new file mode 100644 index 000000000..db66f5a5b --- /dev/null +++ b/src/fragmentation/fragmentation.c @@ -0,0 +1,374 @@ +/* + This file is part of GNUnet + (C) 2009, 2011 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ +/** + * @file src/fragmentation/fragmentation_new.c + * @brief library to help fragment messages + * @author Christian Grothoff + */ +#include "platform.h" +#include "gnunet_fragmentation_lib.h" +#include "gnunet_protocols.h" +#include "fragmentation.h" + + +/** + * Fragmentation context. + */ +struct GNUNET_FRAGMENT_Context +{ + /** + * Statistics to use. + */ + struct GNUNET_STATISTICS_Handle *stats; + + /** + * Tracker for flow control. + */ + struct GNUNET_BANDWIDTH_Tracker *tracker; + + /** + * Current expected delay for ACKs. + */ + struct GNUNET_TIME_Relative delay; + + /** + * Time we transmitted the last message of the last round. + */ + struct GNUNET_TIME_Absolute last_round; + + /** + * Message to fragment (allocated at the end of this struct). + */ + const struct GNUNET_MessageHeader *msg; + + /** + * Function to call for transmissions. + */ + GNUNET_FRAGMENT_MessageProcessor proc; + + /** + * Closure for 'proc'. + */ + void *proc_cls; + + /** + * Bitfield, set to 1 for each unacknowledged fragment. + */ + uint64_t acks; + + /** + * Task performing work for the fragmenter. + */ + GNUNET_SCHEDULER_TaskIdentifier task; + + /** + * Our fragmentation ID. (chosen at random) + */ + uint32_t fragment_id; + + /** + * Round-robin selector for the next transmission. + */ + unsigned int next_transmission; + + /** + * GNUNET_YES if we called 'proc' and are now waiting for 'GNUNET_FRAGMENT_transmission_done' + */ + int8_t proc_busy; + + /** + * GNUNET_YES if we are waiting for an ACK. + */ + int8_t wack; + + /** + * Target fragment size. + */ + uint16_t mtu; + +}; + + +/** + * Transmit the next fragment to the other peer. + * + * @param cls the 'struct GNUNET_FRAGMENT_Context' + * @param tc scheduler context + */ +static void +transmit_next (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GNUNET_FRAGMENT_Context *fc = cls; + char msg[fc->mtu]; + const char *mbuf; + struct FragmentHeader *fh; + struct GNUNET_TIME_Relative delay; + unsigned int bit; + size_t size; + size_t fsize; + int wrap; + + fc->task = GNUNET_SCHEDULER_NO_TASK; + GNUNET_assert (GNUNET_NO == fc->proc_busy); + if (0 == fc->acks) + return; /* all done */ + + /* calculate delay */ + wrap = 0; + while (0 == (fc->acks & (1LL << fc->next_transmission))) + { + fc->next_transmission = (fc->next_transmission + 1) % 64; + wrap |= (fc->next_transmission == 0); + } + bit = fc->next_transmission; + size = ntohs (fc->msg->size); + if (bit == size / (fc->mtu - sizeof (struct FragmentHeader))) + fsize = size % (fc->mtu - sizeof (struct FragmentHeader)) + sizeof (struct FragmentHeader); + else + fsize = fc->mtu; + if (fc->tracker != NULL) + delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker, + fsize); + else + delay = GNUNET_TIME_UNIT_ZERO; + if (delay.rel_value > 0) + { + fc->task = GNUNET_SCHEDULER_add_delayed (delay, + &transmit_next, + fc); + return; + } + fc->next_transmission = (fc->next_transmission + 1) % 64; + wrap |= (fc->next_transmission == 0); + + /* assemble fragmentation message */ + mbuf = (const char*) &fc[1]; + fh = (struct FragmentHeader*) msg; + fh->header.size = htons (fsize); + fh->header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT); + fh->fragment_id = htonl (fc->fragment_id); + fh->total_size = fc->msg->size; /* already in big-endian */ + fh->offset = htons ((fc->mtu - sizeof (struct FragmentHeader)) * bit); + memcpy (&fh[1], + &mbuf[bit * (fc->mtu - sizeof (struct FragmentHeader))], + fsize - sizeof (struct FragmentHeader)); + if (NULL != fc->tracker) + GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize); + GNUNET_STATISTICS_update (fc->stats, + _("Fragments transmitted"), + 1, GNUNET_NO); + if (0 != fc->last_round.abs_value) + GNUNET_STATISTICS_update (fc->stats, + _("Fragments retransmitted"), + 1, GNUNET_NO); + + /* select next message to calculate delay */ + bit = fc->next_transmission; + size = ntohs (fc->msg->size); + if (bit == size / (fc->mtu - sizeof (struct FragmentHeader))) + fsize = size % (fc->mtu - sizeof (struct FragmentHeader)); + else + fsize = fc->mtu; + if (NULL != fc->tracker) + delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker, + fsize); + else + delay = GNUNET_TIME_UNIT_ZERO; + if (wrap) + { + /* full round transmitted wait 2x delay for ACK before going again */ + delay = GNUNET_TIME_relative_max (GNUNET_TIME_relative_multiply (delay, 2), + fc->delay); + fc->last_round = GNUNET_TIME_absolute_get (); + fc->wack = GNUNET_YES; + } + fc->proc_busy = GNUNET_YES; + fc->proc (fc->proc_cls, &fh->header); +} + + +/** + * Create a fragmentation context for the given message. + * Fragments the message into fragments of size "mtu" or + * less. Calls 'proc' on each un-acknowledged fragment, + * using both the expected 'delay' between messages and + * acknowledgements and the given 'tracker' to guide the + * frequency of calls to 'proc'. + * + * @param stats statistics context + * @param mtu the maximum message size for each fragment + * @param tracker bandwidth tracker to use for flow control (can be NULL) + * @param delay expected delay between fragment transmission + * and ACK based on previous messages + * @param msg the message to fragment + * @param proc function to call for each fragment to transmit + * @param proc_cls closure for proc + * @return the fragmentation context + */ +struct GNUNET_FRAGMENT_Context * +GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats, + uint16_t mtu, + struct GNUNET_BANDWIDTH_Tracker *tracker, + struct GNUNET_TIME_Relative delay, + const struct GNUNET_MessageHeader *msg, + GNUNET_FRAGMENT_MessageProcessor proc, + void *proc_cls) +{ + struct GNUNET_FRAGMENT_Context *fc; + size_t size; + uint64_t bits; + + GNUNET_STATISTICS_update (stats, + _("Messages fragmented"), + 1, GNUNET_NO); + GNUNET_assert (mtu >= 1024 + sizeof (struct FragmentHeader)); + size = ntohs (msg->size); + GNUNET_STATISTICS_update (stats, + _("Total size of fragmented messages"), + size, GNUNET_NO); + GNUNET_assert (size > mtu); + fc = GNUNET_malloc (sizeof (struct GNUNET_FRAGMENT_Context) + size); + fc->stats = stats; + fc->mtu = mtu; + fc->tracker = tracker; + fc->delay = delay; + fc->msg = (const struct GNUNET_MessageHeader*)&fc[1]; + fc->proc = proc; + fc->proc_cls = proc_cls; + fc->fragment_id = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, + UINT32_MAX); + memcpy (&fc[1], msg, size); + bits = (size + mtu - sizeof (struct FragmentHeader) - 1) / (mtu - sizeof (struct FragmentHeader)); + GNUNET_assert (bits <= 64); + if (bits == 64) + fc->acks = UINT64_MAX; /* set all 64 bit */ + else + fc->acks = (1LL << bits) - 1; /* set lowest 'bits' bit */ + fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, + fc); + return fc; +} + + +/** + * Continuation to call from the 'proc' function after the fragment + * has been transmitted (and hence the next fragment can now be + * given to proc). + * + * @param fc fragmentation context + */ +void +GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc) +{ + GNUNET_assert (fc->proc_busy == GNUNET_YES); + fc->proc_busy = GNUNET_NO; + GNUNET_assert (fc->task == GNUNET_SCHEDULER_NO_TASK); + fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, + fc); +} + + +/** + * Process an acknowledgement message we got from the other + * side (to control re-transmits). + * + * @param fc fragmentation context + * @param msg acknowledgement message we received + * @return GNUNET_OK if this ack completes the work of the 'fc' + * (all fragments have been received); + * GNUNET_NO if more messages are pending + * GNUNET_SYSERR if this ack is not valid for this fc + */ +int +GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc, + const struct GNUNET_MessageHeader *msg) +{ + const struct FragmentAcknowledgement *fa; + uint64_t abits; + struct GNUNET_TIME_Relative ndelay; + + if (sizeof (struct FragmentAcknowledgement) != + ntohs (msg->size)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + fa = (const struct FragmentAcknowledgement *) msg; + if (ntohl (fa->fragment_id) != fc->fragment_id) + return GNUNET_SYSERR; /* not our ACK */ + abits = GNUNET_ntohll (fa->bits); + if (GNUNET_YES == fc->wack) + { + /* normal ACK, can update running average of delay... */ + fc->wack = GNUNET_NO; + ndelay = GNUNET_TIME_absolute_get_duration (fc->last_round); + fc->delay.rel_value = (ndelay.rel_value + 3 * fc->delay.rel_value) / 4; + } + if (abits != (fc->acks & abits)) + { + /* ID collission or message reordering, count! This should be rare! */ + GNUNET_STATISTICS_update (fc->stats, + _("Bits removed from ACK"), + 1, GNUNET_NO); + } + fc->acks = abits; + if (0 != fc->acks) + { + /* more to transmit, do so right now (if tracker permits...) */ + GNUNET_SCHEDULER_cancel (fc->task); + fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, + fc); + return GNUNET_NO; + } + + /* all done */ + if (fc->task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (fc->task); + fc->task = GNUNET_SCHEDULER_NO_TASK; + } + return GNUNET_OK; +} + + +/** + * Destroy the given fragmentation context (stop calling 'proc', free + * resources). + * + * @param fc fragmentation context + * @return average delay between transmission and ACK for the + * last message, FOREVER if the message was not fully transmitted + */ +struct GNUNET_TIME_Relative +GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *fc) +{ + struct GNUNET_TIME_Relative ret; + + if (fc->task != GNUNET_SCHEDULER_NO_TASK) + GNUNET_SCHEDULER_cancel (fc->task); + ret = fc->delay; + GNUNET_free (fc); + return ret; +} + + +/* end of fragmentation_new.c */ + diff --git a/src/fragmentation/fragmentation_new.c b/src/fragmentation/fragmentation_new.c deleted file mode 100644 index db66f5a5b..000000000 --- a/src/fragmentation/fragmentation_new.c +++ /dev/null @@ -1,374 +0,0 @@ -/* - This file is part of GNUnet - (C) 2009, 2011 Christian Grothoff (and other contributing authors) - - GNUnet is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3, or (at your - option) any later version. - - GNUnet is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. -*/ -/** - * @file src/fragmentation/fragmentation_new.c - * @brief library to help fragment messages - * @author Christian Grothoff - */ -#include "platform.h" -#include "gnunet_fragmentation_lib.h" -#include "gnunet_protocols.h" -#include "fragmentation.h" - - -/** - * Fragmentation context. - */ -struct GNUNET_FRAGMENT_Context -{ - /** - * Statistics to use. - */ - struct GNUNET_STATISTICS_Handle *stats; - - /** - * Tracker for flow control. - */ - struct GNUNET_BANDWIDTH_Tracker *tracker; - - /** - * Current expected delay for ACKs. - */ - struct GNUNET_TIME_Relative delay; - - /** - * Time we transmitted the last message of the last round. - */ - struct GNUNET_TIME_Absolute last_round; - - /** - * Message to fragment (allocated at the end of this struct). - */ - const struct GNUNET_MessageHeader *msg; - - /** - * Function to call for transmissions. - */ - GNUNET_FRAGMENT_MessageProcessor proc; - - /** - * Closure for 'proc'. - */ - void *proc_cls; - - /** - * Bitfield, set to 1 for each unacknowledged fragment. - */ - uint64_t acks; - - /** - * Task performing work for the fragmenter. - */ - GNUNET_SCHEDULER_TaskIdentifier task; - - /** - * Our fragmentation ID. (chosen at random) - */ - uint32_t fragment_id; - - /** - * Round-robin selector for the next transmission. - */ - unsigned int next_transmission; - - /** - * GNUNET_YES if we called 'proc' and are now waiting for 'GNUNET_FRAGMENT_transmission_done' - */ - int8_t proc_busy; - - /** - * GNUNET_YES if we are waiting for an ACK. - */ - int8_t wack; - - /** - * Target fragment size. - */ - uint16_t mtu; - -}; - - -/** - * Transmit the next fragment to the other peer. - * - * @param cls the 'struct GNUNET_FRAGMENT_Context' - * @param tc scheduler context - */ -static void -transmit_next (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct GNUNET_FRAGMENT_Context *fc = cls; - char msg[fc->mtu]; - const char *mbuf; - struct FragmentHeader *fh; - struct GNUNET_TIME_Relative delay; - unsigned int bit; - size_t size; - size_t fsize; - int wrap; - - fc->task = GNUNET_SCHEDULER_NO_TASK; - GNUNET_assert (GNUNET_NO == fc->proc_busy); - if (0 == fc->acks) - return; /* all done */ - - /* calculate delay */ - wrap = 0; - while (0 == (fc->acks & (1LL << fc->next_transmission))) - { - fc->next_transmission = (fc->next_transmission + 1) % 64; - wrap |= (fc->next_transmission == 0); - } - bit = fc->next_transmission; - size = ntohs (fc->msg->size); - if (bit == size / (fc->mtu - sizeof (struct FragmentHeader))) - fsize = size % (fc->mtu - sizeof (struct FragmentHeader)) + sizeof (struct FragmentHeader); - else - fsize = fc->mtu; - if (fc->tracker != NULL) - delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker, - fsize); - else - delay = GNUNET_TIME_UNIT_ZERO; - if (delay.rel_value > 0) - { - fc->task = GNUNET_SCHEDULER_add_delayed (delay, - &transmit_next, - fc); - return; - } - fc->next_transmission = (fc->next_transmission + 1) % 64; - wrap |= (fc->next_transmission == 0); - - /* assemble fragmentation message */ - mbuf = (const char*) &fc[1]; - fh = (struct FragmentHeader*) msg; - fh->header.size = htons (fsize); - fh->header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT); - fh->fragment_id = htonl (fc->fragment_id); - fh->total_size = fc->msg->size; /* already in big-endian */ - fh->offset = htons ((fc->mtu - sizeof (struct FragmentHeader)) * bit); - memcpy (&fh[1], - &mbuf[bit * (fc->mtu - sizeof (struct FragmentHeader))], - fsize - sizeof (struct FragmentHeader)); - if (NULL != fc->tracker) - GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize); - GNUNET_STATISTICS_update (fc->stats, - _("Fragments transmitted"), - 1, GNUNET_NO); - if (0 != fc->last_round.abs_value) - GNUNET_STATISTICS_update (fc->stats, - _("Fragments retransmitted"), - 1, GNUNET_NO); - - /* select next message to calculate delay */ - bit = fc->next_transmission; - size = ntohs (fc->msg->size); - if (bit == size / (fc->mtu - sizeof (struct FragmentHeader))) - fsize = size % (fc->mtu - sizeof (struct FragmentHeader)); - else - fsize = fc->mtu; - if (NULL != fc->tracker) - delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker, - fsize); - else - delay = GNUNET_TIME_UNIT_ZERO; - if (wrap) - { - /* full round transmitted wait 2x delay for ACK before going again */ - delay = GNUNET_TIME_relative_max (GNUNET_TIME_relative_multiply (delay, 2), - fc->delay); - fc->last_round = GNUNET_TIME_absolute_get (); - fc->wack = GNUNET_YES; - } - fc->proc_busy = GNUNET_YES; - fc->proc (fc->proc_cls, &fh->header); -} - - -/** - * Create a fragmentation context for the given message. - * Fragments the message into fragments of size "mtu" or - * less. Calls 'proc' on each un-acknowledged fragment, - * using both the expected 'delay' between messages and - * acknowledgements and the given 'tracker' to guide the - * frequency of calls to 'proc'. - * - * @param stats statistics context - * @param mtu the maximum message size for each fragment - * @param tracker bandwidth tracker to use for flow control (can be NULL) - * @param delay expected delay between fragment transmission - * and ACK based on previous messages - * @param msg the message to fragment - * @param proc function to call for each fragment to transmit - * @param proc_cls closure for proc - * @return the fragmentation context - */ -struct GNUNET_FRAGMENT_Context * -GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats, - uint16_t mtu, - struct GNUNET_BANDWIDTH_Tracker *tracker, - struct GNUNET_TIME_Relative delay, - const struct GNUNET_MessageHeader *msg, - GNUNET_FRAGMENT_MessageProcessor proc, - void *proc_cls) -{ - struct GNUNET_FRAGMENT_Context *fc; - size_t size; - uint64_t bits; - - GNUNET_STATISTICS_update (stats, - _("Messages fragmented"), - 1, GNUNET_NO); - GNUNET_assert (mtu >= 1024 + sizeof (struct FragmentHeader)); - size = ntohs (msg->size); - GNUNET_STATISTICS_update (stats, - _("Total size of fragmented messages"), - size, GNUNET_NO); - GNUNET_assert (size > mtu); - fc = GNUNET_malloc (sizeof (struct GNUNET_FRAGMENT_Context) + size); - fc->stats = stats; - fc->mtu = mtu; - fc->tracker = tracker; - fc->delay = delay; - fc->msg = (const struct GNUNET_MessageHeader*)&fc[1]; - fc->proc = proc; - fc->proc_cls = proc_cls; - fc->fragment_id = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, - UINT32_MAX); - memcpy (&fc[1], msg, size); - bits = (size + mtu - sizeof (struct FragmentHeader) - 1) / (mtu - sizeof (struct FragmentHeader)); - GNUNET_assert (bits <= 64); - if (bits == 64) - fc->acks = UINT64_MAX; /* set all 64 bit */ - else - fc->acks = (1LL << bits) - 1; /* set lowest 'bits' bit */ - fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, - fc); - return fc; -} - - -/** - * Continuation to call from the 'proc' function after the fragment - * has been transmitted (and hence the next fragment can now be - * given to proc). - * - * @param fc fragmentation context - */ -void -GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc) -{ - GNUNET_assert (fc->proc_busy == GNUNET_YES); - fc->proc_busy = GNUNET_NO; - GNUNET_assert (fc->task == GNUNET_SCHEDULER_NO_TASK); - fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, - fc); -} - - -/** - * Process an acknowledgement message we got from the other - * side (to control re-transmits). - * - * @param fc fragmentation context - * @param msg acknowledgement message we received - * @return GNUNET_OK if this ack completes the work of the 'fc' - * (all fragments have been received); - * GNUNET_NO if more messages are pending - * GNUNET_SYSERR if this ack is not valid for this fc - */ -int -GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc, - const struct GNUNET_MessageHeader *msg) -{ - const struct FragmentAcknowledgement *fa; - uint64_t abits; - struct GNUNET_TIME_Relative ndelay; - - if (sizeof (struct FragmentAcknowledgement) != - ntohs (msg->size)) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - fa = (const struct FragmentAcknowledgement *) msg; - if (ntohl (fa->fragment_id) != fc->fragment_id) - return GNUNET_SYSERR; /* not our ACK */ - abits = GNUNET_ntohll (fa->bits); - if (GNUNET_YES == fc->wack) - { - /* normal ACK, can update running average of delay... */ - fc->wack = GNUNET_NO; - ndelay = GNUNET_TIME_absolute_get_duration (fc->last_round); - fc->delay.rel_value = (ndelay.rel_value + 3 * fc->delay.rel_value) / 4; - } - if (abits != (fc->acks & abits)) - { - /* ID collission or message reordering, count! This should be rare! */ - GNUNET_STATISTICS_update (fc->stats, - _("Bits removed from ACK"), - 1, GNUNET_NO); - } - fc->acks = abits; - if (0 != fc->acks) - { - /* more to transmit, do so right now (if tracker permits...) */ - GNUNET_SCHEDULER_cancel (fc->task); - fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, - fc); - return GNUNET_NO; - } - - /* all done */ - if (fc->task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (fc->task); - fc->task = GNUNET_SCHEDULER_NO_TASK; - } - return GNUNET_OK; -} - - -/** - * Destroy the given fragmentation context (stop calling 'proc', free - * resources). - * - * @param fc fragmentation context - * @return average delay between transmission and ACK for the - * last message, FOREVER if the message was not fully transmitted - */ -struct GNUNET_TIME_Relative -GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *fc) -{ - struct GNUNET_TIME_Relative ret; - - if (fc->task != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (fc->task); - ret = fc->delay; - GNUNET_free (fc); - return ret; -} - - -/* end of fragmentation_new.c */ - -- cgit v1.2.3