diff options
author | Sree Harsha Totakura <totakura@in.tum.de> | 2012-10-04 08:49:12 +0000 |
---|---|---|
committer | Sree Harsha Totakura <totakura@in.tum.de> | 2012-10-04 08:49:12 +0000 |
commit | e0bb43c1f61ba02d19733fc4c3f09758d0b04029 (patch) | |
tree | 87e33f776390f39daec2e5cdfd5592d2de6e67e7 /src/stream | |
parent | 7b32b05be57fd86b25dab886c363c998515d107a (diff) | |
download | gnunet-e0bb43c1f61ba02d19733fc4c3f09758d0b04029.tar.gz gnunet-e0bb43c1f61ba02d19733fc4c3f09758d0b04029.zip |
moved read_io task and its corresponding timeout task to read handle
Diffstat (limited to 'src/stream')
-rw-r--r-- | src/stream/stream_api.c | 127 |
1 files changed, 52 insertions, 75 deletions
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 0d10cf352..c5d8abada 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c | |||
@@ -261,11 +261,6 @@ struct GNUNET_STREAM_Socket | |||
261 | struct GNUNET_PeerIdentity other_peer; | 261 | struct GNUNET_PeerIdentity other_peer; |
262 | 262 | ||
263 | /** | 263 | /** |
264 | * Task identifier for the read io timeout task | ||
265 | */ | ||
266 | GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id; | ||
267 | |||
268 | /** | ||
269 | * Task identifier for retransmission task after timeout | 264 | * Task identifier for retransmission task after timeout |
270 | */ | 265 | */ |
271 | GNUNET_SCHEDULER_TaskIdentifier data_retransmission_task_id; | 266 | GNUNET_SCHEDULER_TaskIdentifier data_retransmission_task_id; |
@@ -281,11 +276,6 @@ struct GNUNET_STREAM_Socket | |||
281 | GNUNET_SCHEDULER_TaskIdentifier ack_task_id; | 276 | GNUNET_SCHEDULER_TaskIdentifier ack_task_id; |
282 | 277 | ||
283 | /** | 278 | /** |
284 | * Task scheduled to continue a read operation. | ||
285 | */ | ||
286 | GNUNET_SCHEDULER_TaskIdentifier read_task_id; | ||
287 | |||
288 | /** | ||
289 | * The state of the protocol associated with this socket | 279 | * The state of the protocol associated with this socket |
290 | */ | 280 | */ |
291 | enum State state; | 281 | enum State state; |
@@ -512,6 +502,16 @@ struct GNUNET_STREAM_IOReadHandle | |||
512 | * The closure pointer for the read processor callback | 502 | * The closure pointer for the read processor callback |
513 | */ | 503 | */ |
514 | void *proc_cls; | 504 | void *proc_cls; |
505 | |||
506 | /** | ||
507 | * Task identifier for the read io timeout task | ||
508 | */ | ||
509 | GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id; | ||
510 | |||
511 | /** | ||
512 | * Task scheduled to continue a read operation. | ||
513 | */ | ||
514 | GNUNET_SCHEDULER_TaskIdentifier read_task_id; | ||
515 | }; | 515 | }; |
516 | 516 | ||
517 | 517 | ||
@@ -921,22 +921,22 @@ call_read_processor (void *cls, | |||
921 | const struct GNUNET_SCHEDULER_TaskContext *tc) | 921 | const struct GNUNET_SCHEDULER_TaskContext *tc) |
922 | { | 922 | { |
923 | struct GNUNET_STREAM_Socket *socket = cls; | 923 | struct GNUNET_STREAM_Socket *socket = cls; |
924 | struct GNUNET_STREAM_IOReadHandle *read_handle; | ||
924 | size_t read_size; | 925 | size_t read_size; |
925 | size_t valid_read_size; | 926 | size_t valid_read_size; |
926 | unsigned int packet; | 927 | unsigned int packet; |
927 | uint32_t sequence_increase; | 928 | uint32_t sequence_increase; |
928 | uint32_t offset_increase; | 929 | uint32_t offset_increase; |
929 | 930 | ||
930 | socket->read_task_id = GNUNET_SCHEDULER_NO_TASK; | 931 | read_handle = socket->read_handle; |
932 | GNUNET_assert (NULL != read_handle); | ||
933 | read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK; | ||
931 | if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) | 934 | if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) |
932 | return; | 935 | return; |
933 | |||
934 | if (NULL == socket->receive_buffer) | 936 | if (NULL == socket->receive_buffer) |
935 | return; | 937 | return; |
936 | |||
937 | GNUNET_assert (NULL != socket->read_handle); | 938 | GNUNET_assert (NULL != socket->read_handle); |
938 | GNUNET_assert (NULL != socket->read_handle->proc); | 939 | GNUNET_assert (NULL != socket->read_handle->proc); |
939 | |||
940 | /* Check the bitmap for any holes */ | 940 | /* Check the bitmap for any holes */ |
941 | for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) | 941 | for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) |
942 | { | 942 | { |
@@ -950,22 +950,19 @@ call_read_processor (void *cls, | |||
950 | socket->receive_buffer_boundaries[packet-1] - socket->copy_offset; | 950 | socket->receive_buffer_boundaries[packet-1] - socket->copy_offset; |
951 | GNUNET_assert (0 != valid_read_size); | 951 | GNUNET_assert (0 != valid_read_size); |
952 | /* Cancel the read_io_timeout_task */ | 952 | /* Cancel the read_io_timeout_task */ |
953 | GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id); | 953 | GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id); |
954 | socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; | 954 | read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; |
955 | /* Call the data processor */ | 955 | /* Call the data processor */ |
956 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 956 | LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n", |
957 | "%s: Calling read processor\n", | ||
958 | GNUNET_i2s (&socket->other_peer)); | 957 | GNUNET_i2s (&socket->other_peer)); |
959 | read_size = | 958 | read_size = |
960 | socket->read_handle->proc (socket->read_handle->proc_cls, | 959 | socket->read_handle->proc (socket->read_handle->proc_cls, |
961 | socket->status, | 960 | socket->status, |
962 | socket->receive_buffer + socket->copy_offset, | 961 | socket->receive_buffer + socket->copy_offset, |
963 | valid_read_size); | 962 | valid_read_size); |
964 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 963 | LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n", |
965 | "%s: Read processor read %d bytes\n", | ||
966 | GNUNET_i2s (&socket->other_peer), read_size); | 964 | GNUNET_i2s (&socket->other_peer), read_size); |
967 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 965 | LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor completed successfully\n", |
968 | "%s: Read processor completed successfully\n", | ||
969 | GNUNET_i2s (&socket->other_peer)); | 966 | GNUNET_i2s (&socket->other_peer)); |
970 | /* Free the read handle */ | 967 | /* Free the read handle */ |
971 | GNUNET_free (socket->read_handle); | 968 | GNUNET_free (socket->read_handle); |
@@ -980,14 +977,13 @@ call_read_processor (void *cls, | |||
980 | if (socket->copy_offset < socket->receive_buffer_boundaries[packet]) | 977 | if (socket->copy_offset < socket->receive_buffer_boundaries[packet]) |
981 | break; | 978 | break; |
982 | } | 979 | } |
983 | |||
984 | /* If no packets can be removed we can't move the buffer */ | 980 | /* If no packets can be removed we can't move the buffer */ |
985 | if (0 == packet) return; | 981 | if (0 == packet) |
982 | return; | ||
986 | sequence_increase = packet; | 983 | sequence_increase = packet; |
987 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 984 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
988 | "%s: Sequence increase after read processor completion: %u\n", | 985 | "%s: Sequence increase after read processor completion: %u\n", |
989 | GNUNET_i2s (&socket->other_peer), sequence_increase); | 986 | GNUNET_i2s (&socket->other_peer), sequence_increase); |
990 | |||
991 | /* Shift the data in the receive buffer */ | 987 | /* Shift the data in the receive buffer */ |
992 | socket->receive_buffer = | 988 | socket->receive_buffer = |
993 | memmove (socket->receive_buffer, | 989 | memmove (socket->receive_buffer, |
@@ -1040,24 +1036,26 @@ read_io_timeout (void *cls, | |||
1040 | const struct GNUNET_SCHEDULER_TaskContext *tc) | 1036 | const struct GNUNET_SCHEDULER_TaskContext *tc) |
1041 | { | 1037 | { |
1042 | struct GNUNET_STREAM_Socket *socket = cls; | 1038 | struct GNUNET_STREAM_Socket *socket = cls; |
1039 | struct GNUNET_STREAM_IOReadHandle *read_handle; | ||
1043 | GNUNET_STREAM_DataProcessor proc; | 1040 | GNUNET_STREAM_DataProcessor proc; |
1044 | void *proc_cls; | 1041 | void *proc_cls; |
1045 | 1042 | ||
1046 | socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; | 1043 | read_handle = socket->read_handle; |
1044 | GNUNET_assert (NULL != read_handle); | ||
1045 | read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; | ||
1047 | if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason)) | 1046 | if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason)) |
1048 | return; | 1047 | return; |
1049 | if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK) | 1048 | if (read_handle->read_task_id != GNUNET_SCHEDULER_NO_TASK) |
1050 | { | 1049 | { |
1051 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1050 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1052 | "%s: Read task timedout - Cancelling it\n", | 1051 | "%s: Read task timedout - Cancelling it\n", |
1053 | GNUNET_i2s (&socket->other_peer)); | 1052 | GNUNET_i2s (&socket->other_peer)); |
1054 | GNUNET_SCHEDULER_cancel (socket->read_task_id); | 1053 | GNUNET_SCHEDULER_cancel (read_handle->read_task_id); |
1055 | socket->read_task_id = GNUNET_SCHEDULER_NO_TASK; | 1054 | read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK; |
1056 | } | 1055 | } |
1057 | GNUNET_assert (NULL != socket->read_handle); | 1056 | proc = read_handle->proc; |
1058 | proc = socket->read_handle->proc; | 1057 | proc_cls = read_handle->proc_cls; |
1059 | proc_cls = socket->read_handle->proc_cls; | 1058 | GNUNET_free (read_handle); |
1060 | GNUNET_free (socket->read_handle); | ||
1061 | socket->read_handle = NULL; | 1059 | socket->read_handle = NULL; |
1062 | /* Call the read processor to signal timeout */ | 1060 | /* Call the read processor to signal timeout */ |
1063 | proc (proc_cls, | 1061 | proc (proc_cls, |
@@ -1220,15 +1218,15 @@ handle_data (struct GNUNET_STREAM_Socket *socket, | |||
1220 | } | 1218 | } |
1221 | if ((NULL != socket->read_handle) /* A read handle is waiting */ | 1219 | if ((NULL != socket->read_handle) /* A read handle is waiting */ |
1222 | /* There is no current read task */ | 1220 | /* There is no current read task */ |
1223 | && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id) | 1221 | && (GNUNET_SCHEDULER_NO_TASK == socket->read_handle->read_task_id) |
1224 | /* We have the first packet */ | 1222 | /* We have the first packet */ |
1225 | && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0))) | 1223 | && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0))) |
1226 | { | 1224 | { |
1227 | LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n", | 1225 | LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n", |
1228 | GNUNET_i2s (&socket->other_peer)); | 1226 | GNUNET_i2s (&socket->other_peer)); |
1229 | socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor, | 1227 | socket->read_handle->read_task_id = |
1230 | socket); | 1228 | GNUNET_SCHEDULER_add_now (&call_read_processor, socket); |
1231 | } | 1229 | } |
1232 | break; | 1230 | break; |
1233 | default: | 1231 | default: |
1234 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1232 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
@@ -3093,14 +3091,7 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket) | |||
3093 | GNUNET_STREAM_io_write_cancel (socket->write_handle); | 3091 | GNUNET_STREAM_io_write_cancel (socket->write_handle); |
3094 | //socket->write_handle = NULL; | 3092 | //socket->write_handle = NULL; |
3095 | } | 3093 | } |
3096 | if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK) | 3094 | /* Terminate the ack'ing task if they are still present */ |
3097 | { | ||
3098 | /* socket closed with read task pending!? */ | ||
3099 | GNUNET_break (0); | ||
3100 | GNUNET_SCHEDULER_cancel (socket->read_task_id); | ||
3101 | socket->read_task_id = GNUNET_SCHEDULER_NO_TASK; | ||
3102 | } | ||
3103 | /* Terminate the ack'ing tasks if they are still present */ | ||
3104 | if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK) | 3095 | if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK) |
3105 | { | 3096 | { |
3106 | GNUNET_SCHEDULER_cancel (socket->ack_task_id); | 3097 | GNUNET_SCHEDULER_cancel (socket->ack_task_id); |
@@ -3433,22 +3424,14 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, | |||
3433 | read_handle->proc_cls = proc_cls; | 3424 | read_handle->proc_cls = proc_cls; |
3434 | read_handle->socket = socket; | 3425 | read_handle->socket = socket; |
3435 | socket->read_handle = read_handle; | 3426 | socket->read_handle = read_handle; |
3436 | /* Check if we have a packet at bitmap 0 */ | ||
3437 | if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap, | 3427 | if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap, |
3438 | 0)) | 3428 | 0)) |
3439 | { | 3429 | read_handle->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor, |
3440 | socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor, | 3430 | socket); |
3441 | socket); | 3431 | read_handle->read_io_timeout_task_id = |
3442 | } | 3432 | GNUNET_SCHEDULER_add_delayed (timeout, &read_io_timeout, socket); |
3443 | /* Setup the read timeout task */ | 3433 | LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: %s() END\n", |
3444 | socket->read_io_timeout_task_id = | 3434 | GNUNET_i2s (&socket->other_peer), __func__); |
3445 | GNUNET_SCHEDULER_add_delayed (timeout, | ||
3446 | &read_io_timeout, | ||
3447 | socket); | ||
3448 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
3449 | "%s: %s() END\n", | ||
3450 | GNUNET_i2s (&socket->other_peer), | ||
3451 | __func__); | ||
3452 | return read_handle; | 3435 | return read_handle; |
3453 | } | 3436 | } |
3454 | 3437 | ||
@@ -3500,17 +3483,11 @@ GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh) | |||
3500 | /* Read io time task should be there; if it is already executed then this | 3483 | /* Read io time task should be there; if it is already executed then this |
3501 | read handle is not valid; However upon scheduler shutdown the read io task | 3484 | read handle is not valid; However upon scheduler shutdown the read io task |
3502 | may be executed before */ | 3485 | may be executed before */ |
3503 | if (GNUNET_SCHEDULER_NO_TASK != socket->read_io_timeout_task_id) | 3486 | if (GNUNET_SCHEDULER_NO_TASK != ioh->read_io_timeout_task_id) |
3504 | { | 3487 | GNUNET_SCHEDULER_cancel (ioh->read_io_timeout_task_id); |
3505 | GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id); | ||
3506 | socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; | ||
3507 | } | ||
3508 | /* reading task may be present; if so we have to stop it */ | 3488 | /* reading task may be present; if so we have to stop it */ |
3509 | if (GNUNET_SCHEDULER_NO_TASK != socket->read_task_id) | 3489 | if (GNUNET_SCHEDULER_NO_TASK != ioh->read_task_id) |
3510 | { | 3490 | GNUNET_SCHEDULER_cancel (ioh->read_task_id); |
3511 | GNUNET_SCHEDULER_cancel (socket->read_task_id); | ||
3512 | socket->read_task_id = GNUNET_SCHEDULER_NO_TASK; | ||
3513 | } | ||
3514 | GNUNET_free (ioh); | 3491 | GNUNET_free (ioh); |
3515 | socket->read_handle = NULL; | 3492 | socket->read_handle = NULL; |
3516 | } | 3493 | } |