aboutsummaryrefslogtreecommitdiff
path: root/src/rps
diff options
context:
space:
mode:
authorJulius Bünger <buenger@mytum.de>2015-08-05 21:47:36 +0000
committerJulius Bünger <buenger@mytum.de>2015-08-05 21:47:36 +0000
commit688f62f7a0fcfc85a3bf611ff855edff541dc52a (patch)
treeea3a09a9be370bf9993dad514c7569de9035ba76 /src/rps
parent74236fe22800e18c5c776d32bbdd1fc9b948a00e (diff)
downloadgnunet-688f62f7a0fcfc85a3bf611ff855edff541dc52a.tar.gz
gnunet-688f62f7a0fcfc85a3bf611ff855edff541dc52a.zip
-keep track of messages passed to mq
Diffstat (limited to 'src/rps')
-rw-r--r--src/rps/gnunet-service-rps.c115
1 files changed, 104 insertions, 11 deletions
diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c
index 5d9d2f9c6..77fe80351 100644
--- a/src/rps/gnunet-service-rps.c
+++ b/src/rps/gnunet-service-rps.c
@@ -183,6 +183,34 @@ struct PeerOutstandingOp
183 183
184 184
185/** 185/**
186 * List containing all messages that are yet to be send
187 */
188struct PendingMessage
189{
190 /**
191 * DLL next, prev
192 */
193 struct PendingMessage *next;
194 struct PendingMessage *prev;
195
196 /**
197 * The envelope to the corresponding message
198 */
199 struct GNUNET_MQ_Envelope *ev;
200
201 /**
202 * The corresponding context
203 */
204 struct PeerContext *peer_ctx;
205
206 /**
207 * The message type
208 */
209 const char *type;
210};
211
212
213/**
186 * Struct used to keep track of other peer's status 214 * Struct used to keep track of other peer's status
187 * 215 *
188 * This is stored in a multipeermap. 216 * This is stored in a multipeermap.
@@ -232,6 +260,12 @@ struct PeerContext
232 uint32_t peer_flags; 260 uint32_t peer_flags;
233 261
234 /** 262 /**
263 * DLL with all messages that are yet to be sent
264 */
265 struct PendingMessage *pending_messages_head;
266 struct PendingMessage *pending_messages_tail;
267
268 /**
235 * This is pobably followed by 'statistical' data (when we first saw 269 * This is pobably followed by 'statistical' data (when we first saw
236 * him, how did we get his ID, how many pushes (in a timeinterval), 270 * him, how did we get his ID, how many pushes (in a timeinterval),
237 * ...) 271 * ...)
@@ -1148,6 +1182,45 @@ add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
1148 1182
1149 1183
1150/** 1184/**
1185 * @brief Add an envelope to a message passed to mq to list of pending messages
1186 *
1187 * @param peer peer the message was sent to
1188 * @param ev envelope to the message
1189 * @param type type of the message to be sent
1190 */
1191static struct PendingMessage *
1192insert_pending_message (const struct GNUNET_PeerIdentity *peer,
1193 struct GNUNET_MQ_Envelope *ev,
1194 const char *type)
1195{
1196 struct PendingMessage *pending_msg;
1197 struct PeerContext *peer_ctx;
1198
1199 peer_ctx = get_peer_ctx (peer);
1200 pending_msg = GNUNET_new (struct PendingMessage);
1201 pending_msg->ev = ev;
1202 pending_msg->peer_ctx = peer_ctx;
1203 pending_msg->type = type;
1204 GNUNET_CONTAINER_DLL_insert (peer_ctx->pending_messages_head,
1205 peer_ctx->pending_messages_tail,
1206 pending_msg);
1207 return pending_msg;
1208}
1209
1210static void
1211remove_pending_message (struct PendingMessage *pending_msg)
1212{
1213 struct PeerContext *peer_ctx;
1214
1215 peer_ctx = pending_msg->peer_ctx;
1216 GNUNET_CONTAINER_DLL_remove (peer_ctx->pending_messages_head,
1217 peer_ctx->pending_messages_tail,
1218 pending_msg);
1219 GNUNET_free (pending_msg);
1220}
1221
1222
1223/**
1151 * @brief This is called once a message is sent. 1224 * @brief This is called once a message is sent.
1152 * 1225 *
1153 * @param cls type of the message that was sent 1226 * @param cls type of the message that was sent
@@ -1155,10 +1228,11 @@ add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
1155static void 1228static void
1156mq_notify_sent_cb (void *cls) 1229mq_notify_sent_cb (void *cls)
1157{ 1230{
1158 const char *type = cls; 1231 struct PendingMessage *pending_msg = (struct PendingMessage *) cls;
1159 LOG (GNUNET_ERROR_TYPE_DEBUG, 1232 LOG (GNUNET_ERROR_TYPE_DEBUG,
1160 "%s was sent.\n", 1233 "%s was sent.\n",
1161 type); 1234 pending_msg->type);
1235 remove_pending_message (pending_msg);
1162} 1236}
1163 1237
1164 1238
@@ -1178,6 +1252,7 @@ send_pull_reply (const struct GNUNET_PeerIdentity *peer_id,
1178 struct GNUNET_MQ_Handle *mq; 1252 struct GNUNET_MQ_Handle *mq;
1179 struct GNUNET_MQ_Envelope *ev; 1253 struct GNUNET_MQ_Envelope *ev;
1180 struct GNUNET_RPS_P2P_PullReplyMessage *out_msg; 1254 struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
1255 struct PendingMessage *pending_msg;
1181 1256
1182 /* Compute actual size */ 1257 /* Compute actual size */
1183 send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) + 1258 send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) +
@@ -1208,9 +1283,10 @@ send_pull_reply (const struct GNUNET_PeerIdentity *peer_id,
1208 memcpy (&out_msg[1], peer_ids, 1283 memcpy (&out_msg[1], peer_ids,
1209 send_size * sizeof (struct GNUNET_PeerIdentity)); 1284 send_size * sizeof (struct GNUNET_PeerIdentity));
1210 1285
1286 pending_msg = insert_pending_message (peer_id, ev, "PULL REPLY");
1211 GNUNET_MQ_notify_sent (ev, 1287 GNUNET_MQ_notify_sent (ev,
1212 mq_notify_sent_cb, 1288 mq_notify_sent_cb,
1213 "PULL REPLY"); 1289 pending_msg);
1214 GNUNET_MQ_send (mq, ev); 1290 GNUNET_MQ_send (mq, ev);
1215} 1291}
1216 1292
@@ -1923,11 +1999,12 @@ compute_rand_delay (struct GNUNET_TIME_Relative mean, unsigned int spread)
1923 * @param peer_id the peer to send the pull request to. 1999 * @param peer_id the peer to send the pull request to.
1924 */ 2000 */
1925static void 2001static void
1926send_pull_request (struct GNUNET_PeerIdentity *peer_id) 2002send_pull_request (const struct GNUNET_PeerIdentity *peer_id)
1927{ 2003{
1928 struct GNUNET_MQ_Envelope *ev; 2004 struct GNUNET_MQ_Envelope *ev;
1929 struct GNUNET_MQ_Handle *mq; 2005 struct GNUNET_MQ_Handle *mq;
1930 struct PeerContext *peer_ctx; 2006 struct PeerContext *peer_ctx;
2007 struct PendingMessage *pending_msg;
1931 2008
1932 peer_ctx = get_peer_ctx (peer_id); 2009 peer_ctx = get_peer_ctx (peer_id);
1933 GNUNET_assert (GNUNET_NO == get_peer_flag (peer_ctx, PULL_REPLY_PENDING)); 2010 GNUNET_assert (GNUNET_NO == get_peer_flag (peer_ctx, PULL_REPLY_PENDING));
@@ -1939,9 +2016,10 @@ send_pull_request (struct GNUNET_PeerIdentity *peer_id)
1939 2016
1940 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST); 2017 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
1941 mq = get_mq (peer_id); 2018 mq = get_mq (peer_id);
2019 pending_msg = insert_pending_message (peer_id, ev, "PULL REQUEST");
1942 GNUNET_MQ_notify_sent (ev, 2020 GNUNET_MQ_notify_sent (ev,
1943 mq_notify_sent_cb, 2021 mq_notify_sent_cb,
1944 "PULL REQUEST"); 2022 pending_msg);
1945 GNUNET_MQ_send (mq, ev); 2023 GNUNET_MQ_send (mq, ev);
1946} 2024}
1947 2025
@@ -1952,10 +2030,11 @@ send_pull_request (struct GNUNET_PeerIdentity *peer_id)
1952 * @param peer_id the peer to send the push to. 2030 * @param peer_id the peer to send the push to.
1953 */ 2031 */
1954static void 2032static void
1955send_push (struct GNUNET_PeerIdentity *peer_id) 2033send_push (const struct GNUNET_PeerIdentity *peer_id)
1956{ 2034{
1957 struct GNUNET_MQ_Envelope *ev; 2035 struct GNUNET_MQ_Envelope *ev;
1958 struct GNUNET_MQ_Handle *mq; 2036 struct GNUNET_MQ_Handle *mq;
2037 struct PendingMessage *pending_msg;
1959 2038
1960 LOG (GNUNET_ERROR_TYPE_DEBUG, 2039 LOG (GNUNET_ERROR_TYPE_DEBUG,
1961 "Going to send PUSH to peer %s.\n", 2040 "Going to send PUSH to peer %s.\n",
@@ -1963,9 +2042,10 @@ send_push (struct GNUNET_PeerIdentity *peer_id)
1963 2042
1964 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH); 2043 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
1965 mq = get_mq (peer_id); 2044 mq = get_mq (peer_id);
2045 pending_msg = insert_pending_message (peer_id, ev, "PUSH");
1966 GNUNET_MQ_notify_sent (ev, 2046 GNUNET_MQ_notify_sent (ev,
1967 mq_notify_sent_cb, 2047 mq_notify_sent_cb,
1968 "PUSH"); 2048 pending_msg);
1969 GNUNET_MQ_send (mq, ev); 2049 GNUNET_MQ_send (mq, ev);
1970} 2050}
1971 2051
@@ -2542,7 +2622,7 @@ peer_remove_cb (void *cls, const struct GNUNET_PeerIdentity *key, void *value)
2542 GNUNET_i2s (&peer_ctx->peer_id)); 2622 GNUNET_i2s (&peer_ctx->peer_id));
2543 2623
2544 /* Remove it from the sampler used for the Brahms protocol */ 2624 /* Remove it from the sampler used for the Brahms protocol */
2545 RPS_sampler_reinitialise_by_value (prot_sampler, key); 2625 RPS_sampler_reinitialise_by_value (prot_sampler, key);
2546 2626
2547 /* If operations are still scheduled for this peer cancel those */ 2627 /* If operations are still scheduled for this peer cancel those */
2548 if (0 != peer_ctx->num_outstanding_ops) 2628 if (0 != peer_ctx->num_outstanding_ops)
@@ -2585,6 +2665,18 @@ peer_remove_cb (void *cls, const struct GNUNET_PeerIdentity *key, void *value)
2585 if (GNUNET_YES == in_arr (pull_list, pull_list_size, key)) 2665 if (GNUNET_YES == in_arr (pull_list, pull_list_size, key))
2586 rem_from_list (&pull_list, &pull_list_size, key); 2666 rem_from_list (&pull_list, &pull_list_size, key);
2587 2667
2668 /* Cancle messages that have not been sent yet */
2669 while (NULL != peer_ctx->pending_messages_head)
2670 {
2671 LOG (GNUNET_ERROR_TYPE_DEBUG,
2672 "Removing unsent %s\n",
2673 peer_ctx->pending_messages_head->type);
2674 /* We are not able to cancel messages as #GNUNET_CADET_mq_create () does not
2675 * set a #GNUNET_MQ_CancelImpl */
2676 /* GNUNET_MQ_send_cancel (peer_ctx->pending_messages_head->ev); */
2677 remove_pending_message (peer_ctx->pending_messages_head);
2678 }
2679
2588 /* If there is still a mq destroy it */ 2680 /* If there is still a mq destroy it */
2589 if (NULL != peer_ctx->mq) 2681 if (NULL != peer_ctx->mq)
2590 { 2682 {
@@ -2636,19 +2728,20 @@ peer_clean (const struct GNUNET_PeerIdentity *peer)
2636 2728
2637 if ( (0 == RPS_sampler_count_id (prot_sampler, peer)) && 2729 if ( (0 == RPS_sampler_count_id (prot_sampler, peer)) &&
2638 (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (view, peer)) && 2730 (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (view, peer)) &&
2639 (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer)) &&
2640 (GNUNET_NO == in_arr (push_list, push_list_size, peer)) && 2731 (GNUNET_NO == in_arr (push_list, push_list_size, peer)) &&
2641 (GNUNET_NO == in_arr (pull_list, pull_list_size, peer)) ) 2732 (GNUNET_NO == in_arr (pull_list, pull_list_size, peer)) &&
2733 (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer)) )
2642 { 2734 {
2643 peer_ctx = get_peer_ctx (peer); 2735 peer_ctx = get_peer_ctx (peer);
2644 2736
2645 if ( (NULL == peer_ctx->recv_channel) && 2737 if ( (NULL == peer_ctx->recv_channel) &&
2738 (NULL == peer_ctx->pending_messages_head) &&
2646 (GNUNET_NO == get_peer_flag (peer_ctx, PULL_REPLY_PENDING)) ) 2739 (GNUNET_NO == get_peer_flag (peer_ctx, PULL_REPLY_PENDING)) )
2647 { 2740 {
2648 #ifdef ENABLE_MALICIOUS 2741 #ifdef ENABLE_MALICIOUS
2649 if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer)) 2742 if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
2650 peer_remove_cb (NULL, peer, peer_ctx); 2743 peer_remove_cb (NULL, peer, peer_ctx);
2651 #else 2744 #else /* ENABLE_MALICIOUS */
2652 peer_remove_cb (NULL, peer, peer_ctx); 2745 peer_remove_cb (NULL, peer, peer_ctx);
2653 #endif /* ENABLE_MALICIOUS */ 2746 #endif /* ENABLE_MALICIOUS */
2654 } 2747 }