From 40884377f3126bbecbfd3243d47224b3094914f9 Mon Sep 17 00:00:00 2001 From: Gabor X Toth <*@tg-x.net> Date: Wed, 30 Jul 2014 21:18:13 +0000 Subject: psyc, psycstore: retrieve state and history --- src/include/gnunet_protocols.h | 4 +- src/include/gnunet_psyc_service.h | 226 ++++++++++------- src/include/gnunet_psycstore_plugin.h | 41 ++- src/include/gnunet_psycstore_service.h | 146 +++++++++-- src/include/gnunet_social_service.h | 46 ++-- src/psyc/Makefile.am | 2 + src/psyc/gnunet-service-psyc.c | 272 ++++++++++++++++++-- src/psyc/psyc.h | 57 ++--- src/psyc/psyc_api.c | 414 +++++++++++++++++++++++++------ src/psyc/test_psyc.c | 348 +++++++++++++++++++++++--- src/psycstore/gnunet-service-psycstore.c | 97 +++++--- src/psycstore/plugin_psycstore_sqlite.c | 238 ++++++++++++++---- src/psycstore/psycstore.h | 160 +++++++----- src/psycstore/psycstore_api.c | 309 +++++++++++++++++------ src/psycstore/test_plugin_psycstore.c | 48 ++-- src/psycstore/test_psycstore.c | 65 +++-- src/social/gnunet-service-social.c | 16 +- src/social/social.h | 19 -- src/social/social_api.c | 43 ++-- 19 files changed, 1938 insertions(+), 613 deletions(-) (limited to 'src') diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index 5626e1e5e..ea295197c 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h @@ -2187,10 +2187,10 @@ extern "C" /* 700 */ /** C->S: client requests channel history from PSYCstore. */ -#define GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST 701 +#define GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY 701 /** S->C: result for a channel history request */ -#define GNUNET_MESSAGE_TYPE_PSYC_STORY_RESULT 702 +#define GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT 702 /** C->S: request best matching state variable from PSYCstore. */ diff --git a/src/include/gnunet_psyc_service.h b/src/include/gnunet_psyc_service.h index 7097c46a8..25b405dad 100644 --- a/src/include/gnunet_psyc_service.h +++ b/src/include/gnunet_psyc_service.h @@ -159,6 +159,11 @@ enum GNUNET_PSYC_Policy enum GNUNET_PSYC_MessageFlags { + /** + * Default / no flags. + */ + GNUNET_PSYC_MESSAGE_DEFAULT = 0, + /** * Historic message, retrieved from PSYCstore. */ @@ -314,12 +319,12 @@ struct GNUNET_PSYC_CountersResultMessage /** * Status code for the operation. */ - int32_t result_code GNUNET_PACKED; + uint32_t result_code GNUNET_PACKED; /** * Last message ID sent to the channel. */ - uint64_t max_message_id; + uint64_t max_message_id GNUNET_PACKED; }; @@ -503,13 +508,24 @@ struct GNUNET_PSYC_Master; /** - * Function called after the channel master started. + * Function called after connected to the PSYC service + * and the channel master started. * - * @param cls Closure. - * @param max_message_id Last message ID sent to the channel. + * Also called when reconnected to the service + * after the connection closed unexpectedly. + * + * @param cls + * Closure. + * @param result + * #GNUNET_YES if there were already messages sent to the channel, + * #GNUNET_NO if the message history is empty, + * #GNUNET_SYSERR on error. + * @param max_message_id + * Last message ID sent to the channel. */ typedef void -(*GNUNET_PSYC_MasterStartCallback) (void *cls, uint64_t max_message_id); +(*GNUNET_PSYC_MasterStartCallback) (void *cls, int result, + uint64_t max_message_id); /** @@ -720,11 +736,21 @@ struct GNUNET_PSYC_Slave; /** * Function called after the slave connected to the PSYC service. * - * @param cls Closure. - * @param max_message_id Last message ID sent to the channel. + * Also called when reconnected to the service + * after the connection closed unexpectedly. + * + * @param cls + * Closure. + * @param result + * #GNUNET_YES if there were already messages sent to the channel, + * #GNUNET_NO if the message history is empty, + * #GNUNET_SYSERR on error. + * @param max_message_id + * Last message ID sent to the channel. */ typedef void -(*GNUNET_PSYC_SlaveConnectCallback) (void *cls, uint64_t max_message_id); +(*GNUNET_PSYC_SlaveConnectCallback) (void *cls, int result, + uint64_t max_message_id); /** @@ -875,6 +901,23 @@ GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th); struct GNUNET_PSYC_Channel; +/** + * Function called with the result of an asynchronous operation. + * + * @param cls + * Closure. + * @param result + * Result of the operation. + * Usually one of #GNUNET_OK, #GNUNET_YES, #GNUNET_NO, or #GNUNET_SYSERR. + * @param err_msg + * Error message. + */ +typedef void +(*GNUNET_PSYC_ResultCallback) (void *cls, + int64_t result, + const char *err_msg); + + /** * Convert a channel @a master to a @e channel handle to access the @e channel * APIs. @@ -921,7 +964,9 @@ void GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel, const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, uint64_t announced_at, - uint64_t effective_since); + uint64_t effective_since, + GNUNET_PSYC_ResultCallback result_cb, + void *cls); /** @@ -949,7 +994,9 @@ void GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel, const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, - uint64_t announced_at); + uint64_t announced_at, + GNUNET_PSYC_ResultCallback result_cb, + void *cls); /** @@ -962,74 +1009,69 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel, * @param value_size Number of bytes in @a value. */ typedef void -(*GNUNET_PSYC_StateCallback) (void *cls, - const char *name, - const void *value, - size_t value_size); +(*GNUNET_PSYC_StateVarCallback) (void *cls, + const char *name, + const void *value, + size_t value_size); /** - * Function called when a requested operation has finished. - * - * @param cls Closure. - */ -typedef void -(*GNUNET_PSYC_FinishCallback) (void *cls); - - -/** - * Handle to a story telling operation. - */ -struct GNUNET_PSYC_Story; - - -/** - * Request to be told the message history of the channel. + * Request to replay a part of the message history of the channel. * * Historic messages (but NOT the state at the time) will be replayed (given to * the normal method handlers) if available and if access is permitted. * - * To get the latest message, use 0 for both the start and end message ID. + * @param channel + * Which channel should be replayed? + * @param start_message_id + * Earliest interesting point in history. + * @param end_message_id + * Last (inclusive) interesting point in history. + * @param finish_cb + * Function to call when the requested history has been fully replayed + * (counting message IDs might not suffice, as some messages might be + * secret and thus the listener would not know the story is finished + * without being told explicitly)o once this function has been called, the + * client must not call GNUNET_PSYC_channel_history_replay_cancel() anymore. + * @param cls + * Closure for the callbacks. * - * @param channel Which channel should be replayed? - * @param start_message_id Earliest interesting point in history. - * @param end_message_id Last (exclusive) interesting point in history. - * @param message_cb Function to invoke on message parts received from the story. - * @param finish_cb Function to call when the requested story has been fully - * told (counting message IDs might not suffice, as some messages - * might be secret and thus the listener would not know the story is - * finished without being told explicitly); once this function - * has been called, the client must not call - * GNUNET_PSYC_channel_story_tell_cancel() anymore. - * @param cls Closure for the callbacks. - * @return Handle to cancel story telling operation. + * @return Handle to cancel history replay operation. */ -struct GNUNET_PSYC_Story * -GNUNET_PSYC_channel_story_tell (struct GNUNET_PSYC_Channel *channel, - uint64_t start_message_id, - uint64_t end_message_id, - GNUNET_PSYC_MessageCallback message_cb, - GNUNET_PSYC_MessagePartCallback message_part_cb, - GNUNET_PSYC_FinishCallback finish_cb, - void *cls); +void +GNUNET_PSYC_channel_history_replay (struct GNUNET_PSYC_Channel *channel, + uint64_t start_message_id, + uint64_t end_message_id, + GNUNET_PSYC_ResultCallback finish_cb, + void *cls); /** - * Abort story telling. + * Request to replay the latest messages from the message history of the channel. + * + * Historic messages (but NOT the state at the time) will be replayed (given to + * the normal method handlers) if available and if access is permitted. * - * This function must not be called from within method handlers (as given to - * GNUNET_PSYC_slave_join()) of the slave. + * @param channel + * Which channel should be replayed? + * @param message_limit + * Maximum number of messages to replay. + * @param finish_cb + * Function to call when the requested history has been fully replayed + * (counting message IDs might not suffice, as some messages might be + * secret and thus the listener would not know the story is finished + * without being told explicitly)o once this function has been called, the + * client must not call GNUNET_PSYC_channel_history_replay_cancel() anymore. + * @param cls + * Closure for the callbacks. * - * @param story Story telling operation to stop. + * @return Handle to cancel history replay operation. */ void -GNUNET_PSYC_channel_story_tell_cancel (struct GNUNET_PSYC_Story *story); - - -/** - * Handle for a state query operation. - */ -struct GNUNET_PSYC_StateQuery; +GNUNET_PSYC_channel_history_replay_latest (struct GNUNET_PSYC_Channel *channel, + uint64_t message_limit, + GNUNET_PSYC_ResultCallback finish_cb, + void *cls); /** @@ -1039,19 +1081,26 @@ struct GNUNET_PSYC_StateQuery; * less-specific name is matched; for example, requesting "_a_b" will match "_a" * if "_a_b" does not exist. * - * @param channel Channel handle. - * @param full_name Full name of the requested variable, the actual variable - * returned might have a shorter name.. - * @param cb Function called once when a matching state variable is found. + * @param channel + * Channel handle. + * @param full_name + * Full name of the requested variable. + * The actual variable returned might have a shorter name. + * @param var_cb + * Function called once when a matching state variable is found. * Not called if there's no matching state variable. - * @param cb_cls Closure for the callbacks. - * @return Handle that can be used to cancel the query operation. + * @param result_cb + * Function called after the operation finished. + * (i.e. all state variables have been returned via @a state_cb) + * @param cls + * Closure for the callbacks. */ -struct GNUNET_PSYC_StateQuery * +void GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *channel, const char *full_name, - GNUNET_PSYC_StateCallback cb, - void *cb_cls); + GNUNET_PSYC_StateVarCallback var_cb, + GNUNET_PSYC_ResultCallback result_cb, + void *cls); /** @@ -1064,26 +1113,25 @@ GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *channel, * The @a state_cb is invoked on all matching state variables asynchronously, as * the state is stored in and retrieved from the PSYCstore, * - * @param channel Channel handle. - * @param name_prefix Prefix of the state variable name to match. - * @param cb Function to call with the matching state variables. - * @param cb_cls Closure for the callbacks. - * @return Handle that can be used to cancel the query operation. + * @param channel + * Channel handle. + * @param name_prefix + * Prefix of the state variable name to match. + * @param var_cb + * Function called once when a matching state variable is found. + * Not called if there's no matching state variable. + * @param result_cb + * Function called after the operation finished. + * (i.e. all state variables have been returned via @a state_cb) + * @param cls + * Closure for the callbacks. */ -struct GNUNET_PSYC_StateQuery * +void GNUNET_PSYC_channel_state_get_prefix (struct GNUNET_PSYC_Channel *channel, const char *name_prefix, - GNUNET_PSYC_StateCallback cb, - void *cb_cls); - - -/** - * Cancel a state query operation. - * - * @param query Handle for the operation to cancel. - */ -void -GNUNET_PSYC_channel_state_get_cancel (struct GNUNET_PSYC_StateQuery *query); + GNUNET_PSYC_StateVarCallback var_cb, + GNUNET_PSYC_ResultCallback result_cb, + void *cls); #if 0 /* keep Emacsens' auto-indent happy */ diff --git a/src/include/gnunet_psycstore_plugin.h b/src/include/gnunet_psycstore_plugin.h index 1945b400e..3f02759f6 100644 --- a/src/include/gnunet_psycstore_plugin.h +++ b/src/include/gnunet_psycstore_plugin.h @@ -112,7 +112,7 @@ struct GNUNET_PSYCSTORE_PluginFunctions uint64_t psycstore_flags); /** - * Retrieve a message fragment by fragment ID. + * Retrieve a message fragment range by fragment ID. * * @see GNUNET_PSYCSTORE_fragment_get() * @@ -121,12 +121,29 @@ struct GNUNET_PSYCSTORE_PluginFunctions int (*fragment_get) (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, - uint64_t fragment_id, + uint64_t first_fragment_id, + uint64_t last_fragment_id, + uint64_t *returned_fragments, GNUNET_PSYCSTORE_FragmentCallback cb, void *cb_cls); /** - * Retrieve all fragments of a message. + * Retrieve latest message fragments. + * + * @see GNUNET_PSYCSTORE_fragment_get() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ + int + (*fragment_get_latest) (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t fragment_limit, + uint64_t *returned_fragments, + GNUNET_PSYCSTORE_FragmentCallback cb, + void *cb_cls); + + /** + * Retrieve all fragments of a message ID range. * * @see GNUNET_PSYCSTORE_message_get() * @@ -135,11 +152,27 @@ struct GNUNET_PSYCSTORE_PluginFunctions int (*message_get) (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, - uint64_t message_id, + uint64_t first_fragment_id, + uint64_t last_fragment_id, uint64_t *returned_fragments, GNUNET_PSYCSTORE_FragmentCallback cb, void *cb_cls); + /** + * Retrieve all fragments of the latest messages. + * + * @see GNUNET_PSYCSTORE_message_get() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ + int + (*message_get_latest) (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t fragment_limit, + uint64_t *returned_fragments, + GNUNET_PSYCSTORE_FragmentCallback cb, + void *cb_cls); + /** * Retrieve a fragment of message specified by its message ID and fragment * offset. diff --git a/src/include/gnunet_psycstore_service.h b/src/include/gnunet_psycstore_service.h index 84d69c24d..78d016bb3 100644 --- a/src/include/gnunet_psycstore_service.h +++ b/src/include/gnunet_psycstore_service.h @@ -107,9 +107,10 @@ struct GNUNET_PSYCSTORE_OperationHandle; /** * Function called with the result of an asynchronous operation. * - * @param result #GNUNET_SYSERR on error, + * @param result * #GNUNET_YES on success or if the peer was a member, - * #GNUNET_NO if the peer was not a member + * #GNUNET_NO if the peer was not a member, + * #GNUNET_SYSERR on error, */ typedef void (*GNUNET_PSYCSTORE_ResultCallback) (void *cls, @@ -235,7 +236,7 @@ typedef int /** - * Retrieve a message fragment by fragment ID. + * Retrieve message fragments by fragment ID range. * * @param h * Handle for the PSYCstore. @@ -245,11 +246,15 @@ typedef int * The slave requesting the fragment. If not NULL, a membership test is * performed first and the fragment is only returned if the slave has * access to it. - * @param fragment_id - * Fragment ID to retrieve. Use 0 to get the latest message fragment. - * @param fcb + * @param first_fragment_id + * First fragment ID to retrieve. + * Use 0 to get the latest message fragment. + * @param last_fragment_id + * Last consecutive fragment ID to retrieve. + * Use 0 to get the latest message fragment. + * @param fragment_cb * Callback to call with the retrieved fragments. - * @param rcb + * @param result_cb * Callback to call with the result of the operation. * @param cls * Closure for the callbacks. @@ -260,14 +265,53 @@ struct GNUNET_PSYCSTORE_OperationHandle * GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, - uint64_t fragment_id, - GNUNET_PSYCSTORE_FragmentCallback fcb, - GNUNET_PSYCSTORE_ResultCallback rcb, + uint64_t first_message_id, + uint64_t last_message_id, + GNUNET_PSYCSTORE_FragmentCallback fragment_cb, + GNUNET_PSYCSTORE_ResultCallback result_cb, void *cls); /** - * Retrieve all fragments of a message. + * Retrieve latest message fragments. + * + * @param h + * Handle for the PSYCstore. + * @param channel_key + * The channel we are interested in. + * @param slave_key + * The slave requesting the fragment. If not NULL, a membership test is + * performed first and the fragment is only returned if the slave has + * access to it. + * @param first_fragment_id + * First fragment ID to retrieve. + * Use 0 to get the latest message fragment. + * @param last_fragment_id + * Last consecutive fragment ID to retrieve. + * Use 0 to get the latest message fragment. + * @param fragment_limit + * Maximum number of fragments to retrieve. + * @param fragment_cb + * Callback to call with the retrieved fragments. + * @param rcb + * Callback to call with the result of the operation. + * @param cls + * Closure for the callbacks. + * + * @return Handle that can be used to cancel the operation. + */ +struct GNUNET_PSYCSTORE_OperationHandle * +GNUNET_PSYCSTORE_fragment_get_latest (struct GNUNET_PSYCSTORE_Handle *h, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, + uint64_t fragment_limit, + GNUNET_PSYCSTORE_FragmentCallback fragment_cb, + GNUNET_PSYCSTORE_ResultCallback rcb, + void *cls); + + +/** + * Retrieve all fragments of messages in a message ID range. * * @param h * Handle for the PSYCstore. @@ -277,11 +321,15 @@ GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h, * The slave requesting the message. If not NULL, a membership test is * performed first and the message is only returned if the slave has * access to it. - * @param message_id - * Message ID to retrieve. Use 0 to get the latest message. - * @param fcb + * @param first_message_id + * First message ID to retrieve. + * Use 0 to get the latest message. + * @param last_message_id + * Last consecutive message ID to retrieve. + * Use 0 to get the latest message. + * @param fragment_cb * Callback to call with the retrieved fragments. - * @param rcb + * @param result_cb * Callback to call with the result of the operation. * @param cls * Closure for the callbacks. @@ -292,23 +340,67 @@ struct GNUNET_PSYCSTORE_OperationHandle * GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, - uint64_t message_id, - GNUNET_PSYCSTORE_FragmentCallback fcb, - GNUNET_PSYCSTORE_ResultCallback rcb, + uint64_t first_message_id, + uint64_t last_message_id, + GNUNET_PSYCSTORE_FragmentCallback fragment_cb, + GNUNET_PSYCSTORE_ResultCallback result_cb, void *cls); +/** + * Retrieve all fragments of the latest messages. + * + * @param h + * Handle for the PSYCstore. + * @param channel_key + * The channel we are interested in. + * @param slave_key + * The slave requesting the message. If not NULL, a membership test is + * performed first and the message is only returned if the slave has + * access to it. + * @param message_limit + * Maximum number of messages to retrieve. + * @param fragment_cb + * Callback to call with the retrieved fragments. + * @param rcb + * Callback to call with the result of the operation. + * @param cls + * Closure for the callbacks. + * + * @return Handle that can be used to cancel the operation. + */ +struct GNUNET_PSYCSTORE_OperationHandle * +GNUNET_PSYCSTORE_message_get_latest (struct GNUNET_PSYCSTORE_Handle *h, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, + uint64_t message_limit, + GNUNET_PSYCSTORE_FragmentCallback fragment_cb, + GNUNET_PSYCSTORE_ResultCallback rcb, + void *cls); + + /** * Retrieve a fragment of message specified by its message ID and fragment * offset. * - * @param h Handle for the PSYCstore. - * @param channel_key The channel we are interested in. - * @param message_id Message ID to check. Use 0 to get the latest message. - * @param fragment_offset Offset of the fragment to retrieve. - * @param fcb Callback to call with the retrieved fragments. - * @param rcb Callback to call with the result of the operation. - * @param cls Closure for the callbacks. + * @param h + * Handle for the PSYCstore. + * @param channel_key + * The channel we are interested in. + * @param slave_key + * The slave requesting the message fragment. If not NULL, a membership + * test is performed first and the message fragment is only returned + * if the slave has access to it. + * @param message_id + * Message ID to retrieve. Use 0 to get the latest message. + * @param fragment_offset + * Offset of the fragment to retrieve. + * @param fragment_cb + * Callback to call with the retrieved fragments. + * @param result_cb + * Callback to call with the result of the operation. + * @param cls + * Closure for the callbacks. * * @return Handle that can be used to cancel the operation. */ @@ -318,8 +410,8 @@ GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, uint64_t message_id, uint64_t fragment_offset, - GNUNET_PSYCSTORE_FragmentCallback fcb, - GNUNET_PSYCSTORE_ResultCallback rcb, + GNUNET_PSYCSTORE_FragmentCallback fragment_cb, + GNUNET_PSYCSTORE_ResultCallback result_cb, void *cls); diff --git a/src/include/gnunet_social_service.h b/src/include/gnunet_social_service.h index ca1578820..66427a072 100644 --- a/src/include/gnunet_social_service.h +++ b/src/include/gnunet_social_service.h @@ -287,12 +287,18 @@ typedef void /** * Function called after the host entered the place. * - * @param cls Closure. - * @param max_message_id Last message ID sent to the channel. - * Or 0 if no messages have been sent to the place yet. + * @param cls + * Closure. + * @param result + * #GNUNET_OK on success, or + * #GNUNET_SYSERR on error. + * @param max_message_id + * Last message ID sent to the channel. + * Or 0 if no messages have been sent to the place yet. */ typedef void -(*GNUNET_SOCIAL_HostEnterCallback) (void *cls, uint64_t max_message_id); +(*GNUNET_SOCIAL_HostEnterCallback) (void *cls, int result, + uint64_t max_message_id); /** @@ -793,18 +799,22 @@ struct GNUNET_SOCIAL_WatchHandle; /** * Watch a place for changed objects. * - * @param place Place to watch. - * @param object_filter Object prefix to match. - * @param state_cb Function to call when an object/state changes. - * @param state_cb_cls Closure for callback. + * @param place + * Place to watch. + * @param object_filter + * Object prefix to match. + * @param state_var_cb + * Function to call when an object/state var changes. + * @param cls + * Closure for callback. * * @return Handle that can be used to cancel watching. */ struct GNUNET_SOCIAL_WatchHandle * GNUNET_SOCIAL_place_watch (struct GNUNET_SOCIAL_Place *place, const char *object_filter, - GNUNET_PSYC_StateCallback state_cb, - void *state_cb_cls); + GNUNET_PSYC_StateVarCallback state_var_cb, + void *cls); /** @@ -822,18 +832,22 @@ struct GNUNET_SOCIAL_LookHandle; /** * Look at objects in the place with a matching name prefix. * - * @param place The place to look its objects at. - * @param name_prefix Look at objects with names beginning with this value. - * @param state_cb Function to call for each object found. - * @param state_cb_cls Closure for callback function. + * @param place + * The place to look its objects at. + * @param name_prefix + * Look at objects with names beginning with this value. + * @param state_var_cb + * Function to call for each object found. + * @param cls + * Closure for callback function. * * @return Handle that can be used to stop looking at objects. */ struct GNUNET_SOCIAL_LookHandle * GNUNET_SOCIAL_place_look (struct GNUNET_SOCIAL_Place *place, const char *name_prefix, - GNUNET_PSYC_StateCallback state_cb, - void *state_cb_cls); + GNUNET_PSYC_StateVarCallback state_var_cb, + void *cls); /** diff --git a/src/psyc/Makefile.am b/src/psyc/Makefile.am index b18605ab9..fb4341751 100644 --- a/src/psyc/Makefile.am +++ b/src/psyc/Makefile.am @@ -86,12 +86,14 @@ test_psyc_LDADD = \ libgnunetpsyc.la \ libgnunetpsycutil.la \ $(top_builddir)/src/testing/libgnunettesting.la \ + $(top_builddir)/src/core/libgnunetcore.la \ $(top_builddir)/src/env/libgnunetenv.la \ $(top_builddir)/src/util/libgnunetutil.la test_psyc_DEPENDENCIES = \ libgnunetpsyc.la \ libgnunetpsycutil.la \ $(top_builddir)/src/testing/libgnunettesting.la \ + $(top_builddir)/src/core/libgnunetcore.la \ $(top_builddir)/src/env/libgnunetenv.la \ $(top_builddir)/src/util/libgnunetutil.la diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c index e7020bc69..3adc34d2a 100644 --- a/src/psyc/gnunet-service-psyc.c +++ b/src/psyc/gnunet-service-psyc.c @@ -402,6 +402,14 @@ struct Slave }; +struct OperationClosure +{ + struct GNUNET_SERVER_Client *client; + struct Channel *chn; + uint64_t op_id; +}; + + static void transmit_message (struct Channel *chn); @@ -584,6 +592,46 @@ client_send_msg (const struct Channel *chn, } +/** + * Send a result code back to the client. + * + * @param client + * Client that should receive the result code. + * @param result_code + * Code to transmit. + * @param op_id + * Operation ID in network byte order. + * @param err_msg + * Error message to include (or NULL for none). + */ +static void +client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id, + int64_t result_code, const char *err_msg) +{ + struct OperationResult *res; + size_t err_len = 0; // FIXME: maximum length + + if (NULL != err_msg) + err_len = strlen (err_msg) + 1; + res = GNUNET_malloc (sizeof (struct OperationResult) + err_len); + res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE); + res->header.size = htons (sizeof (struct OperationResult) + err_len); + res->result_code = GNUNET_htonll (result_code + INT64_MAX + 1); + res->op_id = op_id; + if (0 < err_len) + memcpy (&res[1], err_msg, err_len); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Sending result to client for operation #%" PRIu64 ": " + "%" PRId64 " (%s)\n", + client, GNUNET_ntohll (op_id), result_code, err_msg); + + GNUNET_SERVER_notification_context_add (nc, client); + GNUNET_SERVER_notification_context_unicast (nc, client, &res->header, + GNUNET_NO); + GNUNET_free (res); +} + + /** * Closure for join_mem_test_cb() */ @@ -799,7 +847,8 @@ mcast_recv_replay_fragment (void *cls, { struct Channel *chn = cls; - GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_key, fragment_id, + GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_key, + fragment_id, fragment_id, &store_recv_fragment_replay, &store_recv_fragment_replay_result, rh); } @@ -817,7 +866,8 @@ mcast_recv_replay_message (void *cls, struct GNUNET_MULTICAST_ReplayHandle *rh) { struct Channel *chn = cls; - GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key, message_id, + GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key, + message_id, message_id, &store_recv_fragment_replay, &store_recv_fragment_replay_result, rh); } @@ -865,7 +915,8 @@ hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n) */ static void client_send_mcast_msg (struct Channel *chn, - const struct GNUNET_MULTICAST_MessageHeader *mmsg) + const struct GNUNET_MULTICAST_MessageHeader *mmsg, + uint32_t flags) { struct GNUNET_PSYC_MessageHeader *pmsg; uint16_t size = ntohs (mmsg->header.size); @@ -882,6 +933,7 @@ client_send_mcast_msg (struct Channel *chn, pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); pmsg->message_id = mmsg->message_id; pmsg->fragment_offset = mmsg->fragment_offset; + pmsg->flags = htonl (flags); memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); client_send_msg (chn, &pmsg->header); @@ -1111,7 +1163,7 @@ fragment_queue_run (struct Channel *chn, uint64_t msg_id, { if (GNUNET_NO == drop) { - client_send_mcast_msg (chn, cache_entry->mmsg); + client_send_mcast_msg (chn, cache_entry->mmsg, 0); } if (cache_entry->ref_count <= 1) { @@ -1375,10 +1427,10 @@ store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id, struct Channel *chn = &mst->chn; chn->store_op = NULL; - struct CountersResult res; + struct GNUNET_PSYC_CountersResultMessage res; res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); res.header.size = htons (sizeof (res)); - res.result_code = htonl (result); + res.result_code = htonl (result - INT32_MIN); res.max_message_id = GNUNET_htonll (max_message_id); if (GNUNET_OK == result || GNUNET_NO == result) @@ -1421,10 +1473,10 @@ store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id, struct Channel *chn = &slv->chn; chn->store_op = NULL; - struct CountersResult res; + struct GNUNET_PSYC_CountersResultMessage res; res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); res.header.size = htons (sizeof (res)); - res.result_code = htonl (result); + res.result_code = htonl (result - INT32_MIN); res.max_message_id = GNUNET_htonll (max_message_id); if (GNUNET_OK == result || GNUNET_NO == result) @@ -1511,10 +1563,10 @@ client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client, { chn = &mst->chn; - struct CountersResult res; + struct GNUNET_PSYC_CountersResultMessage res; res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); res.header.size = htons (sizeof (res)); - res.result_code = htonl (GNUNET_OK); + res.result_code = htonl ((uint32_t) GNUNET_OK + INT32_MIN); res.max_message_id = GNUNET_htonll (mst->max_message_id); GNUNET_SERVER_notification_context_add (nc, client); @@ -1621,10 +1673,10 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client, { chn = &slv->chn; - struct CountersResult res; + struct GNUNET_PSYC_CountersResultMessage res; res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); res.header.size = htons (sizeof (res)); - res.result_code = htonl (GNUNET_OK); + res.result_code = htonl ((uint32_t) GNUNET_OK - INT32_MIN); res.max_message_id = GNUNET_htonll (chn->max_message_id); GNUNET_SERVER_notification_context_add (nc, client); @@ -2047,17 +2099,26 @@ client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, }; +struct MembershipStoreClosure +{ + struct GNUNET_SERVER_Client *client; + struct Channel *chn; + uint64_t op_id; +}; + + /** * Received result of GNUNET_PSYCSTORE_membership_store() */ static void store_recv_membership_store_result (void *cls, int64_t result, const char *err_msg) { - struct GNUNET_MULTICAST_MembershipTestHandle *mth = cls; + struct MembershipStoreClosure *mcls = cls; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%s)\n", - mth, result, err_msg); - /* FIXME: send result to client */ + mcls->chn, result, err_msg); + + client_send_result (mcls->client, mcls->op_id, result, err_msg); } @@ -2075,6 +2136,11 @@ client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client, const struct ChannelMembershipStoreRequest * req = (const struct ChannelMembershipStoreRequest *) msg; + struct MembershipStoreClosure *mcls = GNUNET_malloc (sizeof (*mcls)); + mcls->client = client; + mcls->chn = chn; + mcls->op_id = req->op_id; + uint64_t announced_at = GNUNET_ntohll (req->announced_at); uint64_t effective_since = GNUNET_ntohll (req->effective_since); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -2086,19 +2152,138 @@ client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_key, req->did_join, announced_at, effective_since, 0, /* FIXME: group_generation */ - &store_recv_membership_store_result, chn); + &store_recv_membership_store_result, mcls); GNUNET_SERVER_receive_done (client, GNUNET_OK); } +static int +store_recv_fragment_history (void *cls, + struct GNUNET_MULTICAST_MessageHeader *msg, + enum GNUNET_PSYCSTORE_MessageFlags flags) +{ + struct OperationClosure *opcls = cls; + struct Channel *chn = opcls->chn; + client_send_mcast_msg (chn, msg, GNUNET_PSYC_MESSAGE_HISTORIC); + return GNUNET_YES; +} + + +/** + * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay. + */ +static void +store_recv_fragment_history_result (void *cls, int64_t result, const char *err_msg) +{ + struct OperationClosure *opcls = cls; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p History replay #%" PRIu64 ": " + "PSYCSTORE returned %" PRId64 " (%s)\n", + opcls->chn, opcls->op_id, result, err_msg); + + client_send_result (opcls->client, opcls->op_id, result, err_msg); +} + + /** * Client requests channel history from PSYCstore. */ static void -client_recv_story_request (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *msg) +client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *msg) +{ + struct Channel * + chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); + GNUNET_assert (NULL != chn); + + const struct HistoryRequest * + req = (const struct HistoryRequest *) msg; + + struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls)); + opcls->client = client; + opcls->chn = chn; + opcls->op_id = req->op_id; + + if (0 == req->message_limit) + GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL, + GNUNET_ntohll (req->start_message_id), + GNUNET_ntohll (req->end_message_id), + &store_recv_fragment_history, + &store_recv_fragment_history_result, opcls); + else + GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL, + GNUNET_ntohll (req->message_limit), + &store_recv_fragment_history, + &store_recv_fragment_history_result, + opcls); + + GNUNET_SERVER_receive_done (client, GNUNET_OK); +} + + +/** + * Received state var from PSYCstore, send it to client. + */ +static int +store_recv_state_var (void *cls, const char *name, + const void *value, size_t value_size) +{ + struct OperationClosure *opcls = cls; + struct OperationResult *op; + + if (NULL != name) + { + uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1; + struct GNUNET_PSYC_MessageModifier *mod; + op = GNUNET_malloc (sizeof (*op) + sizeof (*mod) + name_size + value_size); + op->header.size = htons (sizeof (*op) + sizeof (*mod) + name_size + value_size); + op->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT); + op->op_id = opcls->op_id; + + mod = (struct GNUNET_PSYC_MessageModifier *) &op[1]; + mod->header.size = htons (sizeof (*mod) + name_size + value_size); + mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); + mod->name_size = htons (name_size); + mod->value_size = htonl (value_size); + mod->oper = htons (GNUNET_ENV_OP_ASSIGN); + memcpy (&mod[1], name, name_size); + memcpy (((char *) &mod[1]) + name_size, value, value_size); + } + else + { + struct GNUNET_MessageHeader *mod; + op = GNUNET_malloc (sizeof (*op) + sizeof (*mod) + value_size); + op->header.size = htons (sizeof (*op) + sizeof (*mod) + value_size); + op->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT); + op->op_id = opcls->op_id; + + mod = (struct GNUNET_MessageHeader *) &op[1]; + mod->size = htons (sizeof (*mod) + value_size); + mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); + memcpy (&mod[1], value, value_size); + } + + GNUNET_SERVER_notification_context_add (nc, opcls->client); + GNUNET_SERVER_notification_context_unicast (nc, opcls->client, &op->header, + GNUNET_NO); + return GNUNET_YES; +} + + +/** + * Received result of GNUNET_PSYCSTORE_state_get() + * or GNUNET_PSYCSTORE_state_get_prefix() + */ +static void +store_recv_state_result (void *cls, int64_t result, const char *err_msg) { + struct OperationClosure *opcls = cls; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p History replay #%" PRIu64 ": " + "PSYCSTORE returned %" PRId64 " (%s)\n", + opcls->chn, opcls->op_id, result, err_msg); + client_send_result (opcls->client, opcls->op_id, result, err_msg); } @@ -2109,7 +2294,30 @@ static void client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *msg) { + struct Channel * + chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); + GNUNET_assert (NULL != chn); + + const struct StateRequest * + req = (const struct StateRequest *) msg; + uint16_t name_size = ntohs (req->header.size) - sizeof (*req); + const char *name = (const char *) &req[1]; + if (0 == name_size || '\0' != name[name_size - 1]) + { + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + + struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls)); + opcls->client = client; + opcls->chn = chn; + opcls->op_id = req->op_id; + + GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name, + &store_recv_state_var, + &store_recv_state_result, opcls); + GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -2120,6 +2328,30 @@ static void client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *msg) { + struct Channel * + chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); + GNUNET_assert (NULL != chn); + + const struct StateRequest * + req = (const struct StateRequest *) msg; + + uint16_t name_size = ntohs (req->header.size) - sizeof (*req); + const char *name = (const char *) &req[1]; + if (0 == name_size || '\0' != name[name_size - 1]) + { + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + + struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls)); + opcls->client = client; + opcls->chn = chn; + opcls->op_id = req->op_id; + + GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name, + &store_recv_state_var, + &store_recv_state_result, opcls); + GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -2140,8 +2372,8 @@ static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { { &client_recv_membership_store, NULL, GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 }, - { &client_recv_story_request, NULL, - GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST, 0 }, + { &client_recv_history_replay, NULL, + GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 }, { &client_recv_state_get, NULL, GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 }, diff --git a/src/psyc/psyc.h b/src/psyc/psyc.h index 21131e7d3..f6d40ddb4 100644 --- a/src/psyc/psyc.h +++ b/src/psyc/psyc.h @@ -112,30 +112,39 @@ struct ChannelMembershipStoreRequest */ struct GNUNET_MessageHeader header; - uint32_t reserved; + uint32_t reserved GNUNET_PACKED; + + uint64_t op_id GNUNET_PACKED; struct GNUNET_CRYPTO_EcdsaPublicKey slave_key; - uint64_t announced_at; + uint64_t announced_at GNUNET_PACKED; - uint64_t effective_since; + uint64_t effective_since GNUNET_PACKED; uint8_t did_join; }; -struct StoryRequest +struct HistoryRequest { /** - * Type: GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_STORY_REQUEST + * Type: GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_HISTORY_REQUEST */ struct GNUNET_MessageHeader header; - uint64_t op_id; + uint32_t reserved GNUNET_PACKED; + + /** + * ID for this operation. + */ + uint64_t op_id GNUNET_PACKED; + + uint64_t start_message_id GNUNET_PACKED; - uint64_t start_message_id; + uint64_t end_message_id GNUNET_PACKED; - uint64_t end_message_id; + uint64_t message_limit GNUNET_PACKED; }; @@ -148,10 +157,12 @@ struct StateRequest */ struct GNUNET_MessageHeader header; + uint32_t reserved GNUNET_PACKED; + /** * ID for this operation. */ - uint64_t op_id; + uint64_t op_id GNUNET_PACKED; /* Followed by NUL-terminated name. */ }; @@ -160,25 +171,6 @@ struct StateRequest /**** service -> library ****/ -struct CountersResult -{ - /** - * Type: GNUNET_MESSAGE_TYPE_PSYC_RESULT_COUNTERS - */ - struct GNUNET_MessageHeader header; - - /** - * Status code for the operation. - */ - int32_t result_code GNUNET_PACKED; - - /** - * Last message ID sent to the channel. - */ - uint64_t max_message_id; -}; - - /** * Answer from service to client about last operation. */ @@ -192,23 +184,22 @@ struct OperationResult */ struct GNUNET_MessageHeader header; + uint32_t reserved GNUNET_PACKED; + /** * Operation ID. */ - uint32_t op_id GNUNET_PACKED; + uint64_t op_id GNUNET_PACKED; /** * Status code for the operation. */ - int64_t result_code GNUNET_PACKED; + uint64_t result_code GNUNET_PACKED; /* Followed by: * - on error: NUL-terminated error message * - on success: one of the following message types * - * For a STORY_RESULT: - * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE - * * For a STATE_RESULT, one of: * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c index ca25b1b01..8cce89704 100644 --- a/src/psyc/psyc_api.c +++ b/src/psyc/psyc_api.c @@ -43,6 +43,33 @@ #define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__) +struct OperationListItem +{ + struct OperationListItem *prev; + struct OperationListItem *next; + + /** + * Operation ID. + */ + uint64_t op_id; + + /** + * Continuation to invoke with the result of an operation. + */ + GNUNET_PSYC_ResultCallback result_cb; + + /** + * State variable result callback. + */ + GNUNET_PSYC_StateVarCallback state_var_cb; + + /** + * Closure for the callbacks. + */ + void *cls; +}; + + /** * Handle to access PSYC channel operations for both the master and slaves. */ @@ -83,6 +110,21 @@ struct GNUNET_PSYC_Channel */ void *disconnect_cls; + /** + * First operation in the linked list. + */ + struct OperationListItem *op_head; + + /** + * Last operation in the linked list. + */ + struct OperationListItem *op_tail; + + /** + * Last operation ID used. + */ + uint64_t last_op_id; + /** * Are we polling for incoming messages right now? */ @@ -163,21 +205,82 @@ struct GNUNET_PSYC_SlaveTransmitHandle /** - * Handle to a story telling operation. + * Get a fresh operation ID to distinguish between PSYCstore requests. + * + * @param h Handle to the PSYCstore service. + * @return next operation id to use */ -struct GNUNET_PSYC_Story +static uint64_t +op_get_next_id (struct GNUNET_PSYC_Channel *chn) { - -}; + return ++chn->last_op_id; +} /** - * Handle for a state query operation. + * Find operation by ID. + * + * @return Operation, or NULL if none found. */ -struct GNUNET_PSYC_StateQuery +static struct OperationListItem * +op_find_by_id (struct GNUNET_PSYC_Channel *chn, uint64_t op_id) { + struct OperationListItem *op = chn->op_head; + while (NULL != op) + { + if (op->op_id == op_id) + return op; + op = op->next; + } + return NULL; +} -}; + +static uint64_t +op_add (struct GNUNET_PSYC_Channel *chn, GNUNET_PSYC_ResultCallback result_cb, + void *cls) +{ + if (NULL == result_cb) + return 0; + + struct OperationListItem *op = GNUNET_malloc (sizeof (*op)); + op->op_id = op_get_next_id (chn); + op->result_cb = result_cb; + op->cls = cls; + GNUNET_CONTAINER_DLL_insert_tail (chn->op_head, chn->op_tail, op); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%p Added operation #%" PRIu64 "\n", chn, op->op_id); + return op->op_id; +} + + +static int +op_result (struct GNUNET_PSYC_Channel *chn, uint64_t op_id, + int64_t result_code, const char *err_msg) +{ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%p Received result for operation #%" PRIu64 ": %" PRId64 " (%s)\n", + chn, op_id, result_code, err_msg); + if (0 == op_id) + return GNUNET_NO; + + struct OperationListItem *op = op_find_by_id (chn, op_id); + if (NULL == op) + { + LOG (GNUNET_ERROR_TYPE_WARNING, + "Could not find operation #%" PRIu64 "\n", op_id); + return GNUNET_NO; + } + + GNUNET_CONTAINER_DLL_remove (chn->op_head, chn->op_tail, op); + + if (NULL != op->result_cb) + op->result_cb (op->cls, result_code, err_msg); + + GNUNET_free (op); + return GNUNET_YES; +} static void @@ -202,6 +305,79 @@ channel_recv_disconnect (void *cls, } +static void +channel_recv_result (void *cls, + struct GNUNET_CLIENT_MANAGER_Connection *client, + const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_PSYC_Channel * + chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); + + uint16_t size = ntohs (msg->size); + const struct OperationResult *res = (const struct OperationResult *) msg; + const char *err_msg = NULL; + + if (sizeof (struct OperationResult) < size) + { + err_msg = (const char *) &res[1]; + if ('\0' != err_msg[size - sizeof (struct OperationResult) - 1]) + { + GNUNET_break (0); + err_msg = NULL; + } + } + + op_result (chn, GNUNET_ntohll (res->op_id), + GNUNET_ntohll (res->result_code) + INT64_MIN, err_msg); +} + + +static void +channel_recv_state_result (void *cls, + struct GNUNET_CLIENT_MANAGER_Connection *client, + const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_PSYC_Channel * + chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); + + const struct OperationResult *res = (const struct OperationResult *) msg; + struct OperationListItem *op = op_find_by_id (chn, GNUNET_ntohll (res->op_id)); + if (NULL == op || NULL == op->state_var_cb) + return; + + const struct GNUNET_MessageHeader *modc = (struct GNUNET_MessageHeader *) &op[1]; + uint16_t modc_size = ntohs (modc->size); + if (ntohs (msg->size) - sizeof (*msg) != modc_size) + { + GNUNET_break (0); + return; + } + switch (ntohs (modc->type)) + { + case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: + { + const struct GNUNET_PSYC_MessageModifier * + mod = (const struct GNUNET_PSYC_MessageModifier *) modc; + + const char *name = (const char *) &mod[1]; + uint16_t name_size = ntohs (mod->name_size); + if ('\0' != name[name_size - 1]) + { + GNUNET_break (0); + return; + } + op->state_var_cb (op->cls, name, name + name_size, ntohs (mod->value_size)); + break; + } + + case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: + op->state_var_cb (op->cls, NULL, (const char *) &modc[1], + modc_size - sizeof (*modc)); + break; + } +} + + static void channel_recv_message (void *cls, struct GNUNET_CLIENT_MANAGER_Connection *client, @@ -234,9 +410,16 @@ master_recv_start_ack (void *cls, mst = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (struct GNUNET_PSYC_Channel)); - struct CountersResult *cres = (struct CountersResult *) msg; + struct GNUNET_PSYC_CountersResultMessage * + cres = (struct GNUNET_PSYC_CountersResultMessage *) msg; + int32_t result = ntohl (cres->result_code) + INT32_MIN; + if (GNUNET_OK != result && GNUNET_NO != result) + { + LOG (GNUNET_ERROR_TYPE_ERROR, "Could not start master.\n"); + GNUNET_break (0); + } if (NULL != mst->start_cb) - mst->start_cb (mst->cb_cls, GNUNET_ntohll (cres->max_message_id)); + mst->start_cb (mst->cb_cls, result, GNUNET_ntohll (cres->max_message_id)); } @@ -279,9 +462,16 @@ slave_recv_join_ack (void *cls, struct GNUNET_PSYC_Slave * slv = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (struct GNUNET_PSYC_Channel)); - struct CountersResult *cres = (struct CountersResult *) msg; + struct GNUNET_PSYC_CountersResultMessage * + cres = (struct GNUNET_PSYC_CountersResultMessage *) msg; + int32_t result = ntohl (cres->result_code) + INT32_MIN; + if (GNUNET_YES != result && GNUNET_NO != result) + { + LOG (GNUNET_ERROR_TYPE_ERROR, "Could not join slave.\n"); + GNUNET_break (0); + } if (NULL != slv->connect_cb) - slv->connect_cb (slv->cb_cls, GNUNET_ntohll (cres->max_message_id)); + slv->connect_cb (slv->cb_cls, result, GNUNET_ntohll (cres->max_message_id)); } @@ -317,12 +507,20 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler master_handlers[] = { &master_recv_start_ack, NULL, GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK, - sizeof (struct CountersResult), GNUNET_NO }, + sizeof (struct GNUNET_PSYC_CountersResultMessage), GNUNET_NO }, { &master_recv_join_request, NULL, GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST, sizeof (struct GNUNET_PSYC_JoinRequestMessage), GNUNET_YES }, + { &channel_recv_state_result, NULL, + GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT, + sizeof (struct OperationResult), GNUNET_YES }, + + { &channel_recv_result, NULL, + GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE, + sizeof (struct OperationResult), GNUNET_YES }, + { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, { NULL, NULL, 0, 0, GNUNET_NO } @@ -341,12 +539,20 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler slave_handlers[] = { &slave_recv_join_ack, NULL, GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK, - sizeof (struct CountersResult), GNUNET_NO }, + sizeof (struct GNUNET_PSYC_CountersResultMessage), GNUNET_NO }, { &slave_recv_join_decision, NULL, GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, sizeof (struct GNUNET_PSYC_JoinDecisionMessage), GNUNET_YES }, + { &channel_recv_state_result, NULL, + GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT, + sizeof (struct OperationResult), GNUNET_YES }, + + { &channel_recv_result, NULL, + GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE, + sizeof (struct OperationResult), GNUNET_YES }, + { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, { NULL, NULL, 0, 0, GNUNET_NO } @@ -808,7 +1014,9 @@ void GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn, const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, uint64_t announced_at, - uint64_t effective_since) + uint64_t effective_since, + GNUNET_PSYC_ResultCallback result_cb, + void *cls) { struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req)); req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE); @@ -817,6 +1025,8 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn, req->announced_at = GNUNET_htonll (announced_at); req->effective_since = GNUNET_htonll (effective_since); req->did_join = GNUNET_YES; + req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls)); + GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); } @@ -845,7 +1055,9 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn, void GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn, const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, - uint64_t announced_at) + uint64_t announced_at, + GNUNET_PSYC_ResultCallback result_cb, + void *cls) { struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req)); req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE); @@ -853,57 +1065,85 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn, req->slave_key = *slave_key; req->announced_at = GNUNET_htonll (announced_at); req->did_join = GNUNET_NO; + req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls)); + GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); } /** - * Request to be told the message history of the channel. + * Request to replay a part of the message history of the channel. * * Historic messages (but NOT the state at the time) will be replayed (given to * the normal method handlers) if available and if access is permitted. * - * To get the latest message, use 0 for both the start and end message ID. - * - * @param channel Which channel should be replayed? - * @param start_message_id Earliest interesting point in history. - * @param end_message_id Last (exclusive) interesting point in history. - * @param message_cb Function to invoke on message parts received from the story. - * @param finish_cb Function to call when the requested story has been fully - * told (counting message IDs might not suffice, as some messages - * might be secret and thus the listener would not know the story is - * finished without being told explicitly) once this function - * has been called, the client must not call - * GNUNET_PSYC_channel_story_tell_cancel() anymore. - * @param cls Closure for the callbacks. - * - * @return Handle to cancel story telling operation. + * @param channel + * Which channel should be replayed? + * @param start_message_id + * Earliest interesting point in history. + * @param end_message_id + * Last (inclusive) interesting point in history. + * FIXME: @param method_prefix + * Retrieve only messages with a matching method prefix. + * @param result_cb + * Function to call when the requested history has been fully replayed. + * @param cls + * Closure for the callbacks. + * + * @return Handle to cancel history replay operation. */ -struct GNUNET_PSYC_Story * -GNUNET_PSYC_channel_story_tell (struct GNUNET_PSYC_Channel *channel, - uint64_t start_message_id, - uint64_t end_message_id, - GNUNET_PSYC_MessageCallback message_cb, - GNUNET_PSYC_MessagePartCallback message_part_cb, - GNUNET_PSYC_FinishCallback finish_cb, - void *cls) +void +GNUNET_PSYC_channel_history_replay (struct GNUNET_PSYC_Channel *chn, + uint64_t start_message_id, + uint64_t end_message_id, + /* FIXME: const char *method_prefix, */ + GNUNET_PSYC_ResultCallback result_cb, + void *cls) { - return NULL; + struct HistoryRequest *req = GNUNET_malloc (sizeof (*req)); + req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY); + req->header.size = htons (sizeof (*req)); + req->start_message_id = GNUNET_htonll (start_message_id); + req->end_message_id = GNUNET_htonll (end_message_id); + req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls)); + + GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); } /** - * Abort story telling. + * Request to replay the latest messages from the message history of the channel. * - * This function must not be called from within method handlers (as given to - * GNUNET_PSYC_slave_join()) of the slave. + * Historic messages (but NOT the state at the time) will be replayed (given to + * the normal method handlers) if available and if access is permitted. * - * @param story Story telling operation to stop. + * @param channel + * Which channel should be replayed? + * @param message_limit + * Maximum number of messages to replay. + * FIXME: @param method_prefix + * Retrieve only messages with a matching method prefix. + * @param result_cb + * Function to call when the requested history has been fully replayed. + * @param cls + * Closure for the callbacks. + * + * @return Handle to cancel history replay operation. */ void -GNUNET_PSYC_channel_story_tell_cancel (struct GNUNET_PSYC_Story *story) +GNUNET_PSYC_channel_history_replay_latest (struct GNUNET_PSYC_Channel *chn, + uint64_t message_limit, + /* FIXME: const char *method_prefix, */ + GNUNET_PSYC_ResultCallback result_cb, + void *cls) { + struct HistoryRequest *req = GNUNET_malloc (sizeof (*req)); + req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY); + req->header.size = htons (sizeof (*req)); + req->message_limit = GNUNET_htonll (message_limit); + req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls)); + GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); } @@ -914,22 +1154,35 @@ GNUNET_PSYC_channel_story_tell_cancel (struct GNUNET_PSYC_Story *story) * less-specific name is matched; for example, requesting "_a_b" will match "_a" * if "_a_b" does not exist. * - * @param channel Channel handle. - * @param full_name Full name of the requested variable, the actual variable - * returned might have a shorter name.. - * @param cb Function called once when a matching state variable is found. + * @param channel + * Channel handle. + * @param full_name + * Full name of the requested variable. + * The actual variable returned might have a shorter name. + * @param var_cb + * Function called once when a matching state variable is found. * Not called if there's no matching state variable. - * @param cb_cls Closure for the callbacks. - * - * @return Handle that can be used to cancel the query operation. + * @param result_cb + * Function called after the operation finished. + * (i.e. all state variables have been returned via @a state_cb) + * @param cls + * Closure for the callbacks. */ -struct GNUNET_PSYC_StateQuery * -GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *channel, +void +GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *chn, const char *full_name, - GNUNET_PSYC_StateCallback cb, - void *cb_cls) + GNUNET_PSYC_StateVarCallback var_cb, + GNUNET_PSYC_ResultCallback result_cb, + void *cls) { - return NULL; + size_t name_size = strlen (full_name) + 1; + struct StateRequest *req = GNUNET_malloc (sizeof (*req) + name_size); + req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_GET); + req->header.size = htons (sizeof (*req) + name_size); + req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls)); + memcpy (&req[1], full_name, name_size); + + GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); } @@ -943,33 +1196,34 @@ GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *channel, * The @a state_cb is invoked on all matching state variables asynchronously, as * the state is stored in and retrieved from the PSYCstore, * - * @param channel Channel handle. - * @param name_prefix Prefix of the state variable name to match. - * @param cb Function to call with the matching state variables. - * @param cb_cls Closure for the callbacks. - * - * @return Handle that can be used to cancel the query operation. - */ -struct GNUNET_PSYC_StateQuery * -GNUNET_PSYC_channel_state_get_prefix (struct GNUNET_PSYC_Channel *channel, - const char *name_prefix, - GNUNET_PSYC_StateCallback cb, - void *cb_cls) -{ - return NULL; -} - - -/** - * Cancel a state query operation. - * - * @param query Handle for the operation to cancel. + * @param channel + * Channel handle. + * @param name_prefix + * Prefix of the state variable name to match. + * @param var_cb + * Function called once when a matching state variable is found. + * Not called if there's no matching state variable. + * @param result_cb + * Function called after the operation finished. + * (i.e. all state variables have been returned via @a state_cb) + * @param cls + * Closure for the callbacks. */ void -GNUNET_PSYC_channel_state_get_cancel (struct GNUNET_PSYC_StateQuery *query) +GNUNET_PSYC_channel_state_get_prefix (struct GNUNET_PSYC_Channel *chn, + const char *name_prefix, + GNUNET_PSYC_StateVarCallback var_cb, + GNUNET_PSYC_ResultCallback result_cb, + void *cls) { + size_t name_size = strlen (name_prefix) + 1; + struct StateRequest *req = GNUNET_malloc (sizeof (*req) + name_size); + req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_GET); + req->header.size = htons (sizeof (*req) + name_size); + req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls)); + memcpy (&req[1], name_prefix, name_size); + GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); } - /* end of psyc_api.c */ diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c index 5eadef62c..044895809 100644 --- a/src/psyc/test_psyc.c +++ b/src/psyc/test_psyc.c @@ -35,6 +35,7 @@ #include "gnunet_env_lib.h" #include "gnunet_psyc_util_lib.h" #include "gnunet_psyc_service.h" +#include "gnunet_core_service.h" #define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) @@ -45,6 +46,9 @@ int res; const struct GNUNET_CONFIGURATION_Handle *cfg; +struct GNUNET_CORE_Handle *core; +struct GNUNET_PeerIdentity this_peer; + /** * Handle for task for timeout termination. */ @@ -53,6 +57,8 @@ GNUNET_SCHEDULER_TaskIdentifier end_badly_task; struct GNUNET_PSYC_Master *mst; struct GNUNET_PSYC_Slave *slv; +struct GNUNET_PSYC_Channel *mst_chn, *slv_chn; + struct GNUNET_CRYPTO_EddsaPrivateKey *channel_key; struct GNUNET_CRYPTO_EcdsaPrivateKey *slave_key; @@ -80,9 +86,19 @@ uint8_t join_req_count; enum { - TEST_NONE, - TEST_SLAVE_TRANSMIT, - TEST_MASTER_TRANSMIT, + TEST_NONE = 0, + TEST_MASTER_START = 1, + TEST_SLAVE_JOIN = 2, + TEST_SLAVE_TRANSMIT = 3, + TEST_MASTER_TRANSMIT = 4, + TEST_MASTER_HISTORY_REPLAY_LATEST = 5, + TEST_SLAVE_HISTORY_REPLAY_LATEST = 6, + TEST_MASTER_HISTORY_REPLAY = 7, + TEST_SLAVE_HISTORY_REPLAY = 8, + TEST_MASTER_STATE_GET = 9, + TEST_SLAVE_STATE_GET = 10, + TEST_MASTER_STATE_GET_PREFIX = 11, + TEST_SLAVE_STATE_GET_PREFIX = 12, } test; @@ -118,6 +134,11 @@ void slave_parted (void *cls) void cleanup () { + if (NULL != core) + { + GNUNET_CORE_disconnect (core); + core = NULL; + } if (NULL != slv) { GNUNET_PSYC_slave_part (slv, GNUNET_NO, &slave_parted, NULL); @@ -176,14 +197,204 @@ end () } +void +state_get_var (void *cls, const char *name, const void *value, size_t value_size) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Got state var: %s\n%.*s\n", name, value_size, value); +} + + +/*** Slave state_get_prefix() ***/ + +void +slave_state_get_prefix_result (void *cls, int64_t result, const char *err_msg) +{ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "slave_state_get_prefix:\t%" PRId64 " (%s)\n", result, err_msg); + // FIXME: GNUNET_assert (2 == result); + end (); +} + + +void +slave_state_get_prefix () +{ + test = TEST_SLAVE_STATE_GET_PREFIX; + GNUNET_PSYC_channel_state_get_prefix (slv_chn, "_foo", &state_get_var, + &slave_state_get_prefix_result, NULL); +} + + +/*** Master state_get_prefix() ***/ + + +void +master_state_get_prefix_result (void *cls, int64_t result, const char *err_msg) +{ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "master_state_get_prefix:\t%" PRId64 " (%s)\n", result, err_msg); + // FIXME: GNUNET_assert (2 == result); + slave_state_get_prefix (); +} + + +void +master_state_get_prefix () +{ + test = TEST_MASTER_STATE_GET_PREFIX; + GNUNET_PSYC_channel_state_get_prefix (mst_chn, "_foo", &state_get_var, + &master_state_get_prefix_result, NULL); +} + + +/*** Slave state_get() ***/ + + +void +slave_state_get_result (void *cls, int64_t result, const char *err_msg) +{ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "slave_state_get:\t%" PRId64 " (%s)\n", result, err_msg); + // FIXME: GNUNET_assert (2 == result); + master_state_get_prefix (); +} + + +void +slave_state_get () +{ + test = TEST_SLAVE_STATE_GET; + GNUNET_PSYC_channel_state_get (slv_chn, "_foo_bar_baz", &state_get_var, + &slave_state_get_result, NULL); +} + + +/*** Master state_get() ***/ + + +void +master_state_get_result (void *cls, int64_t result, const char *err_msg) +{ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "master_state_get:\t%" PRId64 " (%s)\n", result, err_msg); + // FIXME: GNUNET_assert (1 == result); + slave_state_get (); +} + + +void +master_state_get () +{ + test = TEST_MASTER_STATE_GET; + GNUNET_PSYC_channel_state_get (mst_chn, "_foo_bar_baz", &state_get_var, + &master_state_get_result, NULL); +} + + +/*** Slave history_replay() ***/ + +void +slave_history_replay_result (void *cls, int64_t result, const char *err_msg) +{ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "slave_history_replay:\t%" PRId64 " (%s)\n", result, err_msg); + GNUNET_assert (9 == result); + + master_state_get (); +} + + +void +slave_history_replay () +{ + test = TEST_SLAVE_HISTORY_REPLAY; + GNUNET_PSYC_channel_history_replay (slv_chn, 1, 1, + &slave_history_replay_result, + NULL); +} + + +/*** Master history_replay() ***/ + + +void +master_history_replay_result (void *cls, int64_t result, const char *err_msg) +{ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "master_history_replay:\t%" PRId64 " (%s)\n", result, err_msg); + GNUNET_assert (9 == result); + + slave_history_replay (); +} + + +void +master_history_replay () +{ + test = TEST_MASTER_HISTORY_REPLAY; + GNUNET_PSYC_channel_history_replay (mst_chn, 1, 1, + &master_history_replay_result, + NULL); +} + + +/*** Slave history_replay_latest() ***/ + + +void +slave_history_replay_latest_result (void *cls, int64_t result, const char *err_msg) +{ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "slave_history_replay_latest:\t%" PRId64 " (%s)\n", result, err_msg); + GNUNET_assert (9 == result); + + master_history_replay (); +} + + +void +slave_history_replay_latest () +{ + test = TEST_SLAVE_HISTORY_REPLAY_LATEST; + GNUNET_PSYC_channel_history_replay_latest (slv_chn, 1, + &slave_history_replay_latest_result, + NULL); +} + + +/*** Master history_replay_latest() ***/ + + +void +master_history_replay_latest_result (void *cls, int64_t result, const char *err_msg) +{ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "master_history_replay_latest:\t%" PRId64 " (%s)\n", result, err_msg); + GNUNET_assert (9 == result); + + slave_history_replay_latest (); +} + + +void +master_history_replay_latest () +{ + test = TEST_MASTER_HISTORY_REPLAY_LATEST; + GNUNET_PSYC_channel_history_replay_latest (mst_chn, 1, + &master_history_replay_latest_result, + NULL); +} + + void master_message_cb (void *cls, uint64_t message_id, uint32_t flags, const struct GNUNET_PSYC_MessageHeader *msg) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Master got PSYC message fragment of size %u " - "belonging to message ID %llu with flags %x\n", - ntohs (msg->header.size), message_id, flags); + "Test #%d: Master got PSYC message fragment of size %u " + "belonging to message ID %" PRIu64 " with flags %x\n", + test, ntohs (msg->header.size), message_id, flags); // FIXME } @@ -196,7 +407,7 @@ master_message_part_cb (void *cls, uint64_t message_id, if (NULL == msg) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Error while receiving message %llu\n", message_id); + "Error while receiving message %" PRIu64 "\n", message_id); return; } @@ -204,9 +415,9 @@ master_message_part_cb (void *cls, uint64_t message_id, uint16_t size = ntohs (msg->size); GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Master got message part of type %u and size %u " - "belonging to message ID %llu with flags %x\n", - type, size, message_id, flags); + "Test #%d: Master got message part of type %u and size %u " + "belonging to message ID %" PRIu64 " with flags %x\n", + test, type, size, message_id, flags); switch (test) { @@ -227,6 +438,18 @@ master_message_part_cb (void *cls, uint64_t message_id, case TEST_MASTER_TRANSMIT: break; + case TEST_MASTER_HISTORY_REPLAY: + case TEST_MASTER_HISTORY_REPLAY_LATEST: + if (GNUNET_PSYC_MESSAGE_HISTORIC != flags) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Test #%d: Unexpected flags for historic message: %x" PRIu32 "\n", + flags); + GNUNET_assert (0); + return; + } + break; + default: GNUNET_assert (0); } @@ -238,9 +461,9 @@ slave_message_cb (void *cls, uint64_t message_id, uint32_t flags, const struct GNUNET_PSYC_MessageHeader *msg) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Slave got PSYC message fragment of size %u " - "belonging to message ID %llu with flags %x\n", - ntohs (msg->header.size), message_id, flags); + "Test #%d: Slave got PSYC message fragment of size %u " + "belonging to message ID %" PRIu64 " with flags %x\n", + test, ntohs (msg->header.size), message_id, flags); // FIXME } @@ -253,7 +476,7 @@ slave_message_part_cb (void *cls, uint64_t message_id, if (NULL == msg) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Error while receiving message %llu\n", message_id); + "Error while receiving message " PRIu64 "\n", message_id); return; } @@ -261,15 +484,27 @@ slave_message_part_cb (void *cls, uint64_t message_id, uint16_t size = ntohs (msg->size); GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Slave got message part of type %u and size %u " - "belonging to message ID %llu with flags %x\n", - type, size, message_id, flags); + "Test #%d: Slave got message part of type %u and size %u " + "belonging to message ID %" PRIu64 " with flags %x\n", + test, type, size, message_id, flags); switch (test) { case TEST_MASTER_TRANSMIT: if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type) - end (); + master_history_replay_latest (); + break; + + case TEST_SLAVE_HISTORY_REPLAY: + case TEST_SLAVE_HISTORY_REPLAY_LATEST: + if (GNUNET_PSYC_MESSAGE_HISTORIC != flags) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Test #%d: Unexpected flags for historic message: %x" PRIu32 "\n", + flags); + GNUNET_assert (0); + return; + } break; default: @@ -417,7 +652,6 @@ slave_transmit () { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave sending request to master.\n"); - test = TEST_SLAVE_TRANSMIT; tmit = GNUNET_new (struct TransmitClosure); @@ -437,6 +671,29 @@ slave_transmit () } +void +slave_remove_cb (void *cls, int64_t result, const char *err_msg) +{ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "slave_remove:\t%" PRId64 " (%s)\n", result, err_msg); + + slave_transmit (); +} + + +void +slave_add_cb (void *cls, int64_t result, const char *err_msg) +{ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "slave_add:\t%" PRId64 " (%s)\n", result, err_msg); + + struct GNUNET_PSYC_Channel *chn = cls; + GNUNET_PSYC_channel_slave_remove (chn, &slave_pub_key, 2, + &slave_remove_cb, chn); + +} + + void join_decision_cb (void *cls, const struct GNUNET_PSYC_JoinDecisionMessage *dcsn, @@ -453,7 +710,8 @@ join_decision_cb (void *cls, return; } - slave_transmit (); + struct GNUNET_PSYC_Channel *chn = GNUNET_PSYC_master_get_channel (mst); + GNUNET_PSYC_channel_slave_add (chn, &slave_pub_key, 2, 2, &slave_add_cb, chn); } @@ -473,19 +731,17 @@ join_request_cb (void *cls, /* Reject first request */ int is_admitted = (0 < join_req_count++) ? GNUNET_YES : GNUNET_NO; GNUNET_PSYC_join_decision (jh, is_admitted, 0, NULL, NULL); - - /* Membership store */ - struct GNUNET_PSYC_Channel *chn = GNUNET_PSYC_master_get_channel (mst); - GNUNET_PSYC_channel_slave_add (chn, slave_key, 2, 2); - GNUNET_PSYC_channel_slave_remove (chn, &slave_pub_key, 2); } void -slave_connect_cb (void *cls, uint64_t max_message_id) +slave_connect_cb (void *cls, int result, uint64_t max_message_id) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Slave connected: %lu\n", max_message_id); + "Slave connected: %d, max_message_id: %" PRIu64 "\n", + result, max_message_id); + GNUNET_assert (TEST_SLAVE_JOIN == test); + GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result); } @@ -493,8 +749,9 @@ void slave_join () { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Joining slave.\n"); + test = TEST_SLAVE_JOIN; - struct GNUNET_PeerIdentity origin = {}; // FIXME: this peer + struct GNUNET_PeerIdentity origin = this_peer; struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create (); GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN, "_foo", "bar baz", 7); @@ -507,6 +764,7 @@ slave_join () &slave_message_cb, &slave_message_part_cb, &slave_connect_cb, &join_decision_cb, NULL, join_msg); + slv_chn = GNUNET_PSYC_slave_get_channel (slv); GNUNET_ENV_environment_destroy (env); } @@ -564,10 +822,13 @@ master_transmit () void -master_start_cb (void *cls, uint64_t max_message_id) +master_start_cb (void *cls, int result, uint64_t max_message_id) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Master started: %" PRIu64 "\n", max_message_id); + "Master started: %d, max_message_id: %" PRIu64 "\n", + result, max_message_id); + GNUNET_assert (TEST_MASTER_START == test); + GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result); slave_join (); } @@ -576,10 +837,12 @@ void master_start () { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Starting master.\n"); + test = TEST_MASTER_START; mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE, &master_start_cb, &join_request_cb, &master_message_cb, &master_message_part_cb, NULL); + mst_chn = GNUNET_PSYC_master_get_channel (mst); } void @@ -589,6 +852,21 @@ schedule_master_start (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) } +void +core_connected (void *cls, const struct GNUNET_PeerIdentity *my_identity) +{ + this_peer = *my_identity; + +#if DEBUG_TEST_PSYC + master_start (); +#else + /* Allow some time for the services to initialize. */ + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, + &schedule_master_start, NULL); +#endif + +} + /** * Main function of the test, run from scheduler. * @@ -615,14 +893,8 @@ run (void *cls, GNUNET_CRYPTO_eddsa_key_get_public (channel_key, &channel_pub_key); GNUNET_CRYPTO_ecdsa_key_get_public (slave_key, &slave_pub_key); -#if DEBUG_TEST_PSYC - master_start (); -#else - /* Allow some time for the services to initialize. */ - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, - &schedule_master_start, NULL); -#endif - return; + core = GNUNET_CORE_connect (cfg, NULL, &core_connected, NULL, NULL, + NULL, GNUNET_NO, NULL, GNUNET_NO, NULL); } diff --git a/src/psycstore/gnunet-service-psycstore.c b/src/psycstore/gnunet-service-psycstore.c index 7d27ea29b..87a2c87ab 100644 --- a/src/psycstore/gnunet-service-psycstore.c +++ b/src/psycstore/gnunet-service-psycstore.c @@ -25,6 +25,8 @@ * @author Christian Grothoff */ +#include + #include "platform.h" #include "gnunet_util_lib.h" #include "gnunet_constants.h" @@ -89,39 +91,41 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) /** * Send a result code back to the client. * - * @param client Client that should receive the result code. - * @param result_code Code to transmit. - * @param op_id Operation ID. - * @param err_msg Error message to include (or NULL for none). + * @param client + * Client that should receive the result code. + * @param result_code + * Code to transmit. + * @param op_id + * Operation ID in network byte order. + * @param err_msg + * Error message to include (or NULL for none). */ static void -send_result_code (struct GNUNET_SERVER_Client *client, uint32_t result_code, - uint32_t op_id, const char *err_msg) +send_result_code (struct GNUNET_SERVER_Client *client, uint64_t op_id, + int64_t result_code, const char *err_msg) { struct OperationResult *res; - size_t err_len; + size_t err_len = 0; // FIXME: maximum length - if (NULL == err_msg) - err_len = 0; - else + if (NULL != err_msg) err_len = strlen (err_msg) + 1; res = GNUNET_malloc (sizeof (struct OperationResult) + err_len); res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE); res->header.size = htons (sizeof (struct OperationResult) + err_len); - res->result_code = htonl (result_code); + res->result_code = GNUNET_htonll (result_code - INT64_MIN); res->op_id = op_id; if (0 < err_len) memcpy (&res[1], err_msg, err_len); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending result %d (%s) to client\n", - (int) result_code, - err_msg); + "Sending result to client: %" PRId64 " (%s)\n", + result_code, err_msg); GNUNET_SERVER_notification_context_add (nc, client); GNUNET_SERVER_notification_context_unicast (nc, client, &res->header, GNUNET_NO); GNUNET_free (res); } + enum { MEMBERSHIP_TEST_NOT_NEEDED = 0, @@ -129,6 +133,7 @@ enum MEMBERSHIP_TEST_DONE = 2, } MessageMembershipTest; + struct SendClosure { struct GNUNET_SERVER_Client *client; @@ -158,7 +163,6 @@ struct SendClosure * @see enum MessageMembershipTest */ uint8_t membership_test; - }; @@ -214,6 +218,8 @@ send_state_var (void *cls, const char *name, struct StateResult *res; size_t name_size = strlen (name) + 1; + /* FIXME: split up value into 64k chunks */ + res = GNUNET_malloc (sizeof (struct StateResult) + name_size + value_size); res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE); res->header.size = htons (sizeof (struct StateResult) + name_size + value_size); @@ -249,7 +255,7 @@ handle_membership_store (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Failed to store membership information!\n")); - send_result_code (client, ret, req->op_id, NULL); + send_result_code (client, req->op_id, ret, NULL); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -274,7 +280,7 @@ handle_membership_test (void *cls, _("Failed to test membership!\n")); } - send_result_code (client, ret, req->op_id, NULL); + send_result_code (client, req->op_id, ret, NULL); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -295,7 +301,7 @@ handle_fragment_store (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Failed to store fragment!\n")); - send_result_code (client, ret, req->op_id, NULL); + send_result_code (client, req->op_id, ret, NULL); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -312,9 +318,20 @@ handle_fragment_get (void *cls, .channel_key = req->channel_key, .slave_key = req->slave_key, .membership_test = req->do_membership_test }; - int ret = db->fragment_get (db->cls, &req->channel_key, - GNUNET_ntohll (req->fragment_id), - &send_fragment, &sc); + int64_t ret; + uint64_t ret_frags = 0; + uint64_t first_fragment_id = GNUNET_ntohll (req->first_fragment_id); + uint64_t last_fragment_id = GNUNET_ntohll (req->last_fragment_id); + uint64_t limit = GNUNET_ntohll (req->fragment_limit); + + if (0 == limit) + ret = db->fragment_get (db->cls, &req->channel_key, + first_fragment_id, last_fragment_id, + &ret_frags, &send_fragment, &sc); + else + ret = db->fragment_get_latest (db->cls, &req->channel_key, limit, + &ret_frags, &send_fragment, &sc); + switch (ret) { case GNUNET_YES: @@ -340,8 +357,7 @@ handle_fragment_get (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Failed to get fragment!\n")); } - - send_result_code (client, ret, req->op_id, NULL); + send_result_code (client, req->op_id, (ret < 0) ? ret : ret_frags, NULL); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -358,22 +374,31 @@ handle_message_get (void *cls, .channel_key = req->channel_key, .slave_key = req->slave_key, .membership_test = req->do_membership_test }; + int64_t ret; uint64_t ret_frags = 0; - int64_t ret = db->message_get (db->cls, &req->channel_key, - GNUNET_ntohll (req->message_id), - &ret_frags, &send_fragment, &sc); + uint64_t first_message_id = GNUNET_ntohll (req->first_message_id); + uint64_t last_message_id = GNUNET_ntohll (req->last_message_id); + uint64_t limit = GNUNET_ntohll (req->message_limit); + + if (0 == limit) + ret = db->message_get (db->cls, &req->channel_key, + first_message_id, last_message_id, + &ret_frags, &send_fragment, &sc); + else + ret = db->message_get_latest (db->cls, &req->channel_key, limit, + &ret_frags, &send_fragment, &sc); + switch (ret) { case GNUNET_YES: case GNUNET_NO: break; default: - ret_frags = ret; GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Failed to get message!\n")); } - send_result_code (client, ret_frags, req->op_id, NULL); + send_result_code (client, req->op_id, (ret < 0) ? ret : ret_frags, NULL); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -404,7 +429,7 @@ handle_message_get_fragment (void *cls, _("Failed to get message fragment!\n")); } - send_result_code (client, ret, req->op_id, NULL); + send_result_code (client, req->op_id, ret, NULL); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -434,7 +459,7 @@ handle_counters_get (void *cls, res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS); res.header.size = htons (sizeof (res)); - res.result_code = htonl (ret); + res.result_code = htonl (ret - INT32_MIN); res.op_id = req->op_id; res.max_fragment_id = GNUNET_htonll (res.max_fragment_id); res.max_message_id = GNUNET_htonll (res.max_message_id); @@ -517,7 +542,7 @@ handle_state_modify (void *cls, _("Failed to end modifying state!\n")); } } - send_result_code (client, ret, req->op_id, NULL); + send_result_code (client, req->op_id, ret, NULL); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -571,7 +596,7 @@ handle_state_sync (void *cls, _("Failed to end synchronizing state!\n")); } } - send_result_code (client, ret, req->op_id, NULL); + send_result_code (client, req->op_id, ret, NULL); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -590,7 +615,7 @@ handle_state_reset (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Failed to reset state!\n")); - send_result_code (client, ret, req->op_id, NULL); + send_result_code (client, req->op_id, ret, NULL); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -609,7 +634,7 @@ handle_state_hash_update (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Failed to reset state!\n")); - send_result_code (client, ret, req->op_id, NULL); + send_result_code (client, req->op_id, ret, NULL); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -660,7 +685,7 @@ handle_state_get (void *cls, _("Failed to get state variable!\n")); } - send_result_code (client, ret, req->op_id, NULL); + send_result_code (client, req->op_id, ret, NULL); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -699,7 +724,7 @@ handle_state_get_prefix (void *cls, _("Failed to get state variable!\n")); } - send_result_code (client, ret, req->op_id, NULL); + send_result_code (client, req->op_id, ret, NULL); GNUNET_SERVER_receive_done (client, GNUNET_OK); } diff --git a/src/psycstore/plugin_psycstore_sqlite.c b/src/psycstore/plugin_psycstore_sqlite.c index 86b969c5d..cb6c5c437 100644 --- a/src/psycstore/plugin_psycstore_sqlite.c +++ b/src/psycstore/plugin_psycstore_sqlite.c @@ -130,12 +130,22 @@ struct Plugin /** * Precompiled SQL for fragment_get() */ - sqlite3_stmt *select_fragment; + sqlite3_stmt *select_fragments; + + /** + * Precompiled SQL for fragment_get() + */ + sqlite3_stmt *select_latest_fragments; /** * Precompiled SQL for message_get() */ - sqlite3_stmt *select_message; + sqlite3_stmt *select_messages; + + /** + * Precompiled SQL for message_get() + */ + sqlite3_stmt *select_latest_messages; /** * Precompiled SQL for message_get_fragment() @@ -456,8 +466,8 @@ database_setup (struct Plugin *plugin) " multicast_flags, psycstore_flags, data\n" "FROM messages\n" "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" - " AND fragment_id = ?;", - &plugin->select_fragment); + " AND ? <= fragment_id AND fragment_id <= ?;", + &plugin->select_fragments); sql_prepare (plugin->dbh, "SELECT hop_counter, signature, purpose, fragment_id,\n" @@ -465,8 +475,35 @@ database_setup (struct Plugin *plugin) " multicast_flags, psycstore_flags, data\n" "FROM messages\n" "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" - " AND message_id = ? AND fragment_offset = ?;", - &plugin->select_message_fragment); + " AND ? <= message_id AND message_id <= ?;", + &plugin->select_messages); + + sql_prepare (plugin->dbh, + "SELECT * FROM\n" + "(SELECT hop_counter, signature, purpose, fragment_id,\n" + " fragment_offset, message_id, group_generation,\n" + " multicast_flags, psycstore_flags, data\n" + " FROM messages\n" + " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" + " ORDER BY fragment_id DESC\n" + " LIMIT ?)\n" + "ORDER BY fragment_id;", + &plugin->select_latest_fragments); + + sql_prepare (plugin->dbh, + "SELECT hop_counter, signature, purpose, fragment_id,\n" + " fragment_offset, message_id, group_generation,\n" + " multicast_flags, psycstore_flags, data\n" + "FROM messages\n" + "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" + " AND message_id IN\n" + " (SELECT message_id\n" + " FROM messages\n" + " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" + " ORDER BY message_id\n" + " DESC LIMIT ?)\n" + "ORDER BY fragment_id;", + &plugin->select_latest_messages); sql_prepare (plugin->dbh, "SELECT hop_counter, signature, purpose, fragment_id,\n" @@ -474,8 +511,8 @@ database_setup (struct Plugin *plugin) " multicast_flags, psycstore_flags, data\n" "FROM messages\n" "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" - " AND message_id = ?;", - &plugin->select_message); + " AND message_id = ? AND fragment_offset = ?;", + &plugin->select_message_fragment); sql_prepare (plugin->dbh, "SELECT fragment_id, message_id, group_generation\n" @@ -1036,8 +1073,42 @@ fragment_row (sqlite3_stmt *stmt, GNUNET_PSYCSTORE_FragmentCallback cb, return cb (cb_cls, (void *) msg, sqlite3_column_int64 (stmt, 8)); } + +static int +fragment_select (struct Plugin *plugin, sqlite3_stmt *stmt, + uint64_t *returned_fragments, + GNUNET_PSYCSTORE_FragmentCallback cb, void *cb_cls) +{ + int ret = GNUNET_SYSERR; + int sql_ret; + + do + { + sql_ret = sqlite3_step (stmt); + switch (sql_ret) + { + case SQLITE_DONE: + if (ret != GNUNET_OK) + ret = GNUNET_NO; + break; + case SQLITE_ROW: + ret = fragment_row (stmt, cb, cb_cls); + (*returned_fragments)++; + if (ret != GNUNET_YES) + sql_ret = SQLITE_DONE; + break; + default: + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_step"); + } + } + while (sql_ret == SQLITE_ROW); + + return ret; +} + /** - * Retrieve a message fragment by fragment ID. + * Retrieve a message fragment range by fragment ID. * * @see GNUNET_PSYCSTORE_fragment_get() * @@ -1046,36 +1117,29 @@ fragment_row (sqlite3_stmt *stmt, GNUNET_PSYCSTORE_FragmentCallback cb, static int fragment_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, - uint64_t fragment_id, + uint64_t first_fragment_id, + uint64_t last_fragment_id, + uint64_t *returned_fragments, GNUNET_PSYCSTORE_FragmentCallback cb, void *cb_cls) { struct Plugin *plugin = cls; - sqlite3_stmt *stmt = plugin->select_fragment; + sqlite3_stmt *stmt = plugin->select_fragments; int ret = GNUNET_SYSERR; + *returned_fragments = 0; if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, sizeof (*channel_key), SQLITE_STATIC) - || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, fragment_id)) + || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, first_fragment_id) + || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, last_fragment_id)) { LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind"); } else { - switch (sqlite3_step (stmt)) - { - case SQLITE_DONE: - ret = GNUNET_NO; - break; - case SQLITE_ROW: - ret = fragment_row (stmt, cb, cb_cls); - break; - default: - LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, - "sqlite3_step"); - } + ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls); } if (SQLITE_OK != sqlite3_reset (stmt)) @@ -1087,8 +1151,52 @@ fragment_get (void *cls, return ret; } + /** - * Retrieve all fragments of a message. + * Retrieve a message fragment range by fragment ID. + * + * @see GNUNET_PSYCSTORE_fragment_get_latest() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +fragment_get_latest (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t fragment_limit, + uint64_t *returned_fragments, + GNUNET_PSYCSTORE_FragmentCallback cb, + void *cb_cls) +{ + struct Plugin *plugin = cls; + sqlite3_stmt *stmt = plugin->select_latest_fragments; + int ret = GNUNET_SYSERR; + *returned_fragments = 0; + + if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, + sizeof (*channel_key), + SQLITE_STATIC) + || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, fragment_limit)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_bind"); + } + else + { + ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls); + } + + if (SQLITE_OK != sqlite3_reset (stmt)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_reset"); + } + + return ret; +} + + +/** + * Retrieve all fragments of a message ID range. * * @see GNUNET_PSYCSTORE_message_get() * @@ -1097,48 +1205,29 @@ fragment_get (void *cls, static int message_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, - uint64_t message_id, + uint64_t first_message_id, + uint64_t last_message_id, uint64_t *returned_fragments, GNUNET_PSYCSTORE_FragmentCallback cb, void *cb_cls) { struct Plugin *plugin = cls; - sqlite3_stmt *stmt = plugin->select_message; + sqlite3_stmt *stmt = plugin->select_messages; int ret = GNUNET_SYSERR; *returned_fragments = 0; if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, sizeof (*channel_key), SQLITE_STATIC) - || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, message_id)) + || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, first_message_id) + || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, last_message_id)) { LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind"); } else { - int sql_ret; - do - { - sql_ret = sqlite3_step (stmt); - switch (sql_ret) - { - case SQLITE_DONE: - if (ret != GNUNET_OK) - ret = GNUNET_NO; - break; - case SQLITE_ROW: - ret = fragment_row (stmt, cb, cb_cls); - (*returned_fragments)++; - if (ret != GNUNET_YES) - sql_ret = SQLITE_DONE; - break; - default: - LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, - "sqlite3_step"); - } - } - while (sql_ret == SQLITE_ROW); + ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls); } if (SQLITE_OK != sqlite3_reset (stmt)) @@ -1150,6 +1239,53 @@ message_get (void *cls, return ret; } + +/** + * Retrieve all fragments of the latest messages. + * + * @see GNUNET_PSYCSTORE_message_get_latest() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +message_get_latest (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t message_limit, + uint64_t *returned_fragments, + GNUNET_PSYCSTORE_FragmentCallback cb, + void *cb_cls) +{ + struct Plugin *plugin = cls; + sqlite3_stmt *stmt = plugin->select_latest_messages; + int ret = GNUNET_SYSERR; + *returned_fragments = 0; + + if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, + sizeof (*channel_key), + SQLITE_STATIC) + || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key, + sizeof (*channel_key), + SQLITE_STATIC) + || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_limit)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_bind"); + } + else + { + ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls); + } + + if (SQLITE_OK != sqlite3_reset (stmt)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_reset"); + } + + return ret; +} + + /** * Retrieve a fragment of message specified by its message ID and fragment * offset. @@ -1777,7 +1913,9 @@ libgnunet_plugin_psycstore_sqlite_init (void *cls) api->fragment_store = &fragment_store; api->message_add_flags = &message_add_flags; api->fragment_get = &fragment_get; + api->fragment_get_latest = &fragment_get_latest; api->message_get = &message_get; + api->message_get_latest = &message_get_latest; api->message_get_fragment = &message_get_fragment; api->counters_message_get = &counters_message_get; api->counters_state_get = &counters_state_get; diff --git a/src/psycstore/psycstore.h b/src/psycstore/psycstore.h index 17905f422..e6a82848a 100644 --- a/src/psycstore/psycstore.h +++ b/src/psycstore/psycstore.h @@ -42,15 +42,17 @@ struct OperationResult */ struct GNUNET_MessageHeader header; + uint32_t reserved GNUNET_PACKED; + /** * Operation ID. */ - uint32_t op_id GNUNET_PACKED; + uint64_t op_id GNUNET_PACKED; /** * Status code for the operation. */ - int64_t result_code GNUNET_PACKED; + uint64_t result_code GNUNET_PACKED; /* followed by 0-terminated error message (on error) */ @@ -69,10 +71,18 @@ struct CountersResult */ struct GNUNET_MessageHeader header; + /** + * Status code for the operation: + * #GNUNET_OK: success, counter values are returned. + * #GNUNET_NO: no message has been sent to the channel yet. + * #GNUNET_SYSERR: an error occurred. + */ + uint32_t result_code GNUNET_PACKED; + /** * Operation ID. */ - uint32_t op_id GNUNET_PACKED; + uint64_t op_id GNUNET_PACKED; uint64_t max_fragment_id GNUNET_PACKED; @@ -81,14 +91,6 @@ struct CountersResult uint64_t max_group_generation GNUNET_PACKED; uint64_t max_state_message_id GNUNET_PACKED; - - /** - * Status code for the operation: - * #GNUNET_OK: success, counter values are returned. - * #GNUNET_NO: no message has been sent to the channel yet. - * #GNUNET_SYSERR: an error occurred. - */ - int32_t result_code GNUNET_PACKED; }; @@ -102,15 +104,14 @@ struct FragmentResult */ struct GNUNET_MessageHeader header; + uint32_t psycstore_flags GNUNET_PACKED; + /** * Operation ID. */ - uint32_t op_id GNUNET_PACKED; - - uint32_t psycstore_flags GNUNET_PACKED; - - /* followed by GNUNET_MULTICAST_MessageHeader */ + uint64_t op_id GNUNET_PACKED; + /* Followed by GNUNET_MULTICAST_MessageHeader */ }; @@ -124,14 +125,16 @@ struct StateResult */ struct GNUNET_MessageHeader header; + uint16_t name_size GNUNET_PACKED; + + uint16_t reserved GNUNET_PACKED; + /** * Operation ID. */ - uint32_t op_id GNUNET_PACKED; - - uint16_t name_size GNUNET_PACKED; + uint64_t op_id GNUNET_PACKED; - /* followed by name and value */ + /* Followed by name and value */ }; @@ -142,13 +145,14 @@ struct OperationRequest { struct GNUNET_MessageHeader header; + uint32_t reserved GNUNET_PACKED; + /** * Operation ID. */ - uint32_t op_id GNUNET_PACKED; + uint64_t op_id GNUNET_PACKED; struct GNUNET_CRYPTO_EddsaPublicKey channel_key; - }; @@ -162,10 +166,12 @@ struct MembershipStoreRequest */ struct GNUNET_MessageHeader header; + uint32_t reserved GNUNET_PACKED; + /** * Operation ID. */ - uint32_t op_id GNUNET_PACKED; + uint64_t op_id GNUNET_PACKED; /** * Channel's public key. @@ -177,9 +183,9 @@ struct MembershipStoreRequest */ struct GNUNET_CRYPTO_EcdsaPublicKey slave_key; - uint64_t announced_at; - uint64_t effective_since; - uint64_t group_generation; + uint64_t announced_at GNUNET_PACKED; + uint64_t effective_since GNUNET_PACKED; + uint64_t group_generation GNUNET_PACKED; uint8_t did_join; }; @@ -194,10 +200,12 @@ struct MembershipTestRequest */ struct GNUNET_MessageHeader header; + uint32_t reserved GNUNET_PACKED; + /** * Operation ID. */ - uint32_t op_id GNUNET_PACKED; + uint64_t op_id GNUNET_PACKED; /** * Channel's public key. @@ -226,9 +234,9 @@ struct FragmentStoreRequest struct GNUNET_MessageHeader header; /** - * Operation ID. + * enum GNUNET_PSYCSTORE_MessageFlags */ - uint32_t op_id GNUNET_PACKED; + uint32_t psycstore_flags GNUNET_PACKED; /** * Channel's public key. @@ -236,9 +244,9 @@ struct FragmentStoreRequest struct GNUNET_CRYPTO_EddsaPublicKey channel_key; /** - * enum GNUNET_PSYCSTORE_MessageFlags + * Operation ID. */ - uint32_t psycstore_flags GNUNET_PACKED; + uint64_t op_id; /* Followed by fragment */ }; @@ -254,10 +262,12 @@ struct FragmentGetRequest */ struct GNUNET_MessageHeader header; + uint32_t reserved GNUNET_PACKED; + /** * Operation ID. */ - uint32_t op_id GNUNET_PACKED; + uint64_t op_id GNUNET_PACKED; /** * Channel's public key. @@ -270,9 +280,19 @@ struct FragmentGetRequest struct GNUNET_CRYPTO_EcdsaPublicKey slave_key; /** - * Fragment ID to request. + * First fragment ID to request. */ - uint64_t fragment_id GNUNET_PACKED; + uint64_t first_fragment_id GNUNET_PACKED; + + /** + * Last fragment ID to request. + */ + uint64_t last_fragment_id GNUNET_PACKED; + + /** + * Maximum number of fragments to retrieve. + */ + uint64_t fragment_limit GNUNET_PACKED; /** * Do membership test with @a slave_key before returning fragment? @@ -292,10 +312,12 @@ struct MessageGetRequest */ struct GNUNET_MessageHeader header; + uint32_t reserved GNUNET_PACKED; + /** * Operation ID. */ - uint32_t op_id GNUNET_PACKED; + uint64_t op_id GNUNET_PACKED; /** * Channel's public key. @@ -308,9 +330,19 @@ struct MessageGetRequest struct GNUNET_CRYPTO_EcdsaPublicKey slave_key; /** - * Message ID to request. + * First message ID to request. */ - uint64_t message_id GNUNET_PACKED; + uint64_t first_message_id GNUNET_PACKED; + + /** + * Last message ID to request. + */ + uint64_t last_message_id GNUNET_PACKED; + + /** + * Maximum number of messages to retrieve. + */ + uint64_t message_limit GNUNET_PACKED; /** * Do membership test with @a slave_key before returning fragment? @@ -330,10 +362,12 @@ struct MessageGetFragmentRequest */ struct GNUNET_MessageHeader header; + uint32_t reserved GNUNET_PACKED; + /** * Operation ID. */ - uint32_t op_id GNUNET_PACKED; + uint64_t op_id GNUNET_PACKED; /** * Channel's public key. @@ -373,10 +407,12 @@ struct StateHashUpdateRequest */ struct GNUNET_MessageHeader header; + uint32_t reserved GNUNET_PACKED; + /** * Operation ID. */ - uint32_t op_id GNUNET_PACKED; + uint64_t op_id GNUNET_PACKED; /** * Channel's public key. @@ -404,20 +440,6 @@ struct StateModifyRequest */ struct GNUNET_MessageHeader header; - /** - * Operation ID. - */ - uint32_t op_id GNUNET_PACKED; - - /** - * Channel's public key. - */ - struct GNUNET_CRYPTO_EddsaPublicKey channel_key; - - uint64_t message_id GNUNET_PACKED; - - uint64_t state_delta GNUNET_PACKED; - /** * Size of name, including NUL terminator. */ @@ -433,6 +455,20 @@ struct StateModifyRequest */ uint8_t oper; + /** + * Operation ID. + */ + uint64_t op_id GNUNET_PACKED; + + /** + * Channel's public key. + */ + struct GNUNET_CRYPTO_EddsaPublicKey channel_key; + + uint64_t message_id GNUNET_PACKED; + + uint64_t state_delta GNUNET_PACKED; + /* Followed by NUL-terminated name, then the value. */ }; @@ -448,26 +484,28 @@ struct StateSyncRequest struct GNUNET_MessageHeader header; /** - * Operation ID. + * Size of name, including NUL terminator. */ - uint32_t op_id GNUNET_PACKED; + uint16_t name_size GNUNET_PACKED; /** - * Channel's public key. + * OR'd StateOpFlags */ - struct GNUNET_CRYPTO_EddsaPublicKey channel_key; + uint8_t flags; + + uint8_t reserved; uint64_t message_id GNUNET_PACKED; /** - * Size of name, including NUL terminator. + * Operation ID. */ - uint16_t name_size GNUNET_PACKED; + uint64_t op_id GNUNET_PACKED; /** - * OR'd StateOpFlags + * Channel's public key. */ - uint8_t flags; + struct GNUNET_CRYPTO_EddsaPublicKey channel_key; /* Followed by NUL-terminated name, then the value. */ }; diff --git a/src/psycstore/psycstore_api.c b/src/psycstore/psycstore_api.c index 9df55888d..9ef1fb61a 100644 --- a/src/psycstore/psycstore_api.c +++ b/src/psycstore/psycstore_api.c @@ -25,6 +25,8 @@ * @author Christian Grothoff */ +#include + #include "platform.h" #include "gnunet_util_lib.h" #include "gnunet_constants.h" @@ -76,7 +78,7 @@ struct GNUNET_PSYCSTORE_OperationHandle /** * Operation ID. */ - uint32_t op_id; + uint64_t op_id; /** * Message to send to the PSYCstore service. @@ -137,15 +139,14 @@ struct GNUNET_PSYCSTORE_Handle struct GNUNET_TIME_Relative reconnect_delay; /** - * Are we polling for incoming messages right now? + * Last operation ID used. */ - int in_receive; + uint64_t last_op_id; /** - * The last operation id used for a PSYCstore operation. + * Are we polling for incoming messages right now? */ - uint32_t last_op_id_used; - + uint8_t in_receive; }; @@ -155,10 +156,10 @@ struct GNUNET_PSYCSTORE_Handle * @param h Handle to the PSYCstore service. * @return next operation id to use */ -static uint32_t +static uint64_t get_next_op_id (struct GNUNET_PSYCSTORE_Handle *h) { - return h->last_op_id_used++; + return h->last_op_id++; } @@ -168,7 +169,7 @@ get_next_op_id (struct GNUNET_PSYCSTORE_Handle *h) * @return OperationHandle if found, or NULL otherwise. */ static struct GNUNET_PSYCSTORE_OperationHandle * -find_op_by_id (struct GNUNET_PSYCSTORE_Handle *h, uint32_t op_id) +find_op_by_id (struct GNUNET_PSYCSTORE_Handle *h, uint64_t op_id) { struct GNUNET_PSYCSTORE_OperationHandle *op = h->op_head; while (NULL != op) @@ -284,19 +285,20 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) if (size == sizeof (struct OperationResult)) str = NULL; - op = find_op_by_id (h, ntohl (opres->op_id)); + op = find_op_by_id (h, GNUNET_ntohll (opres->op_id)); if (NULL == op) { LOG (GNUNET_ERROR_TYPE_DEBUG, - "No callback registered for operation with ID %ld.\n", - type, ntohl (opres->op_id)); + "No callback registered for operation with ID %" PRIu64 ".\n", + type, GNUNET_ntohll (opres->op_id)); } else { LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received result message (type %d) with operation ID: %ld\n", + "Received result message (type %d) with operation ID: %" PRIu64 "\n", type, op->op_id); + int64_t result_code = GNUNET_ntohll (opres->result_code) + INT64_MIN; GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op); if (NULL != op->res_cb) { @@ -307,19 +309,19 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY: smreq = (const struct StateModifyRequest *) op->msg; if (!(smreq->flags & STATE_OP_LAST - || GNUNET_OK != ntohl (opres->result_code))) + || GNUNET_OK != result_code)) op->res_cb = NULL; break; case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC: ssreq = (const struct StateSyncRequest *) op->msg; if (!(ssreq->flags & STATE_OP_LAST - || GNUNET_OK != ntohl (opres->result_code))) + || GNUNET_OK != result_code)) op->res_cb = NULL; break; } } if (NULL != op->res_cb) - op->res_cb (op->cls, ntohl (opres->result_code), str); + op->res_cb (op->cls, result_code, str); GNUNET_free (op); } break; @@ -338,19 +340,20 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) cres = (const struct CountersResult *) msg; - op = find_op_by_id (h, ntohl (cres->op_id)); + op = find_op_by_id (h, GNUNET_ntohll (cres->op_id)); if (NULL == op) { LOG (GNUNET_ERROR_TYPE_DEBUG, - "No callback registered for operation with ID %ld.\n", - type, ntohl (cres->op_id)); + "No callback registered for operation with ID %" PRIu64 ".\n", + type, GNUNET_ntohll (cres->op_id)); } else { GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op); if (NULL != op->data_cb) ((GNUNET_PSYCSTORE_CountersCallback) - op->data_cb) (op->cls, ntohl (cres->result_code), + op->data_cb) (op->cls, + ntohl (cres->result_code) + INT32_MIN, GNUNET_ntohll (cres->max_fragment_id), GNUNET_ntohll (cres->max_message_id), GNUNET_ntohll (cres->max_group_generation), @@ -386,12 +389,12 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) return; } - op = find_op_by_id (h, ntohl (fres->op_id)); + op = find_op_by_id (h, GNUNET_ntohll (fres->op_id)); if (NULL == op) { LOG (GNUNET_ERROR_TYPE_DEBUG, - "No callback registered for operation with ID %ld.\n", - type, ntohl (fres->op_id)); + "No callback registered for operation with ID %" PRIu64 ".\n", + type, GNUNET_ntohll (fres->op_id)); } else { @@ -427,12 +430,12 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) return; } - op = find_op_by_id (h, ntohl (sres->op_id)); + op = find_op_by_id (h, GNUNET_ntohll (sres->op_id)); if (NULL == op) { LOG (GNUNET_ERROR_TYPE_DEBUG, - "No callback registered for operation with ID %ld.\n", - type, ntohl (sres->op_id)); + "No callback registered for operation with ID %" PRIu64 ".\n", + type, GNUNET_ntohll (sres->op_id)); } else { @@ -479,7 +482,7 @@ send_next_message (void *cls, size_t size, void *buf) return 0; } LOG (GNUNET_ERROR_TYPE_DEBUG, - "Sending message of type %d to PSYCstore service. ID: %u\n", + "Sending message of type %d to PSYCstore service. ID: %" PRIu64 "\n", ntohs (op->msg->type), op->op_id); memcpy (buf, op->msg, ret); @@ -682,8 +685,8 @@ GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h, : effective_since == 0); struct MembershipStoreRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle *op - = GNUNET_malloc (sizeof (*op) + sizeof (*req)); + struct GNUNET_PSYCSTORE_OperationHandle * + op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); op->h = h; op->res_cb = rcb; op->cls = rcb_cls; @@ -700,7 +703,7 @@ GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h, req->group_generation = GNUNET_htonll (group_generation); op->op_id = get_next_op_id (h); - req->op_id = htonl (op->op_id); + req->op_id = GNUNET_htonll (op->op_id); GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); transmit_next (h); @@ -746,8 +749,8 @@ GNUNET_PSYCSTORE_membership_test (struct GNUNET_PSYCSTORE_Handle *h, void *rcb_cls) { struct MembershipTestRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle *op - = GNUNET_malloc (sizeof (*op) + sizeof (*req)); + struct GNUNET_PSYCSTORE_OperationHandle * + op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); op->h = h; op->res_cb = rcb; op->cls = rcb_cls; @@ -762,7 +765,7 @@ GNUNET_PSYCSTORE_membership_test (struct GNUNET_PSYCSTORE_Handle *h, req->group_generation = GNUNET_htonll (group_generation); op->op_id = get_next_op_id (h); - req->op_id = htonl (op->op_id); + req->op_id = GNUNET_htonll (op->op_id); GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); transmit_next (h); @@ -794,8 +797,8 @@ GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h, { uint16_t size = ntohs (msg->header.size); struct FragmentStoreRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle *op - = GNUNET_malloc (sizeof (*op) + sizeof (*req) + size); + struct GNUNET_PSYCSTORE_OperationHandle * + op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + size); op->h = h; op->res_cb = rcb; op->cls = rcb_cls; @@ -809,7 +812,7 @@ GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h, memcpy (&req[1], msg, size); op->op_id = get_next_op_id (h); - req->op_id = htonl (op->op_id); + req->op_id = GNUNET_htonll (op->op_id); GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); transmit_next (h); @@ -819,7 +822,7 @@ GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h, /** - * Retrieve a message fragment by fragment ID. + * Retrieve message fragments by fragment ID range. * * @param h * Handle for the PSYCstore. @@ -829,9 +832,15 @@ GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h, * The slave requesting the fragment. If not NULL, a membership test is * performed first and the fragment is only returned if the slave has * access to it. - * @param fragment_id - * Fragment ID to retrieve. Use 0 to get the latest message fragment. - * @param fcb + * @param first_fragment_id + * First fragment ID to retrieve. + * Use 0 to get the latest message fragment. + * @param last_fragment_id + * Last consecutive fragment ID to retrieve. + * Use 0 to get the latest message fragment. + * @param fragment_limit + * Maximum number of fragments to retrieve. + * @param fragment_cb * Callback to call with the retrieved fragments. * @param rcb * Callback to call with the result of the operation. @@ -844,16 +853,85 @@ struct GNUNET_PSYCSTORE_OperationHandle * GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, - uint64_t fragment_id, - GNUNET_PSYCSTORE_FragmentCallback fcb, + uint64_t first_fragment_id, + uint64_t last_fragment_id, + GNUNET_PSYCSTORE_FragmentCallback fragment_cb, GNUNET_PSYCSTORE_ResultCallback rcb, void *cls) { struct FragmentGetRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle *op - = GNUNET_malloc (sizeof (*op) + sizeof (*req)); + struct GNUNET_PSYCSTORE_OperationHandle * + op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); + op->h = h; + op->data_cb = (DataCallback) fragment_cb; + op->res_cb = rcb; + op->cls = cls; + + req = (struct FragmentGetRequest *) &op[1]; + op->msg = (struct GNUNET_MessageHeader *) req; + req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET); + req->header.size = htons (sizeof (*req)); + req->channel_key = *channel_key; + req->first_fragment_id = GNUNET_htonll (first_fragment_id); + req->last_fragment_id = GNUNET_htonll (last_fragment_id); + if (NULL != slave_key) + { + req->slave_key = *slave_key; + req->do_membership_test = GNUNET_YES; + } + + op->op_id = get_next_op_id (h); + req->op_id = GNUNET_htonll (op->op_id); + + GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); + transmit_next (h); + + return op; +} + + +/** + * Retrieve latest message fragments. + * + * @param h + * Handle for the PSYCstore. + * @param channel_key + * The channel we are interested in. + * @param slave_key + * The slave requesting the fragment. If not NULL, a membership test is + * performed first and the fragment is only returned if the slave has + * access to it. + * @param first_fragment_id + * First fragment ID to retrieve. + * Use 0 to get the latest message fragment. + * @param last_fragment_id + * Last consecutive fragment ID to retrieve. + * Use 0 to get the latest message fragment. + * @param fragment_limit + * Maximum number of fragments to retrieve. + * @param fragment_cb + * Callback to call with the retrieved fragments. + * @param rcb + * Callback to call with the result of the operation. + * @param cls + * Closure for the callbacks. + * + * @return Handle that can be used to cancel the operation. + */ +struct GNUNET_PSYCSTORE_OperationHandle * +GNUNET_PSYCSTORE_fragment_get_latest (struct GNUNET_PSYCSTORE_Handle *h, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, + uint64_t fragment_limit, + GNUNET_PSYCSTORE_FragmentCallback fragment_cb, + GNUNET_PSYCSTORE_ResultCallback rcb, + void *cls) +{ + struct FragmentGetRequest *req; + struct GNUNET_PSYCSTORE_OperationHandle * + op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); op->h = h; - op->data_cb = (DataCallback) fcb; + op->data_cb = (DataCallback) fragment_cb; op->res_cb = rcb; op->cls = cls; @@ -862,7 +940,7 @@ GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h, req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET); req->header.size = htons (sizeof (*req)); req->channel_key = *channel_key; - req->fragment_id = GNUNET_htonll (fragment_id); + req->fragment_limit = GNUNET_ntohll (fragment_limit); if (NULL != slave_key) { req->slave_key = *slave_key; @@ -870,7 +948,7 @@ GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h, } op->op_id = get_next_op_id (h); - req->op_id = htonl (op->op_id); + req->op_id = GNUNET_htonll (op->op_id); GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); transmit_next (h); @@ -880,7 +958,7 @@ GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h, /** - * Retrieve all fragments of a message. + * Retrieve all fragments of messages in a message ID range. * * @param h * Handle for the PSYCstore. @@ -890,9 +968,13 @@ GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h, * The slave requesting the message. If not NULL, a membership test is * performed first and the message is only returned if the slave has * access to it. - * @param message_id - * Message ID to retrieve. Use 0 to get the latest message. - * @param fcb + * @param first_message_id + * First message ID to retrieve. + * Use 0 to get the latest message. + * @param last_message_id + * Last consecutive message ID to retrieve. + * Use 0 to get the latest message. + * @param fragment_cb * Callback to call with the retrieved fragments. * @param rcb * Callback to call with the result of the operation. @@ -905,16 +987,17 @@ struct GNUNET_PSYCSTORE_OperationHandle * GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, - uint64_t message_id, - GNUNET_PSYCSTORE_FragmentCallback fcb, + uint64_t first_message_id, + uint64_t last_message_id, + GNUNET_PSYCSTORE_FragmentCallback fragment_cb, GNUNET_PSYCSTORE_ResultCallback rcb, void *cls) { struct MessageGetRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle *op - = GNUNET_malloc (sizeof (*op) + sizeof (*req)); + struct GNUNET_PSYCSTORE_OperationHandle * + op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); op->h = h; - op->data_cb = (DataCallback) fcb; + op->data_cb = (DataCallback) fragment_cb; op->res_cb = rcb; op->cls = cls; @@ -923,7 +1006,8 @@ GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h, req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET); req->header.size = htons (sizeof (*req)); req->channel_key = *channel_key; - req->message_id = GNUNET_htonll (message_id); + req->first_message_id = GNUNET_htonll (first_message_id); + req->last_message_id = GNUNET_htonll (last_message_id); if (NULL != slave_key) { req->slave_key = *slave_key; @@ -931,7 +1015,68 @@ GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h, } op->op_id = get_next_op_id (h); - req->op_id = htonl (op->op_id); + req->op_id = GNUNET_htonll (op->op_id); + + GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); + transmit_next (h); + + return op; +} + + +/** + * Retrieve all fragments of the latest messages. + * + * @param h + * Handle for the PSYCstore. + * @param channel_key + * The channel we are interested in. + * @param slave_key + * The slave requesting the message. If not NULL, a membership test is + * performed first and the message is only returned if the slave has + * access to it. + * @param message_limit + * Maximum number of messages to retrieve. + * @param fragment_cb + * Callback to call with the retrieved fragments. + * @param rcb + * Callback to call with the result of the operation. + * @param cls + * Closure for the callbacks. + * + * @return Handle that can be used to cancel the operation. + */ +struct GNUNET_PSYCSTORE_OperationHandle * +GNUNET_PSYCSTORE_message_get_latest (struct GNUNET_PSYCSTORE_Handle *h, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, + uint64_t message_limit, + GNUNET_PSYCSTORE_FragmentCallback fragment_cb, + GNUNET_PSYCSTORE_ResultCallback rcb, + void *cls) +{ + struct MessageGetRequest *req; + struct GNUNET_PSYCSTORE_OperationHandle * + op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); + op->h = h; + op->data_cb = (DataCallback) fragment_cb; + op->res_cb = rcb; + op->cls = cls; + + req = (struct MessageGetRequest *) &op[1]; + op->msg = (struct GNUNET_MessageHeader *) req; + req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET); + req->header.size = htons (sizeof (*req)); + req->channel_key = *channel_key; + req->message_limit = GNUNET_ntohll (message_limit); + if (NULL != slave_key) + { + req->slave_key = *slave_key; + req->do_membership_test = GNUNET_YES; + } + + op->op_id = get_next_op_id (h); + req->op_id = GNUNET_htonll (op->op_id); GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); transmit_next (h); @@ -956,9 +1101,9 @@ GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h, * Message ID to retrieve. Use 0 to get the latest message. * @param fragment_offset * Offset of the fragment to retrieve. - * @param fcb + * @param fragment_cb * Callback to call with the retrieved fragments. - * @param rcb + * @param result_cb * Callback to call with the result of the operation. * @param cls * Closure for the callbacks. @@ -971,15 +1116,15 @@ GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h, const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, uint64_t message_id, uint64_t fragment_offset, - GNUNET_PSYCSTORE_FragmentCallback fcb, + GNUNET_PSYCSTORE_FragmentCallback fragment_cb, GNUNET_PSYCSTORE_ResultCallback rcb, void *cls) { struct MessageGetFragmentRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle *op - = GNUNET_malloc (sizeof (*op) + sizeof (*req)); + struct GNUNET_PSYCSTORE_OperationHandle * + op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); op->h = h; - op->data_cb = (DataCallback) fcb; + op->data_cb = (DataCallback) fragment_cb; op->res_cb = rcb; op->cls = cls; @@ -997,7 +1142,7 @@ GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h, } op->op_id = get_next_op_id (h); - req->op_id = htonl (op->op_id); + req->op_id = GNUNET_htonll (op->op_id); GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); transmit_next (h); @@ -1026,8 +1171,8 @@ GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h, void *ccb_cls) { struct OperationRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle *op - = GNUNET_malloc (sizeof (*op) + sizeof (*req)); + struct GNUNET_PSYCSTORE_OperationHandle * + op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); op->h = h; op->data_cb = ccb; op->cls = ccb_cls; @@ -1039,7 +1184,7 @@ GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h, req->channel_key = *channel_key; op->op_id = get_next_op_id (h); - req->op_id = htonl (op->op_id); + req->op_id = GNUNET_htonll (op->op_id); GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); transmit_next (h); @@ -1109,7 +1254,7 @@ GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h, memcpy ((char *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size); op->op_id = get_next_op_id (h); - req->op_id = htonl (op->op_id); + req->op_id = GNUNET_htonll (op->op_id); GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); transmit_next (h); @@ -1175,7 +1320,7 @@ GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h, memcpy ((char *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size); op->op_id = get_next_op_id (h); - req->op_id = htonl (op->op_id); + req->op_id = GNUNET_htonll (op->op_id); GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); transmit_next (h); @@ -1204,8 +1349,8 @@ GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h, void *rcb_cls) { struct OperationRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle *op - = GNUNET_malloc (sizeof (*op) + sizeof (*req)); + struct GNUNET_PSYCSTORE_OperationHandle * + op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); op->h = h; op->res_cb = rcb; op->cls = rcb_cls; @@ -1217,7 +1362,7 @@ GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h, req->channel_key = *channel_key; op->op_id = get_next_op_id (h); - req->op_id = htonl (op->op_id); + req->op_id = GNUNET_htonll (op->op_id); GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); transmit_next (h); @@ -1247,8 +1392,8 @@ GNUNET_PSYCSTORE_state_hash_update (struct GNUNET_PSYCSTORE_Handle *h, void *rcb_cls) { struct StateHashUpdateRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle *op - = GNUNET_malloc (sizeof (*op) + sizeof (*req)); + struct GNUNET_PSYCSTORE_OperationHandle * + op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); op->h = h; op->res_cb = rcb; op->cls = rcb_cls; @@ -1261,7 +1406,7 @@ GNUNET_PSYCSTORE_state_hash_update (struct GNUNET_PSYCSTORE_Handle *h, req->hash = *hash; op->op_id = get_next_op_id (h); - req->op_id = htonl (op->op_id); + req->op_id = GNUNET_htonll (op->op_id); GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); transmit_next (h); @@ -1292,8 +1437,8 @@ GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h, { size_t name_size = strlen (name) + 1; struct OperationRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle *op - = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size); + struct GNUNET_PSYCSTORE_OperationHandle * + op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size); op->h = h; op->data_cb = (DataCallback) scb; op->res_cb = rcb; @@ -1307,7 +1452,7 @@ GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h, memcpy (&req[1], name, name_size); op->op_id = get_next_op_id (h); - req->op_id = htonl (op->op_id); + req->op_id = GNUNET_htonll (op->op_id); GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); transmit_next (h); @@ -1339,8 +1484,8 @@ GNUNET_PSYCSTORE_state_get_prefix (struct GNUNET_PSYCSTORE_Handle *h, { size_t name_size = strlen (name_prefix) + 1; struct OperationRequest *req; - struct GNUNET_PSYCSTORE_OperationHandle *op - = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size); + struct GNUNET_PSYCSTORE_OperationHandle * + op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size); op->h = h; op->data_cb = (DataCallback) scb; op->res_cb = rcb; @@ -1354,7 +1499,7 @@ GNUNET_PSYCSTORE_state_get_prefix (struct GNUNET_PSYCSTORE_Handle *h, memcpy (&req[1], name_prefix, name_size); op->op_id = get_next_op_id (h); - req->op_id = htonl (op->op_id); + req->op_id = GNUNET_htonll (op->op_id); GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); transmit_next (h); diff --git a/src/psycstore/test_plugin_psycstore.c b/src/psycstore/test_plugin_psycstore.c index 8267ddba8..a6c456fec 100644 --- a/src/psycstore/test_plugin_psycstore.c +++ b/src/psycstore/test_plugin_psycstore.c @@ -204,11 +204,17 @@ run (void *cls, char *const *args, const char *cfgfile, msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE); msg->header.size = htons (sizeof (*msg) + sizeof (channel_pub_key)); + uint64_t fragment_id = INT64_MAX - 1; + msg->fragment_id = GNUNET_htonll (fragment_id); + + uint64_t message_id = INT64_MAX - 10; + msg->message_id = GNUNET_htonll (message_id); + + uint64_t group_generation = INT64_MAX - 3; + msg->group_generation = GNUNET_htonll (group_generation); + msg->hop_counter = htonl (9); - msg->fragment_id = GNUNET_htonll (INT64_MAX - 1); msg->fragment_offset = GNUNET_htonll (0); - msg->message_id = GNUNET_htonll (INT64_MAX - 10); - msg->group_generation = GNUNET_htonll (INT64_MAX - 3); msg->flags = htonl (GNUNET_MULTICAST_MESSAGE_LAST_FRAGMENT); memcpy (&msg[1], &channel_pub_key, sizeof (channel_pub_key)); @@ -225,14 +231,19 @@ run (void *cls, char *const *args, const char *cfgfile, fcls.msg[0] = msg; fcls.flags[0] = GNUNET_PSYCSTORE_MESSAGE_STATE; - GNUNET_assert (GNUNET_OK == db->fragment_store (db->cls, &channel_pub_key, msg, - fcls.flags[0])); + GNUNET_assert ( + GNUNET_OK == db->fragment_store (db->cls, &channel_pub_key, msg, + fcls.flags[0])); - GNUNET_assert (GNUNET_OK == db->fragment_get (db->cls, &channel_pub_key, - GNUNET_ntohll (msg->fragment_id), - fragment_cb, &fcls)); + uint64_t ret_frags = 0; + GNUNET_assert ( + GNUNET_OK == db->fragment_get (db->cls, &channel_pub_key, + fragment_id, fragment_id, + &ret_frags, fragment_cb, &fcls)); GNUNET_assert (fcls.n == 1); + // FIXME: test fragment_get_latest and message_get_latest + fcls.n = 0; GNUNET_assert ( @@ -250,9 +261,10 @@ run (void *cls, char *const *args, const char *cfgfile, fcls.n = 0; fcls.flags[0] |= GNUNET_PSYCSTORE_MESSAGE_STATE_APPLIED; - GNUNET_assert (GNUNET_OK == db->fragment_get (db->cls, &channel_pub_key, - GNUNET_ntohll (msg->fragment_id), - fragment_cb, &fcls)); + GNUNET_assert ( + GNUNET_OK == db->fragment_get (db->cls, &channel_pub_key, + fragment_id, fragment_id, + &ret_frags, fragment_cb, &fcls)); GNUNET_assert (fcls.n == 1); struct GNUNET_MULTICAST_MessageHeader *msg1 @@ -270,15 +282,17 @@ run (void *cls, char *const *args, const char *cfgfile, GNUNET_assert (GNUNET_OK == db->fragment_store (db->cls, &channel_pub_key, msg1, fcls.flags[1])); - uint64_t retfrags = 0; - GNUNET_assert (GNUNET_OK == db->message_get (db->cls, &channel_pub_key, - GNUNET_ntohll (msg->message_id), - &retfrags, fragment_cb, &fcls)); - GNUNET_assert (fcls.n == 2 && retfrags == 2); + GNUNET_assert ( + GNUNET_OK == db->message_get (db->cls, &channel_pub_key, + message_id, message_id, + &ret_frags, fragment_cb, &fcls)); + GNUNET_assert (fcls.n == 2 && ret_frags == 2); /* Message counters */ - uint64_t fragment_id = 0, message_id = 0, group_generation = 0; + fragment_id = 0; + message_id = 0; + group_generation = 0; GNUNET_assert ( GNUNET_OK == db->counters_message_get (db->cls, &channel_pub_key, &fragment_id, &message_id, diff --git a/src/psycstore/test_psycstore.c b/src/psycstore/test_psycstore.c index 3ef2439e3..125e64f58 100644 --- a/src/psycstore/test_psycstore.c +++ b/src/psycstore/test_psycstore.c @@ -25,6 +25,8 @@ * @author Christian Grothoff */ +#include + #include "platform.h" #include "gnunet_util_lib.h" #include "gnunet_common.h" @@ -302,19 +304,22 @@ fragment_result (void *cls, enum GNUNET_PSYCSTORE_MessageFlags flags) { struct FragmentClosure *fcls = cls; + GNUNET_assert (fcls->n < fcls->n_expected); struct GNUNET_MULTICAST_MessageHeader *msg0 = fcls->msg[fcls->n]; uint64_t flags0 = fcls->flags[fcls->n++]; if (flags == flags0 && msg->header.size == msg0->header.size && 0 == memcmp (msg, msg0, ntohs (msg->header.size))) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " fragment %llu matches\n", + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " fragment %" PRIu64 " matches\n", GNUNET_ntohll (msg->fragment_id)); return GNUNET_YES; } else { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, " fragment %llu differs\n", + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + " fragment differs: expected %" PRIu64 ", got %" PRIu64 "\n", + GNUNET_ntohll (msg0->fragment_id), GNUNET_ntohll (msg->fragment_id)); GNUNET_assert (0); return GNUNET_SYSERR; @@ -323,13 +328,12 @@ fragment_result (void *cls, void -message_get_result (void *cls, int64_t result, const char *err_msg) +message_get_latest_result (void *cls, int64_t result, const char *err_msg) { struct FragmentClosure *fcls = cls; op = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "message_get:\t%d\n", result); - GNUNET_assert (result > 0 && fcls->n && fcls->n_expected); - + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "message_get_latest:\t%d\n", result); + GNUNET_assert (0 < result && fcls->n == fcls->n_expected); modifiers[0] = (struct GNUNET_ENV_Modifier) { .oper = '=', @@ -350,30 +354,47 @@ message_get_result (void *cls, int64_t result, const char *err_msg) } +void +message_get_result (void *cls, int64_t result, const char *err_msg) +{ + struct FragmentClosure *fcls = cls; + op = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "message_get:\t%d\n", result); + GNUNET_assert (0 < result && fcls->n == fcls->n_expected); + + fcls->n = 0; + fcls->n_expected = 3; + op = GNUNET_PSYCSTORE_message_get_latest (h, &channel_pub_key, &slave_pub_key, + 1, &fragment_result, + &message_get_latest_result, fcls); +} + + void message_get_fragment_result (void *cls, int64_t result, const char *err_msg) { struct FragmentClosure *fcls = cls; op = NULL; GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "message_get_fragment:\t%d\n", result); - GNUNET_assert (result > 0 && fcls->n && fcls->n_expected); + GNUNET_assert (0 < result && fcls->n == fcls->n_expected); fcls->n = 0; fcls->n_expected = 3; + uint64_t message_id = GNUNET_ntohll (fcls->msg[0]->message_id); op = GNUNET_PSYCSTORE_message_get (h, &channel_pub_key, &slave_pub_key, - GNUNET_ntohll (fcls->msg[0]->message_id), + message_id, message_id, &fragment_result, &message_get_result, fcls); } void -fragment_get_result (void *cls, int64_t result, const char *err_msg) +fragment_get_latest_result (void *cls, int64_t result, const char *err_msg) { struct FragmentClosure *fcls = cls; op = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "fragment_get:\t%d\n", result); - GNUNET_assert (result > 0 && fcls->n && fcls->n_expected); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "fragment_get_latest:\t%d\n", result); + GNUNET_assert (0 < result && fcls->n == fcls->n_expected); fcls->n = 1; fcls->n_expected = 2; @@ -381,9 +402,24 @@ fragment_get_result (void *cls, int64_t result, const char *err_msg) GNUNET_ntohll (fcls->msg[1]->message_id), GNUNET_ntohll (fcls->msg[1]->fragment_offset), &fragment_result, - &message_get_fragment_result, - fcls); + &message_get_fragment_result, fcls); +} + +void +fragment_get_result (void *cls, int64_t result, const char *err_msg) +{ + struct FragmentClosure *fcls = cls; + op = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "fragment_get:\t%d\n", result); + GNUNET_assert (0 < result && fcls->n == fcls->n_expected); + + fcls->n = 0; + fcls->n_expected = 3; + op = GNUNET_PSYCSTORE_fragment_get_latest (h, &channel_pub_key, + &slave_pub_key, fcls->n_expected, + &fragment_result, + &fragment_get_latest_result, fcls); } @@ -398,8 +434,9 @@ fragment_store_result (void *cls, int64_t result, const char *err_msg) { /* last fragment */ fcls.n = 0; fcls.n_expected = 1; + uint64_t fragment_id = GNUNET_ntohll (fcls.msg[0]->fragment_id); op = GNUNET_PSYCSTORE_fragment_get (h, &channel_pub_key, &slave_pub_key, - GNUNET_ntohll (fcls.msg[0]->fragment_id), + fragment_id, fragment_id, &fragment_result, &fragment_get_result, &fcls); } diff --git a/src/social/gnunet-service-social.c b/src/social/gnunet-service-social.c index c75589811..d297a153b 100644 --- a/src/social/gnunet-service-social.c +++ b/src/social/gnunet-service-social.c @@ -467,17 +467,17 @@ client_send_msg (const struct Place *plc, * Called after a PSYC master is started. */ static void -psyc_master_started (void *cls, uint64_t max_message_id) +psyc_master_started (void *cls, int result, uint64_t max_message_id) { struct Host *hst = cls; struct Place *plc = &hst->plc; plc->max_message_id = max_message_id; plc->is_ready = GNUNET_YES; - struct CountersResult res; + struct GNUNET_PSYC_CountersResultMessage res; res.header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER_ACK); res.header.size = htons (sizeof (res)); - res.result_code = htonl (GNUNET_OK); + res.result_code = htonl (result - INT32_MIN); res.max_message_id = GNUNET_htonll (plc->max_message_id); client_send_msg (plc, &res.header); @@ -507,17 +507,17 @@ psyc_recv_join_request (void *cls, * Called after a PSYC slave is connected. */ static void -psyc_slave_connected (void *cls, uint64_t max_message_id) +psyc_slave_connected (void *cls, int result, uint64_t max_message_id) { struct Guest *gst = cls; struct Place *plc = &gst->plc; plc->max_message_id = max_message_id; plc->is_ready = GNUNET_YES; - struct CountersResult res; + struct GNUNET_PSYC_CountersResultMessage res; res.header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER_ACK); res.header.size = htons (sizeof (res)); - res.result_code = htonl (GNUNET_OK); + res.result_code = htonl (result - INT32_MIN); res.max_message_id = GNUNET_htonll (plc->max_message_id); client_send_msg (plc, &res.header); @@ -608,7 +608,7 @@ client_recv_host_enter (void *cls, struct GNUNET_SERVER_Client *client, { plc = &hst->plc; - struct CountersResult res; + struct GNUNET_PSYC_CountersResultMessage res; res.header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER_ACK); res.header.size = htons (sizeof (res)); res.result_code = htonl (GNUNET_OK); @@ -724,7 +724,7 @@ client_recv_guest_enter (void *cls, struct GNUNET_SERVER_Client *client, { plc = &gst->plc; - struct CountersResult res; + struct GNUNET_PSYC_CountersResultMessage res; res.header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER_ACK); res.header.size = htons (sizeof (res)); res.result_code = htonl (GNUNET_OK); diff --git a/src/social/social.h b/src/social/social.h index 00edaefd1..5de1e5e80 100644 --- a/src/social/social.h +++ b/src/social/social.h @@ -88,25 +88,6 @@ struct GuestEnterRequest /**** service -> library ****/ -struct CountersResult -{ - /** - * Type: GNUNET_MESSAGE_TYPE_PSYC_RESULT_COUNTERS - */ - struct GNUNET_MessageHeader header; - - /** - * Status code for the operation. - */ - int32_t result_code GNUNET_PACKED; - - /** - * Last message ID sent to the channel. - */ - uint64_t max_message_id; -}; - - #if REMOVE struct NymEnterRequest { diff --git a/src/social/social_api.c b/src/social/social_api.c index 76fd0f9f9..dfcf18883 100644 --- a/src/social/social_api.c +++ b/src/social/social_api.c @@ -640,8 +640,9 @@ host_recv_enter_ack (void *cls, struct GNUNET_PSYC_CountersResultMessage * cres = (struct GNUNET_PSYC_CountersResultMessage *) msg; + int32_t result = ntohl (cres->result_code) + INT32_MIN; if (NULL != hst->enter_cb) - hst->enter_cb (hst->cb_cls, GNUNET_ntohll (cres->max_message_id)); + hst->enter_cb (hst->cb_cls, result, GNUNET_ntohll (cres->max_message_id)); } @@ -704,9 +705,9 @@ guest_recv_enter_ack (void *cls, struct GNUNET_PSYC_CountersResultMessage * cres = (struct GNUNET_PSYC_CountersResultMessage *) msg; + int32_t result = ntohl (cres->result_code) + INT32_MIN; if (NULL != gst->enter_cb) - gst->enter_cb (gst->cb_cls, ntohl (cres->result_code), - GNUNET_ntohll (cres->max_message_id)); + gst->enter_cb (gst->cb_cls, result, GNUNET_ntohll (cres->max_message_id)); } @@ -734,7 +735,7 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler host_handlers[] = { { &host_recv_enter_ack, NULL, GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER_ACK, - sizeof (struct CountersResult), GNUNET_NO }, + sizeof (struct GNUNET_PSYC_CountersResultMessage), GNUNET_NO }, { &host_recv_enter_request, NULL, GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST, @@ -758,7 +759,7 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler guest_handlers[] = { { &guest_recv_enter_ack, NULL, GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER_ACK, - sizeof (struct CountersResult), GNUNET_NO }, + sizeof (struct GNUNET_PSYC_CountersResultMessage), GNUNET_NO }, { &host_recv_enter_request, NULL, GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST, @@ -1598,18 +1599,22 @@ struct GNUNET_SOCIAL_WatchHandle; /** * Watch a place for changed objects. * - * @param place Place to watch. - * @param object_filter Object prefix to match. - * @param state_cb Function to call when an object/state changes. - * @param state_cb_cls Closure for callback. + * @param place + * Place to watch. + * @param object_filter + * Object prefix to match. + * @param state_var_cb + * Function to call when an object/state var changes. + * @param cls + * Closure for callback. * * @return Handle that can be used to cancel watching. */ struct GNUNET_SOCIAL_WatchHandle * GNUNET_SOCIAL_place_watch (struct GNUNET_SOCIAL_Place *place, const char *object_filter, - GNUNET_PSYC_StateCallback state_cb, - void *state_cb_cls) + GNUNET_PSYC_StateVarCallback state_var_cb, + void *cls) { return NULL; } @@ -1633,18 +1638,22 @@ struct GNUNET_SOCIAL_LookHandle; /** * Look at objects in the place with a matching name prefix. * - * @param place The place to look its objects at. - * @param name_prefix Look at objects with names beginning with this value. - * @param state_cb Function to call for each object found. - * @param state_cb_cls Closure for callback function. + * @param place + * The place to look its objects at. + * @param name_prefix + * Look at objects with names beginning with this value. + * @param state_var_cb + * Function to call for each object found. + * @param cls + * Closure for callback function. * * @return Handle that can be used to stop looking at objects. */ struct GNUNET_SOCIAL_LookHandle * GNUNET_SOCIAL_place_look (struct GNUNET_SOCIAL_Place *place, const char *name_prefix, - GNUNET_PSYC_StateCallback state_cb, - void *state_cb_cls) + GNUNET_PSYC_StateVarCallback state_var_cb, + void *cls) { return NULL; } -- cgit v1.2.3