aboutsummaryrefslogtreecommitdiff
path: root/src/transport/transport_api_core.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport/transport_api_core.c')
-rw-r--r--src/transport/transport_api_core.c600
1 files changed, 301 insertions, 299 deletions
diff --git a/src/transport/transport_api_core.c b/src/transport/transport_api_core.c
index d88461d5a..5d841fa10 100644
--- a/src/transport/transport_api_core.c
+++ b/src/transport/transport_api_core.c
@@ -32,7 +32,7 @@
32#include "gnunet_transport_service.h" 32#include "gnunet_transport_service.h"
33#include "transport.h" 33#include "transport.h"
34 34
35#define LOG(kind, ...) GNUNET_log_from(kind, "transport-api-core", __VA_ARGS__) 35#define LOG(kind, ...) GNUNET_log_from (kind, "transport-api-core", __VA_ARGS__)
36 36
37/** 37/**
38 * If we could not send any payload to a peer for this amount of 38 * If we could not send any payload to a peer for this amount of
@@ -49,7 +49,8 @@
49/** 49/**
50 * Entry in hash table of all of our current (connected) neighbours. 50 * Entry in hash table of all of our current (connected) neighbours.
51 */ 51 */
52struct Neighbour { 52struct Neighbour
53{
53 /** 54 /**
54 * Overall transport handle. 55 * Overall transport handle.
55 */ 56 */
@@ -118,7 +119,8 @@ struct Neighbour {
118 * Handle for the transport service (includes all of the 119 * Handle for the transport service (includes all of the
119 * state for the transport service). 120 * state for the transport service).
120 */ 121 */
121struct GNUNET_TRANSPORT_CoreHandle { 122struct GNUNET_TRANSPORT_CoreHandle
123{
122 /** 124 /**
123 * Closure for the callbacks. 125 * Closure for the callbacks.
124 */ 126 */
@@ -200,7 +202,7 @@ struct GNUNET_TRANSPORT_CoreHandle {
200 * @param h transport service to reconnect 202 * @param h transport service to reconnect
201 */ 203 */
202static void 204static void
203disconnect_and_schedule_reconnect(struct GNUNET_TRANSPORT_CoreHandle *h); 205disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_CoreHandle *h);
204 206
205 207
206/** 208/**
@@ -211,10 +213,10 @@ disconnect_and_schedule_reconnect(struct GNUNET_TRANSPORT_CoreHandle *h);
211 * @return NULL if no such peer entry exists 213 * @return NULL if no such peer entry exists
212 */ 214 */
213static struct Neighbour * 215static struct Neighbour *
214neighbour_find(struct GNUNET_TRANSPORT_CoreHandle *h, 216neighbour_find (struct GNUNET_TRANSPORT_CoreHandle *h,
215 const struct GNUNET_PeerIdentity *peer) 217 const struct GNUNET_PeerIdentity *peer)
216{ 218{
217 return GNUNET_CONTAINER_multipeermap_get(h->neighbours, peer); 219 return GNUNET_CONTAINER_multipeermap_get (h->neighbours, peer);
218} 220}
219 221
220 222
@@ -225,17 +227,17 @@ neighbour_find(struct GNUNET_TRANSPORT_CoreHandle *h,
225 * @param cls the `struct Neighbour` that has excess bandwidth 227 * @param cls the `struct Neighbour` that has excess bandwidth
226 */ 228 */
227static void 229static void
228notify_excess_cb(void *cls) 230notify_excess_cb (void *cls)
229{ 231{
230 struct Neighbour *n = cls; 232 struct Neighbour *n = cls;
231 struct GNUNET_TRANSPORT_CoreHandle *h = n->h; 233 struct GNUNET_TRANSPORT_CoreHandle *h = n->h;
232 234
233 LOG(GNUNET_ERROR_TYPE_DEBUG, 235 LOG (GNUNET_ERROR_TYPE_DEBUG,
234 "Notifying CORE that more bandwidth is available for %s\n", 236 "Notifying CORE that more bandwidth is available for %s\n",
235 GNUNET_i2s(&n->id)); 237 GNUNET_i2s (&n->id));
236 238
237 if (NULL != h->neb_cb) 239 if (NULL != h->neb_cb)
238 h->neb_cb(h->cls, &n->id, n->handlers_cls); 240 h->neb_cb (h->cls, &n->id, n->handlers_cls);
239} 241}
240 242
241 243
@@ -250,33 +252,33 @@ notify_excess_cb(void *cls)
250 * #GNUNET_NO if not. 252 * #GNUNET_NO if not.
251 */ 253 */
252static int 254static int
253neighbour_delete(void *cls, const struct GNUNET_PeerIdentity *key, void *value) 255neighbour_delete (void *cls, const struct GNUNET_PeerIdentity *key, void *value)
254{ 256{
255 struct GNUNET_TRANSPORT_CoreHandle *handle = cls; 257 struct GNUNET_TRANSPORT_CoreHandle *handle = cls;
256 struct Neighbour *n = value; 258 struct Neighbour *n = value;
257 259
258 LOG(GNUNET_ERROR_TYPE_DEBUG, 260 LOG (GNUNET_ERROR_TYPE_DEBUG,
259 "Dropping entry for neighbour `%s'.\n", 261 "Dropping entry for neighbour `%s'.\n",
260 GNUNET_i2s(key)); 262 GNUNET_i2s (key));
261 GNUNET_BANDWIDTH_tracker_notification_stop(&n->out_tracker); 263 GNUNET_BANDWIDTH_tracker_notification_stop (&n->out_tracker);
262 if (NULL != handle->nd_cb) 264 if (NULL != handle->nd_cb)
263 handle->nd_cb(handle->cls, &n->id, n->handlers_cls); 265 handle->nd_cb (handle->cls, &n->id, n->handlers_cls);
264 if (NULL != n->timeout_task) 266 if (NULL != n->timeout_task)
265 { 267 {
266 GNUNET_SCHEDULER_cancel(n->timeout_task); 268 GNUNET_SCHEDULER_cancel (n->timeout_task);
267 n->timeout_task = NULL; 269 n->timeout_task = NULL;
268 } 270 }
269 if (NULL != n->env) 271 if (NULL != n->env)
270 { 272 {
271 GNUNET_MQ_send_cancel(n->env); 273 GNUNET_MQ_send_cancel (n->env);
272 n->env = NULL; 274 n->env = NULL;
273 } 275 }
274 GNUNET_MQ_destroy(n->mq); 276 GNUNET_MQ_destroy (n->mq);
275 GNUNET_assert(NULL == n->mq); 277 GNUNET_assert (NULL == n->mq);
276 GNUNET_assert( 278 GNUNET_assert (
277 GNUNET_YES == 279 GNUNET_YES ==
278 GNUNET_CONTAINER_multipeermap_remove(handle->neighbours, key, n)); 280 GNUNET_CONTAINER_multipeermap_remove (handle->neighbours, key, n));
279 GNUNET_free(n); 281 GNUNET_free (n);
280 return GNUNET_YES; 282 return GNUNET_YES;
281} 283}
282 284
@@ -291,14 +293,14 @@ neighbour_delete(void *cls, const struct GNUNET_PeerIdentity *key, void *value)
291 * @param error error code 293 * @param error error code
292 */ 294 */
293static void 295static void
294mq_error_handler(void *cls, enum GNUNET_MQ_Error error) 296mq_error_handler (void *cls, enum GNUNET_MQ_Error error)
295{ 297{
296 struct GNUNET_TRANSPORT_CoreHandle *h = cls; 298 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
297 299
298 LOG(GNUNET_ERROR_TYPE_ERROR, 300 LOG (GNUNET_ERROR_TYPE_ERROR,
299 "Error receiving from transport service (%d), disconnecting temporarily.\n", 301 "Error receiving from transport service (%d), disconnecting temporarily.\n",
300 error); 302 error);
301 disconnect_and_schedule_reconnect(h); 303 disconnect_and_schedule_reconnect (h);
302} 304}
303 305
304 306
@@ -310,16 +312,16 @@ mq_error_handler(void *cls, enum GNUNET_MQ_Error error)
310 * @return #GNUNET_OK if message is well-formed 312 * @return #GNUNET_OK if message is well-formed
311 */ 313 */
312static int 314static int
313check_hello(void *cls, const struct GNUNET_MessageHeader *msg) 315check_hello (void *cls, const struct GNUNET_MessageHeader *msg)
314{ 316{
315 struct GNUNET_PeerIdentity me; 317 struct GNUNET_PeerIdentity me;
316 318
317 if (GNUNET_OK != 319 if (GNUNET_OK !=
318 GNUNET_HELLO_get_id((const struct GNUNET_HELLO_Message *)msg, &me)) 320 GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg, &me))
319 { 321 {
320 GNUNET_break(0); 322 GNUNET_break (0);
321 return GNUNET_SYSERR; 323 return GNUNET_SYSERR;
322 } 324 }
323 return GNUNET_OK; 325 return GNUNET_OK;
324} 326}
325 327
@@ -331,7 +333,7 @@ check_hello(void *cls, const struct GNUNET_MessageHeader *msg)
331 * @param msg message received 333 * @param msg message received
332 */ 334 */
333static void 335static void
334handle_hello(void *cls, const struct GNUNET_MessageHeader *msg) 336handle_hello (void *cls, const struct GNUNET_MessageHeader *msg)
335{ 337{
336 /* we do not care => FIXME: signal in options to NEVER send HELLOs! */ 338 /* we do not care => FIXME: signal in options to NEVER send HELLOs! */
337} 339}
@@ -346,13 +348,13 @@ handle_hello(void *cls, const struct GNUNET_MessageHeader *msg)
346 * @param cls the `struct Neighbour` where the message was sent 348 * @param cls the `struct Neighbour` where the message was sent
347 */ 349 */
348static void 350static void
349notify_send_done_fin(void *cls) 351notify_send_done_fin (void *cls)
350{ 352{
351 struct Neighbour *n = cls; 353 struct Neighbour *n = cls;
352 354
353 n->timeout_task = NULL; 355 n->timeout_task = NULL;
354 n->is_ready = GNUNET_YES; 356 n->is_ready = GNUNET_YES;
355 GNUNET_MQ_impl_send_continue(n->mq); 357 GNUNET_MQ_impl_send_continue (n->mq);
356} 358}
357 359
358 360
@@ -365,31 +367,31 @@ notify_send_done_fin(void *cls)
365 * @param cls the `struct Neighbour` where the message was sent 367 * @param cls the `struct Neighbour` where the message was sent
366 */ 368 */
367static void 369static void
368notify_send_done(void *cls) 370notify_send_done (void *cls)
369{ 371{
370 struct Neighbour *n = cls; 372 struct Neighbour *n = cls;
371 struct GNUNET_TIME_Relative delay; 373 struct GNUNET_TIME_Relative delay;
372 374
373 n->timeout_task = NULL; 375 n->timeout_task = NULL;
374 if (NULL != n->env) 376 if (NULL != n->env)
375 { 377 {
376 GNUNET_BANDWIDTH_tracker_consume(&n->out_tracker, 378 GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker,
377 n->env_size + n->traffic_overhead); 379 n->env_size + n->traffic_overhead);
378 n->env = NULL; 380 n->env = NULL;
379 n->traffic_overhead = 0; 381 n->traffic_overhead = 0;
380 } 382 }
381 delay = GNUNET_BANDWIDTH_tracker_get_delay(&n->out_tracker, 128); 383 delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, 128);
382 if (0 == delay.rel_value_us) 384 if (0 == delay.rel_value_us)
383 { 385 {
384 n->is_ready = GNUNET_YES; 386 n->is_ready = GNUNET_YES;
385 GNUNET_MQ_impl_send_continue(n->mq); 387 GNUNET_MQ_impl_send_continue (n->mq);
386 return; 388 return;
387 } 389 }
388 GNUNET_MQ_impl_send_in_flight(n->mq); 390 GNUNET_MQ_impl_send_in_flight (n->mq);
389 /* cannot send even a small message without violating 391 /* cannot send even a small message without violating
390 quota, wait a before allowing MQ to send next message */ 392 quota, wait a before allowing MQ to send next message */
391 n->timeout_task = 393 n->timeout_task =
392 GNUNET_SCHEDULER_add_delayed(delay, &notify_send_done_fin, n); 394 GNUNET_SCHEDULER_add_delayed (delay, &notify_send_done_fin, n);
393} 395}
394 396
395 397
@@ -404,44 +406,44 @@ notify_send_done(void *cls)
404 * @param impl_state state of the implementation 406 * @param impl_state state of the implementation
405 */ 407 */
406static void 408static void
407mq_send_impl(struct GNUNET_MQ_Handle *mq, 409mq_send_impl (struct GNUNET_MQ_Handle *mq,
408 const struct GNUNET_MessageHeader *msg, 410 const struct GNUNET_MessageHeader *msg,
409 void *impl_state) 411 void *impl_state)
410{ 412{
411 struct Neighbour *n = impl_state; 413 struct Neighbour *n = impl_state;
412 struct GNUNET_TRANSPORT_CoreHandle *h = n->h; 414 struct GNUNET_TRANSPORT_CoreHandle *h = n->h;
413 struct OutboundMessage *obm; 415 struct OutboundMessage *obm;
414 uint16_t msize; 416 uint16_t msize;
415 417
416 GNUNET_assert(GNUNET_YES == n->is_ready); 418 GNUNET_assert (GNUNET_YES == n->is_ready);
417 msize = ntohs(msg->size); 419 msize = ntohs (msg->size);
418 if (msize >= GNUNET_MAX_MESSAGE_SIZE - sizeof(*obm)) 420 if (msize >= GNUNET_MAX_MESSAGE_SIZE - sizeof(*obm))
419 { 421 {
420 GNUNET_break(0); 422 GNUNET_break (0);
421 GNUNET_MQ_impl_send_continue(mq); 423 GNUNET_MQ_impl_send_continue (mq);
422 return; 424 return;
423 } 425 }
424 GNUNET_assert(NULL == n->env); 426 GNUNET_assert (NULL == n->env);
425 n->env = 427 n->env =
426 GNUNET_MQ_msg_nested_mh(obm, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, msg); 428 GNUNET_MQ_msg_nested_mh (obm, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, msg);
427 { 429 {
428 struct GNUNET_MQ_Envelope *env; 430 struct GNUNET_MQ_Envelope *env;
429 431
430 env = GNUNET_MQ_get_current_envelope(mq); 432 env = GNUNET_MQ_get_current_envelope (mq);
431 obm->priority = htonl((uint32_t)GNUNET_MQ_env_get_options(env)); 433 obm->priority = htonl ((uint32_t) GNUNET_MQ_env_get_options (env));
432 } 434 }
433 obm->timeout = GNUNET_TIME_relative_hton( 435 obm->timeout = GNUNET_TIME_relative_hton (
434 GNUNET_TIME_UNIT_MINUTES); /* FIXME: to be removed */ 436 GNUNET_TIME_UNIT_MINUTES); /* FIXME: to be removed */
435 obm->peer = n->id; 437 obm->peer = n->id;
436 GNUNET_assert(NULL == n->timeout_task); 438 GNUNET_assert (NULL == n->timeout_task);
437 n->is_ready = GNUNET_NO; 439 n->is_ready = GNUNET_NO;
438 n->env_size = ntohs(msg->size); 440 n->env_size = ntohs (msg->size);
439 GNUNET_MQ_notify_sent(n->env, &notify_send_done, n); 441 GNUNET_MQ_notify_sent (n->env, &notify_send_done, n);
440 GNUNET_MQ_send(h->mq, n->env); 442 GNUNET_MQ_send (h->mq, n->env);
441 LOG(GNUNET_ERROR_TYPE_DEBUG, 443 LOG (GNUNET_ERROR_TYPE_DEBUG,
442 "Queued message of type %u for neighbour `%s'.\n", 444 "Queued message of type %u for neighbour `%s'.\n",
443 ntohs(msg->type), 445 ntohs (msg->type),
444 GNUNET_i2s(&n->id)); 446 GNUNET_i2s (&n->id));
445} 447}
446 448
447 449
@@ -453,11 +455,11 @@ mq_send_impl(struct GNUNET_MQ_Handle *mq,
453 * @param impl_state state of the implementation 455 * @param impl_state state of the implementation
454 */ 456 */
455static void 457static void
456mq_destroy_impl(struct GNUNET_MQ_Handle *mq, void *impl_state) 458mq_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
457{ 459{
458 struct Neighbour *n = impl_state; 460 struct Neighbour *n = impl_state;
459 461
460 GNUNET_assert(mq == n->mq); 462 GNUNET_assert (mq == n->mq);
461 n->mq = NULL; 463 n->mq = NULL;
462} 464}
463 465
@@ -470,16 +472,16 @@ mq_destroy_impl(struct GNUNET_MQ_Handle *mq, void *impl_state)
470 * @param impl_state state specific to the implementation 472 * @param impl_state state specific to the implementation
471 */ 473 */
472static void 474static void
473mq_cancel_impl(struct GNUNET_MQ_Handle *mq, void *impl_state) 475mq_cancel_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
474{ 476{
475 struct Neighbour *n = impl_state; 477 struct Neighbour *n = impl_state;
476 478
477 GNUNET_assert(GNUNET_NO == n->is_ready); 479 GNUNET_assert (GNUNET_NO == n->is_ready);
478 if (NULL != n->env) 480 if (NULL != n->env)
479 { 481 {
480 GNUNET_MQ_send_cancel(n->env); 482 GNUNET_MQ_send_cancel (n->env);
481 n->env = NULL; 483 n->env = NULL;
482 } 484 }
483 485
484 n->is_ready = GNUNET_YES; 486 n->is_ready = GNUNET_YES;
485} 487}
@@ -494,11 +496,11 @@ mq_cancel_impl(struct GNUNET_MQ_Handle *mq, void *impl_state)
494 * @param error error code 496 * @param error error code
495 */ 497 */
496static void 498static void
497peer_mq_error_handler(void *cls, enum GNUNET_MQ_Error error) 499peer_mq_error_handler (void *cls, enum GNUNET_MQ_Error error)
498{ 500{
499 /* struct Neighbour *n = cls; */ 501 /* struct Neighbour *n = cls; */
500 502
501 GNUNET_break_op(0); 503 GNUNET_break_op (0);
502} 504}
503 505
504 506
@@ -509,16 +511,16 @@ peer_mq_error_handler(void *cls, enum GNUNET_MQ_Error error)
509 * @param cls the `struct Neighbour` for which the timeout changed 511 * @param cls the `struct Neighbour` for which the timeout changed
510 */ 512 */
511static void 513static void
512outbound_bw_tracker_update(void *cls) 514outbound_bw_tracker_update (void *cls)
513{ 515{
514 struct Neighbour *n = cls; 516 struct Neighbour *n = cls;
515 struct GNUNET_TIME_Relative delay; 517 struct GNUNET_TIME_Relative delay;
516 518
517 if (NULL == n->timeout_task) 519 if (NULL == n->timeout_task)
518 return; 520 return;
519 delay = GNUNET_BANDWIDTH_tracker_get_delay(&n->out_tracker, 128); 521 delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, 128);
520 GNUNET_SCHEDULER_cancel(n->timeout_task); 522 GNUNET_SCHEDULER_cancel (n->timeout_task);
521 n->timeout_task = GNUNET_SCHEDULER_add_delayed(delay, &notify_send_done, n); 523 n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay, &notify_send_done, n);
522} 524}
523 525
524 526
@@ -529,54 +531,54 @@ outbound_bw_tracker_update(void *cls)
529 * @param cim message received 531 * @param cim message received
530 */ 532 */
531static void 533static void
532handle_connect(void *cls, const struct ConnectInfoMessage *cim) 534handle_connect (void *cls, const struct ConnectInfoMessage *cim)
533{ 535{
534 struct GNUNET_TRANSPORT_CoreHandle *h = cls; 536 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
535 struct Neighbour *n; 537 struct Neighbour *n;
536 538
537 LOG(GNUNET_ERROR_TYPE_DEBUG, 539 LOG (GNUNET_ERROR_TYPE_DEBUG,
538 "Receiving CONNECT message for `%s' with quota %u\n", 540 "Receiving CONNECT message for `%s' with quota %u\n",
539 GNUNET_i2s(&cim->id), 541 GNUNET_i2s (&cim->id),
540 ntohl(cim->quota_out.value__)); 542 ntohl (cim->quota_out.value__));
541 n = neighbour_find(h, &cim->id); 543 n = neighbour_find (h, &cim->id);
542 if (NULL != n) 544 if (NULL != n)
543 { 545 {
544 GNUNET_break(0); /* FIXME: this assertion seems to fail sometimes!? */ 546 GNUNET_break (0); /* FIXME: this assertion seems to fail sometimes!? */
545 disconnect_and_schedule_reconnect(h); 547 disconnect_and_schedule_reconnect (h);
546 return; 548 return;
547 } 549 }
548 n = GNUNET_new(struct Neighbour); 550 n = GNUNET_new (struct Neighbour);
549 n->id = cim->id; 551 n->id = cim->id;
550 n->h = h; 552 n->h = h;
551 n->is_ready = GNUNET_YES; 553 n->is_ready = GNUNET_YES;
552 n->traffic_overhead = 0; 554 n->traffic_overhead = 0;
553 GNUNET_BANDWIDTH_tracker_init2(&n->out_tracker, 555 GNUNET_BANDWIDTH_tracker_init2 (&n->out_tracker,
554 &outbound_bw_tracker_update, 556 &outbound_bw_tracker_update,
555 n, 557 n,
556 GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT, 558 GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
557 MAX_BANDWIDTH_CARRY_S, 559 MAX_BANDWIDTH_CARRY_S,
558 &notify_excess_cb, 560 &notify_excess_cb,
559 n); 561 n);
560 GNUNET_assert(GNUNET_OK == 562 GNUNET_assert (GNUNET_OK ==
561 GNUNET_CONTAINER_multipeermap_put( 563 GNUNET_CONTAINER_multipeermap_put (
562 h->neighbours, 564 h->neighbours,
563 &n->id, 565 &n->id,
564 n, 566 n,
565 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 567 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
566 568
567 GNUNET_BANDWIDTH_tracker_update_quota(&n->out_tracker, cim->quota_out); 569 GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, cim->quota_out);
568 n->mq = GNUNET_MQ_queue_for_callbacks(&mq_send_impl, 570 n->mq = GNUNET_MQ_queue_for_callbacks (&mq_send_impl,
569 &mq_destroy_impl, 571 &mq_destroy_impl,
570 &mq_cancel_impl, 572 &mq_cancel_impl,
571 n, 573 n,
572 h->handlers, 574 h->handlers,
573 &peer_mq_error_handler, 575 &peer_mq_error_handler,
574 n); 576 n);
575 if (NULL != h->nc_cb) 577 if (NULL != h->nc_cb)
576 { 578 {
577 n->handlers_cls = h->nc_cb(h->cls, &n->id, n->mq); 579 n->handlers_cls = h->nc_cb (h->cls, &n->id, n->mq);
578 GNUNET_MQ_set_handlers_closure(n->mq, n->handlers_cls); 580 GNUNET_MQ_set_handlers_closure (n->mq, n->handlers_cls);
579 } 581 }
580} 582}
581 583
582 584
@@ -587,23 +589,23 @@ handle_connect(void *cls, const struct ConnectInfoMessage *cim)
587 * @param dim message received 589 * @param dim message received
588 */ 590 */
589static void 591static void
590handle_disconnect(void *cls, const struct DisconnectInfoMessage *dim) 592handle_disconnect (void *cls, const struct DisconnectInfoMessage *dim)
591{ 593{
592 struct GNUNET_TRANSPORT_CoreHandle *h = cls; 594 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
593 struct Neighbour *n; 595 struct Neighbour *n;
594 596
595 GNUNET_break(ntohl(dim->reserved) == 0); 597 GNUNET_break (ntohl (dim->reserved) == 0);
596 LOG(GNUNET_ERROR_TYPE_DEBUG, 598 LOG (GNUNET_ERROR_TYPE_DEBUG,
597 "Receiving DISCONNECT message for `%s'.\n", 599 "Receiving DISCONNECT message for `%s'.\n",
598 GNUNET_i2s(&dim->peer)); 600 GNUNET_i2s (&dim->peer));
599 n = neighbour_find(h, &dim->peer); 601 n = neighbour_find (h, &dim->peer);
600 if (NULL == n) 602 if (NULL == n)
601 { 603 {
602 GNUNET_break(0); 604 GNUNET_break (0);
603 disconnect_and_schedule_reconnect(h); 605 disconnect_and_schedule_reconnect (h);
604 return; 606 return;
605 } 607 }
606 GNUNET_assert(GNUNET_YES == neighbour_delete(h, &dim->peer, n)); 608 GNUNET_assert (GNUNET_YES == neighbour_delete (h, &dim->peer, n));
607} 609}
608 610
609 611
@@ -614,36 +616,36 @@ handle_disconnect(void *cls, const struct DisconnectInfoMessage *dim)
614 * @param okm message received 616 * @param okm message received
615 */ 617 */
616static void 618static void
617handle_send_ok(void *cls, const struct SendOkMessage *okm) 619handle_send_ok (void *cls, const struct SendOkMessage *okm)
618{ 620{
619 struct GNUNET_TRANSPORT_CoreHandle *h = cls; 621 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
620 struct Neighbour *n; 622 struct Neighbour *n;
621 uint32_t bytes_msg; 623 uint32_t bytes_msg;
622 uint32_t bytes_physical; 624 uint32_t bytes_physical;
623 625
624 bytes_msg = ntohl(okm->bytes_msg); 626 bytes_msg = ntohl (okm->bytes_msg);
625 bytes_physical = ntohl(okm->bytes_physical); 627 bytes_physical = ntohl (okm->bytes_physical);
626 LOG(GNUNET_ERROR_TYPE_DEBUG, 628 LOG (GNUNET_ERROR_TYPE_DEBUG,
627 "Receiving SEND_OK message, transmission to %s %s.\n", 629 "Receiving SEND_OK message, transmission to %s %s.\n",
628 GNUNET_i2s(&okm->peer), 630 GNUNET_i2s (&okm->peer),
629 ntohl(okm->success) == GNUNET_OK ? "succeeded" : "failed"); 631 ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed");
630 n = neighbour_find(h, &okm->peer); 632 n = neighbour_find (h, &okm->peer);
631 if (NULL == n) 633 if (NULL == n)
632 { 634 {
633 /* We should never get a 'SEND_OK' for a peer that we are not 635 /* We should never get a 'SEND_OK' for a peer that we are not
634 connected to */ 636 connected to */
635 GNUNET_break(0); 637 GNUNET_break (0);
636 disconnect_and_schedule_reconnect(h); 638 disconnect_and_schedule_reconnect (h);
637 return; 639 return;
638 } 640 }
639 if (bytes_physical > bytes_msg) 641 if (bytes_physical > bytes_msg)
640 { 642 {
641 LOG(GNUNET_ERROR_TYPE_DEBUG, 643 LOG (GNUNET_ERROR_TYPE_DEBUG,
642 "Overhead for %u byte message was %u\n", 644 "Overhead for %u byte message was %u\n",
643 bytes_msg, 645 bytes_msg,
644 bytes_physical - bytes_msg); 646 bytes_physical - bytes_msg);
645 n->traffic_overhead += bytes_physical - bytes_msg; 647 n->traffic_overhead += bytes_physical - bytes_msg;
646 } 648 }
647} 649}
648 650
649 651
@@ -654,23 +656,23 @@ handle_send_ok(void *cls, const struct SendOkMessage *okm)
654 * @param im message received 656 * @param im message received
655 */ 657 */
656static int 658static int
657check_recv(void *cls, const struct InboundMessage *im) 659check_recv (void *cls, const struct InboundMessage *im)
658{ 660{
659 const struct GNUNET_MessageHeader *imm; 661 const struct GNUNET_MessageHeader *imm;
660 uint16_t size; 662 uint16_t size;
661 663
662 size = ntohs(im->header.size) - sizeof(*im); 664 size = ntohs (im->header.size) - sizeof(*im);
663 if (size < sizeof(struct GNUNET_MessageHeader)) 665 if (size < sizeof(struct GNUNET_MessageHeader))
664 { 666 {
665 GNUNET_break(0); 667 GNUNET_break (0);
666 return GNUNET_SYSERR; 668 return GNUNET_SYSERR;
667 } 669 }
668 imm = (const struct GNUNET_MessageHeader *)&im[1]; 670 imm = (const struct GNUNET_MessageHeader *) &im[1];
669 if (ntohs(imm->size) != size) 671 if (ntohs (imm->size) != size)
670 { 672 {
671 GNUNET_break(0); 673 GNUNET_break (0);
672 return GNUNET_SYSERR; 674 return GNUNET_SYSERR;
673 } 675 }
674 return GNUNET_OK; 676 return GNUNET_OK;
675} 677}
676 678
@@ -682,27 +684,27 @@ check_recv(void *cls, const struct InboundMessage *im)
682 * @param im message received 684 * @param im message received
683 */ 685 */
684static void 686static void
685handle_recv(void *cls, const struct InboundMessage *im) 687handle_recv (void *cls, const struct InboundMessage *im)
686{ 688{
687 struct GNUNET_TRANSPORT_CoreHandle *h = cls; 689 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
688 const struct GNUNET_MessageHeader *imm = 690 const struct GNUNET_MessageHeader *imm =
689 (const struct GNUNET_MessageHeader *)&im[1]; 691 (const struct GNUNET_MessageHeader *) &im[1];
690 struct Neighbour *n; 692 struct Neighbour *n;
691 693
692 LOG(GNUNET_ERROR_TYPE_DEBUG, 694 LOG (GNUNET_ERROR_TYPE_DEBUG,
693 "Received message of type %u with %u bytes from `%s'.\n", 695 "Received message of type %u with %u bytes from `%s'.\n",
694 (unsigned int)ntohs(imm->type), 696 (unsigned int) ntohs (imm->type),
695 (unsigned int)ntohs(imm->size), 697 (unsigned int) ntohs (imm->size),
696 GNUNET_i2s(&im->peer)); 698 GNUNET_i2s (&im->peer));
697 n = neighbour_find(h, &im->peer); 699 n = neighbour_find (h, &im->peer);
698 if (NULL == n) 700 if (NULL == n)
699 { 701 {
700 GNUNET_break(0); 702 GNUNET_break (0);
701 disconnect_and_schedule_reconnect(h); 703 disconnect_and_schedule_reconnect (h);
702 return; 704 return;
703 } 705 }
704 h->rom_pending++; 706 h->rom_pending++;
705 GNUNET_MQ_inject_message(n->mq, imm); 707 GNUNET_MQ_inject_message (n->mq, imm);
706} 708}
707 709
708 710
@@ -713,24 +715,24 @@ handle_recv(void *cls, const struct InboundMessage *im)
713 * @param msg message received 715 * @param msg message received
714 */ 716 */
715static void 717static void
716handle_set_quota(void *cls, const struct QuotaSetMessage *qm) 718handle_set_quota (void *cls, const struct QuotaSetMessage *qm)
717{ 719{
718 struct GNUNET_TRANSPORT_CoreHandle *h = cls; 720 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
719 struct Neighbour *n; 721 struct Neighbour *n;
720 722
721 LOG(GNUNET_ERROR_TYPE_DEBUG, 723 LOG (GNUNET_ERROR_TYPE_DEBUG,
722 "Receiving SET_QUOTA message for `%s' with quota %u\n", 724 "Receiving SET_QUOTA message for `%s' with quota %u\n",
723 GNUNET_i2s(&qm->peer), 725 GNUNET_i2s (&qm->peer),
724 ntohl(qm->quota.value__)); 726 ntohl (qm->quota.value__));
725 n = neighbour_find(h, &qm->peer); 727 n = neighbour_find (h, &qm->peer);
726 if (NULL == n) 728 if (NULL == n)
727 { 729 {
728 GNUNET_break( 730 GNUNET_break (
729 0); /* FIXME: julius reports this assertion fails sometimes? */ 731 0); /* FIXME: julius reports this assertion fails sometimes? */
730 disconnect_and_schedule_reconnect(h); 732 disconnect_and_schedule_reconnect (h);
731 return; 733 return;
732 } 734 }
733 GNUNET_BANDWIDTH_tracker_update_quota(&n->out_tracker, qm->quota); 735 GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, qm->quota);
734} 736}
735 737
736 738
@@ -740,55 +742,55 @@ handle_set_quota(void *cls, const struct QuotaSetMessage *qm)
740 * @param cls the handle to the transport service 742 * @param cls the handle to the transport service
741 */ 743 */
742static void 744static void
743reconnect(void *cls) 745reconnect (void *cls)
744{ 746{
745 struct GNUNET_TRANSPORT_CoreHandle *h = cls; 747 struct GNUNET_TRANSPORT_CoreHandle *h = cls;
746 struct GNUNET_MQ_MessageHandler handlers[] = 748 struct GNUNET_MQ_MessageHandler handlers[] =
747 { GNUNET_MQ_hd_var_size(hello, 749 { GNUNET_MQ_hd_var_size (hello,
748 GNUNET_MESSAGE_TYPE_HELLO, 750 GNUNET_MESSAGE_TYPE_HELLO,
749 struct GNUNET_MessageHeader, 751 struct GNUNET_MessageHeader,
750 h), 752 h),
751 GNUNET_MQ_hd_fixed_size(connect, 753 GNUNET_MQ_hd_fixed_size (connect,
752 GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT, 754 GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT,
753 struct ConnectInfoMessage, 755 struct ConnectInfoMessage,
754 h), 756 h),
755 GNUNET_MQ_hd_fixed_size(disconnect, 757 GNUNET_MQ_hd_fixed_size (disconnect,
756 GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT, 758 GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT,
757 struct DisconnectInfoMessage, 759 struct DisconnectInfoMessage,
758 h), 760 h),
759 GNUNET_MQ_hd_fixed_size(send_ok, 761 GNUNET_MQ_hd_fixed_size (send_ok,
760 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK, 762 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK,
761 struct SendOkMessage, 763 struct SendOkMessage,
762 h), 764 h),
763 GNUNET_MQ_hd_var_size(recv, 765 GNUNET_MQ_hd_var_size (recv,
764 GNUNET_MESSAGE_TYPE_TRANSPORT_RECV, 766 GNUNET_MESSAGE_TYPE_TRANSPORT_RECV,
765 struct InboundMessage, 767 struct InboundMessage,
766 h), 768 h),
767 GNUNET_MQ_hd_fixed_size(set_quota, 769 GNUNET_MQ_hd_fixed_size (set_quota,
768 GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA, 770 GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA,
769 struct QuotaSetMessage, 771 struct QuotaSetMessage,
770 h), 772 h),
771 GNUNET_MQ_handler_end() }; 773 GNUNET_MQ_handler_end () };
772 struct GNUNET_MQ_Envelope *env; 774 struct GNUNET_MQ_Envelope *env;
773 struct StartMessage *s; 775 struct StartMessage *s;
774 uint32_t options; 776 uint32_t options;
775 777
776 h->reconnect_task = NULL; 778 h->reconnect_task = NULL;
777 LOG(GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n"); 779 LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n");
778 GNUNET_assert(NULL == h->mq); 780 GNUNET_assert (NULL == h->mq);
779 h->mq = 781 h->mq =
780 GNUNET_CLIENT_connect(h->cfg, "transport", handlers, &mq_error_handler, h); 782 GNUNET_CLIENT_connect (h->cfg, "transport", handlers, &mq_error_handler, h);
781 if (NULL == h->mq) 783 if (NULL == h->mq)
782 return; 784 return;
783 env = GNUNET_MQ_msg(s, GNUNET_MESSAGE_TYPE_TRANSPORT_START); 785 env = GNUNET_MQ_msg (s, GNUNET_MESSAGE_TYPE_TRANSPORT_START);
784 options = 0; 786 options = 0;
785 if (h->check_self) 787 if (h->check_self)
786 options |= 1; 788 options |= 1;
787 if (NULL != h->handlers) 789 if (NULL != h->handlers)
788 options |= 2; 790 options |= 2;
789 s->options = htonl(options); 791 s->options = htonl (options);
790 s->self = h->self; 792 s->self = h->self;
791 GNUNET_MQ_send(h->mq, env); 793 GNUNET_MQ_send (h->mq, env);
792} 794}
793 795
794 796
@@ -799,22 +801,22 @@ reconnect(void *cls)
799 * @param h transport service to reconnect 801 * @param h transport service to reconnect
800 */ 802 */
801static void 803static void
802disconnect_and_schedule_reconnect(struct GNUNET_TRANSPORT_CoreHandle *h) 804disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_CoreHandle *h)
803{ 805{
804 GNUNET_assert(NULL == h->reconnect_task); 806 GNUNET_assert (NULL == h->reconnect_task);
805 /* Forget about all neighbours that we used to be connected to */ 807 /* Forget about all neighbours that we used to be connected to */
806 GNUNET_CONTAINER_multipeermap_iterate(h->neighbours, &neighbour_delete, h); 808 GNUNET_CONTAINER_multipeermap_iterate (h->neighbours, &neighbour_delete, h);
807 if (NULL != h->mq) 809 if (NULL != h->mq)
808 { 810 {
809 GNUNET_MQ_destroy(h->mq); 811 GNUNET_MQ_destroy (h->mq);
810 h->mq = NULL; 812 h->mq = NULL;
811 } 813 }
812 LOG(GNUNET_ERROR_TYPE_DEBUG, 814 LOG (GNUNET_ERROR_TYPE_DEBUG,
813 "Scheduling task to reconnect to transport service in %s.\n", 815 "Scheduling task to reconnect to transport service in %s.\n",
814 GNUNET_STRINGS_relative_time_to_string(h->reconnect_delay, GNUNET_YES)); 816 GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES));
815 h->reconnect_task = 817 h->reconnect_task =
816 GNUNET_SCHEDULER_add_delayed(h->reconnect_delay, &reconnect, h); 818 GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
817 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF(h->reconnect_delay); 819 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
818} 820}
819 821
820 822
@@ -826,12 +828,12 @@ disconnect_and_schedule_reconnect(struct GNUNET_TRANSPORT_CoreHandle *h)
826 * @return NULL if disconnected, otherwise message queue for @a peer 828 * @return NULL if disconnected, otherwise message queue for @a peer
827 */ 829 */
828struct GNUNET_MQ_Handle * 830struct GNUNET_MQ_Handle *
829GNUNET_TRANSPORT_core_get_mq(struct GNUNET_TRANSPORT_CoreHandle *handle, 831GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle,
830 const struct GNUNET_PeerIdentity *peer) 832 const struct GNUNET_PeerIdentity *peer)
831{ 833{
832 struct Neighbour *n; 834 struct Neighbour *n;
833 835
834 n = neighbour_find(handle, peer); 836 n = neighbour_find (handle, peer);
835 if (NULL == n) 837 if (NULL == n)
836 return NULL; 838 return NULL;
837 return n->mq; 839 return n->mq;
@@ -853,23 +855,23 @@ GNUNET_TRANSPORT_core_get_mq(struct GNUNET_TRANSPORT_CoreHandle *handle,
853 * @return NULL on error 855 * @return NULL on error
854 */ 856 */
855struct GNUNET_TRANSPORT_CoreHandle * 857struct GNUNET_TRANSPORT_CoreHandle *
856GNUNET_TRANSPORT_core_connect(const struct GNUNET_CONFIGURATION_Handle *cfg, 858GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
857 const struct GNUNET_PeerIdentity *self, 859 const struct GNUNET_PeerIdentity *self,
858 const struct GNUNET_MQ_MessageHandler *handlers, 860 const struct GNUNET_MQ_MessageHandler *handlers,
859 void *cls, 861 void *cls,
860 GNUNET_TRANSPORT_NotifyConnect nc, 862 GNUNET_TRANSPORT_NotifyConnect nc,
861 GNUNET_TRANSPORT_NotifyDisconnect nd, 863 GNUNET_TRANSPORT_NotifyDisconnect nd,
862 GNUNET_TRANSPORT_NotifyExcessBandwidth neb) 864 GNUNET_TRANSPORT_NotifyExcessBandwidth neb)
863{ 865{
864 struct GNUNET_TRANSPORT_CoreHandle *h; 866 struct GNUNET_TRANSPORT_CoreHandle *h;
865 unsigned int i; 867 unsigned int i;
866 868
867 h = GNUNET_new(struct GNUNET_TRANSPORT_CoreHandle); 869 h = GNUNET_new (struct GNUNET_TRANSPORT_CoreHandle);
868 if (NULL != self) 870 if (NULL != self)
869 { 871 {
870 h->self = *self; 872 h->self = *self;
871 h->check_self = GNUNET_YES; 873 h->check_self = GNUNET_YES;
872 } 874 }
873 h->cfg = cfg; 875 h->cfg = cfg;
874 h->cls = cls; 876 h->cls = cls;
875 h->nc_cb = nc; 877 h->nc_cb = nc;
@@ -877,24 +879,24 @@ GNUNET_TRANSPORT_core_connect(const struct GNUNET_CONFIGURATION_Handle *cfg,
877 h->neb_cb = neb; 879 h->neb_cb = neb;
878 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; 880 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
879 if (NULL != handlers) 881 if (NULL != handlers)
880 { 882 {
881 for (i = 0; NULL != handlers[i].cb; i++) 883 for (i = 0; NULL != handlers[i].cb; i++)
882 ; 884 ;
883 h->handlers = GNUNET_new_array(i + 1, struct GNUNET_MQ_MessageHandler); 885 h->handlers = GNUNET_new_array (i + 1, struct GNUNET_MQ_MessageHandler);
884 GNUNET_memcpy(h->handlers, 886 GNUNET_memcpy (h->handlers,
885 handlers, 887 handlers,
886 i * sizeof(struct GNUNET_MQ_MessageHandler)); 888 i * sizeof(struct GNUNET_MQ_MessageHandler));
887 } 889 }
888 LOG(GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service\n"); 890 LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service\n");
889 reconnect(h); 891 reconnect (h);
890 if (NULL == h->mq) 892 if (NULL == h->mq)
891 { 893 {
892 GNUNET_free_non_null(h->handlers); 894 GNUNET_free_non_null (h->handlers);
893 GNUNET_free(h); 895 GNUNET_free (h);
894 return NULL; 896 return NULL;
895 } 897 }
896 h->neighbours = 898 h->neighbours =
897 GNUNET_CONTAINER_multipeermap_create(STARTING_NEIGHBOURS_SIZE, GNUNET_YES); 899 GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE, GNUNET_YES);
898 return h; 900 return h;
899} 901}
900 902
@@ -906,23 +908,23 @@ GNUNET_TRANSPORT_core_connect(const struct GNUNET_CONFIGURATION_Handle *cfg,
906 * #GNUNET_TRANSPORT_core_connect() 908 * #GNUNET_TRANSPORT_core_connect()
907 */ 909 */
908void 910void
909GNUNET_TRANSPORT_core_disconnect(struct GNUNET_TRANSPORT_CoreHandle *handle) 911GNUNET_TRANSPORT_core_disconnect (struct GNUNET_TRANSPORT_CoreHandle *handle)
910{ 912{
911 LOG(GNUNET_ERROR_TYPE_DEBUG, "Transport disconnect called!\n"); 913 LOG (GNUNET_ERROR_TYPE_DEBUG, "Transport disconnect called!\n");
912 /* this disconnects all neighbours... */ 914 /* this disconnects all neighbours... */
913 if (NULL == handle->reconnect_task) 915 if (NULL == handle->reconnect_task)
914 disconnect_and_schedule_reconnect(handle); 916 disconnect_and_schedule_reconnect (handle);
915 /* and now we stop trying to connect again... */ 917 /* and now we stop trying to connect again... */
916 if (NULL != handle->reconnect_task) 918 if (NULL != handle->reconnect_task)
917 { 919 {
918 GNUNET_SCHEDULER_cancel(handle->reconnect_task); 920 GNUNET_SCHEDULER_cancel (handle->reconnect_task);
919 handle->reconnect_task = NULL; 921 handle->reconnect_task = NULL;
920 } 922 }
921 GNUNET_CONTAINER_multipeermap_destroy(handle->neighbours); 923 GNUNET_CONTAINER_multipeermap_destroy (handle->neighbours);
922 handle->neighbours = NULL; 924 handle->neighbours = NULL;
923 GNUNET_free_non_null(handle->handlers); 925 GNUNET_free_non_null (handle->handlers);
924 handle->handlers = NULL; 926 handle->handlers = NULL;
925 GNUNET_free(handle); 927 GNUNET_free (handle);
926} 928}
927 929
928 930
@@ -947,18 +949,18 @@ GNUNET_TRANSPORT_core_disconnect(struct GNUNET_TRANSPORT_CoreHandle *handle)
947 * @param pid which peer was the message from that was fully processed by CORE 949 * @param pid which peer was the message from that was fully processed by CORE
948 */ 950 */
949void 951void
950GNUNET_TRANSPORT_core_receive_continue(struct GNUNET_TRANSPORT_CoreHandle *ch, 952GNUNET_TRANSPORT_core_receive_continue (struct GNUNET_TRANSPORT_CoreHandle *ch,
951 const struct GNUNET_PeerIdentity *pid) 953 const struct GNUNET_PeerIdentity *pid)
952{ 954{
953 struct RecvOkMessage *rom; 955 struct RecvOkMessage *rom;
954 struct GNUNET_MQ_Envelope *env; 956 struct GNUNET_MQ_Envelope *env;
955 957
956 GNUNET_assert(ch->rom_pending > 0); 958 GNUNET_assert (ch->rom_pending > 0);
957 ch->rom_pending--; 959 ch->rom_pending--;
958 env = GNUNET_MQ_msg(rom, GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK); 960 env = GNUNET_MQ_msg (rom, GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK);
959 rom->increase_window_delta = htonl(1); 961 rom->increase_window_delta = htonl (1);
960 rom->peer = *pid; 962 rom->peer = *pid;
961 GNUNET_MQ_send(ch->mq, env); 963 GNUNET_MQ_send (ch->mq, env);
962} 964}
963 965
964 966