diff options
author | Christian Grothoff <christian@grothoff.org> | 2018-11-08 11:32:03 +0100 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2018-11-08 11:32:03 +0100 |
commit | a18d1f2587ca5df5a9c6e47c012bfbaf3f19098c (patch) | |
tree | 890025fef864f380c9d2ca40599f52e0e60b1c14 /src/transport/gnunet-communicator-unix.c | |
parent | 49b581dd1c00d769e97031c51b5865846e802f8f (diff) | |
download | gnunet-a18d1f2587ca5df5a9c6e47c012bfbaf3f19098c.tar.gz gnunet-a18d1f2587ca5df5a9c6e47c012bfbaf3f19098c.zip |
work on UNIX communicator
Diffstat (limited to 'src/transport/gnunet-communicator-unix.c')
-rw-r--r-- | src/transport/gnunet-communicator-unix.c | 1072 |
1 files changed, 314 insertions, 758 deletions
diff --git a/src/transport/gnunet-communicator-unix.c b/src/transport/gnunet-communicator-unix.c index 373b74149..f07975186 100644 --- a/src/transport/gnunet-communicator-unix.c +++ b/src/transport/gnunet-communicator-unix.c | |||
@@ -31,6 +31,16 @@ | |||
31 | #include "gnunet_transport_communication_service.h" | 31 | #include "gnunet_transport_communication_service.h" |
32 | 32 | ||
33 | /** | 33 | /** |
34 | * How many messages do we keep at most in the queue to the | ||
35 | * transport service before we start to drop (default, | ||
36 | * can be changed via the configuration file). | ||
37 | * Should be _below_ the level of the communicator API, as | ||
38 | * otherwise we may read messages just to have them dropped | ||
39 | * by the communicator API. | ||
40 | */ | ||
41 | #define DEFAULT_MAX_QUEUE_LENGTH 8 | ||
42 | |||
43 | /** | ||
34 | * Name of the communicator. | 44 | * Name of the communicator. |
35 | */ | 45 | */ |
36 | #define COMMUNICATOR_NAME "unix" | 46 | #define COMMUNICATOR_NAME "unix" |
@@ -59,63 +69,6 @@ GNUNET_NETWORK_STRUCT_END | |||
59 | 69 | ||
60 | 70 | ||
61 | /** | 71 | /** |
62 | * Information we track for a message awaiting transmission. | ||
63 | */ | ||
64 | struct UNIXMessageWrapper | ||
65 | { | ||
66 | /** | ||
67 | * We keep messages in a doubly linked list. | ||
68 | */ | ||
69 | struct UNIXMessageWrapper *next; | ||
70 | |||
71 | /** | ||
72 | * We keep messages in a doubly linked list. | ||
73 | */ | ||
74 | struct UNIXMessageWrapper *prev; | ||
75 | |||
76 | /** | ||
77 | * The actual payload (allocated separately right now). | ||
78 | */ | ||
79 | struct UNIXMessage *msg; | ||
80 | |||
81 | /** | ||
82 | * Queue this message belongs to. | ||
83 | */ | ||
84 | struct Queue *queue; | ||
85 | |||
86 | /** | ||
87 | * Function to call upon transmission. | ||
88 | */ | ||
89 | GNUNET_TRANSPORT_TransmitContinuation cont; | ||
90 | |||
91 | /** | ||
92 | * Closure for @e cont. | ||
93 | */ | ||
94 | void *cont_cls; | ||
95 | |||
96 | /** | ||
97 | * Timeout for this message. | ||
98 | */ | ||
99 | struct GNUNET_TIME_Absolute timeout; | ||
100 | |||
101 | /** | ||
102 | * Number of bytes in @e msg. | ||
103 | */ | ||
104 | size_t msgsize; | ||
105 | |||
106 | /** | ||
107 | * Number of bytes of payload encapsulated in @e msg. | ||
108 | */ | ||
109 | size_t payload; | ||
110 | |||
111 | /** | ||
112 | * Priority of the message (ignored, just dragged along in UNIX). | ||
113 | */ | ||
114 | unsigned int priority; | ||
115 | }; | ||
116 | |||
117 | |||
118 | /** | ||
119 | * Handle for a queue. | 72 | * Handle for a queue. |
120 | */ | 73 | */ |
121 | struct Queue | 74 | struct Queue |
@@ -132,10 +85,7 @@ struct Queue | |||
132 | struct Queue *prev; | 85 | struct Queue *prev; |
133 | 86 | ||
134 | /** | 87 | /** |
135 | * To whom are we talking to (set to our identity | 88 | * To whom are we talking to. |
136 | * if we are still waiting for the welcome message). | ||
137 | * | ||
138 | * FIXME: information duplicated with 'peer' in address! | ||
139 | */ | 89 | */ |
140 | struct GNUNET_PeerIdentity target; | 90 | struct GNUNET_PeerIdentity target; |
141 | 91 | ||
@@ -150,6 +100,12 @@ struct Queue | |||
150 | socklen_t address_len; | 100 | socklen_t address_len; |
151 | 101 | ||
152 | /** | 102 | /** |
103 | * Message currently scheduled for transmission, non-NULL if and only | ||
104 | * if this queue is in the #queue_head DLL. | ||
105 | */ | ||
106 | const struct GNUNET_MessageHeader *msg; | ||
107 | |||
108 | /** | ||
153 | * Message queue we are providing for the #ch. | 109 | * Message queue we are providing for the #ch. |
154 | */ | 110 | */ |
155 | struct GNUNET_MQ_Handle *mq; | 111 | struct GNUNET_MQ_Handle *mq; |
@@ -172,17 +128,11 @@ struct Queue | |||
172 | /** | 128 | /** |
173 | * Queue timeout task. | 129 | * Queue timeout task. |
174 | */ | 130 | */ |
175 | struct GNUNET_SCHEDULER_Task * timeout_task; | 131 | struct GNUNET_SCHEDULER_Task *timeout_task; |
176 | |||
177 | /** | ||
178 | * Number of messages we currently have in our write queue. | ||
179 | */ | ||
180 | unsigned int msgs_in_queue; | ||
181 | 132 | ||
182 | }; | 133 | }; |
183 | 134 | ||
184 | 135 | ||
185 | |||
186 | /** | 136 | /** |
187 | * ID of read task | 137 | * ID of read task |
188 | */ | 138 | */ |
@@ -194,9 +144,14 @@ static struct GNUNET_SCHEDULER_Task *read_task; | |||
194 | static struct GNUNET_SCHEDULER_Task *write_task; | 144 | static struct GNUNET_SCHEDULER_Task *write_task; |
195 | 145 | ||
196 | /** | 146 | /** |
197 | * Number of bytes we currently have in our write queues. | 147 | * Number of messages we currently have in our queues towards the transport service. |
198 | */ | 148 | */ |
199 | static unsigned long long bytes_in_queue; | 149 | static unsigned long long delivering_messages; |
150 | |||
151 | /** | ||
152 | * Maximum queue length before we stop reading towards the transport service. | ||
153 | */ | ||
154 | static unsigned long long max_queue_length; | ||
200 | 155 | ||
201 | /** | 156 | /** |
202 | * Our environment. | 157 | * Our environment. |
@@ -211,12 +166,12 @@ static struct GNUNET_CONTAINER_MultiPeerMap *queue_map; | |||
211 | /** | 166 | /** |
212 | * Head of queue of messages to transmit. | 167 | * Head of queue of messages to transmit. |
213 | */ | 168 | */ |
214 | static struct UNIXMessageWrapper *msg_head; | 169 | static struct Queue *queue_head; |
215 | 170 | ||
216 | /** | 171 | /** |
217 | * Tail of queue of messages to transmit. | 172 | * Tail of queue of messages to transmit. |
218 | */ | 173 | */ |
219 | static struct UNIXMessageWrapper *msg_tail; | 174 | static struct Queue *queue_tail; |
220 | 175 | ||
221 | /** | 176 | /** |
222 | * socket that we transmit all data with | 177 | * socket that we transmit all data with |
@@ -230,101 +185,6 @@ static struct GNUNET_TRANSPORT_AddressIdentifier *ai; | |||
230 | 185 | ||
231 | 186 | ||
232 | /** | 187 | /** |
233 | * If a queue monitor is attached, notify it about the new | ||
234 | * queue state. | ||
235 | * | ||
236 | * @param plugin our plugin | ||
237 | * @param queue queue that changed state | ||
238 | * @param state new state of the queue | ||
239 | */ | ||
240 | static void | ||
241 | notify_queue_monitor (struct Plugin *plugin, | ||
242 | struct Queue *queue, | ||
243 | enum GNUNET_TRANSPORT_QueueState state) | ||
244 | { | ||
245 | struct GNUNET_TRANSPORT_QueueInfo info; | ||
246 | |||
247 | if (NULL == plugin->sic) | ||
248 | return; | ||
249 | memset (&info, 0, sizeof (info)); | ||
250 | info.state = state; | ||
251 | info.is_inbound = GNUNET_SYSERR; /* hard to say */ | ||
252 | info.num_msg_pending = queue->msgs_in_queue; | ||
253 | info.num_bytes_pending = queue->bytes_in_queue; | ||
254 | /* info.receive_delay remains zero as this is not supported by UNIX | ||
255 | (cannot selectively not receive from 'some' peer while continuing | ||
256 | to receive from others) */ | ||
257 | info.queue_timeout = queue->timeout; | ||
258 | info.address = queue->address; | ||
259 | plugin->sic (plugin->sic_cls, | ||
260 | queue, | ||
261 | &info); | ||
262 | } | ||
263 | |||
264 | |||
265 | /** | ||
266 | * Function called for a quick conversion of the binary address to | ||
267 | * a numeric address. Note that the caller must not free the | ||
268 | * address and that the next call to this function is allowed | ||
269 | * to override the address again. | ||
270 | * | ||
271 | * @param cls closure | ||
272 | * @param addr binary address | ||
273 | * @param addrlen length of the @a addr | ||
274 | * @return string representing the same address | ||
275 | */ | ||
276 | static const char * | ||
277 | unix_plugin_address_to_string (void *cls, | ||
278 | const void *addr, | ||
279 | size_t addrlen) | ||
280 | { | ||
281 | static char rbuf[1024]; | ||
282 | struct UnixAddress *ua = (struct UnixAddress *) addr; | ||
283 | char *addrstr; | ||
284 | size_t addr_str_len; | ||
285 | unsigned int off; | ||
286 | |||
287 | if ((NULL == addr) || (sizeof (struct UnixAddress) > addrlen)) | ||
288 | { | ||
289 | GNUNET_break(0); | ||
290 | return NULL; | ||
291 | } | ||
292 | addrstr = (char *) &ua[1]; | ||
293 | addr_str_len = ntohl (ua->addrlen); | ||
294 | |||
295 | if (addr_str_len != addrlen - sizeof(struct UnixAddress)) | ||
296 | { | ||
297 | GNUNET_break(0); | ||
298 | return NULL; | ||
299 | } | ||
300 | if ('\0' != addrstr[addr_str_len - 1]) | ||
301 | { | ||
302 | GNUNET_break(0); | ||
303 | return NULL; | ||
304 | } | ||
305 | if (strlen (addrstr) + 1 != addr_str_len) | ||
306 | { | ||
307 | GNUNET_break(0); | ||
308 | return NULL; | ||
309 | } | ||
310 | |||
311 | off = 0; | ||
312 | if ('\0' == addrstr[0]) | ||
313 | off++; | ||
314 | memset (rbuf, 0, sizeof (rbuf)); | ||
315 | GNUNET_snprintf (rbuf, | ||
316 | sizeof (rbuf) - 1, | ||
317 | "%s.%u.%s%.*s", | ||
318 | PLUGIN_NAME, | ||
319 | ntohl (ua->options), | ||
320 | (off == 1) ? "@" : "", | ||
321 | (int) (addr_str_len - off), | ||
322 | &addrstr[off]); | ||
323 | return rbuf; | ||
324 | } | ||
325 | |||
326 | |||
327 | /** | ||
328 | * Functions with this signature are called whenever we need | 188 | * Functions with this signature are called whenever we need |
329 | * to close a queue due to a disconnect or failure to | 189 | * to close a queue due to a disconnect or failure to |
330 | * establish a connection. | 190 | * establish a connection. |
@@ -332,58 +192,40 @@ unix_plugin_address_to_string (void *cls, | |||
332 | * @param queue queue to close down | 192 | * @param queue queue to close down |
333 | */ | 193 | */ |
334 | static void | 194 | static void |
335 | unix_plugin_queue_disconnect (struct Queue *queue) | 195 | queue_destroy (struct Queue *queue) |
336 | { | 196 | { |
337 | struct Plugin *plugin = cls; | 197 | struct Plugin *plugin = cls; |
338 | struct UNIXMessageWrapper *msgw; | 198 | struct GNUNET_MQ_Handle *mq; |
339 | struct UNIXMessageWrapper *next; | ||
340 | 199 | ||
341 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 200 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
342 | "Disconnecting queue for peer `%s'\n", | 201 | "Disconnecting queue for peer `%s'\n", |
343 | GNUNET_i2s (&queue->target)); | 202 | GNUNET_i2s (&queue->target)); |
344 | plugin->env->queue_end (plugin->env->cls, | 203 | if (0 != queue->bytes_in_queue) |
345 | queue->address, | ||
346 | queue); | ||
347 | next = plugin->msg_head; | ||
348 | while (NULL != next) | ||
349 | { | 204 | { |
350 | msgw = next; | 205 | GNUNET_CONTAINER_DLL_remove (queue_head, |
351 | next = msgw->next; | 206 | queue_tail, |
352 | if (msgw->queue != queue) | 207 | queue); |
353 | continue; | 208 | queue->bytes_in_queue = 0; |
354 | GNUNET_CONTAINER_DLL_remove (plugin->msg_head, | 209 | } |
355 | plugin->msg_tail, | 210 | if (NULL != (mq = queue->mq)) |
356 | msgw); | 211 | { |
357 | queue->msgs_in_queue--; | 212 | queue->mq = NULL; |
358 | GNUNET_assert (queue->bytes_in_queue >= msgw->msgsize); | 213 | GNUNET_MQ_destroy (mq); |
359 | queue->bytes_in_queue -= msgw->msgsize; | ||
360 | GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize); | ||
361 | plugin->bytes_in_queue -= msgw->msgsize; | ||
362 | if (NULL != msgw->cont) | ||
363 | msgw->cont (msgw->cont_cls, | ||
364 | &msgw->queue->target, | ||
365 | GNUNET_SYSERR, | ||
366 | msgw->payload, 0); | ||
367 | GNUNET_free (msgw->msg); | ||
368 | GNUNET_free (msgw); | ||
369 | } | 214 | } |
370 | GNUNET_assert (GNUNET_YES == | 215 | GNUNET_assert (GNUNET_YES == |
371 | GNUNET_CONTAINER_multipeermap_remove (plugin->queue_map, | 216 | GNUNET_CONTAINER_multipeermap_remove (queue_map, |
372 | &queue->target, | 217 | &queue->target, |
373 | queue)); | 218 | queue)); |
374 | GNUNET_STATISTICS_set (stats, | 219 | GNUNET_STATISTICS_set (stats, |
375 | "# UNIX queues active", | 220 | "# UNIX queues active", |
376 | GNUNET_CONTAINER_multipeermap_size (plugin->queue_map), | 221 | GNUNET_CONTAINER_multipeermap_size (queue_map), |
377 | GNUNET_NO); | 222 | GNUNET_NO); |
378 | if (NULL != queue->timeout_task) | 223 | if (NULL != queue->timeout_task) |
379 | { | 224 | { |
380 | GNUNET_SCHEDULER_cancel (queue->timeout_task); | 225 | GNUNET_SCHEDULER_cancel (queue->timeout_task); |
381 | queue->timeout_task = NULL; | 226 | queue->timeout_task = NULL; |
382 | queue->timeout = GNUNET_TIME_UNIT_ZERO_ABS; | ||
383 | } | 227 | } |
384 | GNUNET_free (queue->address); | 228 | GNUNET_free (queue->address); |
385 | GNUNET_break (0 == queue->bytes_in_queue); | ||
386 | GNUNET_break (0 == queue->msgs_in_queue); | ||
387 | GNUNET_free (queue); | 229 | GNUNET_free (queue); |
388 | } | 230 | } |
389 | 231 | ||
@@ -416,7 +258,7 @@ queue_timeout (void *cls) | |||
416 | queue, | 258 | queue, |
417 | GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, | 259 | GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, |
418 | GNUNET_YES)); | 260 | GNUNET_YES)); |
419 | unix_plugin_queue_disconnect (queue); | 261 | queue_destroy (queue); |
420 | } | 262 | } |
421 | 263 | ||
422 | 264 | ||
@@ -458,7 +300,9 @@ unix_address_to_sockaddr (const char *unixpath, | |||
458 | slen = strlen (unixpath); | 300 | slen = strlen (unixpath); |
459 | if (slen >= sizeof (un->sun_path)) | 301 | if (slen >= sizeof (un->sun_path)) |
460 | slen = sizeof (un->sun_path) - 1; | 302 | slen = sizeof (un->sun_path) - 1; |
461 | GNUNET_memcpy (un->sun_path, unixpath, slen); | 303 | GNUNET_memcpy (un->sun_path, |
304 | unixpath, | ||
305 | slen); | ||
462 | un->sun_path[slen] = '\0'; | 306 | un->sun_path[slen] = '\0'; |
463 | slen = sizeof (struct sockaddr_un); | 307 | slen = sizeof (struct sockaddr_un); |
464 | #if HAVE_SOCKADDR_UN_SUN_LEN | 308 | #if HAVE_SOCKADDR_UN_SUN_LEN |
@@ -545,182 +389,19 @@ lookup_queue (const struct GNUNET_PeerIdentity *peer, | |||
545 | } | 389 | } |
546 | 390 | ||
547 | 391 | ||
548 | |||
549 | /** | ||
550 | * Actually send out the message, assume we've got the address and | ||
551 | * send_handle squared away! | ||
552 | * | ||
553 | * @param cls closure | ||
554 | * @param send_handle which handle to send message on | ||
555 | * @param target who should receive this message (ignored by UNIX) | ||
556 | * @param msgbuf one or more GNUNET_MessageHeader(s) strung together | ||
557 | * @param msgbuf_size the size of the @a msgbuf to send | ||
558 | * @param priority how important is the message (ignored by UNIX) | ||
559 | * @param timeout when should we time out (give up) if we can not transmit? | ||
560 | * @param addr the addr to send the message to, needs to be a sockaddr for us | ||
561 | * @param addrlen the len of @a addr | ||
562 | * @param payload bytes payload to send | ||
563 | * @param cont continuation to call once the message has | ||
564 | * been transmitted (or if the transport is ready | ||
565 | * for the next transmission call; or if the | ||
566 | * peer disconnected...) | ||
567 | * @param cont_cls closure for @a cont | ||
568 | * @return on success the number of bytes written, RETRY for retry, -1 on errors | ||
569 | */ | ||
570 | static ssize_t | ||
571 | unix_real_send (void *cls, | ||
572 | struct GNUNET_NETWORK_Handle *send_handle, | ||
573 | const struct GNUNET_PeerIdentity *target, | ||
574 | const char *msgbuf, | ||
575 | size_t msgbuf_size, | ||
576 | unsigned int priority, | ||
577 | struct GNUNET_TIME_Absolute timeout, | ||
578 | const struct UnixAddress *addr, | ||
579 | size_t addrlen, | ||
580 | size_t payload, | ||
581 | GNUNET_TRANSPORT_TransmitContinuation cont, | ||
582 | void *cont_cls) | ||
583 | { | ||
584 | struct Plugin *plugin = cls; | ||
585 | ssize_t sent; | ||
586 | struct sockaddr_un *un; | ||
587 | socklen_t un_len; | ||
588 | const char *unixpath; | ||
589 | |||
590 | if (NULL == send_handle) | ||
591 | { | ||
592 | GNUNET_break (0); /* We do not have a send handle */ | ||
593 | return GNUNET_SYSERR; | ||
594 | } | ||
595 | if ((NULL == addr) || (0 == addrlen)) | ||
596 | { | ||
597 | GNUNET_break (0); /* Can never send if we don't have an address */ | ||
598 | return GNUNET_SYSERR; | ||
599 | } | ||
600 | |||
601 | /* Prepare address */ | ||
602 | unixpath = (const char *) &addr[1]; | ||
603 | if (NULL == (un = unix_address_to_sockaddr (unixpath, | ||
604 | &un_len))) | ||
605 | { | ||
606 | GNUNET_break (0); | ||
607 | return -1; | ||
608 | } | ||
609 | |||
610 | if ((GNUNET_YES == plugin->is_abstract) && | ||
611 | (0 != (UNIX_OPTIONS_USE_ABSTRACT_SOCKETS & ntohl(addr->options) )) ) | ||
612 | { | ||
613 | un->sun_path[0] = '\0'; | ||
614 | } | ||
615 | resend: | ||
616 | /* Send the data */ | ||
617 | sent = GNUNET_NETWORK_socket_sendto (send_handle, | ||
618 | msgbuf, | ||
619 | msgbuf_size, | ||
620 | (const struct sockaddr *) un, | ||
621 | un_len); | ||
622 | if (GNUNET_SYSERR == sent) | ||
623 | { | ||
624 | if ( (EAGAIN == errno) || | ||
625 | (ENOBUFS == errno) ) | ||
626 | { | ||
627 | GNUNET_free (un); | ||
628 | return RETRY; /* We have to retry later */ | ||
629 | } | ||
630 | if (EMSGSIZE == errno) | ||
631 | { | ||
632 | socklen_t size = 0; | ||
633 | socklen_t len = sizeof (size); | ||
634 | |||
635 | GNUNET_NETWORK_socket_getsockopt ((struct GNUNET_NETWORK_Handle *) | ||
636 | send_handle, SOL_SOCKET, SO_SNDBUF, &size, | ||
637 | &len); | ||
638 | if (size < msgbuf_size) | ||
639 | { | ||
640 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
641 | "Trying to increase socket buffer size from %u to %u for message size %u\n", | ||
642 | (unsigned int) size, | ||
643 | (unsigned int) ((msgbuf_size / 1000) + 2) * 1000, | ||
644 | (unsigned int) msgbuf_size); | ||
645 | size = ((msgbuf_size / 1000) + 2) * 1000; | ||
646 | if (GNUNET_OK == | ||
647 | GNUNET_NETWORK_socket_setsockopt ((struct GNUNET_NETWORK_Handle *) send_handle, | ||
648 | SOL_SOCKET, SO_SNDBUF, | ||
649 | &size, sizeof (size))) | ||
650 | goto resend; /* Increased buffer size, retry sending */ | ||
651 | else | ||
652 | { | ||
653 | /* Could not increase buffer size: error, no retry */ | ||
654 | GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "setsockopt"); | ||
655 | GNUNET_free (un); | ||
656 | return GNUNET_SYSERR; | ||
657 | } | ||
658 | } | ||
659 | else | ||
660 | { | ||
661 | /* Buffer is bigger than message: error, no retry | ||
662 | * This should never happen!*/ | ||
663 | GNUNET_break (0); | ||
664 | GNUNET_free (un); | ||
665 | return GNUNET_SYSERR; | ||
666 | } | ||
667 | } | ||
668 | } | ||
669 | |||
670 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
671 | "UNIX transmitted %u-byte message to %s (%d: %s)\n", | ||
672 | (unsigned int) msgbuf_size, | ||
673 | GNUNET_a2s ((const struct sockaddr *)un, un_len), | ||
674 | (int) sent, | ||
675 | (sent < 0) ? STRERROR (errno) : "ok"); | ||
676 | GNUNET_free (un); | ||
677 | return sent; | ||
678 | } | ||
679 | |||
680 | |||
681 | /** | 392 | /** |
682 | * Function obtain the network type for a queue | 393 | * Creates a new outbound queue the transport service will use to send |
394 | * data to another peer. | ||
683 | * | 395 | * |
684 | * @param cls closure ('struct Plugin*') | 396 | * @param peer the target peer |
685 | * @param queue the queue | 397 | * @param un the address |
686 | * @return the network type in HBO or #GNUNET_SYSERR | 398 | * @param un_len number of bytes in @a un |
687 | */ | ||
688 | static enum GNUNET_ATS_Network_Type | ||
689 | unix_plugin_get_network (void *cls, | ||
690 | struct Queue *queue) | ||
691 | { | ||
692 | GNUNET_assert (NULL != queue); | ||
693 | return GNUNET_ATS_NET_LOOPBACK; | ||
694 | } | ||
695 | |||
696 | |||
697 | /** | ||
698 | * Function obtain the network type for a queue | ||
699 | * | ||
700 | * @param cls closure (`struct Plugin *`) | ||
701 | * @param address the address | ||
702 | * @return the network type | ||
703 | */ | ||
704 | static enum GNUNET_ATS_Network_Type | ||
705 | unix_plugin_get_network_for_address (void *cls, | ||
706 | const struct GNUNET_HELLO_Address *address) | ||
707 | |||
708 | { | ||
709 | return GNUNET_ATS_NET_LOOPBACK; | ||
710 | } | ||
711 | |||
712 | |||
713 | /** | ||
714 | * Creates a new outbound queue the transport service will use to send data to the | ||
715 | * peer | ||
716 | * | ||
717 | * @param cls the plugin | ||
718 | * @param address the address | ||
719 | * @return the queue or NULL of max connections exceeded | 399 | * @return the queue or NULL of max connections exceeded |
720 | */ | 400 | */ |
721 | static struct Queue * | 401 | static struct Queue * |
722 | unix_plugin_get_queue (void *cls, | 402 | unix_plugin_get_queue (const struct GNUNET_PeerIdentity *target, |
723 | const struct GNUNET_HELLO_Address *address) | 403 | const struct sockaddr_un *un, |
404 | socklen_t un_len) | ||
724 | { | 405 | { |
725 | struct Plugin *plugin = cls; | 406 | struct Plugin *plugin = cls; |
726 | struct Queue *queue; | 407 | struct Queue *queue; |
@@ -728,53 +409,22 @@ unix_plugin_get_queue (void *cls, | |||
728 | char * addrstr; | 409 | char * addrstr; |
729 | uint32_t addr_str_len; | 410 | uint32_t addr_str_len; |
730 | uint32_t addr_option; | 411 | uint32_t addr_option; |
412 | char *foreign_addr; | ||
413 | int is_abstract; | ||
414 | |||
415 | if (is_abstract = ('\0' == un.sun_path[0])) | ||
416 | un.sun_path[0] = '/'; | ||
417 | GNUNET_asprintf (&foreign_addr, | ||
418 | "%s-%s#%d", | ||
419 | COMMUNICATOR_NAME, | ||
420 | un.sun_path, | ||
421 | is_abstract); | ||
422 | |||
731 | 423 | ||
732 | ua = (struct UnixAddress *) address->address; | ||
733 | if ((NULL == address->address) || (0 == address->address_length) || | ||
734 | (sizeof (struct UnixAddress) > address->address_length)) | ||
735 | { | ||
736 | GNUNET_break (0); | ||
737 | return NULL; | ||
738 | } | ||
739 | addrstr = (char *) &ua[1]; | 424 | addrstr = (char *) &ua[1]; |
740 | addr_str_len = ntohl (ua->addrlen); | 425 | addr_str_len = ntohl (ua->addrlen); |
741 | addr_option = ntohl (ua->options); | 426 | addr_option = ntohl (ua->options); |
742 | 427 | ||
743 | if ( (0 != (UNIX_OPTIONS_USE_ABSTRACT_SOCKETS & addr_option)) && | ||
744 | (GNUNET_NO == plugin->is_abstract)) | ||
745 | { | ||
746 | return NULL; | ||
747 | } | ||
748 | |||
749 | if (addr_str_len != address->address_length - sizeof (struct UnixAddress)) | ||
750 | { | ||
751 | return NULL; /* This can be a legacy address */ | ||
752 | } | ||
753 | |||
754 | if ('\0' != addrstr[addr_str_len - 1]) | ||
755 | { | ||
756 | GNUNET_break (0); | ||
757 | return NULL; | ||
758 | } | ||
759 | if (strlen (addrstr) + 1 != addr_str_len) | ||
760 | { | ||
761 | GNUNET_break (0); | ||
762 | return NULL; | ||
763 | } | ||
764 | |||
765 | /* Check if a queue for this address already exists */ | ||
766 | if (NULL != (queue = lookup_queue (plugin, | ||
767 | address))) | ||
768 | { | ||
769 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
770 | "Found existing queue %p for address `%s'\n", | ||
771 | queue, | ||
772 | unix_plugin_address_to_string (NULL, | ||
773 | address->address, | ||
774 | address->address_length)); | ||
775 | return queue; | ||
776 | } | ||
777 | |||
778 | /* create a new queue */ | 428 | /* create a new queue */ |
779 | queue = GNUNET_new (struct Queue); | 429 | queue = GNUNET_new (struct Queue); |
780 | queue->target = address->peer; | 430 | queue->target = address->peer; |
@@ -795,14 +445,8 @@ unix_plugin_get_queue (void *cls, | |||
795 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | 445 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); |
796 | GNUNET_STATISTICS_set (plugin->env->stats, | 446 | GNUNET_STATISTICS_set (plugin->env->stats, |
797 | "# UNIX queues active", | 447 | "# UNIX queues active", |
798 | GNUNET_CONTAINER_multipeermap_size (plugin->queue_map), | 448 | GNUNET_CONTAINER_multipeermap_size (queue_map), |
799 | GNUNET_NO); | 449 | GNUNET_NO); |
800 | notify_queue_monitor (plugin, | ||
801 | queue, | ||
802 | GNUNET_TRANSPORT_SS_INIT); | ||
803 | notify_queue_monitor (plugin, | ||
804 | queue, | ||
805 | GNUNET_TRANSPORT_SS_UP); | ||
806 | return queue; | 450 | return queue; |
807 | } | 451 | } |
808 | 452 | ||
@@ -891,245 +535,146 @@ unix_demultiplexer (struct Plugin *plugin, | |||
891 | 535 | ||
892 | 536 | ||
893 | /** | 537 | /** |
894 | * Read from UNIX domain socket (it is ready). | 538 | * We have been notified that our socket has something to read. Do the |
539 | * read and reschedule this function to be called again once more is | ||
540 | * available. | ||
895 | * | 541 | * |
896 | * @param plugin the plugin | 542 | * @param cls NULL |
543 | */ | ||
544 | static void | ||
545 | select_read_cb (void *cls); | ||
546 | |||
547 | |||
548 | /** | ||
549 | * Function called when message was successfully passed to | ||
550 | * transport service. Continue read activity. | ||
551 | * | ||
552 | * @param cls NULL | ||
553 | */ | ||
554 | static void | ||
555 | receive_complete_cb (void *cls) | ||
556 | { | ||
557 | delivering_messages--; | ||
558 | if ( (NULL == read_task) && | ||
559 | (delivering_messages < max_queue_length) ) | ||
560 | read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, | ||
561 | unix_sock, | ||
562 | &select_read_cb, | ||
563 | NULL); | ||
564 | } | ||
565 | |||
566 | |||
567 | /** | ||
568 | * We have been notified that our socket has something to read. Do the | ||
569 | * read and reschedule this function to be called again once more is | ||
570 | * available. | ||
571 | * | ||
572 | * @param cls NULL | ||
897 | */ | 573 | */ |
898 | static void | 574 | static void |
899 | unix_plugin_do_read (struct Plugin *plugin) | 575 | select_read_cb (void *cls) |
900 | { | 576 | { |
901 | char buf[65536] GNUNET_ALIGN; | 577 | char buf[65536] GNUNET_ALIGN; |
902 | struct UnixAddress *ua; | 578 | struct Queue *queue; |
903 | struct UNIXMessage *msg; | 579 | const struct UNIXMessage *msg; |
904 | struct GNUNET_PeerIdentity sender; | ||
905 | struct sockaddr_un un; | 580 | struct sockaddr_un un; |
906 | socklen_t addrlen; | 581 | socklen_t addrlen; |
907 | ssize_t ret; | 582 | ssize_t ret; |
908 | int offset; | 583 | uint16_t msize; |
909 | int tsize; | ||
910 | int is_abstract; | ||
911 | char *msgbuf; | ||
912 | const struct GNUNET_MessageHeader *currhdr; | ||
913 | uint16_t csize; | ||
914 | size_t ua_len; | ||
915 | 584 | ||
585 | read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, | ||
586 | unix_sock, | ||
587 | &select_read_cb, | ||
588 | NULL); | ||
916 | addrlen = sizeof (un); | 589 | addrlen = sizeof (un); |
917 | memset (&un, 0, sizeof (un)); | 590 | memset (&un, |
591 | 0, | ||
592 | sizeof (un)); | ||
918 | ret = GNUNET_NETWORK_socket_recvfrom (unix_sock, | 593 | ret = GNUNET_NETWORK_socket_recvfrom (unix_sock, |
919 | buf, sizeof (buf), | 594 | buf, |
595 | sizeof (buf), | ||
920 | (struct sockaddr *) &un, | 596 | (struct sockaddr *) &un, |
921 | &addrlen); | 597 | &addrlen); |
922 | if ((GNUNET_SYSERR == ret) && ((errno == EAGAIN) || (errno == ENOBUFS))) | 598 | if ( (-1 == ret) && |
599 | ( (EAGAIN == errno) || | ||
600 | (ENOBUFS == errno) ) ) | ||
923 | return; | 601 | return; |
924 | if (GNUNET_SYSERR == ret) | 602 | if (-1 == ret) |
925 | { | 603 | { |
926 | GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, | 604 | GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, |
927 | "recvfrom"); | 605 | "recvfrom"); |
928 | return; | 606 | return; |
929 | } | 607 | } |
930 | else | 608 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
931 | { | 609 | "Read %d bytes from socket %s\n", |
932 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 610 | (int) ret, |
933 | "Read %d bytes from socket %s\n", | 611 | un.sun_path); |
934 | (int) ret, | ||
935 | un.sun_path); | ||
936 | } | ||
937 | |||
938 | GNUNET_assert (AF_UNIX == (un.sun_family)); | 612 | GNUNET_assert (AF_UNIX == (un.sun_family)); |
939 | is_abstract = GNUNET_NO; | ||
940 | if ('\0' == un.sun_path[0]) | ||
941 | { | ||
942 | un.sun_path[0] = '@'; | ||
943 | is_abstract = GNUNET_YES; | ||
944 | } | ||
945 | |||
946 | ua_len = sizeof (struct UnixAddress) + strlen (un.sun_path) + 1; | ||
947 | ua = GNUNET_malloc (ua_len); | ||
948 | ua->addrlen = htonl (strlen (&un.sun_path[0]) +1); | ||
949 | GNUNET_memcpy (&ua[1], &un.sun_path[0], strlen (un.sun_path) + 1); | ||
950 | if (is_abstract) | ||
951 | ua->options = htonl(UNIX_OPTIONS_USE_ABSTRACT_SOCKETS); | ||
952 | else | ||
953 | ua->options = htonl(UNIX_OPTIONS_NONE); | ||
954 | |||
955 | msg = (struct UNIXMessage *) buf; | 613 | msg = (struct UNIXMessage *) buf; |
956 | csize = ntohs (msg->header.size); | 614 | msize = ntohs (msg->header.size); |
957 | if ((csize < sizeof (struct UNIXMessage)) || (csize > ret)) | 615 | if ( (msize < sizeof (struct UNIXMessage)) || |
616 | (msize > ret) ) | ||
958 | { | 617 | { |
959 | GNUNET_break_op (0); | 618 | GNUNET_break_op (0); |
960 | GNUNET_free (ua); | ||
961 | return; | 619 | return; |
962 | } | 620 | } |
963 | msgbuf = (char *) &msg[1]; | 621 | queue = lookup_queue (&msg->sender, |
964 | GNUNET_memcpy (&sender, | 622 | un, |
965 | &msg->sender, | 623 | addrlen); |
966 | sizeof (struct GNUNET_PeerIdentity)); | 624 | if (NULL == queue) |
967 | offset = 0; | 625 | queue = setup_queue (&msg->sender, |
968 | tsize = csize - sizeof (struct UNIXMessage); | 626 | un, |
969 | while (offset + sizeof (struct GNUNET_MessageHeader) <= tsize) | 627 | addrlen); |
628 | if (NULL == queue) | ||
970 | { | 629 | { |
971 | currhdr = (struct GNUNET_MessageHeader *) &msgbuf[offset]; | 630 | GNUENT_log (GNUNET_ERROR_TYPE_ERROR, |
972 | csize = ntohs (currhdr->size); | 631 | _("Maximum number of UNIX connections exceeded, dropping incoming message\n")); |
973 | if ((csize < sizeof (struct GNUNET_MessageHeader)) || | 632 | return; |
974 | (csize > tsize - offset)) | ||
975 | { | ||
976 | GNUNET_break_op (0); | ||
977 | break; | ||
978 | } | ||
979 | unix_demultiplexer (plugin, &sender, currhdr, ua, ua_len); | ||
980 | offset += csize; | ||
981 | } | 633 | } |
982 | GNUNET_free (ua); | 634 | |
983 | } | ||
984 | |||
985 | |||
986 | /** | ||
987 | * Write to UNIX domain socket (it is ready). | ||
988 | * | ||
989 | * @param plugin handle to the plugin | ||
990 | */ | ||
991 | static void | ||
992 | unix_plugin_do_write (struct Plugin *plugin) | ||
993 | { | ||
994 | ssize_t sent = 0; | ||
995 | struct UNIXMessageWrapper *msgw; | ||
996 | struct Queue *queue; | ||
997 | int did_delete; | ||
998 | 635 | ||
999 | queue = NULL; | ||
1000 | did_delete = GNUNET_NO; | ||
1001 | while (NULL != (msgw = plugin->msg_head)) | ||
1002 | { | ||
1003 | if (GNUNET_TIME_absolute_get_remaining (msgw->timeout).rel_value_us > 0) | ||
1004 | break; /* Message is ready for sending */ | ||
1005 | /* Message has a timeout */ | ||
1006 | did_delete = GNUNET_YES; | ||
1007 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1008 | "Timeout for message with %u bytes \n", | ||
1009 | (unsigned int) msgw->msgsize); | ||
1010 | GNUNET_CONTAINER_DLL_remove (plugin->msg_head, | ||
1011 | plugin->msg_tail, | ||
1012 | msgw); | ||
1013 | queue = msgw->queue; | ||
1014 | queue->msgs_in_queue--; | ||
1015 | GNUNET_assert (queue->bytes_in_queue >= msgw->msgsize); | ||
1016 | queue->bytes_in_queue -= msgw->msgsize; | ||
1017 | GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize); | ||
1018 | plugin->bytes_in_queue -= msgw->msgsize; | ||
1019 | GNUNET_STATISTICS_set (plugin->env->stats, | ||
1020 | "# bytes currently in UNIX buffers", | ||
1021 | plugin->bytes_in_queue, | ||
1022 | GNUNET_NO); | ||
1023 | GNUNET_STATISTICS_update (plugin->env->stats, | ||
1024 | "# UNIX bytes discarded", | ||
1025 | msgw->msgsize, | ||
1026 | GNUNET_NO); | ||
1027 | if (NULL != msgw->cont) | ||
1028 | msgw->cont (msgw->cont_cls, | ||
1029 | &msgw->queue->target, | ||
1030 | GNUNET_SYSERR, | ||
1031 | msgw->payload, | ||
1032 | 0); | ||
1033 | GNUNET_free (msgw->msg); | ||
1034 | GNUNET_free (msgw); | ||
1035 | } | ||
1036 | if (NULL == msgw) | ||
1037 | { | 636 | { |
1038 | if (GNUNET_YES == did_delete) | 637 | uint16_t offset = 0; |
1039 | notify_queue_monitor (plugin, | 638 | uint16_t tsize = msize - sizeof (struct UNIXMessage); |
1040 | queue, | 639 | const char *msgbuf = (const char *) &msg[1]; |
1041 | GNUNET_TRANSPORT_SS_UPDATE); | 640 | |
1042 | return; /* Nothing to send at the moment */ | 641 | while (offset + sizeof (struct GNUNET_MessageHeader) <= tsize) |
1043 | } | 642 | { |
1044 | queue = msgw->queue; | 643 | const struct GNUNET_MessageHeader *currhdr; |
1045 | sent = unix_real_send (plugin, | 644 | struct GNUNET_MessageHeader al_hdr; |
1046 | unix_sock, | 645 | uint16_t csize; |
1047 | &queue->target, | 646 | |
1048 | (const char *) msgw->msg, | 647 | currhdr = (const struct GNUNET_MessageHeader *) &msgbuf[offset]; |
1049 | msgw->msgsize, | 648 | /* ensure aligned access */ |
1050 | msgw->priority, | 649 | memcpy (&al_hdr, |
1051 | msgw->timeout, | 650 | currhdr, |
1052 | msgw->queue->address->address, | 651 | sizeof (al_hdr)); |
1053 | msgw->queue->address->address_length, | 652 | csize = ntohs (al_hdr.size); |
1054 | msgw->payload, | 653 | if ( (csize < sizeof (struct GNUNET_MessageHeader)) || |
1055 | msgw->cont, msgw->cont_cls); | 654 | (csize > tsize - offset)) |
1056 | if (RETRY == sent) | 655 | { |
1057 | { | 656 | GNUNET_break_op (0); |
1058 | GNUNET_STATISTICS_update (plugin->env->stats, | 657 | break; |
1059 | "# UNIX retry attempts", | 658 | } |
1060 | 1, GNUNET_NO); | 659 | ret = GNUNET_TRANSPORT_communicator_receive (ch, |
1061 | notify_queue_monitor (plugin, | 660 | &msg->sender, |
1062 | queue, | 661 | currhdr, |
1063 | GNUNET_TRANSPORT_SS_UPDATE); | 662 | &receive_complete_cb, |
1064 | return; | 663 | NULL); |
664 | if (GNUNET_SYSERR == ret) | ||
665 | return; /* transport not up */ | ||
666 | if (GNUNET_NO == ret) | ||
667 | break; | ||
668 | delivering_messages++; | ||
669 | offset += csize; | ||
670 | } | ||
1065 | } | 671 | } |
1066 | GNUNET_CONTAINER_DLL_remove (plugin->msg_head, | 672 | if (delivering_messages >= max_queue_length) |
1067 | plugin->msg_tail, | ||
1068 | msgw); | ||
1069 | queue->msgs_in_queue--; | ||
1070 | GNUNET_assert (queue->bytes_in_queue >= msgw->msgsize); | ||
1071 | queue->bytes_in_queue -= msgw->msgsize; | ||
1072 | GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize); | ||
1073 | plugin->bytes_in_queue -= msgw->msgsize; | ||
1074 | GNUNET_STATISTICS_set (plugin->env->stats, | ||
1075 | "# bytes currently in UNIX buffers", | ||
1076 | plugin->bytes_in_queue, GNUNET_NO); | ||
1077 | notify_queue_monitor (plugin, | ||
1078 | queue, | ||
1079 | GNUNET_TRANSPORT_SS_UPDATE); | ||
1080 | if (GNUNET_SYSERR == sent) | ||
1081 | { | 673 | { |
1082 | /* failed and no retry */ | 674 | /* we should try to apply 'back pressure' */ |
1083 | if (NULL != msgw->cont) | 675 | GNUNET_SCHEDULER_cancel (read_task); |
1084 | msgw->cont (msgw->cont_cls, | 676 | read_task = NULL; |
1085 | &msgw->queue->target, | ||
1086 | GNUNET_SYSERR, | ||
1087 | msgw->payload, 0); | ||
1088 | GNUNET_STATISTICS_update (plugin->env->stats, | ||
1089 | "# UNIX bytes discarded", | ||
1090 | msgw->msgsize, | ||
1091 | GNUNET_NO); | ||
1092 | GNUNET_free (msgw->msg); | ||
1093 | GNUNET_free (msgw); | ||
1094 | return; | ||
1095 | } | 677 | } |
1096 | /* successfully sent bytes */ | ||
1097 | GNUNET_break (sent > 0); | ||
1098 | GNUNET_STATISTICS_update (plugin->env->stats, | ||
1099 | "# bytes transmitted via UNIX", | ||
1100 | msgw->msgsize, | ||
1101 | GNUNET_NO); | ||
1102 | if (NULL != msgw->cont) | ||
1103 | msgw->cont (msgw->cont_cls, | ||
1104 | &msgw->queue->target, | ||
1105 | GNUNET_OK, | ||
1106 | msgw->payload, | ||
1107 | msgw->msgsize); | ||
1108 | GNUNET_free (msgw->msg); | ||
1109 | GNUNET_free (msgw); | ||
1110 | } | ||
1111 | |||
1112 | |||
1113 | /** | ||
1114 | * We have been notified that our socket has something to read. | ||
1115 | * Then reschedule this function to be called again once more is available. | ||
1116 | * | ||
1117 | * @param cls the plugin handle | ||
1118 | */ | ||
1119 | static void | ||
1120 | unix_plugin_select_read (void *cls) | ||
1121 | { | ||
1122 | struct Plugin *plugin = cls; | ||
1123 | const struct GNUNET_SCHEDULER_TaskContext *tc; | ||
1124 | |||
1125 | plugin->read_task = NULL; | ||
1126 | tc = GNUNET_SCHEDULER_get_task_context (); | ||
1127 | if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) | ||
1128 | unix_plugin_do_read (plugin); | ||
1129 | plugin->read_task = | ||
1130 | GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, | ||
1131 | unix_sock, | ||
1132 | &unix_plugin_select_read, plugin); | ||
1133 | } | 678 | } |
1134 | 679 | ||
1135 | 680 | ||
@@ -1137,158 +682,155 @@ unix_plugin_select_read (void *cls) | |||
1137 | * We have been notified that our socket is ready to write. | 682 | * We have been notified that our socket is ready to write. |
1138 | * Then reschedule this function to be called again once more is available. | 683 | * Then reschedule this function to be called again once more is available. |
1139 | * | 684 | * |
1140 | * @param cls the plugin handle | 685 | * @param cls NULL |
1141 | */ | 686 | */ |
1142 | static void | 687 | static void |
1143 | unix_plugin_select_write (void *cls) | 688 | select_write_cb (void *cls) |
1144 | { | 689 | { |
1145 | struct Plugin *plugin = cls; | 690 | struct Queue *queue = queue_tail; |
1146 | const struct GNUNET_SCHEDULER_TaskContext *tc; | 691 | const struct GNUNET_MessageHeader *msg = queue->msg; |
1147 | 692 | size_t msg_size = ntohs (msg->size); | |
1148 | plugin->write_task = NULL; | 693 | ssize_t sent; |
1149 | tc = GNUNET_SCHEDULER_get_task_context (); | ||
1150 | if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) | ||
1151 | unix_plugin_do_write (plugin); | ||
1152 | if (NULL == plugin->msg_head) | ||
1153 | return; /* write queue empty */ | ||
1154 | plugin->write_task = | ||
1155 | GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL, | ||
1156 | unix_sock, | ||
1157 | &unix_plugin_select_write, plugin); | ||
1158 | } | ||
1159 | |||
1160 | 694 | ||
1161 | /** | 695 | /* take queue of the ready list */ |
1162 | * Function that can be used by the transport service to transmit | 696 | write_task = NULL; |
1163 | * a message using the plugin. Note that in the case of a | 697 | GNUNET_CONTAINER_DLL_remove (queue_head, |
1164 | * peer disconnecting, the continuation MUST be called | 698 | queue_tail, |
1165 | * prior to the disconnect notification itself. This function | 699 | queue); |
1166 | * will be called with this peer's HELLO message to initiate | 700 | if (NULL != queue_head) |
1167 | * a fresh connection to another peer. | 701 | write_task = |
1168 | * | 702 | GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL, |
1169 | * @param cls closure | 703 | unix_sock, |
1170 | * @param queue which queue must be used | 704 | &select_write_cb, |
1171 | * @param msgbuf the message to transmit | 705 | NULL); |
1172 | * @param msgbuf_size number of bytes in @a msgbuf | ||
1173 | * @param priority how important is the message (most plugins will | ||
1174 | * ignore message priority and just FIFO) | ||
1175 | * @param to how long to wait at most for the transmission (does not | ||
1176 | * require plugins to discard the message after the timeout, | ||
1177 | * just advisory for the desired delay; most plugins will ignore | ||
1178 | * this as well) | ||
1179 | * @param cont continuation to call once the message has | ||
1180 | * been transmitted (or if the transport is ready | ||
1181 | * for the next transmission call; or if the | ||
1182 | * peer disconnected...); can be NULL | ||
1183 | * @param cont_cls closure for @a cont | ||
1184 | * @return number of bytes used (on the physical network, with overheads); | ||
1185 | * -1 on hard errors (i.e. address invalid); 0 is a legal value | ||
1186 | * and does NOT mean that the message was not transmitted (DV) | ||
1187 | */ | ||
1188 | static ssize_t | ||
1189 | unix_plugin_send (void *cls, | ||
1190 | struct Queue *queue, | ||
1191 | const char *msgbuf, | ||
1192 | size_t msgbuf_size, | ||
1193 | unsigned int priority, | ||
1194 | struct GNUNET_TIME_Relative to, | ||
1195 | GNUNET_TRANSPORT_TransmitContinuation cont, | ||
1196 | void *cont_cls) | ||
1197 | { | ||
1198 | struct Plugin *plugin = cls; | ||
1199 | struct UNIXMessageWrapper *wrapper; | ||
1200 | struct UNIXMessage *message; | ||
1201 | int ssize; | ||
1202 | 706 | ||
1203 | if (GNUNET_OK != | 707 | /* send 'msg' */ |
1204 | GNUNET_CONTAINER_multipeermap_contains_value (plugin->queue_map, | 708 | queue->msg = NULL; |
1205 | &queue->target, | 709 | GNUNET_MQ_impl_send_continue (queue->mq); |
1206 | queue)) | 710 | resend: |
711 | /* Send the data */ | ||
712 | sent = GNUNET_NETWORK_socket_sendto (unix_sock, | ||
713 | queue->msg, | ||
714 | msg_size, | ||
715 | (const struct sockaddr *) mq->address, | ||
716 | mq->address_len); | ||
717 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
718 | "UNIX transmitted message to %s (%d/%u: %s)\n", | ||
719 | GNUNET_i2s (&queue->target), | ||
720 | (int) sent, | ||
721 | (unsigned int) msg_size, | ||
722 | (sent < 0) ? STRERROR (errno) : "ok"); | ||
723 | if (-1 != sent) | ||
724 | return; /* all good */ | ||
725 | switch (errno) | ||
1207 | { | 726 | { |
1208 | LOG (GNUNET_ERROR_TYPE_ERROR, | 727 | case EAGAIN: |
1209 | "Invalid queue for peer `%s' `%s'\n", | 728 | case ENOBUFS: |
1210 | GNUNET_i2s (&queue->target), | 729 | /* We should retry later... */ |
1211 | unix_plugin_address_to_string (NULL, | 730 | GNUNET_log_strerror (GNUNET_ERROR_TYPE_DEBUG, |
1212 | queue->address->address, | 731 | "send"); |
1213 | queue->address->address_length)); | 732 | return; |
1214 | GNUNET_break (0); | 733 | case EMSGSIZE: |
1215 | return GNUNET_SYSERR; | 734 | { |
735 | socklen_t size = 0; | ||
736 | socklen_t len = sizeof (size); | ||
737 | |||
738 | GNUNET_NETWORK_socket_getsockopt (unix_sock, | ||
739 | SOL_SOCKET, | ||
740 | SO_SNDBUF, | ||
741 | &size, | ||
742 | &len); | ||
743 | if (size > ntohs (msg->size)) | ||
744 | { | ||
745 | /* Buffer is bigger than message: error, no retry | ||
746 | * This should never happen!*/ | ||
747 | GNUNET_break (0); | ||
748 | return; | ||
749 | } | ||
750 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
751 | "Trying to increase socket buffer size from %u to %u for message size %u\n", | ||
752 | (unsigned int) size, | ||
753 | (unsigned int) m((msg_size / 1000) + 2) * 1000, | ||
754 | (unsigned int) msg_size); | ||
755 | size = ((msg_size / 1000) + 2) * 1000; | ||
756 | if (GNUNET_OK == | ||
757 | GNUNET_NETWORK_socket_setsockopt (unix_sock, | ||
758 | SOL_SOCKET, | ||
759 | SO_SNDBUF, | ||
760 | &size, | ||
761 | sizeof (size))) | ||
762 | goto resend; /* Increased buffer size, retry sending */ | ||
763 | /* Ok, then just try very modest increase */ | ||
764 | size = msg_size; | ||
765 | if (GNUNET_OK == | ||
766 | GNUNET_NETWORK_socket_setsockopt (unix_sock, | ||
767 | SOL_SOCKET, | ||
768 | SO_SNDBUF, | ||
769 | &size, | ||
770 | sizeof (size))) | ||
771 | goto resend; /* Increased buffer size, retry sending */ | ||
772 | /* Could not increase buffer size: error, no retry */ | ||
773 | GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, | ||
774 | "setsockopt"); | ||
775 | return; | ||
776 | } | ||
777 | default: | ||
778 | GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, | ||
779 | "send"); | ||
780 | return; | ||
1216 | } | 781 | } |
1217 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1218 | "Sending %u bytes with queue for peer `%s' `%s'\n", | ||
1219 | msgbuf_size, | ||
1220 | GNUNET_i2s (&queue->target), | ||
1221 | unix_plugin_address_to_string (NULL, | ||
1222 | queue->address->address, | ||
1223 | queue->address->address_length)); | ||
1224 | ssize = sizeof (struct UNIXMessage) + msgbuf_size; | ||
1225 | message = GNUNET_malloc (sizeof (struct UNIXMessage) + msgbuf_size); | ||
1226 | message->header.size = htons (ssize); | ||
1227 | message->header.type = htons (0); | ||
1228 | GNUNET_memcpy (&message->sender, plugin->env->my_identity, | ||
1229 | sizeof (struct GNUNET_PeerIdentity)); | ||
1230 | GNUNET_memcpy (&message[1], msgbuf, msgbuf_size); | ||
1231 | wrapper = GNUNET_new (struct UNIXMessageWrapper); | ||
1232 | wrapper->msg = message; | ||
1233 | wrapper->msgsize = ssize; | ||
1234 | wrapper->payload = msgbuf_size; | ||
1235 | wrapper->priority = priority; | ||
1236 | wrapper->timeout = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (), | ||
1237 | to); | ||
1238 | wrapper->cont = cont; | ||
1239 | wrapper->cont_cls = cont_cls; | ||
1240 | wrapper->queue = queue; | ||
1241 | GNUNET_CONTAINER_DLL_insert_tail (plugin->msg_head, | ||
1242 | plugin->msg_tail, | ||
1243 | wrapper); | ||
1244 | plugin->bytes_in_queue += ssize; | ||
1245 | queue->bytes_in_queue += ssize; | ||
1246 | queue->msgs_in_queue++; | ||
1247 | GNUNET_STATISTICS_set (plugin->env->stats, | ||
1248 | "# bytes currently in UNIX buffers", | ||
1249 | plugin->bytes_in_queue, | ||
1250 | GNUNET_NO); | ||
1251 | notify_queue_monitor (plugin, | ||
1252 | queue, | ||
1253 | GNUNET_TRANSPORT_SS_UPDATE); | ||
1254 | if (NULL == plugin->write_task) | ||
1255 | plugin->write_task = | ||
1256 | GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL, | ||
1257 | unix_sock, | ||
1258 | &unix_plugin_select_write, plugin); | ||
1259 | return ssize; | ||
1260 | } | 782 | } |
1261 | 783 | ||
1262 | 784 | ||
1263 | /** | 785 | /** |
1264 | * Signature of functions implementing the | 786 | * Signature of functions implementing the sending functionality of a |
1265 | * sending functionality of a message queue. | 787 | * message queue. |
1266 | * | 788 | * |
1267 | * @param mq the message queue | 789 | * @param mq the message queue |
1268 | * @param msg the message to send | 790 | * @param msg the message to send |
1269 | * @param impl_state state of the implementation | 791 | * @param impl_state our `struct Queue` |
1270 | */ | 792 | */ |
1271 | static void | 793 | static void |
1272 | mq_send (struct GNUNET_MQ_Handle *mq, | 794 | mq_send (struct GNUNET_MQ_Handle *mq, |
1273 | const struct GNUNET_MessageHeader *msg, | 795 | const struct GNUNET_MessageHeader *msg, |
1274 | void *impl_state) | 796 | void *impl_state) |
1275 | { | 797 | { |
798 | struct Queue *queue = impl_state; | ||
799 | |||
800 | GNUNET_assert (mq == queue->mq); | ||
801 | GNUNET_assert (NULL == queue->msg); | ||
802 | queue->msg = msg; | ||
803 | GNUNET_CONTAINER_DLL_insert (queue_head, | ||
804 | queue_tail, | ||
805 | queue); | ||
806 | if (NULL == write_task) | ||
807 | write_task = | ||
808 | GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL, | ||
809 | unix_sock, | ||
810 | &select_write_cb, | ||
811 | NULL); | ||
1276 | } | 812 | } |
1277 | 813 | ||
1278 | 814 | ||
1279 | /** | 815 | /** |
1280 | * Signature of functions implementing the | 816 | * Signature of functions implementing the destruction of a message |
1281 | * destruction of a message queue. | 817 | * queue. Implementations must not free @a mq, but should take care |
1282 | * Implementations must not free @a mq, but should | 818 | * of @a impl_state. |
1283 | * take care of @a impl_state. | ||
1284 | * | 819 | * |
1285 | * @param mq the message queue to destroy | 820 | * @param mq the message queue to destroy |
1286 | * @param impl_state state of the implementation | 821 | * @param impl_state our `struct Queue` |
1287 | */ | 822 | */ |
1288 | static void | 823 | static void |
1289 | mq_destroy (struct GNUNET_MQ_Handle *mq, | 824 | mq_destroy (struct GNUNET_MQ_Handle *mq, |
1290 | void *impl_state) | 825 | void *impl_state) |
1291 | { | 826 | { |
827 | struct Queue *queue = impl_state; | ||
828 | |||
829 | if (mq == queue->mq) | ||
830 | { | ||
831 | queue->mq = NULL; | ||
832 | queue_destroy (queue); | ||
833 | } | ||
1292 | } | 834 | } |
1293 | 835 | ||
1294 | 836 | ||
@@ -1296,12 +838,15 @@ mq_destroy (struct GNUNET_MQ_Handle *mq, | |||
1296 | * Implementation function that cancels the currently sent message. | 838 | * Implementation function that cancels the currently sent message. |
1297 | * | 839 | * |
1298 | * @param mq message queue | 840 | * @param mq message queue |
1299 | * @param impl_state state specific to the implementation | 841 | * @param impl_state our `struct Queue` |
1300 | */ | 842 | */ |
1301 | static void | 843 | static void |
1302 | mq_cancel (struct GNUNET_MQ_Handle *mq, | 844 | mq_cancel (struct GNUNET_MQ_Handle *mq, |
1303 | void *impl_state) | 845 | void *impl_state) |
1304 | { | 846 | { |
847 | struct Queue *queue = impl_state; | ||
848 | |||
849 | // FIXME: TBD! | ||
1305 | } | 850 | } |
1306 | 851 | ||
1307 | 852 | ||
@@ -1311,15 +856,17 @@ mq_cancel (struct GNUNET_MQ_Handle *mq, | |||
1311 | * the message queue. | 856 | * the message queue. |
1312 | * Not every message queue implementation supports an error handler. | 857 | * Not every message queue implementation supports an error handler. |
1313 | * | 858 | * |
1314 | * @param cls closure | 859 | * @param cls our `struct Queue` |
1315 | * @param error error code | 860 | * @param error error code |
1316 | */ | 861 | */ |
1317 | static void | 862 | static void |
1318 | mq_error (void *cls, | 863 | mq_error (void *cls, |
1319 | enum GNUNET_MQ_Error error) | 864 | enum GNUNET_MQ_Error error) |
1320 | { | 865 | { |
1321 | } | 866 | struct Queue *queue = cls; |
1322 | 867 | ||
868 | // FIXME: TBD! | ||
869 | } | ||
1323 | 870 | ||
1324 | 871 | ||
1325 | /** | 872 | /** |
@@ -1470,7 +1017,6 @@ do_shutdown (void *cls) | |||
1470 | GNUNET_TRANSPORT_communicator_disconnect (ch); | 1017 | GNUNET_TRANSPORT_communicator_disconnect (ch); |
1471 | ch = NULL; | 1018 | ch = NULL; |
1472 | } | 1019 | } |
1473 | GNUNET_break (0 == bytes_in_queue); | ||
1474 | } | 1020 | } |
1475 | 1021 | ||
1476 | 1022 | ||
@@ -1497,7 +1043,7 @@ run (void *cls, | |||
1497 | 1043 | ||
1498 | if (GNUNET_OK != | 1044 | if (GNUNET_OK != |
1499 | GNUNET_CONFIGURATION_get_value_filename (cfg, | 1045 | GNUNET_CONFIGURATION_get_value_filename (cfg, |
1500 | "transport-unix", | 1046 | "communicator-unix", |
1501 | "UNIXPATH", | 1047 | "UNIXPATH", |
1502 | &unix_socket_path)) | 1048 | &unix_socket_path)) |
1503 | { | 1049 | { |
@@ -1506,7 +1052,14 @@ run (void *cls, | |||
1506 | "UNIXPATH"); | 1052 | "UNIXPATH"); |
1507 | return; | 1053 | return; |
1508 | } | 1054 | } |
1509 | 1055 | if (GNUNET_OK != | |
1056 | GNUNET_CONFIGURATION_get_value_number (cfg, | ||
1057 | "communicator-unix", | ||
1058 | "MAX_QUEUE_LENGTH", | ||
1059 | &max_queue_length)) | ||
1060 | max_queue_length = DEFAULT_MAX_QUEUE_LENGTH; | ||
1061 | |||
1062 | |||
1510 | /* Initialize my flags */ | 1063 | /* Initialize my flags */ |
1511 | is_abstract = 0; | 1064 | is_abstract = 0; |
1512 | #ifdef LINUX | 1065 | #ifdef LINUX |
@@ -1571,7 +1124,7 @@ run (void *cls, | |||
1571 | queue_map = GNUNET_CONTAINER_multipeermap_create (10, | 1124 | queue_map = GNUNET_CONTAINER_multipeermap_create (10, |
1572 | GNUNET_NO); | 1125 | GNUNET_NO); |
1573 | ch = GNUNET_TRANSPORT_communicator_connect (cfg, | 1126 | ch = GNUNET_TRANSPORT_communicator_connect (cfg, |
1574 | "unix", | 1127 | COMMUNICATOR_NAME, |
1575 | 65535, | 1128 | 65535, |
1576 | &mq_init, | 1129 | &mq_init, |
1577 | NULL); | 1130 | NULL); |
@@ -1587,13 +1140,16 @@ run (void *cls, | |||
1587 | COMMUNICATOR_NAME, | 1140 | COMMUNICATOR_NAME, |
1588 | unix_socket_path, | 1141 | unix_socket_path, |
1589 | is_abstract); | 1142 | is_abstract); |
1590 | |||
1591 | ai = GNUNET_TRANSPORT_communicator_address_add (ch, | 1143 | ai = GNUNET_TRANSPORT_communicator_address_add (ch, |
1592 | my_addr, | 1144 | my_addr, |
1593 | GNUNET_ATS_NET_LOOPBACK, | 1145 | GNUNET_ATS_NET_LOOPBACK, |
1594 | GNUNET_TIME_UNIT_FOREVER_REL); | 1146 | GNUNET_TIME_UNIT_FOREVER_REL); |
1595 | GNUNET_free (my_addr); | 1147 | GNUNET_free (my_addr); |
1596 | GNUNET_free (unix_socket_path); | 1148 | GNUNET_free (unix_socket_path); |
1149 | read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, | ||
1150 | unix_sock, | ||
1151 | &select_read_cb, | ||
1152 | NULL); | ||
1597 | } | 1153 | } |
1598 | 1154 | ||
1599 | 1155 | ||