diff options
author | Christian Grothoff <christian@grothoff.org> | 2016-06-20 19:56:20 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2016-06-20 19:56:20 +0000 |
commit | 38cd63a1e806124d19bdf478d9bfcdafe3bdccbb (patch) | |
tree | d6face14daa2323bc5ea7584f1bb48853db0d599 | |
parent | 45efbb2efbaa11a25e7e53592ee41f971ce6babd (diff) | |
download | gnunet-38cd63a1e806124d19bdf478d9bfcdafe3bdccbb.tar.gz gnunet-38cd63a1e806124d19bdf478d9bfcdafe3bdccbb.zip |
convering nse_api.c to new MQ API
-rw-r--r-- | src/nse/nse_api.c | 203 |
1 files changed, 73 insertions, 130 deletions
diff --git a/src/nse/nse_api.c b/src/nse/nse_api.c index 1c260d537..d942d5ec6 100644 --- a/src/nse/nse_api.c +++ b/src/nse/nse_api.c | |||
@@ -1,6 +1,6 @@ | |||
1 | /* | 1 | /* |
2 | This file is part of GNUnet. | 2 | This file is part of GNUnet. |
3 | Copyright (C) 2009, 2010, 2011 GNUnet e.V. | 3 | Copyright (C) 2009, 2010, 2011, 2016 GNUnet e.V. |
4 | 4 | ||
5 | GNUnet is free software; you can redistribute it and/or modify | 5 | GNUnet is free software; you can redistribute it and/or modify |
6 | it under the terms of the GNU General Public License as published | 6 | it under the terms of the GNU General Public License as published |
@@ -45,19 +45,14 @@ struct GNUNET_NSE_Handle | |||
45 | const struct GNUNET_CONFIGURATION_Handle *cfg; | 45 | const struct GNUNET_CONFIGURATION_Handle *cfg; |
46 | 46 | ||
47 | /** | 47 | /** |
48 | * Socket (if available). | 48 | * Message queue (if available). |
49 | */ | 49 | */ |
50 | struct GNUNET_CLIENT_Connection *client; | 50 | struct GNUNET_MQ_Handle *mq; |
51 | |||
52 | /** | ||
53 | * Currently pending transmission request. | ||
54 | */ | ||
55 | struct GNUNET_CLIENT_TransmitHandle *th; | ||
56 | 51 | ||
57 | /** | 52 | /** |
58 | * Task doing exponential back-off trying to reconnect. | 53 | * Task doing exponential back-off trying to reconnect. |
59 | */ | 54 | */ |
60 | struct GNUNET_SCHEDULER_Task * reconnect_task; | 55 | struct GNUNET_SCHEDULER_Task *reconnect_task; |
61 | 56 | ||
62 | /** | 57 | /** |
63 | * Time for next connect retry. | 58 | * Time for next connect retry. |
@@ -80,118 +75,55 @@ struct GNUNET_NSE_Handle | |||
80 | /** | 75 | /** |
81 | * Try again to connect to network size estimation service. | 76 | * Try again to connect to network size estimation service. |
82 | * | 77 | * |
83 | * @param cls the handle to the transport service | 78 | * @param cls closure with the `struct GNUNET_NSE_Handle *` |
84 | */ | 79 | */ |
85 | static void | 80 | static void |
86 | reconnect (void *cls); | 81 | reconnect (void *cls); |
87 | 82 | ||
88 | 83 | ||
89 | /** | 84 | /** |
90 | * Type of a function to call when we receive a message | 85 | * Generic error handler, called with the appropriate |
91 | * from the service. | 86 | * error code and the same closure specified at the creation of |
87 | * the message queue. | ||
88 | * Not every message queue implementation supports an error handler. | ||
92 | * | 89 | * |
93 | * @param cls closure | 90 | * @param cls closure with the `struct GNUNET_NSE_Handle *` |
94 | * @param msg message received, NULL on timeout or fatal error | 91 | * @param error error code |
95 | */ | 92 | */ |
96 | static void | 93 | static void |
97 | message_handler (void *cls, | 94 | mq_error_handler (void *cls, |
98 | const struct GNUNET_MessageHeader *msg) | 95 | enum GNUNET_MQ_Error error) |
99 | { | 96 | { |
100 | struct GNUNET_NSE_Handle *h = cls; | 97 | struct GNUNET_NSE_Handle *h = cls; |
101 | const struct GNUNET_NSE_ClientMessage *client_msg; | ||
102 | |||
103 | if (NULL == msg) | ||
104 | { | ||
105 | /* Error, timeout, death */ | ||
106 | GNUNET_CLIENT_disconnect (h->client); | ||
107 | h->client = NULL; | ||
108 | h->reconnect_task = | ||
109 | GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, | ||
110 | &reconnect, h); | ||
111 | return; | ||
112 | } | ||
113 | if ((ntohs (msg->size) != sizeof (struct GNUNET_NSE_ClientMessage)) || | ||
114 | (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_NSE_ESTIMATE)) | ||
115 | { | ||
116 | GNUNET_break (0); | ||
117 | return; | ||
118 | } | ||
119 | client_msg = (const struct GNUNET_NSE_ClientMessage *) msg; | ||
120 | h->recv_cb (h->recv_cb_cls, GNUNET_TIME_absolute_ntoh (client_msg->timestamp), | ||
121 | GNUNET_ntoh_double (client_msg->size_estimate), | ||
122 | GNUNET_ntoh_double (client_msg->std_deviation)); | ||
123 | GNUNET_CLIENT_receive (h->client, &message_handler, h, | ||
124 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
125 | } | ||
126 | |||
127 | |||
128 | /** | ||
129 | * Reschedule a connect attempt to the service. | ||
130 | * | ||
131 | * @param h transport service to reconnect | ||
132 | */ | ||
133 | static void | ||
134 | reschedule_connect (struct GNUNET_NSE_Handle *h) | ||
135 | { | ||
136 | GNUNET_assert (h->reconnect_task == NULL); | ||
137 | |||
138 | if (NULL != h->th) | ||
139 | { | ||
140 | GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); | ||
141 | h->th = NULL; | ||
142 | } | ||
143 | if (NULL != h->client) | ||
144 | { | ||
145 | GNUNET_CLIENT_disconnect (h->client); | ||
146 | h->client = NULL; | ||
147 | } | ||
148 | 98 | ||
149 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 99 | GNUNET_MQ_destroy (h->mq); |
150 | "Scheduling task to reconnect to nse service in %s.\n", | 100 | h->mq = NULL; |
151 | GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, | 101 | h->reconnect_task |
152 | GNUNET_YES)); | 102 | = GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, |
153 | h->reconnect_task = | 103 | &reconnect, |
154 | GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, | 104 | h); |
155 | &reconnect, h); | ||
156 | h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); | 105 | h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); |
157 | } | 106 | } |
158 | 107 | ||
159 | 108 | ||
160 | /** | 109 | /** |
161 | * Transmit START message to service. | 110 | * Type of a function to call when we receive a message |
111 | * from the service. | ||
162 | * | 112 | * |
163 | * @param cls the `struct GNUNET_NSE_Handle *` | 113 | * @param cls closure |
164 | * @param size number of bytes available in @a buf | 114 | * @param cklient_msg message received |
165 | * @param buf where to copy the message | ||
166 | * @return number of bytes copied to @a buf | ||
167 | */ | 115 | */ |
168 | static size_t | 116 | static void |
169 | send_start (void *cls, size_t size, void *buf) | 117 | handle_estimate (void *cls, |
118 | const struct GNUNET_NSE_ClientMessage *client_msg) | ||
170 | { | 119 | { |
171 | struct GNUNET_NSE_Handle *h = cls; | 120 | struct GNUNET_NSE_Handle *h = cls; |
172 | struct GNUNET_MessageHeader *msg; | ||
173 | |||
174 | h->th = NULL; | ||
175 | if (NULL == buf) | ||
176 | { | ||
177 | /* Connect error... */ | ||
178 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
179 | "Error while trying to transmit `%s' request.\n", | ||
180 | "START"); | ||
181 | reschedule_connect (h); | ||
182 | return 0; | ||
183 | } | ||
184 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
185 | "Transmitting `%s' request.\n", | ||
186 | "START"); | ||
187 | GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader)); | ||
188 | 121 | ||
189 | msg = (struct GNUNET_MessageHeader *) buf; | 122 | h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; |
190 | msg->size = htons (sizeof (struct GNUNET_MessageHeader)); | 123 | h->recv_cb (h->recv_cb_cls, |
191 | msg->type = htons (GNUNET_MESSAGE_TYPE_NSE_START); | 124 | GNUNET_TIME_absolute_ntoh (client_msg->timestamp), |
192 | GNUNET_CLIENT_receive (h->client, &message_handler, h, | 125 | GNUNET_ntoh_double (client_msg->size_estimate), |
193 | GNUNET_TIME_UNIT_FOREVER_REL); | 126 | GNUNET_ntoh_double (client_msg->std_deviation)); |
194 | return sizeof (struct GNUNET_MessageHeader); | ||
195 | } | 127 | } |
196 | 128 | ||
197 | 129 | ||
@@ -203,21 +135,32 @@ send_start (void *cls, size_t size, void *buf) | |||
203 | static void | 135 | static void |
204 | reconnect (void *cls) | 136 | reconnect (void *cls) |
205 | { | 137 | { |
138 | GNUNET_MQ_hd_fixed_size (estimate, | ||
139 | GNUNET_MESSAGE_TYPE_NSE_ESTIMATE, | ||
140 | struct GNUNET_NSE_ClientMessage); | ||
206 | struct GNUNET_NSE_Handle *h = cls; | 141 | struct GNUNET_NSE_Handle *h = cls; |
142 | struct GNUNET_MQ_MessageHandler handlers[] = { | ||
143 | make_estimate_handler (h), | ||
144 | GNUNET_MQ_handler_end () | ||
145 | }; | ||
146 | struct GNUNET_MessageHeader *msg; | ||
147 | struct GNUNET_MQ_Envelope *env; | ||
207 | 148 | ||
208 | h->reconnect_task = NULL; | 149 | h->reconnect_task = NULL; |
209 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 150 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
210 | "Connecting to network size estimation service.\n"); | 151 | "Connecting to network size estimation service.\n"); |
211 | GNUNET_assert (NULL == h->client); | 152 | GNUNET_assert (NULL == h->mq); |
212 | h->client = GNUNET_CLIENT_connect ("nse", h->cfg); | 153 | h->mq = GNUNET_CLIENT_connecT (h->cfg, |
213 | GNUNET_assert (NULL != h->client); | 154 | "nse", |
214 | GNUNET_assert (NULL == h->th); | 155 | handlers, |
215 | h->th = | 156 | &mq_error_handler, |
216 | GNUNET_CLIENT_notify_transmit_ready (h->client, | 157 | h); |
217 | sizeof (struct GNUNET_MessageHeader), | 158 | if (NULL == h->mq) |
218 | GNUNET_TIME_UNIT_FOREVER_REL, | 159 | return; |
219 | GNUNET_NO, &send_start, h); | 160 | env = GNUNET_MQ_msg (msg, |
220 | GNUNET_assert (NULL != h->th); | 161 | GNUNET_MESSAGE_TYPE_NSE_START); |
162 | GNUNET_MQ_send (h->mq, | ||
163 | env); | ||
221 | } | 164 | } |
222 | 165 | ||
223 | 166 | ||
@@ -231,18 +174,24 @@ reconnect (void *cls) | |||
231 | */ | 174 | */ |
232 | struct GNUNET_NSE_Handle * | 175 | struct GNUNET_NSE_Handle * |
233 | GNUNET_NSE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, | 176 | GNUNET_NSE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, |
234 | GNUNET_NSE_Callback func, void *func_cls) | 177 | GNUNET_NSE_Callback func, |
178 | void *func_cls) | ||
235 | { | 179 | { |
236 | struct GNUNET_NSE_Handle *ret; | 180 | struct GNUNET_NSE_Handle *h; |
237 | 181 | ||
238 | GNUNET_assert (func != NULL); | 182 | GNUNET_assert (NULL != func); |
239 | ret = GNUNET_new (struct GNUNET_NSE_Handle); | 183 | h = GNUNET_new (struct GNUNET_NSE_Handle); |
240 | ret->cfg = cfg; | 184 | h->cfg = cfg; |
241 | ret->recv_cb = func; | 185 | h->recv_cb = func; |
242 | ret->recv_cb_cls = func_cls; | 186 | h->recv_cb_cls = func_cls; |
243 | ret->reconnect_delay = GNUNET_TIME_UNIT_ZERO; | 187 | h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; |
244 | ret->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, ret); | 188 | reconnect (h); |
245 | return ret; | 189 | if (NULL == h->mq) |
190 | { | ||
191 | GNUNET_free (h); | ||
192 | return NULL; | ||
193 | } | ||
194 | return h; | ||
246 | } | 195 | } |
247 | 196 | ||
248 | 197 | ||
@@ -254,21 +203,15 @@ GNUNET_NSE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
254 | void | 203 | void |
255 | GNUNET_NSE_disconnect (struct GNUNET_NSE_Handle *h) | 204 | GNUNET_NSE_disconnect (struct GNUNET_NSE_Handle *h) |
256 | { | 205 | { |
257 | GNUNET_assert (NULL != h); | 206 | if (NULL != h->reconnect_task) |
258 | if (h->reconnect_task != NULL) | ||
259 | { | 207 | { |
260 | GNUNET_SCHEDULER_cancel (h->reconnect_task); | 208 | GNUNET_SCHEDULER_cancel (h->reconnect_task); |
261 | h->reconnect_task = NULL; | 209 | h->reconnect_task = NULL; |
262 | } | 210 | } |
263 | if (NULL != h->th) | 211 | if (NULL != h->mq) |
264 | { | ||
265 | GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); | ||
266 | h->th = NULL; | ||
267 | } | ||
268 | if (NULL != h->client) | ||
269 | { | 212 | { |
270 | GNUNET_CLIENT_disconnect (h->client); | 213 | GNUNET_MQ_destroy (h->mq); |
271 | h->client = NULL; | 214 | h->mq = NULL; |
272 | } | 215 | } |
273 | GNUNET_free (h); | 216 | GNUNET_free (h); |
274 | } | 217 | } |