aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2014-06-22 22:25:02 +0000
committerChristian Grothoff <christian@grothoff.org>2014-06-22 22:25:02 +0000
commit705ce0c1e6b6e8fd877f94c6bad977718db4b647 (patch)
tree9ede1e28fc2f6cf4d0af9ba09fa49cc622638ebd
parentd28cb8e55ebe88deccac1a27cfcdbdd71516cc47 (diff)
downloadgnunet-705ce0c1e6b6e8fd877f94c6bad977718db4b647.tar.gz
gnunet-705ce0c1e6b6e8fd877f94c6bad977718db4b647.zip
implement monitoring API in UNIX, simplify code a bit
-rw-r--r--src/transport/plugin_transport_unix.c928
1 files changed, 479 insertions, 449 deletions
diff --git a/src/transport/plugin_transport_unix.c b/src/transport/plugin_transport_unix.c
index 881a4b9ff..6b90cf1f4 100644
--- a/src/transport/plugin_transport_unix.c
+++ b/src/transport/plugin_transport_unix.c
@@ -43,11 +43,24 @@
43 */ 43 */
44#define RETRY 0 44#define RETRY 0
45 45
46/**
47 * Name of the plugin.
48 */
46#define PLUGIN_NAME "unix" 49#define PLUGIN_NAME "unix"
47 50
51/**
52 * Options for UNIX Domain addresses.
53 */
48enum UNIX_ADDRESS_OPTIONS 54enum UNIX_ADDRESS_OPTIONS
49{ 55{
56 /**
57 * No special options.
58 */
50 UNIX_OPTIONS_NONE = 0, 59 UNIX_OPTIONS_NONE = 0,
60
61 /**
62 * Linux abstract domain sockets should be used.
63 */
51 UNIX_OPTIONS_USE_ABSTRACT_SOCKETS = 1 64 UNIX_OPTIONS_USE_ABSTRACT_SOCKETS = 1
52}; 65};
53 66
@@ -100,38 +113,9 @@ struct UNIXMessage
100 113
101GNUNET_NETWORK_STRUCT_END 114GNUNET_NETWORK_STRUCT_END
102 115
103/**
104 * Handle for a session.
105 */
106struct Session
107{
108 /**
109 * To whom are we talking to (set to our identity
110 * if we are still waiting for the welcome message).
111 *
112 * FIXME: information duplicated with 'peer' in address!
113 */
114 struct GNUNET_PeerIdentity target;
115
116 /**
117 * Pointer to the global plugin struct.
118 */
119 struct Plugin *plugin;
120
121 /**
122 * Address of the other peer.
123 */
124 struct GNUNET_HELLO_Address *address;
125
126 /**
127 * Session timeout task
128 */
129 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
130};
131
132 116
133/** 117/**
134 * 118 * Information we track for a message awaiting transmission.
135 */ 119 */
136struct UNIXMessageWrapper 120struct UNIXMessageWrapper
137{ 121{
@@ -188,6 +172,63 @@ struct UNIXMessageWrapper
188 172
189 173
190/** 174/**
175 * Handle for a session.
176 */
177struct Session
178{
179
180 /**
181 * Sessions with pending messages (!) are kept in a DLL.
182 */
183 struct Session *next;
184
185 /**
186 * Sessions with pending messages (!) are kept in a DLL.
187 */
188 struct Session *prev;
189
190 /**
191 * To whom are we talking to (set to our identity
192 * if we are still waiting for the welcome message).
193 *
194 * FIXME: information duplicated with 'peer' in address!
195 */
196 struct GNUNET_PeerIdentity target;
197
198 /**
199 * Pointer to the global plugin struct.
200 */
201 struct Plugin *plugin;
202
203 /**
204 * Address of the other peer.
205 */
206 struct GNUNET_HELLO_Address *address;
207
208 /**
209 * Number of bytes we currently have in our write queue.
210 */
211 unsigned long long bytes_in_queue;
212
213 /**
214 * Timeout for this session.
215 */
216 struct GNUNET_TIME_Absolute timeout;
217
218 /**
219 * Session timeout task.
220 */
221 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
222
223 /**
224 * Number of messages we currently have in our write queue.
225 */
226 unsigned int msgs_in_queue;
227
228};
229
230
231/**
191 * Encapsulation of all of the state of the plugin. 232 * Encapsulation of all of the state of the plugin.
192 */ 233 */
193struct Plugin; 234struct Plugin;
@@ -224,12 +265,17 @@ struct Plugin
224 GNUNET_SCHEDULER_TaskIdentifier address_update_task; 265 GNUNET_SCHEDULER_TaskIdentifier address_update_task;
225 266
226 /** 267 /**
227 * ID of select task 268 * ID of read task
228 */ 269 */
229 GNUNET_SCHEDULER_TaskIdentifier select_task; 270 GNUNET_SCHEDULER_TaskIdentifier read_task;
230 271
231 /** 272 /**
232 * Number of bytes we currently have in our write queue. 273 * ID of write task
274 */
275 GNUNET_SCHEDULER_TaskIdentifier write_task;
276
277 /**
278 * Number of bytes we currently have in our write queues.
233 */ 279 */
234 unsigned long long bytes_in_queue; 280 unsigned long long bytes_in_queue;
235 281
@@ -244,14 +290,14 @@ struct Plugin
244 struct GNUNET_CONTAINER_MultiPeerMap *session_map; 290 struct GNUNET_CONTAINER_MultiPeerMap *session_map;
245 291
246 /** 292 /**
247 * FD Read set 293 * Head of queue of messages to transmit.
248 */ 294 */
249 struct GNUNET_NETWORK_FDSet *rs; 295 struct UNIXMessageWrapper *msg_head;
250 296
251 /** 297 /**
252 * FD Write set 298 * Tail of queue of messages to transmit.
253 */ 299 */
254 struct GNUNET_NETWORK_FDSet *ws; 300 struct UNIXMessageWrapper *msg_tail;
255 301
256 /** 302 /**
257 * Path of our unix domain socket (/tmp/unix-plugin-PORT) 303 * Path of our unix domain socket (/tmp/unix-plugin-PORT)
@@ -259,16 +305,6 @@ struct Plugin
259 char *unix_socket_path; 305 char *unix_socket_path;
260 306
261 /** 307 /**
262 * Head of queue of messages to transmit.
263 */
264 struct UNIXMessageWrapper *msg_head;
265
266 /**
267 * Tail of queue of messages to transmit.
268 */
269 struct UNIXMessageWrapper *msg_tail;
270
271 /**
272 * Function to call about session status changes. 308 * Function to call about session status changes.
273 */ 309 */
274 GNUNET_TRANSPORT_SessionInfoCallback sic; 310 GNUNET_TRANSPORT_SessionInfoCallback sic;
@@ -294,74 +330,14 @@ struct Plugin
294 struct GNUNET_ATS_Information ats_network; 330 struct GNUNET_ATS_Information ats_network;
295 331
296 /** 332 /**
297 * Is the write set in the current 'select' task? #GNUNET_NO if the
298 * write queue was empty when the main task was scheduled,
299 * #GNUNET_YES if we're already waiting for being allowed to write.
300 */
301 int with_ws;
302
303 /**
304 * Are we using an abstract UNIX domain socket? 333 * Are we using an abstract UNIX domain socket?
305 */ 334 */
306 int abstract; 335 int is_abstract;
307 336
308}; 337};
309 338
310 339
311/** 340/**
312 * Increment session timeout due to activity
313 *
314 * @param s session for which the timeout should be moved
315 */
316static void
317reschedule_session_timeout (struct Session *s);
318
319
320/**
321 * We have been notified that our writeset has something to read. We don't
322 * know which socket needs to be read, so we have to check each one
323 * Then reschedule this function to be called again once more is available.
324 *
325 * @param cls the plugin handle
326 * @param tc the scheduling context (for rescheduling this function again)
327 */
328static void
329unix_plugin_select (void *cls,
330 const struct GNUNET_SCHEDULER_TaskContext *tc);
331
332
333/**
334 * Convert unix path to a `struct sockaddr_un`
335 *
336 * @param unixpath path to convert
337 * @param sock_len[out] set to the length of the address
338 * @return converted unix path
339 */
340static struct sockaddr_un *
341unix_address_to_sockaddr (const char *unixpath,
342 socklen_t *sock_len)
343{
344 struct sockaddr_un *un;
345 size_t slen;
346
347 GNUNET_assert (0 < strlen (unixpath)); /* sanity check */
348 un = GNUNET_new (struct sockaddr_un);
349 un->sun_family = AF_UNIX;
350 slen = strlen (unixpath);
351 if (slen >= sizeof (un->sun_path))
352 slen = sizeof (un->sun_path) - 1;
353 memcpy (un->sun_path, unixpath, slen);
354 un->sun_path[slen] = '\0';
355 slen = sizeof (struct sockaddr_un);
356#if HAVE_SOCKADDR_IN_SIN_LEN
357 un->sun_len = (u_char) slen;
358#endif
359 (*sock_len) = slen;
360 return un;
361}
362
363
364/**
365 * Function called for a quick conversion of the binary address to 341 * Function called for a quick conversion of the binary address to
366 * a numeric address. Note that the caller must not free the 342 * a numeric address. Note that the caller must not free the
367 * address and that the next call to this function is allowed 343 * address and that the next call to this function is allowed
@@ -424,39 +400,148 @@ unix_address_to_string (void *cls,
424 400
425 401
426/** 402/**
427 * Re-schedule the main 'select' callback (#unix_plugin_select()) 403 * Functions with this signature are called whenever we need
428 * for this plugin. 404 * to close a session due to a disconnect or failure to
405 * establish a connection.
429 * 406 *
430 * @param plugin the plugin context 407 * @param cls closure with the `struct Plugin`
408 * @param session session to close down
409 * @return #GNUNET_OK on success
431 */ 410 */
432static void 411static int
433reschedule_select (struct Plugin *plugin) 412unix_session_disconnect (void *cls,
413 struct Session *session)
434{ 414{
435 if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK) 415 struct Plugin *plugin = cls;
436 { 416 struct UNIXMessageWrapper *msgw;
437 GNUNET_SCHEDULER_cancel (plugin->select_task); 417 struct UNIXMessageWrapper *next;
438 plugin->select_task = GNUNET_SCHEDULER_NO_TASK; 418
439 } 419 LOG (GNUNET_ERROR_TYPE_DEBUG,
440 if (NULL != plugin->msg_head) 420 "Disconnecting session for peer `%s' `%s'\n",
421 GNUNET_i2s (&session->target),
422 unix_address_to_string (NULL,
423 session->address->address,
424 session->address->address_length));
425 plugin->env->session_end (plugin->env->cls,
426 session->address,
427 session);
428 next = plugin->msg_head;
429 while (NULL != next)
441 { 430 {
442 plugin->select_task = 431 msgw = next;
443 GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT, 432 next = msgw->next;
444 GNUNET_TIME_UNIT_FOREVER_REL, 433 if (msgw->session != session)
445 plugin->rs, 434 continue;
446 plugin->ws, 435 GNUNET_CONTAINER_DLL_remove (plugin->msg_head,
447 &unix_plugin_select, plugin); 436 plugin->msg_tail,
448 plugin->with_ws = GNUNET_YES; 437 msgw);
438 session->msgs_in_queue--;
439 GNUNET_assert (session->bytes_in_queue >= msgw->msgsize);
440 session->bytes_in_queue -= msgw->msgsize;
441 GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize);
442 plugin->bytes_in_queue -= msgw->msgsize;
443 if (NULL != msgw->cont)
444 msgw->cont (msgw->cont_cls,
445 &msgw->session->target,
446 GNUNET_SYSERR,
447 msgw->payload, 0);
448 GNUNET_free (msgw->msg);
449 GNUNET_free (msgw);
449 } 450 }
450 else 451 GNUNET_assert (GNUNET_YES ==
452 GNUNET_CONTAINER_multipeermap_remove (plugin->session_map,
453 &session->target,
454 session));
455 GNUNET_STATISTICS_set (plugin->env->stats,
456 "# UNIX sessions active",
457 GNUNET_CONTAINER_multipeermap_size (plugin->session_map),
458 GNUNET_NO);
459 if (GNUNET_SCHEDULER_NO_TASK != session->timeout_task)
451 { 460 {
452 plugin->select_task = 461 GNUNET_SCHEDULER_cancel (session->timeout_task);
453 GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT, 462 session->timeout_task = GNUNET_SCHEDULER_NO_TASK;
454 GNUNET_TIME_UNIT_FOREVER_REL,
455 plugin->rs,
456 NULL,
457 &unix_plugin_select, plugin);
458 plugin->with_ws = GNUNET_NO;
459 } 463 }
464 GNUNET_HELLO_address_free (session->address);
465 GNUNET_break (0 == session->bytes_in_queue);
466 GNUNET_break (0 == session->msgs_in_queue);
467 GNUNET_free (session);
468 return GNUNET_OK;
469}
470
471
472/**
473 * Session was idle for too long, so disconnect it
474 *
475 * @param cls the 'struct Session' to disconnect
476 * @param tc scheduler context
477 */
478static void
479session_timeout (void *cls,
480 const struct GNUNET_SCHEDULER_TaskContext *tc)
481{
482 struct Session *s = cls;
483 struct GNUNET_TIME_Relative left;
484
485 s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
486 left = GNUNET_TIME_absolute_get_remaining (s->timeout);
487 if (0 != left.rel_value_us)
488 {
489 /* not actually our turn yet */
490 s->timeout_task = GNUNET_SCHEDULER_add_delayed (left,
491 &session_timeout,
492 s);
493 return;
494 }
495 LOG (GNUNET_ERROR_TYPE_DEBUG,
496 "Session %p was idle for %s, disconnecting\n",
497 s,
498 GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
499 GNUNET_YES));
500 unix_session_disconnect (s->plugin, s);
501}
502
503
504/**
505 * Increment session timeout due to activity
506 *
507 * @param s session for which the timeout should be rescheduled
508 */
509static void
510reschedule_session_timeout (struct Session *s)
511{
512 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task);
513 s->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
514}
515
516
517/**
518 * Convert unix path to a `struct sockaddr_un`
519 *
520 * @param unixpath path to convert
521 * @param sock_len[out] set to the length of the address
522 * @return converted unix path
523 */
524static struct sockaddr_un *
525unix_address_to_sockaddr (const char *unixpath,
526 socklen_t *sock_len)
527{
528 struct sockaddr_un *un;
529 size_t slen;
530
531 GNUNET_assert (0 < strlen (unixpath)); /* sanity check */
532 un = GNUNET_new (struct sockaddr_un);
533 un->sun_family = AF_UNIX;
534 slen = strlen (unixpath);
535 if (slen >= sizeof (un->sun_path))
536 slen = sizeof (un->sun_path) - 1;
537 memcpy (un->sun_path, unixpath, slen);
538 un->sun_path[slen] = '\0';
539 slen = sizeof (struct sockaddr_un);
540#if HAVE_SOCKADDR_IN_SIN_LEN
541 un->sun_len = (u_char) slen;
542#endif
543 (*sock_len) = slen;
544 return un;
460} 545}
461 546
462 547
@@ -525,70 +610,6 @@ lookup_session (struct Plugin *plugin,
525 610
526 611
527/** 612/**
528 * Functions with this signature are called whenever we need
529 * to close a session due to a disconnect or failure to
530 * establish a connection.
531 *
532 * @param cls closure with the `struct Plugin`
533 * @param s session to close down
534 * @return #GNUNET_OK on success
535 */
536static int
537unix_session_disconnect (void *cls,
538 struct Session *s)
539{
540 struct Plugin *plugin = cls;
541 struct UNIXMessageWrapper *msgw;
542 struct UNIXMessageWrapper *next;
543 int removed;
544
545 LOG (GNUNET_ERROR_TYPE_DEBUG,
546 "Disconnecting session for peer `%s' `%s'\n",
547 GNUNET_i2s (&s->target),
548 unix_address_to_string (NULL,
549 s->address->address,
550 s->address->address_length));
551 plugin->env->session_end (plugin->env->cls, s->address, s);
552 removed = GNUNET_NO;
553 next = plugin->msg_head;
554 while (NULL != next)
555 {
556 msgw = next;
557 next = msgw->next;
558 if (msgw->session != s)
559 continue;
560 GNUNET_CONTAINER_DLL_remove (plugin->msg_head,
561 plugin->msg_tail,
562 msgw);
563 if (NULL != msgw->cont)
564 msgw->cont (msgw->cont_cls, &msgw->session->target, GNUNET_SYSERR,
565 msgw->payload, 0);
566 GNUNET_free (msgw->msg);
567 GNUNET_free (msgw);
568 removed = GNUNET_YES;
569 }
570 GNUNET_assert (GNUNET_YES ==
571 GNUNET_CONTAINER_multipeermap_remove (plugin->session_map,
572 &s->target,
573 s));
574 GNUNET_STATISTICS_set (plugin->env->stats,
575 "# UNIX sessions active",
576 GNUNET_CONTAINER_multipeermap_size (plugin->session_map),
577 GNUNET_NO);
578 if ((GNUNET_YES == removed) && (NULL == plugin->msg_head))
579 reschedule_select (plugin);
580 if (GNUNET_SCHEDULER_NO_TASK != s->timeout_task)
581 {
582 GNUNET_SCHEDULER_cancel (s->timeout_task);
583 s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
584 }
585 GNUNET_HELLO_address_free (s->address);
586 GNUNET_free (s);
587 return GNUNET_OK;
588}
589
590
591/**
592 * Function that is called to get the keepalive factor. 613 * Function that is called to get the keepalive factor.
593 * #GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT is divided by this number to 614 * #GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT is divided by this number to
594 * calculate the interval between keepalive packets. 615 * calculate the interval between keepalive packets.
@@ -611,7 +632,7 @@ unix_query_keepalive_factor (void *cls)
611 * @param send_handle which handle to send message on 632 * @param send_handle which handle to send message on
612 * @param target who should receive this message (ignored by UNIX) 633 * @param target who should receive this message (ignored by UNIX)
613 * @param msgbuf one or more GNUNET_MessageHeader(s) strung together 634 * @param msgbuf one or more GNUNET_MessageHeader(s) strung together
614 * @param msgbuf_size the size of the msgbuf to send 635 * @param msgbuf_size the size of the @a msgbuf to send
615 * @param priority how important is the message (ignored by UNIX) 636 * @param priority how important is the message (ignored by UNIX)
616 * @param timeout when should we time out (give up) if we can not transmit? 637 * @param timeout when should we time out (give up) if we can not transmit?
617 * @param addr the addr to send the message to, needs to be a sockaddr for us 638 * @param addr the addr to send the message to, needs to be a sockaddr for us
@@ -644,7 +665,6 @@ unix_real_send (void *cls,
644 socklen_t un_len; 665 socklen_t un_len;
645 const char *unixpath; 666 const char *unixpath;
646 667
647 GNUNET_assert (NULL != plugin);
648 if (NULL == send_handle) 668 if (NULL == send_handle)
649 { 669 {
650 GNUNET_break (0); /* We do not have a send handle */ 670 GNUNET_break (0); /* We do not have a send handle */
@@ -665,15 +685,18 @@ unix_real_send (void *cls,
665 return -1; 685 return -1;
666 } 686 }
667 687
668 if ((GNUNET_YES == plugin->abstract) && 688 if ((GNUNET_YES == plugin->is_abstract) &&
669 (0 != (UNIX_OPTIONS_USE_ABSTRACT_SOCKETS & ntohl(addr->options) )) ) 689 (0 != (UNIX_OPTIONS_USE_ABSTRACT_SOCKETS & ntohl(addr->options) )) )
670 { 690 {
671 un->sun_path[0] = '\0'; 691 un->sun_path[0] = '\0';
672 } 692 }
673resend: 693resend:
674 /* Send the data */ 694 /* Send the data */
675 sent = GNUNET_NETWORK_socket_sendto (send_handle, msgbuf, msgbuf_size, 695 sent = GNUNET_NETWORK_socket_sendto (send_handle,
676 (const struct sockaddr *) un, un_len); 696 msgbuf,
697 msgbuf_size,
698 (const struct sockaddr *) un,
699 un_len);
677 if (GNUNET_SYSERR == sent) 700 if (GNUNET_SYSERR == sent)
678 { 701 {
679 if ( (EAGAIN == errno) || 702 if ( (EAGAIN == errno) ||
@@ -693,12 +716,15 @@ resend:
693 if (size < msgbuf_size) 716 if (size < msgbuf_size)
694 { 717 {
695 LOG (GNUNET_ERROR_TYPE_DEBUG, 718 LOG (GNUNET_ERROR_TYPE_DEBUG,
696 "Trying to increase socket buffer size from %i to %i for message size %i\n", 719 "Trying to increase socket buffer size from %u to %u for message size %u\n",
697 size, ((msgbuf_size / 1000) + 2) * 1000, msgbuf_size); 720 (unsigned int) size,
721 (unsigned int) ((msgbuf_size / 1000) + 2) * 1000,
722 (unsigned int) msgbuf_size);
698 size = ((msgbuf_size / 1000) + 2) * 1000; 723 size = ((msgbuf_size / 1000) + 2) * 1000;
699 if (GNUNET_OK == GNUNET_NETWORK_socket_setsockopt 724 if (GNUNET_OK ==
700 ((struct GNUNET_NETWORK_Handle *) send_handle, SOL_SOCKET, SO_SNDBUF, 725 GNUNET_NETWORK_socket_setsockopt ((struct GNUNET_NETWORK_Handle *) send_handle,
701 &size, sizeof (size))) 726 SOL_SOCKET, SO_SNDBUF,
727 &size, sizeof (size)))
702 goto resend; /* Increased buffer size, retry sending */ 728 goto resend; /* Increased buffer size, retry sending */
703 else 729 else
704 { 730 {
@@ -720,7 +746,7 @@ resend:
720 } 746 }
721 747
722 LOG (GNUNET_ERROR_TYPE_DEBUG, 748 LOG (GNUNET_ERROR_TYPE_DEBUG,
723 "UNIX transmit %u-byte message to %s (%d: %s)\n", 749 "UNIX transmitted %u-byte message to %s (%d: %s)\n",
724 (unsigned int) msgbuf_size, 750 (unsigned int) msgbuf_size,
725 GNUNET_a2s ((const struct sockaddr *)un, un_len), 751 GNUNET_a2s ((const struct sockaddr *)un, un_len),
726 (int) sent, 752 (int) sent,
@@ -731,28 +757,6 @@ resend:
731 757
732 758
733/** 759/**
734 * Session was idle for too long, so disconnect it
735 *
736 * @param cls the 'struct Session' to disconnect
737 * @param tc scheduler context
738 */
739static void
740session_timeout (void *cls,
741 const struct GNUNET_SCHEDULER_TaskContext *tc)
742{
743 struct Session *s = cls;
744
745 s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
746 LOG (GNUNET_ERROR_TYPE_DEBUG,
747 "Session %p was idle for %s, disconnecting\n",
748 s,
749 GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
750 GNUNET_YES));
751 unix_session_disconnect (s->plugin, s);
752}
753
754
755/**
756 * Function obtain the network type for a session 760 * Function obtain the network type for a session
757 * 761 *
758 * @param cls closure ('struct Plugin*') 762 * @param cls closure ('struct Plugin*')
@@ -787,9 +791,6 @@ unix_plugin_get_session (void *cls,
787 uint32_t addr_str_len; 791 uint32_t addr_str_len;
788 uint32_t addr_option; 792 uint32_t addr_option;
789 793
790 GNUNET_assert (NULL != plugin);
791 GNUNET_assert (NULL != address);
792
793 ua = (struct UnixAddress *) address->address; 794 ua = (struct UnixAddress *) address->address;
794 if ((NULL == address->address) || (0 == address->address_length) || 795 if ((NULL == address->address) || (0 == address->address_length) ||
795 (sizeof (struct UnixAddress) > address->address_length)) 796 (sizeof (struct UnixAddress) > address->address_length))
@@ -802,7 +803,7 @@ unix_plugin_get_session (void *cls,
802 addr_option = ntohl (ua->options); 803 addr_option = ntohl (ua->options);
803 804
804 if ( (0 != (UNIX_OPTIONS_USE_ABSTRACT_SOCKETS & addr_option)) && 805 if ( (0 != (UNIX_OPTIONS_USE_ABSTRACT_SOCKETS & addr_option)) &&
805 (GNUNET_NO == plugin->abstract)) 806 (GNUNET_NO == plugin->is_abstract))
806 { 807 {
807 return NULL; 808 return NULL;
808 } 809 }
@@ -882,101 +883,11 @@ unix_plugin_update_session_timeout (void *cls,
882 GNUNET_CONTAINER_multipeermap_contains_value (plugin->session_map, 883 GNUNET_CONTAINER_multipeermap_contains_value (plugin->session_map,
883 &session->target, 884 &session->target,
884 session)) 885 session))
885 return;
886 reschedule_session_timeout (session);
887}
888
889
890/**
891 * Function that can be used by the transport service to transmit
892 * a message using the plugin. Note that in the case of a
893 * peer disconnecting, the continuation MUST be called
894 * prior to the disconnect notification itself. This function
895 * will be called with this peer's HELLO message to initiate
896 * a fresh connection to another peer.
897 *
898 * @param cls closure
899 * @param session which session must be used
900 * @param msgbuf the message to transmit
901 * @param msgbuf_size number of bytes in @a msgbuf
902 * @param priority how important is the message (most plugins will
903 * ignore message priority and just FIFO)
904 * @param to how long to wait at most for the transmission (does not
905 * require plugins to discard the message after the timeout,
906 * just advisory for the desired delay; most plugins will ignore
907 * this as well)
908 * @param cont continuation to call once the message has
909 * been transmitted (or if the transport is ready
910 * for the next transmission call; or if the
911 * peer disconnected...); can be NULL
912 * @param cont_cls closure for @a cont
913 * @return number of bytes used (on the physical network, with overheads);
914 * -1 on hard errors (i.e. address invalid); 0 is a legal value
915 * and does NOT mean that the message was not transmitted (DV)
916 */
917static ssize_t
918unix_plugin_send (void *cls,
919 struct Session *session,
920 const char *msgbuf,
921 size_t msgbuf_size,
922 unsigned int priority,
923 struct GNUNET_TIME_Relative to,
924 GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
925{
926 struct Plugin *plugin = cls;
927 struct UNIXMessageWrapper *wrapper;
928 struct UNIXMessage *message;
929 int ssize;
930
931 if (GNUNET_OK !=
932 GNUNET_CONTAINER_multipeermap_contains_value (plugin->session_map,
933 &session->target,
934 session))
935 { 886 {
936 LOG (GNUNET_ERROR_TYPE_ERROR,
937 "Invalid session for peer `%s' `%s'\n",
938 GNUNET_i2s (&session->target),
939 unix_address_to_string(NULL,
940 session->address->address,
941 session->address->address_length));
942 GNUNET_break (0); 887 GNUNET_break (0);
943 return GNUNET_SYSERR; 888 return;
944 } 889 }
945 LOG (GNUNET_ERROR_TYPE_DEBUG, 890 reschedule_session_timeout (session);
946 "Sending %u bytes with session for peer `%s' `%s'\n",
947 msgbuf_size,
948 GNUNET_i2s (&session->target),
949 unix_address_to_string (NULL,
950 session->address->address,
951 session->address->address_length));
952 ssize = sizeof (struct UNIXMessage) + msgbuf_size;
953 message = GNUNET_malloc (sizeof (struct UNIXMessage) + msgbuf_size);
954 message->header.size = htons (ssize);
955 message->header.type = htons (0);
956 memcpy (&message->sender, plugin->env->my_identity,
957 sizeof (struct GNUNET_PeerIdentity));
958 memcpy (&message[1], msgbuf, msgbuf_size);
959 wrapper = GNUNET_new (struct UNIXMessageWrapper);
960 wrapper->msg = message;
961 wrapper->msgsize = ssize;
962 wrapper->payload = msgbuf_size;
963 wrapper->priority = priority;
964 wrapper->timeout = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (),
965 to);
966 wrapper->cont = cont;
967 wrapper->cont_cls = cont_cls;
968 wrapper->session = session;
969 GNUNET_CONTAINER_DLL_insert (plugin->msg_head,
970 plugin->msg_tail,
971 wrapper);
972 plugin->bytes_in_queue += ssize;
973 GNUNET_STATISTICS_set (plugin->env->stats,
974 "# bytes currently in UNIX buffers",
975 plugin->bytes_in_queue,
976 GNUNET_NO);
977 if (GNUNET_NO == plugin->with_ws)
978 reschedule_select (plugin);
979 return ssize;
980} 891}
981 892
982 893
@@ -1008,21 +919,30 @@ unix_demultiplexer (struct Plugin *plugin, struct GNUNET_PeerIdentity *sender,
1008 GNUNET_NO); 919 GNUNET_NO);
1009 920
1010 /* Look for existing session */ 921 /* Look for existing session */
1011 address = GNUNET_HELLO_address_allocate (sender, PLUGIN_NAME, ua, ua_len, 922 address = GNUNET_HELLO_address_allocate (sender,
1012 GNUNET_HELLO_ADDRESS_INFO_NONE); /* UNIX does not have "inbound" sessions */ 923 PLUGIN_NAME,
924 ua, ua_len,
925 GNUNET_HELLO_ADDRESS_INFO_NONE); /* UNIX does not have "inbound" sessions */
1013 s = lookup_session (plugin, address); 926 s = lookup_session (plugin, address);
1014
1015 if (NULL == s) 927 if (NULL == s)
1016 { 928 {
1017 s = unix_plugin_get_session (plugin, address); 929 s = unix_plugin_get_session (plugin, address);
1018 /* Notify transport and ATS about new inbound session */ 930 /* Notify transport and ATS about new inbound session */
1019 plugin->env->session_start (NULL, s->address, s, &plugin->ats_network, 1); 931 plugin->env->session_start (NULL,
932 s->address,
933 s,
934 &plugin->ats_network, 1);
1020 } 935 }
1021 GNUNET_HELLO_address_free (address); 936 GNUNET_HELLO_address_free (address);
1022 reschedule_session_timeout (s); 937 reschedule_session_timeout (s);
1023 938
1024 plugin->env->receive (plugin->env->cls, s->address, s, currhdr); 939 plugin->env->receive (plugin->env->cls,
1025 plugin->env->update_address_metrics (plugin->env->cls, s->address, s, 940 s->address,
941 s,
942 currhdr);
943 plugin->env->update_address_metrics (plugin->env->cls,
944 s->address,
945 s,
1026 &plugin->ats_network, 1); 946 &plugin->ats_network, 1);
1027} 947}
1028 948
@@ -1033,7 +953,7 @@ unix_demultiplexer (struct Plugin *plugin, struct GNUNET_PeerIdentity *sender,
1033 * @param plugin the plugin 953 * @param plugin the plugin
1034 */ 954 */
1035static void 955static void
1036unix_plugin_select_read (struct Plugin *plugin) 956unix_plugin_do_read (struct Plugin *plugin)
1037{ 957{
1038 char buf[65536] GNUNET_ALIGN; 958 char buf[65536] GNUNET_ALIGN;
1039 struct UnixAddress *ua; 959 struct UnixAddress *ua;
@@ -1052,17 +972,16 @@ unix_plugin_select_read (struct Plugin *plugin)
1052 972
1053 addrlen = sizeof (un); 973 addrlen = sizeof (un);
1054 memset (&un, 0, sizeof (un)); 974 memset (&un, 0, sizeof (un));
1055 975 ret = GNUNET_NETWORK_socket_recvfrom (plugin->unix_sock.desc,
1056 ret = 976 buf, sizeof (buf),
1057 GNUNET_NETWORK_socket_recvfrom (plugin->unix_sock.desc, buf, sizeof (buf), 977 (struct sockaddr *) &un,
1058 (struct sockaddr *) &un, &addrlen); 978 &addrlen);
1059
1060 if ((GNUNET_SYSERR == ret) && ((errno == EAGAIN) || (errno == ENOBUFS))) 979 if ((GNUNET_SYSERR == ret) && ((errno == EAGAIN) || (errno == ENOBUFS)))
1061 return; 980 return;
1062 981 if (GNUNET_SYSERR == ret)
1063 if (ret == GNUNET_SYSERR)
1064 { 982 {
1065 GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "recvfrom"); 983 GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
984 "recvfrom");
1066 return; 985 return;
1067 } 986 }
1068 else 987 else
@@ -1122,15 +1041,16 @@ unix_plugin_select_read (struct Plugin *plugin)
1122/** 1041/**
1123 * Write to UNIX domain socket (it is ready). 1042 * Write to UNIX domain socket (it is ready).
1124 * 1043 *
1125 * @param plugin the plugin 1044 * @param session session to write data for
1126 */ 1045 */
1127static void 1046static void
1128unix_plugin_select_write (struct Plugin *plugin) 1047unix_plugin_do_write (struct Plugin *plugin)
1129{ 1048{
1130 int sent = 0; 1049 ssize_t sent = 0;
1131 struct UNIXMessageWrapper * msgw; 1050 struct UNIXMessageWrapper *msgw;
1051 struct Session *session;
1132 1052
1133 while (NULL != (msgw = plugin->msg_tail)) 1053 while (NULL != (msgw = plugin->msg_head))
1134 { 1054 {
1135 if (GNUNET_TIME_absolute_get_remaining (msgw->timeout).rel_value_us > 0) 1055 if (GNUNET_TIME_absolute_get_remaining (msgw->timeout).rel_value_us > 0)
1136 break; /* Message is ready for sending */ 1056 break; /* Message is ready for sending */
@@ -1138,11 +1058,19 @@ unix_plugin_select_write (struct Plugin *plugin)
1138 LOG (GNUNET_ERROR_TYPE_DEBUG, 1058 LOG (GNUNET_ERROR_TYPE_DEBUG,
1139 "Timeout for message with %u bytes \n", 1059 "Timeout for message with %u bytes \n",
1140 (unsigned int) msgw->msgsize); 1060 (unsigned int) msgw->msgsize);
1141 GNUNET_CONTAINER_DLL_remove (plugin->msg_head, plugin->msg_tail, msgw); 1061 GNUNET_CONTAINER_DLL_remove (plugin->msg_head,
1062 plugin->msg_tail,
1063 msgw);
1064 session = msgw->session;
1065 session->msgs_in_queue--;
1066 GNUNET_assert (session->bytes_in_queue >= msgw->msgsize);
1067 session->bytes_in_queue -= msgw->msgsize;
1068 GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize);
1142 plugin->bytes_in_queue -= msgw->msgsize; 1069 plugin->bytes_in_queue -= msgw->msgsize;
1143 GNUNET_STATISTICS_set (plugin->env->stats, 1070 GNUNET_STATISTICS_set (plugin->env->stats,
1144 "# bytes currently in UNIX buffers", 1071 "# bytes currently in UNIX buffers",
1145 plugin->bytes_in_queue, GNUNET_NO); 1072 plugin->bytes_in_queue,
1073 GNUNET_NO);
1146 GNUNET_STATISTICS_update (plugin->env->stats, 1074 GNUNET_STATISTICS_update (plugin->env->stats,
1147 "# UNIX bytes discarded", 1075 "# UNIX bytes discarded",
1148 msgw->msgsize, 1076 msgw->msgsize,
@@ -1178,45 +1106,43 @@ unix_plugin_select_write (struct Plugin *plugin)
1178 1, GNUNET_NO); 1106 1, GNUNET_NO);
1179 return; 1107 return;
1180 } 1108 }
1109 GNUNET_CONTAINER_DLL_remove (plugin->msg_head,
1110 plugin->msg_tail,
1111 msgw);
1112 session = msgw->session;
1113 session->msgs_in_queue--;
1114 GNUNET_assert (session->bytes_in_queue >= msgw->msgsize);
1115 session->bytes_in_queue -= msgw->msgsize;
1116 GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize);
1117 plugin->bytes_in_queue -= msgw->msgsize;
1118 GNUNET_STATISTICS_set (plugin->env->stats,
1119 "# bytes currently in UNIX buffers",
1120 plugin->bytes_in_queue, GNUNET_NO);
1181 if (GNUNET_SYSERR == sent) 1121 if (GNUNET_SYSERR == sent)
1182 { 1122 {
1183 /* failed and no retry */ 1123 /* failed and no retry */
1184 if (NULL != msgw->cont) 1124 if (NULL != msgw->cont)
1185 msgw->cont (msgw->cont_cls, &msgw->session->target, GNUNET_SYSERR, msgw->payload, 0); 1125 msgw->cont (msgw->cont_cls,
1186 1126 &msgw->session->target,
1187 GNUNET_CONTAINER_DLL_remove(plugin->msg_head, plugin->msg_tail, msgw); 1127 GNUNET_SYSERR,
1188 1128 msgw->payload, 0);
1189 GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize);
1190 plugin->bytes_in_queue -= msgw->msgsize;
1191 GNUNET_STATISTICS_set (plugin->env->stats,
1192 "# bytes currently in UNIX buffers",
1193 plugin->bytes_in_queue, GNUNET_NO);
1194 GNUNET_STATISTICS_update (plugin->env->stats, 1129 GNUNET_STATISTICS_update (plugin->env->stats,
1195 "# UNIX bytes discarded", 1130 "# UNIX bytes discarded",
1196 msgw->msgsize, 1131 msgw->msgsize,
1197 GNUNET_NO); 1132 GNUNET_NO);
1198
1199 GNUNET_free (msgw->msg); 1133 GNUNET_free (msgw->msg);
1200 GNUNET_free (msgw); 1134 GNUNET_free (msgw);
1201 return; 1135 return;
1202 } 1136 }
1203 /* successfully sent bytes */ 1137 /* successfully sent bytes */
1204 GNUNET_break (sent > 0); 1138 GNUNET_break (sent > 0);
1205 GNUNET_CONTAINER_DLL_remove (plugin->msg_head,
1206 plugin->msg_tail,
1207 msgw);
1208 GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize);
1209 plugin->bytes_in_queue -= msgw->msgsize;
1210 GNUNET_STATISTICS_set (plugin->env->stats,
1211 "# bytes currently in UNIX buffers",
1212 plugin->bytes_in_queue,
1213 GNUNET_NO);
1214 GNUNET_STATISTICS_update (plugin->env->stats, 1139 GNUNET_STATISTICS_update (plugin->env->stats,
1215 "# bytes transmitted via UNIX", 1140 "# bytes transmitted via UNIX",
1216 msgw->msgsize, 1141 msgw->msgsize,
1217 GNUNET_NO); 1142 GNUNET_NO);
1218 if (NULL != msgw->cont) 1143 if (NULL != msgw->cont)
1219 msgw->cont (msgw->cont_cls, &msgw->session->target, 1144 msgw->cont (msgw->cont_cls,
1145 &msgw->session->target,
1220 GNUNET_OK, 1146 GNUNET_OK,
1221 msgw->payload, 1147 msgw->payload,
1222 msgw->msgsize); 1148 msgw->msgsize);
@@ -1226,47 +1152,160 @@ unix_plugin_select_write (struct Plugin *plugin)
1226 1152
1227 1153
1228/** 1154/**
1229 * We have been notified that our writeset has something to read. We don't 1155 * We have been notified that our socket has something to read.
1230 * know which socket needs to be read, so we have to check each one
1231 * Then reschedule this function to be called again once more is available. 1156 * Then reschedule this function to be called again once more is available.
1232 * 1157 *
1233 * @param cls the plugin handle 1158 * @param cls the plugin handle
1234 * @param tc the scheduling context (for rescheduling this function again) 1159 * @param tc the scheduling context
1235 */ 1160 */
1236static void 1161static void
1237unix_plugin_select (void *cls, 1162unix_plugin_select_read (void *cls,
1238 const struct GNUNET_SCHEDULER_TaskContext *tc) 1163 const struct GNUNET_SCHEDULER_TaskContext *tc)
1239{ 1164{
1240 struct Plugin *plugin = cls; 1165 struct Plugin *plugin = cls;
1241 1166
1242 plugin->select_task = GNUNET_SCHEDULER_NO_TASK; 1167 plugin->read_task = GNUNET_SCHEDULER_NO_TASK;
1243 if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0) 1168 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1244 return; 1169 return;
1170 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY))
1171 unix_plugin_do_read (plugin);
1172 plugin->read_task =
1173 GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
1174 plugin->unix_sock.desc,
1175 &unix_plugin_select_read, plugin);
1176}
1245 1177
1246 if ((tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY) != 0)
1247 {
1248 /* Ready to send data */
1249 GNUNET_assert (GNUNET_NETWORK_fdset_isset
1250 (tc->write_ready, plugin->unix_sock.desc));
1251 if (NULL != plugin->msg_head)
1252 unix_plugin_select_write (plugin);
1253 }
1254 1178
1255 if ((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0) 1179/**
1180 * We have been notified that our socket is ready to write.
1181 * Then reschedule this function to be called again once more is available.
1182 *
1183 * @param cls the plugin handle
1184 * @param tc the scheduling context
1185 */
1186static void
1187unix_plugin_select_write (void *cls,
1188 const struct GNUNET_SCHEDULER_TaskContext *tc)
1189{
1190 struct Plugin *plugin = cls;
1191
1192 plugin->write_task = GNUNET_SCHEDULER_NO_TASK;
1193 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1194 return;
1195 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY))
1196 unix_plugin_do_write (plugin);
1197 if (NULL == plugin->msg_head)
1198 return; /* write queue empty */
1199 plugin->write_task =
1200 GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
1201 plugin->unix_sock.desc,
1202 &unix_plugin_select_write, plugin);
1203}
1204
1205
1206/**
1207 * Function that can be used by the transport service to transmit
1208 * a message using the plugin. Note that in the case of a
1209 * peer disconnecting, the continuation MUST be called
1210 * prior to the disconnect notification itself. This function
1211 * will be called with this peer's HELLO message to initiate
1212 * a fresh connection to another peer.
1213 *
1214 * @param cls closure
1215 * @param session which session must be used
1216 * @param msgbuf the message to transmit
1217 * @param msgbuf_size number of bytes in @a msgbuf
1218 * @param priority how important is the message (most plugins will
1219 * ignore message priority and just FIFO)
1220 * @param to how long to wait at most for the transmission (does not
1221 * require plugins to discard the message after the timeout,
1222 * just advisory for the desired delay; most plugins will ignore
1223 * this as well)
1224 * @param cont continuation to call once the message has
1225 * been transmitted (or if the transport is ready
1226 * for the next transmission call; or if the
1227 * peer disconnected...); can be NULL
1228 * @param cont_cls closure for @a cont
1229 * @return number of bytes used (on the physical network, with overheads);
1230 * -1 on hard errors (i.e. address invalid); 0 is a legal value
1231 * and does NOT mean that the message was not transmitted (DV)
1232 */
1233static ssize_t
1234unix_plugin_send (void *cls,
1235 struct Session *session,
1236 const char *msgbuf,
1237 size_t msgbuf_size,
1238 unsigned int priority,
1239 struct GNUNET_TIME_Relative to,
1240 GNUNET_TRANSPORT_TransmitContinuation cont,
1241 void *cont_cls)
1242{
1243 struct Plugin *plugin = cls;
1244 struct UNIXMessageWrapper *wrapper;
1245 struct UNIXMessage *message;
1246 int ssize;
1247
1248 if (GNUNET_OK !=
1249 GNUNET_CONTAINER_multipeermap_contains_value (plugin->session_map,
1250 &session->target,
1251 session))
1256 { 1252 {
1257 /* Ready to receive data */ 1253 LOG (GNUNET_ERROR_TYPE_ERROR,
1258 GNUNET_assert (GNUNET_NETWORK_fdset_isset 1254 "Invalid session for peer `%s' `%s'\n",
1259 (tc->read_ready, plugin->unix_sock.desc)); 1255 GNUNET_i2s (&session->target),
1260 unix_plugin_select_read (plugin); 1256 unix_address_to_string(NULL,
1257 session->address->address,
1258 session->address->address_length));
1259 GNUNET_break (0);
1260 return GNUNET_SYSERR;
1261 } 1261 }
1262 reschedule_select (plugin); 1262 LOG (GNUNET_ERROR_TYPE_DEBUG,
1263 "Sending %u bytes with session for peer `%s' `%s'\n",
1264 msgbuf_size,
1265 GNUNET_i2s (&session->target),
1266 unix_address_to_string (NULL,
1267 session->address->address,
1268 session->address->address_length));
1269 ssize = sizeof (struct UNIXMessage) + msgbuf_size;
1270 message = GNUNET_malloc (sizeof (struct UNIXMessage) + msgbuf_size);
1271 message->header.size = htons (ssize);
1272 message->header.type = htons (0);
1273 memcpy (&message->sender, plugin->env->my_identity,
1274 sizeof (struct GNUNET_PeerIdentity));
1275 memcpy (&message[1], msgbuf, msgbuf_size);
1276 wrapper = GNUNET_new (struct UNIXMessageWrapper);
1277 wrapper->msg = message;
1278 wrapper->msgsize = ssize;
1279 wrapper->payload = msgbuf_size;
1280 wrapper->priority = priority;
1281 wrapper->timeout = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (),
1282 to);
1283 wrapper->cont = cont;
1284 wrapper->cont_cls = cont_cls;
1285 wrapper->session = session;
1286 GNUNET_CONTAINER_DLL_insert_tail (plugin->msg_head,
1287 plugin->msg_tail,
1288 wrapper);
1289 plugin->bytes_in_queue += ssize;
1290 session->bytes_in_queue += ssize;
1291 session->msgs_in_queue++;
1292 GNUNET_STATISTICS_set (plugin->env->stats,
1293 "# bytes currently in UNIX buffers",
1294 plugin->bytes_in_queue,
1295 GNUNET_NO);
1296 if (GNUNET_SCHEDULER_NO_TASK == plugin->write_task)
1297 plugin->write_task =
1298 GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
1299 plugin->unix_sock.desc,
1300 &unix_plugin_select_write, plugin);
1301 return ssize;
1263} 1302}
1264 1303
1265 1304
1266/** 1305/**
1267 * Create a slew of UNIX sockets. If possible, use IPv6 and IPv4. 1306 * Create a slew of UNIX sockets. If possible, use IPv6 and IPv4.
1268 * 1307 *
1269 * @param cls closure for server start, should be a struct Plugin * 1308 * @param cls closure for server start, should be a `struct Plugin *`
1270 * @return number of sockets created or #GNUNET_SYSERR on error 1309 * @return number of sockets created or #GNUNET_SYSERR on error
1271 */ 1310 */
1272static int 1311static int
@@ -1278,7 +1317,7 @@ unix_transport_server_start (void *cls)
1278 1317
1279 un = unix_address_to_sockaddr (plugin->unix_socket_path, 1318 un = unix_address_to_sockaddr (plugin->unix_socket_path,
1280 &un_len); 1319 &un_len);
1281 if (GNUNET_YES == plugin->abstract) 1320 if (GNUNET_YES == plugin->is_abstract)
1282 { 1321 {
1283 plugin->unix_socket_path[0] = '@'; 1322 plugin->unix_socket_path[0] = '@';
1284 un->sun_path[0] = '\0'; 1323 un->sun_path[0] = '\0';
@@ -1304,7 +1343,8 @@ unix_transport_server_start (void *cls)
1304 } 1343 }
1305 } 1344 }
1306 if (GNUNET_OK != 1345 if (GNUNET_OK !=
1307 GNUNET_NETWORK_socket_bind (plugin->unix_sock.desc, (const struct sockaddr *) un, un_len)) 1346 GNUNET_NETWORK_socket_bind (plugin->unix_sock.desc,
1347 (const struct sockaddr *) un, un_len))
1308 { 1348 {
1309 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "bind"); 1349 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "bind");
1310 GNUNET_NETWORK_socket_close (plugin->unix_sock.desc); 1350 GNUNET_NETWORK_socket_close (plugin->unix_sock.desc);
@@ -1312,15 +1352,13 @@ unix_transport_server_start (void *cls)
1312 GNUNET_free (un); 1352 GNUNET_free (un);
1313 return GNUNET_SYSERR; 1353 return GNUNET_SYSERR;
1314 } 1354 }
1315 LOG (GNUNET_ERROR_TYPE_DEBUG, "Bound to `%s'\n", plugin->unix_socket_path); 1355 LOG (GNUNET_ERROR_TYPE_DEBUG,
1316 plugin->rs = GNUNET_NETWORK_fdset_create (); 1356 "Bound to `%s'\n",
1317 plugin->ws = GNUNET_NETWORK_fdset_create (); 1357 plugin->unix_socket_path);
1318 GNUNET_NETWORK_fdset_zero (plugin->rs); 1358 plugin->read_task =
1319 GNUNET_NETWORK_fdset_zero (plugin->ws); 1359 GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
1320 GNUNET_NETWORK_fdset_set (plugin->rs, plugin->unix_sock.desc); 1360 plugin->unix_sock.desc,
1321 GNUNET_NETWORK_fdset_set (plugin->ws, plugin->unix_sock.desc); 1361 &unix_plugin_select_read, plugin);
1322
1323 reschedule_select (plugin);
1324 GNUNET_free (un); 1362 GNUNET_free (un);
1325 return 1; 1363 return 1;
1326} 1364}
@@ -1430,7 +1468,8 @@ unix_plugin_address_pretty_printer (void *cls, const char *type,
1430 */ 1468 */
1431static int 1469static int
1432unix_string_to_address (void *cls, 1470unix_string_to_address (void *cls,
1433 const char *addr, uint16_t addrlen, 1471 const char *addr,
1472 uint16_t addrlen,
1434 void **buf, size_t *added) 1473 void **buf, size_t *added)
1435{ 1474{
1436 struct UnixAddress *ua; 1475 struct UnixAddress *ua;
@@ -1538,28 +1577,6 @@ address_notification (void *cls,
1538 1577
1539 1578
1540/** 1579/**
1541 * Increment session timeout due to activity
1542 *
1543 * @param s session for which the timeout should be rescheduled
1544 */
1545static void
1546reschedule_session_timeout (struct Session *s)
1547{
1548 GNUNET_assert (NULL != s);
1549 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task);
1550 GNUNET_SCHEDULER_cancel (s->timeout_task);
1551 s->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1552 &session_timeout,
1553 s);
1554 LOG (GNUNET_ERROR_TYPE_DEBUG,
1555 "Timeout rescheduled for session %p set to %s\n",
1556 s,
1557 GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1558 GNUNET_YES));
1559}
1560
1561
1562/**
1563 * Function called on sessions to disconnect 1580 * Function called on sessions to disconnect
1564 * 1581 *
1565 * @param cls the plugin 1582 * @param cls the plugin
@@ -1619,12 +1636,14 @@ send_session_info_iter (void *cls,
1619 struct GNUNET_TRANSPORT_SessionInfo info; 1636 struct GNUNET_TRANSPORT_SessionInfo info;
1620 1637
1621 memset (&info, 0, sizeof (info)); 1638 memset (&info, 0, sizeof (info));
1622 info.state = GNUNET_TRANSPORT_SS_UP; /* ??? */ 1639 info.state = GNUNET_TRANSPORT_SS_UP; /* all are up if we have them */
1623 // FIXME: info->is_inbound = ? 1640 info.is_inbound = GNUNET_SYSERR; /* hard to say */
1624 // FIXME: info->num_msg_pending = ? 1641 info.num_msg_pending = session->msgs_in_queue;
1625 // FIXME: info->num_bytes_pending = ? 1642 info.num_bytes_pending = session->bytes_in_queue;
1626 // FIXME: info->receive_delay = ? 1643 /* info.receive_delay remains zero as this is not supported by UNIX
1627 // FIXME: info->session_timeout = ? 1644 (cannot selectively not receive from 'some' peer while continuing
1645 to receive from others) */
1646 info.session_timeout = session->timeout;
1628 info.address = session->address; 1647 info.address = session->address;
1629 plugin->sic (plugin->sic_cls, 1648 plugin->sic (plugin->sic_cls,
1630 session, 1649 session,
@@ -1690,11 +1709,14 @@ libgnunet_plugin_transport_unix_init (void *cls)
1690 1709
1691 plugin = GNUNET_new (struct Plugin); 1710 plugin = GNUNET_new (struct Plugin);
1692 if (GNUNET_OK != 1711 if (GNUNET_OK !=
1693 GNUNET_CONFIGURATION_get_value_filename(env->cfg, "transport-unix", "UNIXPATH", 1712 GNUNET_CONFIGURATION_get_value_filename (env->cfg,
1694 &plugin->unix_socket_path)) 1713 "transport-unix",
1714 "UNIXPATH",
1715 &plugin->unix_socket_path))
1695 { 1716 {
1696 LOG (GNUNET_ERROR_TYPE_ERROR, 1717 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
1697 _("No UNIXPATH given in configuration!\n")); 1718 "transport-unix",
1719 "UNIXPATH");
1698 GNUNET_free (plugin); 1720 GNUNET_free (plugin);
1699 return NULL; 1721 return NULL;
1700 } 1722 }
@@ -1703,14 +1725,13 @@ libgnunet_plugin_transport_unix_init (void *cls)
1703 1725
1704 /* Initialize my flags */ 1726 /* Initialize my flags */
1705#ifdef LINUX 1727#ifdef LINUX
1706 plugin->abstract = GNUNET_CONFIGURATION_get_value_yesno (plugin->env->cfg, 1728 plugin->is_abstract = GNUNET_CONFIGURATION_get_value_yesno (plugin->env->cfg,
1707 "testing", "USE_ABSTRACT_SOCKETS"); 1729 "testing",
1730 "USE_ABSTRACT_SOCKETS");
1708#endif 1731#endif
1709 plugin->myoptions = UNIX_OPTIONS_NONE; 1732 plugin->myoptions = UNIX_OPTIONS_NONE;
1710 if (GNUNET_YES == plugin->abstract) 1733 if (GNUNET_YES == plugin->is_abstract)
1711 {
1712 plugin->myoptions = UNIX_OPTIONS_USE_ABSTRACT_SOCKETS; 1734 plugin->myoptions = UNIX_OPTIONS_USE_ABSTRACT_SOCKETS;
1713 }
1714 1735
1715 api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions); 1736 api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions);
1716 api->cls = plugin; 1737 api->cls = plugin;
@@ -1758,6 +1779,7 @@ libgnunet_plugin_transport_unix_done (void *cls)
1758 struct UNIXMessageWrapper * msgw; 1779 struct UNIXMessageWrapper * msgw;
1759 struct UnixAddress *ua; 1780 struct UnixAddress *ua;
1760 size_t len; 1781 size_t len;
1782 struct Session *session;
1761 1783
1762 if (NULL == plugin) 1784 if (NULL == plugin)
1763 { 1785 {
@@ -1787,6 +1809,12 @@ libgnunet_plugin_transport_unix_done (void *cls)
1787 GNUNET_CONTAINER_DLL_remove (plugin->msg_head, 1809 GNUNET_CONTAINER_DLL_remove (plugin->msg_head,
1788 plugin->msg_tail, 1810 plugin->msg_tail,
1789 msgw); 1811 msgw);
1812 session = msgw->session;
1813 session->msgs_in_queue--;
1814 GNUNET_assert (session->bytes_in_queue >= msgw->msgsize);
1815 session->bytes_in_queue -= msgw->msgsize;
1816 GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize);
1817 plugin->bytes_in_queue -= msgw->msgsize;
1790 if (NULL != msgw->cont) 1818 if (NULL != msgw->cont)
1791 msgw->cont (msgw->cont_cls, 1819 msgw->cont (msgw->cont_cls,
1792 &msgw->session->target, 1820 &msgw->session->target,
@@ -1796,10 +1824,15 @@ libgnunet_plugin_transport_unix_done (void *cls)
1796 GNUNET_free (msgw); 1824 GNUNET_free (msgw);
1797 } 1825 }
1798 1826
1799 if (GNUNET_SCHEDULER_NO_TASK != plugin->select_task) 1827 if (GNUNET_SCHEDULER_NO_TASK != plugin->read_task)
1828 {
1829 GNUNET_SCHEDULER_cancel (plugin->read_task);
1830 plugin->read_task = GNUNET_SCHEDULER_NO_TASK;
1831 }
1832 if (GNUNET_SCHEDULER_NO_TASK != plugin->write_task)
1800 { 1833 {
1801 GNUNET_SCHEDULER_cancel (plugin->select_task); 1834 GNUNET_SCHEDULER_cancel (plugin->write_task);
1802 plugin->select_task = GNUNET_SCHEDULER_NO_TASK; 1835 plugin->write_task = GNUNET_SCHEDULER_NO_TASK;
1803 } 1836 }
1804 if (GNUNET_SCHEDULER_NO_TASK != plugin->address_update_task) 1837 if (GNUNET_SCHEDULER_NO_TASK != plugin->address_update_task)
1805 { 1838 {
@@ -1811,15 +1844,12 @@ libgnunet_plugin_transport_unix_done (void *cls)
1811 GNUNET_break (GNUNET_OK == 1844 GNUNET_break (GNUNET_OK ==
1812 GNUNET_NETWORK_socket_close (plugin->unix_sock.desc)); 1845 GNUNET_NETWORK_socket_close (plugin->unix_sock.desc));
1813 plugin->unix_sock.desc = NULL; 1846 plugin->unix_sock.desc = NULL;
1814 plugin->with_ws = GNUNET_NO;
1815 } 1847 }
1816 GNUNET_CONTAINER_multipeermap_iterate (plugin->session_map, 1848 GNUNET_CONTAINER_multipeermap_iterate (plugin->session_map,
1817 &get_session_delete_it, plugin); 1849 &get_session_delete_it,
1850 plugin);
1818 GNUNET_CONTAINER_multipeermap_destroy (plugin->session_map); 1851 GNUNET_CONTAINER_multipeermap_destroy (plugin->session_map);
1819 if (NULL != plugin->rs) 1852 GNUNET_break (0 == plugin->bytes_in_queue);
1820 GNUNET_NETWORK_fdset_destroy (plugin->rs);
1821 if (NULL != plugin->ws)
1822 GNUNET_NETWORK_fdset_destroy (plugin->ws);
1823 GNUNET_free (plugin->unix_socket_path); 1853 GNUNET_free (plugin->unix_socket_path);
1824 GNUNET_free (plugin); 1854 GNUNET_free (plugin);
1825 GNUNET_free (api); 1855 GNUNET_free (api);