aboutsummaryrefslogtreecommitdiff
path: root/src/testbed/testbed_api_barriers.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-06-27 14:43:43 +0000
committerChristian Grothoff <christian@grothoff.org>2016-06-27 14:43:43 +0000
commitb0474eb7a6aa570c04e4d76fd664caa406b3dd30 (patch)
tree3c75a22df7f927addeb3b6ca51c266088b6978fd /src/testbed/testbed_api_barriers.c
parent6f9c566127d0737ecd529f458dc28b7fc02e6eb9 (diff)
downloadgnunet-b0474eb7a6aa570c04e4d76fd664caa406b3dd30.tar.gz
gnunet-b0474eb7a6aa570c04e4d76fd664caa406b3dd30.zip
convert testbed_api_barriers to new MQ API
Diffstat (limited to 'src/testbed/testbed_api_barriers.c')
-rw-r--r--src/testbed/testbed_api_barriers.c205
1 files changed, 99 insertions, 106 deletions
diff --git a/src/testbed/testbed_api_barriers.c b/src/testbed/testbed_api_barriers.c
index 3ec4a905e..74139cc53 100644
--- a/src/testbed/testbed_api_barriers.c
+++ b/src/testbed/testbed_api_barriers.c
@@ -56,19 +56,9 @@ struct GNUNET_TESTBED_BarrierWaitHandle
56 struct GNUNET_CONFIGURATION_Handle *cfg; 56 struct GNUNET_CONFIGURATION_Handle *cfg;
57 57
58 /** 58 /**
59 * The client connection 59 * The testbed-barrier service message queue.
60 */ 60 */
61 struct GNUNET_CLIENT_Connection *conn; 61 struct GNUNET_MQ_Handle *mq;
62
63 /**
64 * Transmit handle
65 */
66 struct GNUNET_CLIENT_TransmitHandle *tx;
67
68 /**
69 * The message to transmit with tx
70 */
71 struct GNUNET_MessageHeader *msg;
72 62
73 /** 63 /**
74 * The barrier wait callback 64 * The barrier wait callback
@@ -76,98 +66,92 @@ struct GNUNET_TESTBED_BarrierWaitHandle
76 GNUNET_TESTBED_barrier_wait_cb cb; 66 GNUNET_TESTBED_barrier_wait_cb cb;
77 67
78 /** 68 /**
79 * The closure for the above callback 69 * The closure for @e cb.
80 */ 70 */
81 void *cls; 71 void *cb_cls;
82}; 72};
83 73
84 74
85 75
86/** 76/**
77 * Check if barrier status message is well-formed.
78 *
79 * @param cls closure
80 * @param msg received message
81 * @return #GNUNET_OK if the message is well-formed.
82 */
83static int
84check_status (void *cls,
85 const struct GNUNET_TESTBED_BarrierStatusMsg *msg)
86{
87 /* FIXME: this fails to actually check that the message
88 follows the protocol spec (0-terminations!). However,
89 not critical as #handle_status() doesn't interpret the
90 variable-size part anyway right now. */
91 return GNUNET_OK;
92}
93
94
95/**
87 * Type of a function to call when we receive a message 96 * Type of a function to call when we receive a message
88 * from the service. 97 * from the service.
89 * 98 *
90 * @param cls closure 99 * @param cls closure
91 * @param message received message; NULL on timeout or fatal error 100 * @param msg received message
92 */ 101 */
93static void 102static void
94receive_handler (void *cls, 103handle_status (void *cls,
95 const struct GNUNET_MessageHeader *message) 104 const struct GNUNET_TESTBED_BarrierStatusMsg *msg)
96{ 105{
97 struct GNUNET_TESTBED_BarrierWaitHandle *h = cls; 106 struct GNUNET_TESTBED_BarrierWaitHandle *h = cls;
98 const struct GNUNET_TESTBED_BarrierStatusMsg *msg;
99 uint16_t msize;
100 107
101 if (NULL == message)
102 {
103 GNUNET_break_op (0);
104 goto fail;
105 }
106 if (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS != ntohs (message->type))
107 {
108 GNUNET_break_op (0);
109 goto fail;
110 }
111 msize = ntohs (message->size);
112 if (msize <= sizeof (struct GNUNET_TESTBED_BarrierStatusMsg))
113 {
114 GNUNET_break_op (0);
115 goto fail;
116 }
117 msg = (const struct GNUNET_TESTBED_BarrierStatusMsg *) message;
118 switch (ntohs (msg->status)) 108 switch (ntohs (msg->status))
119 { 109 {
120 case GNUNET_TESTBED_BARRIERSTATUS_ERROR: 110 case GNUNET_TESTBED_BARRIERSTATUS_ERROR:
121 goto fail; 111 h->cb (h->cb_cls,
112 h->name,
113 GNUNET_SYSERR);
114 break;
122 case GNUNET_TESTBED_BARRIERSTATUS_INITIALISED: 115 case GNUNET_TESTBED_BARRIERSTATUS_INITIALISED:
116 h->cb (h->cb_cls,
117 h->name,
118 GNUNET_SYSERR);
123 GNUNET_break (0); 119 GNUNET_break (0);
124 goto fail; 120 break;
125 case GNUNET_TESTBED_BARRIERSTATUS_CROSSED: 121 case GNUNET_TESTBED_BARRIERSTATUS_CROSSED:
126 h->cb (h->cls, h->name, GNUNET_OK); 122 h->cb (h->cb_cls,
127 goto destroy; 123 h->name,
124 GNUNET_OK);
125 break;
128 default: 126 default:
129 GNUNET_break_op (0); 127 GNUNET_break_op (0);
128 h->cb (h->cb_cls,
129 h->name,
130 GNUNET_SYSERR);
131 break;
130 } 132 }
131
132 fail:
133 h->cb (h->cls, h->name, GNUNET_SYSERR);
134
135 destroy:
136 GNUNET_TESTBED_barrier_wait_cancel (h); 133 GNUNET_TESTBED_barrier_wait_cancel (h);
137} 134}
138 135
139 136
140/** 137/**
141 * Function called to notify a client about the connection 138 * Generic error handler, called with the appropriate error code and
142 * begin ready to queue more data. "buf" will be 139 * the same closure specified at the creation of the message queue.
143 * NULL and "size" zero if the connection was closed for 140 * Not every message queue implementation supports an error handler.
144 * writing in the meantime.
145 * 141 *
146 * @param cls closure 142 * @param cls closure with the `struct GNUNET_TESTBED_BarrierWaitHandle *`
147 * @param size number of bytes available in buf 143 * @param error error code
148 * @param buf where the callee should write the message
149 * @return number of bytes written to buf
150 */ 144 */
151static size_t 145static void
152transmit_notify (void *cls, size_t size, void *buf) 146mq_error_handler (void *cls,
147 enum GNUNET_MQ_Error error)
153{ 148{
154 struct GNUNET_TESTBED_BarrierWaitHandle *h = cls; 149 struct GNUNET_TESTBED_BarrierWaitHandle *h = cls;
155 uint16_t msize;
156 150
157 h->tx = NULL; 151 h->cb (h->cb_cls,
158 if ((0 == size) || (NULL == buf)) 152 h->name,
159 { 153 GNUNET_SYSERR);
160 h->cb (h->cls, h->name, GNUNET_SYSERR); 154 GNUNET_TESTBED_barrier_wait_cancel (h);
161 GNUNET_TESTBED_barrier_wait_cancel (h);
162 return 0;
163 }
164 msize = htons (h->msg->size);
165 GNUNET_assert (msize <= size);
166 (void) memcpy (buf, h->msg, msize);
167 GNUNET_free (h->msg);
168 h->msg = NULL;
169 GNUNET_CLIENT_receive (h->conn, &receive_handler, h, GNUNET_TIME_UNIT_FOREVER_REL);
170 return msize;
171} 155}
172 156
173 157
@@ -178,64 +162,74 @@ transmit_notify (void *cls, size_t size, void *buf)
178 * 162 *
179 * @param name the name of the barrier 163 * @param name the name of the barrier
180 * @param cb the barrier wait callback 164 * @param cb the barrier wait callback
181 * @param cls the closure for the above callback 165 * @param cb_cls the closure for @a cb
182 * @return barrier wait handle which can be used to cancel the waiting at 166 * @return barrier wait handle which can be used to cancel the waiting at
183 * anytime before the callback is called. NULL upon error. 167 * anytime before the callback is called. NULL upon error.
184 */ 168 */
185struct GNUNET_TESTBED_BarrierWaitHandle * 169struct GNUNET_TESTBED_BarrierWaitHandle *
186GNUNET_TESTBED_barrier_wait (const char *name, 170GNUNET_TESTBED_barrier_wait (const char *name,
187 GNUNET_TESTBED_barrier_wait_cb cb, 171 GNUNET_TESTBED_barrier_wait_cb cb,
188 void *cls) 172 void *cb_cls)
189{ 173{
174 GNUNET_MQ_hd_var_size (status,
175 GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS,
176 struct GNUNET_TESTBED_BarrierStatusMsg);
177 struct GNUNET_TESTBED_BarrierWaitHandle *h
178 = GNUNET_new (struct GNUNET_TESTBED_BarrierWaitHandle);
179 struct GNUNET_MQ_MessageHandler handlers[] = {
180 make_status_handler (h),
181 GNUNET_MQ_handler_end ()
182 };
183 struct GNUNET_MQ_Envelope *env;
190 struct GNUNET_TESTBED_BarrierWait *msg; 184 struct GNUNET_TESTBED_BarrierWait *msg;
191 struct GNUNET_CONFIGURATION_Handle *cfg; 185 const char *cfg_filename;
192 struct GNUNET_TESTBED_BarrierWaitHandle *h;
193 char *cfg_filename;
194 size_t name_len; 186 size_t name_len;
195 uint16_t msize;
196 187
197 GNUNET_assert (NULL != cb); 188 GNUNET_assert (NULL != cb);
198 GNUNET_assert (NULL != name);
199 cfg_filename = getenv (ENV_TESTBED_CONFIG); 189 cfg_filename = getenv (ENV_TESTBED_CONFIG);
200 if (NULL == cfg_filename) 190 if (NULL == cfg_filename)
201 { 191 {
202 LOG (GNUNET_ERROR_TYPE_ERROR, "Are you running under testbed?\n"); 192 LOG (GNUNET_ERROR_TYPE_ERROR,
193 "Are you running under testbed?\n");
194 GNUNET_free (h);
203 return NULL; 195 return NULL;
204 } 196 }
205 cfg = GNUNET_CONFIGURATION_create (); 197 h->cfg = GNUNET_CONFIGURATION_create ();
206 if (GNUNET_OK != GNUNET_CONFIGURATION_load (cfg, cfg_filename)) 198 if (GNUNET_OK !=
199 GNUNET_CONFIGURATION_load (h->cfg,
200 cfg_filename))
207 { 201 {
208 LOG (GNUNET_ERROR_TYPE_ERROR, "Unable to load configuration from file `%s'\n", 202 LOG (GNUNET_ERROR_TYPE_ERROR,
203 "Unable to load configuration from file `%s'\n",
209 cfg_filename); 204 cfg_filename);
210 GNUNET_CONFIGURATION_destroy (cfg); 205 GNUNET_CONFIGURATION_destroy (h->cfg);
206 GNUNET_free (h);
211 return NULL; 207 return NULL;
212 } 208 }
213 h = GNUNET_new (struct GNUNET_TESTBED_BarrierWaitHandle);
214 h->name = GNUNET_strdup (name); 209 h->name = GNUNET_strdup (name);
215 h->cfg = cfg;
216 h->conn = GNUNET_CLIENT_connect ("testbed-barrier", h->cfg);
217 h->cb = cb; 210 h->cb = cb;
218 h->cls = cls; 211 h->cb_cls = cb_cls;
219 if (NULL == h->conn) 212 h->mq = GNUNET_CLIENT_connecT (h->cfg,
213 "testbed-barrier",
214 handlers,
215 &mq_error_handler,
216 h);
217 if (NULL == h->mq)
220 { 218 {
221 LOG (GNUNET_ERROR_TYPE_ERROR, 219 LOG (GNUNET_ERROR_TYPE_ERROR,
222 "Unable to connect to local testbed-barrier service\n"); 220 "Unable to connect to local testbed-barrier service\n");
223 GNUNET_TESTBED_barrier_wait_cancel (h); 221 GNUNET_TESTBED_barrier_wait_cancel (h);
224 return NULL; 222 return NULL;
225 } 223 }
226 name_len = strlen (name); 224 name_len = strlen (name); /* NOTE: unusual to not have 0-termination, change? */
227 msize = sizeof (struct GNUNET_TESTBED_BarrierWait) + name_len; 225 env = GNUNET_MQ_msg_extra (msg,
228 msg = GNUNET_malloc (msize); 226 name_len,
229 msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT); 227 GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT);
230 msg->header.size = htons (msize); 228 memcpy (msg->name,
231 (void) memcpy (msg->name, name, name_len); 229 name,
232 h->msg = &msg->header; 230 name_len);
233 h->tx = 231 GNUNET_MQ_send (h->mq,
234 GNUNET_CLIENT_notify_transmit_ready (h->conn, msize, 232 env);
235 GNUNET_TIME_UNIT_FOREVER_REL,
236 GNUNET_NO,
237 &transmit_notify,
238 h);
239 return h; 233 return h;
240} 234}
241 235
@@ -248,13 +242,12 @@ GNUNET_TESTBED_barrier_wait (const char *name,
248void 242void
249GNUNET_TESTBED_barrier_wait_cancel (struct GNUNET_TESTBED_BarrierWaitHandle *h) 243GNUNET_TESTBED_barrier_wait_cancel (struct GNUNET_TESTBED_BarrierWaitHandle *h)
250{ 244{
245 if (NULL != h->mq)
246 {
247 GNUNET_MQ_destroy (h->mq);
248 h->mq = NULL;
249 }
251 GNUNET_free (h->name); 250 GNUNET_free (h->name);
252 if (NULL != h->tx)
253 GNUNET_CLIENT_notify_transmit_ready_cancel (h->tx);
254 if (NULL != h->conn)
255 GNUNET_CLIENT_disconnect (h->conn);
256 if (NULL != h->msg)
257 GNUNET_free (h->msg);
258 GNUNET_CONFIGURATION_destroy (h->cfg); 251 GNUNET_CONFIGURATION_destroy (h->cfg);
259 GNUNET_free (h); 252 GNUNET_free (h);
260} 253}