summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2014-07-30 21:18:13 +0000
committerGabor X Toth <*@tg-x.net>2014-07-30 21:18:13 +0000
commit40884377f3126bbecbfd3243d47224b3094914f9 (patch)
tree9f32aab9064b199178282a0c9918313e0aa30049
parent831718fa44b2c56577aa4e36a479fef6debb8cea (diff)
psyc, psycstore: retrieve state and history
-rw-r--r--src/include/gnunet_protocols.h4
-rw-r--r--src/include/gnunet_psyc_service.h226
-rw-r--r--src/include/gnunet_psycstore_plugin.h41
-rw-r--r--src/include/gnunet_psycstore_service.h146
-rw-r--r--src/include/gnunet_social_service.h46
-rw-r--r--src/psyc/Makefile.am2
-rw-r--r--src/psyc/gnunet-service-psyc.c272
-rw-r--r--src/psyc/psyc.h57
-rw-r--r--src/psyc/psyc_api.c414
-rw-r--r--src/psyc/test_psyc.c348
-rw-r--r--src/psycstore/gnunet-service-psycstore.c97
-rw-r--r--src/psycstore/plugin_psycstore_sqlite.c238
-rw-r--r--src/psycstore/psycstore.h160
-rw-r--r--src/psycstore/psycstore_api.c309
-rw-r--r--src/psycstore/test_plugin_psycstore.c48
-rw-r--r--src/psycstore/test_psycstore.c65
-rw-r--r--src/social/gnunet-service-social.c16
-rw-r--r--src/social/social.h19
-rw-r--r--src/social/social_api.c43
19 files changed, 1938 insertions, 613 deletions
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 5626e1e5e..ea295197c 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -2187,10 +2187,10 @@ extern "C"
/* 700 */
/** C->S: client requests channel history from PSYCstore. */
-#define GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST 701
+#define GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY 701
/** S->C: result for a channel history request */
-#define GNUNET_MESSAGE_TYPE_PSYC_STORY_RESULT 702
+#define GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT 702
/** C->S: request best matching state variable from PSYCstore. */
diff --git a/src/include/gnunet_psyc_service.h b/src/include/gnunet_psyc_service.h
index 7097c46a8..25b405dad 100644
--- a/src/include/gnunet_psyc_service.h
+++ b/src/include/gnunet_psyc_service.h
@@ -160,6 +160,11 @@ enum GNUNET_PSYC_Policy
enum GNUNET_PSYC_MessageFlags
{
/**
+ * Default / no flags.
+ */
+ GNUNET_PSYC_MESSAGE_DEFAULT = 0,
+
+ /**
* Historic message, retrieved from PSYCstore.
*/
GNUNET_PSYC_MESSAGE_HISTORIC = 1 << 0,
@@ -314,12 +319,12 @@ struct GNUNET_PSYC_CountersResultMessage
/**
* Status code for the operation.
*/
- int32_t result_code GNUNET_PACKED;
+ uint32_t result_code GNUNET_PACKED;
/**
* Last message ID sent to the channel.
*/
- uint64_t max_message_id;
+ uint64_t max_message_id GNUNET_PACKED;
};
@@ -503,13 +508,24 @@ struct GNUNET_PSYC_Master;
/**
- * Function called after the channel master started.
+ * Function called after connected to the PSYC service
+ * and the channel master started.
*
- * @param cls Closure.
- * @param max_message_id Last message ID sent to the channel.
+ * Also called when reconnected to the service
+ * after the connection closed unexpectedly.
+ *
+ * @param cls
+ * Closure.
+ * @param result
+ * #GNUNET_YES if there were already messages sent to the channel,
+ * #GNUNET_NO if the message history is empty,
+ * #GNUNET_SYSERR on error.
+ * @param max_message_id
+ * Last message ID sent to the channel.
*/
typedef void
-(*GNUNET_PSYC_MasterStartCallback) (void *cls, uint64_t max_message_id);
+(*GNUNET_PSYC_MasterStartCallback) (void *cls, int result,
+ uint64_t max_message_id);
/**
@@ -720,11 +736,21 @@ struct GNUNET_PSYC_Slave;
/**
* Function called after the slave connected to the PSYC service.
*
- * @param cls Closure.
- * @param max_message_id Last message ID sent to the channel.
+ * Also called when reconnected to the service
+ * after the connection closed unexpectedly.
+ *
+ * @param cls
+ * Closure.
+ * @param result
+ * #GNUNET_YES if there were already messages sent to the channel,
+ * #GNUNET_NO if the message history is empty,
+ * #GNUNET_SYSERR on error.
+ * @param max_message_id
+ * Last message ID sent to the channel.
*/
typedef void
-(*GNUNET_PSYC_SlaveConnectCallback) (void *cls, uint64_t max_message_id);
+(*GNUNET_PSYC_SlaveConnectCallback) (void *cls, int result,
+ uint64_t max_message_id);
/**
@@ -876,6 +902,23 @@ struct GNUNET_PSYC_Channel;
/**
+ * Function called with the result of an asynchronous operation.
+ *
+ * @param cls
+ * Closure.
+ * @param result
+ * Result of the operation.
+ * Usually one of #GNUNET_OK, #GNUNET_YES, #GNUNET_NO, or #GNUNET_SYSERR.
+ * @param err_msg
+ * Error message.
+ */
+typedef void
+(*GNUNET_PSYC_ResultCallback) (void *cls,
+ int64_t result,
+ const char *err_msg);
+
+
+/**
* Convert a channel @a master to a @e channel handle to access the @e channel
* APIs.
*
@@ -921,7 +964,9 @@ void
GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel,
const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
uint64_t announced_at,
- uint64_t effective_since);
+ uint64_t effective_since,
+ GNUNET_PSYC_ResultCallback result_cb,
+ void *cls);
/**
@@ -949,7 +994,9 @@ void
GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel,
const struct GNUNET_CRYPTO_EcdsaPublicKey
*slave_key,
- uint64_t announced_at);
+ uint64_t announced_at,
+ GNUNET_PSYC_ResultCallback result_cb,
+ void *cls);
/**
@@ -962,74 +1009,69 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel,
* @param value_size Number of bytes in @a value.
*/
typedef void
-(*GNUNET_PSYC_StateCallback) (void *cls,
- const char *name,
- const void *value,
- size_t value_size);
+(*GNUNET_PSYC_StateVarCallback) (void *cls,
+ const char *name,
+ const void *value,
+ size_t value_size);
/**
- * Function called when a requested operation has finished.
- *
- * @param cls Closure.
- */
-typedef void
-(*GNUNET_PSYC_FinishCallback) (void *cls);
-
-
-/**
- * Handle to a story telling operation.
- */
-struct GNUNET_PSYC_Story;
-
-
-/**
- * Request to be told the message history of the channel.
+ * Request to replay a part of the message history of the channel.
*
* Historic messages (but NOT the state at the time) will be replayed (given to
* the normal method handlers) if available and if access is permitted.
*
- * To get the latest message, use 0 for both the start and end message ID.
+ * @param channel
+ * Which channel should be replayed?
+ * @param start_message_id
+ * Earliest interesting point in history.
+ * @param end_message_id
+ * Last (inclusive) interesting point in history.
+ * @param finish_cb
+ * Function to call when the requested history has been fully replayed
+ * (counting message IDs might not suffice, as some messages might be
+ * secret and thus the listener would not know the story is finished
+ * without being told explicitly)o once this function has been called, the
+ * client must not call GNUNET_PSYC_channel_history_replay_cancel() anymore.
+ * @param cls
+ * Closure for the callbacks.
*
- * @param channel Which channel should be replayed?
- * @param start_message_id Earliest interesting point in history.
- * @param end_message_id Last (exclusive) interesting point in history.
- * @param message_cb Function to invoke on message parts received from the story.
- * @param finish_cb Function to call when the requested story has been fully
- * told (counting message IDs might not suffice, as some messages
- * might be secret and thus the listener would not know the story is
- * finished without being told explicitly); once this function
- * has been called, the client must not call
- * GNUNET_PSYC_channel_story_tell_cancel() anymore.
- * @param cls Closure for the callbacks.
- * @return Handle to cancel story telling operation.
+ * @return Handle to cancel history replay operation.
*/
-struct GNUNET_PSYC_Story *
-GNUNET_PSYC_channel_story_tell (struct GNUNET_PSYC_Channel *channel,
- uint64_t start_message_id,
- uint64_t end_message_id,
- GNUNET_PSYC_MessageCallback message_cb,
- GNUNET_PSYC_MessagePartCallback message_part_cb,
- GNUNET_PSYC_FinishCallback finish_cb,
- void *cls);
+void
+GNUNET_PSYC_channel_history_replay (struct GNUNET_PSYC_Channel *channel,
+ uint64_t start_message_id,
+ uint64_t end_message_id,
+ GNUNET_PSYC_ResultCallback finish_cb,
+ void *cls);
/**
- * Abort story telling.
+ * Request to replay the latest messages from the message history of the channel.
+ *
+ * Historic messages (but NOT the state at the time) will be replayed (given to
+ * the normal method handlers) if available and if access is permitted.
*
- * This function must not be called from within method handlers (as given to
- * GNUNET_PSYC_slave_join()) of the slave.
+ * @param channel
+ * Which channel should be replayed?
+ * @param message_limit
+ * Maximum number of messages to replay.
+ * @param finish_cb
+ * Function to call when the requested history has been fully replayed
+ * (counting message IDs might not suffice, as some messages might be
+ * secret and thus the listener would not know the story is finished
+ * without being told explicitly)o once this function has been called, the
+ * client must not call GNUNET_PSYC_channel_history_replay_cancel() anymore.
+ * @param cls
+ * Closure for the callbacks.
*
- * @param story Story telling operation to stop.
+ * @return Handle to cancel history replay operation.
*/
void
-GNUNET_PSYC_channel_story_tell_cancel (struct GNUNET_PSYC_Story *story);
-
-
-/**
- * Handle for a state query operation.
- */
-struct GNUNET_PSYC_StateQuery;
+GNUNET_PSYC_channel_history_replay_latest (struct GNUNET_PSYC_Channel *channel,
+ uint64_t message_limit,
+ GNUNET_PSYC_ResultCallback finish_cb,
+ void *cls);
/**
@@ -1039,19 +1081,26 @@ struct GNUNET_PSYC_StateQuery;
* less-specific name is matched; for example, requesting "_a_b" will match "_a"
* if "_a_b" does not exist.
*
- * @param channel Channel handle.
- * @param full_name Full name of the requested variable, the actual variable
- * returned might have a shorter name..
- * @param cb Function called once when a matching state variable is found.
+ * @param channel
+ * Channel handle.
+ * @param full_name
+ * Full name of the requested variable.
+ * The actual variable returned might have a shorter name.
+ * @param var_cb
+ * Function called once when a matching state variable is found.
* Not called if there's no matching state variable.
- * @param cb_cls Closure for the callbacks.
- * @return Handle that can be used to cancel the query operation.
+ * @param result_cb
+ * Function called after the operation finished.
+ * (i.e. all state variables have been returned via @a state_cb)
+ * @param cls
+ * Closure for the callbacks.
*/
-struct GNUNET_PSYC_StateQuery *
+void
GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *channel,
const char *full_name,
- GNUNET_PSYC_StateCallback cb,
- void *cb_cls);
+ GNUNET_PSYC_StateVarCallback var_cb,
+ GNUNET_PSYC_ResultCallback result_cb,
+ void *cls);
/**
@@ -1064,26 +1113,25 @@ GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *channel,
* The @a state_cb is invoked on all matching state variables asynchronously, as
* the state is stored in and retrieved from the PSYCstore,
*
- * @param channel Channel handle.
- * @param name_prefix Prefix of the state variable name to match.
- * @param cb Function to call with the matching state variables.
- * @param cb_cls Closure for the callbacks.
- * @return Handle that can be used to cancel the query operation.
+ * @param channel
+ * Channel handle.
+ * @param name_prefix
+ * Prefix of the state variable name to match.
+ * @param var_cb
+ * Function called once when a matching state variable is found.
+ * Not called if there's no matching state variable.
+ * @param result_cb
+ * Function called after the operation finished.
+ * (i.e. all state variables have been returned via @a state_cb)
+ * @param cls
+ * Closure for the callbacks.
*/
-struct GNUNET_PSYC_StateQuery *
+void
GNUNET_PSYC_channel_state_get_prefix (struct GNUNET_PSYC_Channel *channel,
const char *name_prefix,
- GNUNET_PSYC_StateCallback cb,
- void *cb_cls);
-
-
-/**
- * Cancel a state query operation.
- *
- * @param query Handle for the operation to cancel.
- */
-void
-GNUNET_PSYC_channel_state_get_cancel (struct GNUNET_PSYC_StateQuery *query);
+ GNUNET_PSYC_StateVarCallback var_cb,
+ GNUNET_PSYC_ResultCallback result_cb,
+ void *cls);
#if 0 /* keep Emacsens' auto-indent happy */
diff --git a/src/include/gnunet_psycstore_plugin.h b/src/include/gnunet_psycstore_plugin.h
index 1945b400e..3f02759f6 100644
--- a/src/include/gnunet_psycstore_plugin.h
+++ b/src/include/gnunet_psycstore_plugin.h
@@ -112,7 +112,7 @@ struct GNUNET_PSYCSTORE_PluginFunctions
uint64_t psycstore_flags);
/**
- * Retrieve a message fragment by fragment ID.
+ * Retrieve a message fragment range by fragment ID.
*
* @see GNUNET_PSYCSTORE_fragment_get()
*
@@ -121,12 +121,29 @@ struct GNUNET_PSYCSTORE_PluginFunctions
int
(*fragment_get) (void *cls,
const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
- uint64_t fragment_id,
+ uint64_t first_fragment_id,
+ uint64_t last_fragment_id,
+ uint64_t *returned_fragments,
GNUNET_PSYCSTORE_FragmentCallback cb,
void *cb_cls);
/**
- * Retrieve all fragments of a message.
+ * Retrieve latest message fragments.
+ *
+ * @see GNUNET_PSYCSTORE_fragment_get()
+ *
+ * @return #GNUNET_OK on success, else #GNUNET_SYSERR
+ */
+ int
+ (*fragment_get_latest) (void *cls,
+ const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
+ uint64_t fragment_limit,
+ uint64_t *returned_fragments,
+ GNUNET_PSYCSTORE_FragmentCallback cb,
+ void *cb_cls);
+
+ /**
+ * Retrieve all fragments of a message ID range.
*
* @see GNUNET_PSYCSTORE_message_get()
*
@@ -135,12 +152,28 @@ struct GNUNET_PSYCSTORE_PluginFunctions
int
(*message_get) (void *cls,
const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
- uint64_t message_id,
+ uint64_t first_fragment_id,
+ uint64_t last_fragment_id,
uint64_t *returned_fragments,
GNUNET_PSYCSTORE_FragmentCallback cb,
void *cb_cls);
/**
+ * Retrieve all fragments of the latest messages.
+ *
+ * @see GNUNET_PSYCSTORE_message_get()
+ *
+ * @return #GNUNET_OK on success, else #GNUNET_SYSERR
+ */
+ int
+ (*message_get_latest) (void *cls,
+ const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
+ uint64_t fragment_limit,
+ uint64_t *returned_fragments,
+ GNUNET_PSYCSTORE_FragmentCallback cb,
+ void *cb_cls);
+
+ /**
* Retrieve a fragment of message specified by its message ID and fragment
* offset.
*
diff --git a/src/include/gnunet_psycstore_service.h b/src/include/gnunet_psycstore_service.h
index 84d69c24d..78d016bb3 100644
--- a/src/include/gnunet_psycstore_service.h
+++ b/src/include/gnunet_psycstore_service.h
@@ -107,9 +107,10 @@ struct GNUNET_PSYCSTORE_OperationHandle;
/**
* Function called with the result of an asynchronous operation.
*
- * @param result #GNUNET_SYSERR on error,
+ * @param result
* #GNUNET_YES on success or if the peer was a member,
- * #GNUNET_NO if the peer was not a member
+ * #GNUNET_NO if the peer was not a member,
+ * #GNUNET_SYSERR on error,
*/
typedef void
(*GNUNET_PSYCSTORE_ResultCallback) (void *cls,
@@ -235,7 +236,7 @@ typedef int
/**
- * Retrieve a message fragment by fragment ID.
+ * Retrieve message fragments by fragment ID range.
*
* @param h
* Handle for the PSYCstore.
@@ -245,11 +246,15 @@ typedef int
* The slave requesting the fragment. If not NULL, a membership test is
* performed first and the fragment is only returned if the slave has
* access to it.
- * @param fragment_id
- * Fragment ID to retrieve. Use 0 to get the latest message fragment.
- * @param fcb
+ * @param first_fragment_id
+ * First fragment ID to retrieve.
+ * Use 0 to get the latest message fragment.
+ * @param last_fragment_id
+ * Last consecutive fragment ID to retrieve.
+ * Use 0 to get the latest message fragment.
+ * @param fragment_cb
* Callback to call with the retrieved fragments.
- * @param rcb
+ * @param result_cb
* Callback to call with the result of the operation.
* @param cls
* Closure for the callbacks.
@@ -260,14 +265,53 @@ struct GNUNET_PSYCSTORE_OperationHandle *
GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h,
const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
- uint64_t fragment_id,
- GNUNET_PSYCSTORE_FragmentCallback fcb,
- GNUNET_PSYCSTORE_ResultCallback rcb,
+ uint64_t first_message_id,
+ uint64_t last_message_id,
+ GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
+ GNUNET_PSYCSTORE_ResultCallback result_cb,
void *cls);
/**
- * Retrieve all fragments of a message.
+ * Retrieve latest message fragments.
+ *
+ * @param h
+ * Handle for the PSYCstore.
+ * @param channel_key
+ * The channel we are interested in.
+ * @param slave_key
+ * The slave requesting the fragment. If not NULL, a membership test is
+ * performed first and the fragment is only returned if the slave has
+ * access to it.
+ * @param first_fragment_id
+ * First fragment ID to retrieve.
+ * Use 0 to get the latest message fragment.
+ * @param last_fragment_id
+ * Last consecutive fragment ID to retrieve.
+ * Use 0 to get the latest message fragment.
+ * @param fragment_limit
+ * Maximum number of fragments to retrieve.
+ * @param fragment_cb
+ * Callback to call with the retrieved fragments.
+ * @param rcb
+ * Callback to call with the result of the operation.
+ * @param cls
+ * Closure for the callbacks.
+ *
+ * @return Handle that can be used to cancel the operation.
+ */
+struct GNUNET_PSYCSTORE_OperationHandle *
+GNUNET_PSYCSTORE_fragment_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
+ const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
+ const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
+ uint64_t fragment_limit,
+ GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
+ GNUNET_PSYCSTORE_ResultCallback rcb,
+ void *cls);
+
+
+/**
+ * Retrieve all fragments of messages in a message ID range.
*
* @param h
* Handle for the PSYCstore.
@@ -277,11 +321,15 @@ GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h,
* The slave requesting the message. If not NULL, a membership test is
* performed first and the message is only returned if the slave has
* access to it.
- * @param message_id
- * Message ID to retrieve. Use 0 to get the latest message.
- * @param fcb
+ * @param first_message_id
+ * First message ID to retrieve.
+ * Use 0 to get the latest message.
+ * @param last_message_id
+ * Last consecutive message ID to retrieve.
+ * Use 0 to get the latest message.
+ * @param fragment_cb
* Callback to call with the retrieved fragments.
- * @param rcb
+ * @param result_cb
* Callback to call with the result of the operation.
* @param cls
* Closure for the callbacks.
@@ -292,23 +340,67 @@ struct GNUNET_PSYCSTORE_OperationHandle *
GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h,
const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
- uint64_t message_id,
- GNUNET_PSYCSTORE_FragmentCallback fcb,
- GNUNET_PSYCSTORE_ResultCallback rcb,
+ uint64_t first_message_id,
+ uint64_t last_message_id,
+ GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
+ GNUNET_PSYCSTORE_ResultCallback result_cb,
void *cls);
/**
+ * Retrieve all fragments of the latest messages.
+ *
+ * @param h
+ * Handle for the PSYCstore.
+ * @param channel_key
+ * The channel we are interested in.
+ * @param slave_key
+ * The slave requesting the message. If not NULL, a membership test is
+ * performed first and the message is only returned if the slave has
+ * access to it.
+ * @param message_limit
+ * Maximum number of messages to retrieve.
+ * @param fragment_cb
+ * Callback to call with the retrieved fragments.
+ * @param rcb
+ * Callback to call with the result of the operation.
+ * @param cls
+ * Closure for the callbacks.
+ *
+ * @return Handle that can be used to cancel the operation.
+ */
+struct GNUNET_PSYCSTORE_OperationHandle *
+GNUNET_PSYCSTORE_message_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
+ const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
+ const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
+ uint64_t message_limit,
+ GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
+ GNUNET_PSYCSTORE_ResultCallback rcb,
+ void *cls);
+
+
+/**
* Retrieve a fragment of message specified by its message ID and fragment
* offset.
*
- * @param h Handle for the PSYCstore.
- * @param channel_key The channel we are interested in.
- * @param message_id Message ID to check. Use 0 to get the latest message.
- * @param fragment_offset Offset of the fragment to retrieve.
- * @param fcb Callback to call with the retrieved fragments.
- * @param rcb Callback to call with the result of the operation.
- * @param cls Closure for the callbacks.
+ * @param h
+ * Handle for the PSYCstore.
+ * @param channel_key
+ * The channel we are interested in.
+ * @param slave_key
+ * The slave requesting the message fragment. If not NULL, a membership
+ * test is performed first and the message fragment is only returned
+ * if the slave has access to it.
+ * @param message_id
+ * Message ID to retrieve. Use 0 to get the latest message.
+ * @param fragment_offset
+ * Offset of the fragment to retrieve.
+ * @param fragment_cb
+ * Callback to call with the retrieved fragments.
+ * @param result_cb
+ * Callback to call with the result of the operation.
+ * @param cls
+ * Closure for the callbacks.
*
* @return Handle that can be used to cancel the operation.
*/
@@ -318,8 +410,8 @@ GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h,
const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
uint64_t message_id,
uint64_t fragment_offset,
- GNUNET_PSYCSTORE_FragmentCallback fcb,
- GNUNET_PSYCSTORE_ResultCallback rcb,
+ GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
+ GNUNET_PSYCSTORE_ResultCallback result_cb,
void *cls);
diff --git a/src/include/gnunet_social_service.h b/src/include/gnunet_social_service.h
index ca1578820..66427a072 100644
--- a/src/include/gnunet_social_service.h
+++ b/src/include/gnunet_social_service.h
@@ -287,12 +287,18 @@ typedef void
/**
* Function called after the host entered the place.
*
- * @param cls Closure.
- * @param max_message_id Last message ID sent to the channel.
- * Or 0 if no messages have been sent to the place yet.
+ * @param cls
+ * Closure.
+ * @param result
+ * #GNUNET_OK on success, or
+ * #GNUNET_SYSERR on error.
+ * @param max_message_id
+ * Last message ID sent to the channel.
+ * Or 0 if no messages have been sent to the place yet.
*/
typedef void
-(*GNUNET_SOCIAL_HostEnterCallback) (void *cls, uint64_t max_message_id);
+(*GNUNET_SOCIAL_HostEnterCallback) (void *cls, int result,
+ uint64_t max_message_id);
/**
@@ -793,18 +799,22 @@ struct GNUNET_SOCIAL_WatchHandle;
/**
* Watch a place for changed objects.
*
- * @param place Place to watch.
- * @param object_filter Object prefix to match.
- * @param state_cb Function to call when an object/state changes.
- * @param state_cb_cls Closure for callback.
+ * @param place
+ * Place to watch.
+ * @param object_filter
+ * Object prefix to match.
+ * @param state_var_cb
+ * Function to call when an object/state var changes.
+ * @param cls
+ * Closure for callback.
*
* @return Handle that can be used to cancel watching.
*/
struct GNUNET_SOCIAL_WatchHandle *
GNUNET_SOCIAL_place_watch (struct GNUNET_SOCIAL_Place *place,
const char *object_filter,
- GNUNET_PSYC_StateCallback state_cb,
- void *state_cb_cls);
+ GNUNET_PSYC_StateVarCallback state_var_cb,
+ void *cls);
/**
@@ -822,18 +832,22 @@ struct GNUNET_SOCIAL_LookHandle;
/**
* Look at objects in the place with a matching name prefix.
*
- * @param place The place to look its objects at.
- * @param name_prefix Look at objects with names beginning with this value.
- * @param state_cb Function to call for each object found.
- * @param state_cb_cls Closure for callback function.
+ * @param place
+ * The place to look its objects at.
+ * @param name_prefix
+ * Look at objects with names beginning with this value.
+ * @param state_var_cb
+ * Function to call for each object found.
+ * @param cls
+ * Closure for callback function.
*
* @return Handle that can be used to stop looking at objects.
*/
struct GNUNET_SOCIAL_LookHandle *
GNUNET_SOCIAL_place_look (struct GNUNET_SOCIAL_Place *place,
const char *name_prefix,
- GNUNET_PSYC_StateCallback state_cb,
- void *state_cb_cls);
+ GNUNET_PSYC_StateVarCallback state_var_cb,
+ void *cls);
/**
diff --git a/src/psyc/Makefile.am b/src/psyc/Makefile.am
index b18605ab9..fb4341751 100644
--- a/src/psyc/Makefile.am
+++ b/src/psyc/Makefile.am
@@ -86,12 +86,14 @@ test_psyc_LDADD = \
libgnunetpsyc.la \
libgnunetpsycutil.la \
$(top_builddir)/src/testing/libgnunettesting.la \
+ $(top_builddir)/src/core/libgnunetcore.la \
$(top_builddir)/src/env/libgnunetenv.la \
$(top_builddir)/src/util/libgnunetutil.la
test_psyc_DEPENDENCIES = \
libgnunetpsyc.la \
libgnunetpsycutil.la \
$(top_builddir)/src/testing/libgnunettesting.la \
+ $(top_builddir)/src/core/libgnunetcore.la \
$(top_builddir)/src/env/libgnunetenv.la \
$(top_builddir)/src/util/libgnunetutil.la
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
index e7020bc69..3adc34d2a 100644
--- a/src/psyc/gnunet-service-psyc.c
+++ b/src/psyc/gnunet-service-psyc.c
@@ -402,6 +402,14 @@ struct Slave
};
+struct OperationClosure
+{
+ struct GNUNET_SERVER_Client *client;
+ struct Channel *chn;
+ uint64_t op_id;
+};
+
+
static void
transmit_message (struct Channel *chn);
@@ -585,6 +593,46 @@ client_send_msg (const struct Channel *chn,
/**
+ * Send a result code back to the client.
+ *
+ * @param client
+ * Client that should receive the result code.
+ * @param result_code
+ * Code to transmit.
+ * @param op_id
+ * Operation ID in network byte order.
+ * @param err_msg
+ * Error message to include (or NULL for none).
+ */
+static void
+client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
+ int64_t result_code, const char *err_msg)
+{
+ struct OperationResult *res;
+ size_t err_len = 0; // FIXME: maximum length
+
+ if (NULL != err_msg)
+ err_len = strlen (err_msg) + 1;
+ res = GNUNET_malloc (sizeof (struct OperationResult) + err_len);
+ res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
+ res->header.size = htons (sizeof (struct OperationResult) + err_len);
+ res->result_code = GNUNET_htonll (result_code + INT64_MAX + 1);
+ res->op_id = op_id;
+ if (0 < err_len)
+ memcpy (&res[1], err_msg, err_len);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Sending result to client for operation #%" PRIu64 ": "
+ "%" PRId64 " (%s)\n",
+ client, GNUNET_ntohll (op_id), result_code, err_msg);
+
+ GNUNET_SERVER_notification_context_add (nc, client);
+ GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
+ GNUNET_NO);
+ GNUNET_free (res);
+}
+
+
+/**
* Closure for join_mem_test_cb()
*/
struct JoinMemTestClosure
@@ -799,7 +847,8 @@ mcast_recv_replay_fragment (void *cls,
{
struct Channel *chn = cls;
- GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_key, fragment_id,
+ GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_key,
+ fragment_id, fragment_id,
&store_recv_fragment_replay,
&store_recv_fragment_replay_result, rh);
}
@@ -817,7 +866,8 @@ mcast_recv_replay_message (void *cls,
struct GNUNET_MULTICAST_ReplayHandle *rh)
{
struct Channel *chn = cls;
- GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key, message_id,
+ GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key,
+ message_id, message_id,
&store_recv_fragment_replay,
&store_recv_fragment_replay_result, rh);
}
@@ -865,7 +915,8 @@ hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
*/
static void
client_send_mcast_msg (struct Channel *chn,
- const struct GNUNET_MULTICAST_MessageHeader *mmsg)
+ const struct GNUNET_MULTICAST_MessageHeader *mmsg,
+ uint32_t flags)
{
struct GNUNET_PSYC_MessageHeader *pmsg;
uint16_t size = ntohs (mmsg->header.size);
@@ -882,6 +933,7 @@ client_send_mcast_msg (struct Channel *chn,
pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
pmsg->message_id = mmsg->message_id;
pmsg->fragment_offset = mmsg->fragment_offset;
+ pmsg->flags = htonl (flags);
memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
client_send_msg (chn, &pmsg->header);
@@ -1111,7 +1163,7 @@ fragment_queue_run (struct Channel *chn, uint64_t msg_id,
{
if (GNUNET_NO == drop)
{
- client_send_mcast_msg (chn, cache_entry->mmsg);
+ client_send_mcast_msg (chn, cache_entry->mmsg, 0);
}
if (cache_entry->ref_count <= 1)
{
@@ -1375,10 +1427,10 @@ store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
struct Channel *chn = &mst->chn;
chn->store_op = NULL;
- struct CountersResult res;
+ struct GNUNET_PSYC_CountersResultMessage res;
res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
res.header.size = htons (sizeof (res));
- res.result_code = htonl (result);
+ res.result_code = htonl (result - INT32_MIN);
res.max_message_id = GNUNET_htonll (max_message_id);
if (GNUNET_OK == result || GNUNET_NO == result)
@@ -1421,10 +1473,10 @@ store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
struct Channel *chn = &slv->chn;
chn->store_op = NULL;
- struct CountersResult res;
+ struct GNUNET_PSYC_CountersResultMessage res;
res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
res.header.size = htons (sizeof (res));
- res.result_code = htonl (result);
+ res.result_code = htonl (result - INT32_MIN);
res.max_message_id = GNUNET_htonll (max_message_id);
if (GNUNET_OK == result || GNUNET_NO == result)
@@ -1511,10 +1563,10 @@ client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
{
chn = &mst->chn;
- struct CountersResult res;
+ struct GNUNET_PSYC_CountersResultMessage res;
res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
res.header.size = htons (sizeof (res));
- res.result_code = htonl (GNUNET_OK);
+ res.result_code = htonl ((uint32_t) GNUNET_OK + INT32_MIN);
res.max_message_id = GNUNET_htonll (mst->max_message_id);
GNUNET_SERVER_notification_context_add (nc, client);
@@ -1621,10 +1673,10 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
{
chn = &slv->chn;
- struct CountersResult res;
+ struct GNUNET_PSYC_CountersResultMessage res;
res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
res.header.size = htons (sizeof (res));
- res.result_code = htonl (GNUNET_OK);
+ res.result_code = htonl ((uint32_t) GNUNET_OK - INT32_MIN);
res.max_message_id = GNUNET_htonll (chn->max_message_id);
GNUNET_SERVER_notification_context_add (nc, client);
@@ -2047,17 +2099,26 @@ client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
};
+struct MembershipStoreClosure
+{
+ struct GNUNET_SERVER_Client *client;
+ struct Channel *chn;
+ uint64_t op_id;
+};
+
+
/**
* Received result of GNUNET_PSYCSTORE_membership_store()
*/
static void
store_recv_membership_store_result (void *cls, int64_t result, const char *err_msg)
{
- struct GNUNET_MULTICAST_MembershipTestHandle *mth = cls;
+ struct MembershipStoreClosure *mcls = cls;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%s)\n",
- mth, result, err_msg);
- /* FIXME: send result to client */
+ mcls->chn, result, err_msg);
+
+ client_send_result (mcls->client, mcls->op_id, result, err_msg);
}
@@ -2075,6 +2136,11 @@ client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
const struct ChannelMembershipStoreRequest *
req = (const struct ChannelMembershipStoreRequest *) msg;
+ struct MembershipStoreClosure *mcls = GNUNET_malloc (sizeof (*mcls));
+ mcls->client = client;
+ mcls->chn = chn;
+ mcls->op_id = req->op_id;
+
uint64_t announced_at = GNUNET_ntohll (req->announced_at);
uint64_t effective_since = GNUNET_ntohll (req->effective_since);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -2086,19 +2152,138 @@ client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_key,
req->did_join, announced_at, effective_since,
0, /* FIXME: group_generation */
- &store_recv_membership_store_result, chn);
+ &store_recv_membership_store_result, mcls);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
+static int
+store_recv_fragment_history (void *cls,
+ struct GNUNET_MULTICAST_MessageHeader *msg,
+ enum GNUNET_PSYCSTORE_MessageFlags flags)
+{
+ struct OperationClosure *opcls = cls;
+ struct Channel *chn = opcls->chn;
+ client_send_mcast_msg (chn, msg, GNUNET_PSYC_MESSAGE_HISTORIC);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
+ */
+static void
+store_recv_fragment_history_result (void *cls, int64_t result, const char *err_msg)
+{
+ struct OperationClosure *opcls = cls;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p History replay #%" PRIu64 ": "
+ "PSYCSTORE returned %" PRId64 " (%s)\n",
+ opcls->chn, opcls->op_id, result, err_msg);
+
+ client_send_result (opcls->client, opcls->op_id, result, err_msg);
+}
+
+
/**
* Client requests channel history from PSYCstore.
*/
static void
-client_recv_story_request (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
+client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct Channel *
+ chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
+ GNUNET_assert (NULL != chn);
+
+ const struct HistoryRequest *
+ req = (const struct HistoryRequest *) msg;
+
+ struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls));
+ opcls->client = client;
+ opcls->chn = chn;
+ opcls->op_id = req->op_id;
+
+ if (0 == req->message_limit)
+ GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
+ GNUNET_ntohll (req->start_message_id),
+ GNUNET_ntohll (req->end_message_id),
+ &store_recv_fragment_history,
+ &store_recv_fragment_history_result, opcls);
+ else
+ GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
+ GNUNET_ntohll (req->message_limit),
+ &store_recv_fragment_history,
+ &store_recv_fragment_history_result,
+ opcls);
+
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+/**
+ * Received state var from PSYCstore, send it to client.
+ */
+static int
+store_recv_state_var (void *cls, const char *name,
+ const void *value, size_t value_size)
+{
+ struct OperationClosure *opcls = cls;
+ struct OperationResult *op;
+
+ if (NULL != name)
+ {
+ uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
+ struct GNUNET_PSYC_MessageModifier *mod;
+ op = GNUNET_malloc (sizeof (*op) + sizeof (*mod) + name_size + value_size);
+ op->header.size = htons (sizeof (*op) + sizeof (*mod) + name_size + value_size);
+ op->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
+ op->op_id = opcls->op_id;
+
+ mod = (struct GNUNET_PSYC_MessageModifier *) &op[1];
+ mod->header.size = htons (sizeof (*mod) + name_size + value_size);
+ mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
+ mod->name_size = htons (name_size);
+ mod->value_size = htonl (value_size);
+ mod->oper = htons (GNUNET_ENV_OP_ASSIGN);
+ memcpy (&mod[1], name, name_size);
+ memcpy (((char *) &mod[1]) + name_size, value, value_size);
+ }
+ else
+ {
+ struct GNUNET_MessageHeader *mod;
+ op = GNUNET_malloc (sizeof (*op) + sizeof (*mod) + value_size);
+ op->header.size = htons (sizeof (*op) + sizeof (*mod) + value_size);
+ op->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
+ op->op_id = opcls->op_id;
+
+ mod = (struct GNUNET_MessageHeader *) &op[1];
+ mod->size = htons (sizeof (*mod) + value_size);
+ mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
+ memcpy (&mod[1], value, value_size);
+ }
+
+ GNUNET_SERVER_notification_context_add (nc, opcls->client);
+ GNUNET_SERVER_notification_context_unicast (nc, opcls->client, &op->header,
+ GNUNET_NO);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Received result of GNUNET_PSYCSTORE_state_get()
+ * or GNUNET_PSYCSTORE_state_get_prefix()
+ */
+static void
+store_recv_state_result (void *cls, int64_t result, const char *err_msg)
{
+ struct OperationClosure *opcls = cls;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p History replay #%" PRIu64 ": "
+ "PSYCSTORE returned %" PRId64 " (%s)\n",
+ opcls->chn, opcls->op_id, result, err_msg);
+ client_send_result (opcls->client, opcls->op_id, result, err_msg);
}
@@ -2109,7 +2294,30 @@ static void
client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *msg)
{
+ struct Channel *
+ chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
+ GNUNET_assert (NULL != chn);
+
+ const struct StateRequest *
+ req = (const struct StateRequest *) msg;
+ uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
+ const char *name = (const char *) &req[1];
+ if (0 == name_size || '\0' != name[name_size - 1])
+ {
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+
+ struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls));
+ opcls->client = client;
+ opcls->chn = chn;
+ opcls->op_id = req->op_id;
+
+ GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
+ &store_recv_state_var,
+ &store_recv_state_result, opcls);
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -2120,6 +2328,30 @@ static void
client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *msg)
{
+ struct Channel *
+ chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
+ GNUNET_assert (NULL != chn);
+
+ const struct StateRequest *
+ req = (const struct StateRequest *) msg;
+
+ uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
+ const char *name = (const char *) &req[1];
+ if (0 == name_size || '\0' != name[name_size - 1])
+ {
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+
+ struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls));
+ opcls->client = client;
+ opcls->chn = chn;
+ opcls->op_id = req->op_id;
+
+ GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
+ &store_recv_state_var,
+ &store_recv_state_result, opcls);
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -2140,8 +2372,8 @@ static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
{ &client_recv_membership_store, NULL,
GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 },
- { &client_recv_story_request, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST, 0 },
+ { &client_recv_history_replay, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
{ &client_recv_state_get, NULL,
GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
diff --git a/src/psyc/psyc.h b/src/psyc/psyc.h
index 21131e7d3..f6d40ddb4 100644
--- a/src/psyc/psyc.h
+++ b/src/psyc/psyc.h
@@ -112,30 +112,39 @@ struct ChannelMembershipStoreRequest
*/
struct GNUNET_MessageHeader header;
- uint32_t reserved;
+ uint32_t reserved GNUNET_PACKED;
+
+ uint64_t op_id GNUNET_PACKED;
struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
- uint64_t announced_at;
+ uint64_t announced_at GNUNET_PACKED;
- uint64_t effective_since;
+ uint64_t effective_since GNUNET_PACKED;
uint8_t did_join;
};
-struct StoryRequest
+struct HistoryRequest
{
/**
- * Type: GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_STORY_REQUEST
+ * Type: GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_HISTORY_REQUEST
*/
struct GNUNET_MessageHeader header;
- uint64_t op_id;
+ uint32_t reserved GNUNET_PACKED;
+
+ /**
+ * ID for this operation.
+ */
+ uint64_t op_id GNUNET_PACKED;
+
+ uint64_t start_message_id GNUNET_PACKED;
- uint64_t start_message_id;
+ uint64_t end_message_id GNUNET_PACKED;
- uint64_t end_message_id;
+ uint64_t message_limit GNUNET_PACKED;
};
@@ -148,10 +157,12 @@ struct StateRequest
*/
struct GNUNET_MessageHeader header;
+ uint32_t reserved GNUNET_PACKED;
+
/**
* ID for this operation.
*/
- uint64_t op_id;
+ uint64_t op_id GNUNET_PACKED;
/* Followed by NUL-terminated name. */
};
@@ -160,25 +171,6 @@ struct StateRequest
/**** service -> library ****/
-struct CountersResult
-{
- /**
- * Type: GNUNET_MESSAGE_TYPE_PSYC_RESULT_COUNTERS
- */
- struct GNUNET_MessageHeader header;
-
- /**
- * Status code for the operation.
- */
- int32_t result_code GNUNET_PACKED;
-
- /**
- * Last message ID sent to the channel.
- */
- uint64_t max_message_id;
-};
-
-
/**
* Answer from service to client about last operation.
*/
@@ -192,23 +184,22 @@ struct OperationResult
*/
struct GNUNET_MessageHeader header;
+ uint32_t reserved GNUNET_PACKED;
+
/**
* Operation ID.
*/
- uint32_t op_id GNUNET_PACKED;
+ uint64_t op_id GNUNET_PACKED;
/**
* Status code for the operation.
*/
- int64_t result_code GNUNET_PACKED;
+ uint64_t result_code GNUNET_PACKED;
/* Followed by:
* - on error: NUL-terminated error message
* - on success: one of the following message types
*
- * For a STORY_RESULT:
- * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE
- *
* For a STATE_RESULT, one of:
* - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER
* - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index ca25b1b01..8cce89704 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -43,6 +43,33 @@
#define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__)
+struct OperationListItem
+{
+ struct OperationListItem *prev;
+ struct OperationListItem *next;
+
+ /**
+ * Operation ID.
+ */
+ uint64_t op_id;
+
+ /**
+ * Continuation to invoke with the result of an operation.
+ */
+ GNUNET_PSYC_ResultCallback result_cb;
+
+ /**
+ * State variable result callback.
+ */
+ GNUNET_PSYC_StateVarCallback state_var_cb;
+
+ /**
+ * Closure for the callbacks.
+ */
+ void *cls;
+};
+
+
/**
* Handle to access PSYC channel operations for both the master and slaves.
*/
@@ -84,6 +111,21 @@ struct GNUNET_PSYC_Channel
void *disconnect_cls;
/**
+ * First operation in the linked list.
+ */
+ struct OperationListItem *op_head;
+
+ /**
+ * Last operation in the linked list.
+ */
+ struct OperationListItem *op_tail;
+
+ /**
+ * Last operation ID used.
+ */
+ uint64_t last_op_id;
+
+ /**
* Are we polling for incoming messages right now?
*/
uint8_t in_receive;
@@ -163,21 +205,82 @@ struct GNUNET_PSYC_SlaveTransmitHandle
/**
- * Handle to a story telling operation.
+ * Get a fresh operation ID to distinguish between PSYCstore requests.
+ *
+ * @param h Handle to the PSYCstore service.
+ * @return next operation id to use
*/
-struct GNUNET_PSYC_Story
+static uint64_t
+op_get_next_id (struct GNUNET_PSYC_Channel *chn)
{
-
-};
+ return ++chn->last_op_id;
+}
/**
- * Handle for a state query operation.
+ * Find operation by ID.
+ *
+ * @return Operation, or NULL if none found.
*/
-struct GNUNET_PSYC_StateQuery
+static struct OperationListItem *
+op_find_by_id (struct GNUNET_PSYC_Channel *chn, uint64_t op_id)
{
+ struct OperationListItem *op = chn->op_head;
+ while (NULL != op)
+ {
+ if (op->op_id == op_id)
+ return op;
+ op = op->next;
+ }
+ return NULL;
+}
-};
+
+static uint64_t
+op_add (struct GNUNET_PSYC_Channel *chn, GNUNET_PSYC_ResultCallback result_cb,
+ void *cls)
+{
+ if (NULL == result_cb)
+ return 0;
+
+ struct OperationListItem *op = GNUNET_malloc (sizeof (*op));
+ op->op_id = op_get_next_id (chn);
+ op->result_cb = result_cb;
+ op->cls = cls;
+ GNUNET_CONTAINER_DLL_insert_tail (chn->op_head, chn->op_tail, op);
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Added operation #%" PRIu64 "\n", chn, op->op_id);
+ return op->op_id;
+}
+
+
+static int
+op_result (struct GNUNET_PSYC_Channel *chn, uint64_t op_id,
+ int64_t result_code, const char *err_msg)
+{
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Received result for operation #%" PRIu64 ": %" PRId64 " (%s)\n",
+ chn, op_id, result_code, err_msg);
+ if (0 == op_id)
+ return GNUNET_NO;
+
+ struct OperationListItem *op = op_find_by_id (chn, op_id);
+ if (NULL == op)
+ {
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Could not find operation #%" PRIu64 "\n", op_id);
+ return GNUNET_NO;
+ }
+
+ GNUNET_CONTAINER_DLL_remove (chn->op_head, chn->op_tail, op);
+
+ if (NULL != op->result_cb)
+ op->result_cb (op->cls, result_code, err_msg);
+
+ GNUNET_free (op);
+ return GNUNET_YES;
+}
static void
@@ -203,6 +306,79 @@ channel_recv_disconnect (void *cls,
static void
+channel_recv_result (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_PSYC_Channel *
+ chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn));
+
+ uint16_t size = ntohs (msg->size);
+ const struct OperationResult *res = (const struct OperationResult *) msg;
+ const char *err_msg = NULL;
+
+ if (sizeof (struct OperationResult) < size)
+ {
+ err_msg = (const char *) &res[1];
+ if ('\0' != err_msg[size - sizeof (struct OperationResult) - 1])
+ {
+ GNUNET_break (0);
+ err_msg = NULL;
+ }
+ }
+
+ op_result (chn, GNUNET_ntohll (res->op_id),
+ GNUNET_ntohll (res->result_code) + INT64_MIN, err_msg);
+}
+
+
+static void
+channel_recv_state_result (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_PSYC_Channel *
+ chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn));
+
+ const struct OperationResult *res = (const struct OperationResult *) msg;
+ struct OperationListItem *op = op_find_by_id (chn, GNUNET_ntohll (res->op_id));
+ if (NULL == op || NULL == op->state_var_cb)
+ return;
+
+ const struct GNUNET_MessageHeader *modc = (struct GNUNET_MessageHeader *) &op[1];
+ uint16_t modc_size = ntohs (modc->size);
+ if (ntohs (msg->size) - sizeof (*msg) != modc_size)
+ {
+ GNUNET_break (0);
+ return;
+ }
+ switch (ntohs (modc->type))
+ {
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
+ {
+ const struct GNUNET_PSYC_MessageModifier *
+ mod = (const struct GNUNET_PSYC_MessageModifier *) modc;
+
+ const char *name = (const char *) &mod[1];
+ uint16_t name_size = ntohs (mod->name_size);
+ if ('\0' != name[name_size - 1])
+ {
+ GNUNET_break (0);
+ return;
+ }
+ op->state_var_cb (op->cls, name, name + name_size, ntohs (mod->value_size));
+ break;
+ }
+
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
+ op->state_var_cb (op->cls, NULL, (const char *) &modc[1],
+ modc_size - sizeof (*modc));
+ break;
+ }
+}
+
+
+static void
channel_recv_message (void *cls,
struct GNUNET_CLIENT_MANAGER_Connection *client,
const struct GNUNET_MessageHeader *msg)
@@ -234,9 +410,16 @@ master_recv_start_ack (void *cls,
mst = GNUNET_CLIENT_MANAGER_get_user_context_ (client,
sizeof (struct GNUNET_PSYC_Channel));
- struct CountersResult *cres = (struct CountersResult *) msg;
+ struct GNUNET_PSYC_CountersResultMessage *
+ cres = (struct GNUNET_PSYC_CountersResultMessage *) msg;
+ int32_t result = ntohl (cres->result_code) + INT32_MIN;
+ if (GNUNET_OK != result && GNUNET_NO != result)
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR, "Could not start master.\n");
+ GNUNET_break (0);
+ }
if (NULL != mst->start_cb)
- mst->start_cb (mst->cb_cls, GNUNET_ntohll (cres->max_message_id));
+ mst->start_cb (mst->cb_cls, result, GNUNET_ntohll (cres->max_message_id));
}
@@ -279,9 +462,16 @@ slave_recv_join_ack (void *cls,
struct GNUNET_PSYC_Slave *
slv = GNUNET_CLIENT_MANAGER_get_user_context_ (client,
sizeof (struct GNUNET_PSYC_Channel));
- struct CountersResult *cres = (struct CountersResult *) msg;
+ struct GNUNET_PSYC_CountersResultMessage *
+ cres = (struct GNUNET_PSYC_CountersResultMessage *) msg;
+ int32_t result = ntohl (cres->result_code) + INT32_MIN;
+ if (GNUNET_YES != result && GNUNET_NO != result)
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR, "Could not join slave.\n");
+ GNUNET_break (0);
+ }
if (NULL != slv->connect_cb)
- slv->connect_cb (slv->cb_cls, GNUNET_ntohll (cres->max_message_id));
+ slv->connect_cb (slv->cb_cls, result, GNUNET_ntohll (cres->max_message_id));
}
@@ -317,12 +507,20 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler master_handlers[] =
{ &master_recv_start_ack, NULL,
GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK,
- sizeof (struct CountersResult), GNUNET_NO },
+ sizeof (struct GNUNET_PSYC_CountersResultMessage), GNUNET_NO },
{ &master_recv_join_request, NULL,
GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST,
sizeof (struct GNUNET_PSYC_JoinRequestMessage), GNUNET_YES },
+ { &channel_recv_state_result, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT,
+ sizeof (struct OperationResult), GNUNET_YES },
+
+ { &channel_recv_result, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE,
+ sizeof (struct OperationResult), GNUNET_YES },
+
{ &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO },
{ NULL, NULL, 0, 0, GNUNET_NO }
@@ -341,12 +539,20 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler slave_handlers[] =
{ &slave_recv_join_ack, NULL,
GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK,
- sizeof (struct CountersResult), GNUNET_NO },
+ sizeof (struct GNUNET_PSYC_CountersResultMessage), GNUNET_NO },
{ &slave_recv_join_decision, NULL,
GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
sizeof (struct GNUNET_PSYC_JoinDecisionMessage), GNUNET_YES },
+ { &channel_recv_state_result, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT,
+ sizeof (struct OperationResult), GNUNET_YES },
+
+ { &channel_recv_result, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE,
+ sizeof (struct OperationResult), GNUNET_YES },
+
{ &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO },
{ NULL, NULL, 0, 0, GNUNET_NO }
@@ -808,7 +1014,9 @@ void
GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn,
const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
uint64_t announced_at,
- uint64_t effective_since)
+ uint64_t effective_since,
+ GNUNET_PSYC_ResultCallback result_cb,
+ void *cls)
{
struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req));
req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE);
@@ -817,6 +1025,8 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn,
req->announced_at = GNUNET_htonll (announced_at);
req->effective_since = GNUNET_htonll (effective_since);
req->did_join = GNUNET_YES;
+ req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls));
+
GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
}
@@ -845,7 +1055,9 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn,
void
GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn,
const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
- uint64_t announced_at)
+ uint64_t announced_at,
+ GNUNET_PSYC_ResultCallback result_cb,
+ void *cls)
{
struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req));
req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE);
@@ -853,57 +1065,85 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn,
req->slave_key = *slave_key;
req->announced_at = GNUNET_htonll (announced_at);
req->did_join = GNUNET_NO;
+ req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls));
+
GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
}
/**
- * Request to be told the message history of the channel.
+ * Request to replay a part of the message history of the channel.
*
* Historic messages (but NOT the state at the time) will be replayed (given to
* the normal method handlers) if available and if access is permitted.
*
- * To get the latest message, use 0 for both the start and end message ID.
- *
- * @param channel Which channel should be replayed?
- * @param start_message_id Earliest interesting point in history.
- * @param end_message_id Last (exclusive) interesting point in history.
- * @param message_cb Function to invoke on message parts received from the story.
- * @param finish_cb Function to call when the requested story has been fully
- * told (counting message IDs might not suffice, as some messages
- * might be secret and thus the listener would not know the story is
- * finished without being told explicitly) once this function
- * has been called, the client must not call
- * GNUNET_PSYC_channel_story_tell_cancel() anymore.
- * @param cls Closure for the callbacks.
- *
- * @return Handle to cancel story telling operation.
+ * @param channel
+ * Which channel should be replayed?
+ * @param start_message_id
+ * Earliest interesting point in history.
+ * @param end_message_id
+ * Last (inclusive) interesting point in history.
+ * FIXME: @param method_prefix
+ * Retrieve only messages with a matching method prefix.
+ * @param result_cb
+ * Function to call when the requested history has been fully replayed.
+ * @param cls
+ * Closure for the callbacks.
+ *
+ * @return Handle to cancel history replay operation.
*/
-struct GNUNET_PSYC_Story *
-GNUNET_PSYC_channel_story_tell (struct GNUNET_PSYC_Channel *channel,
- uint64_t start_message_id,
- uint64_t end_message_id,
- GNUNET_PSYC_MessageCallback message_cb,
- GNUNET_PSYC_MessagePartCallback message_part_cb,
- GNUNET_PSYC_FinishCallback finish_cb,
- void *cls)
+void
+GNUNET_PSYC_channel_history_replay (struct GNUNET_PSYC_Channel *chn,
+ uint64_t start_message_id,
+ uint64_t end_message_id,
+ /* FIXME: const char *method_prefix, */
+ GNUNET_PSYC_ResultCallback result_cb,
+ void *cls)
{
- return NULL;
+ struct HistoryRequest *req = GNUNET_malloc (sizeof (*req));
+ req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY);
+ req->header.size = htons (sizeof (*req));
+ req->start_message_id = GNUNET_htonll (start_message_id);
+ req->end_message_id = GNUNET_htonll (end_message_id);
+ req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls));
+
+ GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
}
/**
- * Abort story telling.
+ * Request to replay the latest messages from the message history of the channel.
*
- * This function must not be called from within method handlers (as given to
- * GNUNET_PSYC_slave_join()) of the slave.
+ * Historic messages (but NOT the state at the time) will be replayed (given to
+ * the normal method handlers) if available and if access is permitted.
*
- * @param story Story telling operation to stop.
+ * @param channel
+ * Which channel should be replayed?
+ * @param message_limit
+ * Maximum number of messages to replay.
+ * FIXME: @param method_prefix
+ * Retrieve only messages with a matching method prefix.
+ * @param result_cb
+ * Function to call when the requested history has been fully replayed.
+ * @param cls
+ * Closure for the callbacks.
+ *
+ * @return Handle to cancel history replay operation.
*/
void
-GNUNET_PSYC_channel_story_tell_cancel (struct GNUNET_PSYC_Story *story)
+GNUNET_PSYC_channel_history_replay_latest (struct GNUNET_PSYC_Channel *chn,
+ uint64_t message_limit,
+ /* FIXME: const char *method_prefix, */
+ GNUNET_PSYC_ResultCallback result_cb,
+ void *cls)
{
+ struct HistoryRequest *req = GNUNET_malloc (sizeof (*req));
+ req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY);
+ req->header.size = htons (sizeof (*req));
+ req->message_limit = GNUNET_htonll (message_limit);
+ req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls));
+ GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
}
@@ -914,22 +1154,35 @@ GNUNET_PSYC_channel_story_tell_cancel (struct GNUNET_PSYC_Story *story)
* less-specific name is matched; for example, requesting "_a_b" will match "_a"
* if "_a_b" does not exist.
*
- * @param channel Channel handle.
- * @param full_name Full name of the requested variable, the actual variable
- * returned might have a shorter name..
- * @param cb Function called once when a matching state variable is found.
+ * @param channel
+ * Channel handle.
+ * @param full_name
+ * Full name of the requested variable.
+ * The actual variable returned might have a shorter name.
+ * @param var_cb
+ * Function called once when a matching state variable is found.
* Not called if there's no matching state variable.
- * @param cb_cls Closure for the callbacks.
- *
- * @return Handle that can be used to cancel the query operation.
+ * @param result_cb
+ * Function called after the operation finished.
+ * (i.e. all state variables have been returned via @a state_cb)
+ * @param cls
+ * Closure for the callbacks.
*/
-struct GNUNET_PSYC_StateQuery *
-GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *channel,
+void
+GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *chn,
const char *full_name,
- GNUNET_PSYC_StateCallback cb,
- void *cb_cls)
+ GNUNET_PSYC_StateVarCallback var_cb,
+ GNUNET_PSYC_ResultCallback result_cb,
+ void *cls)
{
- return NULL;
+ size_t name_size = strlen (full_name) + 1;
+ struct StateRequest *req = GNUNET_malloc (sizeof (*req) + name_size);
+ req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_GET);
+ req->header.size = htons (sizeof (*req) + name_size);
+ req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls));
+ memcpy (&req[1], full_name, name_size);
+
+ GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
}
@@ -943,33 +1196,34 @@ GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *channel,
* The @a state_cb is invoked on all matching state variables asynchronously, as
* the state is stored in and retrieved from the PSYCstore,
*
- * @param channel Channel handle.
- * @param name_prefix Prefix of the state variable name to match.
- * @param cb Function to call with the matching state variables.
- * @param cb_cls Closure for the callbacks.
- *
- * @return Handle that can be used to cancel the query operation.
- */
-struct GNUNET_PSYC_StateQuery *
-GNUNET_PSYC_channel_state_get_prefix (struct GNUNET_PSYC_Channel *channel,
- const char *name_prefix,
- GNUNET_PSYC_StateCallback cb,
- void *cb_cls)
-{
- return NULL;
-}
-
-
-/**
- * Cancel a state query operation.
- *
- * @param query Handle for the operation to cancel.
+ * @param channel
+ * Channel handle.
+ * @param name_prefix
+ * Prefix of the state variable name to match.
+ * @param var_cb
+ * Function called once when a matching state variable is found.
+ * Not called if there's no matching state variable.
+ * @param result_cb
+ * Function called after the operation finished.
+ * (i.e. all state variables have been returned via @a state_cb)
+ * @param cls
+ * Closure for the callbacks.
*/
void
-GNUNET_PSYC_channel_state_get_cancel (struct GNUNET_PSYC_StateQuery *query)
+GNUNET_PSYC_channel_state_get_prefix (struct GNUNET_PSYC_Channel *chn,
+ const char *name_prefix,
+ GNUNET_PSYC_StateVarCallback var_cb,
+ GNUNET_PSYC_ResultCallback result_cb,
+ void *cls)
{
+ size_t name_size = strlen (name_prefix) + 1;
+ struct StateRequest *req = GNUNET_malloc (sizeof (*req) + name_size);
+ req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_GET);
+ req->header.size = htons (sizeof (*req) + name_size);
+ req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls));
+ memcpy (&req[1], name_prefix, name_size);
+ GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
}
-
/* end of psyc_api.c */
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c
index 5eadef62c..044895809 100644
--- a/src/psyc/test_psyc.c
+++ b/src/psyc/test_psyc.c
@@ -35,6 +35,7 @@
#include "gnunet_env_lib.h"
#include "gnunet_psyc_util_lib.h"
#include "gnunet_psyc_service.h"
+#include "gnunet_core_service.h"
#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
@@ -45,6 +46,9 @@ int res;
const struct GNUNET_CONFIGURATION_Handle *cfg;
+struct GNUNET_CORE_Handle *core;
+struct GNUNET_PeerIdentity this_peer;
+
/**
* Handle for task for timeout termination.
*/
@@ -53,6 +57,8 @@ GNUNET_SCHEDULER_TaskIdentifier end_badly_task;
struct GNUNET_PSYC_Master *mst;
struct GNUNET_PSYC_Slave *slv;
+struct GNUNET_PSYC_Channel *mst_chn, *slv_chn;
+
struct GNUNET_CRYPTO_EddsaPrivateKey *channel_key;
struct GNUNET_CRYPTO_EcdsaPrivateKey *slave_key;
@@ -80,9 +86,19 @@ uint8_t join_req_count;
enum
{
- TEST_NONE,
- TEST_SLAVE_TRANSMIT,
- TEST_MASTER_TRANSMIT,
+ TEST_NONE = 0,
+ TEST_MASTER_START = 1,
+ TEST_SLAVE_JOIN = 2,
+ TEST_SLAVE_TRANSMIT = 3,
+ TEST_MASTER_TRANSMIT = 4,
+ TEST_MASTER_HISTORY_REPLAY_LATEST = 5,
+ TEST_SLAVE_HISTORY_REPLAY_LATEST = 6,
+ TEST_MASTER_HISTORY_REPLAY = 7,
+ TEST_SLAVE_HISTORY_REPLAY = 8,
+ TEST_MASTER_STATE_GET = 9,
+ TEST_SLAVE_STATE_GET = 10,
+ TEST_MASTER_STATE_GET_PREFIX = 11,
+ TEST_SLAVE_STATE_GET_PREFIX = 12,
} test;
@@ -118,6 +134,11 @@ void slave_parted (void *cls)
void
cleanup ()
{
+ if (NULL != core)
+ {
+ GNUNET_CORE_disconnect (core);
+ core = NULL;
+ }
if (NULL != slv)
{
GNUNET_PSYC_slave_part (slv, GNUNET_NO, &slave_parted, NULL);
@@ -177,13 +198,203 @@ end ()
void
+state_get_var (void *cls, const char *name, const void *value, size_t value_size)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Got state var: %s\n%.*s\n", name, value_size, value);
+}
+
+
+/*** Slave state_get_prefix() ***/
+
+void
+slave_state_get_prefix_result (void *cls, int64_t result, const char *err_msg)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "slave_state_get_prefix:\t%" PRId64 " (%s)\n", result, err_msg);
+ // FIXME: GNUNET_assert (2 == result);
+ end ();
+}
+
+
+void
+slave_state_get_prefix ()
+{
+ test = TEST_SLAVE_STATE_GET_PREFIX;
+ GNUNET_PSYC_channel_state_get_prefix (slv_chn, "_foo", &state_get_var,
+ &slave_state_get_prefix_result, NULL);
+}
+
+
+/*** Master state_get_prefix() ***/
+
+
+void
+master_state_get_prefix_result (void *cls, int64_t result, const char *err_msg)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "master_state_get_prefix:\t%" PRId64 " (%s)\n", result, err_msg);
+ // FIXME: GNUNET_assert (2 == result);
+ slave_state_get_prefix ();
+}
+
+
+void
+master_state_get_prefix ()
+{
+ test = TEST_MASTER_STATE_GET_PREFIX;
+ GNUNET_PSYC_channel_state_get_prefix (mst_chn, "_foo", &state_get_var,
+ &master_state_get_prefix_result, NULL);
+}
+
+
+/*** Slave state_get() ***/
+
+
+void
+slave_state_get_result (void *cls, int64_t result, const char *err_msg)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "slave_state_get:\t%" PRId64 " (%s)\n", result, err_msg);
+ // FIXME: GNUNET_assert (2 == result);
+ master_state_get_prefix ();
+}
+
+
+void
+slave_state_get ()
+{
+ test = TEST_SLAVE_STATE_GET;
+ GNUNET_PSYC_channel_state_get (slv_chn, "_foo_bar_baz", &state_get_var,
+ &slave_state_get_result, NULL);
+}
+
+
+/*** Master state_get() ***/
+
+
+void
+master_state_get_result (void *cls, int64_t result, const char *err_msg)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "master_state_get:\t%" PRId64 " (%s)\n", result, err_msg);
+ // FIXME: GNUNET_assert (1 == result);
+ slave_state_get ();
+}
+
+
+void
+master_state_get ()
+{
+ test = TEST_MASTER_STATE_GET;
+ GNUNET_PSYC_channel_state_get (mst_chn, "_foo_bar_baz", &state_get_var,
+ &master_state_get_result, NULL);
+}
+
+
+/*** Slave history_replay() ***/
+
+void
+slave_history_replay_result (void *cls, int64_t result, const char *err_msg)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "slave_history_replay:\t%" PRId64 " (%s)\n", result, err_msg);
+ GNUNET_assert (9 == result);
+
+ master_state_get ();
+}
+
+
+void
+slave_history_replay ()
+{
+ test = TEST_SLAVE_HISTORY_REPLAY;
+ GNUNET_PSYC_channel_history_replay (slv_chn, 1, 1,
+ &slave_history_replay_result,
+ NULL);
+}
+
+
+/*** Master history_replay() ***/
+
+
+void
+master_history_replay_result (void *cls, int64_t result, const char *err_msg)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "master_history_replay:\t%" PRId64 " (%s)\n", result, err_msg);
+ GNUNET_assert (9 == result);
+
+ slave_history_replay ();
+}
+
+
+void
+master_history_replay ()
+{
+ test = TEST_MASTER_HISTORY_REPLAY;
+ GNUNET_PSYC_channel_history_replay (mst_chn, 1, 1,
+ &master_history_replay_result,
+ NULL);
+}
+
+
+/*** Slave history_replay_latest() ***/
+
+
+void
+slave_history_replay_latest_result (void *cls, int64_t result, const char *err_msg)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "slave_history_replay_latest:\t%" PRId64 " (%s)\n", result, err_msg);
+ GNUNET_assert (9 == result);
+
+ master_history_replay ();
+}
+
+
+void
+slave_history_replay_latest ()
+{
+ test = TEST_SLAVE_HISTORY_REPLAY_LATEST;
+ GNUNET_PSYC_channel_history_replay_latest (slv_chn, 1,
+ &slave_history_replay_latest_result,
+ NULL);
+}
+
+
+/*** Master history_replay_latest() ***/
+
+
+void
+master_history_replay_latest_result (void *cls, int64_t result, const char *err_msg)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "master_history_replay_latest:\t%" PRId64 " (%s)\n", result, err_msg);
+ GNUNET_assert (9 == result);
+
+ slave_history_replay_latest ();
+}
+
+
+void
+master_history_replay_latest ()
+{
+ test = TEST_MASTER_HISTORY_REPLAY_LATEST;
+ GNUNET_PSYC_channel_history_replay_latest (mst_chn, 1,
+ &master_history_replay_latest_result,
+ NULL);
+}
+
+
+void
master_message_cb (void *cls, uint64_t message_id, uint32_t flags,
const struct GNUNET_PSYC_MessageHeader *msg)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Master got PSYC message fragment of size %u "
- "belonging to message ID %llu with flags %x\n",
- ntohs (msg->header.size), message_id, flags);
+ "Test #%d: Master got PSYC message fragment of size %u "
+ "belonging to message ID %" PRIu64 " with flags %x\n",
+ test, ntohs (msg->header.size), message_id, flags);
// FIXME
}
@@ -196,7 +407,7 @@ master_message_part_cb (void *cls, uint64_t message_id,
if (NULL == msg)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Error while receiving message %llu\n", message_id);
+ "Error while receiving message %" PRIu64 "\n", message_id);
return;
}
@@ -204,9 +415,9 @@ master_message_part_cb (void *cls, uint64_t message_id,
uint16_t size = ntohs (msg->size);
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Master got message part of type %u and size %u "
- "belonging to message ID %llu with flags %x\n",
- type, size, message_id, flags);
+ "Test #%d: Master got message part of type %u and size %u "
+ "belonging to message ID %" PRIu64 " with flags %x\n",
+ test, type, size, message_id, flags);
switch (test)
{
@@ -227,6 +438,18 @@ master_message_part_cb (void *cls, uint64_t message_id,
case TEST_MASTER_TRANSMIT:
break;
+ case TEST_MASTER_HISTORY_REPLAY:
+ case TEST_MASTER_HISTORY_REPLAY_LATEST:
+ if (GNUNET_PSYC_MESSAGE_HISTORIC != flags)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Test #%d: Unexpected flags for historic message: %x" PRIu32 "\n",
+ flags);
+ GNUNET_assert (0);
+ return;
+ }
+ break;
+
default:
GNUNET_assert (0);
}
@@ -238,9 +461,9 @@ slave_message_cb (void *cls, uint64_t message_id, uint32_t flags,
const struct GNUNET_PSYC_MessageHeader *msg)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Slave got PSYC message fragment of size %u "
- "belonging to message ID %llu with flags %x\n",
- ntohs (msg->header.size), message_id, flags);
+ "Test #%d: Slave got PSYC message fragment of size %u "
+ "belonging to message ID %" PRIu64 " with flags %x\n",
+ test, ntohs (msg->header.size), message_id, flags);
// FIXME
}
@@ -253,7 +476,7 @@ slave_message_part_cb (void *cls, uint64_t message_id,
if (NULL == msg)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Error while receiving message %llu\n", message_id);
+ "Error while receiving message " PRIu64 "\n", message_id);
return;
}
@@ -261,15 +484,27 @@ slave_message_part_cb (void *cls, uint64_t message_id,
uint16_t size = ntohs (msg->size);
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Slave got message part of type %u and size %u "
- "belonging to message ID %llu with flags %x\n",
- type, size, message_id, flags);
+ "Test #%d: Slave got message part of type %u and size %u "
+ "belonging to message ID %" PRIu64 " with flags %x\n",
+ test, type, size, message_id, flags);
switch (test)
{
case TEST_MASTER_TRANSMIT:
if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type)
- end ();
+ master_history_replay_latest ();
+ break;
+
+ case TEST_SLAVE_HISTORY_REPLAY:
+ case TEST_SLAVE_HISTORY_REPLAY_LATEST:
+ if (GNUNET_PSYC_MESSAGE_HISTORIC != flags)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Test #%d: Unexpected flags for historic message: %x" PRIu32 "\n",
+ flags);
+ GNUNET_assert (0);
+ return;
+ }
break;
default:
@@ -417,7 +652,6 @@ slave_transmit ()
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave sending request to master.\n");
-
test = TEST_SLAVE_TRANSMIT;
tmit = GNUNET_new (struct TransmitClosure);
@@ -438,6 +672,29 @@ slave_transmit ()
void
+slave_remove_cb (void *cls, int64_t result, const char *err_msg)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "slave_remove:\t%" PRId64 " (%s)\n", result, err_msg);
+
+ slave_transmit ();
+}
+
+
+void
+slave_add_cb (void *cls, int64_t result, const char *err_msg)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "slave_add:\t%" PRId64 " (%s)\n", result, err_msg);
+
+ struct GNUNET_PSYC_Channel *chn = cls;
+ GNUNET_PSYC_channel_slave_remove (chn, &slave_pub_key, 2,
+ &slave_remove_cb, chn);
+
+}
+
+
+void
join_decision_cb (void *cls,
const struct GNUNET_PSYC_JoinDecisionMessage *dcsn,
int is_admitted,
@@ -453,7 +710,8 @@ join_decision_cb (void *cls,
return;
}
- slave_transmit ();
+ struct GNUNET_PSYC_Channel *chn = GNUNET_PSYC_master_get_channel (mst);
+ GNUNET_PSYC_channel_slave_add (chn, &slave_pub_key, 2, 2, &slave_add_cb, chn);
}
@@ -473,19 +731,17 @@ join_request_cb (void *cls,
/* Reject first request */
int is_admitted = (0 < join_req_count++) ? GNUNET_YES : GNUNET_NO;
GNUNET_PSYC_join_decision (jh, is_admitted, 0, NULL, NULL);
-
- /* Membership store */
- struct GNUNET_PSYC_Channel *chn = GNUNET_PSYC_master_get_channel (mst);
- GNUNET_PSYC_channel_slave_add (chn, slave_key, 2, 2);
- GNUNET_PSYC_channel_slave_remove (chn, &slave_pub_key, 2);
}
void
-slave_connect_cb (void *cls, uint64_t max_message_id)
+slave_connect_cb (void *cls, int result, uint64_t max_message_id)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Slave connected: %lu\n", max_message_id);
+ "Slave connected: %d, max_message_id: %" PRIu64 "\n",
+ result, max_message_id);
+ GNUNET_assert (TEST_SLAVE_JOIN == test);
+ GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result);
}
@@ -493,8 +749,9 @@ void
slave_join ()
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Joining slave.\n");
+ test = TEST_SLAVE_JOIN;
- struct GNUNET_PeerIdentity origin = {}; // FIXME: this peer
+ struct GNUNET_PeerIdentity origin = this_peer;
struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create ();
GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN,
"_foo", "bar baz", 7);
@@ -507,6 +764,7 @@ slave_join ()
&slave_message_cb, &slave_message_part_cb,
&slave_connect_cb, &join_decision_cb, NULL,
join_msg);
+ slv_chn = GNUNET_PSYC_slave_get_channel (slv);
GNUNET_ENV_environment_destroy (env);
}
@@ -564,10 +822,13 @@ master_transmit ()
void
-master_start_cb (void *cls, uint64_t max_message_id)
+master_start_cb (void *cls, int result, uint64_t max_message_id)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Master started: %" PRIu64 "\n", max_message_id);
+ "Master started: %d, max_message_id: %" PRIu64 "\n",
+ result, max_message_id);
+ GNUNET_assert (TEST_MASTER_START == test);
+ GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result);
slave_join ();
}
@@ -576,10 +837,12 @@ void
master_start ()
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Starting master.\n");
+ test = TEST_MASTER_START;
mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE,
&master_start_cb, &join_request_cb,
&master_message_cb, &master_message_part_cb,
NULL);
+ mst_chn = GNUNET_PSYC_master_get_channel (mst);
}
void
@@ -589,6 +852,21 @@ schedule_master_start (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
}
+void
+core_connected (void *cls, const struct GNUNET_PeerIdentity *my_identity)
+{
+ this_peer = *my_identity;
+
+#if DEBUG_TEST_PSYC
+ master_start ();
+#else
+ /* Allow some time for the services to initialize. */
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
+ &schedule_master_start, NULL);
+#endif
+
+}
+
/**
* Main function of the test, run from scheduler.
*
@@ -615,14 +893,8 @@ run (void *cls,
GNUNET_CRYPTO_eddsa_key_get_public (channel_key, &channel_pub_key);
GNUNET_CRYPTO_ecdsa_key_get_public (slave_key, &slave_pub_key);
-#if DEBUG_TEST_PSYC
- master_start ();
-#else
- /* Allow some time for the services to initialize. */
- GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
- &schedule_master_start, NULL);
-#endif
- return;
+ core = GNUNET_CORE_connect (cfg, NULL, &core_connected, NULL, NULL,
+ NULL, GNUNET_NO, NULL, GNUNET_NO, NULL);
}
diff --git a/src/psycstore/gnunet-service-psycstore.c b/src/psycstore/gnunet-service-psycstore.c
index 7d27ea29b..87a2c87ab 100644
--- a/src/psycstore/gnunet-service-psycstore.c
+++ b/src/psycstore/gnunet-service-psycstore.c
@@ -25,6 +25,8 @@
* @author Christian Grothoff
*/
+#include <inttypes.h>
+
#include "platform.h"
#include "gnunet_util_lib.h"
#include "gnunet_constants.h"
@@ -89,39 +91,41 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
/**
* Send a result code back to the client.
*
- * @param client Client that should receive the result code.
- * @param result_code Code to transmit.
- * @param op_id Operation ID.
- * @param err_msg Error message to include (or NULL for none).
+ * @param client
+ * Client that should receive the result code.
+ * @param result_code
+ * Code to transmit.
+ * @param op_id
+ * Operation ID in network byte order.
+ * @param err_msg
+ * Error message to include (or NULL for none).
*/
static void
-send_result_code (struct GNUNET_SERVER_Client *client, uint32_t result_code,
- uint32_t op_id, const char *err_msg)
+send_result_code (struct GNUNET_SERVER_Client *client, uint64_t op_id,
+ int64_t result_code, const char *err_msg)
{
struct OperationResult *res;
- size_t err_len;
+ size_t err_len = 0; // FIXME: maximum length
- if (NULL == err_msg)
- err_len = 0;
- else
+ if (NULL != err_msg)
err_len = strlen (err_msg) + 1;
res = GNUNET_malloc (sizeof (struct OperationResult) + err_len);
res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE);
res->header.size = htons (sizeof (struct OperationResult) + err_len);
- res->result_code = htonl (result_code);
+ res->result_code = GNUNET_htonll (result_code - INT64_MIN);
res->op_id = op_id;
if (0 < err_len)
memcpy (&res[1], err_msg, err_len);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Sending result %d (%s) to client\n",
- (int) result_code,
- err_msg);
+ "Sending result to client: %" PRId64 " (%s)\n",
+ result_code, err_msg);
GNUNET_SERVER_notification_context_add (nc, client);
GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
GNUNET_NO);
GNUNET_free (res);
}
+
enum
{
MEMBERSHIP_TEST_NOT_NEEDED = 0,
@@ -129,6 +133,7 @@ enum
MEMBERSHIP_TEST_DONE = 2,
} MessageMembershipTest;
+
struct SendClosure
{
struct GNUNET_SERVER_Client *client;
@@ -158,7 +163,6 @@ struct SendClosure
* @see enum MessageMembershipTest
*/
uint8_t membership_test;
-
};
@@ -214,6 +218,8 @@ send_state_var (void *cls, const char *name,
struct StateResult *res;
size_t name_size = strlen (name) + 1;
+ /* FIXME: split up value into 64k chunks */
+
res = GNUNET_malloc (sizeof (struct StateResult) + name_size + value_size);
res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE);
res->header.size = htons (sizeof (struct StateResult) + name_size + value_size);
@@ -249,7 +255,7 @@ handle_membership_store (void *cls,
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
_("Failed to store membership information!\n"));
- send_result_code (client, ret, req->op_id, NULL);
+ send_result_code (client, req->op_id, ret, NULL);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -274,7 +280,7 @@ handle_membership_test (void *cls,
_("Failed to test membership!\n"));
}
- send_result_code (client, ret, req->op_id, NULL);
+ send_result_code (client, req->op_id, ret, NULL);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -295,7 +301,7 @@ handle_fragment_store (void *cls,
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
_("Failed to store fragment!\n"));
- send_result_code (client, ret, req->op_id, NULL);
+ send_result_code (client, req->op_id, ret, NULL);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -312,9 +318,20 @@ handle_fragment_get (void *cls,
.channel_key = req->channel_key, .slave_key = req->slave_key,
.membership_test = req->do_membership_test };
- int ret = db->fragment_get (db->cls, &req->channel_key,
- GNUNET_ntohll (req->fragment_id),
- &send_fragment, &sc);
+ int64_t ret;
+ uint64_t ret_frags = 0;
+ uint64_t first_fragment_id = GNUNET_ntohll (req->first_fragment_id);
+ uint64_t last_fragment_id = GNUNET_ntohll (req->last_fragment_id);
+ uint64_t limit = GNUNET_ntohll (req->fragment_limit);
+
+ if (0 == limit)
+ ret = db->fragment_get (db->cls, &req->channel_key,
+ first_fragment_id, last_fragment_id,
+ &ret_frags, &send_fragment, &sc);
+ else
+ ret = db->fragment_get_latest (db->cls, &req->channel_key, limit,
+ &ret_frags, &send_fragment, &sc);
+
switch (ret)
{
case GNUNET_YES:
@@ -340,8 +357,7 @@ handle_fragment_get (void *cls,
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
_("Failed to get fragment!\n"));
}
-
- send_result_code (client, ret, req->op_id, NULL);
+ send_result_code (client, req->op_id, (ret < 0) ? ret : ret_frags, NULL);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -358,22 +374,31 @@ handle_message_get (void *cls,
.channel_key = req->channel_key, .slave_key = req->slave_key,
.membership_test = req->do_membership_test };
+ int64_t ret;
uint64_t ret_frags = 0;
- int64_t ret = db->message_get (db->cls, &req->channel_key,
- GNUNET_ntohll (req->message_id),
- &ret_frags, &send_fragment, &sc);
+ uint64_t first_message_id = GNUNET_ntohll (req->first_message_id);
+ uint64_t last_message_id = GNUNET_ntohll (req->last_message_id);
+ uint64_t limit = GNUNET_ntohll (req->message_limit);
+
+ if (0 == limit)
+ ret = db->message_get (db->cls, &req->channel_key,
+ first_message_id, last_message_id,
+ &ret_frags, &send_fragment, &sc);
+ else
+ ret = db->message_get_latest (db->cls, &req->channel_key, limit,
+ &ret_frags, &send_fragment, &sc);
+
switch (ret)
{
case GNUNET_YES:
case GNUNET_NO:
break;
default:
- ret_frags = ret;
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
_("Failed to get message!\n"));
}
- send_result_code (client, ret_frags, req->op_id, NULL);
+ send_result_code (client, req->op_id, (ret < 0) ? ret : ret_frags, NULL);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -404,7 +429,7 @@ handle_message_get_fragment (void *cls,
_("Failed to get message fragment!\n"));
}
- send_result_code (client, ret, req->op_id, NULL);
+ send_result_code (client, req->op_id, ret, NULL);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -434,7 +459,7 @@ handle_counters_get (void *cls,
res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS);
res.header.size = htons (sizeof (res));
- res.result_code = htonl (ret);
+ res.result_code = htonl (ret - INT32_MIN);
res.op_id = req->op_id;
res.max_fragment_id = GNUNET_htonll (res.max_fragment_id);
res.max_message_id = GNUNET_htonll (res.max_message_id);
@@ -517,7 +542,7 @@ handle_state_modify (void *cls,
_("Failed to end modifying state!\n"));
}
}
- send_result_code (client, ret, req->op_id, NULL);
+ send_result_code (client, req->op_id, ret, NULL);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -571,7 +596,7 @@ handle_state_sync (void *cls,
_("Failed to end synchronizing state!\n"));
}
}
- send_result_code (client, ret, req->op_id, NULL);
+ send_result_code (client, req->op_id, ret, NULL);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -590,7 +615,7 @@ handle_state_reset (void *cls,
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
_("Failed to reset state!\n"));
- send_result_code (client, ret, req->op_id, NULL);
+ send_result_code (client, req->op_id, ret, NULL);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -609,7 +634,7 @@ handle_state_hash_update (void *cls,
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
_("Failed to reset state!\n"));
- send_result_code (client, ret, req->op_id, NULL);
+ send_result_code (client, req->op_id, ret, NULL);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -660,7 +685,7 @@ handle_state_get (void *cls,
_("Failed to get state variable!\n"));
}
- send_result_code (client, ret, req->op_id, NULL);
+ send_result_code (client, req->op_id, ret, NULL);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -699,7 +724,7 @@ handle_state_get_prefix (void *cls,
_("Failed to get state variable!\n"));
}
- send_result_code (client, ret, req->op_id, NULL);
+ send_result_code (client, req->op_id, ret, NULL);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
diff --git a/src/psycstore/plugin_psycstore_sqlite.c b/src/psycstore/plugin_psycstore_sqlite.c
index 86b969c5d..cb6c5c437 100644
--- a/src/psycstore/plugin_psycstore_sqlite.c
+++ b/src/psycstore/plugin_psycstore_sqlite.c
@@ -130,12 +130,22 @@ struct Plugin
/**
* Precompiled SQL for fragment_get()
*/
- sqlite3_stmt *select_fragment;
+ sqlite3_stmt *select_fragments;
+
+ /**
+ * Precompiled SQL for fragment_get()
+ */
+ sqlite3_stmt *select_latest_fragments;
/**
* Precompiled SQL for message_get()
*/
- sqlite3_stmt *select_message;
+ sqlite3_stmt *select_messages;
+
+ /**
+ * Precompiled SQL for message_get()
+ */
+ sqlite3_stmt *select_latest_messages;
/**
* Precompiled SQL for message_get_fragment()
@@ -456,8 +466,8 @@ database_setup (struct Plugin *plugin)
" multicast_flags, psycstore_flags, data\n"
"FROM messages\n"
"WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
- " AND fragment_id = ?;",
- &plugin->select_fragment);
+ " AND ? <= fragment_id AND fragment_id <= ?;",
+ &plugin->select_fragments);
sql_prepare (plugin->dbh,
"SELECT hop_counter, signature, purpose, fragment_id,\n"
@@ -465,8 +475,35 @@ database_setup (struct Plugin *plugin)
" multicast_flags, psycstore_flags, data\n"
"FROM messages\n"
"WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
- " AND message_id = ? AND fragment_offset = ?;",
- &plugin->select_message_fragment);
+ " AND ? <= message_id AND message_id <= ?;",
+ &plugin->select_messages);
+
+ sql_prepare (plugin->dbh,
+ "SELECT * FROM\n"
+ "(SELECT hop_counter, signature, purpose, fragment_id,\n"
+ " fragment_offset, message_id, group_generation,\n"
+ " multicast_flags, psycstore_flags, data\n"
+ " FROM messages\n"
+ " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+ " ORDER BY fragment_id DESC\n"
+ " LIMIT ?)\n"
+ "ORDER BY fragment_id;",
+ &plugin->select_latest_fragments);
+
+ sql_prepare (plugin->dbh,
+ "SELECT hop_counter, signature, purpose, fragment_id,\n"
+ " fragment_offset, message_id, group_generation,\n"
+ " multicast_flags, psycstore_flags, data\n"
+ "FROM messages\n"
+ "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+ " AND message_id IN\n"
+ " (SELECT message_id\n"
+ " FROM messages\n"
+ " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
+ " ORDER BY message_id\n"
+ " DESC LIMIT ?)\n"
+ "ORDER BY fragment_id;",
+ &plugin->select_latest_messages);
sql_prepare (plugin->dbh,
"SELECT hop_counter, signature, purpose, fragment_id,\n"
@@ -474,8 +511,8 @@ database_setup (struct Plugin *plugin)
" multicast_flags, psycstore_flags, data\n"
"FROM messages\n"
"WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
- " AND message_id = ?;",
- &plugin->select_message);
+ " AND message_id = ? AND fragment_offset = ?;",
+ &plugin->select_message_fragment);
sql_prepare (plugin->dbh,
"SELECT fragment_id, message_id, group_generation\n"
@@ -1036,8 +1073,42 @@ fragment_row (sqlite3_stmt *stmt, GNUNET_PSYCSTORE_FragmentCallback cb,
return cb (cb_cls, (void *) msg, sqlite3_column_int64 (stmt, 8));
}
+
+static int
+fragment_select (struct Plugin *plugin, sqlite3_stmt *stmt,
+ uint64_t *returned_fragments,
+ GNUNET_PSYCSTORE_FragmentCallback cb, void *cb_cls)
+{
+ int ret = GNUNET_SYSERR;
+ int sql_ret;
+
+ do
+ {
+ sql_ret = sqlite3_step (stmt);
+ switch (sql_ret)
+ {
+ case SQLITE_DONE:
+ if (ret != GNUNET_OK)
+ ret = GNUNET_NO;
+ break;
+ case SQLITE_ROW:
+ ret = fragment_row (stmt, cb, cb_cls);
+ (*returned_fragments)++;
+ if (ret != GNUNET_YES)
+ sql_ret = SQLITE_DONE;
+ break;
+ default:
+ LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+ "sqlite3_step");
+ }
+ }
+ while (sql_ret == SQLITE_ROW);
+
+ return ret;
+}
+
/**
- * Retrieve a message fragment by fragment ID.
+ * Retrieve a message fragment range by fragment ID.
*
* @see GNUNET_PSYCSTORE_fragment_get()
*
@@ -1046,36 +1117,29 @@ fragment_row (sqlite3_stmt *stmt, GNUNET_PSYCSTORE_FragmentCallback cb,
static int
fragment_get (void *cls,
const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
- uint64_t fragment_id,
+ uint64_t first_fragment_id,
+ uint64_t last_fragment_id,
+ uint64_t *returned_fragments,
GNUNET_PSYCSTORE_FragmentCallback cb,
void *cb_cls)
{
struct Plugin *plugin = cls;
- sqlite3_stmt *stmt = plugin->select_fragment;
+ sqlite3_stmt *stmt = plugin->select_fragments;
int ret = GNUNET_SYSERR;
+ *returned_fragments = 0;
if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
sizeof (*channel_key),
SQLITE_STATIC)
- || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, fragment_id))
+ || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, first_fragment_id)
+ || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, last_fragment_id))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
"sqlite3_bind");
}
else
{
- switch (sqlite3_step (stmt))
- {
- case SQLITE_DONE:
- ret = GNUNET_NO;
- break;
- case SQLITE_ROW:
- ret = fragment_row (stmt, cb, cb_cls);
- break;
- default:
- LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "sqlite3_step");
- }
+ ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
}
if (SQLITE_OK != sqlite3_reset (stmt))
@@ -1087,8 +1151,52 @@ fragment_get (void *cls,
return ret;
}
+
/**
- * Retrieve all fragments of a message.
+ * Retrieve a message fragment range by fragment ID.
+ *
+ * @see GNUNET_PSYCSTORE_fragment_get_latest()
+ *
+ * @return #GNUNET_OK on success, else #GNUNET_SYSERR
+ */
+static int
+fragment_get_latest (void *cls,
+ const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
+ uint64_t fragment_limit,
+ uint64_t *returned_fragments,
+ GNUNET_PSYCSTORE_FragmentCallback cb,
+ void *cb_cls)
+{
+ struct Plugin *plugin = cls;
+ sqlite3_stmt *stmt = plugin->select_latest_fragments;
+ int ret = GNUNET_SYSERR;
+ *returned_fragments = 0;
+
+ if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
+ sizeof (*channel_key),
+ SQLITE_STATIC)
+ || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, fragment_limit))
+ {
+ LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+ "sqlite3_bind");
+ }
+ else
+ {
+ ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
+ }
+
+ if (SQLITE_OK != sqlite3_reset (stmt))
+ {
+ LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+ "sqlite3_reset");
+ }
+
+ return ret;
+}
+
+
+/**
+ * Retrieve all fragments of a message ID range.
*
* @see GNUNET_PSYCSTORE_message_get()
*
@@ -1097,48 +1205,29 @@ fragment_get (void *cls,
static int
message_get (void *cls,
const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
- uint64_t message_id,
+ uint64_t first_message_id,
+ uint64_t last_message_id,
uint64_t *returned_fragments,
GNUNET_PSYCSTORE_FragmentCallback cb,
void *cb_cls)
{
struct Plugin *plugin = cls;
- sqlite3_stmt *stmt = plugin->select_message;
+ sqlite3_stmt *stmt = plugin->select_messages;
int ret = GNUNET_SYSERR;
*returned_fragments = 0;
if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
sizeof (*channel_key),
SQLITE_STATIC)
- || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, message_id))
+ || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, first_message_id)
+ || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, last_message_id))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
"sqlite3_bind");
}
else
{
- int sql_ret;
- do
- {
- sql_ret = sqlite3_step (stmt);
- switch (sql_ret)
- {
- case SQLITE_DONE:
- if (ret != GNUNET_OK)
- ret = GNUNET_NO;
- break;
- case SQLITE_ROW:
- ret = fragment_row (stmt, cb, cb_cls);
- (*returned_fragments)++;
- if (ret != GNUNET_YES)
- sql_ret = SQLITE_DONE;
- break;
- default:
- LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "sqlite3_step");
- }
- }
- while (sql_ret == SQLITE_ROW);
+ ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
}
if (SQLITE_OK != sqlite3_reset (stmt))
@@ -1150,6 +1239,53 @@ message_get (void *cls,
return ret;
}
+
+/**
+ * Retrieve all fragments of the latest messages.
+ *
+ * @see GNUNET_PSYCSTORE_message_get_latest()
+ *
+ * @return #GNUNET_OK on success, else #GNUNET_SYSERR
+ */
+static int
+message_get_latest (void *cls,
+ const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
+ uint64_t message_limit,
+ uint64_t *returned_fragments,
+ GNUNET_PSYCSTORE_FragmentCallback cb,
+ void *cb_cls)
+{
+ struct Plugin *plugin = cls;
+ sqlite3_stmt *stmt = plugin->select_latest_messages;
+ int ret = GNUNET_SYSERR;
+ *returned_fragments = 0;
+
+ if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
+ sizeof (*channel_key),
+ SQLITE_STATIC)
+ || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
+ sizeof (*channel_key),
+ SQLITE_STATIC)
+ || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_limit))
+ {
+ LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+ "sqlite3_bind");
+ }
+ else
+ {
+ ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls);
+ }
+
+ if (SQLITE_OK != sqlite3_reset (stmt))
+ {
+ LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+ "sqlite3_reset");
+ }
+
+ return ret;
+}
+
+
/**
* Retrieve a fragment of message specified by its message ID and fragment
* offset.
@@ -1777,7 +1913,9 @@ libgnunet_plugin_psycstore_sqlite_init (void *cls)
api->fragment_store = &fragment_store;
api->message_add_flags = &message_add_flags;
api->fragment_get = &fragment_get;
+ api->fragment_get_latest = &fragment_get_latest;
api->message_get = &message_get;
+ api->message_get_latest = &message_get_latest;
api->message_get_fragment = &message_get_fragment;
api->counters_message_get = &counters_message_get;
api->counters_state_get = &counters_state_get;
diff --git a/src/psycstore/psycstore.h b/src/psycstore/psycstore.h
index 17905f422..e6a82848a 100644
--- a/src/psycstore/psycstore.h
+++ b/src/psycstore/psycstore.h
@@ -42,15 +42,17 @@ struct OperationResult
*/
struct GNUNET_MessageHeader header;
+ uint32_t reserved GNUNET_PACKED;
+
/**
* Operation ID.
*/
- uint32_t op_id GNUNET_PACKED;
+ uint64_t op_id GNUNET_PACKED;
/**
* Status code for the operation.
*/
- int64_t result_code GNUNET_PACKED;
+ uint64_t result_code GNUNET_PACKED;
/* followed by 0-terminated error message (on error) */
@@ -70,9 +72,17 @@ struct CountersResult
struct GNUNET_MessageHeader header;
/**
+ * Status code for the operation:
+ * #GNUNET_OK: success, counter values are returned.
+ * #GNUNET_NO: no message has been sent to the channel yet.
+ * #GNUNET_SYSERR: an error occurred.
+ */
+ uint32_t result_code GNUNET_PACKED;
+
+ /**
* Operation ID.
*/
- uint32_t op_id GNUNET_PACKED;
+ uint64_t op_id GNUNET_PACKED;
uint64_t max_fragment_id GNUNET_PACKED;
@@ -81,14 +91,6 @@ struct CountersResult
uint64_t max_group_generation GNUNET_PACKED;
uint64_t max_state_message_id GNUNET_PACKED;
-
- /**
- * Status code for the operation:
- * #GNUNET_OK: success, counter values are returned.
- * #GNUNET_NO: no message has been sent to the channel yet.
- * #GNUNET_SYSERR: an error occurred.
- */
- int32_t result_code GNUNET_PACKED;
};
@@ -102,15 +104,14 @@ struct FragmentResult
*/
struct GNUNET_MessageHeader header;
+ uint32_t psycstore_flags GNUNET_PACKED;
+
/**
* Operation ID.
*/
- uint32_t op_id GNUNET_PACKED;
-
- uint32_t psycstore_flags GNUNET_PACKED;
-
- /* followed by GNUNET_MULTICAST_MessageHeader */
+ uint64_t op_id GNUNET_PACKED;
+ /* Followed by GNUNET_MULTICAST_MessageHeader */
};
@@ -124,14 +125,16 @@ struct StateResult
*/
struct GNUNET_MessageHeader header;
+ uint16_t name_size GNUNET_PACKED;
+
+ uint16_t reserved GNUNET_PACKED;
+
/**
* Operation ID.
*/
- uint32_t op_id GNUNET_PACKED;
-
- uint16_t name_size GNUNET_PACKED;
+ uint64_t op_id GNUNET_PACKED;
- /* followed by name and value */
+ /* Followed by name and value */
};
@@ -142,13 +145,14 @@ struct OperationRequest
{
struct GNUNET_MessageHeader header;
+ uint32_t reserved GNUNET_PACKED;
+
/**
* Operation ID.
*/
- uint32_t op_id GNUNET_PACKED;
+ uint64_t op_id GNUNET_PACKED;
struct GNUNET_CRYPTO_EddsaPublicKey channel_key;
-
};
@@ -162,10 +166,12 @@ struct MembershipStoreRequest
*/
struct GNUNET_MessageHeader header;
+ uint32_t reserved GNUNET_PACKED;
+
/**
* Operation ID.
*/
- uint32_t op_id GNUNET_PACKED;
+ uint64_t op_id GNUNET_PACKED;
/**
* Channel's public key.
@@ -177,9 +183,9 @@ struct MembershipStoreRequest
*/
struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
- uint64_t announced_at;
- uint64_t effective_since;
- uint64_t group_generation;
+ uint64_t announced_at GNUNET_PACKED;
+ uint64_t effective_since GNUNET_PACKED;
+ uint64_t group_generation GNUNET_PACKED;
uint8_t did_join;
};
@@ -194,10 +200,12 @@ struct MembershipTestRequest
*/
struct GNUNET_MessageHeader header;
+ uint32_t reserved GNUNET_PACKED;
+
/**
* Operation ID.
*/
- uint32_t op_id GNUNET_PACKED;
+ uint64_t op_id GNUNET_PACKED;
/**
* Channel's public key.
@@ -226,9 +234,9 @@ struct FragmentStoreRequest
struct GNUNET_MessageHeader header;
/**
- * Operation ID.
+ * enum GNUNET_PSYCSTORE_MessageFlags
*/
- uint32_t op_id GNUNET_PACKED;
+ uint32_t psycstore_flags GNUNET_PACKED;
/**
* Channel's public key.
@@ -236,9 +244,9 @@ struct FragmentStoreRequest
struct GNUNET_CRYPTO_EddsaPublicKey channel_key;
/**
- * enum GNUNET_PSYCSTORE_MessageFlags
+ * Operation ID.
*/
- uint32_t psycstore_flags GNUNET_PACKED;
+ uint64_t op_id;
/* Followed by fragment */
};
@@ -254,10 +262,12 @@ struct FragmentGetRequest
*/
struct GNUNET_MessageHeader header;
+ uint32_t reserved GNUNET_PACKED;
+
/**
* Operation ID.
*/
- uint32_t op_id GNUNET_PACKED;
+ uint64_t op_id GNUNET_PACKED;
/**
* Channel's public key.
@@ -270,9 +280,19 @@ struct FragmentGetRequest
struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
/**
- * Fragment ID to request.
+ * First fragment ID to request.
*/
- uint64_t fragment_id GNUNET_PACKED;
+ uint64_t first_fragment_id GNUNET_PACKED;
+
+ /**
+ * Last fragment ID to request.
+ */
+ uint64_t last_fragment_id GNUNET_PACKED;
+
+ /**
+ * Maximum number of fragments to retrieve.
+ */
+ uint64_t fragment_limit GNUNET_PACKED;
/**
* Do membership test with @a slave_key before returning fragment?
@@ -292,10 +312,12 @@ struct MessageGetRequest
*/
struct GNUNET_MessageHeader header;
+ uint32_t reserved GNUNET_PACKED;
+
/**
* Operation ID.
*/
- uint32_t op_id GNUNET_PACKED;
+ uint64_t op_id GNUNET_PACKED;
/**
* Channel's public key.
@@ -308,9 +330,19 @@ struct MessageGetRequest
struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
/**
- * Message ID to request.
+ * First message ID to request.
*/
- uint64_t message_id GNUNET_PACKED;
+ uint64_t first_message_id GNUNET_PACKED;
+
+ /**
+ * Last message ID to request.
+ */
+ uint64_t last_message_id GNUNET_PACKED;
+
+ /**
+ * Maximum number of messages to retrieve.
+ */
+ uint64_t message_limit GNUNET_PACKED;
/**
* Do membership test with @a slave_key before returning fragment?
@@ -330,10 +362,12 @@ struct MessageGetFragmentRequest
*/
struct GNUNET_MessageHeader header;
+ uint32_t reserved GNUNET_PACKED;
+
/**
* Operation ID.
*/
- uint32_t op_id GNUNET_PACKED;
+ uint64_t op_id GNUNET_PACKED;
/**
* Channel's public key.
@@ -373,10 +407,12 @@ struct StateHashUpdateRequest
*/
struct GNUNET_MessageHeader header;
+ uint32_t reserved GNUNET_PACKED;
+
/**
* Operation ID.
*/
- uint32_t op_id GNUNET_PACKED;
+ uint64_t op_id GNUNET_PACKED;
/**
* Channel's public key.
@@ -405,20 +441,6 @@ struct StateModifyRequest
struct GNUNET_MessageHeader header;
/**
- * Operation ID.
- */
- uint32_t op_id GNUNET_PACKED;
-
- /**
- * Channel's public key.
- */
- struct GNUNET_CRYPTO_EddsaPublicKey channel_key;
-
- uint64_t message_id GNUNET_PACKED;
-
- uint64_t state_delta GNUNET_PACKED;
-
- /**
* Size of name, including NUL terminator.
*/
uint16_t name_size GNUNET_PACKED;
@@ -433,6 +455,20 @@ struct StateModifyRequest
*/
uint8_t oper;
+ /**
+ * Operation ID.
+ */
+ uint64_t op_id GNUNET_PACKED;
+
+ /**
+ * Channel's public key.
+ */
+ struct GNUNET_CRYPTO_EddsaPublicKey channel_key;
+
+ uint64_t message_id GNUNET_PACKED;
+
+ uint64_t state_delta GNUNET_PACKED;
+
/* Followed by NUL-terminated name, then the value. */
};
@@ -448,26 +484,28 @@ struct StateSyncRequest
struct GNUNET_MessageHeader header;
/**
- * Operation ID.
+ * Size of name, including NUL terminator.
*/
- uint32_t op_id GNUNET_PACKED;
+ uint16_t name_size GNUNET_PACKED;
/**
- * Channel's public key.
+ * OR'd StateOpFlags
*/
- struct GNUNET_CRYPTO_EddsaPublicKey channel_key;
+ uint8_t flags;
+
+ uint8_t reserved;
uint64_t message_id GNUNET_PACKED;
/**
- * Size of name, including NUL terminator.
+ * Operation ID.
*/
- uint16_t name_size GNUNET_PACKED;
+ uint64_t op_id GNUNET_PACKED;
/**
- * OR'd StateOpFlags
+ * Channel's public key.
*/
- uint8_t flags;
+ struct GNUNET_CRYPTO_EddsaPublicKey channel_key;
/* Followed by NUL-terminated name, then the value. */
};
diff --git a/src/psycstore/psycstore_api.c b/src/psycstore/psycstore_api.c
index 9df55888d..9ef1fb61a 100644
--- a/src/psycstore/psycstore_api.c
+++ b/src/psycstore/psycstore_api.c
@@ -25,6 +25,8 @@
* @author Christian Grothoff
*/
+#include <inttypes.h>
+
#include "platform.h"
#include "gnunet_util_lib.h"
#include "gnunet_constants.h"
@@ -76,7 +78,7 @@ struct GNUNET_PSYCSTORE_OperationHandle
/**
* Operation ID.
*/
- uint32_t op_id;
+ uint64_t op_id;
/**
* Message to send to the PSYCstore service.
@@ -137,15 +139,14 @@ struct GNUNET_PSYCSTORE_Handle
struct GNUNET_TIME_Relative reconnect_delay;
/**
- * Are we polling for incoming messages right now?
+ * Last operation ID used.
*/
- int in_receive;
+ uint64_t last_op_id;
/**
- * The last operation id used for a PSYCstore operation.
+ * Are we polling for incoming messages right now?
*/
- uint32_t last_op_id_used;
-
+ uint8_t in_receive;
};
@@ -155,10 +156,10 @@ struct GNUNET_PSYCSTORE_Handle
* @param h Handle to the PSYCstore service.
* @return next operation id to use
*/
-static uint32_t
+static uint64_t
get_next_op_id (struct GNUNET_PSYCSTORE_Handle *h)
{
- return h->last_op_id_used++;
+ return h->last_op_id++;
}
@@ -168,7 +169,7 @@ get_next_op_id (struct GNUNET_PSYCSTORE_Handle *h)
* @return OperationHandle if found, or NULL otherwise.
*/
static struct GNUNET_PSYCSTORE_OperationHandle *
-find_op_by_id (struct GNUNET_PSYCSTORE_Handle *h, uint32_t op_id)
+find_op_by_id (struct GNUNET_PSYCSTORE_Handle *h, uint64_t op_id)
{
struct GNUNET_PSYCSTORE_OperationHandle *op = h->op_head;
while (NULL != op)
@@ -284,19 +285,20 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
if (size == sizeof (struct OperationResult))
str = NULL;
- op = find_op_by_id (h, ntohl (opres->op_id));
+ op = find_op_by_id (h, GNUNET_ntohll (opres->op_id));
if (NULL == op)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "No callback registered for operation with ID %ld.\n",
- type, ntohl (opres->op_id));
+ "No callback registered for operation with ID %" PRIu64 ".\n",
+ type, GNUNET_ntohll (opres->op_id));
}
else
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Received result message (type %d) with operation ID: %ld\n",
+ "Received result message (type %d) with operation ID: %" PRIu64 "\n",
type, op->op_id);
+ int64_t result_code = GNUNET_ntohll (opres->result_code) + INT64_MIN;
GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op);
if (NULL != op->res_cb)
{
@@ -307,19 +309,19 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY:
smreq = (const struct StateModifyRequest *) op->msg;
if (!(smreq->flags & STATE_OP_LAST
- || GNUNET_OK != ntohl (opres->result_code)))
+ || GNUNET_OK != result_code))
op->res_cb = NULL;
break;
case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC:
ssreq = (const struct StateSyncRequest *) op->msg;
if (!(ssreq->flags & STATE_OP_LAST
- || GNUNET_OK != ntohl (opres->result_code)))
+ || GNUNET_OK != result_code))
op->res_cb = NULL;
break;
}
}
if (NULL != op->res_cb)
- op->res_cb (op->cls, ntohl (opres->result_code), str);
+ op->res_cb (op->cls, result_code, str);
GNUNET_free (op);
}
break;
@@ -338,19 +340,20 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
cres = (const struct CountersResult *) msg;
- op = find_op_by_id (h, ntohl (cres->op_id));
+ op = find_op_by_id (h, GNUNET_ntohll (cres->op_id));
if (NULL == op)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "No callback registered for operation with ID %ld.\n",
- type, ntohl (cres->op_id));
+ "No callback registered for operation with ID %" PRIu64 ".\n",
+ type, GNUNET_ntohll (cres->op_id));
}
else
{
GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op);
if (NULL != op->data_cb)
((GNUNET_PSYCSTORE_CountersCallback)
- op->data_cb) (op->cls, ntohl (cres->result_code),
+ op->data_cb) (op->cls,
+ ntohl (cres->result_code) + INT32_MIN,
GNUNET_ntohll (cres->max_fragment_id),
GNUNET_ntohll (cres->max_message_id),
GNUNET_ntohll (cres->max_group_generation),
@@ -386,12 +389,12 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
return;
}
- op = find_op_by_id (h, ntohl (fres->op_id));
+ op = find_op_by_id (h, GNUNET_ntohll (fres->op_id));
if (NULL == op)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "No callback registered for operation with ID %ld.\n",
- type, ntohl (fres->op_id));
+ "No callback registered for operation with ID %" PRIu64 ".\n",
+ type, GNUNET_ntohll (fres->op_id));
}
else
{
@@ -427,12 +430,12 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
return;
}
- op = find_op_by_id (h, ntohl (sres->op_id));
+ op = find_op_by_id (h, GNUNET_ntohll (sres->op_id));
if (NULL == op)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "No callback registered for operation with ID %ld.\n",
- type, ntohl (sres->op_id));
+ "No callback registered for operation with ID %" PRIu64 ".\n",
+ type, GNUNET_ntohll (sres->op_id));
}
else
{
@@ -479,7 +482,7 @@ send_next_message (void *cls, size_t size, void *buf)
return 0;
}
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Sending message of type %d to PSYCstore service. ID: %u\n",
+ "Sending message of type %d to PSYCstore service. ID: %" PRIu64 "\n",
ntohs (op->msg->type), op->op_id);
memcpy (buf, op->msg, ret);
@@ -682,8 +685,8 @@ GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h,
: effective_since == 0);
struct MembershipStoreRequest *req;
- struct GNUNET_PSYCSTORE_OperationHandle *op
- = GNUNET_malloc (sizeof (*op) + sizeof (*req));
+ struct GNUNET_PSYCSTORE_OperationHandle *
+ op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
op->h = h;
op->res_cb = rcb;
op->cls = rcb_cls;
@@ -700,7 +703,7 @@ GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h,
req->group_generation = GNUNET_htonll (group_generation);
op->op_id = get_next_op_id (h);
- req->op_id = htonl (op->op_id);
+ req->op_id = GNUNET_htonll (op->op_id);
GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
transmit_next (h);
@@ -746,8 +749,8 @@ GNUNET_PSYCSTORE_membership_test (struct GNUNET_PSYCSTORE_Handle *h,
void *rcb_cls)
{
struct MembershipTestRequest *req;
- struct GNUNET_PSYCSTORE_OperationHandle *op
- = GNUNET_malloc (sizeof (*op) + sizeof (*req));
+ struct GNUNET_PSYCSTORE_OperationHandle *
+ op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
op->h = h;
op->res_cb = rcb;
op->cls = rcb_cls;
@@ -762,7 +765,7 @@ GNUNET_PSYCSTORE_membership_test (struct GNUNET_PSYCSTORE_Handle *h,
req->group_generation = GNUNET_htonll (group_generation);
op->op_id = get_next_op_id (h);
- req->op_id = htonl (op->op_id);
+ req->op_id = GNUNET_htonll (op->op_id);
GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
transmit_next (h);
@@ -794,8 +797,8 @@ GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h,
{
uint16_t size = ntohs (msg->header.size);
struct FragmentStoreRequest *req;
- struct GNUNET_PSYCSTORE_OperationHandle *op
- = GNUNET_malloc (sizeof (*op) + sizeof (*req) + size);
+ struct GNUNET_PSYCSTORE_OperationHandle *
+ op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + size);
op->h = h;
op->res_cb = rcb;
op->cls = rcb_cls;
@@ -809,7 +812,7 @@ GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h,
memcpy (&req[1], msg, size);
op->op_id = get_next_op_id (h);
- req->op_id = htonl (op->op_id);
+ req->op_id = GNUNET_htonll (op->op_id);
GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
transmit_next (h);
@@ -819,7 +822,7 @@ GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h,
/**
- * Retrieve a message fragment by fragment ID.
+ * Retrieve message fragments by fragment ID range.
*
* @param h
* Handle for the PSYCstore.
@@ -829,9 +832,15 @@ GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h,
* The slave requesting the fragment. If not NULL, a membership test is
* performed first and the fragment is only returned if the slave has
* access to it.
- * @param fragment_id
- * Fragment ID to retrieve. Use 0 to get the latest message fragment.
- * @param fcb
+ * @param first_fragment_id
+ * First fragment ID to retrieve.
+ * Use 0 to get the latest message fragment.
+ * @param last_fragment_id
+ * Last consecutive fragment ID to retrieve.
+ * Use 0 to get the latest message fragment.
+ * @param fragment_limit
+ * Maximum number of fragments to retrieve.
+ * @param fragment_cb
* Callback to call with the retrieved fragments.
* @param rcb
* Callback to call with the result of the operation.
@@ -844,16 +853,85 @@ struct GNUNET_PSYCSTORE_OperationHandle *
GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h,
const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
- uint64_t fragment_id,
- GNUNET_PSYCSTORE_FragmentCallback fcb,
+ uint64_t first_fragment_id,
+ uint64_t last_fragment_id,
+ GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
GNUNET_PSYCSTORE_ResultCallback rcb,
void *cls)
{
struct FragmentGetRequest *req;
- struct GNUNET_PSYCSTORE_OperationHandle *op
- = GNUNET_malloc (sizeof (*op) + sizeof (*req));
+ struct GNUNET_PSYCSTORE_OperationHandle *
+ op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
+ op->h = h;
+ op->data_cb = (DataCallback) fragment_cb;
+ op->res_cb = rcb;
+ op->cls = cls;
+
+ req = (struct FragmentGetRequest *) &op[1];
+ op->msg = (struct GNUNET_MessageHeader *) req;
+ req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET);
+ req->header.size = htons (sizeof (*req));
+ req->channel_key = *channel_key;
+ req->first_fragment_id = GNUNET_htonll (first_fragment_id);
+ req->last_fragment_id = GNUNET_htonll (last_fragment_id);
+ if (NULL != slave_key)
+ {
+ req->slave_key = *slave_key;
+ req->do_membership_test = GNUNET_YES;
+ }
+
+ op->op_id = get_next_op_id (h);
+ req->op_id = GNUNET_htonll (op->op_id);
+
+ GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
+ transmit_next (h);
+
+ return op;
+}
+
+
+/**
+ * Retrieve latest message fragments.
+ *
+ * @param h
+ * Handle for the PSYCstore.
+ * @param channel_key
+ * The channel we are interested in.
+ * @param slave_key
+ * The slave requesting the fragment. If not NULL, a membership test is
+ * performed first and the fragment is only returned if the slave has
+ * access to it.
+ * @param first_fragment_id
+ * First fragment ID to retrieve.
+ * Use 0 to get the latest message fragment.
+ * @param last_fragment_id
+ * Last consecutive fragment ID to retrieve.
+ * Use 0 to get the latest message fragment.
+ * @param fragment_limit
+ * Maximum number of fragments to retrieve.
+ * @param fragment_cb
+ * Callback to call with the retrieved fragments.
+ * @param rcb
+ * Callback to call with the result of the operation.
+ * @param cls
+ * Closure for the callbacks.
+ *
+ * @return Handle that can be used to cancel the operation.
+ */
+struct GNUNET_PSYCSTORE_OperationHandle *
+GNUNET_PSYCSTORE_fragment_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
+ const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
+ const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
+ uint64_t fragment_limit,
+ GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
+ GNUNET_PSYCSTORE_ResultCallback rcb,
+ void *cls)
+{
+ struct FragmentGetRequest *req;
+ struct GNUNET_PSYCSTORE_OperationHandle *
+ op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
op->h = h;
- op->data_cb = (DataCallback) fcb;
+ op->data_cb = (DataCallback) fragment_cb;
op->res_cb = rcb;
op->cls = cls;
@@ -862,7 +940,7 @@ GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h,
req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET);
req->header.size = htons (sizeof (*req));
req->channel_key = *channel_key;
- req->fragment_id = GNUNET_htonll (fragment_id);
+ req->fragment_limit = GNUNET_ntohll (fragment_limit);
if (NULL != slave_key)
{
req->slave_key = *slave_key;
@@ -870,7 +948,7 @@ GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h,
}
op->op_id = get_next_op_id (h);
- req->op_id = htonl (op->op_id);
+ req->op_id = GNUNET_htonll (op->op_id);
GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
transmit_next (h);
@@ -880,7 +958,7 @@ GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h,
/**
- * Retrieve all fragments of a message.
+ * Retrieve all fragments of messages in a message ID range.
*
* @param h
* Handle for the PSYCstore.
@@ -890,9 +968,13 @@ GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h,
* The slave requesting the message. If not NULL, a membership test is
* performed first and the message is only returned if the slave has
* access to it.
- * @param message_id
- * Message ID to retrieve. Use 0 to get the latest message.
- * @param fcb
+ * @param first_message_id
+ * First message ID to retrieve.
+ * Use 0 to get the latest message.
+ * @param last_message_id
+ * Last consecutive message ID to retrieve.
+ * Use 0 to get the latest message.
+ * @param fragment_cb
* Callback to call with the retrieved fragments.
* @param rcb
* Callback to call with the result of the operation.
@@ -905,16 +987,17 @@ struct GNUNET_PSYCSTORE_OperationHandle *
GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h,
const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
- uint64_t message_id,
- GNUNET_PSYCSTORE_FragmentCallback fcb,
+ uint64_t first_message_id,
+ uint64_t last_message_id,
+ GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
GNUNET_PSYCSTORE_ResultCallback rcb,
void *cls)
{
struct MessageGetRequest *req;
- struct GNUNET_PSYCSTORE_OperationHandle *op
- = GNUNET_malloc (sizeof (*op) + sizeof (*req));
+ struct GNUNET_PSYCSTORE_OperationHandle *
+ op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
op->h = h;
- op->data_cb = (DataCallback) fcb;
+ op->data_cb = (DataCallback) fragment_cb;
op->res_cb = rcb;
op->cls = cls;
@@ -923,7 +1006,8 @@ GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h,
req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
req->header.size = htons (sizeof (*req));
req->channel_key = *channel_key;
- req->message_id = GNUNET_htonll (message_id);
+ req->first_message_id = GNUNET_htonll (first_message_id);
+ req->last_message_id = GNUNET_htonll (last_message_id);
if (NULL != slave_key)
{
req->slave_key = *slave_key;
@@ -931,7 +1015,68 @@ GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h,
}
op->op_id = get_next_op_id (h);
- req->op_id = htonl (op->op_id);
+ req->op_id = GNUNET_htonll (op->op_id);
+
+ GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
+ transmit_next (h);
+
+ return op;
+}
+
+
+/**
+ * Retrieve all fragments of the latest messages.
+ *
+ * @param h
+ * Handle for the PSYCstore.
+ * @param channel_key
+ * The channel we are interested in.
+ * @param slave_key
+ * The slave requesting the message. If not NULL, a membership test is
+ * performed first and the message is only returned if the slave has
+ * access to it.
+ * @param message_limit
+ * Maximum number of messages to retrieve.
+ * @param fragment_cb
+ * Callback to call with the retrieved fragments.
+ * @param rcb
+ * Callback to call with the result of the operation.
+ * @param cls
+ * Closure for the callbacks.
+ *
+ * @return Handle that can be used to cancel the operation.
+ */
+struct GNUNET_PSYCSTORE_OperationHandle *
+GNUNET_PSYCSTORE_message_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
+ const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
+ const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
+ uint64_t message_limit,
+ GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
+ GNUNET_PSYCSTORE_ResultCallback rcb,
+ void *cls)
+{
+ struct MessageGetRequest *req;
+ struct GNUNET_PSYCSTORE_OperationHandle *
+ op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
+ op->h = h;
+ op->data_cb = (DataCallback) fragment_cb;
+ op->res_cb = rcb;
+ op->cls = cls;
+
+ req = (struct MessageGetRequest *) &op[1];
+ op->msg = (struct GNUNET_MessageHeader *) req;
+ req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
+ req->header.size = htons (sizeof (*req));
+ req->channel_key = *channel_key;
+ req->message_limit = GNUNET_ntohll (message_limit);
+ if (NULL != slave_key)
+ {
+ req->slave_key = *slave_key;
+ req->do_membership_test = GNUNET_YES;
+ }
+
+ op->op_id = get_next_op_id (h);
+ req->op_id = GNUNET_htonll (op->op_id);
GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
transmit_next (h);
@@ -956,9 +1101,9 @@ GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h,
* Message ID to retrieve. Use 0 to get the latest message.
* @param fragment_offset
* Offset of the fragment to retrieve.
- * @param fcb
+ * @param fragment_cb
* Callback to call with the retrieved fragments.
- * @param rcb
+ * @param result_cb
* Callback to call with the result of the operation.
* @param cls
* Closure for the callbacks.
@@ -971,15 +1116,15 @@ GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h,
const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
uint64_t message_id,
uint64_t fragment_offset,
- GNUNET_PSYCSTORE_FragmentCallback fcb,
+ GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
GNUNET_PSYCSTORE_ResultCallback rcb,
void *cls)
{
struct MessageGetFragmentRequest *req;
- struct GNUNET_PSYCSTORE_OperationHandle *op
- = GNUNET_malloc (sizeof (*op) + sizeof (*req));
+ struct GNUNET_PSYCSTORE_OperationHandle *
+ op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
op->h = h;
- op->data_cb = (DataCallback) fcb;
+ op->data_cb = (DataCallback) fragment_cb;
op->res_cb = rcb;
op->cls = cls;
@@ -997,7 +1142,7 @@ GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h,
}
op->op_id = get_next_op_id (h);
- req->op_id = htonl (op->op_id);
+ req->op_id = GNUNET_htonll (op->op_id);
GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
transmit_next (h);
@@ -1026,8 +1171,8 @@ GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h,
void *ccb_cls)
{
struct OperationRequest *req;
- struct GNUNET_PSYCSTORE_OperationHandle *op
- = GNUNET_malloc (sizeof (*op) + sizeof (*req));
+ struct GNUNET_PSYCSTORE_OperationHandle *
+ op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
op->h = h;
op->data_cb = ccb;
op->cls = ccb_cls;
@@ -1039,7 +1184,7 @@ GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h,
req->channel_key = *channel_key;
op->op_id = get_next_op_id (h);
- req->op_id = htonl (op->op_id);
+ req->op_id = GNUNET_htonll (op->op_id);
GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
transmit_next (h);
@@ -1109,7 +1254,7 @@ GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h,
memcpy ((char *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size);
op->op_id = get_next_op_id (h);
- req->op_id = htonl (op->op_id);
+ req->op_id = GNUNET_htonll (op->op_id);
GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
transmit_next (h);
@@ -1175,7 +1320,7 @@ GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h,
memcpy ((char *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size);
op->op_id = get_next_op_id (h);
- req->op_id = htonl (op->op_id);
+ req->op_id = GNUNET_htonll (op->op_id);
GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
transmit_next (h);
@@ -1204,8 +1349,8 @@ GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h,
void *rcb_cls)
{
struct OperationRequest *req;
- struct GNUNET_PSYCSTORE_OperationHandle *op
- = GNUNET_malloc (sizeof (*op) + sizeof (*req));
+ struct GNUNET_PSYCSTORE_OperationHandle *
+ op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
op->h = h;
op->res_cb = rcb;
op->cls = rcb_cls;
@@ -1217,7 +1362,7 @@ GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h,
req->channel_key = *channel_key;
op->op_id = get_next_op_id (h);
- req->op_id = htonl (op->op_id);
+ req->op_id = GNUNET_htonll (op->op_id);
GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
transmit_next (h);
@@ -1247,8 +1392,8 @@ GNUNET_PSYCSTORE_state_hash_update (struct GNUNET_PSYCSTORE_Handle *h,
void *rcb_cls)
{
struct StateHashUpdateRequest *req;
- struct GNUNET_PSYCSTORE_OperationHandle *op
- = GNUNET_malloc (sizeof (*op) + sizeof (*req));
+ struct GNUNET_PSYCSTORE_OperationHandle *
+ op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
op->h = h;
op->res_cb = rcb;
op->cls = rcb_cls;
@@ -1261,7 +1406,7 @@ GNUNET_PSYCSTORE_state_hash_update (struct GNUNET_PSYCSTORE_Handle *h,
req->hash = *hash;
op->op_id = get_next_op_id (h);
- req->op_id = htonl (op->op_id);
+ req->op_id = GNUNET_htonll (op->op_id);
GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
transmit_next (h);
@@ -1292,8 +1437,8 @@ GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h,
{
size_t name_size = strlen (name) + 1;
struct OperationRequest *req;
- struct GNUNET_PSYCSTORE_OperationHandle *op
- = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size);
+ struct GNUNET_PSYCSTORE_OperationHandle *
+ op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size);
op->h = h;
op->data_cb = (DataCallback) scb;
op->res_cb = rcb;
@@ -1307,7 +1452,7 @@ GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h,
memcpy (&req[1], name, name_size);
op->op_id = get_next_op_id (h);
- req->op_id = htonl (op->op_id);
+ req->op_id = GNUNET_htonll (op->op_id);
GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
transmit_next (h);
@@ -1339,8 +1484,8 @@ GNUNET_PSYCSTORE_state_get_prefix (struct GNUNET_PSYCSTORE_Handle *h,
{
size_t name_size = strlen (name_prefix) + 1;
struct OperationRequest *req;
- struct GNUNET_PSYCSTORE_OperationHandle *op
- = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size);
+ struct GNUNET_PSYCSTORE_OperationHandle *
+ op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size);
op->h = h;
op->data_cb = (DataCallback) scb;
op->res_cb = rcb;
@@ -1354,7 +1499,7 @@ GNUNET_PSYCSTORE_state_get_prefix (struct GNUNET_PSYCSTORE_Handle *h,
memcpy (&req[1], name_prefix, name_size);
op->op_id = get_next_op_id (h);
- req->op_id = htonl (op->op_id);
+ req->op_id = GNUNET_htonll (op->op_id);
GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
transmit_next (h);
diff --git a/src/psycstore/test_plugin_psycstore.c b/src/psycstore/test_plugin_psycstore.c
index 8267ddba8..a6c456fec 100644
--- a/src/psycstore/test_plugin_psycstore.c
+++ b/src/psycstore/test_plugin_psycstore.c
@@ -204,11 +204,17 @@ run (void *cls, char *const *args, const char *cfgfile,
msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
msg->header.size = htons (sizeof (*msg) + sizeof (channel_pub_key));
+ uint64_t fragment_id = INT64_MAX - 1;
+ msg->fragment_id = GNUNET_htonll (fragment_id);
+
+ uint64_t message_id = INT64_MAX - 10;
+ msg->message_id = GNUNET_htonll (message_id);
+
+ uint64_t group_generation = INT64_MAX - 3;
+ msg->group_generation = GNUNET_htonll (group_generation);
+
msg->hop_counter = htonl (9);
- msg->fragment_id = GNUNET_htonll (INT64_MAX - 1);
msg->fragment_offset = GNUNET_htonll (0);
- msg->message_id = GNUNET_htonll (INT64_MAX - 10);
- msg->group_generation = GNUNET_htonll (INT64_MAX - 3);
msg->flags = htonl (GNUNET_MULTICAST_MESSAGE_LAST_FRAGMENT);
memcpy (&msg[1], &channel_pub_key, sizeof (channel_pub_key));
@@ -225,14 +231,19 @@ run (void *cls, char *const *args, const char *cfgfile,
fcls.msg[0] = msg;
fcls.flags[0] = GNUNET_PSYCSTORE_MESSAGE_STATE;
- GNUNET_assert (GNUNET_OK == db->fragment_store (db->cls, &channel_pub_key, msg,
- fcls.flags[0]));
+ GNUNET_assert (
+ GNUNET_OK == db->fragment_store (db->cls, &channel_pub_key, msg,
+ fcls.flags[0]));
- GNUNET_assert (GNUNET_OK == db->fragment_get (db->cls, &channel_pub_key,
- GNUNET_ntohll (msg->fragment_id),
- fragment_cb, &fcls));
+ uint64_t ret_frags = 0;
+ GNUNET_assert (
+ GNUNET_OK == db->fragment_get (db->cls, &channel_pub_key,
+ fragment_id, fragment_id,
+ &ret_frags, fragment_cb, &fcls));
GNUNET_assert (fcls.n == 1);
+ // FIXME: test fragment_get_latest and message_get_latest
+
fcls.n = 0;
GNUNET_assert (
@@ -250,9 +261,10 @@ run (void *cls, char *const *args, const char *cfgfile,
fcls.n = 0;
fcls.flags[0] |= GNUNET_PSYCSTORE_MESSAGE_STATE_APPLIED;
- GNUNET_assert (GNUNET_OK == db->fragment_get (db->cls, &channel_pub_key,
- GNUNET_ntohll (msg->fragment_id),
- fragment_cb, &fcls));
+ GNUNET_assert (
+ GNUNET_OK == db->fragment_get (db->cls, &channel_pub_key,
+ fragment_id, fragment_id,
+ &ret_frags, fragment_cb, &fcls));
GNUNET_assert (fcls.n == 1);
struct GNUNET_MULTICAST_MessageHeader *msg1
@@ -270,15 +282,17 @@ run (void *cls, char *const *args, const char *cfgfile,
GNUNET_assert (GNUNET_OK == db->fragment_store (db->cls, &channel_pub_key, msg1,
fcls.flags[1]));
- uint64_t retfrags = 0;
- GNUNET_assert (GNUNET_OK == db->message_get (db->cls, &channel_pub_key,
- GNUNET_ntohll (msg->message_id),
- &retfrags, fragment_cb, &fcls));
- GNUNET_assert (fcls.n == 2 && retfrags == 2);
+ GNUNET_assert (
+ GNUNET_OK == db->message_get (db->cls, &channel_pub_key,
+ message_id, message_id,
+ &ret_frags, fragment_cb, &fcls));
+ GNUNET_assert (fcls.n == 2 && ret_frags == 2);
/* Message counters */
- uint64_t fragment_id = 0, message_id = 0, group_generation = 0;
+ fragment_id = 0;
+ message_id = 0;
+ group_generation = 0;
GNUNET_assert (
GNUNET_OK == db->counters_message_get (db->cls, &channel_pub_key,
&fragment_id, &message_id,
diff --git a/src/psycstore/test_psycstore.c b/src/psycstore/test_psycstore.c
index 3ef2439e3..125e64f58 100644
--- a/src/psycstore/test_psycstore.c
+++ b/src/psycstore/test_psycstore.c
@@ -25,6 +25,8 @@
* @author Christian Grothoff
*/
+#include <inttypes.h>
+
#include "platform.h"
#include "gnunet_util_lib.h"
#include "gnunet_common.h"
@@ -302,19 +304,22 @@ fragment_result (void *cls,
enum GNUNET_PSYCSTORE_MessageFlags flags)
{
struct FragmentClosure *fcls = cls;
+ GNUNET_assert (fcls->n < fcls->n_expected);
struct GNUNET_MULTICAST_MessageHeader *msg0 = fcls->msg[fcls->n];
uint64_t flags0 = fcls->flags[fcls->n++];
if (flags == flags0 && msg->header.size == msg0->header.size
&& 0 == memcmp (msg, msg0, ntohs (msg->header.size)))
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " fragment %llu matches\n",
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " fragment %" PRIu64 " matches\n",
GNUNET_ntohll (msg->fragment_id));
return GNUNET_YES;
}
else
{
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, " fragment %llu differs\n",
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ " fragment differs: expected %" PRIu64 ", got %" PRIu64 "\n",
+ GNUNET_ntohll (msg0->fragment_id),
GNUNET_ntohll (msg->fragment_id));
GNUNET_assert (0);
return GNUNET_SYSERR;
@@ -323,13 +328,12 @@ fragment_result (void *cls,
void
-message_get_result (void *cls, int64_t result, const char *err_msg)
+message_get_latest_result (void *cls, int64_t result, const char *err_msg)
{
struct FragmentClosure *fcls = cls;
op = NULL;
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "message_get:\t%d\n", result);
- GNUNET_assert (result > 0 && fcls->n && fcls->n_expected);
-
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "message_get_latest:\t%d\n", result);
+ GNUNET_assert (0 < result && fcls->n == fcls->n_expected);
modifiers[0] = (struct GNUNET_ENV_Modifier) {
.oper = '=',
@@ -351,29 +355,46 @@ message_get_result (void *cls, int64_t result, const char *err_msg)
void
+message_get_result (void *cls, int64_t result, const char *err_msg)
+{
+ struct FragmentClosure *fcls = cls;
+ op = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "message_get:\t%d\n", result);
+ GNUNET_assert (0 < result && fcls->n == fcls->n_expected);
+
+ fcls->n = 0;
+ fcls->n_expected = 3;
+ op = GNUNET_PSYCSTORE_message_get_latest (h, &channel_pub_key, &slave_pub_key,
+ 1, &fragment_result,
+ &message_get_latest_result, fcls);
+}
+
+
+void
message_get_fragment_result (void *cls, int64_t result, const char *err_msg)
{
struct FragmentClosure *fcls = cls;
op = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "message_get_fragment:\t%d\n", result);
- GNUNET_assert (result > 0 && fcls->n && fcls->n_expected);
+ GNUNET_assert (0 < result && fcls->n == fcls->n_expected);
fcls->n = 0;
fcls->n_expected = 3;
+ uint64_t message_id = GNUNET_ntohll (fcls->msg[0]->message_id);
op = GNUNET_PSYCSTORE_message_get (h, &channel_pub_key, &slave_pub_key,
- GNUNET_ntohll (fcls->msg[0]->message_id),
+ message_id, message_id,
&fragment_result,
&message_get_result, fcls);
}
void
-fragment_get_result (void *cls, int64_t result, const char *err_msg)
+fragment_get_latest_result (void *cls, int64_t result, const char *err_msg)
{
struct FragmentClosure *fcls = cls;
op = NULL;
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "fragment_get:\t%d\n", result);
- GNUNET_assert (result > 0 && fcls->n && fcls->n_expected);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "fragment_get_latest:\t%d\n", result);
+ GNUNET_assert (0 < result && fcls->n == fcls->n_expected);
fcls->n = 1;
fcls->n_expected = 2;
@@ -381,9 +402,24 @@ fragment_get_result (void *cls, int64_t result, const char *err_msg)
GNUNET_ntohll (fcls->msg[1]->message_id),
GNUNET_ntohll (fcls->msg[1]->fragment_offset),
&fragment_result,
- &message_get_fragment_result,
- fcls);
+ &message_get_fragment_result, fcls);
+}
+
+void
+fragment_get_result (void *cls, int64_t result, const char *err_msg)
+{
+ struct FragmentClosure *fcls = cls;
+ op = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "fragment_get:\t%d\n", result);
+ GNUNET_assert (0 < result && fcls->n == fcls->n_expected);
+
+ fcls->n = 0;
+ fcls->n_expected = 3;
+ op = GNUNET_PSYCSTORE_fragment_get_latest (h, &channel_pub_key,
+ &slave_pub_key, fcls->n_expected,
+ &fragment_result,
+ &fragment_get_latest_result, fcls);
}
@@ -398,8 +434,9 @@ fragment_store_result (void *cls, int64_t result, const char *err_msg)
{ /* last fragment */
fcls.n = 0;
fcls.n_expected = 1;
+ uint64_t fragment_id = GNUNET_ntohll (fcls.msg[0]->fragment_id);
op = GNUNET_PSYCSTORE_fragment_get (h, &channel_pub_key, &slave_pub_key,
- GNUNET_ntohll (fcls.msg[0]->fragment_id),
+ fragment_id, fragment_id,
&fragment_result,
&fragment_get_result, &fcls);
}
diff --git a/src/social/gnunet-service-social.c b/src/social/gnunet-service-social.c
index c75589811..d297a153b 100644
--- a/src/social/gnunet-service-social.c
+++ b/src/social/gnunet-service-social.c
@@ -467,17 +467,17 @@ client_send_msg (const struct Place *plc,
* Called after a PSYC master is started.
*/
static void
-psyc_master_started (void *cls, uint64_t max_message_id)
+psyc_master_started (void *cls, int result, uint64_t max_message_id)
{
struct Host *hst = cls;
struct Place *plc = &hst->plc;
plc->max_message_id = max_message_id;
plc->is_ready = GNUNET_YES;
- struct CountersResult res;
+ struct GNUNET_PSYC_CountersResultMessage res;
res.header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER_ACK);
res.header.size = htons (sizeof (res));
- res.result_code = htonl (GNUNET_OK);
+ res.result_code = htonl (result - INT32_MIN);
res.max_message_id = GNUNET_htonll (plc->max_message_id);
client_send_msg (plc, &res.header);
@@ -507,17 +507,17 @@ psyc_recv_join_request (void *cls,
* Called after a PSYC slave is connected.
*/
static void
-psyc_slave_connected (void *cls, uint64_t max_message_id)
+psyc_slave_connected (void *cls, int result, uint64_t max_message_id)
{
struct Guest *gst = cls;
struct Place *plc = &gst->plc;
plc->max_message_id = max_message_id;
plc->is_ready = GNUNET_YES;
- struct CountersResult res;
+ struct GNUNET_PSYC_CountersResultMessage res;
res.header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER_ACK);
res.header.size = htons (sizeof (res));
- res.result_code = htonl (GNUNET_OK);
+ res.result_code = htonl (result - INT32_MIN);
res.max_message_id = GNUNET_htonll (plc->max_message_id);
client_send_msg (plc, &res.header);
@@ -608,7 +608,7 @@ client_recv_host_enter (void *cls, struct GNUNET_SERVER_Client *client,
{
plc = &hst->plc;
- struct CountersResult res;
+ struct GNUNET_PSYC_CountersResultMessage res;
res.header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER_ACK);
res.header.size = htons (sizeof (res));
res.result_code = htonl (GNUNET_OK);
@@ -724,7 +724,7 @@ client_recv_guest_enter (void *cls, struct GNUNET_SERVER_Client *client,
{
plc = &gst->plc;
- struct CountersResult res;
+ struct GNUNET_PSYC_CountersResultMessage res;
res.header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER_ACK);
res.header.size = htons (sizeof (res));
res.result_code = htonl (GNUNET_OK);
diff --git a/src/social/social.h b/src/social/social.h
index 00edaefd1..5de1e5e80 100644
--- a/src/social/social.h
+++ b/src/social/social.h
@@ -88,25 +88,6 @@ struct GuestEnterRequest
/**** service -> library ****/
-struct CountersResult
-{
- /**
- * Type: GNUNET_MESSAGE_TYPE_PSYC_RESULT_COUNTERS
- */
- struct GNUNET_MessageHeader header;
-
- /**
- * Status code for the operation.
- */
- int32_t result_code GNUNET_PACKED;
-
- /**
- * Last message ID sent to the channel.
- */
- uint64_t max_message_id;
-};
-
-
#if REMOVE
struct NymEnterRequest
{
diff --git a/src/social/social_api.c b/src/social/social_api.c
index 76fd0f9f9..dfcf18883 100644
--- a/src/social/social_api.c
+++ b/src/social/social_api.c
@@ -640,8 +640,9 @@ host_recv_enter_ack (void *cls,
struct GNUNET_PSYC_CountersResultMessage *
cres = (struct GNUNET_PSYC_CountersResultMessage *) msg;
+ int32_t result = ntohl (cres->result_code) + INT32_MIN;
if (NULL != hst->enter_cb)
- hst->enter_cb (hst->cb_cls, GNUNET_ntohll (cres->max_message_id));
+ hst->enter_cb (hst->cb_cls, result, GNUNET_ntohll (cres->max_message_id));
}
@@ -704,9 +705,9 @@ guest_recv_enter_ack (void *cls,
struct GNUNET_PSYC_CountersResultMessage *
cres = (struct GNUNET_PSYC_CountersResultMessage *) msg;
+ int32_t result = ntohl (cres->result_code) + INT32_MIN;
if (NULL != gst->enter_cb)
- gst->enter_cb (gst->cb_cls, ntohl (cres->result_code),
- GNUNET_ntohll (cres->max_message_id));
+ gst->enter_cb (gst->cb_cls, result, GNUNET_ntohll (cres->max_message_id));
}
@@ -734,7 +735,7 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler host_handlers[] =
{
{ &host_recv_enter_ack, NULL,
GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER_ACK,
- sizeof (struct CountersResult), GNUNET_NO },
+ sizeof (struct GNUNET_PSYC_CountersResultMessage), GNUNET_NO },
{ &host_recv_enter_request, NULL,
GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST,
@@ -758,7 +759,7 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler guest_handlers[] =
{
{ &guest_recv_enter_ack, NULL,
GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER_ACK,
- sizeof (struct CountersResult), GNUNET_NO },
+ sizeof (struct GNUNET_PSYC_CountersResultMessage), GNUNET_NO },
{ &host_recv_enter_request, NULL,
GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST,
@@ -1598,18 +1599,22 @@ struct GNUNET_SOCIAL_WatchHandle;
/**
* Watch a place for changed objects.
*
- * @param place Place to watch.
- * @param object_filter Object prefix to match.
- * @param state_cb Function to call when an object/state changes.
- * @param state_cb_cls Closure for callback.
+ * @param place
+ * Place to watch.
+ * @param object_filter
+ * Object prefix to match.
+ * @param state_var_cb
+ * Function to call when an object/state var changes.
+ * @param cls
+ * Closure for callback.
*
* @return Handle that can be used to cancel watching.
*/
struct GNUNET_SOCIAL_WatchHandle *
GNUNET_SOCIAL_place_watch (struct GNUNET_SOCIAL_Place *place,
const char *object_filter,
- GNUNET_PSYC_StateCallback state_cb,
- void *state_cb_cls)
+ GNUNET_PSYC_StateVarCallback state_var_cb,
+ void *cls)
{
return NULL;
}
@@ -1633,18 +1638,22 @@ struct GNUNET_SOCIAL_LookHandle;
/**
* Look at objects in the place with a matching name prefix.
*
- * @param place The place to look its objects at.
- * @param name_prefix Look at objects with names beginning with this value.
- * @param state_cb Function to call for each object found.
- * @param state_cb_cls Closure for callback function.
+ * @param place
+ * The place to look its objects at.
+ * @param name_prefix
+ * Look at objects with names beginning with this value.
+ * @param state_var_cb
+ * Function to call for each object found.
+ * @param cls
+ * Closure for callback function.
*
* @return Handle that can be used to stop looking at objects.
*/
struct GNUNET_SOCIAL_LookHandle *
GNUNET_SOCIAL_place_look (struct GNUNET_SOCIAL_Place *place,
const char *name_prefix,
- GNUNET_PSYC_StateCallback state_cb,
- void *state_cb_cls)
+ GNUNET_PSYC_StateVarCallback state_var_cb,
+ void *cls)
{
return NULL;
}