aboutsummaryrefslogtreecommitdiff
path: root/src/psycstore
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2016-08-04 20:10:17 +0000
committerGabor X Toth <*@tg-x.net>2016-08-04 20:10:17 +0000
commitcb073fc0a243ccd29cb0dc7eb5c7e7cb33b6e97f (patch)
treee07785a96079274c0162794b0bda68b8296c5572 /src/psycstore
parent42fcd295dbcd71c399cf854525d86879095e4555 (diff)
downloadgnunet-cb073fc0a243ccd29cb0dc7eb5c7e7cb33b6e97f.tar.gz
gnunet-cb073fc0a243ccd29cb0dc7eb5c7e7cb33b6e97f.zip
psycstore: switch to MQ
Diffstat (limited to 'src/psycstore')
-rw-r--r--src/psycstore/gnunet-service-psycstore.c5
-rw-r--r--src/psycstore/psycstore_api.c1159
2 files changed, 452 insertions, 712 deletions
diff --git a/src/psycstore/gnunet-service-psycstore.c b/src/psycstore/gnunet-service-psycstore.c
index 33e894b5e..c9a6f22b8 100644
--- a/src/psycstore/gnunet-service-psycstore.c
+++ b/src/psycstore/gnunet-service-psycstore.c
@@ -520,6 +520,11 @@ recv_state_message_part (void *cls,
520 struct StateModifyClosure *scls = cls; 520 struct StateModifyClosure *scls = cls;
521 uint16_t psize; 521 uint16_t psize;
522 522
523 if (NULL == msg)
524 { // FIXME: error on unknown message
525 return;
526 }
527
523 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 528 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
524 "recv_state_message_part() message_id: %" PRIu64 529 "recv_state_message_part() message_id: %" PRIu64
525 ", fragment_offset: %" PRIu64 ", flags: %u\n", 530 ", fragment_offset: %" PRIu64 ", flags: %u\n",
diff --git a/src/psycstore/psycstore_api.c b/src/psycstore/psycstore_api.c
index 379483e80..94b7ff9f5 100644
--- a/src/psycstore/psycstore_api.c
+++ b/src/psycstore/psycstore_api.c
@@ -37,8 +37,6 @@
37 37
38#define LOG(kind,...) GNUNET_log_from (kind, "psycstore-api",__VA_ARGS__) 38#define LOG(kind,...) GNUNET_log_from (kind, "psycstore-api",__VA_ARGS__)
39 39
40typedef void (*DataCallback) ();
41
42/** 40/**
43 * Handle for an operation with the PSYCstore service. 41 * Handle for an operation with the PSYCstore service.
44 */ 42 */
@@ -51,40 +49,28 @@ struct GNUNET_PSYCSTORE_OperationHandle
51 struct GNUNET_PSYCSTORE_Handle *h; 49 struct GNUNET_PSYCSTORE_Handle *h;
52 50
53 /** 51 /**
54 * We keep operations in a DLL. 52 * Data callbacks.
55 */
56 struct GNUNET_PSYCSTORE_OperationHandle *next;
57
58 /**
59 * We keep operations in a DLL.
60 */ 53 */
61 struct GNUNET_PSYCSTORE_OperationHandle *prev; 54 union {
55 GNUNET_PSYCSTORE_FragmentCallback fragment_cb;
56 GNUNET_PSYCSTORE_CountersCallback counters_cb;
57 GNUNET_PSYCSTORE_StateCallback state_cb;
58 };
62 59
63 /** 60 /**
64 * Continuation to invoke with the result of an operation. 61 * Closure for callbacks.
65 */ 62 */
66 GNUNET_PSYCSTORE_ResultCallback res_cb; 63 void *cls;
67
68 /**
69 * Continuation to invoke with the result of an operation returning data.
70 */
71 DataCallback data_cb;
72 64
73 /** 65 /**
74 * Closure for the callbacks. 66 * Message envelope.
75 */ 67 */
76 void *cls; 68 struct GNUNET_MQ_Envelope *env;
77 69
78 /** 70 /**
79 * Operation ID. 71 * Operation ID.
80 */ 72 */
81 uint64_t op_id; 73 uint64_t op_id;
82
83 /**
84 * Message to send to the PSYCstore service.
85 * Allocated at the end of this struct.
86 */
87 const struct GNUNET_MessageHeader *msg;
88}; 74};
89 75
90 76
@@ -99,34 +85,15 @@ struct GNUNET_PSYCSTORE_Handle
99 const struct GNUNET_CONFIGURATION_Handle *cfg; 85 const struct GNUNET_CONFIGURATION_Handle *cfg;
100 86
101 /** 87 /**
102 * Socket (if available). 88 * Client connection.
103 */
104 struct GNUNET_CLIENT_Connection *client;
105
106 /**
107 * Head of operations to transmit.
108 */ 89 */
109 struct GNUNET_PSYCSTORE_OperationHandle *transmit_head; 90 struct GNUNET_MQ_Handle *mq;
110 91
111 /**
112 * Tail of operations to transmit.
113 */
114 struct GNUNET_PSYCSTORE_OperationHandle *transmit_tail;
115 92
116 /** 93 /**
117 * Head of active operations waiting for response. 94 * Async operations.
118 */ 95 */
119 struct GNUNET_PSYCSTORE_OperationHandle *op_head; 96 struct GNUNET_OP_Handle *op;
120
121 /**
122 * Tail of active operations waiting for response.
123 */
124 struct GNUNET_PSYCSTORE_OperationHandle *op_tail;
125
126 /**
127 * Currently pending transmission request, or NULL for none.
128 */
129 struct GNUNET_CLIENT_TransmitHandle *th;
130 97
131 /** 98 /**
132 * Task doing exponential back-off trying to reconnect. 99 * Task doing exponential back-off trying to reconnect.
@@ -134,395 +101,258 @@ struct GNUNET_PSYCSTORE_Handle
134 struct GNUNET_SCHEDULER_Task *reconnect_task; 101 struct GNUNET_SCHEDULER_Task *reconnect_task;
135 102
136 /** 103 /**
137 * Time for next connect retry. 104 * Delay for next connect retry.
138 */ 105 */
139 struct GNUNET_TIME_Relative reconnect_delay; 106 struct GNUNET_TIME_Relative reconnect_delay;
140 107
141 /**
142 * Last operation ID used.
143 */
144 uint64_t last_op_id;
145 108
109 GNUNET_PSYCSTORE_FragmentCallback *fragment_cb;
110
111 GNUNET_PSYCSTORE_CountersCallback *counters_cb;
112
113 GNUNET_PSYCSTORE_StateCallback *state_cb;
146 /** 114 /**
147 * Are we polling for incoming messages right now? 115 * Closure for callbacks.
148 */ 116 */
149 uint8_t in_receive; 117 void *cb_cls;
150}; 118};
151 119
152 120
153/** 121static int
154 * Get a fresh operation ID to distinguish between PSYCstore requests. 122check_result_code (void *cls, const struct OperationResult *opres)
155 *
156 * @param h Handle to the PSYCstore service.
157 * @return next operation id to use
158 */
159static uint64_t
160get_next_op_id (struct GNUNET_PSYCSTORE_Handle *h)
161{ 123{
162 return h->last_op_id++; 124 uint16_t size = ntohs (opres->header.size);
163} 125 const char *str = (const char *) &opres[1];
164 126 if ( (sizeof (struct OperationResult) < size) &&
165 127 ('\0' != str[size - sizeof (*opres) - 1]) )
166/**
167 * Find operation by ID.
168 *
169 * @return OperationHandle if found, or NULL otherwise.
170 */
171static struct GNUNET_PSYCSTORE_OperationHandle *
172find_op_by_id (struct GNUNET_PSYCSTORE_Handle *h, uint64_t op_id)
173{
174 struct GNUNET_PSYCSTORE_OperationHandle *op = h->op_head;
175 while (NULL != op)
176 { 128 {
177 if (op->op_id == op_id) 129 GNUNET_break (0);
178 return op; 130 return GNUNET_SYSERR;
179 op = op->next;
180 } 131 }
181 return NULL;
182}
183
184 132
185/** 133 return GNUNET_OK;
186 * Try again to connect to the PSYCstore service. 134}
187 *
188 * @param cls handle to the PSYCstore service.
189 */
190static void
191reconnect (void *cls);
192 135
193 136
194/**
195 * Reschedule a connect attempt to the service.
196 *
197 * @param h transport service to reconnect
198 */
199static void 137static void
200reschedule_connect (struct GNUNET_PSYCSTORE_Handle *h) 138handle_result_code (void *cls, const struct OperationResult *opres)
201{ 139{
202 GNUNET_assert (h->reconnect_task == NULL); 140 struct GNUNET_PSYCSTORE_Handle *h = cls;
141 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
142 uint16_t size = ntohs (opres->header.size);
143
144 const char *
145 str = (sizeof (*opres) < size) ? (const char *) &opres[1] : "";
203 146
204 if (NULL != h->th) 147 if (GNUNET_YES == GNUNET_OP_result (h->op, GNUNET_ntohll (opres->op_id),
148 GNUNET_ntohll (opres->result_code) + INT64_MIN,
149 str, size - sizeof (*opres), (void **) &op))
205 { 150 {
206 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); 151 LOG (GNUNET_ERROR_TYPE_DEBUG,
207 h->th = NULL; 152 "handle_result_code: Received result message with operation ID: %" PRIu64 "\n",
153 GNUNET_ntohll (opres->op_id));
154 GNUNET_free (op);
208 } 155 }
209 if (NULL != h->client) 156 else
210 { 157 {
211 GNUNET_CLIENT_disconnect (h->client); 158 LOG (GNUNET_ERROR_TYPE_DEBUG,
212 h->client = NULL; 159 "handle_result_code: No callback registered for operation with ID %" PRIu64 ".\n",
160 GNUNET_ntohll (opres->op_id));
213 } 161 }
214 h->in_receive = GNUNET_NO; 162 h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
215 LOG (GNUNET_ERROR_TYPE_DEBUG,
216 "Scheduling task to reconnect to PSYCstore service in %s.\n",
217 GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES));
218 h->reconnect_task =
219 GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
220 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
221} 163}
222 164
223 165
224/**
225 * Schedule transmission of the next message from our queue.
226 *
227 * @param h PSYCstore handle
228 */
229static void 166static void
230transmit_next (struct GNUNET_PSYCSTORE_Handle *h); 167handle_result_counters (void *cls, const struct CountersResult *cres)
231
232
233/**
234 * Type of a function to call when we receive a message
235 * from the service.
236 *
237 * @param cls closure
238 * @param msg message received, NULL on timeout or fatal error
239 */
240static void
241message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
242{ 168{
243 struct GNUNET_PSYCSTORE_Handle *h = cls; 169 struct GNUNET_PSYCSTORE_Handle *h = cls;
244 struct GNUNET_PSYCSTORE_OperationHandle *op; 170 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
245 const struct OperationResult *opres;
246 const struct CountersResult *cres;
247 const struct FragmentResult *fres;
248 const struct StateResult *sres;
249 const char *str;
250
251 if (NULL == msg)
252 {
253 reschedule_connect (h);
254 return;
255 }
256 LOG (GNUNET_ERROR_TYPE_DEBUG,
257 "Received message of type %d from PSYCstore service.\n",
258 ntohs (msg->type));
259 uint16_t size = ntohs (msg->size);
260 uint16_t type = ntohs (msg->type);
261 switch (type)
262 {
263 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE:
264 if (size < sizeof (struct OperationResult))
265 {
266 LOG (GNUNET_ERROR_TYPE_ERROR,
267 "Received message of type %d with length %lu bytes. "
268 "Expected >= %lu\n",
269 type, size, sizeof (struct OperationResult));
270 GNUNET_break (0);
271 reschedule_connect (h);
272 return;
273 }
274
275 opres = (const struct OperationResult *) msg;
276 str = (const char *) &opres[1];
277 if ( (size > sizeof (struct OperationResult)) &&
278 ('\0' != str[size - sizeof (struct OperationResult) - 1]) )
279 {
280 GNUNET_break (0);
281 reschedule_connect (h);
282 return;
283 }
284 if (size == sizeof (struct OperationResult))
285 str = "";
286
287 op = find_op_by_id (h, GNUNET_ntohll (opres->op_id));
288 if (NULL == op)
289 {
290 LOG (GNUNET_ERROR_TYPE_DEBUG,
291 "No callback registered for operation with ID %" PRIu64 ".\n",
292 type, GNUNET_ntohll (opres->op_id));
293 }
294 else
295 {
296 LOG (GNUNET_ERROR_TYPE_DEBUG,
297 "Received result message (type %d) with operation ID: %" PRIu64 "\n",
298 type, op->op_id);
299
300 int64_t result_code = GNUNET_ntohll (opres->result_code) + INT64_MIN;
301 GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op);
302 if (NULL != op->res_cb)
303 {
304 const struct StateSyncRequest *ssreq;
305 switch (ntohs (op->msg->type))
306 {
307 case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC:
308 ssreq = (const struct StateSyncRequest *) op->msg;
309 if (!(ssreq->flags & STATE_OP_LAST
310 || GNUNET_OK != result_code))
311 op->res_cb = NULL;
312 break;
313 }
314 }
315 if (NULL != op->res_cb)
316 op->res_cb (op->cls, result_code, str, size - sizeof (*opres));
317 GNUNET_free (op);
318 }
319 break;
320
321 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS:
322 if (size != sizeof (struct CountersResult))
323 {
324 LOG (GNUNET_ERROR_TYPE_ERROR,
325 "Received message of type %d with length %lu bytes. "
326 "Expected %lu\n",
327 type, size, sizeof (struct CountersResult));
328 GNUNET_break (0);
329 reschedule_connect (h);
330 return;
331 }
332
333 cres = (const struct CountersResult *) msg;
334 171
335 op = find_op_by_id (h, GNUNET_ntohll (cres->op_id)); 172 if (GNUNET_YES == GNUNET_OP_get (h->op, GNUNET_ntohll (cres->op_id),
336 if (NULL == op) 173 NULL, NULL, (void **) &op))
337 { 174 {
338 LOG (GNUNET_ERROR_TYPE_DEBUG, 175 GNUNET_assert (NULL != op);
339 "No callback registered for operation with ID %" PRIu64 ".\n", 176 if (NULL != op->counters_cb)
340 type, GNUNET_ntohll (cres->op_id));
341 }
342 else
343 { 177 {
344 GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op); 178 op->counters_cb (op->cls,
345 if (NULL != op->data_cb)
346 ((GNUNET_PSYCSTORE_CountersCallback)
347 op->data_cb) (op->cls,
348 ntohl (cres->result_code), 179 ntohl (cres->result_code),
349 GNUNET_ntohll (cres->max_fragment_id), 180 GNUNET_ntohll (cres->max_fragment_id),
350 GNUNET_ntohll (cres->max_message_id), 181 GNUNET_ntohll (cres->max_message_id),
351 GNUNET_ntohll (cres->max_group_generation), 182 GNUNET_ntohll (cres->max_group_generation),
352 GNUNET_ntohll (cres->max_state_message_id)); 183 GNUNET_ntohll (cres->max_state_message_id));
353 GNUNET_free (op);
354 } 184 }
355 break; 185 GNUNET_OP_remove (h->op, GNUNET_ntohll (cres->op_id));
186 GNUNET_free (op);
187 }
188 else
189 {
190 LOG (GNUNET_ERROR_TYPE_DEBUG,
191 "handle_result_counters: No callback registered for operation with ID %" PRIu64 ".\n",
192 GNUNET_ntohll (cres->op_id));
193 }
194 h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
195}
356 196
357 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_FRAGMENT:
358 if (size < sizeof (struct FragmentResult))
359 {
360 LOG (GNUNET_ERROR_TYPE_ERROR,
361 "Received message of type %d with length %lu bytes. "
362 "Expected >= %lu\n",
363 type, size, sizeof (struct FragmentResult));
364 GNUNET_break (0);
365 reschedule_connect (h);
366 return;
367 }
368 197
369 fres = (const struct FragmentResult *) msg; 198static int
370 struct GNUNET_MULTICAST_MessageHeader *mmsg = 199check_result_fragment (void *cls, const struct FragmentResult *fres)
371 (struct GNUNET_MULTICAST_MessageHeader *) &fres[1]; 200{
372 if (size != sizeof (struct FragmentResult) + ntohs (mmsg->header.size)) 201 uint16_t size = ntohs (fres->header.size);
373 { 202 struct GNUNET_MULTICAST_MessageHeader *mmsg =
374 LOG (GNUNET_ERROR_TYPE_ERROR, 203 (struct GNUNET_MULTICAST_MessageHeader *) &fres[1];
375 "Received message of type %d with length %lu bytes. " 204 if (sizeof (*fres) + sizeof (*mmsg) < size
376 "Expected = %lu\n", 205 && sizeof (*fres) + ntohs (mmsg->header.size) != size)
377 type, size, 206 {
378 sizeof (struct FragmentResult) + ntohs (mmsg->header.size)); 207 LOG (GNUNET_ERROR_TYPE_ERROR,
379 GNUNET_break (0); 208 "check_result_fragment: Received message with invalid length %lu bytes.\n",
380 reschedule_connect (h); 209 size, sizeof (*fres));
381 return; 210 GNUNET_break (0);
382 } 211 return GNUNET_SYSERR;
212 }
213 return GNUNET_OK;
214}
383 215
384 op = find_op_by_id (h, GNUNET_ntohll (fres->op_id));
385 if (NULL == op)
386 {
387 LOG (GNUNET_ERROR_TYPE_DEBUG,
388 "No callback registered for operation with ID %" PRIu64 ".\n",
389 type, GNUNET_ntohll (fres->op_id));
390 }
391 else
392 {
393 if (NULL != op->data_cb)
394 ((GNUNET_PSYCSTORE_FragmentCallback)
395 op->data_cb) (op->cls, mmsg, ntohl (fres->psycstore_flags));
396 }
397 break;
398 216
399 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE: 217static void
400 if (size < sizeof (struct StateResult)) 218handle_result_fragment (void *cls, const struct FragmentResult *fres)
401 { 219{
402 LOG (GNUNET_ERROR_TYPE_ERROR, 220 struct GNUNET_PSYCSTORE_Handle *h = cls;
403 "Received message of type %d with length %lu bytes. " 221 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
404 "Expected >= %lu\n",
405 type, size, sizeof (struct StateResult));
406 GNUNET_break (0);
407 reschedule_connect (h);
408 return;
409 }
410 222
411 sres = (const struct StateResult *) msg; 223 if (GNUNET_YES == GNUNET_OP_get (h->op, GNUNET_ntohll (fres->op_id),
412 const char *name = (const char *) &sres[1]; 224 NULL, NULL, (void **) &op))
413 uint16_t name_size = ntohs (sres->name_size); 225 {
226 GNUNET_assert (NULL != op);
227 if (NULL != op->fragment_cb)
228 op->fragment_cb (op->cls,
229 (struct GNUNET_MULTICAST_MessageHeader *) &fres[1],
230 ntohl (fres->psycstore_flags));
231 //GNUNET_OP_remove (h->op, GNUNET_ntohll (fres->op_id));
232 //GNUNET_free (op);
233 }
234 else
235 {
236 LOG (GNUNET_ERROR_TYPE_DEBUG,
237 "handle_result_fragment: No callback registered for operation with ID %" PRIu64 ".\n",
238 GNUNET_ntohll (fres->op_id));
239 }
240 h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
241}
414 242
415 if (name_size <= 2 || '\0' != name[name_size - 1])
416 {
417 LOG (GNUNET_ERROR_TYPE_ERROR,
418 "Received state result message (type %d) with invalid name.\n",
419 type);
420 GNUNET_break (0);
421 reschedule_connect (h);
422 return;
423 }
424 243
425 op = find_op_by_id (h, GNUNET_ntohll (sres->op_id)); 244static int
426 if (NULL == op) 245check_result_state (void *cls, const struct StateResult *sres)
427 { 246{
428 LOG (GNUNET_ERROR_TYPE_DEBUG, 247 const char *name = (const char *) &sres[1];
429 "No callback registered for operation with ID %" PRIu64 ".\n", 248 uint16_t name_size = ntohs (sres->name_size);
430 type, GNUNET_ntohll (sres->op_id));
431 }
432 else
433 {
434 if (NULL != op->data_cb)
435 ((GNUNET_PSYCSTORE_StateCallback)
436 op->data_cb) (op->cls, name, (char *) &sres[1] + name_size,
437 ntohs (sres->header.size) - sizeof (*sres) - name_size);
438 }
439 break;
440 249
441 default: 250 if (name_size <= 2 || '\0' != name[name_size - 1])
251 {
252 LOG (GNUNET_ERROR_TYPE_ERROR,
253 "check_result_state: Received state result message with invalid name.\n");
442 GNUNET_break (0); 254 GNUNET_break (0);
443 reschedule_connect (h); 255 return GNUNET_SYSERR;
444 return;
445 } 256 }
446 257 return GNUNET_OK;
447 GNUNET_CLIENT_receive (h->client, &message_handler, h,
448 GNUNET_TIME_UNIT_FOREVER_REL);
449} 258}
450 259
451 260
452/** 261static void
453 * Transmit next message to service. 262handle_result_state (void *cls, const struct StateResult *sres)
454 *
455 * @param cls The 'struct GNUNET_PSYCSTORE_Handle'.
456 * @param size Number of bytes available in buf.
457 * @param buf Where to copy the message.
458 * @return Number of bytes copied to buf.
459 */
460static size_t
461send_next_message (void *cls, size_t size, void *buf)
462{ 263{
463 struct GNUNET_PSYCSTORE_Handle *h = cls; 264 struct GNUNET_PSYCSTORE_Handle *h = cls;
464 struct GNUNET_PSYCSTORE_OperationHandle *op = h->transmit_head; 265 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
465 size_t ret;
466
467 h->th = NULL;
468 if (NULL == op)
469 return 0;
470 ret = ntohs (op->msg->size);
471 if (ret > size)
472 {
473 reschedule_connect (h);
474 return 0;
475 }
476 LOG (GNUNET_ERROR_TYPE_DEBUG,
477 "Sending message of type %d to PSYCstore service. ID: %" PRIu64 "\n",
478 ntohs (op->msg->type), op->op_id);
479 GNUNET_memcpy (buf, op->msg, ret);
480 266
481 GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op); 267 const char *name = (const char *) &sres[1];
268 uint16_t name_size = ntohs (sres->name_size);
482 269
483 if (NULL == op->res_cb && NULL == op->data_cb) 270 if (GNUNET_YES == GNUNET_OP_get (h->op, GNUNET_ntohll (sres->op_id),
271 NULL, NULL, (void **) &op))
484 { 272 {
485 GNUNET_free (op); 273 GNUNET_assert (NULL != op);
274 if (NULL != op->state_cb)
275 op->state_cb (op->cls, name, (char *) &sres[1] + name_size,
276 ntohs (sres->header.size) - sizeof (*sres) - name_size);
277 //GNUNET_OP_remove (h->op, GNUNET_ntohll (sres->op_id));
278 //GNUNET_free (op);
486 } 279 }
487 else 280 else
488 { 281 {
489 GNUNET_CONTAINER_DLL_insert_tail (h->op_head, h->op_tail, op); 282 LOG (GNUNET_ERROR_TYPE_DEBUG,
283 "handle_result_state: No callback registered for operation with ID %" PRIu64 ".\n",
284 GNUNET_ntohll (sres->op_id));
490 } 285 }
286 h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
287}
491 288
492 if (NULL != h->transmit_head)
493 transmit_next (h);
494 289
495 if (GNUNET_NO == h->in_receive) 290static void
496 { 291reconnect (void *cls);
497 h->in_receive = GNUNET_YES;
498 GNUNET_CLIENT_receive (h->client, &message_handler, h,
499 GNUNET_TIME_UNIT_FOREVER_REL);
500 }
501 return ret;
502}
503 292
504 293
505/** 294/**
506 * Schedule transmission of the next message from our queue. 295 * Client disconnected from service.
507 * 296 *
508 * @param h PSYCstore handle. 297 * Reconnect after backoff period.=
509 */ 298 */
510static void 299static void
511transmit_next (struct GNUNET_PSYCSTORE_Handle *h) 300disconnected (void *cls, enum GNUNET_MQ_Error error)
301{
302 struct GNUNET_PSYCSTORE_Handle *h = cls;
303
304 LOG (GNUNET_ERROR_TYPE_DEBUG,
305 "Origin client disconnected (%d), re-connecting\n",
306 (int) error);
307 if (NULL != h->mq)
308 {
309 GNUNET_MQ_destroy (h->mq);
310 GNUNET_OP_destroy (h->op);
311 h->mq = NULL;
312 h->op = NULL;
313 }
314
315 h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_delay,
316 &reconnect, h);
317 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
318}
319
320
321static void
322do_connect (struct GNUNET_PSYCSTORE_Handle *h)
512{ 323{
513 if (NULL != h->th || NULL == h->client) 324 LOG (GNUNET_ERROR_TYPE_DEBUG,
514 return; 325 "Connecting to PSYCstore service.\n");
515 326
516 struct GNUNET_PSYCSTORE_OperationHandle *op = h->transmit_head; 327 GNUNET_MQ_hd_var_size (result_code,
517 if (NULL == op) 328 GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE,
518 return; 329 struct OperationResult);
519 330
520 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, 331 GNUNET_MQ_hd_fixed_size (result_counters,
521 ntohs (op->msg->size), 332 GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS,
522 GNUNET_TIME_UNIT_FOREVER_REL, 333 struct CountersResult);
523 GNUNET_NO, 334
524 &send_next_message, 335 GNUNET_MQ_hd_var_size (result_fragment,
525 h); 336 GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_FRAGMENT,
337 struct FragmentResult);
338
339 GNUNET_MQ_hd_var_size (result_state,
340 GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE,
341 struct StateResult);
342
343 struct GNUNET_MQ_MessageHandler handlers[] = {
344 make_result_code_handler (h),
345 make_result_counters_handler (h),
346 make_result_fragment_handler (h),
347 make_result_state_handler (h),
348 GNUNET_MQ_handler_end ()
349 };
350
351 h->op = GNUNET_OP_create ();
352 GNUNET_assert (NULL == h->mq);
353 h->mq = GNUNET_CLIENT_connecT (h->cfg, "psycstore",
354 handlers, disconnected, h);
355 GNUNET_assert (NULL != h->mq);
526} 356}
527 357
528 358
@@ -534,15 +364,7 @@ transmit_next (struct GNUNET_PSYCSTORE_Handle *h)
534static void 364static void
535reconnect (void *cls) 365reconnect (void *cls)
536{ 366{
537 struct GNUNET_PSYCSTORE_Handle *h = cls; 367 do_connect (cls);
538
539 h->reconnect_task = NULL;
540 LOG (GNUNET_ERROR_TYPE_DEBUG,
541 "Connecting to PSYCstore service.\n");
542 GNUNET_assert (NULL == h->client);
543 h->client = GNUNET_CLIENT_connect ("psycstore", h->cfg);
544 GNUNET_assert (NULL != h->client);
545 transmit_next (h);
546} 368}
547 369
548 370
@@ -558,8 +380,8 @@ GNUNET_PSYCSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
558 struct GNUNET_PSYCSTORE_Handle *h 380 struct GNUNET_PSYCSTORE_Handle *h
559 = GNUNET_new (struct GNUNET_PSYCSTORE_Handle); 381 = GNUNET_new (struct GNUNET_PSYCSTORE_Handle);
560 h->cfg = cfg; 382 h->cfg = cfg;
561 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; 383 h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
562 h->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, h); 384 do_connect (h);
563 return h; 385 return h;
564} 386}
565 387
@@ -578,53 +400,106 @@ GNUNET_PSYCSTORE_disconnect (struct GNUNET_PSYCSTORE_Handle *h)
578 GNUNET_SCHEDULER_cancel (h->reconnect_task); 400 GNUNET_SCHEDULER_cancel (h->reconnect_task);
579 h->reconnect_task = NULL; 401 h->reconnect_task = NULL;
580 } 402 }
581 if (NULL != h->th) 403 if (NULL != h->mq)
582 {
583 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
584 h->th = NULL;
585 }
586 if (NULL != h->client)
587 { 404 {
588 GNUNET_CLIENT_disconnect (h->client); 405 // FIXME: free data structures for pending operations
589 h->client = NULL; 406 GNUNET_MQ_destroy (h->mq);
407 h->mq = NULL;
590 } 408 }
591 GNUNET_free (h); 409 GNUNET_free (h);
592} 410}
593 411
594 412
595/** 413/**
414 * Message sent notification.
415 *
416 * Remove invalidated envelope pointer.
417 */
418static void
419message_sent (void *cls)
420{
421 struct GNUNET_PSYCSTORE_OperationHandle *op = cls;
422 op->env = NULL;
423}
424
425
426/**
427 * Create a new operation.
428 */
429static struct GNUNET_PSYCSTORE_OperationHandle *
430op_create (struct GNUNET_PSYCSTORE_Handle *h,
431 struct GNUNET_OP_Handle *hop,
432 GNUNET_PSYCSTORE_ResultCallback result_cb,
433 void *cls)
434{
435 struct GNUNET_PSYCSTORE_OperationHandle *
436 op = GNUNET_malloc (sizeof (*op));
437 op->h = h;
438 op->op_id = GNUNET_OP_add (hop,
439 (GNUNET_ResultCallback) result_cb,
440 cls, op);
441 return op;
442}
443
444
445/**
446 * Send a message associated with an operation.
447 *
448 * @param h
449 * PSYCstore handle.
450 * @param op
451 * Operation handle.
452 * @param env
453 * Message envelope to send.
454 * @param[out] op_id
455 * Operation ID to write in network byte order. NULL if not needed.
456 *
457 * @return Operation handle.
458 *
459 */
460static struct GNUNET_PSYCSTORE_OperationHandle *
461op_send (struct GNUNET_PSYCSTORE_Handle *h,
462 struct GNUNET_PSYCSTORE_OperationHandle *op,
463 struct GNUNET_MQ_Envelope *env,
464 uint64_t *op_id)
465{
466 op->env = env;
467 if (NULL != op_id)
468 *op_id = GNUNET_htonll (op->op_id);
469
470 GNUNET_MQ_notify_sent (env, message_sent, op);
471 GNUNET_MQ_send (h->mq, env);
472 return op;
473}
474
475
476/**
596 * Cancel a PSYCstore operation. Note that the operation MAY still 477 * Cancel a PSYCstore operation. Note that the operation MAY still
597 * be executed; this merely cancels the continuation; if the request 478 * be executed; this merely cancels the continuation; if the request
598 * was already transmitted, the service may still choose to complete 479 * was already transmitted, the service may still choose to complete
599 * the operation. 480 * the operation.
600 * 481 *
601 * @param op Operation to cancel. 482 * @param op Operation to cancel.
483 *
484 * @return #GNUNET_YES if message was not sent yet and got discarded,
485 * #GNUNET_NO if it was already sent, and only the callbacks got cancelled.
602 */ 486 */
603void 487int
604GNUNET_PSYCSTORE_operation_cancel (struct GNUNET_PSYCSTORE_OperationHandle *op) 488GNUNET_PSYCSTORE_operation_cancel (struct GNUNET_PSYCSTORE_OperationHandle *op)
605{ 489{
606 struct GNUNET_PSYCSTORE_Handle *h = op->h; 490 struct GNUNET_PSYCSTORE_Handle *h = op->h;
491 int ret = GNUNET_NO;
607 492
608 if (h->transmit_head != NULL && (h->transmit_head != op || NULL == h->client)) 493 if (NULL != op->env)
609 { 494 {
610 /* request not active, can simply remove */ 495 GNUNET_MQ_send_cancel (op->env);
611 GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op); 496 ret = GNUNET_YES;
612 GNUNET_free (op);
613 return;
614 } 497 }
615 if (NULL != h->th) 498
616 { 499 GNUNET_OP_remove (h->op, op->op_id);
617 /* request active but not yet with service, can still abort */ 500 GNUNET_free (op);
618 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); 501
619 h->th = NULL; 502 return ret;
620 GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op);
621 GNUNET_free (op);
622 transmit_next (h);
623 return;
624 }
625 /* request active with service, simply ensure continuations are not called */
626 op->res_cb = NULL;
627 op->data_cb = NULL;
628} 503}
629 504
630 505
@@ -649,9 +524,9 @@ GNUNET_PSYCSTORE_operation_cancel (struct GNUNET_PSYCSTORE_OperationHandle *op)
649 * In case of a part, the last group generation the slave has access to. 524 * In case of a part, the last group generation the slave has access to.
650 * It has relevance when a larger message have fragments with different 525 * It has relevance when a larger message have fragments with different
651 * group generations. 526 * group generations.
652 * @param rcb 527 * @param result_cb
653 * Callback to call with the result of the storage operation. 528 * Callback to call with the result of the storage operation.
654 * @param rcb_cls 529 * @param cls
655 * Closure for the callback. 530 * Closure for the callback.
656 * 531 *
657 * @return Operation handle that can be used to cancel the operation. 532 * @return Operation handle that can be used to cancel the operation.
@@ -664,8 +539,8 @@ GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h,
664 uint64_t announced_at, 539 uint64_t announced_at,
665 uint64_t effective_since, 540 uint64_t effective_since,
666 uint64_t group_generation, 541 uint64_t group_generation,
667 GNUNET_PSYCSTORE_ResultCallback rcb, 542 GNUNET_PSYCSTORE_ResultCallback result_cb,
668 void *rcb_cls) 543 void *cls)
669{ 544{
670 GNUNET_assert (NULL != h); 545 GNUNET_assert (NULL != h);
671 GNUNET_assert (NULL != channel_key); 546 GNUNET_assert (NULL != channel_key);
@@ -676,16 +551,8 @@ GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h,
676 : effective_since == 0); 551 : effective_since == 0);
677 552
678 struct MembershipStoreRequest *req; 553 struct MembershipStoreRequest *req;
679 struct GNUNET_PSYCSTORE_OperationHandle * 554 struct GNUNET_MQ_Envelope *
680 op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); 555 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE);
681 op->h = h;
682 op->res_cb = rcb;
683 op->cls = rcb_cls;
684
685 req = (struct MembershipStoreRequest *) &op[1];
686 op->msg = (struct GNUNET_MessageHeader *) req;
687 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE);
688 req->header.size = htons (sizeof (*req));
689 req->channel_key = *channel_key; 556 req->channel_key = *channel_key;
690 req->slave_key = *slave_key; 557 req->slave_key = *slave_key;
691 req->did_join = did_join; 558 req->did_join = did_join;
@@ -693,13 +560,9 @@ GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h,
693 req->effective_since = GNUNET_htonll (effective_since); 560 req->effective_since = GNUNET_htonll (effective_since);
694 req->group_generation = GNUNET_htonll (group_generation); 561 req->group_generation = GNUNET_htonll (group_generation);
695 562
696 op->op_id = get_next_op_id (h); 563 return
697 req->op_id = GNUNET_htonll (op->op_id); 564 op_send (h, op_create (h, h->op, result_cb, cls),
698 565 env, &req->op_id);
699 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
700 transmit_next (h);
701
702 return op;
703} 566}
704 567
705 568
@@ -723,9 +586,9 @@ GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h,
723 * Group generation of the fragment of the message to test. 586 * Group generation of the fragment of the message to test.
724 * It has relevance if the message consists of multiple fragments with 587 * It has relevance if the message consists of multiple fragments with
725 * different group generations. 588 * different group generations.
726 * @param rcb 589 * @param result_cb
727 * Callback to call with the test result. 590 * Callback to call with the test result.
728 * @param rcb_cls 591 * @param cls
729 * Closure for the callback. 592 * Closure for the callback.
730 * 593 *
731 * @return Operation handle that can be used to cancel the operation. 594 * @return Operation handle that can be used to cancel the operation.
@@ -736,32 +599,20 @@ GNUNET_PSYCSTORE_membership_test (struct GNUNET_PSYCSTORE_Handle *h,
736 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, 599 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
737 uint64_t message_id, 600 uint64_t message_id,
738 uint64_t group_generation, 601 uint64_t group_generation,
739 GNUNET_PSYCSTORE_ResultCallback rcb, 602 GNUNET_PSYCSTORE_ResultCallback result_cb,
740 void *rcb_cls) 603 void *cls)
741{ 604{
742 struct MembershipTestRequest *req; 605 struct MembershipTestRequest *req;
743 struct GNUNET_PSYCSTORE_OperationHandle * 606 struct GNUNET_MQ_Envelope *
744 op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); 607 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST);
745 op->h = h;
746 op->res_cb = rcb;
747 op->cls = rcb_cls;
748
749 req = (struct MembershipTestRequest *) &op[1];
750 op->msg = (struct GNUNET_MessageHeader *) req;
751 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST);
752 req->header.size = htons (sizeof (*req));
753 req->channel_key = *channel_key; 608 req->channel_key = *channel_key;
754 req->slave_key = *slave_key; 609 req->slave_key = *slave_key;
755 req->message_id = GNUNET_htonll (message_id); 610 req->message_id = GNUNET_htonll (message_id);
756 req->group_generation = GNUNET_htonll (group_generation); 611 req->group_generation = GNUNET_htonll (group_generation);
757 612
758 op->op_id = get_next_op_id (h); 613 return
759 req->op_id = GNUNET_htonll (op->op_id); 614 op_send (h, op_create (h, h->op, result_cb, cls),
760 615 env, &req->op_id);
761 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
762 transmit_next (h);
763
764 return op;
765} 616}
766 617
767 618
@@ -773,8 +624,8 @@ GNUNET_PSYCSTORE_membership_test (struct GNUNET_PSYCSTORE_Handle *h,
773 * @param message Message to store. 624 * @param message Message to store.
774 * @param psycstore_flags Flags indicating whether the PSYC message contains 625 * @param psycstore_flags Flags indicating whether the PSYC message contains
775 * state modifiers. 626 * state modifiers.
776 * @param rcb Callback to call with the result of the operation. 627 * @param result_cb Callback to call with the result of the operation.
777 * @param rcb_cls Closure for the callback. 628 * @param cls Closure for the callback.
778 * 629 *
779 * @return Handle that can be used to cancel the operation. 630 * @return Handle that can be used to cancel the operation.
780 */ 631 */
@@ -783,32 +634,21 @@ GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h,
783 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, 634 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
784 const struct GNUNET_MULTICAST_MessageHeader *msg, 635 const struct GNUNET_MULTICAST_MessageHeader *msg,
785 enum GNUNET_PSYCSTORE_MessageFlags psycstore_flags, 636 enum GNUNET_PSYCSTORE_MessageFlags psycstore_flags,
786 GNUNET_PSYCSTORE_ResultCallback rcb, 637 GNUNET_PSYCSTORE_ResultCallback result_cb,
787 void *rcb_cls) 638 void *cls)
788{ 639{
789 uint16_t size = ntohs (msg->header.size); 640 uint16_t size = ntohs (msg->header.size);
790 struct FragmentStoreRequest *req; 641 struct FragmentStoreRequest *req;
791 struct GNUNET_PSYCSTORE_OperationHandle * 642 struct GNUNET_MQ_Envelope *
792 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + size); 643 env = GNUNET_MQ_msg_extra (req, size,
793 op->h = h; 644 GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_STORE);
794 op->res_cb = rcb;
795 op->cls = rcb_cls;
796
797 req = (struct FragmentStoreRequest *) &op[1];
798 op->msg = (struct GNUNET_MessageHeader *) req;
799 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_STORE);
800 req->header.size = htons (sizeof (*req) + size);
801 req->channel_key = *channel_key; 645 req->channel_key = *channel_key;
802 req->psycstore_flags = htonl (psycstore_flags); 646 req->psycstore_flags = htonl (psycstore_flags);
803 GNUNET_memcpy (&req[1], msg, size); 647 GNUNET_memcpy (&req[1], msg, size);
804 648
805 op->op_id = get_next_op_id (h); 649 return
806 req->op_id = GNUNET_htonll (op->op_id); 650 op_send (h, op_create (h, h->op, result_cb, cls),
807 651 env, &req->op_id);
808 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
809 transmit_next (h);
810
811 return op;
812} 652}
813 653
814 654
@@ -833,7 +673,7 @@ GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h,
833 * Maximum number of fragments to retrieve. 673 * Maximum number of fragments to retrieve.
834 * @param fragment_cb 674 * @param fragment_cb
835 * Callback to call with the retrieved fragments. 675 * Callback to call with the retrieved fragments.
836 * @param rcb 676 * @param result_cb
837 * Callback to call with the result of the operation. 677 * Callback to call with the result of the operation.
838 * @param cls 678 * @param cls
839 * Closure for the callbacks. 679 * Closure for the callbacks.
@@ -847,21 +687,12 @@ GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h,
847 uint64_t first_fragment_id, 687 uint64_t first_fragment_id,
848 uint64_t last_fragment_id, 688 uint64_t last_fragment_id,
849 GNUNET_PSYCSTORE_FragmentCallback fragment_cb, 689 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
850 GNUNET_PSYCSTORE_ResultCallback rcb, 690 GNUNET_PSYCSTORE_ResultCallback result_cb,
851 void *cls) 691 void *cls)
852{ 692{
853 struct FragmentGetRequest *req; 693 struct FragmentGetRequest *req;
854 struct GNUNET_PSYCSTORE_OperationHandle * 694 struct GNUNET_MQ_Envelope *
855 op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); 695 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET);
856 op->h = h;
857 op->data_cb = (DataCallback) fragment_cb;
858 op->res_cb = rcb;
859 op->cls = cls;
860
861 req = (struct FragmentGetRequest *) &op[1];
862 op->msg = (struct GNUNET_MessageHeader *) req;
863 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET);
864 req->header.size = htons (sizeof (*req));
865 req->channel_key = *channel_key; 696 req->channel_key = *channel_key;
866 req->first_fragment_id = GNUNET_htonll (first_fragment_id); 697 req->first_fragment_id = GNUNET_htonll (first_fragment_id);
867 req->last_fragment_id = GNUNET_htonll (last_fragment_id); 698 req->last_fragment_id = GNUNET_htonll (last_fragment_id);
@@ -871,13 +702,11 @@ GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h,
871 req->do_membership_test = GNUNET_YES; 702 req->do_membership_test = GNUNET_YES;
872 } 703 }
873 704
874 op->op_id = get_next_op_id (h); 705 struct GNUNET_PSYCSTORE_OperationHandle *
875 req->op_id = GNUNET_htonll (op->op_id); 706 op = op_create (h, h->op, result_cb, cls);
876 707 op->fragment_cb = fragment_cb;
877 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); 708 op->cls = cls;
878 transmit_next (h); 709 return op_send (h, op, env, &req->op_id);
879
880 return op;
881} 710}
882 711
883 712
@@ -902,7 +731,7 @@ GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h,
902 * Maximum number of fragments to retrieve. 731 * Maximum number of fragments to retrieve.
903 * @param fragment_cb 732 * @param fragment_cb
904 * Callback to call with the retrieved fragments. 733 * Callback to call with the retrieved fragments.
905 * @param rcb 734 * @param result_cb
906 * Callback to call with the result of the operation. 735 * Callback to call with the result of the operation.
907 * @param cls 736 * @param cls
908 * Closure for the callbacks. 737 * Closure for the callbacks.
@@ -915,21 +744,12 @@ GNUNET_PSYCSTORE_fragment_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
915 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, 744 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
916 uint64_t fragment_limit, 745 uint64_t fragment_limit,
917 GNUNET_PSYCSTORE_FragmentCallback fragment_cb, 746 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
918 GNUNET_PSYCSTORE_ResultCallback rcb, 747 GNUNET_PSYCSTORE_ResultCallback result_cb,
919 void *cls) 748 void *cls)
920{ 749{
921 struct FragmentGetRequest *req; 750 struct FragmentGetRequest *req;
922 struct GNUNET_PSYCSTORE_OperationHandle * 751 struct GNUNET_MQ_Envelope *
923 op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); 752 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET);
924 op->h = h;
925 op->data_cb = (DataCallback) fragment_cb;
926 op->res_cb = rcb;
927 op->cls = cls;
928
929 req = (struct FragmentGetRequest *) &op[1];
930 op->msg = (struct GNUNET_MessageHeader *) req;
931 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET);
932 req->header.size = htons (sizeof (*req));
933 req->channel_key = *channel_key; 753 req->channel_key = *channel_key;
934 req->fragment_limit = GNUNET_ntohll (fragment_limit); 754 req->fragment_limit = GNUNET_ntohll (fragment_limit);
935 if (NULL != slave_key) 755 if (NULL != slave_key)
@@ -938,13 +758,11 @@ GNUNET_PSYCSTORE_fragment_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
938 req->do_membership_test = GNUNET_YES; 758 req->do_membership_test = GNUNET_YES;
939 } 759 }
940 760
941 op->op_id = get_next_op_id (h); 761 struct GNUNET_PSYCSTORE_OperationHandle *
942 req->op_id = GNUNET_htonll (op->op_id); 762 op = op_create (h, h->op, result_cb, cls);
943 763 op->fragment_cb = fragment_cb;
944 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); 764 op->cls = cls;
945 transmit_next (h); 765 return op_send (h, op, env, &req->op_id);
946
947 return op;
948} 766}
949 767
950 768
@@ -986,7 +804,7 @@ GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h,
986 uint64_t fragment_limit, 804 uint64_t fragment_limit,
987 const char *method_prefix, 805 const char *method_prefix,
988 GNUNET_PSYCSTORE_FragmentCallback fragment_cb, 806 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
989 GNUNET_PSYCSTORE_ResultCallback rcb, 807 GNUNET_PSYCSTORE_ResultCallback result_cb,
990 void *cls) 808 void *cls)
991{ 809{
992 struct MessageGetRequest *req; 810 struct MessageGetRequest *req;
@@ -996,17 +814,9 @@ GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h,
996 GNUNET_SERVER_MAX_MESSAGE_SIZE 814 GNUNET_SERVER_MAX_MESSAGE_SIZE
997 - sizeof (*req)) + 1; 815 - sizeof (*req)) + 1;
998 816
999 struct GNUNET_PSYCSTORE_OperationHandle * 817 struct GNUNET_MQ_Envelope *
1000 op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); 818 env = GNUNET_MQ_msg_extra (req, method_size,
1001 op->h = h; 819 GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
1002 op->data_cb = (DataCallback) fragment_cb;
1003 op->res_cb = rcb;
1004 op->cls = cls;
1005
1006 req = (struct MessageGetRequest *) &op[1];
1007 op->msg = (struct GNUNET_MessageHeader *) req;
1008 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
1009 req->header.size = htons (sizeof (*req) + method_size);
1010 req->channel_key = *channel_key; 820 req->channel_key = *channel_key;
1011 req->first_message_id = GNUNET_htonll (first_message_id); 821 req->first_message_id = GNUNET_htonll (first_message_id);
1012 req->last_message_id = GNUNET_htonll (last_message_id); 822 req->last_message_id = GNUNET_htonll (last_message_id);
@@ -1019,13 +829,11 @@ GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h,
1019 GNUNET_memcpy (&req[1], method_prefix, method_size); 829 GNUNET_memcpy (&req[1], method_prefix, method_size);
1020 ((char *) &req[1])[method_size - 1] = '\0'; 830 ((char *) &req[1])[method_size - 1] = '\0';
1021 831
1022 op->op_id = get_next_op_id (h); 832 struct GNUNET_PSYCSTORE_OperationHandle *
1023 req->op_id = GNUNET_htonll (op->op_id); 833 op = op_create (h, h->op, result_cb, cls);
1024 834 op->fragment_cb = fragment_cb;
1025 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); 835 op->cls = cls;
1026 transmit_next (h); 836 return op_send (h, op, env, &req->op_id);
1027
1028 return op;
1029} 837}
1030 838
1031 839
@@ -1061,7 +869,7 @@ GNUNET_PSYCSTORE_message_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
1061 uint64_t message_limit, 869 uint64_t message_limit,
1062 const char *method_prefix, 870 const char *method_prefix,
1063 GNUNET_PSYCSTORE_FragmentCallback fragment_cb, 871 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
1064 GNUNET_PSYCSTORE_ResultCallback rcb, 872 GNUNET_PSYCSTORE_ResultCallback result_cb,
1065 void *cls) 873 void *cls)
1066{ 874{
1067 struct MessageGetRequest *req; 875 struct MessageGetRequest *req;
@@ -1073,17 +881,9 @@ GNUNET_PSYCSTORE_message_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
1073 - sizeof (*req)) + 1; 881 - sizeof (*req)) + 1;
1074 GNUNET_assert ('\0' == method_prefix[method_size - 1]); 882 GNUNET_assert ('\0' == method_prefix[method_size - 1]);
1075 883
1076 struct GNUNET_PSYCSTORE_OperationHandle * 884 struct GNUNET_MQ_Envelope *
1077 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + method_size); 885 env = GNUNET_MQ_msg_extra (req, method_size,
1078 op->h = h; 886 GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
1079 op->data_cb = (DataCallback) fragment_cb;
1080 op->res_cb = rcb;
1081 op->cls = cls;
1082
1083 req = (struct MessageGetRequest *) &op[1];
1084 op->msg = (struct GNUNET_MessageHeader *) req;
1085 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
1086 req->header.size = htons (sizeof (*req) + method_size);
1087 req->channel_key = *channel_key; 887 req->channel_key = *channel_key;
1088 req->message_limit = GNUNET_ntohll (message_limit); 888 req->message_limit = GNUNET_ntohll (message_limit);
1089 if (NULL != slave_key) 889 if (NULL != slave_key)
@@ -1091,15 +891,13 @@ GNUNET_PSYCSTORE_message_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
1091 req->slave_key = *slave_key; 891 req->slave_key = *slave_key;
1092 req->do_membership_test = GNUNET_YES; 892 req->do_membership_test = GNUNET_YES;
1093 } 893 }
1094
1095 op->op_id = get_next_op_id (h);
1096 req->op_id = GNUNET_htonll (op->op_id);
1097 GNUNET_memcpy (&req[1], method_prefix, method_size); 894 GNUNET_memcpy (&req[1], method_prefix, method_size);
1098 895
1099 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); 896 struct GNUNET_PSYCSTORE_OperationHandle *
1100 transmit_next (h); 897 op = op_create (h, h->op, result_cb, cls);
1101 898 op->fragment_cb = fragment_cb;
1102 return op; 899 op->cls = cls;
900 return op_send (h, op, env, &req->op_id);
1103} 901}
1104 902
1105 903
@@ -1135,21 +933,13 @@ GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h,
1135 uint64_t message_id, 933 uint64_t message_id,
1136 uint64_t fragment_offset, 934 uint64_t fragment_offset,
1137 GNUNET_PSYCSTORE_FragmentCallback fragment_cb, 935 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
1138 GNUNET_PSYCSTORE_ResultCallback rcb, 936 GNUNET_PSYCSTORE_ResultCallback result_cb,
1139 void *cls) 937 void *cls)
1140{ 938{
1141 struct MessageGetFragmentRequest *req; 939 struct MessageGetFragmentRequest *req;
1142 struct GNUNET_PSYCSTORE_OperationHandle * 940 struct GNUNET_MQ_Envelope *
1143 op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); 941 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT);
1144 op->h = h;
1145 op->data_cb = (DataCallback) fragment_cb;
1146 op->res_cb = rcb;
1147 op->cls = cls;
1148 942
1149 req = (struct MessageGetFragmentRequest *) &op[1];
1150 op->msg = (struct GNUNET_MessageHeader *) req;
1151 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT);
1152 req->header.size = htons (sizeof (*req));
1153 req->channel_key = *channel_key; 943 req->channel_key = *channel_key;
1154 req->message_id = GNUNET_htonll (message_id); 944 req->message_id = GNUNET_htonll (message_id);
1155 req->fragment_offset = GNUNET_htonll (fragment_offset); 945 req->fragment_offset = GNUNET_htonll (fragment_offset);
@@ -1159,13 +949,11 @@ GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h,
1159 req->do_membership_test = GNUNET_YES; 949 req->do_membership_test = GNUNET_YES;
1160 } 950 }
1161 951
1162 op->op_id = get_next_op_id (h); 952 struct GNUNET_PSYCSTORE_OperationHandle *
1163 req->op_id = GNUNET_htonll (op->op_id); 953 op = op_create (h, h->op, result_cb, cls);
1164 954 op->fragment_cb = fragment_cb;
1165 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); 955 op->cls = cls;
1166 transmit_next (h); 956 return op_send (h, op, env, &req->op_id);
1167
1168 return op;
1169} 957}
1170 958
1171 959
@@ -1189,29 +977,19 @@ GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h,
1189struct GNUNET_PSYCSTORE_OperationHandle * 977struct GNUNET_PSYCSTORE_OperationHandle *
1190GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h, 978GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h,
1191 struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, 979 struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1192 GNUNET_PSYCSTORE_CountersCallback ccb, 980 GNUNET_PSYCSTORE_CountersCallback counters_cb,
1193 void *ccb_cls) 981 void *cls)
1194{ 982{
1195 struct OperationRequest *req; 983 struct OperationRequest *req;
1196 struct GNUNET_PSYCSTORE_OperationHandle * 984 struct GNUNET_MQ_Envelope *
1197 op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); 985 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET);
1198 op->h = h;
1199 op->data_cb = ccb;
1200 op->cls = ccb_cls;
1201
1202 req = (struct OperationRequest *) &op[1];
1203 op->msg = (struct GNUNET_MessageHeader *) req;
1204 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET);
1205 req->header.size = htons (sizeof (*req));
1206 req->channel_key = *channel_key; 986 req->channel_key = *channel_key;
1207 987
1208 op->op_id = get_next_op_id (h); 988 struct GNUNET_PSYCSTORE_OperationHandle *
1209 req->op_id = GNUNET_htonll (op->op_id); 989 op = op_create (h, h->op, NULL, NULL);
1210 990 op->counters_cb = counters_cb;
1211 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); 991 op->cls = cls;
1212 transmit_next (h); 992 return op_send (h, op, env, &req->op_id);
1213
1214 return op;
1215} 993}
1216 994
1217 995
@@ -1229,10 +1007,10 @@ GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h,
1229 * ID of the message that contains the @a modifiers. 1007 * ID of the message that contains the @a modifiers.
1230 * @param state_delta 1008 * @param state_delta
1231 * Value of the _state_delta PSYC header variable of the message. 1009 * Value of the _state_delta PSYC header variable of the message.
1232 * @param rcb 1010 * @param result_cb
1233 * Callback to call with the result of the operation. 1011 * Callback to call with the result of the operation.
1234 * @param rcb_cls 1012 * @param cls
1235 * Closure for the @a rcb callback. 1013 * Closure for @a result_cb.
1236 * 1014 *
1237 * @return Handle that can be used to cancel the operation. 1015 * @return Handle that can be used to cancel the operation.
1238 */ 1016 */
@@ -1241,35 +1019,37 @@ GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h,
1241 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, 1019 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1242 uint64_t message_id, 1020 uint64_t message_id,
1243 uint64_t state_delta, 1021 uint64_t state_delta,
1244 GNUNET_PSYCSTORE_ResultCallback rcb, 1022 GNUNET_PSYCSTORE_ResultCallback result_cb,
1245 void *rcb_cls) 1023 void *cls)
1246{ 1024{
1247 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
1248 struct StateModifyRequest *req; 1025 struct StateModifyRequest *req;
1249 1026 struct GNUNET_MQ_Envelope *
1250 op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); 1027 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY);
1251 op->h = h;
1252 op->res_cb = rcb;
1253 op->cls = rcb_cls;
1254
1255 req = (struct StateModifyRequest *) &op[1];
1256 op->msg = (struct GNUNET_MessageHeader *) req;
1257 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY);
1258 req->header.size = htons (sizeof (*req));
1259 req->channel_key = *channel_key; 1028 req->channel_key = *channel_key;
1260 req->message_id = GNUNET_htonll (message_id); 1029 req->message_id = GNUNET_htonll (message_id);
1261 req->state_delta = GNUNET_htonll (state_delta); 1030 req->state_delta = GNUNET_htonll (state_delta);
1262 1031
1263 op->op_id = get_next_op_id (h); 1032 return op_send (h, op_create (h, h->op, result_cb, cls),
1264 req->op_id = GNUNET_htonll (op->op_id); 1033 env, &req->op_id);
1034}
1265 1035
1266 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1267 transmit_next (h);
1268 1036
1269 return op; 1037struct StateSyncClosure
1270 /* FIXME: only the last operation is returned, 1038{
1271 * operation_cancel() should be able to cancel all of them. 1039 GNUNET_PSYCSTORE_ResultCallback result_cb;
1272 */ 1040 void *cls;
1041 uint8_t last;
1042};
1043
1044
1045static void
1046state_sync_result (void *cls, int64_t result,
1047 const char *err_msg, uint16_t err_msg_size)
1048{
1049 struct StateSyncClosure *ssc = cls;
1050 if (GNUNET_OK != result || ssc->last)
1051 ssc->result_cb (ssc->cls, result, err_msg, err_msg_size);
1052 GNUNET_free (ssc);
1273} 1053}
1274 1054
1275 1055
@@ -1288,9 +1068,9 @@ GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h,
1288 * Number of elements in the @a modifiers array. 1068 * Number of elements in the @a modifiers array.
1289 * @param modifiers 1069 * @param modifiers
1290 * Full state to store. 1070 * Full state to store.
1291 * @param rcb 1071 * @param result_cb
1292 * Callback to call with the result of the operation. 1072 * Callback to call with the result of the operation.
1293 * @param rcb_cls 1073 * @param cls
1294 * Closure for the callback. 1074 * Closure for the callback.
1295 * 1075 *
1296 * @return Handle that can be used to cancel the operation. 1076 * @return Handle that can be used to cancel the operation.
@@ -1302,8 +1082,8 @@ GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h,
1302 uint64_t state_hash_message_id, 1082 uint64_t state_hash_message_id,
1303 size_t modifier_count, 1083 size_t modifier_count,
1304 const struct GNUNET_PSYC_Modifier *modifiers, 1084 const struct GNUNET_PSYC_Modifier *modifiers,
1305 GNUNET_PSYCSTORE_ResultCallback rcb, 1085 GNUNET_PSYCSTORE_ResultCallback result_cb,
1306 void *rcb_cls) 1086 void *cls)
1307{ 1087{
1308 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL; 1088 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
1309 size_t i; 1089 size_t i;
@@ -1312,14 +1092,11 @@ GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h,
1312 struct StateSyncRequest *req; 1092 struct StateSyncRequest *req;
1313 uint16_t name_size = strlen (modifiers[i].name) + 1; 1093 uint16_t name_size = strlen (modifiers[i].name) + 1;
1314 1094
1315 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size + 1095 struct GNUNET_MQ_Envelope *
1316 modifiers[i].value_size); 1096 env = GNUNET_MQ_msg_extra (req,
1317 op->h = h; 1097 sizeof (*req) + name_size + modifiers[i].value_size,
1318 op->res_cb = rcb; 1098 GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC);
1319 op->cls = rcb_cls;
1320 1099
1321 req = (struct StateSyncRequest *) &op[1];
1322 op->msg = (struct GNUNET_MessageHeader *) req;
1323 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC); 1100 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC);
1324 req->header.size = htons (sizeof (*req) + name_size 1101 req->header.size = htons (sizeof (*req) + name_size
1325 + modifiers[i].value_size); 1102 + modifiers[i].value_size);
@@ -1337,12 +1114,16 @@ GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h,
1337 GNUNET_memcpy (&req[1], modifiers[i].name, name_size); 1114 GNUNET_memcpy (&req[1], modifiers[i].name, name_size);
1338 GNUNET_memcpy ((char *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size); 1115 GNUNET_memcpy ((char *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size);
1339 1116
1340 op->op_id = get_next_op_id (h); 1117 struct StateSyncClosure *ssc = GNUNET_malloc (sizeof (*ssc));
1341 req->op_id = GNUNET_htonll (op->op_id); 1118 ssc->last = (req->flags & STATE_OP_LAST);
1119 ssc->result_cb = result_cb;
1120 ssc->cls = cls;
1342 1121
1343 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); 1122 op_send (h, op_create (h, h->op, state_sync_result, ssc),
1344 transmit_next (h); 1123 env, &req->op_id);
1345 } 1124 }
1125 // FIXME: only one operation is returned,
1126 // add pointers to other operations and make all cancellable.
1346 return op; 1127 return op;
1347} 1128}
1348 1129
@@ -1356,9 +1137,9 @@ GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h,
1356 * Handle for the PSYCstore. 1137 * Handle for the PSYCstore.
1357 * @param channel_key 1138 * @param channel_key
1358 * The channel we are interested in. 1139 * The channel we are interested in.
1359 * @param rcb 1140 * @param result_cb
1360 * Callback to call with the result of the operation. 1141 * Callback to call with the result of the operation.
1361 * @param rcb_cls 1142 * @param cls
1362 * Closure for the callback. 1143 * Closure for the callback.
1363 * 1144 *
1364 * @return Handle that can be used to cancel the operation. 1145 * @return Handle that can be used to cancel the operation.
@@ -1367,33 +1148,20 @@ struct GNUNET_PSYCSTORE_OperationHandle *
1367GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h, 1148GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h,
1368 const struct GNUNET_CRYPTO_EddsaPublicKey 1149 const struct GNUNET_CRYPTO_EddsaPublicKey
1369 *channel_key, 1150 *channel_key,
1370 GNUNET_PSYCSTORE_ResultCallback rcb, 1151 GNUNET_PSYCSTORE_ResultCallback result_cb,
1371 void *rcb_cls) 1152 void *cls)
1372{ 1153{
1373 struct OperationRequest *req; 1154 struct OperationRequest *req;
1374 struct GNUNET_PSYCSTORE_OperationHandle * 1155 struct GNUNET_MQ_Envelope *
1375 op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); 1156 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET);
1376 op->h = h;
1377 op->res_cb = rcb;
1378 op->cls = rcb_cls;
1379
1380 req = (struct OperationRequest *) &op[1];
1381 op->msg = (struct GNUNET_MessageHeader *) req;
1382 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET);
1383 req->header.size = htons (sizeof (*req));
1384 req->channel_key = *channel_key; 1157 req->channel_key = *channel_key;
1385 1158
1386 op->op_id = get_next_op_id (h); 1159 return
1387 req->op_id = GNUNET_htonll (op->op_id); 1160 op_send (h, op_create (h, h->op, result_cb, cls),
1388 1161 env, &req->op_id);
1389 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1390 transmit_next (h);
1391
1392 return op;
1393} 1162}
1394 1163
1395 1164
1396
1397/** 1165/**
1398 * Update signed values of state variables in the state store. 1166 * Update signed values of state variables in the state store.
1399 * 1167 *
@@ -1405,9 +1173,9 @@ GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h,
1405 * Message ID that contained the state @a hash. 1173 * Message ID that contained the state @a hash.
1406 * @param hash 1174 * @param hash
1407 * Hash of the serialized full state. 1175 * Hash of the serialized full state.
1408 * @param rcb 1176 * @param result_cb
1409 * Callback to call with the result of the operation. 1177 * Callback to call with the result of the operation.
1410 * @param rcb_cls 1178 * @param cls
1411 * Closure for the callback. 1179 * Closure for the callback.
1412 */ 1180 */
1413struct GNUNET_PSYCSTORE_OperationHandle * 1181struct GNUNET_PSYCSTORE_OperationHandle *
@@ -1415,30 +1183,18 @@ GNUNET_PSYCSTORE_state_hash_update (struct GNUNET_PSYCSTORE_Handle *h,
1415 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, 1183 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1416 uint64_t message_id, 1184 uint64_t message_id,
1417 const struct GNUNET_HashCode *hash, 1185 const struct GNUNET_HashCode *hash,
1418 GNUNET_PSYCSTORE_ResultCallback rcb, 1186 GNUNET_PSYCSTORE_ResultCallback result_cb,
1419 void *rcb_cls) 1187 void *cls)
1420{ 1188{
1421 struct StateHashUpdateRequest *req; 1189 struct StateHashUpdateRequest *req;
1422 struct GNUNET_PSYCSTORE_OperationHandle * 1190 struct GNUNET_MQ_Envelope *
1423 op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); 1191 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_HASH_UPDATE);
1424 op->h = h;
1425 op->res_cb = rcb;
1426 op->cls = rcb_cls;
1427
1428 req = (struct StateHashUpdateRequest *) &op[1];
1429 op->msg = (struct GNUNET_MessageHeader *) req;
1430 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET);
1431 req->header.size = htons (sizeof (*req));
1432 req->channel_key = *channel_key; 1192 req->channel_key = *channel_key;
1433 req->hash = *hash; 1193 req->hash = *hash;
1434 1194
1435 op->op_id = get_next_op_id (h); 1195 return
1436 req->op_id = GNUNET_htonll (op->op_id); 1196 op_send (h, op_create (h, h->op, result_cb, cls),
1437 1197 env, &req->op_id);
1438 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1439 transmit_next (h);
1440
1441 return op;
1442} 1198}
1443 1199
1444 1200
@@ -1451,9 +1207,9 @@ GNUNET_PSYCSTORE_state_hash_update (struct GNUNET_PSYCSTORE_Handle *h,
1451 * The channel we are interested in. 1207 * The channel we are interested in.
1452 * @param name 1208 * @param name
1453 * Name of variable to match, the returned variable might be less specific. 1209 * Name of variable to match, the returned variable might be less specific.
1454 * @param scb 1210 * @param state_cb
1455 * Callback to return the matching state variable. 1211 * Callback to return the matching state variable.
1456 * @param rcb 1212 * @param result_cb
1457 * Callback to call with the result of the operation. 1213 * Callback to call with the result of the operation.
1458 * @param cls 1214 * @param cls
1459 * Closure for the callbacks. 1215 * Closure for the callbacks.
@@ -1464,37 +1220,26 @@ struct GNUNET_PSYCSTORE_OperationHandle *
1464GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h, 1220GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h,
1465 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, 1221 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1466 const char *name, 1222 const char *name,
1467 GNUNET_PSYCSTORE_StateCallback scb, 1223 GNUNET_PSYCSTORE_StateCallback state_cb,
1468 GNUNET_PSYCSTORE_ResultCallback rcb, 1224 GNUNET_PSYCSTORE_ResultCallback result_cb,
1469 void *cls) 1225 void *cls)
1470{ 1226{
1471 size_t name_size = strlen (name) + 1; 1227 size_t name_size = strlen (name) + 1;
1472 struct OperationRequest *req; 1228 struct OperationRequest *req;
1473 struct GNUNET_PSYCSTORE_OperationHandle * 1229 struct GNUNET_MQ_Envelope *
1474 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size); 1230 env = GNUNET_MQ_msg_extra (req, name_size,
1475 op->h = h; 1231 GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET);
1476 op->data_cb = (DataCallback) scb;
1477 op->res_cb = rcb;
1478 op->cls = cls;
1479
1480 req = (struct OperationRequest *) &op[1];
1481 op->msg = (struct GNUNET_MessageHeader *) req;
1482 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET);
1483 req->header.size = htons (sizeof (*req) + name_size);
1484 req->channel_key = *channel_key; 1232 req->channel_key = *channel_key;
1485 GNUNET_memcpy (&req[1], name, name_size); 1233 GNUNET_memcpy (&req[1], name, name_size);
1486 1234
1487 op->op_id = get_next_op_id (h); 1235 struct GNUNET_PSYCSTORE_OperationHandle *
1488 req->op_id = GNUNET_htonll (op->op_id); 1236 op = op_create (h, h->op, result_cb, cls);
1489 1237 op->state_cb = state_cb;
1490 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); 1238 op->cls = cls;
1491 transmit_next (h); 1239 return op_send (h, op, env, &req->op_id);
1492
1493 return op;
1494} 1240}
1495 1241
1496 1242
1497
1498/** 1243/**
1499 * Retrieve all state variables for a channel with the given prefix. 1244 * Retrieve all state variables for a channel with the given prefix.
1500 * 1245 *
@@ -1504,9 +1249,9 @@ GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h,
1504 * The channel we are interested in. 1249 * The channel we are interested in.
1505 * @param name_prefix 1250 * @param name_prefix
1506 * Prefix of state variable names to match. 1251 * Prefix of state variable names to match.
1507 * @param scb 1252 * @param state_cb
1508 * Callback to return matching state variables. 1253 * Callback to return matching state variables.
1509 * @param rcb 1254 * @param result_cb
1510 * Callback to call with the result of the operation. 1255 * Callback to call with the result of the operation.
1511 * @param cls 1256 * @param cls
1512 * Closure for the callbacks. 1257 * Closure for the callbacks.
@@ -1517,33 +1262,23 @@ struct GNUNET_PSYCSTORE_OperationHandle *
1517GNUNET_PSYCSTORE_state_get_prefix (struct GNUNET_PSYCSTORE_Handle *h, 1262GNUNET_PSYCSTORE_state_get_prefix (struct GNUNET_PSYCSTORE_Handle *h,
1518 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, 1263 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1519 const char *name_prefix, 1264 const char *name_prefix,
1520 GNUNET_PSYCSTORE_StateCallback scb, 1265 GNUNET_PSYCSTORE_StateCallback state_cb,
1521 GNUNET_PSYCSTORE_ResultCallback rcb, 1266 GNUNET_PSYCSTORE_ResultCallback result_cb,
1522 void *cls) 1267 void *cls)
1523{ 1268{
1524 size_t name_size = strlen (name_prefix) + 1; 1269 size_t name_size = strlen (name_prefix) + 1;
1525 struct OperationRequest *req; 1270 struct OperationRequest *req;
1526 struct GNUNET_PSYCSTORE_OperationHandle * 1271 struct GNUNET_MQ_Envelope *
1527 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size); 1272 env = GNUNET_MQ_msg_extra (req, name_size,
1528 op->h = h; 1273 GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET_PREFIX);
1529 op->data_cb = (DataCallback) scb;
1530 op->res_cb = rcb;
1531 op->cls = cls;
1532
1533 req = (struct OperationRequest *) &op[1];
1534 op->msg = (struct GNUNET_MessageHeader *) req;
1535 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET_PREFIX);
1536 req->header.size = htons (sizeof (*req) + name_size);
1537 req->channel_key = *channel_key; 1274 req->channel_key = *channel_key;
1538 GNUNET_memcpy (&req[1], name_prefix, name_size); 1275 GNUNET_memcpy (&req[1], name_prefix, name_size);
1539 1276
1540 op->op_id = get_next_op_id (h); 1277 struct GNUNET_PSYCSTORE_OperationHandle *
1541 req->op_id = GNUNET_htonll (op->op_id); 1278 op = op_create (h, h->op, result_cb, cls);
1542 1279 op->state_cb = state_cb;
1543 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); 1280 op->cls = cls;
1544 transmit_next (h); 1281 return op_send (h, op, env, &req->op_id);
1545
1546 return op;
1547} 1282}
1548 1283
1549/* end of psycstore_api.c */ 1284/* end of psycstore_api.c */