aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2020-08-18 18:09:58 +0200
committerChristian Grothoff <christian@grothoff.org>2020-08-18 18:09:58 +0200
commit4d607f2f2838431cc7a349441f8f018ab99633a2 (patch)
treecf0e41012667b94b893d133c78ffdf0a18a274dd
parent0c0cbfb5913b87135b51798d8c08cd49951e51f2 (diff)
downloadgnunet-4d607f2f2838431cc7a349441f8f018ab99633a2.tar.gz
gnunet-4d607f2f2838431cc7a349441f8f018ab99633a2.zip
splitting of set intersection functionality from set service (not yet finished, FTBFS)
-rw-r--r--configure.ac2
-rw-r--r--po/POTFILES.in5
-rw-r--r--src/include/gnunet_protocols.h94
-rw-r--r--src/include/gnunet_seti_service.h369
-rw-r--r--src/include/gnunet_setu_service.h4
-rw-r--r--src/seti/.gitignore3
-rw-r--r--src/seti/Makefile.am90
-rw-r--r--src/seti/gnunet-service-seti.c3274
-rw-r--r--src/seti/gnunet-service-seti_protocol.h144
-rw-r--r--src/seti/gnunet-seti-profiler.c480
-rw-r--r--src/seti/plugin_block_seti_test.c123
-rw-r--r--src/seti/seti.conf.in12
-rw-r--r--src/seti/seti.h267
-rw-r--r--src/seti/seti_api.c895
-rw-r--r--src/seti/test_seti.conf33
-rw-r--r--src/seti/test_seti_api.c393
16 files changed, 6172 insertions, 16 deletions
diff --git a/configure.ac b/configure.ac
index 72309c78d..bd92bd0e9 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1939,6 +1939,8 @@ src/scalarproduct/Makefile
1939src/scalarproduct/scalarproduct.conf 1939src/scalarproduct/scalarproduct.conf
1940src/set/Makefile 1940src/set/Makefile
1941src/set/set.conf 1941src/set/set.conf
1942src/seti/Makefile
1943src/seti/seti.conf
1942src/setu/Makefile 1944src/setu/Makefile
1943src/setu/setu.conf 1945src/setu/setu.conf
1944src/sq/Makefile 1946src/sq/Makefile
diff --git a/po/POTFILES.in b/po/POTFILES.in
index 7d19122ca..12e27fa81 100644
--- a/po/POTFILES.in
+++ b/po/POTFILES.in
@@ -333,6 +333,11 @@ src/set/ibf.c
333src/set/ibf_sim.c 333src/set/ibf_sim.c
334src/set/plugin_block_set_test.c 334src/set/plugin_block_set_test.c
335src/set/set_api.c 335src/set/set_api.c
336src/seti/gnunet-service-set_intersection.c
337src/seti/gnunet-service-seti.c
338src/seti/gnunet-seti-profiler.c
339src/seti/plugin_block_seti_test.c
340src/seti/setu_api.c
336src/setu/gnunet-service-setu.c 341src/setu/gnunet-service-setu.c
337src/setu/gnunet-service-setu_strata_estimator.c 342src/setu/gnunet-service-setu_strata_estimator.c
338src/setu/gnunet-setu-ibf-profiler.c 343src/setu/gnunet-setu-ibf-profiler.c
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index c3fcde0b9..e9a2b1c0e 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -1713,80 +1713,146 @@ extern "C" {
1713 * Demand the whole element from the other 1713 * Demand the whole element from the other
1714 * peer, given only the hash code. 1714 * peer, given only the hash code.
1715 */ 1715 */
1716#define GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL 565 1716#define GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL 559
1717 1717
1718/** 1718/**
1719 * Demand the whole element from the other 1719 * Demand the whole element from the other
1720 * peer, given only the hash code. 1720 * peer, given only the hash code.
1721 */ 1721 */
1722#define GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND 566 1722#define GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND 560
1723 1723
1724/** 1724/**
1725 * Tell the other peer to send us a list of 1725 * Tell the other peer to send us a list of
1726 * hashes that match an IBF key. 1726 * hashes that match an IBF key.
1727 */ 1727 */
1728#define GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY 567 1728#define GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY 561
1729 1729
1730/** 1730/**
1731 * Tell the other peer which hashes match a 1731 * Tell the other peer which hashes match a
1732 * given IBF key. 1732 * given IBF key.
1733 */ 1733 */
1734#define GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER 568 1734#define GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER 562
1735 1735
1736/** 1736/**
1737 * Request a set union operation from a remote peer. 1737 * Request a set union operation from a remote peer.
1738 */ 1738 */
1739#define GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST 581 1739#define GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST 563
1740 1740
1741/** 1741/**
1742 * Strata estimator. 1742 * Strata estimator.
1743 */ 1743 */
1744#define GNUNET_MESSAGE_TYPE_SETU_P2P_SE 582 1744#define GNUNET_MESSAGE_TYPE_SETU_P2P_SE 564
1745 1745
1746/** 1746/**
1747 * Invertible bloom filter. 1747 * Invertible bloom filter.
1748 */ 1748 */
1749#define GNUNET_MESSAGE_TYPE_SETU_P2P_IBF 583 1749#define GNUNET_MESSAGE_TYPE_SETU_P2P_IBF 565
1750 1750
1751/** 1751/**
1752 * Actual set elements. 1752 * Actual set elements.
1753 */ 1753 */
1754#define GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS 584 1754#define GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS 566
1755 1755
1756/** 1756/**
1757 * Requests for the elements with the given hashes. 1757 * Requests for the elements with the given hashes.
1758 */ 1758 */
1759#define GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENT_REQUESTS 585 1759#define GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENT_REQUESTS 567
1760 1760
1761/** 1761/**
1762 * Set operation is done. 1762 * Set operation is done.
1763 */ 1763 */
1764#define GNUNET_MESSAGE_TYPE_SETU_P2P_DONE 586 1764#define GNUNET_MESSAGE_TYPE_SETU_P2P_DONE 568
1765 1765
1766/** 1766/**
1767 * Compressed strata estimator. 1767 * Compressed strata estimator.
1768 */ 1768 */
1769#define GNUNET_MESSAGE_TYPE_SETU_P2P_SEC 590 1769#define GNUNET_MESSAGE_TYPE_SETU_P2P_SEC 569
1770 1770
1771/** 1771/**
1772 * Request all missing elements from the other peer, 1772 * Request all missing elements from the other peer,
1773 * based on their sets and the elements we previously sent 1773 * based on their sets and the elements we previously sent
1774 * with #GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS. 1774 * with #GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS.
1775 */ 1775 */
1776#define GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE 597 1776#define GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE 570
1777 1777
1778/** 1778/**
1779 * Send a set element, not as response to a demand but because 1779 * Send a set element, not as response to a demand but because
1780 * we're sending the full set. 1780 * we're sending the full set.
1781 */ 1781 */
1782#define GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT 598 1782#define GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT 571
1783 1783
1784/** 1784/**
1785 * Request all missing elements from the other peer, 1785 * Request all missing elements from the other peer,
1786 * based on their sets and the elements we previously sent 1786 * based on their sets and the elements we previously sent
1787 * with #GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS. 1787 * with #GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS.
1788 */ 1788 */
1789#define GNUNET_MESSAGE_TYPE_SETU_P2P_OVER 599 1789#define GNUNET_MESSAGE_TYPE_SETU_P2P_OVER 572
1790
1791
1792/*******************************************************************************
1793 * SETI message types
1794 ******************************************************************************/
1795
1796
1797/**
1798 * Cancel a set operation
1799 */
1800#define GNUNET_MESSAGE_TYPE_SETI_CANCEL 580
1801
1802/**
1803 * Add element to set.
1804 */
1805#define GNUNET_MESSAGE_TYPE_SETI_ADD 581
1806
1807/**
1808 * Create a new local set
1809 */
1810#define GNUNET_MESSAGE_TYPE_SETI_CREATE 582
1811
1812/**
1813 * Handle result message from operation
1814 */
1815#define GNUNET_MESSAGE_TYPE_SETI_RESULT 583
1816
1817/**
1818 * Evaluate a set operation
1819 */
1820#define GNUNET_MESSAGE_TYPE_SETI_EVALUATE 584
1821
1822/**
1823 * Listen for operation requests
1824 */
1825#define GNUNET_MESSAGE_TYPE_SETI_LISTEN 585
1826
1827/**
1828 * Reject a set request.
1829 */
1830#define GNUNET_MESSAGE_TYPE_SETI_REJECT 586
1831
1832/**
1833 * Accept an incoming set request
1834 */
1835#define GNUNET_MESSAGE_TYPE_SETI_ACCEPT 587
1836
1837/**
1838 * Notify the client of an incoming request from a remote peer
1839 */
1840#define GNUNET_MESSAGE_TYPE_SETI_REQUEST 588
1841
1842/**
1843 * Information about the element count for intersection
1844 */
1845#define GNUNET_MESSAGE_TYPE_SETI_P2P_ELEMENT_INFO 591
1846
1847/**
1848 * Bloom filter message for intersection exchange started by Bob.
1849 */
1850#define GNUNET_MESSAGE_TYPE_SETI_P2P_BF 592
1851
1852/**
1853 * Intersection operation is done.
1854 */
1855#define GNUNET_MESSAGE_TYPE_SETI_P2P_DONE 593
1790 1856
1791 1857
1792/******************************************************************************* 1858/*******************************************************************************
diff --git a/src/include/gnunet_seti_service.h b/src/include/gnunet_seti_service.h
new file mode 100644
index 000000000..c0b6f41a5
--- /dev/null
+++ b/src/include/gnunet_seti_service.h
@@ -0,0 +1,369 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2013, 2014, 2020 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @author Florian Dold
22 * @author Christian Grothoff
23 *
24 * @file
25 * Two-peer set intersection operations
26 *
27 * @defgroup set Set intersection service
28 * Two-peer set operations
29 *
30 * @{
31 */
32
33#ifndef GNUNET_SETI_SERVICE_H
34#define GNUNET_SETI_SERVICE_H
35
36#ifdef __cplusplus
37extern "C"
38{
39#if 0 /* keep Emacsens' auto-indent happy */
40}
41#endif
42#endif
43
44#include "gnunet_common.h"
45#include "gnunet_time_lib.h"
46#include "gnunet_configuration_lib.h"
47
48
49/**
50 * Maximum size of a context message for set operation requests.
51 */
52#define GNUNET_SETI_CONTEXT_MESSAGE_MAX_SIZE ((1 << 16) - 1024)
53
54/**
55 * Opaque handle to a set.
56 */
57struct GNUNET_SETI_Handle;
58
59/**
60 * Opaque handle to a set operation request from another peer.
61 */
62struct GNUNET_SETI_Request;
63
64/**
65 * Opaque handle to a listen operation.
66 */
67struct GNUNET_SETI_ListenHandle;
68
69/**
70 * Opaque handle to a set operation.
71 */
72struct GNUNET_SETI_OperationHandle;
73
74
75/**
76 * Status for the result callback
77 */
78enum GNUNET_SETI_Status
79{
80
81 /**
82 * Element should be added to the result set of the local peer, i.e. the
83 * element is in the intersection.
84 */
85 GNUNET_SETI_STATUS_ADD_LOCAL,
86
87 /**
88 * Element should be delete from the result set of the local peer, i.e. the
89 * local peer is having an element that is not in the intersection.
90 */
91 GNUNET_SETI_STATUS_DEL_LOCAL,
92
93 /**
94 * The other peer refused to do the operation with us, or something went
95 * wrong.
96 */
97 GNUNET_SETI_STATUS_FAILURE,
98
99 /**
100 * Success, all elements have been sent (and received).
101 */
102 GNUNET_SETI_STATUS_DONE
103};
104
105
106/**
107 * Element stored in a set.
108 */
109struct GNUNET_SETI_Element
110{
111 /**
112 * Number of bytes in the buffer pointed to by data.
113 */
114 uint16_t size;
115
116 /**
117 * Application-specific element type.
118 */
119 uint16_t element_type;
120
121 /**
122 * Actual data of the element
123 */
124 const void *data;
125};
126
127
128/**
129 * Possible options to pass to a set operation.
130 *
131 * Used as tag for struct #GNUNET_SETI_Option.
132 */
133enum GNUNET_SETI_OptionType
134{
135 /**
136 * List terminator.
137 */
138 GNUNET_SETI_OPTION_END = 0,
139
140 /**
141 * Return the elements remaining in the intersection
142 * (#GNUNET_SETI_STATUS_ADD_LOCAL). If not given, the default is to return a
143 * list of the elements to be removed (#GNUNET_SETI_STATUS_DEL_LOCAL).
144 */
145 GNUNET_SETI_OPTION_RETURN_INTERSECTION = 1,
146};
147
148
149/**
150 * Option for set operations.
151 */
152struct GNUNET_SETI_Option
153{
154 /**
155 * Type of the option.
156 */
157 enum GNUNET_SETI_OptionType type;
158
159 /**
160 * Value for the option, only used with some options.
161 */
162 union
163 {
164 uint64_t num;
165 } v;
166};
167
168
169/**
170 * Callback for set union operation results. Called for each element
171 * in the result set.
172 *
173 * @param cls closure
174 * @param element a result element, only valid if status is #GNUNET_SETI_STATUS_OK
175 * @param current_size current set size
176 * @param status see `enum GNUNET_SETI_Status`
177 */
178typedef void
179(*GNUNET_SETI_ResultIterator) (void *cls,
180 const struct GNUNET_SETI_Element *element,
181 uint64_t current_size,
182 enum GNUNET_SETI_Status status);
183
184
185/**
186 * Called when another peer wants to do a set operation with the
187 * local peer. If a listen error occurs, the @a request is NULL.
188 *
189 * @param cls closure
190 * @param other_peer the other peer
191 * @param context_msg message with application specific information from
192 * the other peer
193 * @param request request from the other peer (never NULL), use GNUNET_SETI_accept()
194 * to accept it, otherwise the request will be refused
195 * Note that we can't just return value from the listen callback,
196 * as it is also necessary to specify the set we want to do the
197 * operation with, whith sometimes can be derived from the context
198 * message. It's necessary to specify the timeout.
199 */
200typedef void
201(*GNUNET_SETI_ListenCallback) (void *cls,
202 const struct GNUNET_PeerIdentity *other_peer,
203 const struct GNUNET_MessageHeader *context_msg,
204 struct GNUNET_SETI_Request *request);
205
206
207/**
208 * Create an empty set, supporting the specified operation.
209 *
210 * @param cfg configuration to use for connecting to the
211 * set service
212 * @return a handle to the set
213 */
214struct GNUNET_SETI_Handle *
215GNUNET_SETI_create (const struct GNUNET_CONFIGURATION_Handle *cfg);
216
217
218/**
219 * Add an element to the given set.
220 *
221 * @param set set to add element to
222 * @param element element to add to the set
223 * @param cb function to call when finished, can be NULL
224 * @param cb_cls closure for @a cb
225 * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
226 * set is invalid (e.g. the set service crashed)
227 */
228int
229GNUNET_SETI_add_element (struct GNUNET_SETI_Handle *set,
230 const struct GNUNET_SETI_Element *element,
231 GNUNET_SCHEDULER_TaskCallback cb,
232 void *cb_cls);
233
234
235/**
236 * Destroy the set handle, and free all associated resources. Operations may
237 * still be pending when a set is destroyed (and will be allowed to complete).
238 *
239 * @param set set to destroy
240 */
241void
242GNUNET_SETI_destroy (struct GNUNET_SETI_Handle *set);
243
244
245/**
246 * Prepare a set operation to be evaluated with another peer. The evaluation
247 * will not start until the client provides a local set with
248 * GNUNET_SETI_commit().
249 *
250 * @param other_peer peer with the other set
251 * @param app_id hash for the application using the set
252 * @param context_msg additional information for the request
253 * @param options options to use when processing the request
254 * @param result_cb called on error or success
255 * @param result_cls closure for @a result_cb
256 * @return a handle to cancel the operation
257 */
258struct GNUNET_SETI_OperationHandle *
259GNUNET_SETI_prepare (const struct GNUNET_PeerIdentity *other_peer,
260 const struct GNUNET_HashCode *app_id,
261 const struct GNUNET_MessageHeader *context_msg,
262 const struct GNUNET_SETI_Option options[],
263 GNUNET_SETI_ResultIterator result_cb,
264 void *result_cls);
265
266
267/**
268 * Wait for set operation requests for the given application ID.
269 * If the connection to the set service is lost, the listener is
270 * re-created transparently with exponential backoff.
271 *
272 * @param cfg configuration to use for connecting to
273 * the set service
274 * @param app_id id of the application that handles set operation requests
275 * @param listen_cb called for each incoming request matching the operation
276 * and application id
277 * @param listen_cls handle for @a listen_cb
278 * @return a handle that can be used to cancel the listen operation
279 */
280struct GNUNET_SETI_ListenHandle *
281GNUNET_SETI_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
282 const struct GNUNET_HashCode *app_id,
283 GNUNET_SETI_ListenCallback listen_cb,
284 void *listen_cls);
285
286
287/**
288 * Cancel the given listen operation. After calling cancel, the
289 * listen callback for this listen handle will not be called again.
290 * Note that cancelling a listen operation will automatically reject
291 * all operations that have not yet been accepted.
292 *
293 * @param lh handle for the listen operation
294 */
295void
296GNUNET_SETI_listen_cancel (struct GNUNET_SETI_ListenHandle *lh);
297
298
299/**
300 * Accept a request we got via GNUNET_SETI_listen(). Must be called during
301 * GNUNET_SETI_listen(), as the `struct GNUNET_SETI_Request` becomes invalid
302 * afterwards.
303 * Call GNUNET_SETI_commit() to provide the local set to use for the operation,
304 * and to begin the exchange with the remote peer.
305 *
306 * @param request request to accept
307 * @param options options to use when processing the request
308 * @param result_cb callback for the results
309 * @param result_cls closure for @a result_cb
310 * @return a handle to cancel the operation
311 */
312struct GNUNET_SETI_OperationHandle *
313GNUNET_SETI_accept (struct GNUNET_SETI_Request *request,
314 const struct GNUNET_SETI_Option options[],
315 GNUNET_SETI_ResultIterator result_cb,
316 void *result_cls);
317
318
319/**
320 * Commit a set to be used with a set operation.
321 * This function is called once we have fully constructed
322 * the set that we want to use for the operation. At this
323 * time, the P2P protocol can then begin to exchange the
324 * set information and call the result callback with the
325 * result information.
326 *
327 * @param oh handle to the set operation
328 * @param set the set to use for the operation
329 * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
330 * set is invalid (e.g. the set service crashed)
331 */
332int
333GNUNET_SETI_commit (struct GNUNET_SETI_OperationHandle *oh,
334 struct GNUNET_SETI_Handle *set);
335
336
337/**
338 * Cancel the given set operation. May not be called after the operation's
339 * `GNUNET_SETI_ResultIterator` has been called with a status of
340 * #GNUNET_SETI_STATUS_FAILURE or #GNUNET_SETI_STATUS_DONE.
341 *
342 * @param oh set operation to cancel
343 */
344void
345GNUNET_SETI_operation_cancel (struct GNUNET_SETI_OperationHandle *oh);
346
347
348/**
349 * Hash a set element.
350 *
351 * @param element the element that should be hashed
352 * @param[out] ret_hash a pointer to where the hash of @a element
353 * should be stored
354 */
355void
356GNUNET_SETI_element_hash (const struct GNUNET_SETI_Element *element,
357 struct GNUNET_HashCode *ret_hash);
358
359
360#if 0 /* keep Emacsens' auto-indent happy */
361{
362#endif
363#ifdef __cplusplus
364}
365#endif
366
367#endif
368
369/** @} */ /* end of group */
diff --git a/src/include/gnunet_setu_service.h b/src/include/gnunet_setu_service.h
index 092c03198..d737b97c1 100644
--- a/src/include/gnunet_setu_service.h
+++ b/src/include/gnunet_setu_service.h
@@ -22,9 +22,9 @@
22 * @author Christian Grothoff 22 * @author Christian Grothoff
23 * 23 *
24 * @file 24 * @file
25 * Two-peer set operations 25 * Two-peer set union operations
26 * 26 *
27 * @defgroup set Set service 27 * @defgroup set Set union service
28 * Two-peer set operations 28 * Two-peer set operations
29 * 29 *
30 * @{ 30 * @{
diff --git a/src/seti/.gitignore b/src/seti/.gitignore
new file mode 100644
index 000000000..5f234a4c2
--- /dev/null
+++ b/src/seti/.gitignore
@@ -0,0 +1,3 @@
1gnunet-seti-profiler
2gnunet-service-seti
3test_seti_api
diff --git a/src/seti/Makefile.am b/src/seti/Makefile.am
new file mode 100644
index 000000000..d96ffff03
--- /dev/null
+++ b/src/seti/Makefile.am
@@ -0,0 +1,90 @@
1# This Makefile.am is in the public domain
2AM_CPPFLAGS = -I$(top_srcdir)/src/include
3
4pkgcfgdir= $(pkgdatadir)/config.d/
5
6libexecdir= $(pkglibdir)/libexec/
7
8plugindir = $(libdir)/gnunet
9
10pkgcfg_DATA = \
11 seti.conf
12
13if USE_COVERAGE
14 AM_CFLAGS = -fprofile-arcs -ftest-coverage
15endif
16
17if HAVE_TESTING
18bin_PROGRAMS = \
19 gnunet-seti-profiler
20endif
21
22libexec_PROGRAMS = \
23 gnunet-service-seti
24
25lib_LTLIBRARIES = \
26 libgnunetseti.la
27
28gnunet_seti_profiler_SOURCES = \
29 gnunet-seti-profiler.c
30gnunet_seti_profiler_LDADD = \
31 $(top_builddir)/src/util/libgnunetutil.la \
32 $(top_builddir)/src/statistics/libgnunetstatistics.la \
33 libgnunetseti.la \
34 $(top_builddir)/src/testing/libgnunettesting.la \
35 $(GN_LIBINTL)
36
37
38gnunet_service_seti_SOURCES = \
39 gnunet-service-seti.c \
40 gnunet-service-set_protocol.h
41gnunet_service_seti_LDADD = \
42 $(top_builddir)/src/util/libgnunetutil.la \
43 $(top_builddir)/src/statistics/libgnunetstatistics.la \
44 $(top_builddir)/src/core/libgnunetcore.la \
45 $(top_builddir)/src/cadet/libgnunetcadet.la \
46 $(top_builddir)/src/block/libgnunetblock.la \
47 libgnunetseti.la \
48 $(GN_LIBINTL)
49
50libgnunetseti_la_SOURCES = \
51 seti_api.c seti.h
52libgnunetseti_la_LIBADD = \
53 $(top_builddir)/src/util/libgnunetutil.la \
54 $(LTLIBINTL)
55libgnunetseti_la_LDFLAGS = \
56 $(GN_LIB_LDFLAGS)
57
58if HAVE_TESTING
59check_PROGRAMS = \
60 test_seti_api
61endif
62
63if ENABLE_TEST_RUN
64AM_TESTS_ENVIRONMENT=export GNUNET_PREFIX=$${GNUNET_PREFIX:-@libdir@};export PATH=$${GNUNET_PREFIX:-@prefix@}/bin:$$PATH;unset XDG_DATA_HOME;unset XDG_CONFIG_HOME;
65TESTS = $(check_PROGRAMS)
66endif
67
68test_seti_api_SOURCES = \
69 test_seti_api.c
70test_seti_api_LDADD = \
71 $(top_builddir)/src/util/libgnunetutil.la \
72 $(top_builddir)/src/testing/libgnunettesting.la \
73 libgnunetset.la
74
75plugin_LTLIBRARIES = \
76 libgnunet_plugin_block_seti_test.la
77
78libgnunet_plugin_block_seti_test_la_SOURCES = \
79 plugin_block_seti_test.c
80libgnunet_plugin_block_seti_test_la_LIBADD = \
81 $(top_builddir)/src/block/libgnunetblock.la \
82 $(top_builddir)/src/block/libgnunetblockgroup.la \
83 $(top_builddir)/src/util/libgnunetutil.la \
84 $(LTLIBINTL)
85libgnunet_plugin_block_seti_test_la_LDFLAGS = \
86 $(GN_PLUGIN_LDFLAGS)
87
88
89EXTRA_DIST = \
90 test_seti.conf
diff --git a/src/seti/gnunet-service-seti.c b/src/seti/gnunet-service-seti.c
new file mode 100644
index 000000000..3b8da01cd
--- /dev/null
+++ b/src/seti/gnunet-service-seti.c
@@ -0,0 +1,3274 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2013-2017, 2020 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file set/gnunet-service-seti.c
22 * @brief two-peer set intersection operations
23 * @author Florian Dold
24 * @author Christian Grothoff
25 */
26#include "gnunet-service-seti_protocol.h"
27#include "gnunet_statistics_service.h"
28
29/**
30 * How long do we hold on to an incoming channel if there is
31 * no local listener before giving up?
32 */
33#define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
34
35
36/**
37 * Current phase we are in for a intersection operation.
38 */
39enum IntersectionOperationPhase
40{
41 /**
42 * We are just starting.
43 */
44 PHASE_INITIAL,
45
46 /**
47 * We have send the number of our elements to the other
48 * peer, but did not setup our element set yet.
49 */
50 PHASE_COUNT_SENT,
51
52 /**
53 * We have initialized our set and are now reducing it by exchanging
54 * Bloom filters until one party notices the their element hashes
55 * are equal.
56 */
57 PHASE_BF_EXCHANGE,
58
59 /**
60 * We must next send the P2P DONE message (after finishing mostly
61 * with the local client). Then we will wait for the channel to close.
62 */
63 PHASE_MUST_SEND_DONE,
64
65 /**
66 * We have received the P2P DONE message, and must finish with the
67 * local client before terminating the channel.
68 */
69 PHASE_DONE_RECEIVED,
70
71 /**
72 * The protocol is over. Results may still have to be sent to the
73 * client.
74 */
75 PHASE_FINISHED
76};
77
78
79/**
80 * Implementation-specific set state. Used as opaque pointer, and
81 * specified further in the respective implementation.
82 */
83struct SetState;
84
85/**
86 * Implementation-specific set operation. Used as opaque pointer, and
87 * specified further in the respective implementation.
88 */
89struct OperationState;
90
91/**
92 * A set that supports a specific operation with other peers.
93 */
94struct Set;
95
96/**
97 * Information about an element element in the set. All elements are
98 * stored in a hash-table from their hash-code to their 'struct
99 * Element', so that the remove and add operations are reasonably
100 * fast.
101 */
102struct ElementEntry;
103
104/**
105 * Operation context used to execute a set operation.
106 */
107struct Operation;
108
109
110/**
111 * MutationEvent gives information about changes
112 * to an element (removal / addition) in a set content.
113 */
114struct MutationEvent
115{
116 /**
117 * First generation affected by this mutation event.
118 *
119 * If @a generation is 0, this mutation event is a list
120 * sentinel element.
121 */
122 unsigned int generation;
123
124 /**
125 * If @a added is #GNUNET_YES, then this is a
126 * `remove` event, otherwise it is an `add` event.
127 */
128 int added;
129};
130
131
132/**
133 * Information about an element element in the set. All elements are
134 * stored in a hash-table from their hash-code to their `struct
135 * Element`, so that the remove and add operations are reasonably
136 * fast.
137 */
138struct ElementEntry
139{
140 /**
141 * The actual element. The data for the element
142 * should be allocated at the end of this struct.
143 */
144 struct GNUNET_SET_Element element;
145
146 /**
147 * Hash of the element. For set union: Will be used to derive the
148 * different IBF keys for different salts.
149 */
150 struct GNUNET_HashCode element_hash;
151
152 /**
153 * If @a mutations is not NULL, it contains
154 * a list of mutations, ordered by increasing generation.
155 * The list is terminated by a sentinel event with `generation`
156 * set to 0.
157 *
158 * If @a mutations is NULL, then this element exists in all generations
159 * of the respective set content this element belongs to.
160 */
161 struct MutationEvent *mutations;
162
163 /**
164 * Number of elements in the array @a mutations.
165 */
166 unsigned int mutations_size;
167
168 /**
169 * #GNUNET_YES if the element is a remote element, and does not belong
170 * to the operation's set.
171 */
172 int remote;
173};
174
175
176/**
177 * A listener is inhabited by a client, and waits for evaluation
178 * requests from remote peers.
179 */
180struct Listener;
181
182
183/**
184 * State we keep per client.
185 */
186struct ClientState
187{
188 /**
189 * Set, if associated with the client, otherwise NULL.
190 */
191 struct Set *set;
192
193 /**
194 * Listener, if associated with the client, otherwise NULL.
195 */
196 struct Listener *listener;
197
198 /**
199 * Client handle.
200 */
201 struct GNUNET_SERVICE_Client *client;
202
203 /**
204 * Message queue.
205 */
206 struct GNUNET_MQ_Handle *mq;
207};
208
209
210/**
211 * Operation context used to execute a set operation.
212 */
213struct Operation
214{
215 /**
216 * Kept in a DLL of the listener, if @e listener is non-NULL.
217 */
218 struct Operation *next;
219
220 /**
221 * Kept in a DLL of the listener, if @e listener is non-NULL.
222 */
223 struct Operation *prev;
224
225 /**
226 * Channel to the peer.
227 */
228 struct GNUNET_CADET_Channel *channel;
229
230 /**
231 * Port this operation runs on.
232 */
233 struct Listener *listener;
234
235 /**
236 * Message queue for the channel.
237 */
238 struct GNUNET_MQ_Handle *mq;
239
240 /**
241 * Context message, may be NULL.
242 */
243 struct GNUNET_MessageHeader *context_msg;
244
245 /**
246 * Set associated with the operation, NULL until the spec has been
247 * associated with a set.
248 */
249 struct Set *set;
250
251 /**
252 * Operation-specific operation state. Note that the exact
253 * type depends on this being a union or intersection operation
254 * (and thus on @e vt).
255 */
256 struct OperationState *state; // FIXME: inline
257
258 /**
259 * The identity of the requesting peer. Needs to
260 * be stored here as the op spec might not have been created yet.
261 */
262 struct GNUNET_PeerIdentity peer;
263
264 /**
265 * Timeout task, if the incoming peer has not been accepted
266 * after the timeout, it will be disconnected.
267 */
268 struct GNUNET_SCHEDULER_Task *timeout_task;
269
270 /**
271 * Salt to use for the operation.
272 */
273 uint32_t salt;
274
275 /**
276 * Remote peers element count
277 */
278 uint32_t remote_element_count;
279
280 /**
281 * ID used to identify an operation between service and client
282 */
283 uint32_t client_request_id;
284
285 /**
286 * When are elements sent to the client, and which elements are sent?
287 */
288 enum GNUNET_SET_ResultMode result_mode;
289
290 /**
291 * Always use delta operation instead of sending full sets,
292 * even it it's less efficient.
293 */
294 int force_delta;
295
296 /**
297 * Always send full sets, even if delta operations would
298 * be more efficient.
299 */
300 int force_full;
301
302 /**
303 * #GNUNET_YES to fail operations where Byzantine faults
304 * are suspected
305 */
306 int byzantine;
307
308 /**
309 * Lower bound for the set size, used only when
310 * byzantine mode is enabled.
311 */
312 int byzantine_lower_bound;
313
314 /**
315 * Unique request id for the request from a remote peer, sent to the
316 * client, which will accept or reject the request. Set to '0' iff
317 * the request has not been suggested yet.
318 */
319 uint32_t suggest_id;
320
321 /**
322 * Generation in which the operation handle
323 * was created.
324 */
325 unsigned int generation_created;
326};
327
328
329/**
330 * SetContent stores the actual set elements, which may be shared by
331 * multiple generations derived from one set.
332 */
333struct SetContent
334{
335 /**
336 * Maps `struct GNUNET_HashCode *` to `struct ElementEntry *`.
337 */
338 struct GNUNET_CONTAINER_MultiHashMap *elements;
339
340 /**
341 * Mutations requested by the client that we're
342 * unable to execute right now because we're iterating
343 * over the underlying hash map of elements.
344 */
345 struct PendingMutation *pending_mutations_head;
346
347 /**
348 * Mutations requested by the client that we're
349 * unable to execute right now because we're iterating
350 * over the underlying hash map of elements.
351 */
352 struct PendingMutation *pending_mutations_tail;
353
354 /**
355 * Number of references to the content.
356 */
357 unsigned int refcount;
358
359 /**
360 * FIXME: document!
361 */
362 unsigned int latest_generation;
363
364 /**
365 * Number of concurrently active iterators.
366 */
367 int iterator_count;
368};
369
370
371struct GenerationRange
372{
373 /**
374 * First generation that is excluded.
375 */
376 unsigned int start;
377
378 /**
379 * Generation after the last excluded generation.
380 */
381 unsigned int end;
382};
383
384
385/**
386 * Information about a mutation to apply to a set.
387 */
388struct PendingMutation
389{
390 /**
391 * Mutations are kept in a DLL.
392 */
393 struct PendingMutation *prev;
394
395 /**
396 * Mutations are kept in a DLL.
397 */
398 struct PendingMutation *next;
399
400 /**
401 * Set this mutation is about.
402 */
403 struct Set *set;
404
405 /**
406 * Message that describes the desired mutation.
407 * May only be a #GNUNET_MESSAGE_TYPE_SET_ADD or
408 * #GNUNET_MESSAGE_TYPE_SET_REMOVE.
409 */
410 struct GNUNET_SET_ElementMessage *msg;
411};
412
413
414/**
415 * A set that supports a specific operation with other peers.
416 */
417struct Set
418{
419 /**
420 * Sets are held in a doubly linked list (in `sets_head` and `sets_tail`).
421 */
422 struct Set *next;
423
424 /**
425 * Sets are held in a doubly linked list.
426 */
427 struct Set *prev;
428
429 /**
430 * Client that owns the set. Only one client may own a set,
431 * and there can only be one set per client.
432 */
433 struct ClientState *cs;
434
435 /**
436 * Content, possibly shared by multiple sets,
437 * and thus reference counted.
438 */
439 struct SetContent *content;
440
441 /**
442 * Implementation-specific state.
443 */
444 struct SetState *state;
445
446 /**
447 * Current state of iterating elements for the client.
448 * NULL if we are not currently iterating.
449 */
450 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
451
452 /**
453 * Evaluate operations are held in a linked list.
454 */
455 struct Operation *ops_head;
456
457 /**
458 * Evaluate operations are held in a linked list.
459 */
460 struct Operation *ops_tail;
461
462 /**
463 * List of generations we have to exclude, due to lazy copies.
464 */
465 struct GenerationRange *excluded_generations;
466
467 /**
468 * Current generation, that is, number of previously executed
469 * operations and lazy copies on the underlying set content.
470 */
471 unsigned int current_generation;
472
473 /**
474 * Number of elements in array @a excluded_generations.
475 */
476 unsigned int excluded_generations_size;
477
478 /**
479 * Type of operation supported for this set
480 */
481 enum GNUNET_SET_OperationType operation;
482
483 /**
484 * Generation we're currently iteration over.
485 */
486 unsigned int iter_generation;
487
488 /**
489 * Each @e iter is assigned a unique number, so that the client
490 * can distinguish iterations.
491 */
492 uint16_t iteration_id;
493};
494
495
496/**
497 * State of an evaluate operation with another peer.
498 */
499struct OperationState
500{
501 /**
502 * The bf we currently receive
503 */
504 struct GNUNET_CONTAINER_BloomFilter *remote_bf;
505
506 /**
507 * BF of the set's element.
508 */
509 struct GNUNET_CONTAINER_BloomFilter *local_bf;
510
511 /**
512 * Remaining elements in the intersection operation.
513 * Maps element-id-hashes to 'elements in our set'.
514 */
515 struct GNUNET_CONTAINER_MultiHashMap *my_elements;
516
517 /**
518 * Iterator for sending the final set of @e my_elements to the client.
519 */
520 struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter;
521
522 /**
523 * Evaluate operations are held in a linked list.
524 */
525 struct OperationState *next;
526
527 /**
528 * Evaluate operations are held in a linked list.
529 */
530 struct OperationState *prev;
531
532 /**
533 * For multipart BF transmissions, we have to store the
534 * bloomfilter-data until we fully received it.
535 */
536 char *bf_data;
537
538 /**
539 * XOR of the keys of all of the elements (remaining) in my set.
540 * Always updated when elements are added or removed to
541 * @e my_elements.
542 */
543 struct GNUNET_HashCode my_xor;
544
545 /**
546 * XOR of the keys of all of the elements (remaining) in
547 * the other peer's set. Updated when we receive the
548 * other peer's Bloom filter.
549 */
550 struct GNUNET_HashCode other_xor;
551
552 /**
553 * How many bytes of @e bf_data are valid?
554 */
555 uint32_t bf_data_offset;
556
557 /**
558 * Current element count contained within @e my_elements.
559 * (May differ briefly during initialization.)
560 */
561 uint32_t my_element_count;
562
563 /**
564 * size of the bloomfilter in @e bf_data.
565 */
566 uint32_t bf_data_size;
567
568 /**
569 * size of the bloomfilter
570 */
571 uint32_t bf_bits_per_element;
572
573 /**
574 * Salt currently used for BF construction (by us or the other peer,
575 * depending on where we are in the code).
576 */
577 uint32_t salt;
578
579 /**
580 * Current state of the operation.
581 */
582 enum IntersectionOperationPhase phase;
583
584 /**
585 * Generation in which the operation handle
586 * was created.
587 */
588 unsigned int generation_created;
589
590 /**
591 * Did we send the client that we are done?
592 */
593 int client_done_sent;
594
595 /**
596 * Set whenever we reach the state where the death of the
597 * channel is perfectly find and should NOT result in the
598 * operation being cancelled.
599 */
600 int channel_death_expected;
601};
602
603
604/**
605 * Extra state required for efficient set intersection.
606 * Merely tracks the total number of elements.
607 */
608struct SetState
609{
610 /**
611 * Number of currently valid elements in the set which have not been
612 * removed.
613 */
614 uint32_t current_set_element_count;
615};
616
617
618/**
619 * A listener is inhabited by a client, and waits for evaluation
620 * requests from remote peers.
621 */
622struct Listener
623{
624 /**
625 * Listeners are held in a doubly linked list.
626 */
627 struct Listener *next;
628
629 /**
630 * Listeners are held in a doubly linked list.
631 */
632 struct Listener *prev;
633
634 /**
635 * Head of DLL of operations this listener is responsible for.
636 * Once the client has accepted/declined the operation, the
637 * operation is moved to the respective set's operation DLLS.
638 */
639 struct Operation *op_head;
640
641 /**
642 * Tail of DLL of operations this listener is responsible for.
643 * Once the client has accepted/declined the operation, the
644 * operation is moved to the respective set's operation DLLS.
645 */
646 struct Operation *op_tail;
647
648 /**
649 * Client that owns the listener.
650 * Only one client may own a listener.
651 */
652 struct ClientState *cs;
653
654 /**
655 * The port we are listening on with CADET.
656 */
657 struct GNUNET_CADET_Port *open_port;
658
659 /**
660 * Application ID for the operation, used to distinguish
661 * multiple operations of the same type with the same peer.
662 */
663 struct GNUNET_HashCode app_id;
664
665 /**
666 * The type of the operation.
667 */
668 enum GNUNET_SET_OperationType operation;
669};
670
671
672/**
673 * Handle to the cadet service, used to listen for and connect to
674 * remote peers.
675 */
676static struct GNUNET_CADET_Handle *cadet;
677
678/**
679 * Statistics handle.
680 */
681static struct GNUNET_STATISTICS_Handle *_GSS_statistics;
682
683/**
684 * Listeners are held in a doubly linked list.
685 */
686static struct Listener *listener_head;
687
688/**
689 * Listeners are held in a doubly linked list.
690 */
691static struct Listener *listener_tail;
692
693/**
694 * Number of active clients.
695 */
696static unsigned int num_clients;
697
698/**
699 * Are we in shutdown? if #GNUNET_YES and the number of clients
700 * drops to zero, disconnect from CADET.
701 */
702static int in_shutdown;
703
704/**
705 * Counter for allocating unique IDs for clients, used to identify
706 * incoming operation requests from remote peers, that the client can
707 * choose to accept or refuse. 0 must not be used (reserved for
708 * uninitialized).
709 */
710static uint32_t suggest_id;
711
712
713/**
714 * If applicable in the current operation mode, send a result message
715 * to the client indicating we removed an element.
716 *
717 * @param op intersection operation
718 * @param element element to send
719 */
720static void
721send_client_removed_element (struct Operation *op,
722 struct GNUNET_SET_Element *element)
723{
724 struct GNUNET_MQ_Envelope *ev;
725 struct GNUNET_SET_ResultMessage *rm;
726
727 if (GNUNET_SET_RESULT_REMOVED != op->result_mode)
728 return; /* Wrong mode for transmitting removed elements */
729 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
730 "Sending removed element (size %u) to client\n",
731 element->size);
732 GNUNET_STATISTICS_update (_GSS_statistics,
733 "# Element removed messages sent",
734 1,
735 GNUNET_NO);
736 GNUNET_assert (0 != op->client_request_id);
737 ev = GNUNET_MQ_msg_extra (rm,
738 element->size,
739 GNUNET_MESSAGE_TYPE_SET_RESULT);
740 if (NULL == ev)
741 {
742 GNUNET_break (0);
743 return;
744 }
745 rm->result_status = htons (GNUNET_SET_STATUS_OK);
746 rm->request_id = htonl (op->client_request_id);
747 rm->element_type = element->element_type;
748 GNUNET_memcpy (&rm[1],
749 element->data,
750 element->size);
751 GNUNET_MQ_send (op->set->cs->mq,
752 ev);
753}
754
755
756/**
757 * Fills the "my_elements" hashmap with all relevant elements.
758 *
759 * @param cls the `struct Operation *` we are performing
760 * @param key current key code
761 * @param value the `struct ElementEntry *` from the hash map
762 * @return #GNUNET_YES (we should continue to iterate)
763 */
764static int
765filtered_map_initialization (void *cls,
766 const struct GNUNET_HashCode *key,
767 void *value)
768{
769 struct Operation *op = cls;
770 struct ElementEntry *ee = value;
771 struct GNUNET_HashCode mutated_hash;
772
773
774 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
775 "FIMA called for %s:%u\n",
776 GNUNET_h2s (&ee->element_hash),
777 ee->element.size);
778
779 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
780 {
781 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
782 "Reduced initialization, not starting with %s:%u (wrong generation)\n",
783 GNUNET_h2s (&ee->element_hash),
784 ee->element.size);
785 return GNUNET_YES; /* element not valid in our operation's generation */
786 }
787
788 /* Test if element is in other peer's bloomfilter */
789 GNUNET_BLOCK_mingle_hash (&ee->element_hash,
790 op->state->salt,
791 &mutated_hash);
792 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
793 "Testing mingled hash %s with salt %u\n",
794 GNUNET_h2s (&mutated_hash),
795 op->state->salt);
796 if (GNUNET_NO ==
797 GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
798 &mutated_hash))
799 {
800 /* remove this element */
801 send_client_removed_element (op,
802 &ee->element);
803 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
804 "Reduced initialization, not starting with %s:%u\n",
805 GNUNET_h2s (&ee->element_hash),
806 ee->element.size);
807 return GNUNET_YES;
808 }
809 op->state->my_element_count++;
810 GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
811 &ee->element_hash,
812 &op->state->my_xor);
813 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
814 "Filtered initialization of my_elements, adding %s:%u\n",
815 GNUNET_h2s (&ee->element_hash),
816 ee->element.size);
817 GNUNET_break (GNUNET_YES ==
818 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
819 &ee->element_hash,
820 ee,
821 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
822
823 return GNUNET_YES;
824}
825
826
827/**
828 * Removes elements from our hashmap if they are not contained within the
829 * provided remote bloomfilter.
830 *
831 * @param cls closure with the `struct Operation *`
832 * @param key current key code
833 * @param value value in the hash map
834 * @return #GNUNET_YES (we should continue to iterate)
835 */
836static int
837iterator_bf_reduce (void *cls,
838 const struct GNUNET_HashCode *key,
839 void *value)
840{
841 struct Operation *op = cls;
842 struct ElementEntry *ee = value;
843 struct GNUNET_HashCode mutated_hash;
844
845 GNUNET_BLOCK_mingle_hash (&ee->element_hash,
846 op->state->salt,
847 &mutated_hash);
848 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
849 "Testing mingled hash %s with salt %u\n",
850 GNUNET_h2s (&mutated_hash),
851 op->state->salt);
852 if (GNUNET_NO ==
853 GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
854 &mutated_hash))
855 {
856 GNUNET_break (0 < op->state->my_element_count);
857 op->state->my_element_count--;
858 GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
859 &ee->element_hash,
860 &op->state->my_xor);
861 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
862 "Bloom filter reduction of my_elements, removing %s:%u\n",
863 GNUNET_h2s (&ee->element_hash),
864 ee->element.size);
865 GNUNET_assert (GNUNET_YES ==
866 GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
867 &ee->element_hash,
868 ee));
869 send_client_removed_element (op,
870 &ee->element);
871 }
872 else
873 {
874 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
875 "Bloom filter reduction of my_elements, keeping %s:%u\n",
876 GNUNET_h2s (&ee->element_hash),
877 ee->element.size);
878 }
879 return GNUNET_YES;
880}
881
882
883/**
884 * Create initial bloomfilter based on all the elements given.
885 *
886 * @param cls the `struct Operation *`
887 * @param key current key code
888 * @param value the `struct ElementEntry` to process
889 * @return #GNUNET_YES (we should continue to iterate)
890 */
891static int
892iterator_bf_create (void *cls,
893 const struct GNUNET_HashCode *key,
894 void *value)
895{
896 struct Operation *op = cls;
897 struct ElementEntry *ee = value;
898 struct GNUNET_HashCode mutated_hash;
899
900 GNUNET_BLOCK_mingle_hash (&ee->element_hash,
901 op->state->salt,
902 &mutated_hash);
903 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
904 "Initializing BF with hash %s with salt %u\n",
905 GNUNET_h2s (&mutated_hash),
906 op->state->salt);
907 GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
908 &mutated_hash);
909 return GNUNET_YES;
910}
911
912
913/**
914 * Inform the client that the intersection operation has failed,
915 * and proceed to destroy the evaluate operation.
916 *
917 * @param op the intersection operation to fail
918 */
919static void
920fail_intersection_operation (struct Operation *op)
921{
922 struct GNUNET_MQ_Envelope *ev;
923 struct GNUNET_SET_ResultMessage *msg;
924
925 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
926 "Intersection operation failed\n");
927 GNUNET_STATISTICS_update (_GSS_statistics,
928 "# Intersection operations failed",
929 1,
930 GNUNET_NO);
931 if (NULL != op->state->my_elements)
932 {
933 GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
934 op->state->my_elements = NULL;
935 }
936 ev = GNUNET_MQ_msg (msg,
937 GNUNET_MESSAGE_TYPE_SET_RESULT);
938 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
939 msg->request_id = htonl (op->client_request_id);
940 msg->element_type = htons (0);
941 GNUNET_MQ_send (op->set->cs->mq,
942 ev);
943 _GSS_operation_destroy (op,
944 GNUNET_YES);
945}
946
947
948/**
949 * Send a bloomfilter to our peer. After the result done message has
950 * been sent to the client, destroy the evaluate operation.
951 *
952 * @param op intersection operation
953 */
954static void
955send_bloomfilter (struct Operation *op)
956{
957 struct GNUNET_MQ_Envelope *ev;
958 struct BFMessage *msg;
959 uint32_t bf_size;
960 uint32_t bf_elementbits;
961 uint32_t chunk_size;
962 char *bf_data;
963 uint32_t offset;
964
965 /* We consider the ratio of the set sizes to determine
966 the number of bits per element, as the smaller set
967 should use more bits to maximize its set reduction
968 potential and minimize overall bandwidth consumption. */
969 bf_elementbits = 2 + ceil (log2 ((double)
970 (op->remote_element_count
971 / (double) op->state->my_element_count)));
972 if (bf_elementbits < 1)
973 bf_elementbits = 1; /* make sure k is not 0 */
974 /* optimize BF-size to ~50% of bits set */
975 bf_size = ceil ((double) (op->state->my_element_count
976 * bf_elementbits / log (2)));
977 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
978 "Sending Bloom filter (%u) of size %u bytes\n",
979 (unsigned int) bf_elementbits,
980 (unsigned int) bf_size);
981 op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
982 bf_size,
983 bf_elementbits);
984 op->state->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
985 UINT32_MAX);
986 GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
987 &iterator_bf_create,
988 op);
989
990 /* send our Bloom filter */
991 GNUNET_STATISTICS_update (_GSS_statistics,
992 "# Intersection Bloom filters sent",
993 1,
994 GNUNET_NO);
995 chunk_size = 60 * 1024 - sizeof(struct BFMessage);
996 if (bf_size <= chunk_size)
997 {
998 /* singlepart */
999 chunk_size = bf_size;
1000 ev = GNUNET_MQ_msg_extra (msg,
1001 chunk_size,
1002 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
1003 GNUNET_assert (GNUNET_SYSERR !=
1004 GNUNET_CONTAINER_bloomfilter_get_raw_data (
1005 op->state->local_bf,
1006 (char *) &msg[1],
1007 bf_size));
1008 msg->sender_element_count = htonl (op->state->my_element_count);
1009 msg->bloomfilter_total_length = htonl (bf_size);
1010 msg->bits_per_element = htonl (bf_elementbits);
1011 msg->sender_mutator = htonl (op->state->salt);
1012 msg->element_xor_hash = op->state->my_xor;
1013 GNUNET_MQ_send (op->mq, ev);
1014 }
1015 else
1016 {
1017 /* multipart */
1018 bf_data = GNUNET_malloc (bf_size);
1019 GNUNET_assert (GNUNET_SYSERR !=
1020 GNUNET_CONTAINER_bloomfilter_get_raw_data (
1021 op->state->local_bf,
1022 bf_data,
1023 bf_size));
1024 offset = 0;
1025 while (offset < bf_size)
1026 {
1027 if (bf_size - chunk_size < offset)
1028 chunk_size = bf_size - offset;
1029 ev = GNUNET_MQ_msg_extra (msg,
1030 chunk_size,
1031 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
1032 GNUNET_memcpy (&msg[1],
1033 &bf_data[offset],
1034 chunk_size);
1035 offset += chunk_size;
1036 msg->sender_element_count = htonl (op->state->my_element_count);
1037 msg->bloomfilter_total_length = htonl (bf_size);
1038 msg->bits_per_element = htonl (bf_elementbits);
1039 msg->sender_mutator = htonl (op->state->salt);
1040 msg->element_xor_hash = op->state->my_xor;
1041 GNUNET_MQ_send (op->mq, ev);
1042 }
1043 GNUNET_free (bf_data);
1044 }
1045 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
1046 op->state->local_bf = NULL;
1047}
1048
1049
1050/**
1051 * Signal to the client that the operation has finished and
1052 * destroy the operation.
1053 *
1054 * @param cls operation to destroy
1055 */
1056static void
1057send_client_done_and_destroy (void *cls)
1058{
1059 struct Operation *op = cls;
1060 struct GNUNET_MQ_Envelope *ev;
1061 struct GNUNET_SET_ResultMessage *rm;
1062
1063 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1064 "Intersection succeeded, sending DONE to local client\n");
1065 GNUNET_STATISTICS_update (_GSS_statistics,
1066 "# Intersection operations succeeded",
1067 1,
1068 GNUNET_NO);
1069 ev = GNUNET_MQ_msg (rm,
1070 GNUNET_MESSAGE_TYPE_SET_RESULT);
1071 rm->request_id = htonl (op->client_request_id);
1072 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1073 rm->element_type = htons (0);
1074 GNUNET_MQ_send (op->set->cs->mq,
1075 ev);
1076 _GSS_operation_destroy (op,
1077 GNUNET_YES);
1078}
1079
1080
1081/**
1082 * Remember that we are done dealing with the local client
1083 * AND have sent the other peer our message that we are done,
1084 * so we are not just waiting for the channel to die before
1085 * telling the local client that we are done as our last act.
1086 *
1087 * @param cls the `struct Operation`.
1088 */
1089static void
1090finished_local_operations (void *cls)
1091{
1092 struct Operation *op = cls;
1093
1094 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1095 "DONE sent to other peer, now waiting for other end to close the channel\n");
1096 op->state->phase = PHASE_FINISHED;
1097 op->state->channel_death_expected = GNUNET_YES;
1098}
1099
1100
1101/**
1102 * Notify the other peer that we are done. Once this message
1103 * is out, we still need to notify the local client that we
1104 * are done.
1105 *
1106 * @param op operation to notify for.
1107 */
1108static void
1109send_p2p_done (struct Operation *op)
1110{
1111 struct GNUNET_MQ_Envelope *ev;
1112 struct IntersectionDoneMessage *idm;
1113
1114 GNUNET_assert (PHASE_MUST_SEND_DONE == op->state->phase);
1115 GNUNET_assert (GNUNET_NO == op->state->channel_death_expected);
1116 ev = GNUNET_MQ_msg (idm,
1117 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE);
1118 idm->final_element_count = htonl (op->state->my_element_count);
1119 idm->element_xor_hash = op->state->my_xor;
1120 GNUNET_MQ_notify_sent (ev,
1121 &finished_local_operations,
1122 op);
1123 GNUNET_MQ_send (op->mq,
1124 ev);
1125}
1126
1127
1128/**
1129 * Send all elements in the full result iterator.
1130 *
1131 * @param cls the `struct Operation *`
1132 */
1133static void
1134send_remaining_elements (void *cls)
1135{
1136 struct Operation *op = cls;
1137 const void *nxt;
1138 const struct ElementEntry *ee;
1139 struct GNUNET_MQ_Envelope *ev;
1140 struct GNUNET_SET_ResultMessage *rm;
1141 const struct GNUNET_SET_Element *element;
1142 int res;
1143
1144 res = GNUNET_CONTAINER_multihashmap_iterator_next (
1145 op->state->full_result_iter,
1146 NULL,
1147 &nxt);
1148 if (GNUNET_NO == res)
1149 {
1150 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1151 "Sending done and destroy because iterator ran out\n");
1152 GNUNET_CONTAINER_multihashmap_iterator_destroy (
1153 op->state->full_result_iter);
1154 op->state->full_result_iter = NULL;
1155 if (PHASE_DONE_RECEIVED == op->state->phase)
1156 {
1157 op->state->phase = PHASE_FINISHED;
1158 send_client_done_and_destroy (op);
1159 }
1160 else if (PHASE_MUST_SEND_DONE == op->state->phase)
1161 {
1162 send_p2p_done (op);
1163 }
1164 else
1165 {
1166 GNUNET_assert (0);
1167 }
1168 return;
1169 }
1170 ee = nxt;
1171 element = &ee->element;
1172 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1173 "Sending element %s:%u to client (full set)\n",
1174 GNUNET_h2s (&ee->element_hash),
1175 element->size);
1176 GNUNET_assert (0 != op->client_request_id);
1177 ev = GNUNET_MQ_msg_extra (rm,
1178 element->size,
1179 GNUNET_MESSAGE_TYPE_SET_RESULT);
1180 GNUNET_assert (NULL != ev);
1181 rm->result_status = htons (GNUNET_SET_STATUS_OK);
1182 rm->request_id = htonl (op->client_request_id);
1183 rm->element_type = element->element_type;
1184 GNUNET_memcpy (&rm[1],
1185 element->data,
1186 element->size);
1187 GNUNET_MQ_notify_sent (ev,
1188 &send_remaining_elements,
1189 op);
1190 GNUNET_MQ_send (op->set->cs->mq,
1191 ev);
1192}
1193
1194
1195/**
1196 * Fills the "my_elements" hashmap with the initial set of
1197 * (non-deleted) elements from the set of the specification.
1198 *
1199 * @param cls closure with the `struct Operation *`
1200 * @param key current key code for the element
1201 * @param value value in the hash map with the `struct ElementEntry *`
1202 * @return #GNUNET_YES (we should continue to iterate)
1203 */
1204static int
1205initialize_map_unfiltered (void *cls,
1206 const struct GNUNET_HashCode *key,
1207 void *value)
1208{
1209 struct ElementEntry *ee = value;
1210 struct Operation *op = cls;
1211
1212 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1213 return GNUNET_YES; /* element not live in operation's generation */
1214 GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
1215 &ee->element_hash,
1216 &op->state->my_xor);
1217 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1218 "Initial full initialization of my_elements, adding %s:%u\n",
1219 GNUNET_h2s (&ee->element_hash),
1220 ee->element.size);
1221 GNUNET_break (GNUNET_YES ==
1222 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
1223 &ee->element_hash,
1224 ee,
1225 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1226 return GNUNET_YES;
1227}
1228
1229
1230/**
1231 * Send our element count to the peer, in case our element count is
1232 * lower than theirs.
1233 *
1234 * @param op intersection operation
1235 */
1236static void
1237send_element_count (struct Operation *op)
1238{
1239 struct GNUNET_MQ_Envelope *ev;
1240 struct IntersectionElementInfoMessage *msg;
1241
1242 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1243 "Sending our element count (%u)\n",
1244 op->state->my_element_count);
1245 ev = GNUNET_MQ_msg (msg,
1246 GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
1247 msg->sender_element_count = htonl (op->state->my_element_count);
1248 GNUNET_MQ_send (op->mq, ev);
1249}
1250
1251
1252/**
1253 * We go first, initialize our map with all elements and
1254 * send the first Bloom filter.
1255 *
1256 * @param op operation to start exchange for
1257 */
1258static void
1259begin_bf_exchange (struct Operation *op)
1260{
1261 op->state->phase = PHASE_BF_EXCHANGE;
1262 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
1263 &initialize_map_unfiltered,
1264 op);
1265 send_bloomfilter (op);
1266}
1267
1268
1269/**
1270 * Handle the initial `struct IntersectionElementInfoMessage` from a
1271 * remote peer.
1272 *
1273 * @param cls the intersection operation
1274 * @param mh the header of the message
1275 */
1276void
1277handle_intersection_p2p_element_info (void *cls,
1278 const struct
1279 IntersectionElementInfoMessage *msg)
1280{
1281 struct Operation *op = cls;
1282
1283 if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation)
1284 {
1285 GNUNET_break_op (0);
1286 fail_intersection_operation (op);
1287 return;
1288 }
1289 op->remote_element_count = ntohl (msg->sender_element_count);
1290 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1291 "Received remote element count (%u), I have %u\n",
1292 op->remote_element_count,
1293 op->state->my_element_count);
1294 if (((PHASE_INITIAL != op->state->phase) &&
1295 (PHASE_COUNT_SENT != op->state->phase)) ||
1296 (op->state->my_element_count > op->remote_element_count) ||
1297 (0 == op->state->my_element_count) ||
1298 (0 == op->remote_element_count))
1299 {
1300 GNUNET_break_op (0);
1301 fail_intersection_operation (op);
1302 return;
1303 }
1304 GNUNET_break (NULL == op->state->remote_bf);
1305 begin_bf_exchange (op);
1306 GNUNET_CADET_receive_done (op->channel);
1307}
1308
1309
1310/**
1311 * Process a Bloomfilter once we got all the chunks.
1312 *
1313 * @param op the intersection operation
1314 */
1315static void
1316process_bf (struct Operation *op)
1317{
1318 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1319 "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n",
1320 op->state->phase,
1321 op->remote_element_count,
1322 op->state->my_element_count,
1323 GNUNET_CONTAINER_multihashmap_size (op->set->content->elements));
1324 switch (op->state->phase)
1325 {
1326 case PHASE_INITIAL:
1327 GNUNET_break_op (0);
1328 fail_intersection_operation (op);
1329 return;
1330
1331 case PHASE_COUNT_SENT:
1332 /* This is the first BF being sent, build our initial map with
1333 filtering in place */
1334 op->state->my_element_count = 0;
1335 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
1336 &filtered_map_initialization,
1337 op);
1338 break;
1339
1340 case PHASE_BF_EXCHANGE:
1341 /* Update our set by reduction */
1342 GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
1343 &iterator_bf_reduce,
1344 op);
1345 break;
1346
1347 case PHASE_MUST_SEND_DONE:
1348 GNUNET_break_op (0);
1349 fail_intersection_operation (op);
1350 return;
1351
1352 case PHASE_DONE_RECEIVED:
1353 GNUNET_break_op (0);
1354 fail_intersection_operation (op);
1355 return;
1356
1357 case PHASE_FINISHED:
1358 GNUNET_break_op (0);
1359 fail_intersection_operation (op);
1360 return;
1361 }
1362 GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
1363 op->state->remote_bf = NULL;
1364
1365 if ((0 == op->state->my_element_count) || /* fully disjoint */
1366 ((op->state->my_element_count == op->remote_element_count) &&
1367 (0 == GNUNET_memcmp (&op->state->my_xor,
1368 &op->state->other_xor))))
1369 {
1370 /* we are done */
1371 op->state->phase = PHASE_MUST_SEND_DONE;
1372 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1373 "Intersection succeeded, sending DONE to other peer\n");
1374 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
1375 op->state->local_bf = NULL;
1376 if (GNUNET_SET_RESULT_FULL == op->result_mode)
1377 {
1378 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1379 "Sending full result set (%u elements)\n",
1380 GNUNET_CONTAINER_multihashmap_size (op->state->my_elements));
1381 op->state->full_result_iter
1382 = GNUNET_CONTAINER_multihashmap_iterator_create (
1383 op->state->my_elements);
1384 send_remaining_elements (op);
1385 return;
1386 }
1387 send_p2p_done (op);
1388 return;
1389 }
1390 op->state->phase = PHASE_BF_EXCHANGE;
1391 send_bloomfilter (op);
1392}
1393
1394
1395/**
1396 * Check an BF message from a remote peer.
1397 *
1398 * @param cls the intersection operation
1399 * @param msg the header of the message
1400 * @return #GNUNET_OK if @a msg is well-formed
1401 */
1402static int
1403check_intersection_p2p_bf (void *cls,
1404 const struct BFMessage *msg)
1405{
1406 struct Operation *op = cls;
1407
1408 if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation)
1409 {
1410 GNUNET_break_op (0);
1411 return GNUNET_SYSERR;
1412 }
1413 return GNUNET_OK;
1414}
1415
1416
1417/**
1418 * Handle an BF message from a remote peer.
1419 *
1420 * @param cls the intersection operation
1421 * @param msg the header of the message
1422 */
1423static
1424handle_intersection_p2p_bf (void *cls,
1425 const struct BFMessage *msg)
1426{
1427 struct Operation *op = cls;
1428 uint32_t bf_size;
1429 uint32_t chunk_size;
1430 uint32_t bf_bits_per_element;
1431
1432 switch (op->state->phase)
1433 {
1434 case PHASE_INITIAL:
1435 GNUNET_break_op (0);
1436 fail_intersection_operation (op);
1437 return;
1438
1439 case PHASE_COUNT_SENT:
1440 case PHASE_BF_EXCHANGE:
1441 bf_size = ntohl (msg->bloomfilter_total_length);
1442 bf_bits_per_element = ntohl (msg->bits_per_element);
1443 chunk_size = htons (msg->header.size) - sizeof(struct BFMessage);
1444 op->state->other_xor = msg->element_xor_hash;
1445 if (bf_size == chunk_size)
1446 {
1447 if (NULL != op->state->bf_data)
1448 {
1449 GNUNET_break_op (0);
1450 fail_intersection_operation (op);
1451 return;
1452 }
1453 /* single part, done here immediately */
1454 op->state->remote_bf
1455 = GNUNET_CONTAINER_bloomfilter_init ((const char *) &msg[1],
1456 bf_size,
1457 bf_bits_per_element);
1458 op->state->salt = ntohl (msg->sender_mutator);
1459 op->remote_element_count = ntohl (msg->sender_element_count);
1460 process_bf (op);
1461 break;
1462 }
1463 /* multipart chunk */
1464 if (NULL == op->state->bf_data)
1465 {
1466 /* first chunk, initialize */
1467 op->state->bf_data = GNUNET_malloc (bf_size);
1468 op->state->bf_data_size = bf_size;
1469 op->state->bf_bits_per_element = bf_bits_per_element;
1470 op->state->bf_data_offset = 0;
1471 op->state->salt = ntohl (msg->sender_mutator);
1472 op->remote_element_count = ntohl (msg->sender_element_count);
1473 }
1474 else
1475 {
1476 /* increment */
1477 if ((op->state->bf_data_size != bf_size) ||
1478 (op->state->bf_bits_per_element != bf_bits_per_element) ||
1479 (op->state->bf_data_offset + chunk_size > bf_size) ||
1480 (op->state->salt != ntohl (msg->sender_mutator)) ||
1481 (op->remote_element_count != ntohl (msg->sender_element_count)))
1482 {
1483 GNUNET_break_op (0);
1484 fail_intersection_operation (op);
1485 return;
1486 }
1487 }
1488 GNUNET_memcpy (&op->state->bf_data[op->state->bf_data_offset],
1489 (const char *) &msg[1],
1490 chunk_size);
1491 op->state->bf_data_offset += chunk_size;
1492 if (op->state->bf_data_offset == bf_size)
1493 {
1494 /* last chunk, run! */
1495 op->state->remote_bf
1496 = GNUNET_CONTAINER_bloomfilter_init (op->state->bf_data,
1497 bf_size,
1498 bf_bits_per_element);
1499 GNUNET_free (op->state->bf_data);
1500 op->state->bf_data = NULL;
1501 op->state->bf_data_size = 0;
1502 process_bf (op);
1503 }
1504 break;
1505
1506 default:
1507 GNUNET_break_op (0);
1508 fail_intersection_operation (op);
1509 return;
1510 }
1511 GNUNET_CADET_receive_done (op->channel);
1512}
1513
1514
1515/**
1516 * Remove all elements from our hashmap.
1517 *
1518 * @param cls closure with the `struct Operation *`
1519 * @param key current key code
1520 * @param value value in the hash map
1521 * @return #GNUNET_YES (we should continue to iterate)
1522 */
1523static int
1524filter_all (void *cls,
1525 const struct GNUNET_HashCode *key,
1526 void *value)
1527{
1528 struct Operation *op = cls;
1529 struct ElementEntry *ee = value;
1530
1531 GNUNET_break (0 < op->state->my_element_count);
1532 op->state->my_element_count--;
1533 GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
1534 &ee->element_hash,
1535 &op->state->my_xor);
1536 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1537 "Final reduction of my_elements, removing %s:%u\n",
1538 GNUNET_h2s (&ee->element_hash),
1539 ee->element.size);
1540 GNUNET_assert (GNUNET_YES ==
1541 GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
1542 &ee->element_hash,
1543 ee));
1544 send_client_removed_element (op,
1545 &ee->element);
1546 return GNUNET_YES;
1547}
1548
1549
1550/**
1551 * Handle a done message from a remote peer
1552 *
1553 * @param cls the intersection operation
1554 * @param mh the message
1555 */
1556static void
1557handle_intersection_p2p_done (void *cls,
1558 const struct IntersectionDoneMessage *idm)
1559{
1560 struct Operation *op = cls;
1561
1562 if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation)
1563 {
1564 GNUNET_break_op (0);
1565 fail_intersection_operation (op);
1566 return;
1567 }
1568 if (PHASE_BF_EXCHANGE != op->state->phase)
1569 {
1570 /* wrong phase to conclude? FIXME: Or should we allow this
1571 if the other peer has _initially_ already an empty set? */
1572 GNUNET_break_op (0);
1573 fail_intersection_operation (op);
1574 return;
1575 }
1576 if (0 == ntohl (idm->final_element_count))
1577 {
1578 /* other peer determined empty set is the intersection,
1579 remove all elements */
1580 GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
1581 &filter_all,
1582 op);
1583 }
1584 if ((op->state->my_element_count != ntohl (idm->final_element_count)) ||
1585 (0 != GNUNET_memcmp (&op->state->my_xor,
1586 &idm->element_xor_hash)))
1587 {
1588 /* Other peer thinks we are done, but we disagree on the result! */
1589 GNUNET_break_op (0);
1590 fail_intersection_operation (op);
1591 return;
1592 }
1593 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1594 "Got IntersectionDoneMessage, have %u elements in intersection\n",
1595 op->state->my_element_count);
1596 op->state->phase = PHASE_DONE_RECEIVED;
1597 GNUNET_CADET_receive_done (op->channel);
1598
1599 GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
1600 if (GNUNET_SET_RESULT_FULL == op->result_mode)
1601 {
1602 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1603 "Sending full result set to client (%u elements)\n",
1604 GNUNET_CONTAINER_multihashmap_size (op->state->my_elements));
1605 op->state->full_result_iter
1606 = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
1607 send_remaining_elements (op);
1608 return;
1609 }
1610 op->state->phase = PHASE_FINISHED;
1611 send_client_done_and_destroy (op);
1612}
1613
1614
1615/**
1616 * Initiate a set intersection operation with a remote peer.
1617 *
1618 * @param op operation that is created, should be initialized to
1619 * begin the evaluation
1620 * @param opaque_context message to be transmitted to the listener
1621 * to convince it to accept, may be NULL
1622 * @return operation-specific state to keep in @a op
1623 */
1624static struct OperationState *
1625intersection_evaluate (struct Operation *op,
1626 const struct GNUNET_MessageHeader *opaque_context)
1627{
1628 struct OperationState *state;
1629 struct GNUNET_MQ_Envelope *ev;
1630 struct OperationRequestMessage *msg;
1631
1632 ev = GNUNET_MQ_msg_nested_mh (msg,
1633 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1634 opaque_context);
1635 if (NULL == ev)
1636 {
1637 /* the context message is too large!? */
1638 GNUNET_break (0);
1639 return NULL;
1640 }
1641 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1642 "Initiating intersection operation evaluation\n");
1643 state = GNUNET_new (struct OperationState);
1644 /* we started the operation, thus we have to send the operation request */
1645 state->phase = PHASE_INITIAL;
1646 state->my_element_count = op->set->state->current_set_element_count;
1647 state->my_elements
1648 = GNUNET_CONTAINER_multihashmap_create (state->my_element_count,
1649 GNUNET_YES);
1650
1651 msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
1652 msg->element_count = htonl (state->my_element_count);
1653 GNUNET_MQ_send (op->mq,
1654 ev);
1655 state->phase = PHASE_COUNT_SENT;
1656 if (NULL != opaque_context)
1657 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1658 "Sent op request with context message\n");
1659 else
1660 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1661 "Sent op request without context message\n");
1662 return state;
1663}
1664
1665
1666/**
1667 * Accept an intersection operation request from a remote peer. Only
1668 * initializes the private operation state.
1669 *
1670 * @param op operation that will be accepted as an intersection operation
1671 */
1672static struct OperationState *
1673intersection_accept (struct Operation *op)
1674{
1675 struct OperationState *state;
1676
1677 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1678 "Accepting set intersection operation\n");
1679 state = GNUNET_new (struct OperationState);
1680 state->phase = PHASE_INITIAL;
1681 state->my_element_count
1682 = op->set->state->current_set_element_count;
1683 state->my_elements
1684 = GNUNET_CONTAINER_multihashmap_create (GNUNET_MIN (state->my_element_count,
1685 op->remote_element_count),
1686 GNUNET_YES);
1687 op->state = state;
1688 if (op->remote_element_count < state->my_element_count)
1689 {
1690 /* If the other peer (Alice) has fewer elements than us (Bob),
1691 we just send the count as Alice should send the first BF */
1692 send_element_count (op);
1693 state->phase = PHASE_COUNT_SENT;
1694 return state;
1695 }
1696 /* We have fewer elements, so we start with the BF */
1697 begin_bf_exchange (op);
1698 return state;
1699}
1700
1701
1702/**
1703 * Destroy the intersection operation. Only things specific to the
1704 * intersection operation are destroyed.
1705 *
1706 * @param op intersection operation to destroy
1707 */
1708static void
1709intersection_op_cancel (struct Operation *op)
1710{
1711 /* check if the op was canceled twice */
1712 GNUNET_assert (NULL != op->state);
1713 if (NULL != op->state->remote_bf)
1714 {
1715 GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
1716 op->state->remote_bf = NULL;
1717 }
1718 if (NULL != op->state->local_bf)
1719 {
1720 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
1721 op->state->local_bf = NULL;
1722 }
1723 if (NULL != op->state->my_elements)
1724 {
1725 GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
1726 op->state->my_elements = NULL;
1727 }
1728 if (NULL != op->state->full_result_iter)
1729 {
1730 GNUNET_CONTAINER_multihashmap_iterator_destroy (
1731 op->state->full_result_iter);
1732 op->state->full_result_iter = NULL;
1733 }
1734 GNUNET_free (op->state);
1735 op->state = NULL;
1736 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1737 "Destroying intersection op state done\n");
1738}
1739
1740
1741/**
1742 * Create a new set supporting the intersection operation.
1743 *
1744 * @return the newly created set
1745 */
1746static struct SetState *
1747intersection_set_create ()
1748{
1749 struct SetState *set_state;
1750
1751 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1752 "Intersection set created\n");
1753 set_state = GNUNET_new (struct SetState);
1754 set_state->current_set_element_count = 0;
1755
1756 return set_state;
1757}
1758
1759
1760/**
1761 * Add the element from the given element message to the set.
1762 *
1763 * @param set_state state of the set want to add to
1764 * @param ee the element to add to the set
1765 */
1766static void
1767intersection_add (struct SetState *set_state,
1768 struct ElementEntry *ee)
1769{
1770 set_state->current_set_element_count++;
1771}
1772
1773
1774/**
1775 * Destroy a set that supports the intersection operation
1776 *
1777 * @param set_state the set to destroy
1778 */
1779static void
1780intersection_set_destroy (struct SetState *set_state)
1781{
1782 GNUNET_free (set_state);
1783}
1784
1785
1786/**
1787 * Remove the element given in the element message from the set.
1788 *
1789 * @param set_state state of the set to remove from
1790 * @param element set element to remove
1791 */
1792static void
1793intersection_remove (struct SetState *set_state,
1794 struct ElementEntry *element)
1795{
1796 GNUNET_assert (0 < set_state->current_set_element_count);
1797 set_state->current_set_element_count--;
1798}
1799
1800
1801/**
1802 * Callback for channel death for the intersection operation.
1803 *
1804 * @param op operation that lost the channel
1805 */
1806static void
1807intersection_channel_death (struct Operation *op)
1808{
1809 if (GNUNET_YES == op->state->channel_death_expected)
1810 {
1811 /* oh goodie, we are done! */
1812 send_client_done_and_destroy (op);
1813 }
1814 else
1815 {
1816 /* sorry, channel went down early, too bad. */
1817 _GSS_operation_destroy (op,
1818 GNUNET_YES);
1819 }
1820}
1821
1822
1823/**
1824 * Get the incoming socket associated with the given id.
1825 *
1826 * @param listener the listener to look in
1827 * @param id id to look for
1828 * @return the incoming socket associated with the id,
1829 * or NULL if there is none
1830 */
1831static struct Operation *
1832get_incoming (uint32_t id)
1833{
1834 for (struct Listener *listener = listener_head; NULL != listener;
1835 listener = listener->next)
1836 {
1837 for (struct Operation *op = listener->op_head; NULL != op; op = op->next)
1838 if (op->suggest_id == id)
1839 return op;
1840 }
1841 return NULL;
1842}
1843
1844
1845/**
1846 * Destroy an incoming request from a remote peer
1847 *
1848 * @param op remote request to destroy
1849 */
1850static void
1851incoming_destroy (struct Operation *op)
1852{
1853 struct Listener *listener;
1854
1855 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1856 "Destroying incoming operation %p\n",
1857 op);
1858 if (NULL != (listener = op->listener))
1859 {
1860 GNUNET_CONTAINER_DLL_remove (listener->op_head, listener->op_tail, op);
1861 op->listener = NULL;
1862 }
1863 if (NULL != op->timeout_task)
1864 {
1865 GNUNET_SCHEDULER_cancel (op->timeout_task);
1866 op->timeout_task = NULL;
1867 }
1868 _GSS_operation_destroy2 (op);
1869}
1870
1871
1872/**
1873 * Context for the #garbage_collect_cb().
1874 */
1875struct GarbageContext
1876{
1877 /**
1878 * Map for which we are garbage collecting removed elements.
1879 */
1880 struct GNUNET_CONTAINER_MultiHashMap *map;
1881
1882 /**
1883 * Lowest generation for which an operation is still pending.
1884 */
1885 unsigned int min_op_generation;
1886
1887 /**
1888 * Largest generation for which an operation is still pending.
1889 */
1890 unsigned int max_op_generation;
1891};
1892
1893
1894/**
1895 * Function invoked to check if an element can be removed from
1896 * the set's history because it is no longer needed.
1897 *
1898 * @param cls the `struct GarbageContext *`
1899 * @param key key of the element in the map
1900 * @param value the `struct ElementEntry *`
1901 * @return #GNUNET_OK (continue to iterate)
1902 */
1903static int
1904garbage_collect_cb (void *cls, const struct GNUNET_HashCode *key, void *value)
1905{
1906 // struct GarbageContext *gc = cls;
1907 // struct ElementEntry *ee = value;
1908
1909 // if (GNUNET_YES != ee->removed)
1910 // return GNUNET_OK;
1911 // if ( (gc->max_op_generation < ee->generation_added) ||
1912 // (ee->generation_removed > gc->min_op_generation) )
1913 // {
1914 // GNUNET_assert (GNUNET_YES ==
1915 // GNUNET_CONTAINER_multihashmap_remove (gc->map,
1916 // key,
1917 // ee));
1918 // GNUNET_free (ee);
1919 // }
1920 return GNUNET_OK;
1921}
1922
1923
1924/**
1925 * Collect and destroy elements that are not needed anymore, because
1926 * their lifetime (as determined by their generation) does not overlap
1927 * with any active set operation.
1928 *
1929 * @param set set to garbage collect
1930 */
1931static void
1932collect_generation_garbage (struct Set *set)
1933{
1934 struct GarbageContext gc;
1935
1936 gc.min_op_generation = UINT_MAX;
1937 gc.max_op_generation = 0;
1938 for (struct Operation *op = set->ops_head; NULL != op; op = op->next)
1939 {
1940 gc.min_op_generation =
1941 GNUNET_MIN (gc.min_op_generation, op->generation_created);
1942 gc.max_op_generation =
1943 GNUNET_MAX (gc.max_op_generation, op->generation_created);
1944 }
1945 gc.map = set->content->elements;
1946 GNUNET_CONTAINER_multihashmap_iterate (set->content->elements,
1947 &garbage_collect_cb,
1948 &gc);
1949}
1950
1951
1952/**
1953 * Is @a generation in the range of exclusions?
1954 *
1955 * @param generation generation to query
1956 * @param excluded array of generations where the element is excluded
1957 * @param excluded_size length of the @a excluded array
1958 * @return #GNUNET_YES if @a generation is in any of the ranges
1959 */
1960static int
1961is_excluded_generation (unsigned int generation,
1962 struct GenerationRange *excluded,
1963 unsigned int excluded_size)
1964{
1965 for (unsigned int i = 0; i < excluded_size; i++)
1966 if ((generation >= excluded[i].start) && (generation < excluded[i].end))
1967 return GNUNET_YES;
1968 return GNUNET_NO;
1969}
1970
1971
1972/**
1973 * Is element @a ee part of the set during @a query_generation?
1974 *
1975 * @param ee element to test
1976 * @param query_generation generation to query
1977 * @param excluded array of generations where the element is excluded
1978 * @param excluded_size length of the @a excluded array
1979 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
1980 */
1981static int
1982is_element_of_generation (struct ElementEntry *ee,
1983 unsigned int query_generation,
1984 struct GenerationRange *excluded,
1985 unsigned int excluded_size)
1986{
1987 struct MutationEvent *mut;
1988 int is_present;
1989
1990 GNUNET_assert (NULL != ee->mutations);
1991 if (GNUNET_YES ==
1992 is_excluded_generation (query_generation, excluded, excluded_size))
1993 {
1994 GNUNET_break (0);
1995 return GNUNET_NO;
1996 }
1997
1998 is_present = GNUNET_NO;
1999
2000 /* Could be made faster with binary search, but lists
2001 are small, so why bother. */
2002 for (unsigned int i = 0; i < ee->mutations_size; i++)
2003 {
2004 mut = &ee->mutations[i];
2005
2006 if (mut->generation > query_generation)
2007 {
2008 /* The mutation doesn't apply to our generation
2009 anymore. We can'b break here, since mutations aren't
2010 sorted by generation. */
2011 continue;
2012 }
2013
2014 if (GNUNET_YES ==
2015 is_excluded_generation (mut->generation, excluded, excluded_size))
2016 {
2017 /* The generation is excluded (because it belongs to another
2018 fork via a lazy copy) and thus mutations aren't considered
2019 for membership testing. */
2020 continue;
2021 }
2022
2023 /* This would be an inconsistency in how we manage mutations. */
2024 if ((GNUNET_YES == is_present) && (GNUNET_YES == mut->added))
2025 GNUNET_assert (0);
2026 /* Likewise. */
2027 if ((GNUNET_NO == is_present) && (GNUNET_NO == mut->added))
2028 GNUNET_assert (0);
2029
2030 is_present = mut->added;
2031 }
2032
2033 return is_present;
2034}
2035
2036
2037/**
2038 * Is element @a ee part of the set used by @a op?
2039 *
2040 * @param ee element to test
2041 * @param op operation the defines the set and its generation
2042 * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
2043 */
2044int
2045_GSS_is_element_of_operation (struct ElementEntry *ee, struct Operation *op)
2046{
2047 return is_element_of_generation (ee,
2048 op->generation_created,
2049 op->set->excluded_generations,
2050 op->set->excluded_generations_size);
2051}
2052
2053
2054/**
2055 * Destroy the given operation. Used for any operation where both
2056 * peers were known and that thus actually had a vt and channel. Must
2057 * not be used for operations where 'listener' is still set and we do
2058 * not know the other peer.
2059 *
2060 * Call the implementation-specific cancel function of the operation.
2061 * Disconnects from the remote peer. Does not disconnect the client,
2062 * as there may be multiple operations per set.
2063 *
2064 * @param op operation to destroy
2065 * @param gc #GNUNET_YES to perform garbage collection on the set
2066 */
2067void
2068_GSS_operation_destroy (struct Operation *op, int gc)
2069{
2070 struct Set *set = op->set;
2071 struct GNUNET_CADET_Channel *channel;
2072
2073 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying operation %p\n", op);
2074 GNUNET_assert (NULL == op->listener);
2075 if (NULL != op->state)
2076 {
2077 intersection_cancel (op); // FIXME: inline
2078 op->state = NULL;
2079 }
2080 if (NULL != set)
2081 {
2082 GNUNET_CONTAINER_DLL_remove (set->ops_head, set->ops_tail, op);
2083 op->set = NULL;
2084 }
2085 if (NULL != op->context_msg)
2086 {
2087 GNUNET_free (op->context_msg);
2088 op->context_msg = NULL;
2089 }
2090 if (NULL != (channel = op->channel))
2091 {
2092 /* This will free op; called conditionally as this helper function
2093 is also called from within the channel disconnect handler. */
2094 op->channel = NULL;
2095 GNUNET_CADET_channel_destroy (channel);
2096 }
2097 if ((NULL != set) && (GNUNET_YES == gc))
2098 collect_generation_garbage (set);
2099 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
2100 * there was a channel end handler that will free 'op' on the call stack. */
2101}
2102
2103
2104/**
2105 * Callback called when a client connects to the service.
2106 *
2107 * @param cls closure for the service
2108 * @param c the new client that connected to the service
2109 * @param mq the message queue used to send messages to the client
2110 * @return @a `struct ClientState`
2111 */
2112static void *
2113client_connect_cb (void *cls,
2114 struct GNUNET_SERVICE_Client *c,
2115 struct GNUNET_MQ_Handle *mq)
2116{
2117 struct ClientState *cs;
2118
2119 num_clients++;
2120 cs = GNUNET_new (struct ClientState);
2121 cs->client = c;
2122 cs->mq = mq;
2123 return cs;
2124}
2125
2126
2127/**
2128 * Iterator over hash map entries to free element entries.
2129 *
2130 * @param cls closure
2131 * @param key current key code
2132 * @param value a `struct ElementEntry *` to be free'd
2133 * @return #GNUNET_YES (continue to iterate)
2134 */
2135static int
2136destroy_elements_iterator (void *cls,
2137 const struct GNUNET_HashCode *key,
2138 void *value)
2139{
2140 struct ElementEntry *ee = value;
2141
2142 GNUNET_free (ee->mutations);
2143 GNUNET_free (ee);
2144 return GNUNET_YES;
2145}
2146
2147
2148/**
2149 * Clean up after a client has disconnected
2150 *
2151 * @param cls closure, unused
2152 * @param client the client to clean up after
2153 * @param internal_cls the `struct ClientState`
2154 */
2155static void
2156client_disconnect_cb (void *cls,
2157 struct GNUNET_SERVICE_Client *client,
2158 void *internal_cls)
2159{
2160 struct ClientState *cs = internal_cls;
2161 struct Operation *op;
2162 struct Listener *listener;
2163 struct Set *set;
2164
2165 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client disconnected, cleaning up\n");
2166 if (NULL != (set = cs->set))
2167 {
2168 struct SetContent *content = set->content;
2169 struct PendingMutation *pm;
2170 struct PendingMutation *pm_current;
2171
2172 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's set\n");
2173 /* Destroy pending set operations */
2174 while (NULL != set->ops_head)
2175 _GSS_operation_destroy (set->ops_head, GNUNET_NO);
2176
2177 /* Destroy operation-specific state */
2178 GNUNET_assert (NULL != set->state);
2179 intersection_set_destroy (set->state); // FIXME: inline
2180 set->state = NULL;
2181
2182 /* Clean up ongoing iterations */
2183 if (NULL != set->iter)
2184 {
2185 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
2186 set->iter = NULL;
2187 set->iteration_id++;
2188 }
2189
2190 /* discard any pending mutations that reference this set */
2191 pm = content->pending_mutations_head;
2192 while (NULL != pm)
2193 {
2194 pm_current = pm;
2195 pm = pm->next;
2196 if (pm_current->set == set)
2197 {
2198 GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head,
2199 content->pending_mutations_tail,
2200 pm_current);
2201 GNUNET_free (pm_current);
2202 }
2203 }
2204
2205 /* free set content (or at least decrement RC) */
2206 set->content = NULL;
2207 GNUNET_assert (0 != content->refcount);
2208 content->refcount--;
2209 if (0 == content->refcount)
2210 {
2211 GNUNET_assert (NULL != content->elements);
2212 GNUNET_CONTAINER_multihashmap_iterate (content->elements,
2213 &destroy_elements_iterator,
2214 NULL);
2215 GNUNET_CONTAINER_multihashmap_destroy (content->elements);
2216 content->elements = NULL;
2217 GNUNET_free (content);
2218 }
2219 GNUNET_free (set->excluded_generations);
2220 set->excluded_generations = NULL;
2221
2222 GNUNET_free (set);
2223 }
2224
2225 if (NULL != (listener = cs->listener))
2226 {
2227 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's listener\n");
2228 GNUNET_CADET_close_port (listener->open_port);
2229 listener->open_port = NULL;
2230 while (NULL != (op = listener->op_head))
2231 {
2232 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2233 "Destroying incoming operation `%u' from peer `%s'\n",
2234 (unsigned int) op->client_request_id,
2235 GNUNET_i2s (&op->peer));
2236 incoming_destroy (op);
2237 }
2238 GNUNET_CONTAINER_DLL_remove (listener_head, listener_tail, listener);
2239 GNUNET_free (listener);
2240 }
2241 GNUNET_free (cs);
2242 num_clients--;
2243 if ((GNUNET_YES == in_shutdown) && (0 == num_clients))
2244 {
2245 if (NULL != cadet)
2246 {
2247 GNUNET_CADET_disconnect (cadet);
2248 cadet = NULL;
2249 }
2250 }
2251}
2252
2253
2254/**
2255 * Check a request for a set operation from another peer.
2256 *
2257 * @param cls the operation state
2258 * @param msg the received message
2259 * @return #GNUNET_OK if the channel should be kept alive,
2260 * #GNUNET_SYSERR to destroy the channel
2261 */
2262static int
2263check_incoming_msg (void *cls, const struct OperationRequestMessage *msg)
2264{
2265 struct Operation *op = cls;
2266 struct Listener *listener = op->listener;
2267 const struct GNUNET_MessageHeader *nested_context;
2268
2269 /* double operation request */
2270 if (0 != op->suggest_id)
2271 {
2272 GNUNET_break_op (0);
2273 return GNUNET_SYSERR;
2274 }
2275 /* This should be equivalent to the previous condition, but can't hurt to check twice */
2276 if (NULL == op->listener)
2277 {
2278 GNUNET_break (0);
2279 return GNUNET_SYSERR;
2280 }
2281 if (listener->operation !=
2282 (enum GNUNET_SET_OperationType) ntohl (msg->operation))
2283 {
2284 GNUNET_break_op (0);
2285 return GNUNET_SYSERR;
2286 }
2287 nested_context = GNUNET_MQ_extract_nested_mh (msg);
2288 if ((NULL != nested_context) &&
2289 (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE))
2290 {
2291 GNUNET_break_op (0);
2292 return GNUNET_SYSERR;
2293 }
2294 return GNUNET_OK;
2295}
2296
2297
2298/**
2299 * Handle a request for a set operation from another peer. Checks if we
2300 * have a listener waiting for such a request (and in that case initiates
2301 * asking the listener about accepting the connection). If no listener
2302 * is waiting, we queue the operation request in hope that a listener
2303 * shows up soon (before timeout).
2304 *
2305 * This msg is expected as the first and only msg handled through the
2306 * non-operation bound virtual table, acceptance of this operation replaces
2307 * our virtual table and subsequent msgs would be routed differently (as
2308 * we then know what type of operation this is).
2309 *
2310 * @param cls the operation state
2311 * @param msg the received message
2312 * @return #GNUNET_OK if the channel should be kept alive,
2313 * #GNUNET_SYSERR to destroy the channel
2314 */
2315static void
2316handle_incoming_msg (void *cls, const struct OperationRequestMessage *msg)
2317{
2318 struct Operation *op = cls;
2319 struct Listener *listener = op->listener;
2320 const struct GNUNET_MessageHeader *nested_context;
2321 struct GNUNET_MQ_Envelope *env;
2322 struct GNUNET_SET_RequestMessage *cmsg;
2323
2324 nested_context = GNUNET_MQ_extract_nested_mh (msg);
2325 /* Make a copy of the nested_context (application-specific context
2326 information that is opaque to set) so we can pass it to the
2327 listener later on */
2328 if (NULL != nested_context)
2329 op->context_msg = GNUNET_copy_message (nested_context);
2330 op->remote_element_count = ntohl (msg->element_count);
2331 GNUNET_log (
2332 GNUNET_ERROR_TYPE_DEBUG,
2333 "Received P2P operation request (op %u, port %s) for active listener\n",
2334 (uint32_t) ntohl (msg->operation),
2335 GNUNET_h2s (&op->listener->app_id));
2336 GNUNET_assert (0 == op->suggest_id);
2337 if (0 == suggest_id)
2338 suggest_id++;
2339 op->suggest_id = suggest_id++;
2340 GNUNET_assert (NULL != op->timeout_task);
2341 GNUNET_SCHEDULER_cancel (op->timeout_task);
2342 op->timeout_task = NULL;
2343 env = GNUNET_MQ_msg_nested_mh (cmsg,
2344 GNUNET_MESSAGE_TYPE_SETI_REQUEST,
2345 op->context_msg);
2346 GNUNET_log (
2347 GNUNET_ERROR_TYPE_DEBUG,
2348 "Suggesting incoming request with accept id %u to listener %p of client %p\n",
2349 op->suggest_id,
2350 listener,
2351 listener->cs);
2352 cmsg->accept_id = htonl (op->suggest_id);
2353 cmsg->peer_id = op->peer;
2354 GNUNET_MQ_send (listener->cs->mq, env);
2355 /* NOTE: GNUNET_CADET_receive_done() will be called in
2356 #handle_client_accept() */
2357}
2358
2359
2360/**
2361 * Add an element to @a set as specified by @a msg
2362 *
2363 * @param set set to manipulate
2364 * @param msg message specifying the change
2365 */
2366static void
2367execute_add (struct Set *set, const struct GNUNET_SET_ElementMessage *msg)
2368{
2369 struct GNUNET_SET_Element el;
2370 struct ElementEntry *ee;
2371 struct GNUNET_HashCode hash;
2372
2373 GNUNET_assert (GNUNET_MESSAGE_TYPE_SETI_ADD == ntohs (msg->header.type));
2374 el.size = ntohs (msg->header.size) - sizeof(*msg);
2375 el.data = &msg[1];
2376 el.element_type = ntohs (msg->element_type);
2377 GNUNET_SET_element_hash (&el, &hash);
2378 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements, &hash);
2379 if (NULL == ee)
2380 {
2381 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2382 "Client inserts element %s of size %u\n",
2383 GNUNET_h2s (&hash),
2384 el.size);
2385 ee = GNUNET_malloc (el.size + sizeof(*ee));
2386 ee->element.size = el.size;
2387 GNUNET_memcpy (&ee[1], el.data, el.size);
2388 ee->element.data = &ee[1];
2389 ee->element.element_type = el.element_type;
2390 ee->remote = GNUNET_NO;
2391 ee->mutations = NULL;
2392 ee->mutations_size = 0;
2393 ee->element_hash = hash;
2394 GNUNET_break (GNUNET_YES ==
2395 GNUNET_CONTAINER_multihashmap_put (
2396 set->content->elements,
2397 &ee->element_hash,
2398 ee,
2399 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2400 }
2401 else if (GNUNET_YES ==
2402 is_element_of_generation (ee,
2403 set->current_generation,
2404 set->excluded_generations,
2405 set->excluded_generations_size))
2406 {
2407 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2408 "Client inserted element %s of size %u twice (ignored)\n",
2409 GNUNET_h2s (&hash),
2410 el.size);
2411
2412 /* same element inserted twice */
2413 return;
2414 }
2415
2416 {
2417 struct MutationEvent mut = { .generation = set->current_generation,
2418 .added = GNUNET_YES };
2419 GNUNET_array_append (ee->mutations, ee->mutations_size, mut);
2420 }
2421 // FIXME: inline
2422 intersection_add (set->state,
2423 ee);
2424}
2425
2426
2427/**
2428 * Perform a mutation on a set as specified by the @a msg
2429 *
2430 * @param set the set to mutate
2431 * @param msg specification of what to change
2432 */
2433static void
2434execute_mutation (struct Set *set, const struct GNUNET_SET_ElementMessage *msg)
2435{
2436 switch (ntohs (msg->header.type))
2437 {
2438 case GNUNET_MESSAGE_TYPE_SETI_ADD: // FIXME: inline!
2439 execute_add (set, msg);
2440 break;
2441 default:
2442 GNUNET_break (0);
2443 }
2444}
2445
2446
2447/**
2448 * Execute mutations that were delayed on a set because of
2449 * pending operations.
2450 *
2451 * @param set the set to execute mutations on
2452 */
2453static void
2454execute_delayed_mutations (struct Set *set)
2455{
2456 struct PendingMutation *pm;
2457
2458 if (0 != set->content->iterator_count)
2459 return; /* still cannot do this */
2460 while (NULL != (pm = set->content->pending_mutations_head))
2461 {
2462 GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head,
2463 set->content->pending_mutations_tail,
2464 pm);
2465 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2466 "Executing pending mutation on %p.\n",
2467 pm->set);
2468 execute_mutation (pm->set, pm->msg);
2469 GNUNET_free (pm->msg);
2470 GNUNET_free (pm);
2471 }
2472}
2473
2474
2475/**
2476 * Send the next element of a set to the set's client. The next element is given by
2477 * the set's current hashmap iterator. The set's iterator will be set to NULL if there
2478 * are no more elements in the set. The caller must ensure that the set's iterator is
2479 * valid.
2480 *
2481 * The client will acknowledge each received element with a
2482 * #GNUNET_MESSAGE_TYPE_SETI_ITER_ACK message. Our
2483 * #handle_client_iter_ack() will then trigger the next transmission.
2484 * Note that the #GNUNET_MESSAGE_TYPE_SETI_ITER_DONE is not acknowledged.
2485 *
2486 * @param set set that should send its next element to its client
2487 */
2488static void
2489send_client_element (struct Set *set)
2490{
2491 int ret;
2492 struct ElementEntry *ee;
2493 struct GNUNET_MQ_Envelope *ev;
2494 struct GNUNET_SET_IterResponseMessage *msg;
2495
2496 GNUNET_assert (NULL != set->iter);
2497 do
2498 {
2499 ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter,
2500 NULL,
2501 (const void **) &ee);
2502 if (GNUNET_NO == ret)
2503 {
2504 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Iteration on %p done.\n", set);
2505 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETI_ITER_DONE);
2506 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
2507 set->iter = NULL;
2508 set->iteration_id++;
2509 GNUNET_assert (set->content->iterator_count > 0);
2510 set->content->iterator_count--;
2511 execute_delayed_mutations (set);
2512 GNUNET_MQ_send (set->cs->mq, ev);
2513 return;
2514 }
2515 GNUNET_assert (NULL != ee);
2516 }
2517 while (GNUNET_NO ==
2518 is_element_of_generation (ee,
2519 set->iter_generation,
2520 set->excluded_generations,
2521 set->excluded_generations_size));
2522 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2523 "Sending iteration element on %p.\n",
2524 set);
2525 ev = GNUNET_MQ_msg_extra (msg,
2526 ee->element.size,
2527 GNUNET_MESSAGE_TYPE_SETI_ITER_ELEMENT);
2528 GNUNET_memcpy (&msg[1], ee->element.data, ee->element.size);
2529 msg->element_type = htons (ee->element.element_type);
2530 msg->iteration_id = htons (set->iteration_id);
2531 GNUNET_MQ_send (set->cs->mq, ev);
2532}
2533
2534
2535/**
2536 * Called when a client wants to iterate the elements of a set.
2537 * Checks if we have a set associated with the client and if we
2538 * can right now start an iteration. If all checks out, starts
2539 * sending the elements of the set to the client.
2540 *
2541 * @param cls client that sent the message
2542 * @param m message sent by the client
2543 */
2544static void
2545handle_client_iterate (void *cls, const struct GNUNET_MessageHeader *m)
2546{
2547 struct ClientState *cs = cls;
2548 struct Set *set;
2549
2550 if (NULL == (set = cs->set))
2551 {
2552 /* attempt to iterate over a non existing set */
2553 GNUNET_break (0);
2554 GNUNET_SERVICE_client_drop (cs->client);
2555 return;
2556 }
2557 if (NULL != set->iter)
2558 {
2559 /* Only one concurrent iterate-action allowed per set */
2560 GNUNET_break (0);
2561 GNUNET_SERVICE_client_drop (cs->client);
2562 return;
2563 }
2564 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2565 "Iterating set %p in gen %u with %u content elements\n",
2566 (void *) set,
2567 set->current_generation,
2568 GNUNET_CONTAINER_multihashmap_size (set->content->elements));
2569 GNUNET_SERVICE_client_continue (cs->client);
2570 set->content->iterator_count++;
2571 set->iter =
2572 GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements);
2573 set->iter_generation = set->current_generation;
2574 send_client_element (set);
2575}
2576
2577
2578/**
2579 * Called when a client wants to create a new set. This is typically
2580 * the first request from a client, and includes the type of set
2581 * operation to be performed.
2582 *
2583 * @param cls client that sent the message
2584 * @param m message sent by the client
2585 */
2586static void
2587handle_client_create_set (void *cls, const struct GNUNET_SET_CreateMessage *msg)
2588{
2589 struct ClientState *cs = cls;
2590 struct Set *set;
2591
2592 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2593 "Client created new set (operation %u)\n",
2594 (uint32_t) ntohl (msg->operation));
2595 if (NULL != cs->set)
2596 {
2597 /* There can only be one set per client */
2598 GNUNET_break (0);
2599 GNUNET_SERVICE_client_drop (cs->client);
2600 return;
2601 }
2602 set = GNUNET_new (struct Set);
2603 switch (ntohl (msg->operation))
2604 {
2605 case GNUNET_SET_OPERATION_INTERSECTION:
2606 set->vt = _GSS_intersection_vt ();
2607 break;
2608
2609 case GNUNET_SET_OPERATION_UNION:
2610 set->vt = _GSS_union_vt ();
2611 break;
2612
2613 default:
2614 GNUNET_free (set);
2615 GNUNET_break (0);
2616 GNUNET_SERVICE_client_drop (cs->client);
2617 return;
2618 }
2619 set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
2620 set->state = intersection_set_create (); // FIXME: inline
2621 if (NULL == set->state)
2622 {
2623 /* initialization failed (i.e. out of memory) */
2624 GNUNET_free (set);
2625 GNUNET_SERVICE_client_drop (cs->client);
2626 return;
2627 }
2628 set->content = GNUNET_new (struct SetContent);
2629 set->content->refcount = 1;
2630 set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
2631 set->cs = cs;
2632 cs->set = set;
2633 GNUNET_SERVICE_client_continue (cs->client);
2634}
2635
2636
2637/**
2638 * Timeout happens iff:
2639 * - we suggested an operation to our listener,
2640 * but did not receive a response in time
2641 * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SETI_P2P_OPERATION_REQUEST
2642 *
2643 * @param cls channel context
2644 * @param tc context information (why was this task triggered now)
2645 */
2646static void
2647incoming_timeout_cb (void *cls)
2648{
2649 struct Operation *op = cls;
2650
2651 op->timeout_task = NULL;
2652 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2653 "Remote peer's incoming request timed out\n");
2654 incoming_destroy (op);
2655}
2656
2657
2658/**
2659 * Method called whenever another peer has added us to a channel the
2660 * other peer initiated. Only called (once) upon reception of data
2661 * from a channel we listen on.
2662 *
2663 * The channel context represents the operation itself and gets added
2664 * to a DLL, from where it gets looked up when our local listener
2665 * client responds to a proposed/suggested operation or connects and
2666 * associates with this operation.
2667 *
2668 * @param cls closure
2669 * @param channel new handle to the channel
2670 * @param source peer that started the channel
2671 * @return initial channel context for the channel
2672 * returns NULL on error
2673 */
2674static void *
2675channel_new_cb (void *cls,
2676 struct GNUNET_CADET_Channel *channel,
2677 const struct GNUNET_PeerIdentity *source)
2678{
2679 struct Listener *listener = cls;
2680 struct Operation *op;
2681
2682 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "New incoming channel\n");
2683 op = GNUNET_new (struct Operation);
2684 op->listener = listener;
2685 op->peer = *source;
2686 op->channel = channel;
2687 op->mq = GNUNET_CADET_get_mq (op->channel);
2688 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
2689 op->timeout_task = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
2690 &incoming_timeout_cb,
2691 op);
2692 GNUNET_CONTAINER_DLL_insert (listener->op_head, listener->op_tail, op);
2693 return op;
2694}
2695
2696
2697/**
2698 * Function called whenever a channel is destroyed. Should clean up
2699 * any associated state. It must NOT call
2700 * GNUNET_CADET_channel_destroy() on the channel.
2701 *
2702 * The peer_disconnect function is part of a a virtual table set initially either
2703 * when a peer creates a new channel with us, or once we create
2704 * a new channel ourselves (evaluate).
2705 *
2706 * Once we know the exact type of operation (union/intersection), the vt is
2707 * replaced with an operation specific instance (_GSS_[op]_vt).
2708 *
2709 * @param channel_ctx place where local state associated
2710 * with the channel is stored
2711 * @param channel connection to the other end (henceforth invalid)
2712 */
2713static void
2714channel_end_cb (void *channel_ctx, const struct GNUNET_CADET_Channel *channel)
2715{
2716 struct Operation *op = channel_ctx;
2717
2718 op->channel = NULL;
2719 _GSS_operation_destroy2 (op);
2720}
2721
2722
2723/**
2724 * This function probably should not exist
2725 * and be replaced by inlining more specific
2726 * logic in the various places where it is called.
2727 */
2728void
2729_GSS_operation_destroy2 (struct Operation *op)
2730{
2731 struct GNUNET_CADET_Channel *channel;
2732
2733 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "channel_end_cb called\n");
2734 if (NULL != (channel = op->channel))
2735 {
2736 /* This will free op; called conditionally as this helper function
2737 is also called from within the channel disconnect handler. */
2738 op->channel = NULL;
2739 GNUNET_CADET_channel_destroy (channel);
2740 }
2741 if (NULL != op->listener)
2742 {
2743 incoming_destroy (op);
2744 return;
2745 }
2746 if (NULL != op->set)
2747 intersection_channel_death (op); // FIXME: inline
2748 else
2749 _GSS_operation_destroy (op, GNUNET_YES);
2750 GNUNET_free (op);
2751}
2752
2753
2754/**
2755 * Function called whenever an MQ-channel's transmission window size changes.
2756 *
2757 * The first callback in an outgoing channel will be with a non-zero value
2758 * and will mean the channel is connected to the destination.
2759 *
2760 * For an incoming channel it will be called immediately after the
2761 * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
2762 *
2763 * @param cls Channel closure.
2764 * @param channel Connection to the other end (henceforth invalid).
2765 * @param window_size New window size. If the is more messages than buffer size
2766 * this value will be negative..
2767 */
2768static void
2769channel_window_cb (void *cls,
2770 const struct GNUNET_CADET_Channel *channel,
2771 int window_size)
2772{
2773 /* FIXME: not implemented, we could do flow control here... */
2774}
2775
2776
2777/**
2778 * Called when a client wants to create a new listener.
2779 *
2780 * @param cls client that sent the message
2781 * @param msg message sent by the client
2782 */
2783static void
2784handle_client_listen (void *cls, const struct GNUNET_SET_ListenMessage *msg)
2785{
2786 struct ClientState *cs = cls;
2787 struct GNUNET_MQ_MessageHandler cadet_handlers[] =
2788 { GNUNET_MQ_hd_var_size (incoming_msg,
2789 GNUNET_MESSAGE_TYPE_SETI_P2P_OPERATION_REQUEST,
2790 struct OperationRequestMessage,
2791 NULL),
2792 GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
2793 GNUNET_MESSAGE_TYPE_SETI_P2P_ELEMENT_INFO,
2794 struct IntersectionElementInfoMessage,
2795 NULL),
2796 GNUNET_MQ_hd_var_size (intersection_p2p_bf,
2797 GNUNET_MESSAGE_TYPE_SETI_P2P_BF,
2798 struct BFMessage,
2799 NULL),
2800 GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
2801 GNUNET_MESSAGE_TYPE_SETI_P2P_DONE,
2802 struct IntersectionDoneMessage,
2803 NULL),
2804 GNUNET_MQ_handler_end () };
2805 struct Listener *listener;
2806
2807 if (NULL != cs->listener)
2808 {
2809 /* max. one active listener per client! */
2810 GNUNET_break (0);
2811 GNUNET_SERVICE_client_drop (cs->client);
2812 return;
2813 }
2814 listener = GNUNET_new (struct Listener);
2815 listener->cs = cs;
2816 cs->listener = listener;
2817 listener->app_id = msg->app_id;
2818 listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
2819 GNUNET_CONTAINER_DLL_insert (listener_head, listener_tail, listener);
2820 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2821 "New listener created (op %u, port %s)\n",
2822 listener->operation,
2823 GNUNET_h2s (&listener->app_id));
2824 listener->open_port = GNUNET_CADET_open_port (cadet,
2825 &msg->app_id,
2826 &channel_new_cb,
2827 listener,
2828 &channel_window_cb,
2829 &channel_end_cb,
2830 cadet_handlers);
2831 GNUNET_SERVICE_client_continue (cs->client);
2832}
2833
2834
2835/**
2836 * Called when the listening client rejects an operation
2837 * request by another peer.
2838 *
2839 * @param cls client that sent the message
2840 * @param msg message sent by the client
2841 */
2842static void
2843handle_client_reject (void *cls, const struct GNUNET_SET_RejectMessage *msg)
2844{
2845 struct ClientState *cs = cls;
2846 struct Operation *op;
2847
2848 op = get_incoming (ntohl (msg->accept_reject_id));
2849 if (NULL == op)
2850 {
2851 /* no matching incoming operation for this reject;
2852 could be that the other peer already disconnected... */
2853 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2854 "Client rejected unknown operation %u\n",
2855 (unsigned int) ntohl (msg->accept_reject_id));
2856 GNUNET_SERVICE_client_continue (cs->client);
2857 return;
2858 }
2859 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2860 "Peer request (op %u, app %s) rejected by client\n",
2861 op->listener->operation,
2862 GNUNET_h2s (&cs->listener->app_id));
2863 _GSS_operation_destroy2 (op);
2864 GNUNET_SERVICE_client_continue (cs->client);
2865}
2866
2867
2868/**
2869 * Called when a client wants to add or remove an element to a set it inhabits.
2870 *
2871 * @param cls client that sent the message
2872 * @param msg message sent by the client
2873 */
2874static int
2875check_client_mutation (void *cls, const struct GNUNET_SET_ElementMessage *msg)
2876{
2877 /* NOTE: Technically, we should probably check with the
2878 block library whether the element we are given is well-formed */
2879 return GNUNET_OK;
2880}
2881
2882
2883/**
2884 * Called when a client wants to add or remove an element to a set it inhabits.
2885 *
2886 * @param cls client that sent the message
2887 * @param msg message sent by the client
2888 */
2889static void
2890handle_client_mutation (void *cls, const struct GNUNET_SET_ElementMessage *msg)
2891{
2892 struct ClientState *cs = cls;
2893 struct Set *set;
2894
2895 if (NULL == (set = cs->set))
2896 {
2897 /* client without a set requested an operation */
2898 GNUNET_break (0);
2899 GNUNET_SERVICE_client_drop (cs->client);
2900 return;
2901 }
2902 GNUNET_SERVICE_client_continue (cs->client);
2903
2904 if (0 != set->content->iterator_count)
2905 {
2906 struct PendingMutation *pm;
2907
2908 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Scheduling mutation on set\n");
2909 pm = GNUNET_new (struct PendingMutation);
2910 pm->msg =
2911 (struct GNUNET_SET_ElementMessage *) GNUNET_copy_message (&msg->header);
2912 pm->set = set;
2913 GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head,
2914 set->content->pending_mutations_tail,
2915 pm);
2916 return;
2917 }
2918 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing mutation on set\n");
2919 execute_mutation (set, msg);
2920}
2921
2922
2923/**
2924 * Advance the current generation of a set,
2925 * adding exclusion ranges if necessary.
2926 *
2927 * @param set the set where we want to advance the generation
2928 */
2929static void
2930advance_generation (struct Set *set)
2931{
2932 struct GenerationRange r;
2933
2934 if (set->current_generation == set->content->latest_generation)
2935 {
2936 set->content->latest_generation++;
2937 set->current_generation++;
2938 return;
2939 }
2940
2941 GNUNET_assert (set->current_generation < set->content->latest_generation);
2942
2943 r.start = set->current_generation + 1;
2944 r.end = set->content->latest_generation + 1;
2945 set->content->latest_generation = r.end;
2946 set->current_generation = r.end;
2947 GNUNET_array_append (set->excluded_generations,
2948 set->excluded_generations_size,
2949 r);
2950}
2951
2952
2953/**
2954 * Called when a client wants to initiate a set operation with another
2955 * peer. Initiates the CADET connection to the listener and sends the
2956 * request.
2957 *
2958 * @param cls client that sent the message
2959 * @param msg message sent by the client
2960 * @return #GNUNET_OK if the message is well-formed
2961 */
2962static int
2963check_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg)
2964{
2965 /* FIXME: suboptimal, even if the context below could be NULL,
2966 there are malformed messages this does not check for... */
2967 return GNUNET_OK;
2968}
2969
2970
2971/**
2972 * Called when a client wants to initiate a set operation with another
2973 * peer. Initiates the CADET connection to the listener and sends the
2974 * request.
2975 *
2976 * @param cls client that sent the message
2977 * @param msg message sent by the client
2978 */
2979static void
2980handle_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg)
2981{
2982 struct ClientState *cs = cls;
2983 struct Operation *op = GNUNET_new (struct Operation);
2984 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
2985 GNUNET_MQ_hd_var_size (incoming_msg,
2986 GNUNET_MESSAGE_TYPE_SETI_P2P_OPERATION_REQUEST,
2987 struct OperationRequestMessage,
2988 op),
2989 GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
2990 GNUNET_MESSAGE_TYPE_SETI_P2P_ELEMENT_INFO,
2991 struct IntersectionElementInfoMessage,
2992 op),
2993 GNUNET_MQ_hd_var_size (intersection_p2p_bf,
2994 GNUNET_MESSAGE_TYPE_SETI_P2P_BF,
2995 struct BFMessage,
2996 op),
2997 GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
2998 GNUNET_MESSAGE_TYPE_SETI_P2P_DONE,
2999 struct IntersectionDoneMessage,
3000 op),
3001 GNUNET_MQ_handler_end ()
3002 };
3003 struct Set *set;
3004 const struct GNUNET_MessageHeader *context;
3005
3006 if (NULL == (set = cs->set))
3007 {
3008 GNUNET_break (0);
3009 GNUNET_free (op);
3010 GNUNET_SERVICE_client_drop (cs->client);
3011 return;
3012 }
3013 op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
3014 op->peer = msg->target_peer;
3015 op->result_mode = ntohl (msg->result_mode);
3016 op->client_request_id = ntohl (msg->request_id);
3017 op->byzantine = msg->byzantine;
3018 op->byzantine_lower_bound = msg->byzantine_lower_bound;
3019 op->force_full = msg->force_full;
3020 op->force_delta = msg->force_delta;
3021 context = GNUNET_MQ_extract_nested_mh (msg);
3022
3023 /* Advance generation values, so that
3024 mutations won't interfer with the running operation. */
3025 op->set = set;
3026 op->generation_created = set->current_generation;
3027 advance_generation (set);
3028 GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, op);
3029 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3030 "Creating new CADET channel to port %s for set operation type %u\n",
3031 GNUNET_h2s (&msg->app_id),
3032 set->operation);
3033 op->channel = GNUNET_CADET_channel_create (cadet,
3034 op,
3035 &msg->target_peer,
3036 &msg->app_id,
3037 &channel_window_cb,
3038 &channel_end_cb,
3039 cadet_handlers);
3040 op->mq = GNUNET_CADET_get_mq (op->channel);
3041 op->state = intersection_evaluate (op, context); // FIXME: inline!
3042 if (NULL == op->state)
3043 {
3044 GNUNET_break (0);
3045 GNUNET_SERVICE_client_drop (cs->client);
3046 return;
3047 }
3048 GNUNET_SERVICE_client_continue (cs->client);
3049}
3050
3051
3052/**
3053 * Handle a request from the client to cancel a running set operation.
3054 *
3055 * @param cls the client
3056 * @param msg the message
3057 */
3058static void
3059handle_client_cancel (void *cls, const struct GNUNET_SET_CancelMessage *msg)
3060{
3061 struct ClientState *cs = cls;
3062 struct Set *set;
3063 struct Operation *op;
3064 int found;
3065
3066 if (NULL == (set = cs->set))
3067 {
3068 /* client without a set requested an operation */
3069 GNUNET_break (0);
3070 GNUNET_SERVICE_client_drop (cs->client);
3071 return;
3072 }
3073 found = GNUNET_NO;
3074 for (op = set->ops_head; NULL != op; op = op->next)
3075 {
3076 if (op->client_request_id == ntohl (msg->request_id))
3077 {
3078 found = GNUNET_YES;
3079 break;
3080 }
3081 }
3082 if (GNUNET_NO == found)
3083 {
3084 /* It may happen that the operation was already destroyed due to
3085 * the other peer disconnecting. The client may not know about this
3086 * yet and try to cancel the (just barely non-existent) operation.
3087 * So this is not a hard error.
3088 */GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3089 "Client canceled non-existent op %u\n",
3090 (uint32_t) ntohl (msg->request_id));
3091 }
3092 else
3093 {
3094 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3095 "Client requested cancel for op %u\n",
3096 (uint32_t) ntohl (msg->request_id));
3097 _GSS_operation_destroy (op, GNUNET_YES);
3098 }
3099 GNUNET_SERVICE_client_continue (cs->client);
3100}
3101
3102
3103/**
3104 * Handle a request from the client to accept a set operation that
3105 * came from a remote peer. We forward the accept to the associated
3106 * operation for handling
3107 *
3108 * @param cls the client
3109 * @param msg the message
3110 */
3111static void
3112handle_client_accept (void *cls, const struct GNUNET_SET_AcceptMessage *msg)
3113{
3114 struct ClientState *cs = cls;
3115 struct Set *set;
3116 struct Operation *op;
3117 struct GNUNET_SET_ResultMessage *result_message;
3118 struct GNUNET_MQ_Envelope *ev;
3119 struct Listener *listener;
3120
3121 if (NULL == (set = cs->set))
3122 {
3123 /* client without a set requested to accept */
3124 GNUNET_break (0);
3125 GNUNET_SERVICE_client_drop (cs->client);
3126 return;
3127 }
3128 op = get_incoming (ntohl (msg->accept_reject_id));
3129 if (NULL == op)
3130 {
3131 /* It is not an error if the set op does not exist -- it may
3132 * have been destroyed when the partner peer disconnected. */
3133 GNUNET_log (
3134 GNUNET_ERROR_TYPE_INFO,
3135 "Client %p accepted request %u of listener %p that is no longer active\n",
3136 cs,
3137 ntohl (msg->accept_reject_id),
3138 cs->listener);
3139 ev = GNUNET_MQ_msg (result_message, GNUNET_MESSAGE_TYPE_SETI_RESULT);
3140 result_message->request_id = msg->request_id;
3141 result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE);
3142 GNUNET_MQ_send (set->cs->mq, ev);
3143 GNUNET_SERVICE_client_continue (cs->client);
3144 return;
3145 }
3146 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3147 "Client accepting request %u\n",
3148 (uint32_t) ntohl (msg->accept_reject_id));
3149 listener = op->listener;
3150 op->listener = NULL;
3151 GNUNET_CONTAINER_DLL_remove (listener->op_head, listener->op_tail, op);
3152 op->set = set;
3153 GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, op);
3154 op->client_request_id = ntohl (msg->request_id);
3155 op->result_mode = ntohl (msg->result_mode);
3156 op->byzantine = msg->byzantine;
3157 op->byzantine_lower_bound = msg->byzantine_lower_bound;
3158 op->force_full = msg->force_full;
3159 op->force_delta = msg->force_delta;
3160
3161 /* Advance generation values, so that future mutations do not
3162 interfer with the running operation. */
3163 op->generation_created = set->current_generation;
3164 advance_generation (set);
3165 GNUNET_assert (NULL == op->state);
3166 op->state = intersection_accept (op); // FIXME: inline
3167 if (NULL == op->state)
3168 {
3169 GNUNET_break (0);
3170 GNUNET_SERVICE_client_drop (cs->client);
3171 return;
3172 }
3173 /* Now allow CADET to continue, as we did not do this in
3174 #handle_incoming_msg (as we wanted to first see if the
3175 local client would accept the request). */
3176 GNUNET_CADET_receive_done (op->channel);
3177 GNUNET_SERVICE_client_continue (cs->client);
3178}
3179
3180
3181/**
3182 * Called to clean up, after a shutdown has been requested.
3183 *
3184 * @param cls closure, NULL
3185 */
3186static void
3187shutdown_task (void *cls)
3188{
3189 /* Delay actual shutdown to allow service to disconnect clients */
3190 in_shutdown = GNUNET_YES;
3191 if (0 == num_clients)
3192 {
3193 if (NULL != cadet)
3194 {
3195 GNUNET_CADET_disconnect (cadet);
3196 cadet = NULL;
3197 }
3198 }
3199 GNUNET_STATISTICS_destroy (_GSS_statistics, GNUNET_YES);
3200 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n");
3201}
3202
3203
3204/**
3205 * Function called by the service's run
3206 * method to run service-specific setup code.
3207 *
3208 * @param cls closure
3209 * @param cfg configuration to use
3210 * @param service the initialized service
3211 */
3212static void
3213run (void *cls,
3214 const struct GNUNET_CONFIGURATION_Handle *cfg,
3215 struct GNUNET_SERVICE_Handle *service)
3216{
3217 /* FIXME: need to modify SERVICE (!) API to allow
3218 us to run a shutdown task *after* clients were
3219 forcefully disconnected! */
3220 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
3221 _GSS_statistics = GNUNET_STATISTICS_create ("set", cfg);
3222 cadet = GNUNET_CADET_connect (cfg);
3223 if (NULL == cadet)
3224 {
3225 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3226 _ ("Could not connect to CADET service\n"));
3227 GNUNET_SCHEDULER_shutdown ();
3228 return;
3229 }
3230}
3231
3232
3233/**
3234 * Define "main" method using service macro.
3235 */
3236GNUNET_SERVICE_MAIN (
3237 "set",
3238 GNUNET_SERVICE_OPTION_NONE,
3239 &run,
3240 &client_connect_cb,
3241 &client_disconnect_cb,
3242 NULL,
3243 GNUNET_MQ_hd_fixed_size (client_accept,
3244 GNUNET_MESSAGE_TYPE_SETI_ACCEPT,
3245 struct GNUNET_SET_AcceptMessage,
3246 NULL),
3247 GNUNET_MQ_hd_var_size (client_mutation,
3248 GNUNET_MESSAGE_TYPE_SETI_ADD,
3249 struct GNUNET_SET_ElementMessage,
3250 NULL),
3251 GNUNET_MQ_hd_fixed_size (client_create_set,
3252 GNUNET_MESSAGE_TYPE_SETI_CREATE,
3253 struct GNUNET_SET_CreateMessage,
3254 NULL),
3255 GNUNET_MQ_hd_var_size (client_evaluate,
3256 GNUNET_MESSAGE_TYPE_SETI_EVALUATE,
3257 struct GNUNET_SET_EvaluateMessage,
3258 NULL),
3259 GNUNET_MQ_hd_fixed_size (client_listen,
3260 GNUNET_MESSAGE_TYPE_SETI_LISTEN,
3261 struct GNUNET_SET_ListenMessage,
3262 NULL),
3263 GNUNET_MQ_hd_fixed_size (client_reject,
3264 GNUNET_MESSAGE_TYPE_SETI_REJECT,
3265 struct GNUNET_SET_RejectMessage,
3266 NULL),
3267 GNUNET_MQ_hd_fixed_size (client_cancel,
3268 GNUNET_MESSAGE_TYPE_SETI_CANCEL,
3269 struct GNUNET_SET_CancelMessage,
3270 NULL),
3271 GNUNET_MQ_handler_end ());
3272
3273
3274/* end of gnunet-service-seti.c */
diff --git a/src/seti/gnunet-service-seti_protocol.h b/src/seti/gnunet-service-seti_protocol.h
new file mode 100644
index 000000000..51968376e
--- /dev/null
+++ b/src/seti/gnunet-service-seti_protocol.h
@@ -0,0 +1,144 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2013, 2014, 2020 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @author Florian Dold
22 * @author Christian Grothoff
23 * @file seti/gnunet-service-seti_protocol.h
24 * @brief Peer-to-Peer messages for gnunet set
25 */
26#ifndef SETI_PROTOCOL_H
27#define SETI_PROTOCOL_H
28
29#include "platform.h"
30#include "gnunet_common.h"
31
32
33GNUNET_NETWORK_STRUCT_BEGIN
34
35struct OperationRequestMessage
36{
37 /**
38 * Type: #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
39 */
40 struct GNUNET_MessageHeader header;
41
42 /**
43 * For Intersection: my element count
44 */
45 uint32_t element_count GNUNET_PACKED;
46
47 /**
48 * Application-specific identifier of the request.
49 */
50 struct GNUNET_HashCode app_idX;
51
52 /* rest: optional message */
53};
54
55
56/**
57 * During intersection, the first (and possibly second) message
58 * send it the number of elements in the set, to allow the peers
59 * to decide who should start with the Bloom filter.
60 */
61struct IntersectionElementInfoMessage
62{
63 /**
64 * Type: #GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO
65 */
66 struct GNUNET_MessageHeader header;
67
68 /**
69 * mutator used with this bloomfilter.
70 */
71 uint32_t sender_element_count GNUNET_PACKED;
72};
73
74
75/**
76 * Bloom filter messages exchanged for set intersection calculation.
77 */
78struct BFMessage
79{
80 /**
81 * Type: #GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF
82 */
83 struct GNUNET_MessageHeader header;
84
85 /**
86 * Number of elements the sender still has in the set.
87 */
88 uint32_t sender_element_count GNUNET_PACKED;
89
90 /**
91 * XOR of all hashes over all elements remaining in the set.
92 * Used to determine termination.
93 */
94 struct GNUNET_HashCode element_xor_hash;
95
96 /**
97 * Mutator used with this bloomfilter.
98 */
99 uint32_t sender_mutator GNUNET_PACKED;
100
101 /**
102 * Total length of the bloomfilter data.
103 */
104 uint32_t bloomfilter_total_length GNUNET_PACKED;
105
106 /**
107 * Number of bits (k-value) used in encoding the bloomfilter.
108 */
109 uint32_t bits_per_element GNUNET_PACKED;
110
111 /**
112 * rest: the sender's bloomfilter
113 */
114};
115
116
117/**
118 * Last message, send to confirm the final set. Contains the element
119 * count as it is possible that the peer determined that we were done
120 * by getting the empty set, which in that case also needs to be
121 * communicated.
122 */
123struct IntersectionDoneMessage
124{
125 /**
126 * Type: #GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE
127 */
128 struct GNUNET_MessageHeader header;
129
130 /**
131 * Final number of elements in intersection.
132 */
133 uint32_t final_element_count GNUNET_PACKED;
134
135 /**
136 * XOR of all hashes over all elements remaining in the set.
137 */
138 struct GNUNET_HashCode element_xor_hash;
139};
140
141
142GNUNET_NETWORK_STRUCT_END
143
144#endif
diff --git a/src/seti/gnunet-seti-profiler.c b/src/seti/gnunet-seti-profiler.c
new file mode 100644
index 000000000..b8230bcfc
--- /dev/null
+++ b/src/seti/gnunet-seti-profiler.c
@@ -0,0 +1,480 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2013, 2020 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file set/gnunet-seti-profiler.c
23 * @brief profiling tool for set intersection
24 * @author Florian Dold
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28#include "gnunet_statistics_service.h"
29#include "gnunet_seti_service.h"
30#include "gnunet_testbed_service.h"
31
32
33static int ret;
34
35static unsigned int num_a = 5;
36static unsigned int num_b = 5;
37static unsigned int num_c = 20;
38
39const static struct GNUNET_CONFIGURATION_Handle *config;
40
41struct SetInfo
42{
43 char *id;
44 struct GNUNET_SETI_Handle *set;
45 struct GNUNET_SETI_OperationHandle *oh;
46 struct GNUNET_CONTAINER_MultiHashMap *sent;
47 struct GNUNET_CONTAINER_MultiHashMap *received;
48 int done;
49} info1, info2;
50
51static struct GNUNET_CONTAINER_MultiHashMap *common_sent;
52
53static struct GNUNET_HashCode app_id;
54
55static struct GNUNET_PeerIdentity local_peer;
56
57static struct GNUNET_SETI_ListenHandle *set_listener;
58
59static unsigned int use_intersection;
60
61static unsigned int element_size = 32;
62
63/**
64 * Handle to the statistics service.
65 */
66static struct GNUNET_STATISTICS_Handle *statistics;
67
68/**
69 * The profiler will write statistics
70 * for all peers to the file with this name.
71 */
72static char *statistics_filename;
73
74/**
75 * The profiler will write statistics
76 * for all peers to this file.
77 */
78static FILE *statistics_file;
79
80
81static int
82map_remove_iterator (void *cls,
83 const struct GNUNET_HashCode *key,
84 void *value)
85{
86 struct GNUNET_CONTAINER_MultiHashMap *m = cls;
87 int ret;
88
89 GNUNET_assert (NULL != key);
90
91 ret = GNUNET_CONTAINER_multihashmap_remove_all (m, key);
92 if (GNUNET_OK != ret)
93 printf ("spurious element\n");
94 return GNUNET_YES;
95}
96
97
98/**
99 * Callback function to process statistic values.
100 *
101 * @param cls closure
102 * @param subsystem name of subsystem that created the statistic
103 * @param name the name of the datum
104 * @param value the current value
105 * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not
106 * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
107 */
108static int
109statistics_result (void *cls,
110 const char *subsystem,
111 const char *name,
112 uint64_t value,
113 int is_persistent)
114{
115 if (NULL != statistics_file)
116 {
117 fprintf (statistics_file, "%s\t%s\t%lu\n", subsystem, name, (unsigned
118 long) value);
119 }
120 return GNUNET_OK;
121}
122
123
124static void
125statistics_done (void *cls,
126 int success)
127{
128 GNUNET_assert (GNUNET_YES == success);
129 if (NULL != statistics_file)
130 fclose (statistics_file);
131 GNUNET_SCHEDULER_shutdown ();
132}
133
134
135static void
136check_all_done (void)
137{
138 if ((info1.done == GNUNET_NO) || (info2.done == GNUNET_NO))
139 return;
140
141 GNUNET_CONTAINER_multihashmap_iterate (info1.received, map_remove_iterator,
142 info2.sent);
143 GNUNET_CONTAINER_multihashmap_iterate (info2.received, map_remove_iterator,
144 info1.sent);
145
146 printf ("set a: %d missing elements\n", GNUNET_CONTAINER_multihashmap_size (
147 info1.sent));
148 printf ("set b: %d missing elements\n", GNUNET_CONTAINER_multihashmap_size (
149 info2.sent));
150
151 if (NULL == statistics_filename)
152 {
153 GNUNET_SCHEDULER_shutdown ();
154 return;
155 }
156
157 statistics_file = fopen (statistics_filename, "w");
158 GNUNET_STATISTICS_get (statistics, NULL, NULL,
159 &statistics_done,
160 &statistics_result, NULL);
161}
162
163
164static void
165set_result_cb (void *cls,
166 const struct GNUNET_SETI_Element *element,
167 uint64_t current_size,
168 enum GNUNET_SETI_Status status)
169{
170 struct SetInfo *info = cls;
171 struct GNUNET_HashCode hash;
172
173 GNUNET_assert (GNUNET_NO == info->done);
174 switch (status)
175 {
176 case GNUNET_SETI_STATUS_DONE:
177 info->done = GNUNET_YES;
178 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
179 "set intersection done\n");
180 check_all_done ();
181 info->oh = NULL;
182 return;
183 case GNUNET_SETI_STATUS_FAILURE:
184 info->oh = NULL;
185 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
186 "failure\n");
187 GNUNET_SCHEDULER_shutdown ();
188 return;
189 case GNUNET_SETI_STATUS_ADD_LOCAL:
190 GNUNET_CRYPTO_hash (element->data,
191 element->size,
192 &hash);
193 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
194 "set %s: keep element %s\n",
195 info->id,
196 GNUNET_h2s (&hash));
197 break;
198 case GNUNET_SETI_STATUS_DEL_LOCAL:
199 GNUNET_CRYPTO_hash (element->data,
200 element->size,
201 &hash);
202 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
203 "set %s: remove element %s\n",
204 info->id,
205 GNUNET_h2s (&hash));
206 return;
207 default:
208 GNUNET_assert (0);
209 }
210
211 if (element->size != element_size)
212 {
213 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
214 "wrong element size: %u, expected %u\n",
215 element->size,
216 (unsigned int) sizeof(struct GNUNET_HashCode));
217 GNUNET_assert (0);
218 }
219
220 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
221 "set %s: got element (%s)\n",
222 info->id, GNUNET_h2s (element->data));
223 GNUNET_assert (NULL != element->data);
224 {
225 struct GNUNET_HashCode data_hash;
226
227 GNUNET_CRYPTO_hash (element->data,
228 element_size,
229 &data_hash);
230 GNUNET_CONTAINER_multihashmap_put (info->received,
231 &data_hash,
232 NULL,
233 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
234 }
235}
236
237
238static void
239set_listen_cb (void *cls,
240 const struct GNUNET_PeerIdentity *other_peer,
241 const struct GNUNET_MessageHeader *context_msg,
242 struct GNUNET_SETI_Request *request)
243{
244 /* max. 1 option plus terminator */
245 struct GNUNET_SETI_Option opts[2] = { { 0 } };
246 unsigned int n_opts = 0;
247
248 if (NULL == request)
249 {
250 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
251 "listener failed\n");
252 return;
253 }
254 GNUNET_assert (NULL == info2.oh);
255 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
256 "set listen cb called\n");
257 if (use_intersection)
258 {
259 opts[n_opts++] = (struct GNUNET_SETI_Option) { .type =
260 GNUNET_SETI_OPTION_RETURN_INTERSECTION };
261 }
262 opts[n_opts].type = GNUNET_SETI_OPTION_END;
263 info2.oh = GNUNET_SETI_accept (request,
264 opts,
265 &set_result_cb,
266 &info2);
267 GNUNET_SETI_commit (info2.oh,
268 info2.set);
269}
270
271
272static int
273set_insert_iterator (void *cls,
274 const struct GNUNET_HashCode *key,
275 void *value)
276{
277 struct GNUNET_SETI_Handle *set = cls;
278 struct GNUNET_SETI_Element el;
279
280 el.element_type = 0;
281 el.data = value;
282 el.size = element_size;
283 GNUNET_SETI_add_element (set, &el, NULL, NULL);
284 return GNUNET_YES;
285}
286
287
288static void
289handle_shutdown (void *cls)
290{
291 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
292 "Shutting down set profiler\n");
293 if (NULL != set_listener)
294 {
295 GNUNET_SETI_listen_cancel (set_listener);
296 set_listener = NULL;
297 }
298 if (NULL != info1.oh)
299 {
300 GNUNET_SETI_operation_cancel (info1.oh);
301 info1.oh = NULL;
302 }
303 if (NULL != info2.oh)
304 {
305 GNUNET_SETI_operation_cancel (info2.oh);
306 info2.oh = NULL;
307 }
308 if (NULL != info1.set)
309 {
310 GNUNET_SETI_destroy (info1.set);
311 info1.set = NULL;
312 }
313 if (NULL != info2.set)
314 {
315 GNUNET_SETI_destroy (info2.set);
316 info2.set = NULL;
317 }
318 GNUNET_STATISTICS_destroy (statistics, GNUNET_NO);
319}
320
321
322static void
323run (void *cls,
324 const struct GNUNET_CONFIGURATION_Handle *cfg,
325 struct GNUNET_TESTING_Peer *peer)
326{
327 unsigned int i;
328 struct GNUNET_HashCode hash;
329 /* max. 1 option plus terminator */
330 struct GNUNET_SETI_Option opts[2] = { { 0 } };
331 unsigned int n_opts = 0;
332
333 config = cfg;
334
335 GNUNET_assert (element_size > 0);
336
337 if (GNUNET_OK != GNUNET_CRYPTO_get_peer_identity (cfg, &local_peer))
338 {
339 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
340 "could not retrieve host identity\n");
341 ret = 0;
342 return;
343 }
344 statistics = GNUNET_STATISTICS_create ("set-profiler", cfg);
345 GNUNET_SCHEDULER_add_shutdown (&handle_shutdown, NULL);
346 info1.id = "a";
347 info2.id = "b";
348 info1.sent = GNUNET_CONTAINER_multihashmap_create (num_a + 1, GNUNET_NO);
349 info2.sent = GNUNET_CONTAINER_multihashmap_create (num_b + 1, GNUNET_NO);
350 common_sent = GNUNET_CONTAINER_multihashmap_create (num_c + 1, GNUNET_NO);
351 info1.received = GNUNET_CONTAINER_multihashmap_create (num_a + 1, GNUNET_NO);
352 info2.received = GNUNET_CONTAINER_multihashmap_create (num_b + 1, GNUNET_NO);
353 for (i = 0; i < num_a; i++)
354 {
355 char *data = GNUNET_malloc (element_size);
356 GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK, data, element_size);
357 GNUNET_CRYPTO_hash (data, element_size, &hash);
358 GNUNET_CONTAINER_multihashmap_put (info1.sent, &hash, data,
359 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
360 }
361
362 for (i = 0; i < num_b; i++)
363 {
364 char *data = GNUNET_malloc (element_size);
365 GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK, data, element_size);
366 GNUNET_CRYPTO_hash (data, element_size, &hash);
367 GNUNET_CONTAINER_multihashmap_put (info2.sent, &hash, data,
368 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
369 }
370
371 for (i = 0; i < num_c; i++)
372 {
373 char *data = GNUNET_malloc (element_size);
374 GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK, data, element_size);
375 GNUNET_CRYPTO_hash (data, element_size, &hash);
376 GNUNET_CONTAINER_multihashmap_put (common_sent, &hash, data,
377 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
378 }
379
380 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &app_id);
381
382 info1.set = GNUNET_SETI_create (config);
383 info2.set = GNUNET_SETI_create (config);
384 GNUNET_CONTAINER_multihashmap_iterate (info1.sent,
385 &set_insert_iterator,
386 info1.set);
387 GNUNET_CONTAINER_multihashmap_iterate (info2.sent,
388 &set_insert_iterator,
389 info2.set);
390 GNUNET_CONTAINER_multihashmap_iterate (common_sent,
391 &set_insert_iterator,
392 info1.set);
393 GNUNET_CONTAINER_multihashmap_iterate (common_sent,
394 &set_insert_iterator,
395 info2.set);
396
397 set_listener = GNUNET_SETI_listen (config,
398 &app_id,
399 &set_listen_cb,
400 NULL);
401 if (use_intersection)
402 {
403 opts[n_opts++] = (struct GNUNET_SETI_Option) { .type =
404 GNUNET_SETI_OPTION_RETURN_INTERSECTION };
405 }
406 opts[n_opts].type = GNUNET_SETI_OPTION_END;
407
408 info1.oh = GNUNET_SETI_prepare (&local_peer,
409 &app_id,
410 NULL,
411 opts,
412 set_result_cb,
413 &info1);
414 GNUNET_SETI_commit (info1.oh,
415 info1.set);
416 GNUNET_SETI_destroy (info1.set);
417 info1.set = NULL;
418}
419
420
421static void
422pre_run (void *cls,
423 char *const *args,
424 const char *cfgfile,
425 const struct GNUNET_CONFIGURATION_Handle *cfg)
426{
427 if (0 != GNUNET_TESTING_peer_run ("set-profiler",
428 cfgfile,
429 &run, NULL))
430 ret = 2;
431}
432
433
434int
435main (int argc, char **argv)
436{
437 struct GNUNET_GETOPT_CommandLineOption options[] = {
438 GNUNET_GETOPT_option_uint ('A',
439 "num-first",
440 NULL,
441 gettext_noop ("number of values"),
442 &num_a),
443 GNUNET_GETOPT_option_uint ('B',
444 "num-second",
445 NULL,
446 gettext_noop ("number of values"),
447 &num_b),
448 GNUNET_GETOPT_option_uint ('C',
449 "num-common",
450 NULL,
451 gettext_noop ("number of values"),
452 &num_c),
453 GNUNET_GETOPT_option_uint ('i',
454 "use-intersection",
455 NULL,
456 gettext_noop (
457 "return intersection instead of delta"),
458 &use_intersection),
459 GNUNET_GETOPT_option_uint ('w',
460 "element-size",
461 NULL,
462 gettext_noop ("element size"),
463 &element_size),
464 GNUNET_GETOPT_option_filename ('s',
465 "statistics",
466 "FILENAME",
467 gettext_noop ("write statistics to file"),
468 &statistics_filename),
469 GNUNET_GETOPT_OPTION_END
470 };
471
472 GNUNET_PROGRAM_run2 (argc, argv,
473 "gnunet-seti-profiler",
474 "help",
475 options,
476 &pre_run,
477 NULL,
478 GNUNET_YES);
479 return ret;
480}
diff --git a/src/seti/plugin_block_seti_test.c b/src/seti/plugin_block_seti_test.c
new file mode 100644
index 000000000..1de086092
--- /dev/null
+++ b/src/seti/plugin_block_seti_test.c
@@ -0,0 +1,123 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2017 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file set/plugin_block_set_test.c
23 * @brief set test block, recognizes elements with non-zero first byte as invalid
24 * @author Christian Grothoff
25 */
26
27#include "platform.h"
28#include "gnunet_block_plugin.h"
29#include "gnunet_block_group_lib.h"
30
31
32/**
33 * Function called to validate a reply or a request. For
34 * request evaluation, simply pass "NULL" for the reply_block.
35 *
36 * @param cls closure
37 * @param ctx block context
38 * @param type block type
39 * @param group block group to use
40 * @param eo control flags
41 * @param query original query (hash)
42 * @param xquery extrended query data (can be NULL, depending on type)
43 * @param xquery_size number of bytes in xquery
44 * @param reply_block response to validate
45 * @param reply_block_size number of bytes in reply block
46 * @return characterization of result
47 */
48static enum GNUNET_BLOCK_EvaluationResult
49block_plugin_set_test_evaluate (void *cls,
50 struct GNUNET_BLOCK_Context *ctx,
51 enum GNUNET_BLOCK_Type type,
52 struct GNUNET_BLOCK_Group *group,
53 enum GNUNET_BLOCK_EvaluationOptions eo,
54 const struct GNUNET_HashCode *query,
55 const void *xquery,
56 size_t xquery_size,
57 const void *reply_block,
58 size_t reply_block_size)
59{
60 if ((NULL == reply_block) ||
61 (reply_block_size == 0) ||
62 (0 != ((char *) reply_block)[0]))
63 return GNUNET_BLOCK_EVALUATION_RESULT_INVALID;
64 return GNUNET_BLOCK_EVALUATION_OK_MORE;
65}
66
67
68/**
69 * Function called to obtain the key for a block.
70 *
71 * @param cls closure
72 * @param type block type
73 * @param block block to get the key for
74 * @param block_size number of bytes in block
75 * @param key set to the key (query) for the given block
76 * @return #GNUNET_OK on success, #GNUNET_SYSERR if type not supported
77 * (or if extracting a key from a block of this type does not work)
78 */
79static int
80block_plugin_set_test_get_key (void *cls,
81 enum GNUNET_BLOCK_Type type,
82 const void *block,
83 size_t block_size,
84 struct GNUNET_HashCode *key)
85{
86 return GNUNET_SYSERR;
87}
88
89
90/**
91 * Entry point for the plugin.
92 */
93void *
94libgnunet_plugin_block_set_test_init (void *cls)
95{
96 static enum GNUNET_BLOCK_Type types[] = {
97 GNUNET_BLOCK_TYPE_SET_TEST,
98 GNUNET_BLOCK_TYPE_ANY /* end of list */
99 };
100 struct GNUNET_BLOCK_PluginFunctions *api;
101
102 api = GNUNET_new (struct GNUNET_BLOCK_PluginFunctions);
103 api->evaluate = &block_plugin_set_test_evaluate;
104 api->get_key = &block_plugin_set_test_get_key;
105 api->types = types;
106 return api;
107}
108
109
110/**
111 * Exit point from the plugin.
112 */
113void *
114libgnunet_plugin_block_set_test_done (void *cls)
115{
116 struct GNUNET_BLOCK_PluginFunctions *api = cls;
117
118 GNUNET_free (api);
119 return NULL;
120}
121
122
123/* end of plugin_block_set_test.c */
diff --git a/src/seti/seti.conf.in b/src/seti/seti.conf.in
new file mode 100644
index 000000000..e4f7b60b5
--- /dev/null
+++ b/src/seti/seti.conf.in
@@ -0,0 +1,12 @@
1[seti]
2START_ON_DEMAND = @START_ON_DEMAND@
3@UNIXONLY@PORT = 2106
4HOSTNAME = localhost
5BINARY = gnunet-service-seti
6ACCEPT_FROM = 127.0.0.1;
7ACCEPT_FROM6 = ::1;
8UNIXPATH = $GNUNET_RUNTIME_DIR/gnunet-service-seti.sock
9UNIX_MATCH_UID = YES
10UNIX_MATCH_GID = YES
11
12#PREFIX = valgrind
diff --git a/src/seti/seti.h b/src/seti/seti.h
new file mode 100644
index 000000000..aa7014034
--- /dev/null
+++ b/src/seti/seti.h
@@ -0,0 +1,267 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2012-2014, 2020 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file set/seti.h
22 * @brief messages used for the set intersection api
23 * @author Florian Dold
24 * @author Christian Grothoff
25 */
26#ifndef SETI_H
27#define SETI_H
28
29#include "platform.h"
30#include "gnunet_common.h"
31#include "gnunet_set_service.h"
32
33GNUNET_NETWORK_STRUCT_BEGIN
34
35/**
36 * Message sent by the client to the service to ask starting
37 * a new set to perform operations with.
38 */
39struct GNUNET_SETI_CreateMessage
40{
41 /**
42 * Type: #GNUNET_MESSAGE_TYPE_SETI_CREATE
43 */
44 struct GNUNET_MessageHeader header;
45};
46
47
48/**
49 * Message sent by the client to the service to start listening for
50 * incoming requests to perform a certain type of set operation for a
51 * certain type of application.
52 */
53struct GNUNET_SETI_ListenMessage
54{
55 /**
56 * Type: #GNUNET_MESSAGE_TYPE_SETI_LISTEN
57 */
58 struct GNUNET_MessageHeader header;
59
60 /**
61 * Operation type, values of `enum GNUNET_SETI_OperationType`
62 */
63 uint32_t operation GNUNET_PACKED;
64
65 /**
66 * application id
67 */
68 struct GNUNET_HashCode app_id;
69};
70
71
72/**
73 * Message sent by a listening client to the service to accept
74 * performing the operation with the other peer.
75 */
76struct GNUNET_SETI_AcceptMessage
77{
78 /**
79 * Type: #GNUNET_MESSAGE_TYPE_SETI_ACCEPT
80 */
81 struct GNUNET_MessageHeader header;
82
83 /**
84 * ID of the incoming request we want to accept.
85 */
86 uint32_t accept_reject_id GNUNET_PACKED;
87
88 /**
89 * Request ID to identify responses.
90 */
91 uint32_t request_id GNUNET_PACKED;
92
93 /**
94 * Return the intersection (1), instead of the elements to
95 * remove / the delta (0), in NBO.
96 */
97 uint32_t return_intersection;
98
99};
100
101
102/**
103 * Message sent by a listening client to the service to reject
104 * performing the operation with the other peer.
105 */
106struct GNUNET_SETI_RejectMessage
107{
108 /**
109 * Type: #GNUNET_MESSAGE_TYPE_SETI_REJECT
110 */
111 struct GNUNET_MessageHeader header;
112
113 /**
114 * ID of the incoming request we want to reject.
115 */
116 uint32_t accept_reject_id GNUNET_PACKED;
117};
118
119
120/**
121 * A request for an operation with another client.
122 */
123struct GNUNET_SETI_RequestMessage
124{
125 /**
126 * Type: #GNUNET_MESSAGE_TYPE_SETI_REQUEST.
127 */
128 struct GNUNET_MessageHeader header;
129
130 /**
131 * ID of the to identify the request when accepting or
132 * rejecting it.
133 */
134 uint32_t accept_id GNUNET_PACKED;
135
136 /**
137 * Identity of the requesting peer.
138 */
139 struct GNUNET_PeerIdentity peer_id;
140
141 /* rest: context message, that is, application-specific
142 message to convince listener to pick up */
143};
144
145
146/**
147 * Message sent by client to service to initiate a set operation as a
148 * client (not as listener). A set (which determines the operation
149 * type) must already exist in association with this client.
150 */
151struct GNUNET_SETI_EvaluateMessage
152{
153 /**
154 * Type: #GNUNET_MESSAGE_TYPE_SETI_EVALUATE
155 */
156 struct GNUNET_MessageHeader header;
157
158 /**
159 * Id of our set to evaluate, chosen implicitly by the client when it
160 * calls #GNUNET_SETI_commit().
161 */
162 uint32_t request_id GNUNET_PACKED;
163
164 /**
165 * Peer to evaluate the operation with
166 */
167 struct GNUNET_PeerIdentity target_peer;
168
169 /**
170 * Application id
171 */
172 struct GNUNET_HashCode app_id;
173
174 /**
175 * Return the intersection (1), instead of the elements to
176 * remove / the delta (0), in NBO.
177 */
178 uint32_t return_intersection;
179
180 /* rest: context message, that is, application-specific
181 message to convince listener to pick up */
182};
183
184
185/**
186 * Message sent by the service to the client to indicate an
187 * element that is removed (set intersection) or added
188 * (set union) or part of the final result, depending on
189 * options specified for the operation.
190 */
191struct GNUNET_SETI_ResultMessage
192{
193 /**
194 * Type: #GNUNET_MESSAGE_TYPE_SETI_RESULT
195 */
196 struct GNUNET_MessageHeader header;
197
198 /**
199 * Current set size.
200 */
201 uint64_t current_size;
202
203 /**
204 * id the result belongs to
205 */
206 uint32_t request_id GNUNET_PACKED;
207
208 /**
209 * Was the evaluation successful? Contains
210 * an `enum GNUNET_SETI_Status` in NBO.
211 */
212 uint16_t result_status GNUNET_PACKED;
213
214 /**
215 * Type of the element attachted to the message, if any.
216 */
217 uint16_t element_type GNUNET_PACKED;
218
219 /* rest: the actual element */
220};
221
222
223/**
224 * Message sent by client to the service to add an element to the set.
225 */
226struct GNUNET_SETI_ElementMessage
227{
228 /**
229 * Type: #GNUNET_MESSAGE_TYPE_SETI_ADD.
230 */
231 struct GNUNET_MessageHeader header;
232
233 /**
234 * Type of the element to add or remove.
235 */
236 uint16_t element_type GNUNET_PACKED;
237
238 /**
239 * For alignment, always zero.
240 */
241 uint16_t reserved GNUNET_PACKED;
242
243 /* rest: the actual element */
244};
245
246
247/**
248 * Sent to the service by the client
249 * in order to cancel a set operation.
250 */
251struct GNUNET_SETI_CancelMessage
252{
253 /**
254 * Type: #GNUNET_MESSAGE_TYPE_SETI_CANCEL
255 */
256 struct GNUNET_MessageHeader header;
257
258 /**
259 * ID of the request we want to cancel.
260 */
261 uint32_t request_id GNUNET_PACKED;
262};
263
264
265GNUNET_NETWORK_STRUCT_END
266
267#endif
diff --git a/src/seti/seti_api.c b/src/seti/seti_api.c
new file mode 100644
index 000000000..d80a60684
--- /dev/null
+++ b/src/seti/seti_api.c
@@ -0,0 +1,895 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2012-2016, 2020 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file seti/seti_api.c
22 * @brief api for the set service
23 * @author Florian Dold
24 * @author Christian Grothoff
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28#include "gnunet_protocols.h"
29#include "gnunet_seti_service.h"
30#include "seti.h"
31
32
33#define LOG(kind, ...) GNUNET_log_from (kind, "seti-api", __VA_ARGS__)
34
35
36/**
37 * Opaque handle to a set.
38 */
39struct GNUNET_SETI_Handle
40{
41 /**
42 * Message queue for @e client.
43 */
44 struct GNUNET_MQ_Handle *mq;
45
46 /**
47 * Linked list of operations on the set.
48 */
49 struct GNUNET_SETI_OperationHandle *ops_head;
50
51 /**
52 * Linked list of operations on the set.
53 */
54 struct GNUNET_SETI_OperationHandle *ops_tail;
55
56 /**
57 * Configuration, needed when creating (lazy) copies.
58 */
59 const struct GNUNET_CONFIGURATION_Handle *cfg;
60
61 /**
62 * Should the set be destroyed once all operations are gone?
63 * #GNUNET_SYSERR if #GNUNET_SETI_destroy() must raise this flag,
64 * #GNUNET_YES if #GNUNET_SETI_destroy() did raise this flag.
65 */
66 int destroy_requested;
67
68 /**
69 * Has the set become invalid (e.g. service died)?
70 */
71 int invalid;
72
73 /**
74 * Both client and service count the number of iterators
75 * created so far to match replies with iterators.
76 */
77 uint16_t iteration_id;
78
79};
80
81
82/**
83 * Handle for a set operation request from another peer.
84 */
85struct GNUNET_SETI_Request
86{
87 /**
88 * Id of the request, used to identify the request when
89 * accepting/rejecting it.
90 */
91 uint32_t accept_id;
92
93 /**
94 * Has the request been accepted already?
95 * #GNUNET_YES/#GNUNET_NO
96 */
97 int accepted;
98};
99
100
101/**
102 * Handle to an operation. Only known to the service after committing
103 * the handle with a set.
104 */
105struct GNUNET_SETI_OperationHandle
106{
107 /**
108 * Function to be called when we have a result,
109 * or an error.
110 */
111 GNUNET_SETI_ResultIterator result_cb;
112
113 /**
114 * Closure for @e result_cb.
115 */
116 void *result_cls;
117
118 /**
119 * Local set used for the operation,
120 * NULL if no set has been provided by conclude yet.
121 */
122 struct GNUNET_SETI_Handle *set;
123
124 /**
125 * Message sent to the server on calling conclude,
126 * NULL if conclude has been called.
127 */
128 struct GNUNET_MQ_Envelope *conclude_mqm;
129
130 /**
131 * Address of the request if in the conclude message,
132 * used to patch the request id into the message when the set is known.
133 */
134 uint32_t *request_id_addr;
135
136 /**
137 * Handles are kept in a linked list.
138 */
139 struct GNUNET_SETI_OperationHandle *prev;
140
141 /**
142 * Handles are kept in a linked list.
143 */
144 struct GNUNET_SETI_OperationHandle *next;
145
146 /**
147 * Request ID to identify the operation within the set.
148 */
149 uint32_t request_id;
150
151 /**
152 * Should we return the resulting intersection (ADD) or
153 * the elements to remove (DEL)?
154 */
155 int return_intersection;
156};
157
158
159/**
160 * Opaque handle to a listen operation.
161 */
162struct GNUNET_SETI_ListenHandle
163{
164 /**
165 * Message queue for the client.
166 */
167 struct GNUNET_MQ_Handle*mq;
168
169 /**
170 * Configuration handle for the listener, stored
171 * here to be able to reconnect transparently on
172 * connection failure.
173 */
174 const struct GNUNET_CONFIGURATION_Handle *cfg;
175
176 /**
177 * Function to call on a new incoming request,
178 * or on error.
179 */
180 GNUNET_SETI_ListenCallback listen_cb;
181
182 /**
183 * Closure for @e listen_cb.
184 */
185 void *listen_cls;
186
187 /**
188 * Task for reconnecting when the listener fails.
189 */
190 struct GNUNET_SCHEDULER_Task *reconnect_task;
191
192 /**
193 * Application ID we listen for.
194 */
195 struct GNUNET_HashCode app_id;
196
197 /**
198 * Time to wait until we try to reconnect on failure.
199 */
200 struct GNUNET_TIME_Relative reconnect_backoff;
201
202};
203
204
205/**
206 * Check that the given @a msg is well-formed.
207 *
208 * @param cls closure
209 * @param msg message to check
210 * @return #GNUNET_OK if message is well-formed
211 */
212static int
213check_result (void *cls,
214 const struct GNUNET_SETI_ResultMessage *msg)
215{
216 /* minimum size was already checked, everything else is OK! */
217 return GNUNET_OK;
218}
219
220
221/**
222 * Handle result message for a set operation.
223 *
224 * @param cls the set
225 * @param mh the message
226 */
227static void
228handle_result (void *cls,
229 const struct GNUNET_SETI_ResultMessage *msg)
230{
231 struct GNUNET_SETI_Handle *set = cls;
232 struct GNUNET_SETI_OperationHandle *oh;
233 struct GNUNET_SETI_Element e;
234 enum GNUNET_SETI_Status result_status;
235 int destroy_set;
236
237 GNUNET_assert (NULL != set->mq);
238 result_status = (enum GNUNET_SETI_Status) ntohs (msg->result_status);
239 LOG (GNUNET_ERROR_TYPE_DEBUG,
240 "Got result message with status %d\n",
241 result_status);
242 oh = GNUNET_MQ_assoc_get (set->mq,
243 ntohl (msg->request_id));
244 if (NULL == oh)
245 {
246 /* 'oh' can be NULL if we canceled the operation, but the service
247 did not get the cancel message yet. */
248 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
249 "Ignoring result from canceled operation\n");
250 return;
251 }
252
253 switch (result_status)
254 {
255 case GNUNET_SETI_STATUS_ADD_LOCAL:
256 case GNUNET_SETI_STATUS_DEL_LOCAL:
257 e.data = &msg[1];
258 e.size = ntohs (msg->header.size)
259 - sizeof(struct GNUNET_SETI_ResultMessage);
260 e.element_type = ntohs (msg->element_type);
261 if (NULL != oh->result_cb)
262 oh->result_cb (oh->result_cls,
263 &e,
264 GNUNET_ntohll (msg->current_size),
265 result_status);
266 return;
267 case GNUNET_SETI_STATUS_FAILURE:
268 case GNUNET_SETI_STATUS_DONE:
269 GNUNET_MQ_assoc_remove (set->mq,
270 ntohl (msg->request_id));
271 GNUNET_CONTAINER_DLL_remove (set->ops_head,
272 set->ops_tail,
273 oh);
274 /* Need to do this calculation _before_ the result callback,
275 as IF the application still has a valid set handle, it
276 may trigger destruction of the set during the callback. */
277 destroy_set = (GNUNET_YES == set->destroy_requested) &&
278 (NULL == set->ops_head);
279 if (NULL != oh->result_cb)
280 {
281 oh->result_cb (oh->result_cls,
282 NULL,
283 GNUNET_ntohll (msg->current_size),
284 result_status);
285 }
286 else
287 {
288 LOG (GNUNET_ERROR_TYPE_DEBUG,
289 "No callback for final status\n");
290 }
291 if (destroy_set)
292 GNUNET_SETI_destroy (set);
293 GNUNET_free (oh);
294 return;
295 }
296}
297
298
299/**
300 * Destroy the given set operation.
301 *
302 * @param oh set operation to destroy
303 */
304static void
305set_operation_destroy (struct GNUNET_SETI_OperationHandle *oh)
306{
307 struct GNUNET_SETI_Handle *set = oh->set;
308 struct GNUNET_SETI_OperationHandle *h_assoc;
309
310 if (NULL != oh->conclude_mqm)
311 GNUNET_MQ_discard (oh->conclude_mqm);
312 /* is the operation already commited? */
313 if (NULL != set)
314 {
315 GNUNET_CONTAINER_DLL_remove (set->ops_head,
316 set->ops_tail,
317 oh);
318 h_assoc = GNUNET_MQ_assoc_remove (set->mq,
319 oh->request_id);
320 GNUNET_assert ((NULL == h_assoc) ||
321 (h_assoc == oh));
322 }
323 GNUNET_free (oh);
324}
325
326
327/**
328 * Cancel the given set operation. We need to send an explicit cancel
329 * message, as all operations one one set communicate using one
330 * handle.
331 *
332 * @param oh set operation to cancel
333 */
334void
335GNUNET_SETI_operation_cancel (struct GNUNET_SETI_OperationHandle *oh)
336{
337 struct GNUNET_SETI_Handle *set = oh->set;
338 struct GNUNET_SETI_CancelMessage *m;
339 struct GNUNET_MQ_Envelope *mqm;
340
341 LOG (GNUNET_ERROR_TYPE_DEBUG,
342 "Cancelling SET operation\n");
343 if (NULL != set)
344 {
345 mqm = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_SETI_CANCEL);
346 m->request_id = htonl (oh->request_id);
347 GNUNET_MQ_send (set->mq, mqm);
348 }
349 set_operation_destroy (oh);
350 if ((NULL != set) &&
351 (GNUNET_YES == set->destroy_requested) &&
352 (NULL == set->ops_head))
353 {
354 LOG (GNUNET_ERROR_TYPE_DEBUG,
355 "Destroying set after operation cancel\n");
356 GNUNET_SETI_destroy (set);
357 }
358}
359
360
361/**
362 * We encountered an error communicating with the set service while
363 * performing a set operation. Report to the application.
364 *
365 * @param cls the `struct GNUNET_SETI_Handle`
366 * @param error error code
367 */
368static void
369handle_client_set_error (void *cls,
370 enum GNUNET_MQ_Error error)
371{
372 struct GNUNET_SETI_Handle *set = cls;
373
374 LOG (GNUNET_ERROR_TYPE_ERROR,
375 "Handling client set error %d\n",
376 error);
377 while (NULL != set->ops_head)
378 {
379 if ((NULL != set->ops_head->result_cb) &&
380 (GNUNET_NO == set->destroy_requested))
381 set->ops_head->result_cb (set->ops_head->result_cls,
382 NULL,
383 0,
384 GNUNET_SETI_STATUS_FAILURE);
385 set_operation_destroy (set->ops_head);
386 }
387 set->invalid = GNUNET_YES;
388}
389
390
391/**
392 * Create an empty set.
393 *
394 * @param cfg configuration to use for connecting to the
395 * set service
396 * @return a handle to the set
397 */
398struct GNUNET_SETI_Handle *
399GNUNET_SETI_create (const struct GNUNET_CONFIGURATION_Handle *cfg)
400{
401 struct GNUNET_SETI_Handle *set = GNUNET_new (struct GNUNET_SETI_Handle);
402 struct GNUNET_MQ_MessageHandler mq_handlers[] = {
403 GNUNET_MQ_hd_var_size (result,
404 GNUNET_MESSAGE_TYPE_SETI_RESULT,
405 struct GNUNET_SETI_ResultMessage,
406 set),
407 GNUNET_MQ_handler_end ()
408 };
409 struct GNUNET_MQ_Envelope *mqm;
410 struct GNUNET_SETI_CreateMessage *create_msg;
411
412 set->cfg = cfg;
413 set->mq = GNUNET_CLIENT_connect (cfg,
414 "set",
415 mq_handlers,
416 &handle_client_set_error,
417 set);
418 if (NULL == set->mq)
419 {
420 GNUNET_free (set);
421 return NULL;
422 }
423 LOG (GNUNET_ERROR_TYPE_DEBUG,
424 "Creating new intersection set\n");
425 mqm = GNUNET_MQ_msg (create_msg,
426 GNUNET_MESSAGE_TYPE_SETI_CREATE);
427 GNUNET_MQ_send (set->mq,
428 mqm);
429 return set;
430}
431
432
433/**
434 * Add an element to the given set. After the element has been added
435 * (in the sense of being transmitted to the set service), @a cont
436 * will be called. Multiple calls to GNUNET_SETI_add_element() can be
437 * queued.
438 *
439 * @param set set to add element to
440 * @param element element to add to the set
441 * @param cb continuation called after the element has been added
442 * @param cb_cls closure for @a cont
443 * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
444 * set is invalid (e.g. the set service crashed)
445 */
446int
447GNUNET_SETI_add_element (struct GNUNET_SETI_Handle *set,
448 const struct GNUNET_SETI_Element *element,
449 GNUNET_SCHEDULER_TaskCallback cb,
450 void *cb_cls)
451{
452 struct GNUNET_MQ_Envelope *mqm;
453 struct GNUNET_SETI_ElementMessage *msg;
454
455 LOG (GNUNET_ERROR_TYPE_DEBUG,
456 "adding element of type %u to set %p\n",
457 (unsigned int) element->element_type,
458 set);
459 if (GNUNET_YES == set->invalid)
460 {
461 if (NULL != cb)
462 cb (cb_cls);
463 return GNUNET_SYSERR;
464 }
465 mqm = GNUNET_MQ_msg_extra (msg,
466 element->size,
467 GNUNET_MESSAGE_TYPE_SETI_ADD);
468 msg->element_type = htons (element->element_type);
469 GNUNET_memcpy (&msg[1],
470 element->data,
471 element->size);
472 GNUNET_MQ_notify_sent (mqm,
473 cb,
474 cb_cls);
475 GNUNET_MQ_send (set->mq,
476 mqm);
477 return GNUNET_OK;
478}
479
480
481/**
482 * Destroy the set handle if no operations are left, mark the set
483 * for destruction otherwise.
484 *
485 * @param set set handle to destroy
486 */
487void
488GNUNET_SETI_destroy (struct GNUNET_SETI_Handle *set)
489{
490 /* destroying set while iterator is active is currently
491 not supported; we should expand the API to allow
492 clients to explicitly cancel the iteration! */
493 if ((NULL != set->ops_head) ||
494 (GNUNET_SYSERR == set->destroy_requested))
495 {
496 LOG (GNUNET_ERROR_TYPE_DEBUG,
497 "Set operations are pending, delaying set destruction\n");
498 set->destroy_requested = GNUNET_YES;
499 return;
500 }
501 LOG (GNUNET_ERROR_TYPE_DEBUG,
502 "Really destroying set\n");
503 if (NULL != set->mq)
504 {
505 GNUNET_MQ_destroy (set->mq);
506 set->mq = NULL;
507 }
508 GNUNET_free (set);
509}
510
511
512/**
513 * Prepare a set operation to be evaluated with another peer.
514 * The evaluation will not start until the client provides
515 * a local set with #GNUNET_SETI_commit().
516 *
517 * @param other_peer peer with the other set
518 * @param app_id hash for the application using the set
519 * @param context_msg additional information for the request
520 * @param options options to use when processing the request
521 * @param result_cb called on error or success
522 * @param result_cls closure for @e result_cb
523 * @return a handle to cancel the operation
524 */
525struct GNUNET_SETI_OperationHandle *
526GNUNET_SETI_prepare (const struct GNUNET_PeerIdentity *other_peer,
527 const struct GNUNET_HashCode *app_id,
528 const struct GNUNET_MessageHeader *context_msg,
529 const struct GNUNET_SETI_Option options[],
530 GNUNET_SETI_ResultIterator result_cb,
531 void *result_cls)
532{
533 struct GNUNET_MQ_Envelope *mqm;
534 struct GNUNET_SETI_OperationHandle *oh;
535 struct GNUNET_SETI_EvaluateMessage *msg;
536
537 oh = GNUNET_new (struct GNUNET_SETI_OperationHandle);
538 oh->result_cb = result_cb;
539 oh->result_cls = result_cls;
540 mqm = GNUNET_MQ_msg_nested_mh (msg,
541 GNUNET_MESSAGE_TYPE_SETI_EVALUATE,
542 context_msg);
543 msg->app_id = *app_id;
544 msg->target_peer = *other_peer;
545 for (const struct GNUNET_SETI_Option *opt = options;
546 GNUNET_SETI_OPTION_END != opt->type;
547 opt++)
548 {
549 switch (opt->type)
550 {
551 case GNUNET_SETI_OPTION_RETURN_INTERSECTION:
552 msg->return_intersection = GNUNET_YES;
553 break;
554 default:
555 LOG (GNUNET_ERROR_TYPE_ERROR,
556 "Option with type %d not recognized\n",
557 (int) opt->type);
558 }
559 }
560 oh->conclude_mqm = mqm;
561 oh->request_id_addr = &msg->request_id;
562 return oh;
563}
564
565
566/**
567 * Connect to the set service in order to listen for requests.
568 *
569 * @param cls the `struct GNUNET_SETI_ListenHandle *` to connect
570 */
571static void
572listen_connect (void *cls);
573
574
575/**
576 * Check validity of request message for a listen operation
577 *
578 * @param cls the listen handle
579 * @param msg the message
580 * @return #GNUNET_OK if the message is well-formed
581 */
582static int
583check_request (void *cls,
584 const struct GNUNET_SETI_RequestMessage *msg)
585{
586 const struct GNUNET_MessageHeader *context_msg;
587
588 if (ntohs (msg->header.size) == sizeof(*msg))
589 return GNUNET_OK; /* no context message is OK */
590 context_msg = GNUNET_MQ_extract_nested_mh (msg);
591 if (NULL == context_msg)
592 {
593 /* malformed context message is NOT ok */
594 GNUNET_break_op (0);
595 return GNUNET_SYSERR;
596 }
597 return GNUNET_OK;
598}
599
600
601/**
602 * Handle request message for a listen operation
603 *
604 * @param cls the listen handle
605 * @param msg the message
606 */
607static void
608handle_request (void *cls,
609 const struct GNUNET_SETI_RequestMessage *msg)
610{
611 struct GNUNET_SETI_ListenHandle *lh = cls;
612 struct GNUNET_SETI_Request req;
613 const struct GNUNET_MessageHeader *context_msg;
614 struct GNUNET_MQ_Envelope *mqm;
615 struct GNUNET_SETI_RejectMessage *rmsg;
616
617 LOG (GNUNET_ERROR_TYPE_DEBUG,
618 "Processing incoming operation request with id %u\n",
619 ntohl (msg->accept_id));
620 /* we got another valid request => reset the backoff */
621 lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
622 req.accept_id = ntohl (msg->accept_id);
623 req.accepted = GNUNET_NO;
624 context_msg = GNUNET_MQ_extract_nested_mh (msg);
625 /* calling #GNUNET_SETI_accept() in the listen cb will set req->accepted */
626 lh->listen_cb (lh->listen_cls,
627 &msg->peer_id,
628 context_msg,
629 &req);
630 if (GNUNET_YES == req.accepted)
631 return; /* the accept-case is handled in #GNUNET_SETI_accept() */
632 LOG (GNUNET_ERROR_TYPE_DEBUG,
633 "Rejected request %u\n",
634 ntohl (msg->accept_id));
635 mqm = GNUNET_MQ_msg (rmsg,
636 GNUNET_MESSAGE_TYPE_SETI_REJECT);
637 rmsg->accept_reject_id = msg->accept_id;
638 GNUNET_MQ_send (lh->mq,
639 mqm);
640}
641
642
643/**
644 * Our connection with the set service encountered an error,
645 * re-initialize with exponential back-off.
646 *
647 * @param cls the `struct GNUNET_SETI_ListenHandle *`
648 * @param error reason for the disconnect
649 */
650static void
651handle_client_listener_error (void *cls,
652 enum GNUNET_MQ_Error error)
653{
654 struct GNUNET_SETI_ListenHandle *lh = cls;
655
656 LOG (GNUNET_ERROR_TYPE_DEBUG,
657 "Listener broke down (%d), re-connecting\n",
658 (int) error);
659 GNUNET_MQ_destroy (lh->mq);
660 lh->mq = NULL;
661 lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
662 &listen_connect,
663 lh);
664 lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
665}
666
667
668/**
669 * Connect to the set service in order to listen for requests.
670 *
671 * @param cls the `struct GNUNET_SETI_ListenHandle *` to connect
672 */
673static void
674listen_connect (void *cls)
675{
676 struct GNUNET_SETI_ListenHandle *lh = cls;
677 struct GNUNET_MQ_MessageHandler mq_handlers[] = {
678 GNUNET_MQ_hd_var_size (request,
679 GNUNET_MESSAGE_TYPE_SETI_REQUEST,
680 struct GNUNET_SETI_RequestMessage,
681 lh),
682 GNUNET_MQ_handler_end ()
683 };
684 struct GNUNET_MQ_Envelope *mqm;
685 struct GNUNET_SETI_ListenMessage *msg;
686
687 lh->reconnect_task = NULL;
688 GNUNET_assert (NULL == lh->mq);
689 lh->mq = GNUNET_CLIENT_connect (lh->cfg,
690 "set",
691 mq_handlers,
692 &handle_client_listener_error,
693 lh);
694 if (NULL == lh->mq)
695 return;
696 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SETI_LISTEN);
697 msg->app_id = lh->app_id;
698 GNUNET_MQ_send (lh->mq,
699 mqm);
700}
701
702
703/**
704 * Wait for set operation requests for the given application id
705 *
706 * @param cfg configuration to use for connecting to
707 * the set service, needs to be valid for the lifetime of the listen handle
708 * @param app_id id of the application that handles set operation requests
709 * @param listen_cb called for each incoming request matching the operation
710 * and application id
711 * @param listen_cls handle for @a listen_cb
712 * @return a handle that can be used to cancel the listen operation
713 */
714struct GNUNET_SETI_ListenHandle *
715GNUNET_SETI_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
716 const struct GNUNET_HashCode *app_id,
717 GNUNET_SETI_ListenCallback listen_cb,
718 void *listen_cls)
719{
720 struct GNUNET_SETI_ListenHandle *lh;
721
722 LOG (GNUNET_ERROR_TYPE_DEBUG,
723 "Starting listener for app %s\n",
724 GNUNET_h2s (app_id));
725 lh = GNUNET_new (struct GNUNET_SETI_ListenHandle);
726 lh->listen_cb = listen_cb;
727 lh->listen_cls = listen_cls;
728 lh->cfg = cfg;
729 lh->app_id = *app_id;
730 lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
731 listen_connect (lh);
732 if (NULL == lh->mq)
733 {
734 GNUNET_free (lh);
735 return NULL;
736 }
737 return lh;
738}
739
740
741/**
742 * Cancel the given listen operation.
743 *
744 * @param lh handle for the listen operation
745 */
746void
747GNUNET_SETI_listen_cancel (struct GNUNET_SETI_ListenHandle *lh)
748{
749 LOG (GNUNET_ERROR_TYPE_DEBUG,
750 "Canceling listener %s\n",
751 GNUNET_h2s (&lh->app_id));
752 if (NULL != lh->mq)
753 {
754 GNUNET_MQ_destroy (lh->mq);
755 lh->mq = NULL;
756 }
757 if (NULL != lh->reconnect_task)
758 {
759 GNUNET_SCHEDULER_cancel (lh->reconnect_task);
760 lh->reconnect_task = NULL;
761 }
762 GNUNET_free (lh);
763}
764
765
766/**
767 * Accept a request we got via #GNUNET_SETI_listen. Must be called during
768 * #GNUNET_SETI_listen, as the 'struct GNUNET_SETI_Request' becomes invalid
769 * afterwards.
770 * Call #GNUNET_SETI_commit to provide the local set to use for the operation,
771 * and to begin the exchange with the remote peer.
772 *
773 * @param request request to accept
774 * @param options options to use when processing the request
775 * @param result_cb callback for the results
776 * @param result_cls closure for @a result_cb
777 * @return a handle to cancel the operation
778 */
779struct GNUNET_SETI_OperationHandle *
780GNUNET_SETI_accept (struct GNUNET_SETI_Request *request,
781 const struct GNUNET_SETI_Option options[],
782 GNUNET_SETI_ResultIterator result_cb,
783 void *result_cls)
784{
785 struct GNUNET_MQ_Envelope *mqm;
786 struct GNUNET_SETI_OperationHandle *oh;
787 struct GNUNET_SETI_AcceptMessage *msg;
788
789 GNUNET_assert (GNUNET_NO == request->accepted);
790 LOG (GNUNET_ERROR_TYPE_DEBUG,
791 "Client accepts set intersection operation with id %u\n",
792 request->accept_id);
793 request->accepted = GNUNET_YES;
794 mqm = GNUNET_MQ_msg (msg,
795 GNUNET_MESSAGE_TYPE_SETI_ACCEPT);
796 msg->accept_reject_id = htonl (request->accept_id);
797 oh = GNUNET_new (struct GNUNET_SETI_OperationHandle);
798 oh->result_cb = result_cb;
799 oh->result_cls = result_cls;
800 oh->conclude_mqm = mqm;
801 oh->request_id_addr = &msg->request_id;
802 for (const struct GNUNET_SETI_Option *opt = options;
803 GNUNET_SETI_OPTION_END != opt->type;
804 opt++)
805 {
806 switch (opt->type)
807 {
808 case GNUNET_SETI_OPTION_RETURN_INTERSECTION:
809 oh->return_intersection = GNUNET_YES;
810 break;
811 default:
812 LOG (GNUNET_ERROR_TYPE_ERROR,
813 "Option with type %d not recognized\n",
814 (int) opt->type);
815 }
816 }
817 return oh;
818}
819
820
821/**
822 * Commit a set to be used with a set operation.
823 * This function is called once we have fully constructed
824 * the set that we want to use for the operation. At this
825 * time, the P2P protocol can then begin to exchange the
826 * set information and call the result callback with the
827 * result information.
828 *
829 * @param oh handle to the set operation
830 * @param set the set to use for the operation
831 * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
832 * set is invalid (e.g. the set service crashed)
833 */
834int
835GNUNET_SETI_commit (struct GNUNET_SETI_OperationHandle *oh,
836 struct GNUNET_SETI_Handle *set)
837{
838 if (NULL != oh->set)
839 {
840 /* Some other set was already committed for this
841 * operation, there is a logic bug in the client of this API */
842 GNUNET_break (0);
843 return GNUNET_OK;
844 }
845 GNUNET_assert (NULL != set);
846 if (GNUNET_YES == set->invalid)
847 return GNUNET_SYSERR;
848 LOG (GNUNET_ERROR_TYPE_DEBUG,
849 "Client commits to SET\n");
850 GNUNET_assert (NULL != oh->conclude_mqm);
851 oh->set = set;
852 GNUNET_CONTAINER_DLL_insert (set->ops_head,
853 set->ops_tail,
854 oh);
855 oh->request_id = GNUNET_MQ_assoc_add (set->mq,
856 oh);
857 *oh->request_id_addr = htonl (oh->request_id);
858 GNUNET_MQ_send (set->mq,
859 oh->conclude_mqm);
860 oh->conclude_mqm = NULL;
861 oh->request_id_addr = NULL;
862 return GNUNET_OK;
863}
864
865
866/**
867 * Hash a set element.
868 *
869 * @param element the element that should be hashed
870 * @param[out] ret_hash a pointer to where the hash of @a element
871 * should be stored
872 */
873void
874GNUNET_SETI_element_hash (const struct GNUNET_SETI_Element *element,
875 struct GNUNET_HashCode *ret_hash)
876{
877 struct GNUNET_HashContext *ctx = GNUNET_CRYPTO_hash_context_start ();
878
879 /* It's not guaranteed that the element data is always after the element header,
880 so we need to hash the chunks separately. */
881 GNUNET_CRYPTO_hash_context_read (ctx,
882 &element->size,
883 sizeof(uint16_t));
884 GNUNET_CRYPTO_hash_context_read (ctx,
885 &element->element_type,
886 sizeof(uint16_t));
887 GNUNET_CRYPTO_hash_context_read (ctx,
888 element->data,
889 element->size);
890 GNUNET_CRYPTO_hash_context_finish (ctx,
891 ret_hash);
892}
893
894
895/* end of seti_api.c */
diff --git a/src/seti/test_seti.conf b/src/seti/test_seti.conf
new file mode 100644
index 000000000..21fe984f8
--- /dev/null
+++ b/src/seti/test_seti.conf
@@ -0,0 +1,33 @@
1@INLINE@ ../../contrib/conf/gnunet/no_forcestart.conf
2
3[PATHS]
4GNUNET_TEST_HOME = $GNUNET_TMP/test-gnunet-set/
5
6[set]
7START_ON_DEMAND = YES
8#PREFIX = valgrind --leak-check=full
9#PREFIX = gdbserver :1234
10OPTIONS = -L INFO
11
12[transport]
13PLUGINS = unix
14OPTIONS = -LERROR
15
16[nat]
17RETURN_LOCAL_ADDRESSES = YES
18DISABLEV6 = YES
19USE_LOCALADDR = YES
20
21[peerinfo]
22NO_IO = YES
23
24[nat]
25# Use addresses from the local network interfaces (inluding loopback, but also others)
26USE_LOCALADDR = YES
27
28# Disable IPv6 support
29DISABLEV6 = NO
30
31# Do we use addresses from localhost address ranges? (::1, 127.0.0.0/8)
32RETURN_LOCAL_ADDRESSES = YES
33
diff --git a/src/seti/test_seti_api.c b/src/seti/test_seti_api.c
new file mode 100644
index 000000000..42dedb846
--- /dev/null
+++ b/src/seti/test_seti_api.c
@@ -0,0 +1,393 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2012-2014 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file set/test_set_intersection_result_full.c
23 * @brief testcase for full result mode of the intersection set operation
24 * @author Christian Fuchs
25 * @author Christian Grothoff
26 */
27#include "platform.h"
28#include "gnunet_util_lib.h"
29#include "gnunet_testing_lib.h"
30#include "gnunet_set_service.h"
31
32
33static int ret;
34
35static struct GNUNET_PeerIdentity local_id;
36
37static struct GNUNET_HashCode app_id;
38
39static struct GNUNET_SET_Handle *set1;
40
41static struct GNUNET_SET_Handle *set2;
42
43static struct GNUNET_SET_ListenHandle *listen_handle;
44
45static const struct GNUNET_CONFIGURATION_Handle *config;
46
47static int iter_count;
48
49static struct GNUNET_SCHEDULER_Task *tt;
50
51static struct GNUNET_SET_OperationHandle *oh1;
52
53static struct GNUNET_SET_OperationHandle *oh2;
54
55
56static void
57result_cb_set1 (void *cls,
58 const struct GNUNET_SET_Element *element,
59 uint64_t current_size,
60 enum GNUNET_SET_Status status)
61{
62 static int count;
63
64 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
65 "Processing result set 1 (%d)\n",
66 status);
67 switch (status)
68 {
69 case GNUNET_SET_STATUS_OK:
70 count++;
71 break;
72
73 case GNUNET_SET_STATUS_FAILURE:
74 oh1 = NULL;
75 ret = 1;
76 break;
77
78 case GNUNET_SET_STATUS_DONE:
79 oh1 = NULL;
80 GNUNET_assert (1 == count);
81 GNUNET_SET_destroy (set1);
82 set1 = NULL;
83 if (NULL == set2)
84 GNUNET_SCHEDULER_shutdown ();
85 break;
86
87 default:
88 GNUNET_assert (0);
89 }
90}
91
92
93static void
94result_cb_set2 (void *cls,
95 const struct GNUNET_SET_Element *element,
96 uint64_t current_size,
97 enum GNUNET_SET_Status status)
98{
99 static int count;
100
101 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
102 "Processing result set 2 (%d)\n",
103 status);
104 switch (status)
105 {
106 case GNUNET_SET_STATUS_OK:
107 count++;
108 break;
109
110 case GNUNET_SET_STATUS_FAILURE:
111 oh2 = NULL;
112 ret = 1;
113 break;
114
115 case GNUNET_SET_STATUS_DONE:
116 oh2 = NULL;
117 GNUNET_assert (1 == count);
118 GNUNET_SET_destroy (set2);
119 set2 = NULL;
120 if (NULL == set1)
121 GNUNET_SCHEDULER_shutdown ();
122 break;
123
124 default:
125 GNUNET_assert (0);
126 }
127}
128
129
130static void
131listen_cb (void *cls,
132 const struct GNUNET_PeerIdentity *other_peer,
133 const struct GNUNET_MessageHeader *context_msg,
134 struct GNUNET_SET_Request *request)
135{
136 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
137 "starting intersection by accepting and committing\n");
138 GNUNET_assert (NULL != context_msg);
139 GNUNET_assert (ntohs (context_msg->type) == GNUNET_MESSAGE_TYPE_DUMMY);
140 oh2 = GNUNET_SET_accept (request,
141 GNUNET_SET_RESULT_FULL,
142 (struct GNUNET_SET_Option[]) { 0 },
143 &result_cb_set2,
144 NULL);
145 GNUNET_SET_commit (oh2,
146 set2);
147}
148
149
150/**
151 * Start the set operation.
152 *
153 * @param cls closure, unused
154 */
155static void
156start (void *cls)
157{
158 struct GNUNET_MessageHeader context_msg;
159
160 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
161 "starting listener\n");
162 context_msg.size = htons (sizeof context_msg);
163 context_msg.type = htons (GNUNET_MESSAGE_TYPE_DUMMY);
164 listen_handle = GNUNET_SET_listen (config,
165 GNUNET_SET_OPERATION_INTERSECTION,
166 &app_id,
167 &listen_cb,
168 NULL);
169 oh1 = GNUNET_SET_prepare (&local_id,
170 &app_id,
171 &context_msg,
172 GNUNET_SET_RESULT_FULL,
173 (struct GNUNET_SET_Option[]) { 0 },
174 &result_cb_set1,
175 NULL);
176 GNUNET_SET_commit (oh1,
177 set1);
178}
179
180
181/**
182 * Initialize the second set, continue
183 *
184 * @param cls closure, unused
185 */
186static void
187init_set2 (void *cls)
188{
189 struct GNUNET_SET_Element element;
190
191 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
192 "initializing set 2\n");
193 element.element_type = 0;
194 element.data = "hello";
195 element.size = strlen (element.data);
196 GNUNET_SET_add_element (set2,
197 &element,
198 NULL,
199 NULL);
200 element.data = "quux";
201 element.size = strlen (element.data);
202 GNUNET_SET_add_element (set2,
203 &element,
204 NULL,
205 NULL);
206 element.data = "baz";
207 element.size = strlen (element.data);
208 GNUNET_SET_add_element (set2,
209 &element,
210 &start,
211 NULL);
212}
213
214
215/**
216 * Initialize the first set, continue.
217 */
218static void
219init_set1 (void)
220{
221 struct GNUNET_SET_Element element;
222
223 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
224 "initializing set 1\n");
225 element.element_type = 0;
226 element.data = "hello";
227 element.size = strlen (element.data);
228 GNUNET_SET_add_element (set1,
229 &element,
230 NULL,
231 NULL);
232 element.data = "bar";
233 element.size = strlen (element.data);
234 GNUNET_SET_add_element (set1,
235 &element,
236 &init_set2,
237 NULL);
238}
239
240
241static int
242iter_cb (void *cls,
243 const struct GNUNET_SET_Element *element)
244{
245 if (NULL == element)
246 {
247 GNUNET_assert (iter_count == 3);
248 GNUNET_SET_destroy (cls);
249 return GNUNET_YES;
250 }
251 iter_count++;
252 return GNUNET_YES;
253}
254
255
256static void
257test_iter ()
258{
259 struct GNUNET_SET_Element element;
260 struct GNUNET_SET_Handle *iter_set;
261
262 iter_set = GNUNET_SET_create (config,
263 GNUNET_SET_OPERATION_INTERSECTION);
264 element.element_type = 0;
265 element.data = "hello";
266 element.size = strlen (element.data);
267 GNUNET_SET_add_element (iter_set,
268 &element,
269 NULL,
270 NULL);
271 element.data = "bar";
272 element.size = strlen (element.data);
273 GNUNET_SET_add_element (iter_set,
274 &element,
275 NULL,
276 NULL);
277 element.data = "quux";
278 element.size = strlen (element.data);
279 GNUNET_SET_add_element (iter_set,
280 &element,
281 NULL,
282 NULL);
283 GNUNET_SET_iterate (iter_set,
284 &iter_cb,
285 iter_set);
286}
287
288
289/**
290 * Function run on shutdown.
291 *
292 * @param cls closure
293 */
294static void
295do_shutdown (void *cls)
296{
297 if (NULL != tt)
298 {
299 GNUNET_SCHEDULER_cancel (tt);
300 tt = NULL;
301 }
302 if (NULL != oh1)
303 {
304 GNUNET_SET_operation_cancel (oh1);
305 oh1 = NULL;
306 }
307 if (NULL != oh2)
308 {
309 GNUNET_SET_operation_cancel (oh2);
310 oh2 = NULL;
311 }
312 if (NULL != set1)
313 {
314 GNUNET_SET_destroy (set1);
315 set1 = NULL;
316 }
317 if (NULL != set2)
318 {
319 GNUNET_SET_destroy (set2);
320 set2 = NULL;
321 }
322 if (NULL != listen_handle)
323 {
324 GNUNET_SET_listen_cancel (listen_handle);
325 listen_handle = NULL;
326 }
327}
328
329
330/**
331 * Function run on timeout.
332 *
333 * @param cls closure
334 */
335static void
336timeout_fail (void *cls)
337{
338 tt = NULL;
339 GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
340 "Testcase failed with timeout\n");
341 GNUNET_SCHEDULER_shutdown ();
342 ret = 1;
343}
344
345
346/**
347 * Signature of the 'main' function for a (single-peer) testcase that
348 * is run using 'GNUNET_TESTING_peer_run'.
349 *
350 * @param cls closure
351 * @param cfg configuration of the peer that was started
352 * @param peer identity of the peer that was created
353 */
354static void
355run (void *cls,
356 const struct GNUNET_CONFIGURATION_Handle *cfg,
357 struct GNUNET_TESTING_Peer *peer)
358{
359 config = cfg;
360 GNUNET_TESTING_peer_get_identity (peer,
361 &local_id);
362 if (0)
363 test_iter ();
364
365 tt = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (
366 GNUNET_TIME_UNIT_SECONDS, 5),
367 &timeout_fail,
368 NULL);
369 GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
370 NULL);
371
372 set1 = GNUNET_SET_create (cfg,
373 GNUNET_SET_OPERATION_INTERSECTION);
374 set2 = GNUNET_SET_create (cfg,
375 GNUNET_SET_OPERATION_INTERSECTION);
376 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK,
377 &app_id);
378
379 /* test the real set reconciliation */
380 init_set1 ();
381}
382
383
384int
385main (int argc,
386 char **argv)
387{
388 if (0 != GNUNET_TESTING_peer_run ("test_set_intersection_result_full",
389 "test_set.conf",
390 &run, NULL))
391 return 1;
392 return ret;
393}