aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2018-11-08 14:20:33 +0100
committerChristian Grothoff <christian@grothoff.org>2018-11-08 14:20:33 +0100
commit304bfc5d18d5613a38b5d927925dbfa00adfc82a (patch)
treeb863b2ef83a9e79413be4dae313a423ac2be9dd6 /src
parenta18d1f2587ca5df5a9c6e47c012bfbaf3f19098c (diff)
downloadgnunet-304bfc5d18d5613a38b5d927925dbfa00adfc82a.tar.gz
gnunet-304bfc5d18d5613a38b5d927925dbfa00adfc82a.zip
unix communicator now builds
Diffstat (limited to 'src')
-rw-r--r--src/include/gnunet_transport_communication_service.h2
-rw-r--r--src/transport/Makefile.am9
-rw-r--r--src/transport/gnunet-communicator-unix.c769
3 files changed, 363 insertions, 417 deletions
diff --git a/src/include/gnunet_transport_communication_service.h b/src/include/gnunet_transport_communication_service.h
index b1a248e51..ab5d3742a 100644
--- a/src/include/gnunet_transport_communication_service.h
+++ b/src/include/gnunet_transport_communication_service.h
@@ -70,7 +70,7 @@ extern "C"
70typedef int 70typedef int
71(*GNUNET_TRANSPORT_CommunicatorMqInit) (void *cls, 71(*GNUNET_TRANSPORT_CommunicatorMqInit) (void *cls,
72 const struct GNUNET_PeerIdentity *peer, 72 const struct GNUNET_PeerIdentity *peer,
73 const void *address); 73 const char *address);
74 74
75 75
76/** 76/**
diff --git a/src/transport/Makefile.am b/src/transport/Makefile.am
index c6c02c6ed..92b53137f 100644
--- a/src/transport/Makefile.am
+++ b/src/transport/Makefile.am
@@ -140,6 +140,7 @@ endif
140 140
141noinst_PROGRAMS = \ 141noinst_PROGRAMS = \
142 gnunet-transport-profiler \ 142 gnunet-transport-profiler \
143 gnunet-communicator-unix \
143 $(WLAN_BIN_SENDER) \ 144 $(WLAN_BIN_SENDER) \
144 $(WLAN_BIN_RECEIVER) 145 $(WLAN_BIN_RECEIVER)
145 146
@@ -219,6 +220,14 @@ gnunet_transport_certificate_creation_SOURCES = \
219gnunet_transport_certificate_creation_LDADD = \ 220gnunet_transport_certificate_creation_LDADD = \
220 $(top_builddir)/src/util/libgnunetutil.la 221 $(top_builddir)/src/util/libgnunetutil.la
221 222
223gnunet_communicator_unix_SOURCES = \
224 gnunet-communicator-unix.c
225gnunet_communicator_unix_LDADD = \
226 libgnunettransportcommunicator.la \
227 $(top_builddir)/src/statistics/libgnunetstatistics.la \
228 $(top_builddir)/src/util/libgnunetutil.la
229
230
222gnunet_helper_transport_wlan_SOURCES = \ 231gnunet_helper_transport_wlan_SOURCES = \
223 gnunet-helper-transport-wlan.c 232 gnunet-helper-transport-wlan.c
224 233
diff --git a/src/transport/gnunet-communicator-unix.c b/src/transport/gnunet-communicator-unix.c
index f07975186..2879b1738 100644
--- a/src/transport/gnunet-communicator-unix.c
+++ b/src/transport/gnunet-communicator-unix.c
@@ -154,6 +154,11 @@ static unsigned long long delivering_messages;
154static unsigned long long max_queue_length; 154static unsigned long long max_queue_length;
155 155
156/** 156/**
157 * For logging statistics.
158 */
159static struct GNUNET_STATISTICS_Handle *stats;
160
161/**
157 * Our environment. 162 * Our environment.
158 */ 163 */
159static struct GNUNET_TRANSPORT_CommunicatorHandle *ch; 164static struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
@@ -194,12 +199,11 @@ static struct GNUNET_TRANSPORT_AddressIdentifier *ai;
194static void 199static void
195queue_destroy (struct Queue *queue) 200queue_destroy (struct Queue *queue)
196{ 201{
197 struct Plugin *plugin = cls;
198 struct GNUNET_MQ_Handle *mq; 202 struct GNUNET_MQ_Handle *mq;
199 203
200 LOG (GNUNET_ERROR_TYPE_DEBUG, 204 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
201 "Disconnecting queue for peer `%s'\n", 205 "Disconnecting queue for peer `%s'\n",
202 GNUNET_i2s (&queue->target)); 206 GNUNET_i2s (&queue->target));
203 if (0 != queue->bytes_in_queue) 207 if (0 != queue->bytes_in_queue)
204 { 208 {
205 GNUNET_CONTAINER_DLL_remove (queue_head, 209 GNUNET_CONTAINER_DLL_remove (queue_head,
@@ -253,11 +257,11 @@ queue_timeout (void *cls)
253 queue); 257 queue);
254 return; 258 return;
255 } 259 }
256 LOG (GNUNET_ERROR_TYPE_DEBUG, 260 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
257 "Queue %p was idle for %s, disconnecting\n", 261 "Queue %p was idle for %s, disconnecting\n",
258 queue, 262 queue,
259 GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, 263 GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
260 GNUNET_YES)); 264 GNUNET_YES));
261 queue_destroy (queue); 265 queue_destroy (queue);
262} 266}
263 267
@@ -288,8 +292,7 @@ reschedule_queue_timeout (struct Queue *queue)
288 */ 292 */
289static struct sockaddr_un * 293static struct sockaddr_un *
290unix_address_to_sockaddr (const char *unixpath, 294unix_address_to_sockaddr (const char *unixpath,
291 socklen_t *sock_len, 295 socklen_t *sock_len)
292 int is_abstract)
293{ 296{
294 struct sockaddr_un *un; 297 struct sockaddr_un *un;
295 size_t slen; 298 size_t slen;
@@ -309,7 +312,7 @@ unix_address_to_sockaddr (const char *unixpath,
309 un->sun_len = (u_char) slen; 312 un->sun_len = (u_char) slen;
310#endif 313#endif
311 (*sock_len) = slen; 314 (*sock_len) = slen;
312 if (GNUNET_YES == is_abstract) 315 if ('@' == un->sun_path[0])
313 un->sun_path[0] = '\0'; 316 un->sun_path[0] = '\0';
314 return un; 317 return un;
315} 318}
@@ -328,7 +331,7 @@ struct LookupCtx
328 /** 331 /**
329 * Address we are looking for. 332 * Address we are looking for.
330 */ 333 */
331 const sockaddr_un *un; 334 const struct sockaddr_un *un;
332 335
333 /** 336 /**
334 * Number of bytes in @a un 337 * Number of bytes in @a un
@@ -347,7 +350,7 @@ struct LookupCtx
347 */ 350 */
348static int 351static int
349lookup_queue_it (void *cls, 352lookup_queue_it (void *cls,
350 const struct GNUNET_PeerIdentity * key, 353 const struct GNUNET_PeerIdentity *key,
351 void *value) 354 void *value)
352{ 355{
353 struct LookupCtx *lctx = cls; 356 struct LookupCtx *lctx = cls;
@@ -374,14 +377,14 @@ lookup_queue_it (void *cls,
374 */ 377 */
375static struct Queue * 378static struct Queue *
376lookup_queue (const struct GNUNET_PeerIdentity *peer, 379lookup_queue (const struct GNUNET_PeerIdentity *peer,
377 const sockaddr_un *un, 380 const struct sockaddr_un *un,
378 socklen_t un_len) 381 socklen_t un_len)
379{ 382{
380 struct LookupCtx lctx; 383 struct LookupCtx lctx;
381 384
382 lctx.un = un; 385 lctx.un = un;
383 lctx.un_len = un_len; 386 lctx.un_len = un_len;
384 GNUNET_CONTAINER_multipeermap_get_multiple (plugin->queue_map, 387 GNUNET_CONTAINER_multipeermap_get_multiple (queue_map,
385 peer, 388 peer,
386 &lookup_queue_it, 389 &lookup_queue_it,
387 &lctx); 390 &lctx);
@@ -390,295 +393,6 @@ lookup_queue (const struct GNUNET_PeerIdentity *peer,
390 393
391 394
392/** 395/**
393 * Creates a new outbound queue the transport service will use to send
394 * data to another peer.
395 *
396 * @param peer the target peer
397 * @param un the address
398 * @param un_len number of bytes in @a un
399 * @return the queue or NULL of max connections exceeded
400 */
401static struct Queue *
402unix_plugin_get_queue (const struct GNUNET_PeerIdentity *target,
403 const struct sockaddr_un *un,
404 socklen_t un_len)
405{
406 struct Plugin *plugin = cls;
407 struct Queue *queue;
408 struct UnixAddress *ua;
409 char * addrstr;
410 uint32_t addr_str_len;
411 uint32_t addr_option;
412 char *foreign_addr;
413 int is_abstract;
414
415 if (is_abstract = ('\0' == un.sun_path[0]))
416 un.sun_path[0] = '/';
417 GNUNET_asprintf (&foreign_addr,
418 "%s-%s#%d",
419 COMMUNICATOR_NAME,
420 un.sun_path,
421 is_abstract);
422
423
424 addrstr = (char *) &ua[1];
425 addr_str_len = ntohl (ua->addrlen);
426 addr_option = ntohl (ua->options);
427
428 /* create a new queue */
429 queue = GNUNET_new (struct Queue);
430 queue->target = address->peer;
431 queue->address = GNUNET_HELLO_address_copy (address);
432 queue->plugin = plugin;
433 queue->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
434 queue->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
435 &queue_timeout,
436 queue);
437 LOG (GNUNET_ERROR_TYPE_DEBUG,
438 "Creating a new queue %p for address `%s'\n",
439 queue,
440 unix_plugin_address_to_string (NULL,
441 address->address,
442 address->address_length));
443 (void) GNUNET_CONTAINER_multipeermap_put (plugin->queue_map,
444 &address->peer, queue,
445 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
446 GNUNET_STATISTICS_set (plugin->env->stats,
447 "# UNIX queues active",
448 GNUNET_CONTAINER_multipeermap_size (queue_map),
449 GNUNET_NO);
450 return queue;
451}
452
453
454/**
455 * Function that will be called whenever the transport service wants
456 * to notify the plugin that a queue is still active and in use and
457 * therefore the queue timeout for this queue has to be updated
458 *
459 * @param cls closure with the `struct Plugin *`
460 * @param peer which peer was the queue for
461 * @param queue which queue is being updated
462 */
463static void
464unix_plugin_update_queue_timeout (void *cls,
465 const struct GNUNET_PeerIdentity *peer,
466 struct Queue *queue)
467{
468 struct Plugin *plugin = cls;
469
470 if (GNUNET_OK !=
471 GNUNET_CONTAINER_multipeermap_contains_value (plugin->queue_map,
472 &queue->target,
473 queue))
474 {
475 GNUNET_break (0);
476 return;
477 }
478 reschedule_queue_timeout (queue);
479}
480
481
482/**
483 * Demultiplexer for UNIX messages
484 *
485 * @param plugin the main plugin for this transport
486 * @param sender from which peer the message was received
487 * @param currhdr pointer to the header of the message
488 * @param ua address to look for
489 * @param ua_len length of the address @a ua
490 */
491static void
492unix_demultiplexer (struct Plugin *plugin,
493 struct GNUNET_PeerIdentity *sender,
494 const struct GNUNET_MessageHeader *currhdr,
495 const struct UnixAddress *ua,
496 size_t ua_len)
497{
498 struct Queue *queue;
499 struct GNUNET_HELLO_Address *address;
500
501 GNUNET_assert (ua_len >= sizeof (struct UnixAddress));
502 LOG (GNUNET_ERROR_TYPE_DEBUG,
503 "Received message from %s\n",
504 unix_plugin_address_to_string (NULL, ua, ua_len));
505 GNUNET_STATISTICS_update (plugin->env->stats,
506 "# bytes received via UNIX",
507 ntohs (currhdr->size),
508 GNUNET_NO);
509
510 /* Look for existing queue */
511 address = GNUNET_HELLO_address_allocate (sender,
512 PLUGIN_NAME,
513 ua, ua_len,
514 GNUNET_HELLO_ADDRESS_INFO_NONE); /* UNIX does not have "inbound" queues */
515 queue = lookup_queue (plugin, address);
516 if (NULL == queue)
517 {
518 queue = unix_plugin_get_queue (plugin, address);
519 /* Notify transport and ATS about new inbound queue */
520 plugin->env->queue_start (NULL,
521 queue->address,
522 queue,
523 GNUNET_ATS_NET_LOOPBACK);
524 }
525 else
526 {
527 reschedule_queue_timeout (queue);
528 }
529 GNUNET_HELLO_address_free (address);
530 plugin->env->receive (plugin->env->cls,
531 queue->address,
532 queue,
533 currhdr);
534}
535
536
537/**
538 * We have been notified that our socket has something to read. Do the
539 * read and reschedule this function to be called again once more is
540 * available.
541 *
542 * @param cls NULL
543 */
544static void
545select_read_cb (void *cls);
546
547
548/**
549 * Function called when message was successfully passed to
550 * transport service. Continue read activity.
551 *
552 * @param cls NULL
553 */
554static void
555receive_complete_cb (void *cls)
556{
557 delivering_messages--;
558 if ( (NULL == read_task) &&
559 (delivering_messages < max_queue_length) )
560 read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
561 unix_sock,
562 &select_read_cb,
563 NULL);
564}
565
566
567/**
568 * We have been notified that our socket has something to read. Do the
569 * read and reschedule this function to be called again once more is
570 * available.
571 *
572 * @param cls NULL
573 */
574static void
575select_read_cb (void *cls)
576{
577 char buf[65536] GNUNET_ALIGN;
578 struct Queue *queue;
579 const struct UNIXMessage *msg;
580 struct sockaddr_un un;
581 socklen_t addrlen;
582 ssize_t ret;
583 uint16_t msize;
584
585 read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
586 unix_sock,
587 &select_read_cb,
588 NULL);
589 addrlen = sizeof (un);
590 memset (&un,
591 0,
592 sizeof (un));
593 ret = GNUNET_NETWORK_socket_recvfrom (unix_sock,
594 buf,
595 sizeof (buf),
596 (struct sockaddr *) &un,
597 &addrlen);
598 if ( (-1 == ret) &&
599 ( (EAGAIN == errno) ||
600 (ENOBUFS == errno) ) )
601 return;
602 if (-1 == ret)
603 {
604 GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
605 "recvfrom");
606 return;
607 }
608 LOG (GNUNET_ERROR_TYPE_DEBUG,
609 "Read %d bytes from socket %s\n",
610 (int) ret,
611 un.sun_path);
612 GNUNET_assert (AF_UNIX == (un.sun_family));
613 msg = (struct UNIXMessage *) buf;
614 msize = ntohs (msg->header.size);
615 if ( (msize < sizeof (struct UNIXMessage)) ||
616 (msize > ret) )
617 {
618 GNUNET_break_op (0);
619 return;
620 }
621 queue = lookup_queue (&msg->sender,
622 un,
623 addrlen);
624 if (NULL == queue)
625 queue = setup_queue (&msg->sender,
626 un,
627 addrlen);
628 if (NULL == queue)
629 {
630 GNUENT_log (GNUNET_ERROR_TYPE_ERROR,
631 _("Maximum number of UNIX connections exceeded, dropping incoming message\n"));
632 return;
633 }
634
635
636 {
637 uint16_t offset = 0;
638 uint16_t tsize = msize - sizeof (struct UNIXMessage);
639 const char *msgbuf = (const char *) &msg[1];
640
641 while (offset + sizeof (struct GNUNET_MessageHeader) <= tsize)
642 {
643 const struct GNUNET_MessageHeader *currhdr;
644 struct GNUNET_MessageHeader al_hdr;
645 uint16_t csize;
646
647 currhdr = (const struct GNUNET_MessageHeader *) &msgbuf[offset];
648 /* ensure aligned access */
649 memcpy (&al_hdr,
650 currhdr,
651 sizeof (al_hdr));
652 csize = ntohs (al_hdr.size);
653 if ( (csize < sizeof (struct GNUNET_MessageHeader)) ||
654 (csize > tsize - offset))
655 {
656 GNUNET_break_op (0);
657 break;
658 }
659 ret = GNUNET_TRANSPORT_communicator_receive (ch,
660 &msg->sender,
661 currhdr,
662 &receive_complete_cb,
663 NULL);
664 if (GNUNET_SYSERR == ret)
665 return; /* transport not up */
666 if (GNUNET_NO == ret)
667 break;
668 delivering_messages++;
669 offset += csize;
670 }
671 }
672 if (delivering_messages >= max_queue_length)
673 {
674 /* we should try to apply 'back pressure' */
675 GNUNET_SCHEDULER_cancel (read_task);
676 read_task = NULL;
677 }
678}
679
680
681/**
682 * We have been notified that our socket is ready to write. 396 * We have been notified that our socket is ready to write.
683 * Then reschedule this function to be called again once more is available. 397 * Then reschedule this function to be called again once more is available.
684 * 398 *
@@ -712,16 +426,27 @@ select_write_cb (void *cls)
712 sent = GNUNET_NETWORK_socket_sendto (unix_sock, 426 sent = GNUNET_NETWORK_socket_sendto (unix_sock,
713 queue->msg, 427 queue->msg,
714 msg_size, 428 msg_size,
715 (const struct sockaddr *) mq->address, 429 (const struct sockaddr *) queue->address,
716 mq->address_len); 430 queue->address_len);
717 LOG (GNUNET_ERROR_TYPE_DEBUG, 431 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
718 "UNIX transmitted message to %s (%d/%u: %s)\n", 432 "UNIX transmitted message to %s (%d/%u: %s)\n",
719 GNUNET_i2s (&queue->target), 433 GNUNET_i2s (&queue->target),
720 (int) sent, 434 (int) sent,
721 (unsigned int) msg_size, 435 (unsigned int) msg_size,
722 (sent < 0) ? STRERROR (errno) : "ok"); 436 (sent < 0) ? STRERROR (errno) : "ok");
723 if (-1 != sent) 437 if (-1 != sent)
438 {
439 GNUNET_STATISTICS_update (stats,
440 "# bytes sent",
441 (long long) sent,
442 GNUNET_NO);
443 reschedule_queue_timeout (queue);
724 return; /* all good */ 444 return; /* all good */
445 }
446 GNUNET_STATISTICS_update (stats,
447 "# network transmission failures",
448 1,
449 GNUNET_NO);
725 switch (errno) 450 switch (errno)
726 { 451 {
727 case EAGAIN: 452 case EAGAIN:
@@ -747,11 +472,11 @@ select_write_cb (void *cls)
747 GNUNET_break (0); 472 GNUNET_break (0);
748 return; 473 return;
749 } 474 }
750 LOG (GNUNET_ERROR_TYPE_DEBUG, 475 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
751 "Trying to increase socket buffer size from %u to %u for message size %u\n", 476 "Trying to increase socket buffer size from %u to %u for message size %u\n",
752 (unsigned int) size, 477 (unsigned int) size,
753 (unsigned int) m((msg_size / 1000) + 2) * 1000, 478 (unsigned int) ((msg_size / 1000) + 2) * 1000,
754 (unsigned int) msg_size); 479 (unsigned int) msg_size);
755 size = ((msg_size / 1000) + 2) * 1000; 480 size = ((msg_size / 1000) + 2) * 1000;
756 if (GNUNET_OK == 481 if (GNUNET_OK ==
757 GNUNET_NETWORK_socket_setsockopt (unix_sock, 482 GNUNET_NETWORK_socket_setsockopt (unix_sock,
@@ -846,7 +571,17 @@ mq_cancel (struct GNUNET_MQ_Handle *mq,
846{ 571{
847 struct Queue *queue = impl_state; 572 struct Queue *queue = impl_state;
848 573
849 // FIXME: TBD! 574 GNUNET_assert (NULL != queue->msg);
575 queue->msg = NULL;
576 GNUNET_CONTAINER_DLL_remove (queue_head,
577 queue_tail,
578 queue);
579 GNUNET_assert (NULL != write_task);
580 if (NULL == queue_head)
581 {
582 GNUNET_SCHEDULER_cancel (write_task);
583 write_task = NULL;
584 }
850} 585}
851 586
852 587
@@ -865,7 +600,230 @@ mq_error (void *cls,
865{ 600{
866 struct Queue *queue = cls; 601 struct Queue *queue = cls;
867 602
868 // FIXME: TBD! 603 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
604 "UNIX MQ error in queue to %s: %d\n",
605 GNUNET_i2s (&queue->target),
606 (int) error);
607 queue_destroy (queue);
608}
609
610
611/**
612 * Creates a new outbound queue the transport service will use to send
613 * data to another peer.
614 *
615 * @param peer the target peer
616 * @param un the address
617 * @param un_len number of bytes in @a un
618 * @return the queue or NULL of max connections exceeded
619 */
620static struct Queue *
621setup_queue (const struct GNUNET_PeerIdentity *target,
622 const struct sockaddr_un *un,
623 socklen_t un_len)
624{
625 struct Queue *queue;
626
627 queue = GNUNET_new (struct Queue);
628 queue->target = *target;
629 queue->address = GNUNET_memdup (un,
630 un_len);
631 queue->address_len = un_len;
632 (void) GNUNET_CONTAINER_multipeermap_put (queue_map,
633 &queue->target,
634 queue,
635 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
636 GNUNET_STATISTICS_set (stats,
637 "# queues active",
638 GNUNET_CONTAINER_multipeermap_size (queue_map),
639 GNUNET_NO);
640 queue->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
641 queue->timeout_task
642 = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
643 &queue_timeout,
644 queue);
645 queue->mq
646 = GNUNET_MQ_queue_for_callbacks (&mq_send,
647 &mq_destroy,
648 &mq_cancel,
649 queue,
650 NULL,
651 &mq_error,
652 queue);
653 {
654 char *foreign_addr;
655
656 if ('\0' == un->sun_path[0])
657 GNUNET_asprintf (&foreign_addr,
658 "%s-@%s",
659 COMMUNICATOR_NAME,
660 &un->sun_path[1]);
661 else
662 GNUNET_asprintf (&foreign_addr,
663 "%s-%s",
664 COMMUNICATOR_NAME,
665 un->sun_path);
666 queue->qh
667 = GNUNET_TRANSPORT_communicator_mq_add (ch,
668 &queue->target,
669 foreign_addr,
670 GNUNET_ATS_NET_LOOPBACK,
671 queue->mq);
672 GNUNET_free (foreign_addr);
673 }
674 return queue;
675}
676
677
678/**
679 * We have been notified that our socket has something to read. Do the
680 * read and reschedule this function to be called again once more is
681 * available.
682 *
683 * @param cls NULL
684 */
685static void
686select_read_cb (void *cls);
687
688
689/**
690 * Function called when message was successfully passed to
691 * transport service. Continue read activity.
692 *
693 * @param cls NULL
694 * @param success #GNUNET_OK on success
695 */
696static void
697receive_complete_cb (void *cls,
698 int success)
699{
700 delivering_messages--;
701 if (GNUNET_OK != success)
702 GNUNET_STATISTICS_update (stats,
703 "# transport transmission failures",
704 1,
705 GNUNET_NO);
706 if ( (NULL == read_task) &&
707 (delivering_messages < max_queue_length) )
708 read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
709 unix_sock,
710 &select_read_cb,
711 NULL);
712}
713
714
715/**
716 * We have been notified that our socket has something to read. Do the
717 * read and reschedule this function to be called again once more is
718 * available.
719 *
720 * @param cls NULL
721 */
722static void
723select_read_cb (void *cls)
724{
725 char buf[65536] GNUNET_ALIGN;
726 struct Queue *queue;
727 const struct UNIXMessage *msg;
728 struct sockaddr_un un;
729 socklen_t addrlen;
730 ssize_t ret;
731 uint16_t msize;
732
733 read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
734 unix_sock,
735 &select_read_cb,
736 NULL);
737 addrlen = sizeof (un);
738 memset (&un,
739 0,
740 sizeof (un));
741 ret = GNUNET_NETWORK_socket_recvfrom (unix_sock,
742 buf,
743 sizeof (buf),
744 (struct sockaddr *) &un,
745 &addrlen);
746 if ( (-1 == ret) &&
747 ( (EAGAIN == errno) ||
748 (ENOBUFS == errno) ) )
749 return;
750 if (-1 == ret)
751 {
752 GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
753 "recvfrom");
754 return;
755 }
756 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
757 "Read %d bytes from socket %s\n",
758 (int) ret,
759 un.sun_path);
760 GNUNET_assert (AF_UNIX == (un.sun_family));
761 msg = (struct UNIXMessage *) buf;
762 msize = ntohs (msg->header.size);
763 if ( (msize < sizeof (struct UNIXMessage)) ||
764 (msize > ret) )
765 {
766 GNUNET_break_op (0);
767 return;
768 }
769 queue = lookup_queue (&msg->sender,
770 &un,
771 addrlen);
772 if (NULL == queue)
773 queue = setup_queue (&msg->sender,
774 &un,
775 addrlen);
776 else
777 reschedule_queue_timeout (queue);
778 if (NULL == queue)
779 {
780 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
781 _("Maximum number of UNIX connections exceeded, dropping incoming message\n"));
782 return;
783 }
784
785 {
786 uint16_t offset = 0;
787 uint16_t tsize = msize - sizeof (struct UNIXMessage);
788 const char *msgbuf = (const char *) &msg[1];
789
790 while (offset + sizeof (struct GNUNET_MessageHeader) <= tsize)
791 {
792 const struct GNUNET_MessageHeader *currhdr;
793 struct GNUNET_MessageHeader al_hdr;
794 uint16_t csize;
795
796 currhdr = (const struct GNUNET_MessageHeader *) &msgbuf[offset];
797 /* ensure aligned access */
798 memcpy (&al_hdr,
799 currhdr,
800 sizeof (al_hdr));
801 csize = ntohs (al_hdr.size);
802 if ( (csize < sizeof (struct GNUNET_MessageHeader)) ||
803 (csize > tsize - offset))
804 {
805 GNUNET_break_op (0);
806 break;
807 }
808 ret = GNUNET_TRANSPORT_communicator_receive (ch,
809 &msg->sender,
810 currhdr,
811 &receive_complete_cb,
812 NULL);
813 if (GNUNET_SYSERR == ret)
814 return; /* transport not up */
815 if (GNUNET_NO == ret)
816 break;
817 delivering_messages++;
818 offset += csize;
819 }
820 }
821 if (delivering_messages >= max_queue_length)
822 {
823 /* we should try to apply 'back pressure' */
824 GNUNET_SCHEDULER_cancel (read_task);
825 read_task = NULL;
826 }
869} 827}
870 828
871 829
@@ -889,76 +847,69 @@ mq_error (void *cls,
889static int 847static int
890mq_init (void *cls, 848mq_init (void *cls,
891 const struct GNUNET_PeerIdentity *peer, 849 const struct GNUNET_PeerIdentity *peer,
892 const void *address) 850 const char *address)
893{ 851{
894 struct Queue *queue; 852 struct Queue *queue;
895 char *a; 853 const char *path;
896 char *e; 854 struct sockaddr_un *un;
897 int is_abs;
898 sockaddr_un *un;
899 socklen_t un_len; 855 socklen_t un_len;
900 856
901 if (NULL == strncmp (address, 857 if (0 != strncmp (address,
902 COMMUNICATOR_NAME "-", 858 COMMUNICATOR_NAME "-",
903 strlen (COMMUNICATOR_NAME "-"))) 859 strlen (COMMUNICATOR_NAME "-")))
904 { 860 {
905 GNUNET_break_op (0); 861 GNUNET_break_op (0);
906 return GNUNET_SYSERR; 862 return GNUNET_SYSERR;
907 } 863 }
908 a = GNUNET_strdup (&address[strlen (COMMUNICATOR_NAME "-")]); 864 path = &address[strlen (COMMUNICATOR_NAME "-")];
909 e = strchr (a, 865 un = unix_address_to_sockaddr (path,
910 (unsigned char) '#'); 866 &un_len);
911 if (NULL == e)
912 {
913 GNUNET_free (a);
914 GNUNET_break_op (0);
915 return GNUNET_SYSERR;
916 }
917 is_abs = ('1' == e[1]);
918 *e = '\0';
919 un = unix_address_to_sockaddr (a,
920 &un_len,
921 is_abs);
922 queue = lookup_queue (peer, 867 queue = lookup_queue (peer,
923 un, 868 un,
924 un_len); 869 un_len);
925 if (NULL != queue) 870 if (NULL != queue)
926 { 871 {
927 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 872 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
928 "Address `%s' ignored, queue exists\n", 873 "Address `%s' for %s ignored, queue exists\n",
929 address); 874 path,
875 GNUNET_i2s (peer));
930 GNUNET_free (un); 876 GNUNET_free (un);
931 return GNUNET_OK; 877 return GNUNET_OK;
932 } 878 }
933 queue = GNUNET_new (struct Queue); 879 queue = setup_queue (peer,
934 queue->target = *peer; 880 un,
935 queue->address = un; 881 un_len);
936 queue->address_len = un_len; 882 GNUNET_free (un);
937 (void) GNUNET_CONTAINER_multihashmap_put (queue_map, 883 if (NULL == queue)
938 &queue->target, 884 {
939 queue, 885 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
940 GNUET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 886 "Failed to setup queue to %s at `%s'\n",
941 GNUNET_STATISTICS_set (stats, 887 GNUNET_i2s (peer),
942 "# UNIX queues active", 888 path);
943 GNUNET_CONTAINER_multipeermap_size (plugin->queue_map), 889 return GNUNET_NO;
944 GNUNET_NO); 890 }
945 queue->timeout = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, 891 return GNUNET_OK;
946 &queue_timeout, 892}
947 queue); 893
948 queue->mq 894
949 = GNUNET_MQ_queue_for_callbacks (&mq_send, 895/**
950 &mq_destroy, 896 * Iterator over all message queues to clean up.
951 &mq_cancel, 897 *
952 queue, 898 * @param cls NULL
953 NULL, 899 * @param target unused
954 &mq_error, 900 * @param value the queue to destroy
955 queue); 901 * @return #GNUNET_OK to continue to iterate
956 queue->qh 902 */
957 = GNUNET_TRANSPORT_communicator_mq_add (ch, 903static int
958 &queue->target, 904get_queue_delete_it (void *cls,
959 address, 905 const struct GNUNET_PeerIdentity *target,
960 ATS, 906 void *value)
961 queue->mq); 907{
908 struct Queue *queue = value;
909
910 (void) cls;
911 (void) target;
912 queue_destroy (queue);
962 return GNUNET_OK; 913 return GNUNET_OK;
963} 914}
964 915
@@ -971,22 +922,6 @@ mq_init (void *cls,
971static void 922static void
972do_shutdown (void *cls) 923do_shutdown (void *cls)
973{ 924{
974 struct UNIXMessageWrapper *msgw;
975
976 while (NULL != (msgw = msg_head))
977 {
978 GNUNET_CONTAINER_DLL_remove (msg_head,
979 msg_tail,
980 msgw);
981 queue = msgw->queue;
982 queue->msgs_in_queue--;
983 GNUNET_assert (queue->bytes_in_queue >= msgw->msgsize);
984 queue->bytes_in_queue -= msgw->msgsize;
985 GNUNET_assert (bytes_in_queue >= msgw->msgsize);
986 bytes_in_queue -= msgw->msgsize;
987 GNUNET_free (msgw->msg);
988 GNUNET_free (msgw);
989 }
990 if (NULL != read_task) 925 if (NULL != read_task)
991 { 926 {
992 GNUNET_SCHEDULER_cancel (read_task); 927 GNUNET_SCHEDULER_cancel (read_task);
@@ -1017,6 +952,12 @@ do_shutdown (void *cls)
1017 GNUNET_TRANSPORT_communicator_disconnect (ch); 952 GNUNET_TRANSPORT_communicator_disconnect (ch);
1018 ch = NULL; 953 ch = NULL;
1019 } 954 }
955 if (NULL != stats)
956 {
957 GNUNET_STATISTICS_destroy (stats,
958 GNUNET_NO);
959 stats = NULL;
960 }
1020} 961}
1021 962
1022 963
@@ -1035,7 +976,6 @@ run (void *cls,
1035 const struct GNUNET_CONFIGURATION_Handle *cfg) 976 const struct GNUNET_CONFIGURATION_Handle *cfg)
1036{ 977{
1037 char *unix_socket_path; 978 char *unix_socket_path;
1038 int is_abstract;
1039 struct sockaddr_un *un; 979 struct sockaddr_un *un;
1040 socklen_t un_len; 980 socklen_t un_len;
1041 char *my_addr; 981 char *my_addr;
@@ -1059,18 +999,16 @@ run (void *cls,
1059 &max_queue_length)) 999 &max_queue_length))
1060 max_queue_length = DEFAULT_MAX_QUEUE_LENGTH; 1000 max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
1061 1001
1062
1063 /* Initialize my flags */
1064 is_abstract = 0;
1065#ifdef LINUX
1066 is_abstract
1067 = GNUNET_CONFIGURATION_get_value_yesno (cfg,
1068 "testing",
1069 "USE_ABSTRACT_SOCKETS");
1070#endif
1071 un = unix_address_to_sockaddr (unix_socket_path, 1002 un = unix_address_to_sockaddr (unix_socket_path,
1072 &un_len, 1003 &un_len);
1073 is_abstract); 1004 if (NULL == un)
1005 {
1006 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1007 "Failed to setup UNIX domain socket address with path `%s'\n",
1008 unix_socket_path);
1009 GNUNET_free (unix_socket_path);
1010 return;
1011 }
1074 unix_sock = GNUNET_NETWORK_socket_create (AF_UNIX, 1012 unix_sock = GNUNET_NETWORK_socket_create (AF_UNIX,
1075 SOCK_DGRAM, 1013 SOCK_DGRAM,
1076 0); 1014 0);
@@ -1086,9 +1024,9 @@ run (void *cls,
1086 (GNUNET_OK != 1024 (GNUNET_OK !=
1087 GNUNET_DISK_directory_create_for_file (un->sun_path)) ) 1025 GNUNET_DISK_directory_create_for_file (un->sun_path)) )
1088 { 1026 {
1089 LOG (GNUNET_ERROR_TYPE_ERROR, 1027 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1090 _("Cannot create path to `%s'\n"), 1028 _("Cannot create path to `%s'\n"),
1091 un->sun_path); 1029 un->sun_path);
1092 GNUNET_NETWORK_socket_close (unix_sock); 1030 GNUNET_NETWORK_socket_close (unix_sock);
1093 unix_sock = NULL; 1031 unix_sock = NULL;
1094 GNUNET_free (un); 1032 GNUNET_free (un);
@@ -1100,11 +1038,9 @@ run (void *cls,
1100 (const struct sockaddr *) un, 1038 (const struct sockaddr *) un,
1101 un_len)) 1039 un_len))
1102 { 1040 {
1103 GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, 1041 GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_ERROR,
1104 "bind"); 1042 "bind",
1105 LOG (GNUNET_ERROR_TYPE_ERROR, 1043 un->sun_path);
1106 _("Cannot bind to `%s'\n"),
1107 un->sun_path);
1108 GNUNET_NETWORK_socket_close (unix_sock); 1044 GNUNET_NETWORK_socket_close (unix_sock);
1109 unix_sock = NULL; 1045 unix_sock = NULL;
1110 GNUNET_free (un); 1046 GNUNET_free (un);
@@ -1112,14 +1048,16 @@ run (void *cls,
1112 return; 1048 return;
1113 } 1049 }
1114 GNUNET_free (un); 1050 GNUNET_free (un);
1115 LOG (GNUNET_ERROR_TYPE_DEBUG, 1051 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1116 "Bound to `%s'\n", 1052 "Bound to `%s'\n",
1117 unix_socket_path); 1053 unix_socket_path);
1054 stats = GNUNET_STATISTICS_create ("C-UNIX",
1055 cfg);
1118 GNUNET_SCHEDULER_add_shutdown (&do_shutdown, 1056 GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
1119 NULL); 1057 NULL);
1120 read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, 1058 read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
1121 unix_sock, 1059 unix_sock,
1122 &unix_plugin_select_read, 1060 &select_read_cb,
1123 NULL); 1061 NULL);
1124 queue_map = GNUNET_CONTAINER_multipeermap_create (10, 1062 queue_map = GNUNET_CONTAINER_multipeermap_create (10,
1125 GNUNET_NO); 1063 GNUNET_NO);
@@ -1136,10 +1074,9 @@ run (void *cls,
1136 return; 1074 return;
1137 } 1075 }
1138 GNUNET_asprintf (&my_addr, 1076 GNUNET_asprintf (&my_addr,
1139 "%s-%s#%d", 1077 "%s-%s",
1140 COMMUNICATOR_NAME, 1078 COMMUNICATOR_NAME,
1141 unix_socket_path, 1079 unix_socket_path);
1142 is_abstract);
1143 ai = GNUNET_TRANSPORT_communicator_address_add (ch, 1080 ai = GNUNET_TRANSPORT_communicator_address_add (ch,
1144 my_addr, 1081 my_addr,
1145 GNUNET_ATS_NET_LOOPBACK, 1082 GNUNET_ATS_NET_LOOPBACK,