aboutsummaryrefslogtreecommitdiff
path: root/src/core/gnunet-service-core_sessions.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2014-01-30 19:22:23 +0000
committerChristian Grothoff <christian@grothoff.org>2014-01-30 19:22:23 +0000
commit6daf13eaa64b5b041edce219f30ab8dcfe38cdf5 (patch)
treec56e4517a0ee08cfb013e296b7d7cf33f2ab49d8 /src/core/gnunet-service-core_sessions.c
parent6fd0a7efde08115b568b99b7755861a50f1b6c2e (diff)
downloadgnunet-6daf13eaa64b5b041edce219f30ab8dcfe38cdf5.tar.gz
gnunet-6daf13eaa64b5b041edce219f30ab8dcfe38cdf5.zip
-towards fixing #3295 (core traffic prioritization)
Diffstat (limited to 'src/core/gnunet-service-core_sessions.c')
-rw-r--r--src/core/gnunet-service-core_sessions.c149
1 files changed, 112 insertions, 37 deletions
diff --git a/src/core/gnunet-service-core_sessions.c b/src/core/gnunet-service-core_sessions.c
index eb42d6fbc..080bbf88b 100644
--- a/src/core/gnunet-service-core_sessions.c
+++ b/src/core/gnunet-service-core_sessions.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 (C) 2009-2013 Christian Grothoff (and other contributing authors) 3 (C) 2009-2014 Christian Grothoff (and other contributing authors)
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -74,6 +74,11 @@ struct SessionMessageEntry
74 */ 74 */
75 size_t size; 75 size_t size;
76 76
77 /**
78 * How important is this message.
79 */
80 enum GNUNET_CORE_Priority priority;
81
77}; 82};
78 83
79 84
@@ -145,7 +150,7 @@ struct Session
145 150
146 151
147/** 152/**
148 * Map of peer identities to 'struct Session'. 153 * Map of peer identities to `struct Session`.
149 */ 154 */
150static struct GNUNET_CONTAINER_MultiPeerMap *sessions; 155static struct GNUNET_CONTAINER_MultiPeerMap *sessions;
151 156
@@ -180,7 +185,8 @@ GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid)
180 session = find_session (pid); 185 session = find_session (pid);
181 if (NULL == session) 186 if (NULL == session)
182 return; 187 return;
183 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying session for peer `%4s'\n", 188 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
189 "Destroying session for peer `%4s'\n",
184 GNUNET_i2s (&session->peer)); 190 GNUNET_i2s (&session->peer));
185 if (GNUNET_SCHEDULER_NO_TASK != session->cork_task) 191 if (GNUNET_SCHEDULER_NO_TASK != session->cork_task)
186 { 192 {
@@ -218,11 +224,12 @@ GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid)
218 * Transmit our current typemap message to the other peer. 224 * Transmit our current typemap message to the other peer.
219 * (Done periodically in case an update got lost). 225 * (Done periodically in case an update got lost).
220 * 226 *
221 * @param cls the 'struct Session*' 227 * @param cls the `struct Session *`
222 * @param tc unused 228 * @param tc unused
223 */ 229 */
224static void 230static void
225transmit_typemap_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 231transmit_typemap_task (void *cls,
232 const struct GNUNET_SCHEDULER_TaskContext *tc)
226{ 233{
227 struct Session *session = cls; 234 struct Session *session = cls;
228 struct GNUNET_MessageHeader *hdr; 235 struct GNUNET_MessageHeader *hdr;
@@ -263,7 +270,8 @@ GSC_SESSIONS_create (const struct GNUNET_PeerIdentity *peer,
263{ 270{
264 struct Session *session; 271 struct Session *session;
265 272
266 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Creating session for peer `%4s'\n", 273 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
274 "Creating session for peer `%4s'\n",
267 GNUNET_i2s (peer)); 275 GNUNET_i2s (peer));
268 session = GNUNET_new (struct Session); 276 session = GNUNET_new (struct Session);
269 session->tmap = GSC_TYPEMAP_create (); 277 session->tmap = GSC_TYPEMAP_create ();
@@ -286,13 +294,14 @@ GSC_SESSIONS_create (const struct GNUNET_PeerIdentity *peer,
286/** 294/**
287 * Notify the given client about the session (client is new). 295 * Notify the given client about the session (client is new).
288 * 296 *
289 * @param cls the 'struct GSC_Client' 297 * @param cls the `struct GSC_Client`
290 * @param key peer identity 298 * @param key peer identity
291 * @param value the 'struct Session' 299 * @param value the `struct Session`
292 * @return GNUNET_OK (continue to iterate) 300 * @return #GNUNET_OK (continue to iterate)
293 */ 301 */
294static int 302static int
295notify_client_about_session (void *cls, const struct GNUNET_PeerIdentity * key, 303notify_client_about_session (void *cls,
304 const struct GNUNET_PeerIdentity *key,
296 void *value) 305 void *value)
297{ 306{
298 struct GSC_Client *client = cls; 307 struct GSC_Client *client = cls;
@@ -334,8 +343,8 @@ try_transmission (struct Session *session);
334 * 343 *
335 * @param car request to queue; this handle is then shared between 344 * @param car request to queue; this handle is then shared between
336 * the caller (CLIENTS subsystem) and SESSIONS and must not 345 * the caller (CLIENTS subsystem) and SESSIONS and must not
337 * be released by either until either 'GNUNET_SESSIONS_dequeue', 346 * be released by either until either #GSC_SESSIONS_dequeue(),
338 * 'GNUNET_SESSIONS_transmit' or 'GNUNET_CLIENTS_failed' 347 * #GSC_SESSIONS_transmit() or #GSC_CLIENTS_failed()
339 * have been invoked on it 348 * have been invoked on it
340 */ 349 */
341void 350void
@@ -344,7 +353,7 @@ GSC_SESSIONS_queue_request (struct GSC_ClientActiveRequest *car)
344 struct Session *session; 353 struct Session *session;
345 354
346 session = find_session (&car->target); 355 session = find_session (&car->target);
347 if (session == NULL) 356 if (NULL == session)
348 { 357 {
349 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 358 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
350 "Dropped client request for transmission (am disconnected)\n"); 359 "Dropped client request for transmission (am disconnected)\n");
@@ -423,27 +432,40 @@ discard_expired_requests (struct Session *session)
423 432
424 433
425/** 434/**
426 * Solicit messages for transmission. 435 * Solicit messages for transmission, starting with those of the highest
436 * priority.
427 * 437 *
428 * @param session session to solict messages for 438 * @param session session to solict messages for
439 * @param msize how many bytes do we have already
429 */ 440 */
430static void 441static void
431solicit_messages (struct Session *session) 442solicit_messages (struct Session *session,
443 size_t msize)
432{ 444{
433 struct GSC_ClientActiveRequest *car; 445 struct GSC_ClientActiveRequest *car;
434 struct GSC_ClientActiveRequest *nxt; 446 struct GSC_ClientActiveRequest *nxt;
435 size_t so_size; 447 size_t so_size;
448 enum GNUNET_CORE_Priority pmax;
436 449
437 discard_expired_requests (session); 450 discard_expired_requests (session);
438 so_size = 0; 451 so_size = msize;
452 pmax = GNUNET_CORE_PRIO_BACKGROUND;
453 for (car = session->active_client_request_head; NULL != car; car = car->next)
454 {
455 if (GNUNET_YES == car->was_solicited)
456 continue;
457 pmax = GNUNET_MAX (pmax, car->priority);
458 }
439 nxt = session->active_client_request_head; 459 nxt = session->active_client_request_head;
440 while (NULL != (car = nxt)) 460 while (NULL != (car = nxt))
441 { 461 {
442 nxt = car->next; 462 nxt = car->next;
463 if (car->priority < pmax)
464 continue;
443 if (so_size + car->msize > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE) 465 if (so_size + car->msize > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
444 break; 466 break;
445 so_size += car->msize; 467 so_size += car->msize;
446 if (car->was_solicited == GNUNET_YES) 468 if (GNUNET_YES == car->was_solicited)
447 continue; 469 continue;
448 car->was_solicited = GNUNET_YES; 470 car->was_solicited = GNUNET_YES;
449 GSC_CLIENTS_solicit_request (car); 471 GSC_CLIENTS_solicit_request (car);
@@ -455,11 +477,12 @@ solicit_messages (struct Session *session)
455 * Some messages were delayed (corked), but the timeout has now expired. 477 * Some messages were delayed (corked), but the timeout has now expired.
456 * Send them now. 478 * Send them now.
457 * 479 *
458 * @param cls 'struct Session' with the messages to transmit now 480 * @param cls `struct Session` with the messages to transmit now
459 * @param tc scheduler context (unused) 481 * @param tc scheduler context (unused)
460 */ 482 */
461static void 483static void
462pop_cork_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 484pop_cork_task (void *cls,
485 const struct GNUNET_SCHEDULER_TaskContext *tc)
463{ 486{
464 struct Session *session = cls; 487 struct Session *session = cls;
465 488
@@ -470,7 +493,8 @@ pop_cork_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
470 493
471/** 494/**
472 * Try to perform a transmission on the given session. Will solicit 495 * Try to perform a transmission on the given session. Will solicit
473 * additional messages if the 'sme' queue is not full enough. 496 * additional messages if the 'sme' queue is not full enough or has
497 * only low-priority messages.
474 * 498 *
475 * @param session session to transmit messages from 499 * @param session session to transmit messages from
476 */ 500 */
@@ -481,33 +505,58 @@ try_transmission (struct Session *session)
481 size_t msize; 505 size_t msize;
482 struct GNUNET_TIME_Absolute now; 506 struct GNUNET_TIME_Absolute now;
483 struct GNUNET_TIME_Absolute min_deadline; 507 struct GNUNET_TIME_Absolute min_deadline;
508 enum GNUNET_CORE_Priority maxp;
509 enum GNUNET_CORE_Priority maxpc;
510 struct GSC_ClientActiveRequest *car;
484 511
485 if (GNUNET_YES != session->ready_to_transmit) 512 if (GNUNET_YES != session->ready_to_transmit)
486 return; 513 return;
487 msize = 0; 514 msize = 0;
488 min_deadline = GNUNET_TIME_UNIT_FOREVER_ABS; 515 min_deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
489 /* check 'ready' messages */ 516 /* check 'ready' messages */
517 maxp = GNUNET_CORE_PRIO_BACKGROUND;
490 pos = session->sme_head; 518 pos = session->sme_head;
491 while ((NULL != pos) && 519 while ((NULL != pos) &&
492 (msize + pos->size <= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)) 520 (msize + pos->size <= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE))
493 { 521 {
494 GNUNET_assert (pos->size < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE); 522 GNUNET_assert (pos->size < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE);
495 msize += pos->size; 523 msize += pos->size;
524 maxp = GNUNET_MAX (maxp, pos->priority);
496 min_deadline = GNUNET_TIME_absolute_min (min_deadline, pos->deadline); 525 min_deadline = GNUNET_TIME_absolute_min (min_deadline, pos->deadline);
497 pos = pos->next; 526 pos = pos->next;
498 } 527 }
528 if (maxp < GNUNET_CORE_PRIO_CRITICAL_CONTROL)
529 {
530 maxpc = GNUNET_CORE_PRIO_BACKGROUND;
531 for (car = session->active_client_request_head; NULL != car; car = car->next)
532 {
533 if (GNUNET_YES == car->was_solicited)
534 continue;
535 maxpc = GNUNET_MAX (maxpc, car->priority);
536 }
537 if (maxpc > maxp)
538 {
539 /* we have messages waiting for solicitation that have a higher
540 priority than those that we already accepted; solicit the
541 high-priority messages first */
542 solicit_messages (session, 0);
543 return;
544 }
545 }
546
499 now = GNUNET_TIME_absolute_get (); 547 now = GNUNET_TIME_absolute_get ();
500 if ((msize == 0) || 548 if ((0 == msize) ||
501 ((msize < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE / 2) && 549 ((msize < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE / 2) &&
502 (min_deadline.abs_value_us > now.abs_value_us))) 550 (min_deadline.abs_value_us > now.abs_value_us)))
503 { 551 {
504 /* not enough ready yet, try to solicit more */ 552 /* not enough ready yet, try to solicit more */
505 solicit_messages (session); 553 solicit_messages (session,
554 msize);
506 if (msize > 0) 555 if (msize > 0)
507 { 556 {
508 /* if there is data to send, just not yet, make sure we do transmit 557 /* if there is data to send, just not yet, make sure we do transmit
509 * it once the deadline is reached */ 558 * it once the deadline is reached */
510 if (session->cork_task != GNUNET_SCHEDULER_NO_TASK) 559 if (GNUNET_SCHEDULER_NO_TASK != session->cork_task)
511 GNUNET_SCHEDULER_cancel (session->cork_task); 560 GNUNET_SCHEDULER_cancel (session->cork_task);
512 session->cork_task = 561 session->cork_task =
513 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining 562 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
@@ -540,7 +589,8 @@ try_transmission (struct Session *session)
540 total_msgs = 1; 589 total_msgs = 1;
541 total_bytes = used; 590 total_bytes = used;
542 } 591 }
543 GNUNET_STATISTICS_set (GSC_stats, "# avg payload per encrypted message", 592 GNUNET_STATISTICS_set (GSC_stats,
593 "# avg payload per encrypted message",
544 total_bytes / total_msgs, GNUNET_NO); 594 total_bytes / total_msgs, GNUNET_NO);
545 /* now actually transmit... */ 595 /* now actually transmit... */
546 session->ready_to_transmit = GNUNET_NO; 596 session->ready_to_transmit = GNUNET_NO;
@@ -554,11 +604,13 @@ try_transmission (struct Session *session)
554 * 604 *
555 * @param cls the message 605 * @param cls the message
556 * @param key neighbour's identity 606 * @param key neighbour's identity
557 * @param value 'struct Neighbour' of the target 607 * @param value `struct Neighbour` of the target
558 * @return always GNUNET_OK 608 * @return always #GNUNET_OK
559 */ 609 */
560static int 610static int
561do_send_message (void *cls, const struct GNUNET_PeerIdentity * key, void *value) 611do_send_message (void *cls,
612 const struct GNUNET_PeerIdentity *key,
613 void *value)
562{ 614{
563 const struct GNUNET_MessageHeader *hdr = cls; 615 const struct GNUNET_MessageHeader *hdr = cls;
564 struct Session *session = value; 616 struct Session *session = value;
@@ -569,7 +621,8 @@ do_send_message (void *cls, const struct GNUNET_PeerIdentity * key, void *value)
569 m = GNUNET_malloc (sizeof (struct SessionMessageEntry) + size); 621 m = GNUNET_malloc (sizeof (struct SessionMessageEntry) + size);
570 memcpy (&m[1], hdr, size); 622 memcpy (&m[1], hdr, size);
571 m->size = size; 623 m->size = size;
572 GNUNET_CONTAINER_DLL_insert (session->sme_head, session->sme_tail, m); 624 m->priority = GNUNET_CORE_PRIO_CRITICAL_CONTROL;
625 GNUNET_CONTAINER_DLL_insert_tail (session->sme_head, session->sme_tail, m);
573 try_transmission (session); 626 try_transmission (session);
574 return GNUNET_OK; 627 return GNUNET_OK;
575} 628}
@@ -585,7 +638,8 @@ GSC_SESSIONS_broadcast (const struct GNUNET_MessageHeader *msg)
585{ 638{
586 if (NULL == sessions) 639 if (NULL == sessions)
587 return; 640 return;
588 GNUNET_CONTAINER_multipeermap_iterate (sessions, &do_send_message, 641 GNUNET_CONTAINER_multipeermap_iterate (sessions,
642 &do_send_message,
589 (void *) msg); 643 (void *) msg);
590} 644}
591 645
@@ -617,13 +671,17 @@ GSC_SESSIONS_solicit (const struct GNUNET_PeerIdentity *pid)
617 * this handle will now be 'owned' by the SESSIONS subsystem 671 * this handle will now be 'owned' by the SESSIONS subsystem
618 * @param msg message to transmit 672 * @param msg message to transmit
619 * @param cork is corking allowed? 673 * @param cork is corking allowed?
674 * @param priority how important is this message
620 */ 675 */
621void 676void
622GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car, 677GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car,
623 const struct GNUNET_MessageHeader *msg, int cork) 678 const struct GNUNET_MessageHeader *msg,
679 int cork,
680 enum GNUNET_CORE_Priority priority)
624{ 681{
625 struct Session *session; 682 struct Session *session;
626 struct SessionMessageEntry *sme; 683 struct SessionMessageEntry *sme;
684 struct SessionMessageEntry *pos;
627 size_t msize; 685 size_t msize;
628 686
629 session = find_session (&car->target); 687 session = find_session (&car->target);
@@ -633,10 +691,23 @@ GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car,
633 sme = GNUNET_malloc (sizeof (struct SessionMessageEntry) + msize); 691 sme = GNUNET_malloc (sizeof (struct SessionMessageEntry) + msize);
634 memcpy (&sme[1], msg, msize); 692 memcpy (&sme[1], msg, msize);
635 sme->size = msize; 693 sme->size = msize;
694 sme->priority = priority;
636 if (GNUNET_YES == cork) 695 if (GNUNET_YES == cork)
637 sme->deadline = 696 sme->deadline =
638 GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_MAX_CORK_DELAY); 697 GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_MAX_CORK_DELAY);
639 GNUNET_CONTAINER_DLL_insert_tail (session->sme_head, session->sme_tail, sme); 698 pos = session->sme_head;
699 while ( (NULL != pos) &&
700 (pos->priority > sme->priority) )
701 pos = pos->next;
702 if (NULL == pos)
703 GNUNET_CONTAINER_DLL_insert_tail (session->sme_head,
704 session->sme_tail,
705 sme);
706 else
707 GNUNET_CONTAINER_DLL_insert_after (session->sme_head,
708 session->sme_tail,
709 pos->prev,
710 sme);
640 try_transmission (session); 711 try_transmission (session);
641} 712}
642 713
@@ -644,14 +715,16 @@ GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car,
644/** 715/**
645 * Helper function for GSC_SESSIONS_handle_client_iterate_peers. 716 * Helper function for GSC_SESSIONS_handle_client_iterate_peers.
646 * 717 *
647 * @param cls the 'struct GNUNET_SERVER_TransmitContext' to queue replies 718 * @param cls the `struct GNUNET_SERVER_TransmitContext` to queue replies
648 * @param key identity of the connected peer 719 * @param key identity of the connected peer
649 * @param value the 'struct Neighbour' for the peer 720 * @param value the `struct Neighbour` for the peer
650 * @return GNUNET_OK (continue to iterate) 721 * @return GNUNET_OK (continue to iterate)
651 */ 722 */
652#include "core.h" 723#include "core.h"
653static int 724static int
654queue_connect_message (void *cls, const struct GNUNET_PeerIdentity * key, void *value) 725queue_connect_message (void *cls,
726 const struct GNUNET_PeerIdentity *key,
727 void *value)
655{ 728{
656 struct GNUNET_SERVER_TransmitContext *tc = cls; 729 struct GNUNET_SERVER_TransmitContext *tc = cls;
657 struct Session *session = value; 730 struct Session *session = value;
@@ -768,11 +841,13 @@ GSC_SESSIONS_init ()
768 * 841 *
769 * @param cls NULL 842 * @param cls NULL
770 * @param key identity of the connected peer 843 * @param key identity of the connected peer
771 * @param value the 'struct Session' for the peer 844 * @param value the `struct Session` for the peer
772 * @return GNUNET_OK (continue to iterate) 845 * @return #GNUNET_OK (continue to iterate)
773 */ 846 */
774static int 847static int
775free_session_helper (void *cls, const struct GNUNET_PeerIdentity * key, void *value) 848free_session_helper (void *cls,
849 const struct GNUNET_PeerIdentity *key,
850 void *value)
776{ 851{
777 struct Session *session = value; 852 struct Session *session = value;
778 853