diff options
author | Christian Grothoff <christian@grothoff.org> | 2011-08-12 15:49:57 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2011-08-12 15:49:57 +0000 |
commit | 25d2cf2ca1882482d535c8ad0d879b9846e12ea2 (patch) | |
tree | cbc6313736a1d1941559f5b5c922cdd1db8c4386 /src/transport | |
parent | 63d15aa9d1ea50933ff34fc42916024a5d3196bf (diff) | |
download | gnunet-25d2cf2ca1882482d535c8ad0d879b9846e12ea2.tar.gz gnunet-25d2cf2ca1882482d535c8ad0d879b9846e12ea2.zip |
finishing neighbours
Diffstat (limited to 'src/transport')
-rw-r--r-- | src/transport/gnunet-service-transport_neighbours.c | 228 | ||||
-rw-r--r-- | src/transport/plugin_transport_tcp.c | 6 |
2 files changed, 175 insertions, 59 deletions
diff --git a/src/transport/gnunet-service-transport_neighbours.c b/src/transport/gnunet-service-transport_neighbours.c index f30878966..f262c2b87 100644 --- a/src/transport/gnunet-service-transport_neighbours.c +++ b/src/transport/gnunet-service-transport_neighbours.c | |||
@@ -26,6 +26,7 @@ | |||
26 | #include "platform.h" | 26 | #include "platform.h" |
27 | #include "gnunet_ats_service.h" | 27 | #include "gnunet_ats_service.h" |
28 | #include "gnunet-service-transport_neighbours.h" | 28 | #include "gnunet-service-transport_neighbours.h" |
29 | #include "gnunet-service-transport_plugins.h" | ||
29 | #include "gnunet-service-transport_validation.h" | 30 | #include "gnunet-service-transport_validation.h" |
30 | #include "gnunet-service-transport.h" | 31 | #include "gnunet-service-transport.h" |
31 | #include "gnunet_peerinfo_service.h" | 32 | #include "gnunet_peerinfo_service.h" |
@@ -45,15 +46,12 @@ | |||
45 | #define QUOTA_VIOLATION_DROP_THRESHOLD 10 | 46 | #define QUOTA_VIOLATION_DROP_THRESHOLD 10 |
46 | 47 | ||
47 | 48 | ||
48 | // TODO: | 49 | /** |
49 | // - have a way to access the currently 'connected' session | 50 | * Entry in neighbours. |
50 | // (for sending and to notice disconnect of it!) | 51 | */ |
51 | // - have a way to access/update bandwidth/quota information per peer | ||
52 | // (for CostReport/TrafficReport callbacks) | ||
53 | |||
54 | |||
55 | struct NeighbourMapEntry; | 52 | struct NeighbourMapEntry; |
56 | 53 | ||
54 | |||
57 | /** | 55 | /** |
58 | * For each neighbour we keep a list of messages | 56 | * For each neighbour we keep a list of messages |
59 | * that we still want to transmit to the neighbour. | 57 | * that we still want to transmit to the neighbour. |
@@ -72,6 +70,12 @@ struct MessageQueue | |||
72 | struct MessageQueue *prev; | 70 | struct MessageQueue *prev; |
73 | 71 | ||
74 | /** | 72 | /** |
73 | * Once this message is actively being transmitted, which | ||
74 | * neighbour is it associated with? | ||
75 | */ | ||
76 | struct NeighbourMapEntry *n; | ||
77 | |||
78 | /** | ||
75 | * Function to call once we're done. | 79 | * Function to call once we're done. |
76 | */ | 80 | */ |
77 | GST_NeighbourSendContinuation cont; | 81 | GST_NeighbourSendContinuation cont; |
@@ -130,6 +134,11 @@ struct NeighbourMapEntry | |||
130 | struct GNUNET_TRANSPORT_ATS_Information *ats; | 134 | struct GNUNET_TRANSPORT_ATS_Information *ats; |
131 | 135 | ||
132 | /** | 136 | /** |
137 | * Are we currently trying to send a message? If so, which one? | ||
138 | */ | ||
139 | struct MessageQueue *is_active; | ||
140 | |||
141 | /** | ||
133 | * Active session for communicating with the peer. | 142 | * Active session for communicating with the peer. |
134 | */ | 143 | */ |
135 | struct Session *session; | 144 | struct Session *session; |
@@ -161,18 +170,10 @@ struct NeighbourMapEntry | |||
161 | GNUNET_SCHEDULER_TaskIdentifier timeout_task; | 170 | GNUNET_SCHEDULER_TaskIdentifier timeout_task; |
162 | 171 | ||
163 | /** | 172 | /** |
164 | * ID of task scheduled to run when we should retry transmitting | 173 | * ID of task scheduled to run when we should try transmitting |
165 | * the head of the message queue. Actually triggered when the | 174 | * the head of the message queue. |
166 | * transmission is timing out (we trigger instantly when we have | ||
167 | * a chance of success). | ||
168 | */ | ||
169 | GNUNET_SCHEDULER_TaskIdentifier retry_task; | ||
170 | |||
171 | /** | ||
172 | * How long until we should consider this peer dead (if we don't | ||
173 | * receive another message in the meantime)? | ||
174 | */ | 175 | */ |
175 | struct GNUNET_TIME_Absolute peer_timeout; | 176 | GNUNET_SCHEDULER_TaskIdentifier transmission_task; |
176 | 177 | ||
177 | /** | 178 | /** |
178 | * Tracker for inbound bandwidth. | 179 | * Tracker for inbound bandwidth. |
@@ -192,15 +193,9 @@ struct NeighbourMapEntry | |||
192 | unsigned int ats_count; | 193 | unsigned int ats_count; |
193 | 194 | ||
194 | /** | 195 | /** |
195 | * Have we seen an PONG from this neighbour in the past (and | ||
196 | * not had a disconnect since)? | ||
197 | */ | ||
198 | // int received_pong; | ||
199 | |||
200 | /** | ||
201 | * Are we already in the process of disconnecting this neighbour? | 196 | * Are we already in the process of disconnecting this neighbour? |
202 | */ | 197 | */ |
203 | // int in_disconnect; | 198 | int in_disconnect; |
204 | 199 | ||
205 | /** | 200 | /** |
206 | * Do we currently consider this neighbour connected? (as far as | 201 | * Do we currently consider this neighbour connected? (as far as |
@@ -246,7 +241,49 @@ lookup_neighbour (const struct GNUNET_PeerIdentity *pid) | |||
246 | } | 241 | } |
247 | 242 | ||
248 | 243 | ||
249 | #if 0 | 244 | /** |
245 | * Task invoked to start a transmission to another peer. | ||
246 | * | ||
247 | * @param cls the 'struct NeighbourMapEntry' | ||
248 | * @param tc scheduler context | ||
249 | */ | ||
250 | static void | ||
251 | transmission_task (void *cls, | ||
252 | const struct GNUNET_SCHEDULER_TaskContext *tc); | ||
253 | |||
254 | |||
255 | /** | ||
256 | * We're done with our transmission attempt, continue processing. | ||
257 | * | ||
258 | * @param cls the 'struct MessageQueue' of the message | ||
259 | * @param receiver intended receiver | ||
260 | * @param success whether it worked or not | ||
261 | */ | ||
262 | static void | ||
263 | transmit_send_continuation (void *cls, | ||
264 | const struct GNUNET_PeerIdentity *receiver, | ||
265 | int success) | ||
266 | { | ||
267 | struct MessageQueue *mq; | ||
268 | struct NeighbourMapEntry *n; | ||
269 | |||
270 | mq = cls; | ||
271 | n = mq->n; | ||
272 | if (NULL != n) | ||
273 | { | ||
274 | GNUNET_assert (n->is_active == mq); | ||
275 | n->is_active = NULL; | ||
276 | GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK); | ||
277 | n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, | ||
278 | n); | ||
279 | } | ||
280 | if (NULL != mq->cont) | ||
281 | mq->cont (mq->cont_cls, | ||
282 | success); | ||
283 | GNUNET_free (mq); | ||
284 | } | ||
285 | |||
286 | |||
250 | /** | 287 | /** |
251 | * Check the ready list for the given neighbour and if a plugin is | 288 | * Check the ready list for the given neighbour and if a plugin is |
252 | * ready for transmission (and if we have a message), do so! | 289 | * ready for transmission (and if we have a message), do so! |
@@ -259,41 +296,73 @@ try_transmission_to_peer (struct NeighbourMapEntry *n) | |||
259 | struct MessageQueue *mq; | 296 | struct MessageQueue *mq; |
260 | struct GNUNET_TIME_Relative timeout; | 297 | struct GNUNET_TIME_Relative timeout; |
261 | ssize_t ret; | 298 | ssize_t ret; |
299 | struct GNUNET_TRANSPORT_PluginFunctions *papi; | ||
262 | 300 | ||
263 | if (n->messages_head == NULL) | 301 | if (n->is_active != NULL) |
302 | return; /* transmission already pending */ | ||
303 | if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK) | ||
304 | return; /* currently waiting for bandwidth */ | ||
305 | mq = n->messages_head; | ||
306 | while (NULL != (mq = n->messages_head)) | ||
264 | { | 307 | { |
265 | #if DEBUG_TRANSPORT | 308 | timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout); |
266 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 309 | if (timeout.rel_value > 0) |
267 | "Transmission queue for `%4s' is empty\n", | 310 | break; |
268 | GNUNET_i2s (&n->id)); | 311 | transmit_send_continuation (mq, &n->id, GNUNET_SYSERR); /* timeout */ |
269 | #endif | 312 | } |
270 | return; /* nothing to do */ | 313 | if (NULL == mq) |
314 | return; /* no more messages */ | ||
315 | |||
316 | papi = GST_plugins_find (n->plugin_name); | ||
317 | if (papi == NULL) | ||
318 | { | ||
319 | GNUNET_break (0); | ||
320 | return; | ||
271 | } | 321 | } |
272 | mq = n->messages_head; | ||
273 | GNUNET_CONTAINER_DLL_remove (n->messages_head, | 322 | GNUNET_CONTAINER_DLL_remove (n->messages_head, |
274 | n->messages_tail, | 323 | n->messages_tail, |
275 | mq); | 324 | mq); |
325 | n->is_active = mq; | ||
326 | mq->n = n; | ||
276 | ret = papi->send (papi->cls, | 327 | ret = papi->send (papi->cls, |
277 | &n->pid, | 328 | &n->id, |
278 | mq->message_buf, | 329 | mq->message_buf, |
279 | mq->message_buf_size, | 330 | mq->message_buf_size, |
280 | mq->priority, | 331 | 0 /* priority -- remove from plugin API? */, |
281 | GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, | 332 | timeout, |
282 | n->session, | 333 | n->session, |
283 | n->addr, | 334 | n->addr, |
284 | n->addrlen, | 335 | n->addrlen, |
285 | GNUNET_YES /*?*/, | 336 | GNUNET_YES, |
286 | &transmit_send_continuation, mq); | 337 | &transmit_send_continuation, mq); |
287 | if (ret == -1) | 338 | if (ret == -1) |
288 | { | 339 | { |
289 | /* failure, but 'send' would not call continuation in this case, | 340 | /* failure, but 'send' would not call continuation in this case, |
290 | so we need to do it here! */ | 341 | so we need to do it here! */ |
291 | transmit_send_continuation (mq, | 342 | transmit_send_continuation (mq, |
292 | &mq->neighbour_id, | 343 | &n->id, |
293 | GNUNET_SYSERR); | 344 | GNUNET_SYSERR); |
345 | n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, | ||
346 | n); | ||
294 | } | 347 | } |
295 | } | 348 | } |
296 | #endif | 349 | |
350 | |||
351 | /** | ||
352 | * Task invoked to start a transmission to another peer. | ||
353 | * | ||
354 | * @param cls the 'struct NeighbourMapEntry' | ||
355 | * @param tc scheduler context | ||
356 | */ | ||
357 | static void | ||
358 | transmission_task (void *cls, | ||
359 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
360 | { | ||
361 | struct NeighbourMapEntry *n = cls; | ||
362 | |||
363 | n->transmission_task = GNUNET_SCHEDULER_NO_TASK; | ||
364 | try_transmission_to_peer (n); | ||
365 | } | ||
297 | 366 | ||
298 | 367 | ||
299 | /** | 368 | /** |
@@ -325,22 +394,41 @@ disconnect_neighbour (struct NeighbourMapEntry *n) | |||
325 | { | 394 | { |
326 | struct MessageQueue *mq; | 395 | struct MessageQueue *mq; |
327 | 396 | ||
328 | if (n->is_connected) | 397 | if (GNUNET_YES == n->in_disconnect) |
398 | return; | ||
399 | n->in_disconnect = GNUNET_YES; | ||
400 | while (NULL != (mq = n->messages_head)) | ||
329 | { | 401 | { |
402 | GNUNET_CONTAINER_DLL_remove (n->messages_head, | ||
403 | n->messages_tail, | ||
404 | mq); | ||
405 | mq->cont (mq->cont_cls, GNUNET_SYSERR); | ||
406 | GNUNET_free (mq); | ||
407 | } | ||
408 | if (NULL != n->is_active) | ||
409 | { | ||
410 | n->is_active->n = NULL; | ||
411 | n->is_active = NULL; | ||
412 | } | ||
413 | if (GNUNET_YES == n->is_connected) | ||
414 | { | ||
415 | n->is_connected = GNUNET_NO; | ||
330 | disconnect_notify_cb (callback_cls, | 416 | disconnect_notify_cb (callback_cls, |
331 | &n->id); | 417 | &n->id); |
332 | n->is_connected = GNUNET_NO; | ||
333 | } | 418 | } |
334 | GNUNET_assert (GNUNET_YES == | 419 | GNUNET_assert (GNUNET_YES == |
335 | GNUNET_CONTAINER_multihashmap_remove (neighbours, | 420 | GNUNET_CONTAINER_multihashmap_remove (neighbours, |
336 | &n->id.hashPubKey, | 421 | &n->id.hashPubKey, |
337 | n)); | 422 | n)); |
338 | while (NULL != (mq = n->messages_head)) | 423 | if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task) |
339 | { | 424 | { |
340 | GNUNET_CONTAINER_DLL_remove (n->messages_head, | 425 | GNUNET_SCHEDULER_cancel (n->timeout_task); |
341 | n->messages_tail, | 426 | n->timeout_task = GNUNET_SCHEDULER_NO_TASK; |
342 | mq); | 427 | } |
343 | GNUNET_free (mq); | 428 | if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task) |
429 | { | ||
430 | GNUNET_SCHEDULER_cancel (n->timeout_task); | ||
431 | n->transmission_task = GNUNET_SCHEDULER_NO_TASK; | ||
344 | } | 432 | } |
345 | if (NULL != n->asc) | 433 | if (NULL != n->asc) |
346 | { | 434 | { |
@@ -446,6 +534,7 @@ GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer, | |||
446 | uint32_t ats_count) | 534 | uint32_t ats_count) |
447 | { | 535 | { |
448 | struct NeighbourMapEntry *n; | 536 | struct NeighbourMapEntry *n; |
537 | struct GNUNET_MessageHeader connect_msg; | ||
449 | 538 | ||
450 | n = lookup_neighbour (peer); | 539 | n = lookup_neighbour (peer); |
451 | if (NULL == n) | 540 | if (NULL == n) |
@@ -466,6 +555,17 @@ GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer, | |||
466 | ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)); | 555 | ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)); |
467 | GNUNET_free_non_null (n->plugin_name); | 556 | GNUNET_free_non_null (n->plugin_name); |
468 | n->plugin_name = GNUNET_strdup (plugin_name); | 557 | n->plugin_name = GNUNET_strdup (plugin_name); |
558 | GNUNET_SCHEDULER_cancel (n->timeout_task); | ||
559 | n->timeout_task = | ||
560 | GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, | ||
561 | &neighbour_timeout_task, n); | ||
562 | connect_msg.size = htons (sizeof (struct GNUNET_MessageHeader)); | ||
563 | connect_msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); | ||
564 | GST_neighbours_send (peer, | ||
565 | &connect_msg, | ||
566 | sizeof (connect_msg), | ||
567 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
568 | NULL, NULL); | ||
469 | } | 569 | } |
470 | 570 | ||
471 | 571 | ||
@@ -564,7 +664,7 @@ GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target) | |||
564 | 664 | ||
565 | n = lookup_neighbour (target); | 665 | n = lookup_neighbour (target); |
566 | if ( (NULL == n) || | 666 | if ( (NULL == n) || |
567 | (GNUNET_TIME_absolute_get_remaining (n->peer_timeout).rel_value == 0) ) | 667 | (n->is_connected == GNUNET_YES) ) |
568 | return GNUNET_NO; /* not connected */ | 668 | return GNUNET_NO; /* not connected */ |
569 | return GNUNET_YES; | 669 | return GNUNET_YES; |
570 | } | 670 | } |
@@ -593,7 +693,7 @@ GST_neighbours_send (const struct GNUNET_PeerIdentity *target, | |||
593 | 693 | ||
594 | n = lookup_neighbour (target); | 694 | n = lookup_neighbour (target); |
595 | if ( (n == NULL) || | 695 | if ( (n == NULL) || |
596 | (GNUNET_TIME_absolute_get_remaining (n->peer_timeout).rel_value == 0) ) | 696 | (GNUNET_YES != n->is_connected) ) |
597 | { | 697 | { |
598 | GNUNET_STATISTICS_update (GST_stats, | 698 | GNUNET_STATISTICS_update (GST_stats, |
599 | gettext_noop ("# SET QUOTA messages ignored (no such peer)"), | 699 | gettext_noop ("# SET QUOTA messages ignored (no such peer)"), |
@@ -620,7 +720,10 @@ GST_neighbours_send (const struct GNUNET_PeerIdentity *target, | |||
620 | GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, | 720 | GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, |
621 | n->messages_tail, | 721 | n->messages_tail, |
622 | mq); | 722 | mq); |
623 | // try_transmission_to_peer (n); | 723 | if ( (GNUNET_SCHEDULER_NO_TASK == n->transmission_task) && |
724 | (NULL == n->is_active) ) | ||
725 | n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, | ||
726 | n); | ||
624 | } | 727 | } |
625 | 728 | ||
626 | 729 | ||
@@ -667,9 +770,6 @@ GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity *sender | |||
667 | n->quota_violation_count--; | 770 | n->quota_violation_count--; |
668 | } | 771 | } |
669 | } | 772 | } |
670 | n->peer_timeout = | ||
671 | GNUNET_TIME_relative_to_absolute | ||
672 | (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); | ||
673 | GNUNET_SCHEDULER_cancel (n->timeout_task); | 773 | GNUNET_SCHEDULER_cancel (n->timeout_task); |
674 | n->timeout_task = | 774 | n->timeout_task = |
675 | GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, | 775 | GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, |
@@ -773,8 +873,8 @@ neighbours_iterate (void *cls, | |||
773 | struct IteratorContext *ic = cls; | 873 | struct IteratorContext *ic = cls; |
774 | struct NeighbourMapEntry *n = value; | 874 | struct NeighbourMapEntry *n = value; |
775 | 875 | ||
776 | if (GNUNET_TIME_absolute_get_remaining (n->peer_timeout).rel_value == 0) | 876 | if (GNUNET_YES != n->is_connected) |
777 | return GNUNET_OK; /* not connected */ | 877 | return GNUNET_OK; |
778 | GNUNET_assert (n->ats_count > 0); | 878 | GNUNET_assert (n->ats_count > 0); |
779 | ic->cb (ic->cb_cls, | 879 | ic->cb (ic->cb_cls, |
780 | &n->id, | 880 | &n->id, |
@@ -813,9 +913,25 @@ void | |||
813 | GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target) | 913 | GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target) |
814 | { | 914 | { |
815 | struct NeighbourMapEntry *n; | 915 | struct NeighbourMapEntry *n; |
916 | struct GNUNET_TRANSPORT_PluginFunctions *papi; | ||
917 | struct GNUNET_MessageHeader disconnect_msg; | ||
816 | 918 | ||
817 | n = lookup_neighbour (target); | 919 | n = lookup_neighbour (target); |
818 | /* FIXME: send disconnect message to target... */ | 920 | disconnect_msg.size = htons (sizeof (struct GNUNET_MessageHeader)); |
921 | disconnect_msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); | ||
922 | papi = GST_plugins_find (n->plugin_name); | ||
923 | if (papi != NULL) | ||
924 | papi->send (papi->cls, | ||
925 | target, | ||
926 | (const void*) &disconnect_msg, | ||
927 | sizeof (struct GNUNET_MessageHeader), | ||
928 | UINT32_MAX /* priority */, | ||
929 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
930 | n->session, | ||
931 | n->addr, | ||
932 | n->addrlen, | ||
933 | GNUNET_YES, | ||
934 | NULL, NULL); | ||
819 | disconnect_neighbour (n); | 935 | disconnect_neighbour (n); |
820 | } | 936 | } |
821 | 937 | ||
diff --git a/src/transport/plugin_transport_tcp.c b/src/transport/plugin_transport_tcp.c index e796dacf4..2c20ba35e 100644 --- a/src/transport/plugin_transport_tcp.c +++ b/src/transport/plugin_transport_tcp.c | |||
@@ -834,6 +834,9 @@ disconnect_session (struct Session *session) | |||
834 | (session->transmit_handle); | 834 | (session->transmit_handle); |
835 | session->transmit_handle = NULL; | 835 | session->transmit_handle = NULL; |
836 | } | 836 | } |
837 | session->plugin->env->session_end (session->plugin->env->cls, | ||
838 | &session->target, | ||
839 | session); | ||
837 | while (NULL != (pm = session->pending_messages_head)) | 840 | while (NULL != (pm = session->pending_messages_head)) |
838 | { | 841 | { |
839 | #if DEBUG_TCP | 842 | #if DEBUG_TCP |
@@ -878,9 +881,6 @@ disconnect_session (struct Session *session) | |||
878 | -1, | 881 | -1, |
879 | GNUNET_NO); | 882 | GNUNET_NO); |
880 | GNUNET_free_non_null (session->connect_addr); | 883 | GNUNET_free_non_null (session->connect_addr); |
881 | session->plugin->env->session_end (session->plugin->env->cls, | ||
882 | &session->target, | ||
883 | session); | ||
884 | GNUNET_assert (NULL == session->transmit_handle); | 884 | GNUNET_assert (NULL == session->transmit_handle); |
885 | GNUNET_free (session); | 885 | GNUNET_free (session); |
886 | } | 886 | } |