diff options
author | Bart Polot <bart@net.in.tum.de> | 2014-12-15 07:17:59 +0000 |
---|---|---|
committer | Bart Polot <bart@net.in.tum.de> | 2014-12-15 07:17:59 +0000 |
commit | 4955f2fcba51ac267ddc4ceeb1e0516f9386c58c (patch) | |
tree | 2fe8917c68720e7a3225233d517081946879b501 | |
parent | 38559b0298b07ee66d9840940d88ebdcce4880ed (diff) | |
download | gnunet-4955f2fcba51ac267ddc4ceeb1e0516f9386c58c.tar.gz gnunet-4955f2fcba51ac267ddc4ceeb1e0516f9386c58c.zip |
Send all queued messages that fit into the core buffer
- Add timing info for debug
-rw-r--r-- | src/cadet/gnunet-service-cadet_peer.c | 209 |
1 files changed, 115 insertions, 94 deletions
diff --git a/src/cadet/gnunet-service-cadet_peer.c b/src/cadet/gnunet-service-cadet_peer.c index 838a2d3de..e0a94b750 100644 --- a/src/cadet/gnunet-service-cadet_peer.c +++ b/src/cadet/gnunet-service-cadet_peer.c | |||
@@ -159,6 +159,11 @@ struct CadetPeer | |||
159 | struct GNUNET_CORE_TransmitHandle *core_transmit; | 159 | struct GNUNET_CORE_TransmitHandle *core_transmit; |
160 | 160 | ||
161 | /** | 161 | /** |
162 | * Timestamp | ||
163 | */ | ||
164 | struct GNUNET_TIME_Absolute tmt_time; | ||
165 | |||
166 | /** | ||
162 | * Transmission queue to core DLL head | 167 | * Transmission queue to core DLL head |
163 | */ | 168 | */ |
164 | struct CadetPeerQueue *queue_head; | 169 | struct CadetPeerQueue *queue_head; |
@@ -448,6 +453,7 @@ core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer) | |||
448 | { | 453 | { |
449 | GNUNET_CORE_notify_transmit_ready_cancel (p->core_transmit); | 454 | GNUNET_CORE_notify_transmit_ready_cancel (p->core_transmit); |
450 | p->core_transmit = NULL; | 455 | p->core_transmit = NULL; |
456 | p->tmt_time.abs_value_us = 0; | ||
451 | } | 457 | } |
452 | GNUNET_STATISTICS_update (stats, "# peers", -1, GNUNET_NO); | 458 | GNUNET_STATISTICS_update (stats, "# peers", -1, GNUNET_NO); |
453 | 459 | ||
@@ -971,15 +977,24 @@ queue_send (void *cls, size_t size, void *buf) | |||
971 | struct CadetConnection *c; | 977 | struct CadetConnection *c; |
972 | struct CadetPeerQueue *queue; | 978 | struct CadetPeerQueue *queue; |
973 | const struct GNUNET_PeerIdentity *dst_id; | 979 | const struct GNUNET_PeerIdentity *dst_id; |
974 | size_t data_size; | 980 | size_t msg_size; |
981 | size_t total_size; | ||
982 | size_t rest; | ||
983 | char *dst; | ||
975 | uint32_t pid; | 984 | uint32_t pid; |
976 | 985 | ||
986 | rest = size; | ||
987 | total_size = 0; | ||
988 | dst = (char *) buf; | ||
977 | pid = 0; | 989 | pid = 0; |
978 | peer->core_transmit = NULL; | 990 | peer->core_transmit = NULL; |
991 | peer->tmt_time.abs_value_us = 0; | ||
992 | LOG (GNUNET_ERROR_TYPE_DEBUG, "\n"); | ||
993 | LOG (GNUNET_ERROR_TYPE_DEBUG, "\n"); | ||
979 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue send towards %s (max %u)\n", | 994 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue send towards %s (max %u)\n", |
980 | GCP_2s (peer), size); | 995 | GCP_2s (peer), size); |
981 | 996 | ||
982 | if (NULL == buf || 0 == size) | 997 | if (NULL == dst || 0 == size) |
983 | { | 998 | { |
984 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Buffer size 0.\n"); | 999 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Buffer size 0.\n"); |
985 | return 0; | 1000 | return 0; |
@@ -989,102 +1004,99 @@ queue_send (void *cls, size_t size, void *buf) | |||
989 | queue = peer_get_first_message (peer); | 1004 | queue = peer_get_first_message (peer); |
990 | if (NULL == queue) | 1005 | if (NULL == queue) |
991 | { | 1006 | { |
992 | GNUNET_assert (0); /* Core tmt_rdy should've been canceled */ | 1007 | GNUNET_break (0); /* Core tmt_rdy should've been canceled */ |
993 | return 0; | 1008 | return 0; |
994 | } | 1009 | } |
995 | c = queue->c; | ||
996 | 1010 | ||
997 | dst_id = GNUNET_PEER_resolve2 (peer->id); | 1011 | while (NULL != queue && rest >= queue->size) |
998 | LOG (GNUNET_ERROR_TYPE_DEBUG, " on connection %s %s\n", | ||
999 | GCC_2s (c), GC_f2s(queue->fwd)); | ||
1000 | /* Check if buffer size is enough for the message */ | ||
1001 | if (queue->size > size) | ||
1002 | { | 1012 | { |
1003 | LOG (GNUNET_ERROR_TYPE_WARNING, "not enough room (%u vs %u), reissue\n", | 1013 | c = queue->c; |
1004 | queue->size, size); | ||
1005 | peer->core_transmit = | ||
1006 | GNUNET_CORE_notify_transmit_ready (core_handle, | ||
1007 | GNUNET_NO, get_priority (queue), | ||
1008 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
1009 | dst_id, | ||
1010 | queue->size, | ||
1011 | &queue_send, | ||
1012 | peer); | ||
1013 | return 0; | ||
1014 | } | ||
1015 | LOG (GNUNET_ERROR_TYPE_DEBUG, " size %u ok\n", queue->size); | ||
1016 | 1014 | ||
1017 | /* Fill buf */ | 1015 | LOG (GNUNET_ERROR_TYPE_DEBUG, " on connection %s %s\n", |
1018 | switch (queue->type) | 1016 | GCC_2s (c), GC_f2s(queue->fwd)); |
1019 | { | ||
1020 | case GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED: | ||
1021 | pid = GCC_get_pid (queue->c, queue->fwd); | ||
1022 | LOG (GNUNET_ERROR_TYPE_DEBUG, " payload ID %u\n", pid); | ||
1023 | data_size = send_core_data_raw (queue->cls, size, buf); | ||
1024 | ((struct GNUNET_CADET_Encrypted *) buf)->pid = htonl (pid); | ||
1025 | break; | ||
1026 | case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY: | ||
1027 | case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN: | ||
1028 | case GNUNET_MESSAGE_TYPE_CADET_KX: | ||
1029 | case GNUNET_MESSAGE_TYPE_CADET_ACK: | ||
1030 | case GNUNET_MESSAGE_TYPE_CADET_POLL: | ||
1031 | LOG (GNUNET_ERROR_TYPE_DEBUG, " raw %s\n", GC_m2s (queue->type)); | ||
1032 | data_size = send_core_data_raw (queue->cls, size, buf); | ||
1033 | break; | ||
1034 | case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE: | ||
1035 | LOG (GNUNET_ERROR_TYPE_DEBUG, " path create\n"); | ||
1036 | if (GCC_is_origin (c, GNUNET_YES)) | ||
1037 | data_size = send_core_connection_create (c, size, buf); | ||
1038 | else | ||
1039 | data_size = send_core_data_raw (queue->cls, size, buf); | ||
1040 | break; | ||
1041 | case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK: | ||
1042 | LOG (GNUNET_ERROR_TYPE_DEBUG, " path ack\n"); | ||
1043 | if (GCC_is_origin (c, GNUNET_NO) || | ||
1044 | GCC_is_origin (c, GNUNET_YES)) | ||
1045 | data_size = send_core_connection_ack (c, size, buf); | ||
1046 | else | ||
1047 | data_size = send_core_data_raw (queue->cls, size, buf); | ||
1048 | break; | ||
1049 | case GNUNET_MESSAGE_TYPE_CADET_DATA: | ||
1050 | case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE: | ||
1051 | case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY: | ||
1052 | /* This should be encapsulted */ | ||
1053 | GNUNET_break (0); | ||
1054 | data_size = 0; | ||
1055 | break; | ||
1056 | default: | ||
1057 | GNUNET_break (0); | ||
1058 | LOG (GNUNET_ERROR_TYPE_WARNING, " type unknown: %u\n", queue->type); | ||
1059 | data_size = 0; | ||
1060 | } | ||
1061 | 1017 | ||
1062 | if (0 < drop_percent && | 1018 | LOG (GNUNET_ERROR_TYPE_DEBUG, " size %u ok (%u/%u)\n", |
1063 | GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 101) < drop_percent) | 1019 | queue->size, total_size, size); |
1064 | { | 1020 | |
1065 | LOG (GNUNET_ERROR_TYPE_WARNING, "DD %s (%s %u) on connection %s %s\n", | 1021 | /* Fill buf */ |
1066 | GC_m2s (queue->type), GC_m2s (queue->payload_type), queue->payload_id, | 1022 | switch (queue->type) |
1067 | GCC_2s (c), GC_f2s (queue->fwd)); | 1023 | { |
1068 | data_size = 0; | 1024 | case GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED: |
1069 | } | 1025 | pid = GCC_get_pid (queue->c, queue->fwd); |
1070 | else | 1026 | LOG (GNUNET_ERROR_TYPE_DEBUG, " payload ID %u\n", pid); |
1071 | { | 1027 | msg_size = send_core_data_raw (queue->cls, size, dst); |
1072 | LOG (GNUNET_ERROR_TYPE_INFO, | 1028 | ((struct GNUNET_CADET_Encrypted *) dst)->pid = htonl (pid); |
1073 | "snd %s (%s %u) on connection %s (%p) %s (size %u)\n", | 1029 | break; |
1074 | GC_m2s (queue->type), GC_m2s (queue->payload_type), | 1030 | case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY: |
1075 | queue->payload_id, GCC_2s (c), c, GC_f2s (queue->fwd), data_size); | 1031 | case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN: |
1076 | } | 1032 | case GNUNET_MESSAGE_TYPE_CADET_KX: |
1033 | case GNUNET_MESSAGE_TYPE_CADET_ACK: | ||
1034 | case GNUNET_MESSAGE_TYPE_CADET_POLL: | ||
1035 | LOG (GNUNET_ERROR_TYPE_DEBUG, " raw %s\n", GC_m2s (queue->type)); | ||
1036 | msg_size = send_core_data_raw (queue->cls, size, dst); | ||
1037 | break; | ||
1038 | case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE: | ||
1039 | LOG (GNUNET_ERROR_TYPE_DEBUG, " path create\n"); | ||
1040 | if (GCC_is_origin (c, GNUNET_YES)) | ||
1041 | msg_size = send_core_connection_create (c, size, dst); | ||
1042 | else | ||
1043 | msg_size = send_core_data_raw (queue->cls, size, dst); | ||
1044 | break; | ||
1045 | case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK: | ||
1046 | LOG (GNUNET_ERROR_TYPE_DEBUG, " path ack\n"); | ||
1047 | if (GCC_is_origin (c, GNUNET_NO) || | ||
1048 | GCC_is_origin (c, GNUNET_YES)) | ||
1049 | msg_size = send_core_connection_ack (c, size, dst); | ||
1050 | else | ||
1051 | msg_size = send_core_data_raw (queue->cls, size, dst); | ||
1052 | break; | ||
1053 | case GNUNET_MESSAGE_TYPE_CADET_DATA: | ||
1054 | case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE: | ||
1055 | case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY: | ||
1056 | /* This should be encapsulted */ | ||
1057 | GNUNET_break (0); | ||
1058 | msg_size = 0; | ||
1059 | break; | ||
1060 | default: | ||
1061 | GNUNET_break (0); | ||
1062 | LOG (GNUNET_ERROR_TYPE_WARNING, " type unknown: %u\n", queue->type); | ||
1063 | msg_size = 0; | ||
1064 | } | ||
1077 | 1065 | ||
1078 | /* Free queue, but cls was freed by send_core_*. */ | 1066 | if (0 < drop_percent && |
1079 | (void) GCP_queue_destroy (queue, GNUNET_NO, GNUNET_YES, pid); | 1067 | GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 101) < drop_percent) |
1068 | { | ||
1069 | LOG (GNUNET_ERROR_TYPE_WARNING, "DD %s (%s %u) on connection %s %s\n", | ||
1070 | GC_m2s (queue->type), GC_m2s (queue->payload_type), queue->payload_id, | ||
1071 | GCC_2s (c), GC_f2s (queue->fwd)); | ||
1072 | msg_size = 0; | ||
1073 | } | ||
1074 | else | ||
1075 | { | ||
1076 | LOG (GNUNET_ERROR_TYPE_INFO, | ||
1077 | "snd %s (%s %u) on connection %s (%p) %s (size %u)\n", | ||
1078 | GC_m2s (queue->type), GC_m2s (queue->payload_type), | ||
1079 | queue->payload_id, GCC_2s (c), c, GC_f2s (queue->fwd), msg_size); | ||
1080 | } | ||
1081 | total_size += msg_size; | ||
1082 | rest -= msg_size; | ||
1083 | dst = &dst[msg_size]; | ||
1084 | msg_size = 0; | ||
1085 | |||
1086 | /* Free queue, but cls was freed by send_core_*. */ | ||
1087 | (void) GCP_queue_destroy (queue, GNUNET_NO, GNUNET_YES, pid); | ||
1088 | |||
1089 | /* Next! */ | ||
1090 | queue = peer_get_first_message (peer); | ||
1091 | } | ||
1080 | 1092 | ||
1081 | /* If more data in queue, send next */ | 1093 | /* If more data in queue, send next */ |
1082 | queue = peer_get_first_message (peer); | ||
1083 | if (NULL != queue) | 1094 | if (NULL != queue) |
1084 | { | 1095 | { |
1085 | LOG (GNUNET_ERROR_TYPE_DEBUG, " more data!\n"); | 1096 | LOG (GNUNET_ERROR_TYPE_DEBUG, " more data! (%u)\n", queue->size); |
1086 | if (NULL == peer->core_transmit) | 1097 | if (NULL == peer->core_transmit) |
1087 | { | 1098 | { |
1099 | dst_id = GNUNET_PEER_resolve2 (peer->id); | ||
1088 | peer->core_transmit = | 1100 | peer->core_transmit = |
1089 | GNUNET_CORE_notify_transmit_ready (core_handle, | 1101 | GNUNET_CORE_notify_transmit_ready (core_handle, |
1090 | GNUNET_NO, get_priority (queue), | 1102 | GNUNET_NO, get_priority (queue), |
@@ -1093,12 +1105,12 @@ queue_send (void *cls, size_t size, void *buf) | |||
1093 | queue->size, | 1105 | queue->size, |
1094 | &queue_send, | 1106 | &queue_send, |
1095 | peer); | 1107 | peer); |
1108 | peer->tmt_time = GNUNET_TIME_absolute_get (); | ||
1096 | queue->start_waiting = GNUNET_TIME_absolute_get (); | 1109 | queue->start_waiting = GNUNET_TIME_absolute_get (); |
1097 | } | 1110 | } |
1098 | else | 1111 | else |
1099 | { | 1112 | { |
1100 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1113 | LOG (GNUNET_ERROR_TYPE_DEBUG, "* tmt rdy called somewhere else\n"); |
1101 | "* tmt rdy called somewhere else\n"); | ||
1102 | } | 1114 | } |
1103 | // GCC_start_poll (); FIXME needed? | 1115 | // GCC_start_poll (); FIXME needed? |
1104 | } | 1116 | } |
@@ -1107,9 +1119,10 @@ queue_send (void *cls, size_t size, void *buf) | |||
1107 | // GCC_stop_poll(); FIXME needed? | 1119 | // GCC_stop_poll(); FIXME needed? |
1108 | } | 1120 | } |
1109 | 1121 | ||
1110 | LOG (GNUNET_ERROR_TYPE_DEBUG, " return %d\n", data_size); | 1122 | LOG (GNUNET_ERROR_TYPE_DEBUG, " return %d\n", total_size); |
1111 | queue_debug (peer, GNUNET_ERROR_TYPE_DEBUG); | 1123 | queue_debug (peer, GNUNET_ERROR_TYPE_DEBUG); |
1112 | return data_size; | 1124 | |
1125 | return total_size; | ||
1113 | } | 1126 | } |
1114 | 1127 | ||
1115 | 1128 | ||
@@ -1178,8 +1191,9 @@ GCP_queue_destroy (struct CadetPeerQueue *queue, int clear_cls, | |||
1178 | { | 1191 | { |
1179 | struct GNUNET_TIME_Relative core_wait_time; | 1192 | struct GNUNET_TIME_Relative core_wait_time; |
1180 | 1193 | ||
1181 | LOG (GNUNET_ERROR_TYPE_DEBUG, " calling callback\n"); | 1194 | core_wait_time = GNUNET_TIME_absolute_get_duration (peer->tmt_time); |
1182 | core_wait_time = GNUNET_TIME_absolute_get_duration (queue->start_waiting); | 1195 | LOG (GNUNET_ERROR_TYPE_DEBUG, " calling callback, time elapsed %s\n", |
1196 | GNUNET_STRINGS_relative_time_to_string (core_wait_time, GNUNET_NO)); | ||
1183 | connection_destroyed = queue->callback (queue->callback_cls, | 1197 | connection_destroyed = queue->callback (queue->callback_cls, |
1184 | queue->c, sent, queue->type, pid, | 1198 | queue->c, sent, queue->type, pid, |
1185 | queue->fwd, queue->size, | 1199 | queue->fwd, queue->size, |
@@ -1194,6 +1208,7 @@ GCP_queue_destroy (struct CadetPeerQueue *queue, int clear_cls, | |||
1194 | { | 1208 | { |
1195 | GNUNET_CORE_notify_transmit_ready_cancel (peer->core_transmit); | 1209 | GNUNET_CORE_notify_transmit_ready_cancel (peer->core_transmit); |
1196 | peer->core_transmit = NULL; | 1210 | peer->core_transmit = NULL; |
1211 | peer->tmt_time.abs_value_us = 0; | ||
1197 | } | 1212 | } |
1198 | 1213 | ||
1199 | GNUNET_free (queue); | 1214 | GNUNET_free (queue); |
@@ -1282,6 +1297,7 @@ GCP_queue_add (struct CadetPeer *peer, void *cls, uint16_t type, | |||
1282 | call_core = GNUNET_YES; | 1297 | call_core = GNUNET_YES; |
1283 | } | 1298 | } |
1284 | 1299 | ||
1300 | q->start_waiting = GNUNET_TIME_absolute_get (); | ||
1285 | if (NULL == peer->core_transmit && GNUNET_YES == call_core) | 1301 | if (NULL == peer->core_transmit && GNUNET_YES == call_core) |
1286 | { | 1302 | { |
1287 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1303 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
@@ -1293,7 +1309,7 @@ GCP_queue_add (struct CadetPeer *peer, void *cls, uint16_t type, | |||
1293 | GNUNET_TIME_UNIT_FOREVER_REL, | 1309 | GNUNET_TIME_UNIT_FOREVER_REL, |
1294 | GNUNET_PEER_resolve2 (peer->id), | 1310 | GNUNET_PEER_resolve2 (peer->id), |
1295 | size, &queue_send, peer); | 1311 | size, &queue_send, peer); |
1296 | q->start_waiting = GNUNET_TIME_absolute_get (); | 1312 | peer->tmt_time = GNUNET_TIME_absolute_get (); |
1297 | } | 1313 | } |
1298 | else if (GNUNET_NO == call_core) | 1314 | else if (GNUNET_NO == call_core) |
1299 | { | 1315 | { |
@@ -1303,8 +1319,11 @@ GCP_queue_add (struct CadetPeer *peer, void *cls, uint16_t type, | |||
1303 | } | 1319 | } |
1304 | else | 1320 | else |
1305 | { | 1321 | { |
1306 | LOG (GNUNET_ERROR_TYPE_DEBUG, "core tmt rdy towards %s already called\n", | 1322 | struct GNUNET_TIME_Relative elapsed; |
1307 | GCP_2s (peer)); | 1323 | elapsed = GNUNET_TIME_absolute_get_duration (peer->tmt_time); |
1324 | LOG (GNUNET_ERROR_TYPE_DEBUG, "core tmt rdy towards %s already called %s\n", | ||
1325 | GCP_2s (peer), | ||
1326 | GNUNET_STRINGS_relative_time_to_string (elapsed, GNUNET_NO)); | ||
1308 | 1327 | ||
1309 | } | 1328 | } |
1310 | queue_debug (peer, GNUNET_ERROR_TYPE_DEBUG); | 1329 | queue_debug (peer, GNUNET_ERROR_TYPE_DEBUG); |
@@ -1362,6 +1381,7 @@ GCP_queue_cancel (struct CadetPeer *peer, struct CadetConnection *c) | |||
1362 | { | 1381 | { |
1363 | GNUNET_CORE_notify_transmit_ready_cancel (peer->core_transmit); | 1382 | GNUNET_CORE_notify_transmit_ready_cancel (peer->core_transmit); |
1364 | peer->core_transmit = NULL; | 1383 | peer->core_transmit = NULL; |
1384 | peer->tmt_time.abs_value_us = 0; | ||
1365 | } | 1385 | } |
1366 | } | 1386 | } |
1367 | 1387 | ||
@@ -1502,6 +1522,7 @@ GCP_queue_unlock (struct CadetPeer *peer, struct CadetConnection *c) | |||
1502 | size, | 1522 | size, |
1503 | &queue_send, | 1523 | &queue_send, |
1504 | peer); | 1524 | peer); |
1525 | peer->tmt_time = GNUNET_TIME_absolute_get (); | ||
1505 | } | 1526 | } |
1506 | 1527 | ||
1507 | 1528 | ||