aboutsummaryrefslogtreecommitdiff
path: root/src/transport/transport_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport/transport_api.c')
-rw-r--r--src/transport/transport_api.c860
1 files changed, 408 insertions, 452 deletions
diff --git a/src/transport/transport_api.c b/src/transport/transport_api.c
index 4583cf876..2cdaacad6 100644
--- a/src/transport/transport_api.c
+++ b/src/transport/transport_api.c
@@ -255,7 +255,7 @@ struct GNUNET_TRANSPORT_Handle
255 * specify when we could next send a message to the respective peer. 255 * specify when we could next send a message to the respective peer.
256 * Excludes control messages (which can always go out immediately). 256 * Excludes control messages (which can always go out immediately).
257 * Maps time stamps to 'struct Neighbour' entries. 257 * Maps time stamps to 'struct Neighbour' entries.
258 */ 258 */
259 struct GNUNET_CONTAINER_Heap *ready_heap; 259 struct GNUNET_CONTAINER_Heap *ready_heap;
260 260
261 /** 261 /**
@@ -295,8 +295,7 @@ struct GNUNET_TRANSPORT_Handle
295 * 295 *
296 * @param h transport service to schedule a transmission for 296 * @param h transport service to schedule a transmission for
297 */ 297 */
298static void 298static void schedule_transmission (struct GNUNET_TRANSPORT_Handle *h);
299schedule_transmission (struct GNUNET_TRANSPORT_Handle *h);
300 299
301 300
302/** 301/**
@@ -320,7 +319,7 @@ static struct Neighbour *
320neighbour_find (struct GNUNET_TRANSPORT_Handle *h, 319neighbour_find (struct GNUNET_TRANSPORT_Handle *h,
321 const struct GNUNET_PeerIdentity *peer) 320 const struct GNUNET_PeerIdentity *peer)
322{ 321{
323 return GNUNET_CONTAINER_multihashmap_get(h->neighbours, &peer->hashPubKey); 322 return GNUNET_CONTAINER_multihashmap_get (h->neighbours, &peer->hashPubKey);
324} 323}
325 324
326 325
@@ -337,21 +336,20 @@ neighbour_add (struct GNUNET_TRANSPORT_Handle *h,
337 336
338#if DEBUG_TRANSPORT 337#if DEBUG_TRANSPORT
339 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 338 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
340 "Creating entry for neighbour `%4s'.\n", 339 "Creating entry for neighbour `%4s'.\n", GNUNET_i2s (pid));
341 GNUNET_i2s (pid));
342#endif 340#endif
343 n = GNUNET_malloc (sizeof (struct Neighbour)); 341 n = GNUNET_malloc (sizeof (struct Neighbour));
344 n->id = *pid; 342 n->id = *pid;
345 n->h = h; 343 n->h = h;
346 n->is_ready = GNUNET_YES; 344 n->is_ready = GNUNET_YES;
347 GNUNET_BANDWIDTH_tracker_init (&n->out_tracker, 345 GNUNET_BANDWIDTH_tracker_init (&n->out_tracker,
348 GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT, 346 GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
349 MAX_BANDWIDTH_CARRY_S); 347 MAX_BANDWIDTH_CARRY_S);
350 GNUNET_assert (GNUNET_OK == 348 GNUNET_assert (GNUNET_OK ==
351 GNUNET_CONTAINER_multihashmap_put (h->neighbours, 349 GNUNET_CONTAINER_multihashmap_put (h->neighbours,
352 &pid->hashPubKey, 350 &pid->hashPubKey,
353 n, 351 n,
354 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 352 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
355 return n; 353 return n;
356} 354}
357 355
@@ -367,22 +365,18 @@ neighbour_add (struct GNUNET_TRANSPORT_Handle *h,
367 * GNUNET_NO if not. 365 * GNUNET_NO if not.
368 */ 366 */
369static int 367static int
370neighbour_delete (void *cls, 368neighbour_delete (void *cls, const GNUNET_HashCode * key, void *value)
371 const GNUNET_HashCode * key,
372 void *value)
373{ 369{
374 struct GNUNET_TRANSPORT_Handle *handle = cls; 370 struct GNUNET_TRANSPORT_Handle *handle = cls;
375 struct Neighbour *n = value; 371 struct Neighbour *n = value;
376 372
377 if (NULL != handle->nd_cb) 373 if (NULL != handle->nd_cb)
378 handle->nd_cb (handle->cls, 374 handle->nd_cb (handle->cls, &n->id);
379 &n->id);
380 GNUNET_assert (NULL == n->th); 375 GNUNET_assert (NULL == n->th);
381 GNUNET_assert (NULL == n->hn); 376 GNUNET_assert (NULL == n->hn);
382 GNUNET_assert (GNUNET_YES == 377 GNUNET_assert (GNUNET_YES ==
383 GNUNET_CONTAINER_multihashmap_remove (handle->neighbours, 378 GNUNET_CONTAINER_multihashmap_remove (handle->neighbours,
384 key, 379 key, n));
385 n));
386 GNUNET_free (n); 380 GNUNET_free (n);
387 return GNUNET_YES; 381 return GNUNET_YES;
388} 382}
@@ -395,8 +389,7 @@ neighbour_delete (void *cls,
395 * @param msg message received, NULL on timeout or fatal error 389 * @param msg message received, NULL on timeout or fatal error
396 */ 390 */
397static void 391static void
398demultiplexer (void *cls, 392demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg)
399 const struct GNUNET_MessageHeader *msg)
400{ 393{
401 struct GNUNET_TRANSPORT_Handle *h = cls; 394 struct GNUNET_TRANSPORT_Handle *h = cls;
402 const struct DisconnectInfoMessage *dim; 395 const struct DisconnectInfoMessage *dim;
@@ -413,177 +406,171 @@ demultiplexer (void *cls,
413 406
414 GNUNET_assert (h->client != NULL); 407 GNUNET_assert (h->client != NULL);
415 if (msg == NULL) 408 if (msg == NULL)
416 { 409 {
417#if DEBUG_TRANSPORT 410#if DEBUG_TRANSPORT
418 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 411 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
419 "Error receiving from transport service, disconnecting temporarily.\n"); 412 "Error receiving from transport service, disconnecting temporarily.\n");
420#endif 413#endif
421 disconnect_and_schedule_reconnect (h); 414 disconnect_and_schedule_reconnect (h);
422 return; 415 return;
423 } 416 }
424 GNUNET_CLIENT_receive (h->client, 417 GNUNET_CLIENT_receive (h->client,
425 &demultiplexer, h, 418 &demultiplexer, h, GNUNET_TIME_UNIT_FOREVER_REL);
426 GNUNET_TIME_UNIT_FOREVER_REL);
427 size = ntohs (msg->size); 419 size = ntohs (msg->size);
428 switch (ntohs (msg->type)) 420 switch (ntohs (msg->type))
421 {
422 case GNUNET_MESSAGE_TYPE_HELLO:
423 if (GNUNET_OK !=
424 GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg, &me))
429 { 425 {
430 case GNUNET_MESSAGE_TYPE_HELLO: 426 GNUNET_break (0);
431 if (GNUNET_OK != 427 break;
432 GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg, 428 }
433 &me))
434 {
435 GNUNET_break (0);
436 break;
437 }
438#if DEBUG_TRANSPORT 429#if DEBUG_TRANSPORT
439 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 430 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
440 "Receiving (my own) `%s' message, I am `%4s'.\n", 431 "Receiving (my own) `%s' message, I am `%4s'.\n",
441 "HELLO", GNUNET_i2s (&me)); 432 "HELLO", GNUNET_i2s (&me));
442#endif 433#endif
443 GNUNET_free_non_null (h->my_hello); 434 GNUNET_free_non_null (h->my_hello);
444 h->my_hello = NULL; 435 h->my_hello = NULL;
445 if (size < sizeof (struct GNUNET_MessageHeader)) 436 if (size < sizeof (struct GNUNET_MessageHeader))
446 { 437 {
447 GNUNET_break (0); 438 GNUNET_break (0);
448 break; 439 break;
449 } 440 }
450 h->my_hello = GNUNET_malloc (size); 441 h->my_hello = GNUNET_malloc (size);
451 memcpy (h->my_hello, msg, size); 442 memcpy (h->my_hello, msg, size);
452 hwl = h->hwl_head; 443 hwl = h->hwl_head;
453 while (NULL != hwl) 444 while (NULL != hwl)
454 { 445 {
455 next_hwl = hwl->next; 446 next_hwl = hwl->next;
456 hwl->rec (hwl->rec_cls, 447 hwl->rec (hwl->rec_cls,
457 (const struct GNUNET_MessageHeader *) h->my_hello); 448 (const struct GNUNET_MessageHeader *) h->my_hello);
458 hwl = next_hwl; 449 hwl = next_hwl;
459 } 450 }
451 break;
452 case GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT:
453 if (size < sizeof (struct ConnectInfoMessage))
454 {
455 GNUNET_break (0);
456 break;
457 }
458 cim = (const struct ConnectInfoMessage *) msg;
459 ats_count = ntohl (cim->ats_count);
460 if (size !=
461 sizeof (struct ConnectInfoMessage) +
462 ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information))
463 {
464 GNUNET_break (0);
460 break; 465 break;
461 case GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT: 466 }
462 if (size < sizeof (struct ConnectInfoMessage))
463 {
464 GNUNET_break (0);
465 break;
466 }
467 cim = (const struct ConnectInfoMessage *) msg;
468 ats_count = ntohl (cim->ats_count);
469 if (size != sizeof (struct ConnectInfoMessage) + ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information))
470 {
471 GNUNET_break (0);
472 break;
473 }
474#if DEBUG_TRANSPORT 467#if DEBUG_TRANSPORT
475 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 468 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
476 "Receiving `%s' message for `%4s'.\n", 469 "Receiving `%s' message for `%4s'.\n",
477 "CONNECT", GNUNET_i2s (&cim->id)); 470 "CONNECT", GNUNET_i2s (&cim->id));
478#endif 471#endif
479 n = neighbour_find (h, &cim->id); 472 n = neighbour_find (h, &cim->id);
480 if (n != NULL) 473 if (n != NULL)
481 { 474 {
482 GNUNET_break (0); 475 GNUNET_break (0);
483 break; 476 break;
484 } 477 }
485 n = neighbour_add (h, &cim->id); 478 n = neighbour_add (h, &cim->id);
486 if (h->nc_cb != NULL) 479 if (h->nc_cb != NULL)
487 h->nc_cb (h->cls, &n->id, 480 h->nc_cb (h->cls, &n->id, &cim->ats, ats_count);
488 &cim->ats, ats_count); 481 break;
482 case GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT:
483 if (size != sizeof (struct DisconnectInfoMessage))
484 {
485 GNUNET_break (0);
489 break; 486 break;
490 case GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT: 487 }
491 if (size != sizeof (struct DisconnectInfoMessage)) 488 dim = (const struct DisconnectInfoMessage *) msg;
492 { 489 GNUNET_break (ntohl (dim->reserved) == 0);
493 GNUNET_break (0);
494 break;
495 }
496 dim = (const struct DisconnectInfoMessage *) msg;
497 GNUNET_break (ntohl (dim->reserved) == 0);
498#if DEBUG_TRANSPORT_DISCONNECT 490#if DEBUG_TRANSPORT_DISCONNECT
499 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 491 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
500 "Receiving `%s' message for `%4s'.\n", 492 "Receiving `%s' message for `%4s'.\n",
501 "DISCONNECT", 493 "DISCONNECT", GNUNET_i2s (&dim->peer));
502 GNUNET_i2s (&dim->peer));
503#endif 494#endif
504 n = neighbour_find (h, &dim->peer); 495 n = neighbour_find (h, &dim->peer);
505 if (n == NULL) 496 if (n == NULL)
506 { 497 {
507 GNUNET_break (0); 498 GNUNET_break (0);
508 break; 499 break;
509 } 500 }
510 neighbour_delete (h, &dim->peer.hashPubKey, n); 501 neighbour_delete (h, &dim->peer.hashPubKey, n);
502 break;
503 case GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK:
504 if (size != sizeof (struct SendOkMessage))
505 {
506 GNUNET_break (0);
511 break; 507 break;
512 case GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK: 508 }
513 if (size != sizeof (struct SendOkMessage)) 509 okm = (const struct SendOkMessage *) msg;
514 {
515 GNUNET_break (0);
516 break;
517 }
518 okm = (const struct SendOkMessage *) msg;
519#if DEBUG_TRANSPORT 510#if DEBUG_TRANSPORT
520 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 511 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
521 "Receiving `%s' message, transmission %s.\n", "SEND_OK", 512 "Receiving `%s' message, transmission %s.\n", "SEND_OK",
522 ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed"); 513 ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed");
523#endif 514#endif
524 n = neighbour_find (h, &okm->peer); 515 n = neighbour_find (h, &okm->peer);
525 if (n == NULL) 516 if (n == NULL)
526 break;
527 GNUNET_break (GNUNET_NO == n->is_ready);
528 n->is_ready = GNUNET_YES;
529 if ( (n->th != NULL) &&
530 (n->hn == NULL) )
531 {
532 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->th->timeout_task);
533 GNUNET_SCHEDULER_cancel (n->th->timeout_task);
534 n->th->timeout_task = GNUNET_SCHEDULER_NO_TASK;
535 /* we've been waiting for this (congestion, not quota,
536 caused delayed transmission) */
537 n->hn = GNUNET_CONTAINER_heap_insert (h->ready_heap,
538 n, 0);
539 schedule_transmission (h);
540 }
541 break; 517 break;
542 case GNUNET_MESSAGE_TYPE_TRANSPORT_RECV: 518 GNUNET_break (GNUNET_NO == n->is_ready);
519 n->is_ready = GNUNET_YES;
520 if ((n->th != NULL) && (n->hn == NULL))
521 {
522 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->th->timeout_task);
523 GNUNET_SCHEDULER_cancel (n->th->timeout_task);
524 n->th->timeout_task = GNUNET_SCHEDULER_NO_TASK;
525 /* we've been waiting for this (congestion, not quota,
526 * caused delayed transmission) */
527 n->hn = GNUNET_CONTAINER_heap_insert (h->ready_heap, n, 0);
528 schedule_transmission (h);
529 }
530 break;
531 case GNUNET_MESSAGE_TYPE_TRANSPORT_RECV:
543#if DEBUG_TRANSPORT 532#if DEBUG_TRANSPORT
544 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 533 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Receiving `%s' message.\n", "RECV");
545 "Receiving `%s' message.\n", "RECV");
546#endif 534#endif
547 if (size < 535 if (size <
548 sizeof (struct InboundMessage) + 536 sizeof (struct InboundMessage) + sizeof (struct GNUNET_MessageHeader))
549 sizeof (struct GNUNET_MessageHeader)) 537 {
550 { 538 GNUNET_break (0);
551 GNUNET_break (0); 539 break;
552 break; 540 }
553 } 541 im = (const struct InboundMessage *) msg;
554 im = (const struct InboundMessage *) msg; 542 GNUNET_break (0 == ntohl (im->reserved));
555 GNUNET_break (0 == ntohl (im->reserved)); 543 ats_count = ntohl (im->ats_count);
556 ats_count = ntohl(im->ats_count); 544 imm = (const struct GNUNET_MessageHeader *) &((&(im->ats))[ats_count + 1]);
557 imm = (const struct GNUNET_MessageHeader *) &((&(im->ats))[ats_count+1]); 545
558 546 if (ntohs (imm->size) + sizeof (struct InboundMessage) +
559 if (ntohs (imm->size) + sizeof (struct InboundMessage) + ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information) != size) 547 ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information) != size)
560 { 548 {
561 GNUNET_break (0); 549 GNUNET_break (0);
562 break; 550 break;
563 } 551 }
564#if DEBUG_TRANSPORT 552#if DEBUG_TRANSPORT
565 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 553 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
566 "Received message of type %u from `%4s'.\n", 554 "Received message of type %u from `%4s'.\n",
567 ntohs (imm->type), GNUNET_i2s (&im->peer)); 555 ntohs (imm->type), GNUNET_i2s (&im->peer));
568#endif 556#endif
569 n = neighbour_find (h, &im->peer); 557 n = neighbour_find (h, &im->peer);
570 if (n == NULL) 558 if (n == NULL)
571 { 559 {
572 GNUNET_break (0);
573 break;
574 }
575 if (h->rec != NULL)
576 h->rec (h->cls, &im->peer, imm,
577 &im->ats, ats_count);
578 break;
579 default:
580 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
581 _
582 ("Received unexpected message of type %u in %s:%u\n"),
583 ntohs (msg->type), __FILE__, __LINE__);
584 GNUNET_break (0); 560 GNUNET_break (0);
585 break; 561 break;
586 } 562 }
563 if (h->rec != NULL)
564 h->rec (h->cls, &im->peer, imm, &im->ats, ats_count);
565 break;
566 default:
567 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
568 _
569 ("Received unexpected message of type %u in %s:%u\n"),
570 ntohs (msg->type), __FILE__, __LINE__);
571 GNUNET_break (0);
572 break;
573 }
587} 574}
588 575
589 576
@@ -596,7 +583,8 @@ demultiplexer (void *cls,
596 */ 583 */
597static void 584static void
598timeout_request_due_to_congestion (void *cls, 585timeout_request_due_to_congestion (void *cls,
599 const struct GNUNET_SCHEDULER_TaskContext *tc) 586 const struct GNUNET_SCHEDULER_TaskContext
587 *tc)
600{ 588{
601 struct GNUNET_TRANSPORT_TransmitHandle *th = cls; 589 struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
602 struct Neighbour *n = th->neighbour; 590 struct Neighbour *n = th->neighbour;
@@ -633,77 +621,78 @@ transport_notify_ready (void *cls, size_t size, void *buf)
633 GNUNET_assert (NULL != h->client); 621 GNUNET_assert (NULL != h->client);
634 h->cth = NULL; 622 h->cth = NULL;
635 if (NULL == buf) 623 if (NULL == buf)
636 { 624 {
637 /* transmission failed */ 625 /* transmission failed */
638 disconnect_and_schedule_reconnect (h); 626 disconnect_and_schedule_reconnect (h);
639 return 0; 627 return 0;
640 } 628 }
641 629
642 cbuf = buf; 630 cbuf = buf;
643 ret = 0; 631 ret = 0;
644 /* first send control messages */ 632 /* first send control messages */
645 while ( (NULL != (th = h->control_head)) && 633 while ((NULL != (th = h->control_head)) && (th->notify_size <= size))
646 (th->notify_size <= size) ) 634 {
647 { 635 GNUNET_CONTAINER_DLL_remove (h->control_head, h->control_tail, th);
648 GNUNET_CONTAINER_DLL_remove (h->control_head, 636 nret = th->notify (th->notify_cls, size, &cbuf[ret]);
649 h->control_tail,
650 th);
651 nret = th->notify (th->notify_cls, size, &cbuf[ret]);
652#if DEBUG_TRANSPORT 637#if DEBUG_TRANSPORT
653 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 638 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
654 "Added %u bytes of control message at %u\n", 639 "Added %u bytes of control message at %u\n", nret, ret);
655 nret,
656 ret);
657#endif 640#endif
658 GNUNET_free (th); 641 GNUNET_free (th);
659 ret += nret; 642 ret += nret;
660 size -= nret; 643 size -= nret;
661 } 644 }
662 645
663 /* then, if possible and no control messages pending, send data messages */ 646 /* then, if possible and no control messages pending, send data messages */
664 while ( (NULL == h->control_head) && 647 while ((NULL == h->control_head) &&
665 (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) ) 648 (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))))
649 {
650 if (GNUNET_YES != n->is_ready)
666 { 651 {
667 if (GNUNET_YES != n->is_ready) 652 /* peer not ready, wait for notification! */
668 {
669 /* peer not ready, wait for notification! */
670 GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
671 n->hn = NULL;
672 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == n->th->timeout_task);
673 n->th->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (n->th->timeout),
674 &timeout_request_due_to_congestion,
675 n->th);
676 continue;
677 }
678 th = n->th;
679 if (th->notify_size + sizeof (struct OutboundMessage) > size)
680 break; /* does not fit */
681 if (GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, th->notify_size).rel_value > 0)
682 break; /* too early */
683 GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap)); 653 GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
684 n->hn = NULL; 654 n->hn = NULL;
685 n->th = NULL; 655 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == n->th->timeout_task);
686 n->is_ready = GNUNET_NO; 656 n->th->timeout_task =
687 GNUNET_assert (size >= sizeof (struct OutboundMessage)); 657 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
688 mret = th->notify (th->notify_cls, 658 (n->th->timeout),
689 size - sizeof (struct OutboundMessage), 659 &timeout_request_due_to_congestion,
690 &cbuf[ret + sizeof (struct OutboundMessage)]); 660 n->th);
691 GNUNET_assert (mret <= size - sizeof (struct OutboundMessage)); 661 continue;
692 if (mret != 0) 662 }
693 { 663 th = n->th;
694 GNUNET_assert (mret + sizeof (struct OutboundMessage) < GNUNET_SERVER_MAX_MESSAGE_SIZE); 664 if (th->notify_size + sizeof (struct OutboundMessage) > size)
695 obm.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND); 665 break; /* does not fit */
696 obm.header.size = htons (mret + sizeof (struct OutboundMessage)); 666 if (GNUNET_BANDWIDTH_tracker_get_delay
697 obm.priority = htonl (th->priority); 667 (&n->out_tracker, th->notify_size).rel_value > 0)
698 obm.timeout = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (th->timeout)); 668 break; /* too early */
699 obm.peer = n->id; 669 GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
700 memcpy (&cbuf[ret], &obm, sizeof (struct OutboundMessage)); 670 n->hn = NULL;
701 ret += (mret + sizeof (struct OutboundMessage)); 671 n->th = NULL;
702 size -= (mret + sizeof (struct OutboundMessage)); 672 n->is_ready = GNUNET_NO;
703 GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker, mret); 673 GNUNET_assert (size >= sizeof (struct OutboundMessage));
704 } 674 mret = th->notify (th->notify_cls,
705 GNUNET_free (th); 675 size - sizeof (struct OutboundMessage),
676 &cbuf[ret + sizeof (struct OutboundMessage)]);
677 GNUNET_assert (mret <= size - sizeof (struct OutboundMessage));
678 if (mret != 0)
679 {
680 GNUNET_assert (mret + sizeof (struct OutboundMessage) <
681 GNUNET_SERVER_MAX_MESSAGE_SIZE);
682 obm.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND);
683 obm.header.size = htons (mret + sizeof (struct OutboundMessage));
684 obm.priority = htonl (th->priority);
685 obm.timeout =
686 GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
687 (th->timeout));
688 obm.peer = n->id;
689 memcpy (&cbuf[ret], &obm, sizeof (struct OutboundMessage));
690 ret += (mret + sizeof (struct OutboundMessage));
691 size -= (mret + sizeof (struct OutboundMessage));
692 GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker, mret);
706 } 693 }
694 GNUNET_free (th);
695 }
707 /* if there are more pending messages, try to schedule those */ 696 /* if there are more pending messages, try to schedule those */
708 schedule_transmission (h); 697 schedule_transmission (h);
709#if DEBUG_TRANSPORT 698#if DEBUG_TRANSPORT
@@ -723,7 +712,7 @@ transport_notify_ready (void *cls, size_t size, void *buf)
723 */ 712 */
724static void 713static void
725schedule_transmission_task (void *cls, 714schedule_transmission_task (void *cls,
726 const struct GNUNET_SCHEDULER_TaskContext *tc) 715 const struct GNUNET_SCHEDULER_TaskContext *tc)
727{ 716{
728 struct GNUNET_TRANSPORT_Handle *h = cls; 717 struct GNUNET_TRANSPORT_Handle *h = cls;
729 size_t size; 718 size_t size;
@@ -733,48 +722,45 @@ schedule_transmission_task (void *cls,
733 h->quota_task = GNUNET_SCHEDULER_NO_TASK; 722 h->quota_task = GNUNET_SCHEDULER_NO_TASK;
734 GNUNET_assert (NULL != h->client); 723 GNUNET_assert (NULL != h->client);
735 /* destroy all requests that have timed out */ 724 /* destroy all requests that have timed out */
736 while ( (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) && 725 while ((NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) &&
737 (GNUNET_TIME_absolute_get_remaining (n->th->timeout).rel_value == 0) ) 726 (GNUNET_TIME_absolute_get_remaining (n->th->timeout).rel_value == 0))
738 { 727 {
739 /* notify client that the request could not be satisfied within 728 /* notify client that the request could not be satisfied within
740 the given time constraints */ 729 * the given time constraints */
741 th = n->th; 730 th = n->th;
742 n->th = NULL; 731 n->th = NULL;
743 GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap)); 732 GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
744 n->hn = NULL; 733 n->hn = NULL;
745#if DEBUG_TRANSPORT 734#if DEBUG_TRANSPORT
746 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 735 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
747 "Signalling timeout for transmission to peer %s due to congestion\n", 736 "Signalling timeout for transmission to peer %s due to congestion\n",
748 GNUNET_i2s (&n->id)); 737 GNUNET_i2s (&n->id));
749#endif 738#endif
750 GNUNET_assert (0 == 739 GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));
751 th->notify (th->notify_cls, 0, NULL)); 740 GNUNET_free (th);
752 GNUNET_free (th); 741 }
753 }
754 if (NULL != h->cth) 742 if (NULL != h->cth)
755 return; 743 return;
756 if (NULL != h->control_head) 744 if (NULL != h->control_head)
757 { 745 {
758 size = h->control_head->notify_size; 746 size = h->control_head->notify_size;
759 } 747 }
760 else 748 else
761 { 749 {
762 n = GNUNET_CONTAINER_heap_peek (h->ready_heap); 750 n = GNUNET_CONTAINER_heap_peek (h->ready_heap);
763 if (NULL == n) 751 if (NULL == n)
764 return; /* no pending messages */ 752 return; /* no pending messages */
765 size = n->th->notify_size + sizeof (struct OutboundMessage); 753 size = n->th->notify_size + sizeof (struct OutboundMessage);
766 } 754 }
767#if DEBUG_TRANSPORT 755#if DEBUG_TRANSPORT
768 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 756 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Calling notify_transmit_ready\n");
769 "Calling notify_transmit_ready\n");
770#endif 757#endif
771 h->cth = 758 h->cth =
772 GNUNET_CLIENT_notify_transmit_ready (h->client, 759 GNUNET_CLIENT_notify_transmit_ready (h->client,
773 size, 760 size,
774 GNUNET_TIME_UNIT_FOREVER_REL, 761 GNUNET_TIME_UNIT_FOREVER_REL,
775 GNUNET_NO, 762 GNUNET_NO,
776 &transport_notify_ready, 763 &transport_notify_ready, h);
777 h);
778 GNUNET_assert (NULL != h->cth); 764 GNUNET_assert (NULL != h->cth);
779} 765}
780 766
@@ -793,22 +779,23 @@ schedule_transmission (struct GNUNET_TRANSPORT_Handle *h)
793 779
794 GNUNET_assert (NULL != h->client); 780 GNUNET_assert (NULL != h->client);
795 if (h->quota_task != GNUNET_SCHEDULER_NO_TASK) 781 if (h->quota_task != GNUNET_SCHEDULER_NO_TASK)
796 { 782 {
797 GNUNET_SCHEDULER_cancel (h->quota_task); 783 GNUNET_SCHEDULER_cancel (h->quota_task);
798 h->quota_task = GNUNET_SCHEDULER_NO_TASK; 784 h->quota_task = GNUNET_SCHEDULER_NO_TASK;
799 } 785 }
800 if (NULL != h->control_head) 786 if (NULL != h->control_head)
801 delay = GNUNET_TIME_UNIT_ZERO; 787 delay = GNUNET_TIME_UNIT_ZERO;
802 else if (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) 788 else if (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap)))
803 delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, n->th->notify_size); 789 delay =
790 GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
791 n->th->notify_size);
804 else 792 else
805 return; /* no work to be done */ 793 return; /* no work to be done */
806 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 794 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
807 "Scheduling next transmission to service in %llu ms\n", 795 "Scheduling next transmission to service in %llu ms\n",
808 (unsigned long long) delay.rel_value); 796 (unsigned long long) delay.rel_value);
809 h->quota_task = GNUNET_SCHEDULER_add_delayed (delay, 797 h->quota_task = GNUNET_SCHEDULER_add_delayed (delay,
810 &schedule_transmission_task, 798 &schedule_transmission_task, h);
811 h);
812} 799}
813 800
814 801
@@ -831,16 +818,13 @@ schedule_control_transmit (struct GNUNET_TRANSPORT_Handle *h,
831 818
832#if DEBUG_TRANSPORT 819#if DEBUG_TRANSPORT
833 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 820 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
834 "Control transmit of %u bytes requested\n", 821 "Control transmit of %u bytes requested\n", size);
835 size);
836#endif 822#endif
837 th = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_TransmitHandle)); 823 th = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_TransmitHandle));
838 th->notify = notify; 824 th->notify = notify;
839 th->notify_cls = notify_cls; 825 th->notify_cls = notify_cls;
840 th->notify_size = size; 826 th->notify_size = size;
841 GNUNET_CONTAINER_DLL_insert_tail (h->control_head, 827 GNUNET_CONTAINER_DLL_insert_tail (h->control_head, h->control_tail, th);
842 h->control_tail,
843 th);
844 schedule_transmission (h); 828 schedule_transmission (h);
845} 829}
846 830
@@ -860,18 +844,16 @@ send_start (void *cls, size_t size, void *buf)
860 struct StartMessage s; 844 struct StartMessage s;
861 845
862 if (buf == NULL) 846 if (buf == NULL)
863 { 847 {
864 /* Can only be shutdown, just give up */ 848 /* Can only be shutdown, just give up */
865#if DEBUG_TRANSPORT 849#if DEBUG_TRANSPORT
866 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 850 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
867 "Shutdown while trying to transmit `%s' request.\n", 851 "Shutdown while trying to transmit `%s' request.\n", "START");
868 "START");
869#endif 852#endif
870 return 0; 853 return 0;
871 } 854 }
872#if DEBUG_TRANSPORT 855#if DEBUG_TRANSPORT
873 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 856 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting `%s' request.\n", "START");
874 "Transmitting `%s' request.\n", "START");
875#endif 857#endif
876 GNUNET_assert (size >= sizeof (struct StartMessage)); 858 GNUNET_assert (size >= sizeof (struct StartMessage));
877 s.header.size = htons (sizeof (struct StartMessage)); 859 s.header.size = htons (sizeof (struct StartMessage));
@@ -892,29 +874,25 @@ send_start (void *cls, size_t size, void *buf)
892 * @param tc scheduler context 874 * @param tc scheduler context
893 */ 875 */
894static void 876static void
895reconnect (void *cls, 877reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
896 const struct GNUNET_SCHEDULER_TaskContext *tc)
897{ 878{
898 struct GNUNET_TRANSPORT_Handle *h = cls; 879 struct GNUNET_TRANSPORT_Handle *h = cls;
899 880
900 h->reconnect_task = GNUNET_SCHEDULER_NO_TASK; 881 h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
901 if ( (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0) 882 if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
902 { 883 {
903 /* shutdown, just give up */ 884 /* shutdown, just give up */
904 return; 885 return;
905 } 886 }
906#if DEBUG_TRANSPORT 887#if DEBUG_TRANSPORT
907 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 888 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n");
908 "Connecting to transport service.\n");
909#endif 889#endif
910 GNUNET_assert (h->client == NULL); 890 GNUNET_assert (h->client == NULL);
911 GNUNET_assert (h->control_head == NULL); 891 GNUNET_assert (h->control_head == NULL);
912 GNUNET_assert (h->control_tail == NULL); 892 GNUNET_assert (h->control_tail == NULL);
913 h->client = GNUNET_CLIENT_connect ("transport", h->cfg); 893 h->client = GNUNET_CLIENT_connect ("transport", h->cfg);
914 GNUNET_assert (h->client != NULL); 894 GNUNET_assert (h->client != NULL);
915 schedule_control_transmit (h, 895 schedule_control_transmit (h, sizeof (struct StartMessage), &send_start, h);
916 sizeof (struct StartMessage),
917 &send_start, h);
918} 896}
919 897
920 898
@@ -931,50 +909,45 @@ disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h)
931 909
932 GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK); 910 GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
933 /* Forget about all neighbours that we used to be connected to */ 911 /* Forget about all neighbours that we used to be connected to */
934 GNUNET_CONTAINER_multihashmap_iterate(h->neighbours, 912 GNUNET_CONTAINER_multihashmap_iterate (h->neighbours, &neighbour_delete, h);
935 &neighbour_delete,
936 h);
937 if (NULL != h->cth) 913 if (NULL != h->cth)
938 { 914 {
939 GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth); 915 GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth);
940 h->cth = NULL; 916 h->cth = NULL;
941 } 917 }
942 if (NULL != h->client) 918 if (NULL != h->client)
943 { 919 {
944 GNUNET_CLIENT_disconnect (h->client, GNUNET_YES); 920 GNUNET_CLIENT_disconnect (h->client, GNUNET_YES);
945 h->client = NULL; 921 h->client = NULL;
946 } 922 }
947 if (h->quota_task != GNUNET_SCHEDULER_NO_TASK) 923 if (h->quota_task != GNUNET_SCHEDULER_NO_TASK)
948 { 924 {
949 GNUNET_SCHEDULER_cancel (h->quota_task); 925 GNUNET_SCHEDULER_cancel (h->quota_task);
950 h->quota_task = GNUNET_SCHEDULER_NO_TASK; 926 h->quota_task = GNUNET_SCHEDULER_NO_TASK;
951 } 927 }
952 while ( (NULL != (th = h->control_head))) 928 while ((NULL != (th = h->control_head)))
953 { 929 {
954 GNUNET_CONTAINER_DLL_remove (h->control_head, 930 GNUNET_CONTAINER_DLL_remove (h->control_head, h->control_tail, th);
955 h->control_tail, 931 th->notify (th->notify_cls, 0, NULL);
956 th); 932 GNUNET_free (th);
957 th->notify (th->notify_cls, 0, NULL); 933 }
958 GNUNET_free (th);
959 }
960#if DEBUG_TRANSPORT 934#if DEBUG_TRANSPORT
961 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 935 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
962 "Scheduling task to reconnect to transport service in %llu ms.\n", 936 "Scheduling task to reconnect to transport service in %llu ms.\n",
963 h->reconnect_delay.rel_value); 937 h->reconnect_delay.rel_value);
964#endif 938#endif
965 h->reconnect_task 939 h->reconnect_task
966 = GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, 940 = GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
967 &reconnect, h);
968 if (h->reconnect_delay.rel_value == 0) 941 if (h->reconnect_delay.rel_value == 0)
969 { 942 {
970 h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; 943 h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
971 } 944 }
972 else 945 else
973 { 946 {
974 h->reconnect_delay = GNUNET_TIME_relative_multiply (h->reconnect_delay, 2); 947 h->reconnect_delay = GNUNET_TIME_relative_multiply (h->reconnect_delay, 2);
975 h->reconnect_delay = GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_SECONDS, 948 h->reconnect_delay = GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_SECONDS,
976 h->reconnect_delay); 949 h->reconnect_delay);
977 } 950 }
978} 951}
979 952
980 953
@@ -1011,15 +984,14 @@ send_set_quota (void *cls, size_t size, void *buf)
1011 struct QuotaSetMessage msg; 984 struct QuotaSetMessage msg;
1012 985
1013 if (buf == NULL) 986 if (buf == NULL)
1014 { 987 {
1015 GNUNET_free (sqc); 988 GNUNET_free (sqc);
1016 return 0; 989 return 0;
1017 } 990 }
1018#if DEBUG_TRANSPORT 991#if DEBUG_TRANSPORT
1019 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 992 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1020 "Transmitting `%s' request with respect to `%4s'.\n", 993 "Transmitting `%s' request with respect to `%4s'.\n",
1021 "SET_QUOTA", 994 "SET_QUOTA", GNUNET_i2s (&sqc->target));
1022 GNUNET_i2s (&sqc->target));
1023#endif 995#endif
1024 GNUNET_assert (size >= sizeof (struct QuotaSetMessage)); 996 GNUNET_assert (size >= sizeof (struct QuotaSetMessage));
1025 msg.header.size = htons (sizeof (struct QuotaSetMessage)); 997 msg.header.size = htons (sizeof (struct QuotaSetMessage));
@@ -1049,32 +1021,29 @@ GNUNET_TRANSPORT_set_quota (struct GNUNET_TRANSPORT_Handle *handle,
1049{ 1021{
1050 struct Neighbour *n; 1022 struct Neighbour *n;
1051 struct SetQuotaContext *sqc; 1023 struct SetQuotaContext *sqc;
1052 1024
1053 n = neighbour_find (handle, target); 1025 n = neighbour_find (handle, target);
1054 if (NULL == n) 1026 if (NULL == n)
1055 { 1027 {
1056 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1028 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1057 "Quota changed to %u for peer `%s', but I have no such neighbour!\n", 1029 "Quota changed to %u for peer `%s', but I have no such neighbour!\n",
1058 (unsigned int) ntohl (quota_out.value__), 1030 (unsigned int) ntohl (quota_out.value__), GNUNET_i2s (target));
1059 GNUNET_i2s (target)); 1031 return;
1060 return; 1032 }
1061 }
1062 GNUNET_assert (NULL != handle->client); 1033 GNUNET_assert (NULL != handle->client);
1063#if DEBUG_TRANSPORT 1034#if DEBUG_TRANSPORT
1064 if (ntohl (quota_out.value__) != n->out_tracker.available_bytes_per_s__) 1035 if (ntohl (quota_out.value__) != n->out_tracker.available_bytes_per_s__)
1065 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1036 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1066 "Quota changed from %u to %u for peer `%s'\n", 1037 "Quota changed from %u to %u for peer `%s'\n",
1067 (unsigned int) n->out_tracker.available_bytes_per_s__, 1038 (unsigned int) n->out_tracker.available_bytes_per_s__,
1068 (unsigned int) ntohl (quota_out.value__), 1039 (unsigned int) ntohl (quota_out.value__), GNUNET_i2s (target));
1069 GNUNET_i2s (target));
1070 else 1040 else
1071 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1041 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1072 "Quota remains at %u for peer `%s'\n", 1042 "Quota remains at %u for peer `%s'\n",
1073 (unsigned int) n->out_tracker.available_bytes_per_s__, 1043 (unsigned int) n->out_tracker.available_bytes_per_s__,
1074 GNUNET_i2s (target)); 1044 GNUNET_i2s (target));
1075#endif 1045#endif
1076 GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, 1046 GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, quota_out);
1077 quota_out);
1078 sqc = GNUNET_malloc (sizeof (struct SetQuotaContext)); 1047 sqc = GNUNET_malloc (sizeof (struct SetQuotaContext));
1079 sqc->target = *target; 1048 sqc->target = *target;
1080 sqc->quota_in = quota_in; 1049 sqc->quota_in = quota_in;
@@ -1099,15 +1068,14 @@ send_try_connect (void *cls, size_t size, void *buf)
1099 struct TransportRequestConnectMessage msg; 1068 struct TransportRequestConnectMessage msg;
1100 1069
1101 if (buf == NULL) 1070 if (buf == NULL)
1102 { 1071 {
1103 GNUNET_free (pid); 1072 GNUNET_free (pid);
1104 return 0; 1073 return 0;
1105 } 1074 }
1106#if DEBUG_TRANSPORT 1075#if DEBUG_TRANSPORT
1107 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1076 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1108 "Transmitting `%s' request with respect to `%4s'.\n", 1077 "Transmitting `%s' request with respect to `%4s'.\n",
1109 "REQUEST_CONNECT", 1078 "REQUEST_CONNECT", GNUNET_i2s (pid));
1110 GNUNET_i2s (pid));
1111#endif 1079#endif
1112 GNUNET_assert (size >= sizeof (struct TransportRequestConnectMessage)); 1080 GNUNET_assert (size >= sizeof (struct TransportRequestConnectMessage));
1113 msg.header.size = htons (sizeof (struct TransportRequestConnectMessage)); 1081 msg.header.size = htons (sizeof (struct TransportRequestConnectMessage));
@@ -1129,7 +1097,7 @@ send_try_connect (void *cls, size_t size, void *buf)
1129 */ 1097 */
1130void 1098void
1131GNUNET_TRANSPORT_try_connect (struct GNUNET_TRANSPORT_Handle *handle, 1099GNUNET_TRANSPORT_try_connect (struct GNUNET_TRANSPORT_Handle *handle,
1132 const struct GNUNET_PeerIdentity *target) 1100 const struct GNUNET_PeerIdentity *target)
1133{ 1101{
1134 struct GNUNET_PeerIdentity *pid; 1102 struct GNUNET_PeerIdentity *pid;
1135 1103
@@ -1158,18 +1126,16 @@ send_hello (void *cls, size_t size, void *buf)
1158 uint16_t ssize; 1126 uint16_t ssize;
1159 1127
1160 if (buf == NULL) 1128 if (buf == NULL)
1161 { 1129 {
1162#if DEBUG_TRANSPORT_TIMEOUT 1130#if DEBUG_TRANSPORT_TIMEOUT
1163 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1131 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1164 "Timeout while trying to transmit `%s' request.\n", 1132 "Timeout while trying to transmit `%s' request.\n", "HELLO");
1165 "HELLO");
1166#endif 1133#endif
1167 GNUNET_free (msg); 1134 GNUNET_free (msg);
1168 return 0; 1135 return 0;
1169 } 1136 }
1170#if DEBUG_TRANSPORT 1137#if DEBUG_TRANSPORT
1171 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1138 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting `%s' request.\n", "HELLO");
1172 "Transmitting `%s' request.\n", "HELLO");
1173#endif 1139#endif
1174 ssize = ntohs (msg->size); 1140 ssize = ntohs (msg->size);
1175 GNUNET_assert (size >= ssize); 1141 GNUNET_assert (size >= ssize);
@@ -1193,8 +1159,7 @@ send_hello (void *cls, size_t size, void *buf)
1193void 1159void
1194GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle, 1160GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
1195 const struct GNUNET_MessageHeader *hello, 1161 const struct GNUNET_MessageHeader *hello,
1196 GNUNET_SCHEDULER_Task cont, 1162 GNUNET_SCHEDULER_Task cont, void *cls)
1197 void *cls)
1198{ 1163{
1199 uint16_t size; 1164 uint16_t size;
1200 struct GNUNET_PeerIdentity peer; 1165 struct GNUNET_PeerIdentity peer;
@@ -1205,23 +1170,20 @@ GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
1205 GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO); 1170 GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO);
1206 size = ntohs (hello->size); 1171 size = ntohs (hello->size);
1207 GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader)); 1172 GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader));
1208 if (GNUNET_OK != GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message*) hello, 1173 if (GNUNET_OK !=
1209 &peer)) 1174 GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) hello, &peer))
1210 { 1175 {
1211 GNUNET_break (0); 1176 GNUNET_break (0);
1212 return; 1177 return;
1213 } 1178 }
1214 msg = GNUNET_malloc(size); 1179 msg = GNUNET_malloc (size);
1215 memcpy (msg, hello, size); 1180 memcpy (msg, hello, size);
1216#if DEBUG_TRANSPORT 1181#if DEBUG_TRANSPORT
1217 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1182 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1218 "Offering `%s' message of `%4s' to transport for validation.\n", 1183 "Offering `%s' message of `%4s' to transport for validation.\n",
1219 "HELLO", 1184 "HELLO", GNUNET_i2s (&peer));
1220 GNUNET_i2s (&peer));
1221#endif 1185#endif
1222 schedule_control_transmit (handle, 1186 schedule_control_transmit (handle, size, &send_hello, msg);
1223 size,
1224 &send_hello, msg);
1225} 1187}
1226 1188
1227 1189
@@ -1245,9 +1207,7 @@ GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle,
1245 hwl = GNUNET_malloc (sizeof (struct HelloWaitList)); 1207 hwl = GNUNET_malloc (sizeof (struct HelloWaitList));
1246 hwl->rec = rec; 1208 hwl->rec = rec;
1247 hwl->rec_cls = rec_cls; 1209 hwl->rec_cls = rec_cls;
1248 GNUNET_CONTAINER_DLL_insert (handle->hwl_head, 1210 GNUNET_CONTAINER_DLL_insert (handle->hwl_head, handle->hwl_tail, hwl);
1249 handle->hwl_tail,
1250 hwl);
1251 if (handle->my_hello == NULL) 1211 if (handle->my_hello == NULL)
1252 return; 1212 return;
1253 rec (rec_cls, (const struct GNUNET_MessageHeader *) handle->my_hello); 1213 rec (rec_cls, (const struct GNUNET_MessageHeader *) handle->my_hello);
@@ -1263,25 +1223,22 @@ GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle,
1263 */ 1223 */
1264void 1224void
1265GNUNET_TRANSPORT_get_hello_cancel (struct GNUNET_TRANSPORT_Handle *handle, 1225GNUNET_TRANSPORT_get_hello_cancel (struct GNUNET_TRANSPORT_Handle *handle,
1266 GNUNET_TRANSPORT_HelloUpdateCallback rec, 1226 GNUNET_TRANSPORT_HelloUpdateCallback rec,
1267 void *rec_cls) 1227 void *rec_cls)
1268{ 1228{
1269 struct HelloWaitList *pos; 1229 struct HelloWaitList *pos;
1270 1230
1271 pos = handle->hwl_head; 1231 pos = handle->hwl_head;
1272 while (pos != NULL) 1232 while (pos != NULL)
1273 { 1233 {
1274 if ( (pos->rec == rec) && 1234 if ((pos->rec == rec) && (pos->rec_cls == rec_cls))
1275 (pos->rec_cls == rec_cls) ) 1235 break;
1276 break; 1236 pos = pos->next;
1277 pos = pos->next; 1237 }
1278 }
1279 GNUNET_break (pos != NULL); 1238 GNUNET_break (pos != NULL);
1280 if (pos == NULL) 1239 if (pos == NULL)
1281 return; 1240 return;
1282 GNUNET_CONTAINER_DLL_remove (handle->hwl_head, 1241 GNUNET_CONTAINER_DLL_remove (handle->hwl_head, handle->hwl_tail, pos);
1283 handle->hwl_tail,
1284 pos);
1285 GNUNET_free (pos); 1242 GNUNET_free (pos);
1286} 1243}
1287 1244
@@ -1300,7 +1257,7 @@ GNUNET_TRANSPORT_get_hello_cancel (struct GNUNET_TRANSPORT_Handle *handle,
1300 */ 1257 */
1301struct GNUNET_TRANSPORT_Handle * 1258struct GNUNET_TRANSPORT_Handle *
1302GNUNET_TRANSPORT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, 1259GNUNET_TRANSPORT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1303 const struct GNUNET_PeerIdentity *self, 1260 const struct GNUNET_PeerIdentity *self,
1304 void *cls, 1261 void *cls,
1305 GNUNET_TRANSPORT_ReceiveCallback rec, 1262 GNUNET_TRANSPORT_ReceiveCallback rec,
1306 GNUNET_TRANSPORT_NotifyConnect nc, 1263 GNUNET_TRANSPORT_NotifyConnect nc,
@@ -1310,18 +1267,20 @@ GNUNET_TRANSPORT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1310 1267
1311 ret = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_Handle)); 1268 ret = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_Handle));
1312 if (self != NULL) 1269 if (self != NULL)
1313 { 1270 {
1314 ret->self = *self; 1271 ret->self = *self;
1315 ret->check_self = GNUNET_YES; 1272 ret->check_self = GNUNET_YES;
1316 } 1273 }
1317 ret->cfg = cfg; 1274 ret->cfg = cfg;
1318 ret->cls = cls; 1275 ret->cls = cls;
1319 ret->rec = rec; 1276 ret->rec = rec;
1320 ret->nc_cb = nc; 1277 ret->nc_cb = nc;
1321 ret->nd_cb = nd; 1278 ret->nd_cb = nd;
1322 ret->reconnect_delay = GNUNET_TIME_UNIT_ZERO; 1279 ret->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
1323 ret->neighbours = GNUNET_CONTAINER_multihashmap_create(STARTING_NEIGHBOURS_SIZE); 1280 ret->neighbours =
1324 ret->ready_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 1281 GNUNET_CONTAINER_multihashmap_create (STARTING_NEIGHBOURS_SIZE);
1282 ret->ready_heap =
1283 GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1325 ret->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, ret); 1284 ret->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, ret);
1326 return ret; 1285 return ret;
1327} 1286}
@@ -1336,25 +1295,24 @@ void
1336GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle) 1295GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle)
1337{ 1296{
1338#if DEBUG_TRANSPORT 1297#if DEBUG_TRANSPORT
1339 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1298 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transport disconnect called!\n");
1340 "Transport disconnect called!\n");
1341#endif 1299#endif
1342 /* this disconnects all neighbours... */ 1300 /* this disconnects all neighbours... */
1343 if (handle->reconnect_task == GNUNET_SCHEDULER_NO_TASK) 1301 if (handle->reconnect_task == GNUNET_SCHEDULER_NO_TASK)
1344 disconnect_and_schedule_reconnect (handle); 1302 disconnect_and_schedule_reconnect (handle);
1345 /* and now we stop trying to connect again... */ 1303 /* and now we stop trying to connect again... */
1346 if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK) 1304 if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
1347 { 1305 {
1348 GNUNET_SCHEDULER_cancel (handle->reconnect_task); 1306 GNUNET_SCHEDULER_cancel (handle->reconnect_task);
1349 handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK; 1307 handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1350 } 1308 }
1351 GNUNET_CONTAINER_multihashmap_destroy (handle->neighbours); 1309 GNUNET_CONTAINER_multihashmap_destroy (handle->neighbours);
1352 handle->neighbours = NULL; 1310 handle->neighbours = NULL;
1353 if (handle->quota_task != GNUNET_SCHEDULER_NO_TASK) 1311 if (handle->quota_task != GNUNET_SCHEDULER_NO_TASK)
1354 { 1312 {
1355 GNUNET_SCHEDULER_cancel (handle->quota_task); 1313 GNUNET_SCHEDULER_cancel (handle->quota_task);
1356 handle->quota_task = GNUNET_SCHEDULER_NO_TASK; 1314 handle->quota_task = GNUNET_SCHEDULER_NO_TASK;
1357 } 1315 }
1358 GNUNET_free_non_null (handle->my_hello); 1316 GNUNET_free_non_null (handle->my_hello);
1359 handle->my_hello = NULL; 1317 handle->my_hello = NULL;
1360 GNUNET_assert (handle->hwl_head == NULL); 1318 GNUNET_assert (handle->hwl_head == NULL);
@@ -1386,31 +1344,30 @@ GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle)
1386 */ 1344 */
1387struct GNUNET_TRANSPORT_TransmitHandle * 1345struct GNUNET_TRANSPORT_TransmitHandle *
1388GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle *handle, 1346GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle *handle,
1389 const struct GNUNET_PeerIdentity *target, 1347 const struct GNUNET_PeerIdentity
1390 size_t size, 1348 *target, size_t size, uint32_t priority,
1391 uint32_t priority,
1392 struct GNUNET_TIME_Relative timeout, 1349 struct GNUNET_TIME_Relative timeout,
1393 GNUNET_CONNECTION_TransmitReadyNotify notify, 1350 GNUNET_CONNECTION_TransmitReadyNotify
1394 void *notify_cls) 1351 notify, void *notify_cls)
1395{ 1352{
1396 struct Neighbour *n; 1353 struct Neighbour *n;
1397 struct GNUNET_TRANSPORT_TransmitHandle *th; 1354 struct GNUNET_TRANSPORT_TransmitHandle *th;
1398 struct GNUNET_TIME_Relative delay; 1355 struct GNUNET_TIME_Relative delay;
1399 1356
1400 n = neighbour_find (handle, target); 1357 n = neighbour_find (handle, target);
1401 if (NULL == n) 1358 if (NULL == n)
1402 { 1359 {
1403 /* use GNUNET_TRANSPORT_try_connect first, only use this function 1360 /* use GNUNET_TRANSPORT_try_connect first, only use this function
1404 once a connection has been established */ 1361 * once a connection has been established */
1405 GNUNET_assert (0); 1362 GNUNET_assert (0);
1406 return NULL; 1363 return NULL;
1407 } 1364 }
1408 if (NULL != n->th) 1365 if (NULL != n->th)
1409 { 1366 {
1410 /* attempt to send two messages at the same time to the same peer */ 1367 /* attempt to send two messages at the same time to the same peer */
1411 GNUNET_assert (0); 1368 GNUNET_assert (0);
1412 return NULL; 1369 return NULL;
1413 } 1370 }
1414 GNUNET_assert (NULL == n->hn); 1371 GNUNET_assert (NULL == n->hn);
1415 th = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_TransmitHandle)); 1372 th = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_TransmitHandle));
1416 th->neighbour = n; 1373 th->neighbour = n;
@@ -1423,16 +1380,13 @@ GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle *handle,
1423 /* calculate when our transmission should be ready */ 1380 /* calculate when our transmission should be ready */
1424 delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, size); 1381 delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, size);
1425 if (delay.rel_value > timeout.rel_value) 1382 if (delay.rel_value > timeout.rel_value)
1426 delay.rel_value = 0; /* notify immediately (with failure) */ 1383 delay.rel_value = 0; /* notify immediately (with failure) */
1427#if DEBUG_TRANSPORT 1384#if DEBUG_TRANSPORT
1428 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1385 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1429 "Bandwidth tracker allows next transmission to peer %s in %llu ms\n", 1386 "Bandwidth tracker allows next transmission to peer %s in %llu ms\n",
1430 GNUNET_i2s (target), 1387 GNUNET_i2s (target), (unsigned long long) delay.rel_value);
1431 (unsigned long long) delay.rel_value);
1432#endif 1388#endif
1433 n->hn = GNUNET_CONTAINER_heap_insert (handle->ready_heap, 1389 n->hn = GNUNET_CONTAINER_heap_insert (handle->ready_heap, n, delay.rel_value);
1434 n,
1435 delay.rel_value);
1436 schedule_transmission (handle); 1390 schedule_transmission (handle);
1437 return th; 1391 return th;
1438} 1392}
@@ -1444,7 +1398,9 @@ GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle *handle,
1444 * @param th handle returned from GNUNET_TRANSPORT_notify_transmit_ready 1398 * @param th handle returned from GNUNET_TRANSPORT_notify_transmit_ready
1445 */ 1399 */
1446void 1400void
1447GNUNET_TRANSPORT_notify_transmit_ready_cancel (struct GNUNET_TRANSPORT_TransmitHandle *th) 1401GNUNET_TRANSPORT_notify_transmit_ready_cancel (struct
1402 GNUNET_TRANSPORT_TransmitHandle
1403 *th)
1448{ 1404{
1449 struct Neighbour *n; 1405 struct Neighbour *n;
1450 1406
@@ -1454,17 +1410,17 @@ GNUNET_TRANSPORT_notify_transmit_ready_cancel (struct GNUNET_TRANSPORT_TransmitH
1454 GNUNET_assert (th == n->th); 1410 GNUNET_assert (th == n->th);
1455 n->th = NULL; 1411 n->th = NULL;
1456 if (n->hn != NULL) 1412 if (n->hn != NULL)
1457 { 1413 {
1458 GNUNET_CONTAINER_heap_remove_node (n->hn); 1414 GNUNET_CONTAINER_heap_remove_node (n->hn);
1459 n->hn = NULL; 1415 n->hn = NULL;
1460 } 1416 }
1461 else 1417 else
1462 { 1418 {
1463 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != th->timeout_task); 1419 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != th->timeout_task);
1464 GNUNET_SCHEDULER_cancel (th->timeout_task); 1420 GNUNET_SCHEDULER_cancel (th->timeout_task);
1465 th->timeout_task = GNUNET_SCHEDULER_NO_TASK; 1421 th->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1466 } 1422 }
1467 GNUNET_free (th); 1423 GNUNET_free (th);
1468} 1424}
1469 1425
1470 1426