aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Wachs <wachs@net.in.tum.de>2011-09-16 16:15:52 +0000
committerMatthias Wachs <wachs@net.in.tum.de>2011-09-16 16:15:52 +0000
commit74b34ed9b400f74a7977e268626b85b51acfedd4 (patch)
tree0b6dcb1a2936d3784b46863e1ea265b703b26e84
parent3ebd0f182125b2c0dbbadf54c5bf723753a7a705 (diff)
downloadgnunet-74b34ed9b400f74a7977e268626b85b51acfedd4.tar.gz
gnunet-74b34ed9b400f74a7977e268626b85b51acfedd4.zip
transmitting data
-rw-r--r--src/transport/plugin_transport_http.h67
-rw-r--r--src/transport/plugin_transport_http_client.c90
-rw-r--r--src/transport/plugin_transport_http_new.c43
-rw-r--r--src/transport/plugin_transport_http_server.c152
4 files changed, 296 insertions, 56 deletions
diff --git a/src/transport/plugin_transport_http.h b/src/transport/plugin_transport_http.h
index b7b89e6e6..5e452ba84 100644
--- a/src/transport/plugin_transport_http.h
+++ b/src/transport/plugin_transport_http.h
@@ -45,6 +45,7 @@
45#define DEBUG_HTTP GNUNET_YES 45#define DEBUG_HTTP GNUNET_YES
46#define VERBOSE_SERVER GNUNET_YES 46#define VERBOSE_SERVER GNUNET_YES
47#define VERBOSE_CLIENT GNUNET_YES 47#define VERBOSE_CLIENT GNUNET_YES
48#define VERBOSE_CURL GNUNET_NO
48 49
49#if BUILD_HTTPS 50#if BUILD_HTTPS
50#define LIBGNUNET_PLUGIN_TRANSPORT_INIT libgnunet_plugin_transport_https_init 51#define LIBGNUNET_PLUGIN_TRANSPORT_INIT libgnunet_plugin_transport_https_init
@@ -180,6 +181,17 @@ struct Session
180 struct Plugin *plugin; 181 struct Plugin *plugin;
181 182
182 /** 183 /**
184 * next pointer for double linked list
185 */
186 struct HTTP_Message *msg_head;
187
188 /**
189 * previous pointer for double linked list
190 */
191 struct HTTP_Message *msg_tail;
192
193
194 /**
183 * message stream tokenizer for incoming data 195 * message stream tokenizer for incoming data
184 */ 196 */
185 struct GNUNET_SERVER_MessageStreamTokenizer *msg_tk; 197 struct GNUNET_SERVER_MessageStreamTokenizer *msg_tk;
@@ -229,6 +241,7 @@ struct Session
229 241
230 void *client_put; 242 void *client_put;
231 void *client_get; 243 void *client_get;
244 int put_paused;
232 245
233 void *server_recv; 246 void *server_recv;
234 void *server_send; 247 void *server_send;
@@ -238,6 +251,49 @@ struct Session
238 251
239}; 252};
240 253
254/**
255 * Message to send using http
256 */
257struct HTTP_Message
258{
259 /**
260 * next pointer for double linked list
261 */
262 struct HTTP_Message *next;
263
264 /**
265 * previous pointer for double linked list
266 */
267 struct HTTP_Message *prev;
268
269 /**
270 * buffer containing data to send
271 */
272 char *buf;
273
274 /**
275 * amount of data already sent
276 */
277 size_t pos;
278
279 /**
280 * buffer length
281 */
282 size_t size;
283
284 /**
285 * Continuation function to call once the transmission buffer
286 * has again space available. NULL if there is no
287 * continuation to call.
288 */
289 GNUNET_TRANSPORT_TransmitContinuation transmit_cont;
290
291 /**
292 * Closure for transmit_cont.
293 */
294 void *transmit_cont_cls;
295};
296
241void 297void
242delete_session (struct Session *s); 298delete_session (struct Session *s);
243 299
@@ -246,6 +302,13 @@ create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target,
246 const void *addr, size_t addrlen, 302 const void *addr, size_t addrlen,
247 GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls); 303 GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls);
248 304
305struct GNUNET_TIME_Relative
306http_plugin_receive (void *cls, const struct GNUNET_PeerIdentity * peer,
307 const struct GNUNET_MessageHeader * message,
308 struct Session * session,
309 const char *sender_address,
310 uint16_t sender_address_len);
311
249const char * 312const char *
250http_plugin_address_to_string (void *cls, const void *addr, size_t addrlen); 313http_plugin_address_to_string (void *cls, const void *addr, size_t addrlen);
251 314
@@ -256,7 +319,7 @@ int
256client_connect (struct Session *s); 319client_connect (struct Session *s);
257 320
258int 321int
259client_send (struct Session *s, const char *msgbuf, size_t msgbuf_size); 322client_send (struct Session *s, struct HTTP_Message *msg);
260 323
261int 324int
262client_start (struct Plugin *plugin); 325client_start (struct Plugin *plugin);
@@ -268,7 +331,7 @@ int
268server_disconnect (struct Session *s); 331server_disconnect (struct Session *s);
269 332
270int 333int
271server_send (struct Session *s, const char *msgbuf, size_t msgbuf_size); 334server_send (struct Session *s, struct HTTP_Message * msg);
272 335
273int 336int
274server_start (struct Plugin *plugin); 337server_start (struct Plugin *plugin);
diff --git a/src/transport/plugin_transport_http_client.c b/src/transport/plugin_transport_http_client.c
index 04a985906..3b3a4705b 100644
--- a/src/transport/plugin_transport_http_client.c
+++ b/src/transport/plugin_transport_http_client.c
@@ -26,7 +26,7 @@
26 26
27#include "plugin_transport_http.h" 27#include "plugin_transport_http.h"
28 28
29#if VERBOSE_CLIENT 29#if VERBOSE_CURL
30/** 30/**
31 * Function to log curl debug messages with GNUNET_log 31 * Function to log curl debug messages with GNUNET_log
32 * @param curl handle 32 * @param curl handle
@@ -58,8 +58,9 @@ client_log (CURL * curl, curl_infotype type, char *data, size_t size, void *cls)
58#endif 58#endif
59 59
60int 60int
61client_send (struct Session *s, const char *msgbuf, size_t msgbuf_size) 61client_send (struct Session *s, struct HTTP_Message *msg)
62{ 62{
63 GNUNET_CONTAINER_DLL_insert (s->msg_head, s->msg_tail, msg);
63 return GNUNET_OK; 64 return GNUNET_OK;
64} 65}
65 66
@@ -183,7 +184,7 @@ client_run (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
183 "Connection to '%s' %s ended\n", GNUNET_i2s(&s->target), http_plugin_address_to_string(plugin, s->addr, s->addrlen)); 184 "Connection to '%s' %s ended\n", GNUNET_i2s(&s->target), http_plugin_address_to_string(plugin, s->addr, s->addrlen));
184#endif 185#endif
185 client_disconnect(s); 186 client_disconnect(s);
186 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,"Notifying about ended session to peer `%s' `%s'\n", GNUNET_i2s (&s->target), http_plugin_address_to_string (plugin, s->addr, s->addrlen)); 187 //GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,"Notifying about ended session to peer `%s' `%s'\n", GNUNET_i2s (&s->target), http_plugin_address_to_string (plugin, s->addr, s->addrlen));
187 if (s->msg_tk != NULL) 188 if (s->msg_tk != NULL)
188 GNUNET_SERVER_mst_destroy (s->msg_tk); 189 GNUNET_SERVER_mst_destroy (s->msg_tk);
189 notify_session_end (plugin, &s->target, s); 190 notify_session_end (plugin, &s->target, s);
@@ -202,6 +203,8 @@ client_disconnect (struct Session *s)
202 int res = GNUNET_OK; 203 int res = GNUNET_OK;
203 CURLMcode mret; 204 CURLMcode mret;
204 struct Plugin *plugin = s->plugin; 205 struct Plugin *plugin = s->plugin;
206 struct HTTP_Message * msg;
207 struct HTTP_Message * t;
205 208
206#if 0 209#if 0
207 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, 210 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
@@ -241,6 +244,17 @@ client_disconnect (struct Session *s)
241 s->client_get = NULL; 244 s->client_get = NULL;
242 } 245 }
243 246
247 msg = s->msg_head;
248 while (msg != NULL)
249 {
250 t = msg->next;
251 if (NULL != msg->transmit_cont)
252 msg->transmit_cont (msg->transmit_cont_cls, &s->target, GNUNET_SYSERR);
253 GNUNET_CONTAINER_DLL_remove(s->msg_head, s->msg_tail, msg);
254 GNUNET_free (msg);
255 msg = t;
256 }
257
244 plugin->cur_connections -= 2; 258 plugin->cur_connections -= 2;
245 /* Re-schedule since handles have changed */ 259 /* Re-schedule since handles have changed */
246 if (plugin->client_perform_task != GNUNET_SCHEDULER_NO_TASK) 260 if (plugin->client_perform_task != GNUNET_SCHEDULER_NO_TASK)
@@ -255,20 +269,15 @@ client_disconnect (struct Session *s)
255} 269}
256 270
257static void 271static void
258curl_receive_mst_cb (void *cls, void *client, 272client_receive_mst_cb (void *cls, void *client,
259 const struct GNUNET_MessageHeader *message) 273 const struct GNUNET_MessageHeader *message)
260{ 274{
261 struct Session *s = cls; 275 struct Session *s = cls;
262 struct Plugin *plugin = s->plugin; 276 struct Plugin *plugin = s->plugin;
263 struct GNUNET_TRANSPORT_ATS_Information distance[2];
264 struct GNUNET_TIME_Relative delay; 277 struct GNUNET_TIME_Relative delay;
265 278
266 distance[0].type = htonl (GNUNET_TRANSPORT_ATS_QUALITY_NET_DISTANCE); 279 delay = http_plugin_receive (s, &s->target, message, s, s->addr, s->addrlen);
267 distance[0].value = htonl (1);
268 distance[1].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR);
269 distance[1].value = htonl (0);
270 280
271 delay = plugin->env->receive (plugin->env->cls, &s->target, message, (const struct GNUNET_TRANSPORT_ATS_Information*) &distance, 2, s, s->addr, s->addrlen);
272 s->delay = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), delay); 281 s->delay = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), delay);
273 282
274 if (GNUNET_TIME_absolute_get().abs_value < s->delay.abs_value) 283 if (GNUNET_TIME_absolute_get().abs_value < s->delay.abs_value)
@@ -290,23 +299,23 @@ curl_receive_mst_cb (void *cls, void *client,
290* @return bytes read from stream 299* @return bytes read from stream
291*/ 300*/
292static size_t 301static size_t
293curl_receive_cb (void *stream, size_t size, size_t nmemb, void *cls) 302client_receive (void *stream, size_t size, size_t nmemb, void *cls)
294{ 303{
295 struct Session *s = cls; 304 struct Session *s = cls;
296 struct Plugin *plugin = s->plugin; 305 struct Plugin *plugin = s->plugin;
297 306
298 if (GNUNET_TIME_absolute_get().abs_value < s->delay.abs_value) 307 if (GNUNET_TIME_absolute_get().abs_value < s->delay.abs_value)
299 { 308 {
300#if DEBUG_HTTP 309#if DEBUG_CLIENT
301 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 310 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
302 "Connection %X: no inbound bandwidth available! Next read was delayed for %llu ms\n", 311 "no inbound bandwidth available! Next read was delayed for %llu ms\n",
303 s, GNUNET_TIME_absolute_get_difference(s->delay, GNUNET_TIME_absolute_get()).rel_value); 312 s, GNUNET_TIME_absolute_get_difference(s->delay, GNUNET_TIME_absolute_get()).rel_value);
304#endif 313#endif
305 return 0; 314 return 0;
306 } 315 }
307 316
308 if (s->msg_tk == NULL) 317 if (s->msg_tk == NULL)
309 s->msg_tk = GNUNET_SERVER_mst_create (&curl_receive_mst_cb, s); 318 s->msg_tk = GNUNET_SERVER_mst_create (&client_receive_mst_cb, s);
310 319
311 GNUNET_SERVER_mst_receive (s->msg_tk, s, stream, size * nmemb, GNUNET_NO, 320 GNUNET_SERVER_mst_receive (s->msg_tk, s, stream, size * nmemb, GNUNET_NO,
312 GNUNET_NO); 321 GNUNET_NO);
@@ -329,30 +338,30 @@ curl_receive_cb (void *stream, size_t size, size_t nmemb, void *cls)
329 * @return bytes written to stream 338 * @return bytes written to stream
330 */ 339 */
331static size_t 340static size_t
332curl_send_cb (void *stream, size_t size, size_t nmemb, void *ptr) 341client_send_cb (void *stream, size_t size, size_t nmemb, void *cls)
333{ 342{
343 struct Session *s = cls;
344 //struct Plugin *plugin = s->plugin;
334 size_t bytes_sent = 0; 345 size_t bytes_sent = 0;
335
336#if 0
337 struct Session *ps = ptr;
338 struct HTTP_Message *msg = ps->pending_msgs_tail;
339
340 size_t len; 346 size_t len;
341 347
342 if (ps->send_active == GNUNET_NO) 348 struct HTTP_Message *msg = s->msg_head;
349/*
350 if (s->put_paused == GNUNET_NO)
343 return CURL_READFUNC_PAUSE; 351 return CURL_READFUNC_PAUSE;
344 if ((ps->pending_msgs_tail == NULL) && (ps->send_active == GNUNET_YES)) 352 if ((s->msg_head == NULL) && (s->put_paused == GNUNET_YES))
345 { 353 {
346#if DEBUG_CONNECTIONS 354#if VERBOSE_CLIENT
347 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 355 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "Suspending handle `%s' `%s'\n",
348 "Connection %X: No Message to send, pausing connection\n", ps); 356 GNUNET_i2s (&s->target),GNUNET_a2s (s->addr, s->addrlen));
349#endif 357#endif
350 ps->send_active = GNUNET_NO; 358 s->put_paused = GNUNET_NO;
351 return CURL_READFUNC_PAUSE; 359 return CURL_READFUNC_PAUSE;
352 } 360 }
353 361*/
362 if (msg == NULL)
363 return bytes_sent;
354 GNUNET_assert (msg != NULL); 364 GNUNET_assert (msg != NULL);
355
356 /* data to send */ 365 /* data to send */
357 if (msg->pos < msg->size) 366 if (msg->pos < msg->size)
358 { 367 {
@@ -383,17 +392,14 @@ curl_send_cb (void *stream, size_t size, size_t nmemb, void *ptr)
383#if DEBUG_CONNECTIONS 392#if DEBUG_CONNECTIONS
384 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 393 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
385 "Connection %X: Message with %u bytes sent, removing message from queue\n", 394 "Connection %X: Message with %u bytes sent, removing message from queue\n",
386 ps, msg->pos); 395 s, msg->pos);
387#endif 396#endif
388 /* Calling transmit continuation */ 397 /* Calling transmit continuation */
389 if (NULL != ps->pending_msgs_tail->transmit_cont) 398 if (NULL != msg->transmit_cont)
390 msg->transmit_cont (ps->pending_msgs_tail->transmit_cont_cls, 399 msg->transmit_cont (msg->transmit_cont_cls, &s->target, GNUNET_OK);
391 &(ps->peercontext)->identity, GNUNET_OK); 400 GNUNET_CONTAINER_DLL_remove(s->msg_head, s->msg_tail, msg);
392 ps->queue_length_cur -= msg->size; 401 GNUNET_free (msg);
393 remove_http_message (ps, msg);
394 } 402 }
395
396#endif
397 return bytes_sent; 403 return bytes_sent;
398} 404}
399 405
@@ -423,7 +429,7 @@ client_connect (struct Session *s)
423#endif 429#endif
424 /* create get connection */ 430 /* create get connection */
425 s->client_get = curl_easy_init (); 431 s->client_get = curl_easy_init ();
426#if VERBOSE_CLIENT 432#if VERBOSE_CURL
427 curl_easy_setopt (s->client_get, CURLOPT_VERBOSE, 1L); 433 curl_easy_setopt (s->client_get, CURLOPT_VERBOSE, 1L);
428 curl_easy_setopt (s->client_get, CURLOPT_DEBUGFUNCTION, &client_log); 434 curl_easy_setopt (s->client_get, CURLOPT_DEBUGFUNCTION, &client_log);
429 curl_easy_setopt (s->client_get, CURLOPT_DEBUGDATA, s->client_get); 435 curl_easy_setopt (s->client_get, CURLOPT_DEBUGDATA, s->client_get);
@@ -436,9 +442,9 @@ client_connect (struct Session *s)
436 curl_easy_setopt (s->client_get, CURLOPT_URL, url); 442 curl_easy_setopt (s->client_get, CURLOPT_URL, url);
437 //curl_easy_setopt (s->client_get, CURLOPT_HEADERFUNCTION, &curl_get_header_cb); 443 //curl_easy_setopt (s->client_get, CURLOPT_HEADERFUNCTION, &curl_get_header_cb);
438 //curl_easy_setopt (s->client_get, CURLOPT_WRITEHEADER, ps); 444 //curl_easy_setopt (s->client_get, CURLOPT_WRITEHEADER, ps);
439 curl_easy_setopt (s->client_get, CURLOPT_READFUNCTION, curl_send_cb); 445 curl_easy_setopt (s->client_get, CURLOPT_READFUNCTION, client_send_cb);
440 curl_easy_setopt (s->client_get, CURLOPT_READDATA, s); 446 curl_easy_setopt (s->client_get, CURLOPT_READDATA, s);
441 curl_easy_setopt (s->client_get, CURLOPT_WRITEFUNCTION, curl_receive_cb); 447 curl_easy_setopt (s->client_get, CURLOPT_WRITEFUNCTION, client_receive);
442 curl_easy_setopt (s->client_get, CURLOPT_WRITEDATA, s); 448 curl_easy_setopt (s->client_get, CURLOPT_WRITEDATA, s);
443 curl_easy_setopt (s->client_get, CURLOPT_TIMEOUT_MS, 449 curl_easy_setopt (s->client_get, CURLOPT_TIMEOUT_MS,
444 (long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); 450 (long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
@@ -453,7 +459,7 @@ client_connect (struct Session *s)
453 459
454 /* create put connection */ 460 /* create put connection */
455 s->client_put = curl_easy_init (); 461 s->client_put = curl_easy_init ();
456#if VERBOSE_CLIENT 462#if VERBOSE_CURL
457 curl_easy_setopt (s->client_put, CURLOPT_VERBOSE, 1L); 463 curl_easy_setopt (s->client_put, CURLOPT_VERBOSE, 1L);
458 curl_easy_setopt (s->client_put, CURLOPT_DEBUGFUNCTION, &client_log); 464 curl_easy_setopt (s->client_put, CURLOPT_DEBUGFUNCTION, &client_log);
459 curl_easy_setopt (s->client_put, CURLOPT_DEBUGDATA, s->client_put); 465 curl_easy_setopt (s->client_put, CURLOPT_DEBUGDATA, s->client_put);
@@ -467,9 +473,9 @@ client_connect (struct Session *s)
467 curl_easy_setopt (s->client_put, CURLOPT_PUT, 1L); 473 curl_easy_setopt (s->client_put, CURLOPT_PUT, 1L);
468 //curl_easy_setopt (s->client_put, CURLOPT_HEADERFUNCTION, &curl_put_header_cb); 474 //curl_easy_setopt (s->client_put, CURLOPT_HEADERFUNCTION, &curl_put_header_cb);
469 //curl_easy_setopt (s->client_put, CURLOPT_WRITEHEADER, ps); 475 //curl_easy_setopt (s->client_put, CURLOPT_WRITEHEADER, ps);
470 curl_easy_setopt (s->client_put, CURLOPT_READFUNCTION, curl_send_cb); 476 curl_easy_setopt (s->client_put, CURLOPT_READFUNCTION, client_send_cb);
471 curl_easy_setopt (s->client_put, CURLOPT_READDATA, s); 477 curl_easy_setopt (s->client_put, CURLOPT_READDATA, s);
472 curl_easy_setopt (s->client_put, CURLOPT_WRITEFUNCTION, curl_receive_cb); 478 curl_easy_setopt (s->client_put, CURLOPT_WRITEFUNCTION, client_receive);
473 curl_easy_setopt (s->client_put, CURLOPT_WRITEDATA, s); 479 curl_easy_setopt (s->client_put, CURLOPT_WRITEDATA, s);
474 curl_easy_setopt (s->client_put, CURLOPT_TIMEOUT_MS, 480 curl_easy_setopt (s->client_put, CURLOPT_TIMEOUT_MS,
475 (long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); 481 (long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
diff --git a/src/transport/plugin_transport_http_new.c b/src/transport/plugin_transport_http_new.c
index 719182cc0..48477b809 100644
--- a/src/transport/plugin_transport_http_new.c
+++ b/src/transport/plugin_transport_http_new.c
@@ -292,6 +292,27 @@ http_plugin_address_suggested (void *cls, const void *addr, size_t addrlen)
292 return GNUNET_SYSERR; 292 return GNUNET_SYSERR;
293} 293}
294 294
295struct GNUNET_TIME_Relative
296http_plugin_receive (void *cls, const struct GNUNET_PeerIdentity * peer,
297 const struct GNUNET_MessageHeader * message,
298 struct Session * session,
299 const char *sender_address,
300 uint16_t sender_address_len)
301{
302 struct Session *s = cls;
303 struct Plugin *plugin = s->plugin;
304 struct GNUNET_TRANSPORT_ATS_Information distance[2];
305 struct GNUNET_TIME_Relative delay;
306
307 distance[0].type = htonl (GNUNET_TRANSPORT_ATS_QUALITY_NET_DISTANCE);
308 distance[0].value = htonl (1);
309 distance[1].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR);
310 distance[1].value = htonl (0);
311
312 delay = plugin->env->receive (plugin->env->cls, &s->target, message, (const struct GNUNET_TRANSPORT_ATS_Information*) &distance, 2, s, s->addr, s->addrlen);
313 return delay;
314}
315
295/** 316/**
296 * Function called for a quick conversion of the binary address to 317 * Function called for a quick conversion of the binary address to
297 * a numeric address. Note that the caller must not free the 318 * a numeric address. Note that the caller must not free the
@@ -425,7 +446,7 @@ create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target,
425 s->transmit_cont = cont; 446 s->transmit_cont = cont;
426 s->transmit_cont_cls = cont_cls; 447 s->transmit_cont_cls = cont_cls;
427 s->next = NULL; 448 s->next = NULL;
428 449 s->delay = GNUNET_TIME_absolute_get_forever();
429 return s; 450 return s;
430} 451}
431 452
@@ -486,7 +507,7 @@ http_plugin_send (void *cls, const struct GNUNET_PeerIdentity *target,
486 GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls) 507 GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
487{ 508{
488 struct Plugin *plugin = cls; 509 struct Plugin *plugin = cls;
489 510 struct HTTP_Message *msg;
490 GNUNET_assert (plugin != NULL); 511 GNUNET_assert (plugin != NULL);
491 512
492 int res = GNUNET_SYSERR; 513 int res = GNUNET_SYSERR;
@@ -529,10 +550,20 @@ http_plugin_send (void *cls, const struct GNUNET_PeerIdentity *target,
529 return GNUNET_SYSERR; 550 return GNUNET_SYSERR;
530 } 551 }
531 } 552 }
532 else if (s->inbound == GNUNET_NO) 553
533 res = client_send (s, msgbuf, msgbuf_size); 554 msg = GNUNET_malloc (sizeof (struct HTTP_Message) + msgbuf_size);
534 else if (s->inbound == GNUNET_YES) 555 msg->next = NULL;
535 res = server_send (s, msgbuf, msgbuf_size); 556 msg->size = msgbuf_size;
557 msg->pos = 0;
558 msg->buf = (char *) &msg[1];
559 msg->transmit_cont = cont;
560 msg->transmit_cont_cls = cont_cls;
561 memcpy (msg->buf, msgbuf, msgbuf_size);
562
563 if (s->inbound == GNUNET_NO)
564 res = client_send (s, msg);
565 if (s->inbound == GNUNET_YES)
566 res = server_send (s, msg);
536 567
537 return res; 568 return res;
538} 569}
diff --git a/src/transport/plugin_transport_http_server.c b/src/transport/plugin_transport_http_server.c
index 43d9171eb..98fbfda1c 100644
--- a/src/transport/plugin_transport_http_server.c
+++ b/src/transport/plugin_transport_http_server.c
@@ -232,6 +232,81 @@ server_load_certificate (struct Plugin *plugin)
232 232
233 233
234/** 234/**
235 * Callback called by MessageStreamTokenizer when a message has arrived
236 * @param cls current session as closure
237 * @param client clien
238 * @param message the message to be forwarded to transport service
239 */
240static void
241server_receive_mst_cb (void *cls, void *client,
242 const struct GNUNET_MessageHeader *message)
243{
244 struct Session *s = cls;
245 struct Plugin *plugin = s->plugin;
246 struct GNUNET_TIME_Relative delay;
247
248 delay = http_plugin_receive (s, &s->target, message, s, s->addr, s->addrlen);
249
250 s->delay = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), delay);
251
252 if (GNUNET_TIME_absolute_get().abs_value < s->delay.abs_value)
253 {
254#if VERBOSE_CLIENT
255 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "Server: peer `%s' address `%s' next read delayed for %llu ms\n",
256 GNUNET_i2s (&s->target), GNUNET_a2s (s->addr, s->addrlen), delay);
257#endif
258 }
259}
260
261/**
262 * Callback called by MHD when it needs data to send
263 * @param cls current session
264 * @param pos position in buffer
265 * @param buf the buffer to write data to
266 * @param max max number of bytes available in buffer
267 * @return bytes written to buffer
268 */
269static ssize_t
270mhd_send_callback (void *cls, uint64_t pos, char *buf, size_t max)
271{
272 struct Session *s = cls;
273 struct HTTP_Message *msg;
274 int bytes_read = 0;
275
276 msg = s->msg_head;
277 if (msg != NULL)
278 {
279 /* sending */
280 if ((msg->size - msg->pos) <= max)
281 {
282 memcpy (buf, &msg->buf[msg->pos], (msg->size - msg->pos));
283 bytes_read = msg->size - msg->pos;
284 msg->pos += (msg->size - msg->pos);
285 }
286 else
287 {
288 memcpy (buf, &msg->buf[msg->pos], max);
289 msg->pos += max;
290 bytes_read = max;
291 }
292
293 /* removing message */
294 if (msg->pos == msg->size)
295 {
296 if (NULL != msg->transmit_cont)
297 msg->transmit_cont (msg->transmit_cont_cls, &s->target, GNUNET_OK);
298 GNUNET_CONTAINER_DLL_remove(s->msg_head, s->msg_tail, msg);
299 GNUNET_free (msg);
300 }
301 }
302#if DEBUG_CONNECTIONS
303 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connection %X: MHD has sent %u bytes\n",
304 s, bytes_read);
305#endif
306 return bytes_read;
307}
308
309/**
235 * Process GET or PUT request received via MHD. For 310 * Process GET or PUT request received via MHD. For
236 * GET, queue response that will send back our pending 311 * GET, queue response that will send back our pending
237 * messages. For PUT, process incoming data and send 312 * messages. For PUT, process incoming data and send
@@ -403,11 +478,7 @@ error:
403 res = MHD_queue_response (mhd_connection, MHD_HTTP_NOT_FOUND, response); 478 res = MHD_queue_response (mhd_connection, MHD_HTTP_NOT_FOUND, response);
404 MHD_destroy_response (response); 479 MHD_destroy_response (response);
405 return res; 480 return res;
406
407
408found: 481found:
409
410
411 sc = GNUNET_malloc (sizeof (struct ServerConnection)); 482 sc = GNUNET_malloc (sizeof (struct ServerConnection));
412 sc->mhd_conn = mhd_connection; 483 sc->mhd_conn = mhd_connection;
413 sc->direction = direction; 484 sc->direction = direction;
@@ -418,8 +489,9 @@ found:
418 s->server_recv = sc; 489 s->server_recv = sc;
419 490
420 (*httpSessionCache) = sc; 491 (*httpSessionCache) = sc;
421 return MHD_YES;
422 } 492 }
493
494
423 /* existing connection */ 495 /* existing connection */
424 sc = (*httpSessionCache); 496 sc = (*httpSessionCache);
425 s = sc->session; 497 s = sc->session;
@@ -437,6 +509,67 @@ found:
437 return MHD_YES; 509 return MHD_YES;
438 } 510 }
439 511
512 GNUNET_assert (s != NULL);
513 if (sc->direction == _SEND)
514 {
515 response =
516 MHD_create_response_from_callback (-1, 32 * 1024, &mhd_send_callback,
517 s, NULL);
518 res = MHD_queue_response (mhd_connection, MHD_HTTP_OK, response);
519 MHD_destroy_response (response);
520 return MHD_YES;
521 }
522 if (sc->direction == _RECEIVE)
523 {
524 if (*upload_data_size == 0)
525 {
526#if VERBOSE_SERVER
527 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
528 "Server: peer `%s' PUT on address `%s' connected\n",
529 GNUNET_i2s (&s->target), GNUNET_a2s (s->addr, s->addrlen));
530#endif
531 return MHD_YES;
532 }
533
534 /* Recieving data */
535 if ((*upload_data_size > 0))
536 {
537#if VERBOSE_SERVER
538 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
539 "Server: peer `%s' PUT on address `%s' received %u bytes\n",
540 GNUNET_i2s (&s->target), GNUNET_a2s (s->addr, s->addrlen));
541#endif
542 if ((GNUNET_TIME_absolute_get().abs_value < s->delay.abs_value))
543 {
544 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
545 "Connection %X: PUT with %u bytes forwarded to MST\n", s,
546 *upload_data_size);
547
548 if (s->msg_tk == NULL)
549 {
550 s->msg_tk = GNUNET_SERVER_mst_create (&server_receive_mst_cb, s);
551 }
552 res = GNUNET_SERVER_mst_receive (s->msg_tk, s, upload_data, *upload_data_size, GNUNET_NO, GNUNET_NO);
553 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
554 "Server: Received %u bytes\n",
555 *upload_data_size);
556 (*upload_data_size) = 0;
557 }
558 else
559 {
560/*
561#if DEBUG_HTTP
562 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
563 "Connection %X: no inbound bandwidth available! Next read was delayed for %llu ms\n",
564 s, ps->peercontext->delay.rel_value);
565#endif
566*/
567 }
568 return MHD_YES;
569 }
570 else
571 return MHD_NO;
572 }
440 return res; 573 return res;
441} 574}
442 575
@@ -484,6 +617,8 @@ server_disconnect_cb (void *cls, struct MHD_Connection *connection,
484 tc = s->server_send; 617 tc = s->server_send;
485 tc->disconnect = GNUNET_YES; 618 tc->disconnect = GNUNET_YES;
486 } 619 }
620 if (s->msg_tk != NULL)
621 GNUNET_SERVER_mst_destroy(s->msg_tk);
487 } 622 }
488 GNUNET_free (sc); 623 GNUNET_free (sc);
489 624
@@ -500,6 +635,7 @@ server_disconnect_cb (void *cls, struct MHD_Connection *connection,
500 } 635 }
501 plugin->cur_connections--; 636 plugin->cur_connections--;
502 637
638
503 if ((s->server_send == NULL) && (s->server_recv == NULL)) 639 if ((s->server_send == NULL) && (s->server_recv == NULL))
504 { 640 {
505#if VERBOSE_SERVER 641#if VERBOSE_SERVER
@@ -507,6 +643,7 @@ server_disconnect_cb (void *cls, struct MHD_Connection *connection,
507 "Server: peer `%s' on address `%s' disconnected\n", 643 "Server: peer `%s' on address `%s' disconnected\n",
508 GNUNET_i2s (&s->target), GNUNET_a2s (s->addr, s->addrlen)); 644 GNUNET_i2s (&s->target), GNUNET_a2s (s->addr, s->addrlen));
509#endif 645#endif
646
510 notify_session_end(s->plugin, &s->target, s); 647 notify_session_end(s->plugin, &s->target, s);
511 } 648 }
512} 649}
@@ -538,8 +675,9 @@ server_disconnect (struct Session *s)
538} 675}
539 676
540int 677int
541server_send (struct Session *s, const char *msgbuf, size_t msgbuf_size) 678server_send (struct Session *s, struct HTTP_Message * msg)
542{ 679{
680 GNUNET_CONTAINER_DLL_insert (s->msg_head, s->msg_tail, msg);
543 return GNUNET_OK; 681 return GNUNET_OK;
544} 682}
545 683
@@ -809,6 +947,8 @@ server_stop (struct Plugin *plugin)
809 while (s != NULL) 947 while (s != NULL)
810 { 948 {
811 t = s->next; 949 t = s->next;
950 if (s->msg_tk != NULL)
951 GNUNET_SERVER_mst_destroy(s->msg_tk);
812 delete_session (s); 952 delete_session (s);
813 s = t; 953 s = t;
814 } 954 }