aboutsummaryrefslogtreecommitdiff
path: root/src/stream/stream_api.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2012-02-03 10:01:23 +0000
committerChristian Grothoff <christian@grothoff.org>2012-02-03 10:01:23 +0000
commit3bbaca39b63c43f2969d30338f8a00874c68d29b (patch)
treea55b588ffeba08b475ad460c40ef14f86e590497 /src/stream/stream_api.c
parent3d2cb5ed7e56049c905ef81dbcc35529827b0a9f (diff)
downloadgnunet-3bbaca39b63c43f2969d30338f8a00874c68d29b.tar.gz
gnunet-3bbaca39b63c43f2969d30338f8a00874c68d29b.zip
-misc stream hxing
Diffstat (limited to 'src/stream/stream_api.c')
-rw-r--r--src/stream/stream_api.c158
1 files changed, 72 insertions, 86 deletions
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c
index 25b876333..3dcb2d85a 100644
--- a/src/stream/stream_api.c
+++ b/src/stream/stream_api.c
@@ -23,11 +23,10 @@
23 * @brief Implementation of the stream library 23 * @brief Implementation of the stream library
24 * @author Sree Harsha Totakura 24 * @author Sree Harsha Totakura
25 */ 25 */
26 26#include "platform.h"
27#include "gnunet_common.h" 27#include "gnunet_common.h"
28#include "gnunet_stream_lib.h" 28#include "gnunet_stream_lib.h"
29 29
30
31/** 30/**
32 * states in the Protocol 31 * states in the Protocol
33 */ 32 */
@@ -103,12 +102,12 @@ struct GNUNET_STREAM_Socket
103 /** 102 /**
104 * The session id associated with this stream connection 103 * The session id associated with this stream connection
105 */ 104 */
106 unint32_t session_id; 105 uint32_t session_id;
107 106
108 /** 107 /**
109 * The peer identity of the peer at the other end of the stream 108 * The peer identity of the peer at the other end of the stream
110 */ 109 */
111 GNUNET_PeerIdentity *other_peer; 110 GNUNET_PeerIdentity other_peer;
112 111
113 /** 112 /**
114 * Stream open closure 113 * Stream open closure
@@ -182,35 +181,6 @@ static unsigned int default_timeout = 300;
182 181
183 182
184/** 183/**
185 * Converts message fields from host byte order to network byte order
186 *
187 * @param msg the message to convert
188 */
189static void
190GNUNET_STREAM_convert_message_h2n (struct GNUNET_STREAM_MessageHeader *msg)
191{
192 /* Add type specific message conversion here */
193
194 msg->size = htons (msg->size);
195 msg->type = htons (msg->type);
196}
197
198
199/**
200 * Converts message fields from network byte order to host byte order
201 *
202 * @param msg the messeage to convert
203 */
204static void
205GNUNET_STREAM_convert_message_n2h (struct GNUNET_STREAM_MessageHeader *msg)
206{
207 msg->size = ntohs (msg->size);
208 msg->type = ntohs (msg->type);
209
210 /* Add type specific message conversion here */
211}
212
213/**
214 * Callback function from send_message 184 * Callback function from send_message
215 * 185 *
216 * @param cls closure the socket on which the send message was called 186 * @param cls closure the socket on which the send message was called
@@ -221,24 +191,28 @@ GNUNET_STREAM_convert_message_n2h (struct GNUNET_STREAM_MessageHeader *msg)
221static size_t 191static size_t
222send_message_notify (void *cls, size_t size, void *buf) 192send_message_notify (void *cls, size_t size, void *buf)
223{ 193{
224 struct GNUNET_STREAM_Socket *socket; 194 struct GNUNET_STREAM_Socket *socket = cls;
195 size_t ret;
225 196
226 socket = (struct GNUNET_STREAM_Socket *) cls;
227 socket->transmit_handle = NULL; /* Remove the transmit handle */ 197 socket->transmit_handle = NULL; /* Remove the transmit handle */
228 if (0 == size) /* Socket closed? */ 198 if (0 == size) /* Socket closed? */
229 { 199 {
200 // statistics ("message timeout")
201
202
230 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 203 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
231 "Message not sent as tunnel was closed \n"); 204 "Message not sent as tunnel was closed \n");
205 ret = 0;
232 } 206 }
233 else /* Size is more or equal to what was requested */ 207 else /* Size is more or equal to what was requested */
234 { 208 {
235 size = socket->message->size; 209 ret = ntohs (socket->message->size);
236 GNUNET_STREAM_convert_message_h2n (socket->message) /* Convert h2n */ 210 GNUNET_assert (size >= ret);
237 memcpy (buf, socket->message, size); 211 memcpy (buf, socket->message, ret);
238 } 212 }
239 GNUNET_free (socket->message); /* Free the message memory */ 213 GNUNET_free (socket->message); /* Free the message memory */
240 socket->message = NULL; 214 socket->message = NULL;
241 return size; 215 return ret;
242} 216}
243 217
244 218
@@ -258,8 +232,8 @@ send_message (struct GNUNET_STREAM_Socket *socket,
258 0, /* Corking */ 232 0, /* Corking */
259 timeout, /* FIXME: Maxdelay */ 233 timeout, /* FIXME: Maxdelay */
260 socket->other_peer, 234 socket->other_peer,
261 message->size, 235 ntohs (message->size),
262 send_message_notify, 236 &send_message_notify,
263 socket); 237 socket);
264} 238}
265 239
@@ -295,24 +269,58 @@ handle_data (void *cls,
295 const struct GNUNET_MessageHeader *message, 269 const struct GNUNET_MessageHeader *message,
296 const struct GNUNET_ATS_Information*atsi) 270 const struct GNUNET_ATS_Information*atsi)
297{ 271{
272 struct GNUNET_STREAM_Socket *socket = cls;
298 uint16_t size; 273 uint16_t size;
299 struct GNUNET_STREAM_MessageHeader *message_copy; 274 const struct GNUNET_STREAM_DataMessage *data_msg;
300 275 const void *payload;
276
301 size = ntohs (message->size); 277 size = ntohs (message->size);
302 message_copy = GNUNET_malloc (size); 278 if (size < sizeof (struct GNUNET_STREAM_DataMessage))
303 memcpy (message_copy, message, size); 279 {
304 GNUNET_STREAM_convert_message_n2h (message_copy); 280 GNUNET_break_op (0);
281 return GNUNET_SYSERR;
282 }
283 data_msg = (const struct GNUNET_STREAM_DataMessage *) message;
284 size -= sizeof (Struct GNUNET_STREAM_DataMessage);
285 payload = &data_msg[1];
286 /* ... */
305 287
306 route_message (message_copy); 288 return GNUNET_OK;
289}
290
291
292/**
293 * Message Handler for mesh
294 *
295 * @param cls closure (set from GNUNET_MESH_connect)
296 * @param tunnel connection to the other end
297 * @param tunnel_ctx place to store local state associated with the tunnel
298 * @param sender who sent the message
299 * @param message the actual message
300 * @param atsi performance data for the connection
301 * @return GNUNET_OK to keep the connection open,
302 * GNUNET_SYSERR to close it (signal serious error)
303 */
304static int
305handle_ack (void *cls,
306 struct GNUNET_MESH_Tunnel *tunnel,
307 void **tunnel_ctx,
308 const struct GNUNET_PeerIdentity *sender,
309 const struct GNUNET_MessageHeader *message,
310 const struct GNUNET_ATS_Information*atsi)
311{
312 struct GNUNET_STREAM_Socket *socket = cls;
313 const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
314
307} 315}
308 316
309 317
310static struct GNUNET_MESH_MessageHandler message_handlers[] = { 318static struct GNUNET_MESH_MessageHandler message_handlers[] = {
311 {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0}, 319 {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
312 {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_ACK, 0}, 320 {&handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, sizeof (struct GNUNET_STREAM_AckMessage) },
313 {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 0}, 321 {&handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 0},
314 {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK, 0}, 322 {&handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK, 0},
315 {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_RESET, 0}, 323 {&handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET, 0},
316 {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE, 0}, 324 {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE, 0},
317 {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK, 0}, 325 {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK, 0},
318 {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE, 0}, 326 {&handle_data, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE, 0},
@@ -335,9 +343,8 @@ mesh_peer_connect_callback (void *cls,
335 const struct GNUNET_PeerIdentity *peer, 343 const struct GNUNET_PeerIdentity *peer,
336 const struct GNUNET_ATS_Information * atsi) 344 const struct GNUNET_ATS_Information * atsi)
337{ 345{
338 const struct GNUNET_STREAM_Socket *socket; 346 const struct GNUNET_STREAM_Socket *socket = cls;
339 347
340 socket = (const struct GNUNET_STREAM_Socket *) cls;
341 if (0 != memcmp (socket->other_peer, 348 if (0 != memcmp (socket->other_peer,
342 peer, 349 peer,
343 sizeof (struct GNUNET_PeerIdentity))) 350 sizeof (struct GNUNET_PeerIdentity)))
@@ -426,19 +433,9 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
426 va_list vargs; /* Variable arguments */ 433 va_list vargs; /* Variable arguments */
427 434
428 socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket)); 435 socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
429 if (NULL == socket) 436 socket->other_peer = *target;
430 { 437 socket->open_cb = open_cb;
431 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 438 socket->open_cls = open_cb_cls;
432 "Unable to allocate memory\n");
433 return NULL;
434 }
435 socket->other_peer = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity));
436 if (NULL == socket->other_peer)
437 {
438 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
439 "Unable to allocate memory \n");
440 return NULL;
441 }
442 439
443 /* Set defaults */ 440 /* Set defaults */
444 socket->retransmit_timeout = 441 socket->retransmit_timeout =
@@ -462,16 +459,13 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
462 va_end (vargs); /* End of variable args parsing */ 459 va_end (vargs); /* End of variable args parsing */
463 460
464 socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */ 461 socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
465 10, /* QUEUE size as parameter? */ 462 1, /* QUEUE size as parameter? */
466 NULL, /* cls */ 463 socket, /* cls */
467 NULL, /* No inbound tunnel handler */ 464 NULL, /* No inbound tunnel handler */
468 NULL, /* No inbound tunnel cleaner */ 465 NULL, /* No inbound tunnel cleaner */
469 message_handlers, 466 message_handlers,
470 NULL); /* We don't get inbound tunnels */ 467 NULL); /* We don't get inbound tunnels */
471 468 // FIXME: if (NULL == socket->mesh) ...
472 memcpy (socket->other_peer, target, sizeof (struct GNUNET_PeerIdentity));
473 socket->open_cb = open_cb;
474 socket->open_cls = open_cb_cls;
475 469
476 /* Now create the mesh tunnel to target */ 470 /* Now create the mesh tunnel to target */
477 socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh, 471 socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
@@ -479,6 +473,7 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
479 &mesh_peer_connect_callback, 473 &mesh_peer_connect_callback,
480 &mesh_peer_disconnect_callback, 474 &mesh_peer_disconnect_callback,
481 (void *) socket); 475 (void *) socket);
476 // FIXME: if (NULL == socket->tunnel) ...
482 477
483 return socket; 478 return socket;
484} 479}
@@ -502,8 +497,6 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
502 { 497 {
503 GNUNET_free (socket->message); 498 GNUNET_free (socket->message);
504 } 499 }
505 /* Clear memory allocated for other peer's PeerIdentity */
506 GNUNET_Free (socket->other_peer);
507 /* Close associated tunnel */ 500 /* Close associated tunnel */
508 if (NULL != socket->tunnel) 501 if (NULL != socket->tunnel)
509 { 502 {
@@ -528,16 +521,15 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
528 * @return initial tunnel context for the tunnel 521 * @return initial tunnel context for the tunnel
529 * (can be NULL -- that's not an error) 522 * (can be NULL -- that's not an error)
530 */ 523 */
531void 524static void
532new_tunnel_notify (void *cls, 525new_tunnel_notify (void *cls,
533 struct GNUNET_MESH_Tunnel *tunnel, 526 struct GNUNET_MESH_Tunnel *tunnel,
534 const struct GNUNET_PeerIdentity *initiator, 527 const struct GNUNET_PeerIdentity *initiator,
535 const struct GNUNET_ATS_Information *atsi) 528 const struct GNUNET_ATS_Information *atsi)
536{ 529{
537 struct GNUNET_STREAM_ListenSocket *lsocket; 530 struct GNUNET_STREAM_ListenSocket *lsocket = cls;
538 struct GNUNET_STREAM_Socket *socket; 531 struct GNUNET_STREAM_Socket *socket;
539 532
540 lsocket = (struct GNUNET_STREAM_ListenSocket *) cls;
541 socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket)); 533 socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
542 socket->tunnel = tunnel; 534 socket->tunnel = tunnel;
543 socket->session_id = 0; /* FIXME */ 535 socket->session_id = 0; /* FIXME */
@@ -574,7 +566,7 @@ new_tunnel_notify (void *cls,
574 * @param tunnel_ctx place where local state associated 566 * @param tunnel_ctx place where local state associated
575 * with the tunnel is stored 567 * with the tunnel is stored
576 */ 568 */
577void 569static void
578tunnel_cleaner (void *cls, 570tunnel_cleaner (void *cls,
579 const struct GNUNET_MESH_Tunnel *tunnel, 571 const struct GNUNET_MESH_Tunnel *tunnel,
580 void *tunnel_ctx) 572 void *tunnel_ctx)
@@ -623,12 +615,6 @@ GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
623 struct GNUNET_STREAM_ListenSocket *lsocket; 615 struct GNUNET_STREAM_ListenSocket *lsocket;
624 616
625 lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket)); 617 lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
626 if (NULL == lsocket)
627 {
628 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
629 "Unable to allocate memory\n");
630 return NULL;
631 }
632 lsocket->port = app_port; 618 lsocket->port = app_port;
633 lsocket->listen_cb = listen_cb; 619 lsocket->listen_cb = listen_cb;
634 lsocket->listen_cb_cls = listen_cb_cls; 620 lsocket->listen_cb_cls = listen_cb_cls;