diff options
author | Gabor X Toth <*@tg-x.net> | 2016-08-04 20:10:17 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2016-08-04 20:10:17 +0000 |
commit | cb073fc0a243ccd29cb0dc7eb5c7e7cb33b6e97f (patch) | |
tree | e07785a96079274c0162794b0bda68b8296c5572 /src/psycstore | |
parent | 42fcd295dbcd71c399cf854525d86879095e4555 (diff) | |
download | gnunet-cb073fc0a243ccd29cb0dc7eb5c7e7cb33b6e97f.tar.gz gnunet-cb073fc0a243ccd29cb0dc7eb5c7e7cb33b6e97f.zip |
psycstore: switch to MQ
Diffstat (limited to 'src/psycstore')
-rw-r--r-- | src/psycstore/gnunet-service-psycstore.c | 5 | ||||
-rw-r--r-- | src/psycstore/psycstore_api.c | 1159 |
2 files changed, 452 insertions, 712 deletions
diff --git a/src/psycstore/gnunet-service-psycstore.c b/src/psycstore/gnunet-service-psycstore.c index 33e894b5e..c9a6f22b8 100644 --- a/src/psycstore/gnunet-service-psycstore.c +++ b/src/psycstore/gnunet-service-psycstore.c | |||
@@ -520,6 +520,11 @@ recv_state_message_part (void *cls, | |||
520 | struct StateModifyClosure *scls = cls; | 520 | struct StateModifyClosure *scls = cls; |
521 | uint16_t psize; | 521 | uint16_t psize; |
522 | 522 | ||
523 | if (NULL == msg) | ||
524 | { // FIXME: error on unknown message | ||
525 | return; | ||
526 | } | ||
527 | |||
523 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 528 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
524 | "recv_state_message_part() message_id: %" PRIu64 | 529 | "recv_state_message_part() message_id: %" PRIu64 |
525 | ", fragment_offset: %" PRIu64 ", flags: %u\n", | 530 | ", fragment_offset: %" PRIu64 ", flags: %u\n", |
diff --git a/src/psycstore/psycstore_api.c b/src/psycstore/psycstore_api.c index 379483e80..94b7ff9f5 100644 --- a/src/psycstore/psycstore_api.c +++ b/src/psycstore/psycstore_api.c | |||
@@ -37,8 +37,6 @@ | |||
37 | 37 | ||
38 | #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-api",__VA_ARGS__) | 38 | #define LOG(kind,...) GNUNET_log_from (kind, "psycstore-api",__VA_ARGS__) |
39 | 39 | ||
40 | typedef void (*DataCallback) (); | ||
41 | |||
42 | /** | 40 | /** |
43 | * Handle for an operation with the PSYCstore service. | 41 | * Handle for an operation with the PSYCstore service. |
44 | */ | 42 | */ |
@@ -51,40 +49,28 @@ struct GNUNET_PSYCSTORE_OperationHandle | |||
51 | struct GNUNET_PSYCSTORE_Handle *h; | 49 | struct GNUNET_PSYCSTORE_Handle *h; |
52 | 50 | ||
53 | /** | 51 | /** |
54 | * We keep operations in a DLL. | 52 | * Data callbacks. |
55 | */ | ||
56 | struct GNUNET_PSYCSTORE_OperationHandle *next; | ||
57 | |||
58 | /** | ||
59 | * We keep operations in a DLL. | ||
60 | */ | 53 | */ |
61 | struct GNUNET_PSYCSTORE_OperationHandle *prev; | 54 | union { |
55 | GNUNET_PSYCSTORE_FragmentCallback fragment_cb; | ||
56 | GNUNET_PSYCSTORE_CountersCallback counters_cb; | ||
57 | GNUNET_PSYCSTORE_StateCallback state_cb; | ||
58 | }; | ||
62 | 59 | ||
63 | /** | 60 | /** |
64 | * Continuation to invoke with the result of an operation. | 61 | * Closure for callbacks. |
65 | */ | 62 | */ |
66 | GNUNET_PSYCSTORE_ResultCallback res_cb; | 63 | void *cls; |
67 | |||
68 | /** | ||
69 | * Continuation to invoke with the result of an operation returning data. | ||
70 | */ | ||
71 | DataCallback data_cb; | ||
72 | 64 | ||
73 | /** | 65 | /** |
74 | * Closure for the callbacks. | 66 | * Message envelope. |
75 | */ | 67 | */ |
76 | void *cls; | 68 | struct GNUNET_MQ_Envelope *env; |
77 | 69 | ||
78 | /** | 70 | /** |
79 | * Operation ID. | 71 | * Operation ID. |
80 | */ | 72 | */ |
81 | uint64_t op_id; | 73 | uint64_t op_id; |
82 | |||
83 | /** | ||
84 | * Message to send to the PSYCstore service. | ||
85 | * Allocated at the end of this struct. | ||
86 | */ | ||
87 | const struct GNUNET_MessageHeader *msg; | ||
88 | }; | 74 | }; |
89 | 75 | ||
90 | 76 | ||
@@ -99,34 +85,15 @@ struct GNUNET_PSYCSTORE_Handle | |||
99 | const struct GNUNET_CONFIGURATION_Handle *cfg; | 85 | const struct GNUNET_CONFIGURATION_Handle *cfg; |
100 | 86 | ||
101 | /** | 87 | /** |
102 | * Socket (if available). | 88 | * Client connection. |
103 | */ | ||
104 | struct GNUNET_CLIENT_Connection *client; | ||
105 | |||
106 | /** | ||
107 | * Head of operations to transmit. | ||
108 | */ | 89 | */ |
109 | struct GNUNET_PSYCSTORE_OperationHandle *transmit_head; | 90 | struct GNUNET_MQ_Handle *mq; |
110 | 91 | ||
111 | /** | ||
112 | * Tail of operations to transmit. | ||
113 | */ | ||
114 | struct GNUNET_PSYCSTORE_OperationHandle *transmit_tail; | ||
115 | 92 | ||
116 | /** | 93 | /** |
117 | * Head of active operations waiting for response. | 94 | * Async operations. |
118 | */ | 95 | */ |
119 | struct GNUNET_PSYCSTORE_OperationHandle *op_head; | 96 | struct GNUNET_OP_Handle *op; |
120 | |||
121 | /** | ||
122 | * Tail of active operations waiting for response. | ||
123 | */ | ||
124 | struct GNUNET_PSYCSTORE_OperationHandle *op_tail; | ||
125 | |||
126 | /** | ||
127 | * Currently pending transmission request, or NULL for none. | ||
128 | */ | ||
129 | struct GNUNET_CLIENT_TransmitHandle *th; | ||
130 | 97 | ||
131 | /** | 98 | /** |
132 | * Task doing exponential back-off trying to reconnect. | 99 | * Task doing exponential back-off trying to reconnect. |
@@ -134,395 +101,258 @@ struct GNUNET_PSYCSTORE_Handle | |||
134 | struct GNUNET_SCHEDULER_Task *reconnect_task; | 101 | struct GNUNET_SCHEDULER_Task *reconnect_task; |
135 | 102 | ||
136 | /** | 103 | /** |
137 | * Time for next connect retry. | 104 | * Delay for next connect retry. |
138 | */ | 105 | */ |
139 | struct GNUNET_TIME_Relative reconnect_delay; | 106 | struct GNUNET_TIME_Relative reconnect_delay; |
140 | 107 | ||
141 | /** | ||
142 | * Last operation ID used. | ||
143 | */ | ||
144 | uint64_t last_op_id; | ||
145 | 108 | ||
109 | GNUNET_PSYCSTORE_FragmentCallback *fragment_cb; | ||
110 | |||
111 | GNUNET_PSYCSTORE_CountersCallback *counters_cb; | ||
112 | |||
113 | GNUNET_PSYCSTORE_StateCallback *state_cb; | ||
146 | /** | 114 | /** |
147 | * Are we polling for incoming messages right now? | 115 | * Closure for callbacks. |
148 | */ | 116 | */ |
149 | uint8_t in_receive; | 117 | void *cb_cls; |
150 | }; | 118 | }; |
151 | 119 | ||
152 | 120 | ||
153 | /** | 121 | static int |
154 | * Get a fresh operation ID to distinguish between PSYCstore requests. | 122 | check_result_code (void *cls, const struct OperationResult *opres) |
155 | * | ||
156 | * @param h Handle to the PSYCstore service. | ||
157 | * @return next operation id to use | ||
158 | */ | ||
159 | static uint64_t | ||
160 | get_next_op_id (struct GNUNET_PSYCSTORE_Handle *h) | ||
161 | { | 123 | { |
162 | return h->last_op_id++; | 124 | uint16_t size = ntohs (opres->header.size); |
163 | } | 125 | const char *str = (const char *) &opres[1]; |
164 | 126 | if ( (sizeof (struct OperationResult) < size) && | |
165 | 127 | ('\0' != str[size - sizeof (*opres) - 1]) ) | |
166 | /** | ||
167 | * Find operation by ID. | ||
168 | * | ||
169 | * @return OperationHandle if found, or NULL otherwise. | ||
170 | */ | ||
171 | static struct GNUNET_PSYCSTORE_OperationHandle * | ||
172 | find_op_by_id (struct GNUNET_PSYCSTORE_Handle *h, uint64_t op_id) | ||
173 | { | ||
174 | struct GNUNET_PSYCSTORE_OperationHandle *op = h->op_head; | ||
175 | while (NULL != op) | ||
176 | { | 128 | { |
177 | if (op->op_id == op_id) | 129 | GNUNET_break (0); |
178 | return op; | 130 | return GNUNET_SYSERR; |
179 | op = op->next; | ||
180 | } | 131 | } |
181 | return NULL; | ||
182 | } | ||
183 | |||
184 | 132 | ||
185 | /** | 133 | return GNUNET_OK; |
186 | * Try again to connect to the PSYCstore service. | 134 | } |
187 | * | ||
188 | * @param cls handle to the PSYCstore service. | ||
189 | */ | ||
190 | static void | ||
191 | reconnect (void *cls); | ||
192 | 135 | ||
193 | 136 | ||
194 | /** | ||
195 | * Reschedule a connect attempt to the service. | ||
196 | * | ||
197 | * @param h transport service to reconnect | ||
198 | */ | ||
199 | static void | 137 | static void |
200 | reschedule_connect (struct GNUNET_PSYCSTORE_Handle *h) | 138 | handle_result_code (void *cls, const struct OperationResult *opres) |
201 | { | 139 | { |
202 | GNUNET_assert (h->reconnect_task == NULL); | 140 | struct GNUNET_PSYCSTORE_Handle *h = cls; |
141 | struct GNUNET_PSYCSTORE_OperationHandle *op = NULL; | ||
142 | uint16_t size = ntohs (opres->header.size); | ||
143 | |||
144 | const char * | ||
145 | str = (sizeof (*opres) < size) ? (const char *) &opres[1] : ""; | ||
203 | 146 | ||
204 | if (NULL != h->th) | 147 | if (GNUNET_YES == GNUNET_OP_result (h->op, GNUNET_ntohll (opres->op_id), |
148 | GNUNET_ntohll (opres->result_code) + INT64_MIN, | ||
149 | str, size - sizeof (*opres), (void **) &op)) | ||
205 | { | 150 | { |
206 | GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); | 151 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
207 | h->th = NULL; | 152 | "handle_result_code: Received result message with operation ID: %" PRIu64 "\n", |
153 | GNUNET_ntohll (opres->op_id)); | ||
154 | GNUNET_free (op); | ||
208 | } | 155 | } |
209 | if (NULL != h->client) | 156 | else |
210 | { | 157 | { |
211 | GNUNET_CLIENT_disconnect (h->client); | 158 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
212 | h->client = NULL; | 159 | "handle_result_code: No callback registered for operation with ID %" PRIu64 ".\n", |
160 | GNUNET_ntohll (opres->op_id)); | ||
213 | } | 161 | } |
214 | h->in_receive = GNUNET_NO; | 162 | h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; |
215 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
216 | "Scheduling task to reconnect to PSYCstore service in %s.\n", | ||
217 | GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES)); | ||
218 | h->reconnect_task = | ||
219 | GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h); | ||
220 | h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); | ||
221 | } | 163 | } |
222 | 164 | ||
223 | 165 | ||
224 | /** | ||
225 | * Schedule transmission of the next message from our queue. | ||
226 | * | ||
227 | * @param h PSYCstore handle | ||
228 | */ | ||
229 | static void | 166 | static void |
230 | transmit_next (struct GNUNET_PSYCSTORE_Handle *h); | 167 | handle_result_counters (void *cls, const struct CountersResult *cres) |
231 | |||
232 | |||
233 | /** | ||
234 | * Type of a function to call when we receive a message | ||
235 | * from the service. | ||
236 | * | ||
237 | * @param cls closure | ||
238 | * @param msg message received, NULL on timeout or fatal error | ||
239 | */ | ||
240 | static void | ||
241 | message_handler (void *cls, const struct GNUNET_MessageHeader *msg) | ||
242 | { | 168 | { |
243 | struct GNUNET_PSYCSTORE_Handle *h = cls; | 169 | struct GNUNET_PSYCSTORE_Handle *h = cls; |
244 | struct GNUNET_PSYCSTORE_OperationHandle *op; | 170 | struct GNUNET_PSYCSTORE_OperationHandle *op = NULL; |
245 | const struct OperationResult *opres; | ||
246 | const struct CountersResult *cres; | ||
247 | const struct FragmentResult *fres; | ||
248 | const struct StateResult *sres; | ||
249 | const char *str; | ||
250 | |||
251 | if (NULL == msg) | ||
252 | { | ||
253 | reschedule_connect (h); | ||
254 | return; | ||
255 | } | ||
256 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
257 | "Received message of type %d from PSYCstore service.\n", | ||
258 | ntohs (msg->type)); | ||
259 | uint16_t size = ntohs (msg->size); | ||
260 | uint16_t type = ntohs (msg->type); | ||
261 | switch (type) | ||
262 | { | ||
263 | case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE: | ||
264 | if (size < sizeof (struct OperationResult)) | ||
265 | { | ||
266 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
267 | "Received message of type %d with length %lu bytes. " | ||
268 | "Expected >= %lu\n", | ||
269 | type, size, sizeof (struct OperationResult)); | ||
270 | GNUNET_break (0); | ||
271 | reschedule_connect (h); | ||
272 | return; | ||
273 | } | ||
274 | |||
275 | opres = (const struct OperationResult *) msg; | ||
276 | str = (const char *) &opres[1]; | ||
277 | if ( (size > sizeof (struct OperationResult)) && | ||
278 | ('\0' != str[size - sizeof (struct OperationResult) - 1]) ) | ||
279 | { | ||
280 | GNUNET_break (0); | ||
281 | reschedule_connect (h); | ||
282 | return; | ||
283 | } | ||
284 | if (size == sizeof (struct OperationResult)) | ||
285 | str = ""; | ||
286 | |||
287 | op = find_op_by_id (h, GNUNET_ntohll (opres->op_id)); | ||
288 | if (NULL == op) | ||
289 | { | ||
290 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
291 | "No callback registered for operation with ID %" PRIu64 ".\n", | ||
292 | type, GNUNET_ntohll (opres->op_id)); | ||
293 | } | ||
294 | else | ||
295 | { | ||
296 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
297 | "Received result message (type %d) with operation ID: %" PRIu64 "\n", | ||
298 | type, op->op_id); | ||
299 | |||
300 | int64_t result_code = GNUNET_ntohll (opres->result_code) + INT64_MIN; | ||
301 | GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op); | ||
302 | if (NULL != op->res_cb) | ||
303 | { | ||
304 | const struct StateSyncRequest *ssreq; | ||
305 | switch (ntohs (op->msg->type)) | ||
306 | { | ||
307 | case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC: | ||
308 | ssreq = (const struct StateSyncRequest *) op->msg; | ||
309 | if (!(ssreq->flags & STATE_OP_LAST | ||
310 | || GNUNET_OK != result_code)) | ||
311 | op->res_cb = NULL; | ||
312 | break; | ||
313 | } | ||
314 | } | ||
315 | if (NULL != op->res_cb) | ||
316 | op->res_cb (op->cls, result_code, str, size - sizeof (*opres)); | ||
317 | GNUNET_free (op); | ||
318 | } | ||
319 | break; | ||
320 | |||
321 | case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS: | ||
322 | if (size != sizeof (struct CountersResult)) | ||
323 | { | ||
324 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
325 | "Received message of type %d with length %lu bytes. " | ||
326 | "Expected %lu\n", | ||
327 | type, size, sizeof (struct CountersResult)); | ||
328 | GNUNET_break (0); | ||
329 | reschedule_connect (h); | ||
330 | return; | ||
331 | } | ||
332 | |||
333 | cres = (const struct CountersResult *) msg; | ||
334 | 171 | ||
335 | op = find_op_by_id (h, GNUNET_ntohll (cres->op_id)); | 172 | if (GNUNET_YES == GNUNET_OP_get (h->op, GNUNET_ntohll (cres->op_id), |
336 | if (NULL == op) | 173 | NULL, NULL, (void **) &op)) |
337 | { | 174 | { |
338 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 175 | GNUNET_assert (NULL != op); |
339 | "No callback registered for operation with ID %" PRIu64 ".\n", | 176 | if (NULL != op->counters_cb) |
340 | type, GNUNET_ntohll (cres->op_id)); | ||
341 | } | ||
342 | else | ||
343 | { | 177 | { |
344 | GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op); | 178 | op->counters_cb (op->cls, |
345 | if (NULL != op->data_cb) | ||
346 | ((GNUNET_PSYCSTORE_CountersCallback) | ||
347 | op->data_cb) (op->cls, | ||
348 | ntohl (cres->result_code), | 179 | ntohl (cres->result_code), |
349 | GNUNET_ntohll (cres->max_fragment_id), | 180 | GNUNET_ntohll (cres->max_fragment_id), |
350 | GNUNET_ntohll (cres->max_message_id), | 181 | GNUNET_ntohll (cres->max_message_id), |
351 | GNUNET_ntohll (cres->max_group_generation), | 182 | GNUNET_ntohll (cres->max_group_generation), |
352 | GNUNET_ntohll (cres->max_state_message_id)); | 183 | GNUNET_ntohll (cres->max_state_message_id)); |
353 | GNUNET_free (op); | ||
354 | } | 184 | } |
355 | break; | 185 | GNUNET_OP_remove (h->op, GNUNET_ntohll (cres->op_id)); |
186 | GNUNET_free (op); | ||
187 | } | ||
188 | else | ||
189 | { | ||
190 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
191 | "handle_result_counters: No callback registered for operation with ID %" PRIu64 ".\n", | ||
192 | GNUNET_ntohll (cres->op_id)); | ||
193 | } | ||
194 | h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; | ||
195 | } | ||
356 | 196 | ||
357 | case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_FRAGMENT: | ||
358 | if (size < sizeof (struct FragmentResult)) | ||
359 | { | ||
360 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
361 | "Received message of type %d with length %lu bytes. " | ||
362 | "Expected >= %lu\n", | ||
363 | type, size, sizeof (struct FragmentResult)); | ||
364 | GNUNET_break (0); | ||
365 | reschedule_connect (h); | ||
366 | return; | ||
367 | } | ||
368 | 197 | ||
369 | fres = (const struct FragmentResult *) msg; | 198 | static int |
370 | struct GNUNET_MULTICAST_MessageHeader *mmsg = | 199 | check_result_fragment (void *cls, const struct FragmentResult *fres) |
371 | (struct GNUNET_MULTICAST_MessageHeader *) &fres[1]; | 200 | { |
372 | if (size != sizeof (struct FragmentResult) + ntohs (mmsg->header.size)) | 201 | uint16_t size = ntohs (fres->header.size); |
373 | { | 202 | struct GNUNET_MULTICAST_MessageHeader *mmsg = |
374 | LOG (GNUNET_ERROR_TYPE_ERROR, | 203 | (struct GNUNET_MULTICAST_MessageHeader *) &fres[1]; |
375 | "Received message of type %d with length %lu bytes. " | 204 | if (sizeof (*fres) + sizeof (*mmsg) < size |
376 | "Expected = %lu\n", | 205 | && sizeof (*fres) + ntohs (mmsg->header.size) != size) |
377 | type, size, | 206 | { |
378 | sizeof (struct FragmentResult) + ntohs (mmsg->header.size)); | 207 | LOG (GNUNET_ERROR_TYPE_ERROR, |
379 | GNUNET_break (0); | 208 | "check_result_fragment: Received message with invalid length %lu bytes.\n", |
380 | reschedule_connect (h); | 209 | size, sizeof (*fres)); |
381 | return; | 210 | GNUNET_break (0); |
382 | } | 211 | return GNUNET_SYSERR; |
212 | } | ||
213 | return GNUNET_OK; | ||
214 | } | ||
383 | 215 | ||
384 | op = find_op_by_id (h, GNUNET_ntohll (fres->op_id)); | ||
385 | if (NULL == op) | ||
386 | { | ||
387 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
388 | "No callback registered for operation with ID %" PRIu64 ".\n", | ||
389 | type, GNUNET_ntohll (fres->op_id)); | ||
390 | } | ||
391 | else | ||
392 | { | ||
393 | if (NULL != op->data_cb) | ||
394 | ((GNUNET_PSYCSTORE_FragmentCallback) | ||
395 | op->data_cb) (op->cls, mmsg, ntohl (fres->psycstore_flags)); | ||
396 | } | ||
397 | break; | ||
398 | 216 | ||
399 | case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE: | 217 | static void |
400 | if (size < sizeof (struct StateResult)) | 218 | handle_result_fragment (void *cls, const struct FragmentResult *fres) |
401 | { | 219 | { |
402 | LOG (GNUNET_ERROR_TYPE_ERROR, | 220 | struct GNUNET_PSYCSTORE_Handle *h = cls; |
403 | "Received message of type %d with length %lu bytes. " | 221 | struct GNUNET_PSYCSTORE_OperationHandle *op = NULL; |
404 | "Expected >= %lu\n", | ||
405 | type, size, sizeof (struct StateResult)); | ||
406 | GNUNET_break (0); | ||
407 | reschedule_connect (h); | ||
408 | return; | ||
409 | } | ||
410 | 222 | ||
411 | sres = (const struct StateResult *) msg; | 223 | if (GNUNET_YES == GNUNET_OP_get (h->op, GNUNET_ntohll (fres->op_id), |
412 | const char *name = (const char *) &sres[1]; | 224 | NULL, NULL, (void **) &op)) |
413 | uint16_t name_size = ntohs (sres->name_size); | 225 | { |
226 | GNUNET_assert (NULL != op); | ||
227 | if (NULL != op->fragment_cb) | ||
228 | op->fragment_cb (op->cls, | ||
229 | (struct GNUNET_MULTICAST_MessageHeader *) &fres[1], | ||
230 | ntohl (fres->psycstore_flags)); | ||
231 | //GNUNET_OP_remove (h->op, GNUNET_ntohll (fres->op_id)); | ||
232 | //GNUNET_free (op); | ||
233 | } | ||
234 | else | ||
235 | { | ||
236 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
237 | "handle_result_fragment: No callback registered for operation with ID %" PRIu64 ".\n", | ||
238 | GNUNET_ntohll (fres->op_id)); | ||
239 | } | ||
240 | h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; | ||
241 | } | ||
414 | 242 | ||
415 | if (name_size <= 2 || '\0' != name[name_size - 1]) | ||
416 | { | ||
417 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
418 | "Received state result message (type %d) with invalid name.\n", | ||
419 | type); | ||
420 | GNUNET_break (0); | ||
421 | reschedule_connect (h); | ||
422 | return; | ||
423 | } | ||
424 | 243 | ||
425 | op = find_op_by_id (h, GNUNET_ntohll (sres->op_id)); | 244 | static int |
426 | if (NULL == op) | 245 | check_result_state (void *cls, const struct StateResult *sres) |
427 | { | 246 | { |
428 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 247 | const char *name = (const char *) &sres[1]; |
429 | "No callback registered for operation with ID %" PRIu64 ".\n", | 248 | uint16_t name_size = ntohs (sres->name_size); |
430 | type, GNUNET_ntohll (sres->op_id)); | ||
431 | } | ||
432 | else | ||
433 | { | ||
434 | if (NULL != op->data_cb) | ||
435 | ((GNUNET_PSYCSTORE_StateCallback) | ||
436 | op->data_cb) (op->cls, name, (char *) &sres[1] + name_size, | ||
437 | ntohs (sres->header.size) - sizeof (*sres) - name_size); | ||
438 | } | ||
439 | break; | ||
440 | 249 | ||
441 | default: | 250 | if (name_size <= 2 || '\0' != name[name_size - 1]) |
251 | { | ||
252 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
253 | "check_result_state: Received state result message with invalid name.\n"); | ||
442 | GNUNET_break (0); | 254 | GNUNET_break (0); |
443 | reschedule_connect (h); | 255 | return GNUNET_SYSERR; |
444 | return; | ||
445 | } | 256 | } |
446 | 257 | return GNUNET_OK; | |
447 | GNUNET_CLIENT_receive (h->client, &message_handler, h, | ||
448 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
449 | } | 258 | } |
450 | 259 | ||
451 | 260 | ||
452 | /** | 261 | static void |
453 | * Transmit next message to service. | 262 | handle_result_state (void *cls, const struct StateResult *sres) |
454 | * | ||
455 | * @param cls The 'struct GNUNET_PSYCSTORE_Handle'. | ||
456 | * @param size Number of bytes available in buf. | ||
457 | * @param buf Where to copy the message. | ||
458 | * @return Number of bytes copied to buf. | ||
459 | */ | ||
460 | static size_t | ||
461 | send_next_message (void *cls, size_t size, void *buf) | ||
462 | { | 263 | { |
463 | struct GNUNET_PSYCSTORE_Handle *h = cls; | 264 | struct GNUNET_PSYCSTORE_Handle *h = cls; |
464 | struct GNUNET_PSYCSTORE_OperationHandle *op = h->transmit_head; | 265 | struct GNUNET_PSYCSTORE_OperationHandle *op = NULL; |
465 | size_t ret; | ||
466 | |||
467 | h->th = NULL; | ||
468 | if (NULL == op) | ||
469 | return 0; | ||
470 | ret = ntohs (op->msg->size); | ||
471 | if (ret > size) | ||
472 | { | ||
473 | reschedule_connect (h); | ||
474 | return 0; | ||
475 | } | ||
476 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
477 | "Sending message of type %d to PSYCstore service. ID: %" PRIu64 "\n", | ||
478 | ntohs (op->msg->type), op->op_id); | ||
479 | GNUNET_memcpy (buf, op->msg, ret); | ||
480 | 266 | ||
481 | GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op); | 267 | const char *name = (const char *) &sres[1]; |
268 | uint16_t name_size = ntohs (sres->name_size); | ||
482 | 269 | ||
483 | if (NULL == op->res_cb && NULL == op->data_cb) | 270 | if (GNUNET_YES == GNUNET_OP_get (h->op, GNUNET_ntohll (sres->op_id), |
271 | NULL, NULL, (void **) &op)) | ||
484 | { | 272 | { |
485 | GNUNET_free (op); | 273 | GNUNET_assert (NULL != op); |
274 | if (NULL != op->state_cb) | ||
275 | op->state_cb (op->cls, name, (char *) &sres[1] + name_size, | ||
276 | ntohs (sres->header.size) - sizeof (*sres) - name_size); | ||
277 | //GNUNET_OP_remove (h->op, GNUNET_ntohll (sres->op_id)); | ||
278 | //GNUNET_free (op); | ||
486 | } | 279 | } |
487 | else | 280 | else |
488 | { | 281 | { |
489 | GNUNET_CONTAINER_DLL_insert_tail (h->op_head, h->op_tail, op); | 282 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
283 | "handle_result_state: No callback registered for operation with ID %" PRIu64 ".\n", | ||
284 | GNUNET_ntohll (sres->op_id)); | ||
490 | } | 285 | } |
286 | h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; | ||
287 | } | ||
491 | 288 | ||
492 | if (NULL != h->transmit_head) | ||
493 | transmit_next (h); | ||
494 | 289 | ||
495 | if (GNUNET_NO == h->in_receive) | 290 | static void |
496 | { | 291 | reconnect (void *cls); |
497 | h->in_receive = GNUNET_YES; | ||
498 | GNUNET_CLIENT_receive (h->client, &message_handler, h, | ||
499 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
500 | } | ||
501 | return ret; | ||
502 | } | ||
503 | 292 | ||
504 | 293 | ||
505 | /** | 294 | /** |
506 | * Schedule transmission of the next message from our queue. | 295 | * Client disconnected from service. |
507 | * | 296 | * |
508 | * @param h PSYCstore handle. | 297 | * Reconnect after backoff period.= |
509 | */ | 298 | */ |
510 | static void | 299 | static void |
511 | transmit_next (struct GNUNET_PSYCSTORE_Handle *h) | 300 | disconnected (void *cls, enum GNUNET_MQ_Error error) |
301 | { | ||
302 | struct GNUNET_PSYCSTORE_Handle *h = cls; | ||
303 | |||
304 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
305 | "Origin client disconnected (%d), re-connecting\n", | ||
306 | (int) error); | ||
307 | if (NULL != h->mq) | ||
308 | { | ||
309 | GNUNET_MQ_destroy (h->mq); | ||
310 | GNUNET_OP_destroy (h->op); | ||
311 | h->mq = NULL; | ||
312 | h->op = NULL; | ||
313 | } | ||
314 | |||
315 | h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, | ||
316 | &reconnect, h); | ||
317 | h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); | ||
318 | } | ||
319 | |||
320 | |||
321 | static void | ||
322 | do_connect (struct GNUNET_PSYCSTORE_Handle *h) | ||
512 | { | 323 | { |
513 | if (NULL != h->th || NULL == h->client) | 324 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
514 | return; | 325 | "Connecting to PSYCstore service.\n"); |
515 | 326 | ||
516 | struct GNUNET_PSYCSTORE_OperationHandle *op = h->transmit_head; | 327 | GNUNET_MQ_hd_var_size (result_code, |
517 | if (NULL == op) | 328 | GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE, |
518 | return; | 329 | struct OperationResult); |
519 | 330 | ||
520 | h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, | 331 | GNUNET_MQ_hd_fixed_size (result_counters, |
521 | ntohs (op->msg->size), | 332 | GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS, |
522 | GNUNET_TIME_UNIT_FOREVER_REL, | 333 | struct CountersResult); |
523 | GNUNET_NO, | 334 | |
524 | &send_next_message, | 335 | GNUNET_MQ_hd_var_size (result_fragment, |
525 | h); | 336 | GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_FRAGMENT, |
337 | struct FragmentResult); | ||
338 | |||
339 | GNUNET_MQ_hd_var_size (result_state, | ||
340 | GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE, | ||
341 | struct StateResult); | ||
342 | |||
343 | struct GNUNET_MQ_MessageHandler handlers[] = { | ||
344 | make_result_code_handler (h), | ||
345 | make_result_counters_handler (h), | ||
346 | make_result_fragment_handler (h), | ||
347 | make_result_state_handler (h), | ||
348 | GNUNET_MQ_handler_end () | ||
349 | }; | ||
350 | |||
351 | h->op = GNUNET_OP_create (); | ||
352 | GNUNET_assert (NULL == h->mq); | ||
353 | h->mq = GNUNET_CLIENT_connecT (h->cfg, "psycstore", | ||
354 | handlers, disconnected, h); | ||
355 | GNUNET_assert (NULL != h->mq); | ||
526 | } | 356 | } |
527 | 357 | ||
528 | 358 | ||
@@ -534,15 +364,7 @@ transmit_next (struct GNUNET_PSYCSTORE_Handle *h) | |||
534 | static void | 364 | static void |
535 | reconnect (void *cls) | 365 | reconnect (void *cls) |
536 | { | 366 | { |
537 | struct GNUNET_PSYCSTORE_Handle *h = cls; | 367 | do_connect (cls); |
538 | |||
539 | h->reconnect_task = NULL; | ||
540 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
541 | "Connecting to PSYCstore service.\n"); | ||
542 | GNUNET_assert (NULL == h->client); | ||
543 | h->client = GNUNET_CLIENT_connect ("psycstore", h->cfg); | ||
544 | GNUNET_assert (NULL != h->client); | ||
545 | transmit_next (h); | ||
546 | } | 368 | } |
547 | 369 | ||
548 | 370 | ||
@@ -558,8 +380,8 @@ GNUNET_PSYCSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) | |||
558 | struct GNUNET_PSYCSTORE_Handle *h | 380 | struct GNUNET_PSYCSTORE_Handle *h |
559 | = GNUNET_new (struct GNUNET_PSYCSTORE_Handle); | 381 | = GNUNET_new (struct GNUNET_PSYCSTORE_Handle); |
560 | h->cfg = cfg; | 382 | h->cfg = cfg; |
561 | h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; | 383 | h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; |
562 | h->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, h); | 384 | do_connect (h); |
563 | return h; | 385 | return h; |
564 | } | 386 | } |
565 | 387 | ||
@@ -578,53 +400,106 @@ GNUNET_PSYCSTORE_disconnect (struct GNUNET_PSYCSTORE_Handle *h) | |||
578 | GNUNET_SCHEDULER_cancel (h->reconnect_task); | 400 | GNUNET_SCHEDULER_cancel (h->reconnect_task); |
579 | h->reconnect_task = NULL; | 401 | h->reconnect_task = NULL; |
580 | } | 402 | } |
581 | if (NULL != h->th) | 403 | if (NULL != h->mq) |
582 | { | ||
583 | GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); | ||
584 | h->th = NULL; | ||
585 | } | ||
586 | if (NULL != h->client) | ||
587 | { | 404 | { |
588 | GNUNET_CLIENT_disconnect (h->client); | 405 | // FIXME: free data structures for pending operations |
589 | h->client = NULL; | 406 | GNUNET_MQ_destroy (h->mq); |
407 | h->mq = NULL; | ||
590 | } | 408 | } |
591 | GNUNET_free (h); | 409 | GNUNET_free (h); |
592 | } | 410 | } |
593 | 411 | ||
594 | 412 | ||
595 | /** | 413 | /** |
414 | * Message sent notification. | ||
415 | * | ||
416 | * Remove invalidated envelope pointer. | ||
417 | */ | ||
418 | static void | ||
419 | message_sent (void *cls) | ||
420 | { | ||
421 | struct GNUNET_PSYCSTORE_OperationHandle *op = cls; | ||
422 | op->env = NULL; | ||
423 | } | ||
424 | |||
425 | |||
426 | /** | ||
427 | * Create a new operation. | ||
428 | */ | ||
429 | static struct GNUNET_PSYCSTORE_OperationHandle * | ||
430 | op_create (struct GNUNET_PSYCSTORE_Handle *h, | ||
431 | struct GNUNET_OP_Handle *hop, | ||
432 | GNUNET_PSYCSTORE_ResultCallback result_cb, | ||
433 | void *cls) | ||
434 | { | ||
435 | struct GNUNET_PSYCSTORE_OperationHandle * | ||
436 | op = GNUNET_malloc (sizeof (*op)); | ||
437 | op->h = h; | ||
438 | op->op_id = GNUNET_OP_add (hop, | ||
439 | (GNUNET_ResultCallback) result_cb, | ||
440 | cls, op); | ||
441 | return op; | ||
442 | } | ||
443 | |||
444 | |||
445 | /** | ||
446 | * Send a message associated with an operation. | ||
447 | * | ||
448 | * @param h | ||
449 | * PSYCstore handle. | ||
450 | * @param op | ||
451 | * Operation handle. | ||
452 | * @param env | ||
453 | * Message envelope to send. | ||
454 | * @param[out] op_id | ||
455 | * Operation ID to write in network byte order. NULL if not needed. | ||
456 | * | ||
457 | * @return Operation handle. | ||
458 | * | ||
459 | */ | ||
460 | static struct GNUNET_PSYCSTORE_OperationHandle * | ||
461 | op_send (struct GNUNET_PSYCSTORE_Handle *h, | ||
462 | struct GNUNET_PSYCSTORE_OperationHandle *op, | ||
463 | struct GNUNET_MQ_Envelope *env, | ||
464 | uint64_t *op_id) | ||
465 | { | ||
466 | op->env = env; | ||
467 | if (NULL != op_id) | ||
468 | *op_id = GNUNET_htonll (op->op_id); | ||
469 | |||
470 | GNUNET_MQ_notify_sent (env, message_sent, op); | ||
471 | GNUNET_MQ_send (h->mq, env); | ||
472 | return op; | ||
473 | } | ||
474 | |||
475 | |||
476 | /** | ||
596 | * Cancel a PSYCstore operation. Note that the operation MAY still | 477 | * Cancel a PSYCstore operation. Note that the operation MAY still |
597 | * be executed; this merely cancels the continuation; if the request | 478 | * be executed; this merely cancels the continuation; if the request |
598 | * was already transmitted, the service may still choose to complete | 479 | * was already transmitted, the service may still choose to complete |
599 | * the operation. | 480 | * the operation. |
600 | * | 481 | * |
601 | * @param op Operation to cancel. | 482 | * @param op Operation to cancel. |
483 | * | ||
484 | * @return #GNUNET_YES if message was not sent yet and got discarded, | ||
485 | * #GNUNET_NO if it was already sent, and only the callbacks got cancelled. | ||
602 | */ | 486 | */ |
603 | void | 487 | int |
604 | GNUNET_PSYCSTORE_operation_cancel (struct GNUNET_PSYCSTORE_OperationHandle *op) | 488 | GNUNET_PSYCSTORE_operation_cancel (struct GNUNET_PSYCSTORE_OperationHandle *op) |
605 | { | 489 | { |
606 | struct GNUNET_PSYCSTORE_Handle *h = op->h; | 490 | struct GNUNET_PSYCSTORE_Handle *h = op->h; |
491 | int ret = GNUNET_NO; | ||
607 | 492 | ||
608 | if (h->transmit_head != NULL && (h->transmit_head != op || NULL == h->client)) | 493 | if (NULL != op->env) |
609 | { | 494 | { |
610 | /* request not active, can simply remove */ | 495 | GNUNET_MQ_send_cancel (op->env); |
611 | GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op); | 496 | ret = GNUNET_YES; |
612 | GNUNET_free (op); | ||
613 | return; | ||
614 | } | 497 | } |
615 | if (NULL != h->th) | 498 | |
616 | { | 499 | GNUNET_OP_remove (h->op, op->op_id); |
617 | /* request active but not yet with service, can still abort */ | 500 | GNUNET_free (op); |
618 | GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); | 501 | |
619 | h->th = NULL; | 502 | return ret; |
620 | GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op); | ||
621 | GNUNET_free (op); | ||
622 | transmit_next (h); | ||
623 | return; | ||
624 | } | ||
625 | /* request active with service, simply ensure continuations are not called */ | ||
626 | op->res_cb = NULL; | ||
627 | op->data_cb = NULL; | ||
628 | } | 503 | } |
629 | 504 | ||
630 | 505 | ||
@@ -649,9 +524,9 @@ GNUNET_PSYCSTORE_operation_cancel (struct GNUNET_PSYCSTORE_OperationHandle *op) | |||
649 | * In case of a part, the last group generation the slave has access to. | 524 | * In case of a part, the last group generation the slave has access to. |
650 | * It has relevance when a larger message have fragments with different | 525 | * It has relevance when a larger message have fragments with different |
651 | * group generations. | 526 | * group generations. |
652 | * @param rcb | 527 | * @param result_cb |
653 | * Callback to call with the result of the storage operation. | 528 | * Callback to call with the result of the storage operation. |
654 | * @param rcb_cls | 529 | * @param cls |
655 | * Closure for the callback. | 530 | * Closure for the callback. |
656 | * | 531 | * |
657 | * @return Operation handle that can be used to cancel the operation. | 532 | * @return Operation handle that can be used to cancel the operation. |
@@ -664,8 +539,8 @@ GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h, | |||
664 | uint64_t announced_at, | 539 | uint64_t announced_at, |
665 | uint64_t effective_since, | 540 | uint64_t effective_since, |
666 | uint64_t group_generation, | 541 | uint64_t group_generation, |
667 | GNUNET_PSYCSTORE_ResultCallback rcb, | 542 | GNUNET_PSYCSTORE_ResultCallback result_cb, |
668 | void *rcb_cls) | 543 | void *cls) |
669 | { | 544 | { |
670 | GNUNET_assert (NULL != h); | 545 | GNUNET_assert (NULL != h); |
671 | GNUNET_assert (NULL != channel_key); | 546 | GNUNET_assert (NULL != channel_key); |
@@ -676,16 +551,8 @@ GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h, | |||
676 | : effective_since == 0); | 551 | : effective_since == 0); |
677 | 552 | ||
678 | struct MembershipStoreRequest *req; | 553 | struct MembershipStoreRequest *req; |
679 | struct GNUNET_PSYCSTORE_OperationHandle * | 554 | struct GNUNET_MQ_Envelope * |
680 | op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); | 555 | env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE); |
681 | op->h = h; | ||
682 | op->res_cb = rcb; | ||
683 | op->cls = rcb_cls; | ||
684 | |||
685 | req = (struct MembershipStoreRequest *) &op[1]; | ||
686 | op->msg = (struct GNUNET_MessageHeader *) req; | ||
687 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE); | ||
688 | req->header.size = htons (sizeof (*req)); | ||
689 | req->channel_key = *channel_key; | 556 | req->channel_key = *channel_key; |
690 | req->slave_key = *slave_key; | 557 | req->slave_key = *slave_key; |
691 | req->did_join = did_join; | 558 | req->did_join = did_join; |
@@ -693,13 +560,9 @@ GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h, | |||
693 | req->effective_since = GNUNET_htonll (effective_since); | 560 | req->effective_since = GNUNET_htonll (effective_since); |
694 | req->group_generation = GNUNET_htonll (group_generation); | 561 | req->group_generation = GNUNET_htonll (group_generation); |
695 | 562 | ||
696 | op->op_id = get_next_op_id (h); | 563 | return |
697 | req->op_id = GNUNET_htonll (op->op_id); | 564 | op_send (h, op_create (h, h->op, result_cb, cls), |
698 | 565 | env, &req->op_id); | |
699 | GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); | ||
700 | transmit_next (h); | ||
701 | |||
702 | return op; | ||
703 | } | 566 | } |
704 | 567 | ||
705 | 568 | ||
@@ -723,9 +586,9 @@ GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h, | |||
723 | * Group generation of the fragment of the message to test. | 586 | * Group generation of the fragment of the message to test. |
724 | * It has relevance if the message consists of multiple fragments with | 587 | * It has relevance if the message consists of multiple fragments with |
725 | * different group generations. | 588 | * different group generations. |
726 | * @param rcb | 589 | * @param result_cb |
727 | * Callback to call with the test result. | 590 | * Callback to call with the test result. |
728 | * @param rcb_cls | 591 | * @param cls |
729 | * Closure for the callback. | 592 | * Closure for the callback. |
730 | * | 593 | * |
731 | * @return Operation handle that can be used to cancel the operation. | 594 | * @return Operation handle that can be used to cancel the operation. |
@@ -736,32 +599,20 @@ GNUNET_PSYCSTORE_membership_test (struct GNUNET_PSYCSTORE_Handle *h, | |||
736 | const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, | 599 | const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, |
737 | uint64_t message_id, | 600 | uint64_t message_id, |
738 | uint64_t group_generation, | 601 | uint64_t group_generation, |
739 | GNUNET_PSYCSTORE_ResultCallback rcb, | 602 | GNUNET_PSYCSTORE_ResultCallback result_cb, |
740 | void *rcb_cls) | 603 | void *cls) |
741 | { | 604 | { |
742 | struct MembershipTestRequest *req; | 605 | struct MembershipTestRequest *req; |
743 | struct GNUNET_PSYCSTORE_OperationHandle * | 606 | struct GNUNET_MQ_Envelope * |
744 | op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); | 607 | env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST); |
745 | op->h = h; | ||
746 | op->res_cb = rcb; | ||
747 | op->cls = rcb_cls; | ||
748 | |||
749 | req = (struct MembershipTestRequest *) &op[1]; | ||
750 | op->msg = (struct GNUNET_MessageHeader *) req; | ||
751 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST); | ||
752 | req->header.size = htons (sizeof (*req)); | ||
753 | req->channel_key = *channel_key; | 608 | req->channel_key = *channel_key; |
754 | req->slave_key = *slave_key; | 609 | req->slave_key = *slave_key; |
755 | req->message_id = GNUNET_htonll (message_id); | 610 | req->message_id = GNUNET_htonll (message_id); |
756 | req->group_generation = GNUNET_htonll (group_generation); | 611 | req->group_generation = GNUNET_htonll (group_generation); |
757 | 612 | ||
758 | op->op_id = get_next_op_id (h); | 613 | return |
759 | req->op_id = GNUNET_htonll (op->op_id); | 614 | op_send (h, op_create (h, h->op, result_cb, cls), |
760 | 615 | env, &req->op_id); | |
761 | GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); | ||
762 | transmit_next (h); | ||
763 | |||
764 | return op; | ||
765 | } | 616 | } |
766 | 617 | ||
767 | 618 | ||
@@ -773,8 +624,8 @@ GNUNET_PSYCSTORE_membership_test (struct GNUNET_PSYCSTORE_Handle *h, | |||
773 | * @param message Message to store. | 624 | * @param message Message to store. |
774 | * @param psycstore_flags Flags indicating whether the PSYC message contains | 625 | * @param psycstore_flags Flags indicating whether the PSYC message contains |
775 | * state modifiers. | 626 | * state modifiers. |
776 | * @param rcb Callback to call with the result of the operation. | 627 | * @param result_cb Callback to call with the result of the operation. |
777 | * @param rcb_cls Closure for the callback. | 628 | * @param cls Closure for the callback. |
778 | * | 629 | * |
779 | * @return Handle that can be used to cancel the operation. | 630 | * @return Handle that can be used to cancel the operation. |
780 | */ | 631 | */ |
@@ -783,32 +634,21 @@ GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h, | |||
783 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | 634 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, |
784 | const struct GNUNET_MULTICAST_MessageHeader *msg, | 635 | const struct GNUNET_MULTICAST_MessageHeader *msg, |
785 | enum GNUNET_PSYCSTORE_MessageFlags psycstore_flags, | 636 | enum GNUNET_PSYCSTORE_MessageFlags psycstore_flags, |
786 | GNUNET_PSYCSTORE_ResultCallback rcb, | 637 | GNUNET_PSYCSTORE_ResultCallback result_cb, |
787 | void *rcb_cls) | 638 | void *cls) |
788 | { | 639 | { |
789 | uint16_t size = ntohs (msg->header.size); | 640 | uint16_t size = ntohs (msg->header.size); |
790 | struct FragmentStoreRequest *req; | 641 | struct FragmentStoreRequest *req; |
791 | struct GNUNET_PSYCSTORE_OperationHandle * | 642 | struct GNUNET_MQ_Envelope * |
792 | op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + size); | 643 | env = GNUNET_MQ_msg_extra (req, size, |
793 | op->h = h; | 644 | GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_STORE); |
794 | op->res_cb = rcb; | ||
795 | op->cls = rcb_cls; | ||
796 | |||
797 | req = (struct FragmentStoreRequest *) &op[1]; | ||
798 | op->msg = (struct GNUNET_MessageHeader *) req; | ||
799 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_STORE); | ||
800 | req->header.size = htons (sizeof (*req) + size); | ||
801 | req->channel_key = *channel_key; | 645 | req->channel_key = *channel_key; |
802 | req->psycstore_flags = htonl (psycstore_flags); | 646 | req->psycstore_flags = htonl (psycstore_flags); |
803 | GNUNET_memcpy (&req[1], msg, size); | 647 | GNUNET_memcpy (&req[1], msg, size); |
804 | 648 | ||
805 | op->op_id = get_next_op_id (h); | 649 | return |
806 | req->op_id = GNUNET_htonll (op->op_id); | 650 | op_send (h, op_create (h, h->op, result_cb, cls), |
807 | 651 | env, &req->op_id); | |
808 | GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); | ||
809 | transmit_next (h); | ||
810 | |||
811 | return op; | ||
812 | } | 652 | } |
813 | 653 | ||
814 | 654 | ||
@@ -833,7 +673,7 @@ GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h, | |||
833 | * Maximum number of fragments to retrieve. | 673 | * Maximum number of fragments to retrieve. |
834 | * @param fragment_cb | 674 | * @param fragment_cb |
835 | * Callback to call with the retrieved fragments. | 675 | * Callback to call with the retrieved fragments. |
836 | * @param rcb | 676 | * @param result_cb |
837 | * Callback to call with the result of the operation. | 677 | * Callback to call with the result of the operation. |
838 | * @param cls | 678 | * @param cls |
839 | * Closure for the callbacks. | 679 | * Closure for the callbacks. |
@@ -847,21 +687,12 @@ GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h, | |||
847 | uint64_t first_fragment_id, | 687 | uint64_t first_fragment_id, |
848 | uint64_t last_fragment_id, | 688 | uint64_t last_fragment_id, |
849 | GNUNET_PSYCSTORE_FragmentCallback fragment_cb, | 689 | GNUNET_PSYCSTORE_FragmentCallback fragment_cb, |
850 | GNUNET_PSYCSTORE_ResultCallback rcb, | 690 | GNUNET_PSYCSTORE_ResultCallback result_cb, |
851 | void *cls) | 691 | void *cls) |
852 | { | 692 | { |
853 | struct FragmentGetRequest *req; | 693 | struct FragmentGetRequest *req; |
854 | struct GNUNET_PSYCSTORE_OperationHandle * | 694 | struct GNUNET_MQ_Envelope * |
855 | op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); | 695 | env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET); |
856 | op->h = h; | ||
857 | op->data_cb = (DataCallback) fragment_cb; | ||
858 | op->res_cb = rcb; | ||
859 | op->cls = cls; | ||
860 | |||
861 | req = (struct FragmentGetRequest *) &op[1]; | ||
862 | op->msg = (struct GNUNET_MessageHeader *) req; | ||
863 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET); | ||
864 | req->header.size = htons (sizeof (*req)); | ||
865 | req->channel_key = *channel_key; | 696 | req->channel_key = *channel_key; |
866 | req->first_fragment_id = GNUNET_htonll (first_fragment_id); | 697 | req->first_fragment_id = GNUNET_htonll (first_fragment_id); |
867 | req->last_fragment_id = GNUNET_htonll (last_fragment_id); | 698 | req->last_fragment_id = GNUNET_htonll (last_fragment_id); |
@@ -871,13 +702,11 @@ GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h, | |||
871 | req->do_membership_test = GNUNET_YES; | 702 | req->do_membership_test = GNUNET_YES; |
872 | } | 703 | } |
873 | 704 | ||
874 | op->op_id = get_next_op_id (h); | 705 | struct GNUNET_PSYCSTORE_OperationHandle * |
875 | req->op_id = GNUNET_htonll (op->op_id); | 706 | op = op_create (h, h->op, result_cb, cls); |
876 | 707 | op->fragment_cb = fragment_cb; | |
877 | GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); | 708 | op->cls = cls; |
878 | transmit_next (h); | 709 | return op_send (h, op, env, &req->op_id); |
879 | |||
880 | return op; | ||
881 | } | 710 | } |
882 | 711 | ||
883 | 712 | ||
@@ -902,7 +731,7 @@ GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h, | |||
902 | * Maximum number of fragments to retrieve. | 731 | * Maximum number of fragments to retrieve. |
903 | * @param fragment_cb | 732 | * @param fragment_cb |
904 | * Callback to call with the retrieved fragments. | 733 | * Callback to call with the retrieved fragments. |
905 | * @param rcb | 734 | * @param result_cb |
906 | * Callback to call with the result of the operation. | 735 | * Callback to call with the result of the operation. |
907 | * @param cls | 736 | * @param cls |
908 | * Closure for the callbacks. | 737 | * Closure for the callbacks. |
@@ -915,21 +744,12 @@ GNUNET_PSYCSTORE_fragment_get_latest (struct GNUNET_PSYCSTORE_Handle *h, | |||
915 | const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, | 744 | const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, |
916 | uint64_t fragment_limit, | 745 | uint64_t fragment_limit, |
917 | GNUNET_PSYCSTORE_FragmentCallback fragment_cb, | 746 | GNUNET_PSYCSTORE_FragmentCallback fragment_cb, |
918 | GNUNET_PSYCSTORE_ResultCallback rcb, | 747 | GNUNET_PSYCSTORE_ResultCallback result_cb, |
919 | void *cls) | 748 | void *cls) |
920 | { | 749 | { |
921 | struct FragmentGetRequest *req; | 750 | struct FragmentGetRequest *req; |
922 | struct GNUNET_PSYCSTORE_OperationHandle * | 751 | struct GNUNET_MQ_Envelope * |
923 | op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); | 752 | env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET); |
924 | op->h = h; | ||
925 | op->data_cb = (DataCallback) fragment_cb; | ||
926 | op->res_cb = rcb; | ||
927 | op->cls = cls; | ||
928 | |||
929 | req = (struct FragmentGetRequest *) &op[1]; | ||
930 | op->msg = (struct GNUNET_MessageHeader *) req; | ||
931 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET); | ||
932 | req->header.size = htons (sizeof (*req)); | ||
933 | req->channel_key = *channel_key; | 753 | req->channel_key = *channel_key; |
934 | req->fragment_limit = GNUNET_ntohll (fragment_limit); | 754 | req->fragment_limit = GNUNET_ntohll (fragment_limit); |
935 | if (NULL != slave_key) | 755 | if (NULL != slave_key) |
@@ -938,13 +758,11 @@ GNUNET_PSYCSTORE_fragment_get_latest (struct GNUNET_PSYCSTORE_Handle *h, | |||
938 | req->do_membership_test = GNUNET_YES; | 758 | req->do_membership_test = GNUNET_YES; |
939 | } | 759 | } |
940 | 760 | ||
941 | op->op_id = get_next_op_id (h); | 761 | struct GNUNET_PSYCSTORE_OperationHandle * |
942 | req->op_id = GNUNET_htonll (op->op_id); | 762 | op = op_create (h, h->op, result_cb, cls); |
943 | 763 | op->fragment_cb = fragment_cb; | |
944 | GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); | 764 | op->cls = cls; |
945 | transmit_next (h); | 765 | return op_send (h, op, env, &req->op_id); |
946 | |||
947 | return op; | ||
948 | } | 766 | } |
949 | 767 | ||
950 | 768 | ||
@@ -986,7 +804,7 @@ GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h, | |||
986 | uint64_t fragment_limit, | 804 | uint64_t fragment_limit, |
987 | const char *method_prefix, | 805 | const char *method_prefix, |
988 | GNUNET_PSYCSTORE_FragmentCallback fragment_cb, | 806 | GNUNET_PSYCSTORE_FragmentCallback fragment_cb, |
989 | GNUNET_PSYCSTORE_ResultCallback rcb, | 807 | GNUNET_PSYCSTORE_ResultCallback result_cb, |
990 | void *cls) | 808 | void *cls) |
991 | { | 809 | { |
992 | struct MessageGetRequest *req; | 810 | struct MessageGetRequest *req; |
@@ -996,17 +814,9 @@ GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h, | |||
996 | GNUNET_SERVER_MAX_MESSAGE_SIZE | 814 | GNUNET_SERVER_MAX_MESSAGE_SIZE |
997 | - sizeof (*req)) + 1; | 815 | - sizeof (*req)) + 1; |
998 | 816 | ||
999 | struct GNUNET_PSYCSTORE_OperationHandle * | 817 | struct GNUNET_MQ_Envelope * |
1000 | op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); | 818 | env = GNUNET_MQ_msg_extra (req, method_size, |
1001 | op->h = h; | 819 | GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET); |
1002 | op->data_cb = (DataCallback) fragment_cb; | ||
1003 | op->res_cb = rcb; | ||
1004 | op->cls = cls; | ||
1005 | |||
1006 | req = (struct MessageGetRequest *) &op[1]; | ||
1007 | op->msg = (struct GNUNET_MessageHeader *) req; | ||
1008 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET); | ||
1009 | req->header.size = htons (sizeof (*req) + method_size); | ||
1010 | req->channel_key = *channel_key; | 820 | req->channel_key = *channel_key; |
1011 | req->first_message_id = GNUNET_htonll (first_message_id); | 821 | req->first_message_id = GNUNET_htonll (first_message_id); |
1012 | req->last_message_id = GNUNET_htonll (last_message_id); | 822 | req->last_message_id = GNUNET_htonll (last_message_id); |
@@ -1019,13 +829,11 @@ GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h, | |||
1019 | GNUNET_memcpy (&req[1], method_prefix, method_size); | 829 | GNUNET_memcpy (&req[1], method_prefix, method_size); |
1020 | ((char *) &req[1])[method_size - 1] = '\0'; | 830 | ((char *) &req[1])[method_size - 1] = '\0'; |
1021 | 831 | ||
1022 | op->op_id = get_next_op_id (h); | 832 | struct GNUNET_PSYCSTORE_OperationHandle * |
1023 | req->op_id = GNUNET_htonll (op->op_id); | 833 | op = op_create (h, h->op, result_cb, cls); |
1024 | 834 | op->fragment_cb = fragment_cb; | |
1025 | GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); | 835 | op->cls = cls; |
1026 | transmit_next (h); | 836 | return op_send (h, op, env, &req->op_id); |
1027 | |||
1028 | return op; | ||
1029 | } | 837 | } |
1030 | 838 | ||
1031 | 839 | ||
@@ -1061,7 +869,7 @@ GNUNET_PSYCSTORE_message_get_latest (struct GNUNET_PSYCSTORE_Handle *h, | |||
1061 | uint64_t message_limit, | 869 | uint64_t message_limit, |
1062 | const char *method_prefix, | 870 | const char *method_prefix, |
1063 | GNUNET_PSYCSTORE_FragmentCallback fragment_cb, | 871 | GNUNET_PSYCSTORE_FragmentCallback fragment_cb, |
1064 | GNUNET_PSYCSTORE_ResultCallback rcb, | 872 | GNUNET_PSYCSTORE_ResultCallback result_cb, |
1065 | void *cls) | 873 | void *cls) |
1066 | { | 874 | { |
1067 | struct MessageGetRequest *req; | 875 | struct MessageGetRequest *req; |
@@ -1073,17 +881,9 @@ GNUNET_PSYCSTORE_message_get_latest (struct GNUNET_PSYCSTORE_Handle *h, | |||
1073 | - sizeof (*req)) + 1; | 881 | - sizeof (*req)) + 1; |
1074 | GNUNET_assert ('\0' == method_prefix[method_size - 1]); | 882 | GNUNET_assert ('\0' == method_prefix[method_size - 1]); |
1075 | 883 | ||
1076 | struct GNUNET_PSYCSTORE_OperationHandle * | 884 | struct GNUNET_MQ_Envelope * |
1077 | op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + method_size); | 885 | env = GNUNET_MQ_msg_extra (req, method_size, |
1078 | op->h = h; | 886 | GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET); |
1079 | op->data_cb = (DataCallback) fragment_cb; | ||
1080 | op->res_cb = rcb; | ||
1081 | op->cls = cls; | ||
1082 | |||
1083 | req = (struct MessageGetRequest *) &op[1]; | ||
1084 | op->msg = (struct GNUNET_MessageHeader *) req; | ||
1085 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET); | ||
1086 | req->header.size = htons (sizeof (*req) + method_size); | ||
1087 | req->channel_key = *channel_key; | 887 | req->channel_key = *channel_key; |
1088 | req->message_limit = GNUNET_ntohll (message_limit); | 888 | req->message_limit = GNUNET_ntohll (message_limit); |
1089 | if (NULL != slave_key) | 889 | if (NULL != slave_key) |
@@ -1091,15 +891,13 @@ GNUNET_PSYCSTORE_message_get_latest (struct GNUNET_PSYCSTORE_Handle *h, | |||
1091 | req->slave_key = *slave_key; | 891 | req->slave_key = *slave_key; |
1092 | req->do_membership_test = GNUNET_YES; | 892 | req->do_membership_test = GNUNET_YES; |
1093 | } | 893 | } |
1094 | |||
1095 | op->op_id = get_next_op_id (h); | ||
1096 | req->op_id = GNUNET_htonll (op->op_id); | ||
1097 | GNUNET_memcpy (&req[1], method_prefix, method_size); | 894 | GNUNET_memcpy (&req[1], method_prefix, method_size); |
1098 | 895 | ||
1099 | GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); | 896 | struct GNUNET_PSYCSTORE_OperationHandle * |
1100 | transmit_next (h); | 897 | op = op_create (h, h->op, result_cb, cls); |
1101 | 898 | op->fragment_cb = fragment_cb; | |
1102 | return op; | 899 | op->cls = cls; |
900 | return op_send (h, op, env, &req->op_id); | ||
1103 | } | 901 | } |
1104 | 902 | ||
1105 | 903 | ||
@@ -1135,21 +933,13 @@ GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h, | |||
1135 | uint64_t message_id, | 933 | uint64_t message_id, |
1136 | uint64_t fragment_offset, | 934 | uint64_t fragment_offset, |
1137 | GNUNET_PSYCSTORE_FragmentCallback fragment_cb, | 935 | GNUNET_PSYCSTORE_FragmentCallback fragment_cb, |
1138 | GNUNET_PSYCSTORE_ResultCallback rcb, | 936 | GNUNET_PSYCSTORE_ResultCallback result_cb, |
1139 | void *cls) | 937 | void *cls) |
1140 | { | 938 | { |
1141 | struct MessageGetFragmentRequest *req; | 939 | struct MessageGetFragmentRequest *req; |
1142 | struct GNUNET_PSYCSTORE_OperationHandle * | 940 | struct GNUNET_MQ_Envelope * |
1143 | op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); | 941 | env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT); |
1144 | op->h = h; | ||
1145 | op->data_cb = (DataCallback) fragment_cb; | ||
1146 | op->res_cb = rcb; | ||
1147 | op->cls = cls; | ||
1148 | 942 | ||
1149 | req = (struct MessageGetFragmentRequest *) &op[1]; | ||
1150 | op->msg = (struct GNUNET_MessageHeader *) req; | ||
1151 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT); | ||
1152 | req->header.size = htons (sizeof (*req)); | ||
1153 | req->channel_key = *channel_key; | 943 | req->channel_key = *channel_key; |
1154 | req->message_id = GNUNET_htonll (message_id); | 944 | req->message_id = GNUNET_htonll (message_id); |
1155 | req->fragment_offset = GNUNET_htonll (fragment_offset); | 945 | req->fragment_offset = GNUNET_htonll (fragment_offset); |
@@ -1159,13 +949,11 @@ GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h, | |||
1159 | req->do_membership_test = GNUNET_YES; | 949 | req->do_membership_test = GNUNET_YES; |
1160 | } | 950 | } |
1161 | 951 | ||
1162 | op->op_id = get_next_op_id (h); | 952 | struct GNUNET_PSYCSTORE_OperationHandle * |
1163 | req->op_id = GNUNET_htonll (op->op_id); | 953 | op = op_create (h, h->op, result_cb, cls); |
1164 | 954 | op->fragment_cb = fragment_cb; | |
1165 | GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); | 955 | op->cls = cls; |
1166 | transmit_next (h); | 956 | return op_send (h, op, env, &req->op_id); |
1167 | |||
1168 | return op; | ||
1169 | } | 957 | } |
1170 | 958 | ||
1171 | 959 | ||
@@ -1189,29 +977,19 @@ GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h, | |||
1189 | struct GNUNET_PSYCSTORE_OperationHandle * | 977 | struct GNUNET_PSYCSTORE_OperationHandle * |
1190 | GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h, | 978 | GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h, |
1191 | struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | 979 | struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, |
1192 | GNUNET_PSYCSTORE_CountersCallback ccb, | 980 | GNUNET_PSYCSTORE_CountersCallback counters_cb, |
1193 | void *ccb_cls) | 981 | void *cls) |
1194 | { | 982 | { |
1195 | struct OperationRequest *req; | 983 | struct OperationRequest *req; |
1196 | struct GNUNET_PSYCSTORE_OperationHandle * | 984 | struct GNUNET_MQ_Envelope * |
1197 | op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); | 985 | env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET); |
1198 | op->h = h; | ||
1199 | op->data_cb = ccb; | ||
1200 | op->cls = ccb_cls; | ||
1201 | |||
1202 | req = (struct OperationRequest *) &op[1]; | ||
1203 | op->msg = (struct GNUNET_MessageHeader *) req; | ||
1204 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET); | ||
1205 | req->header.size = htons (sizeof (*req)); | ||
1206 | req->channel_key = *channel_key; | 986 | req->channel_key = *channel_key; |
1207 | 987 | ||
1208 | op->op_id = get_next_op_id (h); | 988 | struct GNUNET_PSYCSTORE_OperationHandle * |
1209 | req->op_id = GNUNET_htonll (op->op_id); | 989 | op = op_create (h, h->op, NULL, NULL); |
1210 | 990 | op->counters_cb = counters_cb; | |
1211 | GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); | 991 | op->cls = cls; |
1212 | transmit_next (h); | 992 | return op_send (h, op, env, &req->op_id); |
1213 | |||
1214 | return op; | ||
1215 | } | 993 | } |
1216 | 994 | ||
1217 | 995 | ||
@@ -1229,10 +1007,10 @@ GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h, | |||
1229 | * ID of the message that contains the @a modifiers. | 1007 | * ID of the message that contains the @a modifiers. |
1230 | * @param state_delta | 1008 | * @param state_delta |
1231 | * Value of the _state_delta PSYC header variable of the message. | 1009 | * Value of the _state_delta PSYC header variable of the message. |
1232 | * @param rcb | 1010 | * @param result_cb |
1233 | * Callback to call with the result of the operation. | 1011 | * Callback to call with the result of the operation. |
1234 | * @param rcb_cls | 1012 | * @param cls |
1235 | * Closure for the @a rcb callback. | 1013 | * Closure for @a result_cb. |
1236 | * | 1014 | * |
1237 | * @return Handle that can be used to cancel the operation. | 1015 | * @return Handle that can be used to cancel the operation. |
1238 | */ | 1016 | */ |
@@ -1241,35 +1019,37 @@ GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h, | |||
1241 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | 1019 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, |
1242 | uint64_t message_id, | 1020 | uint64_t message_id, |
1243 | uint64_t state_delta, | 1021 | uint64_t state_delta, |
1244 | GNUNET_PSYCSTORE_ResultCallback rcb, | 1022 | GNUNET_PSYCSTORE_ResultCallback result_cb, |
1245 | void *rcb_cls) | 1023 | void *cls) |
1246 | { | 1024 | { |
1247 | struct GNUNET_PSYCSTORE_OperationHandle *op = NULL; | ||
1248 | struct StateModifyRequest *req; | 1025 | struct StateModifyRequest *req; |
1249 | 1026 | struct GNUNET_MQ_Envelope * | |
1250 | op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); | 1027 | env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY); |
1251 | op->h = h; | ||
1252 | op->res_cb = rcb; | ||
1253 | op->cls = rcb_cls; | ||
1254 | |||
1255 | req = (struct StateModifyRequest *) &op[1]; | ||
1256 | op->msg = (struct GNUNET_MessageHeader *) req; | ||
1257 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY); | ||
1258 | req->header.size = htons (sizeof (*req)); | ||
1259 | req->channel_key = *channel_key; | 1028 | req->channel_key = *channel_key; |
1260 | req->message_id = GNUNET_htonll (message_id); | 1029 | req->message_id = GNUNET_htonll (message_id); |
1261 | req->state_delta = GNUNET_htonll (state_delta); | 1030 | req->state_delta = GNUNET_htonll (state_delta); |
1262 | 1031 | ||
1263 | op->op_id = get_next_op_id (h); | 1032 | return op_send (h, op_create (h, h->op, result_cb, cls), |
1264 | req->op_id = GNUNET_htonll (op->op_id); | 1033 | env, &req->op_id); |
1034 | } | ||
1265 | 1035 | ||
1266 | GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); | ||
1267 | transmit_next (h); | ||
1268 | 1036 | ||
1269 | return op; | 1037 | struct StateSyncClosure |
1270 | /* FIXME: only the last operation is returned, | 1038 | { |
1271 | * operation_cancel() should be able to cancel all of them. | 1039 | GNUNET_PSYCSTORE_ResultCallback result_cb; |
1272 | */ | 1040 | void *cls; |
1041 | uint8_t last; | ||
1042 | }; | ||
1043 | |||
1044 | |||
1045 | static void | ||
1046 | state_sync_result (void *cls, int64_t result, | ||
1047 | const char *err_msg, uint16_t err_msg_size) | ||
1048 | { | ||
1049 | struct StateSyncClosure *ssc = cls; | ||
1050 | if (GNUNET_OK != result || ssc->last) | ||
1051 | ssc->result_cb (ssc->cls, result, err_msg, err_msg_size); | ||
1052 | GNUNET_free (ssc); | ||
1273 | } | 1053 | } |
1274 | 1054 | ||
1275 | 1055 | ||
@@ -1288,9 +1068,9 @@ GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h, | |||
1288 | * Number of elements in the @a modifiers array. | 1068 | * Number of elements in the @a modifiers array. |
1289 | * @param modifiers | 1069 | * @param modifiers |
1290 | * Full state to store. | 1070 | * Full state to store. |
1291 | * @param rcb | 1071 | * @param result_cb |
1292 | * Callback to call with the result of the operation. | 1072 | * Callback to call with the result of the operation. |
1293 | * @param rcb_cls | 1073 | * @param cls |
1294 | * Closure for the callback. | 1074 | * Closure for the callback. |
1295 | * | 1075 | * |
1296 | * @return Handle that can be used to cancel the operation. | 1076 | * @return Handle that can be used to cancel the operation. |
@@ -1302,8 +1082,8 @@ GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h, | |||
1302 | uint64_t state_hash_message_id, | 1082 | uint64_t state_hash_message_id, |
1303 | size_t modifier_count, | 1083 | size_t modifier_count, |
1304 | const struct GNUNET_PSYC_Modifier *modifiers, | 1084 | const struct GNUNET_PSYC_Modifier *modifiers, |
1305 | GNUNET_PSYCSTORE_ResultCallback rcb, | 1085 | GNUNET_PSYCSTORE_ResultCallback result_cb, |
1306 | void *rcb_cls) | 1086 | void *cls) |
1307 | { | 1087 | { |
1308 | struct GNUNET_PSYCSTORE_OperationHandle *op = NULL; | 1088 | struct GNUNET_PSYCSTORE_OperationHandle *op = NULL; |
1309 | size_t i; | 1089 | size_t i; |
@@ -1312,14 +1092,11 @@ GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h, | |||
1312 | struct StateSyncRequest *req; | 1092 | struct StateSyncRequest *req; |
1313 | uint16_t name_size = strlen (modifiers[i].name) + 1; | 1093 | uint16_t name_size = strlen (modifiers[i].name) + 1; |
1314 | 1094 | ||
1315 | op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size + | 1095 | struct GNUNET_MQ_Envelope * |
1316 | modifiers[i].value_size); | 1096 | env = GNUNET_MQ_msg_extra (req, |
1317 | op->h = h; | 1097 | sizeof (*req) + name_size + modifiers[i].value_size, |
1318 | op->res_cb = rcb; | 1098 | GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC); |
1319 | op->cls = rcb_cls; | ||
1320 | 1099 | ||
1321 | req = (struct StateSyncRequest *) &op[1]; | ||
1322 | op->msg = (struct GNUNET_MessageHeader *) req; | ||
1323 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC); | 1100 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC); |
1324 | req->header.size = htons (sizeof (*req) + name_size | 1101 | req->header.size = htons (sizeof (*req) + name_size |
1325 | + modifiers[i].value_size); | 1102 | + modifiers[i].value_size); |
@@ -1337,12 +1114,16 @@ GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h, | |||
1337 | GNUNET_memcpy (&req[1], modifiers[i].name, name_size); | 1114 | GNUNET_memcpy (&req[1], modifiers[i].name, name_size); |
1338 | GNUNET_memcpy ((char *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size); | 1115 | GNUNET_memcpy ((char *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size); |
1339 | 1116 | ||
1340 | op->op_id = get_next_op_id (h); | 1117 | struct StateSyncClosure *ssc = GNUNET_malloc (sizeof (*ssc)); |
1341 | req->op_id = GNUNET_htonll (op->op_id); | 1118 | ssc->last = (req->flags & STATE_OP_LAST); |
1119 | ssc->result_cb = result_cb; | ||
1120 | ssc->cls = cls; | ||
1342 | 1121 | ||
1343 | GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); | 1122 | op_send (h, op_create (h, h->op, state_sync_result, ssc), |
1344 | transmit_next (h); | 1123 | env, &req->op_id); |
1345 | } | 1124 | } |
1125 | // FIXME: only one operation is returned, | ||
1126 | // add pointers to other operations and make all cancellable. | ||
1346 | return op; | 1127 | return op; |
1347 | } | 1128 | } |
1348 | 1129 | ||
@@ -1356,9 +1137,9 @@ GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h, | |||
1356 | * Handle for the PSYCstore. | 1137 | * Handle for the PSYCstore. |
1357 | * @param channel_key | 1138 | * @param channel_key |
1358 | * The channel we are interested in. | 1139 | * The channel we are interested in. |
1359 | * @param rcb | 1140 | * @param result_cb |
1360 | * Callback to call with the result of the operation. | 1141 | * Callback to call with the result of the operation. |
1361 | * @param rcb_cls | 1142 | * @param cls |
1362 | * Closure for the callback. | 1143 | * Closure for the callback. |
1363 | * | 1144 | * |
1364 | * @return Handle that can be used to cancel the operation. | 1145 | * @return Handle that can be used to cancel the operation. |
@@ -1367,33 +1148,20 @@ struct GNUNET_PSYCSTORE_OperationHandle * | |||
1367 | GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h, | 1148 | GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h, |
1368 | const struct GNUNET_CRYPTO_EddsaPublicKey | 1149 | const struct GNUNET_CRYPTO_EddsaPublicKey |
1369 | *channel_key, | 1150 | *channel_key, |
1370 | GNUNET_PSYCSTORE_ResultCallback rcb, | 1151 | GNUNET_PSYCSTORE_ResultCallback result_cb, |
1371 | void *rcb_cls) | 1152 | void *cls) |
1372 | { | 1153 | { |
1373 | struct OperationRequest *req; | 1154 | struct OperationRequest *req; |
1374 | struct GNUNET_PSYCSTORE_OperationHandle * | 1155 | struct GNUNET_MQ_Envelope * |
1375 | op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); | 1156 | env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET); |
1376 | op->h = h; | ||
1377 | op->res_cb = rcb; | ||
1378 | op->cls = rcb_cls; | ||
1379 | |||
1380 | req = (struct OperationRequest *) &op[1]; | ||
1381 | op->msg = (struct GNUNET_MessageHeader *) req; | ||
1382 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET); | ||
1383 | req->header.size = htons (sizeof (*req)); | ||
1384 | req->channel_key = *channel_key; | 1157 | req->channel_key = *channel_key; |
1385 | 1158 | ||
1386 | op->op_id = get_next_op_id (h); | 1159 | return |
1387 | req->op_id = GNUNET_htonll (op->op_id); | 1160 | op_send (h, op_create (h, h->op, result_cb, cls), |
1388 | 1161 | env, &req->op_id); | |
1389 | GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); | ||
1390 | transmit_next (h); | ||
1391 | |||
1392 | return op; | ||
1393 | } | 1162 | } |
1394 | 1163 | ||
1395 | 1164 | ||
1396 | |||
1397 | /** | 1165 | /** |
1398 | * Update signed values of state variables in the state store. | 1166 | * Update signed values of state variables in the state store. |
1399 | * | 1167 | * |
@@ -1405,9 +1173,9 @@ GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h, | |||
1405 | * Message ID that contained the state @a hash. | 1173 | * Message ID that contained the state @a hash. |
1406 | * @param hash | 1174 | * @param hash |
1407 | * Hash of the serialized full state. | 1175 | * Hash of the serialized full state. |
1408 | * @param rcb | 1176 | * @param result_cb |
1409 | * Callback to call with the result of the operation. | 1177 | * Callback to call with the result of the operation. |
1410 | * @param rcb_cls | 1178 | * @param cls |
1411 | * Closure for the callback. | 1179 | * Closure for the callback. |
1412 | */ | 1180 | */ |
1413 | struct GNUNET_PSYCSTORE_OperationHandle * | 1181 | struct GNUNET_PSYCSTORE_OperationHandle * |
@@ -1415,30 +1183,18 @@ GNUNET_PSYCSTORE_state_hash_update (struct GNUNET_PSYCSTORE_Handle *h, | |||
1415 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | 1183 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, |
1416 | uint64_t message_id, | 1184 | uint64_t message_id, |
1417 | const struct GNUNET_HashCode *hash, | 1185 | const struct GNUNET_HashCode *hash, |
1418 | GNUNET_PSYCSTORE_ResultCallback rcb, | 1186 | GNUNET_PSYCSTORE_ResultCallback result_cb, |
1419 | void *rcb_cls) | 1187 | void *cls) |
1420 | { | 1188 | { |
1421 | struct StateHashUpdateRequest *req; | 1189 | struct StateHashUpdateRequest *req; |
1422 | struct GNUNET_PSYCSTORE_OperationHandle * | 1190 | struct GNUNET_MQ_Envelope * |
1423 | op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); | 1191 | env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_HASH_UPDATE); |
1424 | op->h = h; | ||
1425 | op->res_cb = rcb; | ||
1426 | op->cls = rcb_cls; | ||
1427 | |||
1428 | req = (struct StateHashUpdateRequest *) &op[1]; | ||
1429 | op->msg = (struct GNUNET_MessageHeader *) req; | ||
1430 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET); | ||
1431 | req->header.size = htons (sizeof (*req)); | ||
1432 | req->channel_key = *channel_key; | 1192 | req->channel_key = *channel_key; |
1433 | req->hash = *hash; | 1193 | req->hash = *hash; |
1434 | 1194 | ||
1435 | op->op_id = get_next_op_id (h); | 1195 | return |
1436 | req->op_id = GNUNET_htonll (op->op_id); | 1196 | op_send (h, op_create (h, h->op, result_cb, cls), |
1437 | 1197 | env, &req->op_id); | |
1438 | GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); | ||
1439 | transmit_next (h); | ||
1440 | |||
1441 | return op; | ||
1442 | } | 1198 | } |
1443 | 1199 | ||
1444 | 1200 | ||
@@ -1451,9 +1207,9 @@ GNUNET_PSYCSTORE_state_hash_update (struct GNUNET_PSYCSTORE_Handle *h, | |||
1451 | * The channel we are interested in. | 1207 | * The channel we are interested in. |
1452 | * @param name | 1208 | * @param name |
1453 | * Name of variable to match, the returned variable might be less specific. | 1209 | * Name of variable to match, the returned variable might be less specific. |
1454 | * @param scb | 1210 | * @param state_cb |
1455 | * Callback to return the matching state variable. | 1211 | * Callback to return the matching state variable. |
1456 | * @param rcb | 1212 | * @param result_cb |
1457 | * Callback to call with the result of the operation. | 1213 | * Callback to call with the result of the operation. |
1458 | * @param cls | 1214 | * @param cls |
1459 | * Closure for the callbacks. | 1215 | * Closure for the callbacks. |
@@ -1464,37 +1220,26 @@ struct GNUNET_PSYCSTORE_OperationHandle * | |||
1464 | GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h, | 1220 | GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h, |
1465 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | 1221 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, |
1466 | const char *name, | 1222 | const char *name, |
1467 | GNUNET_PSYCSTORE_StateCallback scb, | 1223 | GNUNET_PSYCSTORE_StateCallback state_cb, |
1468 | GNUNET_PSYCSTORE_ResultCallback rcb, | 1224 | GNUNET_PSYCSTORE_ResultCallback result_cb, |
1469 | void *cls) | 1225 | void *cls) |
1470 | { | 1226 | { |
1471 | size_t name_size = strlen (name) + 1; | 1227 | size_t name_size = strlen (name) + 1; |
1472 | struct OperationRequest *req; | 1228 | struct OperationRequest *req; |
1473 | struct GNUNET_PSYCSTORE_OperationHandle * | 1229 | struct GNUNET_MQ_Envelope * |
1474 | op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size); | 1230 | env = GNUNET_MQ_msg_extra (req, name_size, |
1475 | op->h = h; | 1231 | GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET); |
1476 | op->data_cb = (DataCallback) scb; | ||
1477 | op->res_cb = rcb; | ||
1478 | op->cls = cls; | ||
1479 | |||
1480 | req = (struct OperationRequest *) &op[1]; | ||
1481 | op->msg = (struct GNUNET_MessageHeader *) req; | ||
1482 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET); | ||
1483 | req->header.size = htons (sizeof (*req) + name_size); | ||
1484 | req->channel_key = *channel_key; | 1232 | req->channel_key = *channel_key; |
1485 | GNUNET_memcpy (&req[1], name, name_size); | 1233 | GNUNET_memcpy (&req[1], name, name_size); |
1486 | 1234 | ||
1487 | op->op_id = get_next_op_id (h); | 1235 | struct GNUNET_PSYCSTORE_OperationHandle * |
1488 | req->op_id = GNUNET_htonll (op->op_id); | 1236 | op = op_create (h, h->op, result_cb, cls); |
1489 | 1237 | op->state_cb = state_cb; | |
1490 | GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); | 1238 | op->cls = cls; |
1491 | transmit_next (h); | 1239 | return op_send (h, op, env, &req->op_id); |
1492 | |||
1493 | return op; | ||
1494 | } | 1240 | } |
1495 | 1241 | ||
1496 | 1242 | ||
1497 | |||
1498 | /** | 1243 | /** |
1499 | * Retrieve all state variables for a channel with the given prefix. | 1244 | * Retrieve all state variables for a channel with the given prefix. |
1500 | * | 1245 | * |
@@ -1504,9 +1249,9 @@ GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h, | |||
1504 | * The channel we are interested in. | 1249 | * The channel we are interested in. |
1505 | * @param name_prefix | 1250 | * @param name_prefix |
1506 | * Prefix of state variable names to match. | 1251 | * Prefix of state variable names to match. |
1507 | * @param scb | 1252 | * @param state_cb |
1508 | * Callback to return matching state variables. | 1253 | * Callback to return matching state variables. |
1509 | * @param rcb | 1254 | * @param result_cb |
1510 | * Callback to call with the result of the operation. | 1255 | * Callback to call with the result of the operation. |
1511 | * @param cls | 1256 | * @param cls |
1512 | * Closure for the callbacks. | 1257 | * Closure for the callbacks. |
@@ -1517,33 +1262,23 @@ struct GNUNET_PSYCSTORE_OperationHandle * | |||
1517 | GNUNET_PSYCSTORE_state_get_prefix (struct GNUNET_PSYCSTORE_Handle *h, | 1262 | GNUNET_PSYCSTORE_state_get_prefix (struct GNUNET_PSYCSTORE_Handle *h, |
1518 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, | 1263 | const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, |
1519 | const char *name_prefix, | 1264 | const char *name_prefix, |
1520 | GNUNET_PSYCSTORE_StateCallback scb, | 1265 | GNUNET_PSYCSTORE_StateCallback state_cb, |
1521 | GNUNET_PSYCSTORE_ResultCallback rcb, | 1266 | GNUNET_PSYCSTORE_ResultCallback result_cb, |
1522 | void *cls) | 1267 | void *cls) |
1523 | { | 1268 | { |
1524 | size_t name_size = strlen (name_prefix) + 1; | 1269 | size_t name_size = strlen (name_prefix) + 1; |
1525 | struct OperationRequest *req; | 1270 | struct OperationRequest *req; |
1526 | struct GNUNET_PSYCSTORE_OperationHandle * | 1271 | struct GNUNET_MQ_Envelope * |
1527 | op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size); | 1272 | env = GNUNET_MQ_msg_extra (req, name_size, |
1528 | op->h = h; | 1273 | GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET_PREFIX); |
1529 | op->data_cb = (DataCallback) scb; | ||
1530 | op->res_cb = rcb; | ||
1531 | op->cls = cls; | ||
1532 | |||
1533 | req = (struct OperationRequest *) &op[1]; | ||
1534 | op->msg = (struct GNUNET_MessageHeader *) req; | ||
1535 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET_PREFIX); | ||
1536 | req->header.size = htons (sizeof (*req) + name_size); | ||
1537 | req->channel_key = *channel_key; | 1274 | req->channel_key = *channel_key; |
1538 | GNUNET_memcpy (&req[1], name_prefix, name_size); | 1275 | GNUNET_memcpy (&req[1], name_prefix, name_size); |
1539 | 1276 | ||
1540 | op->op_id = get_next_op_id (h); | 1277 | struct GNUNET_PSYCSTORE_OperationHandle * |
1541 | req->op_id = GNUNET_htonll (op->op_id); | 1278 | op = op_create (h, h->op, result_cb, cls); |
1542 | 1279 | op->state_cb = state_cb; | |
1543 | GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); | 1280 | op->cls = cls; |
1544 | transmit_next (h); | 1281 | return op_send (h, op, env, &req->op_id); |
1545 | |||
1546 | return op; | ||
1547 | } | 1282 | } |
1548 | 1283 | ||
1549 | /* end of psycstore_api.c */ | 1284 | /* end of psycstore_api.c */ |