aboutsummaryrefslogtreecommitdiff
path: root/src/set
diff options
context:
space:
mode:
Diffstat (limited to 'src/set')
-rw-r--r--src/set/Makefile.am34
-rw-r--r--src/set/gnunet-service-set.c2
-rw-r--r--src/set/gnunet-service-set.h1
-rw-r--r--src/set/gnunet-service-set_union.c8
-rw-r--r--src/set/gnunet-set-bug.c142
-rw-r--r--src/set/gnunet-set-ibf.c238
-rw-r--r--src/set/gnunet-set.c10
-rw-r--r--src/set/mq.c793
-rw-r--r--src/set/mq.h362
-rw-r--r--src/set/set_api.c38
-rw-r--r--src/set/test_mq.c115
-rw-r--r--src/set/test_mq_client.c181
-rw-r--r--src/set/test_set_api.c48
13 files changed, 291 insertions, 1681 deletions
diff --git a/src/set/Makefile.am b/src/set/Makefile.am
index a609840b1..13278b05c 100644
--- a/src/set/Makefile.am
+++ b/src/set/Makefile.am
@@ -16,7 +16,7 @@ if USE_COVERAGE
16endif 16endif
17 17
18bin_PROGRAMS = \ 18bin_PROGRAMS = \
19 gnunet-set gnunet-set-bug 19 gnunet-set
20 20
21libexec_PROGRAMS = \ 21libexec_PROGRAMS = \
22 gnunet-service-set 22 gnunet-service-set
@@ -35,17 +35,9 @@ gnunet_set_LDADD = \
35gnunet_set_DEPENDENCIES = \ 35gnunet_set_DEPENDENCIES = \
36 libgnunetset.la 36 libgnunetset.la
37 37
38gnunet_set_bug_SOURCES = \
39 gnunet-set-bug.c
40gnunet_set_bug_LDADD = \
41 $(top_builddir)/src/util/libgnunetutil.la \
42 $(top_builddir)/src/stream/libgnunetstream.la \
43 $(GN_LIBINTL)
44
45gnunet_service_set_SOURCES = \ 38gnunet_service_set_SOURCES = \
46 gnunet-service-set.c \ 39 gnunet-service-set.c \
47 gnunet-service-set_union.c \ 40 gnunet-service-set_union.c \
48 mq.c \
49 ibf.c \ 41 ibf.c \
50 strata_estimator.c 42 strata_estimator.c
51gnunet_service_set_LDADD = \ 43gnunet_service_set_LDADD = \
@@ -54,13 +46,9 @@ gnunet_service_set_LDADD = \
54 $(top_builddir)/src/stream/libgnunetstream.la \ 46 $(top_builddir)/src/stream/libgnunetstream.la \
55 $(top_builddir)/src/mesh/libgnunetmesh.la \ 47 $(top_builddir)/src/mesh/libgnunetmesh.la \
56 $(GN_LIBINTL) 48 $(GN_LIBINTL)
57# hack for mq.c, see automake Objects ‘created with both libtool and without’
58# remove once GNUNET_MQ is in util/
59gnunet_service_set_CFLAGS = $(AM_CFLAGS)
60 49
61libgnunetset_la_SOURCES = \ 50libgnunetset_la_SOURCES = \
62 set_api.c \ 51 set_api.c
63 mq.c
64libgnunetset_la_LIBADD = \ 52libgnunetset_la_LIBADD = \
65 $(top_builddir)/src/util/libgnunetutil.la \ 53 $(top_builddir)/src/util/libgnunetutil.la \
66 $(top_builddir)/src/stream/libgnunetstream.la \ 54 $(top_builddir)/src/stream/libgnunetstream.la \
@@ -84,24 +72,6 @@ test_set_api_LDADD = \
84test_set_api_DEPENDENCIES = \ 72test_set_api_DEPENDENCIES = \
85 libgnunetset.la 73 libgnunetset.la
86 74
87
88test_mq_SOURCES = \
89 test_mq.c \
90 mq.c
91test_mq_LDADD = \
92 $(top_builddir)/src/util/libgnunetutil.la \
93 $(top_builddir)/src/stream/libgnunetstream.la
94test_mq_CFLAGS = $(AM_CFLAGS)
95
96
97test_mq_client_SOURCES = \
98 test_mq_client.c \
99 mq.c
100test_mq_client_LDADD = \
101 $(top_builddir)/src/util/libgnunetutil.la \
102 $(top_builddir)/src/stream/libgnunetstream.la
103test_mq_client_CFLAGS = $(AM_CFLAGS)
104
105EXTRA_DIST = \ 75EXTRA_DIST = \
106 test_set.conf 76 test_set.conf
107 77
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c
index 9ac0fbee6..2aea50365 100644
--- a/src/set/gnunet-service-set.c
+++ b/src/set/gnunet-service-set.c
@@ -632,7 +632,7 @@ stream_listen_cb (void *cls,
632 incoming = GNUNET_new (struct Incoming); 632 incoming = GNUNET_new (struct Incoming);
633 incoming->peer = *initiator; 633 incoming->peer = *initiator;
634 incoming->socket = socket; 634 incoming->socket = socket;
635 incoming->mq = GNUNET_MQ_queue_for_stream_socket (incoming->socket, handlers, incoming); 635 incoming->mq = GNUNET_STREAM_mq_create (incoming->socket, handlers, NULL, incoming);
636 /* FIXME: timeout for peers that only connect but don't send anything */ 636 /* FIXME: timeout for peers that only connect but don't send anything */
637 GNUNET_CONTAINER_DLL_insert_tail (incoming_head, incoming_tail, incoming); 637 GNUNET_CONTAINER_DLL_insert_tail (incoming_head, incoming_tail, incoming);
638 return GNUNET_OK; 638 return GNUNET_OK;
diff --git a/src/set/gnunet-service-set.h b/src/set/gnunet-service-set.h
index cc28e9701..bea77416e 100644
--- a/src/set/gnunet-service-set.h
+++ b/src/set/gnunet-service-set.h
@@ -36,7 +36,6 @@
36#include "gnunet_stream_lib.h" 36#include "gnunet_stream_lib.h"
37#include "gnunet_set_service.h" 37#include "gnunet_set_service.h"
38#include "set.h" 38#include "set.h"
39#include "mq.h"
40 39
41 40
42/* FIXME: cfuchs */ 41/* FIXME: cfuchs */
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c
index 05b125047..ae7f47266 100644
--- a/src/set/gnunet-service-set_union.c
+++ b/src/set/gnunet-service-set_union.c
@@ -1053,8 +1053,8 @@ handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
1053 } 1053 }
1054 element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader); 1054 element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
1055 ee = GNUNET_malloc (sizeof *eo + element_size); 1055 ee = GNUNET_malloc (sizeof *eo + element_size);
1056 memcpy (&ee[1], &mh[1], element_size);
1056 ee->element.data = &ee[1]; 1057 ee->element.data = &ee[1];
1057 memcpy (ee->element.data, &mh[1], element_size);
1058 ee->remote = GNUNET_YES; 1058 ee->remote = GNUNET_YES;
1059 1059
1060 insert_element (eo, ee); 1060 insert_element (eo, ee);
@@ -1183,8 +1183,8 @@ stream_open_cb (void *cls,
1183 1183
1184 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "open cb successful\n"); 1184 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "open cb successful\n");
1185 1185
1186 eo->mq = GNUNET_MQ_queue_for_stream_socket (eo->socket, 1186
1187 union_handlers, eo); 1187 eo->mq = GNUNET_STREAM_mq_create (eo->socket, union_handlers, NULL, eo);
1188 /* we started the operation, thus we have to send the operation request */ 1188 /* we started the operation, thus we have to send the operation request */
1189 send_operation_request (eo); 1189 send_operation_request (eo);
1190 eo->phase = PHASE_EXPECT_SE; 1190 eo->phase = PHASE_EXPECT_SE;
@@ -1312,9 +1312,9 @@ _GSS_union_add (struct ElementMessage *m, struct Set *set)
1312 element_size = ntohs (m->header.size) - sizeof *m; 1312 element_size = ntohs (m->header.size) - sizeof *m;
1313 ee = GNUNET_malloc (element_size + sizeof *ee); 1313 ee = GNUNET_malloc (element_size + sizeof *ee);
1314 ee->element.size = element_size; 1314 ee->element.size = element_size;
1315 memcpy (&ee[1], &m[1], element_size);
1315 ee->element.data = &ee[1]; 1316 ee->element.data = &ee[1];
1316 ee->generation_added = set->state.u->current_generation; 1317 ee->generation_added = set->state.u->current_generation;
1317 memcpy (ee->element.data, &m[1], element_size);
1318 GNUNET_CRYPTO_hash (ee->element.data, element_size, &ee->element_hash); 1318 GNUNET_CRYPTO_hash (ee->element.data, element_size, &ee->element_hash);
1319 ee_dup = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &ee->element_hash); 1319 ee_dup = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &ee->element_hash);
1320 if (NULL != ee_dup) 1320 if (NULL != ee_dup)
diff --git a/src/set/gnunet-set-bug.c b/src/set/gnunet-set-bug.c
deleted file mode 100644
index 112def7d7..000000000
--- a/src/set/gnunet-set-bug.c
+++ /dev/null
@@ -1,142 +0,0 @@
1/*
2 This file is part of GNUnet
3 (C) 2012 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19 */
20
21/**
22 * @file set/gnunet-set.c
23 * @brief profiling tool for the set service
24 * @author Florian Dold
25 */
26#include "platform.h"
27#include "gnunet_common.h"
28#include "gnunet_applications.h"
29#include "gnunet_util_lib.h"
30#include "gnunet_stream_lib.h"
31
32
33static struct GNUNET_PeerIdentity local_id;
34
35static struct GNUNET_CONFIGURATION_Handle *cfg;
36
37static struct GNUNET_STREAM_ListenSocket *listen_socket;
38
39static struct GNUNET_STREAM_Socket *s1;
40
41static struct GNUNET_STREAM_Socket *s2;
42
43static void
44do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
45{
46 if (NULL != s2)
47 GNUNET_STREAM_close (s2);
48 GNUNET_STREAM_close (s1);
49 GNUNET_STREAM_listen_close (listen_socket);
50 GNUNET_CONFIGURATION_destroy (cfg);
51}
52
53static size_t
54stream_data_processor (void *cls,
55 enum GNUNET_STREAM_Status status,
56 const void *data,
57 size_t size)
58{
59 return size;
60}
61
62static int
63listen_cb (void *cls,
64 struct GNUNET_STREAM_Socket *socket,
65 const struct
66 GNUNET_PeerIdentity *initiator)
67{
68 if (NULL == (s2 = socket))
69 {
70 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "socket listen failed\n");
71 return GNUNET_NO;
72 }
73
74 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "socket listen succesful\n");
75 GNUNET_assert (NULL != socket);
76 GNUNET_assert (0 == memcmp (initiator, &local_id, sizeof (*initiator)));
77 GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
78 &stream_data_processor, NULL);
79 return GNUNET_YES;
80}
81
82static void
83open_cb (void *cls, struct GNUNET_STREAM_Socket *socket)
84{
85
86}
87
88static void
89stream_connect (void)
90{
91 s1 = GNUNET_STREAM_open (cfg,
92 &local_id,
93 GNUNET_APPLICATION_TYPE_SET,
94 &open_cb,
95 NULL,
96 GNUNET_STREAM_OPTION_END);
97}
98
99/**
100 * Main function that will be run.
101 *
102 * @param cls closure
103 * @param args remaining command-line arguments
104 * @param cfgfile name of the configuration file used (for saving, can be NULL!)
105 * @param cfg configuration
106 */
107static void
108run (void *cls, char *const *args,
109 const char *cfgfile,
110 const struct GNUNET_CONFIGURATION_Handle *cfg2)
111{
112
113 cfg = GNUNET_CONFIGURATION_dup (cfg2);
114 GNUNET_CRYPTO_get_host_identity (cfg, &local_id);
115
116 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "I am Peer %s\n", GNUNET_h2s (&local_id.hashPubKey));
117
118 listen_socket = GNUNET_STREAM_listen (cfg,
119 GNUNET_APPLICATION_TYPE_SET,
120 &listen_cb,
121 NULL,
122 GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS,
123 &stream_connect,
124 GNUNET_STREAM_OPTION_END);
125 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
126 &do_shutdown, NULL);
127}
128
129
130
131int
132main (int argc, char **argv)
133{
134 static const struct GNUNET_GETOPT_CommandLineOption options[] = {
135 GNUNET_GETOPT_OPTION_END
136 };
137 GNUNET_PROGRAM_run (argc, argv, "gnunet-set",
138 "help",
139 options, &run, NULL);
140 return 0;
141}
142
diff --git a/src/set/gnunet-set-ibf.c b/src/set/gnunet-set-ibf.c
new file mode 100644
index 000000000..d431795f1
--- /dev/null
+++ b/src/set/gnunet-set-ibf.c
@@ -0,0 +1,238 @@
1/*
2 This file is part of GNUnet.
3 (C) 2012 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file consensus/gnunet-consensus-ibf.c
23 * @brief tool for reconciling data with invertible bloom filters
24 * @author Florian Dold
25 */
26
27
28#include "platform.h"
29#include "gnunet_common.h"
30#include "gnunet_container_lib.h"
31#include "gnunet_util_lib.h"
32
33#include "ibf.h"
34
35static unsigned int asize = 10;
36static unsigned int bsize = 10;
37static unsigned int csize = 10;
38static unsigned int hash_num = 3;
39static unsigned int ibf_size = 80;
40
41/* FIXME: add parameter for this */
42static enum GNUNET_CRYPTO_Quality random_quality = GNUNET_CRYPTO_QUALITY_WEAK;
43
44static struct GNUNET_CONTAINER_MultiHashMap *set_a;
45static struct GNUNET_CONTAINER_MultiHashMap *set_b;
46/* common elements in a and b */
47static struct GNUNET_CONTAINER_MultiHashMap *set_c;
48
49static struct GNUNET_CONTAINER_MultiHashMap *key_to_hashcode;
50
51static struct InvertibleBloomFilter *ibf_a;
52static struct InvertibleBloomFilter *ibf_b;
53
54
55static void
56register_hashcode (struct GNUNET_HashCode *hash)
57{
58 struct GNUNET_HashCode replicated;
59 struct IBF_Key key;
60 key = ibf_key_from_hashcode (hash);
61 ibf_hashcode_from_key (key, &replicated);
62 GNUNET_CONTAINER_multihashmap_put (key_to_hashcode, &replicated, GNUNET_memdup (hash, sizeof *hash),
63 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
64}
65
66static void
67iter_hashcodes (struct IBF_Key key, GNUNET_CONTAINER_HashMapIterator iter, void *cls)
68{
69 struct GNUNET_HashCode replicated;
70 ibf_hashcode_from_key (key, &replicated);
71 GNUNET_CONTAINER_multihashmap_get_multiple (key_to_hashcode, &replicated, iter, cls);
72}
73
74
75static int
76insert_iterator (void *cls,
77 const struct GNUNET_HashCode *key,
78 void *value)
79{
80 struct InvertibleBloomFilter *ibf = (struct InvertibleBloomFilter *) cls;
81 ibf_insert (ibf, ibf_key_from_hashcode (key));
82 return GNUNET_YES;
83}
84
85
86static int
87remove_iterator (void *cls,
88 const struct GNUNET_HashCode *key,
89 void *value)
90{
91 struct GNUNET_CONTAINER_MultiHashMap *hashmap = cls;
92 /* if remove fails, there just was a collision with another key */
93 (void) GNUNET_CONTAINER_multihashmap_remove (hashmap, value, NULL);
94 return GNUNET_YES;
95}
96
97
98static void
99run (void *cls, char *const *args, const char *cfgfile,
100 const struct GNUNET_CONFIGURATION_Handle *cfg)
101{
102 struct GNUNET_HashCode id;
103 struct IBF_Key ibf_key;
104 int i;
105 int side;
106 int res;
107 struct GNUNET_TIME_Absolute start_time;
108 struct GNUNET_TIME_Relative delta_time;
109
110 set_a = GNUNET_CONTAINER_multihashmap_create (((asize == 0) ? 1 : (asize + csize)),
111 GNUNET_NO);
112 set_b = GNUNET_CONTAINER_multihashmap_create (((bsize == 0) ? 1 : (bsize + csize)),
113 GNUNET_NO);
114 set_c = GNUNET_CONTAINER_multihashmap_create (((csize == 0) ? 1 : csize),
115 GNUNET_NO);
116
117 key_to_hashcode = GNUNET_CONTAINER_multihashmap_create (((asize+bsize+csize == 0) ? 1 : (asize+bsize+csize)),
118 GNUNET_NO);
119
120 printf ("hash-num=%u, size=%u, #(A-B)=%u, #(B-A)=%u, #(A&B)=%u\n",
121 hash_num, ibf_size, asize, bsize, csize);
122
123 i = 0;
124 while (i < asize)
125 {
126 GNUNET_CRYPTO_hash_create_random (random_quality, &id);
127 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id))
128 continue;
129 GNUNET_CONTAINER_multihashmap_put (
130 set_a, &id, NULL, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
131 register_hashcode (&id);
132 i++;
133 }
134 i = 0;
135 while (i < bsize)
136 {
137 GNUNET_CRYPTO_hash_create_random (random_quality, &id);
138 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id))
139 continue;
140 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_b, &id))
141 continue;
142 GNUNET_CONTAINER_multihashmap_put (
143 set_b, &id, NULL, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
144 register_hashcode (&id);
145 i++;
146 }
147 i = 0;
148 while (i < csize)
149 {
150 GNUNET_CRYPTO_hash_create_random (random_quality, &id);
151 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id))
152 continue;
153 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_b, &id))
154 continue;
155 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_c, &id))
156 continue;
157 GNUNET_CONTAINER_multihashmap_put (
158 set_c, &id, NULL, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
159 register_hashcode (&id);
160 i++;
161 }
162
163 ibf_a = ibf_create (ibf_size, hash_num);
164 ibf_b = ibf_create (ibf_size, hash_num);
165
166 printf ("generated sets\n");
167
168 start_time = GNUNET_TIME_absolute_get ();
169
170 GNUNET_CONTAINER_multihashmap_iterate (set_a, &insert_iterator, ibf_a);
171 GNUNET_CONTAINER_multihashmap_iterate (set_b, &insert_iterator, ibf_b);
172 GNUNET_CONTAINER_multihashmap_iterate (set_c, &insert_iterator, ibf_a);
173 GNUNET_CONTAINER_multihashmap_iterate (set_c, &insert_iterator, ibf_b);
174
175 delta_time = GNUNET_TIME_absolute_get_duration (start_time);
176
177 printf ("encoded in: %s\n", GNUNET_STRINGS_relative_time_to_string (delta_time, GNUNET_NO));
178
179 ibf_subtract (ibf_a, ibf_b);
180
181
182 start_time = GNUNET_TIME_absolute_get ();
183
184 for (;;)
185 {
186 res = ibf_decode (ibf_a, &side, &ibf_key);
187 if (GNUNET_SYSERR == res)
188 {
189 printf ("decode failed\n");
190 return;
191 }
192 if (GNUNET_NO == res)
193 {
194 if ((0 == GNUNET_CONTAINER_multihashmap_size (set_b)) &&
195 (0 == GNUNET_CONTAINER_multihashmap_size (set_a)))
196 {
197 delta_time = GNUNET_TIME_absolute_get_duration (start_time);
198 printf ("decoded successfully in: %s\n", GNUNET_STRINGS_relative_time_to_string (delta_time, GNUNET_NO));
199 }
200 else
201 printf ("decode missed elements\n");
202 return;
203 }
204
205 if (side == 1)
206 iter_hashcodes (ibf_key, remove_iterator, set_a);
207 if (side == -1)
208 iter_hashcodes (ibf_key, remove_iterator, set_b);
209 }
210}
211
212int
213main (int argc, char **argv)
214{
215 static const struct GNUNET_GETOPT_CommandLineOption options[] = {
216 {'A', "asize", NULL,
217 gettext_noop ("number of element in set A-B"), 1,
218 &GNUNET_GETOPT_set_uint, &asize},
219 {'B', "bsize", NULL,
220 gettext_noop ("number of element in set B-A"), 1,
221 &GNUNET_GETOPT_set_uint, &bsize},
222 {'C', "csize", NULL,
223 gettext_noop ("number of common elements in A and B"), 1,
224 &GNUNET_GETOPT_set_uint, &csize},
225 {'k', "hash-num", NULL,
226 gettext_noop ("hash num"), 1,
227 &GNUNET_GETOPT_set_uint, &hash_num},
228 {'s', "ibf-size", NULL,
229 gettext_noop ("ibf size"), 1,
230 &GNUNET_GETOPT_set_uint, &ibf_size},
231 GNUNET_GETOPT_OPTION_END
232 };
233 GNUNET_PROGRAM_run2 (argc, argv, "gnunet-consensus-ibf",
234 "help",
235 options, &run, NULL, GNUNET_YES);
236 return 0;
237}
238
diff --git a/src/set/gnunet-set.c b/src/set/gnunet-set.c
index d665fce11..5f2d1c976 100644
--- a/src/set/gnunet-set.c
+++ b/src/set/gnunet-set.c
@@ -42,8 +42,8 @@ int num_done;
42 42
43 43
44static void 44static void
45result_cb_set1 (void *cls, struct GNUNET_SET_Element *element, 45result_cb_set1 (void *cls, const struct GNUNET_SET_Element *element,
46 enum GNUNET_SET_Status status) 46 enum GNUNET_SET_Status status)
47{ 47{
48 switch (status) 48 switch (status)
49 { 49 {
@@ -64,7 +64,7 @@ result_cb_set1 (void *cls, struct GNUNET_SET_Element *element,
64 64
65 65
66static void 66static void
67result_cb_set2 (void *cls, struct GNUNET_SET_Element *element, 67result_cb_set2 (void *cls, const struct GNUNET_SET_Element *element,
68 enum GNUNET_SET_Status status) 68 enum GNUNET_SET_Status status)
69{ 69{
70 switch (status) 70 switch (status)
@@ -94,7 +94,7 @@ listen_cb (void *cls,
94 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n"); 94 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n");
95 GNUNET_SET_listen_cancel (listen_handle); 95 GNUNET_SET_listen_cancel (listen_handle);
96 96
97 GNUNET_SET_accept (request, set2, GNUNET_TIME_UNIT_FOREVER_REL, 97 GNUNET_SET_accept (request, set2,
98 GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL); 98 GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL);
99} 99}
100 100
@@ -110,7 +110,7 @@ start (void *cls)
110 listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION, 110 listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION,
111 &app_id, listen_cb, NULL); 111 &app_id, listen_cb, NULL);
112 GNUNET_SET_evaluate (set1, &local_id, &app_id, NULL, 42, 112 GNUNET_SET_evaluate (set1, &local_id, &app_id, NULL, 42,
113 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_SET_RESULT_ADDED, 113 GNUNET_SET_RESULT_ADDED,
114 result_cb_set1, NULL); 114 result_cb_set1, NULL);
115} 115}
116 116
diff --git a/src/set/mq.c b/src/set/mq.c
deleted file mode 100644
index 0ced014dd..000000000
--- a/src/set/mq.c
+++ /dev/null
@@ -1,793 +0,0 @@
1/*
2 This file is part of GNUnet.
3 (C) 2012 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @author Florian Dold
23 * @file set/mq.c
24 * @brief general purpose request queue
25 */
26
27#include "mq.h"
28
29
30#define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__)
31
32/**
33 * Signature of functions implementing the
34 * sending part of a message queue
35 *
36 * @param q the message queue
37 * @param m the message
38 */
39typedef void (*SendImpl) (struct GNUNET_MQ_MessageQueue *q, struct GNUNET_MQ_Message *m);
40
41
42typedef void (*DestroyImpl) (struct GNUNET_MQ_MessageQueue *q);
43
44
45/**
46 * Collection of the state necessary to read and write gnunet messages
47 * to a stream socket. Should be used as closure for stream_data_processor.
48 */
49struct MessageStreamState
50{
51 struct GNUNET_SERVER_MessageStreamTokenizer *mst;
52 struct MessageQueue *mq;
53 struct GNUNET_STREAM_Socket *socket;
54 struct GNUNET_STREAM_ReadHandle *rh;
55 struct GNUNET_STREAM_WriteHandle *wh;
56};
57
58
59struct ServerClientSocketState
60{
61 struct GNUNET_SERVER_Client *client;
62 struct GNUNET_SERVER_TransmitHandle* th;
63};
64
65
66struct ClientConnectionState
67{
68 struct GNUNET_CLIENT_Connection *connection;
69 struct GNUNET_CLIENT_TransmitHandle *th;
70};
71
72
73struct GNUNET_MQ_MessageQueue
74{
75 /**
76 * Handlers array, or NULL if the queue should not receive messages
77 */
78 const struct GNUNET_MQ_Handler *handlers;
79
80 /**
81 * Closure for the handler callbacks
82 */
83 void *handlers_cls;
84
85 /**
86 * Actual implementation of message sending,
87 * called when a message is added
88 */
89 SendImpl send_impl;
90
91 /**
92 * Implementation-dependent queue destruction function
93 */
94 DestroyImpl destroy_impl;
95
96 /**
97 * Implementation-specific state
98 */
99 void *impl_state;
100
101 /**
102 * Callback will be called when the message queue is empty
103 */
104 GNUNET_MQ_NotifyCallback empty_cb;
105
106 /**
107 * Closure for empty_cb
108 */
109 void *empty_cls;
110
111 /**
112 * Callback will be called when a read error occurs.
113 */
114 GNUNET_MQ_NotifyCallback read_error_cb;
115
116 /**
117 * Closure for read_error_cb
118 */
119 void *read_error_cls;
120
121 /**
122 * Linked list of messages pending to be sent
123 */
124 struct GNUNET_MQ_Message *msg_head;
125
126 /**
127 * Linked list of messages pending to be sent
128 */
129 struct GNUNET_MQ_Message *msg_tail;
130
131 /**
132 * Message that is currently scheduled to be
133 * sent. Not the head of the message queue, as the implementation
134 * needs to know if sending has been already scheduled or not.
135 */
136 struct GNUNET_MQ_Message *current_msg;
137
138 /**
139 * Map of associations, lazily allocated
140 */
141 struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
142
143 /**
144 * Next id that should be used for the assoc_map,
145 * initialized lazily to a random value together with
146 * assoc_map
147 */
148 uint32_t assoc_id;
149};
150
151
152struct GNUNET_MQ_Message
153{
154 /**
155 * Messages are stored in a linked list
156 */
157 struct GNUNET_MQ_Message *next;
158
159 /**
160 * Messages are stored in a linked list
161 */
162 struct GNUNET_MQ_Message *prev;
163
164 /**
165 * Actual allocated message header,
166 * usually points to the end of the containing GNUNET_MQ_Message
167 */
168 struct GNUNET_MessageHeader *mh;
169
170 /**
171 * Queue the message is queued in, NULL if message is not queued.
172 */
173 struct GNUNET_MQ_MessageQueue *parent_queue;
174
175 /**
176 * Called after the message was sent irrevokably
177 */
178 GNUNET_MQ_NotifyCallback sent_cb;
179
180 /**
181 * Closure for send_cb
182 */
183 void *sent_cls;
184};
185
186
187/**
188 * Call the right callback for a message received
189 * by a queue
190 */
191static void
192dispatch_message (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_MessageHeader *mh)
193{
194 const struct GNUNET_MQ_Handler *handler;
195 int handled = GNUNET_NO;
196
197 handler = mq->handlers;
198 if (NULL == handler)
199 return;
200 for (; NULL != handler->cb; handler++)
201 {
202 if (handler->type == ntohs (mh->type))
203 {
204 handler->cb (mq->handlers_cls, mh);
205 handled = GNUNET_YES;
206 }
207 }
208
209 if (GNUNET_NO == handled)
210 LOG (GNUNET_ERROR_TYPE_WARNING, "no handler for message of type %d\n", ntohs (mh->type));
211}
212
213
214void
215GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm)
216{
217 GNUNET_assert (NULL == mqm->parent_queue);
218 GNUNET_free (mqm);
219}
220
221
222/**
223 * Send a message with the give message queue.
224 * May only be called once per message.
225 *
226 * @param mq message queue
227 * @param mqm the message to send.
228 */
229void
230GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
231{
232 GNUNET_assert (NULL != mq);
233 mq->send_impl (mq, mqm);
234}
235
236
237struct GNUNET_MQ_Message *
238GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
239{
240 struct GNUNET_MQ_Message *mqm;
241
242 mqm = GNUNET_malloc (sizeof *mqm + size);
243 mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
244 mqm->mh->size = htons (size);
245 mqm->mh->type = htons (type);
246 if (NULL != mhp)
247 *mhp = mqm->mh;
248 return mqm;
249}
250
251
252int
253GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp,
254 const void *data, uint16_t len)
255{
256 size_t new_size;
257 size_t old_size;
258
259 GNUNET_assert (NULL != mqmp);
260 /* there's no data to append => do nothing */
261 if (NULL == data)
262 return GNUNET_OK;
263 old_size = ntohs ((*mqmp)->mh->size);
264 /* message too large to concatenate? */
265 if (((uint16_t) (old_size + len)) < len)
266 return GNUNET_SYSERR;
267 new_size = old_size + len;
268 *mqmp = GNUNET_realloc (*mqmp, sizeof (struct GNUNET_MQ_Message) + new_size);
269 (*mqmp)->mh = (struct GNUNET_MessageHeader *) &(*mqmp)[1];
270 memcpy (((void *) (*mqmp)->mh) + old_size, data, new_size - old_size);
271 (*mqmp)->mh->size = htons (new_size);
272 return GNUNET_OK;
273}
274
275
276/**
277 * Functions of this signature are called whenever writing operations
278 * on a stream are executed
279 *
280 * @param cls the closure from GNUNET_STREAM_write
281 * @param status the status of the stream at the time this function is called;
282 * GNUNET_STREAM_OK if writing to stream was completed successfully;
283 * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
284 * (this doesn't mean that the data is never sent, the receiver may
285 * have read the data but its ACKs may have been lost);
286 * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
287 * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
288 * be processed.
289 * @param size the number of bytes written
290 */
291static void
292stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size)
293{
294 struct GNUNET_MQ_MessageQueue *mq = cls;
295 struct MessageStreamState *mss = (struct MessageStreamState *) mq->impl_state;
296 struct GNUNET_MQ_Message *mqm;
297
298 GNUNET_assert (GNUNET_STREAM_OK == status);
299
300 /* call cb for message we finished sending */
301 mqm = mq->current_msg;
302 GNUNET_assert (NULL != mq->current_msg);
303 if (NULL != mqm->sent_cb)
304 mqm->sent_cb (mqm->sent_cls);
305 GNUNET_free (mqm);
306
307 mss->wh = NULL;
308
309 mqm = mq->msg_head;
310 mq->current_msg = mqm;
311 if (NULL == mqm)
312 {
313 if (NULL != mq->empty_cb)
314 mq->empty_cb (mq->empty_cls);
315 return;
316 }
317 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm);
318 mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
319 GNUNET_TIME_UNIT_FOREVER_REL,
320 stream_write_queued, mq);
321 GNUNET_assert (NULL != mss->wh);
322}
323
324
325static void
326stream_socket_send_impl (struct GNUNET_MQ_MessageQueue *mq,
327 struct GNUNET_MQ_Message *mqm)
328{
329 struct MessageStreamState *mss = (struct MessageStreamState *) mq->impl_state;
330 if (NULL != mq->current_msg)
331 {
332 GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
333 return;
334 }
335 mq->current_msg = mqm;
336 mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
337 GNUNET_TIME_UNIT_FOREVER_REL,
338 stream_write_queued, mq);
339}
340
341
342/**
343 * Functions with this signature are called whenever a
344 * complete message is received by the tokenizer.
345 *
346 * Do not call GNUNET_SERVER_mst_destroy in callback
347 *
348 * @param cls closure
349 * @param client identification of the client
350 * @param message the actual message
351 *
352 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
353 */
354static int
355stream_mst_callback (void *cls, void *client,
356 const struct GNUNET_MessageHeader *message)
357{
358 struct GNUNET_MQ_MessageQueue *mq = cls;
359
360 GNUNET_assert (NULL != message);
361 dispatch_message (mq, message);
362 return GNUNET_OK;
363}
364
365
366/**
367 * Functions of this signature are called whenever data is available from the
368 * stream.
369 *
370 * @param cls the closure from GNUNET_STREAM_read
371 * @param status the status of the stream at the time this function is called
372 * @param data traffic from the other side
373 * @param size the number of bytes available in data read; will be 0 on timeout
374 * @return number of bytes of processed from 'data' (any data remaining should be
375 * given to the next time the read processor is called).
376 */
377static size_t
378stream_data_processor (void *cls,
379 enum GNUNET_STREAM_Status status,
380 const void *data,
381 size_t size)
382{
383 struct GNUNET_MQ_MessageQueue *mq = cls;
384 struct MessageStreamState *mss;
385 int ret;
386
387 mss = (struct MessageStreamState *) mq->impl_state;
388 GNUNET_assert (GNUNET_STREAM_OK == status);
389 ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO);
390 GNUNET_assert (GNUNET_OK == ret);
391 /* we always read all data */
392 mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL,
393 stream_data_processor, mq);
394 return size;
395}
396
397
398static void
399stream_socket_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
400{
401 struct MessageStreamState *mss = (struct MessageStreamState *) mq->impl_state;
402
403 if (NULL != mss->rh)
404 {
405 GNUNET_STREAM_read_cancel (mss->rh);
406 mss->rh = NULL;
407 }
408
409 if (NULL != mss->wh)
410 {
411 GNUNET_STREAM_write_cancel (mss->wh);
412 mss->wh = NULL;
413 }
414
415 if (NULL != mss->mst)
416 {
417 GNUNET_SERVER_mst_destroy (mss->mst);
418 mss->mst = NULL;
419 }
420
421 GNUNET_free (mss);
422}
423
424
425
426
427struct GNUNET_MQ_MessageQueue *
428GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket,
429 const struct GNUNET_MQ_Handler *handlers,
430 void *cls)
431{
432 struct GNUNET_MQ_MessageQueue *mq;
433 struct MessageStreamState *mss;
434
435 mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
436 mss = GNUNET_new (struct MessageStreamState);
437 mss->socket = socket;
438 mq->impl_state = mss;
439 mq->send_impl = stream_socket_send_impl;
440 mq->destroy_impl = &stream_socket_destroy_impl;
441 mq->handlers = handlers;
442 mq->handlers_cls = cls;
443 if (NULL != handlers)
444 {
445 mss->mst = GNUNET_SERVER_mst_create (stream_mst_callback, mq);
446 mss->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
447 stream_data_processor, mq);
448 }
449 return mq;
450}
451
452
453/*** Transmit a queued message to the session's client.
454 *
455 * @param cls consensus session
456 * @param size number of bytes available in buf
457 * @param buf where the callee should write the message
458 * @return number of bytes written to buf
459 */
460static size_t
461transmit_queued (void *cls, size_t size,
462 void *buf)
463{
464 struct GNUNET_MQ_MessageQueue *mq = cls;
465 struct GNUNET_MQ_Message *mqm = mq->current_msg;
466 struct ServerClientSocketState *state = mq->impl_state;
467 size_t msg_size;
468
469 GNUNET_assert (NULL != buf);
470
471 if (NULL != mqm->sent_cb)
472 {
473 mqm->sent_cb (mqm->sent_cls);
474 }
475
476 mq->current_msg = NULL;
477 GNUNET_assert (NULL != mqm);
478 msg_size = ntohs (mqm->mh->size);
479 GNUNET_assert (size >= msg_size);
480 memcpy (buf, mqm->mh, msg_size);
481 GNUNET_free (mqm);
482 state->th = NULL;
483
484 if (NULL != mq->msg_head)
485 {
486 mq->current_msg = mq->msg_head;
487 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
488 state->th =
489 GNUNET_SERVER_notify_transmit_ready (state->client, msg_size,
490 GNUNET_TIME_UNIT_FOREVER_REL,
491 &transmit_queued, mq);
492 }
493 else if (NULL != mq->empty_cb)
494 mq->empty_cb (mq->empty_cls);
495 return msg_size;
496}
497
498
499
500static void
501server_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
502{
503 struct ServerClientSocketState *state;
504
505 GNUNET_assert (NULL != mq);
506 state = mq->impl_state;
507 GNUNET_assert (NULL != state);
508 GNUNET_SERVER_client_drop (state->client);
509 GNUNET_free (state);
510}
511
512static void
513server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm)
514{
515 struct ServerClientSocketState *state;
516 int msize;
517
518 GNUNET_assert (NULL != mq);
519 state = mq->impl_state;
520 GNUNET_assert (NULL != state);
521
522 if (NULL != state->th)
523 {
524 GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
525 return;
526 }
527 GNUNET_assert (NULL == mq->msg_head);
528 GNUNET_assert (NULL == mq->current_msg);
529 msize = ntohs (mqm->mh->size);
530 mq->current_msg = mqm;
531 state->th =
532 GNUNET_SERVER_notify_transmit_ready (state->client, msize,
533 GNUNET_TIME_UNIT_FOREVER_REL,
534 &transmit_queued, mq);
535}
536
537
538struct GNUNET_MQ_MessageQueue *
539GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
540{
541 struct GNUNET_MQ_MessageQueue *mq;
542 struct ServerClientSocketState *scss;
543
544 mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
545 scss = GNUNET_new (struct ServerClientSocketState);
546 mq->impl_state = scss;
547 scss->client = client;
548 GNUNET_SERVER_client_keep (client);
549 mq->send_impl = server_client_send_impl;
550 mq->destroy_impl = server_client_destroy_impl;
551 return mq;
552}
553
554
555/**
556 * Transmit a queued message to the session's client.
557 *
558 * @param cls consensus session
559 * @param size number of bytes available in buf
560 * @param buf where the callee should write the message
561 * @return number of bytes written to buf
562 */
563static size_t
564connection_client_transmit_queued (void *cls, size_t size,
565 void *buf)
566{
567 struct GNUNET_MQ_MessageQueue *mq = cls;
568 struct GNUNET_MQ_Message *mqm = mq->current_msg;
569 struct ClientConnectionState *state = mq->impl_state;
570 size_t msg_size;
571
572
573 GNUNET_assert (NULL != mqm);
574
575 if (NULL != mqm->sent_cb)
576 {
577 mqm->sent_cb (mqm->sent_cls);
578 }
579
580 mq->current_msg = NULL;
581 GNUNET_assert (NULL != buf);
582 msg_size = ntohs (mqm->mh->size);
583 GNUNET_assert (size >= msg_size);
584 memcpy (buf, mqm->mh, msg_size);
585 GNUNET_free (mqm);
586 state->th = NULL;
587 if (NULL != mq->msg_head)
588 {
589 mq->current_msg = mq->msg_head;
590 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
591 state->th =
592 GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (mq->current_msg->mh->size),
593 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
594 &connection_client_transmit_queued, mq);
595 }
596 else if (NULL != mq->empty_cb)
597 mq->empty_cb (mq->empty_cls);
598 return msg_size;
599}
600
601
602
603static void
604connection_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq)
605{
606 GNUNET_free (mq->impl_state);
607}
608
609static void
610connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq,
611 struct GNUNET_MQ_Message *mqm)
612{
613 struct ClientConnectionState *state = mq->impl_state;
614 int msize;
615
616 GNUNET_assert (NULL != state);
617
618 if (NULL != state->th)
619 {
620 GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
621 return;
622 }
623 GNUNET_assert (NULL == mq->current_msg);
624 mq->current_msg = mqm;
625 msize = ntohs (mqm->mh->size);
626 state->th =
627 GNUNET_CLIENT_notify_transmit_ready (state->connection, msize,
628 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
629 &connection_client_transmit_queued, mq);
630}
631
632
633
634/**
635 * Type of a function to call when we receive a message
636 * from the service.
637 *
638 * @param cls closure
639 * @param msg message received, NULL on timeout or fatal error
640 */
641static void
642handle_client_message (void *cls,
643 const struct GNUNET_MessageHeader *msg)
644{
645 struct GNUNET_MQ_MessageQueue *mq = cls;
646 struct ClientConnectionState *state;
647
648 state = mq->impl_state;
649
650 if (NULL == msg)
651 {
652 if (NULL == mq->read_error_cb)
653 LOG (GNUNET_ERROR_TYPE_WARNING, "ignoring read error (no handler installed)\n");
654 mq->read_error_cb (mq->read_error_cls);
655 return;
656 }
657
658 GNUNET_CLIENT_receive (state->connection, handle_client_message, mq,
659 GNUNET_TIME_UNIT_FOREVER_REL);
660
661 dispatch_message (mq, msg);
662}
663
664
665struct GNUNET_MQ_MessageQueue *
666GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
667 const struct GNUNET_MQ_Handler *handlers,
668 void *cls)
669{
670 struct GNUNET_MQ_MessageQueue *mq;
671 struct ClientConnectionState *state;
672
673 GNUNET_assert (NULL != connection);
674
675 mq = GNUNET_new (struct GNUNET_MQ_MessageQueue);
676 mq->handlers = handlers;
677 mq->handlers_cls = cls;
678 state = GNUNET_new (struct ClientConnectionState);
679 state->connection = connection;
680 mq->impl_state = state;
681 mq->send_impl = connection_client_send_impl;
682 mq->destroy_impl = connection_client_destroy_impl;
683
684 if (NULL != handlers)
685 {
686 GNUNET_CLIENT_receive (connection, handle_client_message, mq,
687 GNUNET_TIME_UNIT_FOREVER_REL);
688 }
689
690 return mq;
691}
692
693
694void
695GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq,
696 const struct GNUNET_MQ_Handler *new_handlers,
697 void *cls)
698{
699 mq->handlers = new_handlers;
700 mq->handlers_cls = cls;
701}
702
703
704/**
705 * Associate the assoc_data in mq with a unique request id.
706 *
707 * @param mq message queue, id will be unique for the queue
708 * @param mqm message to associate
709 * @param assoc_data to associate
710 */
711uint32_t
712GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq,
713 struct GNUNET_MQ_Message *mqm,
714 void *assoc_data)
715{
716 uint32_t id;
717
718 if (NULL == mq->assoc_map)
719 {
720 mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
721 mq->assoc_id = 1;
722 }
723 id = mq->assoc_id++;
724 GNUNET_CONTAINER_multihashmap32_put (mq->assoc_map, id, assoc_data,
725 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
726 return id;
727}
728
729
730
731void *
732GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
733{
734 if (NULL == mq->assoc_map)
735 return NULL;
736 return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
737}
738
739
740void *
741GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id)
742{
743 void *val;
744
745 if (NULL == mq->assoc_map)
746 return NULL;
747 val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
748 GNUNET_assert (NULL != val);
749 GNUNET_CONTAINER_multihashmap32_remove (mq->assoc_map, request_id, val);
750 return val;
751}
752
753
754void
755GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm,
756 GNUNET_MQ_NotifyCallback cb,
757 void *cls)
758{
759 mqm->sent_cb = cb;
760 mqm->sent_cls = cls;
761}
762
763
764void
765GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq)
766{
767 /* FIXME: destroy all pending messages in the queue */
768
769 if (NULL != mq->destroy_impl)
770 {
771 mq->destroy_impl (mq);
772 }
773
774 GNUNET_free (mq);
775}
776
777
778/**
779 * Call a callback once all messages queued have been sent,
780 * i.e. the message queue is empty.
781 *
782 * @param mqm the message queue to send the notification for
783 * @param cb the callback to call on an empty queue
784 * @param cls closure for cb
785 */
786void
787GNUNET_MQ_notify_empty (struct GNUNET_MQ_MessageQueue *mqm,
788 GNUNET_MQ_NotifyCallback cb,
789 void *cls)
790{
791 mqm->empty_cb = cb;
792 mqm->empty_cls = cls;
793}
diff --git a/src/set/mq.h b/src/set/mq.h
deleted file mode 100644
index b7a89f6e0..000000000
--- a/src/set/mq.h
+++ /dev/null
@@ -1,362 +0,0 @@
1/*
2 This file is part of GNUnet.
3 (C) 2012 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @author Florian Dold
23 * @file set/mq.h
24 * @brief general purpose request queue
25 */
26#ifndef MQ_H
27#define MQ_H
28
29#include "platform.h"
30#include "gnunet_common.h"
31#include "gnunet_util_lib.h"
32#include "gnunet_connection_lib.h"
33#include "gnunet_server_lib.h"
34#include "gnunet_stream_lib.h"
35
36
37/**
38 * Allocate a GNUNET_MQ_Message, with extra space allocated after the space needed
39 * by the message struct.
40 * The allocated message will already have the type and size field set.
41 *
42 * @param mvar variable to store the allocated message in;
43 * must have a header field
44 * @param esize extra space to allocate after the message
45 * @param type type of the message
46 * @return the MQ message
47 */
48#define GNUNET_MQ_msg_extra(mvar, esize, type) GNUNET_MQ_msg_((((void)(mvar)->header), (struct GNUNET_MessageHeader**) &(mvar)), (esize) + sizeof *(mvar), (type))
49
50/**
51 * Allocate a GNUNET_MQ_Message.
52 * The allocated message will already have the type and size field set.
53 *
54 * @param mvar variable to store the allocated message in;
55 * must have a header field
56 * @param type type of the message
57 * @return the MQ message
58 */
59#define GNUNET_MQ_msg(mvar, type) GNUNET_MQ_msg_extra(mvar, 0, type)
60
61/**
62 * Append data to the end of an existing MQ message.
63 * If the operation is successful, mqm is changed to point to the new MQ message,
64 * and GNUNET_OK is returned.
65 * On failure, GNUNET_SYSERR is returned, and the pointer mqm is not changed,
66 * the user of this API must take care of disposing the already allocated message
67 * (either by sending it, or by using GNUNET_MQ_discard)
68 *
69 * @param mqm MQ message to augment with additional data
70 * @param src source buffer for the additional data
71 * @param len length of the additional data
72 * @return FIXME
73 */
74#define GNUNET_MQ_nest(mqm, src, len) GNUNET_MQ_nest_ (&mqm, src, len)
75
76
77
78/**
79 * Append a message to the end of an existing MQ message.
80 * If the operation is successful, mqm is changed to point to the new MQ message,
81 * and GNUNET_OK is returned.
82 * On failure, GNUNET_SYSERR is returned, and the pointer mqm is not changed,
83 * the user of this API must take care of disposing the already allocated message
84 * (either by sending it, or by using GNUNET_MQ_discard)
85 *
86 * @param mqm MQ message to augment with additional data
87 * @param mh the message to append, must be of type 'struct GNUNET_MessageHeader *'
88 */
89#define GNUNET_MQ_nest_mh(mqm, mh) ((NULL == mh) ? (GNUNET_OK) : GNUNET_MQ_nest((mqm), (mh), ntohs ((mh)->size)))
90
91
92/**
93 * Allocate a GNUNET_MQ_Message, where the message only consists of a header.
94 * The allocated message will already have the type and size field set.
95 *
96 * @param type type of the message
97 */
98#define GNUNET_MQ_msg_header(type) GNUNET_MQ_msg_ (NULL, sizeof (struct GNUNET_MessageHeader), type)
99
100
101/**
102 * Allocate a GNUNET_MQ_Message, where the message only consists of a header and extra space.
103 * The allocated message will already have the type and size field set.
104 *
105 * @param mh pointer that will changed to point at to the allocated message header
106 * @param esize extra space to allocate after the message header
107 * @param type type of the message
108 */
109#define GNUNET_MQ_msg_header_extra(mh, esize, type) GNUNET_MQ_msg_ (&mh, (esize) + sizeof (struct GNUNET_MessageHeader), type)
110
111
112/**
113 * End-marker for the handlers array
114 */
115#define GNUNET_MQ_HANDLERS_END {NULL, 0, 0}
116
117/**
118 * Opaque handle to a message queue
119 */
120struct GNUNET_MQ_MessageQueue;
121
122/**
123 * Opaque handle to an allocated message
124 */
125struct GNUNET_MQ_Message; // Entry (/ Request)
126
127/**
128 * Called when a message has been received.
129 *
130 * @param cls closure
131 * @param msg the received message
132 */
133typedef void (*GNUNET_MQ_MessageCallback) (void *cls, const struct GNUNET_MessageHeader *msg);
134
135
136/**
137 * Message handler for a specific message type.
138 */
139struct GNUNET_MQ_Handler
140{
141 /**
142 * Callback, called every time a new message of
143 * the specified type has been receied.
144 */
145 GNUNET_MQ_MessageCallback cb;
146
147
148 /**
149 * Type of the message this handler covers.
150 */
151 uint16_t type;
152
153 /**
154 * Expected size of messages of this type. Use 0 for
155 * variable-size. If non-zero, messages of the given
156 * type will be discarded (and the connection closed)
157 * if they do not have the right size.
158 */
159 uint16_t expected_size;
160};
161
162/**
163 * Callback used for notifications
164 *
165 * @param cls closure
166 */
167typedef void (*GNUNET_MQ_NotifyCallback) (void *cls);
168
169
170/**
171 * Create a new message for MQ.
172 *
173 * @param mhp message header to store the allocated message header in, can be NULL
174 * @param size size of the message to allocate
175 * @param type type of the message, will be set in the allocated message
176 * @return the allocated MQ message
177 */
178struct GNUNET_MQ_Message *
179GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type);
180
181
182int
183GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp,
184 const void *src, uint16_t len);
185
186
187/**
188 * Discard the message queue message, free all
189 * allocated resources. Must be called in the event
190 * that a message is created but should not actually be sent.
191 *
192 * @param mqm the message to discard
193 */
194void
195GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm);
196
197
198/**
199 * Send a message with the give message queue.
200 * May only be called once per message.
201 *
202 * @param mq message queue
203 * @param mqm the message to send.
204 */
205void
206GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm);
207
208
209/**
210 * Cancel sending the message. Message must have been sent with GNUNET_MQ_send before.
211 * May not be called after the notify sent callback has been called
212 *
213 * @param mqm queued message to cancel
214 */
215void
216GNUNET_MQ_send_cancel (struct GNUNET_MQ_Message *mqm);
217
218
219/**
220 * Associate the assoc_data in mq with a unique request id.
221 *
222 * @param mq message queue, id will be unique for the queue
223 * @param mqm message to associate
224 * @param assoc_data to associate
225 */
226uint32_t
227GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq,
228 struct GNUNET_MQ_Message *mqm,
229 void *assoc_data);
230
231/**
232 * Get the data associated with a request id in a queue
233 *
234 * @param mq the message queue with the association
235 * @param request_id the request id we are interested in
236 * @return the associated data
237 */
238void *
239GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id);
240
241
242/**
243 * Remove the association for a request id
244 *
245 * @param mq the message queue with the association
246 * @param request_id the request id we want to remove
247 * @return the associated data
248 */
249void *
250GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id);
251
252
253
254/**
255 * Create a message queue for a GNUNET_CLIENT_Connection.
256 * If handlers are specfied, receive messages from the connection.
257 *
258 * @param connection the client connection
259 * @param handlers handlers for receiving messages
260 * @param cls closure for the handlers
261 * @return the message queue
262 */
263struct GNUNET_MQ_MessageQueue *
264GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
265 const struct GNUNET_MQ_Handler *handlers,
266 void *cls);
267
268
269/**
270 * Create a message queue for a GNUNET_STREAM_Socket.
271 *
272 * @param client the client
273 * @return the message queue
274 */
275struct GNUNET_MQ_MessageQueue *
276GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client);
277
278
279/**
280 * Create a message queue for a GNUNET_STREAM_Socket.
281 * If handlers are specfied, receive messages from the stream socket.
282 *
283 * @param socket the stream socket
284 * @param handlers handlers for receiving messages
285 * @param cls closure for the handlers
286 * @return the message queue
287 * @deprecated - GNUNET_MQ_queue_create_with_callbacks
288 */
289struct GNUNET_MQ_MessageQueue *
290GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket,
291 const struct GNUNET_MQ_Handler *handlers,
292 void *cls);
293
294
295/**
296 * Replace the handlers of a message queue with new handlers.
297 * Takes effect immediately, even for messages that already have been received, but for
298 * with the handler has not been called.
299 *
300 * @param mq message queue
301 * @param new_handlers new handlers
302 * @param cls new closure for the handlers
303 */
304void
305GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq,
306 const struct GNUNET_MQ_Handler *new_handlers,
307 void *cls);
308
309
310/**
311 * Call a callback once the message has been sent, that is, the message
312 * can not be canceled anymore.
313 * There can be only one notify sent callback per message.
314 *
315 * @param mqm message to call the notify callback for
316 * @param cb the notify callback
317 * @param cls closure for the callback
318 */
319void
320GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm,
321 GNUNET_MQ_NotifyCallback cb,
322 void *cls);
323
324
325/**
326 * Call a callback once all messages queued have been sent,
327 * i.e. the message queue is empty.
328 *
329 * @param mqm the message queue to send the notification for
330 * @param cb the callback to call on an empty queue
331 * @param cls closure for cb
332 * @deprecated
333 */
334void
335GNUNET_MQ_notify_empty (struct GNUNET_MQ_MessageQueue *mqm,
336 GNUNET_MQ_NotifyCallback cb,
337 void *cls);
338
339
340/**
341 * Call a callback if reading encountered an error.
342 *
343 * @param mqm the message queue to send the notification for
344 * @param cb the callback to call on a read error
345 * @param cls closure for cb
346 * @deprecated, integrate with queue creation
347 */
348void
349GNUNET_MQ_notify_read_error (struct GNUNET_MQ_MessageQueue *mqm,
350 GNUNET_MQ_NotifyCallback cb,
351 void *cls);
352
353
354/**
355 * Destroy the message queue.
356 *
357 * @param mq message queue to destroy
358 */
359void
360GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq);
361
362#endif
diff --git a/src/set/set_api.c b/src/set/set_api.c
index 775e390de..5838680b9 100644
--- a/src/set/set_api.c
+++ b/src/set/set_api.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 (C) 2012 Christian Grothoff (and other contributing authors) 3 (C) 2012, 2013 Christian Grothoff (and other contributing authors)
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -29,8 +29,6 @@
29#include "gnunet_client_lib.h" 29#include "gnunet_client_lib.h"
30#include "gnunet_set_service.h" 30#include "gnunet_set_service.h"
31#include "set.h" 31#include "set.h"
32#include "mq.h"
33#include <inttypes.h>
34 32
35 33
36#define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__) 34#define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__)
@@ -61,7 +59,6 @@ struct GNUNET_SET_OperationHandle
61 void *result_cls; 59 void *result_cls;
62 struct GNUNET_SET_Handle *set; 60 struct GNUNET_SET_Handle *set;
63 uint32_t request_id; 61 uint32_t request_id;
64 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
65}; 62};
66 63
67 64
@@ -104,11 +101,6 @@ handle_result (void *cls, const struct GNUNET_MessageHeader *mh)
104 * and this is the last result message we get */ 101 * and this is the last result message we get */
105 if (htons (msg->result_status) != GNUNET_SET_STATUS_OK) 102 if (htons (msg->result_status) != GNUNET_SET_STATUS_OK)
106 { 103 {
107 if (GNUNET_SCHEDULER_NO_TASK != oh->timeout_task)
108 {
109 GNUNET_SCHEDULER_cancel (oh->timeout_task);
110 oh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
111 }
112 GNUNET_MQ_assoc_remove (set->mq, ntohl (msg->request_id)); 104 GNUNET_MQ_assoc_remove (set->mq, ntohl (msg->request_id));
113 if (NULL != oh->result_cb) 105 if (NULL != oh->result_cb)
114 oh->result_cb (oh->result_cls, NULL, htons (msg->result_status)); 106 oh->result_cb (oh->result_cls, NULL, htons (msg->result_status));
@@ -264,26 +256,6 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
264 256
265 257
266/** 258/**
267 * Signature of the main function of a task.
268 *
269 * @param cls closure
270 * @param tc context information (why was this task triggered now)
271 */
272static void
273operation_timeout_task (void *cls,
274 const struct GNUNET_SCHEDULER_TaskContext * tc)
275{
276 struct GNUNET_SET_OperationHandle *oh = cls;
277 oh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
278 if (NULL != oh->result_cb)
279 oh->result_cb (oh->result_cls, NULL, GNUNET_SET_STATUS_TIMEOUT);
280 oh->result_cb = NULL;
281 oh->result_cls = NULL;
282 GNUNET_SET_operation_cancel (oh);
283}
284
285
286/**
287 * Evaluate a set operation with our set and the set of another peer. 259 * Evaluate a set operation with our set and the set of another peer.
288 * 260 *
289 * @param set set to use 261 * @param set set to use
@@ -294,8 +266,6 @@ operation_timeout_task (void *cls,
294 * @param salt salt used for the set operation; sometimes set operations 266 * @param salt salt used for the set operation; sometimes set operations
295 * fail due to hash collisions, using a different salt for each operation 267 * fail due to hash collisions, using a different salt for each operation
296 * makes it harder for an attacker to exploit this 268 * makes it harder for an attacker to exploit this
297 * @param timeout result_cb will be called with GNUNET_SET_STATUS_TIMEOUT
298 * if the operation is not done after the specified time
299 * @param result_mode specified how results will be returned, 269 * @param result_mode specified how results will be returned,
300 * see 'GNUNET_SET_ResultMode'. 270 * see 'GNUNET_SET_ResultMode'.
301 * @param result_cb called on error or success 271 * @param result_cb called on error or success
@@ -308,7 +278,6 @@ GNUNET_SET_evaluate (struct GNUNET_SET_Handle *set,
308 const struct GNUNET_HashCode *app_id, 278 const struct GNUNET_HashCode *app_id,
309 const struct GNUNET_MessageHeader *context_msg, 279 const struct GNUNET_MessageHeader *context_msg,
310 uint16_t salt, 280 uint16_t salt,
311 struct GNUNET_TIME_Relative timeout,
312 enum GNUNET_SET_ResultMode result_mode, 281 enum GNUNET_SET_ResultMode result_mode,
313 GNUNET_SET_ResultIterator result_cb, 282 GNUNET_SET_ResultIterator result_cb,
314 void *result_cls) 283 void *result_cls)
@@ -331,7 +300,6 @@ GNUNET_SET_evaluate (struct GNUNET_SET_Handle *set,
331 if (GNUNET_OK != GNUNET_MQ_nest (mqm, context_msg, ntohs (context_msg->size))) 300 if (GNUNET_OK != GNUNET_MQ_nest (mqm, context_msg, ntohs (context_msg->size)))
332 GNUNET_assert (0); 301 GNUNET_assert (0);
333 302
334 oh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, operation_timeout_task, oh);
335 GNUNET_MQ_send (set->mq, mqm); 303 GNUNET_MQ_send (set->mq, mqm);
336 304
337 return oh; 305 return oh;
@@ -399,7 +367,6 @@ GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh)
399 * 367 *
400 * @param request request to accept 368 * @param request request to accept
401 * @param set set used for the requested operation 369 * @param set set used for the requested operation
402 * @param timeout timeout for the set operation
403 * @param result_mode specified how results will be returned, 370 * @param result_mode specified how results will be returned,
404 * see 'GNUNET_SET_ResultMode'. 371 * see 'GNUNET_SET_ResultMode'.
405 * @param result_cb callback for the results 372 * @param result_cb callback for the results
@@ -409,7 +376,6 @@ GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh)
409struct GNUNET_SET_OperationHandle * 376struct GNUNET_SET_OperationHandle *
410GNUNET_SET_accept (struct GNUNET_SET_Request *request, 377GNUNET_SET_accept (struct GNUNET_SET_Request *request,
411 struct GNUNET_SET_Handle *set, 378 struct GNUNET_SET_Handle *set,
412 struct GNUNET_TIME_Relative timeout,
413 enum GNUNET_SET_ResultMode result_mode, 379 enum GNUNET_SET_ResultMode result_mode,
414 GNUNET_SET_ResultIterator result_cb, 380 GNUNET_SET_ResultIterator result_cb,
415 void *result_cls) 381 void *result_cls)
@@ -432,8 +398,6 @@ GNUNET_SET_accept (struct GNUNET_SET_Request *request,
432 msg->accept_id = htonl (request->accept_id); 398 msg->accept_id = htonl (request->accept_id);
433 GNUNET_MQ_send (set->mq, mqm); 399 GNUNET_MQ_send (set->mq, mqm);
434 400
435 oh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, operation_timeout_task, oh);
436
437 return oh; 401 return oh;
438} 402}
439 403
diff --git a/src/set/test_mq.c b/src/set/test_mq.c
deleted file mode 100644
index d13c63440..000000000
--- a/src/set/test_mq.c
+++ /dev/null
@@ -1,115 +0,0 @@
1/*
2 This file is part of GNUnet.
3 (C) 2012 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file set/test_mq.c
23 * @brief simple tests for mq
24 */
25#include "platform.h"
26#include "gnunet_util_lib.h"
27#include "gnunet_testing_lib.h"
28#include "mq.h"
29
30
31GNUNET_NETWORK_STRUCT_BEGIN
32
33struct MyMessage
34{
35 struct GNUNET_MessageHeader header;
36 uint32_t x GNUNET_PACKED;
37};
38
39GNUNET_NETWORK_STRUCT_END
40
41void
42test1 (void)
43{
44 struct GNUNET_MQ_Message *mqm;
45 struct MyMessage *mm;
46
47 mm = NULL;
48 mqm = NULL;
49
50 mqm = GNUNET_MQ_msg (mm, 42);
51 GNUNET_assert (NULL != mqm);
52 GNUNET_assert (NULL != mm);
53 GNUNET_assert (42 == ntohs (mm->header.type));
54 GNUNET_assert (sizeof (struct MyMessage) == ntohs (mm->header.size));
55}
56
57
58void
59test2 (void)
60{
61 struct GNUNET_MQ_Message *mqm;
62 struct MyMessage *mm;
63 int res;
64 char *s = "foo";
65
66 mqm = GNUNET_MQ_msg (mm, 42);
67 res = GNUNET_MQ_nest (mqm, s, strlen(s));
68 GNUNET_assert (GNUNET_OK == res);
69 res = GNUNET_MQ_nest (mqm, s, strlen(s));
70 GNUNET_assert (GNUNET_OK == res);
71 res = GNUNET_MQ_nest (mqm, NULL, 0);
72 GNUNET_assert (GNUNET_OK == res);
73
74 GNUNET_assert (strlen (s) * 2 + sizeof (struct MyMessage) == ntohs (mm->header.size));
75
76 res = GNUNET_MQ_nest_mh (mqm, &mm->header);
77 GNUNET_assert (GNUNET_OK == res);
78 GNUNET_assert (2 * (strlen (s) * 2 + sizeof (struct MyMessage)) == ntohs (mm->header.size));
79
80 res = GNUNET_MQ_nest (mqm, (void *) 0xF00BA, 0xFFF0);
81 GNUNET_assert (GNUNET_OK != res);
82
83 GNUNET_MQ_discard (mqm);
84}
85
86
87void
88test3 (void)
89{
90 struct GNUNET_MQ_Message *mqm;
91 struct GNUNET_MessageHeader *mh;
92
93 mqm = GNUNET_MQ_msg_header (42);
94 /* how could the above be checked? */
95
96 GNUNET_MQ_discard (mqm);
97
98 mqm = GNUNET_MQ_msg_header_extra (mh, 20, 42);
99 GNUNET_assert (42 == ntohs (mh->type));
100 GNUNET_assert (sizeof (struct GNUNET_MessageHeader) + 20 == ntohs (mh->size));
101}
102
103
104int
105main (int argc, char **argv)
106{
107
108 GNUNET_log_setup ("test-mq", "INFO", NULL);
109 test1 ();
110 test2 ();
111 test3 ();
112
113 return 0;
114}
115
diff --git a/src/set/test_mq_client.c b/src/set/test_mq_client.c
deleted file mode 100644
index ca615d37e..000000000
--- a/src/set/test_mq_client.c
+++ /dev/null
@@ -1,181 +0,0 @@
1/*
2 This file is part of GNUnet.
3 (C) 2012 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file set/test_mq.c
23 * @brief tests for mq with connection client
24 */
25/**
26 * @file util/test_server_with_client.c
27 * @brief tests for server.c and client.c,
28 * specifically disconnect_notify,
29 * client_get_address and receive_done (resume processing)
30 */
31#include "platform.h"
32#include "gnunet_common.h"
33#include "gnunet_scheduler_lib.h"
34#include "gnunet_client_lib.h"
35#include "gnunet_server_lib.h"
36#include "gnunet_time_lib.h"
37#include "mq.h"
38
39#define PORT 23336
40
41#define MY_TYPE 128
42
43
44static struct GNUNET_SERVER_Handle *server;
45
46static struct GNUNET_CLIENT_Connection *client;
47
48static struct GNUNET_CONFIGURATION_Handle *cfg;
49
50static int ok;
51
52static int notify = GNUNET_NO;
53
54static int received = 0;
55
56
57static void
58recv_cb (void *cls, struct GNUNET_SERVER_Client *argclient,
59 const struct GNUNET_MessageHeader *message)
60{
61 received++;
62
63 printf ("received\n");
64
65
66 if ((received == 2) && (GNUNET_YES == notify))
67 {
68 printf ("done\n");
69 GNUNET_SERVER_receive_done (argclient, GNUNET_NO);
70 return;
71 }
72
73 GNUNET_SERVER_receive_done (argclient, GNUNET_YES);
74}
75
76
77static void
78clean_up (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
79{
80 GNUNET_SERVER_destroy (server);
81 server = NULL;
82 GNUNET_CONFIGURATION_destroy (cfg);
83 cfg = NULL;
84}
85
86
87/**
88 * Functions with this signature are called whenever a client
89 * is disconnected on the network level.
90 *
91 * @param cls closure
92 * @param client identification of the client
93 */
94static void
95notify_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
96{
97 if (client == NULL)
98 return;
99 ok = 0;
100 GNUNET_SCHEDULER_add_now (&clean_up, NULL);
101}
102
103
104static struct GNUNET_SERVER_MessageHandler handlers[] = {
105 {&recv_cb, NULL, MY_TYPE, sizeof (struct GNUNET_MessageHeader)},
106 {NULL, NULL, 0, 0}
107};
108
109void send_cb (void *cls)
110{
111 printf ("notify sent\n");
112 notify = GNUNET_YES;
113}
114
115void test_mq (struct GNUNET_CLIENT_Connection *client)
116{
117 struct GNUNET_MQ_MessageQueue *mq;
118 struct GNUNET_MQ_Message *mqm;
119
120 /* FIXME: test handling responses */
121 mq = GNUNET_MQ_queue_for_connection_client (client, NULL, NULL);
122
123 mqm = GNUNET_MQ_msg_header (MY_TYPE);
124 GNUNET_MQ_send (mq, mqm);
125
126 mqm = GNUNET_MQ_msg_header (MY_TYPE);
127 GNUNET_MQ_notify_sent (mqm, send_cb, NULL);
128 GNUNET_MQ_send (mq, mqm);
129
130 /* FIXME: add a message that will be canceled */
131}
132
133
134static void
135task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
136{
137 struct sockaddr_in sa;
138 struct sockaddr *sap[2];
139 socklen_t slens[2];
140
141 sap[0] = (struct sockaddr *) &sa;
142 slens[0] = sizeof (sa);
143 sap[1] = NULL;
144 slens[1] = 0;
145 memset (&sa, 0, sizeof (sa));
146#if HAVE_SOCKADDR_IN_SIN_LEN
147 sa.sin_len = sizeof (sa);
148#endif
149 sa.sin_family = AF_INET;
150 sa.sin_port = htons (PORT);
151 server =
152 GNUNET_SERVER_create (NULL, NULL, sap, slens,
153 GNUNET_TIME_relative_multiply
154 (GNUNET_TIME_UNIT_MILLISECONDS, 250), GNUNET_NO);
155 GNUNET_assert (server != NULL);
156 handlers[0].callback_cls = cls;
157 GNUNET_SERVER_add_handlers (server, handlers);
158 GNUNET_SERVER_disconnect_notify (server, &notify_disconnect, cls);
159 cfg = GNUNET_CONFIGURATION_create ();
160 GNUNET_CONFIGURATION_set_value_number (cfg, "test", "PORT", PORT);
161 GNUNET_CONFIGURATION_set_value_string (cfg, "test", "HOSTNAME", "localhost");
162 GNUNET_CONFIGURATION_set_value_string (cfg, "resolver", "HOSTNAME",
163 "localhost");
164 client = GNUNET_CLIENT_connect ("test", cfg);
165 GNUNET_assert (client != NULL);
166
167 test_mq (client);
168}
169
170
171int
172main (int argc, char *argv[])
173{
174 GNUNET_log_setup ("test-mq-client",
175 "INFO",
176 NULL);
177 ok = 1;
178 GNUNET_SCHEDULER_run (&task, NULL);
179 return ok;
180}
181
diff --git a/src/set/test_set_api.c b/src/set/test_set_api.c
index 0ab02cad7..66e7a81d1 100644
--- a/src/set/test_set_api.c
+++ b/src/set/test_set_api.c
@@ -29,26 +29,57 @@
29 29
30 30
31static struct GNUNET_PeerIdentity local_id; 31static struct GNUNET_PeerIdentity local_id;
32
32static struct GNUNET_HashCode app_id; 33static struct GNUNET_HashCode app_id;
33static struct GNUNET_SET_Handle *set1; 34static struct GNUNET_SET_Handle *set1;
34static struct GNUNET_SET_Handle *set2; 35static struct GNUNET_SET_Handle *set2;
35static struct GNUNET_SET_ListenHandle *listen_handle; 36static struct GNUNET_SET_ListenHandle *listen_handle;
36const static struct GNUNET_CONFIGURATION_Handle *config; 37const static struct GNUNET_CONFIGURATION_Handle *config;
37 38
39int num_done;
40
38 41
39static void 42static void
40result_cb_set1 (void *cls, struct GNUNET_SET_Element *element, 43result_cb_set1 (void *cls, const struct GNUNET_SET_Element *element,
41 enum GNUNET_SET_Status status) 44 enum GNUNET_SET_Status status)
42{ 45{
43 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got result (set 1)\n"); 46 switch (status)
47 {
48 case GNUNET_SET_STATUS_OK:
49 printf ("set 1: got element\n");
50 break;
51 case GNUNET_SET_STATUS_FAILURE:
52 printf ("set 1: failure\n");
53 break;
54 case GNUNET_SET_STATUS_DONE:
55 printf ("set 1: done\n");
56 GNUNET_SET_destroy (set1);
57 break;
58 default:
59 GNUNET_assert (0);
60 }
44} 61}
45 62
46 63
47static void 64static void
48result_cb_set2 (void *cls, struct GNUNET_SET_Element *element, 65result_cb_set2 (void *cls, const struct GNUNET_SET_Element *element,
49 enum GNUNET_SET_Status status) 66 enum GNUNET_SET_Status status)
50{ 67{
51 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got result (set 2)\n"); 68 switch (status)
69 {
70 case GNUNET_SET_STATUS_OK:
71 printf ("set 2: got element\n");
72 break;
73 case GNUNET_SET_STATUS_FAILURE:
74 printf ("set 2: failure\n");
75 break;
76 case GNUNET_SET_STATUS_DONE:
77 printf ("set 2: done\n");
78 GNUNET_SET_destroy (set2);
79 break;
80 default:
81 GNUNET_assert (0);
82 }
52} 83}
53 84
54 85
@@ -59,7 +90,9 @@ listen_cb (void *cls,
59 struct GNUNET_SET_Request *request) 90 struct GNUNET_SET_Request *request)
60{ 91{
61 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n"); 92 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n");
62 GNUNET_SET_accept (request, set2, GNUNET_TIME_UNIT_FOREVER_REL, 93 GNUNET_SET_listen_cancel (listen_handle);
94
95 GNUNET_SET_accept (request, set2,
63 GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL); 96 GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL);
64} 97}
65 98
@@ -75,7 +108,7 @@ start (void *cls)
75 listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION, 108 listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION,
76 &app_id, listen_cb, NULL); 109 &app_id, listen_cb, NULL);
77 GNUNET_SET_evaluate (set1, &local_id, &app_id, NULL, 42, 110 GNUNET_SET_evaluate (set1, &local_id, &app_id, NULL, 42,
78 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_SET_RESULT_ADDED, 111 GNUNET_SET_RESULT_ADDED,
79 result_cb_set1, NULL); 112 result_cb_set1, NULL);
80} 113}
81 114
@@ -120,7 +153,6 @@ init_set1 (void)
120 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized set 1\n"); 153 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized set 1\n");
121} 154}
122 155
123
124/** 156/**
125 * Signature of the 'main' function for a (single-peer) testcase that 157 * Signature of the 'main' function for a (single-peer) testcase that
126 * is run using 'GNUNET_TESTING_peer_run'. 158 * is run using 'GNUNET_TESTING_peer_run'.