From 603d264417198385513b09844ddf4557dcc44952 Mon Sep 17 00:00:00 2001 From: Omar Tarabai Date: Wed, 7 May 2014 17:13:47 +0000 Subject: update to PEERSTORE api --- src/peerstore/peerstore_api.c | 270 ++++++++++++++++++++++++++++++++++++------ 1 file changed, 234 insertions(+), 36 deletions(-) (limited to 'src/peerstore/peerstore_api.c') diff --git a/src/peerstore/peerstore_api.c b/src/peerstore/peerstore_api.c index a49d2a720..16cf70939 100644 --- a/src/peerstore/peerstore_api.c +++ b/src/peerstore/peerstore_api.c @@ -52,18 +52,28 @@ struct GNUNET_PEERSTORE_Handle /** * Head of transmission queue. */ - struct GNUNET_PEERSTORE_AddContext *ac_head; + struct GNUNET_PEERSTORE_RequestContext *rc_head; /** * Tail of transmission queue. */ - struct GNUNET_PEERSTORE_AddContext *ac_tail; + struct GNUNET_PEERSTORE_RequestContext *rc_tail; /** * Handle for the current transmission request, or NULL if none is pending. */ struct GNUNET_CLIENT_TransmitHandle *th; + /** + * Head of store requests DLL. + */ + struct GNUNET_PEERSTORE_StoreContext *sc_head; + + /** + * Tail of store requests DLL. + */ + struct GNUNET_PEERSTORE_StoreContext *sc_tail; + /** * ID for a reconnect task. */ @@ -80,17 +90,17 @@ struct GNUNET_PEERSTORE_Handle * Entry in the transmission queue to PEERSTORE service. * */ -struct GNUNET_PEERSTORE_AddContext +struct GNUNET_PEERSTORE_RequestContext { /** * This is a linked list. */ - struct GNUNET_PEERSTORE_AddContext *next; + struct GNUNET_PEERSTORE_RequestContext *next; /** * This is a linked list. */ - struct GNUNET_PEERSTORE_AddContext *prev; + struct GNUNET_PEERSTORE_RequestContext *prev; /** * Handle to the PEERSTORE service. @@ -114,6 +124,50 @@ struct GNUNET_PEERSTORE_AddContext }; +/** + * Context for a store request + * + */ +struct GNUNET_PEERSTORE_StoreContext +{ + /** + * Kept in a DLL. + */ + struct GNUNET_PEERSTORE_StoreContext *next; + + /** + * Kept in a DLL. + */ + struct GNUNET_PEERSTORE_StoreContext *prev; + + /** + * Handle to the PEERSTORE service. + */ + struct GNUNET_PEERSTORE_Handle *h; + + /** + * Our entry in the transmission queue. + */ + struct GNUNET_PEERSTORE_RequestContext *rc; + + /** + * Function to call with store operation result + */ + GNUNET_PEERSTORE_Continuation cont; + + /** + * Closure for 'cont'. + */ + void *cont_cls; + + /** + * Set to GNUNET_YES if we are currently receiving replies from the + * service. + */ + int request_transmitted; + +}; + /******************************************************************************/ /*********************** DECLARATIONS *************************/ /******************************************************************************/ @@ -213,7 +267,6 @@ reconnect (struct GNUNET_PEERSTORE_Handle *h) GNUNET_CLIENT_disconnect (h->client); h->client = NULL; } - h->in_receive = GNUNET_NO; h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg); if (NULL == h->client) { @@ -238,25 +291,25 @@ static size_t do_transmit (void *cls, size_t size, void *buf) { struct GNUNET_PEERSTORE_Handle *h = cls; - struct GNUNET_PEERSTORE_AddContext *ac = h->ac_head; + struct GNUNET_PEERSTORE_RequestContext *rc = h->rc_head; size_t ret; h->th = NULL; - if (NULL == ac) - return 0; /* request was cancelled in the meantime */ + if (NULL == rc) + return 0; /* request was canceled in the meantime */ if (NULL == buf) { /* peerstore service died */ LOG (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, "Failed to transmit message to `%s' service.\n", "PEERSTORE"); - GNUNET_CONTAINER_DLL_remove (h->ac_head, h->ac_tail, ac); + GNUNET_CONTAINER_DLL_remove (h->rc_head, h->rc_tail, rc); reconnect (h); - if (NULL != ac->cont) - ac->cont (ac->cont_cls, _("failed to transmit request (service down?)")); - GNUNET_free (ac); + if (NULL != rc->cont) + rc->cont (rc->cont_cls, _("failed to transmit request (service down?)")); + GNUNET_free (rc); return 0; } - ret = ac->size; + ret = rc->size; if (size < ret) { /* change in head of queue (i.e. cancel + add), try again */ @@ -265,12 +318,12 @@ do_transmit (void *cls, size_t size, void *buf) } LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting request of size %u to `%s' service.\n", ret, "PEERSTORE"); - memcpy (buf, &ac[1], ret); - GNUNET_CONTAINER_DLL_remove (h->ac_head, h->ac_tail, ac); + memcpy (buf, &rc[1], ret); + GNUNET_CONTAINER_DLL_remove (h->rc_head, h->rc_tail, rc); trigger_transmit (h); - if (NULL != ac->cont) - ac->cont (ac->cont_cls, NULL); - GNUNET_free (ac); + if (NULL != rc->cont) + rc->cont (rc->cont_cls, NULL); + GNUNET_free (rc); return ret; } @@ -283,9 +336,9 @@ do_transmit (void *cls, size_t size, void *buf) static void trigger_transmit (struct GNUNET_PEERSTORE_Handle *h) { - struct GNUNET_PEERSTORE_AddContext *ac; + struct GNUNET_PEERSTORE_RequestContext *rc; - if (NULL == (ac = h->ac_head)) + if (NULL == (rc = h->rc_head)) return; /* no requests queued */ if (NULL != h->th) return; /* request already pending */ @@ -296,18 +349,154 @@ trigger_transmit (struct GNUNET_PEERSTORE_Handle *h) return; } h->th = - GNUNET_CLIENT_notify_transmit_ready (h->client, ac->size, + GNUNET_CLIENT_notify_transmit_ready (h->client, rc->size, GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_YES, &do_transmit, h); } +/******************************************************************************/ +/******************* GENERAL FUNCTIONS *********************/ +/******************************************************************************/ + +/** + * Function called with server response message + * after a store operation is request + * + * @param cls a 'struct GNUNET_PEERSTORE_StoreContext' + * @param msg message received, NULL on timeout or fatal error + */ +static void +peerstore_handler (void *cls, const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_PEERSTORE_Handle *h = cls; + struct GNUNET_PEERSTORE_StoreContext *sc; + struct StoreResponseMessage *srm; + uint16_t response_type; + uint16_t response_size; + char *emsg; + + h->in_receive = GNUNET_NO; + if(NULL == msg) + { + reconnect(h); + return; + } + response_type = ntohs(msg->type); + response_size = ntohs(msg->size); + switch(response_type) + { + case GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT: + GNUNET_assert(response_size >= sizeof(struct GNUNET_MessageHeader) + sizeof(struct StoreResponseMessage)); + sc = h->sc_head; + if(NULL == sc) + { + LOG(GNUNET_ERROR_TYPE_ERROR, "Received a response to a non-existent store request\n"); + return; + } + GNUNET_PEERSTORE_store_cancel(sc); + trigger_transmit (h); + if (NULL != h->sc_head) + { + h->in_receive = GNUNET_YES; + GNUNET_CLIENT_receive (h->client, + &peerstore_handler, + h, + GNUNET_TIME_UNIT_FOREVER_REL); + } + if(NULL != sc->cont) + { + srm = (struct StoreResponseMessage *)&msg[1]; + emsg = NULL; + if(GNUNET_NO == ntohs(srm->success)) + { + emsg = GNUNET_malloc(ntohs(srm->emsg_size)); + memcpy(emsg, &srm[1], ntohs(srm->emsg_size)); + } + sc->cont(sc->cont_cls, emsg); + } + break; + } + +} + /******************************************************************************/ /******************* ADD FUNCTIONS *********************/ /******************************************************************************/ -struct GNUNET_PEERSTORE_AddContext * -GNUNET_PEERSTORE_add (struct GNUNET_PEERSTORE_Handle *h, +/** + * Cancel a store request + * + * @param sc Store request context + */ +void +GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc) +{ + struct GNUNET_PEERSTORE_Handle *h; + + h = sc->h; + sc->cont = NULL; + if (GNUNET_YES == sc->request_transmitted) + return; /* need to finish processing */ + GNUNET_CONTAINER_DLL_remove (h->sc_head, + h->sc_tail, + sc); + if (NULL != sc->rc) + { + GNUNET_CONTAINER_DLL_remove (h->rc_head, h->rc_tail, sc->rc); + GNUNET_free (sc->rc); + } + GNUNET_free (sc); +} + +/** + * Called after store request is sent + * Waits for response from service + * + * @param cls a 'struct GNUNET_PEERSTORE_StoreContext' + * @parma emsg error message (or NULL) + */ +void store_receive_result(void *cls, const char *emsg) +{ + struct GNUNET_PEERSTORE_StoreContext *sc = cls; + struct GNUNET_PEERSTORE_Handle *h = sc->h; + + sc->rc = NULL; + if(NULL != emsg) + { + GNUNET_PEERSTORE_store_cancel (sc); + reconnect (h); + if (NULL != sc->cont) + sc->cont (sc->cont_cls, emsg); + return; + } + LOG (GNUNET_ERROR_TYPE_DEBUG, "Waiting for response from `%s' service.\n", + "PEERSTORE"); + sc->request_transmitted = GNUNET_YES; + if (GNUNET_NO == h->in_receive) + { + h->in_receive = GNUNET_YES; + GNUNET_CLIENT_receive (h->client, + &peerstore_handler, + h, + GNUNET_TIME_UNIT_FOREVER_REL); + } +} + +/** + * Store a new entry in the PEERSTORE + * + * @param h Handle to the PEERSTORE service + * @param peer Peer Identity + * @param sub_system name of the sub system + * @param value entry value BLOB + * @param size size of 'value' + * @param lifetime relative time after which the entry is (possibly) deleted + * @param cont Continuation function after the store request is processed + * @param cont_cls Closure for 'cont' + */ +struct GNUNET_PEERSTORE_StoreContext * +GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h, const struct GNUNET_PeerIdentity *peer, const char *sub_system, const void *value, @@ -316,8 +505,9 @@ GNUNET_PEERSTORE_add (struct GNUNET_PEERSTORE_Handle *h, GNUNET_PEERSTORE_Continuation cont, void *cont_cls) { - struct GNUNET_PEERSTORE_AddContext *ac; - struct AddEntryMessage *entry; + struct GNUNET_PEERSTORE_RequestContext *rc; + struct StoreRequestMessage *entry; + struct GNUNET_PEERSTORE_StoreContext *sc; char *ss; void *val; size_t sub_system_size; @@ -327,24 +517,32 @@ GNUNET_PEERSTORE_add (struct GNUNET_PEERSTORE_Handle *h, "Storing value (size: %lu) for subsytem `%s' and peer `%s'", size, sub_system, GNUNET_i2s (peer)); sub_system_size = strlen(sub_system); - request_size = sizeof(struct AddEntryMessage) + sub_system_size + size; - ac = GNUNET_malloc(sizeof(struct GNUNET_PEERSTORE_AddContext) + request_size); - ac->h = h; - ac->size = request_size; - entry = (struct AddEntryMessage *)&ac[1]; + request_size = sizeof(struct StoreRequestMessage) + sub_system_size + size; + rc = GNUNET_malloc(sizeof(struct GNUNET_PEERSTORE_RequestContext) + request_size); + rc->h = h; + rc->size = request_size; + entry = (struct StoreRequestMessage *)&rc[1]; entry->header.size = htons(request_size); - entry->header.type = htons(GNUNET_MESSAGE_TYPE_PEERSTORE_ADD); + entry->header.type = htons(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE); entry->peer = *peer; - entry->sub_system_size = sub_system_size; - entry->value_size = size; + entry->sub_system_size = htons(sub_system_size); + entry->value_size = htons(size); entry->lifetime = lifetime; ss = (char *)&entry[1]; memcpy(ss, sub_system, sub_system_size); val = ss + sub_system_size; memcpy(val, value, size); - GNUNET_CONTAINER_DLL_insert_tail(h->ac_head, h->ac_tail, ac); + sc = GNUNET_new(struct GNUNET_PEERSTORE_StoreContext); + sc->cont = cont; + sc->cont_cls = cont_cls; + sc->h = h; + sc->rc = rc; + rc->cont = &store_receive_result; + rc->cont_cls = sc; + GNUNET_CONTAINER_DLL_insert_tail(h->rc_head, h->rc_tail, rc); + GNUNET_CONTAINER_DLL_insert_tail(h->sc_head, h->sc_tail, sc); trigger_transmit (h); - return ac; + return sc; } -- cgit v1.2.3