diff options
author | Christian Grothoff <christian@grothoff.org> | 2009-06-10 02:19:45 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2009-06-10 02:19:45 +0000 |
commit | e40233928df9f3970d24f06103844489883c879d (patch) | |
tree | 3dd2cb457300974e0008f40888ff91642154857a /src/core | |
parent | e8dc51d5890c5f9fdd0278295c8aad3d5d9200c1 (diff) | |
download | gnunet-e40233928df9f3970d24f06103844489883c879d.tar.gz gnunet-e40233928df9f3970d24f06103844489883c879d.zip |
bound queue size, clean up code
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/gnunet-service-core.c | 163 |
1 files changed, 129 insertions, 34 deletions
diff --git a/src/core/gnunet-service-core.c b/src/core/gnunet-service-core.c index 8f64bf366..f7f5a5fe7 100644 --- a/src/core/gnunet-service-core.c +++ b/src/core/gnunet-service-core.c | |||
@@ -24,16 +24,9 @@ | |||
24 | * @author Christian Grothoff | 24 | * @author Christian Grothoff |
25 | * | 25 | * |
26 | * TODO: | 26 | * TODO: |
27 | * TESTING: | ||
28 | * - write test for basic core functions: | ||
29 | * + connect to peer | ||
30 | * + transmit (encrypted) message [with handshake] | ||
31 | * + receive (encrypted) message, forward plaintext to clients | ||
32 | * POST-TESTING: | 27 | * POST-TESTING: |
33 | * - revisit API (which arguments are used, needed)? | 28 | * - revisit API (which arguments are used, needed)? |
34 | * - add code to bound queue size when handling client's SEND message | ||
35 | * - add code to bound message queue size when passing messages to clients | 29 | * - add code to bound message queue size when passing messages to clients |
36 | * - add code to discard_expired_messages | ||
37 | * - add code to re-transmit key if first attempt failed | 30 | * - add code to re-transmit key if first attempt failed |
38 | * + timeout on connect / key exchange, etc. | 31 | * + timeout on connect / key exchange, etc. |
39 | * + timeout for automatic re-try, etc. | 32 | * + timeout for automatic re-try, etc. |
@@ -83,6 +76,16 @@ | |||
83 | 76 | ||
84 | 77 | ||
85 | /** | 78 | /** |
79 | * After how much time past the "official" expiration time do | ||
80 | * we discard messages? Should not be zero since we may | ||
81 | * intentionally defer transmission until close to the deadline | ||
82 | * and then may be slightly past the deadline due to inaccuracy | ||
83 | * in sleep and our own CPU consumption. | ||
84 | */ | ||
85 | #define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS | ||
86 | |||
87 | |||
88 | /** | ||
86 | * What is the maximum delay for a SET_KEY message? | 89 | * What is the maximum delay for a SET_KEY message? |
87 | */ | 90 | */ |
88 | #define MAX_SET_KEY_DELAY GNUNET_TIME_UNIT_SECONDS | 91 | #define MAX_SET_KEY_DELAY GNUNET_TIME_UNIT_SECONDS |
@@ -125,6 +128,12 @@ | |||
125 | 128 | ||
126 | 129 | ||
127 | /** | 130 | /** |
131 | * How many messages do we queue per peer at most? | ||
132 | */ | ||
133 | #define MAX_PEER_QUEUE_SIZE 16 | ||
134 | |||
135 | |||
136 | /** | ||
128 | * What is the maximum age of a message for us to consider | 137 | * What is the maximum age of a message for us to consider |
129 | * processing it? Note that this looks at the timestamp used | 138 | * processing it? Note that this looks at the timestamp used |
130 | * by the other peer, so clock skew between machines does | 139 | * by the other peer, so clock skew between machines does |
@@ -933,6 +942,7 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) | |||
933 | { | 942 | { |
934 | struct Client *pos; | 943 | struct Client *pos; |
935 | struct Client *prev; | 944 | struct Client *prev; |
945 | struct Event *e; | ||
936 | 946 | ||
937 | #if DEBUG_CORE_CLIENT | 947 | #if DEBUG_CORE_CLIENT |
938 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 948 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
@@ -950,6 +960,11 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) | |||
950 | prev->next = pos->next; | 960 | prev->next = pos->next; |
951 | if (pos->th != NULL) | 961 | if (pos->th != NULL) |
952 | GNUNET_NETWORK_notify_transmit_ready_cancel (pos->th); | 962 | GNUNET_NETWORK_notify_transmit_ready_cancel (pos->th); |
963 | while (NULL != (e = pos->event_head)) | ||
964 | { | ||
965 | pos->event_head = e->next; | ||
966 | GNUNET_free (e); | ||
967 | } | ||
953 | GNUNET_free (pos); | 968 | GNUNET_free (pos); |
954 | return; | 969 | return; |
955 | } | 970 | } |
@@ -1104,6 +1119,8 @@ static void process_plaintext_neighbour_queue (struct Neighbour *n); | |||
1104 | static void | 1119 | static void |
1105 | process_encrypted_neighbour_queue (struct Neighbour *n) | 1120 | process_encrypted_neighbour_queue (struct Neighbour *n) |
1106 | { | 1121 | { |
1122 | struct MessageEntry *m; | ||
1123 | |||
1107 | if (n->th != NULL) | 1124 | if (n->th != NULL) |
1108 | return; /* request already pending */ | 1125 | return; /* request already pending */ |
1109 | if (n->encrypted_head == NULL) | 1126 | if (n->encrypted_head == NULL) |
@@ -1132,7 +1149,13 @@ process_encrypted_neighbour_queue (struct Neighbour *n) | |||
1132 | { | 1149 | { |
1133 | /* message request too large (oops) */ | 1150 | /* message request too large (oops) */ |
1134 | GNUNET_break (0); | 1151 | GNUNET_break (0); |
1135 | /* FIXME: handle error somehow! */ | 1152 | /* discard encrypted message */ |
1153 | GNUNET_assert (NULL != (m = n->encrypted_head)); | ||
1154 | n->encrypted_head = m->next; | ||
1155 | if (m->next == NULL) | ||
1156 | n->encrypted_tail = NULL; | ||
1157 | GNUNET_free (m); | ||
1158 | process_encrypted_neighbour_queue (n); | ||
1136 | } | 1159 | } |
1137 | } | 1160 | } |
1138 | 1161 | ||
@@ -1448,7 +1471,29 @@ batch_message (struct Neighbour *n, | |||
1448 | static void | 1471 | static void |
1449 | discard_expired_messages (struct Neighbour *n) | 1472 | discard_expired_messages (struct Neighbour *n) |
1450 | { | 1473 | { |
1451 | /* FIXME */ | 1474 | struct MessageEntry *prev; |
1475 | struct MessageEntry *next; | ||
1476 | struct MessageEntry *pos; | ||
1477 | struct GNUNET_TIME_Absolute cutoff; | ||
1478 | |||
1479 | cutoff = GNUNET_TIME_relative_to_absolute(PAST_EXPIRATION_DISCARD_TIME); | ||
1480 | prev = NULL; | ||
1481 | pos = n->messages; | ||
1482 | while (pos != NULL) | ||
1483 | { | ||
1484 | next = pos->next; | ||
1485 | if (pos->deadline.value < cutoff.value) | ||
1486 | { | ||
1487 | if (prev == NULL) | ||
1488 | n->messages = next; | ||
1489 | else | ||
1490 | prev->next = next; | ||
1491 | GNUNET_free (pos); | ||
1492 | } | ||
1493 | else | ||
1494 | prev = pos; | ||
1495 | pos = next; | ||
1496 | } | ||
1452 | } | 1497 | } |
1453 | 1498 | ||
1454 | 1499 | ||
@@ -1651,8 +1696,6 @@ send_connect_continuation (void *cls, size_t size, void *buf) | |||
1651 | GNUNET_i2s (&sm->peer)); | 1696 | GNUNET_i2s (&sm->peer)); |
1652 | #endif | 1697 | #endif |
1653 | GNUNET_free (sm); | 1698 | GNUNET_free (sm); |
1654 | /* FIXME: do we need to do something here to let the | ||
1655 | client know about the failure!? */ | ||
1656 | return 0; | 1699 | return 0; |
1657 | } | 1700 | } |
1658 | #if DEBUG_CORE | 1701 | #if DEBUG_CORE |
@@ -1678,9 +1721,13 @@ handle_client_send (void *cls, | |||
1678 | struct SendMessage *smc; | 1721 | struct SendMessage *smc; |
1679 | const struct GNUNET_MessageHeader *mh; | 1722 | const struct GNUNET_MessageHeader *mh; |
1680 | struct Neighbour *n; | 1723 | struct Neighbour *n; |
1681 | struct MessageEntry *pred; | 1724 | struct MessageEntry *prev; |
1682 | struct MessageEntry *pos; | 1725 | struct MessageEntry *pos; |
1683 | struct MessageEntry *e; | 1726 | struct MessageEntry *e; |
1727 | struct MessageEntry *min_prio_entry; | ||
1728 | struct MessageEntry *min_prio_prev; | ||
1729 | unsigned int min_prio; | ||
1730 | unsigned int queue_size; | ||
1684 | uint16_t msize; | 1731 | uint16_t msize; |
1685 | 1732 | ||
1686 | msize = ntohs (message->size); | 1733 | msize = ntohs (message->size); |
@@ -1714,20 +1761,26 @@ handle_client_send (void *cls, | |||
1714 | #endif | 1761 | #endif |
1715 | msize += sizeof (struct SendMessage); | 1762 | msize += sizeof (struct SendMessage); |
1716 | /* ask transport to connect to the peer */ | 1763 | /* ask transport to connect to the peer */ |
1717 | /* FIXME: this code does not handle the | ||
1718 | case where we get multiple SendMessages before | ||
1719 | transport responds to this request; | ||
1720 | => need to track pending requests! */ | ||
1721 | smc = GNUNET_malloc (msize); | 1764 | smc = GNUNET_malloc (msize); |
1722 | memcpy (smc, sm, msize); | 1765 | memcpy (smc, sm, msize); |
1723 | GNUNET_TRANSPORT_notify_transmit_ready (transport, | 1766 | if (NULL == |
1724 | &sm->peer, | 1767 | GNUNET_TRANSPORT_notify_transmit_ready (transport, |
1725 | 0, | 1768 | &sm->peer, |
1726 | GNUNET_TIME_absolute_get_remaining | 1769 | 0, |
1727 | (GNUNET_TIME_absolute_ntoh | 1770 | GNUNET_TIME_absolute_get_remaining |
1728 | (sm->deadline)), | 1771 | (GNUNET_TIME_absolute_ntoh |
1729 | &send_connect_continuation, | 1772 | (sm->deadline)), |
1730 | smc); | 1773 | &send_connect_continuation, |
1774 | smc)) | ||
1775 | { | ||
1776 | /* transport has already a request pending for this peer! */ | ||
1777 | #if DEBUG_CORE | ||
1778 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
1779 | "Dropped second message destined for `%4s' since connection is still down.\n", | ||
1780 | GNUNET_i2s(&sm->peer)); | ||
1781 | #endif | ||
1782 | GNUNET_free (smc); | ||
1783 | } | ||
1731 | if (client != NULL) | 1784 | if (client != NULL) |
1732 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 1785 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
1733 | return; | 1786 | return; |
@@ -1739,7 +1792,50 @@ handle_client_send (void *cls, | |||
1739 | msize, | 1792 | msize, |
1740 | GNUNET_i2s (&sm->peer)); | 1793 | GNUNET_i2s (&sm->peer)); |
1741 | #endif | 1794 | #endif |
1742 | /* FIXME: consider bounding queue size */ | 1795 | /* bound queue size */ |
1796 | discard_expired_messages (n); | ||
1797 | min_prio = (unsigned int) -1; | ||
1798 | queue_size = 0; | ||
1799 | prev = NULL; | ||
1800 | pos = n->messages; | ||
1801 | while (pos != NULL) | ||
1802 | { | ||
1803 | if (pos->priority < min_prio) | ||
1804 | { | ||
1805 | min_prio_entry = pos; | ||
1806 | min_prio_prev = prev; | ||
1807 | min_prio = pos->priority; | ||
1808 | } | ||
1809 | queue_size++; | ||
1810 | prev = pos; | ||
1811 | pos = pos->next; | ||
1812 | } | ||
1813 | if (queue_size >= MAX_PEER_QUEUE_SIZE) | ||
1814 | { | ||
1815 | /* queue full */ | ||
1816 | if (ntohl(sm->priority) <= min_prio) | ||
1817 | { | ||
1818 | /* discard new entry */ | ||
1819 | #if DEBUG_CORE | ||
1820 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1821 | "Queue full, discarding new request\n"); | ||
1822 | #endif | ||
1823 | if (client != NULL) | ||
1824 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
1825 | return; | ||
1826 | } | ||
1827 | /* discard "min_prio_entry" */ | ||
1828 | #if DEBUG_CORE | ||
1829 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1830 | "Queue full, discarding existing older request\n"); | ||
1831 | #endif | ||
1832 | if (min_prio_prev == NULL) | ||
1833 | n->messages = min_prio_entry->next; | ||
1834 | else | ||
1835 | min_prio_prev->next = min_prio_entry->next; | ||
1836 | GNUNET_free (min_prio_entry); | ||
1837 | } | ||
1838 | |||
1743 | e = GNUNET_malloc (sizeof (struct MessageEntry) + msize); | 1839 | e = GNUNET_malloc (sizeof (struct MessageEntry) + msize); |
1744 | e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline); | 1840 | e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline); |
1745 | e->priority = ntohl (sm->priority); | 1841 | e->priority = ntohl (sm->priority); |
@@ -1747,17 +1843,17 @@ handle_client_send (void *cls, | |||
1747 | memcpy (&e[1], mh, msize); | 1843 | memcpy (&e[1], mh, msize); |
1748 | 1844 | ||
1749 | /* insert, keep list sorted by deadline */ | 1845 | /* insert, keep list sorted by deadline */ |
1750 | pred = NULL; | 1846 | prev = NULL; |
1751 | pos = n->messages; | 1847 | pos = n->messages; |
1752 | while ((pos != NULL) && (pos->deadline.value < e->deadline.value)) | 1848 | while ((pos != NULL) && (pos->deadline.value < e->deadline.value)) |
1753 | { | 1849 | { |
1754 | pred = pos; | 1850 | prev = pos; |
1755 | pos = pos->next; | 1851 | pos = pos->next; |
1756 | } | 1852 | } |
1757 | if (pred == NULL) | 1853 | if (prev == NULL) |
1758 | n->messages = e; | 1854 | n->messages = e; |
1759 | else | 1855 | else |
1760 | pred->next = e; | 1856 | prev->next = e; |
1761 | e->next = pos; | 1857 | e->next = pos; |
1762 | 1858 | ||
1763 | /* consider scheduling now */ | 1859 | /* consider scheduling now */ |
@@ -2813,6 +2909,7 @@ static void | |||
2813 | cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | 2909 | cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) |
2814 | { | 2910 | { |
2815 | struct Neighbour *n; | 2911 | struct Neighbour *n; |
2912 | struct Client *c; | ||
2816 | 2913 | ||
2817 | #if DEBUG_CORE | 2914 | #if DEBUG_CORE |
2818 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 2915 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
@@ -2826,6 +2923,8 @@ cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
2826 | neighbours = n->next; | 2923 | neighbours = n->next; |
2827 | free_neighbour (n); | 2924 | free_neighbour (n); |
2828 | } | 2925 | } |
2926 | while (NULL != (c = clients)) | ||
2927 | handle_client_disconnect (NULL, c->client_handle); | ||
2829 | } | 2928 | } |
2830 | 2929 | ||
2831 | 2930 | ||
@@ -2926,10 +3025,6 @@ cleanup (void *cls, struct GNUNET_CONFIGURATION_Handle *cfg) | |||
2926 | 3025 | ||
2927 | if (my_private_key != NULL) | 3026 | if (my_private_key != NULL) |
2928 | GNUNET_CRYPTO_rsa_key_free (my_private_key); | 3027 | GNUNET_CRYPTO_rsa_key_free (my_private_key); |
2929 | /* | ||
2930 | FIXME: | ||
2931 | - free clients | ||
2932 | */ | ||
2933 | } | 3028 | } |
2934 | 3029 | ||
2935 | 3030 | ||