aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Wachs <wachs@net.in.tum.de>2014-01-30 16:58:20 +0000
committerMatthias Wachs <wachs@net.in.tum.de>2014-01-30 16:58:20 +0000
commit6a4c5ee6195fb7a6fcf90b1bae5ca36926e82023 (patch)
treef541686c7c894be4c74926fad156aae32b3f97ca
parentfb0fd00ae3d28e937b4e15a9aa50b62fb8440268 (diff)
downloadgnunet-6a4c5ee6195fb7a6fcf90b1bae5ca36926e82023.tar.gz
gnunet-6a4c5ee6195fb7a6fcf90b1bae5ca36926e82023.zip
send receive delay rescheduling support
-rw-r--r--src/include/gnunet_transport_plugin.h22
-rw-r--r--src/transport/gnunet-service-transport.c2
-rw-r--r--src/transport/gnunet-service-transport_neighbours.c152
-rw-r--r--src/transport/gnunet-service-transport_neighbours.h9
-rw-r--r--src/transport/gnunet-service-transport_plugins.c4
-rw-r--r--src/transport/gnunet-service-transport_plugins.h2
-rw-r--r--src/transport/plugin_transport_tcp.c57
-rw-r--r--src/transport/transport_api.c2
8 files changed, 230 insertions, 20 deletions
diff --git a/src/include/gnunet_transport_plugin.h b/src/include/gnunet_transport_plugin.h
index 187d5d508..f1092e949 100644
--- a/src/include/gnunet_transport_plugin.h
+++ b/src/include/gnunet_transport_plugin.h
@@ -194,6 +194,15 @@ typedef struct GNUNET_TIME_Relative
194 const struct GNUNET_PeerIdentity *peer, 194 const struct GNUNET_PeerIdentity *peer,
195 size_t amount_recved); 195 size_t amount_recved);
196 196
197typedef void
198(*GNUNET_TRANSPORT_RegisterQuotaNotification) (void *cls,
199 const struct GNUNET_PeerIdentity *peer,
200 const char *plugin,
201 struct Session *session);
202
203typedef void
204(*GNUNET_TRANSPORT_UnregisterQuotaNotification) (void *cls,
205 const struct GNUNET_PeerIdentity *peer, const char *plugin, struct Session *session);
197 206
198/** 207/**
199 * Function that returns a HELLO message. 208 * Function that returns a HELLO message.
@@ -275,6 +284,9 @@ struct GNUNET_TRANSPORT_PluginEnvironment
275 */ 284 */
276 GNUNET_TRANSPORT_UpdateAddressMetrics update_address_metrics; 285 GNUNET_TRANSPORT_UpdateAddressMetrics update_address_metrics;
277 286
287 GNUNET_TRANSPORT_RegisterQuotaNotification register_quota_notification;
288
289 GNUNET_TRANSPORT_UnregisterQuotaNotification unregister_quota_notification;
278 290
279 /** 291 /**
280 * What is the maximum number of connections that this transport 292 * What is the maximum number of connections that this transport
@@ -484,6 +496,14 @@ typedef void
484 const struct GNUNET_PeerIdentity *peer, 496 const struct GNUNET_PeerIdentity *peer,
485 struct Session *session); 497 struct Session *session);
486 498
499
500
501typedef void
502(*GNUNET_TRANSPORT_UpdateInboundDelay) (void *cls,
503 const struct GNUNET_PeerIdentity *peer,
504 struct Session *session,
505 struct GNUNET_TIME_Relative delay);
506
487/** 507/**
488 * Function called for a quick conversion of the binary address to 508 * Function called for a quick conversion of the binary address to
489 * a numeric address. Note that the caller must not free the 509 * a numeric address. Note that the caller must not free the
@@ -575,6 +595,8 @@ struct GNUNET_TRANSPORT_PluginFunctions
575 */ 595 */
576 GNUNET_TRANSPORT_UpdateSessionTimeout update_session_timeout; 596 GNUNET_TRANSPORT_UpdateSessionTimeout update_session_timeout;
577 597
598 GNUNET_TRANSPORT_UpdateInboundDelay update_inbound_delay;
599
578 /** 600 /**
579 * Function that will be called whenever the transport service wants to 601 * Function that will be called whenever the transport service wants to
580 * notify the plugin that the inbound quota changed and that the plugin 602 * notify the plugin that the inbound quota changed and that the plugin
diff --git a/src/transport/gnunet-service-transport.c b/src/transport/gnunet-service-transport.c
index 76a398dc9..4b5e1c05c 100644
--- a/src/transport/gnunet-service-transport.c
+++ b/src/transport/gnunet-service-transport.c
@@ -934,6 +934,8 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
934 NULL ); 934 NULL );
935 GST_manipulation_init (GST_cfg); 935 GST_manipulation_init (GST_cfg);
936 GST_plugins_load (&GST_manipulation_recv, 936 GST_plugins_load (&GST_manipulation_recv,
937 &GST_neighbours_register_quota_notification,
938 &GST_neighbours_unregister_quota_notification,
937 &plugin_env_address_change_notification, 939 &plugin_env_address_change_notification,
938 &plugin_env_session_start, 940 &plugin_env_session_start,
939 &plugin_env_session_end, 941 &plugin_env_session_end,
diff --git a/src/transport/gnunet-service-transport_neighbours.c b/src/transport/gnunet-service-transport_neighbours.c
index 58cb0fbb3..45f1575fe 100644
--- a/src/transport/gnunet-service-transport_neighbours.c
+++ b/src/transport/gnunet-service-transport_neighbours.c
@@ -508,6 +508,8 @@ static unsigned long long bytes_in_send_queue;
508static GNUNET_SCHEDULER_TaskIdentifier util_transmission_tk; 508static GNUNET_SCHEDULER_TaskIdentifier util_transmission_tk;
509 509
510 510
511static struct GNUNET_CONTAINER_MultiPeerMap *registered_quota_notifications;
512
511/** 513/**
512 * Lookup a neighbour entry in the neighbours hash map. 514 * Lookup a neighbour entry in the neighbours hash map.
513 * 515 *
@@ -1689,13 +1691,152 @@ send_session_connect_ack_message (const struct GNUNET_HELLO_Address *address,
1689 1691
1690} 1692}
1691 1693
1694struct QuotaNotificationRequest
1695{
1696 struct GNUNET_PeerIdentity peer;
1697 struct Session *session;
1698 char *plugin;
1699};
1700
1701struct QNR_LookContext
1702{
1703 struct GNUNET_PeerIdentity peer;
1704 struct Session *session;
1705 const char *plugin;
1706
1707 struct QuotaNotificationRequest *res;
1708};
1709
1710static int
1711find_notification_request (void *cls, const struct GNUNET_PeerIdentity *key, void *value)
1712{
1713 struct QNR_LookContext *qnr_ctx = cls;
1714 struct QuotaNotificationRequest *qnr = value;
1715
1716 if ((qnr->session == qnr_ctx->session) &&
1717 (0 == memcmp (&qnr->peer, &qnr_ctx->peer, sizeof (struct GNUNET_PeerIdentity))) &&
1718 (0 == strcmp(qnr_ctx->plugin, qnr->plugin)))
1719 {
1720 qnr_ctx->res = value;
1721 return GNUNET_NO;
1722 }
1723 return GNUNET_YES;
1724}
1725
1726void
1727GST_neighbours_register_quota_notification(void *cls,
1728 const struct GNUNET_PeerIdentity *peer, const char *plugin,
1729 struct Session *session)
1730{
1731 struct QuotaNotificationRequest *qnr;
1732 struct QNR_LookContext qnr_ctx;
1733
1734 qnr_ctx.peer = (*peer);
1735 qnr_ctx.plugin = plugin;
1736 qnr_ctx.session = session;
1737 qnr_ctx.res = NULL;
1738 int res;
1739
1740 res = GNUNET_CONTAINER_multipeermap_get_multiple (registered_quota_notifications,
1741 peer, &find_notification_request, &qnr_ctx);
1742 if (NULL != qnr_ctx.res)
1743 {
1744 GNUNET_break(0);
1745 return;
1746 }
1747
1748 qnr = GNUNET_new (struct QuotaNotificationRequest);
1749 qnr->peer = (*peer);
1750 qnr->plugin = GNUNET_strdup (plugin);
1751 qnr->session = session;
1752
1753 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1754 "Adding notification for peer `%s' plugin `%s' session %p \n",
1755 GNUNET_i2s (peer), plugin, session);
1756
1757 GNUNET_CONTAINER_multipeermap_put (registered_quota_notifications, peer,
1758 qnr, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1759}
1760
1761
1762void
1763GST_neighbours_unregister_quota_notification(void *cls,
1764 const struct GNUNET_PeerIdentity *peer, const char *plugin, struct Session *session)
1765{
1766 struct QNR_LookContext qnr_ctx;
1767 qnr_ctx.peer = (*peer);
1768 qnr_ctx.plugin = plugin;
1769 qnr_ctx.session = session;
1770 qnr_ctx.res = NULL;
1771 int res;
1772
1773 res = GNUNET_CONTAINER_multipeermap_iterate (registered_quota_notifications,
1774 &find_notification_request, &qnr_ctx);
1775 if (NULL == qnr_ctx.res)
1776 {
1777 GNUNET_break(0);
1778 return;
1779 }
1780
1781 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1782 "Removing notification for peer `%s' plugin `%s' session %p \n",
1783 GNUNET_i2s (peer), plugin, session);
1784
1785 GNUNET_CONTAINER_multipeermap_remove (registered_quota_notifications, peer,
1786 qnr_ctx.res);
1787 GNUNET_free (qnr_ctx.res->plugin);
1788 GNUNET_free (qnr_ctx.res);
1789}
1790
1791static int
1792notification_cb(void *cls, const struct GNUNET_PeerIdentity *key, void *value)
1793{
1794 struct NeighbourMapEntry *n = cls;
1795 struct QuotaNotificationRequest *qnr = value;
1796 struct GNUNET_TRANSPORT_PluginFunctions *papi;
1797 struct GNUNET_TIME_Relative delay;
1798 int do_forward;
1799
1800 papi = GST_plugins_find(qnr->plugin);
1801 if (NULL == papi)
1802 {
1803 GNUNET_break (0);
1804 return GNUNET_OK;
1805 }
1806
1807 delay = GST_neighbours_calculate_receive_delay (key, 0, &do_forward);
1808 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1809 "New inbound delay for peer `%s' is %llu ms\n", GNUNET_i2s (key),
1810 delay.rel_value_us / 1000);
1811
1812 if (NULL != papi->update_inbound_delay)
1813 papi->update_inbound_delay (papi->cls, key, qnr->session, delay);
1814 return GNUNET_OK;
1815}
1816
1817static
1818int
1819free_notification_cb(void *cls, const struct GNUNET_PeerIdentity *key,
1820 void *value)
1821{
1822 struct NeighbourMapEntry *n = cls;
1823 struct QuotaNotificationRequest *qnr = value;
1824
1825 GNUNET_CONTAINER_multipeermap_remove (registered_quota_notifications, key,
1826 qnr);
1827 GNUNET_free(qnr);
1828
1829 return GNUNET_OK;
1830}
1831
1692static void 1832static void
1693inbound_bw_tracker_update (void *cls) 1833inbound_bw_tracker_update(void *cls)
1694{ 1834{
1695 struct Neighbour *n = cls; 1835 struct NeighbourMapEntry *n = cls;
1696 1836
1697 /* Quota was updated, tell plugins to update the time to receive next */ 1837 /* Quota was updated, tell plugins to update the time to receive next */
1698 1838 GNUNET_CONTAINER_multipeermap_get_multiple (registered_quota_notifications,
1839 &n->id, &notification_cb, n);
1699} 1840}
1700 1841
1701 1842
@@ -3655,6 +3796,7 @@ GST_neighbours_start (void *cls,
3655 disconnect_notify_cb = disconnect_cb; 3796 disconnect_notify_cb = disconnect_cb;
3656 neighbour_change_cb = peer_address_cb; 3797 neighbour_change_cb = peer_address_cb;
3657 neighbours = GNUNET_CONTAINER_multipeermap_create (NEIGHBOUR_TABLE_SIZE, GNUNET_NO); 3798 neighbours = GNUNET_CONTAINER_multipeermap_create (NEIGHBOUR_TABLE_SIZE, GNUNET_NO);
3799 registered_quota_notifications = GNUNET_CONTAINER_multipeermap_create (NEIGHBOUR_TABLE_SIZE, GNUNET_NO);
3658 util_transmission_tk = GNUNET_SCHEDULER_add_delayed (UTIL_TRANSMISSION_INTERVAL, 3800 util_transmission_tk = GNUNET_SCHEDULER_add_delayed (UTIL_TRANSMISSION_INTERVAL,
3659 utilization_transmission, NULL); 3801 utilization_transmission, NULL);
3660} 3802}
@@ -3723,6 +3865,10 @@ GST_neighbours_stop ()
3723 GNUNET_free (cur); 3865 GNUNET_free (cur);
3724 } 3866 }
3725 3867
3868 GNUNET_CONTAINER_multipeermap_iterate (registered_quota_notifications,
3869 &free_notification_cb, NULL);
3870 GNUNET_CONTAINER_multipeermap_destroy (registered_quota_notifications);
3871
3726 neighbours = NULL; 3872 neighbours = NULL;
3727 callback_cls = NULL; 3873 callback_cls = NULL;
3728 connect_notify_cb = NULL; 3874 connect_notify_cb = NULL;
diff --git a/src/transport/gnunet-service-transport_neighbours.h b/src/transport/gnunet-service-transport_neighbours.h
index 01fbe83d7..ff62867fa 100644
--- a/src/transport/gnunet-service-transport_neighbours.h
+++ b/src/transport/gnunet-service-transport_neighbours.h
@@ -107,6 +107,15 @@ GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
107 size_t msg_size, struct GNUNET_TIME_Relative timeout, 107 size_t msg_size, struct GNUNET_TIME_Relative timeout,
108 GST_NeighbourSendContinuation cont, void *cont_cls); 108 GST_NeighbourSendContinuation cont, void *cont_cls);
109 109
110void
111GST_neighbours_register_quota_notification (void *cls,
112 const struct GNUNET_PeerIdentity *peer,
113 const char *plugin,
114 struct Session *session);
115
116void
117GST_neighbours_unregister_quota_notification(void *cls,
118 const struct GNUNET_PeerIdentity *peer, const char *plugin, struct Session *session);
110 119
111/** 120/**
112 * We have received a message from the given sender. 121 * We have received a message from the given sender.
diff --git a/src/transport/gnunet-service-transport_plugins.c b/src/transport/gnunet-service-transport_plugins.c
index 865b5dd19..c5b15cf42 100644
--- a/src/transport/gnunet-service-transport_plugins.c
+++ b/src/transport/gnunet-service-transport_plugins.c
@@ -94,6 +94,8 @@ static struct TransportPlugin *plugins_tail;
94 */ 94 */
95void 95void
96GST_plugins_load (GNUNET_TRANSPORT_PluginReceiveCallback recv_cb, 96GST_plugins_load (GNUNET_TRANSPORT_PluginReceiveCallback recv_cb,
97 GNUNET_TRANSPORT_RegisterQuotaNotification register_quota_cb,
98 GNUNET_TRANSPORT_UnregisterQuotaNotification unregister_quota_cb,
97 GNUNET_TRANSPORT_AddressNotification address_cb, 99 GNUNET_TRANSPORT_AddressNotification address_cb,
98 GNUNET_TRANSPORT_SessionStart session_start_cb, 100 GNUNET_TRANSPORT_SessionStart session_start_cb,
99 GNUNET_TRANSPORT_SessionEnd session_end_cb, 101 GNUNET_TRANSPORT_SessionEnd session_end_cb,
@@ -142,6 +144,8 @@ GST_plugins_load (GNUNET_TRANSPORT_PluginReceiveCallback recv_cb,
142 plug->env.session_end = session_end_cb; 144 plug->env.session_end = session_end_cb;
143 plug->env.get_address_type = address_type_cb; 145 plug->env.get_address_type = address_type_cb;
144 plug->env.update_address_metrics = metric_update_cb; 146 plug->env.update_address_metrics = metric_update_cb;
147 plug->env.register_quota_notification = register_quota_cb;
148 plug->env.unregister_quota_notification = unregister_quota_cb;
145 plug->env.max_connections = tneigh; 149 plug->env.max_connections = tneigh;
146 plug->env.stats = GST_stats; 150 plug->env.stats = GST_stats;
147 GNUNET_CONTAINER_DLL_insert (plugins_head, plugins_tail, plug); 151 GNUNET_CONTAINER_DLL_insert (plugins_head, plugins_tail, plug);
diff --git a/src/transport/gnunet-service-transport_plugins.h b/src/transport/gnunet-service-transport_plugins.h
index 9d8d503f8..561b79a9d 100644
--- a/src/transport/gnunet-service-transport_plugins.h
+++ b/src/transport/gnunet-service-transport_plugins.h
@@ -48,6 +48,8 @@
48 */ 48 */
49void 49void
50GST_plugins_load (GNUNET_TRANSPORT_PluginReceiveCallback recv_cb, 50GST_plugins_load (GNUNET_TRANSPORT_PluginReceiveCallback recv_cb,
51 GNUNET_TRANSPORT_RegisterQuotaNotification register_quota_cb,
52 GNUNET_TRANSPORT_UnregisterQuotaNotification unregister_quota_cb,
51 GNUNET_TRANSPORT_AddressNotification address_cb, 53 GNUNET_TRANSPORT_AddressNotification address_cb,
52 GNUNET_TRANSPORT_SessionStart session_start_cb, 54 GNUNET_TRANSPORT_SessionStart session_start_cb,
53 GNUNET_TRANSPORT_SessionEnd session_end_cb, 55 GNUNET_TRANSPORT_SessionEnd session_end_cb,
diff --git a/src/transport/plugin_transport_tcp.c b/src/transport/plugin_transport_tcp.c
index 657043c24..09c410f2b 100644
--- a/src/transport/plugin_transport_tcp.c
+++ b/src/transport/plugin_transport_tcp.c
@@ -779,6 +779,8 @@ tcp_disconnect_session (void *cls, struct Session *session)
779 GNUNET_SERVER_notify_transmit_ready_cancel (session->transmit_handle); 779 GNUNET_SERVER_notify_transmit_ready_cancel (session->transmit_handle);
780 session->transmit_handle = NULL; 780 session->transmit_handle = NULL;
781 } 781 }
782 plugin->env->unregister_quota_notification (plugin->env->cls,
783 &session->target, PLUGIN_NAME, session);
782 session->plugin->env->session_end (session->plugin->env->cls, 784 session->plugin->env->session_end (session->plugin->env->cls,
783 &session->target, session); 785 &session->target, session);
784 786
@@ -929,6 +931,8 @@ create_session (struct Plugin *plugin,
929 GNUNET_STATISTICS_update (plugin->env->stats, 931 GNUNET_STATISTICS_update (plugin->env->stats,
930 gettext_noop ("# TCP sessions active"), 1, GNUNET_NO); 932 gettext_noop ("# TCP sessions active"), 1, GNUNET_NO);
931 } 933 }
934 plugin->env->register_quota_notification (plugin->env->cls,
935 &address->peer, PLUGIN_NAME, session);
932 session->timeout_task = GNUNET_SCHEDULER_add_delayed ( 936 session->timeout_task = GNUNET_SCHEDULER_add_delayed (
933 GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, &session_timeout, session); 937 GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, &session_timeout, session);
934 return session; 938 return session;
@@ -1323,6 +1327,41 @@ tcp_plugin_update_session_timeout (void *cls,
1323} 1327}
1324 1328
1325/** 1329/**
1330 * Task to signal the server that we can continue
1331 * receiving from the TCP client now.
1332 *
1333 * @param cls the `struct Session*`
1334 * @param tc task context (unused)
1335 */
1336static void
1337delayed_done (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1338{
1339 struct Session *session = cls;
1340
1341 session->receive_delay_task = GNUNET_SCHEDULER_NO_TASK;
1342 reschedule_session_timeout (session);
1343
1344 GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
1345}
1346
1347static void tcp_plugin_update_inbound_delay (void *cls,
1348 const struct GNUNET_PeerIdentity *peer,
1349 struct Session *session,
1350 struct GNUNET_TIME_Relative delay)
1351{
1352 if (GNUNET_SCHEDULER_NO_TASK == session->receive_delay_task)
1353 return;
1354
1355 LOG(GNUNET_ERROR_TYPE_DEBUG,
1356 "New inbound delay %llu us\n",delay.rel_value_us);
1357
1358 GNUNET_SCHEDULER_cancel (session->receive_delay_task);
1359 session->receive_delay_task = GNUNET_SCHEDULER_add_delayed (delay,
1360 &delayed_done, session);
1361}
1362
1363
1364/**
1326 * Create a new session to transmit data to the target 1365 * Create a new session to transmit data to the target
1327 * This session will used to send data to this peer and the plugin will 1366 * This session will used to send data to this peer and the plugin will
1328 * notify us by calling the env->session_end function 1367 * notify us by calling the env->session_end function
@@ -2102,23 +2141,6 @@ handle_tcp_welcome (void *cls, struct GNUNET_SERVER_Client *client,
2102 GNUNET_SERVER_receive_done (client, GNUNET_OK); 2141 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2103} 2142}
2104 2143
2105/**
2106 * Task to signal the server that we can continue
2107 * receiving from the TCP client now.
2108 *
2109 * @param cls the `struct Session*`
2110 * @param tc task context (unused)
2111 */
2112static void
2113delayed_done (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2114{
2115 struct Session *session = cls;
2116
2117 session->receive_delay_task = GNUNET_SCHEDULER_NO_TASK;
2118 reschedule_session_timeout (session);
2119
2120 GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
2121}
2122 2144
2123/** 2145/**
2124 * We've received data for this peer via TCP. Unbox, 2146 * We've received data for this peer via TCP. Unbox,
@@ -2479,6 +2501,7 @@ else
2479 api->string_to_address = &tcp_string_to_address; 2501 api->string_to_address = &tcp_string_to_address;
2480 api->get_network = &tcp_get_network; 2502 api->get_network = &tcp_get_network;
2481 api->update_session_timeout = &tcp_plugin_update_session_timeout; 2503 api->update_session_timeout = &tcp_plugin_update_session_timeout;
2504 api->update_inbound_delay = &tcp_plugin_update_inbound_delay;
2482 plugin->service = service; 2505 plugin->service = service;
2483 if (NULL != service) 2506 if (NULL != service)
2484 { 2507 {
diff --git a/src/transport/transport_api.c b/src/transport/transport_api.c
index ac7c55599..e892dd2ee 100644
--- a/src/transport/transport_api.c
+++ b/src/transport/transport_api.c
@@ -430,6 +430,8 @@ outbound_bw_tracker_update (void *cls)
430 430
431 delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, 431 delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
432 n->th->notify_size + n->traffic_overhead); 432 n->th->notify_size + n->traffic_overhead);
433 LOG(GNUNET_ERROR_TYPE_DEBUG,
434 "New outbound delay %llu us\n",delay.rel_value_us);
433 GNUNET_CONTAINER_heap_update_cost (n->h->ready_heap, 435 GNUNET_CONTAINER_heap_update_cost (n->h->ready_heap,
434 n->hn, delay.rel_value_us); 436 n->hn, delay.rel_value_us);
435 schedule_transmission (n->h); 437 schedule_transmission (n->h);