aboutsummaryrefslogtreecommitdiff
path: root/src/psycstore/psycstore_api.c
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2013-09-16 04:59:05 +0000
committerGabor X Toth <*@tg-x.net>2013-09-16 04:59:05 +0000
commitf78e9753a91497f1deb5e20d10868c27ab4a6013 (patch)
treef49653367e00fd4b611ec6ec281144d4568bd111 /src/psycstore/psycstore_api.c
parentfbae7143d25a258b9dfabefa9ca3956e9228cb0e (diff)
downloadgnunet-f78e9753a91497f1deb5e20d10868c27ab4a6013.tar.gz
gnunet-f78e9753a91497f1deb5e20d10868c27ab4a6013.zip
PSYCstore service and API implementation
Diffstat (limited to 'src/psycstore/psycstore_api.c')
-rw-r--r--src/psycstore/psycstore_api.c1145
1 files changed, 1026 insertions, 119 deletions
diff --git a/src/psycstore/psycstore_api.c b/src/psycstore/psycstore_api.c
index 5847fc852..5b9bb7e89 100644
--- a/src/psycstore/psycstore_api.c
+++ b/src/psycstore/psycstore_api.c
@@ -1,22 +1,22 @@
1/* 1/*
2 This file is part of GNUnet. 2 * This file is part of GNUnet
3 (C) 2013 Christian Grothoff (and other contributing authors) 3 * (C) 2013 Christian Grothoff (and other contributing authors)
4 4 *
5 GNUnet is free software; you can redistribute it and/or modify 5 * GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public Liceidentity as published 6 * it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your 7 * by the Free Software Foundation; either version 3, or (at your
8 option) any later version. 8 * option) any later version.
9 9 *
10 GNUnet is distributed in the hope that it will be useful, but 10 * GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of 11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public Liceidentity for more details. 13 * General Public License for more details.
14 14 *
15 You should have received a copy of the GNU General Public Liceidentity 15 * You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the 16 * along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330, 17 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA. 18 * Boston, MA 02111-1307, USA.
19*/ 19 */
20 20
21/** 21/**
22 * @file psycstore/psycstore_api.c 22 * @file psycstore/psycstore_api.c
@@ -30,10 +30,12 @@
30#include "gnunet_constants.h" 30#include "gnunet_constants.h"
31#include "gnunet_protocols.h" 31#include "gnunet_protocols.h"
32#include "gnunet_psycstore_service.h" 32#include "gnunet_psycstore_service.h"
33#include "gnunet_multicast_service.h"
33#include "psycstore.h" 34#include "psycstore.h"
34 35
35#define LOG(kind,...) GNUNET_log_from (kind, "psycstore-api",__VA_ARGS__) 36#define LOG(kind,...) GNUNET_log_from (kind, "psycstore-api",__VA_ARGS__)
36 37
38typedef void (*DataCallback) ();
37 39
38/** 40/**
39 * Handle for an operation with the PSYCstore service. 41 * Handle for an operation with the PSYCstore service.
@@ -45,7 +47,7 @@ struct GNUNET_PSYCSTORE_OperationHandle
45 * Main PSYCstore handle. 47 * Main PSYCstore handle.
46 */ 48 */
47 struct GNUNET_PSYCSTORE_Handle *h; 49 struct GNUNET_PSYCSTORE_Handle *h;
48 50
49 /** 51 /**
50 * We keep operations in a DLL. 52 * We keep operations in a DLL.
51 */ 53 */
@@ -57,31 +59,30 @@ struct GNUNET_PSYCSTORE_OperationHandle
57 struct GNUNET_PSYCSTORE_OperationHandle *prev; 59 struct GNUNET_PSYCSTORE_OperationHandle *prev;
58 60
59 /** 61 /**
60 * Message to send to the PSYCstore service.
61 * Allocated at the end of this struct.
62 */
63 const struct GNUNET_MessageHeader *msg;
64
65 /**
66 * Continuation to invoke with the result of an operation. 62 * Continuation to invoke with the result of an operation.
67 */ 63 */
68 GNUNET_PSYCSTORE_ResultCallback res_cb; 64 GNUNET_PSYCSTORE_ResultCallback res_cb;
69 65
70 /** 66 /**
71 * Continuation to invoke with the result of an operation returning a fragment. 67 * Continuation to invoke with the result of an operation returning data.
72 */ 68 */
73 GNUNET_PSYCSTORE_FragmentCallback frag_cb; 69 DataCallback data_cb;
74 70
75 /** 71 /**
76 * Continuation to invoke with the result of an operation returning a state variable. 72 * Closure for the callbacks.
77 */ 73 */
78 GNUNET_PSYCSTORE_StateCallback state_cb; 74 void *cls;
79 75
80 /** 76 /**
81 * Closure for the callbacks. 77 * Operation ID.
82 */ 78 */
83 void *cls; 79 uint32_t op_id;
84 80
81 /**
82 * Message to send to the PSYCstore service.
83 * Allocated at the end of this struct.
84 */
85 const struct GNUNET_MessageHeader *msg;
85}; 86};
86 87
87 88
@@ -101,13 +102,23 @@ struct GNUNET_PSYCSTORE_Handle
101 struct GNUNET_CLIENT_Connection *client; 102 struct GNUNET_CLIENT_Connection *client;
102 103
103 /** 104 /**
104 * Head of active operations. 105 * Head of operations to transmit.
105 */ 106 */
107 struct GNUNET_PSYCSTORE_OperationHandle *transmit_head;
108
109 /**
110 * Tail of operations to transmit.
111 */
112 struct GNUNET_PSYCSTORE_OperationHandle *transmit_tail;
113
114 /**
115 * Head of active operations waiting for response.
116 */
106 struct GNUNET_PSYCSTORE_OperationHandle *op_head; 117 struct GNUNET_PSYCSTORE_OperationHandle *op_head;
107 118
108 /** 119 /**
109 * Tail of active operations. 120 * Tail of active operations waiting for response.
110 */ 121 */
111 struct GNUNET_PSYCSTORE_OperationHandle *op_tail; 122 struct GNUNET_PSYCSTORE_OperationHandle *op_tail;
112 123
113 /** 124 /**
@@ -130,10 +141,47 @@ struct GNUNET_PSYCSTORE_Handle
130 */ 141 */
131 int in_receive; 142 int in_receive;
132 143
144 /**
145 * The last operation id used for a PSYCstore operation.
146 */
147 uint32_t last_op_id_used;
148
133}; 149};
134 150
135 151
136/** 152/**
153 * Get a fresh operation ID to distinguish between PSYCstore requests.
154 *
155 * @param h Handle to the PSYCstore service.
156 * @return next operation id to use
157 */
158static uint32_t
159get_next_op_id (struct GNUNET_PSYCSTORE_Handle *h)
160{
161 return h->last_op_id_used++;
162}
163
164
165/**
166 * Find operation by ID.
167 *
168 * @return OperationHandle if found, or NULL otherwise.
169 */
170static struct GNUNET_PSYCSTORE_OperationHandle *
171find_op_by_id (struct GNUNET_PSYCSTORE_Handle *h, uint32_t op_id)
172{
173 struct GNUNET_PSYCSTORE_OperationHandle *op = h->op_head;
174 while (NULL != op)
175 {
176 if (op->op_id == op_id)
177 return op;
178 op = op->next;
179 }
180 return NULL;
181}
182
183
184/**
137 * Try again to connect to the PSYCstore service. 185 * Try again to connect to the PSYCstore service.
138 * 186 *
139 * @param cls handle to the PSYCstore service. 187 * @param cls handle to the PSYCstore service.
@@ -175,6 +223,15 @@ reschedule_connect (struct GNUNET_PSYCSTORE_Handle *h)
175 223
176 224
177/** 225/**
226 * Schedule transmission of the next message from our queue.
227 *
228 * @param h PSYCstore handle
229 */
230static void
231transmit_next (struct GNUNET_PSYCSTORE_Handle *h);
232
233
234/**
178 * Type of a function to call when we receive a message 235 * Type of a function to call when we receive a message
179 * from the service. 236 * from the service.
180 * 237 *
@@ -182,12 +239,16 @@ reschedule_connect (struct GNUNET_PSYCSTORE_Handle *h)
182 * @param msg message received, NULL on timeout or fatal error 239 * @param msg message received, NULL on timeout or fatal error
183 */ 240 */
184static void 241static void
185message_handler (void *cls, 242message_handler (void *cls,
186 const struct GNUNET_MessageHeader *msg) 243 const struct GNUNET_MessageHeader *msg)
187{ 244{
188 struct GNUNET_PSYCSTORE_Handle *h = cls; 245 struct GNUNET_PSYCSTORE_Handle *h = cls;
189 struct GNUNET_PSYCSTORE_OperationHandle *op; 246 struct GNUNET_PSYCSTORE_OperationHandle *op;
190 const struct ResultCodeMessage *rcm; 247 const struct OperationResult *opres;
248 const struct MasterCountersResult *mcres;
249 const struct SlaveCountersResult *scres;
250 const struct FragmentResult *fres;
251 const struct StateResult *sres;
191 const char *str; 252 const char *str;
192 uint16_t size; 253 uint16_t size;
193 254
@@ -203,68 +264,240 @@ message_handler (void *cls,
203 switch (ntohs (msg->type)) 264 switch (ntohs (msg->type))
204 { 265 {
205 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE: 266 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE:
206 if (size < sizeof (struct ResultCodeMessage)) 267 if (size < sizeof (struct OperationResult))
207 { 268 {
269 LOG (GNUNET_ERROR_TYPE_ERROR,
270 "Received message of type %d with length %lu bytes. "
271 "Expected >= %lu\n",
272 ntohs (msg->type), size, sizeof (struct OperationResult));
208 GNUNET_break (0); 273 GNUNET_break (0);
209 reschedule_connect (h); 274 reschedule_connect (h);
210 return; 275 return;
211 } 276 }
212 rcm = (const struct ResultCodeMessage *) msg; 277
213 str = (const char *) &rcm[1]; 278 opres = (const struct OperationResult *) msg;
214 if ( (size > sizeof (struct ResultCodeMessage)) && 279 str = (const char *) &opres[1];
215 ('\0' != str[size - sizeof (struct ResultCodeMessage) - 1]) ) 280 if ( (size > sizeof (struct OperationResult)) &&
281 ('\0' != str[size - sizeof (struct OperationResult) - 1]) )
216 { 282 {
217 GNUNET_break (0); 283 GNUNET_break (0);
218 reschedule_connect (h); 284 reschedule_connect (h);
219 return; 285 return;
220 } 286 }
221 if (size == sizeof (struct ResultCodeMessage)) 287 if (size == sizeof (struct OperationResult))
222 str = NULL; 288 str = NULL;
223 289
224 op = h->op_head; 290 op = find_op_by_id (h, ntohl (opres->op_id));
225 GNUNET_CONTAINER_DLL_remove (h->op_head, 291 if (NULL == op)
226 h->op_tail, 292 {
227 op); 293 LOG (GNUNET_ERROR_TYPE_ERROR,
228 GNUNET_CLIENT_receive (h->client, &message_handler, h, 294 "Received result of an unkown operation ID: %ld\n",
229 GNUNET_TIME_UNIT_FOREVER_REL); 295 ntohl (opres->op_id));
230 if (NULL != op->res_cb) 296 }
231 op->res_cb (op->cls, rcm->result_code , str); 297 else
232 GNUNET_free (op); 298 {
299 GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op);
300 if (NULL != op->res_cb)
301 {
302 const struct StateModifyRequest *smreq;
303 const struct StateSyncRequest *ssreq;
304 switch (ntohs (op->msg->type))
305 {
306 case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY:
307 smreq = (const struct StateModifyRequest *) op->msg;
308 if (!(smreq->flags & STATE_OP_LAST
309 || GNUNET_OK != ntohl (opres->result_code)))
310 op->res_cb = NULL;
311 break;
312 case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC:
313 ssreq = (const struct StateSyncRequest *) op->msg;
314 if (!(ssreq->flags & STATE_OP_LAST
315 || GNUNET_OK != ntohl (opres->result_code)))
316 op->res_cb = NULL;
317 break;
318 }
319 }
320 if (NULL != op->res_cb)
321 op->res_cb (op->cls, ntohl (opres->result_code), str);
322 GNUNET_free (op);
323 }
324 break;
325
326 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS_MASTER:
327 if (size != sizeof (struct MasterCountersResult))
328 {
329 LOG (GNUNET_ERROR_TYPE_ERROR,
330 "Received message of type %d with length %lu bytes. "
331 "Expected %lu\n",
332 ntohs (msg->type), size, sizeof (struct MasterCountersResult));
333 GNUNET_break (0);
334 reschedule_connect (h);
335 return;
336 }
337
338 mcres = (const struct MasterCountersResult *) msg;
339
340 op = find_op_by_id (h, ntohl (mcres->op_id));
341 if (NULL == op)
342 {
343 LOG (GNUNET_ERROR_TYPE_ERROR,
344 "Received result of an unkown operation ID: %ld\n",
345 ntohl (mcres->op_id));
346 }
347 else
348 {
349 GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op);
350 if (NULL != op->data_cb)
351 ((GNUNET_PSYCSTORE_MasterCountersCallback)
352 op->data_cb) (op->cls,
353 GNUNET_ntohll (mcres->fragment_id),
354 GNUNET_ntohll (mcres->message_id),
355 GNUNET_ntohll (mcres->group_generation));
356 GNUNET_free (op);
357 }
233 break; 358 break;
359
360 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS_SLAVE:
361 if (size != sizeof (struct SlaveCountersResult))
362 {
363 LOG (GNUNET_ERROR_TYPE_ERROR,
364 "Received message of type %d with length %lu bytes. "
365 "Expected %lu\n",
366 ntohs (msg->type), size, sizeof (struct SlaveCountersResult));
367 GNUNET_break (0);
368 reschedule_connect (h);
369 return;
370 }
371
372 scres = (const struct SlaveCountersResult *) msg;
373
374 op = find_op_by_id (h, ntohl (scres->op_id));
375 if (NULL == op)
376 {
377 LOG (GNUNET_ERROR_TYPE_ERROR,
378 "Received result of an unkown operation ID: %ld\n",
379 ntohl (scres->op_id));
380 }
381 else
382 {
383 GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op);
384 if (NULL != op->data_cb)
385 ((GNUNET_PSYCSTORE_SlaveCountersCallback)
386 op->data_cb) (op->cls, GNUNET_ntohll (scres->max_known_msg_id));
387 GNUNET_free (op);
388 }
389 break;
390
391 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_FRAGMENT:
392 if (size < sizeof (struct FragmentResult))
393 {
394 LOG (GNUNET_ERROR_TYPE_ERROR,
395 "Received message of type %d with length %lu bytes. "
396 "Expected >= %lu\n",
397 ntohs (msg->type), size, sizeof (struct FragmentResult));
398 GNUNET_break (0);
399 reschedule_connect (h);
400 return;
401 }
402
403 fres = (const struct FragmentResult *) msg;
404 struct GNUNET_MULTICAST_MessageHeader *mmsg =
405 (struct GNUNET_MULTICAST_MessageHeader *) &fres[1];
406 if (size != sizeof (struct FragmentResult) + ntohs (mmsg->header.size))
407 {
408 LOG (GNUNET_ERROR_TYPE_ERROR,
409 "Received message of type %d with length %lu bytes. "
410 "Expected = %lu\n",
411 ntohs (msg->type), size,
412 sizeof (struct FragmentResult) + ntohs (mmsg->header.size));
413 GNUNET_break (0);
414 reschedule_connect (h);
415 return;
416 }
417
418 op = find_op_by_id (h, ntohl (fres->op_id));
419 if (NULL == op)
420 {
421 LOG (GNUNET_ERROR_TYPE_ERROR,
422 "Received result of an unkown operation ID: %ld\n",
423 ntohl (fres->op_id));
424 }
425 else
426 {
427 if (NULL != op->data_cb)
428 ((GNUNET_PSYCSTORE_FragmentCallback)
429 op->data_cb) (op->cls, mmsg, ntohl (fres->psycstore_flags));
430 }
431 break;
432
433 case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE:
434 if (size < sizeof (struct StateResult))
435 {
436 LOG (GNUNET_ERROR_TYPE_ERROR,
437 "Received message of type %d with length %lu bytes. "
438 "Expected >= %lu\n",
439 ntohs (msg->type), size, sizeof (struct StateResult));
440 GNUNET_break (0);
441 reschedule_connect (h);
442 return;
443 }
444
445 sres = (const struct StateResult *) msg;
446 const char *name = (const char *) &sres[1];
447 uint16_t name_size = ntohs (sres->name_size);
448
449 if (name_size <= 2 || '\0' != name[name_size - 1])
450 {
451 LOG (GNUNET_ERROR_TYPE_ERROR,
452 "Received state result message (type %d) with invalid name.\n",
453 ntohs (msg->type), name_size, name);
454 GNUNET_break (0);
455 reschedule_connect (h);
456 return;
457 }
458
459 op = find_op_by_id (h, ntohl (sres->op_id));
460 if (NULL == op)
461 {
462 LOG (GNUNET_ERROR_TYPE_ERROR,
463 "Received result of an unkown operation ID: %ld\n",
464 ntohl (sres->op_id));
465 }
466 else
467 {
468 if (NULL != op->data_cb)
469 ((GNUNET_PSYCSTORE_StateCallback)
470 op->data_cb) (op->cls, name, (void *) &sres[1] + name_size,
471 ntohs (sres->header.size) - sizeof (*sres) - name_size);
472 }
473 break;
474
234 default: 475 default:
235 GNUNET_break (0); 476 GNUNET_break (0);
236 reschedule_connect (h); 477 reschedule_connect (h);
237 return; 478 return;
238 } 479 }
239}
240 480
241 481 GNUNET_CLIENT_receive (h->client, &message_handler, h,
242/** 482 GNUNET_TIME_UNIT_FOREVER_REL);
243 * Schedule transmission of the next message from our queue. 483}
244 *
245 * @param h PSYCstore handle
246 */
247static void
248transmit_next (struct GNUNET_PSYCSTORE_Handle *h);
249 484
250 485
251/** 486/**
252 * Transmit next message to service. 487 * Transmit next message to service.
253 * 488 *
254 * @param cls the 'struct GNUNET_PSYCSTORE_Handle'. 489 * @param cls The 'struct GNUNET_PSYCSTORE_Handle'.
255 * @param size number of bytes available in buf 490 * @param size Number of bytes available in buf.
256 * @param buf where to copy the message 491 * @param buf Where to copy the message.
257 * @return number of bytes copied to buf 492 * @return Number of bytes copied to buf.
258 */ 493 */
259static size_t 494static size_t
260send_next_message (void *cls, 495send_next_message (void *cls, size_t size, void *buf)
261 size_t size,
262 void *buf)
263{ 496{
264 struct GNUNET_PSYCSTORE_Handle *h = cls; 497 struct GNUNET_PSYCSTORE_Handle *h = cls;
265 struct GNUNET_PSYCSTORE_OperationHandle *op = h->op_head; 498 struct GNUNET_PSYCSTORE_OperationHandle *op = h->transmit_head;
266 size_t ret; 499 size_t ret;
267 500
268 h->th = NULL; 501 h->th = NULL;
269 if (NULL == op) 502 if (NULL == op)
270 return 0; 503 return 0;
@@ -273,26 +506,30 @@ send_next_message (void *cls,
273 { 506 {
274 reschedule_connect (h); 507 reschedule_connect (h);
275 return 0; 508 return 0;
276 } 509 }
277 LOG (GNUNET_ERROR_TYPE_DEBUG, 510 LOG (GNUNET_ERROR_TYPE_DEBUG,
278 "Sending message of type %d to PSYCstore service\n", 511 "Sending message of type %d to PSYCstore service\n",
279 ntohs (op->msg->type)); 512 ntohs (op->msg->type));
280 memcpy (buf, op->msg, ret); 513 memcpy (buf, op->msg, ret);
281 if ( (NULL == op->res_cb) && 514
282 (NULL == op->frag_cb) && 515 GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op);
283 (NULL == op->state_cb)) 516
517 if (NULL == op->res_cb && NULL == op->data_cb)
284 { 518 {
285 GNUNET_CONTAINER_DLL_remove (h->op_head,
286 h->op_tail,
287 op);
288 GNUNET_free (op); 519 GNUNET_free (op);
289 transmit_next (h);
290 } 520 }
521 else
522 {
523 GNUNET_CONTAINER_DLL_insert_tail (h->op_head, h->op_tail, op);
524 }
525
526 if (NULL != h->transmit_head)
527 transmit_next (h);
528
291 if (GNUNET_NO == h->in_receive) 529 if (GNUNET_NO == h->in_receive)
292 { 530 {
293 h->in_receive = GNUNET_YES; 531 h->in_receive = GNUNET_YES;
294 GNUNET_CLIENT_receive (h->client, 532 GNUNET_CLIENT_receive (h->client, &message_handler, h,
295 &message_handler, h,
296 GNUNET_TIME_UNIT_FOREVER_REL); 533 GNUNET_TIME_UNIT_FOREVER_REL);
297 } 534 }
298 return ret; 535 return ret;
@@ -302,18 +539,18 @@ send_next_message (void *cls,
302/** 539/**
303 * Schedule transmission of the next message from our queue. 540 * Schedule transmission of the next message from our queue.
304 * 541 *
305 * @param h PSYCstore handle 542 * @param h PSYCstore handle.
306 */ 543 */
307static void 544static void
308transmit_next (struct GNUNET_PSYCSTORE_Handle *h) 545transmit_next (struct GNUNET_PSYCSTORE_Handle *h)
309{ 546{
310 struct GNUNET_PSYCSTORE_OperationHandle *op = h->op_head; 547 if (NULL != h->th || NULL == h->client)
548 return;
311 549
312 GNUNET_assert (NULL == h->th); 550 struct GNUNET_PSYCSTORE_OperationHandle *op = h->transmit_head;
313 if (NULL == op) 551 if (NULL == op)
314 return; 552 return;
315 if (NULL == h->client) 553
316 return;
317 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, 554 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
318 ntohs (op->msg->size), 555 ntohs (op->msg->size),
319 GNUNET_TIME_UNIT_FOREVER_REL, 556 GNUNET_TIME_UNIT_FOREVER_REL,
@@ -326,8 +563,8 @@ transmit_next (struct GNUNET_PSYCSTORE_Handle *h)
326/** 563/**
327 * Try again to connect to the PSYCstore service. 564 * Try again to connect to the PSYCstore service.
328 * 565 *
329 * @param cls the handle to the PSYCstore service 566 * @param cls Handle to the PSYCstore service.
330 * @param tc scheduler context 567 * @param tc Scheduler context.
331 */ 568 */
332static void 569static void
333reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 570reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
@@ -347,8 +584,8 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
347/** 584/**
348 * Connect to the PSYCstore service. 585 * Connect to the PSYCstore service.
349 * 586 *
350 * @param cfg the configuration to use 587 * @param cfg The configuration to use
351 * @return handle to use 588 * @return Handle to use
352 */ 589 */
353struct GNUNET_PSYCSTORE_Handle * 590struct GNUNET_PSYCSTORE_Handle *
354GNUNET_PSYCSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) 591GNUNET_PSYCSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
@@ -366,7 +603,7 @@ GNUNET_PSYCSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
366/** 603/**
367 * Disconnect from PSYCstore service 604 * Disconnect from PSYCstore service
368 * 605 *
369 * @param h handle to destroy 606 * @param h Handle to destroy
370 */ 607 */
371void 608void
372GNUNET_PSYCSTORE_disconnect (struct GNUNET_PSYCSTORE_Handle *h) 609GNUNET_PSYCSTORE_disconnect (struct GNUNET_PSYCSTORE_Handle *h)
@@ -405,13 +642,10 @@ GNUNET_PSYCSTORE_operation_cancel (struct GNUNET_PSYCSTORE_OperationHandle *op)
405{ 642{
406 struct GNUNET_PSYCSTORE_Handle *h = op->h; 643 struct GNUNET_PSYCSTORE_Handle *h = op->h;
407 644
408 if ( (h->op_head != op) || 645 if (h->transmit_head != NULL && (h->transmit_head != op || NULL == h->client))
409 (NULL == h->client) )
410 { 646 {
411 /* request not active, can simply remove */ 647 /* request not active, can simply remove */
412 GNUNET_CONTAINER_DLL_remove (h->op_head, 648 GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op);
413 h->op_tail,
414 op);
415 GNUNET_free (op); 649 GNUNET_free (op);
416 return; 650 return;
417 } 651 }
@@ -420,47 +654,720 @@ GNUNET_PSYCSTORE_operation_cancel (struct GNUNET_PSYCSTORE_OperationHandle *op)
420 /* request active but not yet with service, can still abort */ 654 /* request active but not yet with service, can still abort */
421 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); 655 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
422 h->th = NULL; 656 h->th = NULL;
423 GNUNET_CONTAINER_DLL_remove (h->op_head, 657 GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op);
424 h->op_tail,
425 op);
426 GNUNET_free (op); 658 GNUNET_free (op);
427 transmit_next (h); 659 transmit_next (h);
428 return; 660 return;
429 } 661 }
430 /* request active with service, simply ensure continuations are not called */ 662 /* request active with service, simply ensure continuations are not called */
431 op->res_cb = NULL; 663 op->res_cb = NULL;
432 op->frag_cb = NULL; 664 op->data_cb = NULL;
433 op->state_cb = NULL; 665}
666
667
668/**
669 * Store join/leave events for a PSYC channel in order to be able to answer
670 * membership test queries later.
671 *
672 * @param h Handle for the PSYCstore.
673 * @param channel_key The channel where the event happened.
674 * @param slave_key Public key of joining/leaving slave.
675 * @param did_join #GNUNET_YES on join, #GNUNET_NO on part.
676 * @param announced_at ID of the message that announced the membership change.
677 * @param effective_since Message ID this membership change is in effect since.
678 * For joins it is <= announced_at, for parts it is always 0.
679 * @param group_generation In case of a part, the last group generation the
680 * slave has access to. It has relevance when a larger message have
681 * fragments with different group generations.
682 * @param rcb Callback to call with the result of the storage operation.
683 * @param rcb_cls Closure for the callback.
684 *
685 * @return Operation handle that can be used to cancel the operation.
686 */
687struct GNUNET_PSYCSTORE_OperationHandle *
688GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h,
689 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
690 const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key,
691 int did_join,
692 uint64_t announced_at,
693 uint64_t effective_since,
694 uint64_t group_generation,
695 GNUNET_PSYCSTORE_ResultCallback rcb,
696 void *rcb_cls)
697{
698 GNUNET_assert (NULL != h);
699 GNUNET_assert (NULL != channel_key);
700 GNUNET_assert (NULL != slave_key);
701 GNUNET_assert (did_join
702 ? effective_since <= announced_at
703 : effective_since == 0);
704
705 struct MembershipStoreRequest *req;
706 struct GNUNET_PSYCSTORE_OperationHandle *op
707 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
708 op->h = h;
709 op->res_cb = rcb;
710 op->cls = rcb_cls;
711
712 req = (struct MembershipStoreRequest *) &op[1];
713 op->msg = (struct GNUNET_MessageHeader *) req;
714 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE);
715 req->header.size = htons (sizeof (*req));
716 req->channel_key = *channel_key;
717 req->slave_key = *slave_key;
718 req->did_join = htonl (did_join);
719 req->announced_at = GNUNET_htonll (announced_at);
720 req->effective_since = GNUNET_htonll (effective_since);
721 req->group_generation = GNUNET_htonll (group_generation);
722
723 op->op_id = get_next_op_id (h);
724 req->op_id = htonl (op->op_id);
725
726 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
727 transmit_next (h);
728
729 return op;
434} 730}
435 731
436 732
733/**
734 * Test if a member was admitted to the channel at the given message ID.
735 *
736 * This is useful when relaying and replaying messages to check if a particular
737 * slave has access to the message fragment with a given group generation. It
738 * is also used when handling join requests to determine whether the slave is
739 * currently admitted to the channel.
740 *
741 * @param h Handle for the PSYCstore.
742 * @param channel_key The channel we are interested in.
743 * @param slave_key Public key of slave whose membership to check.
744 * @param message_id Message ID for which to do the membership test.
745 * @param group_generation Group generation of the fragment of the message to
746 * test. It has relevance if the message consists of multiple fragments
747 * with different group generations.
748 * @param rcb Callback to call with the test result.
749 * @param rcb_cls Closure for the callback.
750 *
751 * @return Operation handle that can be used to cancel the operation.
752 */
753struct GNUNET_PSYCSTORE_OperationHandle *
754GNUNET_PSYCSTORE_membership_test (struct GNUNET_PSYCSTORE_Handle *h,
755 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
756 const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key,
757 uint64_t message_id,
758 uint64_t group_generation,
759 GNUNET_PSYCSTORE_ResultCallback rcb,
760 void *rcb_cls)
761{
762 struct MembershipTestRequest *req;
763 struct GNUNET_PSYCSTORE_OperationHandle *op
764 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
765 op->h = h;
766 op->res_cb = rcb;
767 op->cls = rcb_cls;
768
769 req = (struct MembershipTestRequest *) &op[1];
770 op->msg = (struct GNUNET_MessageHeader *) req;
771 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST);
772 req->header.size = htons (sizeof (*req));
773 req->channel_key = *channel_key;
774 req->slave_key = *slave_key;
775 req->message_id = GNUNET_htonll (message_id);
776 req->group_generation = GNUNET_htonll (group_generation);
777
778 op->op_id = get_next_op_id (h);
779 req->op_id = htonl (op->op_id);
780
781 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
782 transmit_next (h);
783
784 return op;
785}
786
787
788/**
789 * Store a message fragment sent to a channel.
790 *
791 * @param h Handle for the PSYCstore.
792 * @param channel_key The channel the message belongs to.
793 * @param message Message to store.
794 * @param psycstore_flags Flags indicating whether the PSYC message contains
795 * state modifiers.
796 * @param rcb Callback to call with the result of the operation.
797 * @param rcb_cls Closure for the callback.
798 *
799 * @return Handle that can be used to cancel the operation.
800 */
801struct GNUNET_PSYCSTORE_OperationHandle *
802GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h,
803 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
804 const struct GNUNET_MULTICAST_MessageHeader *message,
805 uint32_t psycstore_flags,
806 GNUNET_PSYCSTORE_ResultCallback rcb,
807 void *rcb_cls)
808{
809 uint16_t size = ntohs (message->header.size);
810 struct FragmentStoreRequest *req;
811 struct GNUNET_PSYCSTORE_OperationHandle *op
812 = GNUNET_malloc (sizeof (*op) + sizeof (*req) + size);
813 op->h = h;
814 op->res_cb = rcb;
815 op->cls = rcb_cls;
816
817 req = (struct FragmentStoreRequest *) &op[1];
818 op->msg = (struct GNUNET_MessageHeader *) req;
819 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_STORE);
820 req->header.size = htons (sizeof (*req) + size);
821 req->channel_key = *channel_key;
822 req->psycstore_flags = htonl (psycstore_flags);
823 memcpy (&req[1], message, size);
824
825 op->op_id = get_next_op_id (h);
826 req->op_id = htonl (op->op_id);
827
828 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
829 transmit_next (h);
830
831 return op;
832}
833
834
835/**
836 * Retrieve a message fragment by fragment ID.
837 *
838 * @param h Handle for the PSYCstore.
839 * @param channel_key The channel we are interested in.
840 * @param fragment_id Fragment ID to check. Use 0 to get the latest message fragment.
841 * @param fcb Callback to call with the retrieved fragments.
842 * @param rcb Callback to call with the result of the operation.
843 * @param cls Closure for the callbacks.
844 *
845 * @return Handle that can be used to cancel the operation.
846 */
847struct GNUNET_PSYCSTORE_OperationHandle *
848GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h,
849 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
850 uint64_t fragment_id,
851 GNUNET_PSYCSTORE_FragmentCallback fcb,
852 GNUNET_PSYCSTORE_ResultCallback rcb,
853 void *cls)
854{
855 struct FragmentGetRequest *req;
856 struct GNUNET_PSYCSTORE_OperationHandle *op
857 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
858 op->h = h;
859 op->data_cb = (DataCallback) fcb;
860 op->res_cb = rcb;
861 op->cls = cls;
862
863 req = (struct FragmentGetRequest *) &op[1];
864 op->msg = (struct GNUNET_MessageHeader *) req;
865 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET);
866 req->header.size = htons (sizeof (*req));
867 req->channel_key = *channel_key;
868 req->fragment_id = GNUNET_htonll (fragment_id);
869
870 op->op_id = get_next_op_id (h);
871 req->op_id = htonl (op->op_id);
872
873 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
874 transmit_next (h);
875
876 return op;
877}
878
879
880/**
881 * Retrieve all fragments of a message.
882 *
883 * @param h Handle for the PSYCstore.
884 * @param channel_key The channel we are interested in.
885 * @param message_id Message ID to check. Use 0 to get the latest message.
886 * @param fcb Callback to call with the retrieved fragments.
887 * @param rcb Callback to call with the result of the operation.
888 * @param cls Closure for the callbacks.
889 *
890 * @return Handle that can be used to cancel the operation.
891 */
437struct GNUNET_PSYCSTORE_OperationHandle * 892struct GNUNET_PSYCSTORE_OperationHandle *
438GNUNET_PSYCSTORE_membership_store ( 893GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h,
439 struct GNUNET_PSYCSTORE_Handle *h, 894 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
440 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key, 895 uint64_t message_id,
441 const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key, 896 GNUNET_PSYCSTORE_FragmentCallback fcb,
442 int did_join, 897 GNUNET_PSYCSTORE_ResultCallback rcb,
443 uint64_t announced_at, 898 void *cls)
444 uint64_t effective_since,
445 uint64_t group_generation,
446 GNUNET_PSYCSTORE_ResultCallback rcb,
447 void *rcb_cls)
448{ 899{
449 900 struct MessageGetRequest *req;
901 struct GNUNET_PSYCSTORE_OperationHandle *op
902 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
903 op->h = h;
904 op->data_cb = (DataCallback) fcb;
905 op->res_cb = rcb;
906 op->cls = cls;
907
908 req = (struct MessageGetRequest *) &op[1];
909 op->msg = (struct GNUNET_MessageHeader *) req;
910 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
911 req->header.size = htons (sizeof (*req));
912 req->channel_key = *channel_key;
913 req->message_id = GNUNET_htonll (message_id);
914
915 op->op_id = get_next_op_id (h);
916 req->op_id = htonl (op->op_id);
917
918 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
919 transmit_next (h);
920
921 return op;
922}
923
924
925/**
926 * Retrieve a fragment of message specified by its message ID and fragment
927 * offset.
928 *
929 * @param h Handle for the PSYCstore.
930 * @param channel_key The channel we are interested in.
931 * @param message_id Message ID to check. Use 0 to get the latest message.
932 * @param fragment_offset Offset of the fragment to retrieve.
933 * @param fcb Callback to call with the retrieved fragments.
934 * @param rcb Callback to call with the result of the operation.
935 * @param cls Closure for the callbacks.
936 *
937 * @return Handle that can be used to cancel the operation.
938 */
939struct GNUNET_PSYCSTORE_OperationHandle *
940GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h,
941 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
942 uint64_t message_id,
943 uint64_t fragment_offset,
944 GNUNET_PSYCSTORE_FragmentCallback fcb,
945 GNUNET_PSYCSTORE_ResultCallback rcb,
946 void *cls)
947{
948 struct MessageGetFragmentRequest *req;
949 struct GNUNET_PSYCSTORE_OperationHandle *op
950 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
951 op->h = h;
952 op->data_cb = (DataCallback) fcb;
953 op->res_cb = rcb;
954 op->cls = cls;
955
956 req = (struct MessageGetFragmentRequest *) &op[1];
957 op->msg = (struct GNUNET_MessageHeader *) req;
958 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT);
959 req->header.size = htons (sizeof (*req));
960 req->channel_key = *channel_key;
961 req->message_id = GNUNET_htonll (message_id);
962 req->fragment_offset = GNUNET_htonll (fragment_offset);
963
964 op->op_id = get_next_op_id (h);
965 req->op_id = htonl (op->op_id);
966
967 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
968 transmit_next (h);
969
970 return op;
971}
972
973
974/**
975 * Retrieve latest values of counters for a channel master.
976 *
977 * The current value of counters are needed when a channel master is restarted,
978 * so that it can continue incrementing the counters from their last value.
979 *
980 * @param h Handle for the PSYCstore.
981 * @param channel_key Public key that identifies the channel.
982 * @param mccb Callback to call with the result.
983 * @param mccb_cls Closure for the callback.
984 *
985 * @return Handle that can be used to cancel the operation.
986 */
987struct GNUNET_PSYCSTORE_OperationHandle *
988GNUNET_PSYCSTORE_counters_get_master (struct GNUNET_PSYCSTORE_Handle *h,
989 struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
990 GNUNET_PSYCSTORE_MasterCountersCallback mccb,
991 void *mccb_cls)
992{
993 struct OperationRequest *req;
994 struct GNUNET_PSYCSTORE_OperationHandle *op
995 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
996 op->h = h;
997 op->data_cb = mccb;
998 op->cls = mccb_cls;
999
1000 req = (struct OperationRequest *) &op[1];
1001 op->msg = (struct GNUNET_MessageHeader *) req;
1002 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET_MASTER);
1003 req->header.size = htons (sizeof (*req));
1004 req->channel_key = *channel_key;
1005
1006 op->op_id = get_next_op_id (h);
1007 req->op_id = htonl (op->op_id);
1008
1009 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1010 transmit_next (h);
1011
1012 return op;
450} 1013}
451 1014
452 1015
1016
1017/**
1018 * Retrieve latest values of counters for a channel slave.
1019 *
1020 * The current value of counters are needed when a channel slave rejoins
1021 * and starts the state synchronization process.
1022 *
1023 * @param h Handle for the PSYCstore.
1024 * @param channel_key Public key that identifies the channel.
1025 * @param sccb Callback to call with the result.
1026 * @param sccb_cls Closure for the callback.
1027 *
1028 * @return Handle that can be used to cancel the operation.
1029 */
453struct GNUNET_PSYCSTORE_OperationHandle * 1030struct GNUNET_PSYCSTORE_OperationHandle *
454GNUNET_PSYCSTORE_membership_test ( 1031GNUNET_PSYCSTORE_counters_get_slave (struct GNUNET_PSYCSTORE_Handle *h,
455 struct GNUNET_PSYCSTORE_Handle *h, 1032 struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
456 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key, 1033 GNUNET_PSYCSTORE_SlaveCountersCallback sccb,
457 const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key, 1034 void *sccb_cls)
458 uint64_t message_id,
459 uint64_t group_generation,
460 GNUNET_PSYCSTORE_ResultCallback rcb,
461 void *rcb_cls)
462{ 1035{
1036 struct OperationRequest *req;
1037 struct GNUNET_PSYCSTORE_OperationHandle *op
1038 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1039 op->h = h;
1040 op->data_cb = sccb;
1041 op->cls = sccb_cls;
1042
1043 req = (struct OperationRequest *) &op[1];
1044 op->msg = (struct GNUNET_MessageHeader *) req;
1045 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET_SLAVE);
1046 req->header.size = htons (sizeof (*req));
1047 req->channel_key = *channel_key;
1048
1049 op->op_id = get_next_op_id (h);
1050 req->op_id = htonl (op->op_id);
1051
1052 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1053 transmit_next (h);
1054
1055 return op;
1056}
1057
1058
1059/**
1060 * Apply modifiers of a message to the current channel state.
1061 *
1062 * An error is returned if there are missing messages containing state
1063 * operations before the current one.
1064 *
1065 * @param h Handle for the PSYCstore.
1066 * @param channel_key The channel we are interested in.
1067 * @param message_id ID of the message that contains the @a modifiers.
1068 * @param state_delta Value of the _state_delta PSYC header variable of the message.
1069 * @param modifier_count Number of elements in the @a modifiers array.
1070 * @param modifiers List of modifiers to apply.
1071 * @param rcb Callback to call with the result of the operation.
1072 * @param rcb_cls Closure for the callback.
1073 *
1074 * @return Handle that can be used to cancel the operation.
1075 */
1076struct GNUNET_PSYCSTORE_OperationHandle *
1077GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h,
1078 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
1079 uint64_t message_id,
1080 uint64_t state_delta,
1081 size_t modifier_count,
1082 const struct GNUNET_ENV_Modifier *modifiers,
1083 GNUNET_PSYCSTORE_ResultCallback rcb,
1084 void *rcb_cls)
1085{
1086 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
1087 size_t i;
1088
1089 for (i = 0; i < modifier_count; i++) {
1090 struct StateModifyRequest *req;
1091 uint16_t name_size = strlen (modifiers[i].name) + 1;
1092
1093 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size +
1094 modifiers[i].value_size);
1095 op->h = h;
1096 op->res_cb = rcb;
1097 op->cls = rcb_cls;
1098
1099 req = (struct StateModifyRequest *) &op[1];
1100 op->msg = (struct GNUNET_MessageHeader *) req;
1101 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY);
1102 req->header.size = htons (sizeof (*req) + name_size
1103 + modifiers[i].value_size);
1104 req->channel_key = *channel_key;
1105 req->message_id = GNUNET_htonll (message_id);
1106 req->state_delta = GNUNET_htonll (state_delta);
1107 req->oper = modifiers[i].oper;
1108 req->name_size = htons (name_size);
1109 req->flags
1110 = 0 == i
1111 ? STATE_OP_FIRST
1112 : modifier_count - 1 == i
1113 ? STATE_OP_LAST
1114 : 0;
1115
1116 memcpy (&req[1], modifiers[i].name, name_size);
1117 memcpy ((void *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size);
1118
1119 op->op_id = get_next_op_id (h);
1120 req->op_id = htonl (op->op_id);
1121
1122 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1123 transmit_next (h);
1124 }
1125 return op;
1126 /* FIXME: only the last operation is returned,
1127 * operation_cancel() should be able to cancel all of them.
1128 */
1129}
1130
1131
1132/**
1133 * Store synchronized state.
1134 *
1135 * @param h Handle for the PSYCstore.
1136 * @param channel_key The channel we are interested in.
1137 * @param message_id ID of the message that contains the state_hash PSYC header variable.
1138 * @param modifier_count Number of elements in the @a modifiers array.
1139 * @param modifiers Full state to store.
1140 * @param rcb Callback to call with the result of the operation.
1141 * @param rcb_cls Closure for the callback.
1142 *
1143 * @return Handle that can be used to cancel the operation.
1144 */
1145struct GNUNET_PSYCSTORE_OperationHandle *
1146GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h,
1147 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
1148 uint64_t message_id,
1149 size_t modifier_count,
1150 const struct GNUNET_ENV_Modifier *modifiers,
1151 GNUNET_PSYCSTORE_ResultCallback rcb,
1152 void *rcb_cls)
1153{
1154 struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
1155 size_t i;
1156
1157 for (i = 0; i < modifier_count; i++) {
1158 struct StateSyncRequest *req;
1159 uint16_t name_size = strlen (modifiers[i].name) + 1;
1160
1161 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size +
1162 modifiers[i].value_size);
1163 op->h = h;
1164 op->res_cb = rcb;
1165 op->cls = rcb_cls;
1166
1167 req = (struct StateSyncRequest *) &op[1];
1168 op->msg = (struct GNUNET_MessageHeader *) req;
1169 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC);
1170 req->header.size = htons (sizeof (*req) + name_size
1171 + modifiers[i].value_size);
1172 req->channel_key = *channel_key;
1173 req->message_id = GNUNET_htonll (message_id);
1174 req->name_size = htons (name_size);
1175 req->flags
1176 = 0 == i
1177 ? STATE_OP_FIRST
1178 : modifier_count - 1 == i
1179 ? STATE_OP_LAST
1180 : 0;
1181
1182 memcpy (&req[1], modifiers[i].name, name_size);
1183 memcpy ((void *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size);
1184
1185 op->op_id = get_next_op_id (h);
1186 req->op_id = htonl (op->op_id);
1187
1188 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1189 transmit_next (h);
1190 }
1191 return op;
1192}
1193
1194
1195/**
1196 * Reset the state of a channel.
1197 *
1198 * Delete all state variables stored for the given channel.
1199 *
1200 * @param h Handle for the PSYCstore.
1201 * @param channel_key The channel we are interested in.
1202 * @param rcb Callback to call with the result of the operation.
1203 * @param rcb_cls Closure for the callback.
1204 *
1205 * @return Handle that can be used to cancel the operation.
1206 */
1207struct GNUNET_PSYCSTORE_OperationHandle *
1208GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h,
1209 const struct GNUNET_CRYPTO_EccPublicSignKey
1210 *channel_key,
1211 GNUNET_PSYCSTORE_ResultCallback rcb,
1212 void *rcb_cls)
1213{
1214 struct OperationRequest *req;
1215 struct GNUNET_PSYCSTORE_OperationHandle *op
1216 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1217 op->h = h;
1218 op->res_cb = rcb;
1219 op->cls = rcb_cls;
1220
1221 req = (struct OperationRequest *) &op[1];
1222 op->msg = (struct GNUNET_MessageHeader *) req;
1223 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET);
1224 req->header.size = htons (sizeof (*req));
1225 req->channel_key = *channel_key;
1226
1227 op->op_id = get_next_op_id (h);
1228 req->op_id = htonl (op->op_id);
1229
1230 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1231 transmit_next (h);
1232
1233 return op;
1234}
1235
1236
1237
1238/**
1239 * Update signed values of state variables in the state store.
1240 *
1241 * @param h Handle for the PSYCstore.
1242 * @param channel_key The channel we are interested in.
1243 * @param message_id Message ID that contained the state @a hash.
1244 * @param hash Hash of the serialized full state.
1245 * @param rcb Callback to call with the result of the operation.
1246 * @param rcb_cls Closure for the callback.
1247 *
1248 */
1249struct GNUNET_PSYCSTORE_OperationHandle *
1250GNUNET_PSYCSTORE_state_hash_update (struct GNUNET_PSYCSTORE_Handle *h,
1251 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
1252 uint64_t message_id,
1253 const struct GNUNET_HashCode *hash,
1254 GNUNET_PSYCSTORE_ResultCallback rcb,
1255 void *rcb_cls)
1256{
1257 struct StateHashUpdateRequest *req;
1258 struct GNUNET_PSYCSTORE_OperationHandle *op
1259 = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1260 op->h = h;
1261 op->res_cb = rcb;
1262 op->cls = rcb_cls;
1263
1264 req = (struct StateHashUpdateRequest *) &op[1];
1265 op->msg = (struct GNUNET_MessageHeader *) req;
1266 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET);
1267 req->header.size = htons (sizeof (*req));
1268 req->channel_key = *channel_key;
1269 req->hash = *hash;
1270
1271 op->op_id = get_next_op_id (h);
1272 req->op_id = htonl (op->op_id);
1273
1274 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1275 transmit_next (h);
1276
1277 return op;
1278}
1279
1280
1281/**
1282 * Retrieve the best matching state variable.
1283 *
1284 * @param h Handle for the PSYCstore.
1285 * @param channel_key The channel we are interested in.
1286 * @param name Name of variable to match, the returned variable might be less specific.
1287 * @param scb Callback to return the matching state variable.
1288 * @param rcb Callback to call with the result of the operation.
1289 * @param cls Closure for the callbacks.
1290 *
1291 * @return Handle that can be used to cancel the operation.
1292 */
1293struct GNUNET_PSYCSTORE_OperationHandle *
1294GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h,
1295 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
1296 const char *name,
1297 GNUNET_PSYCSTORE_StateCallback scb,
1298 GNUNET_PSYCSTORE_ResultCallback rcb,
1299 void *cls)
1300{
1301 size_t name_size = strlen (name) + 1;
1302 struct OperationRequest *req;
1303 struct GNUNET_PSYCSTORE_OperationHandle *op
1304 = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size);
1305 op->h = h;
1306 op->data_cb = (DataCallback) scb;
1307 op->res_cb = rcb;
1308 op->cls = cls;
1309
1310 req = (struct OperationRequest *) &op[1];
1311 op->msg = (struct GNUNET_MessageHeader *) req;
1312 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET);
1313 req->header.size = htons (sizeof (*req) + name_size);
1314 req->channel_key = *channel_key;
1315 memcpy (&req[1], name, name_size);
1316
1317 op->op_id = get_next_op_id (h);
1318 req->op_id = htonl (op->op_id);
1319
1320 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1321 transmit_next (h);
1322
1323 return op;
1324}
1325
1326
1327
1328/**
1329 * Retrieve all state variables for a channel with the given prefix.
1330 *
1331 * @param h Handle for the PSYCstore.
1332 * @param channel_key The channel we are interested in.
1333 * @param name_prefix Prefix of state variable names to match.
1334 * @param scb Callback to return matching state variables.
1335 * @param rcb Callback to call with the result of the operation.
1336 * @param cls Closure for the callbacks.
1337 *
1338 * @return Handle that can be used to cancel the operation.
1339 */
1340struct GNUNET_PSYCSTORE_OperationHandle *
1341GNUNET_PSYCSTORE_state_get_prefix (struct GNUNET_PSYCSTORE_Handle *h,
1342 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
1343 const char *name_prefix,
1344 GNUNET_PSYCSTORE_StateCallback scb,
1345 GNUNET_PSYCSTORE_ResultCallback rcb,
1346 void *cls)
1347{
1348 size_t name_size = strlen (name_prefix) + 1;
1349 struct OperationRequest *req;
1350 struct GNUNET_PSYCSTORE_OperationHandle *op
1351 = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size);
1352 op->h = h;
1353 op->data_cb = (DataCallback) scb;
1354 op->res_cb = rcb;
1355 op->cls = cls;
1356
1357 req = (struct OperationRequest *) &op[1];
1358 op->msg = (struct GNUNET_MessageHeader *) req;
1359 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET_PREFIX);
1360 req->header.size = htons (sizeof (*req) + name_size);
1361 req->channel_key = *channel_key;
1362 memcpy (&req[1], name_prefix, name_size);
1363
1364 op->op_id = get_next_op_id (h);
1365 req->op_id = htonl (op->op_id);
1366
1367 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1368 transmit_next (h);
463 1369
1370 return op;
464} 1371}
465 1372
466/* end of psycstore_api.c */ 1373/* end of psycstore_api.c */