diff options
-rw-r--r-- | src/include/gnunet_applications.h | 6 | ||||
-rw-r--r-- | src/include/gnunet_protocols.h | 33 | ||||
-rw-r--r-- | src/include/gnunet_set_service.h | 21 | ||||
-rw-r--r-- | src/set/Makefile.am | 9 | ||||
-rw-r--r-- | src/set/gnunet-service-set.c | 506 | ||||
-rw-r--r-- | src/set/gnunet-set.c | 30 | ||||
-rw-r--r-- | src/set/mq.c | 584 | ||||
-rw-r--r-- | src/set/mq.h | 202 | ||||
-rw-r--r-- | src/set/set.conf.in | 11 | ||||
-rw-r--r-- | src/set/set.h | 33 | ||||
-rw-r--r-- | src/set/set_api.c | 23 | ||||
-rw-r--r-- | src/set/strata_estimator.c | 8 | ||||
-rw-r--r-- | src/set/strata_estimator.h | 2 | ||||
-rw-r--r-- | src/set/test_set_api.c | 4 |
14 files changed, 1402 insertions, 70 deletions
diff --git a/src/include/gnunet_applications.h b/src/include/gnunet_applications.h index 5710a8838..4e86e332e 100644 --- a/src/include/gnunet_applications.h +++ b/src/include/gnunet_applications.h | |||
@@ -77,6 +77,12 @@ extern "C" | |||
77 | #define GNUNET_APPLICATION_TYPE_CONSENSUS 18 | 77 | #define GNUNET_APPLICATION_TYPE_CONSENSUS 18 |
78 | 78 | ||
79 | 79 | ||
80 | /** | ||
81 | * Set. Used for two-peer set operations implemented using stream. | ||
82 | */ | ||
83 | #define GNUNET_APPLICATION_TYPE_SET 19 | ||
84 | |||
85 | |||
80 | #if 0 /* keep Emacsens' auto-indent happy */ | 86 | #if 0 /* keep Emacsens' auto-indent happy */ |
81 | { | 87 | { |
82 | #endif | 88 | #endif |
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index ae28da7b1..33e97b80c 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h | |||
@@ -1807,10 +1807,41 @@ extern "C" | |||
1807 | #define GNUNET_MESSAGE_TYPE_SET_REQUEST 578 | 1807 | #define GNUNET_MESSAGE_TYPE_SET_REQUEST 578 |
1808 | 1808 | ||
1809 | /** | 1809 | /** |
1810 | * Evaluate a set operation | 1810 | * Evaluate a set operation. |
1811 | */ | 1811 | */ |
1812 | #define GNUNET_MESSAGE_TYPE_SET_CREATE 579 | 1812 | #define GNUNET_MESSAGE_TYPE_SET_CREATE 579 |
1813 | 1813 | ||
1814 | /** | ||
1815 | * Evaluate a set operation. | ||
1816 | */ | ||
1817 | #define GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST 580 | ||
1818 | |||
1819 | /** | ||
1820 | * Strata estimator. | ||
1821 | */ | ||
1822 | #define GNUNET_MESSAGE_TYPE_SET_P2P_SE 581 | ||
1823 | |||
1824 | /** | ||
1825 | * Invertible bloom filter. | ||
1826 | */ | ||
1827 | #define GNUNET_MESSAGE_TYPE_SET_P2P_IBF 582 | ||
1828 | |||
1829 | /** | ||
1830 | * Actual set elements. | ||
1831 | */ | ||
1832 | #define GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS 583 | ||
1833 | |||
1834 | /** | ||
1835 | * Requests for the elements with the given hashes. | ||
1836 | */ | ||
1837 | #define GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS 584 | ||
1838 | |||
1839 | /** | ||
1840 | * Operation is done. | ||
1841 | */ | ||
1842 | #define GNUNET_MESSAGE_TYPE_SET_P2P_DONE 585 | ||
1843 | |||
1844 | |||
1814 | 1845 | ||
1815 | /******************************************************************************* | 1846 | /******************************************************************************* |
1816 | * TESTBED LOGGER message types | 1847 | * TESTBED LOGGER message types |
diff --git a/src/include/gnunet_set_service.h b/src/include/gnunet_set_service.h index cf782c841..87d766c68 100644 --- a/src/include/gnunet_set_service.h +++ b/src/include/gnunet_set_service.h | |||
@@ -63,11 +63,6 @@ struct GNUNET_SET_OperationHandle; | |||
63 | 63 | ||
64 | 64 | ||
65 | /** | 65 | /** |
66 | * Opaque handle to a listen operation. | ||
67 | */ | ||
68 | struct GNUNET_SET_ListenHandle; | ||
69 | |||
70 | /** | ||
71 | * The operation that a set set supports. | 66 | * The operation that a set set supports. |
72 | */ | 67 | */ |
73 | enum GNUNET_SET_OperationType | 68 | enum GNUNET_SET_OperationType |
@@ -135,10 +130,12 @@ struct GNUNET_SET_Element | |||
135 | * Number of bytes in the buffer pointed to by data. | 130 | * Number of bytes in the buffer pointed to by data. |
136 | */ | 131 | */ |
137 | uint16_t size; | 132 | uint16_t size; |
133 | |||
138 | /** | 134 | /** |
139 | * Application-specific element type. | 135 | * Application-specific element type. |
140 | */ | 136 | */ |
141 | uint16_t type; | 137 | uint16_t type; |
138 | |||
142 | /** | 139 | /** |
143 | * Actual data of the element | 140 | * Actual data of the element |
144 | */ | 141 | */ |
@@ -153,6 +150,7 @@ struct GNUNET_SET_Element | |||
153 | */ | 150 | */ |
154 | typedef void (*GNUNET_SET_Continuation) (void *cls); | 151 | typedef void (*GNUNET_SET_Continuation) (void *cls); |
155 | 152 | ||
153 | |||
156 | /** | 154 | /** |
157 | * Callback for set operation results. Called for each element | 155 | * Callback for set operation results. Called for each element |
158 | * in the result set. | 156 | * in the result set. |
@@ -161,10 +159,9 @@ typedef void (*GNUNET_SET_Continuation) (void *cls); | |||
161 | * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK | 159 | * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK |
162 | * @param status see enum GNUNET_SET_Status | 160 | * @param status see enum GNUNET_SET_Status |
163 | */ | 161 | */ |
164 | typedef void | 162 | typedef void (*GNUNET_SET_ResultIterator) (void *cls, |
165 | (*GNUNET_SET_ResultIterator) (void *cls, | 163 | struct GNUNET_SET_Element *element, |
166 | struct GNUNET_SET_Element *element, | 164 | enum GNUNET_SET_Status status); |
167 | enum GNUNET_SET_Status status); | ||
168 | 165 | ||
169 | 166 | ||
170 | /** | 167 | /** |
@@ -201,7 +198,7 @@ typedef void | |||
201 | * @return a handle to the set | 198 | * @return a handle to the set |
202 | */ | 199 | */ |
203 | struct GNUNET_SET_Handle * | 200 | struct GNUNET_SET_Handle * |
204 | GNUNET_SET_create (struct GNUNET_CONFIGURATION_Handle *cfg, | 201 | GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg, |
205 | enum GNUNET_SET_OperationType op); | 202 | enum GNUNET_SET_OperationType op); |
206 | 203 | ||
207 | 204 | ||
@@ -270,8 +267,6 @@ GNUNET_SET_evaluate (struct GNUNET_SET_Handle *set, | |||
270 | void *result_cls); | 267 | void *result_cls); |
271 | 268 | ||
272 | 269 | ||
273 | |||
274 | |||
275 | /** | 270 | /** |
276 | * Wait for set operation requests for the given application id | 271 | * Wait for set operation requests for the given application id |
277 | * | 272 | * |
@@ -285,7 +280,7 @@ GNUNET_SET_evaluate (struct GNUNET_SET_Handle *set, | |||
285 | * @return a handle that can be used to cancel the listen operation | 280 | * @return a handle that can be used to cancel the listen operation |
286 | */ | 281 | */ |
287 | struct GNUNET_SET_ListenHandle * | 282 | struct GNUNET_SET_ListenHandle * |
288 | GNUNET_SET_listen (struct GNUNET_CONFIGURATION_Handle *cfg, | 283 | GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg, |
289 | enum GNUNET_SET_OperationType op_type, | 284 | enum GNUNET_SET_OperationType op_type, |
290 | const struct GNUNET_HashCode *app_id, | 285 | const struct GNUNET_HashCode *app_id, |
291 | GNUNET_SET_ListenCallback listen_cb, | 286 | GNUNET_SET_ListenCallback listen_cb, |
diff --git a/src/set/Makefile.am b/src/set/Makefile.am index 06f418465..c1639823e 100644 --- a/src/set/Makefile.am +++ b/src/set/Makefile.am | |||
@@ -29,6 +29,7 @@ gnunet_set_SOURCES = \ | |||
29 | gnunet_set_LDADD = \ | 29 | gnunet_set_LDADD = \ |
30 | $(top_builddir)/src/util/libgnunetutil.la \ | 30 | $(top_builddir)/src/util/libgnunetutil.la \ |
31 | $(top_builddir)/src/set/libgnunetset.la \ | 31 | $(top_builddir)/src/set/libgnunetset.la \ |
32 | $(top_builddir)/src/stream/libgnunetstream.la \ | ||
32 | $(top_builddir)/src/testbed/libgnunettestbed.la \ | 33 | $(top_builddir)/src/testbed/libgnunettestbed.la \ |
33 | $(GN_LIBINTL) | 34 | $(GN_LIBINTL) |
34 | gnunet_set_DEPENDENCIES = \ | 35 | gnunet_set_DEPENDENCIES = \ |
@@ -36,6 +37,8 @@ gnunet_set_DEPENDENCIES = \ | |||
36 | 37 | ||
37 | gnunet_service_set_SOURCES = \ | 38 | gnunet_service_set_SOURCES = \ |
38 | gnunet-service-set.c \ | 39 | gnunet-service-set.c \ |
40 | gnunet-service-set_union.c \ | ||
41 | mq.c \ | ||
39 | ibf.c \ | 42 | ibf.c \ |
40 | strata_estimator.c | 43 | strata_estimator.c |
41 | gnunet_service_set_LDADD = \ | 44 | gnunet_service_set_LDADD = \ |
@@ -44,12 +47,16 @@ gnunet_service_set_LDADD = \ | |||
44 | $(top_builddir)/src/stream/libgnunetstream.la \ | 47 | $(top_builddir)/src/stream/libgnunetstream.la \ |
45 | $(top_builddir)/src/mesh/libgnunetmesh.la \ | 48 | $(top_builddir)/src/mesh/libgnunetmesh.la \ |
46 | $(GN_LIBINTL) | 49 | $(GN_LIBINTL) |
50 | # hack for mq.c, see automake Objects ‘created with both libtool and without’ | ||
51 | # remove once GNUNET_MQ is in util/ | ||
52 | gnunet_service_set_CFLAGS = $(AM_CFLAGS) | ||
47 | 53 | ||
48 | libgnunetset_la_SOURCES = \ | 54 | libgnunetset_la_SOURCES = \ |
49 | set_api.c \ | 55 | set_api.c \ |
50 | mq.c | 56 | mq.c |
51 | libgnunetset_la_LIBADD = \ | 57 | libgnunetset_la_LIBADD = \ |
52 | $(top_builddir)/src/util/libgnunetutil.la \ | 58 | $(top_builddir)/src/util/libgnunetutil.la \ |
59 | $(top_builddir)/src/stream/libgnunetstream.la \ | ||
53 | $(LTLIBINTL) | 60 | $(LTLIBINTL) |
54 | libgnunetset_la_LDFLAGS = \ | 61 | libgnunetset_la_LDFLAGS = \ |
55 | $(GN_LIB_LDFLAGS) | 62 | $(GN_LIB_LDFLAGS) |
@@ -67,6 +74,8 @@ test_set_api_LDADD = \ | |||
67 | $(top_builddir)/src/util/libgnunetutil.la \ | 74 | $(top_builddir)/src/util/libgnunetutil.la \ |
68 | $(top_builddir)/src/testing/libgnunettesting.la \ | 75 | $(top_builddir)/src/testing/libgnunettesting.la \ |
69 | $(top_builddir)/src/set/libgnunetset.la | 76 | $(top_builddir)/src/set/libgnunetset.la |
77 | test_set_api_DEPENDENCIES = \ | ||
78 | libgnunetset.la | ||
70 | 79 | ||
71 | EXTRA_DIST = \ | 80 | EXTRA_DIST = \ |
72 | test_set.conf | 81 | test_set.conf |
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c index 69296fe79..6663e3fae 100644 --- a/src/set/gnunet-service-set.c +++ b/src/set/gnunet-service-set.c | |||
@@ -24,31 +24,194 @@ | |||
24 | * @author Florian Dold | 24 | * @author Florian Dold |
25 | */ | 25 | */ |
26 | 26 | ||
27 | #include "platform.h" | ||
28 | #include "gnunet_common.h" | ||
29 | #include "gnunet_protocols.h" | ||
30 | #include "gnunet_applications.h" | ||
31 | #include "gnunet_util_lib.h" | ||
32 | #include "gnunet_core_service.h" | ||
33 | #include "gnunet_stream_lib.h" | ||
34 | 27 | ||
35 | struct Set | 28 | #include "gnunet-service-set.h" |
36 | { | 29 | #include "set_protocol.h" |
37 | 30 | ||
38 | }; | ||
39 | 31 | ||
40 | struct Listener | 32 | /** |
41 | { | 33 | * Configuration of our local peer. |
34 | */ | ||
35 | const struct GNUNET_CONFIGURATION_Handle *configuration; | ||
36 | |||
37 | /** | ||
38 | * Socket listening for other peers via stream. | ||
39 | */ | ||
40 | static struct GNUNET_STREAM_ListenSocket *stream_listen_socket; | ||
42 | 41 | ||
43 | }; | 42 | /** |
43 | * Sets are held in a doubly linked list. | ||
44 | */ | ||
45 | static struct Set *sets_head; | ||
44 | 46 | ||
45 | /* | 47 | /** |
46 | static struct Listener *sets_head; | 48 | * Sets are held in a doubly linked list. |
47 | static struct Listener *sets_tail; | 49 | */ |
50 | static struct Set *sets_tail; | ||
48 | 51 | ||
52 | /** | ||
53 | * Listeners are held in a doubly linked list. | ||
54 | */ | ||
49 | static struct Listener *listeners_head; | 55 | static struct Listener *listeners_head; |
56 | |||
57 | /** | ||
58 | * Listeners are held in a doubly linked list. | ||
59 | */ | ||
50 | static struct Listener *listeners_tail; | 60 | static struct Listener *listeners_tail; |
51 | */ | 61 | |
62 | /** | ||
63 | * Incoming sockets from remote peers are | ||
64 | * held in a doubly linked list. | ||
65 | */ | ||
66 | static struct Incoming *incoming_head; | ||
67 | |||
68 | /** | ||
69 | * Incoming sockets from remote peers are | ||
70 | * held in a doubly linked list. | ||
71 | */ | ||
72 | static struct Incoming *incoming_tail; | ||
73 | |||
74 | /** | ||
75 | * Counter for allocating unique request IDs for clients. | ||
76 | */ | ||
77 | static uint32_t request_id = 1; | ||
78 | |||
79 | |||
80 | /** | ||
81 | * Disconnect a client and free all resources | ||
82 | * that the client allocated (e.g. Sets or Listeners) | ||
83 | * | ||
84 | * @param client the client to disconnect | ||
85 | */ | ||
86 | void | ||
87 | client_disconnect (struct GNUNET_SERVER_Client *client) | ||
88 | { | ||
89 | /* FIXME: clean up any data structures belonging to the client */ | ||
90 | GNUNET_SERVER_client_disconnect (client); | ||
91 | } | ||
92 | |||
93 | |||
94 | /** | ||
95 | * Get set that is owned by the client, if any. | ||
96 | * | ||
97 | * @param client client to look for | ||
98 | * @return set that the client owns, NULL if the client | ||
99 | * does not own a set | ||
100 | */ | ||
101 | static struct Set * | ||
102 | get_set (struct GNUNET_SERVER_Client *client) | ||
103 | { | ||
104 | struct Set *set; | ||
105 | for (set = sets_head; NULL != set; set = set->next) | ||
106 | if (set->client == client) | ||
107 | return set; | ||
108 | return NULL; | ||
109 | } | ||
110 | |||
111 | |||
112 | /** | ||
113 | * Get the listener associated to a client, if any. | ||
114 | * | ||
115 | * @param client the client | ||
116 | * @return listener associated with the client, NULL | ||
117 | * if there isn't any | ||
118 | */ | ||
119 | static struct Listener * | ||
120 | get_listener (struct GNUNET_SERVER_Client *client) | ||
121 | { | ||
122 | struct Listener *listener; | ||
123 | for (listener = listeners_head; NULL != listener; listener = listener->next) | ||
124 | if (listener->client == client) | ||
125 | return listener; | ||
126 | return NULL; | ||
127 | } | ||
128 | |||
129 | /** | ||
130 | * Get the incoming socket associated with the given id | ||
131 | * | ||
132 | * @param id id to look for | ||
133 | * @return the incoming socket associated with the id, | ||
134 | * or NULL if there is none | ||
135 | */ | ||
136 | static struct Incoming * | ||
137 | get_incoming (uint32_t id) | ||
138 | { | ||
139 | struct Incoming *incoming; | ||
140 | for (incoming = incoming_head; NULL != incoming; incoming = incoming->next) | ||
141 | if (incoming->request_id == id) | ||
142 | return incoming; | ||
143 | return NULL; | ||
144 | } | ||
145 | |||
146 | static void | ||
147 | destroy_incoming (struct Incoming *incoming) | ||
148 | { | ||
149 | if (NULL != incoming->mq) | ||
150 | { | ||
151 | GNUNET_MQ_destroy (incoming->mq); | ||
152 | incoming->mq = NULL; | ||
153 | } | ||
154 | if (NULL != incoming->socket) | ||
155 | { | ||
156 | GNUNET_STREAM_close (incoming->socket); | ||
157 | incoming->socket = NULL; | ||
158 | } | ||
159 | GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming); | ||
160 | GNUNET_free (incoming); | ||
161 | } | ||
162 | |||
163 | |||
164 | /** | ||
165 | * Handle a request for a set operation for | ||
166 | * another peer. | ||
167 | * | ||
168 | * @param cls the incoming socket | ||
169 | * @param mh the message | ||
170 | */ | ||
171 | static void | ||
172 | handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh) | ||
173 | { | ||
174 | struct Incoming *incoming = cls; | ||
175 | const struct OperationRequestMessage *msg = (const struct OperationRequestMessage *) mh; | ||
176 | struct GNUNET_MQ_Message *mqm; | ||
177 | struct RequestMessage *cmsg; | ||
178 | struct Listener *listener; | ||
179 | const struct GNUNET_MessageHeader *context_msg; | ||
180 | |||
181 | if (ntohs (mh->size) < sizeof *msg) | ||
182 | { | ||
183 | GNUNET_break (0); | ||
184 | destroy_incoming (incoming); | ||
185 | return; | ||
186 | } | ||
187 | else if (ntohs (mh->size) == sizeof *msg) | ||
188 | { | ||
189 | context_msg = NULL; | ||
190 | } | ||
191 | else | ||
192 | { | ||
193 | context_msg = &msg[1].header; | ||
194 | if ((ntohs (context_msg->size) + sizeof *msg) != ntohs (msg->header.size)) | ||
195 | { | ||
196 | /* size of context message is invalid */ | ||
197 | GNUNET_break (0); | ||
198 | destroy_incoming (incoming); | ||
199 | return; | ||
200 | } | ||
201 | } | ||
202 | |||
203 | for (listener = listeners_head; listener != NULL; listener = listener->next) | ||
204 | { | ||
205 | if ( (0 != GNUNET_CRYPTO_hash_cmp (&msg->app_id, &listener->app_id)) || | ||
206 | (htons (msg->operation) != listener->operation) ) | ||
207 | continue; | ||
208 | mqm = GNUNET_MQ_msg_concat (cmsg, context_msg, GNUNET_MESSAGE_TYPE_SET_REQUEST); | ||
209 | incoming->request_id = request_id++; | ||
210 | cmsg->request_id = htonl (incoming->request_id); | ||
211 | GNUNET_MQ_send (listener->client_mq, mqm); | ||
212 | return; | ||
213 | } | ||
214 | } | ||
52 | 215 | ||
53 | 216 | ||
54 | /** | 217 | /** |
@@ -63,7 +226,303 @@ handle_client_create (void *cls, | |||
63 | struct GNUNET_SERVER_Client *client, | 226 | struct GNUNET_SERVER_Client *client, |
64 | const struct GNUNET_MessageHeader *m) | 227 | const struct GNUNET_MessageHeader *m) |
65 | { | 228 | { |
229 | struct SetCreateMessage *msg = (struct SetCreateMessage *) m; | ||
230 | struct Set *set; | ||
231 | |||
232 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "new set created\n"); | ||
233 | |||
234 | if (NULL != get_set (client)) | ||
235 | { | ||
236 | GNUNET_break (0); | ||
237 | GNUNET_SERVER_client_disconnect (client); | ||
238 | return; | ||
239 | } | ||
240 | |||
241 | set = GNUNET_new (struct Set); | ||
242 | |||
243 | switch (ntohs (msg->operation)) | ||
244 | { | ||
245 | case GNUNET_SET_OPERATION_INTERSECTION: | ||
246 | /* FIXME: cfuchs */ | ||
247 | GNUNET_assert (0); | ||
248 | break; | ||
249 | case GNUNET_SET_OPERATION_UNION: | ||
250 | set = union_set_create (); | ||
251 | break; | ||
252 | default: | ||
253 | GNUNET_free (set); | ||
254 | GNUNET_break (0); | ||
255 | GNUNET_SERVER_client_disconnect (client); | ||
256 | return; | ||
257 | } | ||
258 | |||
259 | set->client = client; | ||
260 | set->client_mq = GNUNET_MQ_queue_for_server_client (client); | ||
261 | GNUNET_CONTAINER_DLL_insert (sets_head, sets_tail, set); | ||
262 | |||
263 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
264 | } | ||
265 | |||
266 | |||
267 | /** | ||
268 | * Called when a client wants to create a new set. | ||
269 | * | ||
270 | * @param cls unused | ||
271 | * @param client client that sent the message | ||
272 | * @param m message sent by the client | ||
273 | */ | ||
274 | static void | ||
275 | handle_client_listen (void *cls, | ||
276 | struct GNUNET_SERVER_Client *client, | ||
277 | const struct GNUNET_MessageHeader *m) | ||
278 | { | ||
279 | struct ListenMessage *msg = (struct ListenMessage *) m; | ||
280 | struct Listener *listener; | ||
281 | |||
282 | if (NULL != get_listener (client)) | ||
283 | { | ||
284 | GNUNET_break (0); | ||
285 | GNUNET_SERVER_client_disconnect (client); | ||
286 | return; | ||
287 | } | ||
66 | 288 | ||
289 | listener = GNUNET_new (struct Listener); | ||
290 | listener->app_id = msg->app_id; | ||
291 | listener->operation = msg->operation; | ||
292 | GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener); | ||
293 | } | ||
294 | |||
295 | |||
296 | /** | ||
297 | * Called when a client wants to create a new set. | ||
298 | * | ||
299 | * @param cls unused | ||
300 | * @param client client that sent the message | ||
301 | * @param m message sent by the client | ||
302 | */ | ||
303 | static void | ||
304 | handle_client_add (void *cls, | ||
305 | struct GNUNET_SERVER_Client *client, | ||
306 | const struct GNUNET_MessageHeader *m) | ||
307 | { | ||
308 | struct Set *set; | ||
309 | |||
310 | set = get_set (client); | ||
311 | if (NULL == set) | ||
312 | { | ||
313 | GNUNET_break (0); | ||
314 | client_disconnect (client); | ||
315 | return; | ||
316 | } | ||
317 | switch (set->operation) | ||
318 | { | ||
319 | case GNUNET_SET_OPERATION_UNION: | ||
320 | union_add (set, (struct ElementMessage *) m); | ||
321 | break; | ||
322 | case GNUNET_SET_OPERATION_INTERSECTION: | ||
323 | /* FIXME: cfuchs */ | ||
324 | break; | ||
325 | default: | ||
326 | GNUNET_assert (0); | ||
327 | break; | ||
328 | } | ||
329 | } | ||
330 | |||
331 | |||
332 | /** | ||
333 | * Called when a client wants to evaluate a set operation with another peer. | ||
334 | * | ||
335 | * @param cls unused | ||
336 | * @param client client that sent the message | ||
337 | * @param m message sent by the client | ||
338 | */ | ||
339 | static void | ||
340 | handle_client_evaluate (void *cls, | ||
341 | struct GNUNET_SERVER_Client *client, | ||
342 | const struct GNUNET_MessageHeader *m) | ||
343 | { | ||
344 | struct Set *set; | ||
345 | struct EvaluateMessage *msg = (struct EvaluateMessage *) m; | ||
346 | struct EvaluateOperation *eo; | ||
347 | |||
348 | set = get_set (client); | ||
349 | |||
350 | if (NULL == set) | ||
351 | { | ||
352 | GNUNET_break (0); | ||
353 | client_disconnect (client); | ||
354 | return; | ||
355 | } | ||
356 | |||
357 | eo = GNUNET_new (struct EvaluateOperation); | ||
358 | eo->peer = msg->peer; | ||
359 | eo->app_id = msg->app_id; | ||
360 | eo->request_id = msg->request_id; | ||
361 | eo->context_msg = GNUNET_copy_message (&msg[1].header); | ||
362 | eo->set = set; | ||
363 | |||
364 | switch (set->operation) | ||
365 | { | ||
366 | case GNUNET_SET_OPERATION_INTERSECTION: | ||
367 | /* FIXME: cfuchs */ | ||
368 | break; | ||
369 | case GNUNET_SET_OPERATION_UNION: | ||
370 | union_evaluate (eo); | ||
371 | break; | ||
372 | default: | ||
373 | GNUNET_assert (0); | ||
374 | break; | ||
375 | } | ||
376 | } | ||
377 | |||
378 | |||
379 | /** | ||
380 | * Handle a cancel request from a client. | ||
381 | * | ||
382 | * @param cls unused | ||
383 | * @param client the client | ||
384 | * @param m the cancel message | ||
385 | */ | ||
386 | static void | ||
387 | handle_client_cancel (void *cls, | ||
388 | struct GNUNET_SERVER_Client *client, | ||
389 | const struct GNUNET_MessageHeader *m) | ||
390 | { | ||
391 | /* FIXME: implement */ | ||
392 | } | ||
393 | |||
394 | |||
395 | /** | ||
396 | * Handle an ack from a client. | ||
397 | * | ||
398 | * @param cls unused | ||
399 | * @param client the client | ||
400 | * @param m the message | ||
401 | */ | ||
402 | static void | ||
403 | handle_client_ack (void *cls, | ||
404 | struct GNUNET_SERVER_Client *client, | ||
405 | const struct GNUNET_MessageHeader *m) | ||
406 | { | ||
407 | /* FIXME: implement */ | ||
408 | } | ||
409 | |||
410 | |||
411 | /** | ||
412 | * Handle a request from the client to accept | ||
413 | * a set operation. | ||
414 | * | ||
415 | * @param cls unused | ||
416 | * @param client the client | ||
417 | * @param m the message | ||
418 | */ | ||
419 | static void | ||
420 | handle_client_accept (void *cls, | ||
421 | struct GNUNET_SERVER_Client *client, | ||
422 | const struct GNUNET_MessageHeader *m) | ||
423 | { | ||
424 | struct AcceptMessage *msg = (struct AcceptMessage *) m; | ||
425 | struct Set *set; | ||
426 | struct Incoming *incoming; | ||
427 | struct EvaluateOperation *eo; | ||
428 | |||
429 | set = get_set (client); | ||
430 | |||
431 | if (NULL == set) | ||
432 | { | ||
433 | GNUNET_break (0); | ||
434 | client_disconnect (client); | ||
435 | return; | ||
436 | } | ||
437 | |||
438 | incoming = get_incoming (ntohl (msg->request_id)); | ||
439 | |||
440 | if ( (NULL == incoming) || | ||
441 | (incoming->operation != set->operation) ) | ||
442 | { | ||
443 | GNUNET_break (0); | ||
444 | client_disconnect (client); | ||
445 | return; | ||
446 | } | ||
447 | |||
448 | eo = GNUNET_new (struct EvaluateOperation); | ||
449 | eo->peer = incoming->peer; | ||
450 | eo->app_id = incoming->app_id; | ||
451 | eo->request_id = msg->request_id; | ||
452 | eo->set = set; | ||
453 | |||
454 | switch (set->operation) | ||
455 | { | ||
456 | case GNUNET_SET_OPERATION_INTERSECTION: | ||
457 | /* FIXME: cfuchs*/ | ||
458 | GNUNET_assert (0); | ||
459 | break; | ||
460 | case GNUNET_SET_OPERATION_UNION: | ||
461 | union_accept (eo, incoming); | ||
462 | break; | ||
463 | default: | ||
464 | GNUNET_assert (0); | ||
465 | break; | ||
466 | } | ||
467 | } | ||
468 | |||
469 | |||
470 | /** | ||
471 | * Functions of this type are called upon new stream connection from other peers | ||
472 | * or upon binding error which happen when the app_port given in | ||
473 | * GNUNET_STREAM_listen() is already taken. | ||
474 | * | ||
475 | * @param cls the closure from GNUNET_STREAM_listen | ||
476 | * @param socket the socket representing the stream; NULL on binding error | ||
477 | * @param initiator the identity of the peer who wants to establish a stream | ||
478 | * with us; NULL on binding error | ||
479 | * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the | ||
480 | * stream (the socket will be invalid after the call) | ||
481 | */ | ||
482 | static int | ||
483 | stream_listen_cb (void *cls, | ||
484 | struct GNUNET_STREAM_Socket *socket, | ||
485 | const struct GNUNET_PeerIdentity *initiator) | ||
486 | { | ||
487 | struct Incoming *incoming; | ||
488 | static const struct GNUNET_MQ_Handler handlers[] = { | ||
489 | {handle_p2p_operation_request, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST}, | ||
490 | GNUNET_MQ_HANDLERS_END | ||
491 | }; | ||
492 | |||
493 | if (NULL == socket) | ||
494 | { | ||
495 | GNUNET_break (0); | ||
496 | return GNUNET_SYSERR; | ||
497 | } | ||
498 | |||
499 | incoming = GNUNET_new (struct Incoming); | ||
500 | incoming->peer = *initiator; | ||
501 | incoming->socket = socket; | ||
502 | incoming->mq = GNUNET_MQ_queue_for_stream_socket (incoming->socket, handlers, incoming); | ||
503 | /* FIXME: timeout for peers that only connect but don't send anything */ | ||
504 | GNUNET_CONTAINER_DLL_insert_tail (incoming_head, incoming_tail, incoming); | ||
505 | return GNUNET_OK; | ||
506 | } | ||
507 | |||
508 | |||
509 | /** | ||
510 | * Called to clean up, after a shutdown has been requested. | ||
511 | * | ||
512 | * @param cls closure | ||
513 | * @param tc context information (why was this task triggered now) | ||
514 | */ | ||
515 | static void | ||
516 | shutdown_task (void *cls, | ||
517 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
518 | { | ||
519 | if (NULL != stream_listen_socket) | ||
520 | { | ||
521 | GNUNET_STREAM_listen_close (stream_listen_socket); | ||
522 | stream_listen_socket = NULL; | ||
523 | } | ||
524 | |||
525 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n"); | ||
67 | } | 526 | } |
68 | 527 | ||
69 | 528 | ||
@@ -77,15 +536,24 @@ handle_client_create (void *cls, | |||
77 | */ | 536 | */ |
78 | static void | 537 | static void |
79 | run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *cfg) | 538 | run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *cfg) |
80 | |||
81 | { | 539 | { |
82 | static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { | 540 | static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { |
83 | {handle_client_create, NULL, GNUNET_MESSAGE_TYPE_SET_CREATE, 0}, | 541 | {handle_client_create, NULL, GNUNET_MESSAGE_TYPE_SET_CREATE, 0}, |
542 | {handle_client_listen, NULL, GNUNET_MESSAGE_TYPE_SET_LISTEN, 0}, | ||
543 | {handle_client_add, NULL, GNUNET_MESSAGE_TYPE_SET_ADD, 0}, | ||
544 | {handle_client_cancel, NULL, GNUNET_MESSAGE_TYPE_SET_CANCEL, 0}, | ||
545 | {handle_client_evaluate, NULL, GNUNET_MESSAGE_TYPE_SET_EVALUATE, 0}, | ||
546 | {handle_client_ack, NULL, GNUNET_MESSAGE_TYPE_SET_ACK, 0}, | ||
547 | {handle_client_accept, NULL, GNUNET_MESSAGE_TYPE_SET_ACCEPT, 0}, | ||
84 | {NULL, NULL, 0, 0} | 548 | {NULL, NULL, 0, 0} |
85 | }; | 549 | }; |
86 | 550 | ||
87 | 551 | configuration = cfg; | |
552 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); | ||
88 | GNUNET_SERVER_add_handlers (server, server_handlers); | 553 | GNUNET_SERVER_add_handlers (server, server_handlers); |
554 | stream_listen_socket = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_SET, | ||
555 | &stream_listen_cb, NULL, | ||
556 | GNUNET_STREAM_OPTION_END); | ||
89 | } | 557 | } |
90 | 558 | ||
91 | 559 | ||
diff --git a/src/set/gnunet-set.c b/src/set/gnunet-set.c index bb494ffe7..7de687611 100644 --- a/src/set/gnunet-set.c +++ b/src/set/gnunet-set.c | |||
@@ -27,6 +27,23 @@ | |||
27 | #include "gnunet_common.h" | 27 | #include "gnunet_common.h" |
28 | #include "gnunet_util_lib.h" | 28 | #include "gnunet_util_lib.h" |
29 | #include "gnunet_testbed_service.h" | 29 | #include "gnunet_testbed_service.h" |
30 | #include "gnunet_set_service.h" | ||
31 | |||
32 | |||
33 | static struct GNUNET_HashCode app_id; | ||
34 | static struct GNUNET_SET_Handle *set1; | ||
35 | static struct GNUNET_SET_Handle *set2; | ||
36 | static struct GNUNET_SET_ListenHandle *listen_handle; | ||
37 | |||
38 | |||
39 | static void | ||
40 | listen_cb (void *cls, | ||
41 | const struct GNUNET_PeerIdentity *other_peer, | ||
42 | const struct GNUNET_MessageHeader *context_msg, | ||
43 | struct GNUNET_SET_Request *request) | ||
44 | { | ||
45 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n"); | ||
46 | } | ||
30 | 47 | ||
31 | 48 | ||
32 | /** | 49 | /** |
@@ -40,10 +57,15 @@ | |||
40 | static void | 57 | static void |
41 | run (void *cls, char *const *args, | 58 | run (void *cls, char *const *args, |
42 | const char *cfgfile, | 59 | const char *cfgfile, |
43 | const struct GNUNET_CONFIGURATION_Handle * | 60 | const struct GNUNET_CONFIGURATION_Handle *cfg) |
44 | cfg) | ||
45 | { | 61 | { |
46 | /* FIXME */ | 62 | static const char* app_str = "gnunet-set"; |
63 | GNUNET_CRYPTO_hash (app_str, strlen (app_str), &app_id); | ||
64 | |||
65 | set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); | ||
66 | set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); | ||
67 | listen_handle = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION, &app_id, | ||
68 | listen_cb, NULL); | ||
47 | } | 69 | } |
48 | 70 | ||
49 | 71 | ||
@@ -56,7 +78,7 @@ main (int argc, char **argv) | |||
56 | }; | 78 | }; |
57 | GNUNET_PROGRAM_run2 (argc, argv, "gnunet-set", | 79 | GNUNET_PROGRAM_run2 (argc, argv, "gnunet-set", |
58 | "help", | 80 | "help", |
59 | options, &run, NULL, GNUNET_YES); | 81 | options, &run, NULL, GNUNET_NO); |
60 | return 0; | 82 | return 0; |
61 | } | 83 | } |
62 | 84 | ||
diff --git a/src/set/mq.c b/src/set/mq.c index 313e9ce0c..236a692d4 100644 --- a/src/set/mq.c +++ b/src/set/mq.c | |||
@@ -26,13 +26,181 @@ | |||
26 | 26 | ||
27 | #include "mq.h" | 27 | #include "mq.h" |
28 | 28 | ||
29 | /** | ||
30 | * Signature of functions implementing the | ||
31 | * sending part of a message queue | ||
32 | * | ||
33 | * @param q the message queue | ||
34 | * @param m the message | ||
35 | */ | ||
36 | typedef void (*SendImpl) (struct GNUNET_MQ_MessageQueue *q, struct GNUNET_MQ_Message *m); | ||
37 | |||
38 | |||
39 | typedef void (*DestroyImpl) (struct GNUNET_MQ_MessageQueue *q); | ||
40 | |||
41 | |||
42 | /** | ||
43 | * Collection of the state necessary to read and write gnunet messages | ||
44 | * to a stream socket. Should be used as closure for stream_data_processor. | ||
45 | */ | ||
46 | struct MessageStreamState | ||
47 | { | ||
48 | struct GNUNET_SERVER_MessageStreamTokenizer *mst; | ||
49 | struct MessageQueue *mq; | ||
50 | struct GNUNET_STREAM_Socket *socket; | ||
51 | struct GNUNET_STREAM_ReadHandle *rh; | ||
52 | struct GNUNET_STREAM_WriteHandle *wh; | ||
53 | }; | ||
54 | |||
55 | |||
56 | struct ServerClientSocketState | ||
57 | { | ||
58 | struct GNUNET_SERVER_Client *client; | ||
59 | struct GNUNET_SERVER_TransmitHandle* th; | ||
60 | }; | ||
61 | |||
62 | |||
63 | struct ClientConnectionState | ||
64 | { | ||
65 | struct GNUNET_CLIENT_Connection *connection; | ||
66 | struct GNUNET_CLIENT_TransmitHandle *th; | ||
67 | }; | ||
68 | |||
69 | |||
70 | struct GNUNET_MQ_MessageQueue | ||
71 | { | ||
72 | /** | ||
73 | * Handlers array, or NULL if the queue should not receive messages | ||
74 | */ | ||
75 | const struct GNUNET_MQ_Handler *handlers; | ||
76 | |||
77 | /** | ||
78 | * Closure for the handler callbacks | ||
79 | */ | ||
80 | void *handlers_cls; | ||
81 | |||
82 | /** | ||
83 | * Actual implementation of message sending, | ||
84 | * called when a message is added | ||
85 | */ | ||
86 | SendImpl send_impl; | ||
87 | |||
88 | /** | ||
89 | * Implementation-dependent queue destruction function | ||
90 | */ | ||
91 | DestroyImpl destroy_impl; | ||
92 | |||
93 | /** | ||
94 | * Implementation-specific state | ||
95 | */ | ||
96 | void *impl_state; | ||
97 | |||
98 | /** | ||
99 | * Linked list of messages pending to be sent | ||
100 | */ | ||
101 | struct GNUNET_MQ_Message *msg_head; | ||
102 | |||
103 | /** | ||
104 | * Linked list of messages pending to be sent | ||
105 | */ | ||
106 | struct GNUNET_MQ_Message *msg_tail; | ||
107 | |||
108 | /** | ||
109 | * Message that is currently scheduled to be | ||
110 | * sent. Not the head of the message queue, as the implementation | ||
111 | * needs to know if sending has been already scheduled or not. | ||
112 | */ | ||
113 | struct GNUNET_MQ_Message *current_msg; | ||
114 | |||
115 | /** | ||
116 | * Map of associations, lazily allocated | ||
117 | */ | ||
118 | struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map; | ||
119 | |||
120 | /** | ||
121 | * Next id that should be used for the assoc_map, | ||
122 | * initialized lazily to a random value together with | ||
123 | * assoc_map | ||
124 | */ | ||
125 | uint32_t assoc_id; | ||
126 | }; | ||
127 | |||
29 | 128 | ||
30 | struct GNUNET_MQ_Message | 129 | struct GNUNET_MQ_Message |
31 | { | 130 | { |
131 | /** | ||
132 | * Messages are stored in a linked list | ||
133 | */ | ||
134 | struct GNUNET_MQ_Message *next; | ||
135 | |||
136 | /** | ||
137 | * Messages are stored in a linked list | ||
138 | */ | ||
139 | struct GNUNET_MQ_Message *prev; | ||
140 | |||
141 | /** | ||
142 | * Actual allocated message header, | ||
143 | * usually points to the end of the containing GNUNET_MQ_Message | ||
144 | */ | ||
32 | struct GNUNET_MessageHeader *mh; | 145 | struct GNUNET_MessageHeader *mh; |
146 | |||
147 | /** | ||
148 | * Queue the message is queued in, NULL if message is not queued. | ||
149 | */ | ||
150 | struct GNUNET_MQ_MessageQueue *parent_queue; | ||
151 | |||
152 | /** | ||
153 | * Called after the message was sent irrevokably | ||
154 | */ | ||
155 | GNUNET_MQ_NotifyCallback sent_cb; | ||
156 | |||
157 | /** | ||
158 | * Closure for send_cb | ||
159 | */ | ||
160 | void *sent_cls; | ||
33 | }; | 161 | }; |
34 | 162 | ||
35 | 163 | ||
164 | /** | ||
165 | * Call the right callback for a message received | ||
166 | * by a queue | ||
167 | */ | ||
168 | static void | ||
169 | dispatch_message (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_MessageHeader *mh) | ||
170 | { | ||
171 | const struct GNUNET_MQ_Handler *handler; | ||
172 | |||
173 | handler = mq->handlers; | ||
174 | if (NULL == handler) | ||
175 | return; | ||
176 | for (; NULL != handler->cb; handler++) | ||
177 | if (handler->type == ntohs (mh->type)) | ||
178 | handler->cb (mq->handlers_cls, mh); | ||
179 | } | ||
180 | |||
181 | |||
182 | void | ||
183 | GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm) | ||
184 | { | ||
185 | GNUNET_assert (NULL == mqm->parent_queue); | ||
186 | GNUNET_free (mqm); | ||
187 | } | ||
188 | |||
189 | |||
190 | /** | ||
191 | * Send a message with the give message queue. | ||
192 | * May only be called once per message. | ||
193 | * | ||
194 | * @param mq message queue | ||
195 | * @param mqm the message to send. | ||
196 | */ | ||
197 | void | ||
198 | GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) | ||
199 | { | ||
200 | mq->send_impl (mq, mqm); | ||
201 | } | ||
202 | |||
203 | |||
36 | struct GNUNET_MQ_Message * | 204 | struct GNUNET_MQ_Message * |
37 | GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) | 205 | GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) |
38 | { | 206 | { |
@@ -40,8 +208,422 @@ GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) | |||
40 | mqm = GNUNET_malloc (sizeof *mqm + size); | 208 | mqm = GNUNET_malloc (sizeof *mqm + size); |
41 | mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1]; | 209 | mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1]; |
42 | mqm->mh->size = htons (size); | 210 | mqm->mh->size = htons (size); |
43 | mqm->mh->type = htons(type); | 211 | mqm->mh->type = htons (type); |
44 | if (NULL != mhp) | 212 | if (NULL != mhp) |
45 | *mhp = mqm->mh; | 213 | *mhp = mqm->mh; |
46 | return mqm; | 214 | return mqm; |
47 | } | 215 | } |
216 | |||
217 | |||
218 | struct GNUNET_MQ_Message * | ||
219 | GNUNET_MQ_msg_concat_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, struct GNUNET_MessageHeader *m, uint16_t type) | ||
220 | { | ||
221 | struct GNUNET_MQ_Message *mq; | ||
222 | |||
223 | GNUNET_assert (NULL != mhp); | ||
224 | if (NULL == m) | ||
225 | return GNUNET_MQ_msg_ (mhp, base_size, type); | ||
226 | GNUNET_assert (ntohs (m->size >= sizeof (struct GNUNET_MessageHeader))); | ||
227 | /* check for overflow */ | ||
228 | if (base_size + ntohs (m->size) <= base_size) | ||
229 | return NULL; | ||
230 | mq = GNUNET_MQ_msg_ (mhp, base_size + ntohs (m->size), type); | ||
231 | memcpy (((void *) *mhp) + base_size, m, ntohs (m->size)); | ||
232 | return mq; | ||
233 | } | ||
234 | |||
235 | |||
236 | /** | ||
237 | * Functions of this signature are called whenever writing operations | ||
238 | * on a stream are executed | ||
239 | * | ||
240 | * @param cls the closure from GNUNET_STREAM_write | ||
241 | * @param status the status of the stream at the time this function is called; | ||
242 | * GNUNET_STREAM_OK if writing to stream was completed successfully; | ||
243 | * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully | ||
244 | * (this doesn't mean that the data is never sent, the receiver may | ||
245 | * have read the data but its ACKs may have been lost); | ||
246 | * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the | ||
247 | * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot | ||
248 | * be processed. | ||
249 | * @param size the number of bytes written | ||
250 | */ | ||
251 | static void | ||
252 | stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size) | ||
253 | { | ||
254 | struct GNUNET_MQ_MessageQueue *mq = cls; | ||
255 | struct MessageStreamState *mss = (struct MessageStreamState *) mq->impl_state; | ||
256 | struct GNUNET_MQ_Message *mqm; | ||
257 | |||
258 | GNUNET_assert (GNUNET_STREAM_OK == status); | ||
259 | |||
260 | /* call cb for message we finished sending */ | ||
261 | mqm = mq->current_msg; | ||
262 | if (NULL != mqm) | ||
263 | { | ||
264 | if (NULL != mqm->sent_cb) | ||
265 | mqm->sent_cb (mqm->sent_cls); | ||
266 | GNUNET_free (mqm); | ||
267 | } | ||
268 | |||
269 | mss->wh = NULL; | ||
270 | |||
271 | mqm = mq->msg_head; | ||
272 | mq->current_msg = mqm; | ||
273 | if (NULL == mqm) | ||
274 | return; | ||
275 | GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm); | ||
276 | mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size), | ||
277 | GNUNET_TIME_UNIT_FOREVER_REL, stream_write_queued, cls); | ||
278 | GNUNET_assert (NULL != mss->wh); | ||
279 | } | ||
280 | |||
281 | |||
282 | static void | ||
283 | stream_socket_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) | ||
284 | { | ||
285 | if (NULL != mq->current_msg) | ||
286 | { | ||
287 | GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm); | ||
288 | return; | ||
289 | } | ||
290 | stream_write_queued (mq, GNUNET_STREAM_OK, 0); | ||
291 | } | ||
292 | |||
293 | |||
294 | /** | ||
295 | * Functions with this signature are called whenever a | ||
296 | * complete message is received by the tokenizer. | ||
297 | * | ||
298 | * Do not call GNUNET_SERVER_mst_destroy in callback | ||
299 | * | ||
300 | * @param cls closure | ||
301 | * @param client identification of the client | ||
302 | * @param message the actual message | ||
303 | * | ||
304 | * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing | ||
305 | */ | ||
306 | static int | ||
307 | stream_mst_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message) | ||
308 | { | ||
309 | struct GNUNET_MQ_MessageQueue *mq = cls; | ||
310 | |||
311 | GNUNET_assert (NULL != message); | ||
312 | dispatch_message (mq, message); | ||
313 | return GNUNET_OK; | ||
314 | } | ||
315 | |||
316 | |||
317 | /** | ||
318 | * Functions of this signature are called whenever data is available from the | ||
319 | * stream. | ||
320 | * | ||
321 | * @param cls the closure from GNUNET_STREAM_read | ||
322 | * @param status the status of the stream at the time this function is called | ||
323 | * @param data traffic from the other side | ||
324 | * @param size the number of bytes available in data read; will be 0 on timeout | ||
325 | * @return number of bytes of processed from 'data' (any data remaining should be | ||
326 | * given to the next time the read processor is called). | ||
327 | */ | ||
328 | static size_t | ||
329 | stream_data_processor (void *cls, | ||
330 | enum GNUNET_STREAM_Status status, | ||
331 | const void *data, | ||
332 | size_t size) | ||
333 | { | ||
334 | struct GNUNET_MQ_MessageQueue *mq = cls; | ||
335 | struct MessageStreamState *mss; | ||
336 | int ret; | ||
337 | mss = (struct MessageStreamState *) mq->impl_state; | ||
338 | |||
339 | GNUNET_assert (GNUNET_STREAM_OK == status); | ||
340 | ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO); | ||
341 | GNUNET_assert (GNUNET_OK == ret); | ||
342 | /* we always read all data */ | ||
343 | return size; | ||
344 | } | ||
345 | |||
346 | |||
347 | struct GNUNET_MQ_MessageQueue * | ||
348 | GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket, | ||
349 | const struct GNUNET_MQ_Handler *handlers, | ||
350 | void *cls) | ||
351 | { | ||
352 | struct GNUNET_MQ_MessageQueue *mq; | ||
353 | struct MessageStreamState *mss; | ||
354 | |||
355 | mq = GNUNET_new (struct GNUNET_MQ_MessageQueue); | ||
356 | mss = GNUNET_new (struct MessageStreamState); | ||
357 | mss->socket = socket; | ||
358 | mq->impl_state = mss; | ||
359 | mq->send_impl = stream_socket_send_impl; | ||
360 | mq->handlers = handlers; | ||
361 | mq->handlers_cls = cls; | ||
362 | if (NULL != handlers) | ||
363 | { | ||
364 | mss->mst = GNUNET_SERVER_mst_create (stream_mst_callback, mq); | ||
365 | mss->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, | ||
366 | stream_data_processor, mq); | ||
367 | } | ||
368 | return mq; | ||
369 | } | ||
370 | |||
371 | |||
372 | /** | ||
373 | * Transmit a queued message to the session's client. | ||
374 | * | ||
375 | * @param cls consensus session | ||
376 | * @param size number of bytes available in buf | ||
377 | * @param buf where the callee should write the message | ||
378 | * @return number of bytes written to buf | ||
379 | */ | ||
380 | static size_t | ||
381 | transmit_queued (void *cls, size_t size, | ||
382 | void *buf) | ||
383 | { | ||
384 | struct GNUNET_MQ_MessageQueue *mq = cls; | ||
385 | struct GNUNET_MQ_Message *mqm = mq->current_msg; | ||
386 | struct ServerClientSocketState *state = mq->impl_state; | ||
387 | size_t msg_size; | ||
388 | |||
389 | mq->current_msg = NULL; | ||
390 | GNUNET_assert (NULL != mqm); | ||
391 | GNUNET_assert (NULL != buf); | ||
392 | msg_size = ntohs (mqm->mh->size); | ||
393 | GNUNET_assert (size >= msg_size); | ||
394 | memcpy (buf, mqm->mh, msg_size); | ||
395 | GNUNET_free (mqm); | ||
396 | state->th = NULL; | ||
397 | if (NULL != mq->msg_head) | ||
398 | { | ||
399 | mq->current_msg = mq->msg_head; | ||
400 | GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg); | ||
401 | state->th = | ||
402 | GNUNET_SERVER_notify_transmit_ready (state->client, msg_size, | ||
403 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
404 | &transmit_queued, mq); | ||
405 | } | ||
406 | return msg_size; | ||
407 | } | ||
408 | |||
409 | |||
410 | static void | ||
411 | server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) | ||
412 | { | ||
413 | struct ServerClientSocketState *state = mq->impl_state; | ||
414 | int msize; | ||
415 | |||
416 | GNUNET_assert (NULL != state); | ||
417 | |||
418 | if (NULL != state->th) | ||
419 | { | ||
420 | GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm); | ||
421 | return; | ||
422 | } | ||
423 | GNUNET_assert (NULL == mq->current_msg); | ||
424 | msize = ntohs (mq->msg_head->mh->size); | ||
425 | mq->current_msg = mqm; | ||
426 | state->th = | ||
427 | GNUNET_SERVER_notify_transmit_ready (state->client, msize, | ||
428 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
429 | &transmit_queued, mq); | ||
430 | } | ||
431 | |||
432 | |||
433 | struct GNUNET_MQ_MessageQueue * | ||
434 | GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client) | ||
435 | { | ||
436 | struct GNUNET_MQ_MessageQueue *mq; | ||
437 | struct ServerClientSocketState *scss; | ||
438 | |||
439 | mq = GNUNET_new (struct GNUNET_MQ_MessageQueue); | ||
440 | scss = GNUNET_new (struct ServerClientSocketState); | ||
441 | mq->impl_state = scss; | ||
442 | mq->send_impl = server_client_send_impl; | ||
443 | return mq; | ||
444 | } | ||
445 | |||
446 | |||
447 | /** | ||
448 | * Transmit a queued message to the session's client. | ||
449 | * | ||
450 | * @param cls consensus session | ||
451 | * @param size number of bytes available in buf | ||
452 | * @param buf where the callee should write the message | ||
453 | * @return number of bytes written to buf | ||
454 | */ | ||
455 | static size_t | ||
456 | connection_client_transmit_queued (void *cls, size_t size, | ||
457 | void *buf) | ||
458 | { | ||
459 | struct GNUNET_MQ_MessageQueue *mq = cls; | ||
460 | struct GNUNET_MQ_Message *mqm = mq->current_msg; | ||
461 | struct ClientConnectionState *state = mq->impl_state; | ||
462 | size_t msg_size; | ||
463 | |||
464 | mq->current_msg = NULL; | ||
465 | GNUNET_assert (NULL != mqm); | ||
466 | GNUNET_assert (NULL != buf); | ||
467 | msg_size = ntohs (mqm->mh->size); | ||
468 | GNUNET_assert (size >= msg_size); | ||
469 | memcpy (buf, mqm->mh, msg_size); | ||
470 | GNUNET_free (mqm); | ||
471 | state->th = NULL; | ||
472 | if (NULL != mq->msg_head) | ||
473 | { | ||
474 | mq->current_msg = mq->msg_head; | ||
475 | GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg); | ||
476 | state->th = | ||
477 | GNUNET_CLIENT_notify_transmit_ready (state->connection, msg_size, | ||
478 | GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, | ||
479 | &connection_client_transmit_queued, mq); | ||
480 | } | ||
481 | return msg_size; | ||
482 | } | ||
483 | |||
484 | |||
485 | static void | ||
486 | connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) | ||
487 | { | ||
488 | struct ClientConnectionState *state = mq->impl_state; | ||
489 | int msize; | ||
490 | |||
491 | GNUNET_assert (NULL != state); | ||
492 | |||
493 | if (NULL != state->th) | ||
494 | { | ||
495 | GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm); | ||
496 | return; | ||
497 | } | ||
498 | GNUNET_assert (NULL == mq->current_msg); | ||
499 | mq->current_msg = mqm; | ||
500 | msize = ntohs (mqm->mh->size); | ||
501 | state->th = | ||
502 | GNUNET_CLIENT_notify_transmit_ready (state->connection, msize, | ||
503 | GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, | ||
504 | &connection_client_transmit_queued, mq); | ||
505 | } | ||
506 | |||
507 | |||
508 | /** | ||
509 | * Type of a function to call when we receive a message | ||
510 | * from the service. | ||
511 | * | ||
512 | * @param cls closure | ||
513 | * @param msg message received, NULL on timeout or fatal error | ||
514 | */ | ||
515 | static void | ||
516 | handle_client_message (void *cls, | ||
517 | const struct GNUNET_MessageHeader *msg) | ||
518 | { | ||
519 | struct GNUNET_MQ_MessageQueue *mq = cls; | ||
520 | |||
521 | GNUNET_assert (NULL != msg); | ||
522 | |||
523 | dispatch_message (mq, msg); | ||
524 | } | ||
525 | |||
526 | |||
527 | struct GNUNET_MQ_MessageQueue * | ||
528 | GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection, | ||
529 | const struct GNUNET_MQ_Handler *handlers, | ||
530 | void *cls) | ||
531 | { | ||
532 | struct GNUNET_MQ_MessageQueue *mq; | ||
533 | struct ClientConnectionState *state; | ||
534 | |||
535 | GNUNET_assert (NULL != connection); | ||
536 | |||
537 | mq = GNUNET_new (struct GNUNET_MQ_MessageQueue); | ||
538 | mq->handlers = handlers; | ||
539 | mq->handlers_cls = cls; | ||
540 | state = GNUNET_new (struct ClientConnectionState); | ||
541 | state->connection = connection; | ||
542 | mq->impl_state = state; | ||
543 | mq->send_impl = connection_client_send_impl; | ||
544 | |||
545 | if (NULL != handlers) | ||
546 | { | ||
547 | GNUNET_CLIENT_receive (connection, handle_client_message, mq, | ||
548 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
549 | } | ||
550 | |||
551 | return mq; | ||
552 | } | ||
553 | |||
554 | |||
555 | |||
556 | void | ||
557 | GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq, | ||
558 | const struct GNUNET_MQ_Handler *new_handlers, | ||
559 | void *cls) | ||
560 | { | ||
561 | mq->handlers = new_handlers; | ||
562 | mq->handlers_cls = cls; | ||
563 | } | ||
564 | |||
565 | |||
566 | |||
567 | /** | ||
568 | * Associate the assoc_data in mq with a unique request id. | ||
569 | * | ||
570 | * @param mq message queue, id will be unique for the queue | ||
571 | * @param mqm message to associate | ||
572 | * @param data to associate | ||
573 | */ | ||
574 | uint32_t | ||
575 | GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq, | ||
576 | struct GNUNET_MQ_Message *mqm, | ||
577 | void *assoc_data) | ||
578 | { | ||
579 | uint32_t id; | ||
580 | |||
581 | if (NULL == mq->assoc_map) | ||
582 | mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8); | ||
583 | id = mq->assoc_id++; | ||
584 | GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data, | ||
585 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); | ||
586 | return id; | ||
587 | } | ||
588 | |||
589 | |||
590 | |||
591 | void * | ||
592 | GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id) | ||
593 | { | ||
594 | if (NULL == mq->assoc_map) | ||
595 | return NULL; | ||
596 | return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id); | ||
597 | } | ||
598 | |||
599 | |||
600 | void * | ||
601 | GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id) | ||
602 | { | ||
603 | void *val; | ||
604 | |||
605 | if (NULL == mq->assoc_map) | ||
606 | return NULL; | ||
607 | val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id); | ||
608 | GNUNET_CONTAINER_multihashmap32_remove (mq->assoc_map, request_id, val); | ||
609 | return val; | ||
610 | } | ||
611 | |||
612 | |||
613 | void | ||
614 | GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm, | ||
615 | GNUNET_MQ_NotifyCallback cb, | ||
616 | void *cls) | ||
617 | { | ||
618 | mqm->sent_cb = cb; | ||
619 | mqm->sent_cls = cls; | ||
620 | } | ||
621 | |||
622 | |||
623 | void | ||
624 | GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq) | ||
625 | { | ||
626 | /* FIXME: destroy all pending messages in the queue */ | ||
627 | GNUNET_free (mq); | ||
628 | } | ||
629 | |||
diff --git a/src/set/mq.h b/src/set/mq.h index 960025885..27ab0469e 100644 --- a/src/set/mq.h +++ b/src/set/mq.h | |||
@@ -30,23 +30,109 @@ | |||
30 | #include "gnunet_common.h" | 30 | #include "gnunet_common.h" |
31 | #include "gnunet_util_lib.h" | 31 | #include "gnunet_util_lib.h" |
32 | #include "gnunet_connection_lib.h" | 32 | #include "gnunet_connection_lib.h" |
33 | #include "gnunet_server_lib.h" | ||
34 | #include "gnunet_stream_lib.h" | ||
33 | 35 | ||
34 | 36 | ||
35 | #define GNUNET_MQ_msg_extra(mvar, esize, type) GNUNET_MQ_msg_(((void) mvar->header, (struct GNUNET_MessageHeader**) &mvar), (esize) + sizeof *mvar, type) | 37 | /** |
38 | * Allocate a GNUNET_MQ_Message, with extra space allocated after the space needed | ||
39 | * by the message struct. | ||
40 | * The allocated message will already have the type and size field set. | ||
41 | * | ||
42 | * @param mvar variable to store the allocated message in; | ||
43 | * must have a header field | ||
44 | * @param esize extra space to allocate after the message | ||
45 | * @param type type of the message | ||
46 | * @return the MQ message | ||
47 | */ | ||
48 | #define GNUNET_MQ_msg_extra(mvar, esize, type) GNUNET_MQ_msg_((((void)(mvar)->header), (struct GNUNET_MessageHeader**) &(mvar)), (esize) + sizeof *(mvar), (type)) | ||
36 | 49 | ||
50 | /** | ||
51 | * Allocate a GNUNET_MQ_Message. | ||
52 | * The allocated message will already have the type and size field set. | ||
53 | * | ||
54 | * @param mvar variable to store the allocated message in; | ||
55 | * must have a header field | ||
56 | * @param type type of the message | ||
57 | * @return the MQ message | ||
58 | */ | ||
37 | #define GNUNET_MQ_msg(mvar, type) GNUNET_MQ_msg_extra(mvar, 0, type) | 59 | #define GNUNET_MQ_msg(mvar, type) GNUNET_MQ_msg_extra(mvar, 0, type) |
38 | 60 | ||
39 | #define GNUNET_MQ_msg_raw(type) GNUNET_MQ_msg_ (NULL, sizeof (struct GNUNET_MessageHeader), type) | 61 | /** |
62 | * Allocate a GNUNET_MQ_Message, and concatenate another message | ||
63 | * after the space needed by the message struct. | ||
64 | * | ||
65 | * @param mvar variable to store the allocated message in; | ||
66 | * must have a header field | ||
67 | * @param mc message to concatenate, can be NULL | ||
68 | * @param type type of the message | ||
69 | * @return the MQ message, NULL if mc is to large to be concatenated | ||
70 | */ | ||
71 | #define GNUNET_MQ_msg_concat(mvar, mc, t) GNUNET_MQ_msg_concat_(((void) mvar->header, (struct GNUNET_MessageHeader **) &(mvar)), \ | ||
72 | sizeof *mvar, (struct GNUNET_MessageHeader *) mc, t) | ||
73 | |||
74 | |||
75 | /** | ||
76 | * Allocate a GNUNET_MQ_Message, where the message only consists of a header. | ||
77 | * The allocated message will already have the type and size field set. | ||
78 | * | ||
79 | * @param mvar variable to store the allocated message in; | ||
80 | * must have a header field | ||
81 | * @param type type of the message | ||
82 | */ | ||
83 | #define GNUNET_MQ_msg_header(type) GNUNET_MQ_msg_ (NULL, sizeof (struct GNUNET_MessageHeader), type) | ||
40 | 84 | ||
85 | |||
86 | /** | ||
87 | * Allocate a GNUNET_MQ_Message, where the message only consists of a header and extra space. | ||
88 | * The allocated message will already have the type and size field set. | ||
89 | * | ||
90 | * @param mvar variable to store the allocated message in; | ||
91 | * must have a header field | ||
92 | * @param esize extra space to allocate after the message header | ||
93 | * @param type type of the message | ||
94 | */ | ||
95 | #define GNUNET_MQ_msg_header_extra(mh, esize, type) GNUNET_MQ_msg_ (&mh, sizeof (struct GNUNET_MessageHeader), type) | ||
96 | |||
97 | |||
98 | /** | ||
99 | * End-marker for the handlers array | ||
100 | */ | ||
41 | #define GNUNET_MQ_HANDLERS_END {NULL, 0} | 101 | #define GNUNET_MQ_HANDLERS_END {NULL, 0} |
42 | 102 | ||
103 | /** | ||
104 | * Opaque handle to a message queue | ||
105 | */ | ||
43 | struct GNUNET_MQ_MessageQueue; | 106 | struct GNUNET_MQ_MessageQueue; |
44 | 107 | ||
108 | /** | ||
109 | * Opaque handle to an allocated message | ||
110 | */ | ||
45 | struct GNUNET_MQ_Message; | 111 | struct GNUNET_MQ_Message; |
46 | 112 | ||
113 | /** | ||
114 | * Called when a message has been received. | ||
115 | * | ||
116 | * @param cls closure | ||
117 | * @param msg the received message | ||
118 | */ | ||
119 | typedef void (*GNUNET_MQ_MessageCallback) (void *cls, const struct GNUNET_MessageHeader *msg); | ||
120 | |||
121 | |||
122 | /** | ||
123 | * Message handler for a specific message type. | ||
124 | */ | ||
47 | struct GNUNET_MQ_Handler | 125 | struct GNUNET_MQ_Handler |
48 | { | 126 | { |
49 | void *cb; | 127 | /** |
128 | * Callback, called every time a new message of | ||
129 | * the specified type has been receied. | ||
130 | */ | ||
131 | GNUNET_MQ_MessageCallback cb; | ||
132 | |||
133 | /** | ||
134 | * Type of the message we are interested in | ||
135 | */ | ||
50 | uint16_t type; | 136 | uint16_t type; |
51 | }; | 137 | }; |
52 | 138 | ||
@@ -63,11 +149,33 @@ typedef void (*GNUNET_MQ_NotifyCallback) (void *cls); | |||
63 | * @param mhp message header to store the allocated message header in, can be NULL | 149 | * @param mhp message header to store the allocated message header in, can be NULL |
64 | * @param size size of the message to allocate | 150 | * @param size size of the message to allocate |
65 | * @param type type of the message, will be set in the allocated message | 151 | * @param type type of the message, will be set in the allocated message |
66 | * @param return the allocated MQ message | 152 | * @return the allocated MQ message |
67 | */ | 153 | */ |
68 | struct GNUNET_MQ_Message * | 154 | struct GNUNET_MQ_Message * |
69 | GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type); | 155 | GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type); |
70 | 156 | ||
157 | |||
158 | /** | ||
159 | * Create a new message for MQ, by concatenating another message | ||
160 | * after a message of the specified type. | ||
161 | * | ||
162 | * @retrn the allocated MQ message | ||
163 | */ | ||
164 | struct GNUNET_MQ_Message * | ||
165 | GNUNET_MQ_msg_concat_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, struct GNUNET_MessageHeader *m, uint16_t type); | ||
166 | |||
167 | |||
168 | /** | ||
169 | * Discard the message queue message, free all | ||
170 | * allocated resources. Must be called in the event | ||
171 | * that a message is created but should not actually be sent. | ||
172 | * | ||
173 | * @param mqm the message to discard | ||
174 | */ | ||
175 | void | ||
176 | GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm); | ||
177 | |||
178 | |||
71 | /** | 179 | /** |
72 | * Send a message with the give message queue. | 180 | * Send a message with the give message queue. |
73 | * May only be called once per message. | 181 | * May only be called once per message. |
@@ -78,6 +186,7 @@ GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) | |||
78 | void | 186 | void |
79 | GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm); | 187 | GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm); |
80 | 188 | ||
189 | |||
81 | /** | 190 | /** |
82 | * Cancel sending the message. Message must have been sent with GNUNET_MQ_send before. | 191 | * Cancel sending the message. Message must have been sent with GNUNET_MQ_send before. |
83 | * May not be called after the notify sent callback has been called | 192 | * May not be called after the notify sent callback has been called |
@@ -100,32 +209,105 @@ GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq, | |||
100 | struct GNUNET_MQ_Message *mqm, | 209 | struct GNUNET_MQ_Message *mqm, |
101 | void *assoc_data); | 210 | void *assoc_data); |
102 | 211 | ||
212 | /** | ||
213 | * Get the data associated with a request id in a queue | ||
214 | * | ||
215 | * @param mq the message queue with the association | ||
216 | * @param request_id the request id we are interested in | ||
217 | * @return the associated data | ||
218 | */ | ||
103 | void * | 219 | void * |
104 | GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id); | 220 | GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id); |
105 | 221 | ||
222 | |||
223 | /** | ||
224 | * Remove the association for a request id | ||
225 | * | ||
226 | * @param mq the message queue with the association | ||
227 | * @param request_id the request id we want to remove | ||
228 | * @return the associated data | ||
229 | */ | ||
106 | void * | 230 | void * |
107 | GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id); | 231 | GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id); |
108 | 232 | ||
109 | 233 | ||
110 | 234 | ||
235 | /** | ||
236 | * Create a message queue for a GNUNET_CLIENT_Connection. | ||
237 | * If handlers are specfied, receive messages from the connection. | ||
238 | * | ||
239 | * @param connection the client connection | ||
240 | * @param handlers handlers for receiving messages | ||
241 | * @param cls closure for the handlers | ||
242 | * @return the message queue | ||
243 | */ | ||
111 | struct GNUNET_MQ_MessageQueue * | 244 | struct GNUNET_MQ_MessageQueue * |
112 | GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection, | 245 | GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection, |
113 | const struct GNUNET_MQ_Handler *handlers, | 246 | const struct GNUNET_MQ_Handler *handlers, |
114 | void *cls); | 247 | void *cls); |
115 | 248 | ||
116 | 249 | ||
250 | /** | ||
251 | * Create a message queue for a GNUNET_STREAM_Socket. | ||
252 | * | ||
253 | * @param param client the client | ||
254 | * @return the message queue | ||
255 | */ | ||
256 | struct GNUNET_MQ_MessageQueue * | ||
257 | GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client); | ||
258 | |||
259 | |||
260 | |||
261 | /** | ||
262 | * Create a message queue for a GNUNET_STREAM_Socket. | ||
263 | * If handlers are specfied, receive messages from the stream socket. | ||
264 | * | ||
265 | * @param socket the stream socket | ||
266 | * @param handlers handlers for receiving messages | ||
267 | * @param cls closure for the handlers | ||
268 | * @return the message queue | ||
269 | */ | ||
270 | struct GNUNET_MQ_MessageQueue * | ||
271 | GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket, | ||
272 | const struct GNUNET_MQ_Handler *handlers, | ||
273 | void *cls); | ||
274 | |||
275 | /** | ||
276 | * Replace the handlers of a message queue with new handlers. | ||
277 | * Takes effect immediately, even for messages that already have been received, but for | ||
278 | * with the handler has not been called. | ||
279 | * | ||
280 | * @param mq message queue | ||
281 | * @param new_handlers new handlers | ||
282 | * @param cls new closure for the handlers | ||
283 | */ | ||
117 | void | 284 | void |
118 | GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm, | 285 | GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq, |
119 | GNUNET_MQ_NotifyCallback cb, | 286 | const struct GNUNET_MQ_Handler *new_handlers, |
120 | void *cls); | 287 | void *cls); |
288 | |||
121 | 289 | ||
122 | 290 | ||
291 | /** | ||
292 | * Call a callback once the message has been sent, that is, the message | ||
293 | * can not be canceled anymore. | ||
294 | * There can be only one notify sent callback per message. | ||
295 | * | ||
296 | * @param mqm message to call the notify callback for | ||
297 | * @param cb the notify callback | ||
298 | * @param cls closure for the callback | ||
299 | */ | ||
123 | void | 300 | void |
124 | GNUNET_MQ_notify_timeout (struct GNUNET_MQ_Message *mqm, | 301 | GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm, |
125 | GNUNET_MQ_NotifyCallback cb, | 302 | GNUNET_MQ_NotifyCallback cb, |
126 | void *cls); | 303 | void *cls); |
127 | 304 | ||
128 | 305 | ||
306 | /** | ||
307 | * Destroy the message queue. | ||
308 | * | ||
309 | * @param mq message queue to destroy | ||
310 | */ | ||
129 | void | 311 | void |
130 | GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq); | 312 | GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq); |
131 | 313 | ||
diff --git a/src/set/set.conf.in b/src/set/set.conf.in index e69de29bb..708b94b50 100644 --- a/src/set/set.conf.in +++ b/src/set/set.conf.in | |||
@@ -0,0 +1,11 @@ | |||
1 | [set] | ||
2 | AUTOSTART = NO | ||
3 | # PORT = 2106 | ||
4 | HOSTNAME = localhost | ||
5 | HOME = $SERVICEHOME | ||
6 | BINARY = gnunet-service-set | ||
7 | ACCEPT_FROM = 127.0.0.1; | ||
8 | ACCEPT_FROM6 = ::1; | ||
9 | UNIXPATH = /tmp/gnunet-service-set.sock | ||
10 | UNIX_MATCH_UID = YES | ||
11 | UNIX_MATCH_GID = YES | ||
diff --git a/src/set/set.h b/src/set/set.h index ef11e5abc..10e607982 100644 --- a/src/set/set.h +++ b/src/set/set.h | |||
@@ -20,12 +20,13 @@ | |||
20 | 20 | ||
21 | /** | 21 | /** |
22 | * @author Florian Dold | 22 | * @author Florian Dold |
23 | * @file consensus/consensus.h | 23 | * @file set/set.h |
24 | * @brief | 24 | * @brief messages used for the set api |
25 | */ | 25 | */ |
26 | #ifndef SET_H | 26 | #ifndef SET_H |
27 | #define SET_H | 27 | #define SET_H |
28 | 28 | ||
29 | #include "platform.h" | ||
29 | #include "gnunet_common.h" | 30 | #include "gnunet_common.h" |
30 | 31 | ||
31 | 32 | ||
@@ -68,12 +69,6 @@ struct ListenMessage | |||
68 | * Operation type, values of enum GNUNET_SET_OperationType | 69 | * Operation type, values of enum GNUNET_SET_OperationType |
69 | */ | 70 | */ |
70 | uint16_t operation GNUNET_PACKED; | 71 | uint16_t operation GNUNET_PACKED; |
71 | |||
72 | /** | ||
73 | * Operation type, values of enum GNUNET_SET_OperationType | ||
74 | */ | ||
75 | uint16_t op GNUNET_PACKED; | ||
76 | |||
77 | }; | 72 | }; |
78 | 73 | ||
79 | 74 | ||
@@ -88,6 +83,12 @@ struct AcceptMessage | |||
88 | * request id of the request we want to accept | 83 | * request id of the request we want to accept |
89 | */ | 84 | */ |
90 | uint32_t request_id GNUNET_PACKED; | 85 | uint32_t request_id GNUNET_PACKED; |
86 | |||
87 | /** | ||
88 | * Zero if the client has rejected the request, | ||
89 | * non-zero if it has accepted it | ||
90 | */ | ||
91 | uint32_t accepted GNUNET_PACKED; | ||
91 | }; | 92 | }; |
92 | 93 | ||
93 | 94 | ||
@@ -119,8 +120,14 @@ struct EvaluateMessage | |||
119 | */ | 120 | */ |
120 | struct GNUNET_MessageHeader header; | 121 | struct GNUNET_MessageHeader header; |
121 | 122 | ||
122 | struct GNUNET_PeerIdentity other_peer; | 123 | /** |
124 | * Peer to evaluate the operation with | ||
125 | */ | ||
126 | struct GNUNET_PeerIdentity peer; | ||
123 | 127 | ||
128 | /** | ||
129 | * Application id | ||
130 | */ | ||
124 | struct GNUNET_HashCode app_id; | 131 | struct GNUNET_HashCode app_id; |
125 | 132 | ||
126 | /** | 133 | /** |
@@ -144,8 +151,15 @@ struct ResultMessage | |||
144 | */ | 151 | */ |
145 | uint32_t request_id GNUNET_PACKED; | 152 | uint32_t request_id GNUNET_PACKED; |
146 | 153 | ||
154 | /** | ||
155 | * Was the evaluation successful? | ||
156 | */ | ||
147 | uint16_t result_status GNUNET_PACKED; | 157 | uint16_t result_status GNUNET_PACKED; |
148 | 158 | ||
159 | /** | ||
160 | * Type of the element attachted to the message, | ||
161 | * if any. | ||
162 | */ | ||
149 | uint16_t element_type GNUNET_PACKED; | 163 | uint16_t element_type GNUNET_PACKED; |
150 | 164 | ||
151 | /* rest: the actual element */ | 165 | /* rest: the actual element */ |
@@ -179,6 +193,7 @@ struct CancelMessage | |||
179 | uint32_t request_id GNUNET_PACKED; | 193 | uint32_t request_id GNUNET_PACKED; |
180 | }; | 194 | }; |
181 | 195 | ||
196 | |||
182 | GNUNET_NETWORK_STRUCT_END | 197 | GNUNET_NETWORK_STRUCT_END |
183 | 198 | ||
184 | #endif | 199 | #endif |
diff --git a/src/set/set_api.c b/src/set/set_api.c index 45c4e47bd..b2491afe7 100644 --- a/src/set/set_api.c +++ b/src/set/set_api.c | |||
@@ -93,7 +93,7 @@ handle_result (void *cls, const struct GNUNET_MessageHeader *mh) | |||
93 | if (set->messages_since_ack >= GNUNET_SET_ACK_WINDOW/2) | 93 | if (set->messages_since_ack >= GNUNET_SET_ACK_WINDOW/2) |
94 | { | 94 | { |
95 | struct GNUNET_MQ_Message *mqm; | 95 | struct GNUNET_MQ_Message *mqm; |
96 | mqm = GNUNET_MQ_msg_raw (GNUNET_MESSAGE_TYPE_SET_ACK); | 96 | mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ACK); |
97 | GNUNET_MQ_send (set->mq, mqm); | 97 | GNUNET_MQ_send (set->mq, mqm); |
98 | } | 98 | } |
99 | 99 | ||
@@ -136,7 +136,15 @@ handle_request (void *cls, const struct GNUNET_MessageHeader *mh) | |||
136 | req->request_id = ntohl (msg->request_id); | 136 | req->request_id = ntohl (msg->request_id); |
137 | lh->listen_cb (lh->listen_cls, &msg->peer_id, &mh[1], req); | 137 | lh->listen_cb (lh->listen_cls, &msg->peer_id, &mh[1], req); |
138 | if (GNUNET_NO == req->accepted) | 138 | if (GNUNET_NO == req->accepted) |
139 | { | ||
140 | struct GNUNET_MQ_Message *mqm; | ||
141 | struct AcceptMessage *amsg; | ||
142 | |||
143 | mqm = GNUNET_MQ_msg (amsg, GNUNET_MESSAGE_TYPE_SET_ACCEPT); | ||
144 | amsg->request_id = msg->request_id; | ||
145 | GNUNET_MQ_send (lh->mq, mqm); | ||
139 | GNUNET_free (req); | 146 | GNUNET_free (req); |
147 | } | ||
140 | } | 148 | } |
141 | 149 | ||
142 | 150 | ||
@@ -152,7 +160,7 @@ handle_request (void *cls, const struct GNUNET_MessageHeader *mh) | |||
152 | * @return a handle to the set | 160 | * @return a handle to the set |
153 | */ | 161 | */ |
154 | struct GNUNET_SET_Handle * | 162 | struct GNUNET_SET_Handle * |
155 | GNUNET_SET_create (struct GNUNET_CONFIGURATION_Handle *cfg, | 163 | GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg, |
156 | enum GNUNET_SET_OperationType op) | 164 | enum GNUNET_SET_OperationType op) |
157 | { | 165 | { |
158 | struct GNUNET_SET_Handle *set; | 166 | struct GNUNET_SET_Handle *set; |
@@ -165,6 +173,7 @@ GNUNET_SET_create (struct GNUNET_CONFIGURATION_Handle *cfg, | |||
165 | 173 | ||
166 | set = GNUNET_new (struct GNUNET_SET_Handle); | 174 | set = GNUNET_new (struct GNUNET_SET_Handle); |
167 | set->client = GNUNET_CLIENT_connect ("set", cfg); | 175 | set->client = GNUNET_CLIENT_connect ("set", cfg); |
176 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set client created\n"); | ||
168 | GNUNET_assert (NULL != set->client); | 177 | GNUNET_assert (NULL != set->client); |
169 | set->mq = GNUNET_MQ_queue_for_connection_client (set->client, mq_handlers, set); | 178 | set->mq = GNUNET_MQ_queue_for_connection_client (set->client, mq_handlers, set); |
170 | mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_CREATE); | 179 | mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_CREATE); |
@@ -295,7 +304,7 @@ GNUNET_SET_evaluate (struct GNUNET_SET_Handle *set, | |||
295 | 304 | ||
296 | mqm = GNUNET_MQ_msg_extra (msg, htons(context_msg->size), GNUNET_MESSAGE_TYPE_SET_EVALUATE); | 305 | mqm = GNUNET_MQ_msg_extra (msg, htons(context_msg->size), GNUNET_MESSAGE_TYPE_SET_EVALUATE); |
297 | msg->request_id = htonl (GNUNET_MQ_assoc_add (set->mq, mqm, oh)); | 306 | msg->request_id = htonl (GNUNET_MQ_assoc_add (set->mq, mqm, oh)); |
298 | msg->other_peer = *other_peer; | 307 | msg->peer = *other_peer; |
299 | msg->app_id = *app_id; | 308 | msg->app_id = *app_id; |
300 | memcpy (&msg[1], context_msg, htons (context_msg->size)); | 309 | memcpy (&msg[1], context_msg, htons (context_msg->size)); |
301 | oh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, operation_timeout_task, oh); | 310 | oh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, operation_timeout_task, oh); |
@@ -318,7 +327,7 @@ GNUNET_SET_evaluate (struct GNUNET_SET_Handle *set, | |||
318 | * @return a handle that can be used to cancel the listen operation | 327 | * @return a handle that can be used to cancel the listen operation |
319 | */ | 328 | */ |
320 | struct GNUNET_SET_ListenHandle * | 329 | struct GNUNET_SET_ListenHandle * |
321 | GNUNET_SET_listen (struct GNUNET_CONFIGURATION_Handle *cfg, | 330 | GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg, |
322 | enum GNUNET_SET_OperationType operation, | 331 | enum GNUNET_SET_OperationType operation, |
323 | const struct GNUNET_HashCode *app_id, | 332 | const struct GNUNET_HashCode *app_id, |
324 | GNUNET_SET_ListenCallback listen_cb, | 333 | GNUNET_SET_ListenCallback listen_cb, |
@@ -396,9 +405,11 @@ GNUNET_SET_accept (struct GNUNET_SET_Request *request, | |||
396 | 405 | ||
397 | mqm = GNUNET_MQ_msg (msg , GNUNET_MESSAGE_TYPE_SET_ACCEPT); | 406 | mqm = GNUNET_MQ_msg (msg , GNUNET_MESSAGE_TYPE_SET_ACCEPT); |
398 | msg->request_id = htonl (request->request_id); | 407 | msg->request_id = htonl (request->request_id); |
399 | oh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, operation_timeout_task, oh); | 408 | msg->accepted = 1; |
400 | GNUNET_MQ_send (set->mq, mqm); | 409 | GNUNET_MQ_send (set->mq, mqm); |
401 | 410 | ||
411 | oh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, operation_timeout_task, oh); | ||
412 | |||
402 | return oh; | 413 | return oh; |
403 | } | 414 | } |
404 | 415 | ||
@@ -416,7 +427,7 @@ GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *h) | |||
416 | 427 | ||
417 | h_assoc = GNUNET_MQ_assoc_remove (h->set->mq, h->request_id); | 428 | h_assoc = GNUNET_MQ_assoc_remove (h->set->mq, h->request_id); |
418 | GNUNET_assert (h_assoc == h); | 429 | GNUNET_assert (h_assoc == h); |
419 | mqm = GNUNET_MQ_msg_raw (GNUNET_MESSAGE_TYPE_SET_CANCEL); | 430 | mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_CANCEL); |
420 | GNUNET_MQ_send (h->set->mq, mqm); | 431 | GNUNET_MQ_send (h->set->mq, mqm); |
421 | GNUNET_free (h); | 432 | GNUNET_free (h); |
422 | } | 433 | } |
diff --git a/src/set/strata_estimator.c b/src/set/strata_estimator.c index 685c50f0f..f42c827e2 100644 --- a/src/set/strata_estimator.c +++ b/src/set/strata_estimator.c | |||
@@ -53,15 +53,15 @@ strata_estimator_read (const void *buf, struct StrataEstimator *se) | |||
53 | 53 | ||
54 | 54 | ||
55 | void | 55 | void |
56 | strata_estimator_insert (struct StrataEstimator *se, struct GNUNET_HashCode *key) | 56 | strata_estimator_insert (struct StrataEstimator *se, struct IBF_Key key) |
57 | { | 57 | { |
58 | uint32_t v; | 58 | uint64_t v; |
59 | int i; | 59 | int i; |
60 | v = key->bits[0]; | 60 | v = key.key_val; |
61 | /* count trailing '1'-bits of v */ | 61 | /* count trailing '1'-bits of v */ |
62 | for (i = 0; v & 1; v>>=1, i++) | 62 | for (i = 0; v & 1; v>>=1, i++) |
63 | /* empty */; | 63 | /* empty */; |
64 | ibf_insert (se->strata[i], ibf_key_from_hashcode (key)); | 64 | ibf_insert (se->strata[i], key); |
65 | } | 65 | } |
66 | 66 | ||
67 | 67 | ||
diff --git a/src/set/strata_estimator.h b/src/set/strata_estimator.h index cb5bd3d0a..57f0961a9 100644 --- a/src/set/strata_estimator.h +++ b/src/set/strata_estimator.h | |||
@@ -66,7 +66,7 @@ strata_estimator_difference (const struct StrataEstimator *se1, | |||
66 | 66 | ||
67 | 67 | ||
68 | void | 68 | void |
69 | strata_estimator_insert (struct StrataEstimator *se, struct GNUNET_HashCode *key); | 69 | strata_estimator_insert (struct StrataEstimator *se, struct IBF_Key key); |
70 | 70 | ||
71 | 71 | ||
72 | void | 72 | void |
diff --git a/src/set/test_set_api.c b/src/set/test_set_api.c index 7e2ed2b41..0753fd139 100644 --- a/src/set/test_set_api.c +++ b/src/set/test_set_api.c | |||
@@ -44,8 +44,8 @@ run (void *cls, | |||
44 | struct GNUNET_SET_Handle *set1; | 44 | struct GNUNET_SET_Handle *set1; |
45 | struct GNUNET_SET_Handle *set2; | 45 | struct GNUNET_SET_Handle *set2; |
46 | 46 | ||
47 | set1 = GNUNET_SET_create (GNUNET_SET_OPERATION_UNION); | 47 | set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); |
48 | set2 = GNUNET_SET_create (GNUNET_SET_OPERATION_UNION); | 48 | set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); |
49 | } | 49 | } |
50 | 50 | ||
51 | int | 51 | int |