aboutsummaryrefslogtreecommitdiff
path: root/src/stream
diff options
context:
space:
mode:
authorSree Harsha Totakura <totakura@in.tum.de>2012-12-11 10:51:28 +0000
committerSree Harsha Totakura <totakura@in.tum.de>2012-12-11 10:51:28 +0000
commit04fe186c8825f1eba376a092772095bc20bf353a (patch)
treedeafd7a681ee84d246cef2fe59d16845fecbb158 /src/stream
parent3f8d16566988f458ed2b8fc23604b71c73cdf7f6 (diff)
downloadgnunet-04fe186c8825f1eba376a092772095bc20bf353a.tar.gz
gnunet-04fe186c8825f1eba376a092772095bc20bf353a.zip
allow calling GNUNET_STREAM_read() from DataProcessor callback
Diffstat (limited to 'src/stream')
-rw-r--r--src/stream/stream_api.c102
1 files changed, 73 insertions, 29 deletions
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c
index 9c6056c01..1ca4031f2 100644
--- a/src/stream/stream_api.c
+++ b/src/stream/stream_api.c
@@ -537,6 +537,12 @@ struct GNUNET_STREAM_IOReadHandle
537 * Task scheduled to continue a read operation. 537 * Task scheduled to continue a read operation.
538 */ 538 */
539 GNUNET_SCHEDULER_TaskIdentifier read_task_id; 539 GNUNET_SCHEDULER_TaskIdentifier read_task_id;
540
541 /**
542 * Task scheduled from GNUNET_STREAM_read() to lookup the ACK bitmap and call
543 * the read processor task
544 */
545 GNUNET_SCHEDULER_TaskIdentifier probe_data_availability_task_id;
540}; 546};
541 547
542 548
@@ -945,6 +951,32 @@ write_data (struct GNUNET_STREAM_Socket *socket)
945 951
946 952
947/** 953/**
954 * Cleansup the sockets read handle
955 *
956 * @param socket the socket whose read handle has to be cleanedup
957 */
958static void
959cleanup_read_handle (struct GNUNET_STREAM_Socket *socket)
960{
961 struct GNUNET_STREAM_IOReadHandle *read_handle;
962
963 read_handle = socket->read_handle;
964 /* Read io time task should be there; if it is already executed then this
965 read handle is not valid; However upon scheduler shutdown the read io task
966 may be executed before */
967 if (GNUNET_SCHEDULER_NO_TASK != read_handle->read_io_timeout_task_id)
968 GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id);
969 /* reading task may be present; if so we have to stop it */
970 if (GNUNET_SCHEDULER_NO_TASK != read_handle->read_task_id)
971 GNUNET_SCHEDULER_cancel (read_handle->read_task_id);
972 if (GNUNET_SCHEDULER_NO_TASK != read_handle->probe_data_availability_task_id)
973 GNUNET_SCHEDULER_cancel (read_handle->probe_data_availability_task_id);
974 GNUNET_free (read_handle);
975 socket->read_handle = NULL;
976}
977
978
979/**
948 * Task for calling the read processor 980 * Task for calling the read processor
949 * 981 *
950 * @param cls the socket 982 * @param cls the socket
@@ -956,6 +988,8 @@ call_read_processor (void *cls,
956{ 988{
957 struct GNUNET_STREAM_Socket *socket = cls; 989 struct GNUNET_STREAM_Socket *socket = cls;
958 struct GNUNET_STREAM_IOReadHandle *read_handle; 990 struct GNUNET_STREAM_IOReadHandle *read_handle;
991 GNUNET_STREAM_DataProcessor proc;
992 void *proc_cls;
959 size_t read_size; 993 size_t read_size;
960 size_t valid_read_size; 994 size_t valid_read_size;
961 unsigned int packet; 995 unsigned int packet;
@@ -969,8 +1003,7 @@ call_read_processor (void *cls,
969 return; 1003 return;
970 if (NULL == socket->receive_buffer) 1004 if (NULL == socket->receive_buffer)
971 return; 1005 return;
972 GNUNET_assert (NULL != socket->read_handle); 1006 GNUNET_assert (NULL != read_handle->proc);
973 GNUNET_assert (NULL != socket->read_handle->proc);
974 /* Check the bitmap for any holes */ 1007 /* Check the bitmap for any holes */
975 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) 1008 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
976 { 1009 {
@@ -983,24 +1016,19 @@ call_read_processor (void *cls,
983 valid_read_size = 1016 valid_read_size =
984 socket->receive_buffer_boundaries[packet-1] - socket->copy_offset; 1017 socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
985 GNUNET_assert (0 != valid_read_size); 1018 GNUNET_assert (0 != valid_read_size);
986 /* Cancel the read_io_timeout_task */ 1019 proc = read_handle->proc;
987 GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id); 1020 proc_cls = read_handle->proc_cls;
988 read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; 1021 cleanup_read_handle (socket);
989 /* Call the data processor */ 1022 /* Call the data processor */
990 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n", 1023 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n",
991 GNUNET_i2s (&socket->other_peer)); 1024 GNUNET_i2s (&socket->other_peer));
992 read_size = 1025 read_size = proc (proc_cls, socket->status,
993 socket->read_handle->proc (socket->read_handle->proc_cls, 1026 socket->receive_buffer + socket->copy_offset,
994 socket->status, 1027 valid_read_size);
995 socket->receive_buffer + socket->copy_offset,
996 valid_read_size);
997 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n", 1028 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n",
998 GNUNET_i2s (&socket->other_peer), read_size); 1029 GNUNET_i2s (&socket->other_peer), read_size);
999 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor completed successfully\n", 1030 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor completed successfully\n",
1000 GNUNET_i2s (&socket->other_peer)); 1031 GNUNET_i2s (&socket->other_peer));
1001 /* Free the read handle */
1002 GNUNET_free (socket->read_handle);
1003 socket->read_handle = NULL;
1004 GNUNET_assert (read_size <= valid_read_size); 1032 GNUNET_assert (read_size <= valid_read_size);
1005 socket->copy_offset += read_size; 1033 socket->copy_offset += read_size;
1006 /* Determine upto which packet we can remove from the buffer */ 1034 /* Determine upto which packet we can remove from the buffer */
@@ -3554,7 +3582,34 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3554 3582
3555 3583
3556/** 3584/**
3557 * Tries to read data from the stream. 3585 * Function to check the ACK bitmap for any received messages and call the data processor
3586 *
3587 * @param cls the socket
3588 * @param tc the scheduler task context
3589 */
3590static void
3591probe_data_availability (void *cls,
3592 const struct GNUNET_SCHEDULER_TaskContext *tc)
3593{
3594 struct GNUNET_STREAM_Socket *socket = cls;
3595
3596 GNUNET_assert (NULL != socket->read_handle);
3597 socket->read_handle->probe_data_availability_task_id =
3598 GNUNET_SCHEDULER_NO_TASK;
3599 if (GNUNET_SCHEDULER_NO_TASK != socket->read_handle->read_task_id)
3600 return; /* A task to call read processor is present */
3601 if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3602 0))
3603 socket->read_handle->read_task_id
3604 = GNUNET_SCHEDULER_add_now (&call_read_processor, socket);
3605}
3606
3607
3608
3609/**
3610 * Tries to read data from the stream. Should not be called when another read
3611 * handle is present; the existing read handle should be canceled with
3612 * GNUNET_STREAM_io_read_cancel(). Only one read handle per socket is present at any time
3558 * 3613 *
3559 * @param socket the socket representing a stream 3614 * @param socket the socket representing a stream
3560 * @param timeout the timeout period 3615 * @param timeout the timeout period
@@ -3583,7 +3638,7 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3583 first before continuing or has to wait until it is completed */ 3638 first before continuing or has to wait until it is completed */
3584 if (NULL != socket->read_handle) 3639 if (NULL != socket->read_handle)
3585 { 3640 {
3586 GNUNET_break (0); 3641 GNUNET_assert (0);
3587 return NULL; 3642 return NULL;
3588 } 3643 }
3589 GNUNET_assert (NULL != proc); 3644 GNUNET_assert (NULL != proc);
@@ -3607,10 +3662,8 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3607 read_handle->proc_cls = proc_cls; 3662 read_handle->proc_cls = proc_cls;
3608 read_handle->socket = socket; 3663 read_handle->socket = socket;
3609 socket->read_handle = read_handle; 3664 socket->read_handle = read_handle;
3610 if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap, 3665 read_handle->probe_data_availability_task_id =
3611 0)) 3666 GNUNET_SCHEDULER_add_now (&probe_data_availability, socket);
3612 read_handle->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
3613 socket);
3614 read_handle->read_io_timeout_task_id = 3667 read_handle->read_io_timeout_task_id =
3615 GNUNET_SCHEDULER_add_delayed (timeout, &read_io_timeout, socket); 3668 GNUNET_SCHEDULER_add_delayed (timeout, &read_io_timeout, socket);
3616 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: %s() END\n", 3669 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: %s() END\n",
@@ -3660,16 +3713,7 @@ GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
3660 socket = ioh->socket; 3713 socket = ioh->socket;
3661 GNUNET_assert (NULL != socket->read_handle); 3714 GNUNET_assert (NULL != socket->read_handle);
3662 GNUNET_assert (ioh == socket->read_handle); 3715 GNUNET_assert (ioh == socket->read_handle);
3663 /* Read io time task should be there; if it is already executed then this 3716 cleanup_read_handle (socket);
3664 read handle is not valid; However upon scheduler shutdown the read io task
3665 may be executed before */
3666 if (GNUNET_SCHEDULER_NO_TASK != ioh->read_io_timeout_task_id)
3667 GNUNET_SCHEDULER_cancel (ioh->read_io_timeout_task_id);
3668 /* reading task may be present; if so we have to stop it */
3669 if (GNUNET_SCHEDULER_NO_TASK != ioh->read_task_id)
3670 GNUNET_SCHEDULER_cancel (ioh->read_task_id);
3671 GNUNET_free (ioh);
3672 socket->read_handle = NULL;
3673} 3717}
3674 3718
3675/* end of stream_api.c */ 3719/* end of stream_api.c */