aboutsummaryrefslogtreecommitdiff
path: root/src/transport
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2018-11-08 11:32:03 +0100
committerChristian Grothoff <christian@grothoff.org>2018-11-08 11:32:03 +0100
commita18d1f2587ca5df5a9c6e47c012bfbaf3f19098c (patch)
tree890025fef864f380c9d2ca40599f52e0e60b1c14 /src/transport
parent49b581dd1c00d769e97031c51b5865846e802f8f (diff)
downloadgnunet-a18d1f2587ca5df5a9c6e47c012bfbaf3f19098c.tar.gz
gnunet-a18d1f2587ca5df5a9c6e47c012bfbaf3f19098c.zip
work on UNIX communicator
Diffstat (limited to 'src/transport')
-rw-r--r--src/transport/gnunet-communicator-unix.c1072
-rw-r--r--src/transport/transport_api2_communication.c1
2 files changed, 315 insertions, 758 deletions
diff --git a/src/transport/gnunet-communicator-unix.c b/src/transport/gnunet-communicator-unix.c
index 373b74149..f07975186 100644
--- a/src/transport/gnunet-communicator-unix.c
+++ b/src/transport/gnunet-communicator-unix.c
@@ -31,6 +31,16 @@
31#include "gnunet_transport_communication_service.h" 31#include "gnunet_transport_communication_service.h"
32 32
33/** 33/**
34 * How many messages do we keep at most in the queue to the
35 * transport service before we start to drop (default,
36 * can be changed via the configuration file).
37 * Should be _below_ the level of the communicator API, as
38 * otherwise we may read messages just to have them dropped
39 * by the communicator API.
40 */
41#define DEFAULT_MAX_QUEUE_LENGTH 8
42
43/**
34 * Name of the communicator. 44 * Name of the communicator.
35 */ 45 */
36#define COMMUNICATOR_NAME "unix" 46#define COMMUNICATOR_NAME "unix"
@@ -59,63 +69,6 @@ GNUNET_NETWORK_STRUCT_END
59 69
60 70
61/** 71/**
62 * Information we track for a message awaiting transmission.
63 */
64struct UNIXMessageWrapper
65{
66 /**
67 * We keep messages in a doubly linked list.
68 */
69 struct UNIXMessageWrapper *next;
70
71 /**
72 * We keep messages in a doubly linked list.
73 */
74 struct UNIXMessageWrapper *prev;
75
76 /**
77 * The actual payload (allocated separately right now).
78 */
79 struct UNIXMessage *msg;
80
81 /**
82 * Queue this message belongs to.
83 */
84 struct Queue *queue;
85
86 /**
87 * Function to call upon transmission.
88 */
89 GNUNET_TRANSPORT_TransmitContinuation cont;
90
91 /**
92 * Closure for @e cont.
93 */
94 void *cont_cls;
95
96 /**
97 * Timeout for this message.
98 */
99 struct GNUNET_TIME_Absolute timeout;
100
101 /**
102 * Number of bytes in @e msg.
103 */
104 size_t msgsize;
105
106 /**
107 * Number of bytes of payload encapsulated in @e msg.
108 */
109 size_t payload;
110
111 /**
112 * Priority of the message (ignored, just dragged along in UNIX).
113 */
114 unsigned int priority;
115};
116
117
118/**
119 * Handle for a queue. 72 * Handle for a queue.
120 */ 73 */
121struct Queue 74struct Queue
@@ -132,10 +85,7 @@ struct Queue
132 struct Queue *prev; 85 struct Queue *prev;
133 86
134 /** 87 /**
135 * To whom are we talking to (set to our identity 88 * To whom are we talking to.
136 * if we are still waiting for the welcome message).
137 *
138 * FIXME: information duplicated with 'peer' in address!
139 */ 89 */
140 struct GNUNET_PeerIdentity target; 90 struct GNUNET_PeerIdentity target;
141 91
@@ -150,6 +100,12 @@ struct Queue
150 socklen_t address_len; 100 socklen_t address_len;
151 101
152 /** 102 /**
103 * Message currently scheduled for transmission, non-NULL if and only
104 * if this queue is in the #queue_head DLL.
105 */
106 const struct GNUNET_MessageHeader *msg;
107
108 /**
153 * Message queue we are providing for the #ch. 109 * Message queue we are providing for the #ch.
154 */ 110 */
155 struct GNUNET_MQ_Handle *mq; 111 struct GNUNET_MQ_Handle *mq;
@@ -172,17 +128,11 @@ struct Queue
172 /** 128 /**
173 * Queue timeout task. 129 * Queue timeout task.
174 */ 130 */
175 struct GNUNET_SCHEDULER_Task * timeout_task; 131 struct GNUNET_SCHEDULER_Task *timeout_task;
176
177 /**
178 * Number of messages we currently have in our write queue.
179 */
180 unsigned int msgs_in_queue;
181 132
182}; 133};
183 134
184 135
185
186/** 136/**
187 * ID of read task 137 * ID of read task
188 */ 138 */
@@ -194,9 +144,14 @@ static struct GNUNET_SCHEDULER_Task *read_task;
194static struct GNUNET_SCHEDULER_Task *write_task; 144static struct GNUNET_SCHEDULER_Task *write_task;
195 145
196/** 146/**
197 * Number of bytes we currently have in our write queues. 147 * Number of messages we currently have in our queues towards the transport service.
198 */ 148 */
199static unsigned long long bytes_in_queue; 149static unsigned long long delivering_messages;
150
151/**
152 * Maximum queue length before we stop reading towards the transport service.
153 */
154static unsigned long long max_queue_length;
200 155
201/** 156/**
202 * Our environment. 157 * Our environment.
@@ -211,12 +166,12 @@ static struct GNUNET_CONTAINER_MultiPeerMap *queue_map;
211/** 166/**
212 * Head of queue of messages to transmit. 167 * Head of queue of messages to transmit.
213 */ 168 */
214static struct UNIXMessageWrapper *msg_head; 169static struct Queue *queue_head;
215 170
216/** 171/**
217 * Tail of queue of messages to transmit. 172 * Tail of queue of messages to transmit.
218 */ 173 */
219static struct UNIXMessageWrapper *msg_tail; 174static struct Queue *queue_tail;
220 175
221/** 176/**
222 * socket that we transmit all data with 177 * socket that we transmit all data with
@@ -230,101 +185,6 @@ static struct GNUNET_TRANSPORT_AddressIdentifier *ai;
230 185
231 186
232/** 187/**
233 * If a queue monitor is attached, notify it about the new
234 * queue state.
235 *
236 * @param plugin our plugin
237 * @param queue queue that changed state
238 * @param state new state of the queue
239 */
240static void
241notify_queue_monitor (struct Plugin *plugin,
242 struct Queue *queue,
243 enum GNUNET_TRANSPORT_QueueState state)
244{
245 struct GNUNET_TRANSPORT_QueueInfo info;
246
247 if (NULL == plugin->sic)
248 return;
249 memset (&info, 0, sizeof (info));
250 info.state = state;
251 info.is_inbound = GNUNET_SYSERR; /* hard to say */
252 info.num_msg_pending = queue->msgs_in_queue;
253 info.num_bytes_pending = queue->bytes_in_queue;
254 /* info.receive_delay remains zero as this is not supported by UNIX
255 (cannot selectively not receive from 'some' peer while continuing
256 to receive from others) */
257 info.queue_timeout = queue->timeout;
258 info.address = queue->address;
259 plugin->sic (plugin->sic_cls,
260 queue,
261 &info);
262}
263
264
265/**
266 * Function called for a quick conversion of the binary address to
267 * a numeric address. Note that the caller must not free the
268 * address and that the next call to this function is allowed
269 * to override the address again.
270 *
271 * @param cls closure
272 * @param addr binary address
273 * @param addrlen length of the @a addr
274 * @return string representing the same address
275 */
276static const char *
277unix_plugin_address_to_string (void *cls,
278 const void *addr,
279 size_t addrlen)
280{
281 static char rbuf[1024];
282 struct UnixAddress *ua = (struct UnixAddress *) addr;
283 char *addrstr;
284 size_t addr_str_len;
285 unsigned int off;
286
287 if ((NULL == addr) || (sizeof (struct UnixAddress) > addrlen))
288 {
289 GNUNET_break(0);
290 return NULL;
291 }
292 addrstr = (char *) &ua[1];
293 addr_str_len = ntohl (ua->addrlen);
294
295 if (addr_str_len != addrlen - sizeof(struct UnixAddress))
296 {
297 GNUNET_break(0);
298 return NULL;
299 }
300 if ('\0' != addrstr[addr_str_len - 1])
301 {
302 GNUNET_break(0);
303 return NULL;
304 }
305 if (strlen (addrstr) + 1 != addr_str_len)
306 {
307 GNUNET_break(0);
308 return NULL;
309 }
310
311 off = 0;
312 if ('\0' == addrstr[0])
313 off++;
314 memset (rbuf, 0, sizeof (rbuf));
315 GNUNET_snprintf (rbuf,
316 sizeof (rbuf) - 1,
317 "%s.%u.%s%.*s",
318 PLUGIN_NAME,
319 ntohl (ua->options),
320 (off == 1) ? "@" : "",
321 (int) (addr_str_len - off),
322 &addrstr[off]);
323 return rbuf;
324}
325
326
327/**
328 * Functions with this signature are called whenever we need 188 * Functions with this signature are called whenever we need
329 * to close a queue due to a disconnect or failure to 189 * to close a queue due to a disconnect or failure to
330 * establish a connection. 190 * establish a connection.
@@ -332,58 +192,40 @@ unix_plugin_address_to_string (void *cls,
332 * @param queue queue to close down 192 * @param queue queue to close down
333 */ 193 */
334static void 194static void
335unix_plugin_queue_disconnect (struct Queue *queue) 195queue_destroy (struct Queue *queue)
336{ 196{
337 struct Plugin *plugin = cls; 197 struct Plugin *plugin = cls;
338 struct UNIXMessageWrapper *msgw; 198 struct GNUNET_MQ_Handle *mq;
339 struct UNIXMessageWrapper *next;
340 199
341 LOG (GNUNET_ERROR_TYPE_DEBUG, 200 LOG (GNUNET_ERROR_TYPE_DEBUG,
342 "Disconnecting queue for peer `%s'\n", 201 "Disconnecting queue for peer `%s'\n",
343 GNUNET_i2s (&queue->target)); 202 GNUNET_i2s (&queue->target));
344 plugin->env->queue_end (plugin->env->cls, 203 if (0 != queue->bytes_in_queue)
345 queue->address,
346 queue);
347 next = plugin->msg_head;
348 while (NULL != next)
349 { 204 {
350 msgw = next; 205 GNUNET_CONTAINER_DLL_remove (queue_head,
351 next = msgw->next; 206 queue_tail,
352 if (msgw->queue != queue) 207 queue);
353 continue; 208 queue->bytes_in_queue = 0;
354 GNUNET_CONTAINER_DLL_remove (plugin->msg_head, 209 }
355 plugin->msg_tail, 210 if (NULL != (mq = queue->mq))
356 msgw); 211 {
357 queue->msgs_in_queue--; 212 queue->mq = NULL;
358 GNUNET_assert (queue->bytes_in_queue >= msgw->msgsize); 213 GNUNET_MQ_destroy (mq);
359 queue->bytes_in_queue -= msgw->msgsize;
360 GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize);
361 plugin->bytes_in_queue -= msgw->msgsize;
362 if (NULL != msgw->cont)
363 msgw->cont (msgw->cont_cls,
364 &msgw->queue->target,
365 GNUNET_SYSERR,
366 msgw->payload, 0);
367 GNUNET_free (msgw->msg);
368 GNUNET_free (msgw);
369 } 214 }
370 GNUNET_assert (GNUNET_YES == 215 GNUNET_assert (GNUNET_YES ==
371 GNUNET_CONTAINER_multipeermap_remove (plugin->queue_map, 216 GNUNET_CONTAINER_multipeermap_remove (queue_map,
372 &queue->target, 217 &queue->target,
373 queue)); 218 queue));
374 GNUNET_STATISTICS_set (stats, 219 GNUNET_STATISTICS_set (stats,
375 "# UNIX queues active", 220 "# UNIX queues active",
376 GNUNET_CONTAINER_multipeermap_size (plugin->queue_map), 221 GNUNET_CONTAINER_multipeermap_size (queue_map),
377 GNUNET_NO); 222 GNUNET_NO);
378 if (NULL != queue->timeout_task) 223 if (NULL != queue->timeout_task)
379 { 224 {
380 GNUNET_SCHEDULER_cancel (queue->timeout_task); 225 GNUNET_SCHEDULER_cancel (queue->timeout_task);
381 queue->timeout_task = NULL; 226 queue->timeout_task = NULL;
382 queue->timeout = GNUNET_TIME_UNIT_ZERO_ABS;
383 } 227 }
384 GNUNET_free (queue->address); 228 GNUNET_free (queue->address);
385 GNUNET_break (0 == queue->bytes_in_queue);
386 GNUNET_break (0 == queue->msgs_in_queue);
387 GNUNET_free (queue); 229 GNUNET_free (queue);
388} 230}
389 231
@@ -416,7 +258,7 @@ queue_timeout (void *cls)
416 queue, 258 queue,
417 GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, 259 GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
418 GNUNET_YES)); 260 GNUNET_YES));
419 unix_plugin_queue_disconnect (queue); 261 queue_destroy (queue);
420} 262}
421 263
422 264
@@ -458,7 +300,9 @@ unix_address_to_sockaddr (const char *unixpath,
458 slen = strlen (unixpath); 300 slen = strlen (unixpath);
459 if (slen >= sizeof (un->sun_path)) 301 if (slen >= sizeof (un->sun_path))
460 slen = sizeof (un->sun_path) - 1; 302 slen = sizeof (un->sun_path) - 1;
461 GNUNET_memcpy (un->sun_path, unixpath, slen); 303 GNUNET_memcpy (un->sun_path,
304 unixpath,
305 slen);
462 un->sun_path[slen] = '\0'; 306 un->sun_path[slen] = '\0';
463 slen = sizeof (struct sockaddr_un); 307 slen = sizeof (struct sockaddr_un);
464#if HAVE_SOCKADDR_UN_SUN_LEN 308#if HAVE_SOCKADDR_UN_SUN_LEN
@@ -545,182 +389,19 @@ lookup_queue (const struct GNUNET_PeerIdentity *peer,
545} 389}
546 390
547 391
548
549/**
550 * Actually send out the message, assume we've got the address and
551 * send_handle squared away!
552 *
553 * @param cls closure
554 * @param send_handle which handle to send message on
555 * @param target who should receive this message (ignored by UNIX)
556 * @param msgbuf one or more GNUNET_MessageHeader(s) strung together
557 * @param msgbuf_size the size of the @a msgbuf to send
558 * @param priority how important is the message (ignored by UNIX)
559 * @param timeout when should we time out (give up) if we can not transmit?
560 * @param addr the addr to send the message to, needs to be a sockaddr for us
561 * @param addrlen the len of @a addr
562 * @param payload bytes payload to send
563 * @param cont continuation to call once the message has
564 * been transmitted (or if the transport is ready
565 * for the next transmission call; or if the
566 * peer disconnected...)
567 * @param cont_cls closure for @a cont
568 * @return on success the number of bytes written, RETRY for retry, -1 on errors
569 */
570static ssize_t
571unix_real_send (void *cls,
572 struct GNUNET_NETWORK_Handle *send_handle,
573 const struct GNUNET_PeerIdentity *target,
574 const char *msgbuf,
575 size_t msgbuf_size,
576 unsigned int priority,
577 struct GNUNET_TIME_Absolute timeout,
578 const struct UnixAddress *addr,
579 size_t addrlen,
580 size_t payload,
581 GNUNET_TRANSPORT_TransmitContinuation cont,
582 void *cont_cls)
583{
584 struct Plugin *plugin = cls;
585 ssize_t sent;
586 struct sockaddr_un *un;
587 socklen_t un_len;
588 const char *unixpath;
589
590 if (NULL == send_handle)
591 {
592 GNUNET_break (0); /* We do not have a send handle */
593 return GNUNET_SYSERR;
594 }
595 if ((NULL == addr) || (0 == addrlen))
596 {
597 GNUNET_break (0); /* Can never send if we don't have an address */
598 return GNUNET_SYSERR;
599 }
600
601 /* Prepare address */
602 unixpath = (const char *) &addr[1];
603 if (NULL == (un = unix_address_to_sockaddr (unixpath,
604 &un_len)))
605 {
606 GNUNET_break (0);
607 return -1;
608 }
609
610 if ((GNUNET_YES == plugin->is_abstract) &&
611 (0 != (UNIX_OPTIONS_USE_ABSTRACT_SOCKETS & ntohl(addr->options) )) )
612 {
613 un->sun_path[0] = '\0';
614 }
615resend:
616 /* Send the data */
617 sent = GNUNET_NETWORK_socket_sendto (send_handle,
618 msgbuf,
619 msgbuf_size,
620 (const struct sockaddr *) un,
621 un_len);
622 if (GNUNET_SYSERR == sent)
623 {
624 if ( (EAGAIN == errno) ||
625 (ENOBUFS == errno) )
626 {
627 GNUNET_free (un);
628 return RETRY; /* We have to retry later */
629 }
630 if (EMSGSIZE == errno)
631 {
632 socklen_t size = 0;
633 socklen_t len = sizeof (size);
634
635 GNUNET_NETWORK_socket_getsockopt ((struct GNUNET_NETWORK_Handle *)
636 send_handle, SOL_SOCKET, SO_SNDBUF, &size,
637 &len);
638 if (size < msgbuf_size)
639 {
640 LOG (GNUNET_ERROR_TYPE_DEBUG,
641 "Trying to increase socket buffer size from %u to %u for message size %u\n",
642 (unsigned int) size,
643 (unsigned int) ((msgbuf_size / 1000) + 2) * 1000,
644 (unsigned int) msgbuf_size);
645 size = ((msgbuf_size / 1000) + 2) * 1000;
646 if (GNUNET_OK ==
647 GNUNET_NETWORK_socket_setsockopt ((struct GNUNET_NETWORK_Handle *) send_handle,
648 SOL_SOCKET, SO_SNDBUF,
649 &size, sizeof (size)))
650 goto resend; /* Increased buffer size, retry sending */
651 else
652 {
653 /* Could not increase buffer size: error, no retry */
654 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "setsockopt");
655 GNUNET_free (un);
656 return GNUNET_SYSERR;
657 }
658 }
659 else
660 {
661 /* Buffer is bigger than message: error, no retry
662 * This should never happen!*/
663 GNUNET_break (0);
664 GNUNET_free (un);
665 return GNUNET_SYSERR;
666 }
667 }
668 }
669
670 LOG (GNUNET_ERROR_TYPE_DEBUG,
671 "UNIX transmitted %u-byte message to %s (%d: %s)\n",
672 (unsigned int) msgbuf_size,
673 GNUNET_a2s ((const struct sockaddr *)un, un_len),
674 (int) sent,
675 (sent < 0) ? STRERROR (errno) : "ok");
676 GNUNET_free (un);
677 return sent;
678}
679
680
681/** 392/**
682 * Function obtain the network type for a queue 393 * Creates a new outbound queue the transport service will use to send
394 * data to another peer.
683 * 395 *
684 * @param cls closure ('struct Plugin*') 396 * @param peer the target peer
685 * @param queue the queue 397 * @param un the address
686 * @return the network type in HBO or #GNUNET_SYSERR 398 * @param un_len number of bytes in @a un
687 */
688static enum GNUNET_ATS_Network_Type
689unix_plugin_get_network (void *cls,
690 struct Queue *queue)
691{
692 GNUNET_assert (NULL != queue);
693 return GNUNET_ATS_NET_LOOPBACK;
694}
695
696
697/**
698 * Function obtain the network type for a queue
699 *
700 * @param cls closure (`struct Plugin *`)
701 * @param address the address
702 * @return the network type
703 */
704static enum GNUNET_ATS_Network_Type
705unix_plugin_get_network_for_address (void *cls,
706 const struct GNUNET_HELLO_Address *address)
707
708{
709 return GNUNET_ATS_NET_LOOPBACK;
710}
711
712
713/**
714 * Creates a new outbound queue the transport service will use to send data to the
715 * peer
716 *
717 * @param cls the plugin
718 * @param address the address
719 * @return the queue or NULL of max connections exceeded 399 * @return the queue or NULL of max connections exceeded
720 */ 400 */
721static struct Queue * 401static struct Queue *
722unix_plugin_get_queue (void *cls, 402unix_plugin_get_queue (const struct GNUNET_PeerIdentity *target,
723 const struct GNUNET_HELLO_Address *address) 403 const struct sockaddr_un *un,
404 socklen_t un_len)
724{ 405{
725 struct Plugin *plugin = cls; 406 struct Plugin *plugin = cls;
726 struct Queue *queue; 407 struct Queue *queue;
@@ -728,53 +409,22 @@ unix_plugin_get_queue (void *cls,
728 char * addrstr; 409 char * addrstr;
729 uint32_t addr_str_len; 410 uint32_t addr_str_len;
730 uint32_t addr_option; 411 uint32_t addr_option;
412 char *foreign_addr;
413 int is_abstract;
414
415 if (is_abstract = ('\0' == un.sun_path[0]))
416 un.sun_path[0] = '/';
417 GNUNET_asprintf (&foreign_addr,
418 "%s-%s#%d",
419 COMMUNICATOR_NAME,
420 un.sun_path,
421 is_abstract);
422
731 423
732 ua = (struct UnixAddress *) address->address;
733 if ((NULL == address->address) || (0 == address->address_length) ||
734 (sizeof (struct UnixAddress) > address->address_length))
735 {
736 GNUNET_break (0);
737 return NULL;
738 }
739 addrstr = (char *) &ua[1]; 424 addrstr = (char *) &ua[1];
740 addr_str_len = ntohl (ua->addrlen); 425 addr_str_len = ntohl (ua->addrlen);
741 addr_option = ntohl (ua->options); 426 addr_option = ntohl (ua->options);
742 427
743 if ( (0 != (UNIX_OPTIONS_USE_ABSTRACT_SOCKETS & addr_option)) &&
744 (GNUNET_NO == plugin->is_abstract))
745 {
746 return NULL;
747 }
748
749 if (addr_str_len != address->address_length - sizeof (struct UnixAddress))
750 {
751 return NULL; /* This can be a legacy address */
752 }
753
754 if ('\0' != addrstr[addr_str_len - 1])
755 {
756 GNUNET_break (0);
757 return NULL;
758 }
759 if (strlen (addrstr) + 1 != addr_str_len)
760 {
761 GNUNET_break (0);
762 return NULL;
763 }
764
765 /* Check if a queue for this address already exists */
766 if (NULL != (queue = lookup_queue (plugin,
767 address)))
768 {
769 LOG (GNUNET_ERROR_TYPE_DEBUG,
770 "Found existing queue %p for address `%s'\n",
771 queue,
772 unix_plugin_address_to_string (NULL,
773 address->address,
774 address->address_length));
775 return queue;
776 }
777
778 /* create a new queue */ 428 /* create a new queue */
779 queue = GNUNET_new (struct Queue); 429 queue = GNUNET_new (struct Queue);
780 queue->target = address->peer; 430 queue->target = address->peer;
@@ -795,14 +445,8 @@ unix_plugin_get_queue (void *cls,
795 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 445 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
796 GNUNET_STATISTICS_set (plugin->env->stats, 446 GNUNET_STATISTICS_set (plugin->env->stats,
797 "# UNIX queues active", 447 "# UNIX queues active",
798 GNUNET_CONTAINER_multipeermap_size (plugin->queue_map), 448 GNUNET_CONTAINER_multipeermap_size (queue_map),
799 GNUNET_NO); 449 GNUNET_NO);
800 notify_queue_monitor (plugin,
801 queue,
802 GNUNET_TRANSPORT_SS_INIT);
803 notify_queue_monitor (plugin,
804 queue,
805 GNUNET_TRANSPORT_SS_UP);
806 return queue; 450 return queue;
807} 451}
808 452
@@ -891,245 +535,146 @@ unix_demultiplexer (struct Plugin *plugin,
891 535
892 536
893/** 537/**
894 * Read from UNIX domain socket (it is ready). 538 * We have been notified that our socket has something to read. Do the
539 * read and reschedule this function to be called again once more is
540 * available.
895 * 541 *
896 * @param plugin the plugin 542 * @param cls NULL
543 */
544static void
545select_read_cb (void *cls);
546
547
548/**
549 * Function called when message was successfully passed to
550 * transport service. Continue read activity.
551 *
552 * @param cls NULL
553 */
554static void
555receive_complete_cb (void *cls)
556{
557 delivering_messages--;
558 if ( (NULL == read_task) &&
559 (delivering_messages < max_queue_length) )
560 read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
561 unix_sock,
562 &select_read_cb,
563 NULL);
564}
565
566
567/**
568 * We have been notified that our socket has something to read. Do the
569 * read and reschedule this function to be called again once more is
570 * available.
571 *
572 * @param cls NULL
897 */ 573 */
898static void 574static void
899unix_plugin_do_read (struct Plugin *plugin) 575select_read_cb (void *cls)
900{ 576{
901 char buf[65536] GNUNET_ALIGN; 577 char buf[65536] GNUNET_ALIGN;
902 struct UnixAddress *ua; 578 struct Queue *queue;
903 struct UNIXMessage *msg; 579 const struct UNIXMessage *msg;
904 struct GNUNET_PeerIdentity sender;
905 struct sockaddr_un un; 580 struct sockaddr_un un;
906 socklen_t addrlen; 581 socklen_t addrlen;
907 ssize_t ret; 582 ssize_t ret;
908 int offset; 583 uint16_t msize;
909 int tsize;
910 int is_abstract;
911 char *msgbuf;
912 const struct GNUNET_MessageHeader *currhdr;
913 uint16_t csize;
914 size_t ua_len;
915 584
585 read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
586 unix_sock,
587 &select_read_cb,
588 NULL);
916 addrlen = sizeof (un); 589 addrlen = sizeof (un);
917 memset (&un, 0, sizeof (un)); 590 memset (&un,
591 0,
592 sizeof (un));
918 ret = GNUNET_NETWORK_socket_recvfrom (unix_sock, 593 ret = GNUNET_NETWORK_socket_recvfrom (unix_sock,
919 buf, sizeof (buf), 594 buf,
595 sizeof (buf),
920 (struct sockaddr *) &un, 596 (struct sockaddr *) &un,
921 &addrlen); 597 &addrlen);
922 if ((GNUNET_SYSERR == ret) && ((errno == EAGAIN) || (errno == ENOBUFS))) 598 if ( (-1 == ret) &&
599 ( (EAGAIN == errno) ||
600 (ENOBUFS == errno) ) )
923 return; 601 return;
924 if (GNUNET_SYSERR == ret) 602 if (-1 == ret)
925 { 603 {
926 GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, 604 GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
927 "recvfrom"); 605 "recvfrom");
928 return; 606 return;
929 } 607 }
930 else 608 LOG (GNUNET_ERROR_TYPE_DEBUG,
931 { 609 "Read %d bytes from socket %s\n",
932 LOG (GNUNET_ERROR_TYPE_DEBUG, 610 (int) ret,
933 "Read %d bytes from socket %s\n", 611 un.sun_path);
934 (int) ret,
935 un.sun_path);
936 }
937
938 GNUNET_assert (AF_UNIX == (un.sun_family)); 612 GNUNET_assert (AF_UNIX == (un.sun_family));
939 is_abstract = GNUNET_NO;
940 if ('\0' == un.sun_path[0])
941 {
942 un.sun_path[0] = '@';
943 is_abstract = GNUNET_YES;
944 }
945
946 ua_len = sizeof (struct UnixAddress) + strlen (un.sun_path) + 1;
947 ua = GNUNET_malloc (ua_len);
948 ua->addrlen = htonl (strlen (&un.sun_path[0]) +1);
949 GNUNET_memcpy (&ua[1], &un.sun_path[0], strlen (un.sun_path) + 1);
950 if (is_abstract)
951 ua->options = htonl(UNIX_OPTIONS_USE_ABSTRACT_SOCKETS);
952 else
953 ua->options = htonl(UNIX_OPTIONS_NONE);
954
955 msg = (struct UNIXMessage *) buf; 613 msg = (struct UNIXMessage *) buf;
956 csize = ntohs (msg->header.size); 614 msize = ntohs (msg->header.size);
957 if ((csize < sizeof (struct UNIXMessage)) || (csize > ret)) 615 if ( (msize < sizeof (struct UNIXMessage)) ||
616 (msize > ret) )
958 { 617 {
959 GNUNET_break_op (0); 618 GNUNET_break_op (0);
960 GNUNET_free (ua);
961 return; 619 return;
962 } 620 }
963 msgbuf = (char *) &msg[1]; 621 queue = lookup_queue (&msg->sender,
964 GNUNET_memcpy (&sender, 622 un,
965 &msg->sender, 623 addrlen);
966 sizeof (struct GNUNET_PeerIdentity)); 624 if (NULL == queue)
967 offset = 0; 625 queue = setup_queue (&msg->sender,
968 tsize = csize - sizeof (struct UNIXMessage); 626 un,
969 while (offset + sizeof (struct GNUNET_MessageHeader) <= tsize) 627 addrlen);
628 if (NULL == queue)
970 { 629 {
971 currhdr = (struct GNUNET_MessageHeader *) &msgbuf[offset]; 630 GNUENT_log (GNUNET_ERROR_TYPE_ERROR,
972 csize = ntohs (currhdr->size); 631 _("Maximum number of UNIX connections exceeded, dropping incoming message\n"));
973 if ((csize < sizeof (struct GNUNET_MessageHeader)) || 632 return;
974 (csize > tsize - offset))
975 {
976 GNUNET_break_op (0);
977 break;
978 }
979 unix_demultiplexer (plugin, &sender, currhdr, ua, ua_len);
980 offset += csize;
981 } 633 }
982 GNUNET_free (ua); 634
983}
984
985
986/**
987 * Write to UNIX domain socket (it is ready).
988 *
989 * @param plugin handle to the plugin
990 */
991static void
992unix_plugin_do_write (struct Plugin *plugin)
993{
994 ssize_t sent = 0;
995 struct UNIXMessageWrapper *msgw;
996 struct Queue *queue;
997 int did_delete;
998 635
999 queue = NULL;
1000 did_delete = GNUNET_NO;
1001 while (NULL != (msgw = plugin->msg_head))
1002 {
1003 if (GNUNET_TIME_absolute_get_remaining (msgw->timeout).rel_value_us > 0)
1004 break; /* Message is ready for sending */
1005 /* Message has a timeout */
1006 did_delete = GNUNET_YES;
1007 LOG (GNUNET_ERROR_TYPE_DEBUG,
1008 "Timeout for message with %u bytes \n",
1009 (unsigned int) msgw->msgsize);
1010 GNUNET_CONTAINER_DLL_remove (plugin->msg_head,
1011 plugin->msg_tail,
1012 msgw);
1013 queue = msgw->queue;
1014 queue->msgs_in_queue--;
1015 GNUNET_assert (queue->bytes_in_queue >= msgw->msgsize);
1016 queue->bytes_in_queue -= msgw->msgsize;
1017 GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize);
1018 plugin->bytes_in_queue -= msgw->msgsize;
1019 GNUNET_STATISTICS_set (plugin->env->stats,
1020 "# bytes currently in UNIX buffers",
1021 plugin->bytes_in_queue,
1022 GNUNET_NO);
1023 GNUNET_STATISTICS_update (plugin->env->stats,
1024 "# UNIX bytes discarded",
1025 msgw->msgsize,
1026 GNUNET_NO);
1027 if (NULL != msgw->cont)
1028 msgw->cont (msgw->cont_cls,
1029 &msgw->queue->target,
1030 GNUNET_SYSERR,
1031 msgw->payload,
1032 0);
1033 GNUNET_free (msgw->msg);
1034 GNUNET_free (msgw);
1035 }
1036 if (NULL == msgw)
1037 { 636 {
1038 if (GNUNET_YES == did_delete) 637 uint16_t offset = 0;
1039 notify_queue_monitor (plugin, 638 uint16_t tsize = msize - sizeof (struct UNIXMessage);
1040 queue, 639 const char *msgbuf = (const char *) &msg[1];
1041 GNUNET_TRANSPORT_SS_UPDATE); 640
1042 return; /* Nothing to send at the moment */ 641 while (offset + sizeof (struct GNUNET_MessageHeader) <= tsize)
1043 } 642 {
1044 queue = msgw->queue; 643 const struct GNUNET_MessageHeader *currhdr;
1045 sent = unix_real_send (plugin, 644 struct GNUNET_MessageHeader al_hdr;
1046 unix_sock, 645 uint16_t csize;
1047 &queue->target, 646
1048 (const char *) msgw->msg, 647 currhdr = (const struct GNUNET_MessageHeader *) &msgbuf[offset];
1049 msgw->msgsize, 648 /* ensure aligned access */
1050 msgw->priority, 649 memcpy (&al_hdr,
1051 msgw->timeout, 650 currhdr,
1052 msgw->queue->address->address, 651 sizeof (al_hdr));
1053 msgw->queue->address->address_length, 652 csize = ntohs (al_hdr.size);
1054 msgw->payload, 653 if ( (csize < sizeof (struct GNUNET_MessageHeader)) ||
1055 msgw->cont, msgw->cont_cls); 654 (csize > tsize - offset))
1056 if (RETRY == sent) 655 {
1057 { 656 GNUNET_break_op (0);
1058 GNUNET_STATISTICS_update (plugin->env->stats, 657 break;
1059 "# UNIX retry attempts", 658 }
1060 1, GNUNET_NO); 659 ret = GNUNET_TRANSPORT_communicator_receive (ch,
1061 notify_queue_monitor (plugin, 660 &msg->sender,
1062 queue, 661 currhdr,
1063 GNUNET_TRANSPORT_SS_UPDATE); 662 &receive_complete_cb,
1064 return; 663 NULL);
664 if (GNUNET_SYSERR == ret)
665 return; /* transport not up */
666 if (GNUNET_NO == ret)
667 break;
668 delivering_messages++;
669 offset += csize;
670 }
1065 } 671 }
1066 GNUNET_CONTAINER_DLL_remove (plugin->msg_head, 672 if (delivering_messages >= max_queue_length)
1067 plugin->msg_tail,
1068 msgw);
1069 queue->msgs_in_queue--;
1070 GNUNET_assert (queue->bytes_in_queue >= msgw->msgsize);
1071 queue->bytes_in_queue -= msgw->msgsize;
1072 GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize);
1073 plugin->bytes_in_queue -= msgw->msgsize;
1074 GNUNET_STATISTICS_set (plugin->env->stats,
1075 "# bytes currently in UNIX buffers",
1076 plugin->bytes_in_queue, GNUNET_NO);
1077 notify_queue_monitor (plugin,
1078 queue,
1079 GNUNET_TRANSPORT_SS_UPDATE);
1080 if (GNUNET_SYSERR == sent)
1081 { 673 {
1082 /* failed and no retry */ 674 /* we should try to apply 'back pressure' */
1083 if (NULL != msgw->cont) 675 GNUNET_SCHEDULER_cancel (read_task);
1084 msgw->cont (msgw->cont_cls, 676 read_task = NULL;
1085 &msgw->queue->target,
1086 GNUNET_SYSERR,
1087 msgw->payload, 0);
1088 GNUNET_STATISTICS_update (plugin->env->stats,
1089 "# UNIX bytes discarded",
1090 msgw->msgsize,
1091 GNUNET_NO);
1092 GNUNET_free (msgw->msg);
1093 GNUNET_free (msgw);
1094 return;
1095 } 677 }
1096 /* successfully sent bytes */
1097 GNUNET_break (sent > 0);
1098 GNUNET_STATISTICS_update (plugin->env->stats,
1099 "# bytes transmitted via UNIX",
1100 msgw->msgsize,
1101 GNUNET_NO);
1102 if (NULL != msgw->cont)
1103 msgw->cont (msgw->cont_cls,
1104 &msgw->queue->target,
1105 GNUNET_OK,
1106 msgw->payload,
1107 msgw->msgsize);
1108 GNUNET_free (msgw->msg);
1109 GNUNET_free (msgw);
1110}
1111
1112
1113/**
1114 * We have been notified that our socket has something to read.
1115 * Then reschedule this function to be called again once more is available.
1116 *
1117 * @param cls the plugin handle
1118 */
1119static void
1120unix_plugin_select_read (void *cls)
1121{
1122 struct Plugin *plugin = cls;
1123 const struct GNUNET_SCHEDULER_TaskContext *tc;
1124
1125 plugin->read_task = NULL;
1126 tc = GNUNET_SCHEDULER_get_task_context ();
1127 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY))
1128 unix_plugin_do_read (plugin);
1129 plugin->read_task =
1130 GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
1131 unix_sock,
1132 &unix_plugin_select_read, plugin);
1133} 678}
1134 679
1135 680
@@ -1137,158 +682,155 @@ unix_plugin_select_read (void *cls)
1137 * We have been notified that our socket is ready to write. 682 * We have been notified that our socket is ready to write.
1138 * Then reschedule this function to be called again once more is available. 683 * Then reschedule this function to be called again once more is available.
1139 * 684 *
1140 * @param cls the plugin handle 685 * @param cls NULL
1141 */ 686 */
1142static void 687static void
1143unix_plugin_select_write (void *cls) 688select_write_cb (void *cls)
1144{ 689{
1145 struct Plugin *plugin = cls; 690 struct Queue *queue = queue_tail;
1146 const struct GNUNET_SCHEDULER_TaskContext *tc; 691 const struct GNUNET_MessageHeader *msg = queue->msg;
1147 692 size_t msg_size = ntohs (msg->size);
1148 plugin->write_task = NULL; 693 ssize_t sent;
1149 tc = GNUNET_SCHEDULER_get_task_context ();
1150 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY))
1151 unix_plugin_do_write (plugin);
1152 if (NULL == plugin->msg_head)
1153 return; /* write queue empty */
1154 plugin->write_task =
1155 GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
1156 unix_sock,
1157 &unix_plugin_select_write, plugin);
1158}
1159
1160 694
1161/** 695 /* take queue of the ready list */
1162 * Function that can be used by the transport service to transmit 696 write_task = NULL;
1163 * a message using the plugin. Note that in the case of a 697 GNUNET_CONTAINER_DLL_remove (queue_head,
1164 * peer disconnecting, the continuation MUST be called 698 queue_tail,
1165 * prior to the disconnect notification itself. This function 699 queue);
1166 * will be called with this peer's HELLO message to initiate 700 if (NULL != queue_head)
1167 * a fresh connection to another peer. 701 write_task =
1168 * 702 GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
1169 * @param cls closure 703 unix_sock,
1170 * @param queue which queue must be used 704 &select_write_cb,
1171 * @param msgbuf the message to transmit 705 NULL);
1172 * @param msgbuf_size number of bytes in @a msgbuf
1173 * @param priority how important is the message (most plugins will
1174 * ignore message priority and just FIFO)
1175 * @param to how long to wait at most for the transmission (does not
1176 * require plugins to discard the message after the timeout,
1177 * just advisory for the desired delay; most plugins will ignore
1178 * this as well)
1179 * @param cont continuation to call once the message has
1180 * been transmitted (or if the transport is ready
1181 * for the next transmission call; or if the
1182 * peer disconnected...); can be NULL
1183 * @param cont_cls closure for @a cont
1184 * @return number of bytes used (on the physical network, with overheads);
1185 * -1 on hard errors (i.e. address invalid); 0 is a legal value
1186 * and does NOT mean that the message was not transmitted (DV)
1187 */
1188static ssize_t
1189unix_plugin_send (void *cls,
1190 struct Queue *queue,
1191 const char *msgbuf,
1192 size_t msgbuf_size,
1193 unsigned int priority,
1194 struct GNUNET_TIME_Relative to,
1195 GNUNET_TRANSPORT_TransmitContinuation cont,
1196 void *cont_cls)
1197{
1198 struct Plugin *plugin = cls;
1199 struct UNIXMessageWrapper *wrapper;
1200 struct UNIXMessage *message;
1201 int ssize;
1202 706
1203 if (GNUNET_OK != 707 /* send 'msg' */
1204 GNUNET_CONTAINER_multipeermap_contains_value (plugin->queue_map, 708 queue->msg = NULL;
1205 &queue->target, 709 GNUNET_MQ_impl_send_continue (queue->mq);
1206 queue)) 710 resend:
711 /* Send the data */
712 sent = GNUNET_NETWORK_socket_sendto (unix_sock,
713 queue->msg,
714 msg_size,
715 (const struct sockaddr *) mq->address,
716 mq->address_len);
717 LOG (GNUNET_ERROR_TYPE_DEBUG,
718 "UNIX transmitted message to %s (%d/%u: %s)\n",
719 GNUNET_i2s (&queue->target),
720 (int) sent,
721 (unsigned int) msg_size,
722 (sent < 0) ? STRERROR (errno) : "ok");
723 if (-1 != sent)
724 return; /* all good */
725 switch (errno)
1207 { 726 {
1208 LOG (GNUNET_ERROR_TYPE_ERROR, 727 case EAGAIN:
1209 "Invalid queue for peer `%s' `%s'\n", 728 case ENOBUFS:
1210 GNUNET_i2s (&queue->target), 729 /* We should retry later... */
1211 unix_plugin_address_to_string (NULL, 730 GNUNET_log_strerror (GNUNET_ERROR_TYPE_DEBUG,
1212 queue->address->address, 731 "send");
1213 queue->address->address_length)); 732 return;
1214 GNUNET_break (0); 733 case EMSGSIZE:
1215 return GNUNET_SYSERR; 734 {
735 socklen_t size = 0;
736 socklen_t len = sizeof (size);
737
738 GNUNET_NETWORK_socket_getsockopt (unix_sock,
739 SOL_SOCKET,
740 SO_SNDBUF,
741 &size,
742 &len);
743 if (size > ntohs (msg->size))
744 {
745 /* Buffer is bigger than message: error, no retry
746 * This should never happen!*/
747 GNUNET_break (0);
748 return;
749 }
750 LOG (GNUNET_ERROR_TYPE_DEBUG,
751 "Trying to increase socket buffer size from %u to %u for message size %u\n",
752 (unsigned int) size,
753 (unsigned int) m((msg_size / 1000) + 2) * 1000,
754 (unsigned int) msg_size);
755 size = ((msg_size / 1000) + 2) * 1000;
756 if (GNUNET_OK ==
757 GNUNET_NETWORK_socket_setsockopt (unix_sock,
758 SOL_SOCKET,
759 SO_SNDBUF,
760 &size,
761 sizeof (size)))
762 goto resend; /* Increased buffer size, retry sending */
763 /* Ok, then just try very modest increase */
764 size = msg_size;
765 if (GNUNET_OK ==
766 GNUNET_NETWORK_socket_setsockopt (unix_sock,
767 SOL_SOCKET,
768 SO_SNDBUF,
769 &size,
770 sizeof (size)))
771 goto resend; /* Increased buffer size, retry sending */
772 /* Could not increase buffer size: error, no retry */
773 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
774 "setsockopt");
775 return;
776 }
777 default:
778 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
779 "send");
780 return;
1216 } 781 }
1217 LOG (GNUNET_ERROR_TYPE_DEBUG,
1218 "Sending %u bytes with queue for peer `%s' `%s'\n",
1219 msgbuf_size,
1220 GNUNET_i2s (&queue->target),
1221 unix_plugin_address_to_string (NULL,
1222 queue->address->address,
1223 queue->address->address_length));
1224 ssize = sizeof (struct UNIXMessage) + msgbuf_size;
1225 message = GNUNET_malloc (sizeof (struct UNIXMessage) + msgbuf_size);
1226 message->header.size = htons (ssize);
1227 message->header.type = htons (0);
1228 GNUNET_memcpy (&message->sender, plugin->env->my_identity,
1229 sizeof (struct GNUNET_PeerIdentity));
1230 GNUNET_memcpy (&message[1], msgbuf, msgbuf_size);
1231 wrapper = GNUNET_new (struct UNIXMessageWrapper);
1232 wrapper->msg = message;
1233 wrapper->msgsize = ssize;
1234 wrapper->payload = msgbuf_size;
1235 wrapper->priority = priority;
1236 wrapper->timeout = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (),
1237 to);
1238 wrapper->cont = cont;
1239 wrapper->cont_cls = cont_cls;
1240 wrapper->queue = queue;
1241 GNUNET_CONTAINER_DLL_insert_tail (plugin->msg_head,
1242 plugin->msg_tail,
1243 wrapper);
1244 plugin->bytes_in_queue += ssize;
1245 queue->bytes_in_queue += ssize;
1246 queue->msgs_in_queue++;
1247 GNUNET_STATISTICS_set (plugin->env->stats,
1248 "# bytes currently in UNIX buffers",
1249 plugin->bytes_in_queue,
1250 GNUNET_NO);
1251 notify_queue_monitor (plugin,
1252 queue,
1253 GNUNET_TRANSPORT_SS_UPDATE);
1254 if (NULL == plugin->write_task)
1255 plugin->write_task =
1256 GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
1257 unix_sock,
1258 &unix_plugin_select_write, plugin);
1259 return ssize;
1260} 782}
1261 783
1262 784
1263/** 785/**
1264 * Signature of functions implementing the 786 * Signature of functions implementing the sending functionality of a
1265 * sending functionality of a message queue. 787 * message queue.
1266 * 788 *
1267 * @param mq the message queue 789 * @param mq the message queue
1268 * @param msg the message to send 790 * @param msg the message to send
1269 * @param impl_state state of the implementation 791 * @param impl_state our `struct Queue`
1270 */ 792 */
1271static void 793static void
1272mq_send (struct GNUNET_MQ_Handle *mq, 794mq_send (struct GNUNET_MQ_Handle *mq,
1273 const struct GNUNET_MessageHeader *msg, 795 const struct GNUNET_MessageHeader *msg,
1274 void *impl_state) 796 void *impl_state)
1275{ 797{
798 struct Queue *queue = impl_state;
799
800 GNUNET_assert (mq == queue->mq);
801 GNUNET_assert (NULL == queue->msg);
802 queue->msg = msg;
803 GNUNET_CONTAINER_DLL_insert (queue_head,
804 queue_tail,
805 queue);
806 if (NULL == write_task)
807 write_task =
808 GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
809 unix_sock,
810 &select_write_cb,
811 NULL);
1276} 812}
1277 813
1278 814
1279/** 815/**
1280 * Signature of functions implementing the 816 * Signature of functions implementing the destruction of a message
1281 * destruction of a message queue. 817 * queue. Implementations must not free @a mq, but should take care
1282 * Implementations must not free @a mq, but should 818 * of @a impl_state.
1283 * take care of @a impl_state.
1284 * 819 *
1285 * @param mq the message queue to destroy 820 * @param mq the message queue to destroy
1286 * @param impl_state state of the implementation 821 * @param impl_state our `struct Queue`
1287 */ 822 */
1288static void 823static void
1289mq_destroy (struct GNUNET_MQ_Handle *mq, 824mq_destroy (struct GNUNET_MQ_Handle *mq,
1290 void *impl_state) 825 void *impl_state)
1291{ 826{
827 struct Queue *queue = impl_state;
828
829 if (mq == queue->mq)
830 {
831 queue->mq = NULL;
832 queue_destroy (queue);
833 }
1292} 834}
1293 835
1294 836
@@ -1296,12 +838,15 @@ mq_destroy (struct GNUNET_MQ_Handle *mq,
1296 * Implementation function that cancels the currently sent message. 838 * Implementation function that cancels the currently sent message.
1297 * 839 *
1298 * @param mq message queue 840 * @param mq message queue
1299 * @param impl_state state specific to the implementation 841 * @param impl_state our `struct Queue`
1300 */ 842 */
1301static void 843static void
1302mq_cancel (struct GNUNET_MQ_Handle *mq, 844mq_cancel (struct GNUNET_MQ_Handle *mq,
1303 void *impl_state) 845 void *impl_state)
1304{ 846{
847 struct Queue *queue = impl_state;
848
849 // FIXME: TBD!
1305} 850}
1306 851
1307 852
@@ -1311,15 +856,17 @@ mq_cancel (struct GNUNET_MQ_Handle *mq,
1311 * the message queue. 856 * the message queue.
1312 * Not every message queue implementation supports an error handler. 857 * Not every message queue implementation supports an error handler.
1313 * 858 *
1314 * @param cls closure 859 * @param cls our `struct Queue`
1315 * @param error error code 860 * @param error error code
1316 */ 861 */
1317static void 862static void
1318mq_error (void *cls, 863mq_error (void *cls,
1319 enum GNUNET_MQ_Error error) 864 enum GNUNET_MQ_Error error)
1320{ 865{
1321} 866 struct Queue *queue = cls;
1322 867
868 // FIXME: TBD!
869}
1323 870
1324 871
1325/** 872/**
@@ -1470,7 +1017,6 @@ do_shutdown (void *cls)
1470 GNUNET_TRANSPORT_communicator_disconnect (ch); 1017 GNUNET_TRANSPORT_communicator_disconnect (ch);
1471 ch = NULL; 1018 ch = NULL;
1472 } 1019 }
1473 GNUNET_break (0 == bytes_in_queue);
1474} 1020}
1475 1021
1476 1022
@@ -1497,7 +1043,7 @@ run (void *cls,
1497 1043
1498 if (GNUNET_OK != 1044 if (GNUNET_OK !=
1499 GNUNET_CONFIGURATION_get_value_filename (cfg, 1045 GNUNET_CONFIGURATION_get_value_filename (cfg,
1500 "transport-unix", 1046 "communicator-unix",
1501 "UNIXPATH", 1047 "UNIXPATH",
1502 &unix_socket_path)) 1048 &unix_socket_path))
1503 { 1049 {
@@ -1506,7 +1052,14 @@ run (void *cls,
1506 "UNIXPATH"); 1052 "UNIXPATH");
1507 return; 1053 return;
1508 } 1054 }
1509 1055 if (GNUNET_OK !=
1056 GNUNET_CONFIGURATION_get_value_number (cfg,
1057 "communicator-unix",
1058 "MAX_QUEUE_LENGTH",
1059 &max_queue_length))
1060 max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
1061
1062
1510 /* Initialize my flags */ 1063 /* Initialize my flags */
1511 is_abstract = 0; 1064 is_abstract = 0;
1512#ifdef LINUX 1065#ifdef LINUX
@@ -1571,7 +1124,7 @@ run (void *cls,
1571 queue_map = GNUNET_CONTAINER_multipeermap_create (10, 1124 queue_map = GNUNET_CONTAINER_multipeermap_create (10,
1572 GNUNET_NO); 1125 GNUNET_NO);
1573 ch = GNUNET_TRANSPORT_communicator_connect (cfg, 1126 ch = GNUNET_TRANSPORT_communicator_connect (cfg,
1574 "unix", 1127 COMMUNICATOR_NAME,
1575 65535, 1128 65535,
1576 &mq_init, 1129 &mq_init,
1577 NULL); 1130 NULL);
@@ -1587,13 +1140,16 @@ run (void *cls,
1587 COMMUNICATOR_NAME, 1140 COMMUNICATOR_NAME,
1588 unix_socket_path, 1141 unix_socket_path,
1589 is_abstract); 1142 is_abstract);
1590
1591 ai = GNUNET_TRANSPORT_communicator_address_add (ch, 1143 ai = GNUNET_TRANSPORT_communicator_address_add (ch,
1592 my_addr, 1144 my_addr,
1593 GNUNET_ATS_NET_LOOPBACK, 1145 GNUNET_ATS_NET_LOOPBACK,
1594 GNUNET_TIME_UNIT_FOREVER_REL); 1146 GNUNET_TIME_UNIT_FOREVER_REL);
1595 GNUNET_free (my_addr); 1147 GNUNET_free (my_addr);
1596 GNUNET_free (unix_socket_path); 1148 GNUNET_free (unix_socket_path);
1149 read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
1150 unix_sock,
1151 &select_read_cb,
1152 NULL);
1597} 1153}
1598 1154
1599 1155
diff --git a/src/transport/transport_api2_communication.c b/src/transport/transport_api2_communication.c
index e5be53150..434138e19 100644
--- a/src/transport/transport_api2_communication.c
+++ b/src/transport/transport_api2_communication.c
@@ -729,6 +729,7 @@ reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
729 handlers, 729 handlers,
730 &error_handler, 730 &error_handler,
731 ch); 731 ch);
732 // FIXME: must notify transport that we are responsible for 'ch->name' addresses!!!
732 for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head; 733 for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head;
733 NULL != ai; 734 NULL != ai;
734 ai = ai->next) 735 ai = ai->next)