aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSree Harsha Totakura <totakura@in.tum.de>2012-06-15 15:19:08 +0000
committerSree Harsha Totakura <totakura@in.tum.de>2012-06-15 15:19:08 +0000
commite444070fbb20f49bb55afd5d6158a9b42333557f (patch)
tree7784d9a782e5147601e78a1d1cfaee538b0fb50b
parent819304af72322c391fc3e0bfd441586af851b8da (diff)
downloadgnunet-e444070fbb20f49bb55afd5d6158a9b42333557f.tar.gz
gnunet-e444070fbb20f49bb55afd5d6158a9b42333557f.zip
stream misc fixing
-rw-r--r--src/stream/stream_api.c66
-rw-r--r--src/stream/test_stream_big.c55
2 files changed, 83 insertions, 38 deletions
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c
index 5d22ff7b3..63e27ea98 100644
--- a/src/stream/stream_api.c
+++ b/src/stream/stream_api.c
@@ -471,6 +471,22 @@ static unsigned int default_timeout = 10;
471 471
472 472
473/** 473/**
474 * Function to print the contents of an address location. Used only for debugging
475 *
476 * @param ptr the address location; Should be more than 5 bytes long
477 */
478static void
479debug_print_contents (const void *ptr)
480{
481 /* const char *c; */
482
483 /* c = ptr; */
484 /* LOG (GNUNET_ERROR_TYPE_DEBUG, */
485 /* "--- contents: %u %u %u %u %u\n", c[0], c[1], c[2], c[3], c[4]); */
486}
487
488
489/**
474 * Callback function for sending queued message 490 * Callback function for sending queued message
475 * 491 *
476 * @param cls closure the socket 492 * @param cls closure the socket
@@ -830,6 +846,7 @@ write_data (struct GNUNET_STREAM_Socket *socket)
830 "%s: Placing DATA message with sequence %u in send queue\n", 846 "%s: Placing DATA message with sequence %u in send queue\n",
831 GNUNET_i2s (&socket->other_peer), 847 GNUNET_i2s (&socket->other_peer),
832 ntohl (io_handle->messages[packet]->sequence_number)); 848 ntohl (io_handle->messages[packet]->sequence_number));
849 debug_print_contents(&(io_handle->messages[packet][1]));
833 copy_and_queue_message (socket, 850 copy_and_queue_message (socket,
834 &io_handle->messages[packet]->header, 851 &io_handle->messages[packet]->header,
835 NULL, 852 NULL,
@@ -849,6 +866,7 @@ write_data (struct GNUNET_STREAM_Socket *socket)
849 "%s: Placing DATA message with sequence %u in send queue\n", 866 "%s: Placing DATA message with sequence %u in send queue\n",
850 GNUNET_i2s (&socket->other_peer), 867 GNUNET_i2s (&socket->other_peer),
851 ntohl (io_handle->messages[packet]->sequence_number)); 868 ntohl (io_handle->messages[packet]->sequence_number));
869 debug_print_contents(&(io_handle->messages[packet][1]));
852 copy_and_queue_message (socket, 870 copy_and_queue_message (socket,
853 &io_handle->messages[packet]->header, 871 &io_handle->messages[packet]->header,
854 NULL, 872 NULL,
@@ -910,6 +928,7 @@ call_read_processor (void *cls,
910 LOG (GNUNET_ERROR_TYPE_DEBUG, 928 LOG (GNUNET_ERROR_TYPE_DEBUG,
911 "%s: Calling read processor\n", 929 "%s: Calling read processor\n",
912 GNUNET_i2s (&socket->other_peer)); 930 GNUNET_i2s (&socket->other_peer));
931 debug_print_contents (socket->receive_buffer + socket->copy_offset);
913 read_size = 932 read_size =
914 socket->read_handle->proc (socket->read_handle->proc_cls, 933 socket->read_handle->proc (socket->read_handle->proc_cls,
915 socket->status, 934 socket->status,
@@ -943,11 +962,12 @@ call_read_processor (void *cls,
943 GNUNET_i2s (&socket->other_peer), sequence_increase); 962 GNUNET_i2s (&socket->other_peer), sequence_increase);
944 963
945 /* Shift the data in the receive buffer */ 964 /* Shift the data in the receive buffer */
946 memmove (socket->receive_buffer, 965 socket->receive_buffer =
947 socket->receive_buffer 966 memmove (socket->receive_buffer,
948 + socket->receive_buffer_boundaries[sequence_increase-1], 967 socket->receive_buffer
949 socket->receive_buffer_size 968 + socket->receive_buffer_boundaries[sequence_increase-1],
950 - socket->receive_buffer_boundaries[sequence_increase-1]); 969 socket->receive_buffer_size
970 - socket->receive_buffer_boundaries[sequence_increase-1]);
951 /* Shift the bitmap */ 971 /* Shift the bitmap */
952 socket->ack_bitmap = socket->ack_bitmap >> sequence_increase; 972 socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
953 /* Set read_sequence_number */ 973 /* Set read_sequence_number */
@@ -963,9 +983,18 @@ call_read_processor (void *cls,
963 { 983 {
964 if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase) 984 if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
965 { 985 {
966 socket->receive_buffer_boundaries[packet] = 986 uint32_t ahead_buffer_boundary;
967 socket->receive_buffer_boundaries[packet + sequence_increase] 987
968 - offset_increase; 988 ahead_buffer_boundary =
989 socket->receive_buffer_boundaries[packet + sequence_increase];
990 if (0 == ahead_buffer_boundary)
991 socket->receive_buffer_boundaries[packet] = 0;
992 else
993 {
994 GNUNET_assert (offset_increase < ahead_buffer_boundary);
995 socket->receive_buffer_boundaries[packet] =
996 ahead_buffer_boundary - offset_increase;
997 }
969 } 998 }
970 else 999 else
971 socket->receive_buffer_boundaries[packet] = 0; 1000 socket->receive_buffer_boundaries[packet] = 0;
@@ -1130,6 +1159,7 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
1130 1159
1131 /* Copy Data to buffer */ 1160 /* Copy Data to buffer */
1132 payload = &msg[1]; 1161 payload = &msg[1];
1162 debug_print_contents(payload);
1133 GNUNET_assert (relative_offset + size <= socket->receive_buffer_size); 1163 GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1134 memcpy (socket->receive_buffer + relative_offset, 1164 memcpy (socket->receive_buffer + relative_offset,
1135 payload, 1165 payload,
@@ -2418,22 +2448,24 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
2418 } 2448 }
2419 else /* We have to call the write continuation callback now */ 2449 else /* We have to call the write continuation callback now */
2420 { 2450 {
2451 struct GNUNET_STREAM_IOWriteHandle *write_handle;
2452
2421 /* Free the packets */ 2453 /* Free the packets */
2422 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) 2454 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2423 { 2455 {
2424 GNUNET_free_non_null (socket->write_handle->messages[packet]); 2456 GNUNET_free_non_null (socket->write_handle->messages[packet]);
2425 } 2457 }
2426 if (NULL != socket->write_handle->write_cont) 2458 write_handle = socket->write_handle;
2427 socket->write_handle->write_cont 2459 socket->write_handle = NULL;
2428 (socket->write_handle->write_cont_cls, 2460 if (NULL != write_handle->write_cont)
2429 socket->status, 2461 write_handle->write_cont (write_handle->write_cont_cls,
2430 socket->write_handle->size); 2462 socket->status,
2463 write_handle->size);
2464 /* We are done with the write handle - Freeing it */
2465 GNUNET_free (write_handle);
2431 LOG (GNUNET_ERROR_TYPE_DEBUG, 2466 LOG (GNUNET_ERROR_TYPE_DEBUG,
2432 "%s: Write completion callback completed\n", 2467 "%s: Write completion callback completed\n",
2433 GNUNET_i2s (&socket->other_peer)); 2468 GNUNET_i2s (&socket->other_peer));
2434 /* We are done with the write handle - Freeing it */
2435 GNUNET_free (socket->write_handle);
2436 socket->write_handle = NULL;
2437 } 2469 }
2438 break; 2470 break;
2439 default: 2471 default:
diff --git a/src/stream/test_stream_big.c b/src/stream/test_stream_big.c
index b3e6fb20c..001c4f67e 100644
--- a/src/stream/test_stream_big.c
+++ b/src/stream/test_stream_big.c
@@ -89,6 +89,8 @@ do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
89 GNUNET_STREAM_close (peer1.socket); 89 GNUNET_STREAM_close (peer1.socket);
90 if (NULL != peer2.socket) 90 if (NULL != peer2.socket)
91 GNUNET_STREAM_close (peer2.socket); 91 GNUNET_STREAM_close (peer2.socket);
92 if (NULL != peer2_listen_socket)
93 GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */
92 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: shutdown\n"); 94 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: shutdown\n");
93 if (0 != abort_task) 95 if (0 != abort_task)
94 { 96 {
@@ -152,7 +154,7 @@ write_completion (void *cls,
152 { 154 {
153 peer->io_write_handle = 155 peer->io_write_handle =
154 GNUNET_STREAM_write (peer->socket, 156 GNUNET_STREAM_write (peer->socket,
155 (void *) data, 157 ((void *) data) + peer->bytes_wrote,
156 DATA_SIZE - peer->bytes_wrote, 158 DATA_SIZE - peer->bytes_wrote,
157 GNUNET_TIME_relative_multiply 159 GNUNET_TIME_relative_multiply
158 (GNUNET_TIME_UNIT_SECONDS, 5), 160 (GNUNET_TIME_UNIT_SECONDS, 5),
@@ -222,9 +224,17 @@ stream_open_cb (void *cls,
222 224
223 225
224/** 226/**
227 * Scheduler call back; to be executed when a new stream is connected
228 * Called from listen connect for peer2
229 */
230static void
231stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
232
233
234/**
225 * Input processor 235 * Input processor
226 * 236 *
227 * @param cls the closure from GNUNET_STREAM_write/read 237 * @param cls peer2
228 * @param status the status of the stream at the time this function is called 238 * @param status the status of the stream at the time this function is called
229 * @param data traffic from the other side 239 * @param data traffic from the other side
230 * @param size the number of bytes available in data read 240 * @param size the number of bytes available in data read
@@ -240,21 +250,23 @@ input_processor (void *cls,
240 struct PeerData *peer = cls; 250 struct PeerData *peer = cls;
241 251
242 GNUNET_assert (GNUNET_STREAM_OK == status); 252 GNUNET_assert (GNUNET_STREAM_OK == status);
253 GNUNET_assert (&peer2 == peer);
243 GNUNET_assert (size < DATA_SIZE); 254 GNUNET_assert (size < DATA_SIZE);
244 GNUNET_assert (memcmp (data + peer->bytes_read, 255 GNUNET_assert (0 == memcmp (((void *)data ) + peer->bytes_read,
245 input_data, 256 input_data, size));
246 size));
247 peer->bytes_read += size; 257 peer->bytes_read += size;
248 258
249 if (peer->bytes_read < DATA_SIZE) 259 if (peer->bytes_read < DATA_SIZE)
250 { 260 {
251 peer->io_read_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) 261 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == read_task);
252 peer->socket, 262 read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2);
253 GNUNET_TIME_relative_multiply 263 /* peer->io_read_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) */
254 (GNUNET_TIME_UNIT_SECONDS, 5), 264 /* peer->socket, */
255 &input_processor, 265 /* GNUNET_TIME_relative_multiply */
256 cls); 266 /* (GNUNET_TIME_UNIT_SECONDS, 5), */
257 GNUNET_assert (NULL != peer->io_read_handle); 267 /* &input_processor, */
268 /* cls); */
269 /* GNUNET_assert (NULL != peer->io_read_handle); */
258 } 270 }
259 else 271 else
260 { 272 {
@@ -274,18 +286,17 @@ static void
274stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 286stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
275{ 287{
276 struct GNUNET_STREAM_Socket *socket = cls; 288 struct GNUNET_STREAM_Socket *socket = cls;
289 struct PeerData *peer = cls;
277 290
278 read_task = GNUNET_SCHEDULER_NO_TASK; 291 read_task = GNUNET_SCHEDULER_NO_TASK;
279 GNUNET_assert (socket == peer2.socket); 292 GNUNET_assert (&peer2 == peer);
280 peer2.bytes_read = 0; 293 peer->io_read_handle =
281 GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */ 294 GNUNET_STREAM_read (peer->socket,
282 peer2.io_read_handle =
283 GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) cls,
284 GNUNET_TIME_relative_multiply 295 GNUNET_TIME_relative_multiply
285 (GNUNET_TIME_UNIT_SECONDS, 10), 296 (GNUNET_TIME_UNIT_SECONDS, 10),
286 &input_processor, 297 &input_processor,
287 cls); 298 peer);
288 GNUNET_assert (NULL != peer2.io_read_handle); 299 GNUNET_assert (NULL != peer->io_read_handle);
289} 300}
290 301
291 302
@@ -311,7 +322,8 @@ stream_listen_cb (void *cls,
311 "Peer connected: %s\n", GNUNET_i2s(initiator)); 322 "Peer connected: %s\n", GNUNET_i2s(initiator));
312 323
313 peer2.socket = socket; 324 peer2.socket = socket;
314 read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, socket); 325 peer2.bytes_read = 0;
326 read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2);
315 return GNUNET_OK; 327 return GNUNET_OK;
316} 328}
317 329
@@ -347,6 +359,7 @@ test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
347 GNUNET_assert (NULL != peer1.socket); 359 GNUNET_assert (NULL != peer1.socket);
348} 360}
349 361
362
350/** 363/**
351 * Initialize framework and start test 364 * Initialize framework and start test
352 */ 365 */
@@ -366,7 +379,7 @@ run (void *cls, char *const *args, const char *cfgfile,
366 379
367 abort_task = 380 abort_task =
368 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 381 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
369 (GNUNET_TIME_UNIT_SECONDS, 30), &do_abort, 382 (GNUNET_TIME_UNIT_SECONDS, 60), &do_abort,
370 NULL); 383 NULL);
371 384
372 test_task = GNUNET_SCHEDULER_add_now (&test, NULL); 385 test_task = GNUNET_SCHEDULER_add_now (&test, NULL);