aboutsummaryrefslogtreecommitdiff
path: root/src/transport/gnunet-service-transport_neighbours.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-08-07 06:04:52 +0000
committerChristian Grothoff <christian@grothoff.org>2011-08-07 06:04:52 +0000
commit5133e679d1c77f276ea7a23f2c054f61fa61ac08 (patch)
treed66c834a35d914c10e87172dcc1bdf3854bf74a3 /src/transport/gnunet-service-transport_neighbours.c
parent6ad7a1bffd1688f8ee1ecb37eacb3a55f671748c (diff)
downloadgnunet-5133e679d1c77f276ea7a23f2c054f61fa61ac08.tar.gz
gnunet-5133e679d1c77f276ea7a23f2c054f61fa61ac08.zip
towards neighbour management
Diffstat (limited to 'src/transport/gnunet-service-transport_neighbours.c')
-rw-r--r--src/transport/gnunet-service-transport_neighbours.c539
1 files changed, 539 insertions, 0 deletions
diff --git a/src/transport/gnunet-service-transport_neighbours.c b/src/transport/gnunet-service-transport_neighbours.c
index f961ea4f2..a3fdab9b7 100644
--- a/src/transport/gnunet-service-transport_neighbours.c
+++ b/src/transport/gnunet-service-transport_neighbours.c
@@ -26,6 +26,14 @@
26#include "platform.h" 26#include "platform.h"
27#include "gnunet-service-transport_neighbours.h" 27#include "gnunet-service-transport_neighbours.h"
28#include "gnunet-service-transport.h" 28#include "gnunet-service-transport.h"
29#include "gnunet_constants.h"
30
31
32/**
33 * Size of the neighbour hash map.
34 */
35#define NEIGHBOUR_TABLE_SIZE 256
36
29 37
30// TODO: 38// TODO:
31// - have a way to access the currently 'connected' session 39// - have a way to access the currently 'connected' session
@@ -34,6 +42,498 @@
34// (for CostReport/TrafficReport callbacks) 42// (for CostReport/TrafficReport callbacks)
35 43
36 44
45struct NeighbourMapEntry;
46
47/**
48 * For each neighbour we keep a list of messages
49 * that we still want to transmit to the neighbour.
50 */
51struct MessageQueue
52{
53
54 /**
55 * This is a doubly linked list.
56 */
57 struct MessageQueue *next;
58
59 /**
60 * This is a doubly linked list.
61 */
62 struct MessageQueue *prev;
63
64 /**
65 * The message(s) we want to transmit, GNUNET_MessageHeader(s)
66 * stuck together in memory. Allocated at the end of this struct.
67 */
68 const char *message_buf;
69
70 /**
71 * Size of the message buf
72 */
73 size_t message_buf_size;
74
75 /**
76 * Client responsible for queueing the message; used to check that a
77 * client has no two messages pending for the same target and to
78 * notify the client of a successful transmission; NULL if this is
79 * an internal message.
80 */
81 struct TransportClient *client;
82
83 /**
84 * At what time should we fail?
85 */
86 struct GNUNET_TIME_Absolute timeout;
87
88 /**
89 * Internal message of the transport system that should not be
90 * included in the usual SEND-SEND_OK transmission confirmation
91 * traffic management scheme. Typically, "internal_msg" will
92 * be set whenever "client" is NULL (but it is not strictly
93 * required).
94 */
95 int internal_msg;
96
97 /**
98 * How important is the message?
99 */
100 unsigned int priority;
101
102};
103
104
105
106/**
107 * Entry in neighbours.
108 */
109struct NeighbourMapEntry
110{
111
112 /**
113 * Head of list of messages we would like to send to this peer;
114 * must contain at most one message per client.
115 */
116 struct MessageQueue *messages_head;
117
118 /**
119 * Tail of list of messages we would like to send to this peer; must
120 * contain at most one message per client.
121 */
122 struct MessageQueue *messages_tail;
123
124 /**
125 * Context for peerinfo iteration.
126 * NULL after we are done processing peerinfo's information.
127 */
128 struct GNUNET_PEERINFO_IteratorContext *piter;
129
130 /**
131 * Performance data for the peer.
132 */
133 struct GNUNET_TRANSPORT_ATS_Information *ats;
134
135 /**
136 * Public key for this peer. Valid only if the respective flag is set below.
137 */
138 struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded publicKey;
139
140 /**
141 * Identity of this neighbour.
142 */
143 struct GNUNET_PeerIdentity id;
144
145 /**
146 * ID of task scheduled to run when this peer is about to
147 * time out (will free resources associated with the peer).
148 */
149 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
150
151 /**
152 * ID of task scheduled to run when we should retry transmitting
153 * the head of the message queue. Actually triggered when the
154 * transmission is timing out (we trigger instantly when we have
155 * a chance of success).
156 */
157 GNUNET_SCHEDULER_TaskIdentifier retry_task;
158
159 /**
160 * How long until we should consider this peer dead (if we don't
161 * receive another message in the meantime)?
162 */
163 struct GNUNET_TIME_Absolute peer_timeout;
164
165 /**
166 * Tracker for inbound bandwidth.
167 */
168 struct GNUNET_BANDWIDTH_Tracker in_tracker;
169
170 /**
171 * The latency we have seen for this particular address for
172 * this particular peer. This latency may have been calculated
173 * over multiple transports. This value reflects how long it took
174 * us to receive a response when SENDING via this particular
175 * transport/neighbour/address combination!
176 *
177 * FIXME: we need to periodically send PINGs to update this
178 * latency (at least more often than the current "huge" (11h?)
179 * update interval).
180 */
181 struct GNUNET_TIME_Relative latency;
182
183 /**
184 * How often has the other peer (recently) violated the inbound
185 * traffic limit? Incremented by 10 per violation, decremented by 1
186 * per non-violation (for each time interval).
187 */
188 unsigned int quota_violation_count;
189
190 /**
191 * DV distance to this peer (1 if no DV is used).
192 */
193 uint32_t distance;
194
195 /**
196 * Have we seen an PONG from this neighbour in the past (and
197 * not had a disconnect since)?
198 */
199 int received_pong;
200
201 /**
202 * Do we have a valid public key for this neighbour?
203 */
204 int public_key_valid;
205
206 /**
207 * Are we already in the process of disconnecting this neighbour?
208 */
209 int in_disconnect;
210
211};
212
213
214/**
215 * All known neighbours and their HELLOs.
216 */
217static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
218
219/**
220 * Closure for connect_notify_cb and disconnect_notify_cb
221 */
222static void *callback_cls;
223
224/**
225 * Function to call when we connected to a neighbour.
226 */
227static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
228
229/**
230 * Function to call when we disconnected from a neighbour.
231 */
232static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
233
234
235#if 0
236/**
237 * Check the ready list for the given neighbour and if a plugin is
238 * ready for transmission (and if we have a message), do so!
239 *
240 * @param neighbour target peer for which to transmit
241 */
242static void
243try_transmission_to_peer (struct NeighbourMapEntry *n)
244{
245 struct ReadyList *rl;
246 struct MessageQueue *mq;
247 struct GNUNET_TIME_Relative timeout;
248 ssize_t ret;
249 int force_address;
250
251 if (n->messages_head == NULL)
252 {
253#if DEBUG_TRANSPORT
254 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
255 "Transmission queue for `%4s' is empty\n",
256 GNUNET_i2s (&n->id));
257#endif
258 return; /* nothing to do */
259 }
260 rl = NULL;
261 mq = n->messages_head;
262 force_address = GNUNET_YES;
263 if (mq->specific_address == NULL)
264 {
265 /* TODO: ADD ATS */
266 mq->specific_address = get_preferred_ats_address(n);
267 GNUNET_STATISTICS_update (stats,
268 gettext_noop ("# transport selected peer address freely"),
269 1,
270 GNUNET_NO);
271 force_address = GNUNET_NO;
272 }
273 if (mq->specific_address == NULL)
274 {
275 GNUNET_STATISTICS_update (stats,
276 gettext_noop ("# transport failed to selected peer address"),
277 1,
278 GNUNET_NO);
279 timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
280 if (timeout.rel_value == 0)
281 {
282#if DEBUG_TRANSPORT
283 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
284 "No destination address available to transmit message of size %u to peer `%4s'\n",
285 mq->message_buf_size,
286 GNUNET_i2s (&mq->neighbour_id));
287#endif
288 GNUNET_STATISTICS_update (stats,
289 gettext_noop ("# bytes in message queue for other peers"),
290 - (int64_t) mq->message_buf_size,
291 GNUNET_NO);
292 GNUNET_STATISTICS_update (stats,
293 gettext_noop ("# bytes discarded (no destination address available)"),
294 mq->message_buf_size,
295 GNUNET_NO);
296 if (mq->client != NULL)
297 transmit_send_ok (mq->client, n, &n->id, GNUNET_NO);
298 GNUNET_CONTAINER_DLL_remove (n->messages_head,
299 n->messages_tail,
300 mq);
301 GNUNET_free (mq);
302 return; /* nobody ready */
303 }
304 GNUNET_STATISTICS_update (stats,
305 gettext_noop ("# message delivery deferred (no address)"),
306 1,
307 GNUNET_NO);
308 if (n->retry_task != GNUNET_SCHEDULER_NO_TASK)
309 GNUNET_SCHEDULER_cancel (n->retry_task);
310 n->retry_task = GNUNET_SCHEDULER_add_delayed (timeout,
311 &retry_transmission_task,
312 n);
313#if DEBUG_TRANSPORT
314 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
315 "No validated destination address available to transmit message of size %u to peer `%4s', will wait %llums to find an address.\n",
316 mq->message_buf_size,
317 GNUNET_i2s (&mq->neighbour_id),
318 timeout.rel_value);
319#endif
320 /* FIXME: might want to trigger peerinfo lookup here
321 (unless that's already pending...) */
322 return;
323 }
324 GNUNET_CONTAINER_DLL_remove (n->messages_head,
325 n->messages_tail,
326 mq);
327 if (mq->specific_address->connected == GNUNET_NO)
328 mq->specific_address->connect_attempts++;
329 rl = mq->specific_address->ready_list;
330 mq->plugin = rl->plugin;
331 if (!mq->internal_msg)
332 mq->specific_address->in_transmit = GNUNET_YES;
333#if DEBUG_TRANSPORT
334 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
335 "Sending message of size %u for `%4s' to `%s' via plugin `%s'\n",
336 mq->message_buf_size,
337 GNUNET_i2s (&n->id),
338 (mq->specific_address->addr != NULL)
339 ? a2s (mq->plugin->short_name,
340 mq->specific_address->addr,
341 mq->specific_address->addrlen)
342 : "<inbound>",
343 rl->plugin->short_name);
344#endif
345 GNUNET_STATISTICS_update (stats,
346 gettext_noop ("# bytes in message queue for other peers"),
347 - (int64_t) mq->message_buf_size,
348 GNUNET_NO);
349 GNUNET_STATISTICS_update (stats,
350 gettext_noop ("# bytes pending with plugins"),
351 mq->message_buf_size,
352 GNUNET_NO);
353
354 GNUNET_CONTAINER_DLL_insert (n->cont_head,
355 n->cont_tail,
356 mq);
357
358 ret = rl->plugin->api->send (rl->plugin->api->cls,
359 &mq->neighbour_id,
360 mq->message_buf,
361 mq->message_buf_size,
362 mq->priority,
363 GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
364 mq->specific_address->session,
365 mq->specific_address->addr,
366 mq->specific_address->addrlen,
367 force_address,
368 &transmit_send_continuation, mq);
369 if (ret == -1)
370 {
371 /* failure, but 'send' would not call continuation in this case,
372 so we need to do it here! */
373 transmit_send_continuation (mq,
374 &mq->neighbour_id,
375 GNUNET_SYSERR);
376 }
377}
378
379
380/**
381 * Send the specified message to the specified peer.
382 *
383 * @param client source of the transmission request (can be NULL)
384 * @param peer_address ForeignAddressList where we should send this message
385 * @param priority how important is the message
386 * @param timeout how long do we have to transmit?
387 * @param message_buf message(s) to send GNUNET_MessageHeader(s)
388 * @param message_buf_size total size of all messages in message_buf
389 * @param is_internal is this an internal message; these are pre-pended and
390 * also do not count for plugins being "ready" to transmit
391 * @param neighbour handle to the neighbour for transmission
392 */
393static void
394transmit_to_peer (struct TransportClient *client,
395 struct ForeignAddressList *peer_address,
396 unsigned int priority,
397 struct GNUNET_TIME_Relative timeout,
398 const char *message_buf,
399 size_t message_buf_size,
400 int is_internal, struct NeighbourMapEntry *neighbour)
401{
402 struct MessageQueue *mq;
403
404#if EXTRA_CHECKS
405 if (client != NULL)
406 {
407 /* check for duplicate submission */
408 mq = neighbour->messages_head;
409 while (NULL != mq)
410 {
411 if (mq->client == client)
412 {
413 /* client transmitted to same peer twice
414 before getting SEND_OK! */
415 GNUNET_break (0);
416 return;
417 }
418 mq = mq->next;
419 }
420 }
421#endif
422 GNUNET_STATISTICS_update (stats,
423 gettext_noop ("# bytes in message queue for other peers"),
424 message_buf_size,
425 GNUNET_NO);
426 mq = GNUNET_malloc (sizeof (struct MessageQueue) + message_buf_size);
427 mq->specific_address = peer_address;
428 mq->client = client;
429 /* FIXME: this memcpy can be up to 7% of our total runtime! */
430 memcpy (&mq[1], message_buf, message_buf_size);
431 mq->message_buf = (const char*) &mq[1];
432 mq->message_buf_size = message_buf_size;
433 memcpy(&mq->neighbour_id, &neighbour->id, sizeof(struct GNUNET_PeerIdentity));
434 mq->internal_msg = is_internal;
435 mq->priority = priority;
436 mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
437 if (is_internal)
438 GNUNET_CONTAINER_DLL_insert (neighbour->messages_head,
439 neighbour->messages_tail,
440 mq);
441 else
442 GNUNET_CONTAINER_DLL_insert_after (neighbour->messages_head,
443 neighbour->messages_tail,
444 neighbour->messages_tail,
445 mq);
446 try_transmission_to_peer (neighbour);
447}
448
449
450/**
451 * Create a fresh entry in our neighbour list for the given peer.
452 * Will try to transmit our current HELLO to the new neighbour.
453 * Do not call this function directly, use 'setup_peer_check_blacklist.
454 *
455 * @param peer the peer for which we create the entry
456 * @param do_hello should we schedule transmitting a HELLO
457 * @return the new neighbour list entry
458 */
459static struct NeighbourMapEntry *
460setup_new_neighbour (const struct GNUNET_PeerIdentity *peer,
461 int do_hello)
462{
463 struct NeighbourMapEntry *n;
464 struct TransportPlugin *tp;
465 struct ReadyList *rl;
466
467 GNUNET_assert (0 != memcmp (peer,
468 &my_identity,
469 sizeof (struct GNUNET_PeerIdentity)));
470#if DEBUG_TRANSPORT
471 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
472 "Setting up state for neighbour `%4s'\n",
473 GNUNET_i2s (peer));
474#endif
475 GNUNET_STATISTICS_update (stats,
476 gettext_noop ("# active neighbours"),
477 1,
478 GNUNET_NO);
479 n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
480 n->id = *peer;
481 n->peer_timeout =
482 GNUNET_TIME_relative_to_absolute
483 (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
484 GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
485 GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
486 MAX_BANDWIDTH_CARRY_S);
487 tp = plugins;
488 while (tp != NULL)
489 {
490 if ((tp->api->send != NULL) && (!is_blacklisted(peer, tp)))
491 {
492 rl = GNUNET_malloc (sizeof (struct ReadyList));
493 rl->neighbour = n;
494 rl->next = n->plugins;
495 n->plugins = rl;
496 rl->plugin = tp;
497 rl->addresses = NULL;
498 }
499 tp = tp->next;
500 }
501 n->latency = GNUNET_TIME_UNIT_FOREVER_REL;
502 n->distance = -1;
503 n->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
504 &neighbour_timeout_task, n);
505 GNUNET_CONTAINER_multihashmap_put (neighbours,
506 &n->id.hashPubKey,
507 n,
508 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
509 if (do_hello)
510 {
511 GNUNET_STATISTICS_update (stats,
512 gettext_noop ("# peerinfo new neighbor iterate requests"),
513 1,
514 GNUNET_NO);
515 GNUNET_STATISTICS_update (stats,
516 gettext_noop ("# outstanding peerinfo iterate requests"),
517 1,
518 GNUNET_NO);
519 n->piter = GNUNET_PEERINFO_iterate (peerinfo, peer,
520 GNUNET_TIME_UNIT_FOREVER_REL,
521 &add_hello_for_peer, n);
522
523 GNUNET_STATISTICS_update (stats,
524 gettext_noop ("# HELLO's sent to new neighbors"),
525 1,
526 GNUNET_NO);
527 if (NULL != our_hello)
528 transmit_to_peer (NULL, NULL, 0,
529 HELLO_ADDRESS_EXPIRATION,
530 (const char *) our_hello, GNUNET_HELLO_size(our_hello),
531 GNUNET_NO, n);
532 }
533 return n;
534}
535#endif
536
37 537
38/** 538/**
39 * Initialize the neighbours subsystem. 539 * Initialize the neighbours subsystem.
@@ -47,6 +547,37 @@ GST_neighbours_start (void *cls,
47 GNUNET_TRANSPORT_NotifyConnect connect_cb, 547 GNUNET_TRANSPORT_NotifyConnect connect_cb,
48 GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb) 548 GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb)
49{ 549{
550 callback_cls = cls;
551 connect_notify_cb = connect_cb;
552 disconnect_notify_cb = disconnect_cb;
553 neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
554}
555
556
557/**
558 * Disconnect from the given neighbour.
559 *
560 * @param cls unused
561 * @param key hash of neighbour's public key (not used)
562 * @param value the 'struct NeighbourMapEntry' of the neighbour
563 */
564static int
565disconnect_all_neighbours (void *cls,
566 const GNUNET_HashCode *key,
567 void *value)
568{
569 struct NeighbourMapEntry *n = value;
570
571#if DEBUG_TRANSPORT
572 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
573 "Disconnecting peer `%4s', %s\n",
574 GNUNET_i2s(&n->id),
575 "SHUTDOWN_TASK");
576#endif
577 // FIXME:
578 // disconnect_neighbour (n);
579 n++;
580 return GNUNET_OK;
50} 581}
51 582
52 583
@@ -56,6 +587,14 @@ GST_neighbours_start (void *cls,
56void 587void
57GST_neighbours_stop () 588GST_neighbours_stop ()
58{ 589{
590 GNUNET_CONTAINER_multihashmap_iterate (neighbours,
591 &disconnect_all_neighbours,
592 NULL);
593 GNUNET_CONTAINER_multihashmap_destroy (neighbours);
594 neighbours = NULL;
595 callback_cls = NULL;
596 connect_notify_cb = NULL;
597 disconnect_notify_cb = NULL;
59} 598}
60 599
61 600