From 2dd4ee49cb5718ebc55e28ae93d21f2aeb5ad7da Mon Sep 17 00:00:00 2001 From: Bart Polot Date: Fri, 7 Oct 2011 13:05:48 +0000 Subject: Refactored connection to peers, cancelation of transmissions --- src/mesh/gnunet-service-mesh.c | 536 +++++++++++++++++++++++++---------------- 1 file changed, 324 insertions(+), 212 deletions(-) (limited to 'src/mesh') diff --git a/src/mesh/gnunet-service-mesh.c b/src/mesh/gnunet-service-mesh.c index a35ef9a66..612cade81 100644 --- a/src/mesh/gnunet-service-mesh.c +++ b/src/mesh/gnunet-service-mesh.c @@ -320,6 +320,11 @@ struct MeshPathInfo * Path itself */ struct MeshPeerPath *path; + + /** + * Position in peer's transmit queue + */ + unsigned int pos; }; @@ -620,13 +625,168 @@ dht_get_id_handler (void *cls, struct GNUNET_TIME_Absolute exp, /****************** GENERAL HELPER FUNCTIONS ************************/ /******************************************************************************/ +/** + * Check if client has registered with the service and has not disconnected + * + * @param client the client to check + * + * @return non-NULL if client exists in the global DLL + */ +static struct MeshClient * +client_get (struct GNUNET_SERVER_Client *client) +{ + struct MeshClient *c; + + c = clients; + while (NULL != c) + { + if (c->handle == client) + return c; + c = c->next; + } + return NULL; +} + + +/** + * Checks if a given client has subscribed to certain message type + * + * @param message_type Type of message to check + * @param c Client to check + * + * @return GNUNET_YES or GNUNET_NO, depending on subscription status + * + * TODO inline? + */ +static int +client_is_subscribed (uint16_t message_type, struct MeshClient *c) +{ + GNUNET_HashCode hc; + + GNUNET_CRYPTO_hash (&message_type, sizeof (uint16_t), &hc); + return GNUNET_CONTAINER_multihashmap_contains (c->types, &hc); +} + + +/** + * Send the message to all clients that have subscribed to its type + * + * @param msg Pointer to the message itself + * @return number of clients this message was sent to + */ +static unsigned int +send_subscribed_clients (struct GNUNET_MessageHeader *msg) +{ + struct MeshClient *c; + unsigned int count; + uint16_t type; + + type = ntohs (msg->type); + for (count = 0, c = clients; c != NULL; c = c->next) + { + if (client_is_subscribed (type, c)) + { + count++; + GNUNET_SERVER_notification_context_unicast (nc, c->handle, msg, + GNUNET_YES); + } + } + return count; +} + + +/** + * Notify the client that owns the tunnel that a peer has connected to it + * + * @param t Tunnel whose owner to notify + * @param id Short id of the peer that has connected + */ +static void +send_client_peer_connected (const struct MeshTunnel *t, const GNUNET_PEER_Id id) +{ + struct GNUNET_MESH_PeerControl pc; + + pc.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_PEER_ADD); + pc.header.size = htons (sizeof (struct GNUNET_MESH_PeerControl)); + pc.tunnel_id = htonl (t->local_tid); + GNUNET_PEER_resolve (id, &pc.peer); + GNUNET_SERVER_notification_context_unicast (nc, t->client->handle, + &pc.header, GNUNET_NO); +} + + +/** + * Cancel a core transmission that was already requested and free all resources + * associated to the request. + * + * @param peer PeeInfo of the peer whose transmission is cancelled. + * @param i Position of the transmission to be cancelled. + */ +static void +peer_info_cancel_transmission(struct MeshPeerInfo *peer, unsigned int i) +{ + if (peer->core_transmit[i]) + { + struct MeshDataDescriptor *dd; + struct MeshPathInfo *path_info; + GNUNET_CORE_notify_transmit_ready_cancel (peer->core_transmit[i]); + /* TODO: notify that tranmission has failed */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "MESH: Cancelled data transmission at %u\n", + i); + switch (peer->types[i]) + { + case GNUNET_MESSAGE_TYPE_MESH_MULTICAST: + case GNUNET_MESSAGE_TYPE_MESH_UNICAST: + case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "MESH: type payload\n"); + dd = peer->infos[i]; + if (0 == --(*dd->copies)) + { + GNUNET_free (dd->copies); + GNUNET_free (dd->data); + } + break; + case GNUNET_MESSAGE_TYPE_MESH_PATH_CREATE: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "MESH: type create path\n"); + path_info = peer->infos[i]; + path_destroy(path_info->path); + break; + } + GNUNET_free (peer->infos[i]); + } +} + + +/** + * + */ +static unsigned int +peer_info_transmit_position (struct MeshPeerInfo *peer) +{ + unsigned int i; + + for (i = 0; peer->core_transmit[i]; i++) + { + if (i == (CORE_QUEUE_SIZE - 1)) + { + /* All positions are taken! Overwriting! */ + GNUNET_break (0); + peer_info_cancel_transmission(peer, 0); + return 0; + } + } + return i; +} + + /** * Retrieve the MeshPeerInfo stucture associated with the peer, create one * and insert it in the appropiate structures if the peer is not known yet. * - * @param peer Identity of the peer + * @param peer Full identity of the peer. * - * @return Existing or newly created peer info + * @return Existing or newly created peer info. */ static struct MeshPeerInfo * peer_info_get (const struct GNUNET_PeerIdentity *peer) @@ -647,6 +807,113 @@ peer_info_get (const struct GNUNET_PeerIdentity *peer) } +/** + * Retrieve the MeshPeerInfo stucture associated with the peer, create one + * and insert it in the appropiate structures if the peer is not known yet. + * + * @param peer Short identity of the peer. + * + * @return Existing or newly created peer info. + */ +static struct MeshPeerInfo * +peer_info_get_short (const GNUNET_PEER_Id peer) +{ + struct GNUNET_PeerIdentity id; + + GNUNET_PEER_resolve(peer, &id); + return peer_info_get(&id); +} + + +/** + * Function called to notify a client about the socket + * being ready to queue more data. "buf" will be + * NULL and "size" zero if the socket was closed for + * writing in the meantime. + * + * @param cls closure + * @param size number of bytes available in buf + * @param buf where the callee should write the message + * @return number of bytes written to buf + */ +static size_t +send_core_create_path (void *cls, size_t size, void *buf); + + +/** + * Try to establish a new connection to this peer. + * Use the best path for the given tunnel. + * If the peer doesn't have any path to it yet, try to get one. + * If the peer already has some path, send a CREATE PATH towards it. + * + * @param peer PeerInfo of the peer. + * @param t Tunnel for which to create the path, if possible. + */ +static void +peer_info_connect (struct MeshPeerInfo *peer, struct MeshTunnel *t) +{ + struct MeshPeerPath *p; + struct MeshPathInfo *path_info; + struct MeshPeerInfo *neighbor; + + if (NULL != peer->path_head) + { + p = tree_get_path_to_peer(t->tree, peer->id); + if (p->length > 1) + { + struct GNUNET_PeerIdentity *id; + + path_info = GNUNET_malloc (sizeof (struct MeshPathInfo)); + path_info->path = p; + path_info->peer = peer; + path_info->t = t; + id = path_get_first_hop(t->tree, peer->id); + neighbor = peer_info_get(id); + path_info->pos = peer_info_transmit_position(neighbor); + neighbor->types[path_info->pos] = GNUNET_MESSAGE_TYPE_MESH_PATH_CREATE; + neighbor->infos[path_info->pos] = path_info; + neighbor->core_transmit[path_info->pos] = + GNUNET_CORE_notify_transmit_ready ( + core_handle, /* handle */ + 0, /* cork */ + 0, /* priority */ + GNUNET_TIME_UNIT_FOREVER_REL, /* timeout */ + id, /* target */ + sizeof (struct GNUNET_MESH_ManipulatePath) + + (p->length * sizeof (struct GNUNET_PeerIdentity)), /*size */ + &send_core_create_path, /* callback */ + path_info); /* cls */ + } + else + { + send_client_peer_connected(t, myid); + } + } + else if (NULL == peer->dhtget) + { + struct GNUNET_PeerIdentity id; + + GNUNET_PEER_resolve(peer->id, &id); + path_info = GNUNET_malloc(sizeof(struct MeshPathInfo)); + path_info->peer = peer; + path_info->t = t; + peer->dhtget = + GNUNET_DHT_get_start(dht_handle, /* handle */ + GNUNET_TIME_UNIT_FOREVER_REL, /* timeout */ + GNUNET_BLOCK_TYPE_TEST, /* type */ + &id.hashPubKey, /* key to search */ + 4, /* replication level */ + GNUNET_DHT_RO_RECORD_ROUTE | + GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, + NULL, /* xquery */ + 0, /* xquery bits */ + &dht_get_id_handler, + path_info); + } + /* Otherwise, there is no path but the DHT get is already started. */ +} + + #if LATER /** * Destroy the peer_info and free any allocated resources linked to it @@ -749,8 +1016,7 @@ path_remove_from_peer (struct MeshPeerInfo *peer, * Trivial immiediate fix: try to reconnect to the disconnected node. All * its children will be reachable trough him. */ - GNUNET_PEER_resolve(d, &id); - peer_d = peer_info_get(&id); + peer_d = peer_info_get_short(d); best = UINT_MAX; aux = NULL; for (p = peer_d->path_head; NULL != p; p = p->next) @@ -817,6 +1083,11 @@ path_add_to_peer (struct MeshPeerInfo *peer_info, struct MeshPeerPath *path) } l = path_get_length (path); + if (0 == l) + { + GNUNET_free (path); + return; + } for (aux = peer_info->path_head; aux != NULL; aux = aux->next) { @@ -962,49 +1233,6 @@ void path_refresh (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); -/** - * Check if client has registered with the service and has not disconnected - * - * @param client the client to check - * - * @return non-NULL if client exists in the global DLL - */ -static struct MeshClient * -client_get (struct GNUNET_SERVER_Client *client) -{ - struct MeshClient *c; - - c = clients; - while (NULL != c) - { - if (c->handle == client) - return c; - c = c->next; - } - return NULL; -} - - -/** - * Checks if a given client has subscribed to certain message type - * - * @param message_type Type of message to check - * @param c Client to check - * - * @return GNUNET_YES or GNUNET_NO, depending on subscription status - * - * TODO inline? - */ -static int -client_is_subscribed (uint16_t message_type, struct MeshClient *c) -{ - GNUNET_HashCode hc; - - GNUNET_CRYPTO_hash (&message_type, sizeof (uint16_t), &hc); - return GNUNET_CONTAINER_multihashmap_contains (c->types, &hc); -} - - /** * Search for a tunnel among the tunnels for a client * @@ -1069,24 +1297,27 @@ tunnel_get (struct GNUNET_PeerIdentity *oid, MESH_TunnelNumber tid) void notify_peer_disconnected (const struct MeshTunnelTreeNode *n) { - struct GNUNET_MESH_PeerControl msg; - - if (NULL == n->t->client || NULL == nc) - return; + struct MeshPeerInfo *peer; - msg.header.size = htons (sizeof (msg)); - msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_PEER_DEL); - msg.tunnel_id = htonl (n->t->local_tid); - GNUNET_PEER_resolve (n->peer, &msg.peer); - GNUNET_SERVER_notification_context_unicast (nc, n->t->client->handle, - &msg.header, GNUNET_NO); + if (NULL != n->t->client && NULL != nc) + { + struct GNUNET_MESH_PeerControl msg; + msg.header.size = htons (sizeof (msg)); + msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_PEER_DEL); + msg.tunnel_id = htonl (n->t->local_tid); + GNUNET_PEER_resolve (n->peer, &msg.peer); + GNUNET_SERVER_notification_context_unicast (nc, n->t->client->handle, + &msg.header, GNUNET_NO); + } + peer = peer_info_get_short(n->peer); + peer_info_connect(peer, n->t); } /** * Add a peer to a tunnel, accomodating paths accordingly and initializing all * needed rescources. - * If peer already exists, do nothing. + * If peer already exists, reevaluate shortest path and change if different. * * @param t Tunnel we want to add a new peer to * @param peer PeerInfo of the peer being added @@ -1095,40 +1326,48 @@ notify_peer_disconnected (const struct MeshTunnelTreeNode *n) static void tunnel_add_peer (struct MeshTunnel *t, struct MeshPeerInfo *peer) { - struct MeshPeerPath *p; + struct GNUNET_PeerIdentity id; struct MeshPeerPath *best_p; + struct MeshPeerPath *p; unsigned int best_cost; unsigned int cost; - if (NULL != tree_find_peer(t->tree->root, peer->id)) - { - /* Already have it, nothing to do. */ - return; - } - - t->peers_total++; - GNUNET_array_append (peer->tunnels, peer->ntunnels, t); - if (NULL == (p = peer->path_head)) + GNUNET_PEER_resolve(peer->id, &id); + if (GNUNET_NO == + GNUNET_CONTAINER_multihashmap_contains(t->peers, &id.hashPubKey)) { - GNUNET_break (0); - return; + t->peers_total++; + GNUNET_array_append (peer->tunnels, peer->ntunnels, t); + GNUNET_CONTAINER_multihashmap_put( + t->peers, + &id.hashPubKey, + peer, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); } - best_p = p; - best_cost = UINT_MAX; - while (NULL != p) + if (NULL != (p = peer->path_head)) { - if ((cost = path_get_cost (t->tree, p)) < best_cost) + best_p = p; + best_cost = UINT_MAX; + while (NULL != p) { - best_cost = cost; - best_p = p; + if ((cost = path_get_cost (t->tree, p)) < best_cost) + { + best_cost = cost; + best_p = p; + } + p = p->next; } - p = p->next; + tree_add_path (t->tree, best_p, ¬ify_peer_disconnected); + if (GNUNET_SCHEDULER_NO_TASK == t->path_refresh_task) + t->path_refresh_task = + GNUNET_SCHEDULER_add_delayed (t->tree->refresh, &path_refresh, t); + } + else + { + /* Start a DHT get if necessary */ + peer_info_connect(peer, t); } - tree_add_path (t->tree, best_p, ¬ify_peer_disconnected); - if (GNUNET_SCHEDULER_NO_TASK == t->path_refresh_task) - t->path_refresh_task = - GNUNET_SCHEDULER_add_delayed (t->tree->refresh, &path_refresh, t); } @@ -1240,7 +1479,6 @@ tunnel_destroy_iterator (void *cls, const GNUNET_HashCode * key, void *value) /**************** MESH NETWORK HANDLER HELPERS ***********************/ /******************************************************************************/ - /** * Function called to notify a client about the socket * being ready to queue more data. "buf" will be @@ -1519,54 +1757,6 @@ send_p2p_tunnel_destroy (void *cls, size_t size, void *buf) #endif -/** - * Send the message to all clients that have subscribed to its type - * - * @param msg Pointer to the message itself - * @return number of clients this message was sent to - */ -static unsigned int -send_subscribed_clients (struct GNUNET_MessageHeader *msg) -{ - struct MeshClient *c; - unsigned int count; - uint16_t type; - - type = ntohs (msg->type); - for (count = 0, c = clients; c != NULL; c = c->next) - { - if (client_is_subscribed (type, c)) - { - count++; - GNUNET_SERVER_notification_context_unicast (nc, c->handle, msg, - GNUNET_YES); - } - } - return count; -} - - - -/** - * Notify the client that owns the tunnel that a peer has connected to it - * - * @param t Tunnel whose owner to notify - * @param id Short id of the peer that has connected - */ -static void -send_client_peer_connected (const struct MeshTunnel *t, const GNUNET_PEER_Id id) -{ - struct GNUNET_MESH_PeerControl pc; - - pc.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_PEER_ADD); - pc.header.size = htons (sizeof (struct GNUNET_MESH_PeerControl)); - pc.tunnel_id = htonl (t->local_tid); - GNUNET_PEER_resolve (id, &pc.peer); - GNUNET_SERVER_notification_context_unicast (nc, t->client->handle, - &pc.header, GNUNET_NO); -} - - /******************************************************************************/ /******************** MESH NETWORK HANDLERS **************************/ /******************************************************************************/ @@ -2185,9 +2375,7 @@ dht_get_id_handler (void *cls, struct GNUNET_TIME_Absolute exp, enum GNUNET_BLOCK_Type type, size_t size, const void *data) { struct MeshPathInfo *path_info = cls; - struct MeshPathInfo *path_info_aux; struct MeshPeerPath *p; - struct MeshPeerPath *aux; struct GNUNET_PeerIdentity pi; int i; @@ -2197,63 +2385,14 @@ dht_get_id_handler (void *cls, struct GNUNET_TIME_Absolute exp, GNUNET_h2s_full(&pi.hashPubKey)); GNUNET_DHT_get_stop(path_info->peer->dhtget); path_info->peer->dhtget = NULL; - if (NULL == get_path || NULL == put_path) - { - if (NULL == path_info->peer->path_head) - { - // Find ourselves some alternate initial path to the destination: retry - GNUNET_PEER_resolve(path_info->peer->id, &pi); - path_info->peer->dhtget = - GNUNET_DHT_get_start(dht_handle, /* handle */ - GNUNET_TIME_UNIT_FOREVER_REL, /* timeout */ - GNUNET_BLOCK_TYPE_TEST, /* type */ - &pi.hashPubKey, /*key to search */ - 4, /* replication level */ - GNUNET_DHT_RO_RECORD_ROUTE | - GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, - NULL, /* xquery */ - 0, /* xquery bits */ - &dht_get_id_handler, - (void *) path_info); - return; - } - } - p = path_build_from_dht (get_path, get_path_length, put_path, put_path_length); + p = path_build_from_dht (get_path, get_path_length, + put_path, put_path_length); path_add_to_peer (path_info->peer, p); for (i = 0; i < path_info->peer->ntunnels; i++) { tunnel_add_peer (path_info->peer->tunnels[i], path_info->peer); - aux = tree_get_path_to_peer(path_info->peer->tunnels[i]->tree, - path_info->peer->id); - if (aux->length > 1) - { - struct GNUNET_PeerIdentity id; - - path_info_aux = GNUNET_malloc (sizeof (struct MeshPathInfo)); - path_info_aux->path = aux; - path_info_aux->peer = path_info->peer; - path_info_aux->t = path_info->t; - GNUNET_PEER_resolve (p->peers[1], &id); - GNUNET_CORE_notify_transmit_ready (core_handle, /* handle */ - 0, /* cork */ - 0, /* priority */ - GNUNET_TIME_UNIT_FOREVER_REL, - /* timeout */ - &id, /* target */ - sizeof (struct GNUNET_MESH_ManipulatePath) - + - (aux->length * - sizeof (struct GNUNET_PeerIdentity)), - /*size */ - &send_core_create_path, - /* callback */ - path_info_aux); /* cls */ - } - else - { - send_client_peer_connected(path_info->t, myid); - } + peer_info_connect(path_info->peer, path_info->t); } GNUNET_free (path_info); @@ -3214,34 +3353,7 @@ core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer) } for (i = 0; i < CORE_QUEUE_SIZE; i++) { - if (pi->core_transmit[i]) - { - struct MeshDataDescriptor *dd; - struct MeshPathInfo *path_info; - GNUNET_CORE_notify_transmit_ready_cancel (pi->core_transmit[i]); - /* TODO: notify that tranmission has failed */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "MESH: data at %u\n", i); - switch (pi->types[i]) - { - case GNUNET_MESSAGE_TYPE_MESH_MULTICAST: - case GNUNET_MESSAGE_TYPE_MESH_UNICAST: - case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN: - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "MESH: type payload\n"); - dd = pi->infos[i]; - if (0 == --(*dd->copies)) - { - GNUNET_free (dd->copies); - GNUNET_free (dd->data); - } - break; - case GNUNET_MESSAGE_TYPE_MESH_PATH_CREATE: - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "MESH: type create path\n"); - path_info = pi->infos[i]; - path_destroy(path_info->path); - break; - } - GNUNET_free (pi->infos[i]); - } + peer_info_cancel_transmission(pi, i); } path_remove_from_peer (pi, pi->id, myid); if (myid == pi->id) -- cgit v1.2.3