aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Wachs <wachs@net.in.tum.de>2011-09-16 14:24:29 +0000
committerMatthias Wachs <wachs@net.in.tum.de>2011-09-16 14:24:29 +0000
commitacb0b0077234212372463cc24366ac79e4bcceb6 (patch)
treed00a00cdf70a4d47b225090b10bd18fe24d6e908 /src
parentabcb5dfce1f7b5ac066de37e1ec8d32463c04c9a (diff)
downloadgnunet-acb0b0077234212372463cc24366ac79e4bcceb6.tar.gz
gnunet-acb0b0077234212372463cc24366ac79e4bcceb6.zip
client sending & receiving
Diffstat (limited to 'src')
-rw-r--r--src/transport/plugin_transport_http.h9
-rw-r--r--src/transport/plugin_transport_http_client.c162
-rw-r--r--src/transport/plugin_transport_http_server.c4
3 files changed, 162 insertions, 13 deletions
diff --git a/src/transport/plugin_transport_http.h b/src/transport/plugin_transport_http.h
index 11b369d40..b7b89e6e6 100644
--- a/src/transport/plugin_transport_http.h
+++ b/src/transport/plugin_transport_http.h
@@ -82,6 +82,7 @@ struct Plugin
82 */ 82 */
83 struct GNUNET_NAT_Handle *nat; 83 struct GNUNET_NAT_Handle *nat;
84 84
85
85 /** 86 /**
86 * ipv4 DLL head 87 * ipv4 DLL head
87 */ 88 */
@@ -124,7 +125,6 @@ struct Plugin
124 125
125 int cur_connections; 126 int cur_connections;
126 uint32_t last_tag; 127 uint32_t last_tag;
127
128 /* 128 /*
129 * Server handles 129 * Server handles
130 */ 130 */
@@ -180,9 +180,9 @@ struct Session
180 struct Plugin *plugin; 180 struct Plugin *plugin;
181 181
182 /** 182 /**
183 * The client (used to identify this connection) 183 * message stream tokenizer for incoming data
184 */ 184 */
185 /* void *client; */ 185 struct GNUNET_SERVER_MessageStreamTokenizer *msg_tk;
186 186
187 /** 187 /**
188 * Continuation function to call once the transmission buffer 188 * Continuation function to call once the transmission buffer
@@ -232,7 +232,8 @@ struct Session
232 232
233 void *server_recv; 233 void *server_recv;
234 void *server_send; 234 void *server_send;
235 235 struct GNUNET_TIME_Absolute delay;
236 GNUNET_SCHEDULER_TaskIdentifier reset_task;
236 uint32_t tag; 237 uint32_t tag;
237 238
238}; 239};
diff --git a/src/transport/plugin_transport_http_client.c b/src/transport/plugin_transport_http_client.c
index c2962394f..04a985906 100644
--- a/src/transport/plugin_transport_http_client.c
+++ b/src/transport/plugin_transport_http_client.c
@@ -183,7 +183,9 @@ client_run (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
183 "Connection to '%s' %s ended\n", GNUNET_i2s(&s->target), http_plugin_address_to_string(plugin, s->addr, s->addrlen)); 183 "Connection to '%s' %s ended\n", GNUNET_i2s(&s->target), http_plugin_address_to_string(plugin, s->addr, s->addrlen));
184#endif 184#endif
185 client_disconnect(s); 185 client_disconnect(s);
186 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,"Notifying about ended session to peer `%s' `%s'\n", GNUNET_i2s (&s->target), GNUNET_a2s (s->addr, s->addrlen)); 186 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,"Notifying about ended session to peer `%s' `%s'\n", GNUNET_i2s (&s->target), http_plugin_address_to_string (plugin, s->addr, s->addrlen));
187 if (s->msg_tk != NULL)
188 GNUNET_SERVER_mst_destroy (s->msg_tk);
187 notify_session_end (plugin, &s->target, s); 189 notify_session_end (plugin, &s->target, s);
188 } 190 }
189 } 191 }
@@ -252,6 +254,148 @@ client_disconnect (struct Session *s)
252 return res; 254 return res;
253} 255}
254 256
257static void
258curl_receive_mst_cb (void *cls, void *client,
259 const struct GNUNET_MessageHeader *message)
260{
261 struct Session *s = cls;
262 struct Plugin *plugin = s->plugin;
263 struct GNUNET_TRANSPORT_ATS_Information distance[2];
264 struct GNUNET_TIME_Relative delay;
265
266 distance[0].type = htonl (GNUNET_TRANSPORT_ATS_QUALITY_NET_DISTANCE);
267 distance[0].value = htonl (1);
268 distance[1].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR);
269 distance[1].value = htonl (0);
270
271 delay = plugin->env->receive (plugin->env->cls, &s->target, message, (const struct GNUNET_TRANSPORT_ATS_Information*) &distance, 2, s, s->addr, s->addrlen);
272 s->delay = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), delay);
273
274 if (GNUNET_TIME_absolute_get().abs_value < s->delay.abs_value)
275 {
276#if VERBOSE_CLIENT
277 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "Client: peer `%s' address `%s' next read delayed for %llu ms\n",
278 GNUNET_i2s (&s->target), GNUNET_a2s (s->addr, s->addrlen), delay);
279#endif
280 }
281}
282
283/**
284* Callback method used with libcurl
285* Method is called when libcurl needs to write data during sending
286* @param stream pointer where to write data
287* @param size size of an individual element
288* @param nmemb count of elements that can be written to the buffer
289* @param ptr destination pointer, passed to the libcurl handle
290* @return bytes read from stream
291*/
292static size_t
293curl_receive_cb (void *stream, size_t size, size_t nmemb, void *cls)
294{
295 struct Session *s = cls;
296 struct Plugin *plugin = s->plugin;
297
298 if (GNUNET_TIME_absolute_get().abs_value < s->delay.abs_value)
299 {
300#if DEBUG_HTTP
301 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
302 "Connection %X: no inbound bandwidth available! Next read was delayed for %llu ms\n",
303 s, GNUNET_TIME_absolute_get_difference(s->delay, GNUNET_TIME_absolute_get()).rel_value);
304#endif
305 return 0;
306 }
307
308 if (s->msg_tk == NULL)
309 s->msg_tk = GNUNET_SERVER_mst_create (&curl_receive_mst_cb, s);
310
311 GNUNET_SERVER_mst_receive (s->msg_tk, s, stream, size * nmemb, GNUNET_NO,
312 GNUNET_NO);
313
314#if VERBOSE_CLIENT
315 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "Received %u bytes from peer `%s'\n",
316 size * nmemb,
317 GNUNET_i2s (&s->target));
318#endif
319 return (size * nmemb);
320}
321
322/**
323 * Callback method used with libcurl
324 * Method is called when libcurl needs to read data during sending
325 * @param stream pointer where to write data
326 * @param size size of an individual element
327 * @param nmemb count of elements that can be written to the buffer
328 * @param ptr source pointer, passed to the libcurl handle
329 * @return bytes written to stream
330 */
331static size_t
332curl_send_cb (void *stream, size_t size, size_t nmemb, void *ptr)
333{
334 size_t bytes_sent = 0;
335
336#if 0
337 struct Session *ps = ptr;
338 struct HTTP_Message *msg = ps->pending_msgs_tail;
339
340 size_t len;
341
342 if (ps->send_active == GNUNET_NO)
343 return CURL_READFUNC_PAUSE;
344 if ((ps->pending_msgs_tail == NULL) && (ps->send_active == GNUNET_YES))
345 {
346#if DEBUG_CONNECTIONS
347 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
348 "Connection %X: No Message to send, pausing connection\n", ps);
349#endif
350 ps->send_active = GNUNET_NO;
351 return CURL_READFUNC_PAUSE;
352 }
353
354 GNUNET_assert (msg != NULL);
355
356 /* data to send */
357 if (msg->pos < msg->size)
358 {
359 /* data fit in buffer */
360 if ((msg->size - msg->pos) <= (size * nmemb))
361 {
362 len = (msg->size - msg->pos);
363 memcpy (stream, &msg->buf[msg->pos], len);
364 msg->pos += len;
365 bytes_sent = len;
366 }
367 else
368 {
369 len = size * nmemb;
370 memcpy (stream, &msg->buf[msg->pos], len);
371 msg->pos += len;
372 bytes_sent = len;
373 }
374 }
375 /* no data to send */
376 else
377 {
378 bytes_sent = 0;
379 }
380
381 if (msg->pos == msg->size)
382 {
383#if DEBUG_CONNECTIONS
384 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
385 "Connection %X: Message with %u bytes sent, removing message from queue\n",
386 ps, msg->pos);
387#endif
388 /* Calling transmit continuation */
389 if (NULL != ps->pending_msgs_tail->transmit_cont)
390 msg->transmit_cont (ps->pending_msgs_tail->transmit_cont_cls,
391 &(ps->peercontext)->identity, GNUNET_OK);
392 ps->queue_length_cur -= msg->size;
393 remove_http_message (ps, msg);
394 }
395
396#endif
397 return bytes_sent;
398}
255 399
256int 400int
257client_connect (struct Session *s) 401client_connect (struct Session *s)
@@ -292,10 +436,10 @@ client_connect (struct Session *s)
292 curl_easy_setopt (s->client_get, CURLOPT_URL, url); 436 curl_easy_setopt (s->client_get, CURLOPT_URL, url);
293 //curl_easy_setopt (s->client_get, CURLOPT_HEADERFUNCTION, &curl_get_header_cb); 437 //curl_easy_setopt (s->client_get, CURLOPT_HEADERFUNCTION, &curl_get_header_cb);
294 //curl_easy_setopt (s->client_get, CURLOPT_WRITEHEADER, ps); 438 //curl_easy_setopt (s->client_get, CURLOPT_WRITEHEADER, ps);
295 //curl_easy_setopt (s->client_get, CURLOPT_READFUNCTION, curl_send_cb); 439 curl_easy_setopt (s->client_get, CURLOPT_READFUNCTION, curl_send_cb);
296 //curl_easy_setopt (s->client_get, CURLOPT_READDATA, ps); 440 curl_easy_setopt (s->client_get, CURLOPT_READDATA, s);
297 //curl_easy_setopt (s->client_get, CURLOPT_WRITEFUNCTION, curl_receive_cb); 441 curl_easy_setopt (s->client_get, CURLOPT_WRITEFUNCTION, curl_receive_cb);
298 //curl_easy_setopt (s->client_get, CURLOPT_WRITEDATA, ps); 442 curl_easy_setopt (s->client_get, CURLOPT_WRITEDATA, s);
299 curl_easy_setopt (s->client_get, CURLOPT_TIMEOUT_MS, 443 curl_easy_setopt (s->client_get, CURLOPT_TIMEOUT_MS,
300 (long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); 444 (long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
301 curl_easy_setopt (s->client_get, CURLOPT_PRIVATE, s); 445 curl_easy_setopt (s->client_get, CURLOPT_PRIVATE, s);
@@ -323,10 +467,10 @@ client_connect (struct Session *s)
323 curl_easy_setopt (s->client_put, CURLOPT_PUT, 1L); 467 curl_easy_setopt (s->client_put, CURLOPT_PUT, 1L);
324 //curl_easy_setopt (s->client_put, CURLOPT_HEADERFUNCTION, &curl_put_header_cb); 468 //curl_easy_setopt (s->client_put, CURLOPT_HEADERFUNCTION, &curl_put_header_cb);
325 //curl_easy_setopt (s->client_put, CURLOPT_WRITEHEADER, ps); 469 //curl_easy_setopt (s->client_put, CURLOPT_WRITEHEADER, ps);
326 //curl_easy_setopt (s->client_put, CURLOPT_READFUNCTION, curl_send_cb); 470 curl_easy_setopt (s->client_put, CURLOPT_READFUNCTION, curl_send_cb);
327 //curl_easy_setopt (s->client_put, CURLOPT_READDATA, ps); 471 curl_easy_setopt (s->client_put, CURLOPT_READDATA, s);
328 //curl_easy_setopt (s->client_put, CURLOPT_WRITEFUNCTION, curl_receive_cb); 472 curl_easy_setopt (s->client_put, CURLOPT_WRITEFUNCTION, curl_receive_cb);
329 //curl_easy_setopt (s->client_put, CURLOPT_WRITEDATA, ps); 473 curl_easy_setopt (s->client_put, CURLOPT_WRITEDATA, s);
330 curl_easy_setopt (s->client_put, CURLOPT_TIMEOUT_MS, 474 curl_easy_setopt (s->client_put, CURLOPT_TIMEOUT_MS,
331 (long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); 475 (long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
332 curl_easy_setopt (s->client_put, CURLOPT_PRIVATE, s); 476 curl_easy_setopt (s->client_put, CURLOPT_PRIVATE, s);
diff --git a/src/transport/plugin_transport_http_server.c b/src/transport/plugin_transport_http_server.c
index df164aa4b..43d9171eb 100644
--- a/src/transport/plugin_transport_http_server.c
+++ b/src/transport/plugin_transport_http_server.c
@@ -292,6 +292,9 @@ server_access_cb (void *cls, struct MHD_Connection *mhd_connection,
292 292
293 if (check == GNUNET_NO) 293 if (check == GNUNET_NO)
294 goto error; 294 goto error;
295
296 plugin->cur_connections++;
297
295#if VERBOSE_SERVER 298#if VERBOSE_SERVER
296 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "server: New inbound connection from %s with tag %u\n", GNUNET_h2s_full(&(target.hashPubKey)), tag); 299 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "server: New inbound connection from %s with tag %u\n", GNUNET_h2s_full(&(target.hashPubKey)), tag);
297#endif 300#endif
@@ -495,6 +498,7 @@ server_disconnect_cb (void *cls, struct MHD_Connection *connection,
495 } 498 }
496 t = t->next; 499 t = t->next;
497 } 500 }
501 plugin->cur_connections--;
498 502
499 if ((s->server_send == NULL) && (s->server_recv == NULL)) 503 if ((s->server_send == NULL) && (s->server_recv == NULL))
500 { 504 {