diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/transport/plugin_transport_tcp.c | 310 | ||||
-rw-r--r-- | src/transport/plugin_transport_udp.c | 1 | ||||
-rw-r--r-- | src/transport/plugin_transport_unix.c | 1 |
3 files changed, 240 insertions, 72 deletions
diff --git a/src/transport/plugin_transport_tcp.c b/src/transport/plugin_transport_tcp.c index 4d9ab1b41..f3791c138 100644 --- a/src/transport/plugin_transport_tcp.c +++ b/src/transport/plugin_transport_tcp.c | |||
@@ -272,6 +272,11 @@ struct Session | |||
272 | struct GNUNET_SERVER_TransmitHandle *transmit_handle; | 272 | struct GNUNET_SERVER_TransmitHandle *transmit_handle; |
273 | 273 | ||
274 | /** | 274 | /** |
275 | * Address of the other peer. | ||
276 | */ | ||
277 | struct GNUNET_HELLO_Address *address; | ||
278 | |||
279 | /** | ||
275 | * ID of task used to delay receiving more to throttle sender. | 280 | * ID of task used to delay receiving more to throttle sender. |
276 | */ | 281 | */ |
277 | GNUNET_SCHEDULER_TaskIdentifier receive_delay_task; | 282 | GNUNET_SCHEDULER_TaskIdentifier receive_delay_task; |
@@ -281,19 +286,11 @@ struct Session | |||
281 | */ | 286 | */ |
282 | GNUNET_SCHEDULER_TaskIdentifier timeout_task; | 287 | GNUNET_SCHEDULER_TaskIdentifier timeout_task; |
283 | 288 | ||
284 | struct GNUNET_HELLO_Address *address; | ||
285 | |||
286 | /** | ||
287 | * Address of the other peer (either based on our 'connect' | ||
288 | * call or on our 'accept' call). | ||
289 | * | ||
290 | * struct IPv4TcpAddress or struct IPv6TcpAddress | ||
291 | */ | ||
292 | //void *addr; | ||
293 | /** | 289 | /** |
294 | * Length of @e addr. | 290 | * When will this session time out? |
295 | */ | 291 | */ |
296 | //size_t addrlen; | 292 | struct GNUNET_TIME_Absolute timeout; |
293 | |||
297 | /** | 294 | /** |
298 | * Last activity on this connection. Used to select preferred | 295 | * Last activity on this connection. Used to select preferred |
299 | * connection. | 296 | * connection. |
@@ -301,6 +298,16 @@ struct Session | |||
301 | struct GNUNET_TIME_Absolute last_activity; | 298 | struct GNUNET_TIME_Absolute last_activity; |
302 | 299 | ||
303 | /** | 300 | /** |
301 | * Number of bytes waiting for transmission to this peer. | ||
302 | */ | ||
303 | unsigned long long bytes_in_queue; | ||
304 | |||
305 | /** | ||
306 | * Number of messages waiting for transmission to this peer. | ||
307 | */ | ||
308 | unsigned int msgs_in_queue; | ||
309 | |||
310 | /** | ||
304 | * Are we still expecting the welcome message? (#GNUNET_YES/#GNUNET_NO) | 311 | * Are we still expecting the welcome message? (#GNUNET_YES/#GNUNET_NO) |
305 | */ | 312 | */ |
306 | int expecting_welcome; | 313 | int expecting_welcome; |
@@ -378,6 +385,16 @@ struct Plugin | |||
378 | struct GNUNET_RESOLVER_RequestHandle *ext_dns; | 385 | struct GNUNET_RESOLVER_RequestHandle *ext_dns; |
379 | 386 | ||
380 | /** | 387 | /** |
388 | * Function to call about session status changes. | ||
389 | */ | ||
390 | GNUNET_TRANSPORT_SessionInfoCallback sic; | ||
391 | |||
392 | /** | ||
393 | * Closure for @e sic. | ||
394 | */ | ||
395 | void *sic_cls; | ||
396 | |||
397 | /** | ||
381 | * How many more TCP sessions are we allowed to open right now? | 398 | * How many more TCP sessions are we allowed to open right now? |
382 | */ | 399 | */ |
383 | unsigned long long max_connections; | 400 | unsigned long long max_connections; |
@@ -410,6 +427,40 @@ struct Plugin | |||
410 | 427 | ||
411 | }; | 428 | }; |
412 | 429 | ||
430 | |||
431 | /** | ||
432 | * If a session monitor is attached, notify it about the new | ||
433 | * session state. | ||
434 | * | ||
435 | * @param plugin our plugin | ||
436 | * @param session session that changed state | ||
437 | * @param state new state of the session | ||
438 | */ | ||
439 | static void | ||
440 | notify_session_monitor (struct Plugin *plugin, | ||
441 | struct Session *session, | ||
442 | enum GNUNET_TRANSPORT_SessionState state) | ||
443 | { | ||
444 | struct GNUNET_TRANSPORT_SessionInfo info; | ||
445 | |||
446 | if (NULL == plugin->sic) | ||
447 | return; | ||
448 | memset (&info, 0, sizeof (info)); | ||
449 | info.state = state; | ||
450 | info.is_inbound = GNUNET_SYSERR; /* hard to say */ | ||
451 | info.num_msg_pending = session->msgs_in_queue; | ||
452 | info.num_bytes_pending = session->bytes_in_queue; | ||
453 | /* info.receive_delay remains zero as this is not supported by UDP | ||
454 | (cannot selectively not receive from 'some' peer while continuing | ||
455 | to receive from others) */ | ||
456 | info.session_timeout = session->timeout; | ||
457 | info.address = session->address; | ||
458 | plugin->sic (plugin->sic_cls, | ||
459 | session, | ||
460 | &info); | ||
461 | } | ||
462 | |||
463 | |||
413 | /** | 464 | /** |
414 | * Function called for a quick conversion of the binary address to | 465 | * Function called for a quick conversion of the binary address to |
415 | * a numeric address. Note that the caller must not free the | 466 | * a numeric address. Note that the caller must not free the |
@@ -780,6 +831,7 @@ tcp_disconnect_session (void *cls, | |||
780 | { | 831 | { |
781 | GNUNET_SCHEDULER_cancel (session->timeout_task); | 832 | GNUNET_SCHEDULER_cancel (session->timeout_task); |
782 | session->timeout_task = GNUNET_SCHEDULER_NO_TASK; | 833 | session->timeout_task = GNUNET_SCHEDULER_NO_TASK; |
834 | session->timeout = GNUNET_TIME_UNIT_ZERO_ABS; | ||
783 | } | 835 | } |
784 | 836 | ||
785 | if (GNUNET_YES | 837 | if (GNUNET_YES |
@@ -825,16 +877,26 @@ tcp_disconnect_session (void *cls, | |||
825 | : "Could not deliver message to `%4s', notifying.\n", | 877 | : "Could not deliver message to `%4s', notifying.\n", |
826 | GNUNET_i2s (&session->target)); | 878 | GNUNET_i2s (&session->target)); |
827 | GNUNET_STATISTICS_update (session->plugin->env->stats, | 879 | GNUNET_STATISTICS_update (session->plugin->env->stats, |
828 | gettext_noop ("# bytes currently in TCP buffers"), | 880 | gettext_noop ("# bytes currently in TCP buffers"), |
829 | -(int64_t) pm->message_size, GNUNET_NO); | 881 | -(int64_t) pm->message_size, GNUNET_NO); |
830 | GNUNET_STATISTICS_update (session->plugin->env->stats, gettext_noop | 882 | GNUNET_STATISTICS_update (session->plugin->env->stats, |
831 | ("# bytes discarded by TCP (disconnect)"), pm->message_size, GNUNET_NO); | 883 | gettext_noop ("# bytes discarded by TCP (disconnect)"), |
832 | GNUNET_CONTAINER_DLL_remove(session->pending_messages_head, | 884 | pm->message_size, |
833 | session->pending_messages_tail, pm); | 885 | GNUNET_NO); |
886 | GNUNET_CONTAINER_DLL_remove (session->pending_messages_head, | ||
887 | session->pending_messages_tail, | ||
888 | pm); | ||
889 | GNUNET_assert (0 < session->msgs_in_queue); | ||
890 | session->msgs_in_queue--; | ||
891 | GNUNET_assert (pm->message_size <= session->bytes_in_queue); | ||
892 | session->bytes_in_queue -= pm->message_size; | ||
834 | if (NULL != pm->transmit_cont) | 893 | if (NULL != pm->transmit_cont) |
835 | pm->transmit_cont (pm->transmit_cont_cls, &session->target, GNUNET_SYSERR, | 894 | pm->transmit_cont (pm->transmit_cont_cls, |
836 | pm->message_size, 0); | 895 | &session->target, |
837 | GNUNET_free(pm); | 896 | GNUNET_SYSERR, |
897 | pm->message_size, | ||
898 | 0); | ||
899 | GNUNET_free (pm); | ||
838 | } | 900 | } |
839 | if (session->receive_delay_task != GNUNET_SCHEDULER_NO_TASK ) | 901 | if (session->receive_delay_task != GNUNET_SCHEDULER_NO_TASK ) |
840 | { | 902 | { |
@@ -881,10 +943,25 @@ session_timeout (void *cls, | |||
881 | const struct GNUNET_SCHEDULER_TaskContext *tc) | 943 | const struct GNUNET_SCHEDULER_TaskContext *tc) |
882 | { | 944 | { |
883 | struct Session *s = cls; | 945 | struct Session *s = cls; |
946 | struct GNUNET_TIME_Relative left; | ||
884 | 947 | ||
885 | s->timeout_task = GNUNET_SCHEDULER_NO_TASK; | 948 | s->timeout_task = GNUNET_SCHEDULER_NO_TASK; |
949 | left = GNUNET_TIME_absolute_get_remaining (s->timeout); | ||
950 | if (0 != left.rel_value_us) | ||
951 | { | ||
952 | /* not actually our turn yet, but let's at least update | ||
953 | the monitor, it may think we're about to die ... */ | ||
954 | notify_session_monitor (s->plugin, | ||
955 | s, | ||
956 | GNUNET_TRANSPORT_SS_UP); | ||
957 | s->timeout_task = GNUNET_SCHEDULER_add_delayed (left, | ||
958 | &session_timeout, | ||
959 | s); | ||
960 | return; | ||
961 | } | ||
886 | GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, | 962 | GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, |
887 | "Session %p was idle for %s, disconnecting\n", s, | 963 | "Session %p was idle for %s, disconnecting\n", |
964 | s, | ||
888 | GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, | 965 | GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, |
889 | GNUNET_YES)); | 966 | GNUNET_YES)); |
890 | /* call session destroy function */ | 967 | /* call session destroy function */ |
@@ -901,14 +978,10 @@ static void | |||
901 | reschedule_session_timeout (struct Session *s) | 978 | reschedule_session_timeout (struct Session *s) |
902 | { | 979 | { |
903 | GNUNET_assert(GNUNET_SCHEDULER_NO_TASK != s->timeout_task); | 980 | GNUNET_assert(GNUNET_SCHEDULER_NO_TASK != s->timeout_task); |
904 | GNUNET_SCHEDULER_cancel (s->timeout_task); | 981 | s->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); |
905 | s->timeout_task = GNUNET_SCHEDULER_add_delayed ( | ||
906 | GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, &session_timeout, s); | ||
907 | GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, | ||
908 | "Timeout rescheduled for session %p set to %s\n", s, | ||
909 | GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, GNUNET_YES)); | ||
910 | } | 982 | } |
911 | 983 | ||
984 | |||
912 | /** | 985 | /** |
913 | * Create a new session. Also queues a welcome message. | 986 | * Create a new session. Also queues a welcome message. |
914 | * | 987 | * |
@@ -959,8 +1032,11 @@ create_session (struct Plugin *plugin, | |||
959 | GNUNET_STATISTICS_update (plugin->env->stats, | 1032 | GNUNET_STATISTICS_update (plugin->env->stats, |
960 | gettext_noop ("# bytes currently in TCP buffers"), pm->message_size, | 1033 | gettext_noop ("# bytes currently in TCP buffers"), pm->message_size, |
961 | GNUNET_NO); | 1034 | GNUNET_NO); |
962 | GNUNET_CONTAINER_DLL_insert(session->pending_messages_head, | 1035 | GNUNET_CONTAINER_DLL_insert (session->pending_messages_head, |
963 | session->pending_messages_tail, pm); | 1036 | session->pending_messages_tail, |
1037 | pm); | ||
1038 | session->msgs_in_queue++; | ||
1039 | session->bytes_in_queue += pm->message_size; | ||
964 | if (GNUNET_YES != is_nat) | 1040 | if (GNUNET_YES != is_nat) |
965 | { | 1041 | { |
966 | GNUNET_STATISTICS_update (plugin->env->stats, | 1042 | GNUNET_STATISTICS_update (plugin->env->stats, |
@@ -968,8 +1044,10 @@ create_session (struct Plugin *plugin, | |||
968 | } | 1044 | } |
969 | plugin->env->register_quota_notification (plugin->env->cls, | 1045 | plugin->env->register_quota_notification (plugin->env->cls, |
970 | &address->peer, PLUGIN_NAME, session); | 1046 | &address->peer, PLUGIN_NAME, session); |
971 | session->timeout_task = GNUNET_SCHEDULER_add_delayed ( | 1047 | session->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); |
972 | GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, &session_timeout, session); | 1048 | session->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, |
1049 | &session_timeout, | ||
1050 | session); | ||
973 | return session; | 1051 | return session; |
974 | } | 1052 | } |
975 | 1053 | ||
@@ -1012,9 +1090,9 @@ do_transmit (void *cls, size_t size, void *buf) | |||
1012 | plugin = session->plugin; | 1090 | plugin = session->plugin; |
1013 | if (NULL == buf) | 1091 | if (NULL == buf) |
1014 | { | 1092 | { |
1015 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 1093 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1016 | "Timeout trying to transmit to peer `%4s', discarding message queue.\n", | 1094 | "Timeout trying to transmit to peer `%4s', discarding message queue.\n", |
1017 | GNUNET_i2s (&session->target)); | 1095 | GNUNET_i2s (&session->target)); |
1018 | /* timeout; cancel all messages that have already expired */ | 1096 | /* timeout; cancel all messages that have already expired */ |
1019 | hd = NULL; | 1097 | hd = NULL; |
1020 | tl = NULL; | 1098 | tl = NULL; |
@@ -1023,13 +1101,19 @@ do_transmit (void *cls, size_t size, void *buf) | |||
1023 | while ((NULL != (pos = session->pending_messages_head)) | 1101 | while ((NULL != (pos = session->pending_messages_head)) |
1024 | && (pos->timeout.abs_value_us <= now.abs_value_us)) | 1102 | && (pos->timeout.abs_value_us <= now.abs_value_us)) |
1025 | { | 1103 | { |
1026 | GNUNET_CONTAINER_DLL_remove(session->pending_messages_head, | 1104 | GNUNET_CONTAINER_DLL_remove (session->pending_messages_head, |
1027 | session->pending_messages_tail, pos); | 1105 | session->pending_messages_tail, |
1028 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 1106 | pos); |
1029 | "Failed to transmit %u byte message to `%4s'.\n", pos->message_size, | 1107 | GNUNET_assert (0 < session->msgs_in_queue); |
1030 | GNUNET_i2s (&session->target)); | 1108 | session->msgs_in_queue--; |
1109 | GNUNET_assert (pos->message_size <= session->bytes_in_queue); | ||
1110 | session->bytes_in_queue -= pos->message_size; | ||
1111 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1112 | "Failed to transmit %u byte message to `%4s'.\n", | ||
1113 | pos->message_size, | ||
1114 | GNUNET_i2s (&session->target)); | ||
1031 | ret += pos->message_size; | 1115 | ret += pos->message_size; |
1032 | GNUNET_CONTAINER_DLL_insert_after(hd, tl, tl, pos); | 1116 | GNUNET_CONTAINER_DLL_insert_after (hd, tl, tl, pos); |
1033 | } | 1117 | } |
1034 | /* do this call before callbacks (so that if callbacks destroy | 1118 | /* do this call before callbacks (so that if callbacks destroy |
1035 | * session, they have a chance to cancel actions done by this | 1119 | * session, they have a chance to cancel actions done by this |
@@ -1040,10 +1124,12 @@ do_transmit (void *cls, size_t size, void *buf) | |||
1040 | * the callbacks may abort the session */ | 1124 | * the callbacks may abort the session */ |
1041 | while (NULL != (pos = hd)) | 1125 | while (NULL != (pos = hd)) |
1042 | { | 1126 | { |
1043 | GNUNET_CONTAINER_DLL_remove(hd, tl, pos); | 1127 | GNUNET_CONTAINER_DLL_remove (hd, tl, pos); |
1044 | if (pos->transmit_cont != NULL ) | 1128 | if (pos->transmit_cont != NULL) |
1045 | pos->transmit_cont (pos->transmit_cont_cls, &pid, GNUNET_SYSERR, | 1129 | pos->transmit_cont (pos->transmit_cont_cls, |
1046 | pos->message_size, 0); | 1130 | &pid, |
1131 | GNUNET_SYSERR, | ||
1132 | pos->message_size, 0); | ||
1047 | GNUNET_free(pos); | 1133 | GNUNET_free(pos); |
1048 | } | 1134 | } |
1049 | GNUNET_STATISTICS_update (plugin->env->stats, | 1135 | GNUNET_STATISTICS_update (plugin->env->stats, |
@@ -1062,10 +1148,16 @@ do_transmit (void *cls, size_t size, void *buf) | |||
1062 | { | 1148 | { |
1063 | if (ret + pos->message_size > size) | 1149 | if (ret + pos->message_size > size) |
1064 | break; | 1150 | break; |
1065 | GNUNET_CONTAINER_DLL_remove(session->pending_messages_head, | 1151 | GNUNET_CONTAINER_DLL_remove (session->pending_messages_head, |
1066 | session->pending_messages_tail, pos); | 1152 | session->pending_messages_tail, |
1153 | pos); | ||
1154 | GNUNET_assert (0 < session->msgs_in_queue); | ||
1155 | session->msgs_in_queue--; | ||
1156 | GNUNET_assert (pos->message_size <= session->bytes_in_queue); | ||
1157 | session->bytes_in_queue -= pos->message_size; | ||
1067 | GNUNET_assert(size >= pos->message_size); | 1158 | GNUNET_assert(size >= pos->message_size); |
1068 | LOG(GNUNET_ERROR_TYPE_DEBUG, "Transmitting message of type %u size %u\n", | 1159 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
1160 | "Transmitting message of type %u size %u\n", | ||
1069 | ntohs (((struct GNUNET_MessageHeader * ) pos->msg)->type), | 1161 | ntohs (((struct GNUNET_MessageHeader * ) pos->msg)->type), |
1070 | pos->message_size); | 1162 | pos->message_size); |
1071 | /* FIXME: this memcpy can be up to 7% of our total runtime */ | 1163 | /* FIXME: this memcpy can be up to 7% of our total runtime */ |
@@ -1073,7 +1165,7 @@ do_transmit (void *cls, size_t size, void *buf) | |||
1073 | cbuf += pos->message_size; | 1165 | cbuf += pos->message_size; |
1074 | ret += pos->message_size; | 1166 | ret += pos->message_size; |
1075 | size -= pos->message_size; | 1167 | size -= pos->message_size; |
1076 | GNUNET_CONTAINER_DLL_insert_tail(hd, tl, pos); | 1168 | GNUNET_CONTAINER_DLL_insert_tail (hd, tl, pos); |
1077 | } | 1169 | } |
1078 | /* schedule 'continuation' before callbacks so that callbacks that | 1170 | /* schedule 'continuation' before callbacks so that callbacks that |
1079 | * cancel everything don't cause us to use a session that no longer | 1171 | * cancel everything don't cause us to use a session that no longer |
@@ -1085,10 +1177,13 @@ do_transmit (void *cls, size_t size, void *buf) | |||
1085 | * we should not use 'session' after this point */ | 1177 | * we should not use 'session' after this point */ |
1086 | while (NULL != (pos = hd)) | 1178 | while (NULL != (pos = hd)) |
1087 | { | 1179 | { |
1088 | GNUNET_CONTAINER_DLL_remove(hd, tl, pos); | 1180 | GNUNET_CONTAINER_DLL_remove (hd, tl, pos); |
1089 | if (pos->transmit_cont != NULL ) | 1181 | if (pos->transmit_cont != NULL) |
1090 | pos->transmit_cont (pos->transmit_cont_cls, &pid, GNUNET_OK, | 1182 | pos->transmit_cont (pos->transmit_cont_cls, |
1091 | pos->message_size, pos->message_size); /* FIXME: include TCP overhead */ | 1183 | &pid, |
1184 | GNUNET_OK, | ||
1185 | pos->message_size, | ||
1186 | pos->message_size); /* FIXME: include TCP overhead */ | ||
1092 | GNUNET_free(pos); | 1187 | GNUNET_free(pos); |
1093 | } | 1188 | } |
1094 | GNUNET_assert(hd == NULL); | 1189 | GNUNET_assert(hd == NULL); |
@@ -1253,11 +1348,12 @@ tcp_plugin_send (void *cls, struct Session *session, const char *msgbuf, | |||
1253 | "Asked to transmit %u bytes to `%s', added message to list.\n", | 1348 | "Asked to transmit %u bytes to `%s', added message to list.\n", |
1254 | msgbuf_size, GNUNET_i2s (&session->target)); | 1349 | msgbuf_size, GNUNET_i2s (&session->target)); |
1255 | 1350 | ||
1256 | if (GNUNET_YES | 1351 | if (GNUNET_YES == |
1257 | == GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessionmap, | 1352 | GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessionmap, |
1258 | &session->target, session)) | 1353 | &session->target, |
1354 | session)) | ||
1259 | { | 1355 | { |
1260 | GNUNET_assert(NULL != session->client); | 1356 | GNUNET_assert (NULL != session->client); |
1261 | GNUNET_SERVER_client_set_timeout (session->client, | 1357 | GNUNET_SERVER_client_set_timeout (session->client, |
1262 | GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); | 1358 | GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); |
1263 | GNUNET_STATISTICS_update (plugin->env->stats, | 1359 | GNUNET_STATISTICS_update (plugin->env->stats, |
@@ -1265,9 +1361,11 @@ tcp_plugin_send (void *cls, struct Session *session, const char *msgbuf, | |||
1265 | GNUNET_NO); | 1361 | GNUNET_NO); |
1266 | 1362 | ||
1267 | /* append pm to pending_messages list */ | 1363 | /* append pm to pending_messages list */ |
1268 | GNUNET_CONTAINER_DLL_insert_tail(session->pending_messages_head, | 1364 | GNUNET_CONTAINER_DLL_insert_tail (session->pending_messages_head, |
1269 | session->pending_messages_tail, pm); | 1365 | session->pending_messages_tail, |
1270 | 1366 | pm); | |
1367 | session->msgs_in_queue++; | ||
1368 | session->bytes_in_queue += pm->message_size; | ||
1271 | process_pending_messages (session); | 1369 | process_pending_messages (session); |
1272 | return msgbuf_size; | 1370 | return msgbuf_size; |
1273 | } | 1371 | } |
@@ -1283,17 +1381,25 @@ tcp_plugin_send (void *cls, struct Session *session, const char *msgbuf, | |||
1283 | GNUNET_NO); | 1381 | GNUNET_NO); |
1284 | 1382 | ||
1285 | /* append pm to pending_messages list */ | 1383 | /* append pm to pending_messages list */ |
1286 | GNUNET_CONTAINER_DLL_insert_tail(session->pending_messages_head, | 1384 | GNUNET_CONTAINER_DLL_insert_tail (session->pending_messages_head, |
1287 | session->pending_messages_tail, pm); | 1385 | session->pending_messages_tail, |
1386 | pm); | ||
1387 | session->msgs_in_queue++; | ||
1388 | session->bytes_in_queue += pm->message_size; | ||
1288 | return msgbuf_size; | 1389 | return msgbuf_size; |
1289 | } | 1390 | } |
1290 | else | 1391 | else |
1291 | { | 1392 | { |
1292 | LOG(GNUNET_ERROR_TYPE_ERROR, "Invalid session %p\n", session); | 1393 | LOG(GNUNET_ERROR_TYPE_ERROR, |
1394 | "Invalid session %p\n", session); | ||
1293 | if (NULL != cont) | 1395 | if (NULL != cont) |
1294 | cont (cont_cls, &session->target, GNUNET_SYSERR, pm->message_size, 0); | 1396 | cont (cont_cls, |
1295 | GNUNET_break(0); | 1397 | &session->target, |
1296 | GNUNET_free(pm); | 1398 | GNUNET_SYSERR, |
1399 | pm->message_size, | ||
1400 | 0); | ||
1401 | GNUNET_break (0); | ||
1402 | GNUNET_free (pm); | ||
1297 | return GNUNET_SYSERR; /* session does not exist here */ | 1403 | return GNUNET_SYSERR; /* session does not exist here */ |
1298 | } | 1404 | } |
1299 | } | 1405 | } |
@@ -2346,9 +2452,10 @@ notify_send_probe (void *cls, | |||
2346 | size_t ret; | 2452 | size_t ret; |
2347 | 2453 | ||
2348 | tcp_probe_ctx->transmit_handle = NULL; | 2454 | tcp_probe_ctx->transmit_handle = NULL; |
2349 | GNUNET_CONTAINER_DLL_remove(plugin->probe_head, plugin->probe_tail, | 2455 | GNUNET_CONTAINER_DLL_remove (plugin->probe_head, |
2350 | tcp_probe_ctx); | 2456 | plugin->probe_tail, |
2351 | if (buf == NULL ) | 2457 | tcp_probe_ctx); |
2458 | if (buf == NULL) | ||
2352 | { | 2459 | { |
2353 | GNUNET_CONNECTION_destroy (tcp_probe_ctx->sock); | 2460 | GNUNET_CONNECTION_destroy (tcp_probe_ctx->sock); |
2354 | GNUNET_free(tcp_probe_ctx); | 2461 | GNUNET_free(tcp_probe_ctx); |
@@ -2402,8 +2509,9 @@ try_connection_reversal (void *cls, | |||
2402 | sizeof(struct GNUNET_PeerIdentity)); | 2509 | sizeof(struct GNUNET_PeerIdentity)); |
2403 | tcp_probe_ctx->plugin = plugin; | 2510 | tcp_probe_ctx->plugin = plugin; |
2404 | tcp_probe_ctx->sock = sock; | 2511 | tcp_probe_ctx->sock = sock; |
2405 | GNUNET_CONTAINER_DLL_insert(plugin->probe_head, plugin->probe_tail, | 2512 | GNUNET_CONTAINER_DLL_insert (plugin->probe_head, |
2406 | tcp_probe_ctx); | 2513 | plugin->probe_tail, |
2514 | tcp_probe_ctx); | ||
2407 | tcp_probe_ctx->transmit_handle = GNUNET_CONNECTION_notify_transmit_ready ( | 2515 | tcp_probe_ctx->transmit_handle = GNUNET_CONNECTION_notify_transmit_ready ( |
2408 | sock, ntohs (tcp_probe_ctx->message.header.size), | 2516 | sock, ntohs (tcp_probe_ctx->message.header.size), |
2409 | GNUNET_TIME_UNIT_FOREVER_REL, ¬ify_send_probe, tcp_probe_ctx); | 2517 | GNUNET_TIME_UNIT_FOREVER_REL, ¬ify_send_probe, tcp_probe_ctx); |
@@ -2433,6 +2541,62 @@ tcp_get_network (void *cls, | |||
2433 | 2541 | ||
2434 | 2542 | ||
2435 | /** | 2543 | /** |
2544 | * Return information about the given session to the | ||
2545 | * monitor callback. | ||
2546 | * | ||
2547 | * @param cls the `struct Plugin` with the monitor callback (`sic`) | ||
2548 | * @param peer peer we send information about | ||
2549 | * @param value our `struct Session` to send information about | ||
2550 | * @return #GNUNET_OK (continue to iterate) | ||
2551 | */ | ||
2552 | static int | ||
2553 | send_session_info_iter (void *cls, | ||
2554 | const struct GNUNET_PeerIdentity *peer, | ||
2555 | void *value) | ||
2556 | { | ||
2557 | struct Plugin *plugin = cls; | ||
2558 | struct Session *session = value; | ||
2559 | |||
2560 | notify_session_monitor (plugin, | ||
2561 | session, | ||
2562 | GNUNET_TRANSPORT_SS_UP); | ||
2563 | return GNUNET_OK; | ||
2564 | } | ||
2565 | |||
2566 | |||
2567 | /** | ||
2568 | * Begin monitoring sessions of a plugin. There can only | ||
2569 | * be one active monitor per plugin (i.e. if there are | ||
2570 | * multiple monitors, the transport service needs to | ||
2571 | * multiplex the generated events over all of them). | ||
2572 | * | ||
2573 | * @param cls closure of the plugin | ||
2574 | * @param sic callback to invoke, NULL to disable monitor; | ||
2575 | * plugin will being by iterating over all active | ||
2576 | * sessions immediately and then enter monitor mode | ||
2577 | * @param sic_cls closure for @a sic | ||
2578 | */ | ||
2579 | static void | ||
2580 | tcp_plugin_setup_monitor (void *cls, | ||
2581 | GNUNET_TRANSPORT_SessionInfoCallback sic, | ||
2582 | void *sic_cls) | ||
2583 | { | ||
2584 | struct Plugin *plugin = cls; | ||
2585 | |||
2586 | plugin->sic = sic; | ||
2587 | plugin->sic_cls = sic_cls; | ||
2588 | if (NULL != sic) | ||
2589 | { | ||
2590 | GNUNET_CONTAINER_multipeermap_iterate (plugin->sessionmap, | ||
2591 | &send_session_info_iter, | ||
2592 | plugin); | ||
2593 | /* signal end of first iteration */ | ||
2594 | sic (sic_cls, NULL, NULL); | ||
2595 | } | ||
2596 | } | ||
2597 | |||
2598 | |||
2599 | /** | ||
2436 | * Entry point for the plugin. | 2600 | * Entry point for the plugin. |
2437 | * | 2601 | * |
2438 | * @param cls closure, the 'struct GNUNET_TRANSPORT_PluginEnvironment*' | 2602 | * @param cls closure, the 'struct GNUNET_TRANSPORT_PluginEnvironment*' |
@@ -2569,6 +2733,7 @@ libgnunet_plugin_transport_tcp_init (void *cls) | |||
2569 | api->get_network = &tcp_get_network; | 2733 | api->get_network = &tcp_get_network; |
2570 | api->update_session_timeout = &tcp_plugin_update_session_timeout; | 2734 | api->update_session_timeout = &tcp_plugin_update_session_timeout; |
2571 | api->update_inbound_delay = &tcp_plugin_update_inbound_delay; | 2735 | api->update_inbound_delay = &tcp_plugin_update_inbound_delay; |
2736 | api->setup_monitor = &tcp_plugin_setup_monitor; | ||
2572 | plugin->service = service; | 2737 | plugin->service = service; |
2573 | if (NULL != service) | 2738 | if (NULL != service) |
2574 | { | 2739 | { |
@@ -2678,8 +2843,9 @@ libgnunet_plugin_transport_tcp_done (void *cls) | |||
2678 | GNUNET_NAT_unregister (plugin->nat); | 2843 | GNUNET_NAT_unregister (plugin->nat); |
2679 | while (NULL != (tcp_probe = plugin->probe_head)) | 2844 | while (NULL != (tcp_probe = plugin->probe_head)) |
2680 | { | 2845 | { |
2681 | GNUNET_CONTAINER_DLL_remove(plugin->probe_head, plugin->probe_tail, | 2846 | GNUNET_CONTAINER_DLL_remove (plugin->probe_head, |
2682 | tcp_probe); | 2847 | plugin->probe_tail, |
2848 | tcp_probe); | ||
2683 | GNUNET_CONNECTION_destroy (tcp_probe->sock); | 2849 | GNUNET_CONNECTION_destroy (tcp_probe->sock); |
2684 | GNUNET_free(tcp_probe); | 2850 | GNUNET_free(tcp_probe); |
2685 | } | 2851 | } |
diff --git a/src/transport/plugin_transport_udp.c b/src/transport/plugin_transport_udp.c index 782a5ea8f..e9d6b54d4 100644 --- a/src/transport/plugin_transport_udp.c +++ b/src/transport/plugin_transport_udp.c | |||
@@ -1573,6 +1573,7 @@ create_session (struct Plugin *plugin, | |||
1573 | s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS; | 1573 | s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS; |
1574 | s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO_ABS; | 1574 | s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO_ABS; |
1575 | s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO; | 1575 | s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO; |
1576 | s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT); | ||
1576 | s->timeout_task = GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT, | 1577 | s->timeout_task = GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT, |
1577 | &session_timeout, s); | 1578 | &session_timeout, s); |
1578 | return s; | 1579 | return s; |
diff --git a/src/transport/plugin_transport_unix.c b/src/transport/plugin_transport_unix.c index 0c3d29733..9ca58ef51 100644 --- a/src/transport/plugin_transport_unix.c +++ b/src/transport/plugin_transport_unix.c | |||
@@ -886,6 +886,7 @@ unix_plugin_get_session (void *cls, | |||
886 | session->target = address->peer; | 886 | session->target = address->peer; |
887 | session->address = GNUNET_HELLO_address_copy (address); | 887 | session->address = GNUNET_HELLO_address_copy (address); |
888 | session->plugin = plugin; | 888 | session->plugin = plugin; |
889 | session->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); | ||
889 | session->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, | 890 | session->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, |
890 | &session_timeout, | 891 | &session_timeout, |
891 | session); | 892 | session); |