diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/include/gnunet_transport_communication_service.h | 2 | ||||
-rw-r--r-- | src/transport/Makefile.am | 9 | ||||
-rw-r--r-- | src/transport/gnunet-communicator-unix.c | 769 |
3 files changed, 363 insertions, 417 deletions
diff --git a/src/include/gnunet_transport_communication_service.h b/src/include/gnunet_transport_communication_service.h index b1a248e51..ab5d3742a 100644 --- a/src/include/gnunet_transport_communication_service.h +++ b/src/include/gnunet_transport_communication_service.h | |||
@@ -70,7 +70,7 @@ extern "C" | |||
70 | typedef int | 70 | typedef int |
71 | (*GNUNET_TRANSPORT_CommunicatorMqInit) (void *cls, | 71 | (*GNUNET_TRANSPORT_CommunicatorMqInit) (void *cls, |
72 | const struct GNUNET_PeerIdentity *peer, | 72 | const struct GNUNET_PeerIdentity *peer, |
73 | const void *address); | 73 | const char *address); |
74 | 74 | ||
75 | 75 | ||
76 | /** | 76 | /** |
diff --git a/src/transport/Makefile.am b/src/transport/Makefile.am index c6c02c6ed..92b53137f 100644 --- a/src/transport/Makefile.am +++ b/src/transport/Makefile.am | |||
@@ -140,6 +140,7 @@ endif | |||
140 | 140 | ||
141 | noinst_PROGRAMS = \ | 141 | noinst_PROGRAMS = \ |
142 | gnunet-transport-profiler \ | 142 | gnunet-transport-profiler \ |
143 | gnunet-communicator-unix \ | ||
143 | $(WLAN_BIN_SENDER) \ | 144 | $(WLAN_BIN_SENDER) \ |
144 | $(WLAN_BIN_RECEIVER) | 145 | $(WLAN_BIN_RECEIVER) |
145 | 146 | ||
@@ -219,6 +220,14 @@ gnunet_transport_certificate_creation_SOURCES = \ | |||
219 | gnunet_transport_certificate_creation_LDADD = \ | 220 | gnunet_transport_certificate_creation_LDADD = \ |
220 | $(top_builddir)/src/util/libgnunetutil.la | 221 | $(top_builddir)/src/util/libgnunetutil.la |
221 | 222 | ||
223 | gnunet_communicator_unix_SOURCES = \ | ||
224 | gnunet-communicator-unix.c | ||
225 | gnunet_communicator_unix_LDADD = \ | ||
226 | libgnunettransportcommunicator.la \ | ||
227 | $(top_builddir)/src/statistics/libgnunetstatistics.la \ | ||
228 | $(top_builddir)/src/util/libgnunetutil.la | ||
229 | |||
230 | |||
222 | gnunet_helper_transport_wlan_SOURCES = \ | 231 | gnunet_helper_transport_wlan_SOURCES = \ |
223 | gnunet-helper-transport-wlan.c | 232 | gnunet-helper-transport-wlan.c |
224 | 233 | ||
diff --git a/src/transport/gnunet-communicator-unix.c b/src/transport/gnunet-communicator-unix.c index f07975186..2879b1738 100644 --- a/src/transport/gnunet-communicator-unix.c +++ b/src/transport/gnunet-communicator-unix.c | |||
@@ -154,6 +154,11 @@ static unsigned long long delivering_messages; | |||
154 | static unsigned long long max_queue_length; | 154 | static unsigned long long max_queue_length; |
155 | 155 | ||
156 | /** | 156 | /** |
157 | * For logging statistics. | ||
158 | */ | ||
159 | static struct GNUNET_STATISTICS_Handle *stats; | ||
160 | |||
161 | /** | ||
157 | * Our environment. | 162 | * Our environment. |
158 | */ | 163 | */ |
159 | static struct GNUNET_TRANSPORT_CommunicatorHandle *ch; | 164 | static struct GNUNET_TRANSPORT_CommunicatorHandle *ch; |
@@ -194,12 +199,11 @@ static struct GNUNET_TRANSPORT_AddressIdentifier *ai; | |||
194 | static void | 199 | static void |
195 | queue_destroy (struct Queue *queue) | 200 | queue_destroy (struct Queue *queue) |
196 | { | 201 | { |
197 | struct Plugin *plugin = cls; | ||
198 | struct GNUNET_MQ_Handle *mq; | 202 | struct GNUNET_MQ_Handle *mq; |
199 | 203 | ||
200 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 204 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
201 | "Disconnecting queue for peer `%s'\n", | 205 | "Disconnecting queue for peer `%s'\n", |
202 | GNUNET_i2s (&queue->target)); | 206 | GNUNET_i2s (&queue->target)); |
203 | if (0 != queue->bytes_in_queue) | 207 | if (0 != queue->bytes_in_queue) |
204 | { | 208 | { |
205 | GNUNET_CONTAINER_DLL_remove (queue_head, | 209 | GNUNET_CONTAINER_DLL_remove (queue_head, |
@@ -253,11 +257,11 @@ queue_timeout (void *cls) | |||
253 | queue); | 257 | queue); |
254 | return; | 258 | return; |
255 | } | 259 | } |
256 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 260 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
257 | "Queue %p was idle for %s, disconnecting\n", | 261 | "Queue %p was idle for %s, disconnecting\n", |
258 | queue, | 262 | queue, |
259 | GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, | 263 | GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, |
260 | GNUNET_YES)); | 264 | GNUNET_YES)); |
261 | queue_destroy (queue); | 265 | queue_destroy (queue); |
262 | } | 266 | } |
263 | 267 | ||
@@ -288,8 +292,7 @@ reschedule_queue_timeout (struct Queue *queue) | |||
288 | */ | 292 | */ |
289 | static struct sockaddr_un * | 293 | static struct sockaddr_un * |
290 | unix_address_to_sockaddr (const char *unixpath, | 294 | unix_address_to_sockaddr (const char *unixpath, |
291 | socklen_t *sock_len, | 295 | socklen_t *sock_len) |
292 | int is_abstract) | ||
293 | { | 296 | { |
294 | struct sockaddr_un *un; | 297 | struct sockaddr_un *un; |
295 | size_t slen; | 298 | size_t slen; |
@@ -309,7 +312,7 @@ unix_address_to_sockaddr (const char *unixpath, | |||
309 | un->sun_len = (u_char) slen; | 312 | un->sun_len = (u_char) slen; |
310 | #endif | 313 | #endif |
311 | (*sock_len) = slen; | 314 | (*sock_len) = slen; |
312 | if (GNUNET_YES == is_abstract) | 315 | if ('@' == un->sun_path[0]) |
313 | un->sun_path[0] = '\0'; | 316 | un->sun_path[0] = '\0'; |
314 | return un; | 317 | return un; |
315 | } | 318 | } |
@@ -328,7 +331,7 @@ struct LookupCtx | |||
328 | /** | 331 | /** |
329 | * Address we are looking for. | 332 | * Address we are looking for. |
330 | */ | 333 | */ |
331 | const sockaddr_un *un; | 334 | const struct sockaddr_un *un; |
332 | 335 | ||
333 | /** | 336 | /** |
334 | * Number of bytes in @a un | 337 | * Number of bytes in @a un |
@@ -347,7 +350,7 @@ struct LookupCtx | |||
347 | */ | 350 | */ |
348 | static int | 351 | static int |
349 | lookup_queue_it (void *cls, | 352 | lookup_queue_it (void *cls, |
350 | const struct GNUNET_PeerIdentity * key, | 353 | const struct GNUNET_PeerIdentity *key, |
351 | void *value) | 354 | void *value) |
352 | { | 355 | { |
353 | struct LookupCtx *lctx = cls; | 356 | struct LookupCtx *lctx = cls; |
@@ -374,14 +377,14 @@ lookup_queue_it (void *cls, | |||
374 | */ | 377 | */ |
375 | static struct Queue * | 378 | static struct Queue * |
376 | lookup_queue (const struct GNUNET_PeerIdentity *peer, | 379 | lookup_queue (const struct GNUNET_PeerIdentity *peer, |
377 | const sockaddr_un *un, | 380 | const struct sockaddr_un *un, |
378 | socklen_t un_len) | 381 | socklen_t un_len) |
379 | { | 382 | { |
380 | struct LookupCtx lctx; | 383 | struct LookupCtx lctx; |
381 | 384 | ||
382 | lctx.un = un; | 385 | lctx.un = un; |
383 | lctx.un_len = un_len; | 386 | lctx.un_len = un_len; |
384 | GNUNET_CONTAINER_multipeermap_get_multiple (plugin->queue_map, | 387 | GNUNET_CONTAINER_multipeermap_get_multiple (queue_map, |
385 | peer, | 388 | peer, |
386 | &lookup_queue_it, | 389 | &lookup_queue_it, |
387 | &lctx); | 390 | &lctx); |
@@ -390,295 +393,6 @@ lookup_queue (const struct GNUNET_PeerIdentity *peer, | |||
390 | 393 | ||
391 | 394 | ||
392 | /** | 395 | /** |
393 | * Creates a new outbound queue the transport service will use to send | ||
394 | * data to another peer. | ||
395 | * | ||
396 | * @param peer the target peer | ||
397 | * @param un the address | ||
398 | * @param un_len number of bytes in @a un | ||
399 | * @return the queue or NULL of max connections exceeded | ||
400 | */ | ||
401 | static struct Queue * | ||
402 | unix_plugin_get_queue (const struct GNUNET_PeerIdentity *target, | ||
403 | const struct sockaddr_un *un, | ||
404 | socklen_t un_len) | ||
405 | { | ||
406 | struct Plugin *plugin = cls; | ||
407 | struct Queue *queue; | ||
408 | struct UnixAddress *ua; | ||
409 | char * addrstr; | ||
410 | uint32_t addr_str_len; | ||
411 | uint32_t addr_option; | ||
412 | char *foreign_addr; | ||
413 | int is_abstract; | ||
414 | |||
415 | if (is_abstract = ('\0' == un.sun_path[0])) | ||
416 | un.sun_path[0] = '/'; | ||
417 | GNUNET_asprintf (&foreign_addr, | ||
418 | "%s-%s#%d", | ||
419 | COMMUNICATOR_NAME, | ||
420 | un.sun_path, | ||
421 | is_abstract); | ||
422 | |||
423 | |||
424 | addrstr = (char *) &ua[1]; | ||
425 | addr_str_len = ntohl (ua->addrlen); | ||
426 | addr_option = ntohl (ua->options); | ||
427 | |||
428 | /* create a new queue */ | ||
429 | queue = GNUNET_new (struct Queue); | ||
430 | queue->target = address->peer; | ||
431 | queue->address = GNUNET_HELLO_address_copy (address); | ||
432 | queue->plugin = plugin; | ||
433 | queue->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); | ||
434 | queue->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, | ||
435 | &queue_timeout, | ||
436 | queue); | ||
437 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
438 | "Creating a new queue %p for address `%s'\n", | ||
439 | queue, | ||
440 | unix_plugin_address_to_string (NULL, | ||
441 | address->address, | ||
442 | address->address_length)); | ||
443 | (void) GNUNET_CONTAINER_multipeermap_put (plugin->queue_map, | ||
444 | &address->peer, queue, | ||
445 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | ||
446 | GNUNET_STATISTICS_set (plugin->env->stats, | ||
447 | "# UNIX queues active", | ||
448 | GNUNET_CONTAINER_multipeermap_size (queue_map), | ||
449 | GNUNET_NO); | ||
450 | return queue; | ||
451 | } | ||
452 | |||
453 | |||
454 | /** | ||
455 | * Function that will be called whenever the transport service wants | ||
456 | * to notify the plugin that a queue is still active and in use and | ||
457 | * therefore the queue timeout for this queue has to be updated | ||
458 | * | ||
459 | * @param cls closure with the `struct Plugin *` | ||
460 | * @param peer which peer was the queue for | ||
461 | * @param queue which queue is being updated | ||
462 | */ | ||
463 | static void | ||
464 | unix_plugin_update_queue_timeout (void *cls, | ||
465 | const struct GNUNET_PeerIdentity *peer, | ||
466 | struct Queue *queue) | ||
467 | { | ||
468 | struct Plugin *plugin = cls; | ||
469 | |||
470 | if (GNUNET_OK != | ||
471 | GNUNET_CONTAINER_multipeermap_contains_value (plugin->queue_map, | ||
472 | &queue->target, | ||
473 | queue)) | ||
474 | { | ||
475 | GNUNET_break (0); | ||
476 | return; | ||
477 | } | ||
478 | reschedule_queue_timeout (queue); | ||
479 | } | ||
480 | |||
481 | |||
482 | /** | ||
483 | * Demultiplexer for UNIX messages | ||
484 | * | ||
485 | * @param plugin the main plugin for this transport | ||
486 | * @param sender from which peer the message was received | ||
487 | * @param currhdr pointer to the header of the message | ||
488 | * @param ua address to look for | ||
489 | * @param ua_len length of the address @a ua | ||
490 | */ | ||
491 | static void | ||
492 | unix_demultiplexer (struct Plugin *plugin, | ||
493 | struct GNUNET_PeerIdentity *sender, | ||
494 | const struct GNUNET_MessageHeader *currhdr, | ||
495 | const struct UnixAddress *ua, | ||
496 | size_t ua_len) | ||
497 | { | ||
498 | struct Queue *queue; | ||
499 | struct GNUNET_HELLO_Address *address; | ||
500 | |||
501 | GNUNET_assert (ua_len >= sizeof (struct UnixAddress)); | ||
502 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
503 | "Received message from %s\n", | ||
504 | unix_plugin_address_to_string (NULL, ua, ua_len)); | ||
505 | GNUNET_STATISTICS_update (plugin->env->stats, | ||
506 | "# bytes received via UNIX", | ||
507 | ntohs (currhdr->size), | ||
508 | GNUNET_NO); | ||
509 | |||
510 | /* Look for existing queue */ | ||
511 | address = GNUNET_HELLO_address_allocate (sender, | ||
512 | PLUGIN_NAME, | ||
513 | ua, ua_len, | ||
514 | GNUNET_HELLO_ADDRESS_INFO_NONE); /* UNIX does not have "inbound" queues */ | ||
515 | queue = lookup_queue (plugin, address); | ||
516 | if (NULL == queue) | ||
517 | { | ||
518 | queue = unix_plugin_get_queue (plugin, address); | ||
519 | /* Notify transport and ATS about new inbound queue */ | ||
520 | plugin->env->queue_start (NULL, | ||
521 | queue->address, | ||
522 | queue, | ||
523 | GNUNET_ATS_NET_LOOPBACK); | ||
524 | } | ||
525 | else | ||
526 | { | ||
527 | reschedule_queue_timeout (queue); | ||
528 | } | ||
529 | GNUNET_HELLO_address_free (address); | ||
530 | plugin->env->receive (plugin->env->cls, | ||
531 | queue->address, | ||
532 | queue, | ||
533 | currhdr); | ||
534 | } | ||
535 | |||
536 | |||
537 | /** | ||
538 | * We have been notified that our socket has something to read. Do the | ||
539 | * read and reschedule this function to be called again once more is | ||
540 | * available. | ||
541 | * | ||
542 | * @param cls NULL | ||
543 | */ | ||
544 | static void | ||
545 | select_read_cb (void *cls); | ||
546 | |||
547 | |||
548 | /** | ||
549 | * Function called when message was successfully passed to | ||
550 | * transport service. Continue read activity. | ||
551 | * | ||
552 | * @param cls NULL | ||
553 | */ | ||
554 | static void | ||
555 | receive_complete_cb (void *cls) | ||
556 | { | ||
557 | delivering_messages--; | ||
558 | if ( (NULL == read_task) && | ||
559 | (delivering_messages < max_queue_length) ) | ||
560 | read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, | ||
561 | unix_sock, | ||
562 | &select_read_cb, | ||
563 | NULL); | ||
564 | } | ||
565 | |||
566 | |||
567 | /** | ||
568 | * We have been notified that our socket has something to read. Do the | ||
569 | * read and reschedule this function to be called again once more is | ||
570 | * available. | ||
571 | * | ||
572 | * @param cls NULL | ||
573 | */ | ||
574 | static void | ||
575 | select_read_cb (void *cls) | ||
576 | { | ||
577 | char buf[65536] GNUNET_ALIGN; | ||
578 | struct Queue *queue; | ||
579 | const struct UNIXMessage *msg; | ||
580 | struct sockaddr_un un; | ||
581 | socklen_t addrlen; | ||
582 | ssize_t ret; | ||
583 | uint16_t msize; | ||
584 | |||
585 | read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, | ||
586 | unix_sock, | ||
587 | &select_read_cb, | ||
588 | NULL); | ||
589 | addrlen = sizeof (un); | ||
590 | memset (&un, | ||
591 | 0, | ||
592 | sizeof (un)); | ||
593 | ret = GNUNET_NETWORK_socket_recvfrom (unix_sock, | ||
594 | buf, | ||
595 | sizeof (buf), | ||
596 | (struct sockaddr *) &un, | ||
597 | &addrlen); | ||
598 | if ( (-1 == ret) && | ||
599 | ( (EAGAIN == errno) || | ||
600 | (ENOBUFS == errno) ) ) | ||
601 | return; | ||
602 | if (-1 == ret) | ||
603 | { | ||
604 | GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, | ||
605 | "recvfrom"); | ||
606 | return; | ||
607 | } | ||
608 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
609 | "Read %d bytes from socket %s\n", | ||
610 | (int) ret, | ||
611 | un.sun_path); | ||
612 | GNUNET_assert (AF_UNIX == (un.sun_family)); | ||
613 | msg = (struct UNIXMessage *) buf; | ||
614 | msize = ntohs (msg->header.size); | ||
615 | if ( (msize < sizeof (struct UNIXMessage)) || | ||
616 | (msize > ret) ) | ||
617 | { | ||
618 | GNUNET_break_op (0); | ||
619 | return; | ||
620 | } | ||
621 | queue = lookup_queue (&msg->sender, | ||
622 | un, | ||
623 | addrlen); | ||
624 | if (NULL == queue) | ||
625 | queue = setup_queue (&msg->sender, | ||
626 | un, | ||
627 | addrlen); | ||
628 | if (NULL == queue) | ||
629 | { | ||
630 | GNUENT_log (GNUNET_ERROR_TYPE_ERROR, | ||
631 | _("Maximum number of UNIX connections exceeded, dropping incoming message\n")); | ||
632 | return; | ||
633 | } | ||
634 | |||
635 | |||
636 | { | ||
637 | uint16_t offset = 0; | ||
638 | uint16_t tsize = msize - sizeof (struct UNIXMessage); | ||
639 | const char *msgbuf = (const char *) &msg[1]; | ||
640 | |||
641 | while (offset + sizeof (struct GNUNET_MessageHeader) <= tsize) | ||
642 | { | ||
643 | const struct GNUNET_MessageHeader *currhdr; | ||
644 | struct GNUNET_MessageHeader al_hdr; | ||
645 | uint16_t csize; | ||
646 | |||
647 | currhdr = (const struct GNUNET_MessageHeader *) &msgbuf[offset]; | ||
648 | /* ensure aligned access */ | ||
649 | memcpy (&al_hdr, | ||
650 | currhdr, | ||
651 | sizeof (al_hdr)); | ||
652 | csize = ntohs (al_hdr.size); | ||
653 | if ( (csize < sizeof (struct GNUNET_MessageHeader)) || | ||
654 | (csize > tsize - offset)) | ||
655 | { | ||
656 | GNUNET_break_op (0); | ||
657 | break; | ||
658 | } | ||
659 | ret = GNUNET_TRANSPORT_communicator_receive (ch, | ||
660 | &msg->sender, | ||
661 | currhdr, | ||
662 | &receive_complete_cb, | ||
663 | NULL); | ||
664 | if (GNUNET_SYSERR == ret) | ||
665 | return; /* transport not up */ | ||
666 | if (GNUNET_NO == ret) | ||
667 | break; | ||
668 | delivering_messages++; | ||
669 | offset += csize; | ||
670 | } | ||
671 | } | ||
672 | if (delivering_messages >= max_queue_length) | ||
673 | { | ||
674 | /* we should try to apply 'back pressure' */ | ||
675 | GNUNET_SCHEDULER_cancel (read_task); | ||
676 | read_task = NULL; | ||
677 | } | ||
678 | } | ||
679 | |||
680 | |||
681 | /** | ||
682 | * We have been notified that our socket is ready to write. | 396 | * We have been notified that our socket is ready to write. |
683 | * Then reschedule this function to be called again once more is available. | 397 | * Then reschedule this function to be called again once more is available. |
684 | * | 398 | * |
@@ -712,16 +426,27 @@ select_write_cb (void *cls) | |||
712 | sent = GNUNET_NETWORK_socket_sendto (unix_sock, | 426 | sent = GNUNET_NETWORK_socket_sendto (unix_sock, |
713 | queue->msg, | 427 | queue->msg, |
714 | msg_size, | 428 | msg_size, |
715 | (const struct sockaddr *) mq->address, | 429 | (const struct sockaddr *) queue->address, |
716 | mq->address_len); | 430 | queue->address_len); |
717 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 431 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
718 | "UNIX transmitted message to %s (%d/%u: %s)\n", | 432 | "UNIX transmitted message to %s (%d/%u: %s)\n", |
719 | GNUNET_i2s (&queue->target), | 433 | GNUNET_i2s (&queue->target), |
720 | (int) sent, | 434 | (int) sent, |
721 | (unsigned int) msg_size, | 435 | (unsigned int) msg_size, |
722 | (sent < 0) ? STRERROR (errno) : "ok"); | 436 | (sent < 0) ? STRERROR (errno) : "ok"); |
723 | if (-1 != sent) | 437 | if (-1 != sent) |
438 | { | ||
439 | GNUNET_STATISTICS_update (stats, | ||
440 | "# bytes sent", | ||
441 | (long long) sent, | ||
442 | GNUNET_NO); | ||
443 | reschedule_queue_timeout (queue); | ||
724 | return; /* all good */ | 444 | return; /* all good */ |
445 | } | ||
446 | GNUNET_STATISTICS_update (stats, | ||
447 | "# network transmission failures", | ||
448 | 1, | ||
449 | GNUNET_NO); | ||
725 | switch (errno) | 450 | switch (errno) |
726 | { | 451 | { |
727 | case EAGAIN: | 452 | case EAGAIN: |
@@ -747,11 +472,11 @@ select_write_cb (void *cls) | |||
747 | GNUNET_break (0); | 472 | GNUNET_break (0); |
748 | return; | 473 | return; |
749 | } | 474 | } |
750 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 475 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
751 | "Trying to increase socket buffer size from %u to %u for message size %u\n", | 476 | "Trying to increase socket buffer size from %u to %u for message size %u\n", |
752 | (unsigned int) size, | 477 | (unsigned int) size, |
753 | (unsigned int) m((msg_size / 1000) + 2) * 1000, | 478 | (unsigned int) ((msg_size / 1000) + 2) * 1000, |
754 | (unsigned int) msg_size); | 479 | (unsigned int) msg_size); |
755 | size = ((msg_size / 1000) + 2) * 1000; | 480 | size = ((msg_size / 1000) + 2) * 1000; |
756 | if (GNUNET_OK == | 481 | if (GNUNET_OK == |
757 | GNUNET_NETWORK_socket_setsockopt (unix_sock, | 482 | GNUNET_NETWORK_socket_setsockopt (unix_sock, |
@@ -846,7 +571,17 @@ mq_cancel (struct GNUNET_MQ_Handle *mq, | |||
846 | { | 571 | { |
847 | struct Queue *queue = impl_state; | 572 | struct Queue *queue = impl_state; |
848 | 573 | ||
849 | // FIXME: TBD! | 574 | GNUNET_assert (NULL != queue->msg); |
575 | queue->msg = NULL; | ||
576 | GNUNET_CONTAINER_DLL_remove (queue_head, | ||
577 | queue_tail, | ||
578 | queue); | ||
579 | GNUNET_assert (NULL != write_task); | ||
580 | if (NULL == queue_head) | ||
581 | { | ||
582 | GNUNET_SCHEDULER_cancel (write_task); | ||
583 | write_task = NULL; | ||
584 | } | ||
850 | } | 585 | } |
851 | 586 | ||
852 | 587 | ||
@@ -865,7 +600,230 @@ mq_error (void *cls, | |||
865 | { | 600 | { |
866 | struct Queue *queue = cls; | 601 | struct Queue *queue = cls; |
867 | 602 | ||
868 | // FIXME: TBD! | 603 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
604 | "UNIX MQ error in queue to %s: %d\n", | ||
605 | GNUNET_i2s (&queue->target), | ||
606 | (int) error); | ||
607 | queue_destroy (queue); | ||
608 | } | ||
609 | |||
610 | |||
611 | /** | ||
612 | * Creates a new outbound queue the transport service will use to send | ||
613 | * data to another peer. | ||
614 | * | ||
615 | * @param peer the target peer | ||
616 | * @param un the address | ||
617 | * @param un_len number of bytes in @a un | ||
618 | * @return the queue or NULL of max connections exceeded | ||
619 | */ | ||
620 | static struct Queue * | ||
621 | setup_queue (const struct GNUNET_PeerIdentity *target, | ||
622 | const struct sockaddr_un *un, | ||
623 | socklen_t un_len) | ||
624 | { | ||
625 | struct Queue *queue; | ||
626 | |||
627 | queue = GNUNET_new (struct Queue); | ||
628 | queue->target = *target; | ||
629 | queue->address = GNUNET_memdup (un, | ||
630 | un_len); | ||
631 | queue->address_len = un_len; | ||
632 | (void) GNUNET_CONTAINER_multipeermap_put (queue_map, | ||
633 | &queue->target, | ||
634 | queue, | ||
635 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | ||
636 | GNUNET_STATISTICS_set (stats, | ||
637 | "# queues active", | ||
638 | GNUNET_CONTAINER_multipeermap_size (queue_map), | ||
639 | GNUNET_NO); | ||
640 | queue->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); | ||
641 | queue->timeout_task | ||
642 | = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, | ||
643 | &queue_timeout, | ||
644 | queue); | ||
645 | queue->mq | ||
646 | = GNUNET_MQ_queue_for_callbacks (&mq_send, | ||
647 | &mq_destroy, | ||
648 | &mq_cancel, | ||
649 | queue, | ||
650 | NULL, | ||
651 | &mq_error, | ||
652 | queue); | ||
653 | { | ||
654 | char *foreign_addr; | ||
655 | |||
656 | if ('\0' == un->sun_path[0]) | ||
657 | GNUNET_asprintf (&foreign_addr, | ||
658 | "%s-@%s", | ||
659 | COMMUNICATOR_NAME, | ||
660 | &un->sun_path[1]); | ||
661 | else | ||
662 | GNUNET_asprintf (&foreign_addr, | ||
663 | "%s-%s", | ||
664 | COMMUNICATOR_NAME, | ||
665 | un->sun_path); | ||
666 | queue->qh | ||
667 | = GNUNET_TRANSPORT_communicator_mq_add (ch, | ||
668 | &queue->target, | ||
669 | foreign_addr, | ||
670 | GNUNET_ATS_NET_LOOPBACK, | ||
671 | queue->mq); | ||
672 | GNUNET_free (foreign_addr); | ||
673 | } | ||
674 | return queue; | ||
675 | } | ||
676 | |||
677 | |||
678 | /** | ||
679 | * We have been notified that our socket has something to read. Do the | ||
680 | * read and reschedule this function to be called again once more is | ||
681 | * available. | ||
682 | * | ||
683 | * @param cls NULL | ||
684 | */ | ||
685 | static void | ||
686 | select_read_cb (void *cls); | ||
687 | |||
688 | |||
689 | /** | ||
690 | * Function called when message was successfully passed to | ||
691 | * transport service. Continue read activity. | ||
692 | * | ||
693 | * @param cls NULL | ||
694 | * @param success #GNUNET_OK on success | ||
695 | */ | ||
696 | static void | ||
697 | receive_complete_cb (void *cls, | ||
698 | int success) | ||
699 | { | ||
700 | delivering_messages--; | ||
701 | if (GNUNET_OK != success) | ||
702 | GNUNET_STATISTICS_update (stats, | ||
703 | "# transport transmission failures", | ||
704 | 1, | ||
705 | GNUNET_NO); | ||
706 | if ( (NULL == read_task) && | ||
707 | (delivering_messages < max_queue_length) ) | ||
708 | read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, | ||
709 | unix_sock, | ||
710 | &select_read_cb, | ||
711 | NULL); | ||
712 | } | ||
713 | |||
714 | |||
715 | /** | ||
716 | * We have been notified that our socket has something to read. Do the | ||
717 | * read and reschedule this function to be called again once more is | ||
718 | * available. | ||
719 | * | ||
720 | * @param cls NULL | ||
721 | */ | ||
722 | static void | ||
723 | select_read_cb (void *cls) | ||
724 | { | ||
725 | char buf[65536] GNUNET_ALIGN; | ||
726 | struct Queue *queue; | ||
727 | const struct UNIXMessage *msg; | ||
728 | struct sockaddr_un un; | ||
729 | socklen_t addrlen; | ||
730 | ssize_t ret; | ||
731 | uint16_t msize; | ||
732 | |||
733 | read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, | ||
734 | unix_sock, | ||
735 | &select_read_cb, | ||
736 | NULL); | ||
737 | addrlen = sizeof (un); | ||
738 | memset (&un, | ||
739 | 0, | ||
740 | sizeof (un)); | ||
741 | ret = GNUNET_NETWORK_socket_recvfrom (unix_sock, | ||
742 | buf, | ||
743 | sizeof (buf), | ||
744 | (struct sockaddr *) &un, | ||
745 | &addrlen); | ||
746 | if ( (-1 == ret) && | ||
747 | ( (EAGAIN == errno) || | ||
748 | (ENOBUFS == errno) ) ) | ||
749 | return; | ||
750 | if (-1 == ret) | ||
751 | { | ||
752 | GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, | ||
753 | "recvfrom"); | ||
754 | return; | ||
755 | } | ||
756 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
757 | "Read %d bytes from socket %s\n", | ||
758 | (int) ret, | ||
759 | un.sun_path); | ||
760 | GNUNET_assert (AF_UNIX == (un.sun_family)); | ||
761 | msg = (struct UNIXMessage *) buf; | ||
762 | msize = ntohs (msg->header.size); | ||
763 | if ( (msize < sizeof (struct UNIXMessage)) || | ||
764 | (msize > ret) ) | ||
765 | { | ||
766 | GNUNET_break_op (0); | ||
767 | return; | ||
768 | } | ||
769 | queue = lookup_queue (&msg->sender, | ||
770 | &un, | ||
771 | addrlen); | ||
772 | if (NULL == queue) | ||
773 | queue = setup_queue (&msg->sender, | ||
774 | &un, | ||
775 | addrlen); | ||
776 | else | ||
777 | reschedule_queue_timeout (queue); | ||
778 | if (NULL == queue) | ||
779 | { | ||
780 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
781 | _("Maximum number of UNIX connections exceeded, dropping incoming message\n")); | ||
782 | return; | ||
783 | } | ||
784 | |||
785 | { | ||
786 | uint16_t offset = 0; | ||
787 | uint16_t tsize = msize - sizeof (struct UNIXMessage); | ||
788 | const char *msgbuf = (const char *) &msg[1]; | ||
789 | |||
790 | while (offset + sizeof (struct GNUNET_MessageHeader) <= tsize) | ||
791 | { | ||
792 | const struct GNUNET_MessageHeader *currhdr; | ||
793 | struct GNUNET_MessageHeader al_hdr; | ||
794 | uint16_t csize; | ||
795 | |||
796 | currhdr = (const struct GNUNET_MessageHeader *) &msgbuf[offset]; | ||
797 | /* ensure aligned access */ | ||
798 | memcpy (&al_hdr, | ||
799 | currhdr, | ||
800 | sizeof (al_hdr)); | ||
801 | csize = ntohs (al_hdr.size); | ||
802 | if ( (csize < sizeof (struct GNUNET_MessageHeader)) || | ||
803 | (csize > tsize - offset)) | ||
804 | { | ||
805 | GNUNET_break_op (0); | ||
806 | break; | ||
807 | } | ||
808 | ret = GNUNET_TRANSPORT_communicator_receive (ch, | ||
809 | &msg->sender, | ||
810 | currhdr, | ||
811 | &receive_complete_cb, | ||
812 | NULL); | ||
813 | if (GNUNET_SYSERR == ret) | ||
814 | return; /* transport not up */ | ||
815 | if (GNUNET_NO == ret) | ||
816 | break; | ||
817 | delivering_messages++; | ||
818 | offset += csize; | ||
819 | } | ||
820 | } | ||
821 | if (delivering_messages >= max_queue_length) | ||
822 | { | ||
823 | /* we should try to apply 'back pressure' */ | ||
824 | GNUNET_SCHEDULER_cancel (read_task); | ||
825 | read_task = NULL; | ||
826 | } | ||
869 | } | 827 | } |
870 | 828 | ||
871 | 829 | ||
@@ -889,76 +847,69 @@ mq_error (void *cls, | |||
889 | static int | 847 | static int |
890 | mq_init (void *cls, | 848 | mq_init (void *cls, |
891 | const struct GNUNET_PeerIdentity *peer, | 849 | const struct GNUNET_PeerIdentity *peer, |
892 | const void *address) | 850 | const char *address) |
893 | { | 851 | { |
894 | struct Queue *queue; | 852 | struct Queue *queue; |
895 | char *a; | 853 | const char *path; |
896 | char *e; | 854 | struct sockaddr_un *un; |
897 | int is_abs; | ||
898 | sockaddr_un *un; | ||
899 | socklen_t un_len; | 855 | socklen_t un_len; |
900 | 856 | ||
901 | if (NULL == strncmp (address, | 857 | if (0 != strncmp (address, |
902 | COMMUNICATOR_NAME "-", | 858 | COMMUNICATOR_NAME "-", |
903 | strlen (COMMUNICATOR_NAME "-"))) | 859 | strlen (COMMUNICATOR_NAME "-"))) |
904 | { | 860 | { |
905 | GNUNET_break_op (0); | 861 | GNUNET_break_op (0); |
906 | return GNUNET_SYSERR; | 862 | return GNUNET_SYSERR; |
907 | } | 863 | } |
908 | a = GNUNET_strdup (&address[strlen (COMMUNICATOR_NAME "-")]); | 864 | path = &address[strlen (COMMUNICATOR_NAME "-")]; |
909 | e = strchr (a, | 865 | un = unix_address_to_sockaddr (path, |
910 | (unsigned char) '#'); | 866 | &un_len); |
911 | if (NULL == e) | ||
912 | { | ||
913 | GNUNET_free (a); | ||
914 | GNUNET_break_op (0); | ||
915 | return GNUNET_SYSERR; | ||
916 | } | ||
917 | is_abs = ('1' == e[1]); | ||
918 | *e = '\0'; | ||
919 | un = unix_address_to_sockaddr (a, | ||
920 | &un_len, | ||
921 | is_abs); | ||
922 | queue = lookup_queue (peer, | 867 | queue = lookup_queue (peer, |
923 | un, | 868 | un, |
924 | un_len); | 869 | un_len); |
925 | if (NULL != queue) | 870 | if (NULL != queue) |
926 | { | 871 | { |
927 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | 872 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, |
928 | "Address `%s' ignored, queue exists\n", | 873 | "Address `%s' for %s ignored, queue exists\n", |
929 | address); | 874 | path, |
875 | GNUNET_i2s (peer)); | ||
930 | GNUNET_free (un); | 876 | GNUNET_free (un); |
931 | return GNUNET_OK; | 877 | return GNUNET_OK; |
932 | } | 878 | } |
933 | queue = GNUNET_new (struct Queue); | 879 | queue = setup_queue (peer, |
934 | queue->target = *peer; | 880 | un, |
935 | queue->address = un; | 881 | un_len); |
936 | queue->address_len = un_len; | 882 | GNUNET_free (un); |
937 | (void) GNUNET_CONTAINER_multihashmap_put (queue_map, | 883 | if (NULL == queue) |
938 | &queue->target, | 884 | { |
939 | queue, | 885 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, |
940 | GNUET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | 886 | "Failed to setup queue to %s at `%s'\n", |
941 | GNUNET_STATISTICS_set (stats, | 887 | GNUNET_i2s (peer), |
942 | "# UNIX queues active", | 888 | path); |
943 | GNUNET_CONTAINER_multipeermap_size (plugin->queue_map), | 889 | return GNUNET_NO; |
944 | GNUNET_NO); | 890 | } |
945 | queue->timeout = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, | 891 | return GNUNET_OK; |
946 | &queue_timeout, | 892 | } |
947 | queue); | 893 | |
948 | queue->mq | 894 | |
949 | = GNUNET_MQ_queue_for_callbacks (&mq_send, | 895 | /** |
950 | &mq_destroy, | 896 | * Iterator over all message queues to clean up. |
951 | &mq_cancel, | 897 | * |
952 | queue, | 898 | * @param cls NULL |
953 | NULL, | 899 | * @param target unused |
954 | &mq_error, | 900 | * @param value the queue to destroy |
955 | queue); | 901 | * @return #GNUNET_OK to continue to iterate |
956 | queue->qh | 902 | */ |
957 | = GNUNET_TRANSPORT_communicator_mq_add (ch, | 903 | static int |
958 | &queue->target, | 904 | get_queue_delete_it (void *cls, |
959 | address, | 905 | const struct GNUNET_PeerIdentity *target, |
960 | ATS, | 906 | void *value) |
961 | queue->mq); | 907 | { |
908 | struct Queue *queue = value; | ||
909 | |||
910 | (void) cls; | ||
911 | (void) target; | ||
912 | queue_destroy (queue); | ||
962 | return GNUNET_OK; | 913 | return GNUNET_OK; |
963 | } | 914 | } |
964 | 915 | ||
@@ -971,22 +922,6 @@ mq_init (void *cls, | |||
971 | static void | 922 | static void |
972 | do_shutdown (void *cls) | 923 | do_shutdown (void *cls) |
973 | { | 924 | { |
974 | struct UNIXMessageWrapper *msgw; | ||
975 | |||
976 | while (NULL != (msgw = msg_head)) | ||
977 | { | ||
978 | GNUNET_CONTAINER_DLL_remove (msg_head, | ||
979 | msg_tail, | ||
980 | msgw); | ||
981 | queue = msgw->queue; | ||
982 | queue->msgs_in_queue--; | ||
983 | GNUNET_assert (queue->bytes_in_queue >= msgw->msgsize); | ||
984 | queue->bytes_in_queue -= msgw->msgsize; | ||
985 | GNUNET_assert (bytes_in_queue >= msgw->msgsize); | ||
986 | bytes_in_queue -= msgw->msgsize; | ||
987 | GNUNET_free (msgw->msg); | ||
988 | GNUNET_free (msgw); | ||
989 | } | ||
990 | if (NULL != read_task) | 925 | if (NULL != read_task) |
991 | { | 926 | { |
992 | GNUNET_SCHEDULER_cancel (read_task); | 927 | GNUNET_SCHEDULER_cancel (read_task); |
@@ -1017,6 +952,12 @@ do_shutdown (void *cls) | |||
1017 | GNUNET_TRANSPORT_communicator_disconnect (ch); | 952 | GNUNET_TRANSPORT_communicator_disconnect (ch); |
1018 | ch = NULL; | 953 | ch = NULL; |
1019 | } | 954 | } |
955 | if (NULL != stats) | ||
956 | { | ||
957 | GNUNET_STATISTICS_destroy (stats, | ||
958 | GNUNET_NO); | ||
959 | stats = NULL; | ||
960 | } | ||
1020 | } | 961 | } |
1021 | 962 | ||
1022 | 963 | ||
@@ -1035,7 +976,6 @@ run (void *cls, | |||
1035 | const struct GNUNET_CONFIGURATION_Handle *cfg) | 976 | const struct GNUNET_CONFIGURATION_Handle *cfg) |
1036 | { | 977 | { |
1037 | char *unix_socket_path; | 978 | char *unix_socket_path; |
1038 | int is_abstract; | ||
1039 | struct sockaddr_un *un; | 979 | struct sockaddr_un *un; |
1040 | socklen_t un_len; | 980 | socklen_t un_len; |
1041 | char *my_addr; | 981 | char *my_addr; |
@@ -1059,18 +999,16 @@ run (void *cls, | |||
1059 | &max_queue_length)) | 999 | &max_queue_length)) |
1060 | max_queue_length = DEFAULT_MAX_QUEUE_LENGTH; | 1000 | max_queue_length = DEFAULT_MAX_QUEUE_LENGTH; |
1061 | 1001 | ||
1062 | |||
1063 | /* Initialize my flags */ | ||
1064 | is_abstract = 0; | ||
1065 | #ifdef LINUX | ||
1066 | is_abstract | ||
1067 | = GNUNET_CONFIGURATION_get_value_yesno (cfg, | ||
1068 | "testing", | ||
1069 | "USE_ABSTRACT_SOCKETS"); | ||
1070 | #endif | ||
1071 | un = unix_address_to_sockaddr (unix_socket_path, | 1002 | un = unix_address_to_sockaddr (unix_socket_path, |
1072 | &un_len, | 1003 | &un_len); |
1073 | is_abstract); | 1004 | if (NULL == un) |
1005 | { | ||
1006 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
1007 | "Failed to setup UNIX domain socket address with path `%s'\n", | ||
1008 | unix_socket_path); | ||
1009 | GNUNET_free (unix_socket_path); | ||
1010 | return; | ||
1011 | } | ||
1074 | unix_sock = GNUNET_NETWORK_socket_create (AF_UNIX, | 1012 | unix_sock = GNUNET_NETWORK_socket_create (AF_UNIX, |
1075 | SOCK_DGRAM, | 1013 | SOCK_DGRAM, |
1076 | 0); | 1014 | 0); |
@@ -1086,9 +1024,9 @@ run (void *cls, | |||
1086 | (GNUNET_OK != | 1024 | (GNUNET_OK != |
1087 | GNUNET_DISK_directory_create_for_file (un->sun_path)) ) | 1025 | GNUNET_DISK_directory_create_for_file (un->sun_path)) ) |
1088 | { | 1026 | { |
1089 | LOG (GNUNET_ERROR_TYPE_ERROR, | 1027 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
1090 | _("Cannot create path to `%s'\n"), | 1028 | _("Cannot create path to `%s'\n"), |
1091 | un->sun_path); | 1029 | un->sun_path); |
1092 | GNUNET_NETWORK_socket_close (unix_sock); | 1030 | GNUNET_NETWORK_socket_close (unix_sock); |
1093 | unix_sock = NULL; | 1031 | unix_sock = NULL; |
1094 | GNUNET_free (un); | 1032 | GNUNET_free (un); |
@@ -1100,11 +1038,9 @@ run (void *cls, | |||
1100 | (const struct sockaddr *) un, | 1038 | (const struct sockaddr *) un, |
1101 | un_len)) | 1039 | un_len)) |
1102 | { | 1040 | { |
1103 | GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, | 1041 | GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_ERROR, |
1104 | "bind"); | 1042 | "bind", |
1105 | LOG (GNUNET_ERROR_TYPE_ERROR, | 1043 | un->sun_path); |
1106 | _("Cannot bind to `%s'\n"), | ||
1107 | un->sun_path); | ||
1108 | GNUNET_NETWORK_socket_close (unix_sock); | 1044 | GNUNET_NETWORK_socket_close (unix_sock); |
1109 | unix_sock = NULL; | 1045 | unix_sock = NULL; |
1110 | GNUNET_free (un); | 1046 | GNUNET_free (un); |
@@ -1112,14 +1048,16 @@ run (void *cls, | |||
1112 | return; | 1048 | return; |
1113 | } | 1049 | } |
1114 | GNUNET_free (un); | 1050 | GNUNET_free (un); |
1115 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1051 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1116 | "Bound to `%s'\n", | 1052 | "Bound to `%s'\n", |
1117 | unix_socket_path); | 1053 | unix_socket_path); |
1054 | stats = GNUNET_STATISTICS_create ("C-UNIX", | ||
1055 | cfg); | ||
1118 | GNUNET_SCHEDULER_add_shutdown (&do_shutdown, | 1056 | GNUNET_SCHEDULER_add_shutdown (&do_shutdown, |
1119 | NULL); | 1057 | NULL); |
1120 | read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, | 1058 | read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, |
1121 | unix_sock, | 1059 | unix_sock, |
1122 | &unix_plugin_select_read, | 1060 | &select_read_cb, |
1123 | NULL); | 1061 | NULL); |
1124 | queue_map = GNUNET_CONTAINER_multipeermap_create (10, | 1062 | queue_map = GNUNET_CONTAINER_multipeermap_create (10, |
1125 | GNUNET_NO); | 1063 | GNUNET_NO); |
@@ -1136,10 +1074,9 @@ run (void *cls, | |||
1136 | return; | 1074 | return; |
1137 | } | 1075 | } |
1138 | GNUNET_asprintf (&my_addr, | 1076 | GNUNET_asprintf (&my_addr, |
1139 | "%s-%s#%d", | 1077 | "%s-%s", |
1140 | COMMUNICATOR_NAME, | 1078 | COMMUNICATOR_NAME, |
1141 | unix_socket_path, | 1079 | unix_socket_path); |
1142 | is_abstract); | ||
1143 | ai = GNUNET_TRANSPORT_communicator_address_add (ch, | 1080 | ai = GNUNET_TRANSPORT_communicator_address_add (ch, |
1144 | my_addr, | 1081 | my_addr, |
1145 | GNUNET_ATS_NET_LOOPBACK, | 1082 | GNUNET_ATS_NET_LOOPBACK, |