diff options
author | Julius Bünger <buenger@mytum.de> | 2015-08-05 21:47:36 +0000 |
---|---|---|
committer | Julius Bünger <buenger@mytum.de> | 2015-08-05 21:47:36 +0000 |
commit | 688f62f7a0fcfc85a3bf611ff855edff541dc52a (patch) | |
tree | ea3a09a9be370bf9993dad514c7569de9035ba76 /src/rps | |
parent | 74236fe22800e18c5c776d32bbdd1fc9b948a00e (diff) | |
download | gnunet-688f62f7a0fcfc85a3bf611ff855edff541dc52a.tar.gz gnunet-688f62f7a0fcfc85a3bf611ff855edff541dc52a.zip |
-keep track of messages passed to mq
Diffstat (limited to 'src/rps')
-rw-r--r-- | src/rps/gnunet-service-rps.c | 115 |
1 files changed, 104 insertions, 11 deletions
diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c index 5d9d2f9c6..77fe80351 100644 --- a/src/rps/gnunet-service-rps.c +++ b/src/rps/gnunet-service-rps.c | |||
@@ -183,6 +183,34 @@ struct PeerOutstandingOp | |||
183 | 183 | ||
184 | 184 | ||
185 | /** | 185 | /** |
186 | * List containing all messages that are yet to be send | ||
187 | */ | ||
188 | struct PendingMessage | ||
189 | { | ||
190 | /** | ||
191 | * DLL next, prev | ||
192 | */ | ||
193 | struct PendingMessage *next; | ||
194 | struct PendingMessage *prev; | ||
195 | |||
196 | /** | ||
197 | * The envelope to the corresponding message | ||
198 | */ | ||
199 | struct GNUNET_MQ_Envelope *ev; | ||
200 | |||
201 | /** | ||
202 | * The corresponding context | ||
203 | */ | ||
204 | struct PeerContext *peer_ctx; | ||
205 | |||
206 | /** | ||
207 | * The message type | ||
208 | */ | ||
209 | const char *type; | ||
210 | }; | ||
211 | |||
212 | |||
213 | /** | ||
186 | * Struct used to keep track of other peer's status | 214 | * Struct used to keep track of other peer's status |
187 | * | 215 | * |
188 | * This is stored in a multipeermap. | 216 | * This is stored in a multipeermap. |
@@ -232,6 +260,12 @@ struct PeerContext | |||
232 | uint32_t peer_flags; | 260 | uint32_t peer_flags; |
233 | 261 | ||
234 | /** | 262 | /** |
263 | * DLL with all messages that are yet to be sent | ||
264 | */ | ||
265 | struct PendingMessage *pending_messages_head; | ||
266 | struct PendingMessage *pending_messages_tail; | ||
267 | |||
268 | /** | ||
235 | * This is pobably followed by 'statistical' data (when we first saw | 269 | * This is pobably followed by 'statistical' data (when we first saw |
236 | * him, how did we get his ID, how many pushes (in a timeinterval), | 270 | * him, how did we get his ID, how many pushes (in a timeinterval), |
237 | * ...) | 271 | * ...) |
@@ -1148,6 +1182,45 @@ add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array, | |||
1148 | 1182 | ||
1149 | 1183 | ||
1150 | /** | 1184 | /** |
1185 | * @brief Add an envelope to a message passed to mq to list of pending messages | ||
1186 | * | ||
1187 | * @param peer peer the message was sent to | ||
1188 | * @param ev envelope to the message | ||
1189 | * @param type type of the message to be sent | ||
1190 | */ | ||
1191 | static struct PendingMessage * | ||
1192 | insert_pending_message (const struct GNUNET_PeerIdentity *peer, | ||
1193 | struct GNUNET_MQ_Envelope *ev, | ||
1194 | const char *type) | ||
1195 | { | ||
1196 | struct PendingMessage *pending_msg; | ||
1197 | struct PeerContext *peer_ctx; | ||
1198 | |||
1199 | peer_ctx = get_peer_ctx (peer); | ||
1200 | pending_msg = GNUNET_new (struct PendingMessage); | ||
1201 | pending_msg->ev = ev; | ||
1202 | pending_msg->peer_ctx = peer_ctx; | ||
1203 | pending_msg->type = type; | ||
1204 | GNUNET_CONTAINER_DLL_insert (peer_ctx->pending_messages_head, | ||
1205 | peer_ctx->pending_messages_tail, | ||
1206 | pending_msg); | ||
1207 | return pending_msg; | ||
1208 | } | ||
1209 | |||
1210 | static void | ||
1211 | remove_pending_message (struct PendingMessage *pending_msg) | ||
1212 | { | ||
1213 | struct PeerContext *peer_ctx; | ||
1214 | |||
1215 | peer_ctx = pending_msg->peer_ctx; | ||
1216 | GNUNET_CONTAINER_DLL_remove (peer_ctx->pending_messages_head, | ||
1217 | peer_ctx->pending_messages_tail, | ||
1218 | pending_msg); | ||
1219 | GNUNET_free (pending_msg); | ||
1220 | } | ||
1221 | |||
1222 | |||
1223 | /** | ||
1151 | * @brief This is called once a message is sent. | 1224 | * @brief This is called once a message is sent. |
1152 | * | 1225 | * |
1153 | * @param cls type of the message that was sent | 1226 | * @param cls type of the message that was sent |
@@ -1155,10 +1228,11 @@ add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array, | |||
1155 | static void | 1228 | static void |
1156 | mq_notify_sent_cb (void *cls) | 1229 | mq_notify_sent_cb (void *cls) |
1157 | { | 1230 | { |
1158 | const char *type = cls; | 1231 | struct PendingMessage *pending_msg = (struct PendingMessage *) cls; |
1159 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1232 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1160 | "%s was sent.\n", | 1233 | "%s was sent.\n", |
1161 | type); | 1234 | pending_msg->type); |
1235 | remove_pending_message (pending_msg); | ||
1162 | } | 1236 | } |
1163 | 1237 | ||
1164 | 1238 | ||
@@ -1178,6 +1252,7 @@ send_pull_reply (const struct GNUNET_PeerIdentity *peer_id, | |||
1178 | struct GNUNET_MQ_Handle *mq; | 1252 | struct GNUNET_MQ_Handle *mq; |
1179 | struct GNUNET_MQ_Envelope *ev; | 1253 | struct GNUNET_MQ_Envelope *ev; |
1180 | struct GNUNET_RPS_P2P_PullReplyMessage *out_msg; | 1254 | struct GNUNET_RPS_P2P_PullReplyMessage *out_msg; |
1255 | struct PendingMessage *pending_msg; | ||
1181 | 1256 | ||
1182 | /* Compute actual size */ | 1257 | /* Compute actual size */ |
1183 | send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) + | 1258 | send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) + |
@@ -1208,9 +1283,10 @@ send_pull_reply (const struct GNUNET_PeerIdentity *peer_id, | |||
1208 | memcpy (&out_msg[1], peer_ids, | 1283 | memcpy (&out_msg[1], peer_ids, |
1209 | send_size * sizeof (struct GNUNET_PeerIdentity)); | 1284 | send_size * sizeof (struct GNUNET_PeerIdentity)); |
1210 | 1285 | ||
1286 | pending_msg = insert_pending_message (peer_id, ev, "PULL REPLY"); | ||
1211 | GNUNET_MQ_notify_sent (ev, | 1287 | GNUNET_MQ_notify_sent (ev, |
1212 | mq_notify_sent_cb, | 1288 | mq_notify_sent_cb, |
1213 | "PULL REPLY"); | 1289 | pending_msg); |
1214 | GNUNET_MQ_send (mq, ev); | 1290 | GNUNET_MQ_send (mq, ev); |
1215 | } | 1291 | } |
1216 | 1292 | ||
@@ -1923,11 +1999,12 @@ compute_rand_delay (struct GNUNET_TIME_Relative mean, unsigned int spread) | |||
1923 | * @param peer_id the peer to send the pull request to. | 1999 | * @param peer_id the peer to send the pull request to. |
1924 | */ | 2000 | */ |
1925 | static void | 2001 | static void |
1926 | send_pull_request (struct GNUNET_PeerIdentity *peer_id) | 2002 | send_pull_request (const struct GNUNET_PeerIdentity *peer_id) |
1927 | { | 2003 | { |
1928 | struct GNUNET_MQ_Envelope *ev; | 2004 | struct GNUNET_MQ_Envelope *ev; |
1929 | struct GNUNET_MQ_Handle *mq; | 2005 | struct GNUNET_MQ_Handle *mq; |
1930 | struct PeerContext *peer_ctx; | 2006 | struct PeerContext *peer_ctx; |
2007 | struct PendingMessage *pending_msg; | ||
1931 | 2008 | ||
1932 | peer_ctx = get_peer_ctx (peer_id); | 2009 | peer_ctx = get_peer_ctx (peer_id); |
1933 | GNUNET_assert (GNUNET_NO == get_peer_flag (peer_ctx, PULL_REPLY_PENDING)); | 2010 | GNUNET_assert (GNUNET_NO == get_peer_flag (peer_ctx, PULL_REPLY_PENDING)); |
@@ -1939,9 +2016,10 @@ send_pull_request (struct GNUNET_PeerIdentity *peer_id) | |||
1939 | 2016 | ||
1940 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST); | 2017 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST); |
1941 | mq = get_mq (peer_id); | 2018 | mq = get_mq (peer_id); |
2019 | pending_msg = insert_pending_message (peer_id, ev, "PULL REQUEST"); | ||
1942 | GNUNET_MQ_notify_sent (ev, | 2020 | GNUNET_MQ_notify_sent (ev, |
1943 | mq_notify_sent_cb, | 2021 | mq_notify_sent_cb, |
1944 | "PULL REQUEST"); | 2022 | pending_msg); |
1945 | GNUNET_MQ_send (mq, ev); | 2023 | GNUNET_MQ_send (mq, ev); |
1946 | } | 2024 | } |
1947 | 2025 | ||
@@ -1952,10 +2030,11 @@ send_pull_request (struct GNUNET_PeerIdentity *peer_id) | |||
1952 | * @param peer_id the peer to send the push to. | 2030 | * @param peer_id the peer to send the push to. |
1953 | */ | 2031 | */ |
1954 | static void | 2032 | static void |
1955 | send_push (struct GNUNET_PeerIdentity *peer_id) | 2033 | send_push (const struct GNUNET_PeerIdentity *peer_id) |
1956 | { | 2034 | { |
1957 | struct GNUNET_MQ_Envelope *ev; | 2035 | struct GNUNET_MQ_Envelope *ev; |
1958 | struct GNUNET_MQ_Handle *mq; | 2036 | struct GNUNET_MQ_Handle *mq; |
2037 | struct PendingMessage *pending_msg; | ||
1959 | 2038 | ||
1960 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 2039 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1961 | "Going to send PUSH to peer %s.\n", | 2040 | "Going to send PUSH to peer %s.\n", |
@@ -1963,9 +2042,10 @@ send_push (struct GNUNET_PeerIdentity *peer_id) | |||
1963 | 2042 | ||
1964 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH); | 2043 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH); |
1965 | mq = get_mq (peer_id); | 2044 | mq = get_mq (peer_id); |
2045 | pending_msg = insert_pending_message (peer_id, ev, "PUSH"); | ||
1966 | GNUNET_MQ_notify_sent (ev, | 2046 | GNUNET_MQ_notify_sent (ev, |
1967 | mq_notify_sent_cb, | 2047 | mq_notify_sent_cb, |
1968 | "PUSH"); | 2048 | pending_msg); |
1969 | GNUNET_MQ_send (mq, ev); | 2049 | GNUNET_MQ_send (mq, ev); |
1970 | } | 2050 | } |
1971 | 2051 | ||
@@ -2542,7 +2622,7 @@ peer_remove_cb (void *cls, const struct GNUNET_PeerIdentity *key, void *value) | |||
2542 | GNUNET_i2s (&peer_ctx->peer_id)); | 2622 | GNUNET_i2s (&peer_ctx->peer_id)); |
2543 | 2623 | ||
2544 | /* Remove it from the sampler used for the Brahms protocol */ | 2624 | /* Remove it from the sampler used for the Brahms protocol */ |
2545 | RPS_sampler_reinitialise_by_value (prot_sampler, key); | 2625 | RPS_sampler_reinitialise_by_value (prot_sampler, key); |
2546 | 2626 | ||
2547 | /* If operations are still scheduled for this peer cancel those */ | 2627 | /* If operations are still scheduled for this peer cancel those */ |
2548 | if (0 != peer_ctx->num_outstanding_ops) | 2628 | if (0 != peer_ctx->num_outstanding_ops) |
@@ -2585,6 +2665,18 @@ peer_remove_cb (void *cls, const struct GNUNET_PeerIdentity *key, void *value) | |||
2585 | if (GNUNET_YES == in_arr (pull_list, pull_list_size, key)) | 2665 | if (GNUNET_YES == in_arr (pull_list, pull_list_size, key)) |
2586 | rem_from_list (&pull_list, &pull_list_size, key); | 2666 | rem_from_list (&pull_list, &pull_list_size, key); |
2587 | 2667 | ||
2668 | /* Cancle messages that have not been sent yet */ | ||
2669 | while (NULL != peer_ctx->pending_messages_head) | ||
2670 | { | ||
2671 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
2672 | "Removing unsent %s\n", | ||
2673 | peer_ctx->pending_messages_head->type); | ||
2674 | /* We are not able to cancel messages as #GNUNET_CADET_mq_create () does not | ||
2675 | * set a #GNUNET_MQ_CancelImpl */ | ||
2676 | /* GNUNET_MQ_send_cancel (peer_ctx->pending_messages_head->ev); */ | ||
2677 | remove_pending_message (peer_ctx->pending_messages_head); | ||
2678 | } | ||
2679 | |||
2588 | /* If there is still a mq destroy it */ | 2680 | /* If there is still a mq destroy it */ |
2589 | if (NULL != peer_ctx->mq) | 2681 | if (NULL != peer_ctx->mq) |
2590 | { | 2682 | { |
@@ -2636,19 +2728,20 @@ peer_clean (const struct GNUNET_PeerIdentity *peer) | |||
2636 | 2728 | ||
2637 | if ( (0 == RPS_sampler_count_id (prot_sampler, peer)) && | 2729 | if ( (0 == RPS_sampler_count_id (prot_sampler, peer)) && |
2638 | (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (view, peer)) && | 2730 | (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (view, peer)) && |
2639 | (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer)) && | ||
2640 | (GNUNET_NO == in_arr (push_list, push_list_size, peer)) && | 2731 | (GNUNET_NO == in_arr (push_list, push_list_size, peer)) && |
2641 | (GNUNET_NO == in_arr (pull_list, pull_list_size, peer)) ) | 2732 | (GNUNET_NO == in_arr (pull_list, pull_list_size, peer)) && |
2733 | (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer)) ) | ||
2642 | { | 2734 | { |
2643 | peer_ctx = get_peer_ctx (peer); | 2735 | peer_ctx = get_peer_ctx (peer); |
2644 | 2736 | ||
2645 | if ( (NULL == peer_ctx->recv_channel) && | 2737 | if ( (NULL == peer_ctx->recv_channel) && |
2738 | (NULL == peer_ctx->pending_messages_head) && | ||
2646 | (GNUNET_NO == get_peer_flag (peer_ctx, PULL_REPLY_PENDING)) ) | 2739 | (GNUNET_NO == get_peer_flag (peer_ctx, PULL_REPLY_PENDING)) ) |
2647 | { | 2740 | { |
2648 | #ifdef ENABLE_MALICIOUS | 2741 | #ifdef ENABLE_MALICIOUS |
2649 | if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer)) | 2742 | if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer)) |
2650 | peer_remove_cb (NULL, peer, peer_ctx); | 2743 | peer_remove_cb (NULL, peer, peer_ctx); |
2651 | #else | 2744 | #else /* ENABLE_MALICIOUS */ |
2652 | peer_remove_cb (NULL, peer, peer_ctx); | 2745 | peer_remove_cb (NULL, peer, peer_ctx); |
2653 | #endif /* ENABLE_MALICIOUS */ | 2746 | #endif /* ENABLE_MALICIOUS */ |
2654 | } | 2747 | } |