From c891e4d29ca772b6b246b928a1bda8d8c9ef499f Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 2 Jan 2022 20:46:06 +0100 Subject: -DHT: deduplicate monitor matching logic --- src/dht/gnunet-service-dht.h | 2 +- src/dht/gnunet-service-dht_clients.c | 539 +++++++++++++++++++++-------------- 2 files changed, 323 insertions(+), 218 deletions(-) (limited to 'src/dht') diff --git a/src/dht/gnunet-service-dht.h b/src/dht/gnunet-service-dht.h index d520cc905..e9b1ff63a 100644 --- a/src/dht/gnunet-service-dht.h +++ b/src/dht/gnunet-service-dht.h @@ -89,7 +89,7 @@ GDS_CLIENTS_handle_reply (const struct GDS_DATACACHE_BlockData *bd, * @param key Key of the requested data. */ void -GDS_CLIENTS_process_get (uint32_t options, +GDS_CLIENTS_process_get (enum GNUNET_DHT_RouteOption options, enum GNUNET_BLOCK_Type type, uint32_t hop_count, uint32_t desired_replication_level, diff --git a/src/dht/gnunet-service-dht_clients.c b/src/dht/gnunet-service-dht_clients.c index b520cda41..8acde2fe7 100644 --- a/src/dht/gnunet-service-dht_clients.c +++ b/src/dht/gnunet-service-dht_clients.c @@ -855,78 +855,6 @@ handle_dht_local_get_stop ( } -/** - * Handler for monitor start messages - * - * @param cls the client we received this message from - * @param msg the actual message received - * - */ -static void -handle_dht_local_monitor (void *cls, - const struct GNUNET_DHT_MonitorStartStopMessage *msg) -{ - struct ClientHandle *ch = cls; - struct ClientMonitorRecord *r; - - r = GNUNET_new (struct ClientMonitorRecord); - r->ch = ch; - r->type = ntohl (msg->type); - r->get = ntohs (msg->get); - r->get_resp = ntohs (msg->get_resp); - r->put = ntohs (msg->put); - if (0 != ntohs (msg->filter_key)) - r->key = msg->key; - GNUNET_CONTAINER_DLL_insert (monitor_head, - monitor_tail, - r); - GNUNET_SERVICE_client_continue (ch->client); -} - - -/** - * Handler for monitor stop messages - * - * @param cls the client we received this message from - * @param msg the actual message received - */ -static void -handle_dht_local_monitor_stop ( - void *cls, - const struct GNUNET_DHT_MonitorStartStopMessage *msg) -{ - struct ClientHandle *ch = cls; - - GNUNET_SERVICE_client_continue (ch->client); - for (struct ClientMonitorRecord *r = monitor_head; - NULL != r; - r = r->next) - { - bool keys_match; - - keys_match = - (GNUNET_is_zero (&r->key)) - ? (0 == ntohs (msg->filter_key)) - : ( (0 != ntohs (msg->filter_key)) && - (! GNUNET_memcmp (&r->key, - &msg->key)) ); - if ( (ch == r->ch) && - (ntohl (msg->type) == r->type) && - (r->get == msg->get) && - (r->get_resp == msg->get_resp) && - (r->put == msg->put) && - keys_match) - { - GNUNET_CONTAINER_DLL_remove (monitor_head, - monitor_tail, - r); - GNUNET_free (r); - return; /* Delete only ONE entry */ - } - } -} - - /** * Closure for #forward_reply() */ @@ -1132,26 +1060,106 @@ GDS_CLIENTS_handle_reply (const struct GDS_DATACACHE_BlockData *bd, } +/* ************* logic for monitors ************** */ + + /** - * Check if some client is monitoring GET messages and notify - * them in that case. If tracked, @a path should include the local peer. + * Handler for monitor start messages + * + * @param cls the client we received this message from + * @param msg the actual message received * - * @param options Options, for instance RecordRoute, DemultiplexEverywhere. - * @param type The type of data in the request. - * @param hop_count Hop count so far. - * @param path_length number of entries in path (or 0 if not recorded). - * @param path peers on the GET path (or NULL if not recorded). - * @param desired_replication_level Desired replication level. - * @param key Key of the requested data. */ -void -GDS_CLIENTS_process_get (uint32_t options, - enum GNUNET_BLOCK_Type type, - uint32_t hop_count, - uint32_t desired_replication_level, - unsigned int path_length, - const struct GNUNET_PeerIdentity *path, - const struct GNUNET_HashCode *key) +static void +handle_dht_local_monitor (void *cls, + const struct GNUNET_DHT_MonitorStartStopMessage *msg) +{ + struct ClientHandle *ch = cls; + struct ClientMonitorRecord *r; + + r = GNUNET_new (struct ClientMonitorRecord); + r->ch = ch; + r->type = ntohl (msg->type); + r->get = ntohs (msg->get); + r->get_resp = ntohs (msg->get_resp); + r->put = ntohs (msg->put); + if (0 != ntohs (msg->filter_key)) + r->key = msg->key; + GNUNET_CONTAINER_DLL_insert (monitor_head, + monitor_tail, + r); + GNUNET_SERVICE_client_continue (ch->client); +} + + +/** + * Handler for monitor stop messages + * + * @param cls the client we received this message from + * @param msg the actual message received + */ +static void +handle_dht_local_monitor_stop ( + void *cls, + const struct GNUNET_DHT_MonitorStartStopMessage *msg) +{ + struct ClientHandle *ch = cls; + + GNUNET_SERVICE_client_continue (ch->client); + for (struct ClientMonitorRecord *r = monitor_head; + NULL != r; + r = r->next) + { + bool keys_match; + + keys_match = + (GNUNET_is_zero (&r->key)) + ? (0 == ntohs (msg->filter_key)) + : ( (0 != ntohs (msg->filter_key)) && + (! GNUNET_memcmp (&r->key, + &msg->key)) ); + if ( (ch == r->ch) && + (ntohl (msg->type) == r->type) && + (r->get == msg->get) && + (r->get_resp == msg->get_resp) && + (r->put == msg->put) && + keys_match) + { + GNUNET_CONTAINER_DLL_remove (monitor_head, + monitor_tail, + r); + GNUNET_free (r); + return; /* Delete only ONE entry */ + } + } +} + + +/** + * Function to call by #for_matching_monitors(). + * + * @param cls closure + * @param m a matching monitor + */ +typedef void +(*MonitorAction)(void *cls, + struct ClientMonitorRecord *m); + + +/** + * Call @a cb on all monitors that watch for blocks of @a type + * and key @a key. + * + * @param type the type to match + * @param key the key to match + * @param cb function to call + * @param cb_cls closure for @a cb + */ +static void +for_matching_monitors (enum GNUNET_BLOCK_Type type, + const struct GNUNET_HashCode *key, + MonitorAction cb, + void *cb_cls) { struct ClientHandle **cl = NULL; unsigned int cl_size = 0; @@ -1161,16 +1169,12 @@ GDS_CLIENTS_process_get (uint32_t options, m = m->next) { if ( ( (GNUNET_BLOCK_TYPE_ANY == m->type) || - (m->type == type)) && + (m->type == type) ) && ( (GNUNET_is_zero (&m->key)) || (0 == GNUNET_memcmp (key, - &m->key)))) + &m->key)) ) ) { - struct GNUNET_MQ_Envelope *env; - struct GNUNET_DHT_MonitorGetMessage *mmsg; - struct GNUNET_PeerIdentity *msg_path; - size_t msize; unsigned int i; /* Don't send duplicates */ @@ -1182,87 +1186,230 @@ GDS_CLIENTS_process_get (uint32_t options, GNUNET_array_append (cl, cl_size, m->ch); - msize = path_length * sizeof(struct GNUNET_PeerIdentity); - env = GNUNET_MQ_msg_extra (mmsg, - msize, - GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET); - mmsg->options = htonl (options); - mmsg->type = htonl (type); - mmsg->hop_count = htonl (hop_count); - mmsg->desired_replication_level = htonl (desired_replication_level); - mmsg->get_path_length = htonl (path_length); - mmsg->key = *key; - msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1]; - GNUNET_memcpy (msg_path, - path, - path_length * sizeof(struct GNUNET_PeerIdentity)); - GNUNET_MQ_send (m->ch->mq, - env); + cb (cb_cls, + m); } } GNUNET_free (cl); } +/** + * Closure for #get_action(); + */ +struct GetActionContext +{ + enum GNUNET_DHT_RouteOption options; + enum GNUNET_BLOCK_Type type; + uint32_t hop_count; + uint32_t desired_replication_level; + unsigned int get_path_length; + const struct GNUNET_PeerIdentity *get_path; + const struct GNUNET_HashCode *key; +}; + + +/** + * Function called on monitors that match a GET. + * Sends the GET notification to the monitor. + * + * @param cls a `struct GetActionContext` + * @param m a matching monitor + */ +static void +get_action (void *cls, + struct ClientMonitorRecord *m) +{ + struct GetActionContext *gac = cls; + struct GNUNET_MQ_Envelope *env; + struct GNUNET_DHT_MonitorGetMessage *mmsg; + struct GNUNET_PeerIdentity *msg_path; + size_t msize; + + msize = gac->get_path_length * sizeof(struct GNUNET_PeerIdentity); + env = GNUNET_MQ_msg_extra (mmsg, + msize, + GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET); + mmsg->options = htonl (gac->options); + mmsg->type = htonl (gac->type); + mmsg->hop_count = htonl (gac->hop_count); + mmsg->desired_replication_level = htonl (gac->desired_replication_level); + mmsg->get_path_length = htonl (gac->get_path_length); + mmsg->key = *gac->key; + msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1]; + GNUNET_memcpy (msg_path, + gac->get_path, + gac->get_path_length * sizeof(struct GNUNET_PeerIdentity)); + GNUNET_MQ_send (m->ch->mq, + env); +} + + +/** + * Check if some client is monitoring GET messages and notify + * them in that case. If tracked, @a path should include the local peer. + * + * @param options Options, for instance RecordRoute, DemultiplexEverywhere. + * @param type The type of data in the request. + * @param hop_count Hop count so far. + * @param path_length number of entries in path (or 0 if not recorded). + * @param path peers on the GET path (or NULL if not recorded). + * @param desired_replication_level Desired replication level. + * @param key Key of the requested data. + */ +void +GDS_CLIENTS_process_get (enum GNUNET_DHT_RouteOption options, + enum GNUNET_BLOCK_Type type, + uint32_t hop_count, + uint32_t desired_replication_level, + unsigned int path_length, + const struct GNUNET_PeerIdentity *path, + const struct GNUNET_HashCode *key) +{ + struct GetActionContext gac = { + .options = options, + .type = type, + .hop_count = hop_count, + .desired_replication_level = desired_replication_level, + .get_path_length = path_length, + .get_path = path, + .key = key + }; + + for_matching_monitors (type, + key, + &get_action, + &gac); +} + + +/** + * Closure for response_action(). + */ +struct ResponseActionContext +{ + const struct GDS_DATACACHE_BlockData *bd; + const struct GNUNET_PeerIdentity *get_path; + unsigned int get_path_length; +}; + + +/** + * Function called on monitors that match a response. + * Sends the response notification to the monitor. + * + * @param cls a `struct ResponseActionContext` + * @param m a matching monitor + */ +static void +response_action (void *cls, + struct ClientMonitorRecord *m) +{ + const struct ResponseActionContext *resp_ctx = cls; + const struct GDS_DATACACHE_BlockData *bd = resp_ctx->bd; + + struct GNUNET_MQ_Envelope *env; + struct GNUNET_DHT_MonitorGetRespMessage *mmsg; + struct GNUNET_PeerIdentity *path; + size_t msize; + + msize = bd->data_size; + msize += (resp_ctx->get_path_length + bd->put_path_length) + * sizeof(struct GNUNET_PeerIdentity); + env = GNUNET_MQ_msg_extra (mmsg, + msize, + GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP); + mmsg->type = htonl (bd->type); + mmsg->put_path_length = htonl (bd->put_path_length); + mmsg->get_path_length = htonl (resp_ctx->get_path_length); + mmsg->expiration_time = GNUNET_TIME_absolute_hton (bd->expiration_time); + mmsg->key = bd->key; + path = (struct GNUNET_PeerIdentity *) &mmsg[1]; + GNUNET_memcpy (path, + bd->put_path, + bd->put_path_length * sizeof(struct GNUNET_PeerIdentity)); + GNUNET_memcpy (path, + resp_ctx->get_path, + resp_ctx->get_path_length * sizeof(struct + GNUNET_PeerIdentity)); + GNUNET_memcpy (&path[resp_ctx->get_path_length], + bd->data, + bd->data_size); + GNUNET_MQ_send (m->ch->mq, + env); +} + + void GDS_CLIENTS_process_get_resp (const struct GDS_DATACACHE_BlockData *bd, const struct GNUNET_PeerIdentity *get_path, unsigned int get_path_length) { - struct ClientHandle **cl = NULL; - unsigned int cl_size = 0; + struct ResponseActionContext rac = { + .bd = bd, + .get_path = get_path, + .get_path_length = get_path_length + }; - for (struct ClientMonitorRecord *m = monitor_head; - NULL != m; - m = m->next) - { - if ( ( (GNUNET_BLOCK_TYPE_ANY == m->type) || - (m->type == bd->type) ) && - ( (GNUNET_is_zero (&m->key)) || - (0 == GNUNET_memcmp (&bd->key, - &m->key)) ) ) - { - struct GNUNET_MQ_Envelope *env; - struct GNUNET_DHT_MonitorGetRespMessage *mmsg; - struct GNUNET_PeerIdentity *path; - size_t msize; - unsigned int i; + for_matching_monitors (bd->type, + &bd->key, + &response_action, + &rac); +} - /* Don't send duplicates */ - for (i = 0; i < cl_size; i++) - if (cl[i] == m->ch) - break; - if (i < cl_size) - continue; - GNUNET_array_append (cl, - cl_size, - m->ch); - msize = bd->data_size; - msize += (get_path_length + bd->put_path_length) - * sizeof(struct GNUNET_PeerIdentity); - env = GNUNET_MQ_msg_extra (mmsg, - msize, - GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP); - mmsg->type = htonl (bd->type); - mmsg->put_path_length = htonl (bd->put_path_length); - mmsg->get_path_length = htonl (get_path_length); - mmsg->expiration_time = GNUNET_TIME_absolute_hton (bd->expiration_time); - mmsg->key = bd->key; - path = (struct GNUNET_PeerIdentity *) &mmsg[1]; - GNUNET_memcpy (path, - bd->put_path, - bd->put_path_length * sizeof(struct GNUNET_PeerIdentity)); - GNUNET_memcpy (path, - get_path, - get_path_length * sizeof(struct GNUNET_PeerIdentity)); - GNUNET_memcpy (&path[get_path_length], - bd->data, - bd->data_size); - GNUNET_MQ_send (m->ch->mq, - env); - } - } - GNUNET_free (cl); + +/** + * Closure for put_action(). + */ +struct PutActionContext +{ + const struct GDS_DATACACHE_BlockData *bd; + enum GNUNET_DHT_RouteOption options; + uint32_t hop_count; + uint32_t desired_replication_level; +}; + + +/** + * Function called on monitors that match a PUT. + * Sends the PUT notification to the monitor. + * + * @param cls a `struct PutActionContext` + * @param m a matching monitor + */ +static void +put_action (void *cls, + struct ClientMonitorRecord *m) +{ + const struct PutActionContext *put_ctx = cls; + const struct GDS_DATACACHE_BlockData *bd = put_ctx->bd; + struct GNUNET_MQ_Envelope *env; + struct GNUNET_DHT_MonitorPutMessage *mmsg; + struct GNUNET_PeerIdentity *msg_path; + size_t msize; + + msize = bd->data_size + + bd->put_path_length + * sizeof(struct GNUNET_PeerIdentity); + env = GNUNET_MQ_msg_extra (mmsg, + msize, + GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT); + mmsg->options = htonl (put_ctx->options); + mmsg->type = htonl (bd->type); + mmsg->hop_count = htonl (put_ctx->hop_count); + mmsg->desired_replication_level = htonl (put_ctx->desired_replication_level); + mmsg->put_path_length = htonl (bd->put_path_length); + mmsg->key = bd->key; + mmsg->expiration_time = GNUNET_TIME_absolute_hton (bd->expiration_time); + msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1]; + GNUNET_memcpy (msg_path, + bd->put_path, + bd->put_path_length * sizeof(struct GNUNET_PeerIdentity)); + GNUNET_memcpy (&msg_path[bd->put_path_length], + bd->data, + bd->data_size); + GNUNET_MQ_send (m->ch->mq, + env); } @@ -1272,59 +1419,17 @@ GDS_CLIENTS_process_put (enum GNUNET_DHT_RouteOption options, uint32_t hop_count, uint32_t desired_replication_level) { - struct ClientHandle **cl = NULL; - unsigned int cl_size = 0; - - for (struct ClientMonitorRecord *m = monitor_head; - NULL != m; - m = m->next) - { - if ( ( (GNUNET_BLOCK_TYPE_ANY == m->type) || - (m->type == bd->type) ) && - ( (GNUNET_is_zero (&m->key)) || - (0 == - GNUNET_memcmp (&bd->key, - &m->key)) ) ) - { - struct GNUNET_MQ_Envelope *env; - struct GNUNET_DHT_MonitorPutMessage *mmsg; - struct GNUNET_PeerIdentity *msg_path; - size_t msize; - unsigned int i; + struct PutActionContext put_ctx = { + .bd = bd, + .hop_count = hop_count, + .desired_replication_level = desired_replication_level, + .options = options + }; - /* Don't send duplicates */ - for (i = 0; i < cl_size; i++) - if (cl[i] == m->ch) - break; - if (i < cl_size) - continue; - GNUNET_array_append (cl, - cl_size, - m->ch); - msize = bd->data_size; - msize += bd->put_path_length * sizeof(struct GNUNET_PeerIdentity); - env = GNUNET_MQ_msg_extra (mmsg, - msize, - GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT); - mmsg->options = htonl (options); - mmsg->type = htonl (bd->type); - mmsg->hop_count = htonl (hop_count); - mmsg->desired_replication_level = htonl (desired_replication_level); - mmsg->put_path_length = htonl (bd->put_path_length); - mmsg->key = bd->key; - mmsg->expiration_time = GNUNET_TIME_absolute_hton (bd->expiration_time); - msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1]; - GNUNET_memcpy (msg_path, - bd->put_path, - bd->put_path_length * sizeof(struct GNUNET_PeerIdentity)); - GNUNET_memcpy (&msg_path[bd->put_path_length], - bd->data, - bd->data_size); - GNUNET_MQ_send (m->ch->mq, - env); - } - } - GNUNET_free (cl); + for_matching_monitors (bd->type, + &bd->key, + &put_action, + &put_ctx); } @@ -1334,7 +1439,7 @@ GDS_CLIENTS_process_put (enum GNUNET_DHT_RouteOption options, * @param server the initialized server */ static void -GDS_CLIENTS_init () +GDS_CLIENTS_init (void) { forward_map = GNUNET_CONTAINER_multihashmap_create (1024, @@ -1348,7 +1453,7 @@ GDS_CLIENTS_init () * Shutdown client subsystem. */ static void -GDS_CLIENTS_stop () +GDS_CLIENTS_stop (void) { if (NULL != retry_task) { -- cgit v1.2.3