summaryrefslogtreecommitdiff
path: root/src/fragmentation/defragmentation.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fragmentation/defragmentation.c')
-rw-r--r--src/fragmentation/defragmentation.c381
1 files changed, 193 insertions, 188 deletions
diff --git a/src/fragmentation/defragmentation.c b/src/fragmentation/defragmentation.c
index d68a98c52..5dd935d7d 100644
--- a/src/fragmentation/defragmentation.c
+++ b/src/fragmentation/defragmentation.c
@@ -29,7 +29,8 @@
29/** 29/**
30 * Timestamps for fragments. 30 * Timestamps for fragments.
31 */ 31 */
32struct FragTimes { 32struct FragTimes
33{
33 /** 34 /**
34 * The time the fragment was received. 35 * The time the fragment was received.
35 */ 36 */
@@ -48,7 +49,8 @@ struct FragTimes {
48 * handle 'stray' messages that are received 'late'. A message 49 * handle 'stray' messages that are received 'late'. A message
49 * context is ONLY discarded when the queue gets too big. 50 * context is ONLY discarded when the queue gets too big.
50 */ 51 */
51struct MessageContext { 52struct MessageContext
53{
52 /** 54 /**
53 * This is a DLL. 55 * This is a DLL.
54 */ 56 */
@@ -81,7 +83,7 @@ struct MessageContext {
81 * Task scheduled for transmitting the next ACK to the 83 * Task scheduled for transmitting the next ACK to the
82 * other peer. 84 * other peer.
83 */ 85 */
84 struct GNUNET_SCHEDULER_Task * ack_task; 86 struct GNUNET_SCHEDULER_Task *ack_task;
85 87
86 /** 88 /**
87 * When did we receive which fragment? Used to calculate 89 * When did we receive which fragment? Used to calculate
@@ -132,7 +134,8 @@ struct MessageContext {
132/** 134/**
133 * Defragmentation context (one per connection). 135 * Defragmentation context (one per connection).
134 */ 136 */
135struct GNUNET_DEFRAGMENT_Context { 137struct GNUNET_DEFRAGMENT_Context
138{
136 /** 139 /**
137 * For statistics. 140 * For statistics.
138 */ 141 */
@@ -202,15 +205,15 @@ struct GNUNET_DEFRAGMENT_Context {
202 * @return the defragmentation context 205 * @return the defragmentation context
203 */ 206 */
204struct GNUNET_DEFRAGMENT_Context * 207struct GNUNET_DEFRAGMENT_Context *
205GNUNET_DEFRAGMENT_context_create(struct GNUNET_STATISTICS_Handle *stats, 208GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
206 uint16_t mtu, unsigned int num_msgs, 209 uint16_t mtu, unsigned int num_msgs,
207 void *cls, 210 void *cls,
208 GNUNET_FRAGMENT_MessageProcessor proc, 211 GNUNET_FRAGMENT_MessageProcessor proc,
209 GNUNET_DEFRAGMENT_AckProcessor ackp) 212 GNUNET_DEFRAGMENT_AckProcessor ackp)
210{ 213{
211 struct GNUNET_DEFRAGMENT_Context *dc; 214 struct GNUNET_DEFRAGMENT_Context *dc;
212 215
213 dc = GNUNET_new(struct GNUNET_DEFRAGMENT_Context); 216 dc = GNUNET_new (struct GNUNET_DEFRAGMENT_Context);
214 dc->stats = stats; 217 dc->stats = stats;
215 dc->cls = cls; 218 dc->cls = cls;
216 dc->proc = proc; 219 dc->proc = proc;
@@ -228,23 +231,23 @@ GNUNET_DEFRAGMENT_context_create(struct GNUNET_STATISTICS_Handle *stats,
228 * @param dc defragmentation context 231 * @param dc defragmentation context
229 */ 232 */
230void 233void
231GNUNET_DEFRAGMENT_context_destroy(struct GNUNET_DEFRAGMENT_Context *dc) 234GNUNET_DEFRAGMENT_context_destroy (struct GNUNET_DEFRAGMENT_Context *dc)
232{ 235{
233 struct MessageContext *mc; 236 struct MessageContext *mc;
234 237
235 while (NULL != (mc = dc->head)) 238 while (NULL != (mc = dc->head))
239 {
240 GNUNET_CONTAINER_DLL_remove (dc->head, dc->tail, mc);
241 dc->list_size--;
242 if (NULL != mc->ack_task)
236 { 243 {
237 GNUNET_CONTAINER_DLL_remove(dc->head, dc->tail, mc); 244 GNUNET_SCHEDULER_cancel (mc->ack_task);
238 dc->list_size--; 245 mc->ack_task = NULL;
239 if (NULL != mc->ack_task)
240 {
241 GNUNET_SCHEDULER_cancel(mc->ack_task);
242 mc->ack_task = NULL;
243 }
244 GNUNET_free(mc);
245 } 246 }
246 GNUNET_assert(0 == dc->list_size); 247 GNUNET_free (mc);
247 GNUNET_free(dc); 248 }
249 GNUNET_assert (0 == dc->list_size);
250 GNUNET_free (dc);
248} 251}
249 252
250 253
@@ -254,25 +257,25 @@ GNUNET_DEFRAGMENT_context_destroy(struct GNUNET_DEFRAGMENT_Context *dc)
254 * @param cls the message context 257 * @param cls the message context
255 */ 258 */
256static void 259static void
257send_ack(void *cls) 260send_ack (void *cls)
258{ 261{
259 struct MessageContext *mc = cls; 262 struct MessageContext *mc = cls;
260 struct GNUNET_DEFRAGMENT_Context *dc = mc->dc; 263 struct GNUNET_DEFRAGMENT_Context *dc = mc->dc;
261 struct FragmentAcknowledgement fa; 264 struct FragmentAcknowledgement fa;
262 265
263 mc->ack_task = NULL; 266 mc->ack_task = NULL;
264 fa.header.size = htons(sizeof(struct FragmentAcknowledgement)); 267 fa.header.size = htons (sizeof(struct FragmentAcknowledgement));
265 fa.header.type = htons(GNUNET_MESSAGE_TYPE_FRAGMENT_ACK); 268 fa.header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT_ACK);
266 fa.fragment_id = htonl(mc->fragment_id); 269 fa.fragment_id = htonl (mc->fragment_id);
267 fa.bits = GNUNET_htonll(mc->bits); 270 fa.bits = GNUNET_htonll (mc->bits);
268 GNUNET_STATISTICS_update(mc->dc->stats, 271 GNUNET_STATISTICS_update (mc->dc->stats,
269 _("# acknowledgements sent for fragment"), 272 _ ("# acknowledgements sent for fragment"),
270 1, 273 1,
271 GNUNET_NO); 274 GNUNET_NO);
272 mc->last_duplicate = GNUNET_NO; /* clear flag */ 275 mc->last_duplicate = GNUNET_NO; /* clear flag */
273 dc->ackp(dc->cls, 276 dc->ackp (dc->cls,
274 mc->fragment_id, 277 mc->fragment_id,
275 &fa.header); 278 &fa.header);
276} 279}
277 280
278 281
@@ -281,28 +284,28 @@ send_ack(void *cls)
281 * Copyright (C) 2000 Brian Gough 284 * Copyright (C) 2000 Brian Gough
282 */ 285 */
283static void 286static void
284gsl_fit_mul(const double *x, const size_t xstride, const double *y, 287gsl_fit_mul (const double *x, const size_t xstride, const double *y,
285 const size_t ystride, const size_t n, double *c1, double *cov_11, 288 const size_t ystride, const size_t n, double *c1, double *cov_11,
286 double *sumsq) 289 double *sumsq)
287{ 290{
288 double m_x = 0, m_y = 0, m_dx2 = 0, m_dxdy = 0; 291 double m_x = 0, m_y = 0, m_dx2 = 0, m_dxdy = 0;
289 292
290 size_t i; 293 size_t i;
291 294
292 for (i = 0; i < n; i++) 295 for (i = 0; i < n; i++)
293 { 296 {
294 m_x += (x[i * xstride] - m_x) / (i + 1.0); 297 m_x += (x[i * xstride] - m_x) / (i + 1.0);
295 m_y += (y[i * ystride] - m_y) / (i + 1.0); 298 m_y += (y[i * ystride] - m_y) / (i + 1.0);
296 } 299 }
297 300
298 for (i = 0; i < n; i++) 301 for (i = 0; i < n; i++)
299 { 302 {
300 const double dx = x[i * xstride] - m_x; 303 const double dx = x[i * xstride] - m_x;
301 const double dy = y[i * ystride] - m_y; 304 const double dy = y[i * ystride] - m_y;
302 305
303 m_dx2 += (dx * dx - m_dx2) / (i + 1.0); 306 m_dx2 += (dx * dx - m_dx2) / (i + 1.0);
304 m_dxdy += (dx * dy - m_dxdy) / (i + 1.0); 307 m_dxdy += (dx * dy - m_dxdy) / (i + 1.0);
305 } 308 }
306 309
307 /* In terms of y = b x */ 310 /* In terms of y = b x */
308 311
@@ -315,13 +318,13 @@ gsl_fit_mul(const double *x, const size_t xstride, const double *y,
315 /* Compute chi^2 = \sum (y_i - b * x_i)^2 */ 318 /* Compute chi^2 = \sum (y_i - b * x_i)^2 */
316 319
317 for (i = 0; i < n; i++) 320 for (i = 0; i < n; i++)
318 { 321 {
319 const double dx = x[i * xstride] - m_x; 322 const double dx = x[i * xstride] - m_x;
320 const double dy = y[i * ystride] - m_y; 323 const double dy = y[i * ystride] - m_y;
321 const double d = (m_y - b * m_x) + dy - b * dx; 324 const double d = (m_y - b * m_x) + dy - b * dx;
322 325
323 d2 += d * d; 326 d2 += d * d;
324 } 327 }
325 328
326 s2 = d2 / (n - 1.0); /* chisq per degree of freedom */ 329 s2 = d2 / (n - 1.0); /* chisq per degree of freedom */
327 330
@@ -340,7 +343,7 @@ gsl_fit_mul(const double *x, const size_t xstride, const double *y,
340 * @return average delay between time stamps (based on least-squares fit) 343 * @return average delay between time stamps (based on least-squares fit)
341 */ 344 */
342static struct GNUNET_TIME_Relative 345static struct GNUNET_TIME_Relative
343estimate_latency(struct MessageContext *mc) 346estimate_latency (struct MessageContext *mc)
344{ 347{
345 struct FragTimes *first; 348 struct FragTimes *first;
346 size_t total = mc->frag_times_write_offset - mc->frag_times_start_offset; 349 size_t total = mc->frag_times_write_offset - mc->frag_times_start_offset;
@@ -353,15 +356,15 @@ estimate_latency(struct MessageContext *mc)
353 struct GNUNET_TIME_Relative ret; 356 struct GNUNET_TIME_Relative ret;
354 357
355 first = &mc->frag_times[mc->frag_times_start_offset]; 358 first = &mc->frag_times[mc->frag_times_start_offset];
356 GNUNET_assert(total > 1); 359 GNUNET_assert (total > 1);
357 for (i = 0; i < total; i++) 360 for (i = 0; i < total; i++)
358 { 361 {
359 x[i] = (double)i; 362 x[i] = (double) i;
360 y[i] = (double)(first[i].time.abs_value_us - first[0].time.abs_value_us); 363 y[i] = (double) (first[i].time.abs_value_us - first[0].time.abs_value_us);
361 } 364 }
362 gsl_fit_mul(x, 1, y, 1, total, &c1, &cov11, &sumsq); 365 gsl_fit_mul (x, 1, y, 1, total, &c1, &cov11, &sumsq);
363 c1 += sqrt(sumsq); /* add 1 std dev */ 366 c1 += sqrt (sumsq); /* add 1 std dev */
364 ret.rel_value_us = (uint64_t)c1; 367 ret.rel_value_us = (uint64_t) c1;
365 if (0 == ret.rel_value_us) 368 if (0 == ret.rel_value_us)
366 ret = GNUNET_TIME_UNIT_MICROSECONDS; /* always at least 1 */ 369 ret = GNUNET_TIME_UNIT_MICROSECONDS; /* always at least 1 */
367 return ret; 370 return ret;
@@ -374,7 +377,7 @@ estimate_latency(struct MessageContext *mc)
374 * @param dc defragmentation context 377 * @param dc defragmentation context
375 */ 378 */
376static void 379static void
377discard_oldest_mc(struct GNUNET_DEFRAGMENT_Context *dc) 380discard_oldest_mc (struct GNUNET_DEFRAGMENT_Context *dc)
378{ 381{
379 struct MessageContext *old; 382 struct MessageContext *old;
380 struct MessageContext *pos; 383 struct MessageContext *pos;
@@ -382,21 +385,21 @@ discard_oldest_mc(struct GNUNET_DEFRAGMENT_Context *dc)
382 old = NULL; 385 old = NULL;
383 pos = dc->head; 386 pos = dc->head;
384 while (NULL != pos) 387 while (NULL != pos)
385 { 388 {
386 if ((old == NULL) || 389 if ((old == NULL) ||
387 (old->last_update.abs_value_us > pos->last_update.abs_value_us)) 390 (old->last_update.abs_value_us > pos->last_update.abs_value_us))
388 old = pos; 391 old = pos;
389 pos = pos->next; 392 pos = pos->next;
390 } 393 }
391 GNUNET_assert(NULL != old); 394 GNUNET_assert (NULL != old);
392 GNUNET_CONTAINER_DLL_remove(dc->head, dc->tail, old); 395 GNUNET_CONTAINER_DLL_remove (dc->head, dc->tail, old);
393 dc->list_size--; 396 dc->list_size--;
394 if (NULL != old->ack_task) 397 if (NULL != old->ack_task)
395 { 398 {
396 GNUNET_SCHEDULER_cancel(old->ack_task); 399 GNUNET_SCHEDULER_cancel (old->ack_task);
397 old->ack_task = NULL; 400 old->ack_task = NULL;
398 } 401 }
399 GNUNET_free(old); 402 GNUNET_free (old);
400} 403}
401 404
402 405
@@ -410,8 +413,8 @@ discard_oldest_mc(struct GNUNET_DEFRAGMENT_Context *dc)
410 * #GNUNET_SYSERR on error 413 * #GNUNET_SYSERR on error
411 */ 414 */
412int 415int
413GNUNET_DEFRAGMENT_process_fragment(struct GNUNET_DEFRAGMENT_Context *dc, 416GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc,
414 const struct GNUNET_MessageHeader *msg) 417 const struct GNUNET_MessageHeader *msg)
415{ 418{
416 struct MessageContext *mc; 419 struct MessageContext *mc;
417 const struct FragmentHeader *fh; 420 const struct FragmentHeader *fh;
@@ -429,40 +432,41 @@ GNUNET_DEFRAGMENT_process_fragment(struct GNUNET_DEFRAGMENT_Context *dc,
429 int duplicate; 432 int duplicate;
430 int last; 433 int last;
431 434
432 if (ntohs(msg->size) < sizeof(struct FragmentHeader)) 435 if (ntohs (msg->size) < sizeof(struct FragmentHeader))
433 { 436 {
434 GNUNET_break_op(0); 437 GNUNET_break_op (0);
435 return GNUNET_SYSERR; 438 return GNUNET_SYSERR;
436 } 439 }
437 if (ntohs(msg->size) > dc->mtu) 440 if (ntohs (msg->size) > dc->mtu)
438 { 441 {
439 GNUNET_break_op(0); 442 GNUNET_break_op (0);
440 return GNUNET_SYSERR; 443 return GNUNET_SYSERR;
441 } 444 }
442 fh = (const struct FragmentHeader *)msg; 445 fh = (const struct FragmentHeader *) msg;
443 msize = ntohs(fh->total_size); 446 msize = ntohs (fh->total_size);
444 if (msize < sizeof(struct GNUNET_MessageHeader)) 447 if (msize < sizeof(struct GNUNET_MessageHeader))
445 { 448 {
446 GNUNET_break_op(0); 449 GNUNET_break_op (0);
447 return GNUNET_SYSERR; 450 return GNUNET_SYSERR;
448 } 451 }
449 fid = ntohl(fh->fragment_id); 452 fid = ntohl (fh->fragment_id);
450 foff = ntohs(fh->offset); 453 foff = ntohs (fh->offset);
451 if (foff >= msize) 454 if (foff >= msize)
452 { 455 {
453 GNUNET_break_op(0); 456 GNUNET_break_op (0);
454 return GNUNET_SYSERR; 457 return GNUNET_SYSERR;
455 } 458 }
456 if (0 != (foff % (dc->mtu - sizeof(struct FragmentHeader)))) 459 if (0 != (foff % (dc->mtu - sizeof(struct FragmentHeader))))
457 { 460 {
458 GNUNET_break_op(0); 461 GNUNET_break_op (0);
459 return GNUNET_SYSERR; 462 return GNUNET_SYSERR;
460 } 463 }
461 GNUNET_STATISTICS_update(dc->stats, 464 GNUNET_STATISTICS_update (dc->stats,
462 _("# fragments received"), 465 _ ("# fragments received"),
463 1, 466 1,
464 GNUNET_NO); 467 GNUNET_NO);
465 num_fragments = (ntohs(msg->size) + dc->mtu - sizeof(struct FragmentHeader) - 1) / (dc->mtu - sizeof(struct FragmentHeader)); 468 num_fragments = (ntohs (msg->size) + dc->mtu - sizeof(struct FragmentHeader)
469 - 1) / (dc->mtu - sizeof(struct FragmentHeader));
466 last = 0; 470 last = 0;
467 for (mc = dc->head; NULL != mc; mc = mc->next) 471 for (mc = dc->head; NULL != mc; mc = mc->next)
468 if (mc->fragment_id > fid) 472 if (mc->fragment_id > fid)
@@ -472,67 +476,68 @@ GNUNET_DEFRAGMENT_process_fragment(struct GNUNET_DEFRAGMENT_Context *dc,
472 while ((NULL != mc) && (fid != mc->fragment_id)) 476 while ((NULL != mc) && (fid != mc->fragment_id))
473 mc = mc->next; 477 mc = mc->next;
474 bit = foff / (dc->mtu - sizeof(struct FragmentHeader)); 478 bit = foff / (dc->mtu - sizeof(struct FragmentHeader));
475 if (bit * (dc->mtu - sizeof(struct FragmentHeader)) + ntohs(msg->size) - 479 if (bit * (dc->mtu - sizeof(struct FragmentHeader)) + ntohs (msg->size)
476 sizeof(struct FragmentHeader) > msize) 480 - sizeof(struct FragmentHeader) > msize)
477 { 481 {
478 /* payload extends past total message size */ 482 /* payload extends past total message size */
479 GNUNET_break_op(0); 483 GNUNET_break_op (0);
480 return GNUNET_SYSERR; 484 return GNUNET_SYSERR;
481 } 485 }
482 if ((NULL != mc) && (msize != mc->total_size)) 486 if ((NULL != mc) && (msize != mc->total_size))
483 { 487 {
484 /* inconsistent message size */ 488 /* inconsistent message size */
485 GNUNET_break_op(0); 489 GNUNET_break_op (0);
486 return GNUNET_SYSERR; 490 return GNUNET_SYSERR;
487 } 491 }
488 now = GNUNET_TIME_absolute_get(); 492 now = GNUNET_TIME_absolute_get ();
489 if (NULL == mc) 493 if (NULL == mc)
490 { 494 {
491 mc = GNUNET_malloc(sizeof(struct MessageContext) + msize); 495 mc = GNUNET_malloc (sizeof(struct MessageContext) + msize);
492 mc->msg = (const struct GNUNET_MessageHeader *)&mc[1]; 496 mc->msg = (const struct GNUNET_MessageHeader *) &mc[1];
493 mc->dc = dc; 497 mc->dc = dc;
494 mc->total_size = msize; 498 mc->total_size = msize;
495 mc->fragment_id = fid; 499 mc->fragment_id = fid;
496 mc->last_update = now; 500 mc->last_update = now;
497 n = (msize + dc->mtu - sizeof(struct FragmentHeader) - 1) / (dc->mtu - 501 n = (msize + dc->mtu - sizeof(struct FragmentHeader) - 1) / (dc->mtu
498 sizeof(struct 502 - sizeof(struct
499 FragmentHeader)); 503 FragmentHeader));
500 if (n == 64) 504 if (n == 64)
501 mc->bits = UINT64_MAX; /* set all 64 bit */ 505 mc->bits = UINT64_MAX; /* set all 64 bit */
502 else 506 else
503 mc->bits = (1LLU << n) - 1; /* set lowest 'bits' bit */ 507 mc->bits = (1LLU << n) - 1; /* set lowest 'bits' bit */
504 if (dc->list_size >= dc->num_msgs) 508 if (dc->list_size >= dc->num_msgs)
505 discard_oldest_mc(dc); 509 discard_oldest_mc (dc);
506 GNUNET_CONTAINER_DLL_insert(dc->head, 510 GNUNET_CONTAINER_DLL_insert (dc->head,
507 dc->tail, 511 dc->tail,
508 mc); 512 mc);
509 dc->list_size++; 513 dc->list_size++;
510 } 514 }
511 515
512 /* copy data to 'mc' */ 516 /* copy data to 'mc' */
513 if (0 != (mc->bits & (1LLU << bit))) 517 if (0 != (mc->bits & (1LLU << bit)))
514 { 518 {
515 mc->bits -= 1LLU << bit; 519 mc->bits -= 1LLU << bit;
516 mbuf = (char *)&mc[1]; 520 mbuf = (char *) &mc[1];
517 GNUNET_memcpy(&mbuf[bit * (dc->mtu - sizeof(struct FragmentHeader))], &fh[1], 521 GNUNET_memcpy (&mbuf[bit * (dc->mtu - sizeof(struct FragmentHeader))],
518 ntohs(msg->size) - sizeof(struct FragmentHeader)); 522 &fh[1],
519 mc->last_update = now; 523 ntohs (msg->size) - sizeof(struct FragmentHeader));
520 if (bit < mc->last_bit) 524 mc->last_update = now;
521 mc->frag_times_start_offset = mc->frag_times_write_offset; 525 if (bit < mc->last_bit)
522 mc->last_bit = bit; 526 mc->frag_times_start_offset = mc->frag_times_write_offset;
523 mc->frag_times[mc->frag_times_write_offset].time = now; 527 mc->last_bit = bit;
524 mc->frag_times[mc->frag_times_write_offset].bit = bit; 528 mc->frag_times[mc->frag_times_write_offset].time = now;
525 mc->frag_times_write_offset++; 529 mc->frag_times[mc->frag_times_write_offset].bit = bit;
526 duplicate = GNUNET_NO; 530 mc->frag_times_write_offset++;
527 } 531 duplicate = GNUNET_NO;
532 }
528 else 533 else
529 { 534 {
530 duplicate = GNUNET_YES; 535 duplicate = GNUNET_YES;
531 GNUNET_STATISTICS_update(dc->stats, 536 GNUNET_STATISTICS_update (dc->stats,
532 _("# duplicate fragments received"), 537 _ ("# duplicate fragments received"),
533 1, 538 1,
534 GNUNET_NO); 539 GNUNET_NO);
535 } 540 }
536 541
537 /* count number of missing fragments after the current one */ 542 /* count number of missing fragments after the current one */
538 bc = 0; 543 bc = 0;
@@ -545,39 +550,39 @@ GNUNET_DEFRAGMENT_process_fragment(struct GNUNET_DEFRAGMENT_Context *dc,
545 /* notify about complete message */ 550 /* notify about complete message */
546 if ((GNUNET_NO == duplicate) && 551 if ((GNUNET_NO == duplicate) &&
547 (0 == mc->bits)) 552 (0 == mc->bits))
548 { 553 {
549 GNUNET_STATISTICS_update(dc->stats, 554 GNUNET_STATISTICS_update (dc->stats,
550 _("# messages defragmented"), 555 _ ("# messages defragmented"),
551 1, 556 1,
552 GNUNET_NO); 557 GNUNET_NO);
553 /* message complete, notify! */ 558 /* message complete, notify! */
554 dc->proc(dc->cls, mc->msg); 559 dc->proc (dc->cls, mc->msg);
555 } 560 }
556 /* send ACK */ 561 /* send ACK */
557 if (mc->frag_times_write_offset - mc->frag_times_start_offset > 1) 562 if (mc->frag_times_write_offset - mc->frag_times_start_offset > 1)
558 { 563 {
559 dc->latency = estimate_latency(mc); 564 dc->latency = estimate_latency (mc);
560 } 565 }
561 delay = GNUNET_TIME_relative_saturating_multiply(dc->latency, 566 delay = GNUNET_TIME_relative_saturating_multiply (dc->latency,
562 bc + 1); 567 bc + 1);
563 if ((last + fid == num_fragments) || 568 if ((last + fid == num_fragments) ||
564 (0 == mc->bits) || 569 (0 == mc->bits) ||
565 (GNUNET_YES == duplicate)) 570 (GNUNET_YES == duplicate))
566 { 571 {
567 /* message complete or duplicate or last missing fragment in 572 /* message complete or duplicate or last missing fragment in
568 linear sequence; ACK now! */ 573 linear sequence; ACK now! */
569 delay = GNUNET_TIME_UNIT_ZERO; 574 delay = GNUNET_TIME_UNIT_ZERO;
570 } 575 }
571 if (NULL != mc->ack_task) 576 if (NULL != mc->ack_task)
572 GNUNET_SCHEDULER_cancel(mc->ack_task); 577 GNUNET_SCHEDULER_cancel (mc->ack_task);
573 mc->ack_task = GNUNET_SCHEDULER_add_delayed(delay, 578 mc->ack_task = GNUNET_SCHEDULER_add_delayed (delay,
574 &send_ack, 579 &send_ack,
575 mc); 580 mc);
576 if (GNUNET_YES == duplicate) 581 if (GNUNET_YES == duplicate)
577 { 582 {
578 mc->last_duplicate = GNUNET_YES; 583 mc->last_duplicate = GNUNET_YES;
579 return GNUNET_NO; 584 return GNUNET_NO;
580 } 585 }
581 return GNUNET_YES; 586 return GNUNET_YES;
582} 587}
583 588