From 4725d59b468f1f30ba2910992333ca157682ce29 Mon Sep 17 00:00:00 2001 From: Gabor X Toth <*@tg-x.net> Date: Thu, 7 May 2015 12:15:58 +0000 Subject: psyc/social: request history & state from psycstore; more documentation, tests, cleanup --- src/psyc/gnunet-service-psyc.c | 355 ++++++++++++++++++--------- src/psyc/psyc.h | 36 --- src/psyc/psyc_api.c | 532 +++++++++++++++++++++++++++-------------- src/psyc/psyc_util_lib.c | 38 ++- src/psyc/test_psyc.c | 352 +++++++++++++++------------ 5 files changed, 803 insertions(+), 510 deletions(-) (limited to 'src/psyc') diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c index 5ebbe6444..2bc128c4f 100644 --- a/src/psyc/gnunet-service-psyc.c +++ b/src/psyc/gnunet-service-psyc.c @@ -181,11 +181,24 @@ struct FragmentQueue /** * List of connected clients. */ -struct ClientListItem +struct Client { - struct ClientListItem *prev; - struct ClientListItem *next; + struct Client *prev; + struct Client *next; + + struct GNUNET_SERVER_Client *client; +}; + + +struct Operation +{ + struct Operation *prev; + struct Operation *next; + struct GNUNET_SERVER_Client *client; + struct Channel *chn; + uint64_t op_id; + uint32_t flags; }; @@ -194,8 +207,11 @@ struct ClientListItem */ struct Channel { - struct ClientListItem *clients_head; - struct ClientListItem *clients_tail; + struct Client *clients_head; + struct Client *clients_tail; + + struct Operation *op_head; + struct Operation *op_tail; struct TransmitMessage *tmit_head; struct TransmitMessage *tmit_tail; @@ -397,14 +413,6 @@ struct Slave }; -struct OperationClosure -{ - struct GNUNET_SERVER_Client *client; - struct Channel *chn; - uint64_t op_id; -}; - - static void transmit_message (struct Channel *chn); @@ -435,6 +443,28 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) } +static struct Operation * +op_add (struct Channel *chn, struct GNUNET_SERVER_Client *client, + uint64_t op_id, uint32_t flags) +{ + struct Operation *op = GNUNET_malloc (sizeof (*op)); + op->client = client; + op->chn = chn; + op->op_id = op_id; + op->flags = flags; + GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op); + return op; +} + + +static void +op_remove (struct Channel *chn, struct Operation *op) +{ + GNUNET_CONTAINER_DLL_remove (chn->op_head, chn->op_tail, op); + GNUNET_free (op); +} + + /** * Clean up master data structures after a client disconnected. */ @@ -541,7 +571,7 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) chn, (GNUNET_YES == chn->is_master) ? "master" : "slave", GNUNET_h2s (&chn->pub_key_hash)); - struct ClientListItem *cli = chn->clients_head; + struct Client *cli = chn->clients_head; while (NULL != cli) { if (cli->client == client) @@ -553,6 +583,17 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) cli = cli->next; } + struct Operation *op = chn->op_head; + while (NULL != op) + { + if (op->client == client) + { + op->client = NULL; + break; + } + op = op->next; + } + if (NULL == chn->clients_head) { /* Last client disconnected. */ if (NULL != chn->tmit_head) @@ -574,10 +615,10 @@ static void client_send_msg (const struct Channel *chn, const struct GNUNET_MessageHeader *msg) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Sending message to clients.\n", chn); - struct ClientListItem *cli = chn->clients_head; + struct Client *cli = chn->clients_head; while (NULL != cli) { GNUNET_SERVER_notification_context_add (nc, cli->client); @@ -596,33 +637,29 @@ client_send_msg (const struct Channel *chn, * Code to transmit. * @param op_id * Operation ID in network byte order. - * @param err_msg - * Error message to include (or NULL for none). + * @param data + * Data payload or NULL. + * @param data_size + * Size of @a data. */ static void client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id, - int64_t result_code, const char *err_msg) + int64_t result_code, const void *data, uint16_t data_size) { - struct OperationResult *res; - size_t err_size = 0; + struct GNUNET_OperationResultMessage *res; - if (NULL != err_msg) - err_size = strnlen (err_msg, - GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (*res)) + 1; - res = GNUNET_malloc (sizeof (struct OperationResult) + err_size); + res = GNUNET_malloc (sizeof (*res) + data_size); res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE); - res->header.size = htons (sizeof (struct OperationResult) + err_size); - res->result_code = GNUNET_htonll (result_code + INT64_MAX + 1); + res->header.size = htons (sizeof (*res) + data_size); + res->result_code = GNUNET_htonll_signed (result_code); res->op_id = op_id; - if (0 < err_size) - { - memcpy (&res[1], err_msg, err_size); - ((char *) &res[1])[err_size - 1] = '\0'; - } + if (0 < data_size) + memcpy (&res[1], data, data_size); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Sending result to client for operation #%" PRIu64 ": " - "%" PRId64 " (%s)\n", - client, GNUNET_ntohll (op_id), result_code, err_msg); + "%" PRId64 " (size: %u)\n", + client, GNUNET_ntohll (op_id), result_code, data_size); GNUNET_SERVER_notification_context_add (nc, client); GNUNET_SERVER_notification_context_unicast (nc, client, &res->header, @@ -647,7 +684,8 @@ struct JoinMemTestClosure * Membership test result callback used for join requests. */ static void -join_mem_test_cb (void *cls, int64_t result, const char *err_msg) +join_mem_test_cb (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) { struct JoinMemTestClosure *jcls = cls; @@ -663,6 +701,12 @@ join_mem_test_cb (void *cls, int64_t result, const char *err_msg) } else { + if (GNUNET_SYSERR == result) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Could not perform membership test (%.*s)\n", + err_msg_size, err_msg); + } // FIXME: add relays GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL); } @@ -759,12 +803,13 @@ mcast_recv_join_decision (void *cls, int is_admitted, * Received result of GNUNET_PSYCSTORE_membership_test() */ static void -store_recv_membership_test_result (void *cls, int64_t result, const char *err_msg) +store_recv_membership_test_result (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) { struct GNUNET_MULTICAST_MembershipTestHandle *mth = cls; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p GNUNET_PSYCSTORE_membership_test() returned %" PRId64 " (%s)\n", - mth, result, err_msg); + "%p GNUNET_PSYCSTORE_membership_test() returned %" PRId64 " (%.*s)\n", + mth, result, err_msg_size, err_msg); GNUNET_MULTICAST_membership_test_result (mth, result); } @@ -805,12 +850,13 @@ store_recv_fragment_replay (void *cls, * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay. */ static void -store_recv_fragment_replay_result (void *cls, int64_t result, const char *err_msg) +store_recv_fragment_replay_result (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) { struct GNUNET_MULTICAST_ReplayHandle *rh = cls; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%s)\n", - rh, result, err_msg); + "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n", + rh, result, err_msg_size, err_msg); switch (result) { @@ -867,7 +913,7 @@ mcast_recv_replay_message (void *cls, { struct Channel *chn = cls; GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key, - message_id, message_id, + message_id, message_id, NULL, &store_recv_fragment_replay, &store_recv_fragment_replay_result, rh); } @@ -910,6 +956,42 @@ hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n) } +/** + * Initialize PSYC message header. + */ +static inline void +psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg, + const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags) +{ + uint16_t size = ntohs (mmsg->header.size); + uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg); + + pmsg->header.size = htons (psize); + pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); + pmsg->message_id = mmsg->message_id; + pmsg->fragment_offset = mmsg->fragment_offset; + pmsg->flags = htonl (flags); + + memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); +} + + +/** + * Create a new PSYC message from a multicast message for sending it to clients. + */ +static inline struct GNUNET_PSYC_MessageHeader * +psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags) +{ + struct GNUNET_PSYC_MessageHeader *pmsg; + uint16_t size = ntohs (mmsg->header.size); + uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg); + + pmsg = GNUNET_malloc (psize); + psyc_msg_init (pmsg, mmsg, flags); + return pmsg; +} + + /** * Send multicast message to all clients connected to the channel. */ @@ -918,24 +1000,13 @@ client_send_mcast_msg (struct Channel *chn, const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags) { - struct GNUNET_PSYC_MessageHeader *pmsg; - uint16_t size = ntohs (mmsg->header.size); - uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Sending multicast message to client. " "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n", chn, GNUNET_ntohll (mmsg->fragment_id), GNUNET_ntohll (mmsg->message_id)); - pmsg = GNUNET_malloc (psize); - pmsg->header.size = htons (psize); - pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); - pmsg->message_id = mmsg->message_id; - pmsg->fragment_offset = mmsg->fragment_offset; - pmsg->flags = htonl (flags); - - memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); + struct GNUNET_PSYC_MessageHeader *pmsg = psyc_msg_new (mmsg, flags); client_send_msg (chn, &pmsg->header); GNUNET_free (pmsg); } @@ -1327,12 +1398,13 @@ message_queue_drop (struct Channel *chn) * Received result of GNUNET_PSYCSTORE_fragment_store(). */ static void -store_recv_fragment_store_result (void *cls, int64_t result, const char *err_msg) +store_recv_fragment_store_result (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) { struct Channel *chn = cls; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%s)\n", - chn, result, err_msg); + "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n", + chn, result, err_msg_size, err_msg); } @@ -1430,7 +1502,7 @@ store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id, struct GNUNET_PSYC_CountersResultMessage res; res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); res.header.size = htons (sizeof (res)); - res.result_code = htonl (result - INT32_MIN); + res.result_code = GNUNET_htonl_signed (result); res.max_message_id = GNUNET_htonll (max_message_id); if (GNUNET_OK == result || GNUNET_NO == result) @@ -1476,7 +1548,7 @@ store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id, struct GNUNET_PSYC_CountersResultMessage res; res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); res.header.size = htons (sizeof (res)); - res.result_code = htonl (result - INT32_MIN); + res.result_code = GNUNET_htonl_signed (result); res.max_message_id = GNUNET_htonll (max_message_id); if (GNUNET_OK == result || GNUNET_NO == result) @@ -1566,7 +1638,7 @@ client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client, struct GNUNET_PSYC_CountersResultMessage res; res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); res.header.size = htons (sizeof (res)); - res.result_code = htonl ((uint32_t) GNUNET_OK + INT32_MIN); + res.result_code = GNUNET_htonl_signed (GNUNET_OK); res.max_message_id = GNUNET_htonll (mst->max_message_id); GNUNET_SERVER_notification_context_add (nc, client); @@ -1578,7 +1650,7 @@ client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client, "%p Client connected as master to channel %s.\n", mst, GNUNET_h2s (&chn->pub_key_hash)); - struct ClientListItem *cli = GNUNET_new (struct ClientListItem); + struct Client *cli = GNUNET_new (struct Client); cli->client = client; GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli); @@ -1677,7 +1749,7 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client, struct GNUNET_PSYC_CountersResultMessage res; res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); res.header.size = htons (sizeof (res)); - res.result_code = htonl ((uint32_t) GNUNET_OK - INT32_MIN); + res.result_code = GNUNET_htonl_signed (GNUNET_OK); res.max_message_id = GNUNET_htonll (chn->max_message_id); GNUNET_SERVER_notification_context_add (nc, client); @@ -1716,7 +1788,7 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client, "%p Client connected as slave to channel %s.\n", slv, GNUNET_h2s (&chn->pub_key_hash)); - struct ClientListItem *cli = GNUNET_new (struct ClientListItem); + struct Client *cli = GNUNET_new (struct Client); cli->client = client; GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli); @@ -2119,14 +2191,15 @@ struct MembershipStoreClosure * Received result of GNUNET_PSYCSTORE_membership_store() */ static void -store_recv_membership_store_result (void *cls, int64_t result, const char *err_msg) +store_recv_membership_store_result (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) { struct MembershipStoreClosure *mcls = cls; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%s)\n", - mcls->chn, result, err_msg); + "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.s)\n", + mcls->chn, result, err_msg_size, err_msg); - client_send_result (mcls->client, mcls->op_id, result, err_msg); + client_send_result (mcls->client, mcls->op_id, result, err_msg, err_msg_size); } @@ -2165,36 +2238,73 @@ client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client, } +/** + * Received a fragment for GNUNET_PSYCSTORE_fragment_get(), + * in response to a history request from a client. + */ static int store_recv_fragment_history (void *cls, - struct GNUNET_MULTICAST_MessageHeader *msg, + struct GNUNET_MULTICAST_MessageHeader *mmsg, enum GNUNET_PSYCSTORE_MessageFlags flags) { - struct OperationClosure *opcls = cls; - struct Channel *chn = opcls->chn; - client_send_mcast_msg (chn, msg, GNUNET_PSYC_MESSAGE_HISTORIC); + struct Operation *op = cls; + if (NULL == op->client) + { /* Requesting client already disconnected. */ + return GNUNET_NO; + } + struct Channel *chn = op->chn; + + struct GNUNET_PSYC_MessageHeader *pmsg; + uint16_t msize = ntohs (mmsg->header.size); + uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg); + + struct GNUNET_OperationResultMessage * + res = GNUNET_malloc (sizeof (*res) + psize); + res->header.size = htons (sizeof (*res) + psize); + res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT); + res->op_id = op->op_id; + res->result_code = GNUNET_htonll_signed (GNUNET_OK); + + pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1]; + psyc_msg_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC); + memcpy (&res[1], pmsg, psize); + + /** @todo FIXME: send only to requesting client */ + client_send_msg (chn, &res->header); return GNUNET_YES; } /** - * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay. + * Received the result of GNUNET_PSYCSTORE_fragment_get(), + * in response to a history request from a client. */ static void -store_recv_fragment_history_result (void *cls, int64_t result, const char *err_msg) +store_recv_fragment_history_result (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) { - struct OperationClosure *opcls = cls; + struct Operation *op = cls; + if (NULL == op->client) + { /* Requesting client already disconnected. */ + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p History replay #%" PRIu64 ": " - "PSYCSTORE returned %" PRId64 " (%s)\n", - opcls->chn, opcls->op_id, result, err_msg); + "PSYCSTORE returned %" PRId64 " (%.*s)\n", + op->chn, op->op_id, result, err_msg_size, err_msg); + + if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE) + { + /** @todo Multicast replay request for messages not found locally. */ + } - client_send_result (opcls->client, opcls->op_id, result, err_msg); + client_send_result (op->client, op->op_id, result, err_msg, err_msg_size); } /** - * Client requests channel history from PSYCstore. + * Client requests channel history. */ static void client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client, @@ -2204,26 +2314,39 @@ client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client, chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); GNUNET_assert (NULL != chn); - const struct HistoryRequest * - req = (const struct HistoryRequest *) msg; + const struct GNUNET_PSYC_HistoryRequestMessage * + req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg; + uint16_t size = ntohs (msg->size); + const char *method_prefix = (const char *) &req[1]; - struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls)); - opcls->client = client; - opcls->chn = chn; - opcls->op_id = req->op_id; + if (size < sizeof (*req) + 1 + || '\0' != method_prefix[size - sizeof (*req) - 1]) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "%p History replay #%" PRIu64 ": " + "invalid method prefix. size: %u < %u?\n", + chn, GNUNET_ntohll (req->op_id), size, sizeof (*req) + 1); + GNUNET_break (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + + struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags)); if (0 == req->message_limit) GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL, GNUNET_ntohll (req->start_message_id), GNUNET_ntohll (req->end_message_id), + method_prefix, &store_recv_fragment_history, - &store_recv_fragment_history_result, opcls); + &store_recv_fragment_history_result, op); else GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL, GNUNET_ntohll (req->message_limit), + method_prefix, &store_recv_fragment_history, &store_recv_fragment_history_result, - opcls); + op); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -2236,19 +2359,19 @@ static int store_recv_state_var (void *cls, const char *name, const void *value, size_t value_size) { - struct OperationClosure *opcls = cls; - struct OperationResult *op; + struct Operation *op = cls; + struct GNUNET_OperationResultMessage *res; if (NULL != name) { uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1; struct GNUNET_PSYC_MessageModifier *mod; - op = GNUNET_malloc (sizeof (*op) + sizeof (*mod) + name_size + value_size); - op->header.size = htons (sizeof (*op) + sizeof (*mod) + name_size + value_size); - op->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT); - op->op_id = opcls->op_id; + res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + name_size + value_size); + res->header.size = htons (sizeof (*res) + sizeof (*mod) + name_size + value_size); + res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT); + res->op_id = op->op_id; - mod = (struct GNUNET_PSYC_MessageModifier *) &op[1]; + mod = (struct GNUNET_PSYC_MessageModifier *) &res[1]; mod->header.size = htons (sizeof (*mod) + name_size + value_size); mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); mod->name_size = htons (name_size); @@ -2260,19 +2383,20 @@ store_recv_state_var (void *cls, const char *name, else { struct GNUNET_MessageHeader *mod; - op = GNUNET_malloc (sizeof (*op) + sizeof (*mod) + value_size); - op->header.size = htons (sizeof (*op) + sizeof (*mod) + value_size); - op->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT); - op->op_id = opcls->op_id; + res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + value_size); + res->header.size = htons (sizeof (*res) + sizeof (*mod) + value_size); + res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT); + res->op_id = op->op_id; - mod = (struct GNUNET_MessageHeader *) &op[1]; + mod = (struct GNUNET_MessageHeader *) &res[1]; mod->size = htons (sizeof (*mod) + value_size); mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); memcpy (&mod[1], value, value_size); } - GNUNET_SERVER_notification_context_add (nc, opcls->client); - GNUNET_SERVER_notification_context_unicast (nc, opcls->client, &op->header, + // FIXME: client might have been disconnected + GNUNET_SERVER_notification_context_add (nc, op->client); + GNUNET_SERVER_notification_context_unicast (nc, op->client, &res->header, GNUNET_NO); GNUNET_free (op); return GNUNET_YES; @@ -2284,15 +2408,17 @@ store_recv_state_var (void *cls, const char *name, * or GNUNET_PSYCSTORE_state_get_prefix() */ static void -store_recv_state_result (void *cls, int64_t result, const char *err_msg) +store_recv_state_result (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) { - struct OperationClosure *opcls = cls; + struct Operation *op = cls; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p History replay #%" PRIu64 ": " - "PSYCSTORE returned %" PRId64 " (%s)\n", - opcls->chn, opcls->op_id, result, err_msg); + "PSYCSTORE returned %" PRId64 " (%.*s)\n", + op->chn, op->op_id, result, err_msg_size, err_msg); - client_send_result (opcls->client, opcls->op_id, result, err_msg); + // FIXME: client might have been disconnected + client_send_result (op->client, op->op_id, result, err_msg, err_msg_size); } @@ -2314,18 +2440,15 @@ client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client, const char *name = (const char *) &req[1]; if (0 == name_size || '\0' != name[name_size - 1]) { + GNUNET_break (0); GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } - struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls)); - opcls->client = client; - opcls->chn = chn; - opcls->op_id = req->op_id; - + struct Operation *op = op_add (chn, client, req->op_id, 0); GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name, &store_recv_state_var, - &store_recv_state_result, opcls); + &store_recv_state_result, op); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -2348,20 +2471,16 @@ client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client, const char *name = (const char *) &req[1]; if (0 == name_size || '\0' != name[name_size - 1]) { + GNUNET_break (0); GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } - struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls)); - opcls->client = client; - opcls->chn = chn; - opcls->op_id = req->op_id; - + struct Operation *op = op_add (chn, client, req->op_id, 0); GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name, &store_recv_state_var, - &store_recv_state_result, opcls); + &store_recv_state_result, op); GNUNET_SERVER_receive_done (client, GNUNET_OK); - } diff --git a/src/psyc/psyc.h b/src/psyc/psyc.h index 4bc92532f..e85e14c0e 100644 --- a/src/psyc/psyc.h +++ b/src/psyc/psyc.h @@ -171,42 +171,6 @@ struct StateRequest /**** service -> library ****/ -/** - * Answer from service to client about last operation. - */ -struct OperationResult -{ - /** - * Types: - * - GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE - * - GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_STORY_RESULT - * - GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_STATE_RESULT - */ - struct GNUNET_MessageHeader header; - - uint32_t reserved GNUNET_PACKED; - - /** - * Operation ID. - */ - uint64_t op_id GNUNET_PACKED; - - /** - * Status code for the operation. - */ - uint64_t result_code GNUNET_PACKED; - - /* Followed by: - * - on error: NUL-terminated error message - * - on success: one of the following message types - * - * For a STATE_RESULT, one of: - * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER - * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT - * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END - */ -}; - GNUNET_NETWORK_STRUCT_END #endif diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c index ce994b272..7839aaf9e 100644 --- a/src/psyc/psyc_api.c +++ b/src/psyc/psyc_api.c @@ -43,33 +43,6 @@ #define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__) -struct OperationListItem -{ - struct OperationListItem *prev; - struct OperationListItem *next; - - /** - * Operation ID. - */ - uint64_t op_id; - - /** - * Continuation to invoke with the result of an operation. - */ - GNUNET_PSYC_ResultCallback result_cb; - - /** - * State variable result callback. - */ - GNUNET_PSYC_StateVarCallback state_var_cb; - - /** - * Closure for the callbacks. - */ - void *cls; -}; - - /** * Handle to access PSYC channel operations for both the master and slaves. */ @@ -110,21 +83,6 @@ struct GNUNET_PSYC_Channel */ void *disconnect_cls; - /** - * First operation in the linked list. - */ - struct OperationListItem *op_head; - - /** - * Last operation in the linked list. - */ - struct OperationListItem *op_tail; - - /** - * Last operation ID used. - */ - uint64_t last_op_id; - /** * Are we polling for incoming messages right now? */ @@ -204,83 +162,62 @@ struct GNUNET_PSYC_SlaveTransmitHandle }; -/** - * Get a fresh operation ID to distinguish between PSYCstore requests. - * - * @param h Handle to the PSYCstore service. - * @return next operation id to use - */ -static uint64_t -op_get_next_id (struct GNUNET_PSYC_Channel *chn) -{ - return ++chn->last_op_id; -} - - -/** - * Find operation by ID. - * - * @return Operation, or NULL if none found. - */ -static struct OperationListItem * -op_find_by_id (struct GNUNET_PSYC_Channel *chn, uint64_t op_id) +struct GNUNET_PSYC_HistoryRequest { - struct OperationListItem *op = chn->op_head; - while (NULL != op) - { - if (op->op_id == op_id) - return op; - op = op->next; - } - return NULL; -} + /** + * Channel. + */ + struct GNUNET_PSYC_Channel *chn; + /** + * Operation ID. + */ + uint64_t op_id; -static uint64_t -op_add (struct GNUNET_PSYC_Channel *chn, GNUNET_PSYC_ResultCallback result_cb, - void *cls) -{ - if (NULL == result_cb) - return 0; + /** + * Message handler. + */ + struct GNUNET_PSYC_ReceiveHandle *recv; - struct OperationListItem *op = GNUNET_malloc (sizeof (*op)); - op->op_id = op_get_next_id (chn); - op->result_cb = result_cb; - op->cls = cls; - GNUNET_CONTAINER_DLL_insert_tail (chn->op_head, chn->op_tail, op); + /** + * Function to call when the operation finished. + */ + GNUNET_ResultCallback result_cb; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "%p Added operation #%" PRIu64 "\n", chn, op->op_id); - return op->op_id; -} + /** + * Closure for @a result_cb. + */ + void *cls; +}; -static int -op_result (struct GNUNET_PSYC_Channel *chn, uint64_t op_id, - int64_t result_code, const char *err_msg) +struct GNUNET_PSYC_StateRequest { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "%p Received result for operation #%" PRIu64 ": %" PRId64 " (%s)\n", - chn, op_id, result_code, err_msg); - if (0 == op_id) - return GNUNET_NO; + /** + * Channel. + */ + struct GNUNET_PSYC_Channel *chn; - struct OperationListItem *op = op_find_by_id (chn, op_id); - if (NULL == op) - { - LOG (GNUNET_ERROR_TYPE_WARNING, - "Could not find operation #%" PRIu64 "\n", op_id); - return GNUNET_NO; - } + /** + * Operation ID. + */ + uint64_t op_id; - GNUNET_CONTAINER_DLL_remove (chn->op_head, chn->op_tail, op); + /** + * State variable result callback. + */ + GNUNET_PSYC_StateVarCallback var_cb; - if (NULL != op->result_cb) - op->result_cb (op->cls, result_code, err_msg); + /** + * Function to call when the operation finished. + */ + GNUNET_ResultCallback result_cb; - GNUNET_free (op); - return GNUNET_YES; -} + /** + * Closure for @a result_cb. + */ + void *cls; +}; static void @@ -313,22 +250,97 @@ channel_recv_result (void *cls, struct GNUNET_PSYC_Channel * chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); + const struct GNUNET_OperationResultMessage * + res = (const struct GNUNET_OperationResultMessage *) msg; + uint16_t size = ntohs (msg->size); - const struct OperationResult *res = (const struct OperationResult *) msg; - const char *err_msg = NULL; + if (size < sizeof (*res)) + { /* Error, message too small. */ + GNUNET_break (0); + return; + } - if (sizeof (struct OperationResult) < size) - { - err_msg = (const char *) &res[1]; - if ('\0' != err_msg[size - sizeof (struct OperationResult) - 1]) - { - GNUNET_break (0); - err_msg = NULL; - } + uint16_t data_size = size - sizeof (*res); + const char *data = (0 < data_size) ? (void *) &res[1] : NULL; + GNUNET_CLIENT_MANAGER_op_result (chn->client, GNUNET_ntohll (res->op_id), + GNUNET_ntohll_signed (res->result_code), + data, data_size); +} + + +static void +op_recv_history_result (void *cls, int64_t result, + const void *data, uint16_t data_size) +{ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received history replay result: %" PRId64 ".\n", result); + + struct GNUNET_PSYC_HistoryRequest *hist = cls; + + if (NULL != hist->result_cb) + hist->result_cb (hist->cls, result, data, data_size); + + GNUNET_PSYC_receive_destroy (hist->recv); + GNUNET_free (hist); +} + + +static void +op_recv_state_result (void *cls, int64_t result, + const void *data, uint16_t data_size) +{ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received state request result: %" PRId64 ".\n", result); + + struct GNUNET_PSYC_StateRequest *sr = cls; + + if (NULL != sr->result_cb) + sr->result_cb (sr->cls, result, data, data_size); + + GNUNET_free (sr); +} + + +static void +channel_recv_history_result (void *cls, + struct GNUNET_CLIENT_MANAGER_Connection *client, + const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_PSYC_Channel * + chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); + + const struct GNUNET_OperationResultMessage * + res = (const struct GNUNET_OperationResultMessage *) msg; + struct GNUNET_PSYC_MessageHeader * + pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1]; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%p Received historic fragment for message #%" PRIu64 ".\n", + chn, GNUNET_ntohll (pmsg->message_id)); + + GNUNET_ResultCallback result_cb = NULL; + struct GNUNET_PSYC_HistoryRequest *hist = NULL; + + if (GNUNET_YES != GNUNET_CLIENT_MANAGER_op_find (chn->client, + GNUNET_ntohll (res->op_id), + &result_cb, (void *) &hist)) + { /* Operation not found. */ + LOG (GNUNET_ERROR_TYPE_WARNING, + "%p Replay operation not found for historic fragment of message #%" + PRIu64 ".\n", + chn, GNUNET_ntohll (pmsg->message_id)); + return; } - op_result (chn, GNUNET_ntohll (res->op_id), - GNUNET_ntohll (res->result_code) + INT64_MIN, err_msg); + uint16_t size = ntohs (msg->size); + if (size < sizeof (*res) + sizeof (*pmsg)) + { /* Error, message too small. */ + GNUNET_break (0); + return; + } + + GNUNET_PSYC_receive_message (hist->recv, + (const struct GNUNET_PSYC_MessageHeader *) pmsg); } @@ -340,12 +352,21 @@ channel_recv_state_result (void *cls, struct GNUNET_PSYC_Channel * chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); - const struct OperationResult *res = (const struct OperationResult *) msg; - struct OperationListItem *op = op_find_by_id (chn, GNUNET_ntohll (res->op_id)); - if (NULL == op || NULL == op->state_var_cb) + const struct GNUNET_OperationResultMessage * + res = (const struct GNUNET_OperationResultMessage *) msg; + + GNUNET_ResultCallback result_cb = NULL; + struct GNUNET_PSYC_StateRequest *sr = NULL; + + if (GNUNET_YES != GNUNET_CLIENT_MANAGER_op_find (chn->client, + GNUNET_ntohll (res->op_id), + &result_cb, (void *) &sr)) + { /* Operation not found. */ return; + } - const struct GNUNET_MessageHeader *modc = (struct GNUNET_MessageHeader *) &op[1]; + const struct GNUNET_MessageHeader * + modc = (struct GNUNET_MessageHeader *) &res[1]; uint16_t modc_size = ntohs (modc->size); if (ntohs (msg->size) - sizeof (*msg) != modc_size) { @@ -366,13 +387,13 @@ channel_recv_state_result (void *cls, GNUNET_break (0); return; } - op->state_var_cb (op->cls, name, name + name_size, ntohs (mod->value_size)); + sr->var_cb (sr->cls, name, name + name_size, ntohs (mod->value_size)); break; } case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: - op->state_var_cb (op->cls, NULL, (const char *) &modc[1], - modc_size - sizeof (*modc)); + sr->var_cb (sr->cls, NULL, (const char *) &modc[1], + modc_size - sizeof (*modc)); break; } } @@ -412,11 +433,12 @@ master_recv_start_ack (void *cls, struct GNUNET_PSYC_CountersResultMessage * cres = (struct GNUNET_PSYC_CountersResultMessage *) msg; - int32_t result = ntohl (cres->result_code) + INT32_MIN; + int32_t result = GNUNET_ntohl_signed (cres->result_code); if (GNUNET_OK != result && GNUNET_NO != result) { - LOG (GNUNET_ERROR_TYPE_ERROR, "Could not start master.\n"); + LOG (GNUNET_ERROR_TYPE_ERROR, "Could not start master: %ld\n", result); GNUNET_break (0); + /* FIXME: disconnect */ } if (NULL != mst->start_cb) mst->start_cb (mst->cb_cls, result, GNUNET_ntohll (cres->max_message_id)); @@ -464,11 +486,12 @@ slave_recv_join_ack (void *cls, sizeof (struct GNUNET_PSYC_Channel)); struct GNUNET_PSYC_CountersResultMessage * cres = (struct GNUNET_PSYC_CountersResultMessage *) msg; - int32_t result = ntohl (cres->result_code) + INT32_MIN; + int32_t result = GNUNET_ntohl_signed (cres->result_code); if (GNUNET_YES != result && GNUNET_NO != result) { LOG (GNUNET_ERROR_TYPE_ERROR, "Could not join slave.\n"); GNUNET_break (0); + /* FIXME: disconnect */ } if (NULL != slv->connect_cb) slv->connect_cb (slv->cb_cls, result, GNUNET_ntohll (cres->max_message_id)); @@ -513,13 +536,17 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler master_handlers[] = GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST, sizeof (struct GNUNET_PSYC_JoinRequestMessage), GNUNET_YES }, + { &channel_recv_history_result, NULL, + GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT, + sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, + { &channel_recv_state_result, NULL, GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT, - sizeof (struct OperationResult), GNUNET_YES }, + sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, { &channel_recv_result, NULL, GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE, - sizeof (struct OperationResult), GNUNET_YES }, + sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, @@ -545,13 +572,17 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler slave_handlers[] = GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, sizeof (struct GNUNET_PSYC_JoinDecisionMessage), GNUNET_YES }, + { &channel_recv_history_result, NULL, + GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT, + sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, + { &channel_recv_state_result, NULL, GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT, - sizeof (struct OperationResult), GNUNET_YES }, + sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, { &channel_recv_result, NULL, GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE, - sizeof (struct OperationResult), GNUNET_YES }, + sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, @@ -1011,17 +1042,28 @@ GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slv) * correctly; not doing so correctly will result in either denying other slaves * access or offering access to channel data to non-members. * - * @param channel Channel handle. - * @param slave_key Identity of channel slave to add. - * @param announced_at ID of the message that announced the membership change. - * @param effective_since Addition of slave is in effect since this message ID. + * @param chn + * Channel handle. + * @param slave_key + * Identity of channel slave to add. + * @param announced_at + * ID of the message that announced the membership change. + * @param effective_since + * Addition of slave is in effect since this message ID. + * @param result_cb + * Function to call with the result of the operation. + * The @e result_code argument is #GNUNET_OK on success, or + * #GNUNET_SYSERR on error. In case of an error, the @e data argument + * can contain an optional error message. + * @param cls + * Closure for @a result_cb. */ void GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn, const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, uint64_t announced_at, uint64_t effective_since, - GNUNET_PSYC_ResultCallback result_cb, + GNUNET_ResultCallback result_cb, void *cls) { struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req)); @@ -1031,7 +1073,8 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn, req->announced_at = GNUNET_htonll (announced_at); req->effective_since = GNUNET_htonll (effective_since); req->did_join = GNUNET_YES; - req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls)); + req->op_id = GNUNET_htonll (GNUNET_CLIENT_MANAGER_op_add (chn->client, + result_cb, cls)); GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); } @@ -1054,15 +1097,25 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn, * denying members access or offering access to channel data to * non-members. * - * @param channel Channel handle. - * @param slave_key Identity of channel slave to remove. - * @param announced_at ID of the message that announced the membership change. + * @param chn + * Channel handle. + * @param slave_key + * Identity of channel slave to remove. + * @param announced_at + * ID of the message that announced the membership change. + * @param result_cb + * Function to call with the result of the operation. + * The @e result_code argument is #GNUNET_OK on success, or + * #GNUNET_SYSERR on error. In case of an error, the @e data argument + * can contain an optional error message. + * @param cls + * Closure for @a result_cb. */ void GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn, const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, uint64_t announced_at, - GNUNET_PSYC_ResultCallback result_cb, + GNUNET_ResultCallback result_cb, void *cls) { struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req)); @@ -1071,17 +1124,62 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn, req->slave_key = *slave_key; req->announced_at = GNUNET_htonll (announced_at); req->did_join = GNUNET_NO; - req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls)); + req->op_id = GNUNET_htonll (GNUNET_CLIENT_MANAGER_op_add (chn->client, + result_cb, cls)); + + GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); +} + + +static struct GNUNET_PSYC_HistoryRequest * +channel_history_replay (struct GNUNET_PSYC_Channel *chn, + uint64_t start_message_id, + uint64_t end_message_id, + uint64_t message_limit, + const char *method_prefix, + uint32_t flags, + GNUNET_PSYC_MessageCallback message_cb, + GNUNET_PSYC_MessagePartCallback message_part_cb, + GNUNET_ResultCallback result_cb, + void *cls) +{ + struct GNUNET_PSYC_HistoryRequestMessage *req; + struct GNUNET_PSYC_HistoryRequest *hist = GNUNET_malloc (sizeof (*hist)); + hist->chn = chn; + hist->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls); + hist->result_cb = result_cb; + hist->cls = cls; + hist->op_id = GNUNET_CLIENT_MANAGER_op_add (chn->client, + &op_recv_history_result, hist); + + GNUNET_assert (NULL != method_prefix); + uint16_t method_size = strnlen (method_prefix, + GNUNET_SERVER_MAX_MESSAGE_SIZE + - sizeof (*req)) + 1; + GNUNET_assert ('\0' == method_prefix[method_size - 1]); + req = GNUNET_malloc (sizeof (*req) + method_size); + req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY); + req->header.size = htons (sizeof (*req) + method_size); + req->start_message_id = GNUNET_htonll (start_message_id); + req->end_message_id = GNUNET_htonll (end_message_id); + req->message_limit = GNUNET_htonll (message_limit); + req->flags = htonl (flags); + req->op_id = GNUNET_htonll (hist->op_id); + memcpy (&req[1], method_prefix, method_size); GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); + return hist; } /** * Request to replay a part of the message history of the channel. * - * Historic messages (but NOT the state at the time) will be replayed (given to - * the normal method handlers) if available and if access is permitted. + * Historic messages (but NOT the state at the time) will be replayed and given + * to the normal method handlers with a #GNUNET_PSYC_MESSAGE_HISTORIC flag set. + * + * Messages are retrieved from the local PSYCstore if available, + * otherwise requested from the network. * * @param channel * Which channel should be replayed? @@ -1089,8 +1187,10 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn, * Earliest interesting point in history. * @param end_message_id * Last (inclusive) interesting point in history. - * FIXME: @param method_prefix + * @param method_prefix * Retrieve only messages with a matching method prefix. + * @param flags + * OR'ed enum GNUNET_PSYC_HistoryReplayFlags * @param result_cb * Function to call when the requested history has been fully replayed. * @param cls @@ -1098,22 +1198,20 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn, * * @return Handle to cancel history replay operation. */ -void +struct GNUNET_PSYC_HistoryRequest * GNUNET_PSYC_channel_history_replay (struct GNUNET_PSYC_Channel *chn, uint64_t start_message_id, uint64_t end_message_id, - /* FIXME: const char *method_prefix, */ - GNUNET_PSYC_ResultCallback result_cb, + const char *method_prefix, + uint32_t flags, + GNUNET_PSYC_MessageCallback message_cb, + GNUNET_PSYC_MessagePartCallback message_part_cb, + GNUNET_ResultCallback result_cb, void *cls) { - struct HistoryRequest *req = GNUNET_malloc (sizeof (*req)); - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY); - req->header.size = htons (sizeof (*req)); - req->start_message_id = GNUNET_htonll (start_message_id); - req->end_message_id = GNUNET_htonll (end_message_id); - req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls)); - - GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); + return channel_history_replay (chn, start_message_id, end_message_id, 0, + method_prefix, flags, + message_cb, message_part_cb, result_cb, cls); } @@ -1127,8 +1225,11 @@ GNUNET_PSYC_channel_history_replay (struct GNUNET_PSYC_Channel *chn, * Which channel should be replayed? * @param message_limit * Maximum number of messages to replay. - * FIXME: @param method_prefix + * @param method_prefix * Retrieve only messages with a matching method prefix. + * Use NULL or "" to retrieve all. + * @param flags + * OR'ed enum GNUNET_PSYC_HistoryReplayFlags * @param result_cb * Function to call when the requested history has been fully replayed. * @param cls @@ -1136,20 +1237,78 @@ GNUNET_PSYC_channel_history_replay (struct GNUNET_PSYC_Channel *chn, * * @return Handle to cancel history replay operation. */ -void +struct GNUNET_PSYC_HistoryRequest * GNUNET_PSYC_channel_history_replay_latest (struct GNUNET_PSYC_Channel *chn, uint64_t message_limit, - /* FIXME: const char *method_prefix, */ - GNUNET_PSYC_ResultCallback result_cb, + const char *method_prefix, + uint32_t flags, + GNUNET_PSYC_MessageCallback message_cb, + GNUNET_PSYC_MessagePartCallback message_part_cb, + GNUNET_ResultCallback result_cb, void *cls) { - struct HistoryRequest *req = GNUNET_malloc (sizeof (*req)); - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY); - req->header.size = htons (sizeof (*req)); - req->message_limit = GNUNET_htonll (message_limit); - req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls)); + return channel_history_replay (chn, 0, 0, message_limit, method_prefix, flags, + message_cb, message_part_cb, result_cb, cls); +} + + +void +GNUNET_PSYC_channel_history_replay_cancel (struct GNUNET_PSYC_Channel *channel, + struct GNUNET_PSYC_HistoryRequest *hist) +{ + GNUNET_PSYC_receive_destroy (hist->recv); + GNUNET_CLIENT_MANAGER_op_cancel (hist->chn->client, hist->op_id); + GNUNET_free (hist); +} + + +/** + * Retrieve the best matching channel state variable. + * + * If the requested variable name is not present in the state, the nearest + * less-specific name is matched; for example, requesting "_a_b" will match "_a" + * if "_a_b" does not exist. + * + * @param channel + * Channel handle. + * @param full_name + * Full name of the requested variable. + * The actual variable returned might have a shorter name. + * @param var_cb + * Function called once when a matching state variable is found. + * Not called if there's no matching state variable. + * @param result_cb + * Function called after the operation finished. + * (i.e. all state variables have been returned via @a state_cb) + * @param cls + * Closure for the callbacks. + */ +static struct GNUNET_PSYC_StateRequest * +channel_state_get (struct GNUNET_PSYC_Channel *chn, + uint16_t type, const char *name, + GNUNET_PSYC_StateVarCallback var_cb, + GNUNET_ResultCallback result_cb, void *cls) +{ + struct StateRequest *req; + struct GNUNET_PSYC_StateRequest *sr = GNUNET_malloc (sizeof (*sr)); + sr->chn = chn; + sr->var_cb = var_cb; + sr->result_cb = result_cb; + sr->cls = cls; + sr->op_id = GNUNET_CLIENT_MANAGER_op_add (chn->client, + &op_recv_state_result, sr); + + GNUNET_assert (NULL != name); + size_t name_size = strnlen (name, GNUNET_SERVER_MAX_MESSAGE_SIZE + - sizeof (*req)) + 1; + req = GNUNET_malloc (sizeof (*req) + name_size); + req->header.type = htons (type); + req->header.size = htons (sizeof (*req) + name_size); + req->op_id = GNUNET_htonll (sr->op_id); + memcpy (&req[1], name, name_size); GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); + return sr; } @@ -1174,21 +1333,16 @@ GNUNET_PSYC_channel_history_replay_latest (struct GNUNET_PSYC_Channel *chn, * @param cls * Closure for the callbacks. */ -void +struct GNUNET_PSYC_StateRequest * GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *chn, const char *full_name, GNUNET_PSYC_StateVarCallback var_cb, - GNUNET_PSYC_ResultCallback result_cb, + GNUNET_ResultCallback result_cb, void *cls) { - size_t name_size = strlen (full_name) + 1; - struct StateRequest *req = GNUNET_malloc (sizeof (*req) + name_size); - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_GET); - req->header.size = htons (sizeof (*req) + name_size); - req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls)); - memcpy (&req[1], full_name, name_size); + return channel_state_get (chn, GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, + full_name, var_cb, result_cb, cls); - GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); } @@ -1215,21 +1369,29 @@ GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *chn, * @param cls * Closure for the callbacks. */ -void +struct GNUNET_PSYC_StateRequest * GNUNET_PSYC_channel_state_get_prefix (struct GNUNET_PSYC_Channel *chn, const char *name_prefix, GNUNET_PSYC_StateVarCallback var_cb, - GNUNET_PSYC_ResultCallback result_cb, + GNUNET_ResultCallback result_cb, void *cls) { - size_t name_size = strlen (name_prefix) + 1; - struct StateRequest *req = GNUNET_malloc (sizeof (*req) + name_size); - req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_GET); - req->header.size = htons (sizeof (*req) + name_size); - req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls)); - memcpy (&req[1], name_prefix, name_size); + return channel_state_get (chn, GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, + name_prefix, var_cb, result_cb, cls); +} - GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); + +/** + * Cancel a state request operation. + * + * @param sr + * Handle for the operation to cancel. + */ +void +GNUNET_PSYC_channel_state_get_cancel (struct GNUNET_PSYC_StateRequest *sr) +{ + GNUNET_CLIENT_MANAGER_op_cancel (sr->chn->client, sr->op_id); + GNUNET_free (sr); } /* end of psyc_api.c */ diff --git a/src/psyc/psyc_util_lib.c b/src/psyc/psyc_util_lib.c index 961922ce4..ebbc2dad8 100644 --- a/src/psyc/psyc_util_lib.c +++ b/src/psyc/psyc_util_lib.c @@ -326,9 +326,13 @@ GNUNET_PSYC_transmit_destroy (struct GNUNET_PSYC_TransmitHandle *tmit) * The message part is added to the current message buffer. * When this buffer is full, it is added to the transmission queue. * - * @param tmit Transmission handle. - * @param msg Message part, or NULL. - * @param end End of message? #GNUNET_YES or #GNUNET_NO. + * @param tmit + * Transmission handle. + * @param msg + * Message part, or NULL. + * @param end + * End of message? + * #GNUNET_YES or #GNUNET_NO. */ static void transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit, @@ -632,16 +636,24 @@ transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper, /** * Transmit a message. * - * @param tmit Transmission handle. - * @param method_name Which method should be invoked. - * @param env Environment for the message. - * Should stay available until the first call to notify_data. - * Can be NULL if there are no modifiers or @a notify_mod is provided instead. - * @param notify_mod Function to call to obtain modifiers. - * Can be NULL if there are no modifiers or @a env is provided instead. - * @param notify_data Function to call to obtain fragments of the data. - * @param notify_cls Closure for @a notify_mod and @a notify_data. - * @param flags Flags for the message being transmitted. + * @param tmit + * Transmission handle. + * @param method_name + * Which method should be invoked. + * @param env + * Environment for the message. + * Should stay available until the first call to notify_data. + * Can be NULL if there are no modifiers or @a notify_mod is + * provided instead. + * @param notify_mod + * Function to call to obtain modifiers. + * Can be NULL if there are no modifiers or @a env is provided instead. + * @param notify_data + * Function to call to obtain fragments of the data. + * @param notify_cls + * Closure for @a notify_mod and @a notify_data. + * @param flags + * Flags for the message being transmitted. * * @return #GNUNET_OK if the transmission was started. * #GNUNET_SYSERR if another transmission is already going on. diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c index 7160c13c6..ba31d9329 100644 --- a/src/psyc/test_psyc.c +++ b/src/psyc/test_psyc.c @@ -82,7 +82,7 @@ struct TransmitClosure struct TransmitClosure *tmit; -uint8_t join_req_count; +uint8_t join_req_count, end_count; enum { @@ -105,6 +105,9 @@ enum void master_transmit (); +void +master_history_replay_latest (); + void master_stopped (void *cls) { @@ -197,6 +200,134 @@ end () } +void +master_message_cb (void *cls, uint64_t message_id, uint32_t flags, + const struct GNUNET_PSYC_MessageHeader *msg) +{ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Test #%d: Master got PSYC message fragment of size %u " + "belonging to message ID %" PRIu64 " with flags %x\n", + test, ntohs (msg->header.size), message_id, flags); + // FIXME +} + + +void +master_message_part_cb (void *cls, uint64_t message_id, + uint64_t data_offset, uint32_t flags, + const struct GNUNET_MessageHeader *msg) +{ + if (NULL == msg) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Error while receiving message %" PRIu64 "\n", message_id); + return; + } + + uint16_t type = ntohs (msg->type); + uint16_t size = ntohs (msg->size); + + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Test #%d: Master got message part of type %u and size %u " + "belonging to message ID %" PRIu64 " with flags %x\n", + test, type, size, message_id, flags); + + switch (test) + { + case TEST_SLAVE_TRANSMIT: + if (GNUNET_PSYC_MESSAGE_REQUEST != flags) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Unexpected request flags: %x" PRIu32 "\n", flags); + GNUNET_assert (0); + return; + } + // FIXME: check rest of message + + if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type) + master_transmit (); + break; + + case TEST_MASTER_TRANSMIT: + if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type && 2 == ++end_count) + master_history_replay_latest (); + break; + + case TEST_MASTER_HISTORY_REPLAY: + case TEST_MASTER_HISTORY_REPLAY_LATEST: + if (GNUNET_PSYC_MESSAGE_HISTORIC != flags) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Test #%d: Unexpected flags for historic message: %x" PRIu32 "\n", + test, flags); + GNUNET_assert (0); + return; + } + break; + + default: + GNUNET_assert (0); + } +} + + +void +slave_message_cb (void *cls, uint64_t message_id, uint32_t flags, + const struct GNUNET_PSYC_MessageHeader *msg) +{ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Test #%d: Slave got PSYC message fragment of size %u " + "belonging to message ID %" PRIu64 " with flags %x\n", + test, ntohs (msg->header.size), message_id, flags); + // FIXME +} + + +void +slave_message_part_cb (void *cls, uint64_t message_id, + uint64_t data_offset, uint32_t flags, + const struct GNUNET_MessageHeader *msg) +{ + if (NULL == msg) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Error while receiving message " PRIu64 "\n", message_id); + return; + } + + uint16_t type = ntohs (msg->type); + uint16_t size = ntohs (msg->size); + + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Test #%d: Slave got message part of type %u and size %u " + "belonging to message ID %" PRIu64 " with flags %x\n", + test, type, size, message_id, flags); + + switch (test) + { + case TEST_MASTER_TRANSMIT: + if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type && 2 == ++end_count) + master_history_replay_latest (); + break; + + case TEST_SLAVE_HISTORY_REPLAY: + case TEST_SLAVE_HISTORY_REPLAY_LATEST: + if (GNUNET_PSYC_MESSAGE_HISTORIC != flags) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Test #%d: Unexpected flags for historic message: %x" PRIu32 "\n", + flags); + GNUNET_assert (0); + return; + } + break; + + default: + GNUNET_assert (0); + } +} + + void state_get_var (void *cls, const char *name, const void *value, size_t value_size) { @@ -208,10 +339,12 @@ state_get_var (void *cls, const char *name, const void *value, size_t value_size /*** Slave state_get_prefix() ***/ void -slave_state_get_prefix_result (void *cls, int64_t result, const char *err_msg) +slave_state_get_prefix_result (void *cls, int64_t result, + const void *err_msg, uint16_t err_msg_size) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "slave_state_get_prefix:\t%" PRId64 " (%s)\n", result, err_msg); + "slave_state_get_prefix:\t%" PRId64 " (%.s)\n", + result, err_msg_size, err_msg); // FIXME: GNUNET_assert (2 == result); end (); } @@ -230,7 +363,8 @@ slave_state_get_prefix () void -master_state_get_prefix_result (void *cls, int64_t result, const char *err_msg) +master_state_get_prefix_result (void *cls, int64_t result, + const void *err_msg, uint16_t err_msg_size) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "master_state_get_prefix:\t%" PRId64 " (%s)\n", result, err_msg); @@ -252,10 +386,12 @@ master_state_get_prefix () void -slave_state_get_result (void *cls, int64_t result, const char *err_msg) +slave_state_get_result (void *cls, int64_t result, + const void *err_msg, uint16_t err_msg_size) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "slave_state_get:\t%" PRId64 " (%s)\n", result, err_msg); + "slave_state_get:\t%" PRId64 " (%.*s)\n", + result, err_msg_size, err_msg); // FIXME: GNUNET_assert (2 == result); master_state_get_prefix (); } @@ -274,10 +410,12 @@ slave_state_get () void -master_state_get_result (void *cls, int64_t result, const char *err_msg) +master_state_get_result (void *cls, int64_t result, + const void *err_msg, uint16_t err_msg_size) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "master_state_get:\t%" PRId64 " (%s)\n", result, err_msg); + "master_state_get:\t%" PRId64 " (%.*s)\n", + result, err_msg_size, err_msg); // FIXME: GNUNET_assert (1 == result); slave_state_get (); } @@ -295,10 +433,12 @@ master_state_get () /*** Slave history_replay() ***/ void -slave_history_replay_result (void *cls, int64_t result, const char *err_msg) +slave_history_replay_result (void *cls, int64_t result, + const void *err_msg, uint16_t err_msg_size) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "slave_history_replay:\t%" PRId64 " (%s)\n", result, err_msg); + "slave_history_replay:\t%" PRId64 " (%.*s)\n", + result, err_msg_size, err_msg); GNUNET_assert (9 == result); master_state_get (); @@ -309,9 +449,11 @@ void slave_history_replay () { test = TEST_SLAVE_HISTORY_REPLAY; - GNUNET_PSYC_channel_history_replay (slv_chn, 1, 1, - &slave_history_replay_result, - NULL); + GNUNET_PSYC_channel_history_replay (slv_chn, 1, 1, "", + GNUNET_PSYC_HISTORY_REPLAY_LOCAL, + &slave_message_cb, + &slave_message_part_cb, + &slave_history_replay_result, NULL); } @@ -319,10 +461,12 @@ slave_history_replay () void -master_history_replay_result (void *cls, int64_t result, const char *err_msg) +master_history_replay_result (void *cls, int64_t result, + const void *err_msg, uint16_t err_msg_size) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "master_history_replay:\t%" PRId64 " (%s)\n", result, err_msg); + "master_history_replay:\t%" PRId64 " (%.*s)\n", + result, err_msg_size, err_msg); GNUNET_assert (9 == result); slave_history_replay (); @@ -333,9 +477,11 @@ void master_history_replay () { test = TEST_MASTER_HISTORY_REPLAY; - GNUNET_PSYC_channel_history_replay (mst_chn, 1, 1, - &master_history_replay_result, - NULL); + GNUNET_PSYC_channel_history_replay (mst_chn, 1, 1, "", + GNUNET_PSYC_HISTORY_REPLAY_LOCAL, + &master_message_cb, + &master_message_part_cb, + &master_history_replay_result, NULL); } @@ -343,10 +489,12 @@ master_history_replay () void -slave_history_replay_latest_result (void *cls, int64_t result, const char *err_msg) +slave_history_replay_latest_result (void *cls, int64_t result, + const void *err_msg, uint16_t err_msg_size) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "slave_history_replay_latest:\t%" PRId64 " (%s)\n", result, err_msg); + "slave_history_replay_latest:\t%" PRId64 " (%.*s)\n", + result, err_msg_size, err_msg); GNUNET_assert (9 == result); master_history_replay (); @@ -357,7 +505,10 @@ void slave_history_replay_latest () { test = TEST_SLAVE_HISTORY_REPLAY_LATEST; - GNUNET_PSYC_channel_history_replay_latest (slv_chn, 1, + GNUNET_PSYC_channel_history_replay_latest (slv_chn, 1, "", + GNUNET_PSYC_HISTORY_REPLAY_LOCAL, + &slave_message_cb, + &slave_message_part_cb, &slave_history_replay_latest_result, NULL); } @@ -367,10 +518,12 @@ slave_history_replay_latest () void -master_history_replay_latest_result (void *cls, int64_t result, const char *err_msg) +master_history_replay_latest_result (void *cls, int64_t result, + const void *err_msg, uint16_t err_msg_size) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "master_history_replay_latest:\t%" PRId64 " (%s)\n", result, err_msg); + "master_history_replay_latest:\t%" PRId64 " (%.*s)\n", + result, err_msg_size, err_msg); GNUNET_assert (9 == result); slave_history_replay_latest (); @@ -381,138 +534,15 @@ void master_history_replay_latest () { test = TEST_MASTER_HISTORY_REPLAY_LATEST; - GNUNET_PSYC_channel_history_replay_latest (mst_chn, 1, + GNUNET_PSYC_channel_history_replay_latest (mst_chn, 1, "", + GNUNET_PSYC_HISTORY_REPLAY_LOCAL, + &master_message_cb, + &master_message_part_cb, &master_history_replay_latest_result, NULL); } -void -master_message_cb (void *cls, uint64_t message_id, uint32_t flags, - const struct GNUNET_PSYC_MessageHeader *msg) -{ - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Test #%d: Master got PSYC message fragment of size %u " - "belonging to message ID %" PRIu64 " with flags %x\n", - test, ntohs (msg->header.size), message_id, flags); - // FIXME -} - - -void -master_message_part_cb (void *cls, uint64_t message_id, - uint64_t data_offset, uint32_t flags, - const struct GNUNET_MessageHeader *msg) -{ - if (NULL == msg) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Error while receiving message %" PRIu64 "\n", message_id); - return; - } - - uint16_t type = ntohs (msg->type); - uint16_t size = ntohs (msg->size); - - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Test #%d: Master got message part of type %u and size %u " - "belonging to message ID %" PRIu64 " with flags %x\n", - test, type, size, message_id, flags); - - switch (test) - { - case TEST_SLAVE_TRANSMIT: - if (GNUNET_PSYC_MESSAGE_REQUEST != flags) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Unexpected request flags: %x" PRIu32 "\n", flags); - GNUNET_assert (0); - return; - } - // FIXME: check rest of message - - if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type) - master_transmit (); - break; - - case TEST_MASTER_TRANSMIT: - break; - - case TEST_MASTER_HISTORY_REPLAY: - case TEST_MASTER_HISTORY_REPLAY_LATEST: - if (GNUNET_PSYC_MESSAGE_HISTORIC != flags) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Test #%d: Unexpected flags for historic message: %x" PRIu32 "\n", - flags); - GNUNET_assert (0); - return; - } - break; - - default: - GNUNET_assert (0); - } -} - - -void -slave_message_cb (void *cls, uint64_t message_id, uint32_t flags, - const struct GNUNET_PSYC_MessageHeader *msg) -{ - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Test #%d: Slave got PSYC message fragment of size %u " - "belonging to message ID %" PRIu64 " with flags %x\n", - test, ntohs (msg->header.size), message_id, flags); - // FIXME -} - - -void -slave_message_part_cb (void *cls, uint64_t message_id, - uint64_t data_offset, uint32_t flags, - const struct GNUNET_MessageHeader *msg) -{ - if (NULL == msg) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Error while receiving message " PRIu64 "\n", message_id); - return; - } - - uint16_t type = ntohs (msg->type); - uint16_t size = ntohs (msg->size); - - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Test #%d: Slave got message part of type %u and size %u " - "belonging to message ID %" PRIu64 " with flags %x\n", - test, type, size, message_id, flags); - - switch (test) - { - case TEST_MASTER_TRANSMIT: - if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type) - master_history_replay_latest (); - break; - - case TEST_SLAVE_HISTORY_REPLAY: - case TEST_SLAVE_HISTORY_REPLAY_LATEST: - if (GNUNET_PSYC_MESSAGE_HISTORIC != flags) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Test #%d: Unexpected flags for historic message: %x" PRIu32 "\n", - flags); - GNUNET_assert (0); - return; - } - break; - - default: - GNUNET_assert (0); - } -} - - void transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { @@ -665,27 +695,31 @@ slave_transmit () tmit->data[0] = "slave test"; tmit->data_count = 1; tmit->slv_tmit - = GNUNET_PSYC_slave_transmit (slv, "_request_test", tmit_notify_mod, - tmit_notify_data, tmit, + = GNUNET_PSYC_slave_transmit (slv, "_request_test", &tmit_notify_mod, + &tmit_notify_data, tmit, GNUNET_PSYC_SLAVE_TRANSMIT_NONE); } void -slave_remove_cb (void *cls, int64_t result, const char *err_msg) +slave_remove_cb (void *cls, int64_t result, + const void *err_msg, uint16_t err_msg_size) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "slave_remove:\t%" PRId64 " (%s)\n", result, err_msg); + "slave_remove:\t%" PRId64 " (%.*s)\n", + result, err_msg_size, err_msg); slave_transmit (); } void -slave_add_cb (void *cls, int64_t result, const char *err_msg) +slave_add_cb (void *cls, int64_t result, + const void *err_msg, uint16_t err_msg_size) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "slave_add:\t%" PRId64 " (%s)\n", result, err_msg); + "slave_add:\t%" PRId64 " (%.*s)\n", + result, err_msg_size, err_msg); struct GNUNET_PSYC_Channel *chn = cls; GNUNET_PSYC_channel_slave_remove (chn, &slave_pub_key, 2, @@ -775,6 +809,8 @@ master_transmit () { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master sending message to all.\n"); test = TEST_MASTER_TRANSMIT; + end_count = 0; + uint32_t i, j; char *name_max = "_test_max"; @@ -816,8 +852,8 @@ master_transmit () tmit->data_delay[1] = 3; tmit->data_count = 4; tmit->mst_tmit - = GNUNET_PSYC_master_transmit (mst, "_notice_test", tmit_notify_mod, - tmit_notify_data, tmit, + = GNUNET_PSYC_master_transmit (mst, "_notice_test", &tmit_notify_mod, + &tmit_notify_data, tmit, GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN); } -- cgit v1.2.3