aboutsummaryrefslogtreecommitdiff
path: root/src/stream
diff options
context:
space:
mode:
authorSree Harsha Totakura <totakura@in.tum.de>2012-10-04 08:49:12 +0000
committerSree Harsha Totakura <totakura@in.tum.de>2012-10-04 08:49:12 +0000
commite0bb43c1f61ba02d19733fc4c3f09758d0b04029 (patch)
tree87e33f776390f39daec2e5cdfd5592d2de6e67e7 /src/stream
parent7b32b05be57fd86b25dab886c363c998515d107a (diff)
downloadgnunet-e0bb43c1f61ba02d19733fc4c3f09758d0b04029.tar.gz
gnunet-e0bb43c1f61ba02d19733fc4c3f09758d0b04029.zip
moved read_io task and its corresponding timeout task to read handle
Diffstat (limited to 'src/stream')
-rw-r--r--src/stream/stream_api.c127
1 files changed, 52 insertions, 75 deletions
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c
index 0d10cf352..c5d8abada 100644
--- a/src/stream/stream_api.c
+++ b/src/stream/stream_api.c
@@ -261,11 +261,6 @@ struct GNUNET_STREAM_Socket
261 struct GNUNET_PeerIdentity other_peer; 261 struct GNUNET_PeerIdentity other_peer;
262 262
263 /** 263 /**
264 * Task identifier for the read io timeout task
265 */
266 GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
267
268 /**
269 * Task identifier for retransmission task after timeout 264 * Task identifier for retransmission task after timeout
270 */ 265 */
271 GNUNET_SCHEDULER_TaskIdentifier data_retransmission_task_id; 266 GNUNET_SCHEDULER_TaskIdentifier data_retransmission_task_id;
@@ -281,11 +276,6 @@ struct GNUNET_STREAM_Socket
281 GNUNET_SCHEDULER_TaskIdentifier ack_task_id; 276 GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
282 277
283 /** 278 /**
284 * Task scheduled to continue a read operation.
285 */
286 GNUNET_SCHEDULER_TaskIdentifier read_task_id;
287
288 /**
289 * The state of the protocol associated with this socket 279 * The state of the protocol associated with this socket
290 */ 280 */
291 enum State state; 281 enum State state;
@@ -512,6 +502,16 @@ struct GNUNET_STREAM_IOReadHandle
512 * The closure pointer for the read processor callback 502 * The closure pointer for the read processor callback
513 */ 503 */
514 void *proc_cls; 504 void *proc_cls;
505
506 /**
507 * Task identifier for the read io timeout task
508 */
509 GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
510
511 /**
512 * Task scheduled to continue a read operation.
513 */
514 GNUNET_SCHEDULER_TaskIdentifier read_task_id;
515}; 515};
516 516
517 517
@@ -921,22 +921,22 @@ call_read_processor (void *cls,
921 const struct GNUNET_SCHEDULER_TaskContext *tc) 921 const struct GNUNET_SCHEDULER_TaskContext *tc)
922{ 922{
923 struct GNUNET_STREAM_Socket *socket = cls; 923 struct GNUNET_STREAM_Socket *socket = cls;
924 struct GNUNET_STREAM_IOReadHandle *read_handle;
924 size_t read_size; 925 size_t read_size;
925 size_t valid_read_size; 926 size_t valid_read_size;
926 unsigned int packet; 927 unsigned int packet;
927 uint32_t sequence_increase; 928 uint32_t sequence_increase;
928 uint32_t offset_increase; 929 uint32_t offset_increase;
929 930
930 socket->read_task_id = GNUNET_SCHEDULER_NO_TASK; 931 read_handle = socket->read_handle;
932 GNUNET_assert (NULL != read_handle);
933 read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
931 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) 934 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
932 return; 935 return;
933
934 if (NULL == socket->receive_buffer) 936 if (NULL == socket->receive_buffer)
935 return; 937 return;
936
937 GNUNET_assert (NULL != socket->read_handle); 938 GNUNET_assert (NULL != socket->read_handle);
938 GNUNET_assert (NULL != socket->read_handle->proc); 939 GNUNET_assert (NULL != socket->read_handle->proc);
939
940 /* Check the bitmap for any holes */ 940 /* Check the bitmap for any holes */
941 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) 941 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
942 { 942 {
@@ -950,22 +950,19 @@ call_read_processor (void *cls,
950 socket->receive_buffer_boundaries[packet-1] - socket->copy_offset; 950 socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
951 GNUNET_assert (0 != valid_read_size); 951 GNUNET_assert (0 != valid_read_size);
952 /* Cancel the read_io_timeout_task */ 952 /* Cancel the read_io_timeout_task */
953 GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id); 953 GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id);
954 socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; 954 read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
955 /* Call the data processor */ 955 /* Call the data processor */
956 LOG (GNUNET_ERROR_TYPE_DEBUG, 956 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n",
957 "%s: Calling read processor\n",
958 GNUNET_i2s (&socket->other_peer)); 957 GNUNET_i2s (&socket->other_peer));
959 read_size = 958 read_size =
960 socket->read_handle->proc (socket->read_handle->proc_cls, 959 socket->read_handle->proc (socket->read_handle->proc_cls,
961 socket->status, 960 socket->status,
962 socket->receive_buffer + socket->copy_offset, 961 socket->receive_buffer + socket->copy_offset,
963 valid_read_size); 962 valid_read_size);
964 LOG (GNUNET_ERROR_TYPE_DEBUG, 963 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n",
965 "%s: Read processor read %d bytes\n",
966 GNUNET_i2s (&socket->other_peer), read_size); 964 GNUNET_i2s (&socket->other_peer), read_size);
967 LOG (GNUNET_ERROR_TYPE_DEBUG, 965 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor completed successfully\n",
968 "%s: Read processor completed successfully\n",
969 GNUNET_i2s (&socket->other_peer)); 966 GNUNET_i2s (&socket->other_peer));
970 /* Free the read handle */ 967 /* Free the read handle */
971 GNUNET_free (socket->read_handle); 968 GNUNET_free (socket->read_handle);
@@ -980,14 +977,13 @@ call_read_processor (void *cls,
980 if (socket->copy_offset < socket->receive_buffer_boundaries[packet]) 977 if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
981 break; 978 break;
982 } 979 }
983
984 /* If no packets can be removed we can't move the buffer */ 980 /* If no packets can be removed we can't move the buffer */
985 if (0 == packet) return; 981 if (0 == packet)
982 return;
986 sequence_increase = packet; 983 sequence_increase = packet;
987 LOG (GNUNET_ERROR_TYPE_DEBUG, 984 LOG (GNUNET_ERROR_TYPE_DEBUG,
988 "%s: Sequence increase after read processor completion: %u\n", 985 "%s: Sequence increase after read processor completion: %u\n",
989 GNUNET_i2s (&socket->other_peer), sequence_increase); 986 GNUNET_i2s (&socket->other_peer), sequence_increase);
990
991 /* Shift the data in the receive buffer */ 987 /* Shift the data in the receive buffer */
992 socket->receive_buffer = 988 socket->receive_buffer =
993 memmove (socket->receive_buffer, 989 memmove (socket->receive_buffer,
@@ -1040,24 +1036,26 @@ read_io_timeout (void *cls,
1040 const struct GNUNET_SCHEDULER_TaskContext *tc) 1036 const struct GNUNET_SCHEDULER_TaskContext *tc)
1041{ 1037{
1042 struct GNUNET_STREAM_Socket *socket = cls; 1038 struct GNUNET_STREAM_Socket *socket = cls;
1039 struct GNUNET_STREAM_IOReadHandle *read_handle;
1043 GNUNET_STREAM_DataProcessor proc; 1040 GNUNET_STREAM_DataProcessor proc;
1044 void *proc_cls; 1041 void *proc_cls;
1045 1042
1046 socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; 1043 read_handle = socket->read_handle;
1044 GNUNET_assert (NULL != read_handle);
1045 read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1047 if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason)) 1046 if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
1048 return; 1047 return;
1049 if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK) 1048 if (read_handle->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1050 { 1049 {
1051 LOG (GNUNET_ERROR_TYPE_DEBUG, 1050 LOG (GNUNET_ERROR_TYPE_DEBUG,
1052 "%s: Read task timedout - Cancelling it\n", 1051 "%s: Read task timedout - Cancelling it\n",
1053 GNUNET_i2s (&socket->other_peer)); 1052 GNUNET_i2s (&socket->other_peer));
1054 GNUNET_SCHEDULER_cancel (socket->read_task_id); 1053 GNUNET_SCHEDULER_cancel (read_handle->read_task_id);
1055 socket->read_task_id = GNUNET_SCHEDULER_NO_TASK; 1054 read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1056 } 1055 }
1057 GNUNET_assert (NULL != socket->read_handle); 1056 proc = read_handle->proc;
1058 proc = socket->read_handle->proc; 1057 proc_cls = read_handle->proc_cls;
1059 proc_cls = socket->read_handle->proc_cls; 1058 GNUNET_free (read_handle);
1060 GNUNET_free (socket->read_handle);
1061 socket->read_handle = NULL; 1059 socket->read_handle = NULL;
1062 /* Call the read processor to signal timeout */ 1060 /* Call the read processor to signal timeout */
1063 proc (proc_cls, 1061 proc (proc_cls,
@@ -1220,15 +1218,15 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
1220 } 1218 }
1221 if ((NULL != socket->read_handle) /* A read handle is waiting */ 1219 if ((NULL != socket->read_handle) /* A read handle is waiting */
1222 /* There is no current read task */ 1220 /* There is no current read task */
1223 && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id) 1221 && (GNUNET_SCHEDULER_NO_TASK == socket->read_handle->read_task_id)
1224 /* We have the first packet */ 1222 /* We have the first packet */
1225 && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0))) 1223 && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0)))
1226 { 1224 {
1227 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n", 1225 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n",
1228 GNUNET_i2s (&socket->other_peer)); 1226 GNUNET_i2s (&socket->other_peer));
1229 socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor, 1227 socket->read_handle->read_task_id =
1230 socket); 1228 GNUNET_SCHEDULER_add_now (&call_read_processor, socket);
1231 } 1229 }
1232 break; 1230 break;
1233 default: 1231 default:
1234 LOG (GNUNET_ERROR_TYPE_DEBUG, 1232 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -3093,14 +3091,7 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
3093 GNUNET_STREAM_io_write_cancel (socket->write_handle); 3091 GNUNET_STREAM_io_write_cancel (socket->write_handle);
3094 //socket->write_handle = NULL; 3092 //socket->write_handle = NULL;
3095 } 3093 }
3096 if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK) 3094 /* Terminate the ack'ing task if they are still present */
3097 {
3098 /* socket closed with read task pending!? */
3099 GNUNET_break (0);
3100 GNUNET_SCHEDULER_cancel (socket->read_task_id);
3101 socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
3102 }
3103 /* Terminate the ack'ing tasks if they are still present */
3104 if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK) 3095 if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
3105 { 3096 {
3106 GNUNET_SCHEDULER_cancel (socket->ack_task_id); 3097 GNUNET_SCHEDULER_cancel (socket->ack_task_id);
@@ -3433,22 +3424,14 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3433 read_handle->proc_cls = proc_cls; 3424 read_handle->proc_cls = proc_cls;
3434 read_handle->socket = socket; 3425 read_handle->socket = socket;
3435 socket->read_handle = read_handle; 3426 socket->read_handle = read_handle;
3436 /* Check if we have a packet at bitmap 0 */
3437 if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap, 3427 if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3438 0)) 3428 0))
3439 { 3429 read_handle->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3440 socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor, 3430 socket);
3441 socket); 3431 read_handle->read_io_timeout_task_id =
3442 } 3432 GNUNET_SCHEDULER_add_delayed (timeout, &read_io_timeout, socket);
3443 /* Setup the read timeout task */ 3433 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: %s() END\n",
3444 socket->read_io_timeout_task_id = 3434 GNUNET_i2s (&socket->other_peer), __func__);
3445 GNUNET_SCHEDULER_add_delayed (timeout,
3446 &read_io_timeout,
3447 socket);
3448 LOG (GNUNET_ERROR_TYPE_DEBUG,
3449 "%s: %s() END\n",
3450 GNUNET_i2s (&socket->other_peer),
3451 __func__);
3452 return read_handle; 3435 return read_handle;
3453} 3436}
3454 3437
@@ -3500,17 +3483,11 @@ GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
3500 /* Read io time task should be there; if it is already executed then this 3483 /* Read io time task should be there; if it is already executed then this
3501 read handle is not valid; However upon scheduler shutdown the read io task 3484 read handle is not valid; However upon scheduler shutdown the read io task
3502 may be executed before */ 3485 may be executed before */
3503 if (GNUNET_SCHEDULER_NO_TASK != socket->read_io_timeout_task_id) 3486 if (GNUNET_SCHEDULER_NO_TASK != ioh->read_io_timeout_task_id)
3504 { 3487 GNUNET_SCHEDULER_cancel (ioh->read_io_timeout_task_id);
3505 GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
3506 socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
3507 }
3508 /* reading task may be present; if so we have to stop it */ 3488 /* reading task may be present; if so we have to stop it */
3509 if (GNUNET_SCHEDULER_NO_TASK != socket->read_task_id) 3489 if (GNUNET_SCHEDULER_NO_TASK != ioh->read_task_id)
3510 { 3490 GNUNET_SCHEDULER_cancel (ioh->read_task_id);
3511 GNUNET_SCHEDULER_cancel (socket->read_task_id);
3512 socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
3513 }
3514 GNUNET_free (ioh); 3491 GNUNET_free (ioh);
3515 socket->read_handle = NULL; 3492 socket->read_handle = NULL;
3516} 3493}