aboutsummaryrefslogtreecommitdiff
path: root/src/transport/transport_api2_core.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport/transport_api2_core.c')
-rw-r--r--src/transport/transport_api2_core.c506
1 files changed, 182 insertions, 324 deletions
diff --git a/src/transport/transport_api2_core.c b/src/transport/transport_api2_core.c
index f00d00a44..a3c49e94f 100644
--- a/src/transport/transport_api2_core.c
+++ b/src/transport/transport_api2_core.c
@@ -32,13 +32,23 @@
32#include "gnunet_transport_core_service.h" 32#include "gnunet_transport_core_service.h"
33#include "transport.h" 33#include "transport.h"
34 34
35#define LOG(kind,...) GNUNET_log_from (kind, "transport-api-core",__VA_ARGS__) 35#define LOG(kind, ...) GNUNET_log_from (kind, "transport-api-core", __VA_ARGS__)
36 36
37/** 37/**
38 * How large to start with for the hashmap of neighbours. 38 * How large to start with for the hashmap of neighbours.
39 */ 39 */
40#define STARTING_NEIGHBOURS_SIZE 16 40#define STARTING_NEIGHBOURS_SIZE 16
41 41
42/**
43 * Window size. How many messages to the same target do we pass
44 * to TRANSPORT without a SEND_OK in between? Small values limit
45 * thoughput, large values will increase latency.
46 *
47 * FIXME-OPTIMIZE: find out what good values are experimentally,
48 * maybe set adaptively (i.e. to observed available bandwidth).
49 */
50#define SEND_WINDOW_SIZE 4
51
42 52
43/** 53/**
44 * Entry in hash table of all of our current (connected) neighbours. 54 * Entry in hash table of all of our current (connected) neighbours.
@@ -72,46 +82,27 @@ struct Neighbour
72 void *handlers_cls; 82 void *handlers_cls;
73 83
74 /** 84 /**
75 * Entry in our readyness heap (which is sorted by @e next_ready 85 * How many messages can we still send to this peer before we should
76 * value). NULL if there is no pending transmission request for 86 * throttle?
77 * this neighbour or if we're waiting for @e is_ready to become
78 * true AFTER the @e out_tracker suggested that this peer's quota
79 * has been satisfied (so once @e is_ready goes to #GNUNET_YES,
80 * we should immediately go back into the heap).
81 */ 87 */
82 struct GNUNET_CONTAINER_HeapNode *hn; 88 unsigned int ready_window;
83 89
84 /** 90 /**
85 * Task to trigger MQ when we have enough bandwidth for the 91 * Used to indicate our status if @e env is non-NULL. Set to
86 * next transmission. 92 * #GNUNET_YES if we did pass a message to the MQ and are waiting
93 * for the call to #notify_send_done(). Set to #GNUNET_NO if the @e
94 * ready_window is 0 and @e env is waiting for a
95 * #GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK?
87 */ 96 */
88 struct GNUNET_SCHEDULER_Task *timeout_task; 97 int16_t awaiting_done;
89
90 /**
91 * Outbound bandwidh tracker.
92 */
93 struct GNUNET_BANDWIDTH_Tracker out_tracker;
94
95 /**
96 * Sending consumed more bytes on wire than payload was announced
97 * This overhead is added to the delay of next sending operation
98 */
99 unsigned long long traffic_overhead;
100
101 /**
102 * Is this peer currently ready to receive a message?
103 */
104 int is_ready;
105 98
106 /** 99 /**
107 * Size of the message in @e env. 100 * Size of the message in @e env.
108 */ 101 */
109 uint16_t env_size; 102 uint16_t env_size;
110
111}; 103};
112 104
113 105
114
115/** 106/**
116 * Handle for the transport service (includes all of the 107 * Handle for the transport service (includes all of the
117 * state for the transport service). 108 * state for the transport service).
@@ -141,11 +132,6 @@ struct GNUNET_TRANSPORT_CoreHandle
141 GNUNET_TRANSPORT_NotifyDisconnect nd_cb; 132 GNUNET_TRANSPORT_NotifyDisconnect nd_cb;
142 133
143 /** 134 /**
144 * function to call on excess bandwidth events
145 */
146 GNUNET_TRANSPORT_NotifyExcessBandwidth neb_cb;
147
148 /**
149 * My client connection to the transport service. 135 * My client connection to the transport service.
150 */ 136 */
151 struct GNUNET_MQ_Handle *mq; 137 struct GNUNET_MQ_Handle *mq;
@@ -181,7 +167,6 @@ struct GNUNET_TRANSPORT_CoreHandle
181 * (if #GNUNET_NO, then @e self is all zeros!). 167 * (if #GNUNET_NO, then @e self is all zeros!).
182 */ 168 */
183 int check_self; 169 int check_self;
184
185}; 170};
186 171
187 172
@@ -206,31 +191,7 @@ static struct Neighbour *
206neighbour_find (struct GNUNET_TRANSPORT_CoreHandle *h, 191neighbour_find (struct GNUNET_TRANSPORT_CoreHandle *h,
207 const struct GNUNET_PeerIdentity *peer) 192 const struct GNUNET_PeerIdentity *peer)
208{ 193{
209 return GNUNET_CONTAINER_multipeermap_get (h->neighbours, 194 return GNUNET_CONTAINER_multipeermap_get (h->neighbours, peer);
210 peer);
211}
212
213
214/**
215 * Function called by the bandwidth tracker if we have excess
216 * bandwidth.
217 *
218 * @param cls the `struct Neighbour` that has excess bandwidth
219 */
220static void
221notify_excess_cb (void *cls)
222{
223 struct Neighbour *n = cls;
224 struct GNUNET_TRANSPORT_CoreHandle *h = n->h;
225
226 LOG (GNUNET_ERROR_TYPE_DEBUG,
227 "Notifying CORE that more bandwidth is available for %s\n",
228 GNUNET_i2s (&n->id));
229
230 if (NULL != h->neb_cb)
231 h->neb_cb (h->cls,
232 &n->id,
233 n->handlers_cls);
234} 195}
235 196
236 197
@@ -245,9 +206,7 @@ notify_excess_cb (void *cls)
245 * #GNUNET_NO if not. 206 * #GNUNET_NO if not.
246 */ 207 */
247static int 208static int
248neighbour_delete (void *cls, 209neighbour_delete (void *cls, const struct GNUNET_PeerIdentity *key, void *value)
249 const struct GNUNET_PeerIdentity *key,
250 void *value)
251{ 210{
252 struct GNUNET_TRANSPORT_CoreHandle *handle = cls; 211 struct GNUNET_TRANSPORT_CoreHandle *handle = cls;
253 struct Neighbour *n = value; 212 struct Neighbour *n = value;
@@ -255,16 +214,8 @@ neighbour_delete (void *cls,
255 LOG (GNUNET_ERROR_TYPE_DEBUG, 214 LOG (GNUNET_ERROR_TYPE_DEBUG,
256 "Dropping entry for neighbour `%s'.\n", 215 "Dropping entry for neighbour `%s'.\n",
257 GNUNET_i2s (key)); 216 GNUNET_i2s (key));
258 GNUNET_BANDWIDTH_tracker_notification_stop (&n->out_tracker);
259 if (NULL != handle->nd_cb) 217 if (NULL != handle->nd_cb)
260 handle->nd_cb (handle->cls, 218 handle->nd_cb (handle->cls, &n->id, n->handlers_cls);
261 &n->id,
262 n->handlers_cls);
263 if (NULL != n->timeout_task)
264 {
265 GNUNET_SCHEDULER_cancel (n->timeout_task);
266 n->timeout_task = NULL;
267 }
268 if (NULL != n->env) 219 if (NULL != n->env)
269 { 220 {
270 GNUNET_MQ_send_cancel (n->env); 221 GNUNET_MQ_send_cancel (n->env);
@@ -272,10 +223,9 @@ neighbour_delete (void *cls,
272 } 223 }
273 GNUNET_MQ_destroy (n->mq); 224 GNUNET_MQ_destroy (n->mq);
274 GNUNET_assert (NULL == n->mq); 225 GNUNET_assert (NULL == n->mq);
275 GNUNET_assert (GNUNET_YES == 226 GNUNET_assert (
276 GNUNET_CONTAINER_multipeermap_remove (handle->neighbours, 227 GNUNET_YES ==
277 key, 228 GNUNET_CONTAINER_multipeermap_remove (handle->neighbours, key, n));
278 n));
279 GNUNET_free (n); 229 GNUNET_free (n);
280 return GNUNET_YES; 230 return GNUNET_YES;
281} 231}
@@ -291,8 +241,7 @@ neighbour_delete (void *cls,
291 * @param error error code 241 * @param error error code
292 */ 242 */
293static void 243static void
294mq_error_handler (void *cls, 244mq_error_handler (void *cls, enum GNUNET_MQ_Error error)
295 enum GNUNET_MQ_Error error)
296{ 245{
297 struct GNUNET_TRANSPORT_CoreHandle *h = cls; 246 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
298 247
@@ -306,57 +255,42 @@ mq_error_handler (void *cls,
306 * A message from the handler's message queue to a neighbour was 255 * A message from the handler's message queue to a neighbour was
307 * transmitted. Now trigger (possibly delayed) notification of the 256 * transmitted. Now trigger (possibly delayed) notification of the
308 * neighbour's message queue that we are done and thus ready for 257 * neighbour's message queue that we are done and thus ready for
309 * the next message. 258 * the next message. Note that the MQ being ready is independent
259 * of the send window, as we may queue many messages and simply
260 * not pass them to TRANSPORT if the send window is insufficient.
310 * 261 *
311 * @param cls the `struct Neighbour` where the message was sent 262 * @param cls the `struct Neighbour` where the message was sent
312 */ 263 */
313static void 264static void
314notify_send_done_fin (void *cls) 265notify_send_done (void *cls)
315{ 266{
316 struct Neighbour *n = cls; 267 struct Neighbour *n = cls;
317 268
318 n->timeout_task = NULL; 269 n->awaiting_done = GNUNET_NO;
319 n->is_ready = GNUNET_YES; 270 n->env = NULL;
320 GNUNET_MQ_impl_send_continue (n->mq); 271 GNUNET_MQ_impl_send_continue (n->mq);
321} 272}
322 273
323 274
324/** 275/**
325 * A message from the handler's message queue to a neighbour was 276 * We have an envelope waiting for transmission at @a n, and
326 * transmitted. Now trigger (possibly delayed) notification of the 277 * our transmission window is positive. Perform the transmission.
327 * neighbour's message queue that we are done and thus ready for
328 * the next message.
329 * 278 *
330 * @param cls the `struct Neighbour` where the message was sent 279 * @param n neighbour to perform transmission for
331 */ 280 */
332static void 281static void
333notify_send_done (void *cls) 282do_send (struct Neighbour *n)
334{ 283{
335 struct Neighbour *n = cls; 284 GNUNET_assert (0 < n->ready_window);
336 struct GNUNET_TIME_Relative delay; 285 GNUNET_assert (NULL != n->env);
337 286 n->ready_window--;
338 n->timeout_task = NULL; 287 n->awaiting_done = GNUNET_YES;
339 if (NULL != n->env) 288 GNUNET_MQ_notify_sent (n->env, &notify_send_done, n);
340 { 289 GNUNET_MQ_send (n->h->mq, n->env);
341 GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker, 290 LOG (GNUNET_ERROR_TYPE_DEBUG,
342 n->env_size + n->traffic_overhead); 291 "Passed message of type %u for neighbour `%s' to TRANSPORT.\n",
343 n->env = NULL; 292 ntohs (GNUNET_MQ_env_get_msg (n->env)->type),
344 n->traffic_overhead = 0; 293 GNUNET_i2s (&n->id));
345 }
346 delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
347 128);
348 if (0 == delay.rel_value_us)
349 {
350 n->is_ready = GNUNET_YES;
351 GNUNET_MQ_impl_send_continue (n->mq);
352 return;
353 }
354 GNUNET_MQ_impl_send_in_flight (n->mq);
355 /* cannot send even a small message without violating
356 quota, wait a before allowing MQ to send next message */
357 n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay,
358 &notify_send_done_fin,
359 n);
360} 294}
361 295
362 296
@@ -376,11 +310,9 @@ mq_send_impl (struct GNUNET_MQ_Handle *mq,
376 void *impl_state) 310 void *impl_state)
377{ 311{
378 struct Neighbour *n = impl_state; 312 struct Neighbour *n = impl_state;
379 struct GNUNET_TRANSPORT_CoreHandle *h = n->h;
380 struct OutboundMessage *obm; 313 struct OutboundMessage *obm;
381 uint16_t msize; 314 uint16_t msize;
382 315
383 GNUNET_assert (GNUNET_YES == n->is_ready);
384 msize = ntohs (msg->size); 316 msize = ntohs (msg->size);
385 if (msize >= GNUNET_MAX_MESSAGE_SIZE - sizeof (*obm)) 317 if (msize >= GNUNET_MAX_MESSAGE_SIZE - sizeof (*obm))
386 { 318 {
@@ -388,25 +320,24 @@ mq_send_impl (struct GNUNET_MQ_Handle *mq,
388 GNUNET_MQ_impl_send_continue (mq); 320 GNUNET_MQ_impl_send_continue (mq);
389 return; 321 return;
390 } 322 }
391 GNUNET_assert (NULL == n->env);
392 n->env = GNUNET_MQ_msg_nested_mh (obm,
393 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
394 msg);
395 obm->reserved = htonl (0);
396 obm->timeout = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_MINUTES); /* FIXME: to be removed */
397 obm->peer = n->id;
398 GNUNET_assert (NULL == n->timeout_task);
399 n->is_ready = GNUNET_NO;
400 n->env_size = ntohs (msg->size);
401 GNUNET_MQ_notify_sent (n->env,
402 &notify_send_done,
403 n);
404 GNUNET_MQ_send (h->mq,
405 n->env);
406 LOG (GNUNET_ERROR_TYPE_DEBUG, 323 LOG (GNUNET_ERROR_TYPE_DEBUG,
407 "Queued message of type %u for neighbour `%s'.\n", 324 "CORE requested transmission of message of type %u to neighbour `%s'.\n",
408 ntohs (msg->type), 325 ntohs (msg->type),
409 GNUNET_i2s (&n->id)); 326 GNUNET_i2s (&n->id));
327
328 GNUNET_assert (NULL == n->env);
329 n->env =
330 GNUNET_MQ_msg_nested_mh (obm, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, msg);
331 n->env_size = ntohs (msg->size);
332 obm->reserved = htonl (0);
333 obm->peer = n->id;
334 if (0 == n->ready_window)
335 {
336 LOG (GNUNET_ERROR_TYPE_DEBUG,
337 "Flow control delays transmission to CORE until we see SEND_OK.\n");
338 return; /* can't send yet, need to wait for SEND_OK */
339 }
340 do_send (n);
410} 341}
411 342
412 343
@@ -418,8 +349,7 @@ mq_send_impl (struct GNUNET_MQ_Handle *mq,
418 * @param impl_state state of the implementation 349 * @param impl_state state of the implementation
419 */ 350 */
420static void 351static void
421mq_destroy_impl (struct GNUNET_MQ_Handle *mq, 352mq_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
422 void *impl_state)
423{ 353{
424 struct Neighbour *n = impl_state; 354 struct Neighbour *n = impl_state;
425 355
@@ -436,19 +366,22 @@ mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
436 * @param impl_state state specific to the implementation 366 * @param impl_state state specific to the implementation
437 */ 367 */
438static void 368static void
439mq_cancel_impl (struct GNUNET_MQ_Handle *mq, 369mq_cancel_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
440 void *impl_state)
441{ 370{
442 struct Neighbour *n = impl_state; 371 struct Neighbour *n = impl_state;
443 372
444 GNUNET_assert (GNUNET_NO == n->is_ready); 373 n->ready_window++;
445 if (NULL != n->env) 374 if (GNUNET_YES == n->awaiting_done)
446 { 375 {
447 GNUNET_MQ_send_cancel (n->env); 376 GNUNET_MQ_send_cancel (n->env);
448 n->env = NULL; 377 n->env = NULL;
378 n->awaiting_done = GNUNET_NO;
379 }
380 else
381 {
382 GNUNET_assert (0 == n->ready_window);
383 n->env = NULL;
449 } 384 }
450
451 n->is_ready = GNUNET_YES;
452} 385}
453 386
454 387
@@ -461,8 +394,7 @@ mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
461 * @param error error code 394 * @param error error code
462 */ 395 */
463static void 396static void
464peer_mq_error_handler (void *cls, 397peer_mq_error_handler (void *cls, enum GNUNET_MQ_Error error)
465 enum GNUNET_MQ_Error error)
466{ 398{
467 /* struct Neighbour *n = cls; */ 399 /* struct Neighbour *n = cls; */
468 400
@@ -471,47 +403,21 @@ peer_mq_error_handler (void *cls,
471 403
472 404
473/** 405/**
474 * The outbound quota has changed in a way that may require
475 * us to reset the timeout. Update the timeout.
476 *
477 * @param cls the `struct Neighbour` for which the timeout changed
478 */
479static void
480outbound_bw_tracker_update (void *cls)
481{
482 struct Neighbour *n = cls;
483 struct GNUNET_TIME_Relative delay;
484
485 if (NULL == n->timeout_task)
486 return;
487 delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
488 128);
489 GNUNET_SCHEDULER_cancel (n->timeout_task);
490 n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay,
491 &notify_send_done,
492 n);
493}
494
495
496/**
497 * Function we use for handling incoming connect messages. 406 * Function we use for handling incoming connect messages.
498 * 407 *
499 * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *` 408 * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
500 * @param cim message received 409 * @param cim message received
501 */ 410 */
502static void 411static void
503handle_connect (void *cls, 412handle_connect (void *cls, const struct ConnectInfoMessage *cim)
504 const struct ConnectInfoMessage *cim)
505{ 413{
506 struct GNUNET_TRANSPORT_CoreHandle *h = cls; 414 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
507 struct Neighbour *n; 415 struct Neighbour *n;
508 416
509 LOG (GNUNET_ERROR_TYPE_DEBUG, 417 LOG (GNUNET_ERROR_TYPE_DEBUG,
510 "Receiving CONNECT message for `%s' with quota %u\n", 418 "Receiving CONNECT message for `%s'\n",
511 GNUNET_i2s (&cim->id), 419 GNUNET_i2s (&cim->id));
512 ntohl (cim->quota_out.value__)); 420 n = neighbour_find (h, &cim->id);
513 n = neighbour_find (h,
514 &cim->id);
515 if (NULL != n) 421 if (NULL != n)
516 { 422 {
517 GNUNET_break (0); 423 GNUNET_break (0);
@@ -521,23 +427,14 @@ handle_connect (void *cls,
521 n = GNUNET_new (struct Neighbour); 427 n = GNUNET_new (struct Neighbour);
522 n->id = cim->id; 428 n->id = cim->id;
523 n->h = h; 429 n->h = h;
524 n->is_ready = GNUNET_YES; 430 n->ready_window = SEND_WINDOW_SIZE;
525 n->traffic_overhead = 0;
526 GNUNET_BANDWIDTH_tracker_init2 (&n->out_tracker,
527 &outbound_bw_tracker_update,
528 n,
529 GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
530 MAX_BANDWIDTH_CARRY_S,
531 &notify_excess_cb,
532 n);
533 GNUNET_assert (GNUNET_OK == 431 GNUNET_assert (GNUNET_OK ==
534 GNUNET_CONTAINER_multipeermap_put (h->neighbours, 432 GNUNET_CONTAINER_multipeermap_put (
535 &n->id, 433 h->neighbours,
536 n, 434 &n->id,
537 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 435 n,
436 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
538 437
539 GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
540 cim->quota_out);
541 n->mq = GNUNET_MQ_queue_for_callbacks (&mq_send_impl, 438 n->mq = GNUNET_MQ_queue_for_callbacks (&mq_send_impl,
542 &mq_destroy_impl, 439 &mq_destroy_impl,
543 &mq_cancel_impl, 440 &mq_cancel_impl,
@@ -547,11 +444,8 @@ handle_connect (void *cls,
547 n); 444 n);
548 if (NULL != h->nc_cb) 445 if (NULL != h->nc_cb)
549 { 446 {
550 n->handlers_cls = h->nc_cb (h->cls, 447 n->handlers_cls = h->nc_cb (h->cls, &n->id, n->mq);
551 &n->id, 448 GNUNET_MQ_set_handlers_closure (n->mq, n->handlers_cls);
552 n->mq);
553 GNUNET_MQ_set_handlers_closure (n->mq,
554 n->handlers_cls);
555 } 449 }
556} 450}
557 451
@@ -563,8 +457,7 @@ handle_connect (void *cls,
563 * @param dim message received 457 * @param dim message received
564 */ 458 */
565static void 459static void
566handle_disconnect (void *cls, 460handle_disconnect (void *cls, const struct DisconnectInfoMessage *dim)
567 const struct DisconnectInfoMessage *dim)
568{ 461{
569 struct GNUNET_TRANSPORT_CoreHandle *h = cls; 462 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
570 struct Neighbour *n; 463 struct Neighbour *n;
@@ -573,18 +466,14 @@ handle_disconnect (void *cls,
573 LOG (GNUNET_ERROR_TYPE_DEBUG, 466 LOG (GNUNET_ERROR_TYPE_DEBUG,
574 "Receiving DISCONNECT message for `%s'.\n", 467 "Receiving DISCONNECT message for `%s'.\n",
575 GNUNET_i2s (&dim->peer)); 468 GNUNET_i2s (&dim->peer));
576 n = neighbour_find (h, 469 n = neighbour_find (h, &dim->peer);
577 &dim->peer);
578 if (NULL == n) 470 if (NULL == n)
579 { 471 {
580 GNUNET_break (0); 472 GNUNET_break (0);
581 disconnect_and_schedule_reconnect (h); 473 disconnect_and_schedule_reconnect (h);
582 return; 474 return;
583 } 475 }
584 GNUNET_assert (GNUNET_YES == 476 GNUNET_assert (GNUNET_YES == neighbour_delete (h, &dim->peer, n));
585 neighbour_delete (h,
586 &dim->peer,
587 n));
588} 477}
589 478
590 479
@@ -595,24 +484,15 @@ handle_disconnect (void *cls,
595 * @param okm message received 484 * @param okm message received
596 */ 485 */
597static void 486static void
598handle_send_ok (void *cls, 487handle_send_ok (void *cls, const struct SendOkMessage *okm)
599 const struct SendOkMessage *okm)
600{ 488{
601 struct GNUNET_TRANSPORT_CoreHandle *h = cls; 489 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
602 struct Neighbour *n; 490 struct Neighbour *n;
603 uint16_t bytes_msg;
604 uint32_t bytes_physical;
605 491
606 bytes_msg = ntohs (okm->bytes_msg);
607 bytes_physical = ntohl (okm->bytes_physical);
608 LOG (GNUNET_ERROR_TYPE_DEBUG, 492 LOG (GNUNET_ERROR_TYPE_DEBUG,
609 "Receiving SEND_OK message, transmission to %s %s.\n", 493 "Receiving SEND_OK message for transmission to %s\n",
610 GNUNET_i2s (&okm->peer), 494 GNUNET_i2s (&okm->peer));
611 (GNUNET_OK == ntohs (okm->success)) 495 n = neighbour_find (h, &okm->peer);
612 ? "succeeded"
613 : "failed");
614 n = neighbour_find (h,
615 &okm->peer);
616 if (NULL == n) 496 if (NULL == n)
617 { 497 {
618 /* We should never get a 'SEND_OK' for a peer that we are not 498 /* We should never get a 'SEND_OK' for a peer that we are not
@@ -621,14 +501,9 @@ handle_send_ok (void *cls,
621 disconnect_and_schedule_reconnect (h); 501 disconnect_and_schedule_reconnect (h);
622 return; 502 return;
623 } 503 }
624 if (bytes_physical > bytes_msg) 504 n->ready_window++;
625 { 505 if ((NULL != n->env) && (1 == n->ready_window))
626 LOG (GNUNET_ERROR_TYPE_DEBUG, 506 do_send (n);
627 "Overhead for %u byte message was %u\n",
628 (unsigned int) bytes_msg,
629 (unsigned int) (bytes_physical - bytes_msg));
630 n->traffic_overhead += bytes_physical - bytes_msg;
631 }
632} 507}
633 508
634 509
@@ -639,8 +514,7 @@ handle_send_ok (void *cls,
639 * @param im message received 514 * @param im message received
640 */ 515 */
641static int 516static int
642check_recv (void *cls, 517check_recv (void *cls, const struct InboundMessage *im)
643 const struct InboundMessage *im)
644{ 518{
645 const struct GNUNET_MessageHeader *imm; 519 const struct GNUNET_MessageHeader *imm;
646 uint16_t size; 520 uint16_t size;
@@ -668,12 +542,11 @@ check_recv (void *cls,
668 * @param im message received 542 * @param im message received
669 */ 543 */
670static void 544static void
671handle_recv (void *cls, 545handle_recv (void *cls, const struct InboundMessage *im)
672 const struct InboundMessage *im)
673{ 546{
674 struct GNUNET_TRANSPORT_CoreHandle *h = cls; 547 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
675 const struct GNUNET_MessageHeader *imm 548 const struct GNUNET_MessageHeader *imm =
676 = (const struct GNUNET_MessageHeader *) &im[1]; 549 (const struct GNUNET_MessageHeader *) &im[1];
677 struct Neighbour *n; 550 struct Neighbour *n;
678 551
679 LOG (GNUNET_ERROR_TYPE_DEBUG, 552 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -681,46 +554,14 @@ handle_recv (void *cls,
681 (unsigned int) ntohs (imm->type), 554 (unsigned int) ntohs (imm->type),
682 (unsigned int) ntohs (imm->size), 555 (unsigned int) ntohs (imm->size),
683 GNUNET_i2s (&im->peer)); 556 GNUNET_i2s (&im->peer));
684 n = neighbour_find (h, 557 n = neighbour_find (h, &im->peer);
685 &im->peer);
686 if (NULL == n)
687 {
688 GNUNET_break (0);
689 disconnect_and_schedule_reconnect (h);
690 return;
691 }
692 GNUNET_MQ_inject_message (n->mq,
693 imm);
694}
695
696
697/**
698 * Function we use for handling incoming set quota messages.
699 *
700 * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
701 * @param msg message received
702 */
703static void
704handle_set_quota (void *cls,
705 const struct QuotaSetMessage *qm)
706{
707 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
708 struct Neighbour *n;
709
710 n = neighbour_find (h,
711 &qm->peer);
712 if (NULL == n) 558 if (NULL == n)
713 { 559 {
714 GNUNET_break (0); 560 GNUNET_break (0);
715 disconnect_and_schedule_reconnect (h); 561 disconnect_and_schedule_reconnect (h);
716 return; 562 return;
717 } 563 }
718 LOG (GNUNET_ERROR_TYPE_DEBUG, 564 GNUNET_MQ_inject_message (n->mq, imm);
719 "Receiving SET_QUOTA message for `%s' with quota %u\n",
720 GNUNET_i2s (&qm->peer),
721 (unsigned int) ntohl (qm->quota.value__));
722 GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
723 qm->quota);
724} 565}
725 566
726 567
@@ -733,46 +574,36 @@ static void
733reconnect (void *cls) 574reconnect (void *cls)
734{ 575{
735 struct GNUNET_TRANSPORT_CoreHandle *h = cls; 576 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
736 struct GNUNET_MQ_MessageHandler handlers[] = { 577 struct GNUNET_MQ_MessageHandler handlers[] =
737 GNUNET_MQ_hd_fixed_size (connect, 578 {GNUNET_MQ_hd_fixed_size (connect,
738 GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT, 579 GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT,
739 struct ConnectInfoMessage, 580 struct ConnectInfoMessage,
740 h), 581 h),
741 GNUNET_MQ_hd_fixed_size (disconnect, 582 GNUNET_MQ_hd_fixed_size (disconnect,
742 GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT, 583 GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT,
743 struct DisconnectInfoMessage, 584 struct DisconnectInfoMessage,
744 h), 585 h),
745 GNUNET_MQ_hd_fixed_size (send_ok, 586 GNUNET_MQ_hd_fixed_size (send_ok,
746 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK, 587 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK,
747 struct SendOkMessage, 588 struct SendOkMessage,
748 h), 589 h),
749 GNUNET_MQ_hd_var_size (recv, 590 GNUNET_MQ_hd_var_size (recv,
750 GNUNET_MESSAGE_TYPE_TRANSPORT_RECV, 591 GNUNET_MESSAGE_TYPE_TRANSPORT_RECV,
751 struct InboundMessage, 592 struct InboundMessage,
752 h), 593 h),
753 GNUNET_MQ_hd_fixed_size (set_quota, 594 GNUNET_MQ_handler_end ()};
754 GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA,
755 struct QuotaSetMessage,
756 h),
757 GNUNET_MQ_handler_end ()
758 };
759 struct GNUNET_MQ_Envelope *env; 595 struct GNUNET_MQ_Envelope *env;
760 struct StartMessage *s; 596 struct StartMessage *s;
761 uint32_t options; 597 uint32_t options;
762 598
763 h->reconnect_task = NULL; 599 h->reconnect_task = NULL;
764 LOG (GNUNET_ERROR_TYPE_DEBUG, 600 LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n");
765 "Connecting to transport service.\n");
766 GNUNET_assert (NULL == h->mq); 601 GNUNET_assert (NULL == h->mq);
767 h->mq = GNUNET_CLIENT_connect (h->cfg, 602 h->mq =
768 "transport", 603 GNUNET_CLIENT_connect (h->cfg, "transport", handlers, &mq_error_handler, h);
769 handlers,
770 &mq_error_handler,
771 h);
772 if (NULL == h->mq) 604 if (NULL == h->mq)
773 return; 605 return;
774 env = GNUNET_MQ_msg (s, 606 env = GNUNET_MQ_msg (s, GNUNET_MESSAGE_TYPE_TRANSPORT_START);
775 GNUNET_MESSAGE_TYPE_TRANSPORT_START);
776 options = 0; 607 options = 0;
777 if (h->check_self) 608 if (h->check_self)
778 options |= 1; 609 options |= 1;
@@ -780,8 +611,7 @@ reconnect (void *cls)
780 options |= 2; 611 options |= 2;
781 s->options = htonl (options); 612 s->options = htonl (options);
782 s->self = h->self; 613 s->self = h->self;
783 GNUNET_MQ_send (h->mq, 614 GNUNET_MQ_send (h->mq, env);
784 env);
785} 615}
786 616
787 617
@@ -793,9 +623,7 @@ reconnect (void *cls)
793static void 623static void
794disconnect (struct GNUNET_TRANSPORT_CoreHandle *h) 624disconnect (struct GNUNET_TRANSPORT_CoreHandle *h)
795{ 625{
796 GNUNET_CONTAINER_multipeermap_iterate (h->neighbours, 626 GNUNET_CONTAINER_multipeermap_iterate (h->neighbours, &neighbour_delete, h);
797 &neighbour_delete,
798 h);
799 if (NULL != h->mq) 627 if (NULL != h->mq)
800 { 628 {
801 GNUNET_MQ_destroy (h->mq); 629 GNUNET_MQ_destroy (h->mq);
@@ -817,12 +645,9 @@ disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_CoreHandle *h)
817 disconnect (h); 645 disconnect (h);
818 LOG (GNUNET_ERROR_TYPE_DEBUG, 646 LOG (GNUNET_ERROR_TYPE_DEBUG,
819 "Scheduling task to reconnect to transport service in %s.\n", 647 "Scheduling task to reconnect to transport service in %s.\n",
820 GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, 648 GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES));
821 GNUNET_YES));
822 h->reconnect_task = 649 h->reconnect_task =
823 GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, 650 GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
824 &reconnect,
825 h);
826 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); 651 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
827} 652}
828 653
@@ -840,8 +665,7 @@ GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle,
840{ 665{
841 struct Neighbour *n; 666 struct Neighbour *n;
842 667
843 n = neighbour_find (handle, 668 n = neighbour_find (handle, peer);
844 peer);
845 if (NULL == n) 669 if (NULL == n)
846 return NULL; 670 return NULL;
847 return n->mq; 671 return n->mq;
@@ -849,6 +673,45 @@ GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle,
849 673
850 674
851/** 675/**
676 * Notification from the CORE service to the TRANSPORT service
677 * that the CORE service has finished processing a message from
678 * TRANSPORT (via the @code{handlers} of #GNUNET_TRANSPORT_core_connect())
679 * and that it is thus now OK for TRANSPORT to send more messages
680 * for @a pid.
681 *
682 * Used to provide flow control, this is our equivalent to
683 * #GNUNET_SERVICE_client_continue() of an ordinary service.
684 *
685 * Note that due to the use of a window, TRANSPORT may send multiple
686 * messages destined for the same peer even without an intermediate
687 * call to this function. However, CORE must still call this function
688 * once per message received, as otherwise eventually the window will
689 * be full and TRANSPORT will stop providing messages to CORE for @a
690 * pid.
691 *
692 * @param ch core handle
693 * @param pid which peer was the message from that was fully processed by CORE
694 */
695void
696GNUNET_TRANSPORT_core_receive_continue (struct GNUNET_TRANSPORT_CoreHandle *ch,
697 const struct GNUNET_PeerIdentity *pid)
698{
699 struct GNUNET_MQ_Envelope *env;
700 struct RecvOkMessage *rok;
701
702 LOG (GNUNET_ERROR_TYPE_DEBUG,
703 "Message for %s finished CORE processing, sending RECV_OK.\n",
704 GNUNET_i2s (pid));
705 if (NULL == ch->mq)
706 return;
707 env = GNUNET_MQ_msg (rok, GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK);
708 rok->increase_window_delta = htonl (1);
709 rok->peer = *pid;
710 GNUNET_MQ_send (ch->mq, env);
711}
712
713
714/**
852 * Connect to the transport service. Note that the connection may 715 * Connect to the transport service. Note that the connection may
853 * complete (or fail) asynchronously. 716 * complete (or fail) asynchronously.
854 * 717 *
@@ -859,17 +722,15 @@ GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle,
859 * @param rec receive function to call 722 * @param rec receive function to call
860 * @param nc function to call on connect events 723 * @param nc function to call on connect events
861 * @param nd function to call on disconnect events 724 * @param nd function to call on disconnect events
862 * @param neb function to call if we have excess bandwidth to a peer
863 * @return NULL on error 725 * @return NULL on error
864 */ 726 */
865struct GNUNET_TRANSPORT_CoreHandle * 727struct GNUNET_TRANSPORT_CoreHandle *
866GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, 728GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
867 const struct GNUNET_PeerIdentity *self, 729 const struct GNUNET_PeerIdentity *self,
868 const struct GNUNET_MQ_MessageHandler *handlers, 730 const struct GNUNET_MQ_MessageHandler *handlers,
869 void *cls, 731 void *cls,
870 GNUNET_TRANSPORT_NotifyConnect nc, 732 GNUNET_TRANSPORT_NotifyConnect nc,
871 GNUNET_TRANSPORT_NotifyDisconnect nd, 733 GNUNET_TRANSPORT_NotifyDisconnect nd)
872 GNUNET_TRANSPORT_NotifyExcessBandwidth neb)
873{ 734{
874 struct GNUNET_TRANSPORT_CoreHandle *h; 735 struct GNUNET_TRANSPORT_CoreHandle *h;
875 unsigned int i; 736 unsigned int i;
@@ -884,19 +745,17 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
884 h->cls = cls; 745 h->cls = cls;
885 h->nc_cb = nc; 746 h->nc_cb = nc;
886 h->nd_cb = nd; 747 h->nd_cb = nd;
887 h->neb_cb = neb;
888 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; 748 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
889 if (NULL != handlers) 749 if (NULL != handlers)
890 { 750 {
891 for (i=0;NULL != handlers[i].cb; i++) ; 751 for (i = 0; NULL != handlers[i].cb; i++)
892 h->handlers = GNUNET_new_array (i + 1, 752 ;
893 struct GNUNET_MQ_MessageHandler); 753 h->handlers = GNUNET_new_array (i + 1, struct GNUNET_MQ_MessageHandler);
894 GNUNET_memcpy (h->handlers, 754 GNUNET_memcpy (h->handlers,
895 handlers, 755 handlers,
896 i * sizeof (struct GNUNET_MQ_MessageHandler)); 756 i * sizeof (struct GNUNET_MQ_MessageHandler));
897 } 757 }
898 LOG (GNUNET_ERROR_TYPE_DEBUG, 758 LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service\n");
899 "Connecting to transport service\n");
900 reconnect (h); 759 reconnect (h);
901 if (NULL == h->mq) 760 if (NULL == h->mq)
902 { 761 {
@@ -905,8 +764,7 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
905 return NULL; 764 return NULL;
906 } 765 }
907 h->neighbours = 766 h->neighbours =
908 GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE, 767 GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE, GNUNET_YES);
909 GNUNET_YES);
910 return h; 768 return h;
911} 769}
912 770
@@ -914,13 +772,13 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
914/** 772/**
915 * Disconnect from the transport service. 773 * Disconnect from the transport service.
916 * 774 *
917 * @param handle handle to the service as returned from #GNUNET_TRANSPORT_core_connect() 775 * @param handle handle to the service as returned from
776 * #GNUNET_TRANSPORT_core_connect()
918 */ 777 */
919void 778void
920GNUNET_TRANSPORT_core_disconnect (struct GNUNET_TRANSPORT_CoreHandle *handle) 779GNUNET_TRANSPORT_core_disconnect (struct GNUNET_TRANSPORT_CoreHandle *handle)
921{ 780{
922 LOG (GNUNET_ERROR_TYPE_DEBUG, 781 LOG (GNUNET_ERROR_TYPE_DEBUG, "Transport disconnect called!\n");
923 "Transport disconnect called!\n");
924 /* this disconnects all neighbours... */ 782 /* this disconnects all neighbours... */
925 disconnect (handle); 783 disconnect (handle);
926 /* and now we stop trying to connect again... */ 784 /* and now we stop trying to connect again... */