aboutsummaryrefslogtreecommitdiff
path: root/src/stream
diff options
context:
space:
mode:
authorBart Polot <bart@net.in.tum.de>2013-07-16 11:21:06 +0000
committerBart Polot <bart@net.in.tum.de>2013-07-16 11:21:06 +0000
commitf52fd7190c7c36abc8b8d20ffe5c7ead7376d8cb (patch)
tree46defc25fbb1eaf03d502c0aae5d7bed780d5826 /src/stream
parentbaab42aeb0b5a6d2a782c9c8e9554908c66fa80c (diff)
downloadgnunet-f52fd7190c7c36abc8b8d20ffe5c7ead7376d8cb.tar.gz
gnunet-f52fd7190c7c36abc8b8d20ffe5c7ead7376d8cb.zip
Remove stream
Diffstat (limited to 'src/stream')
-rw-r--r--src/stream/Makefile.am100
-rw-r--r--src/stream/README11
-rw-r--r--src/stream/mesh-test.c31
-rw-r--r--src/stream/mesh.supp58
-rw-r--r--src/stream/perf_stream_api.c1053
-rw-r--r--src/stream/stream.h176
-rw-r--r--src/stream/stream_api.c3806
-rw-r--r--src/stream/test_stream_2peers.c667
-rw-r--r--src/stream/test_stream_2peers_halfclose.c890
-rw-r--r--src/stream/test_stream_big.c429
-rw-r--r--src/stream/test_stream_local.c439
-rw-r--r--src/stream/test_stream_local.conf85
-rw-r--r--src/stream/test_stream_sequence_wraparound.c425
13 files changed, 0 insertions, 8170 deletions
diff --git a/src/stream/Makefile.am b/src/stream/Makefile.am
deleted file mode 100644
index d21f170be..000000000
--- a/src/stream/Makefile.am
+++ /dev/null
@@ -1,100 +0,0 @@
1INCLUDES = -I$(top_srcdir)/src/include
2
3if MINGW
4 WINFLAGS = -Wl,--no-undefined -Wl,--export-all-symbols
5endif
6
7if USE_COVERAGE
8 AM_CFLAGS = --coverage -O0
9 XLIB = -lgcov
10endif
11
12lib_LTLIBRARIES = libgnunetstream.la
13
14libgnunetstream_la_SOURCES = \
15 stream_api.c stream.h
16libgnunetstream_la_LIBADD = \
17 $(top_builddir)/src/mesh/libgnunetmesh.la \
18 $(top_builddir)/src/lockmanager/libgnunetlockmanager.la \
19 $(top_builddir)/src/statistics/libgnunetstatistics.la \
20 $(top_builddir)/src/util/libgnunetutil.la $(XLIB)
21libgnunetstream_la_LDFLAGS = \
22 $(GN_LIB_LDFLAGS) \
23 -version-info 1:0:0
24
25if HAVE_BENCHMARKS
26 STREAM_BENCHMARKS = \
27 perf_stream_api
28endif
29
30if HAVE_TESTING
31check_PROGRAMS = \
32 test_stream_2peers \
33 test_stream_2peers_halfclose \
34 test_stream_local \
35 test_stream_big \
36 test_stream_sequence_wraparound \
37 mesh-test \
38 $(STREAM_BENCHMARKS)
39endif
40
41EXTRA_DIST = test_stream_local.conf
42
43if ENABLE_TEST_RUN
44TESTS = \
45 test_stream_2peers \
46 test_stream_2peers_halfclose \
47 test_stream_local \
48 test_stream_big \
49 test_stream_sequence_wraparound \
50 $(STREAM_BENCHMARKS)
51endif
52
53test_stream_2peers_SOURCES = \
54 test_stream_2peers.c
55test_stream_2peers_LDADD = \
56 $(top_builddir)/src/stream/libgnunetstream.la \
57 $(top_builddir)/src/util/libgnunetutil.la \
58 $(top_builddir)/src/testbed/libgnunettestbed.la
59
60
61test_stream_2peers_halfclose_SOURCES = \
62 test_stream_2peers_halfclose.c
63test_stream_2peers_halfclose_LDADD = \
64 $(top_builddir)/src/stream/libgnunetstream.la \
65 $(top_builddir)/src/util/libgnunetutil.la \
66 $(top_builddir)/src/testbed/libgnunettestbed.la
67
68test_stream_local_SOURCES = \
69 test_stream_local.c
70test_stream_local_LDADD = \
71 $(top_builddir)/src/stream/libgnunetstream.la \
72 $(top_builddir)/src/util/libgnunetutil.la \
73 $(top_builddir)/src/testing/libgnunettesting.la
74
75test_stream_big_SOURCES = \
76 test_stream_big.c
77test_stream_big_LDADD = \
78 $(top_builddir)/src/stream/libgnunetstream.la \
79 $(top_builddir)/src/util/libgnunetutil.la \
80 $(top_builddir)/src/testing/libgnunettesting.la
81
82test_stream_sequence_wraparound_SOURCES = \
83 test_stream_sequence_wraparound.c
84test_stream_sequence_wraparound_LDADD = \
85 $(top_builddir)/src/stream/libgnunetstream.la \
86 $(top_builddir)/src/util/libgnunetutil.la \
87 $(top_builddir)/src/testing/libgnunettesting.la
88
89perf_stream_api_SOURCES = \
90 perf_stream_api.c
91perf_stream_api_LDADD = \
92 $(top_builddir)/src/stream/libgnunetstream.la \
93 $(top_builddir)/src/util/libgnunetutil.la \
94 $(top_builddir)/src/testing/libgnunettesting.la \
95 $(top_builddir)/src/testbed/libgnunettestbed.la
96
97mesh_test_SOURCES = mesh-test.c
98mesh_test_LDADD = \
99 $(top_builddir)/src/mesh/libgnunetmesh.la \
100 $(top_builddir)/src/util/libgnunetutil.la
diff --git a/src/stream/README b/src/stream/README
deleted file mode 100644
index 977ca2d49..000000000
--- a/src/stream/README
+++ /dev/null
@@ -1,11 +0,0 @@
1Stream library provides stream connections between peers in GNUnet. This is a
2convenience library which hides the complexity of dividing data stream into
3packets, transmitting them and retransmitting them in case of communication
4errors.
5
6This library's API are similar to unix PIPE API. The user is expected to open a
7stream to a listening target peer. Once the stream is established, the user can
8use it as a pipe. Any data written into the stream at one peer will be readable
9by the other peer and vice versa.
10
11This library uses mesh API for establishing tunnels between peers. \ No newline at end of file
diff --git a/src/stream/mesh-test.c b/src/stream/mesh-test.c
deleted file mode 100644
index eb9594a9f..000000000
--- a/src/stream/mesh-test.c
+++ /dev/null
@@ -1,31 +0,0 @@
1#include "platform.h"
2#include "gnunet_common.h"
3#include "gnunet_util_lib.h"
4#include "gnunet_mesh_service.h"
5
6static void
7run (void *cls, char *const *args,
8 const char *cfgfile,
9 const struct GNUNET_CONFIGURATION_Handle *cfg)
10{
11 struct GNUNET_MESH_Handle *m;
12
13 m = GNUNET_MESH_connect (cfg, /* the configuration handle */
14 socket, /* cls */
15 NULL, /* No inbound tunnel handler */
16 NULL, /* No in-tunnel cleaner */
17 NULL,
18 NULL); /* We don't get inbound tunnels */
19}
20
21int
22main (int argc, char **argv)
23{
24 static const struct GNUNET_GETOPT_CommandLineOption options[] = {
25 GNUNET_GETOPT_OPTION_END
26 };
27 GNUNET_PROGRAM_run (argc, argv, "mesh-test",
28 "help",
29 options, &run, NULL);
30 return 0;
31}
diff --git a/src/stream/mesh.supp b/src/stream/mesh.supp
deleted file mode 100644
index 7107c0b3d..000000000
--- a/src/stream/mesh.supp
+++ /dev/null
@@ -1,58 +0,0 @@
1{
2 <stream_open_to_mesh>
3 Memcheck:Leak
4 fun:malloc
5 ...
6 fun:GNUNET_PEER_intern
7 fun:GNUNET_MESH_peer_request_connect_add
8 fun:GNUNET_STREAM_open
9 ...
10}
11{
12 <unknown_from_mesh>
13 Memcheck:Leak
14 fun:malloc
15 fun:GNUNET_xmalloc_unchecked_
16 fun:GNUNET_xmalloc_
17 fun:GNUNET_CONTAINER_multihashmap_put
18 fun:GNUNET_PEER_intern
19 fun:process_tunnel_created
20 fun:msg_received
21 fun:receive_task
22 fun:run_ready
23 fun:GNUNET_SCHEDULER_run
24 fun:GNUNET_PROGRAM_run2
25 fun:GNUNET_PROGRAM_run
26}
27{
28 <stream_open_to_mesh_via_request_connect_add>
29 Memcheck:Leak
30 fun:malloc
31 fun:GNUNET_xmalloc_unchecked_
32 fun:GNUNET_xmalloc_
33 fun:GNUNET_CONTAINER_multihashmap_create
34 fun:GNUNET_PEER_intern
35 fun:GNUNET_MESH_peer_request_connect_add
36 fun:GNUNET_STREAM_open
37 fun:stream_ca
38 fun:configuration_receiver
39 fun:handle_peer_config
40 fun:message_handler
41 fun:receive_task
42}
43{
44 <stream_open_to_mesh_via_request_connect_add2>
45 Memcheck:Leak
46 fun:malloc
47 fun:GNUNET_xmalloc_unchecked_
48 fun:GNUNET_xmalloc_
49 fun:GNUNET_xgrow_
50 fun:GNUNET_PEER_intern
51 fun:GNUNET_MESH_peer_request_connect_add
52 fun:GNUNET_STREAM_open
53 fun:stream_ca
54 fun:configuration_receiver
55 fun:handle_peer_config
56 fun:message_handler
57 fun:receive_task
58}
diff --git a/src/stream/perf_stream_api.c b/src/stream/perf_stream_api.c
deleted file mode 100644
index 502bb7aa0..000000000
--- a/src/stream/perf_stream_api.c
+++ /dev/null
@@ -1,1053 +0,0 @@
1 /*
2 This file is part of GNUnet.
3 (C) 2008--2013 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file stream/perf_stream_api.c
23 * @brief performance benchmarks for stream api
24 * @author Sree Harsha Totakura
25 */
26
27#define LOG(kind, ...) \
28 GNUNET_log (kind, __VA_ARGS__);
29
30/****************************************************************************************/
31/* Test is setup into the following major steps: */
32/* 1. Measurements over loopback (1 hop). i.e. we use only one peer and open */
33/* stream connections over loopback. Messages will go through */
34/* STREAM_API->MESH_API->MESH_SERVICE->MESH_API->STREAM_API. */
35/* 2. Measurements over 2 peers (2 hops). We use testbed to create 2 peers, */
36/* connect them and then create stream connections. Messages will go through */
37/* STREAM_API->MESH_API->MESH_SERVICE->CORE1.....CORE2->MESH_API->STREAM_API */
38/* 3. Measurements over 3 peers (3 hops). We use testbed to create 3 peers, */
39/* connect them in a line topology: peer1->peer2->peer3. Messages will go */
40/* through */
41/* STREAM_API->MESH_API->MESH_SERVICE->CORE1..CORE2..CORE3->MESH_API->STREAM_API. */
42/****************************************************************************************/
43
44#include "platform.h"
45#include "gnunet_common.h"
46#include "gnunet_util_lib.h"
47#include "gnunet_testbed_service.h"
48#include "gnunet_stream_lib.h"
49
50/**
51 * Simple struct to keep track of progress, and print a
52 * nice little percentage meter for long running tasks.
53 */
54struct ProgressMeter
55{
56 unsigned int total;
57
58 unsigned int modnum;
59
60 unsigned int dotnum;
61
62 unsigned int completed;
63
64 int print;
65
66 char *startup_string;
67};
68
69
70/**
71 * Steps in testing
72 */
73enum TestStep
74{
75 /**
76 * Single hop loopback testing
77 */
78 TEST_STEP_1_HOP,
79
80 /**
81 * Testing with 2 peers
82 */
83 TEST_STEP_2_HOP,
84
85 /**
86 * Testing with 3 peers
87 */
88 TEST_STEP_3_HOP
89};
90
91
92/**
93 * Structure for holding peer's sockets and IO Handles
94 */
95struct PeerData
96{
97 /**
98 * Peer's stream socket
99 */
100 struct GNUNET_STREAM_Socket *socket;
101
102 /**
103 * Peer's io write handle
104 */
105 struct GNUNET_STREAM_WriteHandle *io_write_handle;
106
107 /**
108 * Peer's io read handle
109 */
110 struct GNUNET_STREAM_ReadHandle *io_read_handle;
111
112 /**
113 * The peer handle when we use the testbed servie
114 */
115 struct GNUNET_TESTBED_Peer *peer;
116
117 /**
118 * Handle to peer specific opearations while using testbed service
119 */
120 struct GNUNET_TESTBED_Operation *op;
121
122 /**
123 * The identity of this peer
124 */
125 struct GNUNET_PeerIdentity id;
126
127 /**
128 * Peer's shutdown handle
129 */
130 struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
131
132 /**
133 * Bytes the peer has written
134 */
135 size_t bytes_wrote;
136
137 /**
138 * Byte the peer has read
139 */
140 size_t bytes_read;
141
142 /**
143 * number of packets sent
144 */
145 unsigned int packets_wrote;
146
147 /**
148 * number of packets read
149 */
150 unsigned int packets_read;
151};
152
153
154/**
155 * Enumeration of stages in this testing
156 */
157enum TestStage
158{
159 /**
160 * The initial stage
161 */
162 INIT,
163
164 /**
165 * Uplink testing stage
166 */
167 UPLINK_OK,
168
169 /**
170 * Downlink testing stage
171 */
172 DOWNLINK_OK
173};
174
175
176/**
177 * Maximum size of the data which we will transfer during tests
178 */
179#define DATA_SIZE 5000000 /* 5mB */
180
181/**
182 * Fixed number of packets we send in each direction during each subtest
183 */
184#define MAX_PACKETS 1000
185
186/**
187 * Listen socket of peer2
188 */
189struct GNUNET_STREAM_ListenSocket *peer2_listen_socket;
190
191/**
192 * Handle to configuration during TEST_STEP_1_HOP
193 */
194const struct GNUNET_CONFIGURATION_Handle *config;
195
196/**
197 * Handle for the progress meter
198 */
199static struct ProgressMeter *meter;
200
201/**
202 * Placeholder for peer data
203 */
204static struct PeerData peer_data[3];
205
206/**
207 * Handle to common operations while using testbed
208 */
209static struct GNUNET_TESTBED_Operation *common_op;
210
211/**
212 * Task ID for abort task
213 */
214static GNUNET_SCHEDULER_TaskIdentifier abort_task;
215
216/**
217 * Task ID for write task
218 */
219static GNUNET_SCHEDULER_TaskIdentifier write_task;
220
221/**
222 * Task ID for read task
223 */
224static GNUNET_SCHEDULER_TaskIdentifier read_task;
225
226/**
227 * Absolute time when profiling starts
228 */
229static struct GNUNET_TIME_Absolute prof_start_time;
230
231/**
232 * Test time taken for sending the data
233 */
234static struct GNUNET_TIME_Relative prof_time;
235
236/**
237 * Random data block. Should generate data first
238 */
239static uint32_t data[DATA_SIZE / 4];
240
241/**
242 * Payload sizes to test each major test with
243 */
244static uint16_t payload_size[] =
245{ 20, 500, 2000, 7000, 13000, 25000, 30000};//, 50000, 60000, 63000, 64000 };
246
247/**
248 * Current step of testing
249 */
250static enum TestStep test_step;
251
252/**
253 * Index for choosing payload size
254 */
255static unsigned int payload_size_index;
256
257/**
258 * Number of peers we want to create while using the testbed service
259 */
260static int num_peers;
261
262/**
263 * Flag to indicate that the other peer should reset its data read source index
264 */
265static int reset_read;
266
267/**
268 * Testing result of a major test
269 */
270static enum TestStage result;
271
272/**
273 * Create a meter to keep track of the progress of some task.
274 *
275 * @param total the total number of items to complete
276 * @param start_string a string to prefix the meter with (if printing)
277 * @param print GNUNET_YES to print the meter, GNUNET_NO to count
278 * internally only
279 *
280 * @return the progress meter
281 */
282static struct ProgressMeter *
283create_meter (unsigned int total, char *start_string, int print)
284{
285 struct ProgressMeter *ret;
286
287 ret = GNUNET_malloc (sizeof (struct ProgressMeter));
288 ret->print = print;
289 ret->total = total;
290 ret->modnum = total / 4;
291 if (ret->modnum == 0) /* Divide by zero check */
292 ret->modnum = 1;
293 ret->dotnum = (total / 50) + 1;
294 if (start_string != NULL)
295 ret->startup_string = GNUNET_strdup (start_string);
296 else
297 ret->startup_string = GNUNET_strdup ("");
298
299 return ret;
300}
301
302
303/**
304 * Update progress meter (increment by one).
305 *
306 * @param meter the meter to update and print info for
307 *
308 * @return GNUNET_YES if called the total requested,
309 * GNUNET_NO if more items expected
310 */
311static int
312update_meter (struct ProgressMeter *meter)
313{
314 if (meter->print == GNUNET_YES)
315 {
316 if (meter->completed % meter->modnum == 0)
317 {
318 if (meter->completed == 0)
319 {
320 FPRINTF (stdout, "%sProgress: [0%%", meter->startup_string);
321 }
322 else
323 FPRINTF (stdout, "%d%%",
324 (int) (((float) meter->completed / meter->total) * 100));
325 }
326 else if (meter->completed % meter->dotnum == 0)
327 FPRINTF (stdout, "%s", ".");
328
329 if (meter->completed + 1 == meter->total)
330 FPRINTF (stdout, "%d%%]\n", 100);
331 fflush (stdout);
332 }
333 meter->completed++;
334
335 if (meter->completed == meter->total)
336 return GNUNET_YES;
337 if (meter->completed > meter->total)
338 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Progress meter overflow!!\n");
339 return GNUNET_NO;
340}
341
342
343/**
344 * Reset progress meter.
345 *
346 * @param meter the meter to reset
347 *
348 * @return GNUNET_YES if meter reset,
349 * GNUNET_SYSERR on error
350 */
351static int
352reset_meter (struct ProgressMeter *meter)
353{
354 if (meter == NULL)
355 return GNUNET_SYSERR;
356
357 meter->completed = 0;
358 return GNUNET_YES;
359}
360
361
362/**
363 * Release resources for meter
364 *
365 * @param meter the meter to free
366 */
367static void
368free_meter (struct ProgressMeter *meter)
369{
370 GNUNET_free_non_null (meter->startup_string);
371 GNUNET_free (meter);
372}
373
374
375/**
376 * Shutdown nicely
377 */
378static void
379do_close (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
380{
381 switch (test_step)
382 {
383 case TEST_STEP_1_HOP:
384 if (NULL != peer_data[0].socket)
385 GNUNET_STREAM_close (peer_data[0].socket);
386 if (NULL != peer_data[1].socket)
387 GNUNET_STREAM_close (peer_data[1].socket);
388 if (NULL != peer2_listen_socket)
389 GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */
390 break;
391 case TEST_STEP_2_HOP:
392 if (NULL != peer_data[1].socket)
393 GNUNET_STREAM_close (peer_data[1].socket);
394 if (NULL != peer_data[0].op)
395 GNUNET_TESTBED_operation_done (peer_data[0].op);
396 if (NULL != peer_data[1].op)
397 GNUNET_TESTBED_operation_done (peer_data[1].op);
398 break;
399 case TEST_STEP_3_HOP:
400 GNUNET_break (0);
401 }
402 if (GNUNET_SCHEDULER_NO_TASK != abort_task)
403 GNUNET_SCHEDULER_cancel (abort_task);
404 if (GNUNET_SCHEDULER_NO_TASK != write_task)
405 GNUNET_SCHEDULER_cancel (write_task);
406 GNUNET_SCHEDULER_shutdown (); /* Shutdown this testcase */
407 if (NULL != meter)
408 {
409 free_meter (meter);
410 meter = NULL;
411 }
412}
413
414
415/**
416 * Something went wrong and timed out. Kill everything and set error flag
417 */
418static void
419do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
420{
421 abort_task = GNUNET_SCHEDULER_NO_TASK;
422 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "test: ABORT\n");
423 if (GNUNET_SCHEDULER_NO_TASK != read_task)
424 GNUNET_SCHEDULER_cancel (read_task);
425 result = GNUNET_SYSERR;
426 do_close (cls, tc);
427}
428
429
430/**
431 * Completion callback for shutdown
432 *
433 * @param cls the closure from GNUNET_STREAM_shutdown call
434 * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR,
435 * SHUT_RDWR)
436 */
437static void
438shutdown_completion (void *cls,
439 int operation)
440{
441 static int shutdowns;
442
443 if (++shutdowns == 1)
444 {
445 peer_data[0].shutdown_handle = NULL;
446 peer_data[1].shutdown_handle = GNUNET_STREAM_shutdown (peer_data[1].socket, SHUT_RDWR,
447 &shutdown_completion, cls);
448 return;
449 }
450 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "STREAM shutdown successful\n");
451 GNUNET_SCHEDULER_add_now (&do_close, cls);
452}
453
454
455/**
456 * Shutdown sockets gracefully
457 */
458static void
459do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
460{
461 peer_data[0].shutdown_handle = GNUNET_STREAM_shutdown (peer_data[0].socket, SHUT_RDWR,
462 &shutdown_completion, cls);
463}
464
465
466/**
467 * Scheduler call back; to be executed when a new stream is connected
468 * Called from listen connect for peer2
469 */
470static void
471stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
472
473
474/**
475 * Task for calling STREAM_write with a chunk of random data
476 *
477 * @param cls the peer data entity
478 * @param tc the task context
479 */
480static void
481stream_write_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
482
483
484/**
485 * The write completion function; called upon writing some data to stream or
486 * upon error
487 *
488 * @param cls the closure from GNUNET_STREAM_write/read
489 * @param status the status of the stream at the time this function is called
490 * @param size the number of bytes written
491 */
492static void
493write_completion (void *cls, enum GNUNET_STREAM_Status status, size_t size)
494{
495 struct PeerData *pdata = cls;
496 double throughput;
497 double prof_time_sec;
498 unsigned int packets_wrote;
499
500 if (GNUNET_STREAM_OK != status)
501 {
502 GNUNET_SCHEDULER_cancel (abort_task);
503 abort_task = GNUNET_SCHEDULER_add_now (&do_abort, NULL);
504 return;
505 }
506 GNUNET_assert (size <= DATA_SIZE);
507 packets_wrote = (size + payload_size[payload_size_index] - 1)
508 / payload_size[payload_size_index];
509 pdata->bytes_wrote += size;
510 for (;packets_wrote > 0; packets_wrote--)
511 {
512 update_meter (meter);
513 pdata->packets_wrote++;
514 }
515 if (pdata->packets_wrote < MAX_PACKETS) /* Have more data to send */
516 {
517 size_t write_amount;
518
519 if (GNUNET_SCHEDULER_NO_TASK != abort_task)
520 {
521 GNUNET_SCHEDULER_cancel (abort_task);
522 abort_task =
523 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
524 (GNUNET_TIME_UNIT_SECONDS, 300), &do_abort,
525 NULL);
526 }
527 write_amount = (MAX_PACKETS - pdata->packets_wrote) *
528 payload_size[payload_size_index];
529 if (write_amount > DATA_SIZE)
530 write_amount = DATA_SIZE;
531 reset_read = GNUNET_YES;
532 pdata->io_write_handle = GNUNET_STREAM_write (pdata->socket, data,
533 write_amount,
534 GNUNET_TIME_UNIT_FOREVER_REL,
535 &write_completion, pdata);
536 GNUNET_assert (NULL != pdata->io_write_handle);
537 }
538 else
539 {
540 free_meter (meter);
541 meter = NULL;
542 prof_time = GNUNET_TIME_absolute_get_duration (prof_start_time);
543 prof_time_sec = (((double) prof_time.rel_value)/ ((double) 1000));
544 throughput = ((float) pdata->bytes_wrote) / prof_time_sec;
545 PRINTF ("Throughput %.2f kB/sec\n", throughput / 1000.00);
546 switch (result)
547 {
548 case INIT:
549 result = UPLINK_OK;
550 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == read_task);
551 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == write_task);
552 pdata->bytes_read = 0;
553 pdata->packets_read = 0;
554 meter = create_meter (MAX_PACKETS, "Testing Downlink\n", GNUNET_YES);
555 read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, &peer_data[0]);
556 write_task = GNUNET_SCHEDULER_add_now (&stream_write_task, &peer_data[1]);
557 break;
558 case UPLINK_OK:
559 result = DOWNLINK_OK;
560 GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
561 break;
562 case DOWNLINK_OK:
563 GNUNET_assert (0);
564 }
565 }
566}
567
568
569/**
570 * Task for calling STREAM_write with a chunk of random data
571 *
572 * @param cls the peer data entity
573 * @param tc the task context
574 */
575static void
576stream_write_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
577{
578 struct PeerData *pdata = cls;
579 size_t write_amount;
580
581 if (GNUNET_SCHEDULER_NO_TASK != abort_task)
582 {
583 GNUNET_SCHEDULER_cancel (abort_task);
584 abort_task =
585 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
586 (GNUNET_TIME_UNIT_SECONDS, 300), &do_abort,
587 NULL);
588 }
589 write_task = GNUNET_SCHEDULER_NO_TASK;
590 prof_start_time = GNUNET_TIME_absolute_get ();
591 pdata->bytes_wrote = 0;
592 pdata->packets_wrote = 0;
593 write_amount = MAX_PACKETS * payload_size[payload_size_index];
594 if (write_amount > DATA_SIZE)
595 write_amount = DATA_SIZE;
596 reset_read = GNUNET_YES;
597 pdata->io_write_handle = GNUNET_STREAM_write (pdata->socket, data,
598 write_amount,
599 GNUNET_TIME_UNIT_FOREVER_REL,
600 &write_completion, pdata);
601 GNUNET_assert (NULL != pdata->io_write_handle);
602}
603
604
605/**
606 * Scheduler call back; to be executed when a new stream is connected
607 * Called from listen connect for peer2
608 */
609static void
610stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
611
612
613/**
614 * Input processor
615 *
616 * @param cls peer2
617 * @param status the status of the stream at the time this function is called
618 * @param data traffic from the other side
619 * @param size the number of bytes available in data read
620 * @return number of bytes of processed from 'data' (any data remaining should be
621 * given to the next time the read processor is called).
622 */
623static size_t
624input_processor (void *cls, enum GNUNET_STREAM_Status status,
625 const void *input_data, size_t size)
626{
627 struct PeerData *pdata = cls;
628
629 if (GNUNET_STREAM_OK != status)
630 {
631 GNUNET_SCHEDULER_cancel (abort_task);
632 abort_task = GNUNET_SCHEDULER_add_now (&do_abort, NULL);
633 return 0;
634 }
635 GNUNET_assert (size <= DATA_SIZE);
636 if (GNUNET_YES == reset_read)
637 {
638 pdata->bytes_read = 0;
639 reset_read = GNUNET_NO;
640 }
641 GNUNET_assert ((pdata->bytes_read + size) <= DATA_SIZE);
642 GNUNET_assert (0 == memcmp (((void *)data ) + pdata->bytes_read,
643 input_data, size));
644 pdata->bytes_read += size;
645 pdata->packets_read += (size + payload_size[payload_size_index] - 1)
646 / payload_size[payload_size_index];
647 if (pdata->packets_read < MAX_PACKETS)
648 {
649 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == read_task);
650 read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, pdata);
651 }
652 else
653 {
654 LOG (GNUNET_ERROR_TYPE_DEBUG, "Reading finished successfully\n");
655 }
656 return size;
657}
658
659
660/**
661 * Scheduler call back; to be executed when a new stream is connected
662 * Called from listen connect for peer2
663 */
664static void
665stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
666{
667 struct PeerData *pdata = cls;
668
669 read_task = GNUNET_SCHEDULER_NO_TASK;
670 pdata->io_read_handle =
671 GNUNET_STREAM_read (pdata->socket, GNUNET_TIME_UNIT_FOREVER_REL,
672 &input_processor, pdata);
673 GNUNET_assert (NULL != pdata->io_read_handle);
674}
675
676
677/**
678 * Functions of this type are called upon new stream connection from other peers
679 *
680 * @param cls the closure from GNUNET_STREAM_listen
681 * @param socket the socket representing the stream
682 * @param initiator the identity of the peer who wants to establish a stream
683 * with us
684 * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
685 * stream (the socket will be invalid after the call)
686 */
687static int
688stream_listen_cb (void *cls, struct GNUNET_STREAM_Socket *socket,
689 const struct GNUNET_PeerIdentity *initiator)
690{
691 struct PeerData *pdata = cls;
692
693
694 if ((NULL == socket) || (NULL == initiator))
695 {
696 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Binding error\n");
697 if (GNUNET_SCHEDULER_NO_TASK != abort_task)
698 GNUNET_SCHEDULER_cancel (abort_task);
699 abort_task = GNUNET_SCHEDULER_add_now (&do_abort, NULL);
700 return GNUNET_OK;
701 }
702 GNUNET_assert (NULL != socket);
703 GNUNET_assert (pdata == &peer_data[1]);
704 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer connected: %s\n",
705 GNUNET_i2s(initiator));
706 pdata->socket = socket;
707 pdata->bytes_read = 0;
708 read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, pdata);
709 return GNUNET_OK;
710}
711
712
713/**
714 * Function executed after stream has been established
715 *
716 * @param cls the closure from GNUNET_STREAM_open
717 * @param socket socket to use to communicate with the other side (read/write)
718 */
719static void
720stream_open_cb (void *cls,
721 struct GNUNET_STREAM_Socket *socket)
722{
723 struct PeerData *pdata = cls;
724
725 GNUNET_assert (socket == pdata->socket);
726 meter = create_meter (MAX_PACKETS, "Testing Uplink\n", GNUNET_YES);
727 write_task = GNUNET_SCHEDULER_add_now (&stream_write_task, pdata);
728}
729
730
731/**
732 * Listen success callback; connects a peer to stream as client
733 */
734static void
735stream_connect (void)
736{
737 peer_data[0].socket =
738 GNUNET_STREAM_open (config, &peer_data[1].id, 10, &stream_open_cb,
739 &peer_data[0],
740 GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE,
741 payload_size[payload_size_index],
742 GNUNET_STREAM_OPTION_END);
743 GNUNET_assert (NULL != peer_data[0].socket);
744}
745
746
747/**
748 * Initialize framework and start test
749 *
750 * @param cls closure
751 * @param cfg configuration of the peer that was started
752 * @param peer identity of the peer that was created
753 */
754static void
755run (void *cls,
756 const struct GNUNET_CONFIGURATION_Handle *cfg,
757 struct GNUNET_TESTING_Peer *peer)
758{
759 struct GNUNET_PeerIdentity id;
760
761 GNUNET_TESTING_peer_get_identity (peer, &id);
762 config = cfg;
763 peer2_listen_socket =
764 GNUNET_STREAM_listen (config, 10, &stream_listen_cb, &peer_data[1],
765 GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS,
766 &stream_connect,
767 GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE,
768 payload_size[payload_size_index],
769 GNUNET_STREAM_OPTION_END);
770 GNUNET_assert (NULL != peer2_listen_socket);
771 peer_data[0].id = id;
772 peer_data[1].id = id;
773 abort_task =
774 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
775 (GNUNET_TIME_UNIT_SECONDS, 300), &do_abort,
776 NULL);
777}
778
779
780/**
781 * Adapter function called to establish a connection to
782 * a service.
783 *
784 * @param cls closure
785 * @param cfg configuration of the peer to connect to; will be available until
786 * GNUNET_TESTBED_operation_done() is called on the operation returned
787 * from GNUNET_TESTBED_service_connect()
788 * @return service handle to return in 'op_result', NULL on error
789 */
790static void *
791stream_ca (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg);
792
793
794/**
795 * Adapter function called to destroy a connection to
796 * a service.
797 *
798 * @param cls closure
799 * @param op_result service handle returned from the connect adapter
800 */
801static void
802stream_da (void *cls, void *op_result)
803{
804 if (&peer_data[1] == cls)
805 {
806 GNUNET_STREAM_listen_close (op_result);
807 return;
808 }
809 else if (&peer_data[0] == cls)
810 {
811 GNUNET_STREAM_close (op_result);
812 return;
813 }
814 GNUNET_assert (0);
815}
816
817
818/**
819 * Listen success callback; connects a peer to stream as client. Called from
820 * testbed stream_ca
821 */
822static void
823stream_connect2 (void)
824{
825 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stream listen open successful\n");
826 peer_data[0].op =
827 GNUNET_TESTBED_service_connect (&peer_data[0], peer_data[0].peer,
828 "stream", NULL, NULL, stream_ca,
829 stream_da, &peer_data[0]);
830}
831
832
833/**
834 * Adapter function called to establish a connection to
835 * a service.
836 *
837 * @param cls closure
838 * @param cfg configuration of the peer to connect to; will be available until
839 * GNUNET_TESTBED_operation_done() is called on the operation returned
840 * from GNUNET_TESTBED_service_connect()
841 * @return service handle to return in 'op_result', NULL on error
842 */
843static void *
844stream_ca (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg)
845{
846 struct PeerData *pdata = cls;
847
848 if (&peer_data[1] == pdata)
849 {
850 peer2_listen_socket = NULL;
851 peer2_listen_socket =
852 GNUNET_STREAM_listen (cfg, 10, &stream_listen_cb, &peer_data[1],
853 GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS,
854 &stream_connect2,
855 GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE,
856 payload_size[payload_size_index],
857 GNUNET_STREAM_OPTION_END);
858 GNUNET_assert (NULL != peer2_listen_socket);
859 return peer2_listen_socket;
860 }
861 if (&peer_data[0] == pdata)
862 {
863 pdata->socket =
864 GNUNET_STREAM_open (cfg, &peer_data[1].id, 10, &stream_open_cb,
865 &peer_data[0],
866 GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE,
867 payload_size[payload_size_index],
868 GNUNET_STREAM_OPTION_END);
869 GNUNET_assert (NULL != pdata->socket);
870 return pdata->socket;
871 }
872 GNUNET_assert (0);
873 return NULL;
874}
875
876
877/**
878 * Callback to be called when the requested peer information is available
879 *
880 * @param cb_cls the closure from GNUNET_TETSBED_peer_get_information()
881 * @param op the operation this callback corresponds to
882 * @param pinfo the result; will be NULL if the operation has failed
883 * @param emsg error message if the operation has failed; will be NULL if the
884 * operation is successfull
885 */
886static void
887peerinfo_cb (void *cb_cls, struct GNUNET_TESTBED_Operation *op,
888 const struct GNUNET_TESTBED_PeerInformation *pinfo,
889 const char *emsg)
890{
891 struct PeerData *pdata = cb_cls;
892
893 GNUNET_assert (NULL == emsg);
894 GNUNET_assert (common_op == op);
895 GNUNET_assert (NULL != pdata);
896 memcpy (&pdata->id, pinfo->result.id, sizeof (struct GNUNET_PeerIdentity));
897 GNUNET_TESTBED_operation_done (op);
898 if (pdata == &peer_data[0])
899 {
900 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer 1 id: %s\n",
901 GNUNET_i2s (&pdata->id));
902 common_op = GNUNET_TESTBED_peer_get_information (peer_data[1].peer,
903 GNUNET_TESTBED_PIT_IDENTITY,
904 &peerinfo_cb, &peer_data[1]);
905 }
906 else if (pdata == &peer_data[1])
907 {
908 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer 2 id: %s\n",
909 GNUNET_i2s (&pdata->id));
910 if (TEST_STEP_2_HOP == test_step)
911 peer_data[1].op =
912 GNUNET_TESTBED_service_connect (&peer_data[1], peer_data[1].peer,
913 "stream", NULL, NULL, stream_ca,
914 stream_da, &peer_data[1]);
915 else
916 GNUNET_break (0); /* FIXME: 3 hop test case here... */
917 }
918}
919
920
921/**
922 * Controller event callback
923 *
924 * @param cls NULL
925 * @param event the controller event
926 */
927static void
928controller_event_cb (void *cls,
929 const struct GNUNET_TESTBED_EventInformation *event)
930{
931 switch (event->type)
932 {
933 case GNUNET_TESTBED_ET_OPERATION_FINISHED:
934 if (NULL != event->details.operation_finished.emsg)
935 {
936 FPRINTF (stderr, "Error while expecting an operation to succeed:%s \n",
937 event->details.operation_finished.emsg);
938 GNUNET_assert (0);
939 }
940 break;
941 default:
942 GNUNET_assert (0);
943 }
944}
945
946
947/**
948 * Signature of a main function for a testcase.
949 *
950 * @param cls closure
951 * @param num_peers number of peers in 'peers'
952 * @param peers handle to peers run in the testbed
953 * @param links_succeeded the number of overlay link connection attempts that
954 * succeeded
955 * @param links_failed the number of overlay link connection attempts that
956 * failed
957 */
958static void
959test_master (void *cls, unsigned int num_peers_,
960 struct GNUNET_TESTBED_Peer **peers,
961 unsigned int links_succeeded,
962 unsigned int links_failed)
963{
964 GNUNET_assert (NULL != peers);
965 GNUNET_assert (NULL != peers[0]);
966 GNUNET_assert (NULL != peers[1]);
967 GNUNET_assert (num_peers_ == num_peers);
968 peer_data[0].peer = peers[0];
969 peer_data[1].peer = peers[1];
970 if (2 == num_peers)
971 /* Get the peer identity and configuration of peers */
972 common_op =
973 GNUNET_TESTBED_peer_get_information (peer_data[0].peer,
974 GNUNET_TESTBED_PIT_IDENTITY,
975 &peerinfo_cb, &peer_data[0]);
976 else
977 GNUNET_break (0);
978 abort_task =
979 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
980 (GNUNET_TIME_UNIT_SECONDS, 120), &do_abort,
981 NULL);
982}
983
984
985/**
986 * Main function
987 */
988int main (int argc, char **argv)
989{
990 char *test_name = "perf_stream_api";
991 char *cfg_file = "test_stream_local.conf";
992 uint64_t event_mask;
993 unsigned int count;
994 int ret;
995
996 meter = create_meter ((sizeof (data) / 4), "Generating random data\n", GNUNET_YES);
997 for (count=0; count < (sizeof (data) / 4); count++)
998 {
999 data[count] = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1000 UINT32_MAX);
1001 update_meter (meter);
1002 }
1003 reset_meter (meter);
1004 free_meter (meter);
1005 meter = NULL;
1006 test_step = TEST_STEP_1_HOP;
1007 for (payload_size_index = 0;
1008 payload_size_index < (sizeof (payload_size) / sizeof (uint16_t));
1009 payload_size_index++)
1010 {
1011 PRINTF ("\nTesting over loopback with payload size %hu\n",
1012 payload_size[payload_size_index]);
1013 (void) memset (peer_data, 0, sizeof (peer_data));
1014 result = INIT;
1015 reset_read = GNUNET_NO;
1016 ret = GNUNET_TESTING_peer_run (test_name, cfg_file, &run, NULL);
1017 if ((0 != ret) || (DOWNLINK_OK != result))
1018 goto return_fail;
1019 }
1020 test_step = TEST_STEP_2_HOP;
1021 num_peers = 2;
1022 event_mask = 0;
1023 event_mask |= (1LL << GNUNET_TESTBED_ET_OPERATION_FINISHED);
1024 for (payload_size_index = 0;
1025 payload_size_index < (sizeof (payload_size) / sizeof (uint16_t));
1026 payload_size_index++)
1027 {
1028 PRINTF ("\nTesting over 1 hop with payload size %hu\n",
1029 payload_size[payload_size_index]);
1030 (void) memset (peer_data, 0, sizeof (peer_data));
1031 result = INIT;
1032 reset_read = GNUNET_NO;
1033 (void) GNUNET_TESTBED_test_run (test_name, cfg_file, num_peers, event_mask,
1034 &controller_event_cb, NULL, &test_master,
1035 NULL);
1036 if (DOWNLINK_OK != result)
1037 goto return_fail;
1038 }
1039 test_step = TEST_STEP_3_HOP;
1040 for (payload_size_index = 0;
1041 payload_size_index < (sizeof (payload_size) / sizeof (uint16_t));
1042 payload_size_index++)
1043 {
1044 /* Initialize testbed here */
1045 }
1046 return 0;
1047
1048 return_fail:
1049 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Test failed\n");
1050 return 1;
1051}
1052
1053/* end of perf_stream_api.c */
diff --git a/src/stream/stream.h b/src/stream/stream.h
deleted file mode 100644
index d9ba09f11..000000000
--- a/src/stream/stream.h
+++ /dev/null
@@ -1,176 +0,0 @@
1/*
2 This file is part of GNUnet.
3 (C) 2008--2013 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file stream/stream.h
23 * @brief P2P protocol for the stream connections
24 * @author Sree Harsha Totakura
25 */
26
27#ifndef STREAM_H
28#define STREAM_H
29
30#ifdef __cplusplus
31extern "C"
32{
33#if 0 /* keep Emacsens' auto-indent happy */
34}
35#endif
36#endif
37
38#include "gnunet_util_lib.h"
39
40GNUNET_NETWORK_STRUCT_BEGIN
41
42
43/**
44 * The HELLO message to begin the handshake
45 */
46struct GNUNET_STREAM_HelloMessage
47{
48 /**
49 * Type is GNUNET_MESSAGE_TYPE_STREAM_HELLO
50 */
51 struct GNUNET_MessageHeader header;
52
53 /**
54 * The application port number
55 */
56 uint64_t port GNUNET_PACKED;;
57};
58
59/**
60 * The Data message, should be prefixed with stream header with its type set to
61 * GNUNET_STREAM_Data
62 */
63struct GNUNET_STREAM_DataMessage
64{
65
66 /**
67 * Type is GNUNET_MESSAGE_TYPE_STREAM_DATA
68 */
69 struct GNUNET_MessageHeader header;
70
71 /**
72 * Sequence number; starts with a random value. (Just in case
73 * someone breaks mesh and is able to try to do a Sequence
74 * Prediction Attack on us.)
75 */
76 uint32_t sequence_number GNUNET_PACKED;
77
78 /**
79 * number of milliseconds to the soft deadline for sending acknowledgement
80 * measured from the time this message is received. It is optimal for the
81 * communication to send the ack within the soft deadline
82 */
83 struct GNUNET_TIME_RelativeNBO ack_deadline;
84
85 /**
86 * Offset of the packet in the overall stream, modulo 2^32; allows
87 * the receiver to calculate where in the destination buffer the
88 * message should be placed. In network byte order.
89 */
90 uint32_t offset GNUNET_PACKED;
91
92 /**
93 * The data should be appended here
94 */
95};
96
97
98/**
99 * Number of bits in GNUNET_STREAM_AckBitmap
100 */
101#define GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH 64
102
103/**
104 * The Selective Acknowledgement Bitmap
105 */
106typedef uint64_t GNUNET_STREAM_AckBitmap;
107
108
109/**
110 * The Acknowledgment Message to confirm receipt of DATA.
111 */
112struct GNUNET_STREAM_AckMessage
113{
114
115 /**
116 * Type is GNUNET_MESSAGE_TYPE_STREAM_ACK
117 */
118 struct GNUNET_MessageHeader header;
119
120 /**
121 * The sequence number of the next Data Message receiver is
122 * anticipating. Data messages less than this number are received by receiver
123 */
124 uint32_t base_sequence_number GNUNET_PACKED;
125
126 /**
127 * The Selective Acknowledgement Bitmap. Computed relative to the base_seq
128 * (bit n corresponds to the Data message with sequence number base_seq+n)
129 */
130 GNUNET_STREAM_AckBitmap bitmap GNUNET_PACKED;
131
132 /**
133 * Available buffer space past the last acknowledged buffer (for flow control),
134 * in bytes.
135 */
136 uint32_t receive_window_remaining GNUNET_PACKED;
137};
138
139
140/**
141 * Message for Acknowledging HELLO
142 */
143struct GNUNET_STREAM_HelloAckMessage
144{
145 /**
146 * The stream message header
147 */
148 struct GNUNET_MessageHeader header;
149
150 /**
151 * The selected sequence number. Following data tranmissions from the sender
152 * start with this sequence
153 */
154 uint32_t sequence_number GNUNET_PACKED;
155
156 /**
157 * The size(in bytes) of the receive window on the peer sending this message
158 *
159 * FIXME: Remove if not needed
160 */
161 uint32_t receiver_window_size GNUNET_PACKED;
162};
163
164GNUNET_NETWORK_STRUCT_END
165
166
167#if 0 /** keep Emacsens' auto-indent happy */
168{
169#endif
170#ifdef __cplusplus
171}
172#endif
173
174#endif /* STREAM.H */
175
176/* End of stream.h */
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c
deleted file mode 100644
index 3647ba21d..000000000
--- a/src/stream/stream_api.c
+++ /dev/null
@@ -1,3806 +0,0 @@
1/*
2 This file is part of GNUnet.
3 (C) 2008--2013 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/* TODO:
22 *
23 * Checks for matching the sender and socket->other_peer in server
24 * message handlers
25 *
26 * Add code for write io timeout
27 *
28 * Include retransmission for control messages
29 **/
30
31/**
32 * @file stream/stream_api.c
33 * @brief Implementation of the stream library
34 * @author Sree Harsha Totakura
35 */
36
37#include "platform.h"
38#include "gnunet_common.h"
39#include "gnunet_util_lib.h"
40#include "gnunet_lockmanager_service.h"
41#include "gnunet_statistics_service.h"
42#include "gnunet_stream_lib.h"
43#include "stream.h"
44
45#define STREAM_PORT 4242
46
47/**
48 * Generic logging shorthand
49 */
50#define LOG(kind,...) \
51 GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
52
53/**
54 * Debug logging shorthand
55 */
56#define LOG_DEBUG(...) \
57 LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
58
59/**
60 * Time in relative seconds shorthand
61 */
62#define TIME_REL_SECS(sec) \
63 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
64
65/**
66 * The maximum packet size of a stream packet
67 */
68#define DEFAULT_MAX_PAYLOAD_SIZE 64000
69
70/**
71 * Receive buffer
72 */
73#define RECEIVE_BUFFER_SIZE 4096000
74
75/**
76 * states in the Protocol
77 */
78enum State
79 {
80 /**
81 * Client initialization state
82 */
83 STATE_INIT,
84
85 /**
86 * Listener initialization state
87 */
88 STATE_LISTEN,
89
90 /**
91 * Pre-connection establishment state
92 */
93 STATE_HELLO_WAIT,
94
95 /**
96 * State where a connection has been established
97 */
98 STATE_ESTABLISHED,
99
100 /**
101 * State where the socket is closed on our side and waiting to be ACK'ed
102 */
103 STATE_RECEIVE_CLOSE_WAIT,
104
105 /**
106 * State where the socket is closed for reading
107 */
108 STATE_RECEIVE_CLOSED,
109
110 /**
111 * State where the socket is closed on our side and waiting to be ACK'ed
112 */
113 STATE_TRANSMIT_CLOSE_WAIT,
114
115 /**
116 * State where the socket is closed for writing
117 */
118 STATE_TRANSMIT_CLOSED,
119
120 /**
121 * State where the socket is closed on our side and waiting to be ACK'ed
122 */
123 STATE_CLOSE_WAIT,
124
125 /**
126 * State where the socket is closed
127 */
128 STATE_CLOSED
129 };
130
131
132/**
133 * Functions of this type are called when a message is written
134 *
135 * @param cls the closure from queue_message
136 * @param socket the socket the written message was bound to
137 */
138typedef void (*SendFinishCallback) (void *cls,
139 struct GNUNET_STREAM_Socket *socket);
140
141
142/**
143 * The send message queue
144 */
145struct MessageQueue
146{
147 /**
148 * The message
149 */
150 struct GNUNET_MessageHeader *message;
151
152 /**
153 * Callback to be called when the message is sent
154 */
155 SendFinishCallback finish_cb;
156
157 /**
158 * The closure for finish_cb
159 */
160 void *finish_cb_cls;
161
162 /**
163 * The next message in queue. Should be NULL in the last message
164 */
165 struct MessageQueue *next;
166
167 /**
168 * The next message in queue. Should be NULL in the first message
169 */
170 struct MessageQueue *prev;
171};
172
173
174/**
175 * The STREAM Socket Handler
176 */
177struct GNUNET_STREAM_Socket
178{
179 /**
180 * The mesh handle
181 */
182 struct GNUNET_MESH_Handle *mesh;
183
184 /**
185 * Handle to statistics
186 */
187 struct GNUNET_STATISTICS_Handle *stat_handle;
188
189 /**
190 * The mesh tunnel handle
191 */
192 struct GNUNET_MESH_Tunnel *tunnel;
193
194 /**
195 * Stream open closure
196 */
197 void *open_cls;
198
199 /**
200 * Stream open callback
201 */
202 GNUNET_STREAM_OpenCallback open_cb;
203
204 /**
205 * The current transmit handle (if a pending transmit request exists)
206 */
207 struct GNUNET_MESH_TransmitHandle *transmit_handle;
208
209 /**
210 * The current message associated with the transmit handle
211 */
212 struct MessageQueue *queue_head;
213
214 /**
215 * The queue tail, should always point to the last message in queue
216 */
217 struct MessageQueue *queue_tail;
218
219 /**
220 * The write IO_handle associated with this socket
221 */
222 struct GNUNET_STREAM_WriteHandle *write_handle;
223
224 /**
225 * The read IO_handle associated with this socket
226 */
227 struct GNUNET_STREAM_ReadHandle *read_handle;
228
229 /**
230 * The shutdown handle associated with this socket
231 */
232 struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
233
234 /**
235 * Buffer for storing received messages
236 */
237 void *receive_buffer;
238
239 /**
240 * The listen socket from which this socket is derived. Should be NULL if it
241 * is not a derived socket
242 */
243 struct GNUNET_STREAM_ListenSocket *lsocket;
244
245 /**
246 * The peer identity of the peer at the other end of the stream
247 */
248 struct GNUNET_PeerIdentity other_peer;
249
250 /**
251 * The Acknowledgement Bitmap
252 */
253 GNUNET_STREAM_AckBitmap ack_bitmap;
254
255 /**
256 * Task identifier for retransmission task after timeout
257 */
258 GNUNET_SCHEDULER_TaskIdentifier data_retransmission_task_id;
259
260 /**
261 * Task identifier for retransmission of control messages
262 */
263 GNUNET_SCHEDULER_TaskIdentifier control_retransmission_task_id;
264
265 /**
266 * The task for sending timely Acks
267 */
268 GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
269
270 /**
271 * Retransmission timeout
272 */
273 struct GNUNET_TIME_Relative retransmit_timeout;
274
275 /**
276 * Time when the Acknowledgement was queued
277 */
278 struct GNUNET_TIME_Absolute ack_time_registered;
279
280 /**
281 * Queued Acknowledgement deadline
282 */
283 struct GNUNET_TIME_Relative ack_time_deadline;
284
285 /**
286 * Mesh transmit timeout
287 */
288 struct GNUNET_TIME_Relative mesh_retry_timeout;
289
290 /**
291 * Data retransmission timeout
292 */
293 struct GNUNET_TIME_Relative data_retransmit_timeout;
294
295 /**
296 * The state of the protocol associated with this socket
297 */
298 enum State state;
299
300 /**
301 * Whether testing mode is active or not
302 */
303 int testing_active;
304
305 /**
306 * Is receive closed
307 */
308 int receive_closed;
309
310 /**
311 * Is transmission closed
312 */
313 int transmit_closed;
314
315 /**
316 * The application port number
317 */
318 uint32_t port;
319
320 /**
321 * The write sequence number to be set incase of testing
322 */
323 uint32_t testing_set_write_sequence_number_value;
324
325 /**
326 * Write sequence number. Set to random when sending HELLO(client) and
327 * HELLO_ACK(server)
328 */
329 uint32_t write_sequence_number;
330
331 /**
332 * Read sequence number. This number's value is determined during handshake
333 */
334 uint32_t read_sequence_number;
335
336 /**
337 * The receiver buffer size
338 */
339 uint32_t receive_buffer_size;
340
341 /**
342 * The receiver buffer boundaries
343 */
344 uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
345
346 /**
347 * receiver's available buffer after the last acknowledged packet
348 */
349 uint32_t receiver_window_available;
350
351 /**
352 * The offset pointer used during write operation
353 */
354 uint32_t write_offset;
355
356 /**
357 * The offset after which we are expecting data
358 */
359 uint32_t read_offset;
360
361 /**
362 * The offset upto which user has read from the received buffer
363 */
364 uint32_t copy_offset;
365
366 /**
367 * The maximum size of the data message payload this stream handle can send
368 */
369 uint16_t max_payload_size;
370};
371
372
373/**
374 * A socket for listening
375 */
376struct GNUNET_STREAM_ListenSocket
377{
378 /**
379 * The mesh handle
380 */
381 struct GNUNET_MESH_Handle *mesh;
382
383 /**
384 * Handle to statistics
385 */
386 struct GNUNET_STATISTICS_Handle *stat_handle;
387
388 /**
389 * Our configuration
390 */
391 struct GNUNET_CONFIGURATION_Handle *cfg;
392
393 /**
394 * Handle to the lock manager service
395 */
396 struct GNUNET_LOCKMANAGER_Handle *lockmanager;
397
398 /**
399 * The active LockingRequest from lockmanager
400 */
401 struct GNUNET_LOCKMANAGER_LockingRequest *locking_request;
402
403 /**
404 * Callback to call after acquring a lock and listening
405 */
406 GNUNET_STREAM_ListenSuccessCallback listen_ok_cb;
407
408 /**
409 * The callback function which is called after successful opening socket
410 */
411 GNUNET_STREAM_ListenCallback listen_cb;
412
413 /**
414 * The call back closure
415 */
416 void *listen_cb_cls;
417
418 /**
419 * The service port
420 */
421 uint32_t port;
422
423 /**
424 * The id of the lockmanager timeout task
425 */
426 GNUNET_SCHEDULER_TaskIdentifier lockmanager_acquire_timeout_task;
427
428 /**
429 * The retransmit timeout
430 */
431 struct GNUNET_TIME_Relative retransmit_timeout;
432
433 /**
434 * Listen enabled?
435 */
436 int listening;
437
438 /**
439 * Whether testing mode is active or not
440 */
441 int testing_active;
442
443 /**
444 * The write sequence number to be set incase of testing
445 */
446 uint32_t testing_set_write_sequence_number_value;
447
448 /**
449 * The maximum size of the data message payload this stream handle can send
450 */
451 uint16_t max_payload_size;
452
453};
454
455
456/**
457 * The IO Write Handle
458 */
459struct GNUNET_STREAM_WriteHandle
460{
461 /**
462 * The socket to which this write handle is associated
463 */
464 struct GNUNET_STREAM_Socket *socket;
465
466 /**
467 * The write continuation callback
468 */
469 GNUNET_STREAM_CompletionContinuation write_cont;
470
471 /**
472 * Write continuation closure
473 */
474 void *write_cont_cls;
475
476 /**
477 * The packet_buffers associated with this Handle
478 */
479 struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
480
481 /**
482 * The bitmap of this IOHandle; Corresponding bit for a message is set when
483 * it has been acknowledged by the receiver
484 */
485 GNUNET_STREAM_AckBitmap ack_bitmap;
486
487 /**
488 * Number of bytes in this write handle
489 */
490 size_t size;
491
492 /**
493 * Number of packets already transmitted from this IO handle. Retransmitted
494 * packets are not taken into account here. This is used to determine which
495 * packets account for retransmission and which packets occupy buffer space at
496 * the receiver.
497 */
498 unsigned int packets_sent;
499
500 /**
501 * The maximum of the base numbers of the received acks
502 */
503 uint32_t max_ack_base_num;
504
505};
506
507
508/**
509 * The IO Read Handle
510 */
511struct GNUNET_STREAM_ReadHandle
512{
513 /**
514 * The socket to which this read handle is associated
515 */
516 struct GNUNET_STREAM_Socket *socket;
517
518 /**
519 * Callback for the read processor
520 */
521 GNUNET_STREAM_DataProcessor proc;
522
523 /**
524 * The closure pointer for the read processor callback
525 */
526 void *proc_cls;
527
528 /**
529 * Task identifier for the read io timeout task
530 */
531 GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
532
533 /**
534 * Task scheduled to continue a read operation.
535 */
536 GNUNET_SCHEDULER_TaskIdentifier read_task_id;
537
538 /**
539 * Task scheduled from GNUNET_STREAM_read() to lookup the ACK bitmap and call
540 * the read processor task
541 */
542 GNUNET_SCHEDULER_TaskIdentifier probe_data_availability_task_id;
543};
544
545
546/**
547 * Handle for Shutdown
548 */
549struct GNUNET_STREAM_ShutdownHandle
550{
551 /**
552 * The socket associated with this shutdown handle
553 */
554 struct GNUNET_STREAM_Socket *socket;
555
556 /**
557 * Shutdown completion callback
558 */
559 GNUNET_STREAM_ShutdownCompletion completion_cb;
560
561 /**
562 * Closure for completion callback
563 */
564 void *completion_cls;
565
566 /**
567 * Close message retransmission task id
568 */
569 GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
570
571 /**
572 * Task scheduled to call the shutdown continuation callback
573 */
574 GNUNET_SCHEDULER_TaskIdentifier call_cont_task_id;
575
576 /**
577 * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
578 */
579 int operation;
580};
581
582
583/**
584 * Collection of the state necessary to read and write gnunet messages
585 * to a stream socket. Should be used as closure for stream_data_processor.
586 */
587struct MQStreamState
588{
589 /**
590 * Message stream tokenizer for the data received from the
591 * stream socket.
592 */
593 struct GNUNET_SERVER_MessageStreamTokenizer *mst;
594
595 /**
596 * The stream socket to use for receiving and transmitting
597 * messages with the message queue.
598 */
599 struct GNUNET_STREAM_Socket *socket;
600
601 /**
602 * Current read handle, NULL if no read active.
603 */
604 struct GNUNET_STREAM_ReadHandle *rh;
605
606 /**
607 * Current write handle, NULL if no write active.
608 */
609 struct GNUNET_STREAM_WriteHandle *wh;
610};
611
612
613
614/**
615 * Default value in seconds for various timeouts
616 */
617static const unsigned int default_timeout = 10;
618
619/**
620 * The domain name for locks we use here
621 */
622static const char *locking_domain = "GNUNET_STREAM_APPLOCK";
623
624/**
625 * Callback function for sending queued message
626 *
627 * @param cls closure the socket
628 * @param size number of bytes available in buf
629 * @param buf where the callee should write the message
630 * @return number of bytes written to buf
631 */
632static size_t
633send_message_notify (void *cls, size_t size, void *buf)
634{
635 struct GNUNET_STREAM_Socket *socket = cls;
636 struct MessageQueue *head;
637 size_t ret;
638
639 socket->transmit_handle = NULL; /* Remove the transmit handle */
640 head = socket->queue_head;
641 if (NULL == head)
642 return 0; /* just to be safe */
643 if (0 == size) /* request timed out */
644 {
645 socket->mesh_retry_timeout = GNUNET_TIME_STD_BACKOFF
646 (socket->mesh_retry_timeout);
647 LOG (GNUNET_ERROR_TYPE_DEBUG,
648 "%s: Message sending to MESH timed out. Retrying in %s \n",
649 GNUNET_i2s (&socket->other_peer),
650 GNUNET_STRINGS_relative_time_to_string (socket->mesh_retry_timeout,
651 GNUNET_YES));
652 socket->transmit_handle =
653 GNUNET_MESH_notify_transmit_ready (socket->tunnel,
654 GNUNET_NO, /* Corking */
655 socket->mesh_retry_timeout,
656 ntohs (head->message->size),
657 &send_message_notify,
658 socket);
659 return 0;
660 }
661 ret = ntohs (head->message->size);
662 GNUNET_assert (size >= ret);
663 memcpy (buf, head->message, ret);
664 if (NULL != head->finish_cb)
665 {
666 head->finish_cb (head->finish_cb_cls, socket);
667 }
668 GNUNET_CONTAINER_DLL_remove (socket->queue_head,
669 socket->queue_tail,
670 head);
671 GNUNET_free (head->message);
672 GNUNET_free (head);
673 if (NULL != socket->transmit_handle)
674 return ret; /* 'finish_cb' might have triggered message already! */
675 head = socket->queue_head;
676 if (NULL != head) /* more pending messages to send */
677 {
678 socket->mesh_retry_timeout = GNUNET_TIME_UNIT_ZERO;
679 socket->transmit_handle =
680 GNUNET_MESH_notify_transmit_ready (socket->tunnel,
681 GNUNET_NO, /* Corking */
682 socket->mesh_retry_timeout,
683 ntohs (head->message->size),
684 &send_message_notify,
685 socket);
686 }
687 return ret;
688}
689
690
691/**
692 * Queues a message for sending using the mesh connection of a socket
693 *
694 * @param socket the socket whose mesh connection is used
695 * @param message the message to be sent
696 * @param finish_cb the callback to be called when the message is sent
697 * @param finish_cb_cls the closure for the callback
698 * @param urgent set to GNUNET_YES to add the message to the beginning of the
699 * queue; GNUNET_NO to add at the tail
700 */
701static void
702queue_message (struct GNUNET_STREAM_Socket *socket,
703 struct GNUNET_MessageHeader *message,
704 SendFinishCallback finish_cb,
705 void *finish_cb_cls,
706 int urgent)
707{
708 struct MessageQueue *queue_entity;
709
710 GNUNET_assert ((ntohs (message->type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
711 && (ntohs (message->type)
712 <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
713 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Queueing message of type %d and size %d\n",
714 GNUNET_i2s (&socket->other_peer),
715 ntohs (message->type),ntohs (message->size));
716 GNUNET_assert (NULL != message);
717 queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
718 queue_entity->message = message;
719 queue_entity->finish_cb = finish_cb;
720 queue_entity->finish_cb_cls = finish_cb_cls;
721 if (GNUNET_YES == urgent)
722 {
723 GNUNET_CONTAINER_DLL_insert (socket->queue_head, socket->queue_tail,
724 queue_entity);
725 if (NULL != socket->transmit_handle)
726 {
727 GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
728 socket->transmit_handle = NULL;
729 }
730 }
731 else
732 GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
733 socket->queue_tail,
734 queue_entity);
735 if (NULL == socket->transmit_handle)
736 {
737 socket->mesh_retry_timeout = GNUNET_TIME_UNIT_ZERO;
738 socket->transmit_handle =
739 GNUNET_MESH_notify_transmit_ready (socket->tunnel,
740 GNUNET_NO, /* Corking */
741 socket->mesh_retry_timeout,
742 ntohs (message->size),
743 &send_message_notify,
744 socket);
745 }
746}
747
748
749/**
750 * Copies a message and queues it for sending using the mesh connection of
751 * given socket
752 *
753 * @param socket the socket whose mesh connection is used
754 * @param message the message to be sent
755 * @param finish_cb the callback to be called when the message is sent
756 * @param finish_cb_cls the closure for the callback
757 */
758static void
759copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
760 const struct GNUNET_MessageHeader *message,
761 SendFinishCallback finish_cb,
762 void *finish_cb_cls)
763{
764 struct GNUNET_MessageHeader *msg_copy;
765 uint16_t size;
766
767 size = ntohs (message->size);
768 msg_copy = GNUNET_malloc (size);
769 memcpy (msg_copy, message, size);
770 queue_message (socket, msg_copy, finish_cb, finish_cb_cls, GNUNET_NO);
771}
772
773
774/**
775 * Writes data using the given socket. The amount of data written is limited by
776 * the receiver_window_size
777 *
778 * @param socket the socket to use
779 */
780static void
781write_data (struct GNUNET_STREAM_Socket *socket);
782
783
784/**
785 * Task for retransmitting data messages if they aren't ACK before their ack
786 * deadline
787 *
788 * @param cls the socket
789 * @param tc the Task context
790 */
791static void
792data_retransmission_task (void *cls,
793 const struct GNUNET_SCHEDULER_TaskContext *tc)
794{
795 struct GNUNET_STREAM_Socket *socket = cls;
796
797 socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
798 if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
799 return;
800 LOG (GNUNET_ERROR_TYPE_DEBUG,
801 "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
802 write_data (socket);
803}
804
805
806/**
807 * Task for sending ACK message
808 *
809 * @param cls the socket
810 * @param tc the Task context
811 */
812static void
813ack_task (void *cls,
814 const struct GNUNET_SCHEDULER_TaskContext *tc)
815{
816 struct GNUNET_STREAM_Socket *socket = cls;
817 struct GNUNET_STREAM_AckMessage *ack_msg;
818
819 socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
820 if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
821 return;
822 /* Create the ACK Message */
823 ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
824 ack_msg->header.size = htons (sizeof (struct
825 GNUNET_STREAM_AckMessage));
826 ack_msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
827 ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
828 ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
829 ack_msg->receive_window_remaining =
830 htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
831 /* Queue up ACK for immediate sending */
832 queue_message (socket, &ack_msg->header, NULL, NULL, GNUNET_YES);
833}
834
835
836/**
837 * Retransmission task for shutdown messages
838 *
839 * @param cls the shutdown handle
840 * @param tc the Task Context
841 */
842static void
843close_msg_retransmission_task (void *cls,
844 const struct GNUNET_SCHEDULER_TaskContext *tc)
845{
846 struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
847 struct GNUNET_MessageHeader *msg;
848 struct GNUNET_STREAM_Socket *socket;
849
850 shutdown_handle->close_msg_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
851 GNUNET_assert (NULL != shutdown_handle);
852 if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
853 return;
854 socket = shutdown_handle->socket;
855 msg = GNUNET_malloc (sizeof (struct GNUNET_MessageHeader));
856 msg->size = htons (sizeof (struct GNUNET_MessageHeader));
857 switch (shutdown_handle->operation)
858 {
859 case SHUT_RDWR:
860 msg->type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
861 break;
862 case SHUT_RD:
863 msg->type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
864 break;
865 case SHUT_WR:
866 msg->type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
867 break;
868 default:
869 GNUNET_free (msg);
870 shutdown_handle->close_msg_retransmission_task_id =
871 GNUNET_SCHEDULER_NO_TASK;
872 return;
873 }
874 queue_message (socket, msg, NULL, NULL, GNUNET_NO);
875 shutdown_handle->close_msg_retransmission_task_id =
876 GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
877 &close_msg_retransmission_task,
878 shutdown_handle);
879}
880
881
882/**
883 * Function to modify a bit in GNUNET_STREAM_AckBitmap
884 *
885 * @param bitmap the bitmap to modify
886 * @param bit the bit number to modify
887 * @param value GNUNET_YES to on, GNUNET_NO to off
888 */
889static void
890ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
891 unsigned int bit,
892 int value)
893{
894 GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
895 if (GNUNET_YES == value)
896 *bitmap |= (1LL << bit);
897 else
898 *bitmap &= ~(1LL << bit);
899}
900
901
902/**
903 * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
904 *
905 * @param bitmap address of the bitmap that has to be checked
906 * @param bit the bit number to check
907 * @return GNUNET_YES if the bit is set; GNUNET_NO if not
908 */
909static uint8_t
910ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
911 unsigned int bit)
912{
913 GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
914 return 0 != (*bitmap & (1LL << bit));
915}
916
917
918/**
919 * Writes data using the given socket. The amount of data written is limited by
920 * the receiver_window_size
921 *
922 * @param socket the socket to use
923 */
924static void
925write_data (struct GNUNET_STREAM_Socket *socket)
926{
927 struct GNUNET_STREAM_WriteHandle *io_handle = socket->write_handle;
928 unsigned int packet;
929
930 for (packet=0; packet < io_handle->packets_sent; packet++)
931 {
932 if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
933 packet))
934 {
935 LOG (GNUNET_ERROR_TYPE_DEBUG,
936 "%s: Retransmitting DATA message with sequence %u\n",
937 GNUNET_i2s (&socket->other_peer),
938 ntohl (io_handle->messages[packet]->sequence_number));
939 copy_and_queue_message (socket,
940 &io_handle->messages[packet]->header,
941 NULL,
942 NULL);
943 }
944 }
945 /* Now send new packets if there is enough buffer space */
946 while ((packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH) &&
947 (NULL != io_handle->messages[packet]) &&
948 (socket->receiver_window_available
949 >= ntohs (io_handle->messages[packet]->header.size)))
950 {
951 socket->receiver_window_available -=
952 ntohs (io_handle->messages[packet]->header.size);
953 LOG (GNUNET_ERROR_TYPE_DEBUG,
954 "%s: Placing DATA message with sequence %u in send queue\n",
955 GNUNET_i2s (&socket->other_peer),
956 ntohl (io_handle->messages[packet]->sequence_number));
957 copy_and_queue_message (socket,
958 &io_handle->messages[packet]->header,
959 NULL,
960 NULL);
961 packet++;
962 }
963 io_handle->packets_sent = packet;
964 if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id)
965 {
966 socket->data_retransmit_timeout = GNUNET_TIME_STD_BACKOFF
967 (socket->data_retransmit_timeout);
968 socket->data_retransmission_task_id =
969 GNUNET_SCHEDULER_add_delayed (socket->data_retransmit_timeout,
970 &data_retransmission_task,
971 socket);
972 }
973}
974
975
976/**
977 * Cleansup the sockets read handle
978 *
979 * @param socket the socket whose read handle has to be cleanedup
980 */
981static void
982cleanup_read_handle (struct GNUNET_STREAM_Socket *socket)
983{
984 struct GNUNET_STREAM_ReadHandle *read_handle;
985
986 read_handle = socket->read_handle;
987 /* Read io time task should be there; if it is already executed then this
988 read handle is not valid; However upon scheduler shutdown the read io task
989 may be executed before */
990 if (GNUNET_SCHEDULER_NO_TASK != read_handle->read_io_timeout_task_id)
991 GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id);
992 /* reading task may be present; if so we have to stop it */
993 if (GNUNET_SCHEDULER_NO_TASK != read_handle->read_task_id)
994 GNUNET_SCHEDULER_cancel (read_handle->read_task_id);
995 if (GNUNET_SCHEDULER_NO_TASK != read_handle->probe_data_availability_task_id)
996 GNUNET_SCHEDULER_cancel (read_handle->probe_data_availability_task_id);
997 GNUNET_free (read_handle);
998 socket->read_handle = NULL;
999}
1000
1001
1002/**
1003 * Task for calling the read processor
1004 *
1005 * @param cls the socket
1006 * @param tc the task context
1007 */
1008static void
1009call_read_processor (void *cls,
1010 const struct GNUNET_SCHEDULER_TaskContext *tc)
1011{
1012 struct GNUNET_STREAM_Socket *socket = cls;
1013 struct GNUNET_STREAM_ReadHandle *read_handle;
1014 GNUNET_STREAM_DataProcessor proc;
1015 void *proc_cls;
1016 size_t read_size;
1017 size_t valid_read_size;
1018 unsigned int packet;
1019 uint32_t sequence_increase;
1020 uint32_t offset_increase;
1021
1022 read_handle = socket->read_handle;
1023 GNUNET_assert (NULL != read_handle);
1024 read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1025 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1026 return;
1027 if (NULL == socket->receive_buffer)
1028 return;
1029 GNUNET_assert (NULL != read_handle->proc);
1030 /* Check the bitmap for any holes */
1031 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1032 {
1033 if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
1034 packet))
1035 break;
1036 }
1037 /* We only call read processor if we have the first packet */
1038 GNUNET_assert (0 < packet);
1039 valid_read_size =
1040 socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
1041 GNUNET_assert (0 != valid_read_size);
1042 proc = read_handle->proc;
1043 proc_cls = read_handle->proc_cls;
1044 cleanup_read_handle (socket);
1045 /* Call the data processor */
1046 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n",
1047 GNUNET_i2s (&socket->other_peer));
1048 read_size = proc (proc_cls, GNUNET_STREAM_OK,
1049 socket->receive_buffer + socket->copy_offset,
1050 valid_read_size);
1051 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n",
1052 GNUNET_i2s (&socket->other_peer), read_size);
1053 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor completed successfully\n",
1054 GNUNET_i2s (&socket->other_peer));
1055 GNUNET_assert (read_size <= valid_read_size);
1056 socket->copy_offset += read_size;
1057 /* Determine upto which packet we can remove from the buffer */
1058 for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1059 {
1060 if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
1061 {
1062 packet++;
1063 break;
1064 }
1065 if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
1066 break;
1067 }
1068 /* If no packets can be removed we can't move the buffer */
1069 if (0 == packet)
1070 return;
1071 sequence_increase = packet;
1072 LOG (GNUNET_ERROR_TYPE_DEBUG,
1073 "%s: Sequence increase after read processor completion: %u\n",
1074 GNUNET_i2s (&socket->other_peer), sequence_increase);
1075 /* Shift the data in the receive buffer */
1076 socket->receive_buffer =
1077 memmove (socket->receive_buffer,
1078 socket->receive_buffer
1079 + socket->receive_buffer_boundaries[sequence_increase-1],
1080 socket->receive_buffer_size
1081 - socket->receive_buffer_boundaries[sequence_increase-1]);
1082 /* Shift the bitmap */
1083 socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
1084 /* Set read_sequence_number */
1085 socket->read_sequence_number += sequence_increase;
1086 /* Set read_offset */
1087 offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
1088 socket->read_offset += offset_increase;
1089 /* Fix copy_offset */
1090 GNUNET_assert (offset_increase <= socket->copy_offset);
1091 socket->copy_offset -= offset_increase;
1092 /* Fix relative boundaries */
1093 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
1094 {
1095 if (packet < (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase))
1096 {
1097 uint32_t ahead_buffer_boundary;
1098
1099 ahead_buffer_boundary =
1100 socket->receive_buffer_boundaries[packet + sequence_increase];
1101 if (0 == ahead_buffer_boundary)
1102 socket->receive_buffer_boundaries[packet] = 0;
1103 else
1104 {
1105 GNUNET_assert (offset_increase < ahead_buffer_boundary);
1106 socket->receive_buffer_boundaries[packet] =
1107 ahead_buffer_boundary - offset_increase;
1108 }
1109 }
1110 else
1111 socket->receive_buffer_boundaries[packet] = 0;
1112 }
1113}
1114
1115
1116/**
1117 * Cancels the existing read io handle
1118 *
1119 * @param cls the closure from the SCHEDULER call
1120 * @param tc the task context
1121 */
1122static void
1123read_io_timeout (void *cls,
1124 const struct GNUNET_SCHEDULER_TaskContext *tc)
1125{
1126 struct GNUNET_STREAM_Socket *socket = cls;
1127 struct GNUNET_STREAM_ReadHandle *read_handle;
1128 GNUNET_STREAM_DataProcessor proc;
1129 void *proc_cls;
1130
1131 read_handle = socket->read_handle;
1132 GNUNET_assert (NULL != read_handle);
1133 read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
1134 if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
1135 return;
1136 if (read_handle->read_task_id != GNUNET_SCHEDULER_NO_TASK)
1137 {
1138 LOG (GNUNET_ERROR_TYPE_DEBUG,
1139 "%s: Read task timedout - Cancelling it\n",
1140 GNUNET_i2s (&socket->other_peer));
1141 GNUNET_SCHEDULER_cancel (read_handle->read_task_id);
1142 read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
1143 }
1144 proc = read_handle->proc;
1145 proc_cls = read_handle->proc_cls;
1146 GNUNET_free (read_handle);
1147 socket->read_handle = NULL;
1148 /* Call the read processor to signal timeout */
1149 proc (proc_cls,
1150 GNUNET_STREAM_TIMEOUT,
1151 NULL,
1152 0);
1153}
1154
1155
1156/**
1157 * Handler for DATA messages; Same for both client and server
1158 *
1159 * @param socket the socket through which the ack was received
1160 * @param tunnel connection to the other end
1161 * @param msg the data message
1162 * @return GNUNET_OK to keep the connection open,
1163 * GNUNET_SYSERR to close it (signal serious error)
1164 */
1165static int
1166handle_data (struct GNUNET_STREAM_Socket *socket,
1167 struct GNUNET_MESH_Tunnel *tunnel,
1168 const struct GNUNET_STREAM_DataMessage *msg)
1169{
1170 const void *payload;
1171 struct GNUNET_TIME_Relative ack_deadline_rel;
1172 uint32_t bytes_needed;
1173 uint32_t relative_offset;
1174 uint32_t relative_sequence_number;
1175 uint16_t size;
1176
1177 size = htons (msg->header.size);
1178 if (size < sizeof (struct GNUNET_STREAM_DataMessage))
1179 {
1180 GNUNET_break_op (0);
1181 return GNUNET_SYSERR;
1182 }
1183 switch (socket->state)
1184 {
1185 case STATE_ESTABLISHED:
1186 case STATE_TRANSMIT_CLOSED:
1187 case STATE_TRANSMIT_CLOSE_WAIT:
1188 /* check if the message's sequence number is in the range we are
1189 expecting */
1190 relative_sequence_number =
1191 ntohl (msg->sequence_number) - socket->read_sequence_number;
1192 if ( relative_sequence_number >= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
1193 {
1194 LOG (GNUNET_ERROR_TYPE_DEBUG,
1195 "%s: Ignoring received message with sequence number %u\n",
1196 GNUNET_i2s (&socket->other_peer),
1197 ntohl (msg->sequence_number));
1198 /* Start ACK sending task if one is not already present */
1199 if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1200 {
1201 socket->ack_task_id =
1202 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1203 (msg->ack_deadline),
1204 &ack_task,
1205 socket);
1206 }
1207 return GNUNET_YES;
1208 }
1209 /* Check if we have already seen this message */
1210 if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
1211 relative_sequence_number))
1212 {
1213 LOG (GNUNET_ERROR_TYPE_DEBUG,
1214 "%s: Ignoring already received message with sequence number %u\n",
1215 GNUNET_i2s (&socket->other_peer),
1216 ntohl (msg->sequence_number));
1217 /* Start ACK sending task if one is not already present */
1218 if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1219 {
1220 socket->ack_task_id =
1221 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1222 (msg->ack_deadline), &ack_task, socket);
1223 }
1224 return GNUNET_YES;
1225 }
1226 LOG (GNUNET_ERROR_TYPE_DEBUG,
1227 "%1$s: Receiving DATA with sequence number: %2$u and size: %3$d from "
1228 "%1$s\n", GNUNET_i2s (&socket->other_peer),
1229 ntohl (msg->sequence_number), ntohs (msg->header.size));
1230 /* Check if we have to allocate the buffer */
1231 size -= sizeof (struct GNUNET_STREAM_DataMessage);
1232 relative_offset = ntohl (msg->offset) - socket->read_offset;
1233 bytes_needed = relative_offset + size;
1234 if (bytes_needed > socket->receive_buffer_size)
1235 {
1236 if (bytes_needed <= RECEIVE_BUFFER_SIZE)
1237 {
1238 socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
1239 bytes_needed);
1240 socket->receive_buffer_size = bytes_needed;
1241 }
1242 else
1243 {
1244 LOG (GNUNET_ERROR_TYPE_DEBUG,
1245 "%s: Cannot accommodate packet %d as buffer is full\n",
1246 GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number));
1247 return GNUNET_YES;
1248 }
1249 }
1250 /* Copy Data to buffer */
1251 payload = &msg[1];
1252 GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
1253 memcpy (socket->receive_buffer + relative_offset, payload, size);
1254 socket->receive_buffer_boundaries[relative_sequence_number] =
1255 relative_offset + size;
1256 /* Modify the ACK bitmap */
1257 ackbitmap_modify_bit (&socket->ack_bitmap, relative_sequence_number,
1258 GNUNET_YES);
1259 /* Start ACK sending task if one is not already present */
1260 ack_deadline_rel = GNUNET_TIME_relative_ntoh (msg->ack_deadline);
1261 if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
1262 {
1263 ack_deadline_rel =
1264 GNUNET_TIME_relative_min (ack_deadline_rel,
1265 GNUNET_TIME_relative_multiply
1266 (GNUNET_TIME_UNIT_SECONDS, 300));
1267 socket->ack_task_id =
1268 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
1269 (msg->ack_deadline), &ack_task, socket);
1270 socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1271 socket->ack_time_deadline = ack_deadline_rel;
1272 }
1273 else
1274 {
1275 struct GNUNET_TIME_Relative ack_time_past;
1276 struct GNUNET_TIME_Relative ack_time_remaining;
1277 struct GNUNET_TIME_Relative ack_time_min;
1278 ack_time_past =
1279 GNUNET_TIME_absolute_get_duration (socket->ack_time_registered);
1280 ack_time_remaining = GNUNET_TIME_relative_subtract
1281 (socket->ack_time_deadline, ack_time_past);
1282 ack_time_min = GNUNET_TIME_relative_min (ack_time_remaining,
1283 ack_deadline_rel);
1284 if (0 == memcmp(&ack_deadline_rel, &ack_time_min,
1285 sizeof (struct GNUNET_TIME_Relative)))
1286 {
1287 ack_deadline_rel = ack_time_min;
1288 GNUNET_SCHEDULER_cancel (socket->ack_task_id);
1289 socket->ack_task_id = GNUNET_SCHEDULER_add_delayed (ack_deadline_rel,
1290 &ack_task, socket);
1291 socket->ack_time_registered = GNUNET_TIME_absolute_get ();
1292 socket->ack_time_deadline = ack_deadline_rel;
1293 }
1294 }
1295 if ((NULL != socket->read_handle) /* A read handle is waiting */
1296 /* There is no current read task */
1297 && (GNUNET_SCHEDULER_NO_TASK == socket->read_handle->read_task_id)
1298 /* We have the first packet */
1299 && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0)))
1300 {
1301 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n",
1302 GNUNET_i2s (&socket->other_peer));
1303 socket->read_handle->read_task_id =
1304 GNUNET_SCHEDULER_add_now (&call_read_processor, socket);
1305 }
1306 break;
1307 default:
1308 LOG (GNUNET_ERROR_TYPE_DEBUG,
1309 "%s: Received data message when it cannot be handled\n",
1310 GNUNET_i2s (&socket->other_peer));
1311 break;
1312 }
1313 return GNUNET_YES;
1314}
1315
1316
1317/**
1318 * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
1319 *
1320 * @param cls the socket (set from GNUNET_MESH_connect)
1321 * @param tunnel connection to the other end
1322 * @param tunnel_ctx place to store local state associated with the tunnel
1323 * @param message the actual message
1324 * @return GNUNET_OK to keep the connection open,
1325 * GNUNET_SYSERR to close it (signal serious error)
1326 */
1327static int
1328client_handle_data (void *cls,
1329 struct GNUNET_MESH_Tunnel *tunnel,
1330 void **tunnel_ctx,
1331 const struct GNUNET_MessageHeader *message)
1332{
1333 struct GNUNET_STREAM_Socket *socket = cls;
1334
1335 return handle_data (socket, tunnel,
1336 (const struct GNUNET_STREAM_DataMessage *) message);
1337}
1338
1339
1340/**
1341 * Callback to set state to ESTABLISHED
1342 *
1343 * @param cls the closure NULL;
1344 * @param socket the socket to requiring state change
1345 */
1346static void
1347set_state_established (void *cls,
1348 struct GNUNET_STREAM_Socket *socket)
1349{
1350 LOG (GNUNET_ERROR_TYPE_DEBUG,
1351 "%s: Attaining ESTABLISHED state\n",
1352 GNUNET_i2s (&socket->other_peer));
1353 socket->write_offset = 0;
1354 socket->read_offset = 0;
1355 socket->state = STATE_ESTABLISHED;
1356 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK !=
1357 socket->control_retransmission_task_id);
1358 GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
1359 socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1360 if (NULL != socket->lsocket)
1361 {
1362 LOG (GNUNET_ERROR_TYPE_DEBUG,
1363 "%s: Calling listen callback\n",
1364 GNUNET_i2s (&socket->other_peer));
1365 if (GNUNET_SYSERR ==
1366 socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
1367 socket,
1368 &socket->other_peer))
1369 {
1370 socket->state = STATE_CLOSED;
1371 /* FIXME: We should close in a decent way (send RST) */
1372 GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
1373 GNUNET_free (socket);
1374 }
1375 }
1376 else
1377 socket->open_cb (socket->open_cls, socket);
1378}
1379
1380
1381/**
1382 * Callback to set state to HELLO_WAIT
1383 *
1384 * @param cls the closure from queue_message
1385 * @param socket the socket to requiring state change
1386 */
1387static void
1388set_state_hello_wait (void *cls,
1389 struct GNUNET_STREAM_Socket *socket)
1390{
1391 GNUNET_assert (STATE_INIT == socket->state);
1392 LOG (GNUNET_ERROR_TYPE_DEBUG,
1393 "%s: Attaining HELLO_WAIT state\n",
1394 GNUNET_i2s (&socket->other_peer));
1395 socket->state = STATE_HELLO_WAIT;
1396}
1397
1398
1399/**
1400 * Callback to set state to CLOSE_WAIT
1401 *
1402 * @param cls the closure from queue_message
1403 * @param socket the socket requiring state change
1404 */
1405static void
1406set_state_close_wait (void *cls,
1407 struct GNUNET_STREAM_Socket *socket)
1408{
1409 LOG (GNUNET_ERROR_TYPE_DEBUG,
1410 "%s: Attaing CLOSE_WAIT state\n",
1411 GNUNET_i2s (&socket->other_peer));
1412 socket->state = STATE_CLOSE_WAIT;
1413 GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1414 socket->receive_buffer = NULL;
1415 socket->receive_buffer_size = 0;
1416}
1417
1418
1419/**
1420 * Callback to set state to RECEIVE_CLOSE_WAIT
1421 *
1422 * @param cls the closure from queue_message
1423 * @param socket the socket requiring state change
1424 */
1425static void
1426set_state_receive_close_wait (void *cls,
1427 struct GNUNET_STREAM_Socket *socket)
1428{
1429 LOG (GNUNET_ERROR_TYPE_DEBUG,
1430 "%s: Attaing RECEIVE_CLOSE_WAIT state\n",
1431 GNUNET_i2s (&socket->other_peer));
1432 socket->state = STATE_RECEIVE_CLOSE_WAIT;
1433 GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1434 socket->receive_buffer = NULL;
1435 socket->receive_buffer_size = 0;
1436}
1437
1438
1439/**
1440 * Callback to set state to TRANSMIT_CLOSE_WAIT
1441 *
1442 * @param cls the closure from queue_message
1443 * @param socket the socket requiring state change
1444 */
1445static void
1446set_state_transmit_close_wait (void *cls,
1447 struct GNUNET_STREAM_Socket *socket)
1448{
1449 LOG (GNUNET_ERROR_TYPE_DEBUG,
1450 "%s: Attaing TRANSMIT_CLOSE_WAIT state\n",
1451 GNUNET_i2s (&socket->other_peer));
1452 socket->state = STATE_TRANSMIT_CLOSE_WAIT;
1453}
1454
1455
1456/**
1457 * Callback to set state to CLOSED
1458 *
1459 * @param cls the closure from queue_message
1460 * @param socket the socket requiring state change
1461 */
1462static void
1463set_state_closed (void *cls,
1464 struct GNUNET_STREAM_Socket *socket)
1465{
1466 socket->state = STATE_CLOSED;
1467}
1468
1469
1470/**
1471 * Returns GNUNET_MESSAGE_TYPE_STREAM_HELLO
1472 *
1473 * @return the generate hello message
1474 */
1475static struct GNUNET_MessageHeader *
1476generate_hello (struct GNUNET_STREAM_Socket *socket)
1477{
1478 struct GNUNET_STREAM_HelloMessage *msg;
1479
1480 msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloMessage));
1481 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
1482 msg->header.size = htons (sizeof (struct GNUNET_STREAM_HelloMessage));
1483 msg->port = GNUNET_htonll ((uint64_t) socket->port);
1484 return &msg->header;
1485}
1486
1487
1488/**
1489 * Returns a new HelloAckMessage. Also sets the write sequence number for the
1490 * socket
1491 *
1492 * @param socket the socket for which this HelloAckMessage has to be generated
1493 * @param generate_seq GNUNET_YES to generate the write sequence number,
1494 * GNUNET_NO to use the existing sequence number
1495 * @return the HelloAckMessage
1496 */
1497static struct GNUNET_STREAM_HelloAckMessage *
1498generate_hello_ack (struct GNUNET_STREAM_Socket *socket,
1499 int generate_seq)
1500{
1501 struct GNUNET_STREAM_HelloAckMessage *msg;
1502
1503 if (GNUNET_YES == generate_seq)
1504 {
1505 if (GNUNET_YES == socket->testing_active)
1506 socket->write_sequence_number =
1507 socket->testing_set_write_sequence_number_value;
1508 else
1509 socket->write_sequence_number =
1510 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
1511 LOG_DEBUG ("%s: write sequence number %u\n",
1512 GNUNET_i2s (&socket->other_peer),
1513 (unsigned int) socket->write_sequence_number);
1514 }
1515 msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1516 msg->header.size =
1517 htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
1518 msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
1519 msg->sequence_number = htonl (socket->write_sequence_number);
1520 msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
1521 return msg;
1522}
1523
1524
1525/**
1526 * Task for retransmitting control messages if they aren't ACK'ed before a
1527 * deadline
1528 *
1529 * @param cls the socket
1530 * @param tc the Task context
1531 */
1532static void
1533control_retransmission_task (void *cls,
1534 const struct GNUNET_SCHEDULER_TaskContext *tc)
1535{
1536 struct GNUNET_STREAM_Socket *socket = cls;
1537
1538 socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
1539 if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
1540 return;
1541 LOG_DEBUG ("%s: Retransmitting a control message\n",
1542 GNUNET_i2s (&socket->other_peer));
1543 switch (socket->state)
1544 {
1545 case STATE_INIT:
1546 GNUNET_break (0);
1547 break;
1548 case STATE_LISTEN:
1549 GNUNET_break (0);
1550 break;
1551 case STATE_HELLO_WAIT:
1552 if (NULL == socket->lsocket) /* We are client */
1553 queue_message (socket, generate_hello (socket), NULL, NULL, GNUNET_NO);
1554 else
1555 queue_message (socket,
1556 (struct GNUNET_MessageHeader *)
1557 generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1558 GNUNET_NO);
1559 socket->control_retransmission_task_id =
1560 GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
1561 &control_retransmission_task, socket);
1562 break;
1563 case STATE_ESTABLISHED:
1564 if (NULL == socket->lsocket)
1565 queue_message (socket,
1566 (struct GNUNET_MessageHeader *)
1567 generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
1568 GNUNET_NO);
1569 else
1570 GNUNET_break (0);
1571 break;
1572 default:
1573 GNUNET_break (0);
1574 }
1575}
1576
1577
1578/**
1579 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
1580 *
1581 * @param cls the socket (set from GNUNET_MESH_connect)
1582 * @param tunnel connection to the other end
1583 * @param tunnel_ctx this is NULL
1584 * @param message the actual message
1585 * @return GNUNET_OK to keep the connection open,
1586 * GNUNET_SYSERR to close it (signal serious error)
1587 */
1588static int
1589client_handle_hello_ack (void *cls,
1590 struct GNUNET_MESH_Tunnel *tunnel,
1591 void **tunnel_ctx,
1592 const struct GNUNET_MessageHeader *message)
1593{
1594 struct GNUNET_STREAM_Socket *socket = cls;
1595 const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
1596 struct GNUNET_STREAM_HelloAckMessage *reply;
1597
1598 ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
1599 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received HELLO_ACK from %s\n",
1600 GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1601 GNUNET_assert (socket->tunnel == tunnel);
1602 switch (socket->state)
1603 {
1604 case STATE_HELLO_WAIT:
1605 socket->read_sequence_number = ntohl (ack_msg->sequence_number);
1606 LOG_DEBUG ("%s: Read sequence number %u\n",
1607 GNUNET_i2s (&socket->other_peer), socket->read_sequence_number);
1608 socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
1609 reply = generate_hello_ack (socket, GNUNET_YES);
1610 queue_message (socket, &reply->header, &set_state_established,
1611 NULL, GNUNET_NO);
1612 return GNUNET_OK;
1613 case STATE_ESTABLISHED:
1614 // call statistics (# ACKs ignored++)
1615 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
1616 socket->control_retransmission_task_id);
1617 socket->control_retransmission_task_id =
1618 GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
1619 return GNUNET_OK;
1620 default:
1621 LOG_DEBUG ("%1$s: Server %1$s sent HELLO_ACK when in state %2$d\n",
1622 GNUNET_i2s (&socket->other_peer), socket->state);
1623 socket->state = STATE_CLOSED; // introduce STATE_ERROR?
1624 return GNUNET_SYSERR;
1625 }
1626}
1627
1628
1629/**
1630 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
1631 *
1632 * @param cls the socket (set from GNUNET_MESH_connect)
1633 * @param tunnel connection to the other end
1634 * @param tunnel_ctx this is NULL
1635 * @param message the actual message
1636 * @return GNUNET_OK to keep the connection open,
1637 * GNUNET_SYSERR to close it (signal serious error)
1638 */
1639static int
1640client_handle_reset (void *cls,
1641 struct GNUNET_MESH_Tunnel *tunnel,
1642 void **tunnel_ctx,
1643 const struct GNUNET_MessageHeader *message)
1644{
1645 // struct GNUNET_STREAM_Socket *socket = cls;
1646
1647 return GNUNET_OK;
1648}
1649
1650
1651/**
1652 * Frees the socket's receive buffers, marks the socket as receive closed and
1653 * calls the DataProcessor with GNUNET_STREAM_SHUTDOWN status if a read handle
1654 * is present
1655 *
1656 * @param socket the socket
1657 */
1658static void
1659do_receive_shutdown (struct GNUNET_STREAM_Socket *socket)
1660{
1661 socket->receive_closed = GNUNET_YES;
1662 GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
1663 socket->receive_buffer = NULL;
1664 socket->receive_buffer_size = 0;
1665 if (NULL != socket->read_handle)
1666 {
1667 GNUNET_STREAM_DataProcessor proc;
1668 void *proc_cls;
1669
1670 proc = socket->read_handle->proc;
1671 proc_cls = socket->read_handle->proc_cls;
1672 GNUNET_STREAM_read_cancel (socket->read_handle);
1673 socket->read_handle = NULL;
1674 if (NULL != proc)
1675 proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
1676 }
1677}
1678
1679
1680/**
1681 * Marks the socket as transmit closed and calls the CompletionContinuation with
1682 * GNUNET_STREAM_SHUTDOWN status if a write handle is present
1683 *
1684 * @param socket the socket
1685 */
1686static void
1687do_transmit_shutdown (struct GNUNET_STREAM_Socket *socket)
1688{
1689 socket->transmit_closed = GNUNET_YES;
1690 /* If write handle is present call it with GNUNET_STREAM_SHUTDOWN to signal
1691 that that stream has been shutdown */
1692 if (NULL != socket->write_handle)
1693 {
1694 GNUNET_STREAM_CompletionContinuation wc;
1695 void *wc_cls;
1696
1697 wc = socket->write_handle->write_cont;
1698 wc_cls = socket->write_handle->write_cont_cls;
1699 GNUNET_STREAM_write_cancel (socket->write_handle);
1700 socket->write_handle = NULL;
1701 if (NULL != wc)
1702 wc (wc_cls,
1703 GNUNET_STREAM_SHUTDOWN, 0);
1704 }
1705}
1706
1707
1708/**
1709 * Common message handler for handling TRANSMIT_CLOSE messages
1710 *
1711 * @param socket the socket through which the ack was received
1712 * @param tunnel connection to the other end
1713 * @param msg the transmit close message
1714 * @return GNUNET_OK to keep the connection open,
1715 * GNUNET_SYSERR to close it (signal serious error)
1716 */
1717static int
1718handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
1719 struct GNUNET_MESH_Tunnel *tunnel,
1720 const struct GNUNET_MessageHeader *msg)
1721{
1722 struct GNUNET_MessageHeader *reply;
1723
1724 switch (socket->state)
1725 {
1726 case STATE_INIT:
1727 case STATE_LISTEN:
1728 case STATE_HELLO_WAIT:
1729 LOG (GNUNET_ERROR_TYPE_DEBUG,
1730 "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1731 GNUNET_i2s (&socket->other_peer));
1732 return GNUNET_OK;
1733 default:
1734 break;
1735 }
1736 /* Send TRANSMIT_CLOSE_ACK */
1737 reply = GNUNET_malloc (sizeof (struct GNUNET_MessageHeader));
1738 reply->type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
1739 reply->size = htons (sizeof (struct GNUNET_MessageHeader));
1740 queue_message (socket, reply, NULL, NULL, GNUNET_NO);
1741 LOG (GNUNET_ERROR_TYPE_DEBUG, "%1$s: Received TRANSMIT_CLOSE from %1$s\n",
1742 GNUNET_i2s (&socket->other_peer));
1743 switch(socket->state)
1744 {
1745 case STATE_RECEIVE_CLOSED:
1746 case STATE_RECEIVE_CLOSE_WAIT:
1747 case STATE_CLOSE_WAIT:
1748 case STATE_CLOSED:
1749 return GNUNET_OK;
1750 default:
1751 break;
1752 }
1753 do_receive_shutdown (socket);
1754 if (GNUNET_YES == socket->transmit_closed)
1755 socket->state = STATE_CLOSED;
1756 else
1757 socket->state = STATE_RECEIVE_CLOSED;
1758 return GNUNET_OK;
1759}
1760
1761
1762/**
1763 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
1764 *
1765 * @param cls the socket (set from GNUNET_MESH_connect)
1766 * @param tunnel connection to the other end
1767 * @param tunnel_ctx this is NULL
1768 * @param message the actual message
1769 * @return GNUNET_OK to keep the connection open,
1770 * GNUNET_SYSERR to close it (signal serious error)
1771 */
1772static int
1773client_handle_transmit_close (void *cls,
1774 struct GNUNET_MESH_Tunnel *tunnel,
1775 void **tunnel_ctx,
1776 const struct GNUNET_MessageHeader *message)
1777{
1778 struct GNUNET_STREAM_Socket *socket = cls;
1779
1780 return handle_transmit_close (socket,
1781 tunnel,
1782 (struct GNUNET_MessageHeader *)message);
1783}
1784
1785
1786/**
1787 * Task for calling the shutdown continuation callback
1788 *
1789 * @param cls the socket
1790 * @param tc the scheduler task context
1791 */
1792static void
1793call_cont_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1794{
1795 struct GNUNET_STREAM_Socket *socket = cls;
1796
1797 GNUNET_assert (NULL != socket->shutdown_handle);
1798 socket->shutdown_handle->call_cont_task_id = GNUNET_SCHEDULER_NO_TASK;
1799 if (NULL != socket->shutdown_handle->completion_cb)
1800 socket->shutdown_handle->completion_cb
1801 (socket->shutdown_handle->completion_cls,
1802 socket->shutdown_handle->operation);
1803 GNUNET_free (socket->shutdown_handle);
1804 socket->shutdown_handle = NULL;
1805}
1806
1807
1808/**
1809 * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
1810 *
1811 * @param socket the socket
1812 * @param tunnel connection to the other end
1813 * @param message the actual message
1814 * @param operation the close operation which is being ACK'ed
1815 * @return GNUNET_OK to keep the connection open,
1816 * GNUNET_SYSERR to close it (signal serious error)
1817 */
1818static int
1819handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
1820 struct GNUNET_MESH_Tunnel *tunnel,
1821 const struct GNUNET_MessageHeader *message,
1822 int operation)
1823{
1824 struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
1825
1826 shutdown_handle = socket->shutdown_handle;
1827 if (NULL == shutdown_handle)
1828 {
1829 /* This may happen when the shudown handle is cancelled */
1830 LOG (GNUNET_ERROR_TYPE_DEBUG,
1831 "%s: Received CLOSE_ACK when shutdown handle is NULL\n",
1832 GNUNET_i2s (&socket->other_peer));
1833 return GNUNET_OK;
1834 }
1835 switch (operation)
1836 {
1837 case SHUT_RDWR:
1838 switch (socket->state)
1839 {
1840 case STATE_CLOSE_WAIT:
1841 if (SHUT_RDWR != shutdown_handle->operation)
1842 {
1843 LOG (GNUNET_ERROR_TYPE_DEBUG,
1844 "%s: Received CLOSE_ACK when shutdown handle is not for "
1845 "SHUT_RDWR\n", GNUNET_i2s (&socket->other_peer));
1846 return GNUNET_OK;
1847 }
1848 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE_ACK from %s\n",
1849 GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1850 socket->state = STATE_CLOSED;
1851 break;
1852 default:
1853 LOG (GNUNET_ERROR_TYPE_DEBUG,
1854 "%s: Received CLOSE_ACK when it is not expected\n",
1855 GNUNET_i2s (&socket->other_peer));
1856 return GNUNET_OK;
1857 }
1858 break;
1859 case SHUT_RD:
1860 switch (socket->state)
1861 {
1862 case STATE_RECEIVE_CLOSE_WAIT:
1863 if (SHUT_RD != shutdown_handle->operation)
1864 {
1865 LOG (GNUNET_ERROR_TYPE_DEBUG,
1866 "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
1867 "is not for SHUT_RD\n", GNUNET_i2s (&socket->other_peer));
1868 return GNUNET_OK;
1869 }
1870 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE_ACK from %s\n",
1871 GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1872 socket->state = STATE_RECEIVE_CLOSED;
1873 break;
1874 default:
1875 LOG (GNUNET_ERROR_TYPE_DEBUG,
1876 "%s: Received RECEIVE_CLOSE_ACK when it is not expected\n",
1877 GNUNET_i2s (&socket->other_peer));
1878 return GNUNET_OK;
1879 }
1880 break;
1881 case SHUT_WR:
1882 switch (socket->state)
1883 {
1884 case STATE_TRANSMIT_CLOSE_WAIT:
1885 if (SHUT_WR != shutdown_handle->operation)
1886 {
1887 LOG (GNUNET_ERROR_TYPE_DEBUG,
1888 "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle "
1889 "is not for SHUT_WR\n",
1890 GNUNET_i2s (&socket->other_peer));
1891 return GNUNET_OK;
1892 }
1893 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
1894 GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1895 socket->state = STATE_TRANSMIT_CLOSED;
1896 break;
1897 default:
1898 LOG (GNUNET_ERROR_TYPE_DEBUG,
1899 "%s: Received TRANSMIT_CLOSE_ACK when it is not expected\n",
1900 GNUNET_i2s (&socket->other_peer));
1901 return GNUNET_OK;
1902 }
1903 break;
1904 default:
1905 GNUNET_assert (0);
1906 }
1907 shutdown_handle->call_cont_task_id = GNUNET_SCHEDULER_add_now
1908 (&call_cont_task, socket);
1909 if (GNUNET_SCHEDULER_NO_TASK
1910 != shutdown_handle->close_msg_retransmission_task_id)
1911 {
1912 GNUNET_SCHEDULER_cancel
1913 (shutdown_handle->close_msg_retransmission_task_id);
1914 shutdown_handle->close_msg_retransmission_task_id =
1915 GNUNET_SCHEDULER_NO_TASK;
1916 }
1917 return GNUNET_OK;
1918}
1919
1920
1921/**
1922 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
1923 *
1924 * @param cls the socket (set from GNUNET_MESH_connect)
1925 * @param tunnel connection to the other end
1926 * @param tunnel_ctx this is NULL
1927 * @param message the actual message
1928 * @return GNUNET_OK to keep the connection open,
1929 * GNUNET_SYSERR to close it (signal serious error)
1930 */
1931static int
1932client_handle_transmit_close_ack (void *cls,
1933 struct GNUNET_MESH_Tunnel *tunnel,
1934 void **tunnel_ctx,
1935 const struct GNUNET_MessageHeader *message)
1936{
1937 struct GNUNET_STREAM_Socket *socket = cls;
1938
1939 return handle_generic_close_ack (socket,
1940 tunnel,
1941 (const struct GNUNET_MessageHeader *)
1942 message,
1943 SHUT_WR);
1944}
1945
1946
1947/**
1948 * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
1949 *
1950 * @param socket the socket
1951 * @param tunnel connection to the other end
1952 * @param message the actual message
1953 * @return GNUNET_OK to keep the connection open,
1954 * GNUNET_SYSERR to close it (signal serious error)
1955 */
1956static int
1957handle_receive_close (struct GNUNET_STREAM_Socket *socket,
1958 struct GNUNET_MESH_Tunnel *tunnel,
1959 const struct GNUNET_MessageHeader *message)
1960{
1961 struct GNUNET_MessageHeader *receive_close_ack;
1962
1963 switch (socket->state)
1964 {
1965 case STATE_INIT:
1966 case STATE_LISTEN:
1967 case STATE_HELLO_WAIT:
1968 LOG (GNUNET_ERROR_TYPE_DEBUG,
1969 "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
1970 GNUNET_i2s (&socket->other_peer));
1971 return GNUNET_OK;
1972 default:
1973 break;
1974 }
1975 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE from %s\n",
1976 GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
1977 receive_close_ack = GNUNET_malloc (sizeof (struct GNUNET_MessageHeader));
1978 receive_close_ack->size = htons (sizeof (struct GNUNET_MessageHeader));
1979 receive_close_ack->type =
1980 htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
1981 queue_message (socket, receive_close_ack, NULL, NULL, GNUNET_NO);
1982 switch (socket->state)
1983 {
1984 case STATE_TRANSMIT_CLOSED:
1985 case STATE_TRANSMIT_CLOSE_WAIT:
1986 case STATE_CLOSED:
1987 case STATE_CLOSE_WAIT:
1988 return GNUNET_OK;
1989 default:
1990 break;
1991 }
1992 do_transmit_shutdown (socket);
1993 if (GNUNET_YES == socket->receive_closed)
1994 socket->state = STATE_CLOSED;
1995 else
1996 socket->state = STATE_TRANSMIT_CLOSED;
1997 return GNUNET_OK;
1998}
1999
2000
2001/**
2002 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2003 *
2004 * @param cls the socket (set from GNUNET_MESH_connect)
2005 * @param tunnel connection to the other end
2006 * @param tunnel_ctx this is NULL
2007 * @param message the actual message
2008 * @return GNUNET_OK to keep the connection open,
2009 * GNUNET_SYSERR to close it (signal serious error)
2010 */
2011static int
2012client_handle_receive_close (void *cls,
2013 struct GNUNET_MESH_Tunnel *tunnel,
2014 void **tunnel_ctx,
2015 const struct GNUNET_MessageHeader *message)
2016{
2017 struct GNUNET_STREAM_Socket *socket = cls;
2018
2019 return
2020 handle_receive_close (socket,
2021 tunnel,
2022 (const struct GNUNET_MessageHeader *) message);
2023}
2024
2025
2026/**
2027 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2028 *
2029 * @param cls the socket (set from GNUNET_MESH_connect)
2030 * @param tunnel connection to the other end
2031 * @param tunnel_ctx this is NULL
2032 * @param message the actual message
2033 * @return GNUNET_OK to keep the connection open,
2034 * GNUNET_SYSERR to close it (signal serious error)
2035 */
2036static int
2037client_handle_receive_close_ack (void *cls,
2038 struct GNUNET_MESH_Tunnel *tunnel,
2039 void **tunnel_ctx,
2040 const struct GNUNET_MessageHeader *message)
2041{
2042 struct GNUNET_STREAM_Socket *socket = cls;
2043
2044 return handle_generic_close_ack (socket,
2045 tunnel,
2046 (const struct GNUNET_MessageHeader *)
2047 message,
2048 SHUT_RD);
2049}
2050
2051
2052/**
2053 * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2054 *
2055 * @param socket the socket
2056 * @param tunnel connection to the other end
2057 * @param message the actual message
2058 * @return GNUNET_OK to keep the connection open,
2059 * GNUNET_SYSERR to close it (signal serious error)
2060 */
2061static int
2062handle_close (struct GNUNET_STREAM_Socket *socket,
2063 struct GNUNET_MESH_Tunnel *tunnel,
2064 const struct GNUNET_MessageHeader *message)
2065{
2066 struct GNUNET_MessageHeader *close_ack;
2067
2068 switch (socket->state)
2069 {
2070 case STATE_INIT:
2071 case STATE_LISTEN:
2072 case STATE_HELLO_WAIT:
2073 LOG (GNUNET_ERROR_TYPE_DEBUG,
2074 "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
2075 GNUNET_i2s (&socket->other_peer));
2076 return GNUNET_OK;
2077 default:
2078 break;
2079 }
2080 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE from %s\n",
2081 GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
2082 close_ack = GNUNET_malloc (sizeof (struct GNUNET_MessageHeader));
2083 close_ack->size = htons (sizeof (struct GNUNET_MessageHeader));
2084 close_ack->type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
2085 queue_message (socket, close_ack, &set_state_closed, NULL, GNUNET_NO);
2086 if ((STATE_CLOSED == socket->state) || (STATE_CLOSE_WAIT == socket->state))
2087 return GNUNET_OK;
2088 if (GNUNET_NO == socket->transmit_closed)
2089 do_transmit_shutdown (socket);
2090 if (GNUNET_NO == socket->receive_closed)
2091 do_receive_shutdown (socket);
2092 return GNUNET_OK;
2093}
2094
2095
2096/**
2097 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2098 *
2099 * @param cls the socket (set from GNUNET_MESH_connect)
2100 * @param tunnel connection to the other end
2101 * @param tunnel_ctx this is NULL
2102 * @param message the actual message
2103 * @return GNUNET_OK to keep the connection open,
2104 * GNUNET_SYSERR to close it (signal serious error)
2105 */
2106static int
2107client_handle_close (void *cls,
2108 struct GNUNET_MESH_Tunnel *tunnel,
2109 void **tunnel_ctx,
2110 const struct GNUNET_MessageHeader *message)
2111{
2112 struct GNUNET_STREAM_Socket *socket = cls;
2113
2114 return handle_close (socket,
2115 tunnel,
2116 (const struct GNUNET_MessageHeader *) message);
2117}
2118
2119
2120/**
2121 * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2122 *
2123 * @param cls the socket (set from GNUNET_MESH_connect)
2124 * @param tunnel connection to the other end
2125 * @param tunnel_ctx this is NULL
2126 * @param message the actual message
2127 * @return GNUNET_OK to keep the connection open,
2128 * GNUNET_SYSERR to close it (signal serious error)
2129 */
2130static int
2131client_handle_close_ack (void *cls,
2132 struct GNUNET_MESH_Tunnel *tunnel,
2133 void **tunnel_ctx,
2134 const struct GNUNET_MessageHeader *message)
2135{
2136 struct GNUNET_STREAM_Socket *socket = cls;
2137
2138 return handle_generic_close_ack (socket,
2139 tunnel,
2140 (const struct GNUNET_MessageHeader *)
2141 message,
2142 SHUT_RDWR);
2143}
2144
2145/*****************************/
2146/* Server's Message Handlers */
2147/*****************************/
2148
2149/**
2150 * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
2151 *
2152 * @param cls the closure
2153 * @param tunnel connection to the other end
2154 * @param tunnel_ctx the socket
2155 * @param message the actual message
2156 * @return GNUNET_OK to keep the connection open,
2157 * GNUNET_SYSERR to close it (signal serious error)
2158 */
2159static int
2160server_handle_data (void *cls,
2161 struct GNUNET_MESH_Tunnel *tunnel,
2162 void **tunnel_ctx,
2163 const struct GNUNET_MessageHeader *message)
2164{
2165 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2166
2167 return handle_data (socket,
2168 tunnel,
2169 (const struct GNUNET_STREAM_DataMessage *)message);
2170}
2171
2172
2173/**
2174 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
2175 *
2176 * @param cls the closure
2177 * @param tunnel connection to the other end
2178 * @param tunnel_ctx the socket
2179 * @param message the actual message
2180 * @return GNUNET_OK to keep the connection open,
2181 * GNUNET_SYSERR to close it (signal serious error)
2182 */
2183static int
2184server_handle_hello (void *cls,
2185 struct GNUNET_MESH_Tunnel *tunnel,
2186 void **tunnel_ctx,
2187 const struct GNUNET_MessageHeader *message)
2188{
2189 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2190 const struct GNUNET_STREAM_HelloMessage *hello;
2191 struct GNUNET_STREAM_HelloAckMessage *reply;
2192 uint32_t port;
2193
2194 hello = (const struct GNUNET_STREAM_HelloMessage *) message;
2195 GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == ntohs (message->type));
2196 GNUNET_assert (socket->tunnel == tunnel);
2197 LOG_DEBUG ("%1$s: Received HELLO from %1$s\n",
2198 GNUNET_i2s (&socket->other_peer));
2199 port = (uint32_t) GNUNET_ntohll (hello->port);
2200 switch (socket->state)
2201 {
2202 case STATE_INIT:
2203 if (port != socket->port)
2204 {
2205 LOG_DEBUG ("Ignoring HELLO for port %u\n", port);
2206 GNUNET_MESH_tunnel_destroy (tunnel);
2207 GNUNET_free (socket);
2208 return GNUNET_OK;
2209 }
2210 reply = generate_hello_ack (socket, GNUNET_YES);
2211 queue_message (socket, &reply->header, &set_state_hello_wait, NULL,
2212 GNUNET_NO);
2213 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
2214 socket->control_retransmission_task_id);
2215 socket->control_retransmission_task_id =
2216 GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2217 &control_retransmission_task, socket);
2218 break;
2219 case STATE_HELLO_WAIT:
2220 /* Perhaps our HELLO_ACK was lost */
2221 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK !=
2222 socket->control_retransmission_task_id);
2223 GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2224 socket->control_retransmission_task_id =
2225 GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
2226 break;
2227 default:
2228 LOG_DEBUG( "%s: Client sent HELLO when in state %d\n",
2229 GNUNET_i2s (&socket->other_peer), socket->state);
2230 /* FIXME: Send RESET? */
2231 }
2232 return GNUNET_OK;
2233}
2234
2235
2236/**
2237 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
2238 *
2239 * @param cls the closure
2240 * @param tunnel connection to the other end
2241 * @param tunnel_ctx the socket
2242 * @param message the actual message
2243 * @return GNUNET_OK to keep the connection open,
2244 * GNUNET_SYSERR to close it (signal serious error)
2245 */
2246static int
2247server_handle_hello_ack (void *cls,
2248 struct GNUNET_MESH_Tunnel *tunnel,
2249 void **tunnel_ctx,
2250 const struct GNUNET_MessageHeader *message)
2251{
2252 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2253 const struct GNUNET_STREAM_HelloAckMessage *ack_message;
2254
2255 GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
2256 ntohs (message->type));
2257 GNUNET_assert (socket->tunnel == tunnel);
2258 ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
2259 switch (socket->state)
2260 {
2261 case STATE_HELLO_WAIT:
2262 LOG (GNUNET_ERROR_TYPE_DEBUG,
2263 "%s: Received HELLO_ACK from %s\n",
2264 GNUNET_i2s (&socket->other_peer),
2265 GNUNET_i2s (&socket->other_peer));
2266 socket->read_sequence_number = ntohl (ack_message->sequence_number);
2267 LOG (GNUNET_ERROR_TYPE_DEBUG,
2268 "%s: Read sequence number %u\n",
2269 GNUNET_i2s (&socket->other_peer),
2270 (unsigned int) socket->read_sequence_number);
2271 socket->receiver_window_available =
2272 ntohl (ack_message->receiver_window_size);
2273 set_state_established (NULL, socket);
2274 break;
2275 default:
2276 LOG (GNUNET_ERROR_TYPE_DEBUG,
2277 "Client sent HELLO_ACK when in state %d\n", socket->state);
2278 }
2279 return GNUNET_OK;
2280}
2281
2282
2283/**
2284 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
2285 *
2286 * @param cls the closure
2287 * @param tunnel connection to the other end
2288 * @param tunnel_ctx the socket
2289 * @param message the actual message
2290 * @return GNUNET_OK to keep the connection open,
2291 * GNUNET_SYSERR to close it (signal serious error)
2292 */
2293static int
2294server_handle_reset (void *cls,
2295 struct GNUNET_MESH_Tunnel *tunnel,
2296 void **tunnel_ctx,
2297 const struct GNUNET_MessageHeader *message)
2298{
2299 // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2300 /* FIXME */
2301 return GNUNET_OK;
2302}
2303
2304
2305/**
2306 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
2307 *
2308 * @param cls the closure
2309 * @param tunnel connection to the other end
2310 * @param tunnel_ctx the socket
2311 * @param message the actual message
2312 * @return GNUNET_OK to keep the connection open,
2313 * GNUNET_SYSERR to close it (signal serious error)
2314 */
2315static int
2316server_handle_transmit_close (void *cls,
2317 struct GNUNET_MESH_Tunnel *tunnel,
2318 void **tunnel_ctx,
2319 const struct GNUNET_MessageHeader *message)
2320{
2321 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2322
2323 return handle_transmit_close (socket, tunnel, message);
2324}
2325
2326
2327/**
2328 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
2329 *
2330 * @param cls the closure
2331 * @param tunnel connection to the other end
2332 * @param tunnel_ctx the socket
2333 * @param message the actual message
2334 * @return GNUNET_OK to keep the connection open,
2335 * GNUNET_SYSERR to close it (signal serious error)
2336 */
2337static int
2338server_handle_transmit_close_ack (void *cls,
2339 struct GNUNET_MESH_Tunnel *tunnel,
2340 void **tunnel_ctx,
2341 const struct GNUNET_MessageHeader *message)
2342{
2343 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2344
2345 return handle_generic_close_ack (socket, tunnel, message, SHUT_WR);
2346}
2347
2348
2349/**
2350 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
2351 *
2352 * @param cls the closure
2353 * @param tunnel connection to the other end
2354 * @param tunnel_ctx the socket
2355 * @param message the actual message
2356 * @return GNUNET_OK to keep the connection open,
2357 * GNUNET_SYSERR to close it (signal serious error)
2358 */
2359static int
2360server_handle_receive_close (void *cls,
2361 struct GNUNET_MESH_Tunnel *tunnel,
2362 void **tunnel_ctx,
2363 const struct GNUNET_MessageHeader *message)
2364{
2365 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2366
2367 return handle_receive_close (socket, tunnel, message);
2368}
2369
2370
2371/**
2372 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
2373 *
2374 * @param cls the closure
2375 * @param tunnel connection to the other end
2376 * @param tunnel_ctx the socket
2377 * @param message the actual message
2378 * @return GNUNET_OK to keep the connection open,
2379 * GNUNET_SYSERR to close it (signal serious error)
2380 */
2381static int
2382server_handle_receive_close_ack (void *cls,
2383 struct GNUNET_MESH_Tunnel *tunnel,
2384 void **tunnel_ctx,
2385 const struct GNUNET_MessageHeader *message)
2386{
2387 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2388
2389 return handle_generic_close_ack (socket, tunnel, message, SHUT_RD);
2390}
2391
2392
2393/**
2394 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
2395 *
2396 * @param cls the listen socket (from GNUNET_MESH_connect in
2397 * GNUNET_STREAM_listen)
2398 * @param tunnel connection to the other end
2399 * @param tunnel_ctx the socket
2400 * @param message the actual message
2401 * @return GNUNET_OK to keep the connection open,
2402 * GNUNET_SYSERR to close it (signal serious error)
2403 */
2404static int
2405server_handle_close (void *cls,
2406 struct GNUNET_MESH_Tunnel *tunnel,
2407 void **tunnel_ctx,
2408 const struct GNUNET_MessageHeader *message)
2409{
2410 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2411
2412 return handle_close (socket, tunnel, message);
2413}
2414
2415
2416/**
2417 * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
2418 *
2419 * @param cls the closure
2420 * @param tunnel connection to the other end
2421 * @param tunnel_ctx the socket
2422 * @param message the actual message
2423 * @return GNUNET_OK to keep the connection open,
2424 * GNUNET_SYSERR to close it (signal serious error)
2425 */
2426static int
2427server_handle_close_ack (void *cls,
2428 struct GNUNET_MESH_Tunnel *tunnel,
2429 void **tunnel_ctx,
2430 const struct GNUNET_MessageHeader *message)
2431{
2432 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2433
2434 return handle_generic_close_ack (socket, tunnel, message, SHUT_RDWR);
2435}
2436
2437
2438/**
2439 * Handler for DATA_ACK messages
2440 *
2441 * @param socket the socket through which the ack was received
2442 * @param tunnel connection to the other end
2443 * @param ack the acknowledgment message
2444 * @return GNUNET_OK to keep the connection open,
2445 * GNUNET_SYSERR to close it (signal serious error)
2446 */
2447static int
2448handle_ack (struct GNUNET_STREAM_Socket *socket,
2449 struct GNUNET_MESH_Tunnel *tunnel,
2450 const struct GNUNET_STREAM_AckMessage *ack)
2451{
2452 struct GNUNET_STREAM_WriteHandle *write_handle;
2453 uint64_t ack_bitmap;
2454 unsigned int packet;
2455 int need_retransmission;
2456 uint32_t sequence_difference;
2457
2458 switch (socket->state)
2459 {
2460 case (STATE_ESTABLISHED):
2461 case (STATE_RECEIVE_CLOSED):
2462 case (STATE_RECEIVE_CLOSE_WAIT):
2463 if (NULL == socket->write_handle)
2464 {
2465 LOG (GNUNET_ERROR_TYPE_DEBUG,
2466 "%s: Received DATA_ACK when write_handle is NULL\n",
2467 GNUNET_i2s (&socket->other_peer));
2468 return GNUNET_OK;
2469 }
2470 sequence_difference =
2471 socket->write_sequence_number - ntohl (ack->base_sequence_number);
2472 if (!(sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2473 {
2474 LOG (GNUNET_ERROR_TYPE_DEBUG,
2475 "%s: Received DATA_ACK with unexpected base sequence number\n",
2476 GNUNET_i2s (&socket->other_peer));
2477 LOG (GNUNET_ERROR_TYPE_DEBUG,
2478 "%s: Current write sequence: %u; Ack's base sequence: %u\n",
2479 GNUNET_i2s (&socket->other_peer),
2480 socket->write_sequence_number,
2481 ntohl (ack->base_sequence_number));
2482 return GNUNET_OK;
2483 }
2484 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received DATA_ACK from %s\n",
2485 GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
2486 /* Cancel the retransmission task */
2487 if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2488 {
2489 GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2490 socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2491 socket->data_retransmit_timeout = GNUNET_TIME_UNIT_SECONDS;
2492 }
2493 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2494 {
2495 if (NULL == socket->write_handle->messages[packet])
2496 break;
2497 /* BS: Base sequence from ack; PS: sequence num of current packet */
2498 sequence_difference = ntohl (ack->base_sequence_number)
2499 - ntohl (socket->write_handle->messages[packet]->sequence_number);
2500 if (0 == sequence_difference)
2501 break; /* The message in our handle is not yet received */
2502 /* case where BS = PS + GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2503 /* sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
2504 ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
2505 packet, GNUNET_YES);
2506 }
2507 if (((ntohl (ack->base_sequence_number)
2508 - (socket->write_handle->max_ack_base_num))
2509 <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
2510 {
2511 socket->write_handle->max_ack_base_num = ntohl (ack->base_sequence_number);
2512 socket->receiver_window_available =
2513 ntohl (ack->receive_window_remaining);
2514 }
2515 else
2516 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2517 "Ignoring to modify receive window available as base: %u, max_ack_base: %u\n",
2518 ntohl (ack->base_sequence_number),
2519 socket->write_handle->max_ack_base_num);
2520 if ((packet == GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
2521 || ((packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
2522 && (NULL == socket->write_handle->messages[packet])))
2523 goto call_write_cont_cb;
2524 GNUNET_assert (ntohl
2525 (socket->write_handle->messages[packet]->sequence_number)
2526 == ntohl (ack->base_sequence_number));
2527 /* Update our bitmap */
2528 ack_bitmap = GNUNET_ntohll (ack->bitmap);
2529 for (; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2530 {
2531 if (NULL == socket->write_handle->messages[packet]) break;
2532 if (ackbitmap_is_bit_set (&ack_bitmap, ntohl
2533 (socket->write_handle->messages[packet]->sequence_number)
2534 - ntohl (ack->base_sequence_number)))
2535 ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, packet, GNUNET_YES);
2536 }
2537 /* Check if we have received all acknowledgements */
2538 need_retransmission = GNUNET_NO;
2539 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2540 {
2541 if (NULL == socket->write_handle->messages[packet]) break;
2542 if (GNUNET_YES != ackbitmap_is_bit_set
2543 (&socket->write_handle->ack_bitmap,packet))
2544 {
2545 need_retransmission = GNUNET_YES;
2546 break;
2547 }
2548 }
2549 if (GNUNET_YES == need_retransmission)
2550 {
2551 write_data (socket);
2552 return GNUNET_OK;
2553 }
2554
2555 call_write_cont_cb:
2556 /* Free the packets */
2557 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
2558 {
2559 GNUNET_free_non_null (socket->write_handle->messages[packet]);
2560 }
2561 write_handle = socket->write_handle;
2562 socket->write_handle = NULL;
2563 if (NULL != write_handle->write_cont)
2564 write_handle->write_cont (write_handle->write_cont_cls,
2565 GNUNET_STREAM_OK,
2566 write_handle->size);
2567 /* We are done with the write handle - Freeing it */
2568 GNUNET_free (write_handle);
2569 LOG (GNUNET_ERROR_TYPE_DEBUG,
2570 "%s: Write completion callback completed\n",
2571 GNUNET_i2s (&socket->other_peer));
2572 break;
2573 default:
2574 break;
2575 }
2576 return GNUNET_OK;
2577}
2578
2579
2580/**
2581 * Handler for DATA_ACK messages
2582 *
2583 * @param cls the 'struct GNUNET_STREAM_Socket'
2584 * @param tunnel connection to the other end
2585 * @param tunnel_ctx unused
2586 * @param message the actual message
2587 * @return GNUNET_OK to keep the connection open,
2588 * GNUNET_SYSERR to close it (signal serious error)
2589 */
2590static int
2591client_handle_ack (void *cls,
2592 struct GNUNET_MESH_Tunnel *tunnel,
2593 void **tunnel_ctx,
2594 const struct GNUNET_MessageHeader *message)
2595{
2596 struct GNUNET_STREAM_Socket *socket = cls;
2597 const struct GNUNET_STREAM_AckMessage *ack;
2598
2599 ack = (const struct GNUNET_STREAM_AckMessage *) message;
2600 return handle_ack (socket, tunnel, ack);
2601}
2602
2603
2604/**
2605 * Handler for DATA_ACK messages
2606 *
2607 * @param cls the server's listen socket
2608 * @param tunnel connection to the other end
2609 * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
2610 * @param message the actual message
2611 * @return GNUNET_OK to keep the connection open,
2612 * GNUNET_SYSERR to close it (signal serious error)
2613 */
2614static int
2615server_handle_ack (void *cls,
2616 struct GNUNET_MESH_Tunnel *tunnel,
2617 void **tunnel_ctx,
2618 const struct GNUNET_MessageHeader *message)
2619{
2620 struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
2621 const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
2622
2623 return handle_ack (socket, tunnel, ack);
2624}
2625
2626
2627/**
2628 * For client message handlers, the stream socket is in the
2629 * closure argument.
2630 */
2631static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
2632 {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2633 {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK,
2634 sizeof (struct GNUNET_STREAM_AckMessage) },
2635 {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2636 sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2637 {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2638 sizeof (struct GNUNET_MessageHeader)},
2639 {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2640 sizeof (struct GNUNET_MessageHeader)},
2641 {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2642 sizeof (struct GNUNET_MessageHeader)},
2643 {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2644 sizeof (struct GNUNET_MessageHeader)},
2645 {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2646 sizeof (struct GNUNET_MessageHeader)},
2647 {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2648 sizeof (struct GNUNET_MessageHeader)},
2649 {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2650 sizeof (struct GNUNET_MessageHeader)},
2651 {NULL, 0, 0}
2652};
2653
2654
2655/**
2656 * For server message handlers, the stream socket is in the
2657 * tunnel context, and the listen socket in the closure argument.
2658 */
2659static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
2660 {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
2661 {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK,
2662 sizeof (struct GNUNET_STREAM_AckMessage) },
2663 {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO,
2664 sizeof (struct GNUNET_STREAM_HelloMessage)},
2665 {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
2666 sizeof (struct GNUNET_STREAM_HelloAckMessage)},
2667 {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
2668 sizeof (struct GNUNET_MessageHeader)},
2669 {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
2670 sizeof (struct GNUNET_MessageHeader)},
2671 {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
2672 sizeof (struct GNUNET_MessageHeader)},
2673 {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
2674 sizeof (struct GNUNET_MessageHeader)},
2675 {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
2676 sizeof (struct GNUNET_MessageHeader)},
2677 {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
2678 sizeof (struct GNUNET_MessageHeader)},
2679 {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
2680 sizeof (struct GNUNET_MessageHeader)},
2681 {NULL, 0, 0}
2682};
2683
2684
2685/**
2686 * Function called when our target peer is connected to our tunnel
2687 *
2688 * @param cls the socket for which this tunnel is created
2689 * @param peer the peer identity of the target
2690 * @param atsi performance data for the connection
2691 *
2692 * FIXME static
2693 */
2694void
2695mesh_peer_connect_callback (void *cls,
2696 const struct GNUNET_PeerIdentity *peer,
2697 const struct GNUNET_ATS_Information * atsi)
2698{
2699 struct GNUNET_STREAM_Socket *socket = cls;
2700 struct GNUNET_MessageHeader *message;
2701
2702 if (0 != memcmp (peer,
2703 &socket->other_peer,
2704 sizeof (struct GNUNET_PeerIdentity)))
2705 {
2706 LOG (GNUNET_ERROR_TYPE_DEBUG,
2707 "%s: A peer which is not our target has connected to our tunnel\n",
2708 GNUNET_i2s(peer));
2709 return;
2710 }
2711 LOG (GNUNET_ERROR_TYPE_DEBUG,
2712 "%s: Target peer %s connected\n",
2713 GNUNET_i2s (&socket->other_peer),
2714 GNUNET_i2s (&socket->other_peer));
2715 /* Set state to INIT */
2716 socket->state = STATE_INIT;
2717 /* Send HELLO message */
2718 message = generate_hello (socket);
2719 queue_message (socket, message, &set_state_hello_wait, NULL, GNUNET_NO);
2720 if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
2721 GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2722 socket->control_retransmission_task_id =
2723 GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
2724 &control_retransmission_task, socket);
2725}
2726
2727
2728/**
2729 * Function called when our target peer is disconnected from our tunnel
2730 *
2731 * @param cls the socket associated which this tunnel
2732 * @param peer the peer identity of the target
2733 *
2734 * FIXME static
2735 */
2736void
2737mesh_peer_disconnect_callback (void *cls,
2738 const struct GNUNET_PeerIdentity *peer)
2739{
2740 struct GNUNET_STREAM_Socket *socket=cls;
2741
2742 /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
2743 LOG_DEBUG ("%1$s: Other peer %1$s disconnected \n",
2744 GNUNET_i2s (&socket->other_peer));
2745}
2746
2747
2748/**
2749 * Method called whenever a peer creates a tunnel to us
2750 *
2751 * @param cls closure
2752 * @param tunnel new handle to the tunnel
2753 * @param initiator peer that started the tunnel
2754 * @param port incoming port
2755 * @return initial tunnel context for the tunnel
2756 * (can be NULL -- that's not an error)
2757 */
2758static void *
2759new_tunnel_notify (void *cls,
2760 struct GNUNET_MESH_Tunnel *tunnel,
2761 const struct GNUNET_PeerIdentity *initiator,
2762 uint32_t port)
2763{
2764 struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2765 struct GNUNET_STREAM_Socket *socket;
2766
2767 /* FIXME: If a tunnel is already created, we should not accept new tunnels
2768 from the same peer again until the socket is closed */
2769 if (GNUNET_NO == lsocket->listening)
2770 {
2771 GNUNET_MESH_tunnel_destroy (tunnel);
2772 return NULL;
2773 }
2774 socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2775 socket->other_peer = *initiator;
2776 socket->tunnel = tunnel;
2777 socket->state = STATE_INIT;
2778 socket->lsocket = lsocket;
2779 socket->port = lsocket->port;
2780 socket->stat_handle = lsocket->stat_handle;
2781 socket->retransmit_timeout = lsocket->retransmit_timeout;
2782 socket->testing_active = lsocket->testing_active;
2783 socket->testing_set_write_sequence_number_value =
2784 lsocket->testing_set_write_sequence_number_value;
2785 socket->max_payload_size = lsocket->max_payload_size;
2786 LOG_DEBUG ("%1$s: Peer %1$s initiated tunnel to us\n",
2787 GNUNET_i2s (&socket->other_peer));
2788 if (NULL != socket->stat_handle)
2789 {
2790 GNUNET_STATISTICS_update (socket->stat_handle,
2791 "total inbound connections received",
2792 1, GNUNET_NO);
2793 GNUNET_STATISTICS_update (socket->stat_handle,
2794 "inbound connections", 1, GNUNET_NO);
2795 }
2796 return socket;
2797}
2798
2799
2800/**
2801 * Function called whenever an inbound tunnel is destroyed. Should clean up
2802 * any associated state. This function is NOT called if the client has
2803 * explicitly asked for the tunnel to be destroyed using
2804 * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
2805 * the tunnel.
2806 *
2807 * @param cls closure (set from GNUNET_MESH_connect)
2808 * @param tunnel connection to the other end (henceforth invalid)
2809 * @param tunnel_ctx place where local state associated
2810 * with the tunnel is stored
2811 */
2812static void
2813tunnel_cleaner (void *cls,
2814 const struct GNUNET_MESH_Tunnel *tunnel,
2815 void *tunnel_ctx)
2816{
2817 struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
2818 struct MessageQueue *head;
2819
2820 GNUNET_assert (tunnel == socket->tunnel);
2821 GNUNET_break_op(0);
2822 LOG (GNUNET_ERROR_TYPE_DEBUG,
2823 "%s: Peer %s has terminated connection abruptly\n",
2824 GNUNET_i2s (&socket->other_peer),
2825 GNUNET_i2s (&socket->other_peer));
2826 if (NULL != socket->stat_handle)
2827 {
2828 GNUNET_STATISTICS_update (socket->stat_handle,
2829 "connections terminated abruptly", 1, GNUNET_NO);
2830 GNUNET_STATISTICS_update (socket->stat_handle,
2831 "inbound connections", -1, GNUNET_NO);
2832 }
2833 /* Clear Transmit handles */
2834 if (NULL != socket->transmit_handle)
2835 {
2836 GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
2837 socket->transmit_handle = NULL;
2838 }
2839 /* Stop Tasks using socket->tunnel */
2840 if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
2841 {
2842 GNUNET_SCHEDULER_cancel (socket->ack_task_id);
2843 socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
2844 }
2845 if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
2846 {
2847 GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
2848 socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2849 }
2850 /* Terminate the control retransmission tasks */
2851 if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
2852 {
2853 GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
2854 socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
2855 }
2856 /* Clear existing message queue */
2857 while (NULL != (head = socket->queue_head)) {
2858 GNUNET_CONTAINER_DLL_remove (socket->queue_head,
2859 socket->queue_tail,
2860 head);
2861 GNUNET_free (head->message);
2862 GNUNET_free (head);
2863 }
2864 socket->tunnel = NULL;
2865}
2866
2867
2868/**
2869 * Callback to signal timeout on lockmanager lock acquire
2870 *
2871 * @param cls the ListenSocket
2872 * @param tc the scheduler task context
2873 */
2874static void
2875lockmanager_acquire_timeout (void *cls,
2876 const struct GNUNET_SCHEDULER_TaskContext *tc)
2877{
2878 struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2879 GNUNET_STREAM_ListenCallback listen_cb;
2880 void *listen_cb_cls;
2881
2882 lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2883 listen_cb = lsocket->listen_cb;
2884 listen_cb_cls = lsocket->listen_cb_cls;
2885 if (NULL != listen_cb)
2886 listen_cb (listen_cb_cls, NULL, NULL);
2887}
2888
2889
2890/**
2891 * Callback to notify us on the status changes on app_port lock
2892 *
2893 * @param cls the ListenSocket
2894 * @param domain the domain name of the lock
2895 * @param lock the app_port
2896 * @param status the current status of the lock
2897 */
2898static void
2899lock_status_change_cb (void *cls, const char *domain, uint32_t lock,
2900 enum GNUNET_LOCKMANAGER_Status status)
2901{
2902 struct GNUNET_STREAM_ListenSocket *lsocket = cls;
2903
2904 GNUNET_assert (lock == (uint32_t) lsocket->port);
2905 if (GNUNET_LOCKMANAGER_SUCCESS == status)
2906 {
2907 lsocket->listening = GNUNET_YES;
2908 if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
2909 {
2910 GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
2911 lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
2912 }
2913 if (NULL == lsocket->mesh)
2914 {
2915 uint32_t ports[] = {lsocket->port, 0};
2916
2917 lsocket->mesh = GNUNET_MESH_connect (lsocket->cfg,
2918 lsocket, /* Closure */
2919 &new_tunnel_notify,
2920 &tunnel_cleaner,
2921 server_message_handlers,
2922 ports);
2923 GNUNET_assert (NULL != lsocket->mesh);
2924 if (NULL != lsocket->listen_ok_cb)
2925 {
2926 (void) lsocket->listen_ok_cb ();
2927 }
2928 }
2929 }
2930 if (GNUNET_LOCKMANAGER_RELEASE == status)
2931 lsocket->listening = GNUNET_NO;
2932}
2933
2934
2935/*****************/
2936/* API functions */
2937/*****************/
2938
2939
2940/**
2941 * Tries to open a stream to the target peer
2942 *
2943 * @param cfg configuration to use
2944 * @param target the target peer to which the stream has to be opened
2945 * @param app_port the application port number which uniquely identifies this
2946 * stream
2947 * @param open_cb this function will be called after stream has be established;
2948 * cannot be NULL
2949 * @param open_cb_cls the closure for open_cb
2950 * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
2951 * @return if successful it returns the stream socket; NULL if stream cannot be
2952 * opened
2953 */
2954struct GNUNET_STREAM_Socket *
2955GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
2956 const struct GNUNET_PeerIdentity *target,
2957 uint32_t app_port,
2958 GNUNET_STREAM_OpenCallback open_cb,
2959 void *open_cb_cls,
2960 ...)
2961{
2962 struct GNUNET_STREAM_Socket *socket;
2963 enum GNUNET_STREAM_Option option;
2964 va_list vargs;
2965 uint16_t payload_size;
2966
2967 LOG (GNUNET_ERROR_TYPE_DEBUG,
2968 "%s\n", __func__);
2969 GNUNET_assert (NULL != open_cb);
2970 socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
2971 socket->other_peer = *target;
2972 socket->open_cb = open_cb;
2973 socket->open_cls = open_cb_cls;
2974 socket->port = app_port;
2975 /* Set defaults */
2976 socket->retransmit_timeout = TIME_REL_SECS (default_timeout);
2977 socket->testing_active = GNUNET_NO;
2978 socket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
2979 va_start (vargs, open_cb_cls); /* Parse variable args */
2980 do {
2981 option = va_arg (vargs, enum GNUNET_STREAM_Option);
2982 switch (option)
2983 {
2984 case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
2985 /* Expect struct GNUNET_TIME_Relative */
2986 socket->retransmit_timeout = va_arg (vargs,
2987 struct GNUNET_TIME_Relative);
2988 break;
2989 case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
2990 socket->testing_active = GNUNET_YES;
2991 socket->testing_set_write_sequence_number_value = va_arg (vargs,
2992 uint32_t);
2993 break;
2994 case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
2995 GNUNET_break (0); /* Option irrelevant in STREAM_open */
2996 break;
2997 case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
2998 GNUNET_break (0); /* Option irrelevant in STREAM_open */
2999 break;
3000 case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
3001 payload_size = (uint16_t) va_arg (vargs, unsigned int);
3002 GNUNET_assert (0 != payload_size);
3003 if (payload_size < socket->max_payload_size)
3004 socket->max_payload_size = payload_size;
3005 break;
3006 case GNUNET_STREAM_OPTION_END:
3007 break;
3008 }
3009 } while (GNUNET_STREAM_OPTION_END != option);
3010 va_end (vargs); /* End of variable args parsing */
3011 socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
3012 socket, /* cls */
3013 NULL, /* No inbound tunnel handler */
3014 NULL, /* No in-tunnel cleaner */
3015 client_message_handlers,
3016 NULL); /* We don't get inbound tunnels */
3017 if (NULL == socket->mesh) /* Fail if we cannot connect to mesh */
3018 {
3019 GNUNET_free (socket);
3020 return NULL;
3021 }
3022 /* Now create the mesh tunnel to target */
3023 LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating MESH Tunnel\n");
3024 socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
3025 socket, /* Tunnel context */
3026 &socket->other_peer,
3027 STREAM_PORT, 1, 0);
3028 GNUNET_assert (NULL != socket->tunnel);
3029 socket->stat_handle = GNUNET_STATISTICS_create ("stream", cfg);
3030 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__);
3031 return socket;
3032}
3033
3034
3035/**
3036 * Shutdown the stream for reading or writing (similar to man 2 shutdown).
3037 *
3038 * @param socket the stream socket
3039 * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
3040 * @param completion_cb the callback that will be called upon successful
3041 * shutdown of given operation
3042 * @param completion_cls the closure for the completion callback
3043 * @return the shutdown handle
3044 */
3045struct GNUNET_STREAM_ShutdownHandle *
3046GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
3047 int operation,
3048 GNUNET_STREAM_ShutdownCompletion completion_cb,
3049 void *completion_cls)
3050{
3051 struct GNUNET_STREAM_ShutdownHandle *handle;
3052 struct GNUNET_MessageHeader *msg;
3053
3054 GNUNET_assert (NULL == socket->shutdown_handle);
3055 handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
3056 handle->socket = socket;
3057 handle->completion_cb = completion_cb;
3058 handle->completion_cls = completion_cls;
3059 socket->shutdown_handle = handle;
3060 if ( ((GNUNET_YES == socket->receive_closed) && (SHUT_RD == operation))
3061 || ((GNUNET_YES == socket->transmit_closed) && (SHUT_WR == operation))
3062 || ((GNUNET_YES == socket->transmit_closed)
3063 && (GNUNET_YES == socket->receive_closed)
3064 && (SHUT_RDWR == operation)) )
3065 {
3066 handle->operation = operation;
3067 handle->call_cont_task_id = GNUNET_SCHEDULER_add_now (&call_cont_task,
3068 socket);
3069 return handle;
3070 }
3071 msg = GNUNET_malloc (sizeof (struct GNUNET_MessageHeader));
3072 msg->size = htons (sizeof (struct GNUNET_MessageHeader));
3073 switch (operation)
3074 {
3075 case SHUT_RD:
3076 handle->operation = SHUT_RD;
3077 if (NULL != socket->read_handle)
3078 LOG (GNUNET_ERROR_TYPE_WARNING,
3079 "Existing read handle should be cancelled before shutting"
3080 " down reading\n");
3081 msg->type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
3082 queue_message (socket, msg, &set_state_receive_close_wait, NULL,
3083 GNUNET_NO);
3084 socket->receive_closed = GNUNET_YES;
3085 break;
3086 case SHUT_WR:
3087 handle->operation = SHUT_WR;
3088 if (NULL != socket->write_handle)
3089 LOG (GNUNET_ERROR_TYPE_WARNING,
3090 "Existing write handle should be cancelled before shutting"
3091 " down writing\n");
3092 msg->type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
3093 queue_message (socket, msg, &set_state_transmit_close_wait, NULL,
3094 GNUNET_NO);
3095 socket->transmit_closed = GNUNET_YES;
3096 break;
3097 case SHUT_RDWR:
3098 handle->operation = SHUT_RDWR;
3099 if (NULL != socket->write_handle)
3100 LOG (GNUNET_ERROR_TYPE_WARNING,
3101 "Existing write handle should be cancelled before shutting"
3102 " down writing\n");
3103 if (NULL != socket->read_handle)
3104 LOG (GNUNET_ERROR_TYPE_WARNING,
3105 "Existing read handle should be cancelled before shutting"
3106 " down reading\n");
3107 msg->type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
3108 queue_message (socket, msg, &set_state_close_wait, NULL, GNUNET_NO);
3109 socket->transmit_closed = GNUNET_YES;
3110 socket->receive_closed = GNUNET_YES;
3111 break;
3112 default:
3113 LOG (GNUNET_ERROR_TYPE_WARNING,
3114 "GNUNET_STREAM_shutdown called with invalid value for "
3115 "parameter operation -- Ignoring\n");
3116 GNUNET_free (msg);
3117 GNUNET_free (handle);
3118 return NULL;
3119 }
3120 handle->close_msg_retransmission_task_id =
3121 GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
3122 &close_msg_retransmission_task,
3123 handle);
3124 return handle;
3125}
3126
3127
3128/**
3129 * Cancels a pending shutdown. Note that the shutdown messages may already be
3130 * sent and the stream is shutdown already for the operation given to
3131 * GNUNET_STREAM_shutdown(). This function only clears up any retranmissions of
3132 * shutdown messages and frees the shutdown handle.
3133 *
3134 * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
3135 */
3136void
3137GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
3138{
3139 if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
3140 GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
3141 if (GNUNET_SCHEDULER_NO_TASK != handle->call_cont_task_id)
3142 GNUNET_SCHEDULER_cancel (handle->call_cont_task_id);
3143 handle->socket->shutdown_handle = NULL;
3144 GNUNET_free (handle);
3145}
3146
3147
3148/**
3149 * Closes the stream
3150 *
3151 * @param socket the stream socket
3152 */
3153void
3154GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
3155{
3156 struct MessageQueue *head;
3157
3158 if (NULL != socket->read_handle)
3159 {
3160 LOG (GNUNET_ERROR_TYPE_WARNING,
3161 "Closing STREAM socket when a read handle is pending\n");
3162 GNUNET_STREAM_read_cancel (socket->read_handle);
3163 }
3164 if (NULL != socket->write_handle)
3165 {
3166 LOG (GNUNET_ERROR_TYPE_WARNING,
3167 "Closing STREAM socket when a write handle is pending\n");
3168 GNUNET_STREAM_write_cancel (socket->write_handle);
3169 //socket->write_handle = NULL;
3170 }
3171 /* Terminate the ack'ing task if they are still present */
3172 if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
3173 {
3174 GNUNET_SCHEDULER_cancel (socket->ack_task_id);
3175 socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
3176 }
3177 /* Terminate the control retransmission tasks */
3178 if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
3179 {
3180 GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
3181 }
3182 /* Clear Transmit handles */
3183 if (NULL != socket->transmit_handle)
3184 {
3185 GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
3186 socket->transmit_handle = NULL;
3187 }
3188 /* Clear existing message queue */
3189 while (NULL != (head = socket->queue_head)) {
3190 GNUNET_CONTAINER_DLL_remove (socket->queue_head,
3191 socket->queue_tail,
3192 head);
3193 GNUNET_free (head->message);
3194 GNUNET_free (head);
3195 }
3196 /* Close associated tunnel */
3197 if (NULL != socket->tunnel)
3198 {
3199 GNUNET_MESH_tunnel_destroy (socket->tunnel);
3200 socket->tunnel = NULL;
3201 }
3202 /* Close mesh connection */
3203 if ((NULL != socket->mesh) && (NULL == socket->lsocket))
3204 {
3205 GNUNET_MESH_disconnect (socket->mesh);
3206 socket->mesh = NULL;
3207 }
3208 /* Close statistics connection */
3209 if ( (NULL != socket->stat_handle) && (NULL == socket->lsocket) )
3210 GNUNET_STATISTICS_destroy (socket->stat_handle, GNUNET_YES);
3211 /* Release receive buffer */
3212 if (NULL != socket->receive_buffer)
3213 {
3214 GNUNET_free (socket->receive_buffer);
3215 }
3216 GNUNET_free (socket);
3217}
3218
3219
3220/**
3221 * Listens for stream connections for a specific application ports
3222 *
3223 * @param cfg the configuration to use
3224 * @param app_port the application port for which new streams will be accepted
3225 * @param listen_cb this function will be called when a peer tries to establish
3226 * a stream with us
3227 * @param listen_cb_cls closure for listen_cb
3228 * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
3229 * @return listen socket, NULL for any error
3230 */
3231struct GNUNET_STREAM_ListenSocket *
3232GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
3233 uint32_t app_port,
3234 GNUNET_STREAM_ListenCallback listen_cb,
3235 void *listen_cb_cls,
3236 ...)
3237{
3238 struct GNUNET_STREAM_ListenSocket *lsocket;
3239 struct GNUNET_TIME_Relative listen_timeout;
3240 enum GNUNET_STREAM_Option option;
3241 va_list vargs;
3242 uint16_t payload_size;
3243
3244 GNUNET_assert (NULL != listen_cb);
3245 lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
3246 lsocket->cfg = GNUNET_CONFIGURATION_dup (cfg);
3247 lsocket->lockmanager = GNUNET_LOCKMANAGER_connect (lsocket->cfg);
3248 if (NULL == lsocket->lockmanager)
3249 {
3250 GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3251 GNUNET_free (lsocket);
3252 return NULL;
3253 }
3254 lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */
3255 /* Set defaults */
3256 lsocket->retransmit_timeout = TIME_REL_SECS (default_timeout);
3257 lsocket->testing_active = GNUNET_NO;
3258 lsocket->listen_ok_cb = NULL;
3259 lsocket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
3260 listen_timeout = TIME_REL_SECS (60); /* A minute for listen timeout */
3261 va_start (vargs, listen_cb_cls);
3262 do {
3263 option = va_arg (vargs, enum GNUNET_STREAM_Option);
3264 switch (option)
3265 {
3266 case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
3267 lsocket->retransmit_timeout = va_arg (vargs,
3268 struct GNUNET_TIME_Relative);
3269 break;
3270 case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
3271 lsocket->testing_active = GNUNET_YES;
3272 lsocket->testing_set_write_sequence_number_value = va_arg (vargs,
3273 uint32_t);
3274 break;
3275 case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
3276 listen_timeout = GNUNET_TIME_relative_multiply
3277 (GNUNET_TIME_UNIT_MILLISECONDS, va_arg (vargs, uint32_t));
3278 break;
3279 case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
3280 lsocket->listen_ok_cb = va_arg (vargs,
3281 GNUNET_STREAM_ListenSuccessCallback);
3282 break;
3283 case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
3284 payload_size = (uint16_t) va_arg (vargs, unsigned int);
3285 GNUNET_assert (0 != payload_size);
3286 if (payload_size < lsocket->max_payload_size)
3287 lsocket->max_payload_size = payload_size;
3288 break;
3289 case GNUNET_STREAM_OPTION_END:
3290 break;
3291 }
3292 } while (GNUNET_STREAM_OPTION_END != option);
3293 va_end (vargs);
3294 lsocket->port = app_port;
3295 lsocket->listen_cb = listen_cb;
3296 lsocket->listen_cb_cls = listen_cb_cls;
3297 lsocket->locking_request =
3298 GNUNET_LOCKMANAGER_acquire_lock (lsocket->lockmanager, locking_domain,
3299 (uint32_t) lsocket->port,
3300 &lock_status_change_cb, lsocket);
3301 lsocket->lockmanager_acquire_timeout_task =
3302 GNUNET_SCHEDULER_add_delayed (listen_timeout,
3303 &lockmanager_acquire_timeout, lsocket);
3304 lsocket->stat_handle = GNUNET_STATISTICS_create ("stream",
3305 lsocket->cfg);
3306 return lsocket;
3307}
3308
3309
3310/**
3311 * Closes the listen socket
3312 *
3313 * @param lsocket the listen socket
3314 */
3315void
3316GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
3317{
3318 /* Close MESH connection */
3319 if (NULL != lsocket->mesh)
3320 GNUNET_MESH_disconnect (lsocket->mesh);
3321 if (NULL != lsocket->stat_handle)
3322 GNUNET_STATISTICS_destroy (lsocket->stat_handle, GNUNET_YES);
3323 GNUNET_CONFIGURATION_destroy (lsocket->cfg);
3324 if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
3325 GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
3326 if (NULL != lsocket->locking_request)
3327 GNUNET_LOCKMANAGER_cancel_request (lsocket->locking_request);
3328 if (NULL != lsocket->lockmanager)
3329 GNUNET_LOCKMANAGER_disconnect (lsocket->lockmanager);
3330 GNUNET_free (lsocket);
3331}
3332
3333
3334/**
3335 * Tries to write the given data to the stream. The maximum size of data that
3336 * can be written per a write operation is ~ 4MB (64 * (64000 - sizeof (struct
3337 * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
3338 * violation, however only the said number of maximum bytes will be written.
3339 *
3340 * @param socket the socket representing a stream
3341 * @param data the data buffer from where the data is written into the stream
3342 * @param size the number of bytes to be written from the data buffer
3343 * @param timeout the timeout period
3344 * @param write_cont the function to call upon writing some bytes into the
3345 * stream
3346 * @param write_cont_cls the closure
3347 *
3348 * @return handle to cancel the operation; if a previous write is pending NULL
3349 * is returned. If the stream has been shutdown for this operation or
3350 * is broken then write_cont is immediately called and NULL is
3351 * returned.
3352 */
3353struct GNUNET_STREAM_WriteHandle *
3354GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
3355 const void *data,
3356 size_t size,
3357 struct GNUNET_TIME_Relative timeout,
3358 GNUNET_STREAM_CompletionContinuation write_cont,
3359 void *write_cont_cls)
3360{
3361 struct GNUNET_STREAM_WriteHandle *io_handle;
3362 struct GNUNET_STREAM_DataMessage *dmsg;
3363 const void *sweep;
3364 struct GNUNET_TIME_Relative ack_deadline;
3365 unsigned int num_needed_packets;
3366 unsigned int cnt;
3367 uint32_t packet_size;
3368 uint32_t payload_size;
3369 uint16_t max_data_packet_size;
3370
3371 LOG (GNUNET_ERROR_TYPE_DEBUG,
3372 "%s\n", __func__);
3373 if (NULL != socket->write_handle)
3374 {
3375 GNUNET_break (0);
3376 return NULL;
3377 }
3378 if (NULL == socket->tunnel)
3379 {
3380 if (NULL != write_cont)
3381 write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3382 return NULL;
3383 }
3384 switch (socket->state)
3385 {
3386 case STATE_TRANSMIT_CLOSED:
3387 case STATE_TRANSMIT_CLOSE_WAIT:
3388 case STATE_CLOSED:
3389 case STATE_CLOSE_WAIT:
3390 if (NULL != write_cont)
3391 write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
3392 LOG (GNUNET_ERROR_TYPE_DEBUG,
3393 "%s() END\n", __func__);
3394 return NULL;
3395 case STATE_INIT:
3396 case STATE_LISTEN:
3397 case STATE_HELLO_WAIT:
3398 if (NULL != write_cont)
3399 write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
3400 LOG (GNUNET_ERROR_TYPE_DEBUG,
3401 "%s() END\n", __func__);
3402 return NULL;
3403 case STATE_ESTABLISHED:
3404 case STATE_RECEIVE_CLOSED:
3405 case STATE_RECEIVE_CLOSE_WAIT:
3406 break;
3407 }
3408 if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * socket->max_payload_size < size)
3409 size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * socket->max_payload_size;
3410 num_needed_packets =
3411 (size + (socket->max_payload_size - 1)) / socket->max_payload_size;
3412 io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_WriteHandle));
3413 io_handle->socket = socket;
3414 io_handle->write_cont = write_cont;
3415 io_handle->write_cont_cls = write_cont_cls;
3416 io_handle->size = size;
3417 io_handle->packets_sent = 0;
3418 sweep = data;
3419 /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3420 determined from RTT */
3421 ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
3422 /* Divide the given buffer into packets for sending */
3423 max_data_packet_size =
3424 socket->max_payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3425 io_handle->max_ack_base_num = socket->write_sequence_number;
3426 for (cnt=0; cnt < num_needed_packets; cnt++)
3427 {
3428 if ((cnt + 1) * socket->max_payload_size < size)
3429 {
3430 payload_size = socket->max_payload_size;
3431 packet_size = max_data_packet_size;
3432 }
3433 else
3434 {
3435 payload_size = size - (cnt * socket->max_payload_size);
3436 packet_size = payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
3437 }
3438 dmsg = GNUNET_malloc (packet_size);
3439 dmsg->header.size = htons (packet_size);
3440 dmsg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
3441 dmsg->sequence_number = htonl (socket->write_sequence_number++);
3442 dmsg->offset = htonl (socket->write_offset);
3443 /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
3444 determined from RTT */
3445 dmsg->ack_deadline = GNUNET_TIME_relative_hton (ack_deadline);
3446 /* Copy data from given buffer to the packet */
3447 memcpy (&dmsg[1], sweep, payload_size);
3448 io_handle->messages[cnt] = dmsg;
3449 sweep += payload_size;
3450 socket->write_offset += payload_size;
3451 }
3452 /* ack the last data message. FIXME: remove when we figure out how to do this
3453 using RTT */
3454 io_handle->messages[num_needed_packets - 1]->ack_deadline =
3455 GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
3456 socket->data_retransmit_timeout = GNUNET_TIME_UNIT_SECONDS;
3457 socket->write_handle = io_handle;
3458 write_data (socket);
3459 LOG (GNUNET_ERROR_TYPE_DEBUG,
3460 "%s() END\n", __func__);
3461 return io_handle;
3462}
3463
3464
3465/**
3466 * Function to check the ACK bitmap for any received messages and call the data processor
3467 *
3468 * @param cls the socket
3469 * @param tc the scheduler task context
3470 */
3471static void
3472probe_data_availability (void *cls,
3473 const struct GNUNET_SCHEDULER_TaskContext *tc)
3474{
3475 struct GNUNET_STREAM_Socket *socket = cls;
3476
3477 GNUNET_assert (NULL != socket->read_handle);
3478 socket->read_handle->probe_data_availability_task_id =
3479 GNUNET_SCHEDULER_NO_TASK;
3480 if (GNUNET_SCHEDULER_NO_TASK != socket->read_handle->read_task_id)
3481 return; /* A task to call read processor is present */
3482 if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
3483 0))
3484 socket->read_handle->read_task_id
3485 = GNUNET_SCHEDULER_add_now (&call_read_processor, socket);
3486}
3487
3488
3489/**
3490 * Tries to read data from the stream. Should not be called when another read
3491 * handle is present; the existing read handle should be canceled with
3492 * GNUNET_STREAM_read_cancel(). Only one read handle per socket is present at
3493 * any time
3494 *
3495 * @param socket the socket representing a stream
3496 * @param timeout the timeout period
3497 * @param proc function to call with data (once only)
3498 * @param proc_cls the closure for proc
3499 * @return handle to cancel the operation; NULL is returned if the stream has
3500 * been shutdown for this type of opeartion (the DataProcessor is
3501 * immediately called with GNUNET_STREAM_SHUTDOWN as status)
3502 */
3503struct GNUNET_STREAM_ReadHandle *
3504GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
3505 struct GNUNET_TIME_Relative timeout,
3506 GNUNET_STREAM_DataProcessor proc,
3507 void *proc_cls)
3508{
3509 struct GNUNET_STREAM_ReadHandle *read_handle;
3510
3511 LOG (GNUNET_ERROR_TYPE_DEBUG,
3512 "%s: %s()\n",
3513 GNUNET_i2s (&socket->other_peer),
3514 __func__);
3515 /* Only one read handle is permitted at any time; cancel the existing or wait
3516 for it to complete */
3517 GNUNET_assert (NULL == socket->read_handle);
3518 GNUNET_assert (NULL != proc);
3519 if (GNUNET_YES == socket->receive_closed)
3520 return NULL;
3521 switch (socket->state)
3522 {
3523 case STATE_RECEIVE_CLOSED:
3524 case STATE_RECEIVE_CLOSE_WAIT:
3525 case STATE_CLOSED:
3526 case STATE_CLOSE_WAIT:
3527 LOG (GNUNET_ERROR_TYPE_DEBUG,
3528 "%s: %s() END\n",
3529 GNUNET_i2s (&socket->other_peer),
3530 __func__);
3531 proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
3532 return NULL;
3533 default:
3534 break;
3535 }
3536 read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ReadHandle));
3537 read_handle->proc = proc;
3538 read_handle->proc_cls = proc_cls;
3539 read_handle->socket = socket;
3540 socket->read_handle = read_handle;
3541 read_handle->probe_data_availability_task_id =
3542 GNUNET_SCHEDULER_add_now (&probe_data_availability, socket);
3543 read_handle->read_io_timeout_task_id =
3544 GNUNET_SCHEDULER_add_delayed (timeout, &read_io_timeout, socket);
3545 LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: %s() END\n",
3546 GNUNET_i2s (&socket->other_peer), __func__);
3547 return read_handle;
3548}
3549
3550
3551/**
3552 * Cancels pending write operation. Also cancels packet retransmissions which
3553 * may have resulted otherwise.
3554 *
3555 * CAUTION: Normally a write operation is considered successful if the data
3556 * given to it is sent and acknowledged by the receiver. As data is divided
3557 * into packets, it is possible that not all packets are received by the
3558 * receiver. Any missing packets are then retransmitted till the receiver
3559 * acknowledges all packets or until a timeout . During this scenario if the
3560 * write operation is cancelled all such retransmissions are also
3561 * cancelled. This may leave the receiver's receive buffer incompletely filled
3562 * as some missing packets are never retransmitted. So this operation should be
3563 * used before shutting down transmission from our side or before closing the
3564 * socket.
3565 *
3566 * @param wh write operation handle to cancel
3567 */
3568void
3569GNUNET_STREAM_write_cancel (struct GNUNET_STREAM_WriteHandle *wh)
3570{
3571 struct GNUNET_STREAM_Socket *socket = wh->socket;
3572 unsigned int packet;
3573
3574 GNUNET_assert (NULL != socket->write_handle);
3575 GNUNET_assert (socket->write_handle == wh);
3576 if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
3577 {
3578 GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
3579 socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
3580 }
3581 for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
3582 {
3583 if (NULL == wh->messages[packet]) break;
3584 GNUNET_free (wh->messages[packet]);
3585 }
3586 GNUNET_free (socket->write_handle);
3587 socket->write_handle = NULL;
3588}
3589
3590
3591/**
3592 * Cancel pending read operation.
3593 *
3594 * @param rh read operation handle to cancel
3595 */
3596void
3597GNUNET_STREAM_read_cancel (struct GNUNET_STREAM_ReadHandle *rh)
3598{
3599 struct GNUNET_STREAM_Socket *socket;
3600
3601 socket = rh->socket;
3602 GNUNET_assert (NULL != socket->read_handle);
3603 GNUNET_assert (rh == socket->read_handle);
3604 cleanup_read_handle (socket);
3605}
3606
3607
3608/**
3609 * Functions of this signature are called whenever writing operations
3610 * on a stream are executed
3611 *
3612 * @param cls the closure from GNUNET_STREAM_write
3613 * @param status the status of the stream at the time this function is called;
3614 * GNUNET_STREAM_OK if writing to stream was completed successfully;
3615 * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
3616 * (this doesn't mean that the data is never sent, the receiver may
3617 * have read the data but its ACKs may have been lost);
3618 * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
3619 * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
3620 * be processed.
3621 * @param size the number of bytes written
3622 */
3623static void
3624mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status,
3625 size_t size)
3626{
3627 struct GNUNET_MQ_Handle *mq = cls;
3628 struct MQStreamState *mss = GNUNET_MQ_impl_state (mq);
3629
3630 switch (status)
3631 {
3632 case GNUNET_STREAM_OK:
3633 break;
3634 case GNUNET_STREAM_SHUTDOWN:
3635 /* FIXME: call shutdown handler */
3636 return;
3637 case GNUNET_STREAM_TIMEOUT:
3638 GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_TIMEOUT);
3639 return;
3640 case GNUNET_STREAM_SYSERR:
3641 GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE);
3642 return;
3643 default:
3644 GNUNET_assert (0);
3645 return;
3646 }
3647
3648 mss->wh = NULL;
3649
3650 GNUNET_MQ_impl_send_continue (mq);
3651}
3652
3653
3654static void
3655mq_stream_send_impl (struct GNUNET_MQ_Handle *mq,
3656 const struct GNUNET_MessageHeader *msg, void *impl_state)
3657{
3658 struct MQStreamState *mss = impl_state;
3659
3660 /* no way to cancel sending now */
3661 GNUNET_MQ_impl_send_commit (mq);
3662
3663 mss->wh = GNUNET_STREAM_write (mss->socket, msg, ntohs (msg->size),
3664 GNUNET_TIME_UNIT_FOREVER_REL,
3665 mq_stream_write_queued, mq);
3666}
3667
3668
3669/**
3670 * Functions with this signature are called whenever a
3671 * complete message is received by the tokenizer.
3672 *
3673 * Do not call GNUNET_SERVER_mst_destroy in callback
3674 *
3675 * @param cls closure
3676 * @param client identification of the client
3677 * @param message the actual message
3678 *
3679 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
3680 */
3681static int
3682mq_stream_mst_callback (void *cls, void *client,
3683 const struct GNUNET_MessageHeader *message)
3684{
3685 struct GNUNET_MQ_Handle *mq = cls;
3686
3687 GNUNET_assert (NULL != message);
3688 GNUNET_MQ_inject_message (mq, message);
3689 return GNUNET_OK;
3690}
3691
3692
3693/**
3694 * Functions of this signature are called whenever data is available from the
3695 * stream.
3696 *
3697 * @param cls the closure from GNUNET_STREAM_read
3698 * @param status the status of the stream at the time this function is called
3699 * @param data traffic from the other side
3700 * @param size the number of bytes available in data read; will be 0 on timeout
3701 * @return number of bytes of processed from 'data' (any data remaining should be
3702 * given to the next time the read processor is called).
3703 */
3704static size_t
3705mq_stream_data_processor (void *cls,
3706 enum GNUNET_STREAM_Status status,
3707 const void *data,
3708 size_t size)
3709{
3710 struct GNUNET_MQ_Handle *mq = cls;
3711 struct MQStreamState *mss = GNUNET_MQ_impl_state (mq);
3712 int ret;
3713
3714 switch (status)
3715 {
3716 case GNUNET_STREAM_OK:
3717 break;
3718 case GNUNET_STREAM_SHUTDOWN:
3719 /* FIXME: call shutdown handler */
3720 return 0;
3721 case GNUNET_STREAM_TIMEOUT:
3722 GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_TIMEOUT);
3723 return 0;
3724 case GNUNET_STREAM_SYSERR:
3725 GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
3726 return 0;
3727 default:
3728 GNUNET_assert (0);
3729 return 0;
3730 }
3731
3732 ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO);
3733 if (GNUNET_OK != ret)
3734 {
3735 GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
3736 return 0;
3737 }
3738 mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL,
3739 mq_stream_data_processor, mq);
3740 /* we always read all data */
3741 return size;
3742}
3743
3744
3745static void
3746mq_stream_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
3747{
3748 struct MQStreamState *mss = impl_state;
3749
3750 if (NULL != mss->rh)
3751 {
3752 GNUNET_STREAM_read_cancel (mss->rh);
3753 mss->rh = NULL;
3754 }
3755
3756 if (NULL != mss->wh)
3757 {
3758 GNUNET_STREAM_write_cancel (mss->wh);
3759 mss->wh = NULL;
3760 }
3761
3762 if (NULL != mss->mst)
3763 {
3764 GNUNET_SERVER_mst_destroy (mss->mst);
3765 mss->mst = NULL;
3766 }
3767
3768 GNUNET_free (mss);
3769}
3770
3771
3772
3773/**
3774 * Create a message queue for a stream socket.
3775 *
3776 * @param socket the socket to read/write in the message queue
3777 * @param msg_handlers message handler array
3778 * @param error_handler callback for errors
3779 * @param cls closure for message handlers and error handler
3780 * @return the message queue for the socket
3781 */
3782struct GNUNET_MQ_Handle *
3783GNUNET_STREAM_mq_create (struct GNUNET_STREAM_Socket *socket,
3784 const struct GNUNET_MQ_MessageHandler *msg_handlers,
3785 GNUNET_MQ_ErrorHandler error_handler,
3786 void *cls)
3787{
3788 struct GNUNET_MQ_Handle *mq;
3789 struct MQStreamState *mss;
3790
3791 mss = GNUNET_new (struct MQStreamState);
3792 mss->socket = socket;
3793 mq = GNUNET_MQ_queue_for_callbacks (mq_stream_send_impl,
3794 mq_stream_destroy_impl,
3795 NULL,
3796 mss, msg_handlers, error_handler, cls);
3797 if (NULL != msg_handlers)
3798 {
3799 mss->mst = GNUNET_SERVER_mst_create (mq_stream_mst_callback, mq);
3800 mss->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
3801 mq_stream_data_processor, mq);
3802 }
3803 return mq;
3804}
3805
3806/* end of stream_api.c */
diff --git a/src/stream/test_stream_2peers.c b/src/stream/test_stream_2peers.c
deleted file mode 100644
index 8d2e3ab63..000000000
--- a/src/stream/test_stream_2peers.c
+++ /dev/null
@@ -1,667 +0,0 @@
1/*
2 This file is part of GNUnet.
3 (C) 2008--2013 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file stream/test_stream_2peers.c
23 * @brief Stream API testing between 2 peers using testing API
24 * @author Sree Harsha Totakura
25 */
26
27#include <string.h>
28
29#include "platform.h"
30#include "gnunet_util_lib.h"
31#include "gnunet_mesh_service.h"
32#include "gnunet_stream_lib.h"
33#include "gnunet_testbed_service.h"
34
35/**
36 * Number of peers; Do NOT change this
37 */
38#define NUM_PEERS 2
39
40/**
41 * Shorthand for Relative time in seconds
42 */
43#define TIME_REL_SECS(sec) \
44 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
45
46/**
47 * Structure for holding peer's sockets and IO Handles
48 */
49struct PeerData
50{
51 /**
52 * Handle to testbed peer
53 */
54 struct GNUNET_TESTBED_Peer *peer;
55
56 /**
57 * Peer's stream socket
58 */
59 struct GNUNET_STREAM_Socket *socket;
60
61 /**
62 * Peer's io write handle
63 */
64 struct GNUNET_STREAM_WriteHandle *io_write_handle;
65
66 /**
67 * Peer's io read handle
68 */
69 struct GNUNET_STREAM_ReadHandle *io_read_handle;
70
71 /**
72 * Peer's shutdown handle
73 */
74 struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
75
76 /**
77 * The service connect operation to stream
78 */
79 struct GNUNET_TESTBED_Operation *op;
80
81 /**
82 * Our Peer id
83 */
84 struct GNUNET_PeerIdentity our_id;
85
86 /**
87 * Bytes the peer has written
88 */
89 unsigned int bytes_wrote;
90
91 /**
92 * Byte the peer has read
93 */
94 unsigned int bytes_read;
95};
96
97
98/**
99 * Different states in test setup
100 */
101enum SetupState
102{
103 /**
104 * Get the identity of peer 1
105 */
106 PEER1_GET_IDENTITY,
107
108 /**
109 * Get the identity of peer 2
110 */
111 PEER2_GET_IDENTITY,
112
113 /**
114 * Connect to stream service of peer 1
115 */
116 PEER1_STREAM_CONNECT,
117
118 /**
119 * Connect to stream service of peer 2
120 */
121 PEER2_STREAM_CONNECT
122
123};
124
125/**
126 * Various states during test setup
127 */
128static enum SetupState setup_state;
129
130/**
131 * Data context for peer 1
132 */
133static struct PeerData peer1;
134
135/**
136 * Data context for peer 2
137 */
138static struct PeerData peer2;
139
140/**
141 * Testbed operation handle
142 */
143static struct GNUNET_TESTBED_Operation *op;
144
145static GNUNET_SCHEDULER_TaskIdentifier abort_task;
146
147static char *data = "ABCD";
148static int result;
149
150static int writing_success;
151static int reading_success;
152
153
154/**
155 * Input processor
156 *
157 * @param cls the closure from GNUNET_STREAM_write/read
158 * @param status the status of the stream at the time this function is called
159 * @param data traffic from the other side
160 * @param size the number of bytes available in data read
161 * @return number of bytes of processed from 'data' (any data remaining should be
162 * given to the next time the read processor is called).
163 */
164static size_t
165input_processor (void *cls,
166 enum GNUNET_STREAM_Status status,
167 const void *input_data,
168 size_t size);
169
170/**
171 * Task for calling STREAM_read
172 *
173 * @param cls the peer data entity
174 * @param tc the task context
175 */
176static void
177stream_read_task (void *cls,
178 const struct GNUNET_SCHEDULER_TaskContext *tc)
179{
180 struct PeerData *peer = cls;
181
182 peer->io_read_handle = GNUNET_STREAM_read (peer->socket,
183 GNUNET_TIME_relative_multiply
184 (GNUNET_TIME_UNIT_SECONDS, 5),
185 &input_processor,
186 peer);
187 GNUNET_assert (NULL != peer->io_read_handle);
188}
189
190/**
191 * The write completion function; called upon writing some data to stream or
192 * upon error
193 *
194 * @param cls the closure from GNUNET_STREAM_write/read
195 * @param status the status of the stream at the time this function is called
196 * @param size the number of bytes read or written
197 */
198static void
199write_completion (void *cls,
200 enum GNUNET_STREAM_Status status,
201 size_t size);
202
203
204/**
205 * Task for calling STREAM_write
206 *
207 * @param cls the peer data entity
208 * @param tc the task context
209 */
210static void
211stream_write_task (void *cls,
212 const struct GNUNET_SCHEDULER_TaskContext *tc)
213{
214 struct PeerData *peer = cls;
215
216 peer->io_write_handle =
217 GNUNET_STREAM_write (peer->socket,
218 (void *) data,
219 strlen(data) - peer->bytes_wrote,
220 GNUNET_TIME_relative_multiply
221 (GNUNET_TIME_UNIT_SECONDS, 5),
222 &write_completion,
223 peer);
224
225 GNUNET_assert (NULL != peer->io_write_handle);
226 }
227
228
229/**
230 * Close sockets and stop testing deamons nicely
231 */
232static void
233do_close (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
234{
235 if (GNUNET_SCHEDULER_NO_TASK != abort_task)
236 GNUNET_SCHEDULER_cancel (abort_task);
237 if (NULL != peer1.socket)
238 GNUNET_STREAM_close (peer1.socket);
239 if (NULL != peer1.op)
240 GNUNET_TESTBED_operation_done (peer1.op);
241 else
242 GNUNET_SCHEDULER_shutdown (); /* For shutting down testbed */
243}
244
245
246/**
247 * Completion callback for shutdown
248 *
249 * @param cls the closure from GNUNET_STREAM_shutdown call
250 * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR,
251 * SHUT_RDWR)
252 */
253static void
254shutdown_completion (void *cls,
255 int operation)
256{
257 static int shutdowns;
258
259 if (++shutdowns == 1)
260 {
261 peer1.shutdown_handle = NULL;
262 peer2.shutdown_handle = GNUNET_STREAM_shutdown (peer2.socket, SHUT_RDWR,
263 &shutdown_completion, cls);
264 return;
265 }
266 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "STREAM shutdown successful\n");
267 GNUNET_SCHEDULER_add_now (&do_close, cls);
268}
269
270
271/**
272 * Shutdown sockets gracefully
273 */
274static void
275do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
276{
277 result = GNUNET_OK;
278 peer1.shutdown_handle = GNUNET_STREAM_shutdown (peer1.socket, SHUT_RDWR,
279 &shutdown_completion, cls);
280}
281
282
283/**
284 * Something went wrong and timed out. Kill everything and set error flag
285 */
286static void
287do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
288{
289 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: ABORT\n");
290 result = GNUNET_SYSERR;
291 abort_task = 0;
292 do_close (cls, tc);
293}
294
295
296/**
297 * The write completion function; called upon writing some data to stream or
298 * upon error
299 *
300 * @param cls the closure from GNUNET_STREAM_write/read
301 * @param status the status of the stream at the time this function is called
302 * @param size the number of bytes read or written
303 */
304static void
305write_completion (void *cls,
306 enum GNUNET_STREAM_Status status,
307 size_t size)
308{
309 struct PeerData *peer=cls;
310
311 GNUNET_assert (GNUNET_STREAM_OK == status);
312 GNUNET_assert (size <= strlen (data));
313 peer->bytes_wrote += size;
314
315 if (peer->bytes_wrote < strlen(data)) /* Have more data to send */
316 {
317 GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
318 }
319 else
320 {
321 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
322 "Writing completed\n");
323
324 if (&peer2 == peer) /* Peer1 has finished writing; should read now */
325 {
326 peer->bytes_read = 0;
327 GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
328 }
329 else
330 {
331 writing_success = GNUNET_YES;
332 if (GNUNET_YES == reading_success)
333 GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
334 }
335 }
336}
337
338
339/**
340 * Function executed after stream has been established
341 *
342 * @param cls the closure from GNUNET_STREAM_open
343 * @param socket socket to use to communicate with the other side (read/write)
344 */
345static void
346stream_open_cb (void *cls,
347 struct GNUNET_STREAM_Socket *socket)
348{
349 struct PeerData *peer=cls;
350
351 GNUNET_assert (&peer2 == peer);
352 GNUNET_assert (socket == peer2.socket);
353 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s: Stream established from peer2\n",
354 GNUNET_i2s (&peer1.our_id));
355 peer->bytes_wrote = 0;
356 GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
357}
358
359
360/**
361 * Input processor
362 *
363 * @param cls the closure from GNUNET_STREAM_write/read
364 * @param status the status of the stream at the time this function is called
365 * @param data traffic from the other side
366 * @param size the number of bytes available in data read
367 * @return number of bytes of processed from 'data' (any data remaining should be
368 * given to the next time the read processor is called).
369 */
370static size_t
371input_processor (void *cls,
372 enum GNUNET_STREAM_Status status,
373 const void *input_data,
374 size_t size)
375{
376 struct PeerData *peer;
377
378 peer = (struct PeerData *) cls;
379
380 if (GNUNET_STREAM_TIMEOUT == status)
381 {
382 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
383 "Read operation timedout - reading again!\n");
384 GNUNET_assert (0 == size);
385 GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
386 return 0;
387 }
388
389 GNUNET_assert (GNUNET_STREAM_OK == status);
390 GNUNET_assert (size <= strlen (data));
391 GNUNET_assert (0 == strncmp ((const char *) data + peer->bytes_read,
392 (const char *) input_data,
393 size));
394 peer->bytes_read += size;
395
396 if (peer->bytes_read < strlen (data))
397 {
398 GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
399 }
400 else
401 {
402 if (&peer1 == peer) /* Peer2 has completed reading; should write */
403 {
404 peer->bytes_wrote = 0;
405 GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
406 }
407 else /* Peer1 has completed reading. End of tests */
408 {
409 reading_success = GNUNET_YES;
410 if (GNUNET_YES == writing_success)
411 GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
412 }
413 }
414 return size;
415}
416
417
418/**
419 * Functions of this type are called upon new stream connection from other peers
420 *
421 * @param cls the closure from GNUNET_STREAM_listen
422 * @param socket the socket representing the stream
423 * @param initiator the identity of the peer who wants to establish a stream
424 * with us
425 * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
426 * stream (the socket will be invalid after the call)
427 */
428static int
429stream_listen_cb (void *cls,
430 struct GNUNET_STREAM_Socket *socket,
431 const struct GNUNET_PeerIdentity *initiator)
432{
433 if ((NULL == socket) || (NULL == initiator))
434 {
435 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Binding error\n");
436 if (GNUNET_SCHEDULER_NO_TASK != abort_task)
437 GNUNET_SCHEDULER_cancel (abort_task);
438 abort_task = GNUNET_SCHEDULER_add_now (&do_abort, NULL);
439 return GNUNET_OK;
440 }
441 GNUNET_assert (NULL != initiator);
442 GNUNET_assert (socket != peer2.socket);
443 GNUNET_assert (0 == memcmp (initiator, &peer2.our_id,
444 sizeof (struct GNUNET_PeerIdentity)));
445 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s: Peer connected: %s\n",
446 GNUNET_i2s (&peer1.our_id), GNUNET_i2s (initiator));
447 peer1.socket = socket;
448 peer1.bytes_read = 0;
449 GNUNET_SCHEDULER_add_now (&stream_read_task, &peer1);
450 return GNUNET_OK;
451}
452
453
454/**
455 * Listen success callback; connects a peer to stream as client
456 */
457static void stream_connect (void);
458
459
460/**
461 * Adapter function called to destroy a connection to
462 * a service.
463 *
464 * @param cls closure
465 * @param op_result service handle returned from the connect adapter
466 */
467static void
468stream_da (void *cls, void *op_result)
469{
470 struct GNUNET_STREAM_ListenSocket *lsocket;
471 struct GNUNET_STREAM_Socket *socket;
472
473 if (&peer1 == cls)
474 {
475 lsocket = op_result;
476 GNUNET_STREAM_listen_close (lsocket);
477 if (NULL != peer2.op)
478 GNUNET_TESTBED_operation_done (peer2.op);
479 else
480 GNUNET_SCHEDULER_shutdown ();
481 return;
482 }
483 if (&peer2 == cls)
484 {
485 socket = op_result;
486 GNUNET_STREAM_close (socket);
487 GNUNET_SCHEDULER_shutdown (); /* Exit point of the test */
488 return;
489 }
490 GNUNET_assert (0);
491}
492
493
494/**
495 * Adapter function called to establish a connection to
496 * a service.
497 *
498 * @param cls closure
499 * @param cfg configuration of the peer to connect to; will be available until
500 * GNUNET_TESTBED_operation_done() is called on the operation returned
501 * from GNUNET_TESTBED_service_connect()
502 * @return service handle to return in 'op_result', NULL on error
503 */
504static void *
505stream_ca (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg)
506{
507 struct GNUNET_STREAM_ListenSocket *lsocket;
508
509 switch (setup_state)
510 {
511 case PEER1_STREAM_CONNECT:
512 lsocket = GNUNET_STREAM_listen (cfg, 10, &stream_listen_cb, NULL,
513 GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS,
514 &stream_connect, GNUNET_STREAM_OPTION_END);
515 return lsocket;
516 case PEER2_STREAM_CONNECT:
517 peer2.socket = GNUNET_STREAM_open (cfg, &peer1.our_id, 10, &stream_open_cb,
518 &peer2, GNUNET_STREAM_OPTION_END);
519 return peer2.socket;
520 default:
521 GNUNET_assert (0);
522 }
523}
524
525
526/**
527 * Listen success callback; connects a peer to stream as client
528 */
529static void
530stream_connect (void)
531{
532 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stream listen open successful\n");
533 peer2.op = GNUNET_TESTBED_service_connect (&peer2, peer2.peer, "stream",
534 NULL, NULL,
535 stream_ca, stream_da, &peer2);
536 setup_state = PEER2_STREAM_CONNECT;
537}
538
539
540/**
541 * Callback to be called when the requested peer information is available
542 *
543 * @param cb_cls the closure from GNUNET_TETSBED_peer_get_information()
544 * @param op the operation this callback corresponds to
545 * @param pinfo the result; will be NULL if the operation has failed
546 * @param emsg error message if the operation has failed; will be NULL if the
547 * operation is successfull
548 */
549static void
550peerinfo_cb (void *cb_cls, struct GNUNET_TESTBED_Operation *op_,
551 const struct GNUNET_TESTBED_PeerInformation *pinfo,
552 const char *emsg)
553{
554 GNUNET_assert (NULL == emsg);
555 GNUNET_assert (op == op_);
556 switch (setup_state)
557 {
558 case PEER1_GET_IDENTITY:
559 memcpy (&peer1.our_id, pinfo->result.id,
560 sizeof (struct GNUNET_PeerIdentity));
561 GNUNET_TESTBED_operation_done (op);
562 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer 1 id: %s\n", GNUNET_i2s
563 (&peer1.our_id));
564 op = GNUNET_TESTBED_peer_get_information (peer2.peer,
565 GNUNET_TESTBED_PIT_IDENTITY,
566 &peerinfo_cb, NULL);
567 setup_state = PEER2_GET_IDENTITY;
568 break;
569 case PEER2_GET_IDENTITY:
570 memcpy (&peer2.our_id, pinfo->result.id,
571 sizeof (struct GNUNET_PeerIdentity));
572 GNUNET_TESTBED_operation_done (op);
573 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer 2 id: %s\n", GNUNET_i2s
574 (&peer2.our_id));
575 peer1.op = GNUNET_TESTBED_service_connect (&peer1, peer1.peer, "stream",
576 NULL, NULL, stream_ca,
577 stream_da, &peer1);
578 setup_state = PEER1_STREAM_CONNECT;
579 break;
580 default:
581 GNUNET_assert (0);
582 }
583}
584
585
586/**
587 * Controller event callback
588 *
589 * @param cls NULL
590 * @param event the controller event
591 */
592static void
593controller_event_cb (void *cls,
594 const struct GNUNET_TESTBED_EventInformation *event)
595{
596 switch (event->type)
597 {
598 case GNUNET_TESTBED_ET_OPERATION_FINISHED:
599 switch (setup_state)
600 {
601 case PEER1_STREAM_CONNECT:
602 case PEER2_STREAM_CONNECT:
603 GNUNET_assert (NULL == event->details.operation_finished.emsg);
604 break;
605 default:
606 GNUNET_assert (0);
607 }
608 break;
609 default:
610 GNUNET_assert (0);
611 }
612}
613
614
615/**
616 * Signature of a main function for a testcase.
617 *
618 * @param cls closure
619 * @param num_peers number of peers in 'peers'
620 * @param peers handle to peers run in the testbed
621 * @param links_succeeded the number of overlay link connection attempts that
622 * succeeded
623 * @param links_failed the number of overlay link connection attempts that
624 * failed
625 */
626static void
627test_master (void *cls, unsigned int num_peers,
628 struct GNUNET_TESTBED_Peer **peers,
629 unsigned int links_succeeded,
630 unsigned int links_failed)
631{
632 GNUNET_assert (NULL != peers);
633 GNUNET_assert (NULL != peers[0]);
634 GNUNET_assert (NULL != peers[1]);
635 peer1.peer = peers[0];
636 peer2.peer = peers[1];
637 /* Get the peer identity and configuration of peers */
638 op = GNUNET_TESTBED_peer_get_information (peer1.peer,
639 GNUNET_TESTBED_PIT_IDENTITY,
640 &peerinfo_cb, NULL);
641 setup_state = PEER1_GET_IDENTITY;
642 abort_task =
643 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
644 (GNUNET_TIME_UNIT_SECONDS, 40), &do_abort,
645 NULL);
646}
647
648
649/**
650 * Main function
651 */
652int main (int argc, char **argv)
653{
654 uint64_t event_mask;
655
656 result = GNUNET_NO;
657 event_mask = 0;
658 event_mask |= (1LL << GNUNET_TESTBED_ET_OPERATION_FINISHED);
659 (void) GNUNET_TESTBED_test_run ("test_stream_2peers",
660 "test_stream_local.conf",
661 NUM_PEERS, event_mask, &controller_event_cb,
662 NULL,
663 &test_master, NULL);
664 if (GNUNET_SYSERR == result)
665 return 1;
666 return 0;
667}
diff --git a/src/stream/test_stream_2peers_halfclose.c b/src/stream/test_stream_2peers_halfclose.c
deleted file mode 100644
index 74fae30a4..000000000
--- a/src/stream/test_stream_2peers_halfclose.c
+++ /dev/null
@@ -1,890 +0,0 @@
1/*
2 This file is part of GNUnet.
3 (C) 2008--2013 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file stream/test_stream_2peers_halfclose.c
23 * @brief Testcases for Stream API halfclosed connections between 2 peers
24 * @author Sree Harsha Totakura
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28#include "gnunet_testbed_service.h"
29#include "gnunet_mesh_service.h"
30#include "gnunet_stream_lib.h"
31
32/**
33 * Number of peers
34 */
35#define NUM_PEERS 2
36
37#define TIME_REL_SECS(sec) \
38 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
39
40/**
41 * Structure for holding peer's sockets and IO Handles
42 */
43struct PeerData
44{
45 /**
46 * The testbed peer handle corresponding to this peer
47 */
48 struct GNUNET_TESTBED_Peer *peer;
49
50 /**
51 * Peer's stream socket
52 */
53 struct GNUNET_STREAM_Socket *socket;
54
55 /**
56 * Peer's io write handle
57 */
58 struct GNUNET_STREAM_WriteHandle *io_write_handle;
59
60 /**
61 * Peer's io read handle
62 */
63 struct GNUNET_STREAM_ReadHandle *io_read_handle;
64
65 /**
66 * Peer's shutdown handle
67 */
68 struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
69
70 /**
71 * Testbed operation handle specific for this peer
72 */
73 struct GNUNET_TESTBED_Operation *op;
74
75 /**
76 * Our Peer id
77 */
78 struct GNUNET_PeerIdentity our_id;
79
80 /**
81 * Bytes the peer has written
82 */
83 unsigned int bytes_wrote;
84
85 /**
86 * Byte the peer has read
87 */
88 unsigned int bytes_read;
89
90 /**
91 * GNUNET_YES if the peer has successfully completed the current test
92 */
93 unsigned int test_ok;
94
95 /**
96 * The shutdown operation that has to be used by the stream_shutdown_task
97 */
98 int shutdown_operation;
99};
100
101
102/**
103 * Enumeration for various tests that are to be passed in the same order as
104 * below
105 */
106enum Test
107{
108 /**
109 * Peer1 writing; Peer2 reading
110 */
111 PEER1_WRITE,
112
113 /**
114 * Peer1 write shutdown; Peer2 should get an error when it tries to read;
115 */
116 PEER1_WRITE_SHUTDOWN,
117
118 /**
119 * Peer1 reads; Peer2 writes (connection is halfclosed)
120 */
121 PEER1_HALFCLOSE_READ,
122
123 /**
124 * Peer1 attempts to write; Should fail with stream already shutdown error
125 */
126 PEER1_HALFCLOSE_WRITE_FAIL,
127
128 /**
129 * Peer1 read shutdown; Peer2 should get stream shutdown error during write
130 */
131 PEER1_READ_SHUTDOWN,
132
133 /**
134 * All tests successfully finished
135 */
136 SUCCESS
137};
138
139
140/**
141 * Different states in test setup
142 */
143enum SetupState
144{
145 /**
146 * Get the identity of peer 1
147 */
148 PEER1_GET_IDENTITY,
149
150 /**
151 * Get the identity of peer 2
152 */
153 PEER2_GET_IDENTITY,
154
155 /**
156 * Connect to stream service of peer 2
157 */
158 PEER2_STREAM_CONNECT,
159
160 /**
161 * Connect to stream service of peer 1
162 */
163 PEER1_STREAM_CONNECT
164
165};
166
167
168/**
169 * Peer1 writes first and then calls for SHUT_WR
170 * Peer2 reads first and then calls for SHUT_RD
171 * Attempt to write again by Peer1 should be rejected
172 * Attempt to read again by Peer2 should be rejected
173 * Peer1 then reads from Peer2 which writes
174 */
175static struct PeerData peer1;
176static struct PeerData peer2;
177
178/**
179 * Task for aborting the test case if it takes too long
180 */
181static GNUNET_SCHEDULER_TaskIdentifier abort_task;
182
183/**
184 * Task for reading from stream
185 */
186static GNUNET_SCHEDULER_TaskIdentifier read_task;
187
188static char *data = "ABCD";
189
190/**
191 * Handle to testbed operation
192 */
193struct GNUNET_TESTBED_Operation *op;
194
195/**
196 * Final testing result
197 */
198static int result;
199
200/**
201 * Current running test
202 */
203enum Test current_test;
204
205/**
206 * State is test setup
207 */
208enum SetupState setup_state;
209
210
211/**
212 * Input processor
213 *
214 * @param cls the closure from GNUNET_STREAM_write/read
215 * @param status the status of the stream at the time this function is called
216 * @param data traffic from the other side
217 * @param size the number of bytes available in data read
218 * @return number of bytes of processed from 'data' (any data remaining should be
219 * given to the next time the read processor is called).
220 */
221static size_t
222input_processor (void *cls,
223 enum GNUNET_STREAM_Status status,
224 const void *input_data,
225 size_t size);
226
227
228/**
229 * The transition function; responsible for the transitions among tests
230 */
231static void
232transition();
233
234
235/**
236 * Task for calling STREAM_read
237 *
238 * @param cls the peer data entity
239 * @param tc the task context
240 */
241static void
242stream_read_task (void *cls,
243 const struct GNUNET_SCHEDULER_TaskContext *tc)
244{
245 struct PeerData *peer = cls;
246
247 peer->io_read_handle = GNUNET_STREAM_read (peer->socket,
248 GNUNET_TIME_relative_multiply
249 (GNUNET_TIME_UNIT_SECONDS, 5),
250 &input_processor,
251 cls);
252 switch (current_test)
253 {
254 case PEER1_WRITE_SHUTDOWN:
255 GNUNET_assert (&peer2 == peer);
256 GNUNET_assert (NULL == peer->io_read_handle);
257 peer2.test_ok = GNUNET_YES;
258 transition (); /* to PEER1_HALFCLOSE_READ */
259 break;
260 default:
261 GNUNET_assert (NULL != peer->io_read_handle);
262 }
263}
264
265
266/**
267 * The write completion function; called upon writing some data to stream or
268 * upon error
269 *
270 * @param cls the closure from GNUNET_STREAM_write/read
271 * @param status the status of the stream at the time this function is called
272 * @param size the number of bytes read or written
273 */
274static void
275write_completion (void *cls,
276 enum GNUNET_STREAM_Status status,
277 size_t size);
278
279
280/**
281 * Task for calling STREAM_write
282 *
283 * @param cls the peer data entity
284 * @param tc the task context
285 */
286static void
287stream_write_task (void *cls,
288 const struct GNUNET_SCHEDULER_TaskContext *tc)
289{
290 struct PeerData *peer = cls;
291
292 peer->io_write_handle =
293 GNUNET_STREAM_write (peer->socket,
294 (void *) data,
295 strlen(data) - peer->bytes_wrote,
296 GNUNET_TIME_relative_multiply
297 (GNUNET_TIME_UNIT_SECONDS, 5),
298 &write_completion,
299 peer);
300 switch (current_test)
301 {
302 case PEER1_HALFCLOSE_WRITE_FAIL:
303 GNUNET_assert (&peer1 == peer);
304 GNUNET_assert (NULL == peer->io_write_handle);
305 transition(); /* To PEER1_READ_SHUTDOWN */
306 break;
307 case PEER1_READ_SHUTDOWN:
308 GNUNET_assert (&peer2 == peer);
309 GNUNET_assert (NULL == peer->io_write_handle);
310 transition (); /* To SUCCESS */
311 break;
312 default:
313 GNUNET_assert (NULL != peer->io_write_handle);
314 }
315}
316
317
318/**
319 * Close sockets and stop testing deamons nicely
320 */
321static void
322do_close (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
323{
324 if (NULL != peer2.socket)
325 GNUNET_STREAM_close (peer2.socket);
326 if (GNUNET_SCHEDULER_NO_TASK != abort_task)
327 GNUNET_SCHEDULER_cancel (abort_task);
328 if (NULL != peer2.op)
329 GNUNET_TESTBED_operation_done (peer2.op);
330 else
331 GNUNET_SCHEDULER_shutdown (); /* For shutting down testbed */
332}
333
334
335/**
336 * Completion callback for shutdown
337 *
338 * @param cls the closure from GNUNET_STREAM_shutdown call
339 * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR,
340 * SHUT_RDWR)
341 */
342void
343shutdown_completion (void *cls,
344 int operation)
345{
346 switch (current_test)
347 {
348 case PEER1_WRITE:
349 GNUNET_assert (0);
350 case PEER1_WRITE_SHUTDOWN:
351 GNUNET_assert (cls == &peer1);
352 GNUNET_assert (SHUT_WR == operation);
353 peer1.test_ok = GNUNET_YES;
354 /* Peer2 should read with error */
355 peer2.bytes_read = 0;
356 GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2);
357 break;
358 case PEER1_READ_SHUTDOWN:
359 peer1.test_ok = GNUNET_YES;
360 peer2.bytes_wrote = 0;
361 GNUNET_SCHEDULER_add_now (&stream_write_task, &peer2);
362 break;
363 case PEER1_HALFCLOSE_READ:
364 case PEER1_HALFCLOSE_WRITE_FAIL:
365 case SUCCESS:
366 GNUNET_assert (0); /* We shouldn't reach here */
367 }
368}
369
370
371/**
372 * Task for calling STREAM_shutdown
373 *
374 * @param cls the peer entity
375 * @param tc the TaskContext
376 */
377static void
378stream_shutdown_task (void *cls,
379 const struct GNUNET_SCHEDULER_TaskContext *tc)
380{
381 struct PeerData *peer = cls;
382
383 peer->shutdown_handle = GNUNET_STREAM_shutdown (peer->socket,
384 peer->shutdown_operation,
385 &shutdown_completion,
386 peer);
387 GNUNET_assert (NULL != peer->shutdown_handle);
388}
389
390
391/**
392 * Something went wrong and timed out. Kill everything and set error flag
393 */
394static void
395do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
396{
397 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: ABORT\n");
398 if (0 != read_task)
399 {
400 GNUNET_SCHEDULER_cancel (read_task);
401 }
402 result = GNUNET_SYSERR;
403 abort_task = 0;
404 do_close (cls, tc);
405}
406
407
408/**
409 * The transition function; responsible for the transitions among tests
410 */
411static void
412transition()
413{
414 if ((GNUNET_YES == peer1.test_ok) && (GNUNET_YES == peer2.test_ok))
415 {
416 peer1.test_ok = GNUNET_NO;
417 peer2.test_ok = GNUNET_NO;
418 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
419 "TEST %d SUCCESSFULL\n", current_test);
420 switch (current_test)
421 {
422 case PEER1_WRITE:
423 current_test = PEER1_WRITE_SHUTDOWN;
424 /* Peer1 should shutdown writing */
425 peer1.shutdown_operation = SHUT_WR;
426 GNUNET_SCHEDULER_add_now (&stream_shutdown_task, &peer1);
427 break;
428 case PEER1_WRITE_SHUTDOWN:
429 current_test = PEER1_HALFCLOSE_READ;
430 /* Peer2 should be able to write successfully */
431 peer2.bytes_wrote = 0;
432 GNUNET_SCHEDULER_add_now (&stream_write_task, &peer2);
433
434 /* Peer1 should be able to read successfully */
435 peer1.bytes_read = 0;
436 GNUNET_SCHEDULER_add_now (&stream_read_task, &peer1);
437 break;
438 case PEER1_HALFCLOSE_READ:
439 current_test = PEER1_HALFCLOSE_WRITE_FAIL;
440 peer1.bytes_wrote = 0;
441 peer2.bytes_read = 0;
442 peer2.test_ok = GNUNET_YES;
443 GNUNET_SCHEDULER_add_now (&stream_write_task, &peer1);
444 break;
445 case PEER1_HALFCLOSE_WRITE_FAIL:
446 current_test = PEER1_READ_SHUTDOWN;
447 peer1.shutdown_operation = SHUT_RD;
448 GNUNET_SCHEDULER_add_now (&stream_shutdown_task, &peer1);
449 break;
450 case PEER1_READ_SHUTDOWN:
451 current_test = SUCCESS;
452 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
453 "All tests successful\n");
454 GNUNET_SCHEDULER_add_now (&do_close, NULL);
455 break;
456 case SUCCESS:
457 GNUNET_assert (0); /* We shouldn't reach here */
458
459 }
460 }
461}
462
463/**
464 * The write completion function; called upon writing some data to stream or
465 * upon error
466 *
467 * @param cls the closure from GNUNET_STREAM_write/read
468 * @param status the status of the stream at the time this function is called
469 * @param size the number of bytes read or written
470 */
471static void
472write_completion (void *cls,
473 enum GNUNET_STREAM_Status status,
474 size_t size)
475{
476 struct PeerData *peer = cls;
477
478 switch (current_test)
479 {
480 case PEER1_WRITE:
481 case PEER1_HALFCLOSE_READ:
482
483 GNUNET_assert (GNUNET_STREAM_OK == status);
484 GNUNET_assert (size <= strlen (data));
485 peer->bytes_wrote += size;
486
487 if (peer->bytes_wrote < strlen(data)) /* Have more data to send */
488 {
489 GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
490 }
491 else
492 {
493 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
494 "Writing completed\n");
495
496 if (&peer1 == peer)
497 {
498 peer1.test_ok = GNUNET_YES;
499 transition (); /* to PEER1_WRITE_SHUTDOWN */
500 }
501 else /* This will happen during PEER1_HALFCLOSE_READ */
502 {
503 peer2.test_ok = GNUNET_YES;
504 transition (); /* to PEER1_HALFCLOSE_WRITE_FAIL */
505 }
506 }
507 break;
508 case PEER1_HALFCLOSE_WRITE_FAIL:
509 GNUNET_assert (peer == &peer1);
510 GNUNET_assert (GNUNET_STREAM_SHUTDOWN == status);
511 GNUNET_assert (0 == size);
512 peer1.test_ok = GNUNET_YES;
513 break;
514 case PEER1_READ_SHUTDOWN:
515 GNUNET_assert (peer == &peer2);
516 GNUNET_assert (GNUNET_STREAM_SHUTDOWN == status);
517 GNUNET_assert (0 == size);
518 peer2.test_ok = GNUNET_YES;
519 break;
520 case PEER1_WRITE_SHUTDOWN:
521 case SUCCESS:
522 GNUNET_assert (0); /* We shouldn't reach here */
523 }
524}
525
526
527/**
528 * Function executed after stream has been established
529 *
530 * @param cls the closure from GNUNET_STREAM_open
531 * @param socket socket to use to communicate with the other side (read/write)
532 */
533static void
534stream_open_cb (void *cls,
535 struct GNUNET_STREAM_Socket *socket)
536{
537 struct PeerData *peer;
538
539 GNUNET_assert (socket == peer1.socket);
540 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
541 "%s: Stream established from peer1\n",
542 GNUNET_i2s (&peer1.our_id));
543 peer = (struct PeerData *) cls;
544 peer->bytes_wrote = 0;
545 GNUNET_assert (socket == peer1.socket);
546 GNUNET_assert (socket == peer->socket);
547 peer1.test_ok = GNUNET_NO;
548 peer2.test_ok = GNUNET_NO;
549 current_test = PEER1_WRITE;
550 GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
551}
552
553
554/**
555 * Input processor
556 *
557 * @param cls the closure from GNUNET_STREAM_write/read
558 * @param status the status of the stream at the time this function is called
559 * @param data traffic from the other side
560 * @param size the number of bytes available in data read
561 * @return number of bytes of processed from 'data' (any data remaining should be
562 * given to the next time the read processor is called).
563 */
564static size_t
565input_processor (void *cls,
566 enum GNUNET_STREAM_Status status,
567 const void *input_data,
568 size_t size)
569{
570 struct PeerData *peer;
571
572 peer = (struct PeerData *) cls;
573
574 switch (current_test)
575 {
576 case PEER1_WRITE:
577 case PEER1_HALFCLOSE_READ:
578 if (GNUNET_STREAM_TIMEOUT == status)
579 {
580 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
581 "Read operation timedout - reading again!\n");
582 GNUNET_assert (0 == size);
583 GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
584 return 0;
585 }
586
587 GNUNET_assert (GNUNET_STREAM_OK == status);
588 GNUNET_assert (size <= strlen (data));
589 GNUNET_assert (0 == strncmp ((const char *) data + peer->bytes_read,
590 (const char *) input_data,
591 size));
592 peer->bytes_read += size;
593
594 if (peer->bytes_read < strlen (data))
595 {
596 GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
597 }
598 else
599 {
600 if (&peer2 == peer) /* Peer2 has completed reading; should write */
601 {
602 peer2.test_ok = GNUNET_YES;
603 transition (); /* Transition to PEER1_WRITE_SHUTDOWN */
604 }
605 else /* Peer1 has completed reading. End of tests */
606 {
607 peer1.test_ok = GNUNET_YES;
608 transition (); /* to PEER1_HALFCLOSE_WRITE_FAIL */
609 }
610 }
611 break;
612 case PEER1_WRITE_SHUTDOWN:
613 GNUNET_assert (0); /* This callback will not be called when stream
614 is shutdown */
615 break;
616 case PEER1_HALFCLOSE_WRITE_FAIL:
617 case PEER1_READ_SHUTDOWN:
618 case SUCCESS:
619 GNUNET_assert (0); /* We shouldn't reach here */
620 }
621
622 return size;
623}
624
625
626/**
627 * Scheduler call back; to be executed when a new stream is connected
628 * Called from listen connect for peer2
629 */
630static void
631stream_read (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
632{
633 read_task = GNUNET_SCHEDULER_NO_TASK;
634 GNUNET_assert (NULL != cls);
635 peer2.bytes_read = 0;
636 GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2);
637}
638
639
640/**
641 * Functions of this type are called upon new stream connection from other peers
642 *
643 * @param cls the closure from GNUNET_STREAM_listen
644 * @param socket the socket representing the stream
645 * @param initiator the identity of the peer who wants to establish a stream
646 * with us
647 * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
648 * stream (the socket will be invalid after the call)
649 */
650static int
651stream_listen_cb (void *cls,
652 struct GNUNET_STREAM_Socket *socket,
653 const struct GNUNET_PeerIdentity *initiator)
654{
655 if ((NULL == socket) || (NULL == initiator))
656 {
657 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Binding error\n");
658 if (GNUNET_SCHEDULER_NO_TASK != abort_task)
659 GNUNET_SCHEDULER_cancel (abort_task);
660 abort_task = GNUNET_SCHEDULER_add_now (&do_abort, NULL);
661 return GNUNET_OK;
662 }
663 GNUNET_assert (socket != peer1.socket);
664 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
665 "%s: Peer connected: %s\n",
666 GNUNET_i2s (&peer2.our_id),
667 GNUNET_i2s(initiator));
668 peer2.socket = socket;
669 /* FIXME: reading should be done right now instead of a scheduled call */
670 read_task = GNUNET_SCHEDULER_add_now (&stream_read, (void *) socket);
671 return GNUNET_OK;
672}
673
674
675/**
676 * Listen success callback; connects a peer to stream as client
677 */
678static void
679stream_connect (void);
680
681
682/**
683 * Adapter function called to destroy a connection to
684 * a service.
685 *
686 * @param cls closure
687 * @param op_result service handle returned from the connect adapter
688 */
689static void
690stream_da (void *cls, void *op_result)
691{
692 struct GNUNET_STREAM_ListenSocket *lsocket;
693
694 if (&peer2 == cls)
695 {
696 lsocket = op_result;
697 GNUNET_STREAM_listen_close (lsocket);
698 if (NULL != peer1.op)
699 GNUNET_TESTBED_operation_done (peer1.op);
700 else
701 GNUNET_SCHEDULER_shutdown ();
702 return;
703 }
704 if (&peer1 == cls)
705 {
706 GNUNET_assert (op_result == peer1.socket);
707 GNUNET_STREAM_close (peer1.socket);
708 GNUNET_SCHEDULER_shutdown (); /* Exit point of the test */
709 return;
710 }
711 GNUNET_assert (0);
712}
713
714
715/**
716 * Adapter function called to establish a connection to
717 * a service.
718 *
719 * @param cls closure
720 * @param cfg configuration of the peer to connect to; will be available until
721 * GNUNET_TESTBED_operation_done() is called on the operation returned
722 * from GNUNET_TESTBED_service_connect()
723 * @return service handle to return in 'op_result', NULL on error
724 */
725static void *
726stream_ca (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg)
727{
728 struct GNUNET_STREAM_ListenSocket *lsocket;
729
730 switch (setup_state)
731 {
732 case PEER2_STREAM_CONNECT:
733 lsocket = GNUNET_STREAM_listen (cfg, 10, &stream_listen_cb, NULL,
734 GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS,
735 &stream_connect, GNUNET_STREAM_OPTION_END);
736 GNUNET_assert (NULL != lsocket);
737 return lsocket;
738 case PEER1_STREAM_CONNECT:
739 peer1.socket = GNUNET_STREAM_open (cfg, &peer2.our_id, 10, &stream_open_cb,
740 &peer1, GNUNET_STREAM_OPTION_END);
741 GNUNET_assert (NULL != peer1.socket);
742 return peer1.socket;
743 default:
744 GNUNET_assert (0);
745 }
746}
747
748
749/**
750 * Listen success callback; connects a peer to stream as client
751 */
752static void
753stream_connect (void)
754{
755 GNUNET_assert (PEER2_STREAM_CONNECT == setup_state);
756 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stream listen open successful\n");
757 peer1.op = GNUNET_TESTBED_service_connect (&peer1, peer1.peer, "stream",
758 NULL, NULL,
759 stream_ca, stream_da, &peer1);
760 setup_state = PEER1_STREAM_CONNECT;
761}
762
763
764/**
765 * Callback to be called when the requested peer information is available
766 *
767 * @param cb_cls the closure from GNUNET_TETSBED_peer_get_information()
768 * @param op the operation this callback corresponds to
769 * @param pinfo the result; will be NULL if the operation has failed
770 * @param emsg error message if the operation has failed; will be NULL if the
771 * operation is successfull
772 */
773static void
774peerinfo_cb (void *cb_cls, struct GNUNET_TESTBED_Operation *op_,
775 const struct GNUNET_TESTBED_PeerInformation *pinfo,
776 const char *emsg)
777{
778 GNUNET_assert (NULL == emsg);
779 GNUNET_assert (op == op_);
780 switch (setup_state)
781 {
782 case PEER1_GET_IDENTITY:
783 memcpy (&peer1.our_id, pinfo->result.id,
784 sizeof (struct GNUNET_PeerIdentity));
785 GNUNET_TESTBED_operation_done (op);
786 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer 1 id: %s\n", GNUNET_i2s
787 (&peer1.our_id));
788 op = GNUNET_TESTBED_peer_get_information (peer2.peer,
789 GNUNET_TESTBED_PIT_IDENTITY,
790 &peerinfo_cb, NULL);
791 setup_state = PEER2_GET_IDENTITY;
792 break;
793 case PEER2_GET_IDENTITY:
794 memcpy (&peer2.our_id, pinfo->result.id,
795 sizeof (struct GNUNET_PeerIdentity));
796 GNUNET_TESTBED_operation_done (op);
797 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer 2 id: %s\n", GNUNET_i2s
798 (&peer2.our_id));
799 peer2.op = GNUNET_TESTBED_service_connect (&peer2, peer2.peer, "stream",
800 NULL, NULL,
801 stream_ca, stream_da, &peer2);
802 setup_state = PEER2_STREAM_CONNECT;
803 break;
804 default:
805 GNUNET_assert (0);
806 }
807}
808
809
810/**
811 * Controller event callback
812 *
813 * @param cls NULL
814 * @param event the controller event
815 */
816static void
817controller_event_cb (void *cls,
818 const struct GNUNET_TESTBED_EventInformation *event)
819{
820 switch (event->type)
821 {
822 case GNUNET_TESTBED_ET_OPERATION_FINISHED:
823 switch (setup_state)
824 {
825 case PEER1_STREAM_CONNECT:
826 case PEER2_STREAM_CONNECT:
827 GNUNET_assert (NULL == event->details.operation_finished.emsg);
828 break;
829 default:
830 GNUNET_assert (0);
831 }
832 break;
833 default:
834 GNUNET_assert (0);
835 }
836}
837
838
839/**
840 * Signature of a main function for a testcase.
841 *
842 * @param cls closure
843 * @param num_peers number of peers in 'peers'
844 * @param peers handle to peers run in the testbed
845 * @param links_succeeded the number of overlay link connection attempts that
846 * succeeded
847 * @param links_failed the number of overlay link connection attempts that
848 * failed
849 */
850static void
851test_master (void *cls, unsigned int num_peers,
852 struct GNUNET_TESTBED_Peer **peers,
853 unsigned int links_succeeded,
854 unsigned int links_failed)
855{
856 GNUNET_assert (NULL != peers);
857 GNUNET_assert (NULL != peers[0]);
858 GNUNET_assert (NULL != peers[1]);
859 peer1.peer = peers[0];
860 peer2.peer = peers[1];
861 op = GNUNET_TESTBED_peer_get_information (peer1.peer,
862 GNUNET_TESTBED_PIT_IDENTITY,
863 &peerinfo_cb, NULL);
864 setup_state = PEER1_GET_IDENTITY;
865 abort_task =
866 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
867 (GNUNET_TIME_UNIT_SECONDS, 1000), &do_abort,
868 NULL);
869}
870
871
872/**
873 * Main function
874 */
875int main (int argc, char **argv)
876{
877 uint64_t event_mask;
878
879 result = GNUNET_NO;
880 event_mask = 0;
881 event_mask |= (1LL << GNUNET_TESTBED_ET_OPERATION_FINISHED);
882 (void) GNUNET_TESTBED_test_run ("test_stream_2peers_halfclose",
883 "test_stream_local.conf", NUM_PEERS,
884 event_mask,
885 &controller_event_cb, NULL, &test_master,
886 NULL);
887 if (GNUNET_SYSERR == result)
888 return 1;
889 return 0;
890}
diff --git a/src/stream/test_stream_big.c b/src/stream/test_stream_big.c
deleted file mode 100644
index a60d2ce95..000000000
--- a/src/stream/test_stream_big.c
+++ /dev/null
@@ -1,429 +0,0 @@
1/*
2 This file is part of GNUnet.
3 (C) 2008--2013 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file stream/test_stream_big.c
23 * @brief large data transfer using stream API between local peers
24 * @author Sree Harsha Totakura
25 */
26
27#include <string.h>
28
29#include "platform.h"
30#include "gnunet_util_lib.h"
31#include "gnunet_stream_lib.h"
32#include "gnunet_testing_lib.h"
33
34#define LOG(kind, ...) \
35 GNUNET_log (kind, __VA_ARGS__);
36
37#define TIME_REL_SECS(sec) \
38 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
39
40
41/**
42 * Structure for holding peer's sockets and IO Handles
43 */
44struct PeerData
45{
46 /**
47 * Peer's stream socket
48 */
49 struct GNUNET_STREAM_Socket *socket;
50
51 struct GNUNET_PeerIdentity self;
52
53 /**
54 * Peer's io write handle
55 */
56 struct GNUNET_STREAM_WriteHandle *io_write_handle;
57
58 /**
59 * Peer's io read handle
60 */
61 struct GNUNET_STREAM_ReadHandle *io_read_handle;
62
63 /**
64 * Peer's shutdown handle
65 */
66 struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
67
68 /**
69 * Bytes the peer has written
70 */
71 unsigned int bytes_wrote;
72
73 /**
74 * Byte the peer has read
75 */
76 unsigned int bytes_read;
77};
78
79static struct PeerData peer1;
80static struct PeerData peer2;
81static struct GNUNET_STREAM_ListenSocket *peer2_listen_socket;
82static const struct GNUNET_CONFIGURATION_Handle *config;
83
84static GNUNET_SCHEDULER_TaskIdentifier abort_task;
85static GNUNET_SCHEDULER_TaskIdentifier read_task;
86static GNUNET_SCHEDULER_TaskIdentifier write_task;
87
88#define DATA_SIZE 65536 /* 64KB */
89static uint32_t data[DATA_SIZE / 4]; /* 64KB array */
90static int result;
91
92/**
93 * Shutdown nicely
94 */
95static void
96do_close (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
97{
98 if (GNUNET_SCHEDULER_NO_TASK != abort_task)
99 GNUNET_SCHEDULER_cancel (abort_task);
100 if (NULL != peer1.socket)
101 GNUNET_STREAM_close (peer1.socket);
102 if (NULL != peer2.socket)
103 GNUNET_STREAM_close (peer2.socket);
104 if (NULL != peer2_listen_socket)
105 GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */
106}
107
108
109/**
110 * Something went wrong and timed out. Kill everything and set error flag
111 */
112static void
113do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
114{
115 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: ABORT\n");
116 if (GNUNET_SCHEDULER_NO_TASK != read_task)
117 GNUNET_SCHEDULER_cancel (read_task);
118 result = GNUNET_SYSERR;
119 abort_task = GNUNET_SCHEDULER_NO_TASK;
120 do_close (cls, tc);
121}
122
123
124/**
125 * Completion callback for shutdown
126 *
127 * @param cls the closure from GNUNET_STREAM_shutdown call
128 * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR,
129 * SHUT_RDWR)
130 */
131static void
132shutdown_completion (void *cls,
133 int operation)
134{
135 static int shutdowns;
136
137 if (++shutdowns == 1)
138 {
139 peer1.shutdown_handle = NULL;
140 peer2.shutdown_handle = GNUNET_STREAM_shutdown (peer2.socket, SHUT_RDWR,
141 &shutdown_completion, cls);
142 return;
143 }
144 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "STREAM shutdown successful\n");
145 GNUNET_SCHEDULER_add_now (&do_close, cls);
146}
147
148
149/**
150 * Shutdown sockets gracefully
151 */
152static void
153do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
154{
155 result = GNUNET_OK;
156 peer1.shutdown_handle = GNUNET_STREAM_shutdown (peer1.socket, SHUT_RDWR,
157 &shutdown_completion, cls);
158}
159
160
161/**
162 * The write completion function; called upon writing some data to stream or
163 * upon error
164 *
165 * @param cls the closure from GNUNET_STREAM_write/read
166 * @param status the status of the stream at the time this function is called
167 * @param size the number of bytes read or written
168 */
169static void
170write_completion (void *cls,
171 enum GNUNET_STREAM_Status status,
172 size_t size)
173{
174 struct PeerData *peer;
175
176 peer = (struct PeerData *) cls;
177 GNUNET_assert (GNUNET_STREAM_OK == status);
178 GNUNET_assert (size <= DATA_SIZE);
179 peer->bytes_wrote += size;
180
181 if (peer->bytes_wrote < DATA_SIZE) /* Have more data to send */
182 {
183 peer->io_write_handle =
184 GNUNET_STREAM_write (peer->socket,
185 ((void *) data) + peer->bytes_wrote,
186 sizeof (data) - peer->bytes_wrote,
187 GNUNET_TIME_relative_multiply
188 (GNUNET_TIME_UNIT_SECONDS, 5),
189 &write_completion,
190 cls);
191 GNUNET_assert (NULL != peer->io_write_handle);
192 }
193 else
194 {
195 LOG (GNUNET_ERROR_TYPE_DEBUG, "Writing successfully finished\n");
196 result = GNUNET_OK;
197 GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
198 }
199}
200
201
202/**
203 * Task for calling STREAM_write with a chunk of random data
204 *
205 * @param cls the peer data entity
206 * @param tc the task context
207 */
208static void
209stream_write_task (void *cls,
210 const struct GNUNET_SCHEDULER_TaskContext *tc)
211{
212 struct PeerData *peer=cls;
213 unsigned int count;
214
215 write_task = GNUNET_SCHEDULER_NO_TASK;
216 for (count=0; count < DATA_SIZE / 4; count++)
217 {
218 data[count]=GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
219 UINT32_MAX);
220 }
221 LOG (GNUNET_ERROR_TYPE_DEBUG, "Generation of random data complete\n");
222 peer->io_write_handle = GNUNET_STREAM_write (peer->socket,
223 data,
224 sizeof (data),
225 GNUNET_TIME_relative_multiply
226 (GNUNET_TIME_UNIT_SECONDS, 10),
227 &write_completion,
228 peer);
229 GNUNET_assert (NULL != peer->io_write_handle);
230}
231
232
233/**
234 * Function executed after stream has been established
235 *
236 * @param cls the closure from GNUNET_STREAM_open
237 * @param socket socket to use to communicate with the other side (read/write)
238 */
239static void
240stream_open_cb (void *cls,
241 struct GNUNET_STREAM_Socket *socket)
242{
243 struct PeerData *peer;
244
245 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stream established from peer1\n");
246 peer = (struct PeerData *) cls;
247 peer->bytes_wrote = 0;
248 GNUNET_assert (socket == peer1.socket);
249 GNUNET_assert (socket == peer->socket);
250 write_task = GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
251}
252
253
254/**
255 * Scheduler call back; to be executed when a new stream is connected
256 * Called from listen connect for peer2
257 */
258static void
259stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
260
261
262/**
263 * Input processor
264 *
265 * @param cls peer2
266 * @param status the status of the stream at the time this function is called
267 * @param data traffic from the other side
268 * @param size the number of bytes available in data read
269 * @return number of bytes of processed from 'data' (any data remaining should be
270 * given to the next time the read processor is called).
271 */
272static size_t
273input_processor (void *cls,
274 enum GNUNET_STREAM_Status status,
275 const void *input_data,
276 size_t size)
277{
278 struct PeerData *peer = cls;
279
280 GNUNET_assert (GNUNET_STREAM_OK == status);
281 GNUNET_assert (&peer2 == peer);
282 GNUNET_assert (size < DATA_SIZE);
283 GNUNET_assert (0 == memcmp (((void *)data ) + peer->bytes_read,
284 input_data, size));
285 peer->bytes_read += size;
286
287 if (peer->bytes_read < DATA_SIZE)
288 {
289 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == read_task);
290 read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2);
291 /* peer->io_read_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) */
292 /* peer->socket, */
293 /* GNUNET_TIME_relative_multiply */
294 /* (GNUNET_TIME_UNIT_SECONDS, 5), */
295 /* &input_processor, */
296 /* cls); */
297 /* GNUNET_assert (NULL != peer->io_read_handle); */
298 }
299 else
300 {
301 /* Peer2 has completed reading*/
302 LOG (GNUNET_ERROR_TYPE_DEBUG, "Reading finished successfully\n");
303 }
304 return size;
305}
306
307
308/**
309 * Scheduler call back; to be executed when a new stream is connected
310 * Called from listen connect for peer2
311 */
312static void
313stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
314{
315 struct PeerData *peer = cls;
316
317 read_task = GNUNET_SCHEDULER_NO_TASK;
318 GNUNET_assert (&peer2 == peer);
319 peer->io_read_handle =
320 GNUNET_STREAM_read (peer->socket,
321 GNUNET_TIME_relative_multiply
322 (GNUNET_TIME_UNIT_SECONDS, 10),
323 &input_processor,
324 peer);
325 GNUNET_assert (NULL != peer->io_read_handle);
326}
327
328
329/**
330 * Functions of this type are called upon new stream connection from other peers
331 *
332 * @param cls the closure from GNUNET_STREAM_listen
333 * @param socket the socket representing the stream
334 * @param initiator the identity of the peer who wants to establish a stream
335 * with us
336 * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
337 * stream (the socket will be invalid after the call)
338 */
339static int
340stream_listen_cb (void *cls,
341 struct GNUNET_STREAM_Socket *socket,
342 const struct GNUNET_PeerIdentity *initiator)
343{
344 if ((NULL == socket) || (NULL == initiator))
345 {
346 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Binding error\n");
347 if (GNUNET_SCHEDULER_NO_TASK != abort_task)
348 GNUNET_SCHEDULER_cancel (abort_task);
349 abort_task = GNUNET_SCHEDULER_add_now (&do_abort, NULL);
350 return GNUNET_OK;
351 }
352 GNUNET_assert (NULL != socket);
353 GNUNET_assert (socket != peer1.socket);
354
355 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
356 "Peer connected: %s\n", GNUNET_i2s(initiator));
357
358 peer2.socket = socket;
359 peer2.bytes_read = 0;
360 read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2);
361 return GNUNET_OK;
362}
363
364
365/**
366 * Listen success callback; connects a peer to stream as client
367 */
368static void
369stream_connect (void)
370{
371 struct PeerData *peer = &peer1;
372
373 /* Connect to stream */
374 peer->socket = GNUNET_STREAM_open (config,
375 &peer2.self, /* Null for local peer? */
376 10, /* App port */
377 &stream_open_cb, &peer1,
378 GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE, 500,
379 GNUNET_STREAM_OPTION_END);
380 GNUNET_assert (NULL != peer->socket);
381}
382
383
384/**
385 * Initialize framework and start test
386 *
387 * @param cls closure
388 * @param cfg configuration of the peer that was started
389 * @param peer identity of the peer that was created
390 */
391static void
392run (void *cls,
393 const struct GNUNET_CONFIGURATION_Handle *cfg,
394 struct GNUNET_TESTING_Peer *peer)
395{
396 struct GNUNET_PeerIdentity self;
397
398 GNUNET_TESTING_peer_get_identity (peer, &self);
399 config = cfg;
400 peer2_listen_socket =
401 GNUNET_STREAM_listen (config,
402 10, /* App port */
403 &stream_listen_cb,
404 NULL,
405 GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS,
406 &stream_connect,
407 GNUNET_STREAM_OPTION_END);
408 GNUNET_assert (NULL != peer2_listen_socket);
409 peer1.self = self;
410 peer2.self = self;
411 abort_task =
412 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
413 (GNUNET_TIME_UNIT_SECONDS, 60), &do_abort,
414 NULL);
415}
416
417/**
418 * Main function
419 */
420int main (int argc, char **argv)
421{
422 if (0 != GNUNET_TESTING_peer_run ("test_stream_big",
423 "test_stream_local.conf",
424 &run, NULL))
425 return 1;
426 return (GNUNET_SYSERR == result) ? 1 : 0;
427}
428
429/* end of test_stream_big.c */
diff --git a/src/stream/test_stream_local.c b/src/stream/test_stream_local.c
deleted file mode 100644
index cb23cd5e5..000000000
--- a/src/stream/test_stream_local.c
+++ /dev/null
@@ -1,439 +0,0 @@
1/*
2 This file is part of GNUnet.
3 (C) 2008--2013 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file stream/test_stream_local.c
23 * @brief Stream API testing between local peers
24 * @author Sree Harsha Totakura
25 */
26
27#include <string.h>
28
29#include "platform.h"
30#include "gnunet_util_lib.h"
31#include "gnunet_mesh_service.h"
32#include "gnunet_stream_lib.h"
33#include "gnunet_testing_lib.h"
34
35/**
36 * Relative seconds shorthand
37 */
38#define TIME_REL_SECS(sec) \
39 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
40
41
42/**
43 * Structure for holding peer's sockets and IO Handles
44 */
45struct PeerData
46{
47 /**
48 * Peer's stream socket
49 */
50 struct GNUNET_STREAM_Socket *socket;
51
52 /**
53 * Peer's io write handle
54 */
55 struct GNUNET_STREAM_WriteHandle *io_write_handle;
56
57 /**
58 * Peer's io read handle
59 */
60 struct GNUNET_STREAM_ReadHandle *io_read_handle;
61
62 /**
63 * Peer's shutdown handle
64 */
65 struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
66
67 /**
68 * Bytes the peer has written
69 */
70 unsigned int bytes_wrote;
71
72 /**
73 * Byte the peer has read
74 */
75 unsigned int bytes_read;
76};
77
78static struct PeerData peer1;
79static struct PeerData peer2;
80static struct GNUNET_STREAM_ListenSocket *peer2_listen_socket;
81static const struct GNUNET_CONFIGURATION_Handle *config;
82static struct GNUNET_TESTING_Peer *self;
83static struct GNUNET_PeerIdentity self_id;
84
85static GNUNET_SCHEDULER_TaskIdentifier abort_task;
86
87static char *data = "ABCD";
88static int result;
89
90static int writing_success;
91static int reading_success;
92
93
94/**
95 * Input processor
96 *
97 * @param cls the closure from GNUNET_STREAM_write/read
98 * @param status the status of the stream at the time this function is called
99 * @param data traffic from the other side
100 * @param size the number of bytes available in data read
101 * @return number of bytes of processed from 'data' (any data remaining should be
102 * given to the next time the read processor is called).
103 */
104static size_t
105input_processor (void *cls,
106 enum GNUNET_STREAM_Status status,
107 const void *input_data,
108 size_t size);
109
110/**
111 * Task for calling STREAM_read
112 *
113 * @param cls the peer data entity
114 * @param tc the task context
115 */
116static void
117stream_read_task (void *cls,
118 const struct GNUNET_SCHEDULER_TaskContext *tc)
119{
120 struct PeerData *peer = cls;
121
122 peer->io_read_handle = GNUNET_STREAM_read (peer->socket,
123 GNUNET_TIME_relative_multiply
124 (GNUNET_TIME_UNIT_SECONDS, 5),
125 &input_processor,
126 peer);
127 GNUNET_assert (NULL != peer->io_read_handle);
128}
129
130
131/**
132 * The write completion function; called upon writing some data to stream or
133 * upon error
134 *
135 * @param cls the closure from GNUNET_STREAM_write/read
136 * @param status the status of the stream at the time this function is called
137 * @param size the number of bytes read or written
138 */
139static void
140write_completion (void *cls,
141 enum GNUNET_STREAM_Status status,
142 size_t size);
143
144
145/**
146 * Task for calling STREAM_write
147 *
148 * @param cls the peer data entity
149 * @param tc the task context
150 */
151static void
152stream_write_task (void *cls,
153 const struct GNUNET_SCHEDULER_TaskContext *tc)
154{
155 struct PeerData *peer = cls;
156
157 peer->io_write_handle =
158 GNUNET_STREAM_write (peer->socket,
159 (void *) data,
160 strlen(data) - peer->bytes_wrote,
161 GNUNET_TIME_relative_multiply
162 (GNUNET_TIME_UNIT_SECONDS, 5),
163 &write_completion,
164 peer);
165
166 GNUNET_assert (NULL != peer->io_write_handle);
167 }
168
169
170/**
171 * Shutdown nicely
172 */
173static void
174do_close (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
175{
176 if (GNUNET_SCHEDULER_NO_TASK != abort_task)
177 GNUNET_SCHEDULER_cancel (abort_task);
178 if (NULL != peer1.socket)
179 GNUNET_STREAM_close (peer1.socket);
180 if (NULL != peer2.socket)
181 GNUNET_STREAM_close (peer2.socket);
182 if (NULL != peer2_listen_socket)
183 GNUNET_STREAM_listen_close (peer2_listen_socket);
184}
185
186
187/**
188 * Something went wrong and timed out. Kill everything and set error flag
189 */
190static void
191do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
192{
193 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: ABORT\n");
194 result = GNUNET_SYSERR;
195 abort_task = GNUNET_SCHEDULER_NO_TASK;
196 do_close (cls, tc);
197}
198
199
200/**
201 * Completion callback for shutdown
202 *
203 * @param cls the closure from GNUNET_STREAM_shutdown call
204 * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR,
205 * SHUT_RDWR)
206 */
207static void
208shutdown_completion (void *cls,
209 int operation)
210{
211 static int shutdowns;
212
213 if (++shutdowns == 1)
214 {
215 peer1.shutdown_handle = NULL;
216 peer2.shutdown_handle = GNUNET_STREAM_shutdown (peer2.socket, SHUT_RDWR,
217 &shutdown_completion, cls);
218 return;
219 }
220 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "STREAM shutdown successful\n");
221 GNUNET_SCHEDULER_add_now (&do_close, cls);
222}
223
224
225/**
226 * Shutdown sockets gracefully
227 */
228static void
229do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
230{
231 result = GNUNET_OK;
232 peer1.shutdown_handle = GNUNET_STREAM_shutdown (peer1.socket, SHUT_RDWR,
233 &shutdown_completion, cls);
234}
235
236
237/**
238 * The write completion function; called upon writing some data to stream or
239 * upon error
240 *
241 * @param cls the closure from GNUNET_STREAM_write/read
242 * @param status the status of the stream at the time this function is called
243 * @param size the number of bytes read or written
244 */
245static void
246write_completion (void *cls,
247 enum GNUNET_STREAM_Status status,
248 size_t size)
249{
250 struct PeerData *peer=cls;
251
252 GNUNET_assert (GNUNET_STREAM_OK == status);
253 GNUNET_assert (size <= strlen (data));
254 peer->bytes_wrote += size;
255 if (peer->bytes_wrote < strlen(data)) /* Have more data to send */
256 {
257 GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
258 }
259 else
260 {
261 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
262 "Writing completed\n");
263 if (&peer1 == peer) /* Peer1 has finished writing; should read now */
264 {
265 peer->bytes_read = 0;
266 GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
267 }
268 else
269 {
270 writing_success = GNUNET_YES;
271 if (GNUNET_YES == reading_success)
272 GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
273 }
274 }
275}
276
277
278/**
279 * Function executed after stream has been established
280 *
281 * @param cls the closure from GNUNET_STREAM_open
282 * @param socket socket to use to communicate with the other side (read/write)
283 */
284static void
285stream_open_cb (void *cls,
286 struct GNUNET_STREAM_Socket *socket)
287{
288 struct PeerData *peer=cls;
289
290 GNUNET_assert (&peer1 == peer);
291 GNUNET_assert (socket == peer1.socket);
292 GNUNET_assert (socket == peer->socket);
293 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stream established from peer1\n");
294 peer->bytes_wrote = 0;
295 GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
296}
297
298
299/**
300 * Input processor
301 *
302 * @param cls the closure from GNUNET_STREAM_write/read
303 * @param status the status of the stream at the time this function is called
304 * @param data traffic from the other side
305 * @param size the number of bytes available in data read
306 * @return number of bytes of processed from 'data' (any data remaining should be
307 * given to the next time the read processor is called).
308 */
309static size_t
310input_processor (void *cls,
311 enum GNUNET_STREAM_Status status,
312 const void *input_data,
313 size_t size)
314{
315 struct PeerData *peer = cls;
316
317 GNUNET_assert (GNUNET_STREAM_OK == status);
318 GNUNET_assert (size <= strlen (data));
319 GNUNET_assert (0 == strncmp ((const char *) data + peer->bytes_read,
320 (const char *) input_data,
321 size));
322 peer->bytes_read += size;
323 if (peer->bytes_read < strlen (data))
324 {
325 GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
326 }
327 else
328 {
329 if (&peer2 == peer) /* Peer2 has completed reading; should write */
330 {
331 peer->bytes_wrote = 0;
332 GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
333 }
334 else /* Peer1 has completed reading. End of tests */
335 {
336 reading_success = GNUNET_YES;
337 if (GNUNET_YES == writing_success)
338 GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
339 }
340 }
341 return size;
342}
343
344
345/**
346 * Functions of this type are called upon new stream connection from other peers
347 *
348 * @param cls the PeerData of peer2
349 * @param socket the socket representing the stream
350 * @param initiator the identity of the peer who wants to establish a stream
351 * with us
352 * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
353 * stream (the socket will be invalid after the call)
354 */
355static int
356stream_listen_cb (void *cls,
357 struct GNUNET_STREAM_Socket *socket,
358 const struct GNUNET_PeerIdentity *initiator)
359{
360 struct PeerData *peer=cls;
361
362 if ((NULL == socket) || (NULL == initiator))
363 {
364 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Binding error\n");
365 if (GNUNET_SCHEDULER_NO_TASK != abort_task)
366 GNUNET_SCHEDULER_cancel (abort_task);
367 abort_task = GNUNET_SCHEDULER_add_now (&do_abort, NULL);
368 return GNUNET_OK;
369 }
370 GNUNET_assert (NULL != socket);
371 GNUNET_assert (socket != peer1.socket);
372 GNUNET_assert (&peer2 == peer);
373 GNUNET_assert (0 == memcmp (&self_id,
374 initiator,
375 sizeof (struct GNUNET_PeerIdentity)));
376 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
377 "Peer connected: %s\n", GNUNET_i2s(initiator));
378 peer->socket = socket;
379 peer->bytes_read = 0;
380 GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2);
381 return GNUNET_OK;
382}
383
384
385/**
386 * Listen success callback; connects a peer to stream as client
387 */
388static void
389stream_connect (void)
390{
391 peer1.socket = GNUNET_STREAM_open (config,
392 &self_id,
393 10, /* App port */
394 &stream_open_cb,
395 &peer1,
396 GNUNET_STREAM_OPTION_END);
397 GNUNET_assert (NULL != peer1.socket);
398}
399
400
401/**
402 * Initialize framework and start test
403 */
404static void
405run (void *cls,
406 const struct GNUNET_CONFIGURATION_Handle *cfg,
407 struct GNUNET_TESTING_Peer *peer)
408{
409 config = cfg;
410 self = peer;
411 GNUNET_TESTING_peer_get_identity (peer, &self_id);
412 peer2_listen_socket =
413 GNUNET_STREAM_listen (config,
414 10, /* App port */
415 &stream_listen_cb,
416 &peer2,
417 GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS,
418 &stream_connect,
419 GNUNET_STREAM_OPTION_END);
420 GNUNET_assert (NULL != peer2_listen_socket);
421 abort_task =
422 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
423 (GNUNET_TIME_UNIT_SECONDS, 30), &do_abort,
424 NULL);
425}
426
427/**
428 * Main function
429 */
430int main (int argc, char **argv)
431{
432 if (0 != GNUNET_TESTING_peer_run ("test_stream_local",
433 "test_stream_local.conf",
434 &run, NULL))
435 return 1;
436 return (GNUNET_SYSERR == result) ? 1 : 0;
437}
438
439/* end of test_stream_local.c */
diff --git a/src/stream/test_stream_local.conf b/src/stream/test_stream_local.conf
deleted file mode 100644
index dc4929a62..000000000
--- a/src/stream/test_stream_local.conf
+++ /dev/null
@@ -1,85 +0,0 @@
1[lockmanager]
2AUTOSTART = NO
3ACCEPT_FROM = 127.0.0.1;
4HOSTNAME = localhost
5PORT = 12101
6
7[statistics]
8AUTOSTART = YES
9ACCEPT_FROM = 127.0.0.1;
10PORT = 12102
11
12[fs]
13AUTOSTART = NO
14
15[resolver]
16AUTOSTART = NO
17
18[mesh]
19AUTOSTART = YES
20ACCEPT_FROM = 127.0.0.1;
21HOSTNAME = localhost
22PORT = 10700
23# PREFIX = valgrind --leak-check=full
24# PREFIX = xterm -geometry 100x85 -T peer1 -e gdb --args
25
26[dht]
27AUTOSTART = YES
28ACCEPT_FROM6 = ::1;
29ACCEPT_FROM = 127.0.0.1;
30HOSTNAME = localhost
31PORT = 12100
32
33[dhtcache]
34QUOTA = 1 MB
35DATABASE = sqlite
36
37[transport]
38PLUGINS = tcp
39ACCEPT_FROM6 = ::1;
40ACCEPT_FROM = 127.0.0.1;
41NEIGHBOUR_LIMIT = 50
42PORT = 12365
43
44[ats]
45WAN_QUOTA_OUT = 3932160
46WAN_QUOTA_IN = 3932160
47
48[core]
49PORT = 12092
50
51[arm]
52DEFAULTSERVICES = core lockmanager
53PORT = 12366
54
55[transport-tcp]
56TIMEOUT = 300 s
57PORT = 12368
58
59[TESTING]
60WEAKRANDOM = YES
61
62[testbed]
63OVERLAY_TOPOLOGY = LINE
64
65[gnunetd]
66HOSTKEY = $SERVICEHOME/.hostkey
67
68[PATHS]
69SERVICEHOME = /tmp/test-stream/
70
71[dns]
72AUTOSTART = NO
73
74[nse]
75AUTOSTART = NO
76
77[vpn]
78AUTOSTART = NO
79
80[nat]
81RETURN_LOCAL_ADDRESSES = YES
82
83[consensus]
84AUTOSTART = NO
85
diff --git a/src/stream/test_stream_sequence_wraparound.c b/src/stream/test_stream_sequence_wraparound.c
deleted file mode 100644
index 4c2329d26..000000000
--- a/src/stream/test_stream_sequence_wraparound.c
+++ /dev/null
@@ -1,425 +0,0 @@
1/*
2 This file is part of GNUnet.
3 (C) 2008--2013 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file stream/test_stream_sequence_wraparound.c
23 * @brief test cases for sequence wrap around situations during data transfer
24 * @author Sree Harsha Totakura
25 */
26
27#include <string.h>
28
29#include "platform.h"
30#include "gnunet_util_lib.h"
31#include "gnunet_stream_lib.h"
32#include "gnunet_testing_lib.h"
33
34/**
35 * Generic logging shorthand
36 */
37#define LOG(kind, ...) \
38 GNUNET_log (kind, __VA_ARGS__);
39
40/**
41 * Relative seconds shorthand
42 */
43#define TIME_REL_SECS(sec) \
44 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
45
46/**
47 * Structure for holding peer's sockets and IO Handles
48 */
49struct PeerData
50{
51 /**
52 * Peer's stream socket
53 */
54 struct GNUNET_STREAM_Socket *socket;
55
56 /**
57 * Peer's io write handle
58 */
59 struct GNUNET_STREAM_WriteHandle *io_write_handle;
60
61 /**
62 * Peer's io read handle
63 */
64 struct GNUNET_STREAM_ReadHandle *io_read_handle;
65
66 /**
67 * Peer's shutdown handle
68 */
69 struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
70
71 /**
72 * Bytes the peer has written
73 */
74 unsigned int bytes_wrote;
75
76 /**
77 * Byte the peer has read
78 */
79 unsigned int bytes_read;
80};
81
82static struct PeerData peer1;
83static struct PeerData peer2;
84static struct GNUNET_STREAM_ListenSocket *peer2_listen_socket;
85static const struct GNUNET_CONFIGURATION_Handle *config;
86static struct GNUNET_TESTING_Peer *self;
87static struct GNUNET_PeerIdentity self_id;
88
89static GNUNET_SCHEDULER_TaskIdentifier abort_task;
90static GNUNET_SCHEDULER_TaskIdentifier read_task;
91static GNUNET_SCHEDULER_TaskIdentifier write_task;
92
93#define DATA_SIZE 65536 /* 64KB */
94static uint32_t data[DATA_SIZE / 4]; /* 64KB array */
95static int result;
96
97/**
98 * Shutdown nicely
99 */
100static void
101do_close (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
102{
103 if (GNUNET_SCHEDULER_NO_TASK != abort_task)
104 GNUNET_SCHEDULER_cancel (abort_task);
105 if (NULL != peer1.socket)
106 GNUNET_STREAM_close (peer1.socket);
107 if (NULL != peer2.socket)
108 GNUNET_STREAM_close (peer2.socket);
109 if (NULL != peer2_listen_socket)
110 GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */
111}
112
113
114/**
115 * Something went wrong and timed out. Kill everything and set error flag
116 */
117static void
118do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
119{
120 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: ABORT\n");
121 if (GNUNET_SCHEDULER_NO_TASK != read_task)
122 {
123 GNUNET_SCHEDULER_cancel (read_task);
124 }
125 result = GNUNET_SYSERR;
126 abort_task = GNUNET_SCHEDULER_NO_TASK;
127 do_close (cls, tc);
128}
129
130
131/**
132 * Completion callback for shutdown
133 *
134 * @param cls the closure from GNUNET_STREAM_shutdown call
135 * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR,
136 * SHUT_RDWR)
137 */
138static void
139shutdown_completion (void *cls,
140 int operation)
141{
142 static int shutdowns;
143
144 if (++shutdowns == 1)
145 {
146 peer1.shutdown_handle = NULL;
147 peer2.shutdown_handle = GNUNET_STREAM_shutdown (peer2.socket, SHUT_RDWR,
148 &shutdown_completion, cls);
149 return;
150 }
151 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "STREAM shutdown successful\n");
152 GNUNET_SCHEDULER_add_now (&do_close, cls);
153}
154
155
156/**
157 * Shutdown sockets gracefully
158 */
159static void
160do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
161{
162 result = GNUNET_OK;
163 peer1.shutdown_handle = GNUNET_STREAM_shutdown (peer1.socket, SHUT_RDWR,
164 &shutdown_completion, cls);
165}
166
167
168/**
169 * The write completion function; called upon writing some data to stream or
170 * upon error
171 *
172 * @param cls the closure from GNUNET_STREAM_write/read
173 * @param status the status of the stream at the time this function is called
174 * @param size the number of bytes read or written
175 */
176static void
177write_completion (void *cls,
178 enum GNUNET_STREAM_Status status,
179 size_t size)
180{
181 struct PeerData *peer;
182
183 peer = (struct PeerData *) cls;
184 GNUNET_assert (GNUNET_STREAM_OK == status);
185 GNUNET_assert (size <= DATA_SIZE);
186 peer->bytes_wrote += size;
187
188 if (peer->bytes_wrote < DATA_SIZE) /* Have more data to send */
189 {
190 peer->io_write_handle =
191 GNUNET_STREAM_write (peer->socket,
192 ((void *) data) + peer->bytes_wrote,
193 DATA_SIZE - peer->bytes_wrote,
194 GNUNET_TIME_relative_multiply
195 (GNUNET_TIME_UNIT_SECONDS, 5),
196 &write_completion,
197 cls);
198 GNUNET_assert (NULL != peer->io_write_handle);
199 }
200 else
201 {
202 LOG (GNUNET_ERROR_TYPE_DEBUG, "Writing successfully finished\n");
203 result = GNUNET_OK;
204 GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
205 }
206}
207
208
209/**
210 * Task for calling STREAM_write with a chunk of random data
211 *
212 * @param cls the peer data entity
213 * @param tc the task context
214 */
215static void
216stream_write_task (void *cls,
217 const struct GNUNET_SCHEDULER_TaskContext *tc)
218{
219 struct PeerData *peer=cls;
220 unsigned int count;
221
222 write_task = GNUNET_SCHEDULER_NO_TASK;
223 for (count=0; count < DATA_SIZE / 4; count++)
224 {
225 data[count]=GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
226 UINT32_MAX);
227 }
228 LOG (GNUNET_ERROR_TYPE_DEBUG, "Generation of random data complete\n");
229 peer->io_write_handle = GNUNET_STREAM_write (peer->socket,
230 data,
231 DATA_SIZE,
232 GNUNET_TIME_relative_multiply
233 (GNUNET_TIME_UNIT_SECONDS, 10),
234 &write_completion,
235 peer);
236 GNUNET_assert (NULL != peer->io_write_handle);
237}
238
239
240/**
241 * Function executed after stream has been established
242 *
243 * @param cls the closure from GNUNET_STREAM_open
244 * @param socket socket to use to communicate with the other side (read/write)
245 */
246static void
247stream_open_cb (void *cls,
248 struct GNUNET_STREAM_Socket *socket)
249{
250 struct PeerData *peer;
251
252 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stream established from peer1\n");
253 peer = (struct PeerData *) cls;
254 peer->bytes_wrote = 0;
255 GNUNET_assert (socket == peer1.socket);
256 GNUNET_assert (socket == peer->socket);
257 write_task = GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
258}
259
260
261/**
262 * Scheduler call back; to be executed when a new stream is connected
263 * Called from listen connect for peer2
264 */
265static void
266stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
267
268
269/**
270 * Input processor
271 *
272 * @param cls peer2
273 * @param status the status of the stream at the time this function is called
274 * @param data traffic from the other side
275 * @param size the number of bytes available in data read
276 * @return number of bytes of processed from 'data' (any data remaining should be
277 * given to the next time the read processor is called).
278 */
279static size_t
280input_processor (void *cls,
281 enum GNUNET_STREAM_Status status,
282 const void *input_data,
283 size_t size)
284{
285 struct PeerData *peer = cls;
286
287 GNUNET_assert (GNUNET_STREAM_OK == status);
288 GNUNET_assert (&peer2 == peer);
289 GNUNET_assert (size < DATA_SIZE);
290 GNUNET_assert (0 == memcmp (((void *)data ) + peer->bytes_read,
291 input_data, size));
292 peer->bytes_read += size;
293
294 if (peer->bytes_read < DATA_SIZE)
295 {
296 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == read_task);
297 read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2);
298 }
299 else
300 {
301 /* Peer2 has completed reading*/
302 LOG (GNUNET_ERROR_TYPE_DEBUG, "Reading finished successfully\n");
303 }
304 return size;
305}
306
307
308/**
309 * Scheduler call back; to be executed when a new stream is connected
310 * Called from listen connect for peer2
311 */
312static void
313stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
314{
315 struct PeerData *peer = cls;
316
317 read_task = GNUNET_SCHEDULER_NO_TASK;
318 GNUNET_assert (&peer2 == peer);
319 peer->io_read_handle =
320 GNUNET_STREAM_read (peer->socket,
321 GNUNET_TIME_relative_multiply
322 (GNUNET_TIME_UNIT_SECONDS, 10),
323 &input_processor,
324 peer);
325 GNUNET_assert (NULL != peer->io_read_handle);
326}
327
328
329/**
330 * Functions of this type are called upon new stream connection from other peers
331 *
332 * @param cls the closure from GNUNET_STREAM_listen
333 * @param socket the socket representing the stream
334 * @param initiator the identity of the peer who wants to establish a stream
335 * with us
336 * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
337 * stream (the socket will be invalid after the call)
338 */
339static int
340stream_listen_cb (void *cls,
341 struct GNUNET_STREAM_Socket *socket,
342 const struct GNUNET_PeerIdentity *initiator)
343{
344 if ((NULL == socket) || (NULL == initiator))
345 {
346 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Binding error\n");
347 if (GNUNET_SCHEDULER_NO_TASK != abort_task)
348 GNUNET_SCHEDULER_cancel (abort_task);
349 abort_task = GNUNET_SCHEDULER_add_now (&do_abort, NULL);
350 return GNUNET_OK;
351 }
352 GNUNET_assert (NULL != socket);
353 GNUNET_assert (socket != peer1.socket);
354 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
355 "Peer connected: %s\n", GNUNET_i2s(initiator));
356 peer2.socket = socket;
357 peer2.bytes_read = 0;
358 read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2);
359 return GNUNET_OK;
360}
361
362
363/**
364 * Listen success callback; connects a peer to stream as client
365 */
366static void
367stream_connect (void)
368{
369 peer1.socket =
370 GNUNET_STREAM_open (config,
371 &self_id, /* Null for local peer? */
372 10, /* App port */
373 &stream_open_cb,
374 &peer1,
375 GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER,
376 UINT32_MAX - GNUNET_CRYPTO_random_u32
377 (GNUNET_CRYPTO_QUALITY_WEAK, 64),
378 GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE, 500,
379 GNUNET_STREAM_OPTION_END);
380 GNUNET_assert (NULL != peer1.socket);
381}
382
383
384/**
385 * Initialize framework and start test
386 */
387static void
388run (void *cls,
389 const struct GNUNET_CONFIGURATION_Handle *cfg,
390 struct GNUNET_TESTING_Peer *peer)
391{
392 config = cfg;
393 self = peer;
394 (void) GNUNET_TESTING_peer_get_identity (peer, &self_id);
395 peer2_listen_socket =
396 GNUNET_STREAM_listen (config,
397 10, /* App port */
398 &stream_listen_cb,
399 NULL,
400 GNUNET_STREAM_OPTION_LISTEN_TIMEOUT,
401 60 * 1000,
402 GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS,
403 &stream_connect,
404 GNUNET_STREAM_OPTION_END);
405 GNUNET_assert (NULL != peer2_listen_socket);
406 abort_task =
407 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
408 (GNUNET_TIME_UNIT_SECONDS, 100), &do_abort,
409 NULL);
410}
411
412
413/**
414 * Main function
415 */
416int main (int argc, char **argv)
417{
418 if (0 != GNUNET_TESTING_peer_run ("test_stream_sequence_wraparound",
419 "test_stream_local.conf",
420 &run, NULL))
421 return 1;
422 return (GNUNET_SYSERR == result) ? 1 : 0;
423}
424
425/* end of test_stream_sequence_wraparound.c */