diff options
author | Christian Grothoff <christian@grothoff.org> | 2016-06-27 14:43:43 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2016-06-27 14:43:43 +0000 |
commit | b0474eb7a6aa570c04e4d76fd664caa406b3dd30 (patch) | |
tree | 3c75a22df7f927addeb3b6ca51c266088b6978fd /src/testbed/testbed_api_barriers.c | |
parent | 6f9c566127d0737ecd529f458dc28b7fc02e6eb9 (diff) | |
download | gnunet-b0474eb7a6aa570c04e4d76fd664caa406b3dd30.tar.gz gnunet-b0474eb7a6aa570c04e4d76fd664caa406b3dd30.zip |
convert testbed_api_barriers to new MQ API
Diffstat (limited to 'src/testbed/testbed_api_barriers.c')
-rw-r--r-- | src/testbed/testbed_api_barriers.c | 205 |
1 files changed, 99 insertions, 106 deletions
diff --git a/src/testbed/testbed_api_barriers.c b/src/testbed/testbed_api_barriers.c index 3ec4a905e..74139cc53 100644 --- a/src/testbed/testbed_api_barriers.c +++ b/src/testbed/testbed_api_barriers.c | |||
@@ -56,19 +56,9 @@ struct GNUNET_TESTBED_BarrierWaitHandle | |||
56 | struct GNUNET_CONFIGURATION_Handle *cfg; | 56 | struct GNUNET_CONFIGURATION_Handle *cfg; |
57 | 57 | ||
58 | /** | 58 | /** |
59 | * The client connection | 59 | * The testbed-barrier service message queue. |
60 | */ | 60 | */ |
61 | struct GNUNET_CLIENT_Connection *conn; | 61 | struct GNUNET_MQ_Handle *mq; |
62 | |||
63 | /** | ||
64 | * Transmit handle | ||
65 | */ | ||
66 | struct GNUNET_CLIENT_TransmitHandle *tx; | ||
67 | |||
68 | /** | ||
69 | * The message to transmit with tx | ||
70 | */ | ||
71 | struct GNUNET_MessageHeader *msg; | ||
72 | 62 | ||
73 | /** | 63 | /** |
74 | * The barrier wait callback | 64 | * The barrier wait callback |
@@ -76,98 +66,92 @@ struct GNUNET_TESTBED_BarrierWaitHandle | |||
76 | GNUNET_TESTBED_barrier_wait_cb cb; | 66 | GNUNET_TESTBED_barrier_wait_cb cb; |
77 | 67 | ||
78 | /** | 68 | /** |
79 | * The closure for the above callback | 69 | * The closure for @e cb. |
80 | */ | 70 | */ |
81 | void *cls; | 71 | void *cb_cls; |
82 | }; | 72 | }; |
83 | 73 | ||
84 | 74 | ||
85 | 75 | ||
86 | /** | 76 | /** |
77 | * Check if barrier status message is well-formed. | ||
78 | * | ||
79 | * @param cls closure | ||
80 | * @param msg received message | ||
81 | * @return #GNUNET_OK if the message is well-formed. | ||
82 | */ | ||
83 | static int | ||
84 | check_status (void *cls, | ||
85 | const struct GNUNET_TESTBED_BarrierStatusMsg *msg) | ||
86 | { | ||
87 | /* FIXME: this fails to actually check that the message | ||
88 | follows the protocol spec (0-terminations!). However, | ||
89 | not critical as #handle_status() doesn't interpret the | ||
90 | variable-size part anyway right now. */ | ||
91 | return GNUNET_OK; | ||
92 | } | ||
93 | |||
94 | |||
95 | /** | ||
87 | * Type of a function to call when we receive a message | 96 | * Type of a function to call when we receive a message |
88 | * from the service. | 97 | * from the service. |
89 | * | 98 | * |
90 | * @param cls closure | 99 | * @param cls closure |
91 | * @param message received message; NULL on timeout or fatal error | 100 | * @param msg received message |
92 | */ | 101 | */ |
93 | static void | 102 | static void |
94 | receive_handler (void *cls, | 103 | handle_status (void *cls, |
95 | const struct GNUNET_MessageHeader *message) | 104 | const struct GNUNET_TESTBED_BarrierStatusMsg *msg) |
96 | { | 105 | { |
97 | struct GNUNET_TESTBED_BarrierWaitHandle *h = cls; | 106 | struct GNUNET_TESTBED_BarrierWaitHandle *h = cls; |
98 | const struct GNUNET_TESTBED_BarrierStatusMsg *msg; | ||
99 | uint16_t msize; | ||
100 | 107 | ||
101 | if (NULL == message) | ||
102 | { | ||
103 | GNUNET_break_op (0); | ||
104 | goto fail; | ||
105 | } | ||
106 | if (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS != ntohs (message->type)) | ||
107 | { | ||
108 | GNUNET_break_op (0); | ||
109 | goto fail; | ||
110 | } | ||
111 | msize = ntohs (message->size); | ||
112 | if (msize <= sizeof (struct GNUNET_TESTBED_BarrierStatusMsg)) | ||
113 | { | ||
114 | GNUNET_break_op (0); | ||
115 | goto fail; | ||
116 | } | ||
117 | msg = (const struct GNUNET_TESTBED_BarrierStatusMsg *) message; | ||
118 | switch (ntohs (msg->status)) | 108 | switch (ntohs (msg->status)) |
119 | { | 109 | { |
120 | case GNUNET_TESTBED_BARRIERSTATUS_ERROR: | 110 | case GNUNET_TESTBED_BARRIERSTATUS_ERROR: |
121 | goto fail; | 111 | h->cb (h->cb_cls, |
112 | h->name, | ||
113 | GNUNET_SYSERR); | ||
114 | break; | ||
122 | case GNUNET_TESTBED_BARRIERSTATUS_INITIALISED: | 115 | case GNUNET_TESTBED_BARRIERSTATUS_INITIALISED: |
116 | h->cb (h->cb_cls, | ||
117 | h->name, | ||
118 | GNUNET_SYSERR); | ||
123 | GNUNET_break (0); | 119 | GNUNET_break (0); |
124 | goto fail; | 120 | break; |
125 | case GNUNET_TESTBED_BARRIERSTATUS_CROSSED: | 121 | case GNUNET_TESTBED_BARRIERSTATUS_CROSSED: |
126 | h->cb (h->cls, h->name, GNUNET_OK); | 122 | h->cb (h->cb_cls, |
127 | goto destroy; | 123 | h->name, |
124 | GNUNET_OK); | ||
125 | break; | ||
128 | default: | 126 | default: |
129 | GNUNET_break_op (0); | 127 | GNUNET_break_op (0); |
128 | h->cb (h->cb_cls, | ||
129 | h->name, | ||
130 | GNUNET_SYSERR); | ||
131 | break; | ||
130 | } | 132 | } |
131 | |||
132 | fail: | ||
133 | h->cb (h->cls, h->name, GNUNET_SYSERR); | ||
134 | |||
135 | destroy: | ||
136 | GNUNET_TESTBED_barrier_wait_cancel (h); | 133 | GNUNET_TESTBED_barrier_wait_cancel (h); |
137 | } | 134 | } |
138 | 135 | ||
139 | 136 | ||
140 | /** | 137 | /** |
141 | * Function called to notify a client about the connection | 138 | * Generic error handler, called with the appropriate error code and |
142 | * begin ready to queue more data. "buf" will be | 139 | * the same closure specified at the creation of the message queue. |
143 | * NULL and "size" zero if the connection was closed for | 140 | * Not every message queue implementation supports an error handler. |
144 | * writing in the meantime. | ||
145 | * | 141 | * |
146 | * @param cls closure | 142 | * @param cls closure with the `struct GNUNET_TESTBED_BarrierWaitHandle *` |
147 | * @param size number of bytes available in buf | 143 | * @param error error code |
148 | * @param buf where the callee should write the message | ||
149 | * @return number of bytes written to buf | ||
150 | */ | 144 | */ |
151 | static size_t | 145 | static void |
152 | transmit_notify (void *cls, size_t size, void *buf) | 146 | mq_error_handler (void *cls, |
147 | enum GNUNET_MQ_Error error) | ||
153 | { | 148 | { |
154 | struct GNUNET_TESTBED_BarrierWaitHandle *h = cls; | 149 | struct GNUNET_TESTBED_BarrierWaitHandle *h = cls; |
155 | uint16_t msize; | ||
156 | 150 | ||
157 | h->tx = NULL; | 151 | h->cb (h->cb_cls, |
158 | if ((0 == size) || (NULL == buf)) | 152 | h->name, |
159 | { | 153 | GNUNET_SYSERR); |
160 | h->cb (h->cls, h->name, GNUNET_SYSERR); | 154 | GNUNET_TESTBED_barrier_wait_cancel (h); |
161 | GNUNET_TESTBED_barrier_wait_cancel (h); | ||
162 | return 0; | ||
163 | } | ||
164 | msize = htons (h->msg->size); | ||
165 | GNUNET_assert (msize <= size); | ||
166 | (void) memcpy (buf, h->msg, msize); | ||
167 | GNUNET_free (h->msg); | ||
168 | h->msg = NULL; | ||
169 | GNUNET_CLIENT_receive (h->conn, &receive_handler, h, GNUNET_TIME_UNIT_FOREVER_REL); | ||
170 | return msize; | ||
171 | } | 155 | } |
172 | 156 | ||
173 | 157 | ||
@@ -178,64 +162,74 @@ transmit_notify (void *cls, size_t size, void *buf) | |||
178 | * | 162 | * |
179 | * @param name the name of the barrier | 163 | * @param name the name of the barrier |
180 | * @param cb the barrier wait callback | 164 | * @param cb the barrier wait callback |
181 | * @param cls the closure for the above callback | 165 | * @param cb_cls the closure for @a cb |
182 | * @return barrier wait handle which can be used to cancel the waiting at | 166 | * @return barrier wait handle which can be used to cancel the waiting at |
183 | * anytime before the callback is called. NULL upon error. | 167 | * anytime before the callback is called. NULL upon error. |
184 | */ | 168 | */ |
185 | struct GNUNET_TESTBED_BarrierWaitHandle * | 169 | struct GNUNET_TESTBED_BarrierWaitHandle * |
186 | GNUNET_TESTBED_barrier_wait (const char *name, | 170 | GNUNET_TESTBED_barrier_wait (const char *name, |
187 | GNUNET_TESTBED_barrier_wait_cb cb, | 171 | GNUNET_TESTBED_barrier_wait_cb cb, |
188 | void *cls) | 172 | void *cb_cls) |
189 | { | 173 | { |
174 | GNUNET_MQ_hd_var_size (status, | ||
175 | GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS, | ||
176 | struct GNUNET_TESTBED_BarrierStatusMsg); | ||
177 | struct GNUNET_TESTBED_BarrierWaitHandle *h | ||
178 | = GNUNET_new (struct GNUNET_TESTBED_BarrierWaitHandle); | ||
179 | struct GNUNET_MQ_MessageHandler handlers[] = { | ||
180 | make_status_handler (h), | ||
181 | GNUNET_MQ_handler_end () | ||
182 | }; | ||
183 | struct GNUNET_MQ_Envelope *env; | ||
190 | struct GNUNET_TESTBED_BarrierWait *msg; | 184 | struct GNUNET_TESTBED_BarrierWait *msg; |
191 | struct GNUNET_CONFIGURATION_Handle *cfg; | 185 | const char *cfg_filename; |
192 | struct GNUNET_TESTBED_BarrierWaitHandle *h; | ||
193 | char *cfg_filename; | ||
194 | size_t name_len; | 186 | size_t name_len; |
195 | uint16_t msize; | ||
196 | 187 | ||
197 | GNUNET_assert (NULL != cb); | 188 | GNUNET_assert (NULL != cb); |
198 | GNUNET_assert (NULL != name); | ||
199 | cfg_filename = getenv (ENV_TESTBED_CONFIG); | 189 | cfg_filename = getenv (ENV_TESTBED_CONFIG); |
200 | if (NULL == cfg_filename) | 190 | if (NULL == cfg_filename) |
201 | { | 191 | { |
202 | LOG (GNUNET_ERROR_TYPE_ERROR, "Are you running under testbed?\n"); | 192 | LOG (GNUNET_ERROR_TYPE_ERROR, |
193 | "Are you running under testbed?\n"); | ||
194 | GNUNET_free (h); | ||
203 | return NULL; | 195 | return NULL; |
204 | } | 196 | } |
205 | cfg = GNUNET_CONFIGURATION_create (); | 197 | h->cfg = GNUNET_CONFIGURATION_create (); |
206 | if (GNUNET_OK != GNUNET_CONFIGURATION_load (cfg, cfg_filename)) | 198 | if (GNUNET_OK != |
199 | GNUNET_CONFIGURATION_load (h->cfg, | ||
200 | cfg_filename)) | ||
207 | { | 201 | { |
208 | LOG (GNUNET_ERROR_TYPE_ERROR, "Unable to load configuration from file `%s'\n", | 202 | LOG (GNUNET_ERROR_TYPE_ERROR, |
203 | "Unable to load configuration from file `%s'\n", | ||
209 | cfg_filename); | 204 | cfg_filename); |
210 | GNUNET_CONFIGURATION_destroy (cfg); | 205 | GNUNET_CONFIGURATION_destroy (h->cfg); |
206 | GNUNET_free (h); | ||
211 | return NULL; | 207 | return NULL; |
212 | } | 208 | } |
213 | h = GNUNET_new (struct GNUNET_TESTBED_BarrierWaitHandle); | ||
214 | h->name = GNUNET_strdup (name); | 209 | h->name = GNUNET_strdup (name); |
215 | h->cfg = cfg; | ||
216 | h->conn = GNUNET_CLIENT_connect ("testbed-barrier", h->cfg); | ||
217 | h->cb = cb; | 210 | h->cb = cb; |
218 | h->cls = cls; | 211 | h->cb_cls = cb_cls; |
219 | if (NULL == h->conn) | 212 | h->mq = GNUNET_CLIENT_connecT (h->cfg, |
213 | "testbed-barrier", | ||
214 | handlers, | ||
215 | &mq_error_handler, | ||
216 | h); | ||
217 | if (NULL == h->mq) | ||
220 | { | 218 | { |
221 | LOG (GNUNET_ERROR_TYPE_ERROR, | 219 | LOG (GNUNET_ERROR_TYPE_ERROR, |
222 | "Unable to connect to local testbed-barrier service\n"); | 220 | "Unable to connect to local testbed-barrier service\n"); |
223 | GNUNET_TESTBED_barrier_wait_cancel (h); | 221 | GNUNET_TESTBED_barrier_wait_cancel (h); |
224 | return NULL; | 222 | return NULL; |
225 | } | 223 | } |
226 | name_len = strlen (name); | 224 | name_len = strlen (name); /* NOTE: unusual to not have 0-termination, change? */ |
227 | msize = sizeof (struct GNUNET_TESTBED_BarrierWait) + name_len; | 225 | env = GNUNET_MQ_msg_extra (msg, |
228 | msg = GNUNET_malloc (msize); | 226 | name_len, |
229 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT); | 227 | GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT); |
230 | msg->header.size = htons (msize); | 228 | memcpy (msg->name, |
231 | (void) memcpy (msg->name, name, name_len); | 229 | name, |
232 | h->msg = &msg->header; | 230 | name_len); |
233 | h->tx = | 231 | GNUNET_MQ_send (h->mq, |
234 | GNUNET_CLIENT_notify_transmit_ready (h->conn, msize, | 232 | env); |
235 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
236 | GNUNET_NO, | ||
237 | &transmit_notify, | ||
238 | h); | ||
239 | return h; | 233 | return h; |
240 | } | 234 | } |
241 | 235 | ||
@@ -248,13 +242,12 @@ GNUNET_TESTBED_barrier_wait (const char *name, | |||
248 | void | 242 | void |
249 | GNUNET_TESTBED_barrier_wait_cancel (struct GNUNET_TESTBED_BarrierWaitHandle *h) | 243 | GNUNET_TESTBED_barrier_wait_cancel (struct GNUNET_TESTBED_BarrierWaitHandle *h) |
250 | { | 244 | { |
245 | if (NULL != h->mq) | ||
246 | { | ||
247 | GNUNET_MQ_destroy (h->mq); | ||
248 | h->mq = NULL; | ||
249 | } | ||
251 | GNUNET_free (h->name); | 250 | GNUNET_free (h->name); |
252 | if (NULL != h->tx) | ||
253 | GNUNET_CLIENT_notify_transmit_ready_cancel (h->tx); | ||
254 | if (NULL != h->conn) | ||
255 | GNUNET_CLIENT_disconnect (h->conn); | ||
256 | if (NULL != h->msg) | ||
257 | GNUNET_free (h->msg); | ||
258 | GNUNET_CONFIGURATION_destroy (h->cfg); | 251 | GNUNET_CONFIGURATION_destroy (h->cfg); |
259 | GNUNET_free (h); | 252 | GNUNET_free (h); |
260 | } | 253 | } |