aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBart Polot <bart@net.in.tum.de>2014-12-15 07:17:59 +0000
committerBart Polot <bart@net.in.tum.de>2014-12-15 07:17:59 +0000
commit4955f2fcba51ac267ddc4ceeb1e0516f9386c58c (patch)
tree2fe8917c68720e7a3225233d517081946879b501
parent38559b0298b07ee66d9840940d88ebdcce4880ed (diff)
downloadgnunet-4955f2fcba51ac267ddc4ceeb1e0516f9386c58c.tar.gz
gnunet-4955f2fcba51ac267ddc4ceeb1e0516f9386c58c.zip
Send all queued messages that fit into the core buffer
- Add timing info for debug
-rw-r--r--src/cadet/gnunet-service-cadet_peer.c209
1 files changed, 115 insertions, 94 deletions
diff --git a/src/cadet/gnunet-service-cadet_peer.c b/src/cadet/gnunet-service-cadet_peer.c
index 838a2d3de..e0a94b750 100644
--- a/src/cadet/gnunet-service-cadet_peer.c
+++ b/src/cadet/gnunet-service-cadet_peer.c
@@ -159,6 +159,11 @@ struct CadetPeer
159 struct GNUNET_CORE_TransmitHandle *core_transmit; 159 struct GNUNET_CORE_TransmitHandle *core_transmit;
160 160
161 /** 161 /**
162 * Timestamp
163 */
164 struct GNUNET_TIME_Absolute tmt_time;
165
166 /**
162 * Transmission queue to core DLL head 167 * Transmission queue to core DLL head
163 */ 168 */
164 struct CadetPeerQueue *queue_head; 169 struct CadetPeerQueue *queue_head;
@@ -448,6 +453,7 @@ core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
448 { 453 {
449 GNUNET_CORE_notify_transmit_ready_cancel (p->core_transmit); 454 GNUNET_CORE_notify_transmit_ready_cancel (p->core_transmit);
450 p->core_transmit = NULL; 455 p->core_transmit = NULL;
456 p->tmt_time.abs_value_us = 0;
451 } 457 }
452 GNUNET_STATISTICS_update (stats, "# peers", -1, GNUNET_NO); 458 GNUNET_STATISTICS_update (stats, "# peers", -1, GNUNET_NO);
453 459
@@ -971,15 +977,24 @@ queue_send (void *cls, size_t size, void *buf)
971 struct CadetConnection *c; 977 struct CadetConnection *c;
972 struct CadetPeerQueue *queue; 978 struct CadetPeerQueue *queue;
973 const struct GNUNET_PeerIdentity *dst_id; 979 const struct GNUNET_PeerIdentity *dst_id;
974 size_t data_size; 980 size_t msg_size;
981 size_t total_size;
982 size_t rest;
983 char *dst;
975 uint32_t pid; 984 uint32_t pid;
976 985
986 rest = size;
987 total_size = 0;
988 dst = (char *) buf;
977 pid = 0; 989 pid = 0;
978 peer->core_transmit = NULL; 990 peer->core_transmit = NULL;
991 peer->tmt_time.abs_value_us = 0;
992 LOG (GNUNET_ERROR_TYPE_DEBUG, "\n");
993 LOG (GNUNET_ERROR_TYPE_DEBUG, "\n");
979 LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue send towards %s (max %u)\n", 994 LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue send towards %s (max %u)\n",
980 GCP_2s (peer), size); 995 GCP_2s (peer), size);
981 996
982 if (NULL == buf || 0 == size) 997 if (NULL == dst || 0 == size)
983 { 998 {
984 LOG (GNUNET_ERROR_TYPE_DEBUG, "Buffer size 0.\n"); 999 LOG (GNUNET_ERROR_TYPE_DEBUG, "Buffer size 0.\n");
985 return 0; 1000 return 0;
@@ -989,102 +1004,99 @@ queue_send (void *cls, size_t size, void *buf)
989 queue = peer_get_first_message (peer); 1004 queue = peer_get_first_message (peer);
990 if (NULL == queue) 1005 if (NULL == queue)
991 { 1006 {
992 GNUNET_assert (0); /* Core tmt_rdy should've been canceled */ 1007 GNUNET_break (0); /* Core tmt_rdy should've been canceled */
993 return 0; 1008 return 0;
994 } 1009 }
995 c = queue->c;
996 1010
997 dst_id = GNUNET_PEER_resolve2 (peer->id); 1011 while (NULL != queue && rest >= queue->size)
998 LOG (GNUNET_ERROR_TYPE_DEBUG, " on connection %s %s\n",
999 GCC_2s (c), GC_f2s(queue->fwd));
1000 /* Check if buffer size is enough for the message */
1001 if (queue->size > size)
1002 { 1012 {
1003 LOG (GNUNET_ERROR_TYPE_WARNING, "not enough room (%u vs %u), reissue\n", 1013 c = queue->c;
1004 queue->size, size);
1005 peer->core_transmit =
1006 GNUNET_CORE_notify_transmit_ready (core_handle,
1007 GNUNET_NO, get_priority (queue),
1008 GNUNET_TIME_UNIT_FOREVER_REL,
1009 dst_id,
1010 queue->size,
1011 &queue_send,
1012 peer);
1013 return 0;
1014 }
1015 LOG (GNUNET_ERROR_TYPE_DEBUG, " size %u ok\n", queue->size);
1016 1014
1017 /* Fill buf */ 1015 LOG (GNUNET_ERROR_TYPE_DEBUG, " on connection %s %s\n",
1018 switch (queue->type) 1016 GCC_2s (c), GC_f2s(queue->fwd));
1019 {
1020 case GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED:
1021 pid = GCC_get_pid (queue->c, queue->fwd);
1022 LOG (GNUNET_ERROR_TYPE_DEBUG, " payload ID %u\n", pid);
1023 data_size = send_core_data_raw (queue->cls, size, buf);
1024 ((struct GNUNET_CADET_Encrypted *) buf)->pid = htonl (pid);
1025 break;
1026 case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
1027 case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
1028 case GNUNET_MESSAGE_TYPE_CADET_KX:
1029 case GNUNET_MESSAGE_TYPE_CADET_ACK:
1030 case GNUNET_MESSAGE_TYPE_CADET_POLL:
1031 LOG (GNUNET_ERROR_TYPE_DEBUG, " raw %s\n", GC_m2s (queue->type));
1032 data_size = send_core_data_raw (queue->cls, size, buf);
1033 break;
1034 case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE:
1035 LOG (GNUNET_ERROR_TYPE_DEBUG, " path create\n");
1036 if (GCC_is_origin (c, GNUNET_YES))
1037 data_size = send_core_connection_create (c, size, buf);
1038 else
1039 data_size = send_core_data_raw (queue->cls, size, buf);
1040 break;
1041 case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK:
1042 LOG (GNUNET_ERROR_TYPE_DEBUG, " path ack\n");
1043 if (GCC_is_origin (c, GNUNET_NO) ||
1044 GCC_is_origin (c, GNUNET_YES))
1045 data_size = send_core_connection_ack (c, size, buf);
1046 else
1047 data_size = send_core_data_raw (queue->cls, size, buf);
1048 break;
1049 case GNUNET_MESSAGE_TYPE_CADET_DATA:
1050 case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE:
1051 case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY:
1052 /* This should be encapsulted */
1053 GNUNET_break (0);
1054 data_size = 0;
1055 break;
1056 default:
1057 GNUNET_break (0);
1058 LOG (GNUNET_ERROR_TYPE_WARNING, " type unknown: %u\n", queue->type);
1059 data_size = 0;
1060 }
1061 1017
1062 if (0 < drop_percent && 1018 LOG (GNUNET_ERROR_TYPE_DEBUG, " size %u ok (%u/%u)\n",
1063 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 101) < drop_percent) 1019 queue->size, total_size, size);
1064 { 1020
1065 LOG (GNUNET_ERROR_TYPE_WARNING, "DD %s (%s %u) on connection %s %s\n", 1021 /* Fill buf */
1066 GC_m2s (queue->type), GC_m2s (queue->payload_type), queue->payload_id, 1022 switch (queue->type)
1067 GCC_2s (c), GC_f2s (queue->fwd)); 1023 {
1068 data_size = 0; 1024 case GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED:
1069 } 1025 pid = GCC_get_pid (queue->c, queue->fwd);
1070 else 1026 LOG (GNUNET_ERROR_TYPE_DEBUG, " payload ID %u\n", pid);
1071 { 1027 msg_size = send_core_data_raw (queue->cls, size, dst);
1072 LOG (GNUNET_ERROR_TYPE_INFO, 1028 ((struct GNUNET_CADET_Encrypted *) dst)->pid = htonl (pid);
1073 "snd %s (%s %u) on connection %s (%p) %s (size %u)\n", 1029 break;
1074 GC_m2s (queue->type), GC_m2s (queue->payload_type), 1030 case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
1075 queue->payload_id, GCC_2s (c), c, GC_f2s (queue->fwd), data_size); 1031 case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
1076 } 1032 case GNUNET_MESSAGE_TYPE_CADET_KX:
1033 case GNUNET_MESSAGE_TYPE_CADET_ACK:
1034 case GNUNET_MESSAGE_TYPE_CADET_POLL:
1035 LOG (GNUNET_ERROR_TYPE_DEBUG, " raw %s\n", GC_m2s (queue->type));
1036 msg_size = send_core_data_raw (queue->cls, size, dst);
1037 break;
1038 case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE:
1039 LOG (GNUNET_ERROR_TYPE_DEBUG, " path create\n");
1040 if (GCC_is_origin (c, GNUNET_YES))
1041 msg_size = send_core_connection_create (c, size, dst);
1042 else
1043 msg_size = send_core_data_raw (queue->cls, size, dst);
1044 break;
1045 case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK:
1046 LOG (GNUNET_ERROR_TYPE_DEBUG, " path ack\n");
1047 if (GCC_is_origin (c, GNUNET_NO) ||
1048 GCC_is_origin (c, GNUNET_YES))
1049 msg_size = send_core_connection_ack (c, size, dst);
1050 else
1051 msg_size = send_core_data_raw (queue->cls, size, dst);
1052 break;
1053 case GNUNET_MESSAGE_TYPE_CADET_DATA:
1054 case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE:
1055 case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY:
1056 /* This should be encapsulted */
1057 GNUNET_break (0);
1058 msg_size = 0;
1059 break;
1060 default:
1061 GNUNET_break (0);
1062 LOG (GNUNET_ERROR_TYPE_WARNING, " type unknown: %u\n", queue->type);
1063 msg_size = 0;
1064 }
1077 1065
1078 /* Free queue, but cls was freed by send_core_*. */ 1066 if (0 < drop_percent &&
1079 (void) GCP_queue_destroy (queue, GNUNET_NO, GNUNET_YES, pid); 1067 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 101) < drop_percent)
1068 {
1069 LOG (GNUNET_ERROR_TYPE_WARNING, "DD %s (%s %u) on connection %s %s\n",
1070 GC_m2s (queue->type), GC_m2s (queue->payload_type), queue->payload_id,
1071 GCC_2s (c), GC_f2s (queue->fwd));
1072 msg_size = 0;
1073 }
1074 else
1075 {
1076 LOG (GNUNET_ERROR_TYPE_INFO,
1077 "snd %s (%s %u) on connection %s (%p) %s (size %u)\n",
1078 GC_m2s (queue->type), GC_m2s (queue->payload_type),
1079 queue->payload_id, GCC_2s (c), c, GC_f2s (queue->fwd), msg_size);
1080 }
1081 total_size += msg_size;
1082 rest -= msg_size;
1083 dst = &dst[msg_size];
1084 msg_size = 0;
1085
1086 /* Free queue, but cls was freed by send_core_*. */
1087 (void) GCP_queue_destroy (queue, GNUNET_NO, GNUNET_YES, pid);
1088
1089 /* Next! */
1090 queue = peer_get_first_message (peer);
1091 }
1080 1092
1081 /* If more data in queue, send next */ 1093 /* If more data in queue, send next */
1082 queue = peer_get_first_message (peer);
1083 if (NULL != queue) 1094 if (NULL != queue)
1084 { 1095 {
1085 LOG (GNUNET_ERROR_TYPE_DEBUG, " more data!\n"); 1096 LOG (GNUNET_ERROR_TYPE_DEBUG, " more data! (%u)\n", queue->size);
1086 if (NULL == peer->core_transmit) 1097 if (NULL == peer->core_transmit)
1087 { 1098 {
1099 dst_id = GNUNET_PEER_resolve2 (peer->id);
1088 peer->core_transmit = 1100 peer->core_transmit =
1089 GNUNET_CORE_notify_transmit_ready (core_handle, 1101 GNUNET_CORE_notify_transmit_ready (core_handle,
1090 GNUNET_NO, get_priority (queue), 1102 GNUNET_NO, get_priority (queue),
@@ -1093,12 +1105,12 @@ queue_send (void *cls, size_t size, void *buf)
1093 queue->size, 1105 queue->size,
1094 &queue_send, 1106 &queue_send,
1095 peer); 1107 peer);
1108 peer->tmt_time = GNUNET_TIME_absolute_get ();
1096 queue->start_waiting = GNUNET_TIME_absolute_get (); 1109 queue->start_waiting = GNUNET_TIME_absolute_get ();
1097 } 1110 }
1098 else 1111 else
1099 { 1112 {
1100 LOG (GNUNET_ERROR_TYPE_DEBUG, 1113 LOG (GNUNET_ERROR_TYPE_DEBUG, "* tmt rdy called somewhere else\n");
1101 "* tmt rdy called somewhere else\n");
1102 } 1114 }
1103// GCC_start_poll (); FIXME needed? 1115// GCC_start_poll (); FIXME needed?
1104 } 1116 }
@@ -1107,9 +1119,10 @@ queue_send (void *cls, size_t size, void *buf)
1107// GCC_stop_poll(); FIXME needed? 1119// GCC_stop_poll(); FIXME needed?
1108 } 1120 }
1109 1121
1110 LOG (GNUNET_ERROR_TYPE_DEBUG, " return %d\n", data_size); 1122 LOG (GNUNET_ERROR_TYPE_DEBUG, " return %d\n", total_size);
1111 queue_debug (peer, GNUNET_ERROR_TYPE_DEBUG); 1123 queue_debug (peer, GNUNET_ERROR_TYPE_DEBUG);
1112 return data_size; 1124
1125 return total_size;
1113} 1126}
1114 1127
1115 1128
@@ -1178,8 +1191,9 @@ GCP_queue_destroy (struct CadetPeerQueue *queue, int clear_cls,
1178 { 1191 {
1179 struct GNUNET_TIME_Relative core_wait_time; 1192 struct GNUNET_TIME_Relative core_wait_time;
1180 1193
1181 LOG (GNUNET_ERROR_TYPE_DEBUG, " calling callback\n"); 1194 core_wait_time = GNUNET_TIME_absolute_get_duration (peer->tmt_time);
1182 core_wait_time = GNUNET_TIME_absolute_get_duration (queue->start_waiting); 1195 LOG (GNUNET_ERROR_TYPE_DEBUG, " calling callback, time elapsed %s\n",
1196 GNUNET_STRINGS_relative_time_to_string (core_wait_time, GNUNET_NO));
1183 connection_destroyed = queue->callback (queue->callback_cls, 1197 connection_destroyed = queue->callback (queue->callback_cls,
1184 queue->c, sent, queue->type, pid, 1198 queue->c, sent, queue->type, pid,
1185 queue->fwd, queue->size, 1199 queue->fwd, queue->size,
@@ -1194,6 +1208,7 @@ GCP_queue_destroy (struct CadetPeerQueue *queue, int clear_cls,
1194 { 1208 {
1195 GNUNET_CORE_notify_transmit_ready_cancel (peer->core_transmit); 1209 GNUNET_CORE_notify_transmit_ready_cancel (peer->core_transmit);
1196 peer->core_transmit = NULL; 1210 peer->core_transmit = NULL;
1211 peer->tmt_time.abs_value_us = 0;
1197 } 1212 }
1198 1213
1199 GNUNET_free (queue); 1214 GNUNET_free (queue);
@@ -1282,6 +1297,7 @@ GCP_queue_add (struct CadetPeer *peer, void *cls, uint16_t type,
1282 call_core = GNUNET_YES; 1297 call_core = GNUNET_YES;
1283 } 1298 }
1284 1299
1300 q->start_waiting = GNUNET_TIME_absolute_get ();
1285 if (NULL == peer->core_transmit && GNUNET_YES == call_core) 1301 if (NULL == peer->core_transmit && GNUNET_YES == call_core)
1286 { 1302 {
1287 LOG (GNUNET_ERROR_TYPE_DEBUG, 1303 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -1293,7 +1309,7 @@ GCP_queue_add (struct CadetPeer *peer, void *cls, uint16_t type,
1293 GNUNET_TIME_UNIT_FOREVER_REL, 1309 GNUNET_TIME_UNIT_FOREVER_REL,
1294 GNUNET_PEER_resolve2 (peer->id), 1310 GNUNET_PEER_resolve2 (peer->id),
1295 size, &queue_send, peer); 1311 size, &queue_send, peer);
1296 q->start_waiting = GNUNET_TIME_absolute_get (); 1312 peer->tmt_time = GNUNET_TIME_absolute_get ();
1297 } 1313 }
1298 else if (GNUNET_NO == call_core) 1314 else if (GNUNET_NO == call_core)
1299 { 1315 {
@@ -1303,8 +1319,11 @@ GCP_queue_add (struct CadetPeer *peer, void *cls, uint16_t type,
1303 } 1319 }
1304 else 1320 else
1305 { 1321 {
1306 LOG (GNUNET_ERROR_TYPE_DEBUG, "core tmt rdy towards %s already called\n", 1322 struct GNUNET_TIME_Relative elapsed;
1307 GCP_2s (peer)); 1323 elapsed = GNUNET_TIME_absolute_get_duration (peer->tmt_time);
1324 LOG (GNUNET_ERROR_TYPE_DEBUG, "core tmt rdy towards %s already called %s\n",
1325 GCP_2s (peer),
1326 GNUNET_STRINGS_relative_time_to_string (elapsed, GNUNET_NO));
1308 1327
1309 } 1328 }
1310 queue_debug (peer, GNUNET_ERROR_TYPE_DEBUG); 1329 queue_debug (peer, GNUNET_ERROR_TYPE_DEBUG);
@@ -1362,6 +1381,7 @@ GCP_queue_cancel (struct CadetPeer *peer, struct CadetConnection *c)
1362 { 1381 {
1363 GNUNET_CORE_notify_transmit_ready_cancel (peer->core_transmit); 1382 GNUNET_CORE_notify_transmit_ready_cancel (peer->core_transmit);
1364 peer->core_transmit = NULL; 1383 peer->core_transmit = NULL;
1384 peer->tmt_time.abs_value_us = 0;
1365 } 1385 }
1366} 1386}
1367 1387
@@ -1502,6 +1522,7 @@ GCP_queue_unlock (struct CadetPeer *peer, struct CadetConnection *c)
1502 size, 1522 size,
1503 &queue_send, 1523 &queue_send,
1504 peer); 1524 peer);
1525 peer->tmt_time = GNUNET_TIME_absolute_get ();
1505} 1526}
1506 1527
1507 1528