commit 4b0a8adbcc7d22cc86bb07dc56f7367dc96ad1ed
parent e421dc4c64105540b24ee732e8887ee26c40276a
Author: Andrey Uzunov <andrey.uzunov@gmail.com>
Date: Sun, 11 Aug 2013 20:40:36 +0000
spdy: WINDOW_UPDATE sent on data receiving to allow big HTTP bodies
Diffstat:
5 files changed, 161 insertions(+), 5 deletions(-)
diff --git a/src/microspdy/internal.h b/src/microspdy/internal.h
@@ -28,14 +28,24 @@
#include "platform.h"
#include "microspdy.h"
-/* size of read buffers for each connection
- * must be at least the size of SPDY_MAX_SUPPORTED_FRAME_SIZE */
+/**
+ * size of read buffers for each connection
+ * must be at least the size of SPDY_MAX_SUPPORTED_FRAME_SIZE
+ */
#define SPDYF_BUFFER_SIZE 8192
-/* number of frames written to the socket at once. After X frames
+/**
+ * initial size of window for each stream (this is for the data
+ * within data frames that can be handled)
+ */
+#define SPDYF_INITIAL_WINDOW_SIZE 65536
+
+/**
+ * number of frames written to the socket at once. After X frames
* everything should be run again. In this way the application can
* response to more important requests while a big file is still
- * being transmitted to the client */
+ * being transmitted to the client
+ */
#define SPDYF_NUM_SENT_FRAMES_AT_ONCE 10
diff --git a/src/microspdy/session.c b/src/microspdy/session.c
@@ -380,7 +380,9 @@ spdyf_handler_read_data (struct SPDY_Session *session)
frame->length,
0 == (SPDY_DATA_FLAG_FIN & frame->flags));
- session->read_buffer_beginning += frame->length;
+ session->read_buffer_beginning += frame->length;
+
+ stream->window_size -= frame->length;
//TODO close in and send rst maybe
SPDYF_ASSERT(SPDY_YES == ret, "Cancel POST data is not yet implemented");
@@ -389,6 +391,20 @@ spdyf_handler_read_data (struct SPDY_Session *session)
{
stream->is_in_closed = true;
}
+ else if(stream->window_size < SPDYF_INITIAL_WINDOW_SIZE / 2)
+ {
+ //very simple implementation of flow control
+ //when the window's size is under the half of the initial value,
+ //increase it again up to the initial value
+
+ //prepare WINDOW_UPDATE
+ if(SPDY_YES == SPDYF_prepare_window_update(session, stream,
+ SPDYF_INITIAL_WINDOW_SIZE - stream->window_size))
+ {
+ stream->window_size = SPDYF_INITIAL_WINDOW_SIZE;
+ }
+ //else: do it later
+ }
session->status = SPDY_SESSION_STATUS_WAIT_FOR_HEADER;
free(frame);
}
@@ -751,6 +767,47 @@ SPDYF_handler_write_rst_stream (struct SPDY_Session *session)
return SPDY_YES;
}
+
+int
+SPDYF_handler_write_window_update (struct SPDY_Session *session)
+{
+ struct SPDYF_Response_Queue *response_queue = session->response_queue_head;
+ struct SPDYF_Control_Frame control_frame;
+ size_t total_size;
+
+ SPDYF_ASSERT(NULL == session->write_buffer, "the function is called not in the correct moment");
+
+ memcpy(&control_frame, response_queue->control_frame, sizeof(control_frame));
+
+ total_size = sizeof(struct SPDYF_Control_Frame) //SPDY header
+ + 4 // stream id as "subheader"
+ + 4; // delta-window-size as "subheader"
+
+ if(NULL == (session->write_buffer = malloc(total_size)))
+ {
+ return SPDY_NO;
+ }
+ session->write_buffer_beginning = 0;
+ session->write_buffer_offset = 0;
+ session->write_buffer_size = total_size;
+
+ control_frame.length = 8; // always for WINDOW_UPDATE
+ SPDYF_CONTROL_FRAME_HTON(&control_frame);
+
+ //put frame headers to write buffer
+ memcpy(session->write_buffer + session->write_buffer_offset,&control_frame,sizeof(struct SPDYF_Control_Frame));
+ session->write_buffer_offset += sizeof(struct SPDYF_Control_Frame);
+
+ //put stream id and delta-window-size to write buffer
+ memcpy(session->write_buffer + session->write_buffer_offset, response_queue->data, 8);
+ session->write_buffer_offset += 8;
+
+ SPDYF_ASSERT(0 == session->write_buffer_beginning, "bug1");
+ SPDYF_ASSERT(session->write_buffer_offset == session->write_buffer_size, "bug2");
+
+ return SPDY_YES;
+}
+
void
SPDYF_handler_ignore_frame (struct SPDY_Session *session)
@@ -1638,3 +1695,55 @@ SPDYF_prepare_rst_stream (struct SPDY_Session *session,
return SPDY_YES;
}
+
+
+int
+SPDYF_prepare_window_update (struct SPDY_Session *session,
+ struct SPDYF_Stream * stream,
+ int32_t delta_window_size)
+{
+ struct SPDYF_Response_Queue *response_to_queue;
+ struct SPDYF_Control_Frame *control_frame;
+ uint32_t *data;
+
+ SPDYF_ASSERT(NULL != stream, "stream cannot be NULL");
+
+ if(NULL == (response_to_queue = malloc(sizeof(struct SPDYF_Response_Queue))))
+ {
+ return SPDY_NO;
+ }
+ memset(response_to_queue, 0, sizeof(struct SPDYF_Response_Queue));
+
+ if(NULL == (control_frame = malloc(sizeof(struct SPDYF_Control_Frame))))
+ {
+ free(response_to_queue);
+ return SPDY_NO;
+ }
+ memset(control_frame, 0, sizeof(struct SPDYF_Control_Frame));
+
+ if(NULL == (data = malloc(8)))
+ {
+ free(control_frame);
+ free(response_to_queue);
+ return SPDY_NO;
+ }
+ *(data) = HTON31(stream->stream_id);
+ *(data + 1) = HTON31(delta_window_size);
+
+ control_frame->control_bit = 1;
+ control_frame->version = SPDY_VERSION;
+ control_frame->type = SPDY_CONTROL_FRAME_TYPES_WINDOW_UPDATE;
+ control_frame->flags = 0;
+
+ response_to_queue->control_frame = control_frame;
+ response_to_queue->process_response_handler = &SPDYF_handler_write_window_update;
+ response_to_queue->data = data;
+ response_to_queue->data_size = 8;
+ response_to_queue->stream = stream;
+
+ SPDYF_queue_response (response_to_queue,
+ session,
+ -1);
+
+ return SPDY_YES;
+}
diff --git a/src/microspdy/session.h b/src/microspdy/session.h
@@ -172,6 +172,23 @@ SPDYF_prepare_rst_stream (struct SPDY_Session *session,
/**
+ * Prepares WINDOW_UPDATE frame to tell the other party that more
+ * data can be sent on the stream. The frame will be put at the head of
+ * the queue.
+ *
+ * @param session SPDY session
+ * @param stream stream to which the changed window will apply
+ * @param delta_window_size how much the window grows
+ * @return SPDY_NO on memory error or
+ * SPDY_YES on success
+ */
+int
+SPDYF_prepare_window_update (struct SPDY_Session *session,
+ struct SPDYF_Stream * stream,
+ int32_t delta_window_size);
+
+
+/**
* Handler called by session_write to fill the write buffer according to
* the data frame waiting in the response queue.
* When response data is given by user callback, the lib does not know
@@ -233,6 +250,20 @@ SPDYF_handler_write_rst_stream (struct SPDY_Session *session);
/**
+ * Handler called by session_write to fill the write buffer based on the
+ * control frame (WINDOW_UPDATE) waiting in the response queue.
+ *
+ * @param session SPDY session
+ * @return SPDY_NO on error (not enough memory). If
+ * the error is unrecoverable the handler changes session's
+ * status.
+ * SPDY_YES on success
+ */
+int
+SPDYF_handler_write_window_update (struct SPDY_Session *session);
+
+
+/**
* Carefully ignore the full size of frames which are not yet supported
* by the lib.
* TODO Ignoring frames containing compressed bodies means that the
diff --git a/src/microspdy/stream.c b/src/microspdy/stream.c
@@ -104,6 +104,7 @@ SPDYF_stream_new (struct SPDY_Session *session)
stream->flag_unidirectional = (frame->flags & SPDY_SYN_STREAM_FLAG_UNIDIRECTIONAL) != 0;
stream->is_out_closed = stream->flag_unidirectional;
stream->is_server_initiator = false;
+ stream->window_size = SPDYF_INITIAL_WINDOW_SIZE;
//put the stream to the list of streams for the session
DLL_insert(session->streams_head, session->streams_tail, stream);
diff --git a/src/microspdy/structures.h b/src/microspdy/structures.h
@@ -554,6 +554,11 @@ struct SPDYF_Stream
uint32_t assoc_stream_id;
/**
+ * The window of the data within data frames.
+ */
+ uint32_t window_size;
+
+ /**
* Stream priority. 0 is the highest, 7 is the lowest.
*/
uint8_t priority;