aboutsummaryrefslogtreecommitdiff
path: root/src/psyc/psyc_api.c
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2015-05-07 12:15:58 +0000
committerGabor X Toth <*@tg-x.net>2015-05-07 12:15:58 +0000
commit4725d59b468f1f30ba2910992333ca157682ce29 (patch)
tree23715ee20879c94a3363e28ea184370a4a71e44d /src/psyc/psyc_api.c
parenta5edf8ac9f03a368c87ea6163994d4ac3d62af06 (diff)
downloadgnunet-4725d59b468f1f30ba2910992333ca157682ce29.tar.gz
gnunet-4725d59b468f1f30ba2910992333ca157682ce29.zip
psyc/social: request history & state from psycstore; more documentation, tests, cleanup
Diffstat (limited to 'src/psyc/psyc_api.c')
-rw-r--r--src/psyc/psyc_api.c532
1 files changed, 347 insertions, 185 deletions
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index ce994b272..7839aaf9e 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -43,33 +43,6 @@
43#define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__) 43#define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__)
44 44
45 45
46struct OperationListItem
47{
48 struct OperationListItem *prev;
49 struct OperationListItem *next;
50
51 /**
52 * Operation ID.
53 */
54 uint64_t op_id;
55
56 /**
57 * Continuation to invoke with the result of an operation.
58 */
59 GNUNET_PSYC_ResultCallback result_cb;
60
61 /**
62 * State variable result callback.
63 */
64 GNUNET_PSYC_StateVarCallback state_var_cb;
65
66 /**
67 * Closure for the callbacks.
68 */
69 void *cls;
70};
71
72
73/** 46/**
74 * Handle to access PSYC channel operations for both the master and slaves. 47 * Handle to access PSYC channel operations for both the master and slaves.
75 */ 48 */
@@ -111,21 +84,6 @@ struct GNUNET_PSYC_Channel
111 void *disconnect_cls; 84 void *disconnect_cls;
112 85
113 /** 86 /**
114 * First operation in the linked list.
115 */
116 struct OperationListItem *op_head;
117
118 /**
119 * Last operation in the linked list.
120 */
121 struct OperationListItem *op_tail;
122
123 /**
124 * Last operation ID used.
125 */
126 uint64_t last_op_id;
127
128 /**
129 * Are we polling for incoming messages right now? 87 * Are we polling for incoming messages right now?
130 */ 88 */
131 uint8_t in_receive; 89 uint8_t in_receive;
@@ -204,83 +162,62 @@ struct GNUNET_PSYC_SlaveTransmitHandle
204}; 162};
205 163
206 164
207/** 165struct GNUNET_PSYC_HistoryRequest
208 * Get a fresh operation ID to distinguish between PSYCstore requests.
209 *
210 * @param h Handle to the PSYCstore service.
211 * @return next operation id to use
212 */
213static uint64_t
214op_get_next_id (struct GNUNET_PSYC_Channel *chn)
215{
216 return ++chn->last_op_id;
217}
218
219
220/**
221 * Find operation by ID.
222 *
223 * @return Operation, or NULL if none found.
224 */
225static struct OperationListItem *
226op_find_by_id (struct GNUNET_PSYC_Channel *chn, uint64_t op_id)
227{ 166{
228 struct OperationListItem *op = chn->op_head; 167 /**
229 while (NULL != op) 168 * Channel.
230 { 169 */
231 if (op->op_id == op_id) 170 struct GNUNET_PSYC_Channel *chn;
232 return op;
233 op = op->next;
234 }
235 return NULL;
236}
237 171
172 /**
173 * Operation ID.
174 */
175 uint64_t op_id;
238 176
239static uint64_t 177 /**
240op_add (struct GNUNET_PSYC_Channel *chn, GNUNET_PSYC_ResultCallback result_cb, 178 * Message handler.
241 void *cls) 179 */
242{ 180 struct GNUNET_PSYC_ReceiveHandle *recv;
243 if (NULL == result_cb)
244 return 0;
245 181
246 struct OperationListItem *op = GNUNET_malloc (sizeof (*op)); 182 /**
247 op->op_id = op_get_next_id (chn); 183 * Function to call when the operation finished.
248 op->result_cb = result_cb; 184 */
249 op->cls = cls; 185 GNUNET_ResultCallback result_cb;
250 GNUNET_CONTAINER_DLL_insert_tail (chn->op_head, chn->op_tail, op);
251 186
252 LOG (GNUNET_ERROR_TYPE_DEBUG, 187 /**
253 "%p Added operation #%" PRIu64 "\n", chn, op->op_id); 188 * Closure for @a result_cb.
254 return op->op_id; 189 */
255} 190 void *cls;
191};
256 192
257 193
258static int 194struct GNUNET_PSYC_StateRequest
259op_result (struct GNUNET_PSYC_Channel *chn, uint64_t op_id,
260 int64_t result_code, const char *err_msg)
261{ 195{
262 LOG (GNUNET_ERROR_TYPE_DEBUG, 196 /**
263 "%p Received result for operation #%" PRIu64 ": %" PRId64 " (%s)\n", 197 * Channel.
264 chn, op_id, result_code, err_msg); 198 */
265 if (0 == op_id) 199 struct GNUNET_PSYC_Channel *chn;
266 return GNUNET_NO;
267 200
268 struct OperationListItem *op = op_find_by_id (chn, op_id); 201 /**
269 if (NULL == op) 202 * Operation ID.
270 { 203 */
271 LOG (GNUNET_ERROR_TYPE_WARNING, 204 uint64_t op_id;
272 "Could not find operation #%" PRIu64 "\n", op_id);
273 return GNUNET_NO;
274 }
275 205
276 GNUNET_CONTAINER_DLL_remove (chn->op_head, chn->op_tail, op); 206 /**
207 * State variable result callback.
208 */
209 GNUNET_PSYC_StateVarCallback var_cb;
277 210
278 if (NULL != op->result_cb) 211 /**
279 op->result_cb (op->cls, result_code, err_msg); 212 * Function to call when the operation finished.
213 */
214 GNUNET_ResultCallback result_cb;
280 215
281 GNUNET_free (op); 216 /**
282 return GNUNET_YES; 217 * Closure for @a result_cb.
283} 218 */
219 void *cls;
220};
284 221
285 222
286static void 223static void
@@ -313,22 +250,97 @@ channel_recv_result (void *cls,
313 struct GNUNET_PSYC_Channel * 250 struct GNUNET_PSYC_Channel *
314 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); 251 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn));
315 252
253 const struct GNUNET_OperationResultMessage *
254 res = (const struct GNUNET_OperationResultMessage *) msg;
255
316 uint16_t size = ntohs (msg->size); 256 uint16_t size = ntohs (msg->size);
317 const struct OperationResult *res = (const struct OperationResult *) msg; 257 if (size < sizeof (*res))
318 const char *err_msg = NULL; 258 { /* Error, message too small. */
259 GNUNET_break (0);
260 return;
261 }
319 262
320 if (sizeof (struct OperationResult) < size) 263 uint16_t data_size = size - sizeof (*res);
321 { 264 const char *data = (0 < data_size) ? (void *) &res[1] : NULL;
322 err_msg = (const char *) &res[1]; 265 GNUNET_CLIENT_MANAGER_op_result (chn->client, GNUNET_ntohll (res->op_id),
323 if ('\0' != err_msg[size - sizeof (struct OperationResult) - 1]) 266 GNUNET_ntohll_signed (res->result_code),
324 { 267 data, data_size);
325 GNUNET_break (0); 268}
326 err_msg = NULL; 269
327 } 270
271static void
272op_recv_history_result (void *cls, int64_t result,
273 const void *data, uint16_t data_size)
274{
275 LOG (GNUNET_ERROR_TYPE_DEBUG,
276 "Received history replay result: %" PRId64 ".\n", result);
277
278 struct GNUNET_PSYC_HistoryRequest *hist = cls;
279
280 if (NULL != hist->result_cb)
281 hist->result_cb (hist->cls, result, data, data_size);
282
283 GNUNET_PSYC_receive_destroy (hist->recv);
284 GNUNET_free (hist);
285}
286
287
288static void
289op_recv_state_result (void *cls, int64_t result,
290 const void *data, uint16_t data_size)
291{
292 LOG (GNUNET_ERROR_TYPE_DEBUG,
293 "Received state request result: %" PRId64 ".\n", result);
294
295 struct GNUNET_PSYC_StateRequest *sr = cls;
296
297 if (NULL != sr->result_cb)
298 sr->result_cb (sr->cls, result, data, data_size);
299
300 GNUNET_free (sr);
301}
302
303
304static void
305channel_recv_history_result (void *cls,
306 struct GNUNET_CLIENT_MANAGER_Connection *client,
307 const struct GNUNET_MessageHeader *msg)
308{
309 struct GNUNET_PSYC_Channel *
310 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn));
311
312 const struct GNUNET_OperationResultMessage *
313 res = (const struct GNUNET_OperationResultMessage *) msg;
314 struct GNUNET_PSYC_MessageHeader *
315 pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
316
317 LOG (GNUNET_ERROR_TYPE_DEBUG,
318 "%p Received historic fragment for message #%" PRIu64 ".\n",
319 chn, GNUNET_ntohll (pmsg->message_id));
320
321 GNUNET_ResultCallback result_cb = NULL;
322 struct GNUNET_PSYC_HistoryRequest *hist = NULL;
323
324 if (GNUNET_YES != GNUNET_CLIENT_MANAGER_op_find (chn->client,
325 GNUNET_ntohll (res->op_id),
326 &result_cb, (void *) &hist))
327 { /* Operation not found. */
328 LOG (GNUNET_ERROR_TYPE_WARNING,
329 "%p Replay operation not found for historic fragment of message #%"
330 PRIu64 ".\n",
331 chn, GNUNET_ntohll (pmsg->message_id));
332 return;
328 } 333 }
329 334
330 op_result (chn, GNUNET_ntohll (res->op_id), 335 uint16_t size = ntohs (msg->size);
331 GNUNET_ntohll (res->result_code) + INT64_MIN, err_msg); 336 if (size < sizeof (*res) + sizeof (*pmsg))
337 { /* Error, message too small. */
338 GNUNET_break (0);
339 return;
340 }
341
342 GNUNET_PSYC_receive_message (hist->recv,
343 (const struct GNUNET_PSYC_MessageHeader *) pmsg);
332} 344}
333 345
334 346
@@ -340,12 +352,21 @@ channel_recv_state_result (void *cls,
340 struct GNUNET_PSYC_Channel * 352 struct GNUNET_PSYC_Channel *
341 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); 353 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn));
342 354
343 const struct OperationResult *res = (const struct OperationResult *) msg; 355 const struct GNUNET_OperationResultMessage *
344 struct OperationListItem *op = op_find_by_id (chn, GNUNET_ntohll (res->op_id)); 356 res = (const struct GNUNET_OperationResultMessage *) msg;
345 if (NULL == op || NULL == op->state_var_cb) 357
358 GNUNET_ResultCallback result_cb = NULL;
359 struct GNUNET_PSYC_StateRequest *sr = NULL;
360
361 if (GNUNET_YES != GNUNET_CLIENT_MANAGER_op_find (chn->client,
362 GNUNET_ntohll (res->op_id),
363 &result_cb, (void *) &sr))
364 { /* Operation not found. */
346 return; 365 return;
366 }
347 367
348 const struct GNUNET_MessageHeader *modc = (struct GNUNET_MessageHeader *) &op[1]; 368 const struct GNUNET_MessageHeader *
369 modc = (struct GNUNET_MessageHeader *) &res[1];
349 uint16_t modc_size = ntohs (modc->size); 370 uint16_t modc_size = ntohs (modc->size);
350 if (ntohs (msg->size) - sizeof (*msg) != modc_size) 371 if (ntohs (msg->size) - sizeof (*msg) != modc_size)
351 { 372 {
@@ -366,13 +387,13 @@ channel_recv_state_result (void *cls,
366 GNUNET_break (0); 387 GNUNET_break (0);
367 return; 388 return;
368 } 389 }
369 op->state_var_cb (op->cls, name, name + name_size, ntohs (mod->value_size)); 390 sr->var_cb (sr->cls, name, name + name_size, ntohs (mod->value_size));
370 break; 391 break;
371 } 392 }
372 393
373 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: 394 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
374 op->state_var_cb (op->cls, NULL, (const char *) &modc[1], 395 sr->var_cb (sr->cls, NULL, (const char *) &modc[1],
375 modc_size - sizeof (*modc)); 396 modc_size - sizeof (*modc));
376 break; 397 break;
377 } 398 }
378} 399}
@@ -412,11 +433,12 @@ master_recv_start_ack (void *cls,
412 433
413 struct GNUNET_PSYC_CountersResultMessage * 434 struct GNUNET_PSYC_CountersResultMessage *
414 cres = (struct GNUNET_PSYC_CountersResultMessage *) msg; 435 cres = (struct GNUNET_PSYC_CountersResultMessage *) msg;
415 int32_t result = ntohl (cres->result_code) + INT32_MIN; 436 int32_t result = GNUNET_ntohl_signed (cres->result_code);
416 if (GNUNET_OK != result && GNUNET_NO != result) 437 if (GNUNET_OK != result && GNUNET_NO != result)
417 { 438 {
418 LOG (GNUNET_ERROR_TYPE_ERROR, "Could not start master.\n"); 439 LOG (GNUNET_ERROR_TYPE_ERROR, "Could not start master: %ld\n", result);
419 GNUNET_break (0); 440 GNUNET_break (0);
441 /* FIXME: disconnect */
420 } 442 }
421 if (NULL != mst->start_cb) 443 if (NULL != mst->start_cb)
422 mst->start_cb (mst->cb_cls, result, GNUNET_ntohll (cres->max_message_id)); 444 mst->start_cb (mst->cb_cls, result, GNUNET_ntohll (cres->max_message_id));
@@ -464,11 +486,12 @@ slave_recv_join_ack (void *cls,
464 sizeof (struct GNUNET_PSYC_Channel)); 486 sizeof (struct GNUNET_PSYC_Channel));
465 struct GNUNET_PSYC_CountersResultMessage * 487 struct GNUNET_PSYC_CountersResultMessage *
466 cres = (struct GNUNET_PSYC_CountersResultMessage *) msg; 488 cres = (struct GNUNET_PSYC_CountersResultMessage *) msg;
467 int32_t result = ntohl (cres->result_code) + INT32_MIN; 489 int32_t result = GNUNET_ntohl_signed (cres->result_code);
468 if (GNUNET_YES != result && GNUNET_NO != result) 490 if (GNUNET_YES != result && GNUNET_NO != result)
469 { 491 {
470 LOG (GNUNET_ERROR_TYPE_ERROR, "Could not join slave.\n"); 492 LOG (GNUNET_ERROR_TYPE_ERROR, "Could not join slave.\n");
471 GNUNET_break (0); 493 GNUNET_break (0);
494 /* FIXME: disconnect */
472 } 495 }
473 if (NULL != slv->connect_cb) 496 if (NULL != slv->connect_cb)
474 slv->connect_cb (slv->cb_cls, result, GNUNET_ntohll (cres->max_message_id)); 497 slv->connect_cb (slv->cb_cls, result, GNUNET_ntohll (cres->max_message_id));
@@ -513,13 +536,17 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler master_handlers[] =
513 GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST, 536 GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST,
514 sizeof (struct GNUNET_PSYC_JoinRequestMessage), GNUNET_YES }, 537 sizeof (struct GNUNET_PSYC_JoinRequestMessage), GNUNET_YES },
515 538
539 { &channel_recv_history_result, NULL,
540 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT,
541 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
542
516 { &channel_recv_state_result, NULL, 543 { &channel_recv_state_result, NULL,
517 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT, 544 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT,
518 sizeof (struct OperationResult), GNUNET_YES }, 545 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
519 546
520 { &channel_recv_result, NULL, 547 { &channel_recv_result, NULL,
521 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE, 548 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE,
522 sizeof (struct OperationResult), GNUNET_YES }, 549 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
523 550
524 { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, 551 { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO },
525 552
@@ -545,13 +572,17 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler slave_handlers[] =
545 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 572 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
546 sizeof (struct GNUNET_PSYC_JoinDecisionMessage), GNUNET_YES }, 573 sizeof (struct GNUNET_PSYC_JoinDecisionMessage), GNUNET_YES },
547 574
575 { &channel_recv_history_result, NULL,
576 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT,
577 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
578
548 { &channel_recv_state_result, NULL, 579 { &channel_recv_state_result, NULL,
549 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT, 580 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT,
550 sizeof (struct OperationResult), GNUNET_YES }, 581 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
551 582
552 { &channel_recv_result, NULL, 583 { &channel_recv_result, NULL,
553 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE, 584 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE,
554 sizeof (struct OperationResult), GNUNET_YES }, 585 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
555 586
556 { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, 587 { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO },
557 588
@@ -1011,17 +1042,28 @@ GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slv)
1011 * correctly; not doing so correctly will result in either denying other slaves 1042 * correctly; not doing so correctly will result in either denying other slaves
1012 * access or offering access to channel data to non-members. 1043 * access or offering access to channel data to non-members.
1013 * 1044 *
1014 * @param channel Channel handle. 1045 * @param chn
1015 * @param slave_key Identity of channel slave to add. 1046 * Channel handle.
1016 * @param announced_at ID of the message that announced the membership change. 1047 * @param slave_key
1017 * @param effective_since Addition of slave is in effect since this message ID. 1048 * Identity of channel slave to add.
1049 * @param announced_at
1050 * ID of the message that announced the membership change.
1051 * @param effective_since
1052 * Addition of slave is in effect since this message ID.
1053 * @param result_cb
1054 * Function to call with the result of the operation.
1055 * The @e result_code argument is #GNUNET_OK on success, or
1056 * #GNUNET_SYSERR on error. In case of an error, the @e data argument
1057 * can contain an optional error message.
1058 * @param cls
1059 * Closure for @a result_cb.
1018 */ 1060 */
1019void 1061void
1020GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn, 1062GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn,
1021 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, 1063 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
1022 uint64_t announced_at, 1064 uint64_t announced_at,
1023 uint64_t effective_since, 1065 uint64_t effective_since,
1024 GNUNET_PSYC_ResultCallback result_cb, 1066 GNUNET_ResultCallback result_cb,
1025 void *cls) 1067 void *cls)
1026{ 1068{
1027 struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req)); 1069 struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req));
@@ -1031,7 +1073,8 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn,
1031 req->announced_at = GNUNET_htonll (announced_at); 1073 req->announced_at = GNUNET_htonll (announced_at);
1032 req->effective_since = GNUNET_htonll (effective_since); 1074 req->effective_since = GNUNET_htonll (effective_since);
1033 req->did_join = GNUNET_YES; 1075 req->did_join = GNUNET_YES;
1034 req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls)); 1076 req->op_id = GNUNET_htonll (GNUNET_CLIENT_MANAGER_op_add (chn->client,
1077 result_cb, cls));
1035 1078
1036 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); 1079 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
1037} 1080}
@@ -1054,15 +1097,25 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn,
1054 * denying members access or offering access to channel data to 1097 * denying members access or offering access to channel data to
1055 * non-members. 1098 * non-members.
1056 * 1099 *
1057 * @param channel Channel handle. 1100 * @param chn
1058 * @param slave_key Identity of channel slave to remove. 1101 * Channel handle.
1059 * @param announced_at ID of the message that announced the membership change. 1102 * @param slave_key
1103 * Identity of channel slave to remove.
1104 * @param announced_at
1105 * ID of the message that announced the membership change.
1106 * @param result_cb
1107 * Function to call with the result of the operation.
1108 * The @e result_code argument is #GNUNET_OK on success, or
1109 * #GNUNET_SYSERR on error. In case of an error, the @e data argument
1110 * can contain an optional error message.
1111 * @param cls
1112 * Closure for @a result_cb.
1060 */ 1113 */
1061void 1114void
1062GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn, 1115GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn,
1063 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, 1116 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
1064 uint64_t announced_at, 1117 uint64_t announced_at,
1065 GNUNET_PSYC_ResultCallback result_cb, 1118 GNUNET_ResultCallback result_cb,
1066 void *cls) 1119 void *cls)
1067{ 1120{
1068 struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req)); 1121 struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req));
@@ -1071,17 +1124,62 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn,
1071 req->slave_key = *slave_key; 1124 req->slave_key = *slave_key;
1072 req->announced_at = GNUNET_htonll (announced_at); 1125 req->announced_at = GNUNET_htonll (announced_at);
1073 req->did_join = GNUNET_NO; 1126 req->did_join = GNUNET_NO;
1074 req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls)); 1127 req->op_id = GNUNET_htonll (GNUNET_CLIENT_MANAGER_op_add (chn->client,
1128 result_cb, cls));
1129
1130 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
1131}
1132
1133
1134static struct GNUNET_PSYC_HistoryRequest *
1135channel_history_replay (struct GNUNET_PSYC_Channel *chn,
1136 uint64_t start_message_id,
1137 uint64_t end_message_id,
1138 uint64_t message_limit,
1139 const char *method_prefix,
1140 uint32_t flags,
1141 GNUNET_PSYC_MessageCallback message_cb,
1142 GNUNET_PSYC_MessagePartCallback message_part_cb,
1143 GNUNET_ResultCallback result_cb,
1144 void *cls)
1145{
1146 struct GNUNET_PSYC_HistoryRequestMessage *req;
1147 struct GNUNET_PSYC_HistoryRequest *hist = GNUNET_malloc (sizeof (*hist));
1148 hist->chn = chn;
1149 hist->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls);
1150 hist->result_cb = result_cb;
1151 hist->cls = cls;
1152 hist->op_id = GNUNET_CLIENT_MANAGER_op_add (chn->client,
1153 &op_recv_history_result, hist);
1154
1155 GNUNET_assert (NULL != method_prefix);
1156 uint16_t method_size = strnlen (method_prefix,
1157 GNUNET_SERVER_MAX_MESSAGE_SIZE
1158 - sizeof (*req)) + 1;
1159 GNUNET_assert ('\0' == method_prefix[method_size - 1]);
1160 req = GNUNET_malloc (sizeof (*req) + method_size);
1161 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY);
1162 req->header.size = htons (sizeof (*req) + method_size);
1163 req->start_message_id = GNUNET_htonll (start_message_id);
1164 req->end_message_id = GNUNET_htonll (end_message_id);
1165 req->message_limit = GNUNET_htonll (message_limit);
1166 req->flags = htonl (flags);
1167 req->op_id = GNUNET_htonll (hist->op_id);
1168 memcpy (&req[1], method_prefix, method_size);
1075 1169
1076 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); 1170 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
1171 return hist;
1077} 1172}
1078 1173
1079 1174
1080/** 1175/**
1081 * Request to replay a part of the message history of the channel. 1176 * Request to replay a part of the message history of the channel.
1082 * 1177 *
1083 * Historic messages (but NOT the state at the time) will be replayed (given to 1178 * Historic messages (but NOT the state at the time) will be replayed and given
1084 * the normal method handlers) if available and if access is permitted. 1179 * to the normal method handlers with a #GNUNET_PSYC_MESSAGE_HISTORIC flag set.
1180 *
1181 * Messages are retrieved from the local PSYCstore if available,
1182 * otherwise requested from the network.
1085 * 1183 *
1086 * @param channel 1184 * @param channel
1087 * Which channel should be replayed? 1185 * Which channel should be replayed?
@@ -1089,8 +1187,10 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn,
1089 * Earliest interesting point in history. 1187 * Earliest interesting point in history.
1090 * @param end_message_id 1188 * @param end_message_id
1091 * Last (inclusive) interesting point in history. 1189 * Last (inclusive) interesting point in history.
1092 * FIXME: @param method_prefix 1190 * @param method_prefix
1093 * Retrieve only messages with a matching method prefix. 1191 * Retrieve only messages with a matching method prefix.
1192 * @param flags
1193 * OR'ed enum GNUNET_PSYC_HistoryReplayFlags
1094 * @param result_cb 1194 * @param result_cb
1095 * Function to call when the requested history has been fully replayed. 1195 * Function to call when the requested history has been fully replayed.
1096 * @param cls 1196 * @param cls
@@ -1098,22 +1198,20 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn,
1098 * 1198 *
1099 * @return Handle to cancel history replay operation. 1199 * @return Handle to cancel history replay operation.
1100 */ 1200 */
1101void 1201struct GNUNET_PSYC_HistoryRequest *
1102GNUNET_PSYC_channel_history_replay (struct GNUNET_PSYC_Channel *chn, 1202GNUNET_PSYC_channel_history_replay (struct GNUNET_PSYC_Channel *chn,
1103 uint64_t start_message_id, 1203 uint64_t start_message_id,
1104 uint64_t end_message_id, 1204 uint64_t end_message_id,
1105 /* FIXME: const char *method_prefix, */ 1205 const char *method_prefix,
1106 GNUNET_PSYC_ResultCallback result_cb, 1206 uint32_t flags,
1207 GNUNET_PSYC_MessageCallback message_cb,
1208 GNUNET_PSYC_MessagePartCallback message_part_cb,
1209 GNUNET_ResultCallback result_cb,
1107 void *cls) 1210 void *cls)
1108{ 1211{
1109 struct HistoryRequest *req = GNUNET_malloc (sizeof (*req)); 1212 return channel_history_replay (chn, start_message_id, end_message_id, 0,
1110 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY); 1213 method_prefix, flags,
1111 req->header.size = htons (sizeof (*req)); 1214 message_cb, message_part_cb, result_cb, cls);
1112 req->start_message_id = GNUNET_htonll (start_message_id);
1113 req->end_message_id = GNUNET_htonll (end_message_id);
1114 req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls));
1115
1116 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
1117} 1215}
1118 1216
1119 1217
@@ -1127,8 +1225,11 @@ GNUNET_PSYC_channel_history_replay (struct GNUNET_PSYC_Channel *chn,
1127 * Which channel should be replayed? 1225 * Which channel should be replayed?
1128 * @param message_limit 1226 * @param message_limit
1129 * Maximum number of messages to replay. 1227 * Maximum number of messages to replay.
1130 * FIXME: @param method_prefix 1228 * @param method_prefix
1131 * Retrieve only messages with a matching method prefix. 1229 * Retrieve only messages with a matching method prefix.
1230 * Use NULL or "" to retrieve all.
1231 * @param flags
1232 * OR'ed enum GNUNET_PSYC_HistoryReplayFlags
1132 * @param result_cb 1233 * @param result_cb
1133 * Function to call when the requested history has been fully replayed. 1234 * Function to call when the requested history has been fully replayed.
1134 * @param cls 1235 * @param cls
@@ -1136,20 +1237,78 @@ GNUNET_PSYC_channel_history_replay (struct GNUNET_PSYC_Channel *chn,
1136 * 1237 *
1137 * @return Handle to cancel history replay operation. 1238 * @return Handle to cancel history replay operation.
1138 */ 1239 */
1139void 1240struct GNUNET_PSYC_HistoryRequest *
1140GNUNET_PSYC_channel_history_replay_latest (struct GNUNET_PSYC_Channel *chn, 1241GNUNET_PSYC_channel_history_replay_latest (struct GNUNET_PSYC_Channel *chn,
1141 uint64_t message_limit, 1242 uint64_t message_limit,
1142 /* FIXME: const char *method_prefix, */ 1243 const char *method_prefix,
1143 GNUNET_PSYC_ResultCallback result_cb, 1244 uint32_t flags,
1245 GNUNET_PSYC_MessageCallback message_cb,
1246 GNUNET_PSYC_MessagePartCallback message_part_cb,
1247 GNUNET_ResultCallback result_cb,
1144 void *cls) 1248 void *cls)
1145{ 1249{
1146 struct HistoryRequest *req = GNUNET_malloc (sizeof (*req)); 1250 return channel_history_replay (chn, 0, 0, message_limit, method_prefix, flags,
1147 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY); 1251 message_cb, message_part_cb, result_cb, cls);
1148 req->header.size = htons (sizeof (*req)); 1252}
1149 req->message_limit = GNUNET_htonll (message_limit); 1253
1150 req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls)); 1254
1255void
1256GNUNET_PSYC_channel_history_replay_cancel (struct GNUNET_PSYC_Channel *channel,
1257 struct GNUNET_PSYC_HistoryRequest *hist)
1258{
1259 GNUNET_PSYC_receive_destroy (hist->recv);
1260 GNUNET_CLIENT_MANAGER_op_cancel (hist->chn->client, hist->op_id);
1261 GNUNET_free (hist);
1262}
1263
1264
1265/**
1266 * Retrieve the best matching channel state variable.
1267 *
1268 * If the requested variable name is not present in the state, the nearest
1269 * less-specific name is matched; for example, requesting "_a_b" will match "_a"
1270 * if "_a_b" does not exist.
1271 *
1272 * @param channel
1273 * Channel handle.
1274 * @param full_name
1275 * Full name of the requested variable.
1276 * The actual variable returned might have a shorter name.
1277 * @param var_cb
1278 * Function called once when a matching state variable is found.
1279 * Not called if there's no matching state variable.
1280 * @param result_cb
1281 * Function called after the operation finished.
1282 * (i.e. all state variables have been returned via @a state_cb)
1283 * @param cls
1284 * Closure for the callbacks.
1285 */
1286static struct GNUNET_PSYC_StateRequest *
1287channel_state_get (struct GNUNET_PSYC_Channel *chn,
1288 uint16_t type, const char *name,
1289 GNUNET_PSYC_StateVarCallback var_cb,
1290 GNUNET_ResultCallback result_cb, void *cls)
1291{
1292 struct StateRequest *req;
1293 struct GNUNET_PSYC_StateRequest *sr = GNUNET_malloc (sizeof (*sr));
1294 sr->chn = chn;
1295 sr->var_cb = var_cb;
1296 sr->result_cb = result_cb;
1297 sr->cls = cls;
1298 sr->op_id = GNUNET_CLIENT_MANAGER_op_add (chn->client,
1299 &op_recv_state_result, sr);
1300
1301 GNUNET_assert (NULL != name);
1302 size_t name_size = strnlen (name, GNUNET_SERVER_MAX_MESSAGE_SIZE
1303 - sizeof (*req)) + 1;
1304 req = GNUNET_malloc (sizeof (*req) + name_size);
1305 req->header.type = htons (type);
1306 req->header.size = htons (sizeof (*req) + name_size);
1307 req->op_id = GNUNET_htonll (sr->op_id);
1308 memcpy (&req[1], name, name_size);
1151 1309
1152 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); 1310 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
1311 return sr;
1153} 1312}
1154 1313
1155 1314
@@ -1174,21 +1333,16 @@ GNUNET_PSYC_channel_history_replay_latest (struct GNUNET_PSYC_Channel *chn,
1174 * @param cls 1333 * @param cls
1175 * Closure for the callbacks. 1334 * Closure for the callbacks.
1176 */ 1335 */
1177void 1336struct GNUNET_PSYC_StateRequest *
1178GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *chn, 1337GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *chn,
1179 const char *full_name, 1338 const char *full_name,
1180 GNUNET_PSYC_StateVarCallback var_cb, 1339 GNUNET_PSYC_StateVarCallback var_cb,
1181 GNUNET_PSYC_ResultCallback result_cb, 1340 GNUNET_ResultCallback result_cb,
1182 void *cls) 1341 void *cls)
1183{ 1342{
1184 size_t name_size = strlen (full_name) + 1; 1343 return channel_state_get (chn, GNUNET_MESSAGE_TYPE_PSYC_STATE_GET,
1185 struct StateRequest *req = GNUNET_malloc (sizeof (*req) + name_size); 1344 full_name, var_cb, result_cb, cls);
1186 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_GET);
1187 req->header.size = htons (sizeof (*req) + name_size);
1188 req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls));
1189 memcpy (&req[1], full_name, name_size);
1190 1345
1191 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
1192} 1346}
1193 1347
1194 1348
@@ -1215,21 +1369,29 @@ GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *chn,
1215 * @param cls 1369 * @param cls
1216 * Closure for the callbacks. 1370 * Closure for the callbacks.
1217 */ 1371 */
1218void 1372struct GNUNET_PSYC_StateRequest *
1219GNUNET_PSYC_channel_state_get_prefix (struct GNUNET_PSYC_Channel *chn, 1373GNUNET_PSYC_channel_state_get_prefix (struct GNUNET_PSYC_Channel *chn,
1220 const char *name_prefix, 1374 const char *name_prefix,
1221 GNUNET_PSYC_StateVarCallback var_cb, 1375 GNUNET_PSYC_StateVarCallback var_cb,
1222 GNUNET_PSYC_ResultCallback result_cb, 1376 GNUNET_ResultCallback result_cb,
1223 void *cls) 1377 void *cls)
1224{ 1378{
1225 size_t name_size = strlen (name_prefix) + 1; 1379 return channel_state_get (chn, GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX,
1226 struct StateRequest *req = GNUNET_malloc (sizeof (*req) + name_size); 1380 name_prefix, var_cb, result_cb, cls);
1227 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_GET); 1381}
1228 req->header.size = htons (sizeof (*req) + name_size);
1229 req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls));
1230 memcpy (&req[1], name_prefix, name_size);
1231 1382
1232 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); 1383
1384/**
1385 * Cancel a state request operation.
1386 *
1387 * @param sr
1388 * Handle for the operation to cancel.
1389 */
1390void
1391GNUNET_PSYC_channel_state_get_cancel (struct GNUNET_PSYC_StateRequest *sr)
1392{
1393 GNUNET_CLIENT_MANAGER_op_cancel (sr->chn->client, sr->op_id);
1394 GNUNET_free (sr);
1233} 1395}
1234 1396
1235/* end of psyc_api.c */ 1397/* end of psyc_api.c */