aboutsummaryrefslogtreecommitdiff
path: root/src/transport/plugin_transport_unix.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2013-05-05 12:01:06 +0000
committerChristian Grothoff <christian@grothoff.org>2013-05-05 12:01:06 +0000
commit843f898e3d1391aba7003e6e7c9ec0d7b3530fac (patch)
tree9dce3f1b73f73d656863bb939a71be74817ceef8 /src/transport/plugin_transport_unix.c
parent460223a5a382c80881b4fb01b17e592d98af0a39 (diff)
downloadgnunet-843f898e3d1391aba7003e6e7c9ec0d7b3530fac.tar.gz
gnunet-843f898e3d1391aba7003e6e7c9ec0d7b3530fac.zip
-hopefully fixing #2869
Diffstat (limited to 'src/transport/plugin_transport_unix.c')
-rw-r--r--src/transport/plugin_transport_unix.c837
1 files changed, 469 insertions, 368 deletions
diff --git a/src/transport/plugin_transport_unix.c b/src/transport/plugin_transport_unix.c
index 646695f79..81aa9cb94 100644
--- a/src/transport/plugin_transport_unix.c
+++ b/src/transport/plugin_transport_unix.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet 2 This file is part of GNUnet
3 (C) 2010 Christian Grothoff (and other contributing authors) 3 (C) 2010, 2013 Christian Grothoff (and other contributing authors)
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -26,29 +26,22 @@
26 * @author Christian Grothoff 26 * @author Christian Grothoff
27 * @author Nathan Evans 27 * @author Nathan Evans
28 */ 28 */
29
30#include "platform.h" 29#include "platform.h"
31#include "gnunet_hello_lib.h" 30#include "gnunet_hello_lib.h"
32#include "gnunet_connection_lib.h" 31#include "gnunet_util_lib.h"
33#include "gnunet_container_lib.h"
34#include "gnunet_os_lib.h"
35#include "gnunet_peerinfo_service.h"
36#include "gnunet_protocols.h" 32#include "gnunet_protocols.h"
37#include "gnunet_resolver_service.h"
38#include "gnunet_server_lib.h"
39#include "gnunet_signatures.h"
40#include "gnunet_statistics_service.h" 33#include "gnunet_statistics_service.h"
41#include "gnunet_transport_service.h" 34#include "gnunet_transport_service.h"
42#include "gnunet_transport_plugin.h" 35#include "gnunet_transport_plugin.h"
43#include "transport.h" 36#include "transport.h"
44 37
45#define MAX_PROBES 20
46#define MAX_RETRIES 3
47#define RETRY 0
48
49#define LOG(kind,...) GNUNET_log_from (kind, "transport-unix",__VA_ARGS__)
50 38
51#define DEFAULT_NAT_PORT 0 39/**
40 * Return code we give on 'send' if we failed to send right now
41 * but it makes sense to retry later. (Note: we might want to
42 * move this to the plugin API!?).
43 */
44#define RETRY 0
52 45
53/** 46/**
54 * How long until we give up on transmitting the welcome message? 47 * How long until we give up on transmitting the welcome message?
@@ -56,10 +49,15 @@
56#define HOSTNAME_RESOLVE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) 49#define HOSTNAME_RESOLVE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
57 50
58/** 51/**
59 * Starting port for listening and sending, eventually a config value 52 * Default "port" to use, if configuration does not specify.
53 * Essentially just a number appended to the UNIX path.
60 */ 54 */
61#define UNIX_NAT_DEFAULT_PORT 22086 55#define UNIX_NAT_DEFAULT_PORT 22086
62 56
57
58#define LOG(kind,...) GNUNET_log_from (kind, "transport-unix",__VA_ARGS__)
59
60
63GNUNET_NETWORK_STRUCT_BEGIN 61GNUNET_NETWORK_STRUCT_BEGIN
64 62
65/** 63/**
@@ -79,44 +77,89 @@ struct UNIXMessage
79 77
80}; 78};
81 79
80
81/**
82 * Handle for a session.
83 */
82struct Session 84struct Session
83{ 85{
84 struct GNUNET_PeerIdentity target; 86 struct GNUNET_PeerIdentity target;
85 87
88 struct Plugin * plugin;
89
86 void *addr; 90 void *addr;
91
87 size_t addrlen; 92 size_t addrlen;
88 93
89 /** 94 /**
90 * Session timeout task 95 * Session timeout task
91 */ 96 */
92 GNUNET_SCHEDULER_TaskIdentifier timeout_task; 97 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
93
94 struct Plugin * plugin;
95}; 98};
96 99
100
97struct UNIXMessageWrapper 101struct UNIXMessageWrapper
98{ 102{
103 /**
104 * We keep messages in a doubly linked list.
105 */
99 struct UNIXMessageWrapper *next; 106 struct UNIXMessageWrapper *next;
107
108 /**
109 * We keep messages in a doubly linked list.
110 */
100 struct UNIXMessageWrapper *prev; 111 struct UNIXMessageWrapper *prev;
101 112
113 /**
114 * The actual payload (allocated separately right now).
115 */
102 struct UNIXMessage * msg; 116 struct UNIXMessage * msg;
103 size_t msgsize;
104 size_t payload;
105
106 struct GNUNET_TIME_Absolute timeout;
107 unsigned int priority;
108 117
118 /**
119 * Session this message belongs to.
120 */
109 struct Session *session; 121 struct Session *session;
122
123 /**
124 * Function to call upon transmission.
125 */
110 GNUNET_TRANSPORT_TransmitContinuation cont; 126 GNUNET_TRANSPORT_TransmitContinuation cont;
127
128 /**
129 * Closure for 'cont'.
130 */
111 void *cont_cls; 131 void *cont_cls;
132
133 /**
134 * Timeout for this message.
135 */
136 struct GNUNET_TIME_Absolute timeout;
137
138 /**
139 * Number of bytes in 'msg'.
140 */
141 size_t msgsize;
142
143 /**
144 * Number of bytes of payload encapsulated in 'msg'.
145 */
146 size_t payload;
147
148 /**
149 * Priority of the message (ignored, just dragged along in UNIX).
150 */
151 unsigned int priority;
112}; 152};
113 153
114/* Forward definition */ 154
155/**
156 * Encapsulation of all of the state of the plugin.
157 */
115struct Plugin; 158struct Plugin;
116 159
117 160
118/** 161/**
119 * UNIX NAT "Session" 162 * UNIX "Session"
120 */ 163 */
121struct PeerSession 164struct PeerSession
122{ 165{
@@ -166,6 +209,7 @@ struct PeerSession
166 209
167}; 210};
168 211
212
169/** 213/**
170 * Information we keep for each of our listen sockets. 214 * Information we keep for each of our listen sockets.
171 */ 215 */
@@ -177,7 +221,9 @@ struct UNIX_Sock_Info
177 struct GNUNET_NETWORK_Handle *desc; 221 struct GNUNET_NETWORK_Handle *desc;
178 222
179 /** 223 /**
180 * The port we bound to 224 * The port we bound to (not an actual PORT, as UNIX domain sockets
225 * don't have ports, but rather a number in the path name to make this
226 * one unique).
181 */ 227 */
182 uint16_t port; 228 uint16_t port;
183}; 229};
@@ -188,30 +234,31 @@ struct UNIX_Sock_Info
188 */ 234 */
189struct Plugin 235struct Plugin
190{ 236{
237
191 /** 238 /**
192 * Our environment. 239 * ID of task used to update our addresses when one expires.
193 */ 240 */
194 struct GNUNET_TRANSPORT_PluginEnvironment *env; 241 GNUNET_SCHEDULER_TaskIdentifier address_update_task;
195 242
196 /** 243 /**
197 * Sessions 244 * ID of select task
198 */ 245 */
199 struct GNUNET_CONTAINER_MultiHashMap *session_map; 246 GNUNET_SCHEDULER_TaskIdentifier select_task;
200 247
201 /** 248 /**
202 * ID of task used to update our addresses when one expires. 249 * Number of bytes we currently have in our write queue.
203 */ 250 */
204 GNUNET_SCHEDULER_TaskIdentifier address_update_task; 251 unsigned long long bytes_in_queue;
205 252
206 /** 253 /**
207 * ID of select task 254 * Our environment.
208 */ 255 */
209 GNUNET_SCHEDULER_TaskIdentifier select_task; 256 struct GNUNET_TRANSPORT_PluginEnvironment *env;
210 257
211 /** 258 /**
212 * Integer to append to unix domain socket. 259 * Sessions
213 */ 260 */
214 uint16_t port; 261 struct GNUNET_CONTAINER_MultiHashMap *session_map;
215 262
216 /** 263 /**
217 * FD Read set 264 * FD Read set
@@ -223,65 +270,81 @@ struct Plugin
223 */ 270 */
224 struct GNUNET_NETWORK_FDSet *ws; 271 struct GNUNET_NETWORK_FDSet *ws;
225 272
226 int with_ws;
227
228 /**
229 * socket that we transmit all data with
230 */
231 struct UNIX_Sock_Info unix_sock;
232
233 /** 273 /**
234 * Path of our unix domain socket (/tmp/unix-plugin-PORT) 274 * Path of our unix domain socket (/tmp/unix-plugin-PORT)
235 */ 275 */
236 char *unix_socket_path; 276 char *unix_socket_path;
237 277
278 /**
279 * Head of queue of messages to transmit.
280 */
238 struct UNIXMessageWrapper *msg_head; 281 struct UNIXMessageWrapper *msg_head;
282
283 /**
284 * Tail of queue of messages to transmit.
285 */
239 struct UNIXMessageWrapper *msg_tail; 286 struct UNIXMessageWrapper *msg_tail;
240 287
241 /** 288 /**
289 * socket that we transmit all data with
290 */
291 struct UNIX_Sock_Info unix_sock;
292
293 /**
242 * ATS network 294 * ATS network
243 */ 295 */
244 struct GNUNET_ATS_Information ats_network; 296 struct GNUNET_ATS_Information ats_network;
245 297
246 unsigned int bytes_in_queue; 298 /**
247 unsigned int bytes_in_sent; 299 * Is the write set in the current 'select' task? GNUNET_NO if the
248 unsigned int bytes_in_recv; 300 * write queue was empty when the main task was scheduled,
249 unsigned int bytes_discarded; 301 * GNUNET_YES if we're already waiting for being allowed to write.
302 */
303 int with_ws;
304
305 /**
306 * Integer to append to unix domain socket.
307 */
308 uint16_t port;
309
250}; 310};
251 311
252/**
253 * Start session timeout
254 */
255static void
256start_session_timeout (struct Session *s);
257 312
258/** 313/**
259 * Increment session timeout due to activity 314 * Increment session timeout due to activity
315 *
316 * @param s session for which the timeout should be moved
260 */ 317 */
261static void 318static void
262reschedule_session_timeout (struct Session *s); 319reschedule_session_timeout (struct Session *s);
263 320
321
264/** 322/**
265 * Cancel timeout 323 * We have been notified that our writeset has something to read. We don't
324 * know which socket needs to be read, so we have to check each one
325 * Then reschedule this function to be called again once more is available.
326 *
327 * @param cls the plugin handle
328 * @param tc the scheduling context (for rescheduling this function again)
266 */ 329 */
267static void 330static void
268stop_session_timeout (struct Session *s);
269
270
271static void
272unix_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); 331unix_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
273 332
274 333
334/**
335 * Re-schedule the main 'select' callback (unix_plugin_select)
336 * for this plugin.
337 *
338 * @param plugin the plugin context
339 */
275static void 340static void
276reschedule_select (struct Plugin * plugin) 341reschedule_select (struct Plugin * plugin)
277{ 342{
278
279 if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK) 343 if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
280 { 344 {
281 GNUNET_SCHEDULER_cancel (plugin->select_task); 345 GNUNET_SCHEDULER_cancel (plugin->select_task);
282 plugin->select_task = GNUNET_SCHEDULER_NO_TASK; 346 plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
283 } 347 }
284
285 if (NULL != plugin->msg_head) 348 if (NULL != plugin->msg_head)
286 { 349 {
287 plugin->select_task = 350 plugin->select_task =
@@ -304,15 +367,36 @@ reschedule_select (struct Plugin * plugin)
304 } 367 }
305} 368}
306 369
370
371/**
372 * Closure to 'lookup_session_it'.
373 */
307struct LookupCtx 374struct LookupCtx
308{ 375{
376 /**
377 * Location to store the session, if found.
378 */
309 struct Session *s; 379 struct Session *s;
380
381 /**
382 * Address we are looking for.
383 */
310 const struct sockaddr_un *addr; 384 const struct sockaddr_un *addr;
311}; 385};
312 386
313int lookup_session_it (void *cls, 387
314 const struct GNUNET_HashCode * key, 388/**
315 void *value) 389 * Function called to find a session by address.
390 *
391 * @param cls the 'struct LookupCtx'
392 * @param key peer we are looking for (unused)
393 * @param value a session
394 * @return GNUNET_YES if not found (continue looking), GNUNET_NO on success
395 */
396static int
397lookup_session_it (void *cls,
398 const struct GNUNET_HashCode * key,
399 void *value)
316{ 400{
317 struct LookupCtx *lctx = cls; 401 struct LookupCtx *lctx = cls;
318 struct Session *t = value; 402 struct Session *t = value;
@@ -326,23 +410,33 @@ int lookup_session_it (void *cls,
326} 410}
327 411
328 412
413/**
414 * Find an existing session by address.
415 *
416 * @param plugin the plugin
417 * @param sender for which peer should the session be?
418 * @param addr address to look for
419 * @return NULL if session was not found
420 */
329static struct Session * 421static struct Session *
330lookup_session (struct Plugin *plugin, struct GNUNET_PeerIdentity *sender, const struct sockaddr_un *addr) 422lookup_session (struct Plugin *plugin,
423 const struct GNUNET_PeerIdentity *sender,
424 const struct sockaddr_un *addr)
331{ 425{
332 struct LookupCtx lctx; 426 struct LookupCtx lctx;
333 427
334 GNUNET_assert (NULL != plugin); 428 GNUNET_assert (NULL != plugin);
335 GNUNET_assert (NULL != sender); 429 GNUNET_assert (NULL != sender);
336 GNUNET_assert (NULL != addr); 430 GNUNET_assert (NULL != addr);
337
338 lctx.s = NULL; 431 lctx.s = NULL;
339 lctx.addr = addr; 432 lctx.addr = addr;
340 433 GNUNET_CONTAINER_multihashmap_get_multiple (plugin->session_map,
341 GNUNET_CONTAINER_multihashmap_get_multiple (plugin->session_map, &sender->hashPubKey, &lookup_session_it, &lctx); 434 &sender->hashPubKey,
342 435 &lookup_session_it, &lctx);
343 return lctx.s; 436 return lctx.s;
344} 437}
345 438
439
346/** 440/**
347 * Functions with this signature are called whenever we need 441 * Functions with this signature are called whenever we need
348 * to close a session due to a disconnect or failure to 442 * to close a session due to a disconnect or failure to
@@ -353,18 +447,16 @@ lookup_session (struct Plugin *plugin, struct GNUNET_PeerIdentity *sender, const
353static void 447static void
354disconnect_session (struct Session *s) 448disconnect_session (struct Session *s)
355{ 449{
450 struct Plugin *plugin = s->plugin;
356 struct UNIXMessageWrapper *msgw; 451 struct UNIXMessageWrapper *msgw;
357 struct UNIXMessageWrapper *next; 452 struct UNIXMessageWrapper *next;
358 struct Plugin * plugin = s->plugin;
359 int removed; 453 int removed;
360 GNUNET_assert (plugin != NULL);
361 GNUNET_assert (s != NULL);
362 454
363 LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting session for peer `%s' `%s' \n", GNUNET_i2s (&s->target), s->addr); 455 LOG (GNUNET_ERROR_TYPE_DEBUG,
364 stop_session_timeout (s); 456 "Disconnecting session for peer `%s' `%s'\n",
457 GNUNET_i2s (&s->target),
458 s->addr);
365 plugin->env->session_end (plugin->env->cls, &s->target, s); 459 plugin->env->session_end (plugin->env->cls, &s->target, s);
366
367 msgw = plugin->msg_head;
368 removed = GNUNET_NO; 460 removed = GNUNET_NO;
369 next = plugin->msg_head; 461 next = plugin->msg_head;
370 while (NULL != next) 462 while (NULL != next)
@@ -381,85 +473,22 @@ disconnect_session (struct Session *s)
381 GNUNET_free (msgw); 473 GNUNET_free (msgw);
382 removed = GNUNET_YES; 474 removed = GNUNET_YES;
383 } 475 }
476 GNUNET_assert (GNUNET_YES ==
477 GNUNET_CONTAINER_multihashmap_remove (plugin->session_map,
478 &s->target.hashPubKey,
479 s));
480 GNUNET_STATISTICS_set (plugin->env->stats,
481 "# UNIX sessions active",
482 GNUNET_CONTAINER_multihashmap_size (plugin->session_map),
483 GNUNET_NO);
384 if ((GNUNET_YES == removed) && (NULL == plugin->msg_head)) 484 if ((GNUNET_YES == removed) && (NULL == plugin->msg_head))
385 reschedule_select (plugin); 485 reschedule_select (plugin);
386 486 if (GNUNET_SCHEDULER_NO_TASK != s->timeout_task)
387 GNUNET_assert (GNUNET_YES ==
388 GNUNET_CONTAINER_multihashmap_remove(plugin->session_map, &s->target.hashPubKey, s));
389
390 GNUNET_STATISTICS_set(plugin->env->stats,
391 "# UNIX sessions active",
392 GNUNET_CONTAINER_multihashmap_size(plugin->session_map),
393 GNUNET_NO);
394
395 GNUNET_free (s);
396}
397
398static int
399get_session_delete_it (void *cls, const struct GNUNET_HashCode * key, void *value)
400{
401 struct Session *s = value;
402 disconnect_session (s);
403 return GNUNET_YES;
404}
405
406
407/**
408 * Disconnect from a remote node. Clean up session if we have one for this peer
409 *
410 * @param cls closure for this call (should be handle to Plugin)
411 * @param target the peeridentity of the peer to disconnect
412 * @return GNUNET_OK on success, GNUNET_SYSERR if the operation failed
413 */
414static void
415unix_disconnect (void *cls, const struct GNUNET_PeerIdentity *target)
416{
417 struct Plugin *plugin = cls;
418 GNUNET_assert (plugin != NULL);
419
420 GNUNET_CONTAINER_multihashmap_get_multiple (plugin->session_map, &target->hashPubKey, &get_session_delete_it, plugin);
421 return;
422}
423
424/**
425 * Shutdown the server process (stop receiving inbound traffic). Maybe
426 * restarted later!
427 *
428 * @param cls Handle to the plugin for this transport
429 *
430 * @return returns the number of sockets successfully closed,
431 * should equal the number of sockets successfully opened
432 */
433static int
434unix_transport_server_stop (void *cls)
435{
436 struct Plugin *plugin = cls;
437 struct UNIXMessageWrapper * msgw;
438
439 while (NULL != (msgw = plugin->msg_head))
440 {
441 GNUNET_CONTAINER_DLL_remove (plugin->msg_head, plugin->msg_tail, msgw);
442 if (msgw->cont != NULL)
443 msgw->cont (msgw->cont_cls, &msgw->session->target, GNUNET_SYSERR,
444 msgw->payload, 0);
445 GNUNET_free (msgw->msg);
446 GNUNET_free (msgw);
447 }
448
449 if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
450 {
451 GNUNET_SCHEDULER_cancel (plugin->select_task);
452 plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
453 }
454
455 if (NULL != plugin->unix_sock.desc)
456 { 487 {
457 GNUNET_break (GNUNET_OK == 488 GNUNET_SCHEDULER_cancel (s->timeout_task);
458 GNUNET_NETWORK_socket_close (plugin->unix_sock.desc)); 489 s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
459 plugin->unix_sock.desc = NULL;
460 plugin->with_ws = GNUNET_NO;
461 } 490 }
462 return GNUNET_OK; 491 GNUNET_free (s);
463} 492}
464 493
465 494
@@ -482,8 +511,7 @@ unix_transport_server_stop (void *cls)
482 * for the next transmission call; or if the 511 * for the next transmission call; or if the
483 * peer disconnected...) 512 * peer disconnected...)
484 * @param cont_cls closure for cont 513 * @param cont_cls closure for cont
485 * 514 * @return on success the number of bytes written, RETRY for retry, -1 on errors
486 * @return on success : the number of bytes written, 0 n retry, -1 on errors
487 */ 515 */
488static ssize_t 516static ssize_t
489unix_real_send (void *cls, 517unix_real_send (void *cls,
@@ -505,13 +533,12 @@ unix_real_send (void *cls,
505 size_t slen; 533 size_t slen;
506 534
507 GNUNET_assert (NULL != plugin); 535 GNUNET_assert (NULL != plugin);
508 536 if (NULL == send_handle)
509 if (send_handle == NULL)
510 { 537 {
511 GNUNET_break (0); /* We do not have a send handle */ 538 GNUNET_break (0); /* We do not have a send handle */
512 return GNUNET_SYSERR; 539 return GNUNET_SYSERR;
513 } 540 }
514 if ((addr == NULL) || (addrlen == 0)) 541 if ((NULL == addr) || (0 == addrlen))
515 { 542 {
516 GNUNET_break (0); /* Can never send if we don't have an address */ 543 GNUNET_break (0); /* Can never send if we don't have an address */
517 return GNUNET_SYSERR; 544 return GNUNET_SYSERR;
@@ -543,15 +570,10 @@ resend:
543 570
544 if (GNUNET_SYSERR == sent) 571 if (GNUNET_SYSERR == sent)
545 { 572 {
546 if (errno == EAGAIN) 573 if ( (EAGAIN == errno) ||
547 { 574 (ENOBUFS == errno) )
548 return RETRY; /* We have to retry later */
549 }
550 if (errno == ENOBUFS)
551 {
552 return RETRY; /* We have to retry later */ 575 return RETRY; /* We have to retry later */
553 } 576 if (EMSGSIZE == errno)
554 if (errno == EMSGSIZE)
555 { 577 {
556 socklen_t size = 0; 578 socklen_t size = 0;
557 socklen_t len = sizeof (size); 579 socklen_t len = sizeof (size);
@@ -587,29 +609,55 @@ resend:
587 } 609 }
588 610
589 LOG (GNUNET_ERROR_TYPE_DEBUG, 611 LOG (GNUNET_ERROR_TYPE_DEBUG,
590 "UNIX transmit %u-byte message to %s (%d: %s)\n", 612 "UNIX transmit %u-byte message to %s (%d: %s)\n",
591 (unsigned int) msgbuf_size, GNUNET_a2s (sb, sbs), (int) sent, 613 (unsigned int) msgbuf_size,
592 (sent < 0) ? STRERROR (errno) : "ok"); 614 GNUNET_a2s (sb, sbs),
615 (int) sent,
616 (sent < 0) ? STRERROR (errno) : "ok");
593 return sent; 617 return sent;
594} 618}
595 619
596struct gsi_ctx 620
597{ 621/**
598 char *address; 622 * Closure for 'get_session_it'.
599 size_t addrlen; 623 */
624struct GetSessionIteratorContext
625{
626 /**
627 * Location to store the session, if found.
628 */
600 struct Session *res; 629 struct Session *res;
630
631 /**
632 * Address information.
633 */
634 const char *address;
635
636 /**
637 * Number of bytes in 'address'
638 */
639 size_t addrlen;
601}; 640};
602 641
603 642
643/**
644 * Function called to find a session by address.
645 *
646 * @param cls the 'struct LookupCtx'
647 * @param key peer we are looking for (unused)
648 * @param value a session
649 * @return GNUNET_YES if not found (continue looking), GNUNET_NO on success
650 */
604static int 651static int
605get_session_it (void *cls, const struct GNUNET_HashCode * key, void *value) 652get_session_it (void *cls,
653 const struct GNUNET_HashCode *key,
654 void *value)
606{ 655{
607 struct gsi_ctx *gsi = cls; 656 struct GetSessionIteratorContext *gsi = cls;
608 struct Session *s = value; 657 struct Session *s = value;
609 658
610 LOG (GNUNET_ERROR_TYPE_DEBUG, "Comparing session %s %s\n", gsi->address, s->addr); 659 if ( (gsi->addrlen == s->addrlen) &&
611 if ((gsi->addrlen == s->addrlen) && 660 (0 == memcmp (gsi->address, s->addr, s->addrlen)) )
612 (0 == memcmp (gsi->address, s->addr, s->addrlen)))
613 { 661 {
614 gsi->res = s; 662 gsi->res = s;
615 return GNUNET_NO; 663 return GNUNET_NO;
@@ -617,6 +665,29 @@ get_session_it (void *cls, const struct GNUNET_HashCode * key, void *value)
617 return GNUNET_YES; 665 return GNUNET_YES;
618} 666}
619 667
668
669/**
670 * Session was idle for too long, so disconnect it
671 *
672 * @param cls the 'struct Session' to disconnect
673 * @param tc scheduler context
674 */
675static void
676session_timeout (void *cls,
677 const struct GNUNET_SCHEDULER_TaskContext *tc)
678{
679 struct Session *s = cls;
680
681 s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
682 LOG (GNUNET_ERROR_TYPE_DEBUG,
683 "Session %p was idle for %s, disconnecting\n",
684 s,
685 GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
686 GNUNET_YES));
687 disconnect_session (s);
688}
689
690
620/** 691/**
621 * Creates a new outbound session the transport service will use to send data to the 692 * Creates a new outbound session the transport service will use to send data to the
622 * peer 693 * peer
@@ -627,60 +698,56 @@ get_session_it (void *cls, const struct GNUNET_HashCode * key, void *value)
627 */ 698 */
628static struct Session * 699static struct Session *
629unix_plugin_get_session (void *cls, 700unix_plugin_get_session (void *cls,
630 const struct GNUNET_HELLO_Address *address) 701 const struct GNUNET_HELLO_Address *address)
631{ 702{
632 struct Session * s = NULL;
633 struct Plugin *plugin = cls; 703 struct Plugin *plugin = cls;
634 struct gsi_ctx gsi; 704 struct Session *s;
705 struct GetSessionIteratorContext gsi;
635 706
636 /* Checks */ 707 GNUNET_assert (NULL != plugin);
637 GNUNET_assert (plugin != NULL); 708 GNUNET_assert (NULL != address);
638 GNUNET_assert (address != NULL);
639 709
640 /* Check if already existing */ 710 /* Check if already existing */
641 gsi.address = (char *) address->address; 711 gsi.address = (const char *) address->address;
642 gsi.addrlen = address->address_length; 712 gsi.addrlen = address->address_length;
643 gsi.res = NULL; 713 gsi.res = NULL;
644 GNUNET_CONTAINER_multihashmap_get_multiple (plugin->session_map, &address->peer.hashPubKey, &get_session_it, &gsi); 714 GNUNET_CONTAINER_multihashmap_get_multiple (plugin->session_map,
645 if (gsi.res != NULL) 715 &address->peer.hashPubKey,
716 &get_session_it, &gsi);
717 if (NULL != gsi.res)
646 { 718 {
647 LOG (GNUNET_ERROR_TYPE_DEBUG, "Found existing session\n"); 719 LOG (GNUNET_ERROR_TYPE_DEBUG,
720 "Found existing session\n");
648 return gsi.res; 721 return gsi.res;
649 } 722 }
650 723
651 /* Create a new session */ 724 /* create a new session */
652 s = GNUNET_malloc (sizeof (struct Session) + address->address_length); 725 s = GNUNET_malloc (sizeof (struct Session) + address->address_length);
653 s->addr = &s[1]; 726 s->addr = &s[1];
654 s->addrlen = address->address_length; 727 s->addrlen = address->address_length;
655 s->plugin = plugin; 728 s->plugin = plugin;
656 memcpy(s->addr, address->address, s->addrlen); 729 memcpy (s->addr, address->address, s->addrlen);
657 memcpy(&s->target, &address->peer, sizeof (struct GNUNET_PeerIdentity)); 730 memcpy (&s->target, &address->peer, sizeof (struct GNUNET_PeerIdentity));
658 731
659 start_session_timeout (s); 732 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == s->timeout_task);
660 733 s->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
661 GNUNET_CONTAINER_multihashmap_put (plugin->session_map, 734 &session_timeout,
662 &address->peer.hashPubKey, s, 735 s);
663 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 736 LOG (GNUNET_ERROR_TYPE_DEBUG,
664 737 "Creating a new session %p with timeout set to %s\n",
665 GNUNET_STATISTICS_set(plugin->env->stats, 738 s,
666 "# UNIX sessions active", 739 GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
667 GNUNET_CONTAINER_multihashmap_size(plugin->session_map), 740 GNUNET_YES));
668 GNUNET_NO); 741 (void) GNUNET_CONTAINER_multihashmap_put (plugin->session_map,
669 LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating new session\n"); 742 &address->peer.hashPubKey, s,
743 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
744 GNUNET_STATISTICS_set (plugin->env->stats,
745 "# UNIX sessions active",
746 GNUNET_CONTAINER_multihashmap_size (plugin->session_map),
747 GNUNET_NO);
670 return s; 748 return s;
671} 749}
672 750
673/*
674 * @param cls the plugin handle
675 * @param tc the scheduling context (for rescheduling this function again)
676 *
677 * We have been notified that our writeset has something to read. We don't
678 * know which socket needs to be read, so we have to check each one
679 * Then reschedule this function to be called again once more is available.
680 *
681 */
682static void
683unix_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
684 751
685/** 752/**
686 * Function that can be used by the transport service to transmit 753 * Function that can be used by the transport service to transmit
@@ -722,24 +789,26 @@ unix_plugin_send (void *cls,
722 struct UNIXMessage *message; 789 struct UNIXMessage *message;
723 int ssize; 790 int ssize;
724 791
725 GNUNET_assert (plugin != NULL); 792 GNUNET_assert (NULL != plugin);
726 GNUNET_assert (session != NULL); 793 GNUNET_assert (NULL != session);
727 794
728 if (GNUNET_OK != GNUNET_CONTAINER_multihashmap_contains_value(plugin->session_map, 795 if (GNUNET_OK !=
729 &session->target.hashPubKey, session)) 796 GNUNET_CONTAINER_multihashmap_contains_value (plugin->session_map,
797 &session->target.hashPubKey,
798 session))
730 { 799 {
731 LOG (GNUNET_ERROR_TYPE_ERROR, "Invalid session for peer `%s' `%s'\n", 800 LOG (GNUNET_ERROR_TYPE_ERROR,
732 GNUNET_i2s (&session->target), 801 "Invalid session for peer `%s' `%s'\n",
733 (char *) session->addr); 802 GNUNET_i2s (&session->target),
803 (const char *) session->addr);
734 GNUNET_break (0); 804 GNUNET_break (0);
735
736 return GNUNET_SYSERR; 805 return GNUNET_SYSERR;
737 } 806 }
738 LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending %u bytes with session for peer `%s' `%s'\n", 807 LOG (GNUNET_ERROR_TYPE_DEBUG,
739 msgbuf_size, 808 "Sending %u bytes with session for peer `%s' `%s'\n",
740 GNUNET_i2s (&session->target), 809 msgbuf_size,
741 (char *) session->addr); 810 GNUNET_i2s (&session->target),
742 811 (const char *) session->addr);
743 ssize = sizeof (struct UNIXMessage) + msgbuf_size; 812 ssize = sizeof (struct UNIXMessage) + msgbuf_size;
744 message = GNUNET_malloc (sizeof (struct UNIXMessage) + msgbuf_size); 813 message = GNUNET_malloc (sizeof (struct UNIXMessage) + msgbuf_size);
745 message->header.size = htons (ssize); 814 message->header.size = htons (ssize);
@@ -747,9 +816,7 @@ unix_plugin_send (void *cls,
747 memcpy (&message->sender, plugin->env->my_identity, 816 memcpy (&message->sender, plugin->env->my_identity,
748 sizeof (struct GNUNET_PeerIdentity)); 817 sizeof (struct GNUNET_PeerIdentity));
749 memcpy (&message[1], msgbuf, msgbuf_size); 818 memcpy (&message[1], msgbuf, msgbuf_size);
750
751 reschedule_session_timeout (session); 819 reschedule_session_timeout (session);
752
753 wrapper = GNUNET_malloc (sizeof (struct UNIXMessageWrapper)); 820 wrapper = GNUNET_malloc (sizeof (struct UNIXMessageWrapper));
754 wrapper->msg = message; 821 wrapper->msg = message;
755 wrapper->msgsize = ssize; 822 wrapper->msgsize = ssize;
@@ -759,20 +826,16 @@ unix_plugin_send (void *cls,
759 wrapper->cont = cont; 826 wrapper->cont = cont;
760 wrapper->cont_cls = cont_cls; 827 wrapper->cont_cls = cont_cls;
761 wrapper->session = session; 828 wrapper->session = session;
762 829 GNUNET_CONTAINER_DLL_insert (plugin->msg_head,
763 GNUNET_CONTAINER_DLL_insert(plugin->msg_head, plugin->msg_tail, wrapper); 830 plugin->msg_tail,
764 831 wrapper);
765 plugin->bytes_in_queue += ssize; 832 plugin->bytes_in_queue += ssize;
766 GNUNET_STATISTICS_set (plugin->env->stats,"# bytes currently in UNIX buffers", 833 GNUNET_STATISTICS_set (plugin->env->stats,
767 plugin->bytes_in_queue, GNUNET_NO); 834 "# bytes currently in UNIX buffers",
768 835 plugin->bytes_in_queue,
769 LOG (GNUNET_ERROR_TYPE_DEBUG, "Sent %d bytes to `%s'\n", ssize, 836 GNUNET_NO);
770 (char *) session->addr); 837 if (GNUNET_NO == plugin->with_ws)
771 if (plugin->with_ws == GNUNET_NO)
772 {
773 reschedule_select (plugin); 838 reschedule_select (plugin);
774 }
775
776 return ssize; 839 return ssize;
777} 840}
778 841
@@ -798,16 +861,18 @@ unix_demultiplexer (struct Plugin *plugin, struct GNUNET_PeerIdentity *sender,
798 861
799 GNUNET_assert (fromlen >= sizeof (struct sockaddr_un)); 862 GNUNET_assert (fromlen >= sizeof (struct sockaddr_un));
800 863
801 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received message from %s\n", 864 LOG (GNUNET_ERROR_TYPE_DEBUG,
802 un->sun_path); 865 "Received message from %s\n",
803 866 un->sun_path);
804 867 GNUNET_STATISTICS_update (plugin->env->stats,
805 868 "# bytes received via UNIX",
806 plugin->bytes_in_recv += ntohs(currhdr->size); 869 ntohs (currhdr->size),
807 GNUNET_STATISTICS_set (plugin->env->stats,"# bytes received via UNIX", 870 GNUNET_NO);
808 plugin->bytes_in_recv, GNUNET_NO); 871
809 872 addr = GNUNET_HELLO_address_allocate (sender,
810 addr = GNUNET_HELLO_address_allocate(sender, "unix", un->sun_path, strlen (un->sun_path) + 1); 873 "unix",
874 un->sun_path,
875 strlen (un->sun_path) + 1);
811 s = lookup_session (plugin, sender, un); 876 s = lookup_session (plugin, sender, un);
812 if (NULL == s) 877 if (NULL == s)
813 s = unix_plugin_get_session (plugin, addr); 878 s = unix_plugin_get_session (plugin, addr);
@@ -827,8 +892,13 @@ unix_demultiplexer (struct Plugin *plugin, struct GNUNET_PeerIdentity *sender,
827} 892}
828 893
829 894
895/**
896 * Read from UNIX domain socket (it is ready).
897 *
898 * @param plugin the plugin
899 */
830static void 900static void
831unix_plugin_select_read (struct Plugin * plugin) 901unix_plugin_select_read (struct Plugin *plugin)
832{ 902{
833 char buf[65536] GNUNET_ALIGN; 903 char buf[65536] GNUNET_ALIGN;
834 struct UNIXMessage *msg; 904 struct UNIXMessage *msg;
@@ -889,44 +959,48 @@ unix_plugin_select_read (struct Plugin * plugin)
889 GNUNET_break_op (0); 959 GNUNET_break_op (0);
890 break; 960 break;
891 } 961 }
892
893 unix_demultiplexer (plugin, &sender, currhdr, &un, sizeof (un)); 962 unix_demultiplexer (plugin, &sender, currhdr, &un, sizeof (un));
894 offset += csize; 963 offset += csize;
895 } 964 }
896} 965}
897 966
898 967
968/**
969 * Write to UNIX domain socket (it is ready).
970 *
971 * @param plugin the plugin
972 */
899static void 973static void
900unix_plugin_select_write (struct Plugin * plugin) 974unix_plugin_select_write (struct Plugin *plugin)
901{ 975{
902 int sent = 0; 976 int sent = 0;
977 struct UNIXMessageWrapper * msgw;
903 978
904 struct UNIXMessageWrapper * msgw = plugin->msg_tail; 979 while (NULL != (msgw = plugin->msg_tail))
905 while (NULL != msgw)
906 { 980 {
907 if (GNUNET_TIME_absolute_get_remaining (msgw->timeout).rel_value > 0) 981 if (GNUNET_TIME_absolute_get_remaining (msgw->timeout).rel_value > 0)
908 break; /* Message is ready for sending */ 982 break; /* Message is ready for sending */
909 else 983 /* Message has a timeout */
910 { 984 LOG (GNUNET_ERROR_TYPE_DEBUG,
911 /* Message has a timeout */ 985 "Timeout for message with %u bytes \n",
912 LOG (GNUNET_ERROR_TYPE_DEBUG, 986 (unsigned int) msgw->msgsize);
913 "Timeout for message with %llu bytes \n", msgw->msgsize); 987 GNUNET_CONTAINER_DLL_remove (plugin->msg_head, plugin->msg_tail, msgw);
914 GNUNET_CONTAINER_DLL_remove (plugin->msg_head, plugin->msg_tail, msgw); 988 plugin->bytes_in_queue -= msgw->msgsize;
915 if (NULL != msgw->cont) 989 GNUNET_STATISTICS_set (plugin->env->stats,
916 msgw->cont (msgw->cont_cls, &msgw->session->target, GNUNET_SYSERR, msgw->payload, 0); 990 "# bytes currently in UNIX buffers",
917 991 plugin->bytes_in_queue, GNUNET_NO);
918 plugin->bytes_in_queue -= msgw->msgsize; 992 GNUNET_STATISTICS_update (plugin->env->stats,
919 GNUNET_STATISTICS_set (plugin->env->stats, "# bytes currently in UNIX buffers", 993 "# UNIX bytes discarded",
920 plugin->bytes_in_queue, GNUNET_NO); 994 msgw->msgsize,
921 995 GNUNET_NO);
922 plugin->bytes_discarded += msgw->msgsize; 996 if (NULL != msgw->cont)
923 GNUNET_STATISTICS_set (plugin->env->stats,"# UNIX bytes discarded", 997 msgw->cont (msgw->cont_cls,
924 plugin->bytes_discarded, GNUNET_NO); 998 &msgw->session->target,
925 999 GNUNET_SYSERR,
926 GNUNET_free (msgw->msg); 1000 msgw->payload,
927 GNUNET_free (msgw); 1001 0);
928 } 1002 GNUNET_free (msgw->msg);
929 msgw = plugin->msg_tail; 1003 GNUNET_free (msgw);
930 } 1004 }
931 if (NULL == msgw) 1005 if (NULL == msgw)
932 return; /* Nothing to send at the moment */ 1006 return; /* Nothing to send at the moment */
@@ -945,11 +1019,12 @@ unix_plugin_select_write (struct Plugin * plugin)
945 1019
946 if (RETRY == sent) 1020 if (RETRY == sent)
947 { 1021 {
948 GNUNET_STATISTICS_update (plugin->env->stats,"# UNIX retry attempts", 1022 GNUNET_STATISTICS_update (plugin->env->stats,
949 1, GNUNET_NO); 1023 "# UNIX retry attempts",
950 1024 1, GNUNET_NO);
1025 return;
951 } 1026 }
952 else if (GNUNET_SYSERR == sent) 1027 if (GNUNET_SYSERR == sent)
953 { 1028 {
954 /* failed and no retry */ 1029 /* failed and no retry */
955 if (NULL != msgw->cont) 1030 if (NULL != msgw->cont)
@@ -959,36 +1034,40 @@ unix_plugin_select_write (struct Plugin * plugin)
959 1034
960 GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize); 1035 GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize);
961 plugin->bytes_in_queue -= msgw->msgsize; 1036 plugin->bytes_in_queue -= msgw->msgsize;
962 GNUNET_STATISTICS_set (plugin->env->stats, "# bytes currently in UNIX buffers", 1037 GNUNET_STATISTICS_set (plugin->env->stats,
963 plugin->bytes_in_queue, GNUNET_NO); 1038 "# bytes currently in UNIX buffers",
964 plugin->bytes_discarded += msgw->msgsize; 1039 plugin->bytes_in_queue, GNUNET_NO);
965 GNUNET_STATISTICS_set (plugin->env->stats,"# UNIX bytes discarded", 1040 GNUNET_STATISTICS_update (plugin->env->stats,
966 plugin->bytes_discarded, GNUNET_NO); 1041 "# UNIX bytes discarded",
967 1042 msgw->msgsize,
968 GNUNET_free (msgw->msg); 1043 GNUNET_NO);
969 GNUNET_free (msgw);
970 return;
971 }
972 else if (sent > 0)
973 {
974 /* successfully sent bytes */
975 if (NULL != msgw->cont)
976 msgw->cont (msgw->cont_cls, &msgw->session->target, GNUNET_OK, msgw->payload, msgw->msgsize);
977
978 GNUNET_CONTAINER_DLL_remove(plugin->msg_head, plugin->msg_tail, msgw);
979
980 GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize);
981 plugin->bytes_in_queue -= msgw->msgsize;
982 GNUNET_STATISTICS_set (plugin->env->stats,"# bytes currently in UNIX buffers",
983 plugin->bytes_in_queue, GNUNET_NO);
984 plugin->bytes_in_sent += msgw->msgsize;
985 GNUNET_STATISTICS_set (plugin->env->stats,"# bytes transmitted via UNIX",
986 plugin->bytes_in_sent, GNUNET_NO);
987 1044
988 GNUNET_free (msgw->msg); 1045 GNUNET_free (msgw->msg);
989 GNUNET_free (msgw); 1046 GNUNET_free (msgw);
990 return; 1047 return;
991 } 1048 }
1049 /* successfully sent bytes */
1050 GNUNET_break (sent > 0);
1051 GNUNET_CONTAINER_DLL_remove (plugin->msg_head,
1052 plugin->msg_tail,
1053 msgw);
1054 GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize);
1055 plugin->bytes_in_queue -= msgw->msgsize;
1056 GNUNET_STATISTICS_set (plugin->env->stats,
1057 "# bytes currently in UNIX buffers",
1058 plugin->bytes_in_queue,
1059 GNUNET_NO);
1060 GNUNET_STATISTICS_update (plugin->env->stats,
1061 "# bytes transmitted via UNIX",
1062 msgw->msgsize,
1063 GNUNET_NO);
1064 if (NULL != msgw->cont)
1065 msgw->cont (msgw->cont_cls, &msgw->session->target,
1066 GNUNET_OK,
1067 msgw->payload,
1068 msgw->msgsize);
1069 GNUNET_free (msgw->msg);
1070 GNUNET_free (msgw);
992} 1071}
993 1072
994 1073
@@ -1001,7 +1080,8 @@ unix_plugin_select_write (struct Plugin * plugin)
1001 * @param tc the scheduling context (for rescheduling this function again) 1080 * @param tc the scheduling context (for rescheduling this function again)
1002 */ 1081 */
1003static void 1082static void
1004unix_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 1083unix_plugin_select (void *cls,
1084 const struct GNUNET_SCHEDULER_TaskContext *tc)
1005{ 1085{
1006 struct Plugin *plugin = cls; 1086 struct Plugin *plugin = cls;
1007 1087
@@ -1014,7 +1094,7 @@ unix_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1014 /* Ready to send data */ 1094 /* Ready to send data */
1015 GNUNET_assert (GNUNET_NETWORK_fdset_isset 1095 GNUNET_assert (GNUNET_NETWORK_fdset_isset
1016 (tc->write_ready, plugin->unix_sock.desc)); 1096 (tc->write_ready, plugin->unix_sock.desc));
1017 if (plugin->msg_head != NULL) 1097 if (NULL != plugin->msg_head)
1018 unix_plugin_select_write (plugin); 1098 unix_plugin_select_write (plugin);
1019 } 1099 }
1020 1100
@@ -1025,7 +1105,6 @@ unix_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1025 (tc->read_ready, plugin->unix_sock.desc)); 1105 (tc->read_ready, plugin->unix_sock.desc));
1026 unix_plugin_select_read (plugin); 1106 unix_plugin_select_read (plugin);
1027 } 1107 }
1028
1029 reschedule_select (plugin); 1108 reschedule_select (plugin);
1030} 1109}
1031 1110
@@ -1221,10 +1300,12 @@ unix_address_to_string (void *cls, const void *addr, size_t addrlen)
1221 * @param tc unused 1300 * @param tc unused
1222 */ 1301 */
1223static void 1302static void
1224address_notification (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 1303address_notification (void *cls,
1304 const struct GNUNET_SCHEDULER_TaskContext *tc)
1225{ 1305{
1226 struct Plugin *plugin = cls; 1306 struct Plugin *plugin = cls;
1227 1307
1308 plugin->address_update_task = GNUNET_SCHEDULER_NO_TASK;
1228 plugin->env->notify_address (plugin->env->cls, GNUNET_YES, 1309 plugin->env->notify_address (plugin->env->cls, GNUNET_YES,
1229 plugin->unix_socket_path, 1310 plugin->unix_socket_path,
1230 strlen (plugin->unix_socket_path) + 1, 1311 strlen (plugin->unix_socket_path) + 1,
@@ -1233,80 +1314,71 @@ address_notification (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1233 1314
1234 1315
1235/** 1316/**
1236 * Session was idle, so disconnect it 1317 * Increment session timeout due to activity
1237 */ 1318 *
1238static void 1319 * @param s session for which the timeout should be moved
1239session_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1240{
1241 GNUNET_assert (NULL != cls);
1242 struct Session *s = cls;
1243
1244 s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1245 LOG (GNUNET_ERROR_TYPE_DEBUG,
1246 "Session %p was idle for %llu ms, disconnecting\n",
1247 s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
1248 /* call session destroy function */
1249 disconnect_session(s);
1250}
1251
1252
1253/**
1254 * Start session timeout
1255 */ 1320 */
1256static void 1321static void
1257start_session_timeout (struct Session *s) 1322reschedule_session_timeout (struct Session *s)
1258{ 1323{
1259 GNUNET_assert (NULL != s); 1324 GNUNET_assert (NULL != s);
1260 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == s->timeout_task); 1325 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task);
1326 GNUNET_SCHEDULER_cancel (s->timeout_task);
1261 s->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, 1327 s->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1262 &session_timeout, 1328 &session_timeout,
1263 s); 1329 s);
1264 LOG (GNUNET_ERROR_TYPE_DEBUG, 1330 LOG (GNUNET_ERROR_TYPE_DEBUG,
1265 "Timeout for session %p set to %llu ms\n", 1331 "Timeout rescheduled for session %p set to %s\n",
1266 s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); 1332 s,
1333 GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1334 GNUNET_YES));
1267} 1335}
1268 1336
1269 1337
1270/** 1338/**
1271 * Increment session timeout due to activity 1339 * Function called on sessions to disconnect
1340 *
1341 * @param cls the plugin (unused)
1342 * @param key peer identity (unused)
1343 * @param value the 'struct Session' to disconnect
1344 * @return GNUNET_YES (always, continue to iterate)
1272 */ 1345 */
1273static void 1346static int
1274reschedule_session_timeout (struct Session *s) 1347get_session_delete_it (void *cls, const struct GNUNET_HashCode * key, void *value)
1275{ 1348{
1276 GNUNET_assert (NULL != s); 1349 struct Session *s = value;
1277 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task);
1278 1350
1279 GNUNET_SCHEDULER_cancel (s->timeout_task); 1351 disconnect_session (s);
1280 s->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, 1352 return GNUNET_YES;
1281 &session_timeout,
1282 s);
1283 LOG (GNUNET_ERROR_TYPE_DEBUG,
1284 "Timeout rescheduled for session %p set to %llu ms\n",
1285 s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
1286} 1353}
1287 1354
1288 1355
1289/** 1356/**
1290 * Cancel timeout 1357 * Disconnect from a remote node. Clean up session if we have one for this peer
1358 *
1359 * @param cls closure for this call (should be handle to Plugin)
1360 * @param target the peeridentity of the peer to disconnect
1361 * @return GNUNET_OK on success, GNUNET_SYSERR if the operation failed
1291 */ 1362 */
1292static void 1363static void
1293stop_session_timeout (struct Session *s) 1364unix_disconnect (void *cls,
1365 const struct GNUNET_PeerIdentity *target)
1294{ 1366{
1295 GNUNET_assert (NULL != s); 1367 struct Plugin *plugin = cls;
1296 1368
1297 if (GNUNET_SCHEDULER_NO_TASK != s->timeout_task) 1369 GNUNET_assert (plugin != NULL);
1298 { 1370 GNUNET_CONTAINER_multihashmap_get_multiple (plugin->session_map,
1299 GNUNET_SCHEDULER_cancel (s->timeout_task); 1371 &target->hashPubKey,
1300 s->timeout_task = GNUNET_SCHEDULER_NO_TASK; 1372 &get_session_delete_it, plugin);
1301 LOG (GNUNET_ERROR_TYPE_DEBUG,
1302 "Timeout stopped for session %p canceled\n",
1303 s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
1304 }
1305} 1373}
1306 1374
1375
1307/** 1376/**
1308 * The exported method. Makes the core api available via a global and 1377 * The exported method. Initializes the plugin and returns a
1309 * returns the unix transport API. 1378 * struct with the callbacks.
1379 *
1380 * @param cls the plugin's execution environment
1381 * @return NULL on error, plugin functions otherwise
1310 */ 1382 */
1311void * 1383void *
1312libgnunet_plugin_transport_unix_init (void *cls) 1384libgnunet_plugin_transport_unix_init (void *cls)
@@ -1335,7 +1407,8 @@ libgnunet_plugin_transport_unix_init (void *cls)
1335 plugin = GNUNET_malloc (sizeof (struct Plugin)); 1407 plugin = GNUNET_malloc (sizeof (struct Plugin));
1336 plugin->port = port; 1408 plugin->port = port;
1337 plugin->env = env; 1409 plugin->env = env;
1338 GNUNET_asprintf (&plugin->unix_socket_path, "/tmp/unix-plugin-sock.%d", 1410 GNUNET_asprintf (&plugin->unix_socket_path,
1411 "/tmp/unix-plugin-sock.%d",
1339 plugin->port); 1412 plugin->port);
1340 1413
1341 api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions)); 1414 api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
@@ -1349,39 +1422,67 @@ libgnunet_plugin_transport_unix_init (void *cls)
1349 api->check_address = &unix_check_address; 1422 api->check_address = &unix_check_address;
1350 api->string_to_address = &unix_string_to_address; 1423 api->string_to_address = &unix_string_to_address;
1351 sockets_created = unix_transport_server_start (plugin); 1424 sockets_created = unix_transport_server_start (plugin);
1352 if (sockets_created == 0) 1425 if (0 == sockets_created)
1353 LOG (GNUNET_ERROR_TYPE_WARNING, _("Failed to open UNIX sockets\n")); 1426 LOG (GNUNET_ERROR_TYPE_WARNING,
1354 1427 _("Failed to open UNIX listen socket\n"));
1355 plugin->session_map = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO); 1428 plugin->session_map = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO);
1356 1429 plugin->address_update_task = GNUNET_SCHEDULER_add_now (&address_notification, plugin);
1357 GNUNET_SCHEDULER_add_now (address_notification, plugin);
1358 return api; 1430 return api;
1359} 1431}
1360 1432
1433
1434/**
1435 * Shutdown the plugin.
1436 *
1437 * @param cls the plugin API returned from the initialization function
1438 * @return NULL (always)
1439 */
1361void * 1440void *
1362libgnunet_plugin_transport_unix_done (void *cls) 1441libgnunet_plugin_transport_unix_done (void *cls)
1363{ 1442{
1364 struct GNUNET_TRANSPORT_PluginFunctions *api = cls; 1443 struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
1365 struct Plugin *plugin = api->cls; 1444 struct Plugin *plugin = api->cls;
1445 struct UNIXMessageWrapper * msgw;
1366 1446
1367 if (NULL == plugin) 1447 if (NULL == plugin)
1368 { 1448 {
1369 GNUNET_free (api); 1449 GNUNET_free (api);
1370 return NULL; 1450 return NULL;
1371 } 1451 }
1372
1373 plugin->env->notify_address (plugin->env->cls, GNUNET_NO, 1452 plugin->env->notify_address (plugin->env->cls, GNUNET_NO,
1374 plugin->unix_socket_path, 1453 plugin->unix_socket_path,
1375 strlen (plugin->unix_socket_path) + 1, 1454 strlen (plugin->unix_socket_path) + 1,
1376 "unix"); 1455 "unix");
1456 while (NULL != (msgw = plugin->msg_head))
1457 {
1458 GNUNET_CONTAINER_DLL_remove (plugin->msg_head, plugin->msg_tail, msgw);
1459 if (msgw->cont != NULL)
1460 msgw->cont (msgw->cont_cls, &msgw->session->target, GNUNET_SYSERR,
1461 msgw->payload, 0);
1462 GNUNET_free (msgw->msg);
1463 GNUNET_free (msgw);
1464 }
1377 1465
1378 unix_transport_server_stop (plugin); 1466 if (GNUNET_SCHEDULER_NO_TASK != plugin->select_task)
1379 1467 {
1380 1468 GNUNET_SCHEDULER_cancel (plugin->select_task);
1381 GNUNET_CONTAINER_multihashmap_iterate (plugin->session_map, &get_session_delete_it, plugin); 1469 plugin->select_task = GNUNET_SCHEDULER_NO_TASK;
1470 }
1471 if (GNUNET_SCHEDULER_NO_TASK != plugin->address_update_task)
1472 {
1473 GNUNET_SCHEDULER_cancel (plugin->address_update_task);
1474 plugin->address_update_task = GNUNET_SCHEDULER_NO_TASK;
1475 }
1476 if (NULL != plugin->unix_sock.desc)
1477 {
1478 GNUNET_break (GNUNET_OK ==
1479 GNUNET_NETWORK_socket_close (plugin->unix_sock.desc));
1480 plugin->unix_sock.desc = NULL;
1481 plugin->with_ws = GNUNET_NO;
1482 }
1483 GNUNET_CONTAINER_multihashmap_iterate (plugin->session_map,
1484 &get_session_delete_it, plugin);
1382 GNUNET_CONTAINER_multihashmap_destroy (plugin->session_map); 1485 GNUNET_CONTAINER_multihashmap_destroy (plugin->session_map);
1383
1384
1385 if (NULL != plugin->rs) 1486 if (NULL != plugin->rs)
1386 GNUNET_NETWORK_fdset_destroy (plugin->rs); 1487 GNUNET_NETWORK_fdset_destroy (plugin->rs);
1387 if (NULL != plugin->ws) 1488 if (NULL != plugin->ws)