aboutsummaryrefslogtreecommitdiff
path: root/src/rps/rps_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/rps/rps_api.c')
-rw-r--r--src/rps/rps_api.c149
1 files changed, 142 insertions, 7 deletions
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c
index d0b241a2b..7a3adfa94 100644
--- a/src/rps/rps_api.c
+++ b/src/rps/rps_api.c
@@ -29,6 +29,8 @@
29#include "gnunet_rps_service.h" 29#include "gnunet_rps_service.h"
30#include "rps-sampler_client.h" 30#include "rps-sampler_client.h"
31 31
32#include "gnunet_nse_service.h"
33
32#include <inttypes.h> 34#include <inttypes.h>
33 35
34#define LOG(kind,...) GNUNET_log_from (kind, "rps-api",__VA_ARGS__) 36#define LOG(kind,...) GNUNET_log_from (kind, "rps-api",__VA_ARGS__)
@@ -109,6 +111,35 @@ struct GNUNET_RPS_Handle
109 * @brief Tail of the DLL of stream requests 111 * @brief Tail of the DLL of stream requests
110 */ 112 */
111 struct GNUNET_RPS_StreamRequestHandle *stream_requests_tail; 113 struct GNUNET_RPS_StreamRequestHandle *stream_requests_tail;
114
115 /**
116 * @brief Handle to nse service
117 */
118 struct GNUNET_NSE_Handle *nse;
119
120 /**
121 * @brief Pointer to the head element in DLL of request handles
122 */
123 struct GNUNET_RPS_Request_Handle *rh_head;
124
125 /**
126 * @brief Pointer to the tail element in DLL of request handles
127 */
128 struct GNUNET_RPS_Request_Handle *rh_tail;
129
130 /**
131 * @brief The desired probability with which we want to have observed all
132 * peers.
133 */
134 float desired_probability;
135
136 /**
137 * @brief A factor that catches the 'bias' of a random stream of peer ids.
138 *
139 * As introduced by Brahms: Factor between the number of unique ids in a
140 * truly random stream and number of unique ids in the gossip stream.
141 */
142 float deficiency_factor;
112}; 143};
113 144
114 145
@@ -152,6 +183,16 @@ struct GNUNET_RPS_Request_Handle
152 * The closure for the callback. 183 * The closure for the callback.
153 */ 184 */
154 void *ready_cb_cls; 185 void *ready_cb_cls;
186
187 /**
188 * @brief Pointer to next element in DLL
189 */
190 struct GNUNET_RPS_Request_Handle *next;
191
192 /**
193 * @brief Pointer to previous element in DLL
194 */
195 struct GNUNET_RPS_Request_Handle *prev;
155}; 196};
156 197
157 198
@@ -263,10 +304,7 @@ peers_ready_cb (const struct GNUNET_PeerIdentity *peers,
263 rh->ready_cb (rh->ready_cb_cls, 304 rh->ready_cb (rh->ready_cb_cls,
264 num_peers, 305 num_peers,
265 peers); 306 peers);
266 GNUNET_RPS_stream_cancel (rh->srh); 307 GNUNET_RPS_request_cancel (rh);
267 rh->srh = NULL;
268 RPS_sampler_destroy (rh->sampler);
269 rh->sampler = NULL;
270} 308}
271 309
272 310
@@ -607,6 +645,37 @@ hash_from_share_val (const char *share_val,
607 645
608 646
609/** 647/**
648 * @brief Callback for network size estimate - called with new estimates about
649 * the network size, updates all samplers with the new estimate
650 *
651 * Implements #GNUNET_NSE_Callback
652 *
653 * @param cls the rps handle
654 * @param timestamp unused
655 * @param logestimate the estimate
656 * @param std_dev the standard distribution
657 */
658static void
659nse_cb (void *cls,
660 struct GNUNET_TIME_Absolute timestamp,
661 double logestimate,
662 double std_dev)
663{
664 struct GNUNET_RPS_Handle *h = cls;
665 (void) timestamp;
666 (void) std_dev;
667
668 for (struct GNUNET_RPS_Request_Handle *rh_iter = h->rh_head;
669 NULL != rh_iter && NULL != rh_iter->next;
670 rh_iter = rh_iter->next)
671 {
672 RPS_sampler_update_with_nw_size (rh_iter->sampler,
673 GNUNET_NSE_log_estimate_to_n (logestimate));
674 }
675}
676
677
678/**
610 * Reconnect to the service 679 * Reconnect to the service
611 */ 680 */
612static void 681static void
@@ -631,6 +700,9 @@ reconnect (struct GNUNET_RPS_Handle *h)
631 mq_handlers, 700 mq_handlers,
632 &mq_error_handler, 701 &mq_error_handler,
633 h); 702 h);
703 if (NULL != h->nse)
704 GNUNET_NSE_disconnect (h->nse);
705 h->nse = GNUNET_NSE_connect (h->cfg, &nse_cb, h);
634} 706}
635 707
636 708
@@ -638,7 +710,7 @@ reconnect (struct GNUNET_RPS_Handle *h)
638 * Connect to the rps service 710 * Connect to the rps service
639 * 711 *
640 * @param cfg configuration to use 712 * @param cfg configuration to use
641 * @return a handle to the service 713 * @return a handle to the service, NULL on error
642 */ 714 */
643struct GNUNET_RPS_Handle * 715struct GNUNET_RPS_Handle *
644GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) 716GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
@@ -647,6 +719,44 @@ GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
647 719
648 h = GNUNET_new (struct GNUNET_RPS_Handle); 720 h = GNUNET_new (struct GNUNET_RPS_Handle);
649 h->cfg = cfg; 721 h->cfg = cfg;
722 if (GNUNET_OK !=
723 GNUNET_CONFIGURATION_get_value_float (cfg,
724 "RPS",
725 "DESIRED_PROBABILITY",
726 &h->desired_probability))
727 {
728 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
729 "RPS", "DESIRED_PROBABILITY");
730 GNUNET_free (h);
731 return NULL;
732 }
733 if (0 > h->desired_probability ||
734 1 < h->desired_probability)
735 {
736 LOG (GNUNET_ERROR_TYPE_ERROR,
737 "The desired probability must be in the interval [0;1]\n");
738 GNUNET_free (h);
739 return NULL;
740 }
741 if (GNUNET_OK !=
742 GNUNET_CONFIGURATION_get_value_float (cfg,
743 "RPS",
744 "DEFICIENCY_FACTOR",
745 &h->deficiency_factor))
746 {
747 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
748 "RPS", "DEFICIENCY_FACTOR");
749 GNUNET_free (h);
750 return NULL;
751 }
752 if (0 > h->desired_probability ||
753 1 < h->desired_probability)
754 {
755 LOG (GNUNET_ERROR_TYPE_ERROR,
756 "The deficiency factor must be in the interval [0;1]\n");
757 GNUNET_free (h);
758 return NULL;
759 }
650 reconnect (h); 760 reconnect (h);
651 if (NULL == h->mq) 761 if (NULL == h->mq)
652 { 762 {
@@ -725,6 +835,10 @@ GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle,
725 rh->num_requests = num_req_peers; 835 rh->num_requests = num_req_peers;
726 rh->sampler = RPS_sampler_mod_init (num_req_peers, 836 rh->sampler = RPS_sampler_mod_init (num_req_peers,
727 GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff 837 GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff
838 RPS_sampler_set_desired_probability (rh->sampler,
839 rps_handle->desired_probability);
840 RPS_sampler_set_deficiency_factor (rh->sampler,
841 rps_handle->deficiency_factor);
728 rh->sampler_rh = RPS_sampler_get_n_rand_peers (rh->sampler, 842 rh->sampler_rh = RPS_sampler_get_n_rand_peers (rh->sampler,
729 num_req_peers, 843 num_req_peers,
730 peers_ready_cb, 844 peers_ready_cb,
@@ -734,6 +848,9 @@ GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle,
734 rh); /* cls */ 848 rh); /* cls */
735 rh->ready_cb = ready_cb; 849 rh->ready_cb = ready_cb;
736 rh->ready_cb_cls = cls; 850 rh->ready_cb_cls = cls;
851 GNUNET_CONTAINER_DLL_insert (rps_handle->rh_head,
852 rps_handle->rh_tail,
853 rh);
737 854
738 return rh; 855 return rh;
739} 856}
@@ -911,6 +1028,7 @@ GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh)
911 1028
912 h = rh->rps_handle; 1029 h = rh->rps_handle;
913 GNUNET_assert (NULL != rh); 1030 GNUNET_assert (NULL != rh);
1031 GNUNET_assert (NULL != rh->srh);
914 GNUNET_assert (h == rh->srh->rps_handle); 1032 GNUNET_assert (h == rh->srh->rps_handle);
915 GNUNET_RPS_stream_cancel (rh->srh); 1033 GNUNET_RPS_stream_cancel (rh->srh);
916 rh->srh = NULL; 1034 rh->srh = NULL;
@@ -920,6 +1038,10 @@ GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh)
920 RPS_sampler_request_cancel (rh->sampler_rh); 1038 RPS_sampler_request_cancel (rh->sampler_rh);
921 } 1039 }
922 RPS_sampler_destroy (rh->sampler); 1040 RPS_sampler_destroy (rh->sampler);
1041 rh->sampler = NULL;
1042 GNUNET_CONTAINER_DLL_remove (h->rh_head,
1043 h->rh_tail,
1044 rh);
923 GNUNET_free (rh); 1045 GNUNET_free (rh);
924} 1046}
925 1047
@@ -939,13 +1061,24 @@ GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h)
939 LOG (GNUNET_ERROR_TYPE_WARNING, 1061 LOG (GNUNET_ERROR_TYPE_WARNING,
940 "Still waiting for replies\n"); 1062 "Still waiting for replies\n");
941 for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = h->stream_requests_head; 1063 for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = h->stream_requests_head;
942 NULL != srh_iter; 1064 NULL != srh_iter;
943 srh_iter = srh_next) 1065 srh_iter = srh_next)
944 { 1066 {
945 srh_next = srh_iter->next; 1067 srh_next = srh_iter->next;
946 GNUNET_RPS_stream_cancel (srh_iter); 1068 GNUNET_RPS_stream_cancel (srh_iter);
947 } 1069 }
948 } 1070 }
1071 if (NULL != h->rh_head)
1072 {
1073 LOG (GNUNET_ERROR_TYPE_WARNING,
1074 "Not all requests were cancelled!\n");
1075 for (struct GNUNET_RPS_Request_Handle *rh_iter = h->rh_head;
1076 h->rh_head != NULL;
1077 rh_iter = h->rh_head)
1078 {
1079 GNUNET_RPS_request_cancel (rh_iter);
1080 }
1081 }
949 if (NULL != srh_callback_peers) 1082 if (NULL != srh_callback_peers)
950 { 1083 {
951 GNUNET_free (srh_callback_peers); 1084 GNUNET_free (srh_callback_peers);
@@ -957,6 +1090,8 @@ GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h)
957 "Still waiting for view updates\n"); 1090 "Still waiting for view updates\n");
958 GNUNET_RPS_view_request_cancel (h); 1091 GNUNET_RPS_view_request_cancel (h);
959 } 1092 }
1093 if (NULL != h->nse)
1094 GNUNET_NSE_disconnect (h->nse);
960 GNUNET_MQ_destroy (h->mq); 1095 GNUNET_MQ_destroy (h->mq);
961 GNUNET_free (h); 1096 GNUNET_free (h);
962} 1097}