aboutsummaryrefslogtreecommitdiff
path: root/src/transport/gnunet-service-transport_neighbours.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-08-12 15:49:57 +0000
committerChristian Grothoff <christian@grothoff.org>2011-08-12 15:49:57 +0000
commit25d2cf2ca1882482d535c8ad0d879b9846e12ea2 (patch)
treecbc6313736a1d1941559f5b5c922cdd1db8c4386 /src/transport/gnunet-service-transport_neighbours.c
parent63d15aa9d1ea50933ff34fc42916024a5d3196bf (diff)
downloadgnunet-25d2cf2ca1882482d535c8ad0d879b9846e12ea2.tar.gz
gnunet-25d2cf2ca1882482d535c8ad0d879b9846e12ea2.zip
finishing neighbours
Diffstat (limited to 'src/transport/gnunet-service-transport_neighbours.c')
-rw-r--r--src/transport/gnunet-service-transport_neighbours.c228
1 files changed, 172 insertions, 56 deletions
diff --git a/src/transport/gnunet-service-transport_neighbours.c b/src/transport/gnunet-service-transport_neighbours.c
index f30878966..f262c2b87 100644
--- a/src/transport/gnunet-service-transport_neighbours.c
+++ b/src/transport/gnunet-service-transport_neighbours.c
@@ -26,6 +26,7 @@
26#include "platform.h" 26#include "platform.h"
27#include "gnunet_ats_service.h" 27#include "gnunet_ats_service.h"
28#include "gnunet-service-transport_neighbours.h" 28#include "gnunet-service-transport_neighbours.h"
29#include "gnunet-service-transport_plugins.h"
29#include "gnunet-service-transport_validation.h" 30#include "gnunet-service-transport_validation.h"
30#include "gnunet-service-transport.h" 31#include "gnunet-service-transport.h"
31#include "gnunet_peerinfo_service.h" 32#include "gnunet_peerinfo_service.h"
@@ -45,15 +46,12 @@
45#define QUOTA_VIOLATION_DROP_THRESHOLD 10 46#define QUOTA_VIOLATION_DROP_THRESHOLD 10
46 47
47 48
48// TODO: 49/**
49// - have a way to access the currently 'connected' session 50 * Entry in neighbours.
50// (for sending and to notice disconnect of it!) 51 */
51// - have a way to access/update bandwidth/quota information per peer
52// (for CostReport/TrafficReport callbacks)
53
54
55struct NeighbourMapEntry; 52struct NeighbourMapEntry;
56 53
54
57/** 55/**
58 * For each neighbour we keep a list of messages 56 * For each neighbour we keep a list of messages
59 * that we still want to transmit to the neighbour. 57 * that we still want to transmit to the neighbour.
@@ -72,6 +70,12 @@ struct MessageQueue
72 struct MessageQueue *prev; 70 struct MessageQueue *prev;
73 71
74 /** 72 /**
73 * Once this message is actively being transmitted, which
74 * neighbour is it associated with?
75 */
76 struct NeighbourMapEntry *n;
77
78 /**
75 * Function to call once we're done. 79 * Function to call once we're done.
76 */ 80 */
77 GST_NeighbourSendContinuation cont; 81 GST_NeighbourSendContinuation cont;
@@ -130,6 +134,11 @@ struct NeighbourMapEntry
130 struct GNUNET_TRANSPORT_ATS_Information *ats; 134 struct GNUNET_TRANSPORT_ATS_Information *ats;
131 135
132 /** 136 /**
137 * Are we currently trying to send a message? If so, which one?
138 */
139 struct MessageQueue *is_active;
140
141 /**
133 * Active session for communicating with the peer. 142 * Active session for communicating with the peer.
134 */ 143 */
135 struct Session *session; 144 struct Session *session;
@@ -161,18 +170,10 @@ struct NeighbourMapEntry
161 GNUNET_SCHEDULER_TaskIdentifier timeout_task; 170 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
162 171
163 /** 172 /**
164 * ID of task scheduled to run when we should retry transmitting 173 * ID of task scheduled to run when we should try transmitting
165 * the head of the message queue. Actually triggered when the 174 * the head of the message queue.
166 * transmission is timing out (we trigger instantly when we have
167 * a chance of success).
168 */
169 GNUNET_SCHEDULER_TaskIdentifier retry_task;
170
171 /**
172 * How long until we should consider this peer dead (if we don't
173 * receive another message in the meantime)?
174 */ 175 */
175 struct GNUNET_TIME_Absolute peer_timeout; 176 GNUNET_SCHEDULER_TaskIdentifier transmission_task;
176 177
177 /** 178 /**
178 * Tracker for inbound bandwidth. 179 * Tracker for inbound bandwidth.
@@ -192,15 +193,9 @@ struct NeighbourMapEntry
192 unsigned int ats_count; 193 unsigned int ats_count;
193 194
194 /** 195 /**
195 * Have we seen an PONG from this neighbour in the past (and
196 * not had a disconnect since)?
197 */
198 // int received_pong;
199
200 /**
201 * Are we already in the process of disconnecting this neighbour? 196 * Are we already in the process of disconnecting this neighbour?
202 */ 197 */
203 // int in_disconnect; 198 int in_disconnect;
204 199
205 /** 200 /**
206 * Do we currently consider this neighbour connected? (as far as 201 * Do we currently consider this neighbour connected? (as far as
@@ -246,7 +241,49 @@ lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
246} 241}
247 242
248 243
249#if 0 244/**
245 * Task invoked to start a transmission to another peer.
246 *
247 * @param cls the 'struct NeighbourMapEntry'
248 * @param tc scheduler context
249 */
250static void
251transmission_task (void *cls,
252 const struct GNUNET_SCHEDULER_TaskContext *tc);
253
254
255/**
256 * We're done with our transmission attempt, continue processing.
257 *
258 * @param cls the 'struct MessageQueue' of the message
259 * @param receiver intended receiver
260 * @param success whether it worked or not
261 */
262static void
263transmit_send_continuation (void *cls,
264 const struct GNUNET_PeerIdentity *receiver,
265 int success)
266{
267 struct MessageQueue *mq;
268 struct NeighbourMapEntry *n;
269
270 mq = cls;
271 n = mq->n;
272 if (NULL != n)
273 {
274 GNUNET_assert (n->is_active == mq);
275 n->is_active = NULL;
276 GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
277 n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task,
278 n);
279 }
280 if (NULL != mq->cont)
281 mq->cont (mq->cont_cls,
282 success);
283 GNUNET_free (mq);
284}
285
286
250/** 287/**
251 * Check the ready list for the given neighbour and if a plugin is 288 * Check the ready list for the given neighbour and if a plugin is
252 * ready for transmission (and if we have a message), do so! 289 * ready for transmission (and if we have a message), do so!
@@ -259,41 +296,73 @@ try_transmission_to_peer (struct NeighbourMapEntry *n)
259 struct MessageQueue *mq; 296 struct MessageQueue *mq;
260 struct GNUNET_TIME_Relative timeout; 297 struct GNUNET_TIME_Relative timeout;
261 ssize_t ret; 298 ssize_t ret;
299 struct GNUNET_TRANSPORT_PluginFunctions *papi;
262 300
263 if (n->messages_head == NULL) 301 if (n->is_active != NULL)
302 return; /* transmission already pending */
303 if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
304 return; /* currently waiting for bandwidth */
305 mq = n->messages_head;
306 while (NULL != (mq = n->messages_head))
264 { 307 {
265#if DEBUG_TRANSPORT 308 timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
266 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 309 if (timeout.rel_value > 0)
267 "Transmission queue for `%4s' is empty\n", 310 break;
268 GNUNET_i2s (&n->id)); 311 transmit_send_continuation (mq, &n->id, GNUNET_SYSERR); /* timeout */
269#endif 312 }
270 return; /* nothing to do */ 313 if (NULL == mq)
314 return; /* no more messages */
315
316 papi = GST_plugins_find (n->plugin_name);
317 if (papi == NULL)
318 {
319 GNUNET_break (0);
320 return;
271 } 321 }
272 mq = n->messages_head;
273 GNUNET_CONTAINER_DLL_remove (n->messages_head, 322 GNUNET_CONTAINER_DLL_remove (n->messages_head,
274 n->messages_tail, 323 n->messages_tail,
275 mq); 324 mq);
325 n->is_active = mq;
326 mq->n = n;
276 ret = papi->send (papi->cls, 327 ret = papi->send (papi->cls,
277 &n->pid, 328 &n->id,
278 mq->message_buf, 329 mq->message_buf,
279 mq->message_buf_size, 330 mq->message_buf_size,
280 mq->priority, 331 0 /* priority -- remove from plugin API? */,
281 GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, 332 timeout,
282 n->session, 333 n->session,
283 n->addr, 334 n->addr,
284 n->addrlen, 335 n->addrlen,
285 GNUNET_YES /*?*/, 336 GNUNET_YES,
286 &transmit_send_continuation, mq); 337 &transmit_send_continuation, mq);
287 if (ret == -1) 338 if (ret == -1)
288 { 339 {
289 /* failure, but 'send' would not call continuation in this case, 340 /* failure, but 'send' would not call continuation in this case,
290 so we need to do it here! */ 341 so we need to do it here! */
291 transmit_send_continuation (mq, 342 transmit_send_continuation (mq,
292 &mq->neighbour_id, 343 &n->id,
293 GNUNET_SYSERR); 344 GNUNET_SYSERR);
345 n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task,
346 n);
294 } 347 }
295} 348}
296#endif 349
350
351/**
352 * Task invoked to start a transmission to another peer.
353 *
354 * @param cls the 'struct NeighbourMapEntry'
355 * @param tc scheduler context
356 */
357static void
358transmission_task (void *cls,
359 const struct GNUNET_SCHEDULER_TaskContext *tc)
360{
361 struct NeighbourMapEntry *n = cls;
362
363 n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
364 try_transmission_to_peer (n);
365}
297 366
298 367
299/** 368/**
@@ -325,22 +394,41 @@ disconnect_neighbour (struct NeighbourMapEntry *n)
325{ 394{
326 struct MessageQueue *mq; 395 struct MessageQueue *mq;
327 396
328 if (n->is_connected) 397 if (GNUNET_YES == n->in_disconnect)
398 return;
399 n->in_disconnect = GNUNET_YES;
400 while (NULL != (mq = n->messages_head))
329 { 401 {
402 GNUNET_CONTAINER_DLL_remove (n->messages_head,
403 n->messages_tail,
404 mq);
405 mq->cont (mq->cont_cls, GNUNET_SYSERR);
406 GNUNET_free (mq);
407 }
408 if (NULL != n->is_active)
409 {
410 n->is_active->n = NULL;
411 n->is_active = NULL;
412 }
413 if (GNUNET_YES == n->is_connected)
414 {
415 n->is_connected = GNUNET_NO;
330 disconnect_notify_cb (callback_cls, 416 disconnect_notify_cb (callback_cls,
331 &n->id); 417 &n->id);
332 n->is_connected = GNUNET_NO;
333 } 418 }
334 GNUNET_assert (GNUNET_YES == 419 GNUNET_assert (GNUNET_YES ==
335 GNUNET_CONTAINER_multihashmap_remove (neighbours, 420 GNUNET_CONTAINER_multihashmap_remove (neighbours,
336 &n->id.hashPubKey, 421 &n->id.hashPubKey,
337 n)); 422 n));
338 while (NULL != (mq = n->messages_head)) 423 if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
339 { 424 {
340 GNUNET_CONTAINER_DLL_remove (n->messages_head, 425 GNUNET_SCHEDULER_cancel (n->timeout_task);
341 n->messages_tail, 426 n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
342 mq); 427 }
343 GNUNET_free (mq); 428 if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
429 {
430 GNUNET_SCHEDULER_cancel (n->timeout_task);
431 n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
344 } 432 }
345 if (NULL != n->asc) 433 if (NULL != n->asc)
346 { 434 {
@@ -446,6 +534,7 @@ GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer,
446 uint32_t ats_count) 534 uint32_t ats_count)
447{ 535{
448 struct NeighbourMapEntry *n; 536 struct NeighbourMapEntry *n;
537 struct GNUNET_MessageHeader connect_msg;
449 538
450 n = lookup_neighbour (peer); 539 n = lookup_neighbour (peer);
451 if (NULL == n) 540 if (NULL == n)
@@ -466,6 +555,17 @@ GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer,
466 ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)); 555 ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
467 GNUNET_free_non_null (n->plugin_name); 556 GNUNET_free_non_null (n->plugin_name);
468 n->plugin_name = GNUNET_strdup (plugin_name); 557 n->plugin_name = GNUNET_strdup (plugin_name);
558 GNUNET_SCHEDULER_cancel (n->timeout_task);
559 n->timeout_task =
560 GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
561 &neighbour_timeout_task, n);
562 connect_msg.size = htons (sizeof (struct GNUNET_MessageHeader));
563 connect_msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
564 GST_neighbours_send (peer,
565 &connect_msg,
566 sizeof (connect_msg),
567 GNUNET_TIME_UNIT_FOREVER_REL,
568 NULL, NULL);
469} 569}
470 570
471 571
@@ -564,7 +664,7 @@ GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
564 664
565 n = lookup_neighbour (target); 665 n = lookup_neighbour (target);
566 if ( (NULL == n) || 666 if ( (NULL == n) ||
567 (GNUNET_TIME_absolute_get_remaining (n->peer_timeout).rel_value == 0) ) 667 (n->is_connected == GNUNET_YES) )
568 return GNUNET_NO; /* not connected */ 668 return GNUNET_NO; /* not connected */
569 return GNUNET_YES; 669 return GNUNET_YES;
570} 670}
@@ -593,7 +693,7 @@ GST_neighbours_send (const struct GNUNET_PeerIdentity *target,
593 693
594 n = lookup_neighbour (target); 694 n = lookup_neighbour (target);
595 if ( (n == NULL) || 695 if ( (n == NULL) ||
596 (GNUNET_TIME_absolute_get_remaining (n->peer_timeout).rel_value == 0) ) 696 (GNUNET_YES != n->is_connected) )
597 { 697 {
598 GNUNET_STATISTICS_update (GST_stats, 698 GNUNET_STATISTICS_update (GST_stats,
599 gettext_noop ("# SET QUOTA messages ignored (no such peer)"), 699 gettext_noop ("# SET QUOTA messages ignored (no such peer)"),
@@ -620,7 +720,10 @@ GST_neighbours_send (const struct GNUNET_PeerIdentity *target,
620 GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, 720 GNUNET_CONTAINER_DLL_insert_tail (n->messages_head,
621 n->messages_tail, 721 n->messages_tail,
622 mq); 722 mq);
623 // try_transmission_to_peer (n); 723 if ( (GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
724 (NULL == n->is_active) )
725 n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task,
726 n);
624} 727}
625 728
626 729
@@ -667,9 +770,6 @@ GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity *sender
667 n->quota_violation_count--; 770 n->quota_violation_count--;
668 } 771 }
669 } 772 }
670 n->peer_timeout =
671 GNUNET_TIME_relative_to_absolute
672 (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
673 GNUNET_SCHEDULER_cancel (n->timeout_task); 773 GNUNET_SCHEDULER_cancel (n->timeout_task);
674 n->timeout_task = 774 n->timeout_task =
675 GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, 775 GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
@@ -773,8 +873,8 @@ neighbours_iterate (void *cls,
773 struct IteratorContext *ic = cls; 873 struct IteratorContext *ic = cls;
774 struct NeighbourMapEntry *n = value; 874 struct NeighbourMapEntry *n = value;
775 875
776 if (GNUNET_TIME_absolute_get_remaining (n->peer_timeout).rel_value == 0) 876 if (GNUNET_YES != n->is_connected)
777 return GNUNET_OK; /* not connected */ 877 return GNUNET_OK;
778 GNUNET_assert (n->ats_count > 0); 878 GNUNET_assert (n->ats_count > 0);
779 ic->cb (ic->cb_cls, 879 ic->cb (ic->cb_cls,
780 &n->id, 880 &n->id,
@@ -813,9 +913,25 @@ void
813GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target) 913GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
814{ 914{
815 struct NeighbourMapEntry *n; 915 struct NeighbourMapEntry *n;
916 struct GNUNET_TRANSPORT_PluginFunctions *papi;
917 struct GNUNET_MessageHeader disconnect_msg;
816 918
817 n = lookup_neighbour (target); 919 n = lookup_neighbour (target);
818 /* FIXME: send disconnect message to target... */ 920 disconnect_msg.size = htons (sizeof (struct GNUNET_MessageHeader));
921 disconnect_msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
922 papi = GST_plugins_find (n->plugin_name);
923 if (papi != NULL)
924 papi->send (papi->cls,
925 target,
926 (const void*) &disconnect_msg,
927 sizeof (struct GNUNET_MessageHeader),
928 UINT32_MAX /* priority */,
929 GNUNET_TIME_UNIT_FOREVER_REL,
930 n->session,
931 n->addr,
932 n->addrlen,
933 GNUNET_YES,
934 NULL, NULL);
819 disconnect_neighbour (n); 935 disconnect_neighbour (n);
820} 936}
821 937