aboutsummaryrefslogtreecommitdiff
path: root/src/psyc
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2013-09-25 17:46:06 +0000
committerGabor X Toth <*@tg-x.net>2013-09-25 17:46:06 +0000
commit7bec38c1bf3572bd01ddd064f69d1b744f7725a8 (patch)
treee875e7dd3920d201e1e16fc046e8070fed7bd875 /src/psyc
parent303d6a97bc552a337c992944c3151ea53c1f74dc (diff)
downloadgnunet-7bec38c1bf3572bd01ddd064f69d1b744f7725a8.tar.gz
gnunet-7bec38c1bf3572bd01ddd064f69d1b744f7725a8.zip
psyc service: start/stop, join/part, message transmission: lib -> psyc -> mcast; psyc API: stop/resume transmission
Diffstat (limited to 'src/psyc')
-rw-r--r--src/psyc/Makefile.am10
-rw-r--r--src/psyc/gnunet-service-psyc.c374
-rw-r--r--src/psyc/psyc.conf7
-rw-r--r--src/psyc/psyc.h177
-rw-r--r--src/psyc/psyc_api.c620
-rw-r--r--src/psyc/test_psyc.c74
6 files changed, 1168 insertions, 94 deletions
diff --git a/src/psyc/Makefile.am b/src/psyc/Makefile.am
index b65f9ec7a..57f0e2280 100644
--- a/src/psyc/Makefile.am
+++ b/src/psyc/Makefile.am
@@ -24,12 +24,14 @@ libgnunetpsyc_la_SOURCES = \
24 psyc.h 24 psyc.h
25libgnunetpsyc_la_LIBADD = \ 25libgnunetpsyc_la_LIBADD = \
26 $(top_builddir)/src/util/libgnunetutil.la \ 26 $(top_builddir)/src/util/libgnunetutil.la \
27 $(top_builddir)/src/env/libgnunetenv.la \
27 $(GN_LIBINTL) $(XLIB) 28 $(GN_LIBINTL) $(XLIB)
28libgnunetpsyc_la_LDFLAGS = \ 29libgnunetpsyc_la_LDFLAGS = \
29 $(GN_LIB_LDFLAGS) $(WINFLAGS) \ 30 $(GN_LIB_LDFLAGS) $(WINFLAGS) \
30 -version-info 0:0:0 31 -version-info 0:0:0
31libgnunetpsyc_la_DEPENDENCIES = \ 32libgnunetpsyc_la_DEPENDENCIES = \
32 $(top_builddir)/src/util/libgnunetutil.la 33 $(top_builddir)/src/util/libgnunetutil.la \
34 $(top_builddir)/src/env/libgnunetenv.la
33 35
34bin_PROGRAMS = 36bin_PROGRAMS =
35 37
@@ -41,10 +43,14 @@ gnunet_service_psyc_SOURCES = \
41gnunet_service_psyc_LDADD = \ 43gnunet_service_psyc_LDADD = \
42 $(top_builddir)/src/statistics/libgnunetstatistics.la \ 44 $(top_builddir)/src/statistics/libgnunetstatistics.la \
43 $(top_builddir)/src/util/libgnunetutil.la \ 45 $(top_builddir)/src/util/libgnunetutil.la \
46 $(top_builddir)/src/multicast/libgnunetmulticast.la \
47 $(top_builddir)/src/psycstore/libgnunetpsycstore.la \
44 $(GN_LIBINTL) 48 $(GN_LIBINTL)
45gnunet_service_psyc_DEPENDENCIES = \ 49gnunet_service_psyc_DEPENDENCIES = \
46 $(top_builddir)/src/statistics/libgnunetstatistics.la \ 50 $(top_builddir)/src/statistics/libgnunetstatistics.la \
47 $(top_builddir)/src/util/libgnunetutil.la 51 $(top_builddir)/src/util/libgnunetutil.la \
52 $(top_builddir)/src/multicast/libgnunetmulticast.la \
53 $(top_builddir)/src/psycstore/libgnunetpsycstore.la
48 54
49 55
50if HAVE_TESTING 56if HAVE_TESTING
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
index 528b76e59..5fd2b9e66 100644
--- a/src/psyc/gnunet-service-psyc.c
+++ b/src/psyc/gnunet-service-psyc.c
@@ -29,9 +29,13 @@
29#include "gnunet_constants.h" 29#include "gnunet_constants.h"
30#include "gnunet_protocols.h" 30#include "gnunet_protocols.h"
31#include "gnunet_statistics_service.h" 31#include "gnunet_statistics_service.h"
32#include "gnunet_multicast_service.h"
33#include "gnunet_psycstore_service.h"
32#include "gnunet_psyc_service.h" 34#include "gnunet_psyc_service.h"
33#include "psyc.h" 35#include "psyc.h"
34 36
37#define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__)
38
35 39
36/** 40/**
37 * Handle to our current configuration. 41 * Handle to our current configuration.
@@ -48,6 +52,83 @@ static struct GNUNET_STATISTICS_Handle *stats;
48 */ 52 */
49static struct GNUNET_SERVER_NotificationContext *nc; 53static struct GNUNET_SERVER_NotificationContext *nc;
50 54
55/**
56 * Handle to the PSYCstore.
57 */
58static struct GNUNET_PSYCSTORE_Handle *store;
59
60/**
61 * Message in the transmission queue.
62 */
63struct TransmitMessage
64{
65 struct TransmitMessage *prev;
66 struct TransmitMessage *next;
67
68 char *buf;
69 uint16_t size;
70 uint8_t status;
71};
72
73/**
74 * Common part of the client context for both a master and slave channel.
75 */
76struct Channel
77{
78 struct GNUNET_SERVER_Client *client;
79
80 struct TransmitMessage *tmit_head;
81 struct TransmitMessage *tmit_tail;
82
83 char *tmit_buf;
84 uint32_t tmit_mod_count;
85 uint32_t tmit_mod_recvd;
86 uint16_t tmit_size;
87 uint8_t tmit_status;
88
89 uint8_t in_transmit;
90 uint8_t is_master;
91};
92
93/**
94 * Client context for a channel master.
95 */
96struct Master
97{
98 struct Channel channel;
99 struct GNUNET_CRYPTO_EccPrivateKey private_key;
100 struct GNUNET_CRYPTO_EccPublicSignKey public_key;
101
102 struct GNUNET_MULTICAST_Origin *origin;
103 struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle;
104
105 uint64_t max_message_id;
106 uint64_t max_state_message_id;
107 uint64_t max_group_generation;
108
109 /**
110 * enum GNUNET_PSYC_Policy
111 */
112 uint32_t policy;
113};
114
115
116/**
117 * Client context for a channel slave.
118 */
119struct Slave
120{
121 struct Channel channel;
122 struct GNUNET_CRYPTO_EccPrivateKey slave_key;
123 struct GNUNET_CRYPTO_EccPublicSignKey channel_key;
124
125 struct GNUNET_MULTICAST_Member *member;
126 struct GNUNET_MULTICAST_MemberRequestHandle *tmit_handle;
127
128 uint64_t max_message_id;
129 uint64_t max_request_id;
130};
131
51 132
52/** 133/**
53 * Task run during shutdown. 134 * Task run during shutdown.
@@ -70,6 +151,279 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
70 } 151 }
71} 152}
72 153
154/**
155 * Called whenever a client is disconnected.
156 * Frees our resources associated with that client.
157 *
158 * @param cls Closure.
159 * @param client Identification of the client.
160 */
161static void
162client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
163{
164 if (NULL == client)
165 return;
166
167 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p disconnected\n", client);
168
169 struct Channel *ch = GNUNET_SERVER_client_get_user_context (client,
170 struct Channel);
171 GNUNET_assert (NULL != ch);
172
173 if (NULL != ch->tmit_buf)
174 {
175 GNUNET_free (ch->tmit_buf);
176 ch->tmit_buf = NULL;
177 }
178 GNUNET_free (ch);
179}
180
181
182void
183counters_cb (void *cls, uint64_t max_fragment_id, uint64_t max_message_id,
184 uint64_t max_group_generation, uint64_t max_state_message_id)
185{
186 struct Channel *ch = cls;
187 struct CountersResult *res = GNUNET_malloc (sizeof (*res));
188 res->header.size = htons (sizeof (*res));
189 res->max_message_id = GNUNET_htonll (max_message_id);
190
191 if (ch->is_master)
192 {
193 struct Master *mst = cls;
194 mst->max_message_id = max_message_id;
195 mst->max_state_message_id = max_state_message_id;
196 mst->max_group_generation = max_group_generation;
197 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
198 }
199 else
200 {
201 struct Slave *slv = cls;
202 slv->max_message_id = max_message_id;
203 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
204 }
205
206 GNUNET_SERVER_notification_context_add (nc, ch->client);
207 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
208 GNUNET_NO);
209 GNUNET_free (res);
210}
211
212
213static void
214handle_master_start (void *cls, struct GNUNET_SERVER_Client *client,
215 const struct GNUNET_MessageHeader *msg)
216{
217 const struct MasterStartRequest *req
218 = (const struct MasterStartRequest *) msg;
219 struct Master *mst = GNUNET_new (struct Master);
220 mst->channel.client = client;
221 mst->channel.is_master = GNUNET_YES;
222 mst->policy = ntohl (req->policy);
223 mst->private_key = req->channel_key;
224 GNUNET_CRYPTO_ecc_key_get_public_for_signature (&mst->private_key,
225 &mst->public_key);
226
227 GNUNET_PSYCSTORE_counters_get (store, &mst->public_key,
228 counters_cb, mst);
229
230 GNUNET_SERVER_client_set_user_context (client, mst);
231 GNUNET_SERVER_receive_done (client, GNUNET_OK);
232}
233
234
235static void
236handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
237 const struct GNUNET_MessageHeader *msg)
238{
239 const struct SlaveJoinRequest *req
240 = (const struct SlaveJoinRequest *) msg;
241 struct Slave *slv = GNUNET_new (struct Slave);
242 slv->channel.client = client;
243 slv->channel.is_master = GNUNET_NO;
244 slv->channel_key = req->channel_key;
245 slv->slave_key = req->slave_key;
246
247 GNUNET_PSYCSTORE_counters_get (store, &slv->channel_key,
248 counters_cb, slv);
249
250 GNUNET_SERVER_client_set_user_context (client, slv);
251 GNUNET_SERVER_receive_done (client, GNUNET_OK);
252}
253
254
255static void
256send_transmit_ack (struct Channel *ch)
257{
258 struct TransmitAck *res = GNUNET_malloc (sizeof (*res));
259 res->header.size = htons (sizeof (*res));
260 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK);
261 res->buf_avail = htons (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE - ch->tmit_size);
262
263 GNUNET_SERVER_notification_context_add (nc, ch->client);
264 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
265 GNUNET_NO);
266 GNUNET_free (res);
267}
268
269
270static int
271transmit_notify (void *cls, size_t *data_size, void *data)
272{
273 struct Channel *ch = cls;
274 struct TransmitMessage *msg = ch->tmit_head;
275
276 if (NULL == msg || *data_size < msg->size)
277 {
278 *data_size = 0;
279 return GNUNET_NO;
280 }
281
282 memcpy (data, msg->buf, msg->size);
283 *data_size = msg->size;
284
285 GNUNET_free (ch->tmit_buf);
286 GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg);
287
288 return (GNUNET_YES == ch->in_transmit) ? GNUNET_NO : GNUNET_YES;
289}
290
291
292static int
293master_transmit_message (struct Master *mst)
294{
295 if (NULL == mst->tmit_handle)
296 {
297 mst->tmit_handle
298 = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
299 mst->max_group_generation,
300 transmit_notify, mst);
301 }
302 else
303 {
304 GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
305 }
306 return GNUNET_OK;
307}
308
309
310static int
311slave_transmit_message (struct Slave *slv)
312{
313 if (NULL == slv->tmit_handle)
314 {
315 slv->tmit_handle
316 = GNUNET_MULTICAST_member_to_origin(slv->member, slv->max_request_id,
317 transmit_notify, slv);
318 }
319 else
320 {
321 GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
322 }
323 return GNUNET_OK;
324}
325
326
327static int
328buffer_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg)
329{
330 uint16_t size = ntohs (msg->size);
331
332 if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < size)
333 return GNUNET_SYSERR;
334
335 if (0 == ch->tmit_size)
336 {
337 ch->tmit_buf = GNUNET_malloc (size);
338 memcpy (ch->tmit_buf, msg, size);
339 ch->tmit_size = size;
340 }
341 else if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE <= ch->tmit_size + size)
342 {
343 ch->tmit_buf = GNUNET_realloc (ch->tmit_buf, ch->tmit_size + size);
344 memcpy (ch->tmit_buf + ch->tmit_size, msg, size);
345 ch->tmit_size += size;
346 }
347
348 if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE
349 < ch->tmit_size + sizeof (struct GNUNET_PSYC_MessageData))
350 {
351 struct TransmitMessage *tmit_msg = GNUNET_new (struct TransmitMessage);
352 tmit_msg->buf = (char *) msg;
353 tmit_msg->size = size;
354 tmit_msg->status = ch->tmit_status;
355 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
356
357 ch->is_master
358 ? master_transmit_message ((struct Master *) ch)
359 : slave_transmit_message ((struct Slave *) ch);
360 }
361
362 return GNUNET_OK;
363}
364
365static void
366handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client,
367 const struct GNUNET_MessageHeader *msg)
368{
369 const struct GNUNET_PSYC_MessageMethod *meth
370 = (const struct GNUNET_PSYC_MessageMethod *) msg;
371 struct Channel *ch = GNUNET_SERVER_client_get_user_context (client,
372 struct Channel);
373 GNUNET_assert (NULL != ch);
374
375 if (GNUNET_NO != ch->in_transmit)
376 {
377 // FIXME: already transmitting a message, send back error message.
378 return;
379 }
380
381 ch->tmit_buf = NULL;
382 ch->tmit_size = 0;
383 ch->tmit_mod_recvd = 0;
384 ch->tmit_mod_count = ntohl (meth->mod_count);
385 ch->tmit_status = GNUNET_PSYC_DATA_CONT;
386
387 buffer_message (ch, msg);
388
389 if (0 == ch->tmit_mod_count)
390 send_transmit_ack (ch);
391};
392
393
394static void
395handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client,
396 const struct GNUNET_MessageHeader *msg)
397{
398 const struct GNUNET_PSYC_MessageModifier *mod
399 = (const struct GNUNET_PSYC_MessageModifier *) msg;
400 struct Channel *ch = GNUNET_SERVER_client_get_user_context (client,
401 struct Channel);
402 GNUNET_assert (NULL != ch);
403
404 ch->tmit_mod_recvd++;
405 buffer_message (ch, msg);
406
407 if (ch->tmit_mod_recvd == ch->tmit_mod_count)
408 send_transmit_ack (ch);
409};
410
411
412static void
413handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client,
414 const struct GNUNET_MessageHeader *msg)
415{
416 const struct GNUNET_PSYC_MessageData *data
417 = (const struct GNUNET_PSYC_MessageData *) msg;
418 struct Channel *ch = GNUNET_SERVER_client_get_user_context (client,
419 struct Channel);
420 GNUNET_assert (NULL != ch);
421
422 ch->tmit_status = data->status;
423 buffer_message (ch, msg);
424 send_transmit_ack (ch);
425};
426
73 427
74/** 428/**
75 * Initialize the PSYC service. 429 * Initialize the PSYC service.
@@ -83,14 +437,30 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
83 const struct GNUNET_CONFIGURATION_Handle *c) 437 const struct GNUNET_CONFIGURATION_Handle *c)
84{ 438{
85 static const struct GNUNET_SERVER_MessageHandler handlers[] = { 439 static const struct GNUNET_SERVER_MessageHandler handlers[] = {
440 { &handle_master_start, NULL,
441 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
442
443 { &handle_slave_join, NULL,
444 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
445
446 { &handle_transmit_method, NULL,
447 GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_METHOD, 0 },
448
449 { &handle_transmit_modifier, NULL,
450 GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_MODIFIER, 0 },
451
452 { &handle_transmit_data, NULL,
453 GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_DATA, 0 },
454
86 { NULL, NULL, 0, 0 } 455 { NULL, NULL, 0, 0 }
87 }; 456 };
88 457
89 cfg = c; 458 cfg = c;
90 459 store = GNUNET_PSYCSTORE_connect (cfg);
91 stats = GNUNET_STATISTICS_create ("psyc", cfg); 460 stats = GNUNET_STATISTICS_create ("psyc", cfg);
92 GNUNET_SERVER_add_handlers (server, handlers);
93 nc = GNUNET_SERVER_notification_context_create (server, 1); 461 nc = GNUNET_SERVER_notification_context_create (server, 1);
462 GNUNET_SERVER_add_handlers (server, handlers);
463 GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
94 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, 464 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task,
95 NULL); 465 NULL);
96} 466}
diff --git a/src/psyc/psyc.conf b/src/psyc/psyc.conf
new file mode 100644
index 000000000..5a1eebf61
--- /dev/null
+++ b/src/psyc/psyc.conf
@@ -0,0 +1,7 @@
1[psyc]
2AUTOSTART = YES
3HOME = $SERVICEHOME
4BINARY = gnunet-service-psyc
5UNIXPATH = /tmp/gnunet-service-psyc.sock
6UNIX_MATCH_UID = NO
7UNIX_MATCH_GID = YES
diff --git a/src/psyc/psyc.h b/src/psyc/psyc.h
index c82c10ffa..35e9ae800 100644
--- a/src/psyc/psyc.h
+++ b/src/psyc/psyc.h
@@ -24,13 +24,186 @@
24 * @author Gabor X Toth 24 * @author Gabor X Toth
25 */ 25 */
26 26
27#ifndef GNUNET_PSYC_H 27#ifndef PSYC_H
28#define GNUNET_PSYC_H 28#define PSYC_H
29 29
30#include "gnunet_common.h" 30#include "gnunet_common.h"
31 31
32GNUNET_NETWORK_STRUCT_BEGIN 32GNUNET_NETWORK_STRUCT_BEGIN
33 33
34/**** service -> library ****/
35
36/**
37 * Answer from service to client about last operation.
38 */
39struct OperationResult
40{
41 /**
42 * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE
43 */
44 struct GNUNET_MessageHeader header;
45
46 /**
47 * Operation ID.
48 */
49 uint32_t op_id GNUNET_PACKED;
50
51 /**
52 * Status code for the operation.
53 */
54 int64_t result_code GNUNET_PACKED;
55
56 /* followed by 0-terminated error message (on error) */
57
58};
59
60
61struct CountersResult
62{
63 /**
64 * Type: GNUNET_MESSAGE_TYPE_PSYC_RESULT_COUNTERS
65 */
66 struct GNUNET_MessageHeader header;
67
68 uint64_t max_message_id;
69};
70
71
72/**
73 * Transmit acknowledgment.
74 *
75 * Sent after the last GNUNET_PSYC_MessageModifier and after each
76 * GNUNET_PSYC_MessageData.
77 *
78 * This message acknowledges previously received messages and asks for the next
79 * fragment of data.
80 */
81struct TransmitAck
82{
83 /**
84 * Type: GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK
85 */
86 struct GNUNET_MessageHeader header;
87
88 /**
89 * Buffer space available for the next data fragment.
90 */
91 uint16_t buf_avail;
92};
93
94
95/**** library -> service ****/
96
97
98struct MasterStartRequest
99{
100 /**
101 * Type: GNUNET_MESSAGE_TYPE_PSYC_MASTER_START
102 */
103 struct GNUNET_MessageHeader header;
104
105 struct GNUNET_CRYPTO_EccPrivateKey channel_key;
106
107 uint32_t policy GNUNET_PACKED;
108};
109
110
111struct SlaveJoinRequest
112{
113 /**
114 * Type: GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN
115 */
116 struct GNUNET_MessageHeader header;
117
118 uint32_t relay_count GNUNET_PACKED;
119
120 struct GNUNET_CRYPTO_EccPublicSignKey channel_key;
121
122 struct GNUNET_CRYPTO_EccPrivateKey slave_key;
123
124 /* Followed by struct GNUNET_PeerIdentity relays[relay_count] */
125};
126
127
128struct ChannelSlaveAdd
129{
130 /**
131 * Type: GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD
132 */
133 struct GNUNET_MessageHeader header;
134
135 uint32_t reserved;
136
137 struct GNUNET_CRYPTO_EccPublicSignKey *slave_key;
138
139 uint64_t announced_at;
140
141 uint64_t effective_since;
142};
143
144
145struct ChannelSlaveRemove
146{
147 /**
148 * Type: GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM
149 */
150 struct GNUNET_MessageHeader header;
151
152 uint32_t reserved;
153
154 struct GNUNET_CRYPTO_EccPublicSignKey *slave_key;
155
156 uint64_t announced_at;
157};
158
159
160struct StoryRequest
161{
162 /**
163 * Type: GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_STORY_REQUEST
164 */
165 struct GNUNET_MessageHeader header;
166
167 uint64_t op_id;
168
169 uint64_t start_message_id;
170
171 uint64_t end_message_id;
172};
173
174
175struct StateQuery
176{
177 /**
178 * Type: GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_STATE_QUERY
179 */
180 struct GNUNET_MessageHeader header;
181
182 uint64_t op_id;
183
184 /* Followed by NUL-terminated name. */
185};
186
187
188struct StateResult
189{
190 /**
191 * Type: GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_STATE_RESULT
192 */
193 struct GNUNET_MessageHeader header;
194
195 /**
196 * Size of name, including NUL terminator.
197 */
198 uint16_t name_size GNUNET_PACKED;
199
200 /**
201 * OR'd StateOpFlags
202 */
203 uint8_t flags;
204
205 /* Followed by NUL-terminated name, then the value. */
206};
34 207
35 208
36GNUNET_NETWORK_STRUCT_END 209GNUNET_NETWORK_STRUCT_END
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index e298fae71..61b57c050 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -36,15 +36,76 @@
36#include "gnunet_psyc_service.h" 36#include "gnunet_psyc_service.h"
37#include "psyc.h" 37#include "psyc.h"
38 38
39#define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__)
40
41
42struct OperationHandle
43{
44 struct OperationHandle *prev;
45 struct OperationHandle *next;
46 const struct GNUNET_MessageHeader *msg;
47};
48
39/** 49/**
40 * Handle that identifies a join request. 50 * Handle to access PSYC channel operations for both the master and slaves.
41 *
42 * Used to match calls to #GNUNET_PSYC_JoinCallback to the
43 * corresponding calls to GNUNET_PSYC_join_decision().
44 */ 51 */
45struct GNUNET_PSYC_JoinHandle 52struct GNUNET_PSYC_Channel
46{ 53{
47 54 /**
55 * Configuration to use.
56 */
57 const struct GNUNET_CONFIGURATION_Handle *cfg;
58
59 /**
60 * Socket (if available).
61 */
62 struct GNUNET_CLIENT_Connection *client;
63
64 /**
65 * Currently pending transmission request, or NULL for none.
66 */
67 struct GNUNET_CLIENT_TransmitHandle *th;
68
69 /**
70 * Head of operations to transmit.
71 */
72 struct OperationHandle *transmit_head;
73
74 /**
75 * Tail of operations to transmit.
76 */
77 struct OperationHandle *transmit_tail;
78
79 /**
80 * Message to send on reconnect.
81 */
82 struct GNUNET_MessageHeader *reconnect_msg;
83
84 /**
85 * Task doing exponential back-off trying to reconnect.
86 */
87 GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
88
89 /**
90 * Time for next connect retry.
91 */
92 struct GNUNET_TIME_Relative reconnect_delay;
93
94 GNUNET_PSYC_Method method_cb;
95
96 GNUNET_PSYC_JoinCallback join_cb;
97
98 void *cb_cls;
99
100 /**
101 * Are we polling for incoming messages right now?
102 */
103 int in_receive;
104
105 /**
106 * Are we currently transmitting a message?
107 */
108 int in_transmit;
48}; 109};
49 110
50 111
@@ -53,23 +114,30 @@ struct GNUNET_PSYC_JoinHandle
53 */ 114 */
54struct GNUNET_PSYC_Master 115struct GNUNET_PSYC_Master
55{ 116{
117 struct GNUNET_PSYC_Channel ch;
118
119 GNUNET_PSYC_MasterStartCallback start_cb;
56 120
121 uint64_t max_message_id;
57}; 122};
58 123
59 124
60/** 125/**
61 * Handle for a pending PSYC transmission operation. 126 * Handle for a PSYC channel slave.
62 */ 127 */
63struct GNUNET_PSYC_MasterTransmitHandle 128struct GNUNET_PSYC_Slave
64{ 129{
65 130 struct GNUNET_PSYC_Channel ch;
66}; 131};
67 132
68 133
69/** 134/**
70 * Handle for a PSYC channel slave. 135 * Handle that identifies a join request.
136 *
137 * Used to match calls to #GNUNET_PSYC_JoinCallback to the
138 * corresponding calls to GNUNET_PSYC_join_decision().
71 */ 139 */
72struct GNUNET_PSYC_Slave 140struct GNUNET_PSYC_JoinHandle
73{ 141{
74 142
75}; 143};
@@ -78,16 +146,20 @@ struct GNUNET_PSYC_Slave
78/** 146/**
79 * Handle for a pending PSYC transmission operation. 147 * Handle for a pending PSYC transmission operation.
80 */ 148 */
81struct GNUNET_PSYC_SlaveTransmitHandle 149struct GNUNET_PSYC_MasterTransmitHandle
82{ 150{
83 151 struct GNUNET_PSYC_Master *master;
152 const struct GNUNET_ENV_Environment *env;
153 GNUNET_PSYC_MasterTransmitNotify notify;
154 void *notify_cls;
155 enum GNUNET_PSYC_MasterTransmitFlags flags;
84}; 156};
85 157
86 158
87/** 159/**
88 * Handle to access PSYC channel operations for both the master and slaves. 160 * Handle for a pending PSYC transmission operation.
89 */ 161 */
90struct GNUNET_PSYC_Channel 162struct GNUNET_PSYC_SlaveTransmitHandle
91{ 163{
92 164
93}; 165};
@@ -102,45 +174,264 @@ struct GNUNET_PSYC_Story
102}; 174};
103 175
104 176
177/**
178 * Handle for a state query operation.
179 */
105struct GNUNET_PSYC_StateQuery 180struct GNUNET_PSYC_StateQuery
106{ 181{
107 182
108}; 183};
109 184
110 185
111/** 186/**
112 * Function to call with the decision made for a join request. 187 * Try again to connect to the PSYCstore service.
113 * 188 *
114 * Must be called once and only once in response to an invocation of the 189 * @param cls handle to the PSYCstore service.
115 * #GNUNET_PSYC_JoinCallback. 190 * @param tc scheduler context
191 */
192static void
193reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
194
195
196/**
197 * Reschedule a connect attempt to the service.
116 * 198 *
117 * @param jh Join request handle. 199 * @param h transport service to reconnect
118 * @param is_admitted #GNUNET_YES if joining is approved,
119 * #GNUNET_NO if it is disapproved.
120 * @param relay_count Number of relays given.
121 * @param relays Array of suggested peers that might be useful relays to use
122 * when joining the multicast group (essentially a list of peers that
123 * are already part of the multicast group and might thus be willing
124 * to help with routing). If empty, only this local peer (which must
125 * be the multicast origin) is a good candidate for building the
126 * multicast tree. Note that it is unnecessary to specify our own
127 * peer identity in this array.
128 * @param method_name Method name for the message transmitted with the response.
129 * @param env Environment containing transient variables for the message, or NULL.
130 * @param data Data of the message.
131 * @param data_size Size of @a data.
132 */ 200 */
133void 201static void
134GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, 202reschedule_connect (struct GNUNET_PSYC_Channel *c)
135 int is_admitted,
136 unsigned int relay_count,
137 const struct GNUNET_PeerIdentity *relays,
138 const char *method_name,
139 const struct GNUNET_ENV_Environment *env,
140 const void *data,
141 size_t data_size)
142{ 203{
204 GNUNET_assert (c->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
205
206 if (NULL != c->th)
207 {
208 GNUNET_CLIENT_notify_transmit_ready_cancel (c->th);
209 c->th = NULL;
210 }
211 if (NULL != c->client)
212 {
213 GNUNET_CLIENT_disconnect (c->client);
214 c->client = NULL;
215 }
216 c->in_receive = GNUNET_NO;
217 LOG (GNUNET_ERROR_TYPE_DEBUG,
218 "Scheduling task to reconnect to PSYCstore service in %s.\n",
219 GNUNET_STRINGS_relative_time_to_string (c->reconnect_delay, GNUNET_YES));
220 c->reconnect_task =
221 GNUNET_SCHEDULER_add_delayed (c->reconnect_delay, &reconnect, c);
222 c->reconnect_delay = GNUNET_TIME_STD_BACKOFF (c->reconnect_delay);
223}
224
225
226/**
227 * Schedule transmission of the next message from our queue.
228 *
229 * @param h PSYCstore handle
230 */
231static void
232transmit_next (struct GNUNET_PSYC_Channel *c);
233
234
235/**
236 * Type of a function to call when we receive a message
237 * from the service.
238 *
239 * @param cls closure
240 * @param msg message received, NULL on timeout or fatal error
241 */
242static void
243message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
244{
245 struct GNUNET_PSYC_Channel *ch = cls;
246 struct GNUNET_PSYC_Master *mst = cls;
247 struct GNUNET_PSYC_Slave *slv = cls;
248
249 if (NULL == msg)
250 {
251 reschedule_connect (ch);
252 return;
253 }
254 uint16_t size_eq = 0;
255 uint16_t size_min = 0;
256 const uint16_t size = ntohs (msg->size);
257 const uint16_t type = ntohs (msg->type);
258
259 LOG (GNUNET_ERROR_TYPE_DEBUG,
260 "Received message of type %d from PSYC service\n", type);
261
262 switch (type)
263 {
264 case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK:
265 case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
266 size_eq = sizeof (struct CountersResult);
267 break;
268 }
269
270 if (! ((0 < size_eq && size == size_eq)
271 || (0 < size_min && size >= size_min)))
272 {
273 GNUNET_break (0);
274 reschedule_connect (ch);
275 return;
276 }
277
278 struct CountersResult *cres;
279
280 switch (type)
281 {
282 case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK:
283 cres = (struct CountersResult *) msg;
284 mst->max_message_id = GNUNET_ntohll (cres->max_message_id);
285 if (NULL != mst->start_cb)
286 mst->start_cb (ch->cb_cls, mst->max_message_id);
287 break;
288
289 case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
290 cres = (struct CountersResult *) msg;
291#if TODO
292 slv->max_message_id = GNUNET_ntohll (cres->max_message_id);
293 if (NULL != slv->join_ack_cb)
294 mst->join_ack_cb (ch->cb_cls, mst->max_message_id);
295#endif
296 break;
297 }
298
299 GNUNET_CLIENT_receive (ch->client, &message_handler, ch,
300 GNUNET_TIME_UNIT_FOREVER_REL);
301}
143 302
303
304/**
305 * Transmit next message to service.
306 *
307 * @param cls The 'struct GNUNET_PSYCSTORE_Handle'.
308 * @param size Number of bytes available in buf.
309 * @param buf Where to copy the message.
310 * @return Number of bytes copied to buf.
311 */
312static size_t
313send_next_message (void *cls, size_t size, void *buf)
314{
315 struct GNUNET_PSYC_Channel *ch = cls;
316 struct OperationHandle *op = ch->transmit_head;
317 size_t ret;
318
319 ch->th = NULL;
320 if (NULL == op->msg)
321 return 0;
322 ret = ntohs (op->msg->size);
323 if (ret > size)
324 {
325 reschedule_connect (ch);
326 return 0;
327 }
328 LOG (GNUNET_ERROR_TYPE_DEBUG,
329 "Sending message of type %d to PSYCstore service\n",
330 ntohs (op->msg->type));
331 memcpy (buf, op->msg, ret);
332
333 GNUNET_CONTAINER_DLL_remove (ch->transmit_head, ch->transmit_tail, op);
334 GNUNET_free (op);
335
336 if (NULL != ch->transmit_head)
337 transmit_next (ch);
338
339 if (GNUNET_NO == ch->in_receive)
340 {
341 ch->in_receive = GNUNET_YES;
342 GNUNET_CLIENT_receive (ch->client, &message_handler, ch,
343 GNUNET_TIME_UNIT_FOREVER_REL);
344 }
345 return ret;
346}
347
348
349/**
350 * Schedule transmission of the next message from our queue.
351 *
352 * @param h PSYCstore handle.
353 */
354static void
355transmit_next (struct GNUNET_PSYC_Channel *ch)
356{
357 if (NULL != ch->th || NULL == ch->client)
358 return;
359
360 struct OperationHandle *op = ch->transmit_head;
361 if (NULL == op)
362 return;
363
364 ch->th = GNUNET_CLIENT_notify_transmit_ready (ch->client,
365 ntohs (op->msg->size),
366 GNUNET_TIME_UNIT_FOREVER_REL,
367 GNUNET_NO,
368 &send_next_message,
369 ch);
370}
371
372
373/**
374 * Try again to connect to the PSYC service.
375 *
376 * @param cls Channel handle.
377 * @param tc Scheduler context.
378 */
379static void
380reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
381{
382 struct GNUNET_PSYC_Channel *ch = cls;
383
384 ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
385 LOG (GNUNET_ERROR_TYPE_DEBUG,
386 "Connecting to PSYC service.\n");
387 GNUNET_assert (NULL == ch->client);
388 ch->client = GNUNET_CLIENT_connect ("psyc", ch->cfg);
389 GNUNET_assert (NULL != ch->client);
390
391 if (NULL == ch->transmit_head ||
392 ch->transmit_head->msg->type != ch->reconnect_msg->type)
393 {
394 struct OperationHandle *op
395 = GNUNET_malloc (sizeof (struct OperationHandle)
396 + ntohs (ch->reconnect_msg->size));
397 memcpy (&op[1], ch->reconnect_msg, ntohs (ch->reconnect_msg->size));
398 op->msg = (struct GNUNET_MessageHeader *) &op[1];
399 GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op);
400 }
401
402 transmit_next (ch);
403}
404
405
406/**
407 * Disconnect from the PSYC service.
408 *
409 * @param cls Channel handle.
410 * @param tc Scheduler context.
411 */
412static void
413disconnect (void *c)
414{
415 struct GNUNET_PSYC_Channel *ch = c;
416 GNUNET_assert (NULL != ch);
417 GNUNET_assert (ch->transmit_head == ch->transmit_tail);
418 if (ch->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
419 {
420 GNUNET_SCHEDULER_cancel (ch->reconnect_task);
421 ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
422 }
423 if (NULL != ch->th)
424 {
425 GNUNET_CLIENT_notify_transmit_ready_cancel (ch->th);
426 ch->th = NULL;
427 }
428 if (NULL != ch->client)
429 {
430 GNUNET_CLIENT_disconnect (ch->client);
431 ch->client = NULL;
432 }
433 if (NULL != ch->reconnect_msg)
434 ch->reconnect_msg = NULL;
144} 435}
145 436
146 437
@@ -177,57 +468,172 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
177 enum GNUNET_PSYC_Policy policy, 468 enum GNUNET_PSYC_Policy policy,
178 GNUNET_PSYC_Method method, 469 GNUNET_PSYC_Method method,
179 GNUNET_PSYC_JoinCallback join_cb, 470 GNUNET_PSYC_JoinCallback join_cb,
471 GNUNET_PSYC_MasterStartCallback master_started_cb,
180 void *cls) 472 void *cls)
181{ 473{
182 return NULL; 474 struct GNUNET_PSYC_Master *mst = GNUNET_malloc (sizeof (*mst));
475 struct GNUNET_PSYC_Channel *ch = &mst->ch;
476 struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req));
477
478 req->header.size = htons (sizeof (*req) + sizeof (*channel_key));
479 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START);
480 req->channel_key = *channel_key;
481 req->policy = policy;
482
483 ch->cfg = cfg;
484 ch->reconnect_msg = (struct GNUNET_MessageHeader *) req;
485 ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
486 ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst);
487
488 ch->method_cb = method;
489 ch->join_cb = join_cb;
490 ch->cb_cls = cls;
491 mst->start_cb = master_started_cb;
492
493 return mst;
494}
495
496
497/**
498 * Stop a PSYC master channel.
499 *
500 * @param master PSYC channel master to stop.
501 */
502void
503GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *mst)
504{
505 disconnect (mst);
506 GNUNET_free (mst);
507}
508
509
510/**
511 * Function to call with the decision made for a join request.
512 *
513 * Must be called once and only once in response to an invocation of the
514 * #GNUNET_PSYC_JoinCallback.
515 *
516 * @param jh Join request handle.
517 * @param is_admitted #GNUNET_YES if joining is approved,
518 * #GNUNET_NO if it is disapproved.
519 * @param relay_count Number of relays given.
520 * @param relays Array of suggested peers that might be useful relays to use
521 * when joining the multicast group (essentially a list of peers that
522 * are already part of the multicast group and might thus be willing
523 * to help with routing). If empty, only this local peer (which must
524 * be the multicast origin) is a good candidate for building the
525 * multicast tree. Note that it is unnecessary to specify our own
526 * peer identity in this array.
527 * @param method_name Method name for the message transmitted with the response.
528 * @param env Environment containing transient variables for the message, or NULL.
529 * @param data Data of the message.
530 * @param data_size Size of @a data.
531 */
532void
533GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
534 int is_admitted,
535 unsigned int relay_count,
536 const struct GNUNET_PeerIdentity *relays,
537 const char *method_name,
538 const struct GNUNET_ENV_Environment *env,
539 const void *data,
540 size_t data_size)
541{
542
543}
544
545
546/* FIXME: split up value into <64K chunks and transmit the continuations in
547 * MOD_CONT msgs */
548int
549send_modifier (void *cls, struct GNUNET_ENV_Modifier *mod)
550{
551 struct GNUNET_PSYC_Channel *ch = cls;
552 size_t name_size = strlen (mod->name) + 1;
553 struct GNUNET_PSYC_MessageModifier *pmod;
554 struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*pmod)
555 + name_size + mod->value_size);
556 pmod = (struct GNUNET_PSYC_MessageModifier *) &op[1];
557 op->msg = (struct GNUNET_MessageHeader *) pmod;
558
559 pmod->header.type = GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_MODIFIER;
560 pmod->header.size = htons (sizeof (*pmod) + name_size + mod->value_size);
561 pmod->name_size = htons (name_size);
562 memcpy (&pmod[1], mod->name, name_size);
563 memcpy ((void *) &pmod[1] + name_size, mod->value, mod->value_size);
564
565 GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op);
566 return GNUNET_YES;
183} 567}
184 568
185 569
186/** 570/**
187 * Send a message to call a method to all members in the PSYC channel. 571 * Send a message to call a method to all members in the PSYC channel.
188 * 572 *
189 * @param master Handle to the PSYC channel. 573 * @param mst Handle to the PSYC channel.
190 * @param method_name Which method should be invoked. 574 * @param method_name Which method should be invoked.
191 * @param env Environment containing state operations and transient variables 575 * @param env Environment containing state operations and transient variables
192 * for the message, or NULL. 576 * for the message, or NULL.
193 * @param notify Function to call to obtain the arguments. 577 * @param notify Function to call to obtain the arguments.
194 * @param notify_cls Closure for @a notify. 578 * @param notify_cls Closure for @a notify.
195 * @param flags Flags for the message being transmitted. 579 * @param flags Flags for the message being transmitted.
196 * @return Transmission handle, NULL on error (i.e. more than one request queued). 580 * @return Transmission handle, NULL on error (i.e. more than one request
581 * queued).
197 */ 582 */
198struct GNUNET_PSYC_MasterTransmitHandle * 583struct GNUNET_PSYC_MasterTransmitHandle *
199GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, 584GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *mst,
200 const char *method_name, 585 const char *method_name,
201 const struct GNUNET_ENV_Environment *env, 586 const struct GNUNET_ENV_Environment *env,
202 GNUNET_PSYC_MasterTransmitNotify notify, 587 GNUNET_PSYC_MasterTransmitNotify notify,
203 void *notify_cls, 588 void *notify_cls,
204 enum GNUNET_PSYC_MasterTransmitFlags flags) 589 enum GNUNET_PSYC_MasterTransmitFlags flags)
205{ 590{
206 return NULL; 591 GNUNET_assert (NULL != mst);
592 struct GNUNET_PSYC_Channel *ch = &mst->ch;
593 if (GNUNET_NO != ch->in_transmit)
594 return NULL;
595 ch->in_transmit = GNUNET_YES;
596
597 struct GNUNET_PSYC_MessageMethod *pmeth;
598 struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*pmeth));
599 pmeth = (struct GNUNET_PSYC_MessageMethod *) &op[1];
600 op->msg = (struct GNUNET_MessageHeader *) pmeth;
601
602 pmeth->header.type = GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_METHOD;
603 size_t size = strlen (method_name) + 1;
604 pmeth->header.size = htons (sizeof (*pmeth) + size);
605 pmeth->flags = htonl (flags);
606 pmeth->mod_count
607 = GNUNET_ntohll (GNUNET_ENV_environment_get_mod_count (env));
608 memcpy (&pmeth[1], method_name, size);
609
610 GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op);
611
612 GNUNET_ENV_environment_iterate (env, send_modifier, mst);
613
614 struct GNUNET_PSYC_MasterTransmitHandle *th = GNUNET_malloc (sizeof (*th));
615 th->master = mst;
616 th->env = env;
617 th->notify = notify;
618 th->notify_cls = notify_cls;
619 return th;
207} 620}
208 621
209 622
210/** 623/**
211 * Abort transmission request to channel. 624 * Abort transmission request to the channel.
212 * 625 *
213 * @param th Handle of the request that is being aborted. 626 * @param th Handle of the request that is being aborted.
214 */ 627 */
215void 628void
216GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th) 629GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th)
217{ 630{
631 struct GNUNET_PSYC_Master *mst = th->master;
632 struct GNUNET_PSYC_Channel *ch = &mst->ch;
633 if (GNUNET_NO != ch->in_transmit)
634 return;
218 635
219} 636
220
221
222/**
223 * Stop a PSYC master channel.
224 *
225 * @param master PSYC channel master to stop.
226 */
227void
228GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master)
229{
230
231} 637}
232 638
233 639
@@ -235,7 +641,7 @@ GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master)
235 * Join a PSYC channel. 641 * Join a PSYC channel.
236 * 642 *
237 * The entity joining is always the local peer. The user must immediately use 643 * The entity joining is always the local peer. The user must immediately use
238 * the GNUNET_PSYC_slave_to_master() functions to transmit a @e join_msg to the 644 * the GNUNET_PSYC_slave_transmit() functions to transmit a @e join_msg to the
239 * channel; if the join request succeeds, the channel state (and @e recent 645 * channel; if the join request succeeds, the channel state (and @e recent
240 * method calls) will be replayed to the joining member. There is no explicit 646 * method calls) will be replayed to the joining member. There is no explicit
241 * notification on failure (as the channel may simply take days to approve, 647 * notification on failure (as the channel may simply take days to approve,
@@ -269,13 +675,32 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
269 const struct GNUNET_PeerIdentity *relays, 675 const struct GNUNET_PeerIdentity *relays,
270 GNUNET_PSYC_Method method, 676 GNUNET_PSYC_Method method,
271 GNUNET_PSYC_JoinCallback join_cb, 677 GNUNET_PSYC_JoinCallback join_cb,
678 GNUNET_PSYC_SlaveJoinCallback slave_joined_cb,
272 void *cls, 679 void *cls,
273 const char *method_name, 680 const char *method_name,
274 const struct GNUNET_ENV_Environment *env, 681 const struct GNUNET_ENV_Environment *env,
275 const void *data, 682 const void *data,
276 size_t data_size) 683 size_t data_size)
277{ 684{
278 return NULL; 685 struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv));
686 struct GNUNET_PSYC_Channel *ch = &slv->ch;
687 struct SlaveJoinRequest *req = GNUNET_malloc (sizeof (*req));
688
689 req->header.size = htons (sizeof (*req)
690 + sizeof (*channel_key) + sizeof (*slave_key)
691 + relay_count * sizeof (*relays));
692 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN);
693 req->channel_key = *channel_key;
694 req->slave_key = *slave_key;
695 req->relay_count = relay_count;
696 memcpy (&req[1], relays, relay_count * sizeof (*relays));
697
698 ch->cfg = cfg;
699 ch->reconnect_msg = (struct GNUNET_MessageHeader *) req;
700 ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
701 ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv);
702
703 return slv;
279} 704}
280 705
281 706
@@ -283,14 +708,15 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
283 * Part a PSYC channel. 708 * Part a PSYC channel.
284 * 709 *
285 * Will terminate the connection to the PSYC service. Polite clients should 710 * Will terminate the connection to the PSYC service. Polite clients should
286 * first explicitly send a @e part request (via GNUNET_PSYC_slave_to_master()). 711 * first explicitly send a @e part request (via GNUNET_PSYC_slave_transmit()).
287 * 712 *
288 * @param slave Slave handle. 713 * @param slv Slave handle.
289 */ 714 */
290void 715void
291GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave) 716GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slv)
292{ 717{
293 718 disconnect (slv);
719 GNUNET_free (slv);
294} 720}
295 721
296 722
@@ -299,11 +725,13 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave)
299 * 725 *
300 * @param slave Slave handle. 726 * @param slave Slave handle.
301 * @param method_name Which (PSYC) method should be invoked (on host). 727 * @param method_name Which (PSYC) method should be invoked (on host).
302 * @param env Environment containing transient variables for the message, or NULL. 728 * @param env Environment containing transient variables for the message, or
729 * NULL.
303 * @param notify Function to call when we are allowed to transmit (to get data). 730 * @param notify Function to call when we are allowed to transmit (to get data).
304 * @param notify_cls Closure for @a notify. 731 * @param notify_cls Closure for @a notify.
305 * @param flags Flags for the message being transmitted. 732 * @param flags Flags for the message being transmitted.
306 * @return Transmission handle, NULL on error (i.e. more than one request queued). 733 * @return Transmission handle, NULL on error (i.e. more than one request
734 * queued).
307 */ 735 */
308struct GNUNET_PSYC_SlaveTransmitHandle * 736struct GNUNET_PSYC_SlaveTransmitHandle *
309GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave, 737GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave,
@@ -330,7 +758,8 @@ GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th)
330 758
331 759
332/** 760/**
333 * Convert a channel @a master to a @e channel handle to access the @e channel APIs. 761 * Convert a channel @a master to a @e channel handle to access the @e channel
762 * APIs.
334 * 763 *
335 * @param master Channel master handle. 764 * @param master Channel master handle.
336 * @return Channel handle, valid for as long as @a master is valid. 765 * @return Channel handle, valid for as long as @a master is valid.
@@ -338,7 +767,7 @@ GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th)
338struct GNUNET_PSYC_Channel * 767struct GNUNET_PSYC_Channel *
339GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master) 768GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master)
340{ 769{
341 return NULL; 770 return (struct GNUNET_PSYC_Channel *) master;
342} 771}
343 772
344 773
@@ -351,7 +780,7 @@ GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master)
351struct GNUNET_PSYC_Channel * 780struct GNUNET_PSYC_Channel *
352GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave) 781GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave)
353{ 782{
354 return NULL; 783 return (struct GNUNET_PSYC_Channel *) slave;
355} 784}
356 785
357 786
@@ -371,18 +800,30 @@ GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave)
371 * correctly; not doing so correctly will result in either denying other slaves 800 * correctly; not doing so correctly will result in either denying other slaves
372 * access or offering access to channel data to non-members. 801 * access or offering access to channel data to non-members.
373 * 802 *
374 * @param channel Channel handle. 803 * @param ch Channel handle.
375 * @param slave_key Identity of channel slave to add. 804 * @param slave_key Identity of channel slave to add.
376 * @param announced_at ID of the message that announced the membership change. 805 * @param announced_at ID of the message that announced the membership change.
377 * @param effective_since Addition of slave is in effect since this message ID. 806 * @param effective_since Addition of slave is in effect since this message ID.
378 */ 807 */
379void 808void
380GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel, 809GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *ch,
381 const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key, 810 const struct GNUNET_CRYPTO_EccPublicSignKey
811 *slave_key,
382 uint64_t announced_at, 812 uint64_t announced_at,
383 uint64_t effective_since) 813 uint64_t effective_since)
384{ 814{
385 815 struct ChannelSlaveAdd *slvadd;
816 struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*slvadd));
817 slvadd = (struct ChannelSlaveAdd *) &op[1];
818 op->msg = (struct GNUNET_MessageHeader *) slvadd;
819
820 slvadd->header.type = GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD;
821 slvadd->header.size = htons (sizeof (*slvadd));
822 slvadd->announced_at = GNUNET_htonll (announced_at);
823 slvadd->effective_since = GNUNET_htonll (effective_since);
824
825 GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op);
826 transmit_next (ch);
386} 827}
387 828
388 829
@@ -403,16 +844,27 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel,
403 * denying members access or offering access to channel data to 844 * denying members access or offering access to channel data to
404 * non-members. 845 * non-members.
405 * 846 *
406 * @param channel Channel handle. 847 * @param ch Channel handle.
407 * @param slave_key Identity of channel slave to remove. 848 * @param slave_key Identity of channel slave to remove.
408 * @param announced_at ID of the message that announced the membership change. 849 * @param announced_at ID of the message that announced the membership change.
409 */ 850 */
410void 851void
411GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel, 852GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *ch,
412 const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key, 853 const struct GNUNET_CRYPTO_EccPublicSignKey
854 *slave_key,
413 uint64_t announced_at) 855 uint64_t announced_at)
414{ 856{
857 struct ChannelSlaveRemove *slvrm;
858 struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*slvrm));
859 slvrm = (struct ChannelSlaveRemove *) &op[1];
860 op->msg = (struct GNUNET_MessageHeader *) slvrm;
861
862 slvrm->header.type = GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM;
863 slvrm->header.size = htons (sizeof (*slvrm));
864 slvrm->announced_at = GNUNET_htonll (announced_at);
415 865
866 GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op);
867 transmit_next (ch);
416} 868}
417 869
418 870
@@ -424,7 +876,7 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel,
424 * 876 *
425 * To get the latest message, use 0 for both the start and end message ID. 877 * To get the latest message, use 0 for both the start and end message ID.
426 * 878 *
427 * @param channel Which channel should be replayed? 879 * @param ch Which channel should be replayed?
428 * @param start_message_id Earliest interesting point in history. 880 * @param start_message_id Earliest interesting point in history.
429 * @param end_message_id Last (exclusive) interesting point in history. 881 * @param end_message_id Last (exclusive) interesting point in history.
430 * @param method Function to invoke on messages received from the story. 882 * @param method Function to invoke on messages received from the story.
@@ -441,7 +893,7 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel,
441 * @return Handle to cancel story telling operation. 893 * @return Handle to cancel story telling operation.
442 */ 894 */
443struct GNUNET_PSYC_Story * 895struct GNUNET_PSYC_Story *
444GNUNET_PSYC_channel_story_tell (struct GNUNET_PSYC_Channel *channel, 896GNUNET_PSYC_channel_story_tell (struct GNUNET_PSYC_Channel *ch,
445 uint64_t start_message_id, 897 uint64_t start_message_id,
446 uint64_t end_message_id, 898 uint64_t end_message_id,
447 GNUNET_PSYC_Method method, 899 GNUNET_PSYC_Method method,
@@ -495,9 +947,9 @@ GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *channel,
495/** 947/**
496 * Return all channel state variables whose name matches a given prefix. 948 * Return all channel state variables whose name matches a given prefix.
497 * 949 *
498 * A name matches if it starts with the given @a name_prefix, thus requesting the 950 * A name matches if it starts with the given @a name_prefix, thus requesting
499 * empty prefix ("") will match all values; requesting "_a_b" will also return 951 * the empty prefix ("") will match all values; requesting "_a_b" will also
500 * values stored under "_a_b_c". 952 * return values stored under "_a_b_c".
501 * 953 *
502 * The @a state_cb is invoked on all matching state variables asynchronously, as 954 * The @a state_cb is invoked on all matching state variables asynchronously, as
503 * the state is stored in and retrieved from the PSYCstore, 955 * the state is stored in and retrieved from the PSYCstore,
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c
index b37b3ceb1..84c9840f5 100644
--- a/src/psyc/test_psyc.c
+++ b/src/psyc/test_psyc.c
@@ -28,12 +28,12 @@
28#include "platform.h" 28#include "platform.h"
29#include "gnunet_common.h" 29#include "gnunet_common.h"
30#include "gnunet_util_lib.h" 30#include "gnunet_util_lib.h"
31#include "gnunet_psycstore_service.h"
32#include "gnunet_testing_lib.h" 31#include "gnunet_testing_lib.h"
32#include "gnunet_psyc_service.h"
33 33
34#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10) 34#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10)
35 35
36#define DEBUG_SERVICE 0 36#define DEBUG_SERVICE 1
37 37
38 38
39/** 39/**
@@ -41,11 +41,22 @@
41 */ 41 */
42static int res; 42static int res;
43 43
44static const struct GNUNET_CONFIGURATION_Handle *cfg;
45
44/** 46/**
45 * Handle for task for timeout termination. 47 * Handle for task for timeout termination.
46 */ 48 */
47static GNUNET_SCHEDULER_TaskIdentifier end_badly_task; 49static GNUNET_SCHEDULER_TaskIdentifier end_badly_task;
48 50
51static struct GNUNET_PSYC_Master *mst;
52static struct GNUNET_PSYC_Slave *slv;
53static struct GNUNET_PSYC_Channel *ch;
54
55static struct GNUNET_CRYPTO_EccPrivateKey *channel_key;
56static struct GNUNET_CRYPTO_EccPrivateKey *slave_key;
57
58static struct GNUNET_CRYPTO_EccPublicSignKey channel_pub_key;
59static struct GNUNET_CRYPTO_EccPublicSignKey slave_pub_key;
49 60
50/** 61/**
51 * Clean up all resources used. 62 * Clean up all resources used.
@@ -53,6 +64,11 @@ static GNUNET_SCHEDULER_TaskIdentifier end_badly_task;
53static void 64static void
54cleanup () 65cleanup ()
55{ 66{
67 if (master != NULL)
68 {
69 GNUNET_PSYC_master_stop (master);
70 master = NULL;
71 }
56 GNUNET_SCHEDULER_shutdown (); 72 GNUNET_SCHEDULER_shutdown ();
57} 73}
58 74
@@ -100,6 +116,42 @@ end ()
100 &end_normally, NULL); 116 &end_normally, NULL);
101} 117}
102 118
119
120static int
121method (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key,
122 uint64_t message_id, const char *method_name,
123 size_t modifier_count, const struct GNUNET_ENV_Modifier *modifiers,
124 uint64_t data_offset, const void *data, size_t data_size,
125 enum GNUNET_PSYC_MessageFlags flags)
126{
127 return GNUNET_OK;
128}
129
130
131static int
132join (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key,
133 const char *method_name,
134 size_t variable_count, const struct GNUNET_ENV_Modifier *variables,
135 const void *data, size_t data_size, struct GNUNET_PSYC_JoinHandle *jh)
136{
137 return GNUNET_OK;
138}
139
140
141void
142master_started (void *cls, uint64_t max_message_id)
143{
144 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master started: %lu\n", max_message_id);
145}
146
147
148void
149slave_joined (void *cls, uint64_t max_message_id)
150{
151 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave joined: %lu\n", max_message_id);
152}
153
154
103/** 155/**
104 * Main function of the test, run from scheduler. 156 * Main function of the test, run from scheduler.
105 * 157 *
@@ -110,14 +162,28 @@ end ()
110static void 162static void
111#if DEBUG_SERVICE 163#if DEBUG_SERVICE
112run (void *cls, char *const *args, const char *cfgfile, 164run (void *cls, char *const *args, const char *cfgfile,
113 const struct GNUNET_CONFIGURATION_Handle *cfg) 165 const struct GNUNET_CONFIGURATION_Handle *c)
114#else 166#else
115run (void *cls, 167run (void *cls,
116 const struct GNUNET_CONFIGURATION_Handle *cfg, 168 const struct GNUNET_CONFIGURATION_Handle *c,
117 struct GNUNET_TESTING_Peer *peer) 169 struct GNUNET_TESTING_Peer *peer)
118#endif 170#endif
119{ 171{
172 cfg = c;
120 end_badly_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT, &end_badly, NULL); 173 end_badly_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT, &end_badly, NULL);
174
175 channel_key = GNUNET_CRYPTO_ecc_key_create ();
176 slave_key = GNUNET_CRYPTO_ecc_key_create ();
177
178 GNUNET_CRYPTO_ecc_key_get_public_for_signature (channel_key, &channel_pub_key);
179 GNUNET_CRYPTO_ecc_key_get_public_for_signature (slave_key, &slave_pub_key);
180
181 mst = GNUNET_PSYC_master_start (cfg, channel_key,
182 GNUNET_PSYC_CHANNEL_PRIVATE,
183 &method, &join, &master_started, NULL);
184
185 slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key,
186 &method, &join, &slave_joined, NULL);
121} 187}
122 188
123 189