aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/datastore/datastore.h2
-rw-r--r--src/datastore/datastore_api.c2
-rw-r--r--src/include/gnunet_core_service.h3
-rw-r--r--src/transport/gnunet-service-transport.c263
-rw-r--r--src/transport/plugin_transport.h85
-rw-r--r--src/transport/plugin_transport_tcp.c221
-rw-r--r--src/transport/plugin_transport_template.c18
-rw-r--r--src/transport/plugin_transport_udp.c18
-rw-r--r--src/transport/plugin_transport_udp_nat.c18
9 files changed, 230 insertions, 400 deletions
diff --git a/src/datastore/datastore.h b/src/datastore/datastore.h
index aa2646c0a..f827f8766 100644
--- a/src/datastore/datastore.h
+++ b/src/datastore/datastore.h
@@ -27,7 +27,7 @@
27#ifndef DATASTORE_H 27#ifndef DATASTORE_H
28#define DATASTORE_H 28#define DATASTORE_H
29 29
30#define DEBUG_DATASTORE GNUNET_NO 30#define DEBUG_DATASTORE GNUNET_YES
31 31
32#include "gnunet_util_lib.h" 32#include "gnunet_util_lib.h"
33 33
diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c
index 064f36025..3b7c3a2ed 100644
--- a/src/datastore/datastore_api.c
+++ b/src/datastore/datastore_api.c
@@ -656,6 +656,7 @@ GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
656 { 656 {
657 gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode)); 657 gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
658 } 658 }
659 GNUNET_assert (h->response_proc == NULL);
659 transmit_for_result (h, iter, iter_cls, timeout); 660 transmit_for_result (h, iter, iter_cls, timeout);
660} 661}
661 662
@@ -680,6 +681,7 @@ GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
680 m = (struct GNUNET_MessageHeader*) &h[1]; 681 m = (struct GNUNET_MessageHeader*) &h[1];
681 m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM); 682 m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
682 m->size = htons(sizeof (struct GNUNET_MessageHeader)); 683 m->size = htons(sizeof (struct GNUNET_MessageHeader));
684 GNUNET_assert (h->response_proc == NULL);
683 transmit_for_result (h, iter, iter_cls, timeout); 685 transmit_for_result (h, iter, iter_cls, timeout);
684} 686}
685 687
diff --git a/src/include/gnunet_core_service.h b/src/include/gnunet_core_service.h
index 060a0f8ee..af0a4eaa9 100644
--- a/src/include/gnunet_core_service.h
+++ b/src/include/gnunet_core_service.h
@@ -273,7 +273,8 @@ GNUNET_CORE_peer_request_connect_cancel (struct GNUNET_CORE_PeerRequestHandle *r
273 * @param bpm_out set to the current bandwidth limit (sending) for this peer 273 * @param bpm_out set to the current bandwidth limit (sending) for this peer
274 * @param latency current latency estimate, "FOREVER" if we have been 274 * @param latency current latency estimate, "FOREVER" if we have been
275 * disconnected 275 * disconnected
276 * @param amount set to the amount that was actually reserved or unreserved 276 * @param amount set to the amount that was actually reserved or unreserved;
277 * either the full requested amount or zero (no partial reservations)
277 * @param preference current traffic preference for the given peer 278 * @param preference current traffic preference for the given peer
278 */ 279 */
279typedef void 280typedef void
diff --git a/src/transport/gnunet-service-transport.c b/src/transport/gnunet-service-transport.c
index 3c5feaf51..5048b029c 100644
--- a/src/transport/gnunet-service-transport.c
+++ b/src/transport/gnunet-service-transport.c
@@ -871,6 +871,37 @@ update_quota (struct NeighbourList *n)
871 uint64_t allowed; 871 uint64_t allowed;
872 uint64_t remaining; 872 uint64_t remaining;
873 873
874#if 0
875 struct GNUNET_TIME_Absolute now;
876 unsigned long long delta;
877 unsigned long long total_allowed;
878 unsigned long long total_remaining;
879
880 now = GNUNET_TIME_absolute_get ();
881 delta = now.value - session->last_quota_update.value;
882 if ((delta < MIN_QUOTA_REFRESH_TIME) && (!force))
883 return; /* too early, not enough data */
884
885 total_allowed = session->quota_in * delta;
886 if (total_allowed > session->last_received)
887 {
888 /* got less than acceptable */
889 total_remaining = total_allowed - session->last_received;
890 session->last_received = 0;
891 delta = total_remaining / session->quota_in; /* bonus seconds */
892 if (delta > MAX_BANDWIDTH_CARRY)
893 delta = MAX_BANDWIDTH_CARRY; /* limit amount of carry-over */
894 }
895 else
896 {
897 /* got more than acceptable */
898 session->last_received -= total_allowed;
899 delta = 0;
900 }
901 session->last_quota_update.value = now.value - delta;
902#endif
903
904
874 delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update); 905 delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
875 if (delta.value < MIN_QUOTA_REFRESH_TIME) 906 if (delta.value < MIN_QUOTA_REFRESH_TIME)
876 return; /* not enough time passed for doing quota update */ 907 return; /* not enough time passed for doing quota update */
@@ -2539,24 +2570,63 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message,
2539 2570
2540 2571
2541/** 2572/**
2573 * Calculate how long we should delay reading from the TCP socket to
2574 * ensure that we stay within our bandwidth limits (push back).
2575 *
2576 * @param n for which neighbour should this be calculated
2577 * @return how long to delay receiving more data
2578 */
2579static struct GNUNET_TIME_Relative
2580calculate_throttle_delay (struct NeighbourList *n)
2581{
2582 struct GNUNET_TIME_Relative ret;
2583 struct GNUNET_TIME_Absolute now;
2584 uint64_t del;
2585 uint64_t avail;
2586 uint64_t excess;
2587
2588 now = GNUNET_TIME_absolute_get ();
2589 del = now.value - n->last_quota_update.value;
2590 if (del > MAX_BANDWIDTH_CARRY)
2591 {
2592 update_quota (n /*, GNUNET_YES*/);
2593 del = now.value - n->last_quota_update.value;
2594 GNUNET_assert (del <= MAX_BANDWIDTH_CARRY);
2595 }
2596 if (n->quota_in == 0)
2597 n->quota_in = 1; /* avoid divison by zero */
2598 avail = del * n->quota_in;
2599 if (avail > n->last_received)
2600 return GNUNET_TIME_UNIT_ZERO; /* can receive right now */
2601 excess = n->last_received - avail;
2602 ret.value = excess / n->quota_in;
2603 if (ret.value > 0)
2604 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2605 "Throttling read (%llu bytes excess at %llu b/ms), waiting %llums before reading more.\n",
2606 (unsigned long long) excess,
2607 (unsigned long long) n->quota_in,
2608 (unsigned long long) ret.value);
2609 return ret;
2610}
2611
2612
2613/**
2542 * Function called by the plugin for each received message. 2614 * Function called by the plugin for each received message.
2543 * Update data volumes, possibly notify plugins about 2615 * Update data volumes, possibly notify plugins about
2544 * reducing the rate at which they read from the socket 2616 * reducing the rate at which they read from the socket
2545 * and generally forward to our receive callback. 2617 * and generally forward to our receive callback.
2546 * 2618 *
2547 * @param cls the "struct TransportPlugin *" we gave to the plugin 2619 * @param cls the "struct TransportPlugin *" we gave to the plugin
2548 * @param message the message, NULL if peer was disconnected
2549 * @param distance the transport cost to this peer (not latency!)
2550 * @param sender_address the address that the sender reported
2551 * (opaque to transport service)
2552 * @param sender_address_len the length of the sender address
2553 * @param peer (claimed) identity of the other peer 2620 * @param peer (claimed) identity of the other peer
2554 * @return the new service_context that the plugin should use 2621 * @param message the message, NULL if we only care about
2555 * for future receive calls for messages from this 2622 * learning about the delay until we should receive again
2556 * particular peer 2623 * @param distance in overlay hops; use 1 unless DV (or 0 if message == NULL)
2557 * 2624 * @param sender_address binary address of the sender (if observed)
2558 */ 2625 * @param sender_address_len number of bytes in sender_address
2559static void 2626 * @return how long the plugin should wait until receiving more data
2627 * (plugins that do not support this, can ignore the return value)
2628 */
2629static struct GNUNET_TIME_Relative
2560plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer, 2630plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer,
2561 const struct GNUNET_MessageHeader *message, 2631 const struct GNUNET_MessageHeader *message,
2562 unsigned int distance, const char *sender_address, 2632 unsigned int distance, const char *sender_address,
@@ -2572,99 +2642,86 @@ plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer,
2572 2642
2573 n = find_neighbour (peer); 2643 n = find_neighbour (peer);
2574 if (n == NULL) 2644 if (n == NULL)
2575 { 2645 n = setup_new_neighbour (peer);
2576 if (message == NULL)
2577 return; /* disconnect of peer already marked down */
2578 n = setup_new_neighbour (peer);
2579 }
2580 service_context = n->plugins; 2646 service_context = n->plugins;
2581 while ((service_context != NULL) && (plugin != service_context->plugin)) 2647 while ((service_context != NULL) && (plugin != service_context->plugin))
2582 service_context = service_context->next; 2648 service_context = service_context->next;
2583 GNUNET_assert ((plugin->api->send == NULL) || (service_context != NULL)); 2649 GNUNET_assert ((plugin->api->send == NULL) || (service_context != NULL));
2584 if (message == NULL) 2650 if (message != NULL)
2585 {
2586#if DEBUG_TRANSPORT
2587 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2588 "Receive failed from `%4s', triggering disconnect\n",
2589 GNUNET_i2s (&n->id));
2590#endif
2591 /* TODO: call stats */
2592 disconnect_neighbour (n, GNUNET_YES);
2593 return;
2594 }
2595 peer_address = add_peer_address(n,
2596 plugin->short_name,
2597 sender_address,
2598 sender_address_len);
2599 if (peer_address != NULL)
2600 { 2651 {
2601 peer_address->distance = distance; 2652 peer_address = add_peer_address(n,
2602 if (peer_address->connected == GNUNET_NO) 2653 plugin->short_name,
2603 { 2654 sender_address,
2604 peer_address->connected = GNUNET_YES; 2655 sender_address_len);
2605 peer_address->connect_attempts++; 2656 if (peer_address != NULL)
2606 } 2657 {
2607 peer_address->timeout 2658 peer_address->distance = distance;
2608 = 2659 if (peer_address->connected == GNUNET_NO)
2609 GNUNET_TIME_relative_to_absolute 2660 {
2610 (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); 2661 peer_address->connected = GNUNET_YES;
2611 } 2662 peer_address->connect_attempts++;
2612 /* update traffic received amount ... */ 2663 }
2613 msize = ntohs (message->size); 2664 peer_address->timeout
2614 n->last_received += msize; 2665 =
2615 n->distance = distance; 2666 GNUNET_TIME_relative_to_absolute
2616 n->peer_timeout = 2667 (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
2617 GNUNET_TIME_relative_to_absolute 2668 }
2618 (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); 2669 /* update traffic received amount ... */
2619 GNUNET_SCHEDULER_cancel (sched, 2670 msize = ntohs (message->size);
2620 n->timeout_task); 2671 n->distance = distance;
2621 n->timeout_task = 2672 n->peer_timeout =
2622 GNUNET_SCHEDULER_add_delayed (sched, 2673 GNUNET_TIME_relative_to_absolute
2623 GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, 2674 (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
2624 &neighbour_timeout_task, n); 2675 GNUNET_SCHEDULER_cancel (sched,
2625 update_quota (n); 2676 n->timeout_task);
2626 if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD) 2677 n->timeout_task =
2627 { 2678 GNUNET_SCHEDULER_add_delayed (sched,
2628 /* dropping message due to frequent inbound volume violations! */ 2679 GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
2629 GNUNET_log (GNUNET_ERROR_TYPE_WARNING | 2680 &neighbour_timeout_task, n);
2630 GNUNET_ERROR_TYPE_BULK, 2681 if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
2631 _ 2682 {
2632 ("Dropping incoming message due to repeated bandwidth quota violations (total of %u).\n"), n->quota_violation_count); 2683 /* dropping message due to frequent inbound volume violations! */
2633 /* TODO: call stats */ 2684 GNUNET_log (GNUNET_ERROR_TYPE_WARNING |
2634 return; 2685 GNUNET_ERROR_TYPE_BULK,
2635 } 2686 _
2636 switch (ntohs (message->type)) 2687 ("Dropping incoming message due to repeated bandwidth quota violations (total of %u).\n"),
2637 { 2688 n->quota_violation_count);
2638 case GNUNET_MESSAGE_TYPE_HELLO: 2689 return GNUNET_TIME_UNIT_MINUTES; /* minimum penalty, likely ignored (UDP...) */
2639 process_hello (plugin, message); 2690 }
2640 break; 2691 switch (ntohs (message->type))
2641 case GNUNET_MESSAGE_TYPE_TRANSPORT_PING: 2692 {
2642 handle_ping(plugin, message, peer, sender_address, sender_address_len); 2693 case GNUNET_MESSAGE_TYPE_HELLO:
2643 break; 2694 process_hello (plugin, message);
2644 case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG: 2695 break;
2645 handle_pong(plugin, message, peer, sender_address, sender_address_len); 2696 case GNUNET_MESSAGE_TYPE_TRANSPORT_PING:
2646 break; 2697 handle_ping(plugin, message, peer, sender_address, sender_address_len);
2647 default: 2698 break;
2699 case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG:
2700 handle_pong(plugin, message, peer, sender_address, sender_address_len);
2701 break;
2702 default:
2648#if DEBUG_TRANSPORT 2703#if DEBUG_TRANSPORT
2649 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2704 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2650 "Received message of type %u from `%4s', sending to all clients.\n", 2705 "Received message of type %u from `%4s', sending to all clients.\n",
2651 ntohs (message->type), GNUNET_i2s (peer)); 2706 ntohs (message->type), GNUNET_i2s (peer));
2652#endif 2707#endif
2653 /* transmit message to all clients */ 2708 /* transmit message to all clients */
2654 im = GNUNET_malloc (sizeof (struct InboundMessage) + msize); 2709 im = GNUNET_malloc (sizeof (struct InboundMessage) + msize);
2655 im->header.size = htons (sizeof (struct InboundMessage) + msize); 2710 im->header.size = htons (sizeof (struct InboundMessage) + msize);
2656 im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV); 2711 im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
2657 im->latency = GNUNET_TIME_relative_hton (n->latency); 2712 im->latency = GNUNET_TIME_relative_hton (n->latency);
2658 im->peer = *peer; 2713 im->peer = *peer;
2659 memcpy (&im[1], message, msize); 2714 memcpy (&im[1], message, msize);
2660 cpos = clients; 2715 cpos = clients;
2661 while (cpos != NULL) 2716 while (cpos != NULL)
2662 { 2717 {
2663 transmit_to_client (cpos, &im->header, GNUNET_YES); 2718 transmit_to_client (cpos, &im->header, GNUNET_YES);
2664 cpos = cpos->next; 2719 cpos = cpos->next;
2665 } 2720 }
2666 GNUNET_free (im); 2721 GNUNET_free (im);
2667 } 2722 }
2723 }
2724 return calculate_throttle_delay (n);
2668} 2725}
2669 2726
2670 2727
@@ -2830,8 +2887,6 @@ handle_set_quota (void *cls,
2830 const struct QuotaSetMessage *qsm = 2887 const struct QuotaSetMessage *qsm =
2831 (const struct QuotaSetMessage *) message; 2888 (const struct QuotaSetMessage *) message;
2832 struct NeighbourList *n; 2889 struct NeighbourList *n;
2833 struct TransportPlugin *p;
2834 struct ReadyList *rl;
2835 uint32_t qin; 2890 uint32_t qin;
2836 2891
2837 n = find_neighbour (&qsm->peer); 2892 n = find_neighbour (&qsm->peer);
@@ -2850,14 +2905,6 @@ handle_set_quota (void *cls,
2850 if (n->quota_in < qin) 2905 if (n->quota_in < qin)
2851 n->last_quota_update = GNUNET_TIME_absolute_get (); 2906 n->last_quota_update = GNUNET_TIME_absolute_get ();
2852 n->quota_in = qin; 2907 n->quota_in = qin;
2853 rl = n->plugins;
2854 while (rl != NULL)
2855 {
2856 p = rl->plugin;
2857 p->api->set_receive_quota (p->api->cls,
2858 &qsm->peer, qin);
2859 rl = rl->next;
2860 }
2861 GNUNET_SERVER_receive_done (client, GNUNET_OK); 2908 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2862} 2909}
2863 2910
@@ -2974,8 +3021,6 @@ create_environment (struct TransportPlugin *plug)
2974 plug->env.cls = plug; 3021 plug->env.cls = plug;
2975 plug->env.receive = &plugin_env_receive; 3022 plug->env.receive = &plugin_env_receive;
2976 plug->env.notify_address = &plugin_env_notify_address; 3023 plug->env.notify_address = &plugin_env_notify_address;
2977 plug->env.default_quota_in =
2978 (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000);
2979 plug->env.max_connections = max_connect_per_transport; 3024 plug->env.max_connections = max_connect_per_transport;
2980} 3025}
2981 3026
diff --git a/src/transport/plugin_transport.h b/src/transport/plugin_transport.h
index 2630ffbad..40653a618 100644
--- a/src/transport/plugin_transport.h
+++ b/src/transport/plugin_transport.h
@@ -26,14 +26,6 @@
26 * Note that the destructors of transport plugins will 26 * Note that the destructors of transport plugins will
27 * be given the value returned by the constructor 27 * be given the value returned by the constructor
28 * and is expected to return a NULL pointer. 28 * and is expected to return a NULL pointer.
29 *
30 * TODO:
31 * - consider moving DATA message (latency measurement)
32 * to service; avoids encapsulation overheads and
33 * would enable latency measurements for non-bidi
34 * transports.
35 * -
36 *
37 * @author Christian Grothoff 29 * @author Christian Grothoff
38 */ 30 */
39#ifndef PLUGIN_TRANSPORT_H 31#ifndef PLUGIN_TRANSPORT_H
@@ -51,21 +43,24 @@
51 * 43 *
52 * @param cls closure 44 * @param cls closure
53 * @param peer (claimed) identity of the other peer 45 * @param peer (claimed) identity of the other peer
54 * @param message the message, NULL if peer was disconnected 46 * @param message the message, NULL if we only care about
55 * @param distance in overlay hops; use 1 unless DV 47 * learning about the delay until we should receive again
48 * @param distance in overlay hops; use 1 unless DV (or 0 if message == NULL)
56 * @param sender_address binary address of the sender (if observed) 49 * @param sender_address binary address of the sender (if observed)
57 * @param sender_address_len number of bytes in sender_address 50 * @param sender_address_len number of bytes in sender_address
51 * @return how long the plugin should wait until receiving more data
52 * (plugins that do not support this, can ignore the return value)
58 */ 53 */
59typedef void (*GNUNET_TRANSPORT_PluginReceiveCallback) (void *cls, 54typedef struct GNUNET_TIME_Relative (*GNUNET_TRANSPORT_PluginReceiveCallback) (void *cls,
60 const struct 55 const struct
61 GNUNET_PeerIdentity * 56 GNUNET_PeerIdentity *
62 peer, 57 peer,
63 const struct 58 const struct
64 GNUNET_MessageHeader * 59 GNUNET_MessageHeader *
65 message, 60 message,
66 uint32_t distance, 61 uint32_t distance,
67 const char *sender_address, 62 const char *sender_address,
68 size_t sender_address_len); 63 size_t sender_address_len);
69 64
70 65
71/** 66/**
@@ -89,6 +84,27 @@ typedef void (*GNUNET_TRANSPORT_AddressNotification) (void *cls,
89 84
90 85
91/** 86/**
87 * Function that will be called whenever the plugin receives data over
88 * the network and wants to determine how long it should wait until
89 * the next time it reads from the given peer. Note that some plugins
90 * (such as UDP) may not be able to wait (for a particular peer), so
91 * the waiting part is optional. Plugins that can wait should call
92 * this function, sleep the given amount of time, and call it again
93 * (with zero bytes read) UNTIL it returns zero and only then read.
94 *
95 * @param cls closure
96 * @param peer which peer did we read data from
97 * @param amount_recved number of bytes read (can be zero)
98 * @return how long to wait until reading more from this peer
99 * (to enforce inbound quotas)
100 */
101typedef struct GNUNET_TIME_Relative (*GNUNET_TRANSPORT_TrafficReport) (void *cls,
102 const struct
103 GNUNET_PeerIdentity *peer,
104 size_t amount_recved);
105
106
107/**
92 * The transport service will pass a pointer to a struct 108 * The transport service will pass a pointer to a struct
93 * of this type as the first and only argument to the 109 * of this type as the first and only argument to the
94 * entry point of each transport plugin. 110 * entry point of each transport plugin.
@@ -129,10 +145,10 @@ struct GNUNET_TRANSPORT_PluginEnvironment
129 GNUNET_TRANSPORT_AddressNotification notify_address; 145 GNUNET_TRANSPORT_AddressNotification notify_address;
130 146
131 /** 147 /**
132 * What is the default quota (in terms of incoming bytes per 148 * Inform service about traffic received, get information
133 * ms) for new connections? 149 * about when we might be willing to receive more.
134 */ 150 */
135 uint32_t default_quota_in; 151 GNUNET_TRANSPORT_TrafficReport traffic_report;
136 152
137 /** 153 /**
138 * What is the maximum number of connections that this transport 154 * What is the maximum number of connections that this transport
@@ -270,21 +286,6 @@ typedef void
270 286
271 287
272/** 288/**
273 * Set a quota for receiving data from the given peer; this is a
274 * per-transport limit. The transport should limit its read/select
275 * calls to stay below the quota (in terms of incoming data).
276 *
277 * @param cls closure
278 * @param peer the peer for whom the quota is given
279 * @param quota_in quota for receiving/sending data in bytes per ms
280 */
281typedef void
282 (*GNUNET_TRANSPORT_SetQuota) (void *cls,
283 const struct GNUNET_PeerIdentity * target,
284 uint32_t quota_in);
285
286
287/**
288 * Another peer has suggested an address for this peer and transport 289 * Another peer has suggested an address for this peer and transport
289 * plugin. Check that this could be a valid address. This function 290 * plugin. Check that this could be a valid address. This function
290 * is not expected to 'validate' the address in the sense of trying to 291 * is not expected to 'validate' the address in the sense of trying to
@@ -338,14 +339,6 @@ struct GNUNET_TRANSPORT_PluginFunctions
338 GNUNET_TRANSPORT_AddressPrettyPrinter address_pretty_printer; 339 GNUNET_TRANSPORT_AddressPrettyPrinter address_pretty_printer;
339 340
340 /** 341 /**
341 * Function that the transport service can use to try to enforce a
342 * quota for the number of bytes received via this transport.
343 * Transports that can not refuse incoming data (such as UDP)
344 * are free to ignore these calls.
345 */
346 GNUNET_TRANSPORT_SetQuota set_receive_quota;
347
348 /**
349 * Function that will be called to check if a binary address 342 * Function that will be called to check if a binary address
350 * for this plugin is well-formed. If clearly needed, patch 343 * for this plugin is well-formed. If clearly needed, patch
351 * up information such as port numbers. 344 * up information such as port numbers.
diff --git a/src/transport/plugin_transport_tcp.c b/src/transport/plugin_transport_tcp.c
index 98a5f1638..2ec80f9e9 100644
--- a/src/transport/plugin_transport_tcp.c
+++ b/src/transport/plugin_transport_tcp.c
@@ -165,9 +165,9 @@ struct Session
165 struct GNUNET_PeerIdentity target; 165 struct GNUNET_PeerIdentity target;
166 166
167 /** 167 /**
168 * At what time did we reset last_received last? 168 * ID of task used to delay receiving more to throttle sender.
169 */ 169 */
170 struct GNUNET_TIME_Absolute last_quota_update; 170 GNUNET_SCHEDULER_TaskIdentifier receive_delay_task;
171 171
172 /** 172 /**
173 * Address of the other peer (either based on our 'connect' 173 * Address of the other peer (either based on our 'connect'
@@ -176,18 +176,6 @@ struct Session
176 void *connect_addr; 176 void *connect_addr;
177 177
178 /** 178 /**
179 * How many bytes have we received since the "last_quota_update"
180 * timestamp?
181 */
182 uint64_t last_received;
183
184 /**
185 * Number of bytes per ms that this peer is allowed
186 * to send to us.
187 */
188 uint32_t quota_in;
189
190 /**
191 * Length of connect_addr. 179 * Length of connect_addr.
192 */ 180 */
193 size_t connect_alen; 181 size_t connect_alen;
@@ -266,28 +254,6 @@ struct Plugin
266 254
267 255
268/** 256/**
269 * Find a session handle for the given peer.
270 * FIXME: using a hash map we could do this in O(1).
271 *
272 * @return NULL if no matching session exists
273 */
274static struct Session *
275find_session_by_target (struct Plugin *plugin,
276 const struct GNUNET_PeerIdentity *target)
277{
278 struct Session *ret;
279
280 ret = plugin->sessions;
281 while ( (ret != NULL) &&
282 ((GNUNET_SYSERR == ret->expecting_welcome) ||
283 (0 != memcmp (target,
284 &ret->target, sizeof (struct GNUNET_PeerIdentity)))))
285 ret = ret->next;
286 return ret;
287}
288
289
290/**
291 * Find the session handle for the given client. 257 * Find the session handle for the given client.
292 * 258 *
293 * @return NULL if no matching session exists 259 * @return NULL if no matching session exists
@@ -332,9 +298,6 @@ create_session (struct Plugin *plugin,
332 plugin->sessions = ret; 298 plugin->sessions = ret;
333 ret->client = client; 299 ret->client = client;
334 ret->target = *target; 300 ret->target = *target;
335 ret->last_quota_update = GNUNET_TIME_absolute_get ();
336 // FIXME: This is simply wrong...
337 ret->quota_in = plugin->env->default_quota_in;
338 ret->expecting_welcome = GNUNET_YES; 301 ret->expecting_welcome = GNUNET_YES;
339 pm = GNUNET_malloc (sizeof (struct PendingMessage) + sizeof (struct WelcomeMessage)); 302 pm = GNUNET_malloc (sizeof (struct PendingMessage) + sizeof (struct WelcomeMessage));
340 pm->msg = (const char*) &pm[1]; 303 pm->msg = (const char*) &pm[1];
@@ -534,18 +497,22 @@ disconnect_session (struct Session *session)
534 transport service may know about this one, so we need to 497 transport service may know about this one, so we need to
535 notify transport service about disconnect */ 498 notify transport service about disconnect */
536 // FIXME: we should have a very clear connect-disconnect 499 // FIXME: we should have a very clear connect-disconnect
537 // protocol with gnunet-service-transport! 500 // protocol with gnunet-service-transport!
538 session->plugin->env->receive (session->plugin->env->cls, 501 // FIXME: but this is not possible for all plugins, so what gives?
539 &session->target, NULL, 502 }
540 1, 503 if (session->receive_delay_task != GNUNET_SCHEDULER_NO_TASK)
541 session->connect_addr, 504 {
542 session->connect_alen); 505 GNUNET_SCHEDULER_cancel (session->plugin->env->sched,
506 session->receive_delay_task);
507 if (session->client != NULL)
508 GNUNET_SERVER_receive_done (session->client,
509 GNUNET_SYSERR);
543 } 510 }
544 if (session->client != NULL) 511 if (session->client != NULL)
545 { 512 {
546 GNUNET_SERVER_client_drop (session->client); 513 GNUNET_SERVER_client_drop (session->client);
547 session->client = NULL; 514 session->client = NULL;
548 } 515 }
549 GNUNET_free_non_null (session->connect_addr); 516 GNUNET_free_non_null (session->connect_addr);
550 GNUNET_free (session); 517 GNUNET_free (session);
551} 518}
@@ -846,98 +813,6 @@ tcp_plugin_address_pretty_printer (void *cls,
846 813
847 814
848/** 815/**
849 * Update the last-received and bandwidth quota values
850 * for this session.
851 *
852 * @param session session to update
853 * @param force set to GNUNET_YES if we should update even
854 * though the minimum refresh time has not yet expired
855 */
856static void
857update_quota (struct Session *session, int force)
858{
859 struct GNUNET_TIME_Absolute now;
860 unsigned long long delta;
861 unsigned long long total_allowed;
862 unsigned long long total_remaining;
863
864 now = GNUNET_TIME_absolute_get ();
865 delta = now.value - session->last_quota_update.value;
866 if ((delta < MIN_QUOTA_REFRESH_TIME) && (!force))
867 return; /* too early, not enough data */
868
869 total_allowed = session->quota_in * delta;
870 if (total_allowed > session->last_received)
871 {
872 /* got less than acceptable */
873 total_remaining = total_allowed - session->last_received;
874 session->last_received = 0;
875 delta = total_remaining / session->quota_in; /* bonus seconds */
876 if (delta > MAX_BANDWIDTH_CARRY)
877 delta = MAX_BANDWIDTH_CARRY; /* limit amount of carry-over */
878 }
879 else
880 {
881 /* got more than acceptable */
882 session->last_received -= total_allowed;
883 delta = 0;
884 }
885 session->last_quota_update.value = now.value - delta;
886}
887
888
889/**
890 * Set a quota for receiving data from the given peer; this is a
891 * per-transport limit. The transport should limit its read/select
892 * calls to stay below the quota (in terms of incoming data).
893 *
894 * @param cls closure
895 * @param target the peer for whom the quota is given
896 * @param quota_in quota for receiving/sending data in bytes per ms
897 */
898static void
899tcp_plugin_set_receive_quota (void *cls,
900 const struct GNUNET_PeerIdentity *target,
901 uint32_t quota_in)
902{
903 struct Plugin *plugin = cls;
904 struct Session *session;
905
906 // FIXME: This is simply wrong:
907 // We may have multiple sessions for the target,
908 // and some OTHER session might be the one to
909 // survive; not to mention the inbound-quota should
910 // be enforced across transports!
911 // => keep quota-related states in the service (globally, per peer)
912 // and update/query the information when it is needed!
913 // => we can likely get rid of this entire function and
914 // replace it with a query/update API!
915 session = find_session_by_target (plugin, target);
916 if (session == NULL)
917 {
918 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
919 "tcp",
920 "Could not find session for peer `%4s' to update quota.\n",
921 GNUNET_i2s (target));
922 return; /* peer must have disconnected, ignore */
923 }
924 if (session->quota_in != quota_in)
925 {
926 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
927 "tcp",
928 "Changing quota for peer `%4s' from %u to %u\n",
929 GNUNET_i2s (target),
930 session->quota_in,
931 quota_in);
932 update_quota (session, GNUNET_YES);
933 if (session->quota_in > quota_in)
934 session->last_quota_update = GNUNET_TIME_absolute_get ();
935 session->quota_in = quota_in;
936 }
937}
938
939
940/**
941 * Check if the given port is plausible (must be either 816 * Check if the given port is plausible (must be either
942 * our listen port or our advertised port). If it is 817 * our listen port or our advertised port). If it is
943 * neither, we return one of these two ports at random. 818 * neither, we return one of these two ports at random.
@@ -1073,46 +948,6 @@ handle_tcp_welcome (void *cls,
1073 948
1074 949
1075/** 950/**
1076 * Calculate how long we should delay reading from the TCP socket to
1077 * ensure that we stay within our bandwidth limits (push back).
1078 *
1079 * @param session for which client should this be calculated
1080 */
1081static struct GNUNET_TIME_Relative
1082calculate_throttle_delay (struct Session *session)
1083{
1084 struct GNUNET_TIME_Relative ret;
1085 struct GNUNET_TIME_Absolute now;
1086 uint64_t del;
1087 uint64_t avail;
1088 uint64_t excess;
1089
1090 now = GNUNET_TIME_absolute_get ();
1091 del = now.value - session->last_quota_update.value;
1092 if (del > MAX_BANDWIDTH_CARRY)
1093 {
1094 update_quota (session, GNUNET_YES);
1095 del = now.value - session->last_quota_update.value;
1096 GNUNET_assert (del <= MAX_BANDWIDTH_CARRY);
1097 }
1098 if (session->quota_in == 0)
1099 session->quota_in = 1; /* avoid divison by zero */
1100 avail = del * session->quota_in;
1101 if (avail > session->last_received)
1102 return GNUNET_TIME_UNIT_ZERO; /* can receive right now */
1103 excess = session->last_received - avail;
1104 ret.value = excess / session->quota_in;
1105 GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
1106 "tcp",
1107 "Throttling read (%llu bytes excess at %llu b/ms), waiting %llums before reading more.\n",
1108 (unsigned long long) excess,
1109 (unsigned long long) session->quota_in,
1110 (unsigned long long) ret.value);
1111 return ret;
1112}
1113
1114
1115/**
1116 * Task to signal the server that we can continue 951 * Task to signal the server that we can continue
1117 * receiving from the TCP client now. 952 * receiving from the TCP client now.
1118 */ 953 */
@@ -1120,7 +955,18 @@ static void
1120delayed_done (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 955delayed_done (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1121{ 956{
1122 struct Session *session = cls; 957 struct Session *session = cls;
1123 GNUNET_SERVER_receive_done (session->client, GNUNET_OK); 958 struct GNUNET_TIME_Relative delay;
959
960 session->receive_delay_task = GNUNET_SCHEDULER_NO_TASK;
961 delay = session->plugin->env->receive (session->plugin->env->cls,
962 &session->target,
963 NULL, 0, NULL, 0);
964 if (delay.value == 0)
965 GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
966 else
967 session->receive_delay_task =
968 GNUNET_SCHEDULER_add_delayed (session->plugin->env->sched,
969 delay, &delayed_done, session);
1124} 970}
1125 971
1126 972
@@ -1165,18 +1011,16 @@ handle_tcp_data (void *cls,
1165 (unsigned int) ntohs (message->type), 1011 (unsigned int) ntohs (message->type),
1166 GNUNET_i2s (&session->target)); 1012 GNUNET_i2s (&session->target));
1167#endif 1013#endif
1168 plugin->env->receive (plugin->env->cls, &session->target, message, 1, 1014 delay = plugin->env->receive (plugin->env->cls, &session->target, message, 1,
1169 session->connect_addr, 1015 session->connect_addr,
1170 session->connect_alen); 1016 session->connect_alen);
1171 /* update bandwidth used */ 1017
1172 session->last_received += msize;
1173 update_quota (session, GNUNET_NO);
1174 delay = calculate_throttle_delay (session);
1175 if (delay.value == 0) 1018 if (delay.value == 0)
1176 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1019 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1177 else 1020 else
1178 GNUNET_SCHEDULER_add_delayed (session->plugin->env->sched, 1021 session->receive_delay_task =
1179 delay, &delayed_done, session); 1022 GNUNET_SCHEDULER_add_delayed (session->plugin->env->sched,
1023 delay, &delayed_done, session);
1180} 1024}
1181 1025
1182 1026
@@ -1342,7 +1186,6 @@ libgnunet_plugin_transport_tcp_init (void *cls)
1342 api->send = &tcp_plugin_send; 1186 api->send = &tcp_plugin_send;
1343 api->disconnect = &tcp_plugin_disconnect; 1187 api->disconnect = &tcp_plugin_disconnect;
1344 api->address_pretty_printer = &tcp_plugin_address_pretty_printer; 1188 api->address_pretty_printer = &tcp_plugin_address_pretty_printer;
1345 api->set_receive_quota = &tcp_plugin_set_receive_quota;
1346 api->check_address = &tcp_plugin_check_address; 1189 api->check_address = &tcp_plugin_check_address;
1347 plugin->service = service; 1190 plugin->service = service;
1348 plugin->server = GNUNET_SERVICE_get_server (service); 1191 plugin->server = GNUNET_SERVICE_get_server (service);
diff --git a/src/transport/plugin_transport_template.c b/src/transport/plugin_transport_template.c
index fc1c1722b..6ad555a51 100644
--- a/src/transport/plugin_transport_template.c
+++ b/src/transport/plugin_transport_template.c
@@ -219,23 +219,6 @@ template_plugin_address_pretty_printer (void *cls,
219 asc (asc_cls, NULL); 219 asc (asc_cls, NULL);
220} 220}
221 221
222/**
223 * Set a quota for receiving data from the given peer; this is a
224 * per-transport limit. The transport should limit its read/select
225 * calls to stay below the quota (in terms of incoming data).
226 *
227 * @param cls closure
228 * @param target the peer for whom the quota is given
229 * @param quota_in quota for receiving/sending data in bytes per ms
230 */
231static void
232template_plugin_set_receive_quota (void *cls,
233 const struct GNUNET_PeerIdentity *target,
234 uint32_t quota_in)
235{
236 // struct Plugin *plugin = cls;
237 // FIXME!
238}
239 222
240 223
241/** 224/**
@@ -280,7 +263,6 @@ gnunet_plugin_transport_template_init (void *cls)
280 api->send = &template_plugin_send; 263 api->send = &template_plugin_send;
281 api->disconnect = &template_plugin_disconnect; 264 api->disconnect = &template_plugin_disconnect;
282 api->address_pretty_printer = &template_plugin_address_pretty_printer; 265 api->address_pretty_printer = &template_plugin_address_pretty_printer;
283 api->set_receive_quota = &template_plugin_set_receive_quota;
284 api->check_address = &template_plugin_address_suggested; 266 api->check_address = &template_plugin_address_suggested;
285 return api; 267 return api;
286} 268}
diff --git a/src/transport/plugin_transport_udp.c b/src/transport/plugin_transport_udp.c
index 1cfa1ef43..c00ba1298 100644
--- a/src/transport/plugin_transport_udp.c
+++ b/src/transport/plugin_transport_udp.c
@@ -682,23 +682,6 @@ udp_plugin_address_pretty_printer (void *cls,
682 !numeric, timeout, &append_port, ppc); 682 !numeric, timeout, &append_port, ppc);
683} 683}
684 684
685/**
686 * Set a quota for receiving data from the given peer; this is a
687 * per-transport limit. This call has no meaning for UDP, as if
688 * we don't receive data it still comes in. UDP has no friendliness
689 * guarantees, and our buffers will fill at some level.
690 *
691 * @param cls closure
692 * @param target the peer for whom the quota is given
693 * @param quota_in quota for receiving/sending data in bytes per ms
694 */
695static void
696udp_plugin_set_receive_quota (void *cls,
697 const struct GNUNET_PeerIdentity *target,
698 uint32_t quota_in)
699{
700 return; /* Do nothing */
701}
702 685
703/** 686/**
704 * The exported method. Makes the core api available via a global and 687 * The exported method. Makes the core api available via a global and
@@ -766,7 +749,6 @@ libgnunet_plugin_transport_udp_init (void *cls)
766 api->send = &udp_plugin_send; 749 api->send = &udp_plugin_send;
767 api->disconnect = &udp_disconnect; 750 api->disconnect = &udp_disconnect;
768 api->address_pretty_printer = &udp_plugin_address_pretty_printer; 751 api->address_pretty_printer = &udp_plugin_address_pretty_printer;
769 api->set_receive_quota = &udp_plugin_set_receive_quota;
770 api->check_address = &udp_check_address; 752 api->check_address = &udp_check_address;
771 753
772 plugin->service = service; 754 plugin->service = service;
diff --git a/src/transport/plugin_transport_udp_nat.c b/src/transport/plugin_transport_udp_nat.c
index a4e6d1fb0..8bf5ac06b 100644
--- a/src/transport/plugin_transport_udp_nat.c
+++ b/src/transport/plugin_transport_udp_nat.c
@@ -1595,23 +1595,6 @@ udp_nat_plugin_address_pretty_printer (void *cls,
1595 !numeric, timeout, &append_port, ppc); 1595 !numeric, timeout, &append_port, ppc);
1596} 1596}
1597 1597
1598/**
1599 * Set a quota for receiving data from the given peer; this is a
1600 * per-transport limit. This call has no meaning for UDP, as if
1601 * we don't receive data it still comes in. UDP has no friendliness
1602 * guarantees, and our buffers will fill at some level.
1603 *
1604 * @param cls closure
1605 * @param target the peer for whom the quota is given
1606 * @param quota_in quota for receiving/sending data in bytes per ms
1607 */
1608static void
1609udp_nat_plugin_set_receive_quota (void *cls,
1610 const struct GNUNET_PeerIdentity *target,
1611 uint32_t quota_in)
1612{
1613 return; /* Do nothing */
1614}
1615 1598
1616/** 1599/**
1617 * The exported method. Makes the core api available via a global and 1600 * The exported method. Makes the core api available via a global and
@@ -1715,7 +1698,6 @@ libgnunet_plugin_transport_udp_nat_init (void *cls)
1715 api->send = &udp_nat_plugin_send; 1698 api->send = &udp_nat_plugin_send;
1716 api->disconnect = &udp_nat_disconnect; 1699 api->disconnect = &udp_nat_disconnect;
1717 api->address_pretty_printer = &udp_nat_plugin_address_pretty_printer; 1700 api->address_pretty_printer = &udp_nat_plugin_address_pretty_printer;
1718 api->set_receive_quota = &udp_nat_plugin_set_receive_quota;
1719 api->check_address = &udp_nat_check_address; 1701 api->check_address = &udp_nat_check_address;
1720 1702
1721 plugin->service = service; 1703 plugin->service = service;