From 3c9e8b1b0f7f83f27fefb02bd67b481c67cad0c8 Mon Sep 17 00:00:00 2001 From: Omar Tarabai Date: Fri, 16 May 2014 16:45:43 +0000 Subject: peerstore API now uses MQ --- src/include/gnunet_peerstore_service.h | 38 +++++++ src/peerstore/gnunet-service-peerstore.c | 12 +-- src/peerstore/peerstore_api.c | 178 +++++++++++++++++++++++++------ src/peerstore/peerstore_common.c | 63 ++++++++++- src/peerstore/peerstore_common.h | 44 +------- src/peerstore/test_peerstore_api.c | 2 +- 6 files changed, 252 insertions(+), 85 deletions(-) (limited to 'src') diff --git a/src/include/gnunet_peerstore_service.h b/src/include/gnunet_peerstore_service.h index 6fd059961..b802fb390 100644 --- a/src/include/gnunet_peerstore_service.h +++ b/src/include/gnunet_peerstore_service.h @@ -47,6 +47,44 @@ struct GNUNET_PEERSTORE_Handle; */ struct GNUNET_PEERSTORE_StoreContext; +/** + * Single PEERSTORE record + */ +struct GNUNET_PEERSTORE_Record +{ + + /** + * Responsible sub system string + */ + char *sub_system; + + /** + * Peer Identity + */ + struct GNUNET_PeerIdentity *peer; + + /** + * Record key string + */ + char *key; + + /** + * Record value BLOB + */ + void *value; + + /** + * Size of 'value' BLOB + */ + size_t value_size; + + /** + * Expiry time of entry + */ + struct GNUNET_TIME_Absolute *expiry; + +}; + /** * Continuation called with a status result. * diff --git a/src/peerstore/gnunet-service-peerstore.c b/src/peerstore/gnunet-service-peerstore.c index be5394ff5..50af4342c 100644 --- a/src/peerstore/gnunet-service-peerstore.c +++ b/src/peerstore/gnunet-service-peerstore.c @@ -87,7 +87,7 @@ handle_client_disconnect (void *cls, * @param sub_system name of the GNUnet sub system responsible * @param value stored value * @param size size of stored value - */ + * int record_iterator(void *cls, const char *sub_system, const struct GNUNET_PeerIdentity *peer, @@ -108,7 +108,7 @@ int record_iterator(void *cls, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE); GNUNET_SERVER_transmit_context_append_message(tc, (const struct GNUNET_MessageHeader *)srm); return GNUNET_YES; -} +}*/ /** * Handle an iterate request from client @@ -116,7 +116,7 @@ int record_iterator(void *cls, * @param cls unused * @param client identification of the client * @param message the actual message - */ + * void handle_iterate (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *message) @@ -147,7 +147,7 @@ void handle_iterate (void *cls, { } -} +}*/ /** * Handle a store request from client @@ -190,7 +190,7 @@ void handle_store (void *cls, record->key, record->value, record->value_size, - record->expiry)) + *record->expiry)) { response_type = GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK; } @@ -220,7 +220,7 @@ run (void *cls, { static const struct GNUNET_SERVER_MessageHandler handlers[] = { {&handle_store, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE, 0}, - {&handle_iterate, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE, 0}, +// {&handle_iterate, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE, 0}, {NULL, NULL, 0, 0} }; char *database; diff --git a/src/peerstore/peerstore_api.c b/src/peerstore/peerstore_api.c index aa798e653..323ba45d0 100644 --- a/src/peerstore/peerstore_api.c +++ b/src/peerstore/peerstore_api.c @@ -50,6 +50,21 @@ struct GNUNET_PEERSTORE_Handle */ struct GNUNET_CLIENT_Connection *client; + /** + * Message queue + */ + struct GNUNET_MQ_Handle *mq; + + /** + * Head of active STORE requests. + */ + struct GNUNET_PEERSTORE_StoreContext *store_head; + + /** + * Tail of active STORE requests. + */ + struct GNUNET_PEERSTORE_StoreContext *store_tail; + }; /** @@ -57,12 +72,26 @@ struct GNUNET_PEERSTORE_Handle */ struct GNUNET_PEERSTORE_StoreContext { + /** + * Kept in a DLL. + */ + struct GNUNET_PEERSTORE_StoreContext *next; + + /** + * Kept in a DLL. + */ + struct GNUNET_PEERSTORE_StoreContext *prev; /** * Handle to the PEERSTORE service. */ struct GNUNET_PEERSTORE_Handle *h; + /** + * MQ Envelope with store request message + */ + struct GNUNET_MQ_Envelope *ev; + /** * Continuation called with service response */ @@ -73,8 +102,26 @@ struct GNUNET_PEERSTORE_StoreContext */ void *cont_cls; + /** + * #GNUNET_YES / #GNUNET_NO + * if sent, cannot be canceled + */ + int request_sent; + }; +/******************************************************************************/ +/******************* DECLARATIONS *********************/ +/******************************************************************************/ + +/** + * When a response for store request is received + * + * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *' + * @param msg message received, NULL on timeout or fatal error + */ +void handle_store_result (void *cls, const struct GNUNET_MessageHeader *msg); + /******************************************************************************/ /******************* CONNECTION FUNCTIONS *********************/ /******************************************************************************/ @@ -85,7 +132,7 @@ struct GNUNET_PEERSTORE_StoreContext * @param h handle to the service */ static void -reconnect (struct GNUNET_PEERSTORE_Handle *h) +reconnect (struct GNUNET_PEERSTORE_Handle *h) //FIXME: MQ friendly { LOG(GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n"); @@ -98,6 +145,12 @@ reconnect (struct GNUNET_PEERSTORE_Handle *h) } +static void +handle_client_error (void *cls, enum GNUNET_MQ_Error error) //FIXME: implement +{ + //struct GNUNET_PEERSTORE_Handle *h = cls; +} + /** * Connect to the PEERSTORE service. * @@ -106,15 +159,30 @@ reconnect (struct GNUNET_PEERSTORE_Handle *h) struct GNUNET_PEERSTORE_Handle * GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) { - struct GNUNET_CLIENT_Connection *client; struct GNUNET_PEERSTORE_Handle *h; + static const struct GNUNET_MQ_MessageHandler mq_handlers[] = { + {&handle_store_result, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK, sizeof(struct GNUNET_MessageHeader)}, + {&handle_store_result, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL, sizeof(struct GNUNET_MessageHeader)}, + GNUNET_MQ_HANDLERS_END + }; - client = GNUNET_CLIENT_connect ("peerstore", cfg); - if(NULL == client) - return NULL; h = GNUNET_new (struct GNUNET_PEERSTORE_Handle); - h->client = client; + h->client = GNUNET_CLIENT_connect ("peerstore", cfg); + if(NULL == h->client) + { + GNUNET_free(h); + return NULL; + } h->cfg = cfg; + h->mq = GNUNET_MQ_queue_for_connection_client(h->client, + mq_handlers, + &handle_client_error, + h); + if(NULL == h->mq) + { + GNUNET_free(h); + return NULL; + } LOG(GNUNET_ERROR_TYPE_DEBUG, "New connection created\n"); return h; } @@ -128,6 +196,11 @@ GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) void GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h) { + if(NULL != h->mq) + { + GNUNET_MQ_destroy(h->mq); + h->mq = NULL; + } if (NULL != h->client) { GNUNET_CLIENT_disconnect (h->client); @@ -139,7 +212,7 @@ GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h) /******************************************************************************/ -/******************* ADD FUNCTIONS *********************/ +/******************* STORE FUNCTIONS *********************/ /******************************************************************************/ /** @@ -148,41 +221,77 @@ GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h) * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *' * @param msg message received, NULL on timeout or fatal error */ -void store_response_receiver (void *cls, const struct GNUNET_MessageHeader *msg) +void handle_store_result (void *cls, const struct GNUNET_MessageHeader *msg) //FIXME: MQ friendly { - struct GNUNET_PEERSTORE_StoreContext *sc = cls; + struct GNUNET_PEERSTORE_Handle *h = cls; + struct GNUNET_PEERSTORE_StoreContext *sc; uint16_t msg_type; + GNUNET_PEERSTORE_Continuation cont; + void *cont_cls; - if(NULL == sc->cont) + sc = h->store_head; + if(NULL == sc) + { + GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Unexpected store response, this should not happen.\n"); + reconnect(h); return; - if(NULL == msg) + } + cont = sc->cont; + cont_cls = sc->cont_cls; + GNUNET_CONTAINER_DLL_remove(h->store_head, h->store_tail, sc); + GNUNET_free(sc); + if(NULL == msg) /* Connection error */ { - sc->cont(sc->cont_cls, GNUNET_SYSERR); - reconnect(sc->h); + if(NULL != cont) + cont(cont_cls, GNUNET_SYSERR); + reconnect(h); return; } - msg_type = ntohs(msg->type); - if(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK == msg_type) - sc->cont(sc->cont_cls, GNUNET_OK); - else if(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL == msg_type) - sc->cont(sc->cont_cls, GNUNET_SYSERR); - else + if(NULL != cont) /* Run continuation */ { - LOG(GNUNET_ERROR_TYPE_ERROR, "Invalid response from `PEERSTORE' service.\n"); - sc->cont(sc->cont_cls, GNUNET_SYSERR); + msg_type = ntohs(msg->type); + if(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK == msg_type) + cont(cont_cls, GNUNET_OK); + else if(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL == msg_type) + cont(cont_cls, GNUNET_SYSERR); } } +/** + * Callback after MQ envelope is sent + * + * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *' + */ +void store_request_sent (void *cls) +{ + struct GNUNET_PEERSTORE_StoreContext *sc = cls; + + sc->request_sent = GNUNET_YES; +} + /** * Cancel a store request * * @param sc Store request context */ void -GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc) +GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc) //FIXME: MQ friendly { - sc->cont = NULL; + GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, + "Canceling store request.\n"); + if(GNUNET_NO == sc->request_sent) + { + if(NULL != sc->ev) + GNUNET_MQ_discard(sc->ev); + GNUNET_CONTAINER_DLL_remove(sc->h->store_head, sc->h->store_tail, sc); + GNUNET_free(sc); + } + else + { /* request already sent, will have to wait for response */ + sc->cont = NULL; + } + } /** @@ -209,29 +318,28 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h, GNUNET_PEERSTORE_Continuation cont, void *cont_cls) { + struct GNUNET_MQ_Envelope *ev; struct GNUNET_PEERSTORE_StoreContext *sc; - struct StoreRecordMessage *srm; LOG (GNUNET_ERROR_TYPE_DEBUG, "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n", size, sub_system, GNUNET_i2s (peer), key); - sc = GNUNET_new(struct GNUNET_PEERSTORE_StoreContext); - sc->cont = cont; - sc->cont_cls = cont_cls; - sc->h = h; - srm = PEERSTORE_create_record_message(sub_system, + ev = PEERSTORE_create_record_mq_envelope(sub_system, peer, key, value, size, expiry, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE); - GNUNET_CLIENT_transmit_and_get_response(h->client, - (const struct GNUNET_MessageHeader *)srm, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_YES, - &store_response_receiver, - sc); + GNUNET_MQ_send(h->mq, ev); + GNUNET_MQ_notify_sent(ev, &store_request_sent, ev); + sc = GNUNET_new(struct GNUNET_PEERSTORE_StoreContext); + sc->ev = ev; + sc->cont = cont; + sc->cont_cls = cont_cls; + sc->h = h; + sc->request_sent = GNUNET_NO; + GNUNET_CONTAINER_DLL_insert(h->store_head, h->store_tail, sc); return sc; } diff --git a/src/peerstore/peerstore_common.c b/src/peerstore/peerstore_common.c index 6b8b80985..6c91d9802 100644 --- a/src/peerstore/peerstore_common.c +++ b/src/peerstore/peerstore_common.c @@ -36,7 +36,7 @@ * @param expiry absolute time after which the record expires * @param msg_type message type to be set in header * @return pointer to record message struct - */ + * struct StoreRecordMessage * PEERSTORE_create_record_message(const char *sub_system, const struct GNUNET_PeerIdentity *peer, @@ -83,6 +83,64 @@ PEERSTORE_create_record_message(const char *sub_system, memcpy(dummy, value, value_size); return srm; +}*/ + +/** + * Creates a MQ envelope for a single record + * + * @param sub_system sub system string + * @param peer Peer identity (can be NULL) + * @param key record key string (can be NULL) + * @param value record value BLOB (can be NULL) + * @param value_size record value size in bytes (set to 0 if value is NULL) + * @param expiry time after which the record expires + * @param msg_type message type to be set in header + * @return pointer to record message struct + */ +struct GNUNET_MQ_Envelope * +PEERSTORE_create_record_mq_envelope(const char *sub_system, + const struct GNUNET_PeerIdentity *peer, + const char *key, + const void *value, + size_t value_size, + struct GNUNET_TIME_Absolute expiry, + uint16_t msg_type) +{ + struct StoreRecordMessage *srm; + struct GNUNET_MQ_Envelope *ev; + size_t ss_size; + size_t key_size; + size_t msg_size; + void *dummy; + + ss_size = strlen(sub_system) + 1; + if(NULL == key) + key_size = 0; + else + key_size = strlen(key) + 1; + msg_size = ss_size + + key_size + + value_size; + ev = GNUNET_MQ_msg_extra(srm, msg_size, msg_type); + srm->key_size = htons(key_size); + srm->expiry = expiry; + if(NULL == peer) + srm->peer_set = htons(GNUNET_NO); + else + { + srm->peer_set = htons(GNUNET_YES); + srm->peer = *peer; + } + srm->sub_system_size = htons(ss_size); + srm->value_size = htons(value_size); + dummy = &srm[1]; + memcpy(dummy, sub_system, ss_size); + dummy += ss_size; + memcpy(dummy, key, key_size); + dummy += key_size; + memcpy(dummy, value, value_size); + + return ev; } /** @@ -118,7 +176,8 @@ PEERSTORE_parse_record_message(const struct GNUNET_MessageHeader *message) record->peer = GNUNET_new(struct GNUNET_PeerIdentity); memcpy(record->peer, &srm->peer, sizeof(struct GNUNET_PeerIdentity)); } - record->expiry = srm->expiry; + record->expiry = GNUNET_new(struct GNUNET_TIME_Absolute); + *(record->expiry) = srm->expiry; dummy = (char *)&srm[1]; if(ss_size > 0) { diff --git a/src/peerstore/peerstore_common.h b/src/peerstore/peerstore_common.h index 4795edbb1..a15302ed4 100644 --- a/src/peerstore/peerstore_common.h +++ b/src/peerstore/peerstore_common.h @@ -27,45 +27,7 @@ #include "peerstore.h" /** - * PEERSTORE single record - */ -struct GNUNET_PEERSTORE_Record -{ - - /** - * Responsible sub system string - */ - char *sub_system; - - /** - * Peer Identity - */ - struct GNUNET_PeerIdentity *peer; - - /** - * Record key string - */ - char *key; - - /** - * Record value BLOB - */ - void *value; - - /** - * Size of value BLOB - */ - size_t value_size; - - /** - * Expiry time of record - */ - struct GNUNET_TIME_Absolute expiry; - -}; - -/** - * Creates a record message ready to be sent + * Creates a MQ envelope for a single record * * @param sub_system sub system string * @param peer Peer identity (can be NULL) @@ -76,8 +38,8 @@ struct GNUNET_PEERSTORE_Record * @param msg_type message type to be set in header * @return pointer to record message struct */ -struct StoreRecordMessage * -PEERSTORE_create_record_message(const char *sub_system, +struct GNUNET_MQ_Envelope * +PEERSTORE_create_record_mq_envelope(const char *sub_system, const struct GNUNET_PeerIdentity *peer, const char *key, const void *value, diff --git a/src/peerstore/test_peerstore_api.c b/src/peerstore/test_peerstore_api.c index 781187865..74f3e70db 100644 --- a/src/peerstore/test_peerstore_api.c +++ b/src/peerstore/test_peerstore_api.c @@ -58,7 +58,7 @@ run (void *cls, "peerstore-test-key", val, val_size, - GNUNET_TIME_UNIT_FOREVER_REL, + GNUNET_TIME_UNIT_FOREVER_ABS, &store_cont, NULL); -- cgit v1.2.3