diff options
author | Sree Harsha Totakura <totakura@in.tum.de> | 2012-12-11 10:51:28 +0000 |
---|---|---|
committer | Sree Harsha Totakura <totakura@in.tum.de> | 2012-12-11 10:51:28 +0000 |
commit | 04fe186c8825f1eba376a092772095bc20bf353a (patch) | |
tree | deafd7a681ee84d246cef2fe59d16845fecbb158 /src/stream | |
parent | 3f8d16566988f458ed2b8fc23604b71c73cdf7f6 (diff) | |
download | gnunet-04fe186c8825f1eba376a092772095bc20bf353a.tar.gz gnunet-04fe186c8825f1eba376a092772095bc20bf353a.zip |
allow calling GNUNET_STREAM_read() from DataProcessor callback
Diffstat (limited to 'src/stream')
-rw-r--r-- | src/stream/stream_api.c | 102 |
1 files changed, 73 insertions, 29 deletions
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 9c6056c01..1ca4031f2 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c | |||
@@ -537,6 +537,12 @@ struct GNUNET_STREAM_IOReadHandle | |||
537 | * Task scheduled to continue a read operation. | 537 | * Task scheduled to continue a read operation. |
538 | */ | 538 | */ |
539 | GNUNET_SCHEDULER_TaskIdentifier read_task_id; | 539 | GNUNET_SCHEDULER_TaskIdentifier read_task_id; |
540 | |||
541 | /** | ||
542 | * Task scheduled from GNUNET_STREAM_read() to lookup the ACK bitmap and call | ||
543 | * the read processor task | ||
544 | */ | ||
545 | GNUNET_SCHEDULER_TaskIdentifier probe_data_availability_task_id; | ||
540 | }; | 546 | }; |
541 | 547 | ||
542 | 548 | ||
@@ -945,6 +951,32 @@ write_data (struct GNUNET_STREAM_Socket *socket) | |||
945 | 951 | ||
946 | 952 | ||
947 | /** | 953 | /** |
954 | * Cleansup the sockets read handle | ||
955 | * | ||
956 | * @param socket the socket whose read handle has to be cleanedup | ||
957 | */ | ||
958 | static void | ||
959 | cleanup_read_handle (struct GNUNET_STREAM_Socket *socket) | ||
960 | { | ||
961 | struct GNUNET_STREAM_IOReadHandle *read_handle; | ||
962 | |||
963 | read_handle = socket->read_handle; | ||
964 | /* Read io time task should be there; if it is already executed then this | ||
965 | read handle is not valid; However upon scheduler shutdown the read io task | ||
966 | may be executed before */ | ||
967 | if (GNUNET_SCHEDULER_NO_TASK != read_handle->read_io_timeout_task_id) | ||
968 | GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id); | ||
969 | /* reading task may be present; if so we have to stop it */ | ||
970 | if (GNUNET_SCHEDULER_NO_TASK != read_handle->read_task_id) | ||
971 | GNUNET_SCHEDULER_cancel (read_handle->read_task_id); | ||
972 | if (GNUNET_SCHEDULER_NO_TASK != read_handle->probe_data_availability_task_id) | ||
973 | GNUNET_SCHEDULER_cancel (read_handle->probe_data_availability_task_id); | ||
974 | GNUNET_free (read_handle); | ||
975 | socket->read_handle = NULL; | ||
976 | } | ||
977 | |||
978 | |||
979 | /** | ||
948 | * Task for calling the read processor | 980 | * Task for calling the read processor |
949 | * | 981 | * |
950 | * @param cls the socket | 982 | * @param cls the socket |
@@ -956,6 +988,8 @@ call_read_processor (void *cls, | |||
956 | { | 988 | { |
957 | struct GNUNET_STREAM_Socket *socket = cls; | 989 | struct GNUNET_STREAM_Socket *socket = cls; |
958 | struct GNUNET_STREAM_IOReadHandle *read_handle; | 990 | struct GNUNET_STREAM_IOReadHandle *read_handle; |
991 | GNUNET_STREAM_DataProcessor proc; | ||
992 | void *proc_cls; | ||
959 | size_t read_size; | 993 | size_t read_size; |
960 | size_t valid_read_size; | 994 | size_t valid_read_size; |
961 | unsigned int packet; | 995 | unsigned int packet; |
@@ -969,8 +1003,7 @@ call_read_processor (void *cls, | |||
969 | return; | 1003 | return; |
970 | if (NULL == socket->receive_buffer) | 1004 | if (NULL == socket->receive_buffer) |
971 | return; | 1005 | return; |
972 | GNUNET_assert (NULL != socket->read_handle); | 1006 | GNUNET_assert (NULL != read_handle->proc); |
973 | GNUNET_assert (NULL != socket->read_handle->proc); | ||
974 | /* Check the bitmap for any holes */ | 1007 | /* Check the bitmap for any holes */ |
975 | for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) | 1008 | for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) |
976 | { | 1009 | { |
@@ -983,24 +1016,19 @@ call_read_processor (void *cls, | |||
983 | valid_read_size = | 1016 | valid_read_size = |
984 | socket->receive_buffer_boundaries[packet-1] - socket->copy_offset; | 1017 | socket->receive_buffer_boundaries[packet-1] - socket->copy_offset; |
985 | GNUNET_assert (0 != valid_read_size); | 1018 | GNUNET_assert (0 != valid_read_size); |
986 | /* Cancel the read_io_timeout_task */ | 1019 | proc = read_handle->proc; |
987 | GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id); | 1020 | proc_cls = read_handle->proc_cls; |
988 | read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; | 1021 | cleanup_read_handle (socket); |
989 | /* Call the data processor */ | 1022 | /* Call the data processor */ |
990 | LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n", | 1023 | LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n", |
991 | GNUNET_i2s (&socket->other_peer)); | 1024 | GNUNET_i2s (&socket->other_peer)); |
992 | read_size = | 1025 | read_size = proc (proc_cls, socket->status, |
993 | socket->read_handle->proc (socket->read_handle->proc_cls, | 1026 | socket->receive_buffer + socket->copy_offset, |
994 | socket->status, | 1027 | valid_read_size); |
995 | socket->receive_buffer + socket->copy_offset, | ||
996 | valid_read_size); | ||
997 | LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n", | 1028 | LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n", |
998 | GNUNET_i2s (&socket->other_peer), read_size); | 1029 | GNUNET_i2s (&socket->other_peer), read_size); |
999 | LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor completed successfully\n", | 1030 | LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor completed successfully\n", |
1000 | GNUNET_i2s (&socket->other_peer)); | 1031 | GNUNET_i2s (&socket->other_peer)); |
1001 | /* Free the read handle */ | ||
1002 | GNUNET_free (socket->read_handle); | ||
1003 | socket->read_handle = NULL; | ||
1004 | GNUNET_assert (read_size <= valid_read_size); | 1032 | GNUNET_assert (read_size <= valid_read_size); |
1005 | socket->copy_offset += read_size; | 1033 | socket->copy_offset += read_size; |
1006 | /* Determine upto which packet we can remove from the buffer */ | 1034 | /* Determine upto which packet we can remove from the buffer */ |
@@ -3554,7 +3582,34 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, | |||
3554 | 3582 | ||
3555 | 3583 | ||
3556 | /** | 3584 | /** |
3557 | * Tries to read data from the stream. | 3585 | * Function to check the ACK bitmap for any received messages and call the data processor |
3586 | * | ||
3587 | * @param cls the socket | ||
3588 | * @param tc the scheduler task context | ||
3589 | */ | ||
3590 | static void | ||
3591 | probe_data_availability (void *cls, | ||
3592 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
3593 | { | ||
3594 | struct GNUNET_STREAM_Socket *socket = cls; | ||
3595 | |||
3596 | GNUNET_assert (NULL != socket->read_handle); | ||
3597 | socket->read_handle->probe_data_availability_task_id = | ||
3598 | GNUNET_SCHEDULER_NO_TASK; | ||
3599 | if (GNUNET_SCHEDULER_NO_TASK != socket->read_handle->read_task_id) | ||
3600 | return; /* A task to call read processor is present */ | ||
3601 | if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap, | ||
3602 | 0)) | ||
3603 | socket->read_handle->read_task_id | ||
3604 | = GNUNET_SCHEDULER_add_now (&call_read_processor, socket); | ||
3605 | } | ||
3606 | |||
3607 | |||
3608 | |||
3609 | /** | ||
3610 | * Tries to read data from the stream. Should not be called when another read | ||
3611 | * handle is present; the existing read handle should be canceled with | ||
3612 | * GNUNET_STREAM_io_read_cancel(). Only one read handle per socket is present at any time | ||
3558 | * | 3613 | * |
3559 | * @param socket the socket representing a stream | 3614 | * @param socket the socket representing a stream |
3560 | * @param timeout the timeout period | 3615 | * @param timeout the timeout period |
@@ -3583,7 +3638,7 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, | |||
3583 | first before continuing or has to wait until it is completed */ | 3638 | first before continuing or has to wait until it is completed */ |
3584 | if (NULL != socket->read_handle) | 3639 | if (NULL != socket->read_handle) |
3585 | { | 3640 | { |
3586 | GNUNET_break (0); | 3641 | GNUNET_assert (0); |
3587 | return NULL; | 3642 | return NULL; |
3588 | } | 3643 | } |
3589 | GNUNET_assert (NULL != proc); | 3644 | GNUNET_assert (NULL != proc); |
@@ -3607,10 +3662,8 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, | |||
3607 | read_handle->proc_cls = proc_cls; | 3662 | read_handle->proc_cls = proc_cls; |
3608 | read_handle->socket = socket; | 3663 | read_handle->socket = socket; |
3609 | socket->read_handle = read_handle; | 3664 | socket->read_handle = read_handle; |
3610 | if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap, | 3665 | read_handle->probe_data_availability_task_id = |
3611 | 0)) | 3666 | GNUNET_SCHEDULER_add_now (&probe_data_availability, socket); |
3612 | read_handle->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor, | ||
3613 | socket); | ||
3614 | read_handle->read_io_timeout_task_id = | 3667 | read_handle->read_io_timeout_task_id = |
3615 | GNUNET_SCHEDULER_add_delayed (timeout, &read_io_timeout, socket); | 3668 | GNUNET_SCHEDULER_add_delayed (timeout, &read_io_timeout, socket); |
3616 | LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: %s() END\n", | 3669 | LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: %s() END\n", |
@@ -3660,16 +3713,7 @@ GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh) | |||
3660 | socket = ioh->socket; | 3713 | socket = ioh->socket; |
3661 | GNUNET_assert (NULL != socket->read_handle); | 3714 | GNUNET_assert (NULL != socket->read_handle); |
3662 | GNUNET_assert (ioh == socket->read_handle); | 3715 | GNUNET_assert (ioh == socket->read_handle); |
3663 | /* Read io time task should be there; if it is already executed then this | 3716 | cleanup_read_handle (socket); |
3664 | read handle is not valid; However upon scheduler shutdown the read io task | ||
3665 | may be executed before */ | ||
3666 | if (GNUNET_SCHEDULER_NO_TASK != ioh->read_io_timeout_task_id) | ||
3667 | GNUNET_SCHEDULER_cancel (ioh->read_io_timeout_task_id); | ||
3668 | /* reading task may be present; if so we have to stop it */ | ||
3669 | if (GNUNET_SCHEDULER_NO_TASK != ioh->read_task_id) | ||
3670 | GNUNET_SCHEDULER_cancel (ioh->read_task_id); | ||
3671 | GNUNET_free (ioh); | ||
3672 | socket->read_handle = NULL; | ||
3673 | } | 3717 | } |
3674 | 3718 | ||
3675 | /* end of stream_api.c */ | 3719 | /* end of stream_api.c */ |