From 90c9abc573f95de334a1f61faa089e79046d2f11 Mon Sep 17 00:00:00 2001 From: Bart Polot Date: Fri, 20 Apr 2012 17:18:14 +0000 Subject: - Rewritten DHT monitoring --- src/dht/dht_api.c | 238 +++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 200 insertions(+), 38 deletions(-) (limited to 'src/dht/dht_api.c') diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c index 1030b7999..3f850b3e7 100644 --- a/src/dht/dht_api.c +++ b/src/dht/dht_api.c @@ -171,9 +171,19 @@ struct GNUNET_DHT_MonitorHandle GNUNET_HashCode *key; /** - * Callback for each received message of interest. + * Callback for each received message of type get. */ - GNUNET_DHT_MonitorCB cb; + GNUNET_DHT_MonitorGetCB get_cb; + + /** + * Callback for each received message of type get response. + */ + GNUNET_DHT_MonitorGetRespCB get_resp_cb; + + /** + * Callback for each received message of type put. + */ + GNUNET_DHT_MonitorPutCB put_cb; /** * Closure for cb. @@ -533,63 +543,205 @@ process_reply (void *cls, const GNUNET_HashCode * key, void *value) return GNUNET_YES; } - /** - * Process a monitoring message from the service. + * Process a get monitor message from the service. * * @param handle The DHT handle. - * @param msg Message from the service. + * @param msg Monitor get message from the service. * * @return GNUNET_OK if everything went fine, * GNUNET_SYSERR if the message is malformed. */ static int -process_monitor_message (struct GNUNET_DHT_Handle *handle, - const struct GNUNET_MessageHeader *msg) +process_monitor_get_message (struct GNUNET_DHT_Handle *handle, + const struct GNUNET_DHT_MonitorGetMessage *msg) { - struct GNUNET_DHT_MonitorMessage *m; struct GNUNET_DHT_MonitorHandle *h; size_t msize; - if (ntohs (msg->type) < GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET || - ntohs (msg->type) > GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT) + msize = ntohs (msg->header.size); + if (msize < sizeof (struct GNUNET_DHT_MonitorGetMessage)) return GNUNET_SYSERR; - msize = ntohs (msg->size); - if (msize < sizeof (struct GNUNET_DHT_MonitorMessage)) + + h = handle->monitor_head; + while (NULL != h) + { + int type_ok; + int key_ok; + + type_ok = GNUNET_BLOCK_TYPE_ANY == h->type || h->type == ntohl(msg->type); + key_ok = NULL == h->key || memcmp (h->key, &msg->key, + sizeof (GNUNET_HashCode)) == 0; + if (type_ok && key_ok && NULL != h->get_cb) + { + h->get_cb (h->cb_cls, + ntohl (msg->options), + (enum GNUNET_BLOCK_Type) ntohl(msg->type), + ntohl (msg->hop_count), + ntohl (msg->desired_replication_level), + ntohl (msg->get_path_length), + (struct GNUNET_PeerIdentity *) &msg[1], + &msg->key); + } + h = h->next; + } + return GNUNET_OK; +} + + +/** + * Process a get response monitor message from the service. + * + * @param handle The DHT handle. + * @param msg Monitor get response message from the service. + * + * @return GNUNET_OK if everything went fine, + * GNUNET_SYSERR if the message is malformed. + */ +static int +process_monitor_get_resp_message (struct GNUNET_DHT_Handle *handle, + const struct GNUNET_DHT_MonitorGetRespMessage + *msg) +{ + struct GNUNET_DHT_MonitorHandle *h; + size_t msize; + + msize = ntohs (msg->header.size); + if (msize < sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) return GNUNET_SYSERR; - m = (struct GNUNET_DHT_MonitorMessage *) msg; h = handle->monitor_head; while (NULL != h) { - if (h->type == ntohl(m->type) && - (NULL == h->key || - memcmp (h->key, &m->key, sizeof (GNUNET_HashCode)) == 0)) + int type_ok; + int key_ok; + + type_ok = GNUNET_BLOCK_TYPE_ANY == h->type || h->type == ntohl(msg->type); + key_ok = NULL == h->key || memcmp (h->key, &msg->key, + sizeof (GNUNET_HashCode)) == 0; + if (type_ok && key_ok && NULL != h->get_resp_cb) { struct GNUNET_PeerIdentity *path; uint32_t getl; uint32_t putl; - path = (struct GNUNET_PeerIdentity *) &m[1]; - getl = ntohl (m->get_path_length); - putl = ntohl (m->put_path_length); - h->cb (h->cb_cls, ntohs(msg->type), - GNUNET_TIME_absolute_ntoh(m->expiration), - &m->key, - &path[getl], putl, path, getl, - ntohl (m->desired_replication_level), - ntohl (m->options), ntohl (m->type), - (void *) &path[getl + putl], - ntohs (msg->size) - - sizeof (struct GNUNET_DHT_MonitorMessage) - - sizeof (struct GNUNET_PeerIdentity) * (putl + getl)); + path = (struct GNUNET_PeerIdentity *) &msg[1]; + getl = ntohl (msg->get_path_length); + putl = ntohl (msg->put_path_length); + h->get_resp_cb (h->cb_cls, + (enum GNUNET_BLOCK_Type) ntohl(msg->type), + path, getl, + &path[getl], putl, + GNUNET_TIME_absolute_ntoh(msg->expiration_time), + &msg->key, + (void *) &path[getl + putl], + msize - + sizeof (struct GNUNET_DHT_MonitorGetRespMessage) - + sizeof (struct GNUNET_PeerIdentity) * (putl + getl)); } h = h->next; } + return GNUNET_OK; +} + + +/** + * Process a put monitor message from the service. + * + * @param handle The DHT handle. + * @param msg Monitor put message from the service. + * + * @return GNUNET_OK if everything went fine, + * GNUNET_SYSERR if the message is malformed. + */ +static int +process_monitor_put_message (struct GNUNET_DHT_Handle *handle, + const struct GNUNET_DHT_MonitorPutMessage *msg) +{ + struct GNUNET_DHT_MonitorHandle *h; + size_t msize; + + msize = ntohs (msg->header.size); + if (msize < sizeof (struct GNUNET_DHT_MonitorPutMessage)) + return GNUNET_SYSERR; + + h = handle->monitor_head; + while (NULL != h) + { + int type_ok; + int key_ok; + + type_ok = GNUNET_BLOCK_TYPE_ANY == h->type || h->type == ntohl(msg->type); + key_ok = NULL == h->key || memcmp (h->key, &msg->key, + sizeof (GNUNET_HashCode)) == 0; + if (type_ok && key_ok && NULL != h->put_cb) + { + struct GNUNET_PeerIdentity *path; + uint32_t putl; + path = (struct GNUNET_PeerIdentity *) &msg[1]; + putl = ntohl (msg->put_path_length); + h->put_cb (h->cb_cls, + ntohl (msg->options), + (enum GNUNET_BLOCK_Type) ntohl(msg->type), + ntohl (msg->hop_count), + ntohl (msg->desired_replication_level), + putl, path, + GNUNET_TIME_absolute_ntoh(msg->expiration_time), + &msg->key, + (void *) &path[putl], + msize - + sizeof (struct GNUNET_DHT_MonitorPutMessage) - + sizeof (struct GNUNET_PeerIdentity) * putl); + } + h = h->next; + } return GNUNET_OK; } + +/** + * Process a monitoring message from the service: demultiplex for proper type. + * + * @param handle The DHT handle. + * @param msg Message from the service. + * + * @return GNUNET_OK if everything went fine, + * GNUNET_SYSERR if the message is malformed. + */ +static int +process_monitor_message (struct GNUNET_DHT_Handle *handle, + const struct GNUNET_MessageHeader *msg) +{ + switch (ntohs (msg->type)) + { + case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET: + return process_monitor_get_message(handle, + (struct GNUNET_DHT_MonitorGetMessage *) + msg); + + case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP: + { + return process_monitor_get_resp_message( + handle, + (struct GNUNET_DHT_MonitorGetRespMessage *) msg); + } + case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT: + { + return process_monitor_put_message(handle, + (struct GNUNET_DHT_MonitorPutMessage *) + msg); + } + case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT_RESP: + /* Not implemented yet */ + GNUNET_break(0); + /* Fall through */ + default: + GNUNET_break(0); + return GNUNET_SYSERR; + } +} + /** * Handler for messages received from the DHT service * a demultiplexer which handles numerous message types @@ -930,7 +1082,9 @@ GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle) * @param handle Handle to the DHT service. * @param type Type of blocks that are of interest. * @param key Key of data of interest, NULL for all. - * @param cb Callback to process all monitored data. + * @param get_cb Callback to process monitored get messages. + * @param get_resp_cb Callback to process monitored get response messages. + * @param put_cb Callback to process monitored put messages. * @param cb_cls Closure for cb. * * @return Handle to stop monitoring. @@ -939,18 +1093,21 @@ struct GNUNET_DHT_MonitorHandle * GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle, enum GNUNET_BLOCK_Type type, const GNUNET_HashCode *key, - GNUNET_DHT_MonitorCB cb, + GNUNET_DHT_MonitorGetCB get_cb, + GNUNET_DHT_MonitorGetRespCB get_resp_cb, + GNUNET_DHT_MonitorPutCB put_cb, void *cb_cls) { struct GNUNET_DHT_MonitorHandle *h; - struct GNUNET_DHT_MonitorMessage *m; + struct GNUNET_DHT_MonitorStartMessage *m; struct PendingMessage *pending; h = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorHandle)); GNUNET_CONTAINER_DLL_insert(handle->monitor_head, handle->monitor_tail, h); - GNUNET_assert (NULL != cb); - h->cb = cb; + h->get_cb = get_cb; + h->get_resp_cb = get_resp_cb; + h->put_cb = put_cb; h->cb_cls = cb_cls; h->type = type; h->dht_handle = handle; @@ -960,17 +1117,22 @@ GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle, memcpy (h->key, key, sizeof(GNUNET_HashCode)); } - pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorMessage) + + pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorStartMessage) + sizeof (struct PendingMessage)); - m = (struct GNUNET_DHT_MonitorMessage *) &pending[1]; + m = (struct GNUNET_DHT_MonitorStartMessage *) &pending[1]; pending->msg = &m->header; pending->handle = handle; pending->free_on_send = GNUNET_YES; m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET); - m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorMessage)); + m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorStartMessage)); m->type = htonl(type); - if (NULL != key) + m->get = (NULL != get_cb); + m->get_resp = (NULL != get_resp_cb); + m->put = (NULL != put_cb); + if (NULL != key) { + m->filter_key = 1; memcpy (&m->key, key, sizeof(GNUNET_HashCode)); + } GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail, pending); pending->in_pending_queue = GNUNET_YES; -- cgit v1.2.3