diff options
author | Matthias Wachs <wachs@net.in.tum.de> | 2011-09-16 14:24:29 +0000 |
---|---|---|
committer | Matthias Wachs <wachs@net.in.tum.de> | 2011-09-16 14:24:29 +0000 |
commit | acb0b0077234212372463cc24366ac79e4bcceb6 (patch) | |
tree | d00a00cdf70a4d47b225090b10bd18fe24d6e908 /src | |
parent | abcb5dfce1f7b5ac066de37e1ec8d32463c04c9a (diff) | |
download | gnunet-acb0b0077234212372463cc24366ac79e4bcceb6.tar.gz gnunet-acb0b0077234212372463cc24366ac79e4bcceb6.zip |
client sending & receiving
Diffstat (limited to 'src')
-rw-r--r-- | src/transport/plugin_transport_http.h | 9 | ||||
-rw-r--r-- | src/transport/plugin_transport_http_client.c | 162 | ||||
-rw-r--r-- | src/transport/plugin_transport_http_server.c | 4 |
3 files changed, 162 insertions, 13 deletions
diff --git a/src/transport/plugin_transport_http.h b/src/transport/plugin_transport_http.h index 11b369d40..b7b89e6e6 100644 --- a/src/transport/plugin_transport_http.h +++ b/src/transport/plugin_transport_http.h | |||
@@ -82,6 +82,7 @@ struct Plugin | |||
82 | */ | 82 | */ |
83 | struct GNUNET_NAT_Handle *nat; | 83 | struct GNUNET_NAT_Handle *nat; |
84 | 84 | ||
85 | |||
85 | /** | 86 | /** |
86 | * ipv4 DLL head | 87 | * ipv4 DLL head |
87 | */ | 88 | */ |
@@ -124,7 +125,6 @@ struct Plugin | |||
124 | 125 | ||
125 | int cur_connections; | 126 | int cur_connections; |
126 | uint32_t last_tag; | 127 | uint32_t last_tag; |
127 | |||
128 | /* | 128 | /* |
129 | * Server handles | 129 | * Server handles |
130 | */ | 130 | */ |
@@ -180,9 +180,9 @@ struct Session | |||
180 | struct Plugin *plugin; | 180 | struct Plugin *plugin; |
181 | 181 | ||
182 | /** | 182 | /** |
183 | * The client (used to identify this connection) | 183 | * message stream tokenizer for incoming data |
184 | */ | 184 | */ |
185 | /* void *client; */ | 185 | struct GNUNET_SERVER_MessageStreamTokenizer *msg_tk; |
186 | 186 | ||
187 | /** | 187 | /** |
188 | * Continuation function to call once the transmission buffer | 188 | * Continuation function to call once the transmission buffer |
@@ -232,7 +232,8 @@ struct Session | |||
232 | 232 | ||
233 | void *server_recv; | 233 | void *server_recv; |
234 | void *server_send; | 234 | void *server_send; |
235 | 235 | struct GNUNET_TIME_Absolute delay; | |
236 | GNUNET_SCHEDULER_TaskIdentifier reset_task; | ||
236 | uint32_t tag; | 237 | uint32_t tag; |
237 | 238 | ||
238 | }; | 239 | }; |
diff --git a/src/transport/plugin_transport_http_client.c b/src/transport/plugin_transport_http_client.c index c2962394f..04a985906 100644 --- a/src/transport/plugin_transport_http_client.c +++ b/src/transport/plugin_transport_http_client.c | |||
@@ -183,7 +183,9 @@ client_run (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
183 | "Connection to '%s' %s ended\n", GNUNET_i2s(&s->target), http_plugin_address_to_string(plugin, s->addr, s->addrlen)); | 183 | "Connection to '%s' %s ended\n", GNUNET_i2s(&s->target), http_plugin_address_to_string(plugin, s->addr, s->addrlen)); |
184 | #endif | 184 | #endif |
185 | client_disconnect(s); | 185 | client_disconnect(s); |
186 | GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,"Notifying about ended session to peer `%s' `%s'\n", GNUNET_i2s (&s->target), GNUNET_a2s (s->addr, s->addrlen)); | 186 | GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,"Notifying about ended session to peer `%s' `%s'\n", GNUNET_i2s (&s->target), http_plugin_address_to_string (plugin, s->addr, s->addrlen)); |
187 | if (s->msg_tk != NULL) | ||
188 | GNUNET_SERVER_mst_destroy (s->msg_tk); | ||
187 | notify_session_end (plugin, &s->target, s); | 189 | notify_session_end (plugin, &s->target, s); |
188 | } | 190 | } |
189 | } | 191 | } |
@@ -252,6 +254,148 @@ client_disconnect (struct Session *s) | |||
252 | return res; | 254 | return res; |
253 | } | 255 | } |
254 | 256 | ||
257 | static void | ||
258 | curl_receive_mst_cb (void *cls, void *client, | ||
259 | const struct GNUNET_MessageHeader *message) | ||
260 | { | ||
261 | struct Session *s = cls; | ||
262 | struct Plugin *plugin = s->plugin; | ||
263 | struct GNUNET_TRANSPORT_ATS_Information distance[2]; | ||
264 | struct GNUNET_TIME_Relative delay; | ||
265 | |||
266 | distance[0].type = htonl (GNUNET_TRANSPORT_ATS_QUALITY_NET_DISTANCE); | ||
267 | distance[0].value = htonl (1); | ||
268 | distance[1].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR); | ||
269 | distance[1].value = htonl (0); | ||
270 | |||
271 | delay = plugin->env->receive (plugin->env->cls, &s->target, message, (const struct GNUNET_TRANSPORT_ATS_Information*) &distance, 2, s, s->addr, s->addrlen); | ||
272 | s->delay = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), delay); | ||
273 | |||
274 | if (GNUNET_TIME_absolute_get().abs_value < s->delay.abs_value) | ||
275 | { | ||
276 | #if VERBOSE_CLIENT | ||
277 | GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "Client: peer `%s' address `%s' next read delayed for %llu ms\n", | ||
278 | GNUNET_i2s (&s->target), GNUNET_a2s (s->addr, s->addrlen), delay); | ||
279 | #endif | ||
280 | } | ||
281 | } | ||
282 | |||
283 | /** | ||
284 | * Callback method used with libcurl | ||
285 | * Method is called when libcurl needs to write data during sending | ||
286 | * @param stream pointer where to write data | ||
287 | * @param size size of an individual element | ||
288 | * @param nmemb count of elements that can be written to the buffer | ||
289 | * @param ptr destination pointer, passed to the libcurl handle | ||
290 | * @return bytes read from stream | ||
291 | */ | ||
292 | static size_t | ||
293 | curl_receive_cb (void *stream, size_t size, size_t nmemb, void *cls) | ||
294 | { | ||
295 | struct Session *s = cls; | ||
296 | struct Plugin *plugin = s->plugin; | ||
297 | |||
298 | if (GNUNET_TIME_absolute_get().abs_value < s->delay.abs_value) | ||
299 | { | ||
300 | #if DEBUG_HTTP | ||
301 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
302 | "Connection %X: no inbound bandwidth available! Next read was delayed for %llu ms\n", | ||
303 | s, GNUNET_TIME_absolute_get_difference(s->delay, GNUNET_TIME_absolute_get()).rel_value); | ||
304 | #endif | ||
305 | return 0; | ||
306 | } | ||
307 | |||
308 | if (s->msg_tk == NULL) | ||
309 | s->msg_tk = GNUNET_SERVER_mst_create (&curl_receive_mst_cb, s); | ||
310 | |||
311 | GNUNET_SERVER_mst_receive (s->msg_tk, s, stream, size * nmemb, GNUNET_NO, | ||
312 | GNUNET_NO); | ||
313 | |||
314 | #if VERBOSE_CLIENT | ||
315 | GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "Received %u bytes from peer `%s'\n", | ||
316 | size * nmemb, | ||
317 | GNUNET_i2s (&s->target)); | ||
318 | #endif | ||
319 | return (size * nmemb); | ||
320 | } | ||
321 | |||
322 | /** | ||
323 | * Callback method used with libcurl | ||
324 | * Method is called when libcurl needs to read data during sending | ||
325 | * @param stream pointer where to write data | ||
326 | * @param size size of an individual element | ||
327 | * @param nmemb count of elements that can be written to the buffer | ||
328 | * @param ptr source pointer, passed to the libcurl handle | ||
329 | * @return bytes written to stream | ||
330 | */ | ||
331 | static size_t | ||
332 | curl_send_cb (void *stream, size_t size, size_t nmemb, void *ptr) | ||
333 | { | ||
334 | size_t bytes_sent = 0; | ||
335 | |||
336 | #if 0 | ||
337 | struct Session *ps = ptr; | ||
338 | struct HTTP_Message *msg = ps->pending_msgs_tail; | ||
339 | |||
340 | size_t len; | ||
341 | |||
342 | if (ps->send_active == GNUNET_NO) | ||
343 | return CURL_READFUNC_PAUSE; | ||
344 | if ((ps->pending_msgs_tail == NULL) && (ps->send_active == GNUNET_YES)) | ||
345 | { | ||
346 | #if DEBUG_CONNECTIONS | ||
347 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
348 | "Connection %X: No Message to send, pausing connection\n", ps); | ||
349 | #endif | ||
350 | ps->send_active = GNUNET_NO; | ||
351 | return CURL_READFUNC_PAUSE; | ||
352 | } | ||
353 | |||
354 | GNUNET_assert (msg != NULL); | ||
355 | |||
356 | /* data to send */ | ||
357 | if (msg->pos < msg->size) | ||
358 | { | ||
359 | /* data fit in buffer */ | ||
360 | if ((msg->size - msg->pos) <= (size * nmemb)) | ||
361 | { | ||
362 | len = (msg->size - msg->pos); | ||
363 | memcpy (stream, &msg->buf[msg->pos], len); | ||
364 | msg->pos += len; | ||
365 | bytes_sent = len; | ||
366 | } | ||
367 | else | ||
368 | { | ||
369 | len = size * nmemb; | ||
370 | memcpy (stream, &msg->buf[msg->pos], len); | ||
371 | msg->pos += len; | ||
372 | bytes_sent = len; | ||
373 | } | ||
374 | } | ||
375 | /* no data to send */ | ||
376 | else | ||
377 | { | ||
378 | bytes_sent = 0; | ||
379 | } | ||
380 | |||
381 | if (msg->pos == msg->size) | ||
382 | { | ||
383 | #if DEBUG_CONNECTIONS | ||
384 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
385 | "Connection %X: Message with %u bytes sent, removing message from queue\n", | ||
386 | ps, msg->pos); | ||
387 | #endif | ||
388 | /* Calling transmit continuation */ | ||
389 | if (NULL != ps->pending_msgs_tail->transmit_cont) | ||
390 | msg->transmit_cont (ps->pending_msgs_tail->transmit_cont_cls, | ||
391 | &(ps->peercontext)->identity, GNUNET_OK); | ||
392 | ps->queue_length_cur -= msg->size; | ||
393 | remove_http_message (ps, msg); | ||
394 | } | ||
395 | |||
396 | #endif | ||
397 | return bytes_sent; | ||
398 | } | ||
255 | 399 | ||
256 | int | 400 | int |
257 | client_connect (struct Session *s) | 401 | client_connect (struct Session *s) |
@@ -292,10 +436,10 @@ client_connect (struct Session *s) | |||
292 | curl_easy_setopt (s->client_get, CURLOPT_URL, url); | 436 | curl_easy_setopt (s->client_get, CURLOPT_URL, url); |
293 | //curl_easy_setopt (s->client_get, CURLOPT_HEADERFUNCTION, &curl_get_header_cb); | 437 | //curl_easy_setopt (s->client_get, CURLOPT_HEADERFUNCTION, &curl_get_header_cb); |
294 | //curl_easy_setopt (s->client_get, CURLOPT_WRITEHEADER, ps); | 438 | //curl_easy_setopt (s->client_get, CURLOPT_WRITEHEADER, ps); |
295 | //curl_easy_setopt (s->client_get, CURLOPT_READFUNCTION, curl_send_cb); | 439 | curl_easy_setopt (s->client_get, CURLOPT_READFUNCTION, curl_send_cb); |
296 | //curl_easy_setopt (s->client_get, CURLOPT_READDATA, ps); | 440 | curl_easy_setopt (s->client_get, CURLOPT_READDATA, s); |
297 | //curl_easy_setopt (s->client_get, CURLOPT_WRITEFUNCTION, curl_receive_cb); | 441 | curl_easy_setopt (s->client_get, CURLOPT_WRITEFUNCTION, curl_receive_cb); |
298 | //curl_easy_setopt (s->client_get, CURLOPT_WRITEDATA, ps); | 442 | curl_easy_setopt (s->client_get, CURLOPT_WRITEDATA, s); |
299 | curl_easy_setopt (s->client_get, CURLOPT_TIMEOUT_MS, | 443 | curl_easy_setopt (s->client_get, CURLOPT_TIMEOUT_MS, |
300 | (long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); | 444 | (long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); |
301 | curl_easy_setopt (s->client_get, CURLOPT_PRIVATE, s); | 445 | curl_easy_setopt (s->client_get, CURLOPT_PRIVATE, s); |
@@ -323,10 +467,10 @@ client_connect (struct Session *s) | |||
323 | curl_easy_setopt (s->client_put, CURLOPT_PUT, 1L); | 467 | curl_easy_setopt (s->client_put, CURLOPT_PUT, 1L); |
324 | //curl_easy_setopt (s->client_put, CURLOPT_HEADERFUNCTION, &curl_put_header_cb); | 468 | //curl_easy_setopt (s->client_put, CURLOPT_HEADERFUNCTION, &curl_put_header_cb); |
325 | //curl_easy_setopt (s->client_put, CURLOPT_WRITEHEADER, ps); | 469 | //curl_easy_setopt (s->client_put, CURLOPT_WRITEHEADER, ps); |
326 | //curl_easy_setopt (s->client_put, CURLOPT_READFUNCTION, curl_send_cb); | 470 | curl_easy_setopt (s->client_put, CURLOPT_READFUNCTION, curl_send_cb); |
327 | //curl_easy_setopt (s->client_put, CURLOPT_READDATA, ps); | 471 | curl_easy_setopt (s->client_put, CURLOPT_READDATA, s); |
328 | //curl_easy_setopt (s->client_put, CURLOPT_WRITEFUNCTION, curl_receive_cb); | 472 | curl_easy_setopt (s->client_put, CURLOPT_WRITEFUNCTION, curl_receive_cb); |
329 | //curl_easy_setopt (s->client_put, CURLOPT_WRITEDATA, ps); | 473 | curl_easy_setopt (s->client_put, CURLOPT_WRITEDATA, s); |
330 | curl_easy_setopt (s->client_put, CURLOPT_TIMEOUT_MS, | 474 | curl_easy_setopt (s->client_put, CURLOPT_TIMEOUT_MS, |
331 | (long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); | 475 | (long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); |
332 | curl_easy_setopt (s->client_put, CURLOPT_PRIVATE, s); | 476 | curl_easy_setopt (s->client_put, CURLOPT_PRIVATE, s); |
diff --git a/src/transport/plugin_transport_http_server.c b/src/transport/plugin_transport_http_server.c index df164aa4b..43d9171eb 100644 --- a/src/transport/plugin_transport_http_server.c +++ b/src/transport/plugin_transport_http_server.c | |||
@@ -292,6 +292,9 @@ server_access_cb (void *cls, struct MHD_Connection *mhd_connection, | |||
292 | 292 | ||
293 | if (check == GNUNET_NO) | 293 | if (check == GNUNET_NO) |
294 | goto error; | 294 | goto error; |
295 | |||
296 | plugin->cur_connections++; | ||
297 | |||
295 | #if VERBOSE_SERVER | 298 | #if VERBOSE_SERVER |
296 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "server: New inbound connection from %s with tag %u\n", GNUNET_h2s_full(&(target.hashPubKey)), tag); | 299 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "server: New inbound connection from %s with tag %u\n", GNUNET_h2s_full(&(target.hashPubKey)), tag); |
297 | #endif | 300 | #endif |
@@ -495,6 +498,7 @@ server_disconnect_cb (void *cls, struct MHD_Connection *connection, | |||
495 | } | 498 | } |
496 | t = t->next; | 499 | t = t->next; |
497 | } | 500 | } |
501 | plugin->cur_connections--; | ||
498 | 502 | ||
499 | if ((s->server_send == NULL) && (s->server_recv == NULL)) | 503 | if ((s->server_send == NULL) && (s->server_recv == NULL)) |
500 | { | 504 | { |