aboutsummaryrefslogtreecommitdiff
path: root/src/dv
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2013-03-14 13:46:14 +0000
committerChristian Grothoff <christian@grothoff.org>2013-03-14 13:46:14 +0000
commitfcba8b93e2564cfb9246083d06e697753f792d82 (patch)
treed56e4fd4aaf61ddd10101b39c3a8312a0f3990c9 /src/dv
parent230237efe134ec2e826ae9976cadfc582b735338 (diff)
downloadgnunet-fcba8b93e2564cfb9246083d06e697753f792d82.tar.gz
gnunet-fcba8b93e2564cfb9246083d06e697753f792d82.zip
-more DV hacking
Diffstat (limited to 'src/dv')
-rw-r--r--src/dv/Makefile.am1
-rw-r--r--src/dv/gnunet-service-dv.c197
2 files changed, 183 insertions, 15 deletions
diff --git a/src/dv/Makefile.am b/src/dv/Makefile.am
index e9e2bba3f..3c3998535 100644
--- a/src/dv/Makefile.am
+++ b/src/dv/Makefile.am
@@ -38,6 +38,7 @@ libexec_PROGRAMS = \
38gnunet_service_dv_SOURCES = \ 38gnunet_service_dv_SOURCES = \
39 gnunet-service-dv.c dv.h 39 gnunet-service-dv.c dv.h
40gnunet_service_dv_LDADD = \ 40gnunet_service_dv_LDADD = \
41 $(top_builddir)/src/consensus/libgnunetconsensus.la \
41 $(top_builddir)/src/statistics/libgnunetstatistics.la \ 42 $(top_builddir)/src/statistics/libgnunetstatistics.la \
42 $(top_builddir)/src/core/libgnunetcore.la \ 43 $(top_builddir)/src/core/libgnunetcore.la \
43 $(top_builddir)/src/util/libgnunetutil.la \ 44 $(top_builddir)/src/util/libgnunetutil.la \
diff --git a/src/dv/gnunet-service-dv.c b/src/dv/gnunet-service-dv.c
index b1cf51dd9..2bf923ccc 100644
--- a/src/dv/gnunet-service-dv.c
+++ b/src/dv/gnunet-service-dv.c
@@ -43,6 +43,11 @@
43#define GNUNET_DV_CONSENSUS_FREQUENCY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 5)) 43#define GNUNET_DV_CONSENSUS_FREQUENCY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 5))
44 44
45/** 45/**
46 * Maximum number of messages we queue per peer.
47 */
48#define MAX_QUEUE_SIZE 16
49
50/**
46 * The default fisheye depth, from how many hops away will 51 * The default fisheye depth, from how many hops away will
47 * we keep peers? 52 * we keep peers?
48 */ 53 */
@@ -101,6 +106,11 @@ struct RouteMessage
101 */ 106 */
102 struct GNUNET_PeerIdentity target; 107 struct GNUNET_PeerIdentity target;
103 108
109 /**
110 * The (actual) sender of the message.
111 */
112 struct GNUNET_PeerIdentity sender;
113
104}; 114};
105 115
106GNUNET_NETWORK_STRUCT_END 116GNUNET_NETWORK_STRUCT_END
@@ -190,6 +200,12 @@ struct DirectNeighbor
190 struct GNUNET_CONSENSUS_Handle *consensus; 200 struct GNUNET_CONSENSUS_Handle *consensus;
191 201
192 /** 202 /**
203 * ID of the task we use to (periodically) update our consensus
204 * with this peer.
205 */
206 GNUNET_SCHEDULER_TaskIdentifier consensus_task;
207
208 /**
193 * At what offset are we, with respect to inserting our own routes 209 * At what offset are we, with respect to inserting our own routes
194 * into the consensus? 210 * into the consensus?
195 */ 211 */
@@ -201,6 +217,11 @@ struct DirectNeighbor
201 */ 217 */
202 unsigned int consensus_insertion_distance; 218 unsigned int consensus_insertion_distance;
203 219
220 /**
221 * Number of messages currently in the 'pm_XXXX'-DLL.
222 */
223 unsigned int pm_queue_size;
224
204}; 225};
205 226
206 227
@@ -272,12 +293,6 @@ static struct GNUNET_CONTAINER_MultiHashMap *all_routes;
272static struct ConsensusSet consensi[DEFAULT_FISHEYE_DEPTH - 1]; 293static struct ConsensusSet consensi[DEFAULT_FISHEYE_DEPTH - 1];
273 294
274/** 295/**
275 * ID of the task we use to (periodically) update our consensus
276 * with other peers.
277 */
278static GNUNET_SCHEDULER_Task consensus_task;
279
280/**
281 * Handle to the core service api. 296 * Handle to the core service api.
282 */ 297 */
283static struct GNUNET_CORE_Handle *core_api; 298static struct GNUNET_CORE_Handle *core_api;
@@ -319,11 +334,6 @@ static struct PendingMessage *plugin_pending_tail;
319 */ 334 */
320struct GNUNET_STATISTICS_Handle *stats; 335struct GNUNET_STATISTICS_Handle *stats;
321 336
322/**
323 * How far out to keep peers we learn about.
324 */
325static unsigned long long fisheye_depth;
326
327 337
328/** 338/**
329 * Get distance information from 'atsi'. 339 * Get distance information from 'atsi'.
@@ -400,7 +410,7 @@ transmit_to_plugin (void *cls, size_t size, void *buf)
400 */ 410 */
401static void 411static void
402send_data_to_plugin (const struct GNUNET_MessageHeader *message, 412send_data_to_plugin (const struct GNUNET_MessageHeader *message,
403 struct GNUNET_PeerIdentity *distant_neighbor, 413 const struct GNUNET_PeerIdentity *distant_neighbor,
404 uint32_t distance) 414 uint32_t distance)
405{ 415{
406 struct GNUNET_DV_ReceivedMessage *received_msg; 416 struct GNUNET_DV_ReceivedMessage *received_msg;
@@ -487,7 +497,7 @@ send_control_to_plugin (const struct GNUNET_MessageHeader *message)
487 * @param uid plugin-chosen UID for the message 497 * @param uid plugin-chosen UID for the message
488 */ 498 */
489static void 499static void
490send_ack_to_plugin (struct GNUNET_PeerIdentity *target, 500send_ack_to_plugin (const struct GNUNET_PeerIdentity *target,
491 uint32_t uid) 501 uint32_t uid)
492{ 502{
493 struct GNUNET_DV_AckMessage ack_msg; 503 struct GNUNET_DV_AckMessage ack_msg;
@@ -581,6 +591,7 @@ core_transmit_notify (void *cls, size_t size, void *buf)
581 while ( (NULL != (pending = dn->pm_head)) && 591 while ( (NULL != (pending = dn->pm_head)) &&
582 (size >= off + (msize = ntohs (pending->msg->size)))) 592 (size >= off + (msize = ntohs (pending->msg->size))))
583 { 593 {
594 dn->pm_queue_size--;
584 GNUNET_CONTAINER_DLL_remove (dn->pm_head, 595 GNUNET_CONTAINER_DLL_remove (dn->pm_head,
585 dn->pm_tail, 596 dn->pm_tail,
586 pending); 597 pending);
@@ -604,6 +615,59 @@ core_transmit_notify (void *cls, size_t size, void *buf)
604 615
605 616
606/** 617/**
618 * Forward the given payload to the given target.
619 *
620 * @param target where to send the message
621 * @param distance expected (remaining) distance to the target
622 * @param sender original sender of the message
623 * @param payload payload of the message
624 */
625static void
626forward_payload (struct DirectNeighbor *target,
627 uint32_t distance,
628 const struct GNUNET_PeerIdentity *sender,
629 const struct GNUNET_MessageHeader *payload)
630{
631 struct PendingMessage *pm;
632 struct RouteMessage *rm;
633 size_t msize;
634
635 if ( (target->pm_queue_size >= MAX_QUEUE_SIZE) &&
636 (0 != memcmp (sender,
637 &my_identity,
638 sizeof (struct GNUNET_PeerIdentity))) )
639 return;
640 msize = sizeof (struct RouteMessage) + ntohs (payload->size);
641 if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
642 {
643 GNUNET_break (0);
644 return;
645 }
646 pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
647 pm->msg = (const struct GNUNET_MessageHeader *) &pm[1];
648 rm = (struct RouteMessage *) &pm[1];
649 rm->header.size = htons ((uint16_t) msize);
650 rm->header.type = htons (GNUNET_MESSAGE_TYPE_DV_ROUTE);
651 rm->distance = htonl (distance);
652 rm->target = target->peer;
653 rm->sender = *sender;
654 memcpy (&rm[1], payload, ntohs (payload->size));
655 GNUNET_CONTAINER_DLL_insert_tail (target->pm_head,
656 target->pm_tail,
657 pm);
658 target->pm_queue_size++;
659 if (NULL == target->cth)
660 target->cth = GNUNET_CORE_notify_transmit_ready (core_api,
661 GNUNET_YES /* cork */,
662 0 /* priority */,
663 GNUNET_TIME_UNIT_FOREVER_REL,
664 &target->peer,
665 msize,
666 &core_transmit_notify, target);
667}
668
669
670/**
607 * Find a free slot for storing a 'route' in the 'consensi' 671 * Find a free slot for storing a 'route' in the 'consensi'
608 * set at the given distance. 672 * set at the given distance.
609 * 673 *
@@ -741,6 +805,7 @@ handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer,
741 * @param message the message 805 * @param message the message
742 * @param atsi transport ATS information (latency, distance, etc.) 806 * @param atsi transport ATS information (latency, distance, etc.)
743 * @param atsi_count number of entries in atsi 807 * @param atsi_count number of entries in atsi
808 * @return GNUNET_OK on success, GNUNET_SYSERR if the other peer violated the protocol
744 */ 809 */
745static int 810static int
746handle_dv_route_message (void *cls, const struct GNUNET_PeerIdentity *peer, 811handle_dv_route_message (void *cls, const struct GNUNET_PeerIdentity *peer,
@@ -748,7 +813,62 @@ handle_dv_route_message (void *cls, const struct GNUNET_PeerIdentity *peer,
748 const struct GNUNET_ATS_Information *atsi, 813 const struct GNUNET_ATS_Information *atsi,
749 unsigned int atsi_count) 814 unsigned int atsi_count)
750{ 815{
751 GNUNET_break (0); // FIXME 816 const struct RouteMessage *rm;
817 const struct GNUNET_MessageHeader *payload;
818 struct Route *route;
819
820 if (ntohs (message->size) < sizeof (struct RouteMessage) + sizeof (struct GNUNET_MessageHeader))
821 {
822 GNUNET_break_op (0);
823 return GNUNET_SYSERR;
824 }
825 rm = (const struct RouteMessage *) message;
826 payload = (const struct GNUNET_MessageHeader *) &rm[1];
827 if (ntohs (message->size) != sizeof (struct RouteMessage) + ntohs (payload->size))
828 {
829 GNUNET_break_op (0);
830 return GNUNET_SYSERR;
831 }
832 if (0 == memcmp (&rm->target,
833 &my_identity,
834 sizeof (struct GNUNET_PeerIdentity)))
835 {
836 /* message is for me, check reverse route! */
837 route = GNUNET_CONTAINER_multihashmap_get (all_routes,
838 &rm->sender.hashPubKey);
839 if (NULL == route)
840 {
841 /* don't have reverse route, drop */
842 GNUNET_STATISTICS_update (stats,
843 "# message discarded (no reverse route)",
844 1, GNUNET_NO);
845 return GNUNET_OK;
846 }
847 send_data_to_plugin (payload,
848 &rm->sender,
849 route->target.distance);
850 return GNUNET_OK;
851 }
852 route = GNUNET_CONTAINER_multihashmap_get (all_routes,
853 &rm->target.hashPubKey);
854 if (NULL == route)
855 {
856 GNUNET_STATISTICS_update (stats,
857 "# messages discarded (no route)",
858 1, GNUNET_NO);
859 return GNUNET_OK;
860 }
861 if (route->target.distance > ntohl (rm->distance) + 1)
862 {
863 GNUNET_STATISTICS_update (stats,
864 "# messages discarded (target too far)",
865 1, GNUNET_NO);
866 return GNUNET_OK;
867 }
868 forward_payload (route->next_hop,
869 route->target.distance,
870 &rm->sender,
871 payload);
752 return GNUNET_OK; 872 return GNUNET_OK;
753} 873}
754 874
@@ -765,7 +885,43 @@ static void
765handle_dv_send_message (void *cls, struct GNUNET_SERVER_Client *client, 885handle_dv_send_message (void *cls, struct GNUNET_SERVER_Client *client,
766 const struct GNUNET_MessageHeader *message) 886 const struct GNUNET_MessageHeader *message)
767{ 887{
768 GNUNET_break (0); // FIXME 888 struct Route *route;
889 const struct GNUNET_DV_SendMessage *msg;
890 const struct GNUNET_MessageHeader *payload;
891
892 if (ntohs (message->size) < sizeof (struct GNUNET_DV_SendMessage) + sizeof (struct GNUNET_MessageHeader))
893 {
894 GNUNET_break (0);
895 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
896 return;
897 }
898 msg = (const struct GNUNET_DV_SendMessage *) message;
899 payload = (const struct GNUNET_MessageHeader *) &msg[1];
900 if (ntohs (message->size) != sizeof (struct GNUNET_DV_SendMessage) + ntohs (payload->size))
901 {
902 GNUNET_break (0);
903 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
904 return;
905 }
906 route = GNUNET_CONTAINER_multihashmap_get (all_routes,
907 &msg->target.hashPubKey);
908 if (NULL == route)
909 {
910 /* got disconnected, send ACK anyway?
911 FIXME: What we really want is an 'NACK' here... */
912 GNUNET_STATISTICS_update (stats,
913 "# local messages discarded (no route)",
914 1, GNUNET_NO);
915 send_ack_to_plugin (&msg->target, htonl (msg->uid));
916 GNUNET_SERVER_receive_done (client, GNUNET_OK);
917 return;
918 }
919 // FIXME: flow control (send ACK only once message has left the queue...)
920 send_ack_to_plugin (&msg->target, htonl (msg->uid));
921 forward_payload (route->next_hop,
922 route->target.distance,
923 &my_identity,
924 payload);
769 GNUNET_SERVER_receive_done (client, GNUNET_OK); 925 GNUNET_SERVER_receive_done (client, GNUNET_OK);
770} 926}
771 927
@@ -876,6 +1032,7 @@ cleanup_neighbor (struct DirectNeighbor *neighbor)
876 1032
877 while (NULL != (pending = neighbor->pm_head)) 1033 while (NULL != (pending = neighbor->pm_head))
878 { 1034 {
1035 neighbor->pm_queue_size--;
879 GNUNET_CONTAINER_DLL_remove (neighbor->pm_head, 1036 GNUNET_CONTAINER_DLL_remove (neighbor->pm_head,
880 neighbor->pm_tail, 1037 neighbor->pm_tail,
881 pending); 1038 pending);
@@ -889,6 +1046,16 @@ cleanup_neighbor (struct DirectNeighbor *neighbor)
889 GNUNET_CORE_notify_transmit_ready_cancel (neighbor->cth); 1046 GNUNET_CORE_notify_transmit_ready_cancel (neighbor->cth);
890 neighbor->cth = NULL; 1047 neighbor->cth = NULL;
891 } 1048 }
1049 if (GNUNET_SCHEDULER_NO_TASK != neighbor->consensus_task)
1050 {
1051 GNUNET_SCHEDULER_cancel (neighbor->consensus_task);
1052 neighbor->consensus_task = GNUNET_SCHEDULER_NO_TASK;
1053 }
1054 if (NULL != neighbor->consensus)
1055 {
1056 GNUNET_CONSENSUS_destroy (neighbor->consensus);
1057 neighbor->consensus = NULL;
1058 }
892 GNUNET_assert (GNUNET_YES == 1059 GNUNET_assert (GNUNET_YES ==
893 GNUNET_CONTAINER_multihashmap_remove (direct_neighbors, 1060 GNUNET_CONTAINER_multihashmap_remove (direct_neighbors,
894 &neighbor->peer.hashPubKey, 1061 &neighbor->peer.hashPubKey,