aboutsummaryrefslogtreecommitdiff
path: root/src/psycstore
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2014-07-30 21:18:13 +0000
committerGabor X Toth <*@tg-x.net>2014-07-30 21:18:13 +0000
commit40884377f3126bbecbfd3243d47224b3094914f9 (patch)
tree9f32aab9064b199178282a0c9918313e0aa30049 /src/psycstore
parent831718fa44b2c56577aa4e36a479fef6debb8cea (diff)
downloadgnunet-40884377f3126bbecbfd3243d47224b3094914f9.tar.gz
gnunet-40884377f3126bbecbfd3243d47224b3094914f9.zip
psyc, psycstore: retrieve state and history
Diffstat (limited to 'src/psycstore')
-rw-r--r--src/psycstore/gnunet-service-psycstore.c97
-rw-r--r--src/psycstore/plugin_psycstore_sqlite.c238
-rw-r--r--src/psycstore/psycstore.h160
-rw-r--r--src/psycstore/psycstore_api.c309
-rw-r--r--src/psycstore/test_plugin_psycstore.c48
-rw-r--r--src/psycstore/test_psycstore.c65
6 files changed, 657 insertions, 260 deletions
diff --git a/src/psycstore/gnunet-service-psycstore.c b/src/psycstore/gnunet-service-psycstore.c
index 7d27ea29b..87a2c87ab 100644
--- a/src/psycstore/gnunet-service-psycstore.c
+++ b/src/psycstore/gnunet-service-psycstore.c
@@ -25,6 +25,8 @@
25 * @author Christian Grothoff 25 * @author Christian Grothoff
26 */ 26 */
27 27
28#include <inttypes.h>
29
28#include "platform.h" 30#include "platform.h"
29#include "gnunet_util_lib.h" 31#include "gnunet_util_lib.h"
30#include "gnunet_constants.h" 32#include "gnunet_constants.h"
@@ -89,39 +91,41 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
89/** 91/**
90 * Send a result code back to the client. 92 * Send a result code back to the client.
91 * 93 *
92 * @param client Client that should receive the result code. 94 * @param client
93 * @param result_code Code to transmit. 95 * Client that should receive the result code.
94 * @param op_id Operation ID. 96 * @param result_code
95 * @param err_msg Error message to include (or NULL for none). 97 * Code to transmit.
98 * @param op_id
99 * Operation ID in network byte order.
100 * @param err_msg
101 * Error message to include (or NULL for none).
96 */ 102 */
97static void 103static void
98send_result_code (struct GNUNET_SERVER_Client *client, uint32_t result_code, 104send_result_code (struct GNUNET_SERVER_Client *client, uint64_t op_id,
99 uint32_t op_id, const char *err_msg) 105 int64_t result_code, const char *err_msg)
100{ 106{
101 struct OperationResult *res; 107 struct OperationResult *res;
102 size_t err_len; 108 size_t err_len = 0; // FIXME: maximum length
103 109
104 if (NULL == err_msg) 110 if (NULL != err_msg)
105 err_len = 0;
106 else
107 err_len = strlen (err_msg) + 1; 111 err_len = strlen (err_msg) + 1;
108 res = GNUNET_malloc (sizeof (struct OperationResult) + err_len); 112 res = GNUNET_malloc (sizeof (struct OperationResult) + err_len);
109 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE); 113 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE);
110 res->header.size = htons (sizeof (struct OperationResult) + err_len); 114 res->header.size = htons (sizeof (struct OperationResult) + err_len);
111 res->result_code = htonl (result_code); 115 res->result_code = GNUNET_htonll (result_code - INT64_MIN);
112 res->op_id = op_id; 116 res->op_id = op_id;
113 if (0 < err_len) 117 if (0 < err_len)
114 memcpy (&res[1], err_msg, err_len); 118 memcpy (&res[1], err_msg, err_len);
115 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 119 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
116 "Sending result %d (%s) to client\n", 120 "Sending result to client: %" PRId64 " (%s)\n",
117 (int) result_code, 121 result_code, err_msg);
118 err_msg);
119 GNUNET_SERVER_notification_context_add (nc, client); 122 GNUNET_SERVER_notification_context_add (nc, client);
120 GNUNET_SERVER_notification_context_unicast (nc, client, &res->header, 123 GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
121 GNUNET_NO); 124 GNUNET_NO);
122 GNUNET_free (res); 125 GNUNET_free (res);
123} 126}
124 127
128
125enum 129enum
126{ 130{
127 MEMBERSHIP_TEST_NOT_NEEDED = 0, 131 MEMBERSHIP_TEST_NOT_NEEDED = 0,
@@ -129,6 +133,7 @@ enum
129 MEMBERSHIP_TEST_DONE = 2, 133 MEMBERSHIP_TEST_DONE = 2,
130} MessageMembershipTest; 134} MessageMembershipTest;
131 135
136
132struct SendClosure 137struct SendClosure
133{ 138{
134 struct GNUNET_SERVER_Client *client; 139 struct GNUNET_SERVER_Client *client;
@@ -158,7 +163,6 @@ struct SendClosure
158 * @see enum MessageMembershipTest 163 * @see enum MessageMembershipTest
159 */ 164 */
160 uint8_t membership_test; 165 uint8_t membership_test;
161
162}; 166};
163 167
164 168
@@ -214,6 +218,8 @@ send_state_var (void *cls, const char *name,
214 struct StateResult *res; 218 struct StateResult *res;
215 size_t name_size = strlen (name) + 1; 219 size_t name_size = strlen (name) + 1;
216 220
221 /* FIXME: split up value into 64k chunks */
222
217 res = GNUNET_malloc (sizeof (struct StateResult) + name_size + value_size); 223 res = GNUNET_malloc (sizeof (struct StateResult) + name_size + value_size);
218 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE); 224 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE);
219 res->header.size = htons (sizeof (struct StateResult) + name_size + value_size); 225 res->header.size = htons (sizeof (struct StateResult) + name_size + value_size);
@@ -249,7 +255,7 @@ handle_membership_store (void *cls,
249 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 255 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
250 _("Failed to store membership information!\n")); 256 _("Failed to store membership information!\n"));
251 257
252 send_result_code (client, ret, req->op_id, NULL); 258 send_result_code (client, req->op_id, ret, NULL);
253 GNUNET_SERVER_receive_done (client, GNUNET_OK); 259 GNUNET_SERVER_receive_done (client, GNUNET_OK);
254} 260}
255 261
@@ -274,7 +280,7 @@ handle_membership_test (void *cls,
274 _("Failed to test membership!\n")); 280 _("Failed to test membership!\n"));
275 } 281 }
276 282
277 send_result_code (client, ret, req->op_id, NULL); 283 send_result_code (client, req->op_id, ret, NULL);
278 GNUNET_SERVER_receive_done (client, GNUNET_OK); 284 GNUNET_SERVER_receive_done (client, GNUNET_OK);
279} 285}
280 286
@@ -295,7 +301,7 @@ handle_fragment_store (void *cls,
295 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 301 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
296 _("Failed to store fragment!\n")); 302 _("Failed to store fragment!\n"));
297 303
298 send_result_code (client, ret, req->op_id, NULL); 304 send_result_code (client, req->op_id, ret, NULL);
299 GNUNET_SERVER_receive_done (client, GNUNET_OK); 305 GNUNET_SERVER_receive_done (client, GNUNET_OK);
300} 306}
301 307
@@ -312,9 +318,20 @@ handle_fragment_get (void *cls,
312 .channel_key = req->channel_key, .slave_key = req->slave_key, 318 .channel_key = req->channel_key, .slave_key = req->slave_key,
313 .membership_test = req->do_membership_test }; 319 .membership_test = req->do_membership_test };
314 320
315 int ret = db->fragment_get (db->cls, &req->channel_key, 321 int64_t ret;
316 GNUNET_ntohll (req->fragment_id), 322 uint64_t ret_frags = 0;
317 &send_fragment, &sc); 323 uint64_t first_fragment_id = GNUNET_ntohll (req->first_fragment_id);
324 uint64_t last_fragment_id = GNUNET_ntohll (req->last_fragment_id);
325 uint64_t limit = GNUNET_ntohll (req->fragment_limit);
326
327 if (0 == limit)
328 ret = db->fragment_get (db->cls, &req->channel_key,
329 first_fragment_id, last_fragment_id,
330 &ret_frags, &send_fragment, &sc);
331 else
332 ret = db->fragment_get_latest (db->cls, &req->channel_key, limit,
333 &ret_frags, &send_fragment, &sc);
334
318 switch (ret) 335 switch (ret)
319 { 336 {
320 case GNUNET_YES: 337 case GNUNET_YES:
@@ -340,8 +357,7 @@ handle_fragment_get (void *cls,
340 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 357 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
341 _("Failed to get fragment!\n")); 358 _("Failed to get fragment!\n"));
342 } 359 }
343 360 send_result_code (client, req->op_id, (ret < 0) ? ret : ret_frags, NULL);
344 send_result_code (client, ret, req->op_id, NULL);
345 GNUNET_SERVER_receive_done (client, GNUNET_OK); 361 GNUNET_SERVER_receive_done (client, GNUNET_OK);
346} 362}
347 363
@@ -358,22 +374,31 @@ handle_message_get (void *cls,
358 .channel_key = req->channel_key, .slave_key = req->slave_key, 374 .channel_key = req->channel_key, .slave_key = req->slave_key,
359 .membership_test = req->do_membership_test }; 375 .membership_test = req->do_membership_test };
360 376
377 int64_t ret;
361 uint64_t ret_frags = 0; 378 uint64_t ret_frags = 0;
362 int64_t ret = db->message_get (db->cls, &req->channel_key, 379 uint64_t first_message_id = GNUNET_ntohll (req->first_message_id);
363 GNUNET_ntohll (req->message_id), 380 uint64_t last_message_id = GNUNET_ntohll (req->last_message_id);
364 &ret_frags, &send_fragment, &sc); 381 uint64_t limit = GNUNET_ntohll (req->message_limit);
382
383 if (0 == limit)
384 ret = db->message_get (db->cls, &req->channel_key,
385 first_message_id, last_message_id,
386 &ret_frags, &send_fragment, &sc);
387 else
388 ret = db->message_get_latest (db->cls, &req->channel_key, limit,
389 &ret_frags, &send_fragment, &sc);
390
365 switch (ret) 391 switch (ret)
366 { 392 {
367 case GNUNET_YES: 393 case GNUNET_YES:
368 case GNUNET_NO: 394 case GNUNET_NO:
369 break; 395 break;
370 default: 396 default:
371 ret_frags = ret;
372 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 397 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
373 _("Failed to get message!\n")); 398 _("Failed to get message!\n"));
374 } 399 }
375 400
376 send_result_code (client, ret_frags, req->op_id, NULL); 401 send_result_code (client, req->op_id, (ret < 0) ? ret : ret_frags, NULL);
377 GNUNET_SERVER_receive_done (client, GNUNET_OK); 402 GNUNET_SERVER_receive_done (client, GNUNET_OK);
378} 403}
379 404
@@ -404,7 +429,7 @@ handle_message_get_fragment (void *cls,
404 _("Failed to get message fragment!\n")); 429 _("Failed to get message fragment!\n"));
405 } 430 }
406 431
407 send_result_code (client, ret, req->op_id, NULL); 432 send_result_code (client, req->op_id, ret, NULL);
408 GNUNET_SERVER_receive_done (client, GNUNET_OK); 433 GNUNET_SERVER_receive_done (client, GNUNET_OK);
409} 434}
410 435
@@ -434,7 +459,7 @@ handle_counters_get (void *cls,
434 459
435 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS); 460 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS);
436 res.header.size = htons (sizeof (res)); 461 res.header.size = htons (sizeof (res));
437 res.result_code = htonl (ret); 462 res.result_code = htonl (ret - INT32_MIN);
438 res.op_id = req->op_id; 463 res.op_id = req->op_id;
439 res.max_fragment_id = GNUNET_htonll (res.max_fragment_id); 464 res.max_fragment_id = GNUNET_htonll (res.max_fragment_id);
440 res.max_message_id = GNUNET_htonll (res.max_message_id); 465 res.max_message_id = GNUNET_htonll (res.max_message_id);
@@ -517,7 +542,7 @@ handle_state_modify (void *cls,
517 _("Failed to end modifying state!\n")); 542 _("Failed to end modifying state!\n"));
518 } 543 }
519 } 544 }
520 send_result_code (client, ret, req->op_id, NULL); 545 send_result_code (client, req->op_id, ret, NULL);
521 GNUNET_SERVER_receive_done (client, GNUNET_OK); 546 GNUNET_SERVER_receive_done (client, GNUNET_OK);
522} 547}
523 548
@@ -571,7 +596,7 @@ handle_state_sync (void *cls,
571 _("Failed to end synchronizing state!\n")); 596 _("Failed to end synchronizing state!\n"));
572 } 597 }
573 } 598 }
574 send_result_code (client, ret, req->op_id, NULL); 599 send_result_code (client, req->op_id, ret, NULL);
575 GNUNET_SERVER_receive_done (client, GNUNET_OK); 600 GNUNET_SERVER_receive_done (client, GNUNET_OK);
576} 601}
577 602
@@ -590,7 +615,7 @@ handle_state_reset (void *cls,
590 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 615 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
591 _("Failed to reset state!\n")); 616 _("Failed to reset state!\n"));
592 617
593 send_result_code (client, ret, req->op_id, NULL); 618 send_result_code (client, req->op_id, ret, NULL);
594 GNUNET_SERVER_receive_done (client, GNUNET_OK); 619 GNUNET_SERVER_receive_done (client, GNUNET_OK);
595} 620}
596 621
@@ -609,7 +634,7 @@ handle_state_hash_update (void *cls,
609 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 634 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
610 _("Failed to reset state!\n")); 635 _("Failed to reset state!\n"));
611 636
612 send_result_code (client, ret, req->op_id, NULL); 637 send_result_code (client, req->op_id, ret, NULL);
613 GNUNET_SERVER_receive_done (client, GNUNET_OK); 638 GNUNET_SERVER_receive_done (client, GNUNET_OK);
614} 639}
615 640
@@ -660,7 +685,7 @@ handle_state_get (void *cls,
660 _("Failed to get state variable!\n")); 685 _("Failed to get state variable!\n"));
661 } 686 }
662 687
663 send_result_code (client, ret, req->op_id, NULL); 688 send_result_code (client, req->op_id, ret, NULL);
664 GNUNET_SERVER_receive_done (client, GNUNET_OK); 689 GNUNET_SERVER_receive_done (client, GNUNET_OK);
665} 690}
666 691
@@ -699,7 +724,7 @@ handle_state_get_prefix (void *cls,
699 _("Failed to get state variable!\n")); 724 _("Failed to get state variable!\n"));
700 } 725 }
701 726
702 send_result_code (client, ret, req->op_id, NULL); 727 send_result_code (client, req->op_id, ret, NULL);
703 GNUNET_SERVER_receive_done (client, GNUNET_OK); 728 GNUNET_SERVER_receive_done (client, GNUNET_OK);
704} 729}
705 730
diff --git a/src/psycstore/plugin_psycstore_sqlite.c b/src/psycstore/plugin_psycstore_sqlite.c
index 86b969c5d..cb6c5c437 100644
--- a/src/psycstore/plugin_psycstore_sqlite.c
+++ b/src/psycstore/plugin_psycstore_sqlite.c
@@ -130,12 +130,22 @@ struct Plugin
130 /** 130 /**
131 * Precompiled SQL for fragment_get() 131 * Precompiled SQL for fragment_get()
132 */ 132 */
133 sqlite3_stmt *select_fragment; 133 sqlite3_stmt *select_fragments;
134
135 /**
136 * Precompiled SQL for fragment_get()
137 */
138 sqlite3_stmt *select_latest_fragments;
134 139
135 /** 140 /**
136 * Precompiled SQL for message_get() 141 * Precompiled SQL for message_get()
137 */ 142 */
138 sqlite3_stmt *select_message; 143 sqlite3_stmt *select_messages;
144
145 /**
146 * Precompiled SQL for message_get()
147 */
148 sqlite3_stmt *select_latest_messages;
139 149
140 /** 150 /**
141 * Precompiled SQL for message_get_fragment() 151 * Precompiled SQL for message_get_fragment()
@@ -456,8 +466,8 @@ database_setup (struct Plugin *plugin)
456 " multicast_flags, psycstore_flags, data\n" 466 " multicast_flags, psycstore_flags, data\n"
457 "FROM messages\n" 467 "FROM messages\n"
458 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" 468 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
459 " AND fragment_id = ?;", 469 " AND ? <= fragment_id AND fragment_id <= ?;",
460 &plugin->select_fragment); 470 &plugin->select_fragments);
461 471
462 sql_prepare (plugin->dbh, 472 sql_prepare (plugin->dbh,
463 "SELECT hop_counter, signature, purpose, fragment_id,\n" 473 "SELECT hop_counter, signature, purpose, fragment_id,\n"
@@ -465,8 +475,35 @@ database_setup (struct Plugin *plugin)
465 " multicast_flags, psycstore_flags, data\n" 475 " multicast_flags, psycstore_flags, data\n"
466 "FROM messages\n" 476 "FROM messages\n"
467 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" 477 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
468 " AND message_id = ? AND fragment_offset = ?;", 478 " AND ? <= message_id AND message_id <= ?;",
469 &plugin->select_message_fragment); 479 &plugin->select_messages);
480
481 sql_prepare (plugin->dbh,
482 "SELECT * FROM\n"
483 "(SELECT hop_counter, signature, purpose, fragment_id,\n"
484 " fragment_offset, message_id, group_generation,\n"
485 " multicast_flags, psycstore_flags, data\n"
486 " FROM messages\n"
487 " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
488 " ORDER BY fragment_id DESC\n"
489 " LIMIT ?)\n"
490 "ORDER BY fragment_id;",
491 &plugin->select_latest_fragments);
492
493 sql_prepare (plugin->dbh,
494 "SELECT hop_counter, signature, purpose, fragment_id,\n"
495 " fragment_offset, message_id, group_generation,\n"
496 " multicast_flags, psycstore_flags, data\n"
497 "FROM messages\n"
498 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
499 " AND message_id IN\n"
500 " (SELECT message_id\n"
501 " FROM messages\n"
502 " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
503 " ORDER BY message_id\n"
504 " DESC LIMIT ?)\n"
505 "ORDER BY fragment_id;",
506 &plugin->select_latest_messages);
470 507
471 sql_prepare (plugin->dbh, 508 sql_prepare (plugin->dbh,
472 "SELECT hop_counter, signature, purpose, fragment_id,\n" 509 "SELECT hop_counter, signature, purpose, fragment_id,\n"
@@ -474,8 +511,8 @@ database_setup (struct Plugin *plugin)
474 " multicast_flags, psycstore_flags, data\n" 511 " multicast_flags, psycstore_flags, data\n"
475 "FROM messages\n" 512 "FROM messages\n"
476 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" 513 "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
477 " AND message_id = ?;", 514 " AND message_id = ? AND fragment_offset = ?;",
478 &plugin->select_message); 515 &plugin->select_message_fragment);
479 516
480 sql_prepare (plugin->dbh, 517 sql_prepare (plugin->dbh,
481 "SELECT fragment_id, message_id, group_generation\n" 518 "SELECT fragment_id, message_id, group_generation\n"
@@ -1036,8 +1073,42 @@ fragment_row (sqlite3_stmt *stmt, GNUNET_PSYCSTORE_FragmentCallback cb,
1036 return cb (cb_cls, (void *) msg, sqlite3_column_int64 (stmt, 8)); 1073 return cb (cb_cls, (void *) msg, sqlite3_column_int64 (stmt, 8));
1037} 1074}
1038 1075
1076
1077static int
1078fragment_select (struct Plugin *plugin, sqlite3_stmt *stmt,
1079 uint64_t *returned_fragments,
1080 GNUNET_PSYCSTORE_FragmentCallback cb, void *cb_cls)
1081{
1082 int ret = GNUNET_SYSERR;
1083 int sql_ret;
1084
1085 do
1086 {
1087 sql_ret = sqlite3_step (stmt);
1088 switch (sql_ret)
1089 {
1090 case SQLITE_DONE:
1091 if (ret != GNUNET_OK)
1092 ret = GNUNET_NO;
1093 break;
1094 case SQLITE_ROW:
1095 ret = fragment_row (stmt, cb, cb_cls);
1096 (*returned_fragments)++;
1097 if (ret != GNUNET_YES)
1098 sql_ret = SQLITE_DONE;
1099 break;
1100 default:
1101 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1102 "sqlite3_step");
1103 }
1104 }
1105 while (sql_ret == SQLITE_ROW);
1106
1107 return ret;
1108}
1109
1039/** 1110/**
1040 * Retrieve a message fragment by fragment ID. 1111 * Retrieve a message fragment range by fragment ID.
1041 * 1112 *
1042 * @see GNUNET_PSYCSTORE_fragment_get() 1113 * @see GNUNET_PSYCSTORE_fragment_get()
1043 * 1114 *
@@ -1046,36 +1117,29 @@ fragment_row (sqlite3_stmt *stmt, GNUNET_PSYCSTORE_FragmentCallback cb,
1046static int 1117static int
1047fragment_get (void *cls, 1118fragment_get (void *cls,
1048 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, 1119 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1049 uint64_t fragment_id, 1120 uint64_t first_fragment_id,
1121 uint64_t last_fragment_id,
1122 uint64_t *returned_fragments,
1050 GNUNET_PSYCSTORE_FragmentCallback cb, 1123 GNUNET_PSYCSTORE_FragmentCallback cb,
1051 void *cb_cls) 1124 void *cb_cls)
1052{ 1125{
1053 struct Plugin *plugin = cls; 1126 struct Plugin *plugin = cls;
1054 sqlite3_stmt *stmt = plugin->select_fragment; 1127 sqlite3_stmt *stmt = plugin->select_fragments;
1055 int ret = GNUNET_SYSERR; 1128 int ret = GNUNET_SYSERR;
1129 *returned_fragments = 0;
1056 1130
1057 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, 1131 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1058 sizeof (*channel_key), 1132 sizeof (*channel_key),
1059 SQLITE_STATIC) 1133 SQLITE_STATIC)
1060 || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, fragment_id)) 1134 || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, first_fragment_id)
1135 || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, last_fragment_id))
1061 { 1136 {
1062 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1137 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1063 "sqlite3_bind"); 1138 "sqlite3_bind");
1064 } 1139 }
1065 else 1140 else
1066 { 1141 {
1067 switch (sqlite3_step (stmt)) 1142 ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1068 {
1069 case SQLITE_DONE:
1070 ret = GNUNET_NO;
1071 break;
1072 case SQLITE_ROW:
1073 ret = fragment_row (stmt, cb, cb_cls);
1074 break;
1075 default:
1076 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1077 "sqlite3_step");
1078 }
1079 } 1143 }
1080 1144
1081 if (SQLITE_OK != sqlite3_reset (stmt)) 1145 if (SQLITE_OK != sqlite3_reset (stmt))
@@ -1087,8 +1151,52 @@ fragment_get (void *cls,
1087 return ret; 1151 return ret;
1088} 1152}
1089 1153
1154
1090/** 1155/**
1091 * Retrieve all fragments of a message. 1156 * Retrieve a message fragment range by fragment ID.
1157 *
1158 * @see GNUNET_PSYCSTORE_fragment_get_latest()
1159 *
1160 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1161 */
1162static int
1163fragment_get_latest (void *cls,
1164 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1165 uint64_t fragment_limit,
1166 uint64_t *returned_fragments,
1167 GNUNET_PSYCSTORE_FragmentCallback cb,
1168 void *cb_cls)
1169{
1170 struct Plugin *plugin = cls;
1171 sqlite3_stmt *stmt = plugin->select_latest_fragments;
1172 int ret = GNUNET_SYSERR;
1173 *returned_fragments = 0;
1174
1175 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1176 sizeof (*channel_key),
1177 SQLITE_STATIC)
1178 || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, fragment_limit))
1179 {
1180 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1181 "sqlite3_bind");
1182 }
1183 else
1184 {
1185 ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1186 }
1187
1188 if (SQLITE_OK != sqlite3_reset (stmt))
1189 {
1190 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1191 "sqlite3_reset");
1192 }
1193
1194 return ret;
1195}
1196
1197
1198/**
1199 * Retrieve all fragments of a message ID range.
1092 * 1200 *
1093 * @see GNUNET_PSYCSTORE_message_get() 1201 * @see GNUNET_PSYCSTORE_message_get()
1094 * 1202 *
@@ -1097,48 +1205,29 @@ fragment_get (void *cls,
1097static int 1205static int
1098message_get (void *cls, 1206message_get (void *cls,
1099 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, 1207 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1100 uint64_t message_id, 1208 uint64_t first_message_id,
1209 uint64_t last_message_id,
1101 uint64_t *returned_fragments, 1210 uint64_t *returned_fragments,
1102 GNUNET_PSYCSTORE_FragmentCallback cb, 1211 GNUNET_PSYCSTORE_FragmentCallback cb,
1103 void *cb_cls) 1212 void *cb_cls)
1104{ 1213{
1105 struct Plugin *plugin = cls; 1214 struct Plugin *plugin = cls;
1106 sqlite3_stmt *stmt = plugin->select_message; 1215 sqlite3_stmt *stmt = plugin->select_messages;
1107 int ret = GNUNET_SYSERR; 1216 int ret = GNUNET_SYSERR;
1108 *returned_fragments = 0; 1217 *returned_fragments = 0;
1109 1218
1110 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, 1219 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1111 sizeof (*channel_key), 1220 sizeof (*channel_key),
1112 SQLITE_STATIC) 1221 SQLITE_STATIC)
1113 || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, message_id)) 1222 || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, first_message_id)
1223 || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, last_message_id))
1114 { 1224 {
1115 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 1225 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1116 "sqlite3_bind"); 1226 "sqlite3_bind");
1117 } 1227 }
1118 else 1228 else
1119 { 1229 {
1120 int sql_ret; 1230 ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1121 do
1122 {
1123 sql_ret = sqlite3_step (stmt);
1124 switch (sql_ret)
1125 {
1126 case SQLITE_DONE:
1127 if (ret != GNUNET_OK)
1128 ret = GNUNET_NO;
1129 break;
1130 case SQLITE_ROW:
1131 ret = fragment_row (stmt, cb, cb_cls);
1132 (*returned_fragments)++;
1133 if (ret != GNUNET_YES)
1134 sql_ret = SQLITE_DONE;
1135 break;
1136 default:
1137 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1138 "sqlite3_step");
1139 }
1140 }
1141 while (sql_ret == SQLITE_ROW);
1142 } 1231 }
1143 1232
1144 if (SQLITE_OK != sqlite3_reset (stmt)) 1233 if (SQLITE_OK != sqlite3_reset (stmt))
@@ -1150,6 +1239,53 @@ message_get (void *cls,
1150 return ret; 1239 return ret;
1151} 1240}
1152 1241
1242
1243/**
1244 * Retrieve all fragments of the latest messages.
1245 *
1246 * @see GNUNET_PSYCSTORE_message_get_latest()
1247 *
1248 * @return #GNUNET_OK on success, else #GNUNET_SYSERR
1249 */
1250static int
1251message_get_latest (void *cls,
1252 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1253 uint64_t message_limit,
1254 uint64_t *returned_fragments,
1255 GNUNET_PSYCSTORE_FragmentCallback cb,
1256 void *cb_cls)
1257{
1258 struct Plugin *plugin = cls;
1259 sqlite3_stmt *stmt = plugin->select_latest_messages;
1260 int ret = GNUNET_SYSERR;
1261 *returned_fragments = 0;
1262
1263 if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
1264 sizeof (*channel_key),
1265 SQLITE_STATIC)
1266 || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
1267 sizeof (*channel_key),
1268 SQLITE_STATIC)
1269 || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_limit))
1270 {
1271 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1272 "sqlite3_bind");
1273 }
1274 else
1275 {
1276 ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
1277 }
1278
1279 if (SQLITE_OK != sqlite3_reset (stmt))
1280 {
1281 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
1282 "sqlite3_reset");
1283 }
1284
1285 return ret;
1286}
1287
1288
1153/** 1289/**
1154 * Retrieve a fragment of message specified by its message ID and fragment 1290 * Retrieve a fragment of message specified by its message ID and fragment
1155 * offset. 1291 * offset.
@@ -1777,7 +1913,9 @@ libgnunet_plugin_psycstore_sqlite_init (void *cls)
1777 api->fragment_store = &fragment_store; 1913 api->fragment_store = &fragment_store;
1778 api->message_add_flags = &message_add_flags; 1914 api->message_add_flags = &message_add_flags;
1779 api->fragment_get = &fragment_get; 1915 api->fragment_get = &fragment_get;
1916 api->fragment_get_latest = &fragment_get_latest;
1780 api->message_get = &message_get; 1917 api->message_get = &message_get;
1918 api->message_get_latest = &message_get_latest;
1781 api->message_get_fragment = &message_get_fragment; 1919 api->message_get_fragment = &message_get_fragment;
1782 api->counters_message_get = &counters_message_get; 1920 api->counters_message_get = &counters_message_get;
1783 api->counters_state_get = &counters_state_get; 1921 api->counters_state_get = &counters_state_get;
diff --git a/src/psycstore/psycstore.h b/src/psycstore/psycstore.h
index 17905f422..e6a82848a 100644
--- a/src/psycstore/psycstore.h
+++ b/src/psycstore/psycstore.h
@@ -42,15 +42,17 @@ struct OperationResult
42 */ 42 */
43 struct GNUNET_MessageHeader header; 43 struct GNUNET_MessageHeader header;
44 44
45 uint32_t reserved GNUNET_PACKED;
46
45 /** 47 /**
46 * Operation ID. 48 * Operation ID.
47 */ 49 */
48 uint32_t op_id GNUNET_PACKED; 50 uint64_t op_id GNUNET_PACKED;
49 51
50 /** 52 /**
51 * Status code for the operation. 53 * Status code for the operation.
52 */ 54 */
53 int64_t result_code GNUNET_PACKED; 55 uint64_t result_code GNUNET_PACKED;
54 56
55 /* followed by 0-terminated error message (on error) */ 57 /* followed by 0-terminated error message (on error) */
56 58
@@ -70,9 +72,17 @@ struct CountersResult
70 struct GNUNET_MessageHeader header; 72 struct GNUNET_MessageHeader header;
71 73
72 /** 74 /**
75 * Status code for the operation:
76 * #GNUNET_OK: success, counter values are returned.
77 * #GNUNET_NO: no message has been sent to the channel yet.
78 * #GNUNET_SYSERR: an error occurred.
79 */
80 uint32_t result_code GNUNET_PACKED;
81
82 /**
73 * Operation ID. 83 * Operation ID.
74 */ 84 */
75 uint32_t op_id GNUNET_PACKED; 85 uint64_t op_id GNUNET_PACKED;
76 86
77 uint64_t max_fragment_id GNUNET_PACKED; 87 uint64_t max_fragment_id GNUNET_PACKED;
78 88
@@ -81,14 +91,6 @@ struct CountersResult
81 uint64_t max_group_generation GNUNET_PACKED; 91 uint64_t max_group_generation GNUNET_PACKED;
82 92
83 uint64_t max_state_message_id GNUNET_PACKED; 93 uint64_t max_state_message_id GNUNET_PACKED;
84
85 /**
86 * Status code for the operation:
87 * #GNUNET_OK: success, counter values are returned.
88 * #GNUNET_NO: no message has been sent to the channel yet.
89 * #GNUNET_SYSERR: an error occurred.
90 */
91 int32_t result_code GNUNET_PACKED;
92}; 94};
93 95
94 96
@@ -102,15 +104,14 @@ struct FragmentResult
102 */ 104 */
103 struct GNUNET_MessageHeader header; 105 struct GNUNET_MessageHeader header;
104 106
107 uint32_t psycstore_flags GNUNET_PACKED;
108
105 /** 109 /**
106 * Operation ID. 110 * Operation ID.
107 */ 111 */
108 uint32_t op_id GNUNET_PACKED; 112 uint64_t op_id GNUNET_PACKED;
109
110 uint32_t psycstore_flags GNUNET_PACKED;
111
112 /* followed by GNUNET_MULTICAST_MessageHeader */
113 113
114 /* Followed by GNUNET_MULTICAST_MessageHeader */
114}; 115};
115 116
116 117
@@ -124,14 +125,16 @@ struct StateResult
124 */ 125 */
125 struct GNUNET_MessageHeader header; 126 struct GNUNET_MessageHeader header;
126 127
128 uint16_t name_size GNUNET_PACKED;
129
130 uint16_t reserved GNUNET_PACKED;
131
127 /** 132 /**
128 * Operation ID. 133 * Operation ID.
129 */ 134 */
130 uint32_t op_id GNUNET_PACKED; 135 uint64_t op_id GNUNET_PACKED;
131
132 uint16_t name_size GNUNET_PACKED;
133 136
134 /* followed by name and value */ 137 /* Followed by name and value */
135}; 138};
136 139
137 140
@@ -142,13 +145,14 @@ struct OperationRequest
142{ 145{
143 struct GNUNET_MessageHeader header; 146 struct GNUNET_MessageHeader header;
144 147
148 uint32_t reserved GNUNET_PACKED;
149
145 /** 150 /**
146 * Operation ID. 151 * Operation ID.
147 */ 152 */
148 uint32_t op_id GNUNET_PACKED; 153 uint64_t op_id GNUNET_PACKED;
149 154
150 struct GNUNET_CRYPTO_EddsaPublicKey channel_key; 155 struct GNUNET_CRYPTO_EddsaPublicKey channel_key;
151
152}; 156};
153 157
154 158
@@ -162,10 +166,12 @@ struct MembershipStoreRequest
162 */ 166 */
163 struct GNUNET_MessageHeader header; 167 struct GNUNET_MessageHeader header;
164 168
169 uint32_t reserved GNUNET_PACKED;
170
165 /** 171 /**
166 * Operation ID. 172 * Operation ID.
167 */ 173 */
168 uint32_t op_id GNUNET_PACKED; 174 uint64_t op_id GNUNET_PACKED;
169 175
170 /** 176 /**
171 * Channel's public key. 177 * Channel's public key.
@@ -177,9 +183,9 @@ struct MembershipStoreRequest
177 */ 183 */
178 struct GNUNET_CRYPTO_EcdsaPublicKey slave_key; 184 struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
179 185
180 uint64_t announced_at; 186 uint64_t announced_at GNUNET_PACKED;
181 uint64_t effective_since; 187 uint64_t effective_since GNUNET_PACKED;
182 uint64_t group_generation; 188 uint64_t group_generation GNUNET_PACKED;
183 uint8_t did_join; 189 uint8_t did_join;
184}; 190};
185 191
@@ -194,10 +200,12 @@ struct MembershipTestRequest
194 */ 200 */
195 struct GNUNET_MessageHeader header; 201 struct GNUNET_MessageHeader header;
196 202
203 uint32_t reserved GNUNET_PACKED;
204
197 /** 205 /**
198 * Operation ID. 206 * Operation ID.
199 */ 207 */
200 uint32_t op_id GNUNET_PACKED; 208 uint64_t op_id GNUNET_PACKED;
201 209
202 /** 210 /**
203 * Channel's public key. 211 * Channel's public key.
@@ -226,9 +234,9 @@ struct FragmentStoreRequest
226 struct GNUNET_MessageHeader header; 234 struct GNUNET_MessageHeader header;
227 235
228 /** 236 /**
229 * Operation ID. 237 * enum GNUNET_PSYCSTORE_MessageFlags
230 */ 238 */
231 uint32_t op_id GNUNET_PACKED; 239 uint32_t psycstore_flags GNUNET_PACKED;
232 240
233 /** 241 /**
234 * Channel's public key. 242 * Channel's public key.
@@ -236,9 +244,9 @@ struct FragmentStoreRequest
236 struct GNUNET_CRYPTO_EddsaPublicKey channel_key; 244 struct GNUNET_CRYPTO_EddsaPublicKey channel_key;
237 245
238 /** 246 /**
239 * enum GNUNET_PSYCSTORE_MessageFlags 247 * Operation ID.
240 */ 248 */
241 uint32_t psycstore_flags GNUNET_PACKED; 249 uint64_t op_id;
242 250
243 /* Followed by fragment */ 251 /* Followed by fragment */
244}; 252};
@@ -254,10 +262,12 @@ struct FragmentGetRequest
254 */ 262 */
255 struct GNUNET_MessageHeader header; 263 struct GNUNET_MessageHeader header;
256 264
265 uint32_t reserved GNUNET_PACKED;
266
257 /** 267 /**
258 * Operation ID. 268 * Operation ID.
259 */ 269 */
260 uint32_t op_id GNUNET_PACKED; 270 uint64_t op_id GNUNET_PACKED;
261 271
262 /** 272 /**
263 * Channel's public key. 273 * Channel's public key.
@@ -270,9 +280,19 @@ struct FragmentGetRequest
270 struct GNUNET_CRYPTO_EcdsaPublicKey slave_key; 280 struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
271 281
272 /** 282 /**
273 * Fragment ID to request. 283 * First fragment ID to request.
274 */ 284 */
275 uint64_t fragment_id GNUNET_PACKED; 285 uint64_t first_fragment_id GNUNET_PACKED;
286
287 /**
288 * Last fragment ID to request.
289 */
290 uint64_t last_fragment_id GNUNET_PACKED;
291
292 /**
293 * Maximum number of fragments to retrieve.
294 */
295 uint64_t fragment_limit GNUNET_PACKED;
276 296
277 /** 297 /**
278 * Do membership test with @a slave_key before returning fragment? 298 * Do membership test with @a slave_key before returning fragment?
@@ -292,10 +312,12 @@ struct MessageGetRequest
292 */ 312 */
293 struct GNUNET_MessageHeader header; 313 struct GNUNET_MessageHeader header;
294 314
315 uint32_t reserved GNUNET_PACKED;
316
295 /** 317 /**
296 * Operation ID. 318 * Operation ID.
297 */ 319 */
298 uint32_t op_id GNUNET_PACKED; 320 uint64_t op_id GNUNET_PACKED;
299 321
300 /** 322 /**
301 * Channel's public key. 323 * Channel's public key.
@@ -308,9 +330,19 @@ struct MessageGetRequest
308 struct GNUNET_CRYPTO_EcdsaPublicKey slave_key; 330 struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
309 331
310 /** 332 /**
311 * Message ID to request. 333 * First message ID to request.
312 */ 334 */
313 uint64_t message_id GNUNET_PACKED; 335 uint64_t first_message_id GNUNET_PACKED;
336
337 /**
338 * Last message ID to request.
339 */
340 uint64_t last_message_id GNUNET_PACKED;
341
342 /**
343 * Maximum number of messages to retrieve.
344 */
345 uint64_t message_limit GNUNET_PACKED;
314 346
315 /** 347 /**
316 * Do membership test with @a slave_key before returning fragment? 348 * Do membership test with @a slave_key before returning fragment?
@@ -330,10 +362,12 @@ struct MessageGetFragmentRequest
330 */ 362 */
331 struct GNUNET_MessageHeader header; 363 struct GNUNET_MessageHeader header;
332 364
365 uint32_t reserved GNUNET_PACKED;
366
333 /** 367 /**
334 * Operation ID. 368 * Operation ID.
335 */ 369 */
336 uint32_t op_id GNUNET_PACKED; 370 uint64_t op_id GNUNET_PACKED;
337 371
338 /** 372 /**
339 * Channel's public key. 373 * Channel's public key.
@@ -373,10 +407,12 @@ struct StateHashUpdateRequest
373 */ 407 */
374 struct GNUNET_MessageHeader header; 408 struct GNUNET_MessageHeader header;
375 409
410 uint32_t reserved GNUNET_PACKED;
411
376 /** 412 /**
377 * Operation ID. 413 * Operation ID.
378 */ 414 */
379 uint32_t op_id GNUNET_PACKED; 415 uint64_t op_id GNUNET_PACKED;
380 416
381 /** 417 /**
382 * Channel's public key. 418 * Channel's public key.
@@ -405,20 +441,6 @@ struct StateModifyRequest
405 struct GNUNET_MessageHeader header; 441 struct GNUNET_MessageHeader header;
406 442
407 /** 443 /**
408 * Operation ID.
409 */
410 uint32_t op_id GNUNET_PACKED;
411
412 /**
413 * Channel's public key.
414 */
415 struct GNUNET_CRYPTO_EddsaPublicKey channel_key;
416
417 uint64_t message_id GNUNET_PACKED;
418
419 uint64_t state_delta GNUNET_PACKED;
420
421 /**
422 * Size of name, including NUL terminator. 444 * Size of name, including NUL terminator.
423 */ 445 */
424 uint16_t name_size GNUNET_PACKED; 446 uint16_t name_size GNUNET_PACKED;
@@ -433,6 +455,20 @@ struct StateModifyRequest
433 */ 455 */
434 uint8_t oper; 456 uint8_t oper;
435 457
458 /**
459 * Operation ID.
460 */
461 uint64_t op_id GNUNET_PACKED;
462
463 /**
464 * Channel's public key.
465 */
466 struct GNUNET_CRYPTO_EddsaPublicKey channel_key;
467
468 uint64_t message_id GNUNET_PACKED;
469
470 uint64_t state_delta GNUNET_PACKED;
471
436 /* Followed by NUL-terminated name, then the value. */ 472 /* Followed by NUL-terminated name, then the value. */
437}; 473};
438 474
@@ -448,26 +484,28 @@ struct StateSyncRequest
448 struct GNUNET_MessageHeader header; 484 struct GNUNET_MessageHeader header;
449 485
450 /** 486 /**
451 * Operation ID. 487 * Size of name, including NUL terminator.
452 */ 488 */
453 uint32_t op_id GNUNET_PACKED; 489 uint16_t name_size GNUNET_PACKED;
454 490
455 /** 491 /**
456 * Channel's public key. 492 * OR'd StateOpFlags
457 */ 493 */
458 struct GNUNET_CRYPTO_EddsaPublicKey channel_key; 494 uint8_t flags;
495
496 uint8_t reserved;
459 497
460 uint64_t message_id GNUNET_PACKED; 498 uint64_t message_id GNUNET_PACKED;
461 499
462 /** 500 /**
463 * Size of name, including NUL terminator. 501 * Operation ID.
464 */ 502 */
465 uint16_t name_size GNUNET_PACKED; 503 uint64_t op_id GNUNET_PACKED;
466 504
467 /** 505 /**
468 * OR'd StateOpFlags 506 * Channel's public key.
469 */ 507 */
470 uint8_t flags; 508 struct GNUNET_CRYPTO_EddsaPublicKey channel_key;
471 509
472 /* Followed by NUL-terminated name, then the value. */ 510 /* Followed by NUL-terminated name, then the value. */
473}; 511};
diff --git a/src/psycstore/psycstore_api.c b/src/psycstore/psycstore_api.c
index 9df55888d..9ef1fb61a 100644
--- a/src/psycstore/psycstore_api.c
+++ b/src/psycstore/psycstore_api.c
@@ -25,6 +25,8 @@
25 * @author Christian Grothoff 25 * @author Christian Grothoff
26 */ 26 */
27 27
28#include <inttypes.h>
29
28#include "platform.h" 30#include "platform.h"
29#include "gnunet_util_lib.h" 31#include "gnunet_util_lib.h"
30#include "gnunet_constants.h" 32#include "gnunet_constants.h"
@@ -76,7 +78,7 @@ struct GNUNET_PSYCSTORE_OperationHandle
76 /** 78 /**
77 * Operation ID. 79 * Operation ID.
78 */ 80 */
79 uint32_t op_id; 81 uint64_t op_id;
80 82
81 /** 83 /**
82 * Message to send to the PSYCstore service. 84 * Message to send to the PSYCstore service.
@@ -137,15 +139,14 @@ struct GNUNET_PSYCSTORE_Handle
137 struct GNUNET_TIME_Relative reconnect_delay; 139 struct GNUNET_TIME_Relative reconnect_delay;
138 140
139 /** 141 /**
140 * Are we polling for incoming messages right now? 142 * Last operation ID used.
141 */ 143 */
142 int in_receive; 144 uint64_t last_op_id;
143 145
144 /** 146 /**
145 * The last operation id used for a PSYCstore operation. 147 * Are we polling for incoming messages right now?
146 */ 148 */
147 uint32_t last_op_id_used; 149 uint8_t in_receive;
148
149}; 150};
150 151
151 152
@@ -155,10 +156,10 @@ struct GNUNET_PSYCSTORE_Handle
155 * @param h Handle to the PSYCstore service. 156 * @param h Handle to the PSYCstore service.
156 * @return next operation id to use 157 * @return next operation id to use
157 */ 158 */
158static uint32_t 159static uint64_t
159get_next_op_id (struct GNUNET_PSYCSTORE_Handle *h) 160get_next_op_id (struct GNUNET_PSYCSTORE_Handle *h)
160{ 161{
161 return h->last_op_id_used++; 162 return h->last_op_id++;
162} 163}
163 164
164 165
@@ -168,7 +169,7 @@ get_next_op_id (struct GNUNET_PSYCSTORE_Handle *h)
168 * @return OperationHandle if found, or NULL otherwise. 169 * @return OperationHandle if found, or NULL otherwise.
169 */ 170 */
170static struct GNUNET_PSYCSTORE_OperationHandle * 171static struct GNUNET_PSYCSTORE_OperationHandle *
171find_op_by_id (struct GNUNET_PSYCSTORE_Handle *h, uint32_t op_id) 172find_op_by_id (struct GNUNET_PSYCSTORE_Handle *h, uint64_t op_id)
172{ 173{
173 struct GNUNET_PSYCSTORE_OperationHandle *op = h->op_head; 174 struct GNUNET_PSYCSTORE_OperationHandle *op = h->op_head;
174 while (NULL != op) 175 while (NULL != op)
@@ -284,19 +285,20 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
284 if (size == sizeof (struct OperationResult)) 285 if (size == sizeof (struct OperationResult))
285 str = NULL; 286 str = NULL;
286 287
287 op = find_op_by_id (h, ntohl (opres->op_id)); 288 op = find_op_by_id (h, GNUNET_ntohll (opres->op_id));
288 if (NULL == op) 289 if (NULL == op)
289 { 290 {
290 LOG (GNUNET_ERROR_TYPE_DEBUG, 291 LOG (GNUNET_ERROR_TYPE_DEBUG,
291 "No callback registered for operation with ID %ld.\n", 292 "No callback registered for operation with ID %" PRIu64 ".\n",
292 type, ntohl (opres->op_id)); 293 type, GNUNET_ntohll (opres->op_id));
293 } 294 }
294 else 295 else
295 { 296 {
296 LOG (GNUNET_ERROR_TYPE_DEBUG, 297 LOG (GNUNET_ERROR_TYPE_DEBUG,
297 "Received result message (type %d) with operation ID: %ld\n", 298 "Received result message (type %d) with operation ID: %" PRIu64 "\n",
298 type, op->op_id); 299 type, op->op_id);
299 300
301 int64_t result_code = GNUNET_ntohll (opres->result_code) + INT64_MIN;
300 GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op); 302 GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op);
301 if (NULL != op->res_cb) 303 if (NULL != op->res_cb)
302 { 304 {
@@ -307,19 +309,19 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
307 case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY: 309 case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY:
308 smreq = (const struct StateModifyRequest *) op->msg; 310 smreq = (const struct StateModifyRequest *) op->msg;
309 if (!(smreq->flags & STATE_OP_LAST 311 if (!(smreq->flags & STATE_OP_LAST
310 || GNUNET_OK != ntohl (opres->result_code))) 312 || GNUNET_OK != result_code))
311 op->res_cb = NULL; 313 op->res_cb = NULL;
312 break; 314 break;
313 case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC: 315 case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC:
314 ssreq = (const struct StateSyncRequest *) op->msg; 316 ssreq = (const struct StateSyncRequest *) op->msg;
315 if (!(ssreq->flags & STATE_OP_LAST 317 if (!(ssreq->flags & STATE_OP_LAST
316 || GNUNET_OK != ntohl (opres->result_code))) 318 || GNUNET_OK != result_code))
317 op->res_cb = NULL; 319 op->res_cb = NULL;
318 break; 320 break;
319 } 321 }
320 } 322 }
321 if (NULL != op->res_cb) 323 if (NULL != op->res_cb)
322 op->res_cb (op->cls, ntohl (opres->result_code), str); 324 op->res_cb (op->cls, result_code, str);
323 GNUNET_free (op); 325 GNUNET_free (op);
324 } 326 }
325 break; 327 break;
@@ -338,19 +340,20 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
338 340
339 cres = (const struct CountersResult *) msg; 341 cres = (const struct CountersResult *) msg;
340 342
341 op = find_op_by_id (h, ntohl (cres->op_id)); 343 op = find_op_by_id (h, GNUNET_ntohll (cres->op_id));
342 if (NULL == op) 344 if (NULL == op)
343 { 345 {
344 LOG (GNUNET_ERROR_TYPE_DEBUG, 346 LOG (GNUNET_ERROR_TYPE_DEBUG,
345 "No callback registered for operation with ID %ld.\n", 347 "No callback registered for operation with ID %" PRIu64 ".\n",
346 type, ntohl (cres->op_id)); 348 type, GNUNET_ntohll (cres->op_id));
347 } 349 }
348 else 350 else
349 { 351 {
350 GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op); 352 GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op);
351 if (NULL != op->data_cb) 353 if (NULL != op->data_cb)
352 ((GNUNET_PSYCSTORE_CountersCallback) 354 ((GNUNET_PSYCSTORE_CountersCallback)
353 op->data_cb) (op->cls, ntohl (cres->result_code), 355 op->data_cb) (op->cls,
356 ntohl (cres->result_code) + INT32_MIN,
354 GNUNET_ntohll (cres->max_fragment_id), 357 GNUNET_ntohll (cres->max_fragment_id),
355 GNUNET_ntohll (cres->max_message_id), 358 GNUNET_ntohll (cres->max_message_id),
356 GNUNET_ntohll (cres->max_group_generation), 359 GNUNET_ntohll (cres->max_group_generation),
@@ -386,12 +389,12 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
386 return; 389 return;
387 } 390 }
388 391
389 op = find_op_by_id (h, ntohl (fres->op_id)); 392 op = find_op_by_id (h, GNUNET_ntohll (fres->op_id));
390 if (NULL == op) 393 if (NULL == op)
391 { 394 {
392 LOG (GNUNET_ERROR_TYPE_DEBUG, 395 LOG (GNUNET_ERROR_TYPE_DEBUG,
393 "No callback registered for operation with ID %ld.\n", 396 "No callback registered for operation with ID %" PRIu64 ".\n",
394 type, ntohl (fres->op_id)); 397 type, GNUNET_ntohll (fres->op_id));
395 } 398 }
396 else 399 else
397 { 400 {
@@ -427,12 +430,12 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
427 return; 430 return;
428 } 431 }
429 432
430 op = find_op_by_id (h, ntohl (sres->op_id)); 433 op = find_op_by_id (h, GNUNET_ntohll (sres->op_id));
431 if (NULL == op) 434 if (NULL == op)
432 { 435 {
433 LOG (GNUNET_ERROR_TYPE_DEBUG, 436 LOG (GNUNET_ERROR_TYPE_DEBUG,
434 "No callback registered for operation with ID %ld.\n", 437 "No callback registered for operation with ID %" PRIu64 ".\n",
435 type, ntohl (sres->op_id)); 438 type, GNUNET_ntohll (sres->op_id));
436 } 439 }
437 else 440 else
438 { 441 {
@@ -479,7 +482,7 @@ send_next_message (void *cls, size_t size, void *buf)
479 return 0; 482 return 0;
480 } 483 }
481 LOG (GNUNET_ERROR_TYPE_DEBUG, 484 LOG (GNUNET_ERROR_TYPE_DEBUG,
482 "Sending message of type %d to PSYCstore service. ID: %u\n", 485 "Sending message of type %d to PSYCstore service. ID: %" PRIu64 "\n",
483 ntohs (op->msg->type), op->op_id); 486 ntohs (op->msg->type), op->op_id);
484 memcpy (buf, op->msg, ret); 487 memcpy (buf, op->msg, ret);
485 488
@@ -682,8 +685,8 @@ GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h,
682 : effective_since == 0); 685 : effective_since == 0);
683 686
684 struct MembershipStoreRequest *req; 687 struct MembershipStoreRequest *req;
685 struct GNUNET_PSYCSTORE_OperationHandle *op 688 struct GNUNET_PSYCSTORE_OperationHandle *
686 = GNUNET_malloc (sizeof (*op) + sizeof (*req)); 689 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
687 op->h = h; 690 op->h = h;
688 op->res_cb = rcb; 691 op->res_cb = rcb;
689 op->cls = rcb_cls; 692 op->cls = rcb_cls;
@@ -700,7 +703,7 @@ GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h,
700 req->group_generation = GNUNET_htonll (group_generation); 703 req->group_generation = GNUNET_htonll (group_generation);
701 704
702 op->op_id = get_next_op_id (h); 705 op->op_id = get_next_op_id (h);
703 req->op_id = htonl (op->op_id); 706 req->op_id = GNUNET_htonll (op->op_id);
704 707
705 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); 708 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
706 transmit_next (h); 709 transmit_next (h);
@@ -746,8 +749,8 @@ GNUNET_PSYCSTORE_membership_test (struct GNUNET_PSYCSTORE_Handle *h,
746 void *rcb_cls) 749 void *rcb_cls)
747{ 750{
748 struct MembershipTestRequest *req; 751 struct MembershipTestRequest *req;
749 struct GNUNET_PSYCSTORE_OperationHandle *op 752 struct GNUNET_PSYCSTORE_OperationHandle *
750 = GNUNET_malloc (sizeof (*op) + sizeof (*req)); 753 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
751 op->h = h; 754 op->h = h;
752 op->res_cb = rcb; 755 op->res_cb = rcb;
753 op->cls = rcb_cls; 756 op->cls = rcb_cls;
@@ -762,7 +765,7 @@ GNUNET_PSYCSTORE_membership_test (struct GNUNET_PSYCSTORE_Handle *h,
762 req->group_generation = GNUNET_htonll (group_generation); 765 req->group_generation = GNUNET_htonll (group_generation);
763 766
764 op->op_id = get_next_op_id (h); 767 op->op_id = get_next_op_id (h);
765 req->op_id = htonl (op->op_id); 768 req->op_id = GNUNET_htonll (op->op_id);
766 769
767 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); 770 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
768 transmit_next (h); 771 transmit_next (h);
@@ -794,8 +797,8 @@ GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h,
794{ 797{
795 uint16_t size = ntohs (msg->header.size); 798 uint16_t size = ntohs (msg->header.size);
796 struct FragmentStoreRequest *req; 799 struct FragmentStoreRequest *req;
797 struct GNUNET_PSYCSTORE_OperationHandle *op 800 struct GNUNET_PSYCSTORE_OperationHandle *
798 = GNUNET_malloc (sizeof (*op) + sizeof (*req) + size); 801 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + size);
799 op->h = h; 802 op->h = h;
800 op->res_cb = rcb; 803 op->res_cb = rcb;
801 op->cls = rcb_cls; 804 op->cls = rcb_cls;
@@ -809,7 +812,7 @@ GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h,
809 memcpy (&req[1], msg, size); 812 memcpy (&req[1], msg, size);
810 813
811 op->op_id = get_next_op_id (h); 814 op->op_id = get_next_op_id (h);
812 req->op_id = htonl (op->op_id); 815 req->op_id = GNUNET_htonll (op->op_id);
813 816
814 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); 817 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
815 transmit_next (h); 818 transmit_next (h);
@@ -819,7 +822,7 @@ GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h,
819 822
820 823
821/** 824/**
822 * Retrieve a message fragment by fragment ID. 825 * Retrieve message fragments by fragment ID range.
823 * 826 *
824 * @param h 827 * @param h
825 * Handle for the PSYCstore. 828 * Handle for the PSYCstore.
@@ -829,9 +832,15 @@ GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h,
829 * The slave requesting the fragment. If not NULL, a membership test is 832 * The slave requesting the fragment. If not NULL, a membership test is
830 * performed first and the fragment is only returned if the slave has 833 * performed first and the fragment is only returned if the slave has
831 * access to it. 834 * access to it.
832 * @param fragment_id 835 * @param first_fragment_id
833 * Fragment ID to retrieve. Use 0 to get the latest message fragment. 836 * First fragment ID to retrieve.
834 * @param fcb 837 * Use 0 to get the latest message fragment.
838 * @param last_fragment_id
839 * Last consecutive fragment ID to retrieve.
840 * Use 0 to get the latest message fragment.
841 * @param fragment_limit
842 * Maximum number of fragments to retrieve.
843 * @param fragment_cb
835 * Callback to call with the retrieved fragments. 844 * Callback to call with the retrieved fragments.
836 * @param rcb 845 * @param rcb
837 * Callback to call with the result of the operation. 846 * Callback to call with the result of the operation.
@@ -844,16 +853,85 @@ struct GNUNET_PSYCSTORE_OperationHandle *
844GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h, 853GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h,
845 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, 854 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
846 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, 855 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
847 uint64_t fragment_id, 856 uint64_t first_fragment_id,
848 GNUNET_PSYCSTORE_FragmentCallback fcb, 857 uint64_t last_fragment_id,
858 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
849 GNUNET_PSYCSTORE_ResultCallback rcb, 859 GNUNET_PSYCSTORE_ResultCallback rcb,
850 void *cls) 860 void *cls)
851{ 861{
852 struct FragmentGetRequest *req; 862 struct FragmentGetRequest *req;
853 struct GNUNET_PSYCSTORE_OperationHandle *op 863 struct GNUNET_PSYCSTORE_OperationHandle *
854 = GNUNET_malloc (sizeof (*op) + sizeof (*req)); 864 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
865 op->h = h;
866 op->data_cb = (DataCallback) fragment_cb;
867 op->res_cb = rcb;
868 op->cls = cls;
869
870 req = (struct FragmentGetRequest *) &op[1];
871 op->msg = (struct GNUNET_MessageHeader *) req;
872 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET);
873 req->header.size = htons (sizeof (*req));
874 req->channel_key = *channel_key;
875 req->first_fragment_id = GNUNET_htonll (first_fragment_id);
876 req->last_fragment_id = GNUNET_htonll (last_fragment_id);
877 if (NULL != slave_key)
878 {
879 req->slave_key = *slave_key;
880 req->do_membership_test = GNUNET_YES;
881 }
882
883 op->op_id = get_next_op_id (h);
884 req->op_id = GNUNET_htonll (op->op_id);
885
886 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
887 transmit_next (h);
888
889 return op;
890}
891
892
893/**
894 * Retrieve latest message fragments.
895 *
896 * @param h
897 * Handle for the PSYCstore.
898 * @param channel_key
899 * The channel we are interested in.
900 * @param slave_key
901 * The slave requesting the fragment. If not NULL, a membership test is
902 * performed first and the fragment is only returned if the slave has
903 * access to it.
904 * @param first_fragment_id
905 * First fragment ID to retrieve.
906 * Use 0 to get the latest message fragment.
907 * @param last_fragment_id
908 * Last consecutive fragment ID to retrieve.
909 * Use 0 to get the latest message fragment.
910 * @param fragment_limit
911 * Maximum number of fragments to retrieve.
912 * @param fragment_cb
913 * Callback to call with the retrieved fragments.
914 * @param rcb
915 * Callback to call with the result of the operation.
916 * @param cls
917 * Closure for the callbacks.
918 *
919 * @return Handle that can be used to cancel the operation.
920 */
921struct GNUNET_PSYCSTORE_OperationHandle *
922GNUNET_PSYCSTORE_fragment_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
923 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
924 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
925 uint64_t fragment_limit,
926 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
927 GNUNET_PSYCSTORE_ResultCallback rcb,
928 void *cls)
929{
930 struct FragmentGetRequest *req;
931 struct GNUNET_PSYCSTORE_OperationHandle *
932 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
855 op->h = h; 933 op->h = h;
856 op->data_cb = (DataCallback) fcb; 934 op->data_cb = (DataCallback) fragment_cb;
857 op->res_cb = rcb; 935 op->res_cb = rcb;
858 op->cls = cls; 936 op->cls = cls;
859 937
@@ -862,7 +940,7 @@ GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h,
862 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET); 940 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET);
863 req->header.size = htons (sizeof (*req)); 941 req->header.size = htons (sizeof (*req));
864 req->channel_key = *channel_key; 942 req->channel_key = *channel_key;
865 req->fragment_id = GNUNET_htonll (fragment_id); 943 req->fragment_limit = GNUNET_ntohll (fragment_limit);
866 if (NULL != slave_key) 944 if (NULL != slave_key)
867 { 945 {
868 req->slave_key = *slave_key; 946 req->slave_key = *slave_key;
@@ -870,7 +948,7 @@ GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h,
870 } 948 }
871 949
872 op->op_id = get_next_op_id (h); 950 op->op_id = get_next_op_id (h);
873 req->op_id = htonl (op->op_id); 951 req->op_id = GNUNET_htonll (op->op_id);
874 952
875 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); 953 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
876 transmit_next (h); 954 transmit_next (h);
@@ -880,7 +958,7 @@ GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h,
880 958
881 959
882/** 960/**
883 * Retrieve all fragments of a message. 961 * Retrieve all fragments of messages in a message ID range.
884 * 962 *
885 * @param h 963 * @param h
886 * Handle for the PSYCstore. 964 * Handle for the PSYCstore.
@@ -890,9 +968,13 @@ GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h,
890 * The slave requesting the message. If not NULL, a membership test is 968 * The slave requesting the message. If not NULL, a membership test is
891 * performed first and the message is only returned if the slave has 969 * performed first and the message is only returned if the slave has
892 * access to it. 970 * access to it.
893 * @param message_id 971 * @param first_message_id
894 * Message ID to retrieve. Use 0 to get the latest message. 972 * First message ID to retrieve.
895 * @param fcb 973 * Use 0 to get the latest message.
974 * @param last_message_id
975 * Last consecutive message ID to retrieve.
976 * Use 0 to get the latest message.
977 * @param fragment_cb
896 * Callback to call with the retrieved fragments. 978 * Callback to call with the retrieved fragments.
897 * @param rcb 979 * @param rcb
898 * Callback to call with the result of the operation. 980 * Callback to call with the result of the operation.
@@ -905,16 +987,17 @@ struct GNUNET_PSYCSTORE_OperationHandle *
905GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h, 987GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h,
906 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, 988 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
907 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, 989 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
908 uint64_t message_id, 990 uint64_t first_message_id,
909 GNUNET_PSYCSTORE_FragmentCallback fcb, 991 uint64_t last_message_id,
992 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
910 GNUNET_PSYCSTORE_ResultCallback rcb, 993 GNUNET_PSYCSTORE_ResultCallback rcb,
911 void *cls) 994 void *cls)
912{ 995{
913 struct MessageGetRequest *req; 996 struct MessageGetRequest *req;
914 struct GNUNET_PSYCSTORE_OperationHandle *op 997 struct GNUNET_PSYCSTORE_OperationHandle *
915 = GNUNET_malloc (sizeof (*op) + sizeof (*req)); 998 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
916 op->h = h; 999 op->h = h;
917 op->data_cb = (DataCallback) fcb; 1000 op->data_cb = (DataCallback) fragment_cb;
918 op->res_cb = rcb; 1001 op->res_cb = rcb;
919 op->cls = cls; 1002 op->cls = cls;
920 1003
@@ -923,7 +1006,8 @@ GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h,
923 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET); 1006 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
924 req->header.size = htons (sizeof (*req)); 1007 req->header.size = htons (sizeof (*req));
925 req->channel_key = *channel_key; 1008 req->channel_key = *channel_key;
926 req->message_id = GNUNET_htonll (message_id); 1009 req->first_message_id = GNUNET_htonll (first_message_id);
1010 req->last_message_id = GNUNET_htonll (last_message_id);
927 if (NULL != slave_key) 1011 if (NULL != slave_key)
928 { 1012 {
929 req->slave_key = *slave_key; 1013 req->slave_key = *slave_key;
@@ -931,7 +1015,68 @@ GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h,
931 } 1015 }
932 1016
933 op->op_id = get_next_op_id (h); 1017 op->op_id = get_next_op_id (h);
934 req->op_id = htonl (op->op_id); 1018 req->op_id = GNUNET_htonll (op->op_id);
1019
1020 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1021 transmit_next (h);
1022
1023 return op;
1024}
1025
1026
1027/**
1028 * Retrieve all fragments of the latest messages.
1029 *
1030 * @param h
1031 * Handle for the PSYCstore.
1032 * @param channel_key
1033 * The channel we are interested in.
1034 * @param slave_key
1035 * The slave requesting the message. If not NULL, a membership test is
1036 * performed first and the message is only returned if the slave has
1037 * access to it.
1038 * @param message_limit
1039 * Maximum number of messages to retrieve.
1040 * @param fragment_cb
1041 * Callback to call with the retrieved fragments.
1042 * @param rcb
1043 * Callback to call with the result of the operation.
1044 * @param cls
1045 * Closure for the callbacks.
1046 *
1047 * @return Handle that can be used to cancel the operation.
1048 */
1049struct GNUNET_PSYCSTORE_OperationHandle *
1050GNUNET_PSYCSTORE_message_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
1051 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1052 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
1053 uint64_t message_limit,
1054 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
1055 GNUNET_PSYCSTORE_ResultCallback rcb,
1056 void *cls)
1057{
1058 struct MessageGetRequest *req;
1059 struct GNUNET_PSYCSTORE_OperationHandle *
1060 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1061 op->h = h;
1062 op->data_cb = (DataCallback) fragment_cb;
1063 op->res_cb = rcb;
1064 op->cls = cls;
1065
1066 req = (struct MessageGetRequest *) &op[1];
1067 op->msg = (struct GNUNET_MessageHeader *) req;
1068 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
1069 req->header.size = htons (sizeof (*req));
1070 req->channel_key = *channel_key;
1071 req->message_limit = GNUNET_ntohll (message_limit);
1072 if (NULL != slave_key)
1073 {
1074 req->slave_key = *slave_key;
1075 req->do_membership_test = GNUNET_YES;
1076 }
1077
1078 op->op_id = get_next_op_id (h);
1079 req->op_id = GNUNET_htonll (op->op_id);
935 1080
936 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); 1081 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
937 transmit_next (h); 1082 transmit_next (h);
@@ -956,9 +1101,9 @@ GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h,
956 * Message ID to retrieve. Use 0 to get the latest message. 1101 * Message ID to retrieve. Use 0 to get the latest message.
957 * @param fragment_offset 1102 * @param fragment_offset
958 * Offset of the fragment to retrieve. 1103 * Offset of the fragment to retrieve.
959 * @param fcb 1104 * @param fragment_cb
960 * Callback to call with the retrieved fragments. 1105 * Callback to call with the retrieved fragments.
961 * @param rcb 1106 * @param result_cb
962 * Callback to call with the result of the operation. 1107 * Callback to call with the result of the operation.
963 * @param cls 1108 * @param cls
964 * Closure for the callbacks. 1109 * Closure for the callbacks.
@@ -971,15 +1116,15 @@ GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h,
971 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, 1116 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
972 uint64_t message_id, 1117 uint64_t message_id,
973 uint64_t fragment_offset, 1118 uint64_t fragment_offset,
974 GNUNET_PSYCSTORE_FragmentCallback fcb, 1119 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
975 GNUNET_PSYCSTORE_ResultCallback rcb, 1120 GNUNET_PSYCSTORE_ResultCallback rcb,
976 void *cls) 1121 void *cls)
977{ 1122{
978 struct MessageGetFragmentRequest *req; 1123 struct MessageGetFragmentRequest *req;
979 struct GNUNET_PSYCSTORE_OperationHandle *op 1124 struct GNUNET_PSYCSTORE_OperationHandle *
980 = GNUNET_malloc (sizeof (*op) + sizeof (*req)); 1125 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
981 op->h = h; 1126 op->h = h;
982 op->data_cb = (DataCallback) fcb; 1127 op->data_cb = (DataCallback) fragment_cb;
983 op->res_cb = rcb; 1128 op->res_cb = rcb;
984 op->cls = cls; 1129 op->cls = cls;
985 1130
@@ -997,7 +1142,7 @@ GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h,
997 } 1142 }
998 1143
999 op->op_id = get_next_op_id (h); 1144 op->op_id = get_next_op_id (h);
1000 req->op_id = htonl (op->op_id); 1145 req->op_id = GNUNET_htonll (op->op_id);
1001 1146
1002 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); 1147 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1003 transmit_next (h); 1148 transmit_next (h);
@@ -1026,8 +1171,8 @@ GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h,
1026 void *ccb_cls) 1171 void *ccb_cls)
1027{ 1172{
1028 struct OperationRequest *req; 1173 struct OperationRequest *req;
1029 struct GNUNET_PSYCSTORE_OperationHandle *op 1174 struct GNUNET_PSYCSTORE_OperationHandle *
1030 = GNUNET_malloc (sizeof (*op) + sizeof (*req)); 1175 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1031 op->h = h; 1176 op->h = h;
1032 op->data_cb = ccb; 1177 op->data_cb = ccb;
1033 op->cls = ccb_cls; 1178 op->cls = ccb_cls;
@@ -1039,7 +1184,7 @@ GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h,
1039 req->channel_key = *channel_key; 1184 req->channel_key = *channel_key;
1040 1185
1041 op->op_id = get_next_op_id (h); 1186 op->op_id = get_next_op_id (h);
1042 req->op_id = htonl (op->op_id); 1187 req->op_id = GNUNET_htonll (op->op_id);
1043 1188
1044 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); 1189 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1045 transmit_next (h); 1190 transmit_next (h);
@@ -1109,7 +1254,7 @@ GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h,
1109 memcpy ((char *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size); 1254 memcpy ((char *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size);
1110 1255
1111 op->op_id = get_next_op_id (h); 1256 op->op_id = get_next_op_id (h);
1112 req->op_id = htonl (op->op_id); 1257 req->op_id = GNUNET_htonll (op->op_id);
1113 1258
1114 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); 1259 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1115 transmit_next (h); 1260 transmit_next (h);
@@ -1175,7 +1320,7 @@ GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h,
1175 memcpy ((char *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size); 1320 memcpy ((char *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size);
1176 1321
1177 op->op_id = get_next_op_id (h); 1322 op->op_id = get_next_op_id (h);
1178 req->op_id = htonl (op->op_id); 1323 req->op_id = GNUNET_htonll (op->op_id);
1179 1324
1180 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); 1325 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1181 transmit_next (h); 1326 transmit_next (h);
@@ -1204,8 +1349,8 @@ GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h,
1204 void *rcb_cls) 1349 void *rcb_cls)
1205{ 1350{
1206 struct OperationRequest *req; 1351 struct OperationRequest *req;
1207 struct GNUNET_PSYCSTORE_OperationHandle *op 1352 struct GNUNET_PSYCSTORE_OperationHandle *
1208 = GNUNET_malloc (sizeof (*op) + sizeof (*req)); 1353 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1209 op->h = h; 1354 op->h = h;
1210 op->res_cb = rcb; 1355 op->res_cb = rcb;
1211 op->cls = rcb_cls; 1356 op->cls = rcb_cls;
@@ -1217,7 +1362,7 @@ GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h,
1217 req->channel_key = *channel_key; 1362 req->channel_key = *channel_key;
1218 1363
1219 op->op_id = get_next_op_id (h); 1364 op->op_id = get_next_op_id (h);
1220 req->op_id = htonl (op->op_id); 1365 req->op_id = GNUNET_htonll (op->op_id);
1221 1366
1222 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); 1367 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1223 transmit_next (h); 1368 transmit_next (h);
@@ -1247,8 +1392,8 @@ GNUNET_PSYCSTORE_state_hash_update (struct GNUNET_PSYCSTORE_Handle *h,
1247 void *rcb_cls) 1392 void *rcb_cls)
1248{ 1393{
1249 struct StateHashUpdateRequest *req; 1394 struct StateHashUpdateRequest *req;
1250 struct GNUNET_PSYCSTORE_OperationHandle *op 1395 struct GNUNET_PSYCSTORE_OperationHandle *
1251 = GNUNET_malloc (sizeof (*op) + sizeof (*req)); 1396 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
1252 op->h = h; 1397 op->h = h;
1253 op->res_cb = rcb; 1398 op->res_cb = rcb;
1254 op->cls = rcb_cls; 1399 op->cls = rcb_cls;
@@ -1261,7 +1406,7 @@ GNUNET_PSYCSTORE_state_hash_update (struct GNUNET_PSYCSTORE_Handle *h,
1261 req->hash = *hash; 1406 req->hash = *hash;
1262 1407
1263 op->op_id = get_next_op_id (h); 1408 op->op_id = get_next_op_id (h);
1264 req->op_id = htonl (op->op_id); 1409 req->op_id = GNUNET_htonll (op->op_id);
1265 1410
1266 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); 1411 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1267 transmit_next (h); 1412 transmit_next (h);
@@ -1292,8 +1437,8 @@ GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h,
1292{ 1437{
1293 size_t name_size = strlen (name) + 1; 1438 size_t name_size = strlen (name) + 1;
1294 struct OperationRequest *req; 1439 struct OperationRequest *req;
1295 struct GNUNET_PSYCSTORE_OperationHandle *op 1440 struct GNUNET_PSYCSTORE_OperationHandle *
1296 = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size); 1441 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size);
1297 op->h = h; 1442 op->h = h;
1298 op->data_cb = (DataCallback) scb; 1443 op->data_cb = (DataCallback) scb;
1299 op->res_cb = rcb; 1444 op->res_cb = rcb;
@@ -1307,7 +1452,7 @@ GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h,
1307 memcpy (&req[1], name, name_size); 1452 memcpy (&req[1], name, name_size);
1308 1453
1309 op->op_id = get_next_op_id (h); 1454 op->op_id = get_next_op_id (h);
1310 req->op_id = htonl (op->op_id); 1455 req->op_id = GNUNET_htonll (op->op_id);
1311 1456
1312 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); 1457 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1313 transmit_next (h); 1458 transmit_next (h);
@@ -1339,8 +1484,8 @@ GNUNET_PSYCSTORE_state_get_prefix (struct GNUNET_PSYCSTORE_Handle *h,
1339{ 1484{
1340 size_t name_size = strlen (name_prefix) + 1; 1485 size_t name_size = strlen (name_prefix) + 1;
1341 struct OperationRequest *req; 1486 struct OperationRequest *req;
1342 struct GNUNET_PSYCSTORE_OperationHandle *op 1487 struct GNUNET_PSYCSTORE_OperationHandle *
1343 = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size); 1488 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size);
1344 op->h = h; 1489 op->h = h;
1345 op->data_cb = (DataCallback) scb; 1490 op->data_cb = (DataCallback) scb;
1346 op->res_cb = rcb; 1491 op->res_cb = rcb;
@@ -1354,7 +1499,7 @@ GNUNET_PSYCSTORE_state_get_prefix (struct GNUNET_PSYCSTORE_Handle *h,
1354 memcpy (&req[1], name_prefix, name_size); 1499 memcpy (&req[1], name_prefix, name_size);
1355 1500
1356 op->op_id = get_next_op_id (h); 1501 op->op_id = get_next_op_id (h);
1357 req->op_id = htonl (op->op_id); 1502 req->op_id = GNUNET_htonll (op->op_id);
1358 1503
1359 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); 1504 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1360 transmit_next (h); 1505 transmit_next (h);
diff --git a/src/psycstore/test_plugin_psycstore.c b/src/psycstore/test_plugin_psycstore.c
index 8267ddba8..a6c456fec 100644
--- a/src/psycstore/test_plugin_psycstore.c
+++ b/src/psycstore/test_plugin_psycstore.c
@@ -204,11 +204,17 @@ run (void *cls, char *const *args, const char *cfgfile,
204 msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE); 204 msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
205 msg->header.size = htons (sizeof (*msg) + sizeof (channel_pub_key)); 205 msg->header.size = htons (sizeof (*msg) + sizeof (channel_pub_key));
206 206
207 uint64_t fragment_id = INT64_MAX - 1;
208 msg->fragment_id = GNUNET_htonll (fragment_id);
209
210 uint64_t message_id = INT64_MAX - 10;
211 msg->message_id = GNUNET_htonll (message_id);
212
213 uint64_t group_generation = INT64_MAX - 3;
214 msg->group_generation = GNUNET_htonll (group_generation);
215
207 msg->hop_counter = htonl (9); 216 msg->hop_counter = htonl (9);
208 msg->fragment_id = GNUNET_htonll (INT64_MAX - 1);
209 msg->fragment_offset = GNUNET_htonll (0); 217 msg->fragment_offset = GNUNET_htonll (0);
210 msg->message_id = GNUNET_htonll (INT64_MAX - 10);
211 msg->group_generation = GNUNET_htonll (INT64_MAX - 3);
212 msg->flags = htonl (GNUNET_MULTICAST_MESSAGE_LAST_FRAGMENT); 218 msg->flags = htonl (GNUNET_MULTICAST_MESSAGE_LAST_FRAGMENT);
213 219
214 memcpy (&msg[1], &channel_pub_key, sizeof (channel_pub_key)); 220 memcpy (&msg[1], &channel_pub_key, sizeof (channel_pub_key));
@@ -225,14 +231,19 @@ run (void *cls, char *const *args, const char *cfgfile,
225 fcls.msg[0] = msg; 231 fcls.msg[0] = msg;
226 fcls.flags[0] = GNUNET_PSYCSTORE_MESSAGE_STATE; 232 fcls.flags[0] = GNUNET_PSYCSTORE_MESSAGE_STATE;
227 233
228 GNUNET_assert (GNUNET_OK == db->fragment_store (db->cls, &channel_pub_key, msg, 234 GNUNET_assert (
229 fcls.flags[0])); 235 GNUNET_OK == db->fragment_store (db->cls, &channel_pub_key, msg,
236 fcls.flags[0]));
230 237
231 GNUNET_assert (GNUNET_OK == db->fragment_get (db->cls, &channel_pub_key, 238 uint64_t ret_frags = 0;
232 GNUNET_ntohll (msg->fragment_id), 239 GNUNET_assert (
233 fragment_cb, &fcls)); 240 GNUNET_OK == db->fragment_get (db->cls, &channel_pub_key,
241 fragment_id, fragment_id,
242 &ret_frags, fragment_cb, &fcls));
234 GNUNET_assert (fcls.n == 1); 243 GNUNET_assert (fcls.n == 1);
235 244
245 // FIXME: test fragment_get_latest and message_get_latest
246
236 fcls.n = 0; 247 fcls.n = 0;
237 248
238 GNUNET_assert ( 249 GNUNET_assert (
@@ -250,9 +261,10 @@ run (void *cls, char *const *args, const char *cfgfile,
250 fcls.n = 0; 261 fcls.n = 0;
251 fcls.flags[0] |= GNUNET_PSYCSTORE_MESSAGE_STATE_APPLIED; 262 fcls.flags[0] |= GNUNET_PSYCSTORE_MESSAGE_STATE_APPLIED;
252 263
253 GNUNET_assert (GNUNET_OK == db->fragment_get (db->cls, &channel_pub_key, 264 GNUNET_assert (
254 GNUNET_ntohll (msg->fragment_id), 265 GNUNET_OK == db->fragment_get (db->cls, &channel_pub_key,
255 fragment_cb, &fcls)); 266 fragment_id, fragment_id,
267 &ret_frags, fragment_cb, &fcls));
256 GNUNET_assert (fcls.n == 1); 268 GNUNET_assert (fcls.n == 1);
257 269
258 struct GNUNET_MULTICAST_MessageHeader *msg1 270 struct GNUNET_MULTICAST_MessageHeader *msg1
@@ -270,15 +282,17 @@ run (void *cls, char *const *args, const char *cfgfile,
270 GNUNET_assert (GNUNET_OK == db->fragment_store (db->cls, &channel_pub_key, msg1, 282 GNUNET_assert (GNUNET_OK == db->fragment_store (db->cls, &channel_pub_key, msg1,
271 fcls.flags[1])); 283 fcls.flags[1]));
272 284
273 uint64_t retfrags = 0; 285 GNUNET_assert (
274 GNUNET_assert (GNUNET_OK == db->message_get (db->cls, &channel_pub_key, 286 GNUNET_OK == db->message_get (db->cls, &channel_pub_key,
275 GNUNET_ntohll (msg->message_id), 287 message_id, message_id,
276 &retfrags, fragment_cb, &fcls)); 288 &ret_frags, fragment_cb, &fcls));
277 GNUNET_assert (fcls.n == 2 && retfrags == 2); 289 GNUNET_assert (fcls.n == 2 && ret_frags == 2);
278 290
279 /* Message counters */ 291 /* Message counters */
280 292
281 uint64_t fragment_id = 0, message_id = 0, group_generation = 0; 293 fragment_id = 0;
294 message_id = 0;
295 group_generation = 0;
282 GNUNET_assert ( 296 GNUNET_assert (
283 GNUNET_OK == db->counters_message_get (db->cls, &channel_pub_key, 297 GNUNET_OK == db->counters_message_get (db->cls, &channel_pub_key,
284 &fragment_id, &message_id, 298 &fragment_id, &message_id,
diff --git a/src/psycstore/test_psycstore.c b/src/psycstore/test_psycstore.c
index 3ef2439e3..125e64f58 100644
--- a/src/psycstore/test_psycstore.c
+++ b/src/psycstore/test_psycstore.c
@@ -25,6 +25,8 @@
25 * @author Christian Grothoff 25 * @author Christian Grothoff
26 */ 26 */
27 27
28#include <inttypes.h>
29
28#include "platform.h" 30#include "platform.h"
29#include "gnunet_util_lib.h" 31#include "gnunet_util_lib.h"
30#include "gnunet_common.h" 32#include "gnunet_common.h"
@@ -302,19 +304,22 @@ fragment_result (void *cls,
302 enum GNUNET_PSYCSTORE_MessageFlags flags) 304 enum GNUNET_PSYCSTORE_MessageFlags flags)
303{ 305{
304 struct FragmentClosure *fcls = cls; 306 struct FragmentClosure *fcls = cls;
307 GNUNET_assert (fcls->n < fcls->n_expected);
305 struct GNUNET_MULTICAST_MessageHeader *msg0 = fcls->msg[fcls->n]; 308 struct GNUNET_MULTICAST_MessageHeader *msg0 = fcls->msg[fcls->n];
306 uint64_t flags0 = fcls->flags[fcls->n++]; 309 uint64_t flags0 = fcls->flags[fcls->n++];
307 310
308 if (flags == flags0 && msg->header.size == msg0->header.size 311 if (flags == flags0 && msg->header.size == msg0->header.size
309 && 0 == memcmp (msg, msg0, ntohs (msg->header.size))) 312 && 0 == memcmp (msg, msg0, ntohs (msg->header.size)))
310 { 313 {
311 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " fragment %llu matches\n", 314 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " fragment %" PRIu64 " matches\n",
312 GNUNET_ntohll (msg->fragment_id)); 315 GNUNET_ntohll (msg->fragment_id));
313 return GNUNET_YES; 316 return GNUNET_YES;
314 } 317 }
315 else 318 else
316 { 319 {
317 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, " fragment %llu differs\n", 320 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
321 " fragment differs: expected %" PRIu64 ", got %" PRIu64 "\n",
322 GNUNET_ntohll (msg0->fragment_id),
318 GNUNET_ntohll (msg->fragment_id)); 323 GNUNET_ntohll (msg->fragment_id));
319 GNUNET_assert (0); 324 GNUNET_assert (0);
320 return GNUNET_SYSERR; 325 return GNUNET_SYSERR;
@@ -323,13 +328,12 @@ fragment_result (void *cls,
323 328
324 329
325void 330void
326message_get_result (void *cls, int64_t result, const char *err_msg) 331message_get_latest_result (void *cls, int64_t result, const char *err_msg)
327{ 332{
328 struct FragmentClosure *fcls = cls; 333 struct FragmentClosure *fcls = cls;
329 op = NULL; 334 op = NULL;
330 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "message_get:\t%d\n", result); 335 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "message_get_latest:\t%d\n", result);
331 GNUNET_assert (result > 0 && fcls->n && fcls->n_expected); 336 GNUNET_assert (0 < result && fcls->n == fcls->n_expected);
332
333 337
334 modifiers[0] = (struct GNUNET_ENV_Modifier) { 338 modifiers[0] = (struct GNUNET_ENV_Modifier) {
335 .oper = '=', 339 .oper = '=',
@@ -351,29 +355,46 @@ message_get_result (void *cls, int64_t result, const char *err_msg)
351 355
352 356
353void 357void
358message_get_result (void *cls, int64_t result, const char *err_msg)
359{
360 struct FragmentClosure *fcls = cls;
361 op = NULL;
362 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "message_get:\t%d\n", result);
363 GNUNET_assert (0 < result && fcls->n == fcls->n_expected);
364
365 fcls->n = 0;
366 fcls->n_expected = 3;
367 op = GNUNET_PSYCSTORE_message_get_latest (h, &channel_pub_key, &slave_pub_key,
368 1, &fragment_result,
369 &message_get_latest_result, fcls);
370}
371
372
373void
354message_get_fragment_result (void *cls, int64_t result, const char *err_msg) 374message_get_fragment_result (void *cls, int64_t result, const char *err_msg)
355{ 375{
356 struct FragmentClosure *fcls = cls; 376 struct FragmentClosure *fcls = cls;
357 op = NULL; 377 op = NULL;
358 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "message_get_fragment:\t%d\n", result); 378 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "message_get_fragment:\t%d\n", result);
359 GNUNET_assert (result > 0 && fcls->n && fcls->n_expected); 379 GNUNET_assert (0 < result && fcls->n == fcls->n_expected);
360 380
361 fcls->n = 0; 381 fcls->n = 0;
362 fcls->n_expected = 3; 382 fcls->n_expected = 3;
383 uint64_t message_id = GNUNET_ntohll (fcls->msg[0]->message_id);
363 op = GNUNET_PSYCSTORE_message_get (h, &channel_pub_key, &slave_pub_key, 384 op = GNUNET_PSYCSTORE_message_get (h, &channel_pub_key, &slave_pub_key,
364 GNUNET_ntohll (fcls->msg[0]->message_id), 385 message_id, message_id,
365 &fragment_result, 386 &fragment_result,
366 &message_get_result, fcls); 387 &message_get_result, fcls);
367} 388}
368 389
369 390
370void 391void
371fragment_get_result (void *cls, int64_t result, const char *err_msg) 392fragment_get_latest_result (void *cls, int64_t result, const char *err_msg)
372{ 393{
373 struct FragmentClosure *fcls = cls; 394 struct FragmentClosure *fcls = cls;
374 op = NULL; 395 op = NULL;
375 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "fragment_get:\t%d\n", result); 396 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "fragment_get_latest:\t%d\n", result);
376 GNUNET_assert (result > 0 && fcls->n && fcls->n_expected); 397 GNUNET_assert (0 < result && fcls->n == fcls->n_expected);
377 398
378 fcls->n = 1; 399 fcls->n = 1;
379 fcls->n_expected = 2; 400 fcls->n_expected = 2;
@@ -381,9 +402,24 @@ fragment_get_result (void *cls, int64_t result, const char *err_msg)
381 GNUNET_ntohll (fcls->msg[1]->message_id), 402 GNUNET_ntohll (fcls->msg[1]->message_id),
382 GNUNET_ntohll (fcls->msg[1]->fragment_offset), 403 GNUNET_ntohll (fcls->msg[1]->fragment_offset),
383 &fragment_result, 404 &fragment_result,
384 &message_get_fragment_result, 405 &message_get_fragment_result, fcls);
385 fcls); 406}
407
386 408
409void
410fragment_get_result (void *cls, int64_t result, const char *err_msg)
411{
412 struct FragmentClosure *fcls = cls;
413 op = NULL;
414 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "fragment_get:\t%d\n", result);
415 GNUNET_assert (0 < result && fcls->n == fcls->n_expected);
416
417 fcls->n = 0;
418 fcls->n_expected = 3;
419 op = GNUNET_PSYCSTORE_fragment_get_latest (h, &channel_pub_key,
420 &slave_pub_key, fcls->n_expected,
421 &fragment_result,
422 &fragment_get_latest_result, fcls);
387} 423}
388 424
389 425
@@ -398,8 +434,9 @@ fragment_store_result (void *cls, int64_t result, const char *err_msg)
398 { /* last fragment */ 434 { /* last fragment */
399 fcls.n = 0; 435 fcls.n = 0;
400 fcls.n_expected = 1; 436 fcls.n_expected = 1;
437 uint64_t fragment_id = GNUNET_ntohll (fcls.msg[0]->fragment_id);
401 op = GNUNET_PSYCSTORE_fragment_get (h, &channel_pub_key, &slave_pub_key, 438 op = GNUNET_PSYCSTORE_fragment_get (h, &channel_pub_key, &slave_pub_key,
402 GNUNET_ntohll (fcls.msg[0]->fragment_id), 439 fragment_id, fragment_id,
403 &fragment_result, 440 &fragment_result,
404 &fragment_get_result, &fcls); 441 &fragment_get_result, &fcls);
405 } 442 }