aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2009-06-10 02:19:45 +0000
committerChristian Grothoff <christian@grothoff.org>2009-06-10 02:19:45 +0000
commite40233928df9f3970d24f06103844489883c879d (patch)
tree3dd2cb457300974e0008f40888ff91642154857a /src/core
parente8dc51d5890c5f9fdd0278295c8aad3d5d9200c1 (diff)
downloadgnunet-e40233928df9f3970d24f06103844489883c879d.tar.gz
gnunet-e40233928df9f3970d24f06103844489883c879d.zip
bound queue size, clean up code
Diffstat (limited to 'src/core')
-rw-r--r--src/core/gnunet-service-core.c163
1 files changed, 129 insertions, 34 deletions
diff --git a/src/core/gnunet-service-core.c b/src/core/gnunet-service-core.c
index 8f64bf366..f7f5a5fe7 100644
--- a/src/core/gnunet-service-core.c
+++ b/src/core/gnunet-service-core.c
@@ -24,16 +24,9 @@
24 * @author Christian Grothoff 24 * @author Christian Grothoff
25 * 25 *
26 * TODO: 26 * TODO:
27 * TESTING:
28 * - write test for basic core functions:
29 * + connect to peer
30 * + transmit (encrypted) message [with handshake]
31 * + receive (encrypted) message, forward plaintext to clients
32 * POST-TESTING: 27 * POST-TESTING:
33 * - revisit API (which arguments are used, needed)? 28 * - revisit API (which arguments are used, needed)?
34 * - add code to bound queue size when handling client's SEND message
35 * - add code to bound message queue size when passing messages to clients 29 * - add code to bound message queue size when passing messages to clients
36 * - add code to discard_expired_messages
37 * - add code to re-transmit key if first attempt failed 30 * - add code to re-transmit key if first attempt failed
38 * + timeout on connect / key exchange, etc. 31 * + timeout on connect / key exchange, etc.
39 * + timeout for automatic re-try, etc. 32 * + timeout for automatic re-try, etc.
@@ -83,6 +76,16 @@
83 76
84 77
85/** 78/**
79 * After how much time past the "official" expiration time do
80 * we discard messages? Should not be zero since we may
81 * intentionally defer transmission until close to the deadline
82 * and then may be slightly past the deadline due to inaccuracy
83 * in sleep and our own CPU consumption.
84 */
85#define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS
86
87
88/**
86 * What is the maximum delay for a SET_KEY message? 89 * What is the maximum delay for a SET_KEY message?
87 */ 90 */
88#define MAX_SET_KEY_DELAY GNUNET_TIME_UNIT_SECONDS 91#define MAX_SET_KEY_DELAY GNUNET_TIME_UNIT_SECONDS
@@ -125,6 +128,12 @@
125 128
126 129
127/** 130/**
131 * How many messages do we queue per peer at most?
132 */
133#define MAX_PEER_QUEUE_SIZE 16
134
135
136/**
128 * What is the maximum age of a message for us to consider 137 * What is the maximum age of a message for us to consider
129 * processing it? Note that this looks at the timestamp used 138 * processing it? Note that this looks at the timestamp used
130 * by the other peer, so clock skew between machines does 139 * by the other peer, so clock skew between machines does
@@ -933,6 +942,7 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
933{ 942{
934 struct Client *pos; 943 struct Client *pos;
935 struct Client *prev; 944 struct Client *prev;
945 struct Event *e;
936 946
937#if DEBUG_CORE_CLIENT 947#if DEBUG_CORE_CLIENT
938 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 948 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -950,6 +960,11 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
950 prev->next = pos->next; 960 prev->next = pos->next;
951 if (pos->th != NULL) 961 if (pos->th != NULL)
952 GNUNET_NETWORK_notify_transmit_ready_cancel (pos->th); 962 GNUNET_NETWORK_notify_transmit_ready_cancel (pos->th);
963 while (NULL != (e = pos->event_head))
964 {
965 pos->event_head = e->next;
966 GNUNET_free (e);
967 }
953 GNUNET_free (pos); 968 GNUNET_free (pos);
954 return; 969 return;
955 } 970 }
@@ -1104,6 +1119,8 @@ static void process_plaintext_neighbour_queue (struct Neighbour *n);
1104static void 1119static void
1105process_encrypted_neighbour_queue (struct Neighbour *n) 1120process_encrypted_neighbour_queue (struct Neighbour *n)
1106{ 1121{
1122 struct MessageEntry *m;
1123
1107 if (n->th != NULL) 1124 if (n->th != NULL)
1108 return; /* request already pending */ 1125 return; /* request already pending */
1109 if (n->encrypted_head == NULL) 1126 if (n->encrypted_head == NULL)
@@ -1132,7 +1149,13 @@ process_encrypted_neighbour_queue (struct Neighbour *n)
1132 { 1149 {
1133 /* message request too large (oops) */ 1150 /* message request too large (oops) */
1134 GNUNET_break (0); 1151 GNUNET_break (0);
1135 /* FIXME: handle error somehow! */ 1152 /* discard encrypted message */
1153 GNUNET_assert (NULL != (m = n->encrypted_head));
1154 n->encrypted_head = m->next;
1155 if (m->next == NULL)
1156 n->encrypted_tail = NULL;
1157 GNUNET_free (m);
1158 process_encrypted_neighbour_queue (n);
1136 } 1159 }
1137} 1160}
1138 1161
@@ -1448,7 +1471,29 @@ batch_message (struct Neighbour *n,
1448static void 1471static void
1449discard_expired_messages (struct Neighbour *n) 1472discard_expired_messages (struct Neighbour *n)
1450{ 1473{
1451 /* FIXME */ 1474 struct MessageEntry *prev;
1475 struct MessageEntry *next;
1476 struct MessageEntry *pos;
1477 struct GNUNET_TIME_Absolute cutoff;
1478
1479 cutoff = GNUNET_TIME_relative_to_absolute(PAST_EXPIRATION_DISCARD_TIME);
1480 prev = NULL;
1481 pos = n->messages;
1482 while (pos != NULL)
1483 {
1484 next = pos->next;
1485 if (pos->deadline.value < cutoff.value)
1486 {
1487 if (prev == NULL)
1488 n->messages = next;
1489 else
1490 prev->next = next;
1491 GNUNET_free (pos);
1492 }
1493 else
1494 prev = pos;
1495 pos = next;
1496 }
1452} 1497}
1453 1498
1454 1499
@@ -1651,8 +1696,6 @@ send_connect_continuation (void *cls, size_t size, void *buf)
1651 GNUNET_i2s (&sm->peer)); 1696 GNUNET_i2s (&sm->peer));
1652#endif 1697#endif
1653 GNUNET_free (sm); 1698 GNUNET_free (sm);
1654 /* FIXME: do we need to do something here to let the
1655 client know about the failure!? */
1656 return 0; 1699 return 0;
1657 } 1700 }
1658#if DEBUG_CORE 1701#if DEBUG_CORE
@@ -1678,9 +1721,13 @@ handle_client_send (void *cls,
1678 struct SendMessage *smc; 1721 struct SendMessage *smc;
1679 const struct GNUNET_MessageHeader *mh; 1722 const struct GNUNET_MessageHeader *mh;
1680 struct Neighbour *n; 1723 struct Neighbour *n;
1681 struct MessageEntry *pred; 1724 struct MessageEntry *prev;
1682 struct MessageEntry *pos; 1725 struct MessageEntry *pos;
1683 struct MessageEntry *e; 1726 struct MessageEntry *e;
1727 struct MessageEntry *min_prio_entry;
1728 struct MessageEntry *min_prio_prev;
1729 unsigned int min_prio;
1730 unsigned int queue_size;
1684 uint16_t msize; 1731 uint16_t msize;
1685 1732
1686 msize = ntohs (message->size); 1733 msize = ntohs (message->size);
@@ -1714,20 +1761,26 @@ handle_client_send (void *cls,
1714#endif 1761#endif
1715 msize += sizeof (struct SendMessage); 1762 msize += sizeof (struct SendMessage);
1716 /* ask transport to connect to the peer */ 1763 /* ask transport to connect to the peer */
1717 /* FIXME: this code does not handle the
1718 case where we get multiple SendMessages before
1719 transport responds to this request;
1720 => need to track pending requests! */
1721 smc = GNUNET_malloc (msize); 1764 smc = GNUNET_malloc (msize);
1722 memcpy (smc, sm, msize); 1765 memcpy (smc, sm, msize);
1723 GNUNET_TRANSPORT_notify_transmit_ready (transport, 1766 if (NULL ==
1724 &sm->peer, 1767 GNUNET_TRANSPORT_notify_transmit_ready (transport,
1725 0, 1768 &sm->peer,
1726 GNUNET_TIME_absolute_get_remaining 1769 0,
1727 (GNUNET_TIME_absolute_ntoh 1770 GNUNET_TIME_absolute_get_remaining
1728 (sm->deadline)), 1771 (GNUNET_TIME_absolute_ntoh
1729 &send_connect_continuation, 1772 (sm->deadline)),
1730 smc); 1773 &send_connect_continuation,
1774 smc))
1775 {
1776 /* transport has already a request pending for this peer! */
1777#if DEBUG_CORE
1778 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1779 "Dropped second message destined for `%4s' since connection is still down.\n",
1780 GNUNET_i2s(&sm->peer));
1781#endif
1782 GNUNET_free (smc);
1783 }
1731 if (client != NULL) 1784 if (client != NULL)
1732 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1785 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1733 return; 1786 return;
@@ -1739,7 +1792,50 @@ handle_client_send (void *cls,
1739 msize, 1792 msize,
1740 GNUNET_i2s (&sm->peer)); 1793 GNUNET_i2s (&sm->peer));
1741#endif 1794#endif
1742 /* FIXME: consider bounding queue size */ 1795 /* bound queue size */
1796 discard_expired_messages (n);
1797 min_prio = (unsigned int) -1;
1798 queue_size = 0;
1799 prev = NULL;
1800 pos = n->messages;
1801 while (pos != NULL)
1802 {
1803 if (pos->priority < min_prio)
1804 {
1805 min_prio_entry = pos;
1806 min_prio_prev = prev;
1807 min_prio = pos->priority;
1808 }
1809 queue_size++;
1810 prev = pos;
1811 pos = pos->next;
1812 }
1813 if (queue_size >= MAX_PEER_QUEUE_SIZE)
1814 {
1815 /* queue full */
1816 if (ntohl(sm->priority) <= min_prio)
1817 {
1818 /* discard new entry */
1819#if DEBUG_CORE
1820 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1821 "Queue full, discarding new request\n");
1822#endif
1823 if (client != NULL)
1824 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1825 return;
1826 }
1827 /* discard "min_prio_entry" */
1828#if DEBUG_CORE
1829 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1830 "Queue full, discarding existing older request\n");
1831#endif
1832 if (min_prio_prev == NULL)
1833 n->messages = min_prio_entry->next;
1834 else
1835 min_prio_prev->next = min_prio_entry->next;
1836 GNUNET_free (min_prio_entry);
1837 }
1838
1743 e = GNUNET_malloc (sizeof (struct MessageEntry) + msize); 1839 e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
1744 e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline); 1840 e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
1745 e->priority = ntohl (sm->priority); 1841 e->priority = ntohl (sm->priority);
@@ -1747,17 +1843,17 @@ handle_client_send (void *cls,
1747 memcpy (&e[1], mh, msize); 1843 memcpy (&e[1], mh, msize);
1748 1844
1749 /* insert, keep list sorted by deadline */ 1845 /* insert, keep list sorted by deadline */
1750 pred = NULL; 1846 prev = NULL;
1751 pos = n->messages; 1847 pos = n->messages;
1752 while ((pos != NULL) && (pos->deadline.value < e->deadline.value)) 1848 while ((pos != NULL) && (pos->deadline.value < e->deadline.value))
1753 { 1849 {
1754 pred = pos; 1850 prev = pos;
1755 pos = pos->next; 1851 pos = pos->next;
1756 } 1852 }
1757 if (pred == NULL) 1853 if (prev == NULL)
1758 n->messages = e; 1854 n->messages = e;
1759 else 1855 else
1760 pred->next = e; 1856 prev->next = e;
1761 e->next = pos; 1857 e->next = pos;
1762 1858
1763 /* consider scheduling now */ 1859 /* consider scheduling now */
@@ -2813,6 +2909,7 @@ static void
2813cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 2909cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2814{ 2910{
2815 struct Neighbour *n; 2911 struct Neighbour *n;
2912 struct Client *c;
2816 2913
2817#if DEBUG_CORE 2914#if DEBUG_CORE
2818 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2915 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -2826,6 +2923,8 @@ cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2826 neighbours = n->next; 2923 neighbours = n->next;
2827 free_neighbour (n); 2924 free_neighbour (n);
2828 } 2925 }
2926 while (NULL != (c = clients))
2927 handle_client_disconnect (NULL, c->client_handle);
2829} 2928}
2830 2929
2831 2930
@@ -2926,10 +3025,6 @@ cleanup (void *cls, struct GNUNET_CONFIGURATION_Handle *cfg)
2926 3025
2927 if (my_private_key != NULL) 3026 if (my_private_key != NULL)
2928 GNUNET_CRYPTO_rsa_key_free (my_private_key); 3027 GNUNET_CRYPTO_rsa_key_free (my_private_key);
2929 /*
2930 FIXME:
2931 - free clients
2932 */
2933} 3028}
2934 3029
2935 3030