aboutsummaryrefslogtreecommitdiff
path: root/src/dv
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-08-01 22:56:32 +0000
committerChristian Grothoff <christian@grothoff.org>2016-08-01 22:56:32 +0000
commit22be4f71e12c7f2eacafe03d94d4d1c86bc72f0a (patch)
tree9bbaf4db61f8a862476ccb79a88ad6dfed6ef91e /src/dv
parent3d3cedf2e2c41883771cc2170b761840ac26b869 (diff)
downloadgnunet-22be4f71e12c7f2eacafe03d94d4d1c86bc72f0a.tar.gz
gnunet-22be4f71e12c7f2eacafe03d94d4d1c86bc72f0a.zip
-adapting xdht and wdht to new core MQ API
Diffstat (limited to 'src/dv')
-rw-r--r--src/dv/gnunet-service-dv.c366
1 files changed, 143 insertions, 223 deletions
diff --git a/src/dv/gnunet-service-dv.c b/src/dv/gnunet-service-dv.c
index 2827a17f2..3b79b474f 100644
--- a/src/dv/gnunet-service-dv.c
+++ b/src/dv/gnunet-service-dv.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 Copyright (C) 2013 GNUnet e.V. 3 Copyright (C) 2013, 2016 GNUnet e.V.
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -125,34 +125,6 @@ GNUNET_NETWORK_STRUCT_END
125 125
126 126
127/** 127/**
128 * Linked list of messages to send to clients.
129 */
130struct PendingMessage
131{
132 /**
133 * Pointer to next item in the list
134 */
135 struct PendingMessage *next;
136
137 /**
138 * Pointer to previous item in the list
139 */
140 struct PendingMessage *prev;
141
142 /**
143 * Actual message to be sent, allocated after this struct.
144 */
145 const struct GNUNET_MessageHeader *msg;
146
147 /**
148 * Next target for the message (a neighbour of ours).
149 */
150 struct GNUNET_PeerIdentity next_target;
151
152};
153
154
155/**
156 * Information about a direct neighbor (core-level, excluding 128 * Information about a direct neighbor (core-level, excluding
157 * DV-links, only DV-enabled peers). 129 * DV-links, only DV-enabled peers).
158 */ 130 */
@@ -162,7 +134,7 @@ struct DirectNeighbor
162 /** 134 /**
163 * Identity of the peer. 135 * Identity of the peer.
164 */ 136 */
165 struct GNUNET_PeerIdentity peer; 137 const struct GNUNET_PeerIdentity *peer;
166 138
167 /** 139 /**
168 * Session ID we use whenever we create a set union with 140 * Session ID we use whenever we create a set union with
@@ -173,19 +145,9 @@ struct DirectNeighbor
173 struct GNUNET_HashCode real_session_id; 145 struct GNUNET_HashCode real_session_id;
174 146
175 /** 147 /**
176 * Head of linked list of messages to send to this peer.
177 */
178 struct PendingMessage *pm_head;
179
180 /**
181 * Tail of linked list of messages to send to this peer.
182 */
183 struct PendingMessage *pm_tail;
184
185 /**
186 * Transmit handle to core service. 148 * Transmit handle to core service.
187 */ 149 */
188 struct GNUNET_CORE_TransmitHandle *cth; 150 struct GNUNET_MQ_Handle *mq;
189 151
190 /** 152 /**
191 * Routing table of the neighbor, NULL if not yet established. 153 * Routing table of the neighbor, NULL if not yet established.
@@ -240,11 +202,6 @@ struct DirectNeighbor
240 unsigned int consensus_insertion_distance; 202 unsigned int consensus_insertion_distance;
241 203
242 /** 204 /**
243 * Number of messages currently in the 'pm_XXXX'-DLL.
244 */
245 unsigned int pm_queue_size;
246
247 /**
248 * Elements in consensus 205 * Elements in consensus
249 */ 206 */
250 unsigned int consensus_elements; 207 unsigned int consensus_elements;
@@ -381,7 +338,7 @@ static struct GNUNET_ATS_PerformanceHandle *ats;
381/** 338/**
382 * Task scheduled to refresh routes based on direct neighbours. 339 * Task scheduled to refresh routes based on direct neighbours.
383 */ 340 */
384static struct GNUNET_SCHEDULER_Task * rr_task; 341static struct GNUNET_SCHEDULER_Task *rr_task;
385 342
386/** 343/**
387 * #GNUNET_YES if we are shutting down. 344 * #GNUNET_YES if we are shutting down.
@@ -549,60 +506,6 @@ send_disconnect_to_plugin (const struct GNUNET_PeerIdentity *target)
549 506
550 507
551/** 508/**
552 * Function called to transfer a message to another peer
553 * via core.
554 *
555 * @param cls closure with the direct neighbor
556 * @param size number of bytes available in buf
557 * @param buf where the callee should write the message
558 * @return number of bytes written to buf
559 */
560static size_t
561core_transmit_notify (void *cls, size_t size, void *buf)
562{
563 struct DirectNeighbor *dn = cls;
564 char *cbuf = buf;
565 struct PendingMessage *pending;
566 size_t off;
567 size_t msize;
568
569 dn->cth = NULL;
570 if (NULL == buf)
571 {
572 /* client disconnected */
573 return 0;
574 }
575 off = 0;
576 while ( (NULL != (pending = dn->pm_head)) &&
577 (size >= off + (msize = ntohs (pending->msg->size))))
578 {
579 dn->pm_queue_size--;
580 GNUNET_CONTAINER_DLL_remove (dn->pm_head,
581 dn->pm_tail,
582 pending);
583 GNUNET_memcpy (&cbuf[off], pending->msg, msize);
584 GNUNET_free (pending);
585 off += msize;
586 }
587 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
588 "Transmitting total of %u bytes to %s\n",
589 (unsigned int) off,
590 GNUNET_i2s (&dn->peer));
591 GNUNET_assert (NULL != core_api);
592 if (NULL != pending)
593 dn->cth =
594 GNUNET_CORE_notify_transmit_ready (core_api,
595 GNUNET_YES /* cork */,
596 GNUNET_CORE_PRIO_BEST_EFFORT,
597 GNUNET_TIME_UNIT_FOREVER_REL,
598 &dn->peer,
599 msize,
600 &core_transmit_notify, dn);
601 return off;
602}
603
604
605/**
606 * Forward the given payload to the given target. 509 * Forward the given payload to the given target.
607 * 510 *
608 * @param target where to send the message 511 * @param target where to send the message
@@ -618,11 +521,10 @@ forward_payload (struct DirectNeighbor *target,
618 const struct GNUNET_PeerIdentity *actual_target, 521 const struct GNUNET_PeerIdentity *actual_target,
619 const struct GNUNET_MessageHeader *payload) 522 const struct GNUNET_MessageHeader *payload)
620{ 523{
621 struct PendingMessage *pm; 524 struct GNUNET_MQ_Envelope *env;
622 struct RouteMessage *rm; 525 struct RouteMessage *rm;
623 size_t msize;
624 526
625 if ( (target->pm_queue_size >= MAX_QUEUE_SIZE) && 527 if ( (GNUNET_MQ_get_length (target->mq) >= MAX_QUEUE_SIZE) &&
626 (0 != memcmp (sender, 528 (0 != memcmp (sender,
627 &my_identity, 529 &my_identity,
628 sizeof (struct GNUNET_PeerIdentity))) ) 530 sizeof (struct GNUNET_PeerIdentity))) )
@@ -630,38 +532,24 @@ forward_payload (struct DirectNeighbor *target,
630 /* not _our_ client and queue is full, drop */ 532 /* not _our_ client and queue is full, drop */
631 GNUNET_STATISTICS_update (stats, 533 GNUNET_STATISTICS_update (stats,
632 "# messages dropped", 534 "# messages dropped",
633 1, GNUNET_NO); 535 1,
536 GNUNET_NO);
634 return; 537 return;
635 } 538 }
636 msize = sizeof (struct RouteMessage) + ntohs (payload->size); 539 if (sizeof (struct RouteMessage) + ntohs (payload->size)
637 if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) 540 >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
638 { 541 {
639 GNUNET_break (0); 542 GNUNET_break (0);
640 return; 543 return;
641 } 544 }
642 pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize); 545 env = GNUNET_MQ_msg_nested_mh (rm,
643 pm->next_target = target->peer; 546 GNUNET_MESSAGE_TYPE_DV_ROUTE,
644 pm->msg = (const struct GNUNET_MessageHeader *) &pm[1]; 547 payload);
645 rm = (struct RouteMessage *) &pm[1];
646 rm->header.size = htons ((uint16_t) msize);
647 rm->header.type = htons (GNUNET_MESSAGE_TYPE_DV_ROUTE);
648 rm->distance = htonl (distance); 548 rm->distance = htonl (distance);
649 rm->target = *actual_target; 549 rm->target = *actual_target;
650 rm->sender = *sender; 550 rm->sender = *sender;
651 GNUNET_memcpy (&rm[1], payload, ntohs (payload->size)); 551 GNUNET_MQ_send (target->mq,
652 GNUNET_CONTAINER_DLL_insert_tail (target->pm_head, 552 env);
653 target->pm_tail,
654 pm);
655 target->pm_queue_size++;
656 GNUNET_assert (NULL != core_api);
657 if (NULL == target->cth)
658 target->cth = GNUNET_CORE_notify_transmit_ready (core_api,
659 GNUNET_YES /* cork */,
660 GNUNET_CORE_PRIO_BEST_EFFORT,
661 GNUNET_TIME_UNIT_FOREVER_REL,
662 &target->peer,
663 msize,
664 &core_transmit_notify, target);
665} 553}
666 554
667 555
@@ -790,7 +678,7 @@ build_set (void *cls)
790 /* we have added all elements to the set, run the operation */ 678 /* we have added all elements to the set, run the operation */
791 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 679 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
792 "Finished building my SET for peer `%s' with %u elements, committing\n", 680 "Finished building my SET for peer `%s' with %u elements, committing\n",
793 GNUNET_i2s (&neighbor->peer), 681 GNUNET_i2s (neighbor->peer),
794 neighbor->consensus_elements); 682 neighbor->consensus_elements);
795 GNUNET_SET_commit (neighbor->set_op, 683 GNUNET_SET_commit (neighbor->set_op,
796 neighbor->my_set); 684 neighbor->my_set);
@@ -809,8 +697,12 @@ build_set (void *cls)
809 697
810 /* Find next non-NULL entry */ 698 /* Find next non-NULL entry */
811 neighbor->consensus_insertion_offset++; 699 neighbor->consensus_insertion_offset++;
812 if ( (0 != memcmp (&target->peer, &my_identity, sizeof (my_identity))) && 700 if ( (0 != memcmp (&target->peer,
813 (0 != memcmp (&target->peer, &neighbor->peer, sizeof (neighbor->peer))) ) 701 &my_identity,
702 sizeof (my_identity))) &&
703 (0 != memcmp (&target->peer,
704 neighbor->peer,
705 sizeof (struct GNUNET_PeerIdentity))) )
814 { 706 {
815 /* Add target if it is not the neighbor or this peer */ 707 /* Add target if it is not the neighbor or this peer */
816 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 708 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -842,26 +734,26 @@ handle_direct_connect (struct DirectNeighbor *neighbor)
842 734
843 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 735 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
844 "Direct connection to %s established, routing table exchange begins.\n", 736 "Direct connection to %s established, routing table exchange begins.\n",
845 GNUNET_i2s (&neighbor->peer)); 737 GNUNET_i2s (neighbor->peer));
846 GNUNET_STATISTICS_update (stats, 738 GNUNET_STATISTICS_update (stats,
847 "# peers connected (1-hop)", 739 "# peers connected (1-hop)",
848 1, GNUNET_NO); 740 1, GNUNET_NO);
849 route = GNUNET_CONTAINER_multipeermap_get (all_routes, 741 route = GNUNET_CONTAINER_multipeermap_get (all_routes,
850 &neighbor->peer); 742 neighbor->peer);
851 if (NULL != route) 743 if (NULL != route)
852 { 744 {
853 GNUNET_assert (GNUNET_YES == 745 GNUNET_assert (GNUNET_YES ==
854 GNUNET_CONTAINER_multipeermap_remove (all_routes, 746 GNUNET_CONTAINER_multipeermap_remove (all_routes,
855 &neighbor->peer, 747 neighbor->peer,
856 route)); 748 route));
857 send_disconnect_to_plugin (&neighbor->peer); 749 send_disconnect_to_plugin (neighbor->peer);
858 release_route (route); 750 release_route (route);
859 GNUNET_free (route); 751 GNUNET_free (route);
860 } 752 }
861 753
862 neighbor->direct_route = GNUNET_new (struct Route); 754 neighbor->direct_route = GNUNET_new (struct Route);
863 neighbor->direct_route->next_hop = neighbor; 755 neighbor->direct_route->next_hop = neighbor;
864 neighbor->direct_route->target.peer = neighbor->peer; 756 neighbor->direct_route->target.peer = *neighbor->peer;
865 allocate_route (neighbor->direct_route, DIRECT_NEIGHBOR_COST); 757 allocate_route (neighbor->direct_route, DIRECT_NEIGHBOR_COST);
866 758
867 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 759 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -870,8 +762,12 @@ handle_direct_connect (struct DirectNeighbor *neighbor)
870 762
871 763
872 /* construct session ID seed as XOR of both peer's identities */ 764 /* construct session ID seed as XOR of both peer's identities */
873 GNUNET_CRYPTO_hash (&my_identity, sizeof (my_identity), &h1); 765 GNUNET_CRYPTO_hash (&my_identity,
874 GNUNET_CRYPTO_hash (&neighbor->peer, sizeof (struct GNUNET_PeerIdentity), &h2); 766 sizeof (my_identity),
767 &h1);
768 GNUNET_CRYPTO_hash (neighbor->peer,
769 sizeof (struct GNUNET_PeerIdentity),
770 &h2);
875 GNUNET_CRYPTO_hash_xor (&h1, 771 GNUNET_CRYPTO_hash_xor (&h1,
876 &h2, 772 &h2,
877 &session_id); 773 &session_id);
@@ -919,16 +815,21 @@ handle_direct_connect (struct DirectNeighbor *neighbor)
919 * 815 *
920 * @param cls closure 816 * @param cls closure
921 * @param peer peer identity this notification is about 817 * @param peer peer identity this notification is about
818 * @param mq message queue for sending data to @a peer
819 * @return our `struct DirectNeighbour` for this peer
922 */ 820 */
923static void 821static void *
924handle_core_connect (void *cls, 822handle_core_connect (void *cls,
925 const struct GNUNET_PeerIdentity *peer) 823 const struct GNUNET_PeerIdentity *peer,
824 struct GNUNET_MQ_Handle *mq)
926{ 825{
927 struct DirectNeighbor *neighbor; 826 struct DirectNeighbor *neighbor;
928 827
929 /* Check for connect to self message */ 828 /* Check for connect to self message */
930 if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity))) 829 if (0 == memcmp (&my_identity,
931 return; 830 peer,
831 sizeof (struct GNUNET_PeerIdentity)))
832 return NULL;
932 /* check if entry exists */ 833 /* check if entry exists */
933 neighbor = GNUNET_CONTAINER_multipeermap_get (direct_neighbors, 834 neighbor = GNUNET_CONTAINER_multipeermap_get (direct_neighbors,
934 peer); 835 peer);
@@ -942,23 +843,24 @@ handle_core_connect (void *cls,
942 GNUNET_i2s (peer), 843 GNUNET_i2s (peer),
943 (unsigned int) neighbor->distance); 844 (unsigned int) neighbor->distance);
944 if (DIRECT_NEIGHBOR_COST != neighbor->distance) 845 if (DIRECT_NEIGHBOR_COST != neighbor->distance)
945 return; 846 return NULL;
946 handle_direct_connect (neighbor); 847 handle_direct_connect (neighbor);
947 return; 848 return NULL;
948 } 849 }
949 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 850 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
950 "Core connected to %s (distance unknown)\n", 851 "Core connected to %s (distance unknown)\n",
951 GNUNET_i2s (peer)); 852 GNUNET_i2s (peer));
952 neighbor = GNUNET_new (struct DirectNeighbor); 853 neighbor = GNUNET_new (struct DirectNeighbor);
953 neighbor->peer = *peer; 854 neighbor->peer = peer;
954 GNUNET_assert (GNUNET_YES == 855 GNUNET_assert (GNUNET_YES ==
955 GNUNET_CONTAINER_multipeermap_put (direct_neighbors, 856 GNUNET_CONTAINER_multipeermap_put (direct_neighbors,
956 peer, 857 neighbor->peer,
957 neighbor, 858 neighbor,
958 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 859 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
959 neighbor->connected = GNUNET_YES; 860 neighbor->connected = GNUNET_YES;
960 neighbor->distance = 0; /* unknown */ 861 neighbor->distance = 0; /* unknown */
961 neighbor->network = GNUNET_ATS_NET_UNSPECIFIED; 862 neighbor->network = GNUNET_ATS_NET_UNSPECIFIED;
863 return neighbor;
962} 864}
963 865
964 866
@@ -1613,7 +1515,7 @@ listen_set_union (void *cls,
1613 return; /* why??? */ 1515 return; /* why??? */
1614 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1516 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1615 "Starting to create consensus with %s\n", 1517 "Starting to create consensus with %s\n",
1616 GNUNET_i2s (&neighbor->peer)); 1518 GNUNET_i2s (neighbor->peer));
1617 if (NULL != neighbor->set_op) 1519 if (NULL != neighbor->set_op)
1618 { 1520 {
1619 GNUNET_SET_operation_cancel (neighbor->set_op); 1521 GNUNET_SET_operation_cancel (neighbor->set_op);
@@ -1668,22 +1570,46 @@ initiate_set_union (void *cls)
1668 1570
1669 1571
1670/** 1572/**
1573 * Check that @a rm is well-formed.
1574 *
1575 * @param cls closure
1576 * @param rm the message
1577 * @return #GNUNET_OK if @a rm is well-formed.
1578 */
1579static int
1580check_dv_route_message (void *cls,
1581 const struct RouteMessage *rm)
1582{
1583 const struct GNUNET_MessageHeader *payload;
1584
1585 if (ntohs (rm->header.size) < sizeof (struct RouteMessage) + sizeof (struct GNUNET_MessageHeader))
1586 {
1587 GNUNET_break_op (0);
1588 return GNUNET_SYSERR;
1589 }
1590 payload = (const struct GNUNET_MessageHeader *) &rm[1];
1591 if (ntohs (rm->header.size) != sizeof (struct RouteMessage) + ntohs (payload->size))
1592 {
1593 GNUNET_break_op (0);
1594 return GNUNET_SYSERR;
1595 }
1596 return GNUNET_OK;
1597}
1598
1599
1600/**
1671 * Core handler for DV data messages. Whatever this message 1601 * Core handler for DV data messages. Whatever this message
1672 * contains all we really have to do is rip it out of its 1602 * contains all we really have to do is rip it out of its
1673 * DV layering and give it to our pal the DV plugin to report 1603 * DV layering and give it to our pal the DV plugin to report
1674 * in with. 1604 * in with.
1675 * 1605 *
1676 * @param cls closure 1606 * @param cls closure
1677 * @param peer peer which sent the message (immediate sender) 1607 * @param rm the message
1678 * @param message the message
1679 * @return #GNUNET_OK on success, #GNUNET_SYSERR if the other peer violated the protocol
1680 */ 1608 */
1681static int 1609static void
1682handle_dv_route_message (void *cls, 1610handle_dv_route_message (void *cls,
1683 const struct GNUNET_PeerIdentity *peer, 1611 const struct RouteMessage *rm)
1684 const struct GNUNET_MessageHeader *message)
1685{ 1612{
1686 const struct RouteMessage *rm;
1687 const struct GNUNET_MessageHeader *payload; 1613 const struct GNUNET_MessageHeader *payload;
1688 struct Route *route; 1614 struct Route *route;
1689 struct DirectNeighbor *neighbor; 1615 struct DirectNeighbor *neighbor;
@@ -1695,19 +1621,8 @@ handle_dv_route_message (void *cls,
1695 char prev[5]; 1621 char prev[5];
1696 char dst[5]; 1622 char dst[5];
1697 1623
1698 if (ntohs (message->size) < sizeof (struct RouteMessage) + sizeof (struct GNUNET_MessageHeader))
1699 {
1700 GNUNET_break_op (0);
1701 return GNUNET_SYSERR;
1702 }
1703 rm = (const struct RouteMessage *) message;
1704 distance = ntohl (rm->distance); 1624 distance = ntohl (rm->distance);
1705 payload = (const struct GNUNET_MessageHeader *) &rm[1]; 1625 payload = (const struct GNUNET_MessageHeader *) &rm[1];
1706 if (ntohs (message->size) != sizeof (struct RouteMessage) + ntohs (payload->size))
1707 {
1708 GNUNET_break_op (0);
1709 return GNUNET_SYSERR;
1710 }
1711 strncpy (prev, GNUNET_i2s (peer), 4); 1626 strncpy (prev, GNUNET_i2s (peer), 4);
1712 strncpy (me, GNUNET_i2s (&my_identity), 4); 1627 strncpy (me, GNUNET_i2s (&my_identity), 4);
1713 strncpy (src, GNUNET_i2s (&rm->sender), 4); 1628 strncpy (src, GNUNET_i2s (&rm->sender), 4);
@@ -1715,19 +1630,22 @@ handle_dv_route_message (void *cls,
1715 prev[4] = me[4] = src[4] = dst[4] = '\0'; 1630 prev[4] = me[4] = src[4] = dst[4] = '\0';
1716 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1631 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1717 "Handling DV message with %u bytes payload of type %u from %s to %s routed by %s to me (%s @ hop %u)\n", 1632 "Handling DV message with %u bytes payload of type %u from %s to %s routed by %s to me (%s @ hop %u)\n",
1718 (unsigned int) (ntohs (message->size) - sizeof (struct RouteMessage)), 1633 (unsigned int) (ntohs (rm->header.size) - sizeof (struct RouteMessage)),
1719 ntohs (payload->type), 1634 ntohs (payload->type),
1720 src, dst, 1635 src,
1721 prev, me, 1636 dst,
1637 prev,
1638 me,
1722 (unsigned int) distance + 1); 1639 (unsigned int) distance + 1);
1723 1640
1724 if (0 == memcmp (&rm->target, 1641 if (0 == memcmp (&rm->target,
1725 &my_identity, 1642 &my_identity,
1726 sizeof (struct GNUNET_PeerIdentity))) 1643 sizeof (struct GNUNET_PeerIdentity)))
1727 { 1644 {
1728 if ((NULL 1645 if ((NULL !=
1729 != (dn = GNUNET_CONTAINER_multipeermap_get (direct_neighbors, 1646 (dn = GNUNET_CONTAINER_multipeermap_get (direct_neighbors,
1730 &rm->sender))) && (DIRECT_NEIGHBOR_COST == dn->distance)) 1647 &rm->sender))) &&
1648 (DIRECT_NEIGHBOR_COST == dn->distance))
1731 { 1649 {
1732 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1650 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1733 "Discarding DV message, as %s is a direct neighbor\n", 1651 "Discarding DV message, as %s is a direct neighbor\n",
@@ -1735,7 +1653,7 @@ handle_dv_route_message (void *cls,
1735 GNUNET_STATISTICS_update (stats, 1653 GNUNET_STATISTICS_update (stats,
1736 "# messages discarded (direct neighbor)", 1654 "# messages discarded (direct neighbor)",
1737 1, GNUNET_NO); 1655 1, GNUNET_NO);
1738 return GNUNET_OK; 1656 return;
1739 } 1657 }
1740 /* message is for me, check reverse route! */ 1658 /* message is for me, check reverse route! */
1741 route = GNUNET_CONTAINER_multipeermap_get (all_routes, 1659 route = GNUNET_CONTAINER_multipeermap_get (all_routes,
@@ -1749,7 +1667,7 @@ handle_dv_route_message (void *cls,
1749 if (NULL == neighbor) 1667 if (NULL == neighbor)
1750 { 1668 {
1751 GNUNET_break (0); 1669 GNUNET_break (0);
1752 return GNUNET_OK; 1670 return;
1753 } 1671 }
1754 target = GNUNET_new (struct Target); 1672 target = GNUNET_new (struct Target);
1755 target->peer = rm->sender; 1673 target->peer = rm->sender;
@@ -1768,7 +1686,7 @@ handle_dv_route_message (void *cls,
1768 { 1686 {
1769 GNUNET_break_op (0); 1687 GNUNET_break_op (0);
1770 GNUNET_free (target); 1688 GNUNET_free (target);
1771 return GNUNET_OK; 1689 return;
1772 } 1690 }
1773 add_new_route (target, neighbor); 1691 add_new_route (target, neighbor);
1774 } 1692 }
@@ -1779,7 +1697,7 @@ handle_dv_route_message (void *cls,
1779 send_data_to_plugin (payload, 1697 send_data_to_plugin (payload,
1780 &rm->sender, 1698 &rm->sender,
1781 1 + distance); 1699 1 + distance);
1782 return GNUNET_OK; 1700 return;
1783 } 1701 }
1784 if ( (NULL == GNUNET_CONTAINER_multipeermap_get (direct_neighbors, 1702 if ( (NULL == GNUNET_CONTAINER_multipeermap_get (direct_neighbors,
1785 &rm->sender)) && 1703 &rm->sender)) &&
@@ -1795,7 +1713,7 @@ handle_dv_route_message (void *cls,
1795 if (NULL == neighbor) 1713 if (NULL == neighbor)
1796 { 1714 {
1797 GNUNET_break (0); 1715 GNUNET_break (0);
1798 return GNUNET_OK; 1716 return;
1799 } 1717 }
1800 target = GNUNET_new (struct Target); 1718 target = GNUNET_new (struct Target);
1801 target->peer = rm->sender; 1719 target->peer = rm->sender;
@@ -1810,7 +1728,7 @@ handle_dv_route_message (void *cls,
1810 { 1728 {
1811 GNUNET_break_op (0); 1729 GNUNET_break_op (0);
1812 GNUNET_free (target); 1730 GNUNET_free (target);
1813 return GNUNET_OK; 1731 return;
1814 } 1732 }
1815 add_new_route (target, neighbor); 1733 add_new_route (target, neighbor);
1816 } 1734 }
@@ -1830,7 +1748,7 @@ handle_dv_route_message (void *cls,
1830 GNUNET_STATISTICS_update (stats, 1748 GNUNET_STATISTICS_update (stats,
1831 "# messages discarded (no route)", 1749 "# messages discarded (no route)",
1832 1, GNUNET_NO); 1750 1, GNUNET_NO);
1833 return GNUNET_OK; 1751 return;
1834 } 1752 }
1835 } 1753 }
1836 else 1754 else
@@ -1839,13 +1757,12 @@ handle_dv_route_message (void *cls,
1839 } 1757 }
1840 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1758 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1841 "Forwarding message to %s\n", 1759 "Forwarding message to %s\n",
1842 GNUNET_i2s (&neighbor->peer)); 1760 GNUNET_i2s (neighbor->peer));
1843 forward_payload (neighbor, 1761 forward_payload (neighbor,
1844 distance + 1, 1762 distance + 1,
1845 &rm->sender, 1763 &rm->sender,
1846 &rm->target, 1764 &rm->target,
1847 payload); 1765 payload);
1848 return GNUNET_OK;
1849} 1766}
1850 1767
1851 1768
@@ -1918,20 +1835,10 @@ handle_dv_send_message (void *cls,
1918static void 1835static void
1919cleanup_neighbor (struct DirectNeighbor *neighbor) 1836cleanup_neighbor (struct DirectNeighbor *neighbor)
1920{ 1837{
1921 struct PendingMessage *pending;
1922
1923 while (NULL != (pending = neighbor->pm_head))
1924 {
1925 neighbor->pm_queue_size--;
1926 GNUNET_CONTAINER_DLL_remove (neighbor->pm_head,
1927 neighbor->pm_tail,
1928 pending);
1929 GNUNET_free (pending);
1930 }
1931 handle_direct_disconnect (neighbor); 1838 handle_direct_disconnect (neighbor);
1932 GNUNET_assert (GNUNET_YES == 1839 GNUNET_assert (GNUNET_YES ==
1933 GNUNET_CONTAINER_multipeermap_remove (direct_neighbors, 1840 GNUNET_CONTAINER_multipeermap_remove (direct_neighbors,
1934 &neighbor->peer, 1841 neighbor->peer,
1935 neighbor)); 1842 neighbor));
1936 GNUNET_free (neighbor); 1843 GNUNET_free (neighbor);
1937} 1844}
@@ -1942,36 +1849,31 @@ cleanup_neighbor (struct DirectNeighbor *neighbor)
1942 * 1849 *
1943 * @param cls closure 1850 * @param cls closure
1944 * @param peer peer identity this notification is about 1851 * @param peer peer identity this notification is about
1852 * @param internal_cls the corresponding `struct DirectNeighbor`
1945 */ 1853 */
1946static void 1854static void
1947handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer) 1855handle_core_disconnect (void *cls,
1856 const struct GNUNET_PeerIdentity *peer,
1857 void *internal_cls)
1948{ 1858{
1949 struct DirectNeighbor *neighbor; 1859 struct DirectNeighbor *neighbor = internal_cls;
1950 1860
1951 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1861 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1952 "Received core peer disconnect message for peer `%s'!\n", 1862 "Received core peer disconnect message for peer `%s'!\n",
1953 GNUNET_i2s (peer)); 1863 GNUNET_i2s (peer));
1954 /* Check for disconnect from self message */ 1864 /* Check for disconnect from self message */
1955 if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
1956 return;
1957 neighbor =
1958 GNUNET_CONTAINER_multipeermap_get (direct_neighbors, peer);
1959 if (NULL == neighbor) 1865 if (NULL == neighbor)
1960 {
1961 GNUNET_break (0);
1962 return; 1866 return;
1963 }
1964 GNUNET_break (GNUNET_YES == neighbor->connected); 1867 GNUNET_break (GNUNET_YES == neighbor->connected);
1965 neighbor->connected = GNUNET_NO; 1868 neighbor->connected = GNUNET_NO;
1966 if (DIRECT_NEIGHBOR_COST == neighbor->distance) 1869 if (DIRECT_NEIGHBOR_COST == neighbor->distance)
1967 { 1870 {
1968
1969 GNUNET_STATISTICS_update (stats, 1871 GNUNET_STATISTICS_update (stats,
1970 "# peers connected (1-hop)", 1872 "# peers connected (1-hop)",
1971 -1, GNUNET_NO); 1873 -1,
1874 GNUNET_NO);
1972 } 1875 }
1973 cleanup_neighbor (neighbor); 1876 cleanup_neighbor (neighbor);
1974
1975 if (GNUNET_YES == in_shutdown) 1877 if (GNUNET_YES == in_shutdown)
1976 return; 1878 return;
1977 schedule_refresh_routes (); 1879 schedule_refresh_routes ();
@@ -2035,14 +1937,16 @@ shutdown_task (void *cls)
2035 1937
2036 in_shutdown = GNUNET_YES; 1938 in_shutdown = GNUNET_YES;
2037 GNUNET_assert (NULL != core_api); 1939 GNUNET_assert (NULL != core_api);
2038 GNUNET_CORE_disconnect (core_api); 1940 GNUNET_CORE_disconnecT (core_api);
2039 core_api = NULL; 1941 core_api = NULL;
2040 GNUNET_ATS_performance_done (ats); 1942 GNUNET_ATS_performance_done (ats);
2041 ats = NULL; 1943 ats = NULL;
2042 GNUNET_CONTAINER_multipeermap_iterate (direct_neighbors, 1944 GNUNET_CONTAINER_multipeermap_iterate (direct_neighbors,
2043 &free_direct_neighbors, NULL); 1945 &free_direct_neighbors,
1946 NULL);
2044 GNUNET_CONTAINER_multipeermap_iterate (all_routes, 1947 GNUNET_CONTAINER_multipeermap_iterate (all_routes,
2045 &free_route, NULL); 1948 &free_route,
1949 NULL);
2046 GNUNET_CONTAINER_multipeermap_destroy (direct_neighbors); 1950 GNUNET_CONTAINER_multipeermap_destroy (direct_neighbors);
2047 GNUNET_CONTAINER_multipeermap_destroy (all_routes); 1951 GNUNET_CONTAINER_multipeermap_destroy (all_routes);
2048 GNUNET_STATISTICS_destroy (stats, GNUNET_NO); 1952 GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
@@ -2103,7 +2007,8 @@ notify_client_about_route (void *cls,
2103 * @param message the actual message 2007 * @param message the actual message
2104 */ 2008 */
2105static void 2009static void
2106handle_start (void *cls, struct GNUNET_SERVER_Client *client, 2010handle_start (void *cls,
2011 struct GNUNET_SERVER_Client *client,
2107 const struct GNUNET_MessageHeader *message) 2012 const struct GNUNET_MessageHeader *message)
2108{ 2013{
2109 GNUNET_SERVER_notification_context_add (nc, client); 2014 GNUNET_SERVER_notification_context_add (nc, client);
@@ -2139,12 +2044,16 @@ core_init (void *cls,
2139 * @param c configuration to use 2044 * @param c configuration to use
2140 */ 2045 */
2141static void 2046static void
2142run (void *cls, struct GNUNET_SERVER_Handle *server, 2047run (void *cls,
2048 struct GNUNET_SERVER_Handle *server,
2143 const struct GNUNET_CONFIGURATION_Handle *c) 2049 const struct GNUNET_CONFIGURATION_Handle *c)
2144{ 2050{
2145 static struct GNUNET_CORE_MessageHandler core_handlers[] = { 2051 GNUNET_MQ_hd_var_size (dv_route_message,
2146 {&handle_dv_route_message, GNUNET_MESSAGE_TYPE_DV_ROUTE, 0}, 2052 GNUNET_MESSAGE_TYPE_DV_ROUTE,
2147 {NULL, 0, 0} 2053 struct RouteMessage);
2054 struct GNUNET_MQ_MessageHandler core_handlers[] = {
2055 make_dv_route_message_handler (NULL),
2056 GNUNET_MQ_handler_end ()
2148 }; 2057 };
2149 static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = { 2058 static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
2150 {&handle_start, NULL, 2059 {&handle_start, NULL,
@@ -2157,30 +2066,36 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
2157 }; 2066 };
2158 in_shutdown = GNUNET_NO; 2067 in_shutdown = GNUNET_NO;
2159 cfg = c; 2068 cfg = c;
2160 direct_neighbors = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_NO); 2069 direct_neighbors = GNUNET_CONTAINER_multipeermap_create (128,
2161 all_routes = GNUNET_CONTAINER_multipeermap_create (65536, GNUNET_NO); 2070 GNUNET_NO);
2162 core_api = GNUNET_CORE_connect (cfg, NULL, 2071 all_routes = GNUNET_CONTAINER_multipeermap_create (65536,
2072 GNUNET_NO);
2073 core_api = GNUNET_CORE_connecT (cfg,
2074 NULL,
2163 &core_init, 2075 &core_init,
2164 &handle_core_connect, 2076 &handle_core_connect,
2165 &handle_core_disconnect, 2077 &handle_core_disconnect,
2166 NULL, GNUNET_NO,
2167 NULL, GNUNET_NO,
2168 core_handlers); 2078 core_handlers);
2169 2079
2170 if (NULL == core_api) 2080 if (NULL == core_api)
2171 return; 2081 return;
2172 ats = GNUNET_ATS_performance_init (cfg, &handle_ats_update, NULL); 2082 ats = GNUNET_ATS_performance_init (cfg,
2083 &handle_ats_update,
2084 NULL);
2173 if (NULL == ats) 2085 if (NULL == ats)
2174 { 2086 {
2175 GNUNET_CORE_disconnect (core_api); 2087 GNUNET_CORE_disconnecT (core_api);
2176 core_api = NULL; 2088 core_api = NULL;
2177 return; 2089 return;
2178 } 2090 }
2179 nc = GNUNET_SERVER_notification_context_create (server, 2091 nc = GNUNET_SERVER_notification_context_create (server,
2180 MAX_QUEUE_SIZE_PLUGIN); 2092 MAX_QUEUE_SIZE_PLUGIN);
2181 stats = GNUNET_STATISTICS_create ("dv", cfg); 2093 stats = GNUNET_STATISTICS_create ("dv",
2182 GNUNET_SERVER_add_handlers (server, plugin_handlers); 2094 cfg);
2183 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); 2095 GNUNET_SERVER_add_handlers (server,
2096 plugin_handlers);
2097 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
2098 NULL);
2184} 2099}
2185 2100
2186 2101
@@ -2192,11 +2107,16 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
2192 * @return 0 ok, 1 on error 2107 * @return 0 ok, 1 on error
2193 */ 2108 */
2194int 2109int
2195main (int argc, char *const *argv) 2110main (int argc,
2111 char *const *argv)
2196{ 2112{
2197 return (GNUNET_OK == 2113 return (GNUNET_OK ==
2198 GNUNET_SERVICE_run (argc, argv, "dv", GNUNET_SERVICE_OPTION_NONE, 2114 GNUNET_SERVICE_run (argc,
2199 &run, NULL)) ? 0 : 1; 2115 argv,
2116 "dv",
2117 GNUNET_SERVICE_OPTION_NONE,
2118 &run,
2119 NULL)) ? 0 : 1;
2200} 2120}
2201 2121
2202/* end of gnunet-service-dv.c */ 2122/* end of gnunet-service-dv.c */