aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/include/gnunet_multicast_service.h26
-rw-r--r--src/include/gnunet_protocols.h23
-rw-r--r--src/multicast/Makefile.am6
-rw-r--r--src/multicast/gnunet-service-multicast.c261
-rw-r--r--src/multicast/multicast.h31
-rw-r--r--src/multicast/multicast_api.c785
-rw-r--r--src/psyc/Makefile.am4
-rw-r--r--src/psyc/gnunet-service-psyc.c58
-rw-r--r--src/psyc/psyc_api.c210
-rw-r--r--src/psyc/test_psyc.conf17
10 files changed, 1005 insertions, 416 deletions
diff --git a/src/include/gnunet_multicast_service.h b/src/include/gnunet_multicast_service.h
index 50dcf7aef..f1b8bc7e5 100644
--- a/src/include/gnunet_multicast_service.h
+++ b/src/include/gnunet_multicast_service.h
@@ -598,7 +598,7 @@ typedef int
598 * Handle for a request to send a message to all multicast group members 598 * Handle for a request to send a message to all multicast group members
599 * (from the origin). 599 * (from the origin).
600 */ 600 */
601struct GNUNET_MULTICAST_OriginMessageHandle; 601struct GNUNET_MULTICAST_OriginTransmitHandle;
602 602
603 603
604/** 604/**
@@ -612,7 +612,7 @@ struct GNUNET_MULTICAST_OriginMessageHandle;
612 * @param notify_cls Closure for @a notify. 612 * @param notify_cls Closure for @a notify.
613 * @return NULL on error (i.e. request already pending). 613 * @return NULL on error (i.e. request already pending).
614 */ 614 */
615struct GNUNET_MULTICAST_OriginMessageHandle * 615struct GNUNET_MULTICAST_OriginTransmitHandle *
616GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *origin, 616GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *origin,
617 uint64_t message_id, 617 uint64_t message_id,
618 uint64_t group_generation, 618 uint64_t group_generation,
@@ -624,19 +624,19 @@ GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *origin,
624/** 624/**
625 * Resume message transmission to multicast group. 625 * Resume message transmission to multicast group.
626 * 626 *
627 * @param mh Request to cancel. 627 * @param th Transmission to cancel.
628 */ 628 */
629void 629void
630GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginMessageHandle *mh); 630GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHandle *th);
631 631
632 632
633/** 633/**
634 * Cancel request for message transmission to multicast group. 634 * Cancel request for message transmission to multicast group.
635 * 635 *
636 * @param mh Request to cancel. 636 * @param th Transmission to cancel.
637 */ 637 */
638void 638void
639GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginMessageHandle *mh); 639GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHandle *th);
640 640
641 641
642/** 642/**
@@ -788,6 +788,7 @@ GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *member);
788 * @a data, should be set to the number of bytes written to data. 788 * @a data, should be set to the number of bytes written to data.
789 * @param[out] data Where to write the body of the message to give to the 789 * @param[out] data Where to write the body of the message to give to the
790 * method. The function must copy at most @a data_size bytes to @a data. 790 * method. The function must copy at most @a data_size bytes to @a data.
791 *
791 * @return #GNUNET_SYSERR on error (fatal, aborts transmission) 792 * @return #GNUNET_SYSERR on error (fatal, aborts transmission)
792 * #GNUNET_NO on success, if more data is to be transmitted later. 793 * #GNUNET_NO on success, if more data is to be transmitted later.
793 * Should be used if @a data_size was not big enough to take all the 794 * Should be used if @a data_size was not big enough to take all the
@@ -804,7 +805,7 @@ typedef int
804/** 805/**
805 * Handle for a message to be delivered from a member to the origin. 806 * Handle for a message to be delivered from a member to the origin.
806 */ 807 */
807struct GNUNET_MULTICAST_MemberRequestHandle; 808struct GNUNET_MULTICAST_MemberTransmitHandle;
808 809
809 810
810/** 811/**
@@ -814,9 +815,10 @@ struct GNUNET_MULTICAST_MemberRequestHandle;
814 * @param request_id Application layer ID for the request. Opaque to multicast. 815 * @param request_id Application layer ID for the request. Opaque to multicast.
815 * @param notify Callback to call to get the message. 816 * @param notify Callback to call to get the message.
816 * @param notify_cls Closure for @a notify. 817 * @param notify_cls Closure for @a notify.
818 *
817 * @return Handle to cancel request, NULL on error (i.e. request already pending). 819 * @return Handle to cancel request, NULL on error (i.e. request already pending).
818 */ 820 */
819struct GNUNET_MULTICAST_MemberRequestHandle * 821struct GNUNET_MULTICAST_MemberTransmitHandle *
820GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *member, 822GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *member,
821 uint64_t request_id, 823 uint64_t request_id,
822 GNUNET_MULTICAST_MemberTransmitNotify notify, 824 GNUNET_MULTICAST_MemberTransmitNotify notify,
@@ -826,19 +828,19 @@ GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *member,
826/** 828/**
827 * Resume message transmission to origin. 829 * Resume message transmission to origin.
828 * 830 *
829 * @param rh Request to cancel. 831 * @param th Transmission to cancel.
830 */ 832 */
831void 833void
832GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberRequestHandle *rh); 834GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmitHandle *th);
833 835
834 836
835/** 837/**
836 * Cancel request for message transmission to origin. 838 * Cancel request for message transmission to origin.
837 * 839 *
838 * @param rh Request to cancel. 840 * @param th Transmission to cancel.
839 */ 841 */
840void 842void
841GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberRequestHandle *rh); 843GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberTransmitHandle *th);
842 844
843 845
844#if 0 /* keep Emacsens' auto-indent happy */ 846#if 0 /* keep Emacsens' auto-indent happy */
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 9b1833b81..fb808b08d 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -2326,48 +2326,51 @@ extern "C"
2326 * MULTICAST message types 2326 * MULTICAST message types
2327 ******************************************************************************/ 2327 ******************************************************************************/
2328 2328
2329 2329/**
2330/* WIP: no numbers assigned yet */ 2330 * C: client
2331 * S: service
2332 * T: cadet
2333 */
2331 2334
2332/** 2335/**
2333 * Start an origin. 2336 * C->S: Start the origin.
2334 */ 2337 */
2335#define GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START 750 2338#define GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START 750
2336 2339
2337/** 2340/**
2338 * Stop an origin. 2341 * C->S: Stop the origin.
2339 */ 2342 */
2340#define GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_STOP 751 2343#define GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_STOP 751
2341 2344
2342/** 2345/**
2343 * Join a group as a member. 2346 * C->S: Join group as a member.
2344 */ 2347 */
2345#define GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN 752 2348#define GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN 752
2346 2349
2347/** 2350/**
2348 * Leave a group. 2351 * C->S: Part the group.
2349 */ 2352 */
2350#define GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_PART 753 2353#define GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_PART 753
2351 2354
2352/** 2355/**
2353 * Multicast message from the origin to all members. 2356 * C<->S<->T: Multicast message from the origin to all members.
2354 */ 2357 */
2355#define GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE 754 2358#define GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE 754
2356 2359
2357/** 2360/**
2358 * A unicast message from a group member to the origin. 2361 * C<->S<->T: Unicast request from a group member to the origin.
2359 */ 2362 */
2360#define GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST 755 2363#define GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST 755
2361 2364
2362/** 2365/**
2363 * A peer wants to join the group. 2366 * C<--S<->T: A peer wants to join the group.
2364 * 2367 *
2365 * Unicast message to the origin or another group member. 2368 * Unicast message to the origin or another group member.
2366 */ 2369 */
2367#define GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST 2370#define GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST
2368 2371
2369/** 2372/**
2370 * Response to a join request. 2373 * C<->S<->T: Response to a join request.
2371 * 2374 *
2372 * Unicast message from a group member to the peer wanting to join. 2375 * Unicast message from a group member to the peer wanting to join.
2373 */ 2376 */
diff --git a/src/multicast/Makefile.am b/src/multicast/Makefile.am
index f8d49034e..752736c49 100644
--- a/src/multicast/Makefile.am
+++ b/src/multicast/Makefile.am
@@ -39,12 +39,18 @@ gnunet_multicast_SOURCES = \
39gnunet_multicast_LDADD = \ 39gnunet_multicast_LDADD = \
40 $(top_builddir)/src/util/libgnunetutil.la \ 40 $(top_builddir)/src/util/libgnunetutil.la \
41 $(GN_LIBINTL) 41 $(GN_LIBINTL)
42gnunet_multicast_DEPENDENCIES = \
43 $(top_builddir)/src/util/libgnunetutil.la
42 44
43gnunet_service_multicast_SOURCES = \ 45gnunet_service_multicast_SOURCES = \
44 gnunet-service-multicast.c 46 gnunet-service-multicast.c
45gnunet_service_multicast_LDADD = \ 47gnunet_service_multicast_LDADD = \
46 $(top_builddir)/src/util/libgnunetutil.la \ 48 $(top_builddir)/src/util/libgnunetutil.la \
49 $(top_builddir)/src/statistics/libgnunetstatistics.la \
47 $(GN_LIBINTL) 50 $(GN_LIBINTL)
51gnunet_service_multicast_DEPENDENCIES = \
52 $(top_builddir)/src/util/libgnunetutil.la \
53 $(top_builddir)/src/statistics/libgnunetstatistics.la
48 54
49 55
50check_PROGRAMS = \ 56check_PROGRAMS = \
diff --git a/src/multicast/gnunet-service-multicast.c b/src/multicast/gnunet-service-multicast.c
index 47c8bce36..0265660e1 100644
--- a/src/multicast/gnunet-service-multicast.c
+++ b/src/multicast/gnunet-service-multicast.c
@@ -25,6 +25,102 @@
25 */ 25 */
26#include "platform.h" 26#include "platform.h"
27#include "gnunet_util_lib.h" 27#include "gnunet_util_lib.h"
28#include "gnunet_signatures.h"
29#include "gnunet_statistics_service.h"
30#include "gnunet_multicast_service.h"
31#include "multicast.h"
32
33/**
34 * Handle to our current configuration.
35 */
36static const struct GNUNET_CONFIGURATION_Handle *cfg;
37
38/**
39 * Handle to the statistics service.
40 */
41static struct GNUNET_STATISTICS_Handle *stats;
42/**
43 * Notification context, simplifies client broadcasts.
44 */
45static struct GNUNET_SERVER_NotificationContext *nc;
46
47/**
48 * All connected origins.
49 * Group's pub_key_hash -> struct Group
50 */
51static struct GNUNET_CONTAINER_MultiHashMap *origins;
52
53/**
54 * All connected members.
55 * Group's pub_key_hash -> struct Group
56 */
57static struct GNUNET_CONTAINER_MultiHashMap *members;
58
59/**
60 * Common part of the client context for both an origin and member.
61 */
62struct Group
63{
64 struct GNUNET_SERVER_Client *client;
65
66 /**
67 * Public key of the group.
68 */
69 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
70
71 /**
72 * Hash of @a pub_key.
73 */
74 struct GNUNET_HashCode pub_key_hash;
75
76 /**
77 * Is this an origin (#GNUNET_YES), or member (#GNUNET_NO)?
78 */
79 uint8_t is_origin;
80
81 /**
82 * Is the client disconnected? #GNUNET_YES or #GNUNET_NO
83 */
84 uint8_t disconnected;
85};
86
87
88/**
89 * Client context for a group's origin.
90 */
91struct Origin
92{
93 struct Group grp;
94
95 /**
96 * Private key of the group.
97 */
98 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
99
100 /**
101 * Last message fragment ID sent to the group.
102 */
103 uint64_t max_fragment_id;
104};
105
106
107/**
108 * Client context for a group member.
109 */
110struct Member
111{
112 struct Group grp;
113
114 /**
115 * Private key of the member.
116 */
117 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
118
119 /**
120 * Last request fragment ID sent to the origin.
121 */
122 uint64_t max_fragment_id;
123};
28 124
29 125
30/** 126/**
@@ -41,13 +137,86 @@ cleanup_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
41 137
42 138
43/** 139/**
140 * Iterator callback for sending a message to clients.
141 */
142static int
143message_callback (void *cls, const struct GNUNET_HashCode *pub_key_hash,
144 void *group)
145{
146 const struct GNUNET_MessageHeader *msg = cls;
147 struct Group *grp = group;
148
149 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
150 "%p Sending message to client.\n", grp);
151
152 GNUNET_SERVER_notification_context_add (nc, grp->client);
153 GNUNET_SERVER_notification_context_unicast (nc, grp->client, msg, GNUNET_NO);
154
155 return GNUNET_YES;
156}
157
158
159/**
160 * Send message to all origin and member clients connected to the group.
161 *
162 * @param grp The group to send @a msg to.
163 * @param msg Message to send.
164 */
165static void
166message_to_group (struct Group *grp, const struct GNUNET_MessageHeader *msg)
167{
168 if (origins != NULL)
169 GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
170 message_callback, (void *) msg);
171 if (members != NULL)
172 GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash,
173 message_callback, (void *) msg);
174}
175
176
177/**
178 * Send message to all origin clients connected to the group.
179 *
180 * @param grp The group to send @a msg to.
181 * @param msg Message to send.
182 */
183static void
184message_to_origin (struct Group *grp, const struct GNUNET_MessageHeader *msg)
185{
186 if (origins != NULL)
187 GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
188 message_callback, (void *) msg);
189}
190
191
192/**
44 * Handle a connecting client starting an origin. 193 * Handle a connecting client starting an origin.
45 */ 194 */
46static void 195static void
47handle_origin_start (void *cls, struct GNUNET_SERVER_Client *client, 196handle_origin_start (void *cls, struct GNUNET_SERVER_Client *client,
48 const struct GNUNET_MessageHeader *msg) 197 const struct GNUNET_MessageHeader *m)
49{ 198{
199 const struct MulticastOriginStartMessage *
200 msg = (const struct MulticastOriginStartMessage *) m;
201
202 struct Origin *orig = GNUNET_new (struct Origin);
203 orig->priv_key = msg->group_key;
50 204
205 struct Group *grp = &orig->grp;
206 grp->is_origin = GNUNET_YES;
207 grp->client = client;
208
209 GNUNET_CRYPTO_eddsa_key_get_public (&orig->priv_key, &grp->pub_key);
210 GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key), &grp->pub_key_hash);
211
212 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
213 "%p Client connected as origin to group %s.\n",
214 orig, GNUNET_h2s (&grp->pub_key_hash));
215
216 GNUNET_SERVER_client_set_user_context (client, grp);
217 GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig,
218 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
219 GNUNET_SERVER_receive_done (client, GNUNET_OK);
51} 220}
52 221
53 222
@@ -58,7 +227,6 @@ static void
58handle_origin_stop (void *cls, struct GNUNET_SERVER_Client *client, 227handle_origin_stop (void *cls, struct GNUNET_SERVER_Client *client,
59 const struct GNUNET_MessageHeader *msg) 228 const struct GNUNET_MessageHeader *msg)
60{ 229{
61
62} 230}
63 231
64 232
@@ -67,9 +235,28 @@ handle_origin_stop (void *cls, struct GNUNET_SERVER_Client *client,
67 */ 235 */
68static void 236static void
69handle_member_join (void *cls, struct GNUNET_SERVER_Client *client, 237handle_member_join (void *cls, struct GNUNET_SERVER_Client *client,
70 const struct GNUNET_MessageHeader *msg) 238 const struct GNUNET_MessageHeader *m)
71{ 239{
72 240 struct MulticastMemberJoinMessage *
241 msg = (struct MulticastMemberJoinMessage *) m;
242
243 struct Member *mem = GNUNET_new (struct Member);
244 mem->priv_key = msg->member_key;
245
246 struct Group *grp = &mem->grp;
247 grp->is_origin = GNUNET_NO;
248 grp->client = client;
249 grp->pub_key = msg->group_key;
250 GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key), &grp->pub_key_hash);
251
252 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
253 "%p Client connected as member to group %s.\n",
254 mem, GNUNET_h2s (&grp->pub_key_hash));
255
256 GNUNET_SERVER_client_set_user_context (client, grp);
257 GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem,
258 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
259 GNUNET_SERVER_receive_done (client, GNUNET_OK);
73} 260}
74 261
75 262
@@ -89,9 +276,33 @@ handle_member_part (void *cls, struct GNUNET_SERVER_Client *client,
89 */ 276 */
90static void 277static void
91handle_multicast_message (void *cls, struct GNUNET_SERVER_Client *client, 278handle_multicast_message (void *cls, struct GNUNET_SERVER_Client *client,
92 const struct GNUNET_MessageHeader *msg) 279 const struct GNUNET_MessageHeader *m)
93{ 280{
94 281 struct Group *
282 grp = GNUNET_SERVER_client_get_user_context (client, struct Group);
283 GNUNET_assert (GNUNET_YES == grp->is_origin);
284 struct Origin *orig = (struct Origin *) grp;
285 struct GNUNET_MULTICAST_MessageHeader *
286 msg = (struct GNUNET_MULTICAST_MessageHeader *) m;
287
288 msg->fragment_id = GNUNET_htonll (orig->max_fragment_id++);
289 msg->purpose.size = htonl (sizeof (*msg) + ntohs (m->size)
290 - sizeof (msg->header)
291 - sizeof (msg->hop_counter)
292 - sizeof (msg->signature));
293 msg->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE);
294
295 if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&orig->priv_key, &msg->purpose,
296 &msg->signature))
297 {
298 /* FIXME: handle error */
299 return;
300 }
301
302 /* FIXME: send to remote members */
303
304 message_to_group (grp, m);
305 GNUNET_SERVER_receive_done (client, GNUNET_OK);
95} 306}
96 307
97 308
@@ -100,9 +311,35 @@ handle_multicast_message (void *cls, struct GNUNET_SERVER_Client *client,
100 */ 311 */
101static void 312static void
102handle_multicast_request (void *cls, struct GNUNET_SERVER_Client *client, 313handle_multicast_request (void *cls, struct GNUNET_SERVER_Client *client,
103 const struct GNUNET_MessageHeader *msg) 314 const struct GNUNET_MessageHeader *m)
104{ 315{
316 struct Group *
317 grp = GNUNET_SERVER_client_get_user_context (client, struct Group);
318 GNUNET_assert (GNUNET_NO == grp->is_origin);
319 struct Member *mem = (struct Member *) grp;
320
321 struct GNUNET_MULTICAST_RequestHeader *
322 req = (struct GNUNET_MULTICAST_RequestHeader *) m;
323
324 req->fragment_id = GNUNET_ntohll (mem->max_fragment_id++);
325
326 req->purpose.size = htonl (sizeof (*req) + ntohs (m->size)
327 - sizeof (req->header)
328 - sizeof (req->member_key)
329 - sizeof (req->signature));
330 req->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE);
105 331
332 if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&mem->priv_key, &req->purpose,
333 &req->signature))
334 {
335 /* FIXME: handle error */
336 return;
337 }
338
339 /* FIXME: send to remote origin */
340
341 message_to_origin (grp, m);
342 GNUNET_SERVER_receive_done (client, GNUNET_OK);
106} 343}
107 344
108/** 345/**
@@ -114,7 +351,7 @@ handle_multicast_request (void *cls, struct GNUNET_SERVER_Client *client,
114 */ 351 */
115static void 352static void
116run (void *cls, struct GNUNET_SERVER_Handle *server, 353run (void *cls, struct GNUNET_SERVER_Handle *server,
117 const struct GNUNET_CONFIGURATION_Handle *cfg) 354 const struct GNUNET_CONFIGURATION_Handle *c)
118{ 355{
119 static const struct GNUNET_SERVER_MessageHandler handlers[] = { 356 static const struct GNUNET_SERVER_MessageHandler handlers[] = {
120 { &handle_origin_start, NULL, 357 { &handle_origin_start, NULL,
@@ -137,7 +374,13 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
137 374
138 {NULL, NULL, 0, 0} 375 {NULL, NULL, 0, 0}
139 }; 376 };
140 /* FIXME: do setup here */ 377
378 cfg = c;
379 stats = GNUNET_STATISTICS_create ("multicast", cfg);
380 origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
381 members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
382 nc = GNUNET_SERVER_notification_context_create (server, 1);
383
141 GNUNET_SERVER_add_handlers (server, handlers); 384 GNUNET_SERVER_add_handlers (server, handlers);
142 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &cleanup_task, 385 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &cleanup_task,
143 NULL); 386 NULL);
diff --git a/src/multicast/multicast.h b/src/multicast/multicast.h
index facf8f54e..daa79e260 100644
--- a/src/multicast/multicast.h
+++ b/src/multicast/multicast.h
@@ -183,9 +183,8 @@ struct MulticastReplayEndMessage
183 */ 183 */
184struct MulticastOriginStartMessage 184struct MulticastOriginStartMessage
185{ 185{
186
187 /** 186 /**
188 * 187 * Type: GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START
189 */ 188 */
190 struct GNUNET_MessageHeader header; 189 struct GNUNET_MessageHeader header;
191 190
@@ -195,16 +194,36 @@ struct MulticastOriginStartMessage
195 uint32_t reserved; 194 uint32_t reserved;
196 195
197 /** 196 /**
198 * Private, non-ephemeral key for the mutlicast group. 197 * Private, non-ephemeral key for the multicast group.
199 */ 198 */
200 struct GNUNET_CRYPTO_EddsaPrivateKey group_key; 199 struct GNUNET_CRYPTO_EddsaPrivateKey group_key;
201 200
202 /** 201 /**
203 * Last fragment ID, used to continue counting fragments if we resume operating 202 * Last fragment ID sent to the group, used to continue counting fragments if
204 * a group. 203 * we resume operating * a group.
204 */
205 uint64_t max_fragment_id;
206
207};
208
209
210struct MulticastMemberJoinMessage
211{
212 /**
213 * Type: GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN
205 */ 214 */
206 uint64_t last_fragment_id; 215 struct GNUNET_MessageHeader header;
216
217 uint32_t relay_count GNUNET_PACKED;
218
219 struct GNUNET_CRYPTO_EddsaPublicKey group_key;
220
221 struct GNUNET_CRYPTO_EddsaPrivateKey member_key;
222
223 struct GNUNET_PeerIdentity origin;
207 224
225 /* Followed by struct GNUNET_PeerIdentity relays[relay_count] */
226 /* Followed by struct GNUNET_MessageHeader join_request */
208}; 227};
209 228
210 229
diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c
index da81de486..d42f438ae 100644
--- a/src/multicast/multicast_api.c
+++ b/src/multicast/multicast_api.c
@@ -26,7 +26,6 @@
26 */ 26 */
27#include "platform.h" 27#include "platform.h"
28#include "gnunet_util_lib.h" 28#include "gnunet_util_lib.h"
29#include "gnunet_signatures.h"
30#include "gnunet_multicast_service.h" 29#include "gnunet_multicast_service.h"
31#include "multicast.h" 30#include "multicast.h"
32 31
@@ -46,11 +45,18 @@ static struct GNUNET_CONTAINER_MultiHashMap *origins;
46static struct GNUNET_CONTAINER_MultiHashMap *members; 45static struct GNUNET_CONTAINER_MultiHashMap *members;
47 46
48 47
48struct MessageQueue
49{
50 struct MessageQueue *prev;
51 struct MessageQueue *next;
52};
53
54
49/** 55/**
50 * Handle for a request to send a message to all multicast group members 56 * Handle for a request to send a message to all multicast group members
51 * (from the origin). 57 * (from the origin).
52 */ 58 */
53struct GNUNET_MULTICAST_OriginMessageHandle 59struct GNUNET_MULTICAST_OriginTransmitHandle
54{ 60{
55 GNUNET_MULTICAST_OriginTransmitNotify notify; 61 GNUNET_MULTICAST_OriginTransmitNotify notify;
56 void *notify_cls; 62 void *notify_cls;
@@ -62,47 +68,104 @@ struct GNUNET_MULTICAST_OriginMessageHandle
62}; 68};
63 69
64 70
65struct GNUNET_MULTICAST_Group 71/**
72 * Handle for a message to be delivered from a member to the origin.
73 */
74struct GNUNET_MULTICAST_MemberTransmitHandle
66{ 75{
67 uint8_t is_origin; 76 GNUNET_MULTICAST_MemberTransmitNotify notify;
77 void *notify_cls;
78 struct GNUNET_MULTICAST_Member *member;
79
80 uint64_t request_id;
81 uint64_t fragment_offset;
68}; 82};
69 83
70/** 84
71 * Handle for the origin of a multicast group. 85struct GNUNET_MULTICAST_Group
72 */
73struct GNUNET_MULTICAST_Origin
74{ 86{
75 struct GNUNET_MULTICAST_Group grp; 87 /**
88 * Configuration to use.
89 */
90 const struct GNUNET_CONFIGURATION_Handle *cfg;
76 91
77 struct GNUNET_MULTICAST_OriginMessageHandle msg_handle; 92 /**
78 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key; 93 * Socket (if available).
94 */
95 struct GNUNET_CLIENT_Connection *client;
96
97 /**
98 * Currently pending transmission request, or NULL for none.
99 */
100 struct GNUNET_CLIENT_TransmitHandle *th;
101
102 /**
103 * Head of operations to transmit.
104 */
105 struct MessageQueue *tmit_head;
106
107 /**
108 * Tail of operations to transmit.
109 */
110 struct MessageQueue *tmit_tail;
111
112 /**
113 * Message being transmitted to the Multicast service.
114 */
115 struct MessageQueue *tmit_msg;
116
117 /**
118 * Message to send on reconnect.
119 */
120 struct GNUNET_MessageHeader *reconnect_msg;
121
122 /**
123 * Task doing exponential back-off trying to reconnect.
124 */
125 GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
126
127 /**
128 * Time for next connect retry.
129 */
130 struct GNUNET_TIME_Relative reconnect_delay;
131
132 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
133 struct GNUNET_HashCode pub_key_hash;
79 134
80 GNUNET_MULTICAST_JoinCallback join_cb; 135 GNUNET_MULTICAST_JoinCallback join_cb;
81 GNUNET_MULTICAST_MembershipTestCallback mem_test_cb; 136 GNUNET_MULTICAST_MembershipTestCallback member_test_cb;
82 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb; 137 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb;
83 GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb; 138 GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb;
84 GNUNET_MULTICAST_RequestCallback request_cb;
85 GNUNET_MULTICAST_MessageCallback message_cb; 139 GNUNET_MULTICAST_MessageCallback message_cb;
86 void *cls; 140 void *cb_cls;
87 141
88 uint64_t next_fragment_id; 142 /**
143 * Are we polling for incoming messages right now?
144 */
145 uint8_t in_receive;
89 146
90 struct GNUNET_CRYPTO_EddsaPublicKey pub_key; 147 /**
91 struct GNUNET_HashCode pub_key_hash; 148 * Are we currently transmitting a message?
149 */
150 uint8_t in_transmit;
151
152 /**
153 * Is this the origin or a member?
154 */
155 uint8_t is_origin;
92}; 156};
93 157
94 158
95/** 159/**
96 * Handle for a message to be delivered from a member to the origin. 160 * Handle for the origin of a multicast group.
97 */ 161 */
98struct GNUNET_MULTICAST_MemberRequestHandle 162struct GNUNET_MULTICAST_Origin
99{ 163{
100 GNUNET_MULTICAST_MemberTransmitNotify notify; 164 struct GNUNET_MULTICAST_Group grp;
101 void *notify_cls; 165 struct GNUNET_MULTICAST_OriginTransmitHandle tmit;
102 struct GNUNET_MULTICAST_Member *member; 166 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
103 167
104 uint64_t request_id; 168 GNUNET_MULTICAST_RequestCallback request_cb;
105 uint64_t fragment_offset;
106}; 169};
107 170
108 171
@@ -112,24 +175,16 @@ struct GNUNET_MULTICAST_MemberRequestHandle
112struct GNUNET_MULTICAST_Member 175struct GNUNET_MULTICAST_Member
113{ 176{
114 struct GNUNET_MULTICAST_Group grp; 177 struct GNUNET_MULTICAST_Group grp;
178 struct GNUNET_MULTICAST_MemberTransmitHandle tmit;
115 179
116 struct GNUNET_MULTICAST_MemberRequestHandle req_handle; 180 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
117
118 struct GNUNET_CRYPTO_EddsaPublicKey group_key;
119 struct GNUNET_CRYPTO_EddsaPrivateKey member_key;
120 struct GNUNET_PeerIdentity origin; 181 struct GNUNET_PeerIdentity origin;
121 struct GNUNET_PeerIdentity relays; 182 struct GNUNET_PeerIdentity relays;
122 uint32_t relay_count; 183 uint32_t relay_count;
184
123 struct GNUNET_MessageHeader *join_request; 185 struct GNUNET_MessageHeader *join_request;
124 GNUNET_MULTICAST_JoinCallback join_cb;
125 GNUNET_MULTICAST_MembershipTestCallback member_test_cb;
126 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb;
127 GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb;
128 GNUNET_MULTICAST_MessageCallback message_cb;
129 void *cls;
130 186
131 uint64_t next_fragment_id; 187 uint64_t next_fragment_id;
132 struct GNUNET_HashCode group_key_hash;
133}; 188};
134 189
135 190
@@ -168,34 +223,233 @@ struct GNUNET_MULTICAST_MemberReplayHandle
168}; 223};
169 224
170 225
226static void
227reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
228
229
230static void
231reschedule_connect (struct GNUNET_MULTICAST_Group *grp);
232
233
234/**
235 * Schedule transmission of the next message from our queue.
236 *
237 * @param grp PSYC channel handle
238 */
239static void
240transmit_next (struct GNUNET_MULTICAST_Group *grp);
241
242
243static void
244message_handler (void *cls, const struct GNUNET_MessageHeader *msg);
245
246
247/**
248 * Reschedule a connect attempt to the service.
249 *
250 * @param c channel to reconnect
251 */
252static void
253reschedule_connect (struct GNUNET_MULTICAST_Group *grp)
254{
255 GNUNET_assert (grp->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
256
257 if (NULL != grp->th)
258 {
259 GNUNET_CLIENT_notify_transmit_ready_cancel (grp->th);
260 grp->th = NULL;
261 }
262 if (NULL != grp->client)
263 {
264 GNUNET_CLIENT_disconnect (grp->client);
265 grp->client = NULL;
266 }
267 grp->in_receive = GNUNET_NO;
268 LOG (GNUNET_ERROR_TYPE_DEBUG,
269 "Scheduling task to reconnect to Multicast service in %s.\n",
270 GNUNET_STRINGS_relative_time_to_string (grp->reconnect_delay, GNUNET_YES));
271 grp->reconnect_task =
272 GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay, &reconnect, grp);
273 grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay);
274}
275
276
277/**
278 * Reset stored data related to the last received message.
279 */
280static void
281recv_reset (struct GNUNET_MULTICAST_Group *grp)
282{
283}
284
285
286static void
287recv_error (struct GNUNET_MULTICAST_Group *grp)
288{
289 if (NULL != grp->message_cb)
290 grp->message_cb (grp->cb_cls, NULL);
291
292 recv_reset (grp);
293}
294
295
296/**
297 * Transmit next message to service.
298 *
299 * @param cls The struct GNUNET_MULTICAST_Group.
300 * @param size Number of bytes available in @a buf.
301 * @param buf Where to copy the message.
302 *
303 * @return Number of bytes copied to @a buf.
304 */
305static size_t
306send_next_message (void *cls, size_t size, void *buf)
307{
308 LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n");
309 struct GNUNET_MULTICAST_Group *grp = cls;
310 struct MessageQueue *mq = grp->tmit_head;
311 if (NULL == mq)
312 return 0;
313 struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1];
314 size_t ret = ntohs (qmsg->size);
315 grp->th = NULL;
316 if (ret > size)
317 {
318 reschedule_connect (grp);
319 return 0;
320 }
321 memcpy (buf, qmsg, ret);
322
323 GNUNET_CONTAINER_DLL_remove (grp->tmit_head, grp->tmit_tail, mq);
324 GNUNET_free (mq);
325
326 if (NULL != grp->tmit_head)
327 transmit_next (grp);
328
329 if (GNUNET_NO == grp->in_receive)
330 {
331 grp->in_receive = GNUNET_YES;
332 GNUNET_CLIENT_receive (grp->client, &message_handler, grp,
333 GNUNET_TIME_UNIT_FOREVER_REL);
334 }
335 return ret;
336}
337
338
339/**
340 * Schedule transmission of the next message from our queue.
341 *
342 * @param grp Multicast group handle.
343 */
344static void
345transmit_next (struct GNUNET_MULTICAST_Group *grp)
346{
347 LOG (GNUNET_ERROR_TYPE_DEBUG, "transmit_next()\n");
348 if (NULL != grp->th || NULL == grp->client)
349 return;
350
351 struct MessageQueue *mq = grp->tmit_head;
352 if (NULL == mq)
353 return;
354 struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1];
355
356 grp->th = GNUNET_CLIENT_notify_transmit_ready (grp->client,
357 ntohs (qmsg->size),
358 GNUNET_TIME_UNIT_FOREVER_REL,
359 GNUNET_NO,
360 &send_next_message,
361 grp);
362}
363
364
365/**
366 * Try again to connect to the Multicast service.
367 *
368 * @param cls Channel handle.
369 * @param tc Scheduler context.
370 */
371static void
372reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
373{
374 struct GNUNET_MULTICAST_Group *grp = cls;
375
376 recv_reset (grp);
377 grp->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
378 LOG (GNUNET_ERROR_TYPE_DEBUG,
379 "Connecting to Multicast service.\n");
380 GNUNET_assert (NULL == grp->client);
381 grp->client = GNUNET_CLIENT_connect ("multicast", grp->cfg);
382 GNUNET_assert (NULL != grp->client);
383 uint16_t reconn_size = ntohs (grp->reconnect_msg->size);
384
385 if (NULL == grp->tmit_head ||
386 0 != memcmp (&grp->tmit_head[1], grp->reconnect_msg, reconn_size))
387 {
388 struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + reconn_size);
389 memcpy (&mq[1], grp->reconnect_msg, reconn_size);
390 GNUNET_CONTAINER_DLL_insert (grp->tmit_head, grp->tmit_tail, mq);
391 }
392 transmit_next (grp);
393}
394
395
396/**
397 * Disconnect from the Multicast service.
398 *
399 * @param g Group handle to disconnect.
400 */
401static void
402disconnect (void *g)
403{
404 struct GNUNET_MULTICAST_Group *grp = g;
405
406 GNUNET_assert (NULL != grp);
407 if (grp->tmit_head != grp->tmit_tail)
408 {
409 LOG (GNUNET_ERROR_TYPE_ERROR,
410 "Disconnecting while there are still outstanding messages!\n");
411 GNUNET_break (0);
412 }
413 if (grp->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
414 {
415 GNUNET_SCHEDULER_cancel (grp->reconnect_task);
416 grp->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
417 }
418 if (NULL != grp->th)
419 {
420 GNUNET_CLIENT_notify_transmit_ready_cancel (grp->th);
421 grp->th = NULL;
422 }
423 if (NULL != grp->client)
424 {
425 GNUNET_CLIENT_disconnect (grp->client);
426 grp->client = NULL;
427 }
428 if (NULL != grp->reconnect_msg)
429 {
430 GNUNET_free (grp->reconnect_msg);
431 grp->reconnect_msg = NULL;
432 }
433}
434
435
171/** 436/**
172 * Iterator callback for calling message callbacks for all groups. 437 * Iterator callback for calling message callbacks for all groups.
173 */ 438 */
174static int 439static int
175message_callback (void *cls, const struct GNUNET_HashCode *chan_key_hash, 440message_callback (void *cls, const struct GNUNET_HashCode *pub_key_hash,
176 void *group) 441 void *group)
177{ 442{
178 const struct GNUNET_MessageHeader *msg = cls; 443 const struct GNUNET_MessageHeader *msg = cls;
179 struct GNUNET_MULTICAST_Group *grp = group; 444 struct GNUNET_MULTICAST_Group *grp = group;
180 445
181 if (GNUNET_YES == grp->is_origin) 446 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
182 { 447 "Calling message callback with a message "
183 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 448 "of type %u and size %u.\n",
184 "Calling origin's message callback "
185 "with a message of type %u and size %u.\n",
186 ntohs (msg->type), ntohs (msg->size)); 449 ntohs (msg->type), ntohs (msg->size));
187 struct GNUNET_MULTICAST_Origin *orig = (struct GNUNET_MULTICAST_Origin *) grp; 450
188 orig->message_cb (orig->cls, msg); 451 if (NULL != grp->message_cb)
189 } 452 grp->message_cb (grp->cb_cls, msg);
190 else
191 {
192 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
193 "Calling member's message callback "
194 "with a message of type %u and size %u.\n",
195 ntohs (msg->type), ntohs (msg->size));
196 struct GNUNET_MULTICAST_Member *mem = (struct GNUNET_MULTICAST_Member *) grp;
197 mem->message_cb (mem->cls, msg);
198 }
199 453
200 return GNUNET_YES; 454 return GNUNET_YES;
201} 455}
@@ -213,25 +467,12 @@ static void
213handle_multicast_message (struct GNUNET_MULTICAST_Group *grp, 467handle_multicast_message (struct GNUNET_MULTICAST_Group *grp,
214 const struct GNUNET_MULTICAST_MessageHeader *msg) 468 const struct GNUNET_MULTICAST_MessageHeader *msg)
215{ 469{
216 struct GNUNET_HashCode *hash;
217
218 if (GNUNET_YES == grp->is_origin)
219 {
220 struct GNUNET_MULTICAST_Origin *orig = (struct GNUNET_MULTICAST_Origin *) grp;
221 hash = &orig->pub_key_hash;
222 }
223 else
224 {
225 struct GNUNET_MULTICAST_Member *mem = (struct GNUNET_MULTICAST_Member *) grp;
226 hash = &mem->group_key_hash;
227 }
228
229 if (origins != NULL) 470 if (origins != NULL)
230 GNUNET_CONTAINER_multihashmap_get_multiple (origins, hash, message_callback, 471 GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
231 (void *) msg); 472 message_callback, (void *) msg);
232 if (members != NULL) 473 if (members != NULL)
233 GNUNET_CONTAINER_multihashmap_get_multiple (members, hash, message_callback, 474 GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash,
234 (void *) msg); 475 message_callback, (void *) msg);
235} 476}
236 477
237 478
@@ -249,7 +490,7 @@ request_callback (void *cls, const struct GNUNET_HashCode *chan_key_hash,
249 "Calling request callback for a request of type %u and size %u.\n", 490 "Calling request callback for a request of type %u and size %u.\n",
250 ntohs (req->header.type), ntohs (req->header.size)); 491 ntohs (req->header.type), ntohs (req->header.size));
251 492
252 orig->request_cb (orig->cls, &req->member_key, 493 orig->request_cb (orig->grp.cb_cls, &req->member_key,
253 (const struct GNUNET_MessageHeader *) req, 0); 494 (const struct GNUNET_MessageHeader *) req, 0);
254 return GNUNET_YES; 495 return GNUNET_YES;
255} 496}
@@ -264,16 +505,94 @@ request_callback (void *cls, const struct GNUNET_HashCode *chan_key_hash,
264 * @param msg The message. 505 * @param msg The message.
265 */ 506 */
266static void 507static void
267handle_multicast_request (const struct GNUNET_HashCode *group_key_hash, 508handle_multicast_request (struct GNUNET_MULTICAST_Group *grp,
268 const struct GNUNET_MULTICAST_RequestHeader *req) 509 const struct GNUNET_MULTICAST_RequestHeader *req)
269{ 510{
270 if (NULL != origins) 511 if (NULL != origins)
271 GNUNET_CONTAINER_multihashmap_get_multiple (origins, group_key_hash, 512 GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
272 request_callback, (void *) req); 513 request_callback, (void *) req);
273} 514}
274 515
275 516
276/** 517/**
518 * Function called when we receive a message from the service.
519 *
520 * @param cls struct GNUNET_MULTICAST_Group
521 * @param msg Message received, NULL on timeout or fatal error.
522 */
523static void
524message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
525{
526 struct GNUNET_MULTICAST_Group *grp = cls;
527
528 if (NULL == msg)
529 {
530 // timeout / disconnected from service, reconnect
531 reschedule_connect (grp);
532 return;
533 }
534
535 uint16_t size_eq = 0;
536 uint16_t size_min = 0;
537 uint16_t size = ntohs (msg->size);
538 uint16_t type = ntohs (msg->type);
539
540 LOG (GNUNET_ERROR_TYPE_DEBUG,
541 "Received message of type %d and size %u from Multicast service\n",
542 type, size);
543
544 switch (type)
545 {
546 case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
547 size_min = sizeof (struct GNUNET_MULTICAST_MessageHeader);
548 break;
549
550 case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
551 size_min = sizeof (struct GNUNET_MULTICAST_RequestHeader);
552 break;
553
554 default:
555 GNUNET_break_op (0);
556 return;
557 }
558
559 if (! ((0 < size_eq && size == size_eq)
560 || (0 < size_min && size_min <= size)))
561 {
562 GNUNET_break_op (0);
563 return;
564 }
565
566 switch (type)
567 {
568 case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
569 handle_multicast_message (grp, (struct GNUNET_MULTICAST_MessageHeader *) msg);
570 break;
571
572 case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
573 if (GNUNET_YES != grp->is_origin)
574 {
575 GNUNET_break (0);
576 break;
577 }
578
579 handle_multicast_request (grp, (struct GNUNET_MULTICAST_RequestHeader *) msg);
580 break;
581
582 default:
583 GNUNET_break_op (0);
584 return;
585 }
586
587 if (NULL != grp->client)
588 {
589 GNUNET_CLIENT_receive (grp->client, &message_handler, grp,
590 GNUNET_TIME_UNIT_FOREVER_REL);
591 }
592}
593
594
595/**
277 * Function to call with the decision made for a join request. 596 * Function to call with the decision made for a join request.
278 * 597 *
279 * Must be called once and only once in response to an invocation of the 598 * Must be called once and only once in response to an invocation of the
@@ -375,28 +694,29 @@ GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh,
375 * candidate will be given a response. Members in the group can send messages 694 * candidate will be given a response. Members in the group can send messages
376 * to the origin (one at a time). 695 * to the origin (one at a time).
377 * 696 *
378 * @param cfg Configuration to use. 697 * @param cfg Configuration to use.
379 * @param priv_key ECC key that will be used to sign messages for this 698 * @param priv_key ECC key that will be used to sign messages for this
380 * multicast session; public key is used to identify the multicast group; 699 * multicast session; public key is used to identify the multicast group;
381 * @param next_fragment_id Next fragment ID to continue counting fragments from 700 * @param max_fragment_id Maximum fragment ID already sent to the group.
382 * when restarting the origin. 0 for a new group. 701 * 0 for a new group.
383 * @param join_cb Function called to approve / disapprove joining of a peer. 702 * @param join_cb Function called to approve / disapprove joining of a peer.
384 * @param mem_test_cb Function multicast can use to test group membership. 703 * @param member_test_cb Function multicast can use to test group membership.
385 * @param replay_frag_cb Function that can be called to replay a message fragment. 704 * @param replay_frag_cb Function that can be called to replay a message fragment.
386 * @param replay_msg_cb Function that can be called to replay a message. 705 * @param replay_msg_cb Function that can be called to replay a message.
387 * @param request_cb Function called with message fragments from group members. 706 * @param request_cb Function called with message fragments from group members.
388 * @param message_cb Function called with the message fragments sent to the 707 * @param message_cb Function called with the message fragments sent to the
389 * network by GNUNET_MULTICAST_origin_to_all(). These message fragments 708 * network by GNUNET_MULTICAST_origin_to_all(). These message fragments
390 * should be stored for answering replay requests later. 709 * should be stored for answering replay requests later.
391 * @param cls Closure for the various callbacks that follow. 710 * @param cls Closure for the various callbacks that follow.
711 *
392 * @return Handle for the origin, NULL on error. 712 * @return Handle for the origin, NULL on error.
393 */ 713 */
394struct GNUNET_MULTICAST_Origin * 714struct GNUNET_MULTICAST_Origin *
395GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg, 715GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
396 const struct GNUNET_CRYPTO_EddsaPrivateKey *priv_key, 716 const struct GNUNET_CRYPTO_EddsaPrivateKey *priv_key,
397 uint64_t next_fragment_id, 717 uint64_t max_fragment_id,
398 GNUNET_MULTICAST_JoinCallback join_cb, 718 GNUNET_MULTICAST_JoinCallback join_cb,
399 GNUNET_MULTICAST_MembershipTestCallback mem_test_cb, 719 GNUNET_MULTICAST_MembershipTestCallback member_test_cb,
400 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb, 720 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
401 GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb, 721 GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb,
402 GNUNET_MULTICAST_RequestCallback request_cb, 722 GNUNET_MULTICAST_RequestCallback request_cb,
@@ -404,28 +724,40 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
404 void *cls) 724 void *cls)
405{ 725{
406 struct GNUNET_MULTICAST_Origin *orig = GNUNET_malloc (sizeof (*orig)); 726 struct GNUNET_MULTICAST_Origin *orig = GNUNET_malloc (sizeof (*orig));
407 orig->grp.is_origin = GNUNET_YES; 727 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
408 orig->priv_key = *priv_key; 728 struct MulticastOriginStartMessage *start = GNUNET_malloc (sizeof (*start));
409 orig->next_fragment_id = next_fragment_id; 729
410 orig->join_cb = join_cb; 730 start->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START);
411 orig->mem_test_cb = mem_test_cb; 731 start->header.size = htons (sizeof (*start));
412 orig->replay_frag_cb = replay_frag_cb; 732 start->max_fragment_id = max_fragment_id;
413 orig->replay_msg_cb = replay_msg_cb; 733 memcpy (&start->group_key, priv_key, sizeof (*priv_key));
734
735 grp->reconnect_msg = (struct GNUNET_MessageHeader *) start;
736 grp->is_origin = GNUNET_YES;
737 grp->cfg = cfg;
738
739 grp->cb_cls = cls;
740 grp->join_cb = join_cb;
741 grp->member_test_cb = member_test_cb;
742 grp->replay_frag_cb = replay_frag_cb;
743 grp->replay_msg_cb = replay_msg_cb;
744 grp->message_cb = message_cb;
745
414 orig->request_cb = request_cb; 746 orig->request_cb = request_cb;
415 orig->message_cb = message_cb; 747 orig->priv_key = *priv_key;
416 orig->cls = cls;
417 748
418 GNUNET_CRYPTO_eddsa_key_get_public (&orig->priv_key, &orig->pub_key); 749 GNUNET_CRYPTO_eddsa_key_get_public (&orig->priv_key, &grp->pub_key);
419 GNUNET_CRYPTO_hash (&orig->pub_key, sizeof (orig->pub_key), 750 GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key),
420 &orig->pub_key_hash); 751 &grp->pub_key_hash);
421 752
422 if (NULL == origins) 753 if (NULL == origins)
423 origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); 754 origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
424 755
425 GNUNET_CONTAINER_multihashmap_put (origins, &orig->pub_key_hash, orig, 756 GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig,
426 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 757 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
427 758
428 /* FIXME: send ORIGIN_START to service */ 759 grp->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
760 grp->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, grp);
429 761
430 return orig; 762 return orig;
431} 763}
@@ -439,25 +771,26 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
439void 771void
440GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig) 772GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig)
441{ 773{
442 GNUNET_CONTAINER_multihashmap_remove (origins, &orig->pub_key_hash, orig); 774 disconnect (&orig->grp);
775 GNUNET_CONTAINER_multihashmap_remove (origins, &orig->grp.pub_key_hash, orig);
443 GNUNET_free (orig); 776 GNUNET_free (orig);
444} 777}
445 778
446 779
447/* FIXME: for now just call clients' callbacks
448 * without sending anything to multicast. */
449static void 780static void
450schedule_origin_to_all (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 781origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
451{ 782{
452 LOG (GNUNET_ERROR_TYPE_DEBUG, "schedule_origin_to_all()\n"); 783 LOG (GNUNET_ERROR_TYPE_DEBUG, "origin_to_all()\n");
453 struct GNUNET_MULTICAST_Origin *orig = cls; 784 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
454 struct GNUNET_MULTICAST_OriginMessageHandle *mh = &orig->msg_handle; 785 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
455 786
456 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; 787 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
457 char buf[GNUNET_MULTICAST_FRAGMENT_MAX_SIZE] = ""; 788 struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + buf_size);
458 struct GNUNET_MULTICAST_MessageHeader *msg 789 GNUNET_CONTAINER_DLL_insert_tail (grp->tmit_head, grp->tmit_tail, mq);
459 = (struct GNUNET_MULTICAST_MessageHeader *) buf; 790
460 int ret = mh->notify (mh->notify_cls, &buf_size, &msg[1]); 791 struct GNUNET_MULTICAST_MessageHeader *
792 msg = (struct GNUNET_MULTICAST_MessageHeader *) &mq[1];
793 int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]);
461 794
462 if (! (GNUNET_YES == ret || GNUNET_NO == ret) 795 if (! (GNUNET_YES == ret || GNUNET_NO == ret)
463 || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size) 796 || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size)
@@ -465,98 +798,78 @@ schedule_origin_to_all (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc
465 LOG (GNUNET_ERROR_TYPE_ERROR, 798 LOG (GNUNET_ERROR_TYPE_ERROR,
466 "OriginTransmitNotify() returned error or invalid message size.\n"); 799 "OriginTransmitNotify() returned error or invalid message size.\n");
467 /* FIXME: handle error */ 800 /* FIXME: handle error */
801 GNUNET_free (mq);
468 return; 802 return;
469 } 803 }
470 804
471 if (GNUNET_NO == ret && 0 == buf_size) 805 if (GNUNET_NO == ret && 0 == buf_size)
806 {
807 GNUNET_free (mq);
472 return; /* Transmission paused. */ 808 return; /* Transmission paused. */
809 }
473 810
474 msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE); 811 msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
475 msg->header.size = htons (sizeof (*msg) + buf_size); 812 msg->header.size = htons (sizeof (*msg) + buf_size);
476 msg->message_id = GNUNET_htonll (mh->message_id); 813 msg->message_id = GNUNET_htonll (tmit->message_id);
477 msg->group_generation = mh->group_generation; 814 msg->group_generation = tmit->group_generation;
478 815 msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset);
479 /* FIXME: add fragment ID and signature in the service instead of here */ 816 tmit->fragment_offset += sizeof (*msg) + buf_size;
480 msg->fragment_id = GNUNET_htonll (orig->next_fragment_id++);
481 msg->fragment_offset = GNUNET_htonll (mh->fragment_offset);
482 mh->fragment_offset += sizeof (*msg) + buf_size;
483 msg->purpose.size = htonl (sizeof (*msg) + buf_size
484 - sizeof (msg->header)
485 - sizeof (msg->hop_counter)
486 - sizeof (msg->signature));
487 msg->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE);
488
489 if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&orig->priv_key, &msg->purpose,
490 &msg->signature))
491 {
492 /* FIXME: handle error */
493 return;
494 }
495
496 /* FIXME: send msg to the service and only then call handle_multicast_message
497 * with the returned signed message.
498 */
499 handle_multicast_message (&orig->grp, msg);
500 817
501 if (GNUNET_NO == ret) 818 transmit_next (grp);
502 GNUNET_SCHEDULER_add_delayed (
503 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1),
504 schedule_origin_to_all, orig);
505} 819}
506 820
507 821
508/** 822/**
509 * Send a message to the multicast group. 823 * Send a message to the multicast group.
510 * 824 *
511 * @param origin Handle to the multicast group. 825 * @param orig Handle to the multicast group.
512 * @param message_id Application layer ID for the message. Opaque to multicast. 826 * @param message_id Application layer ID for the message. Opaque to multicast.
513 * @param group_generation Group generation of the message. Documented in 827 * @param group_generation Group generation of the message.
514 * `struct GNUNET_MULTICAST_MessageHeader`. 828 * Documented in struct GNUNET_MULTICAST_MessageHeader.
515 * @param notify Function to call to get the message. 829 * @param notify Function to call to get the message.
516 * @param notify_cls Closure for @a notify. 830 * @param notify_cls Closure for @a notify.
517 * @return NULL on error (i.e. request already pending). 831 *
832 * @return Message handle on success,
833 * NULL on error (i.e. another request is already pending).
518 */ 834 */
519struct GNUNET_MULTICAST_OriginMessageHandle * 835struct GNUNET_MULTICAST_OriginTransmitHandle *
520GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *origin, 836GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig,
521 uint64_t message_id, 837 uint64_t message_id,
522 uint64_t group_generation, 838 uint64_t group_generation,
523 GNUNET_MULTICAST_OriginTransmitNotify notify, 839 GNUNET_MULTICAST_OriginTransmitNotify notify,
524 void *notify_cls) 840 void *notify_cls)
525{ 841{
526 struct GNUNET_MULTICAST_OriginMessageHandle *mh = &origin->msg_handle; 842 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
527 mh->origin = origin; 843 tmit->origin = orig;
528 mh->message_id = message_id; 844 tmit->message_id = message_id;
529 mh->group_generation = group_generation; 845 tmit->group_generation = group_generation;
530 mh->notify = notify; 846 tmit->notify = notify;
531 mh->notify_cls = notify_cls; 847 tmit->notify_cls = notify_cls;
532 848
533 /* add some delay for testing */ 849 origin_to_all (orig);
534 GNUNET_SCHEDULER_add_delayed ( 850 return tmit;
535 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1),
536 schedule_origin_to_all, origin);
537 return &origin->msg_handle;
538} 851}
539 852
540 853
541/** 854/**
542 * Resume message transmission to multicast group. 855 * Resume message transmission to multicast group.
543 * 856 *
544 * @param mh Request to cancel. 857 * @param th Transmission to cancel.
545 */ 858 */
546void 859void
547GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginMessageHandle *mh) 860GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
548{ 861{
549 GNUNET_SCHEDULER_add_now (schedule_origin_to_all, mh->origin); 862 origin_to_all (th->origin);
550} 863}
551 864
552 865
553/** 866/**
554 * Cancel request for message transmission to multicast group. 867 * Cancel request for message transmission to multicast group.
555 * 868 *
556 * @param mh Request to cancel. 869 * @param th Transmission to cancel.
557 */ 870 */
558void 871void
559GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginMessageHandle *mh) 872GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
560{ 873{
561} 874}
562 875
@@ -584,12 +897,12 @@ GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginMessageHand
584 * @param relays Peer identities of members of the group, which serve as relays 897 * @param relays Peer identities of members of the group, which serve as relays
585 * and can be used to join the group at. and send the @a join_request to. 898 * and can be used to join the group at. and send the @a join_request to.
586 * If empty, the @a join_request is sent directly to the @a origin. 899 * If empty, the @a join_request is sent directly to the @a origin.
587 * @param join_request Application-dependent join request to be passed to the peer 900 * @param join_req Application-dependent join request to be passed to the peer
588 * @a relay (might, for example, contain a user, bind user 901 * @a relay (might, for example, contain a user, bind user
589 * identity/pseudonym to peer identity, application-level message to 902 * identity/pseudonym to peer identity, application-level message to
590 * origin, etc.). 903 * origin, etc.).
591 * @param join_cb Function called to approve / disapprove joining of a peer. 904 * @param join_cb Function called to approve / disapprove joining of a peer.
592 * @param mem_test_cb Function multicast can use to test group membership. 905 * @param member_test_cb Function multicast can use to test group membership.
593 * @param replay_frag_cb Function that can be called to replay message fragments 906 * @param replay_frag_cb Function that can be called to replay message fragments
594 * this peer already knows from this group. NULL if this 907 * this peer already knows from this group. NULL if this
595 * client is unable to support replay. 908 * client is unable to support replay.
@@ -609,7 +922,7 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
609 const struct GNUNET_PeerIdentity *origin, 922 const struct GNUNET_PeerIdentity *origin,
610 uint32_t relay_count, 923 uint32_t relay_count,
611 const struct GNUNET_PeerIdentity *relays, 924 const struct GNUNET_PeerIdentity *relays,
612 const struct GNUNET_MessageHeader *join_request, 925 const struct GNUNET_MessageHeader *join_req,
613 GNUNET_MULTICAST_JoinCallback join_cb, 926 GNUNET_MULTICAST_JoinCallback join_cb,
614 GNUNET_MULTICAST_MembershipTestCallback member_test_cb, 927 GNUNET_MULTICAST_MembershipTestCallback member_test_cb,
615 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb, 928 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
@@ -618,33 +931,47 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
618 void *cls) 931 void *cls)
619{ 932{
620 struct GNUNET_MULTICAST_Member *mem = GNUNET_malloc (sizeof (*mem)); 933 struct GNUNET_MULTICAST_Member *mem = GNUNET_malloc (sizeof (*mem));
621 mem->group_key = *group_key; 934 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
622 mem->member_key = *member_key; 935
936 uint16_t relay_size = relay_count * sizeof (*relays);
937 uint16_t join_req_size = (NULL != join_req) ? ntohs (join_req->size) : 0;
938 struct MulticastMemberJoinMessage *
939 join = GNUNET_malloc (sizeof (*join) + relay_size + join_req_size);
940 join->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN);
941 join->header.size = htons (sizeof (*join) + relay_size + join_req_size);
942 join->group_key = *group_key;
943 join->member_key = *member_key;
944 join->origin = *origin;
945 memcpy (&join[1], relays, relay_size);
946 memcpy (((char *) &join[1]) + relay_size, join_req, join_req_size);
947
948 grp->reconnect_msg = (struct GNUNET_MessageHeader *) join;
949 grp->is_origin = GNUNET_NO;
950 grp->cfg = cfg;
951 grp->pub_key = *group_key;
952
953 grp->join_cb = join_cb;
954 grp->member_test_cb = member_test_cb;
955 grp->replay_frag_cb = replay_frag_cb;
956 grp->message_cb = message_cb;
957 grp->cb_cls = cls;
958
623 mem->origin = *origin; 959 mem->origin = *origin;
624 mem->relay_count = relay_count; 960 mem->relay_count = relay_count;
625 mem->relays = *relays; 961 mem->relays = *relays;
626 mem->join_cb = join_cb; 962 mem->priv_key = *member_key;
627 mem->member_test_cb = member_test_cb;
628 mem->replay_frag_cb = replay_frag_cb;
629 mem->message_cb = message_cb;
630 mem->cls = cls;
631
632 if (NULL != join_request)
633 {
634 uint16_t size = ntohs (join_request->size);
635 mem->join_request = GNUNET_malloc (size);
636 memcpy (mem->join_request, join_request, size);
637 }
638 963
639 GNUNET_CRYPTO_hash (&mem->group_key, sizeof (mem->group_key), &mem->group_key_hash); 964 GNUNET_CRYPTO_eddsa_key_get_public (&mem->priv_key, &grp->pub_key);
965 GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key), &grp->pub_key_hash);
640 966
641 if (NULL == members) 967 if (NULL == members)
642 members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); 968 members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
643 969
644 GNUNET_CONTAINER_multihashmap_put (members, &mem->group_key_hash, mem, 970 GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem,
645 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 971 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
646 972
647 /* FIXME: send MEMBER_JOIN to service */ 973 grp->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
974 grp->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, grp);
648 975
649 return mem; 976 return mem;
650} 977}
@@ -663,7 +990,8 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
663void 990void
664GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem) 991GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem)
665{ 992{
666 GNUNET_CONTAINER_multihashmap_remove (members, &mem->group_key_hash, mem); 993 disconnect (&mem->grp);
994 GNUNET_CONTAINER_multihashmap_remove (members, &mem->grp.pub_key_hash, mem);
667 GNUNET_free (mem); 995 GNUNET_free (mem);
668} 996}
669 997
@@ -729,19 +1057,20 @@ GNUNET_MULTICAST_member_replay_cancel (struct GNUNET_MULTICAST_MemberReplayHandl
729} 1057}
730 1058
731 1059
732/* FIXME: for now just send back to the client what it sent. */
733static void 1060static void
734schedule_member_to_origin (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 1061member_to_origin (struct GNUNET_MULTICAST_Member *mem)
735{ 1062{
736 LOG (GNUNET_ERROR_TYPE_DEBUG, "schedule_member_to_origin()\n"); 1063 LOG (GNUNET_ERROR_TYPE_DEBUG, "member_to_origin()\n");
737 struct GNUNET_MULTICAST_Member *mem = cls; 1064 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
738 struct GNUNET_MULTICAST_MemberRequestHandle *rh = &mem->req_handle; 1065 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
739 1066
740 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD; 1067 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD;
741 char buf[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; 1068 struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + buf_size);
742 struct GNUNET_MULTICAST_RequestHeader *req 1069 GNUNET_CONTAINER_DLL_insert_tail (grp->tmit_head, grp->tmit_tail, mq);
743 = (struct GNUNET_MULTICAST_RequestHeader *) buf; 1070
744 int ret = rh->notify (rh->notify_cls, &buf_size, &req[1]); 1071 struct GNUNET_MULTICAST_RequestHeader *
1072 req = (struct GNUNET_MULTICAST_RequestHeader *) &mq[1];
1073 int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]);
745 1074
746 if (! (GNUNET_YES == ret || GNUNET_NO == ret) 1075 if (! (GNUNET_YES == ret || GNUNET_NO == ret)
747 || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size) 1076 || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size)
@@ -757,73 +1086,47 @@ schedule_member_to_origin (void *cls, const struct GNUNET_SCHEDULER_TaskContext
757 1086
758 req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST); 1087 req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST);
759 req->header.size = htons (sizeof (*req) + buf_size); 1088 req->header.size = htons (sizeof (*req) + buf_size);
760 req->request_id = GNUNET_htonll (rh->request_id); 1089 req->request_id = GNUNET_htonll (tmit->request_id);
761 1090 req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset);
762 /* FIXME: add fragment ID and signature in the service instead of here */ 1091 tmit->fragment_offset += sizeof (*req) + buf_size;
763 req->fragment_id = GNUNET_ntohll (mem->next_fragment_id++);
764 req->fragment_offset = GNUNET_ntohll (rh->fragment_offset);
765 rh->fragment_offset += sizeof (*req) + buf_size;
766 req->purpose.size = htonl (sizeof (*req) + buf_size
767 - sizeof (req->header)
768 - sizeof (req->member_key)
769 - sizeof (req->signature));
770 req->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE);
771
772 if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&mem->member_key, &req->purpose,
773 &req->signature))
774 {
775 /* FIXME: handle error */
776 return;
777 }
778
779 /* FIXME: send req to the service and only then call handle_multicast_request
780 * with the returned request.
781 */
782 handle_multicast_request (&mem->group_key_hash, req);
783 1092
784 if (GNUNET_NO == ret) 1093 transmit_next (grp);
785 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
786 (GNUNET_TIME_UNIT_SECONDS, 1),
787 schedule_member_to_origin, mem);
788} 1094}
789 1095
790 1096
791/** 1097/**
792 * Send a message to the origin of the multicast group. 1098 * Send a message to the origin of the multicast group.
793 * 1099 *
794 * @param member Membership handle. 1100 * @param mem Membership handle.
795 * @param request_id Application layer ID for the request. Opaque to multicast. 1101 * @param request_id Application layer ID for the request. Opaque to multicast.
796 * @param notify Callback to call to get the message. 1102 * @param notify Callback to call to get the message.
797 * @param notify_cls Closure for @a notify. 1103 * @param notify_cls Closure for @a notify.
798 * @return Handle to cancel request, NULL on error (i.e. request already pending). 1104 * @return Handle to cancel request, NULL on error (i.e. request already pending).
799 */ 1105 */
800struct GNUNET_MULTICAST_MemberRequestHandle * 1106struct GNUNET_MULTICAST_MemberTransmitHandle *
801GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *member, 1107GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem,
802 uint64_t request_id, 1108 uint64_t request_id,
803 GNUNET_MULTICAST_MemberTransmitNotify notify, 1109 GNUNET_MULTICAST_MemberTransmitNotify notify,
804 void *notify_cls) 1110 void *notify_cls)
805{ 1111{
806 struct GNUNET_MULTICAST_MemberRequestHandle *rh = &member->req_handle; 1112 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
807 rh->member = member; 1113 tmit->member = mem;
808 rh->request_id = request_id; 1114 tmit->request_id = request_id;
809 rh->notify = notify; 1115 tmit->notify = notify;
810 rh->notify_cls = notify_cls; 1116 tmit->notify_cls = notify_cls;
811 1117
812 /* FIXME: remove delay, it's there only for testing */ 1118 member_to_origin (mem);
813 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 1119 return tmit;
814 (GNUNET_TIME_UNIT_SECONDS, 1),
815 schedule_member_to_origin, member);
816 return &member->req_handle;
817} 1120}
818 1121
819 1122
820/** 1123/**
821 * Resume message transmission to origin. 1124 * Resume message transmission to origin.
822 * 1125 *
823 * @param rh Request to cancel. 1126 * @param th Transmission to cancel.
824 */ 1127 */
825void 1128void
826GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberRequestHandle *rh) 1129GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
827{ 1130{
828 1131
829} 1132}
@@ -832,10 +1135,10 @@ GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberRequestH
832/** 1135/**
833 * Cancel request for message transmission to origin. 1136 * Cancel request for message transmission to origin.
834 * 1137 *
835 * @param rh Request to cancel. 1138 * @param th Transmission to cancel.
836 */ 1139 */
837void 1140void
838GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberRequestHandle *rh) 1141GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
839{ 1142{
840} 1143}
841 1144
diff --git a/src/psyc/Makefile.am b/src/psyc/Makefile.am
index 5f3abfbb5..212c383ac 100644
--- a/src/psyc/Makefile.am
+++ b/src/psyc/Makefile.am
@@ -42,14 +42,14 @@ gnunet_service_psyc_SOURCES = \
42 gnunet-service-psyc.c \ 42 gnunet-service-psyc.c \
43 psyc_common.c 43 psyc_common.c
44gnunet_service_psyc_LDADD = \ 44gnunet_service_psyc_LDADD = \
45 $(top_builddir)/src/statistics/libgnunetstatistics.la \
46 $(top_builddir)/src/util/libgnunetutil.la \ 45 $(top_builddir)/src/util/libgnunetutil.la \
46 $(top_builddir)/src/statistics/libgnunetstatistics.la \
47 $(top_builddir)/src/multicast/libgnunetmulticast.la \ 47 $(top_builddir)/src/multicast/libgnunetmulticast.la \
48 $(top_builddir)/src/psycstore/libgnunetpsycstore.la \ 48 $(top_builddir)/src/psycstore/libgnunetpsycstore.la \
49 $(GN_LIBINTL) 49 $(GN_LIBINTL)
50gnunet_service_psyc_DEPENDENCIES = \ 50gnunet_service_psyc_DEPENDENCIES = \
51 $(top_builddir)/src/statistics/libgnunetstatistics.la \
52 $(top_builddir)/src/util/libgnunetutil.la \ 51 $(top_builddir)/src/util/libgnunetutil.la \
52 $(top_builddir)/src/statistics/libgnunetstatistics.la \
53 $(top_builddir)/src/multicast/libgnunetmulticast.la \ 53 $(top_builddir)/src/multicast/libgnunetmulticast.la \
54 $(top_builddir)/src/psycstore/libgnunetpsycstore.la 54 $(top_builddir)/src/psycstore/libgnunetpsycstore.la
55gnunet_service_psyc_CFLAGS = $(AM_CFLAGS) 55gnunet_service_psyc_CFLAGS = $(AM_CFLAGS)
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
index 3a29c8ffd..70322adaa 100644
--- a/src/psyc/gnunet-service-psyc.c
+++ b/src/psyc/gnunet-service-psyc.c
@@ -58,10 +58,16 @@ static struct GNUNET_SERVER_NotificationContext *nc;
58static struct GNUNET_PSYCSTORE_Handle *store; 58static struct GNUNET_PSYCSTORE_Handle *store;
59 59
60/** 60/**
61 * All connected masters and slaves. 61 * All connected masters.
62 * Channel's pub_key_hash -> struct Channel 62 * Channel's pub_key_hash -> struct Channel
63 */ 63 */
64static struct GNUNET_CONTAINER_MultiHashMap *clients; 64static struct GNUNET_CONTAINER_MultiHashMap *masters;
65
66/**
67 * All connected slaves.
68 * Channel's pub_key_hash -> struct Channel
69 */
70static struct GNUNET_CONTAINER_MultiHashMap *slaves;
65 71
66 72
67/** 73/**
@@ -158,7 +164,7 @@ struct FragmentQueue
158 164
159 165
160/** 166/**
161 * Common part of the client context for both a master and slave channel. 167 * Common part of the client context for both a channel master and slave.
162 */ 168 */
163struct Channel 169struct Channel
164{ 170{
@@ -266,7 +272,7 @@ struct Master
266 /** 272 /**
267 * Transmit handle for multicast. 273 * Transmit handle for multicast.
268 */ 274 */
269 struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle; 275 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
270 276
271 /** 277 /**
272 * Last message ID transmitted to this channel. 278 * Last message ID transmitted to this channel.
@@ -307,7 +313,7 @@ struct Slave
307 /** 313 /**
308 * Private key of the slave. 314 * Private key of the slave.
309 */ 315 */
310 struct GNUNET_CRYPTO_EddsaPrivateKey slave_key; 316 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
311 317
312 /** 318 /**
313 * Handle for the multicast member. 319 * Handle for the multicast member.
@@ -317,7 +323,7 @@ struct Slave
317 /** 323 /**
318 * Transmit handle for multicast. 324 * Transmit handle for multicast.
319 */ 325 */
320 struct GNUNET_MULTICAST_MemberRequestHandle *tmit_handle; 326 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
321 327
322 /** 328 /**
323 * Peer identity of the origin. 329 * Peer identity of the origin.
@@ -382,7 +388,7 @@ client_cleanup (struct Channel *ch)
382 struct Master *mst = (struct Master *) ch; 388 struct Master *mst = (struct Master *) ch;
383 if (NULL != mst->origin) 389 if (NULL != mst->origin)
384 GNUNET_MULTICAST_origin_stop (mst->origin); 390 GNUNET_MULTICAST_origin_stop (mst->origin);
385 GNUNET_CONTAINER_multihashmap_remove (clients, &ch->pub_key_hash, mst); 391 GNUNET_CONTAINER_multihashmap_remove (masters, &ch->pub_key_hash, ch);
386 } 392 }
387 else 393 else
388 { 394 {
@@ -393,6 +399,7 @@ client_cleanup (struct Channel *ch)
393 GNUNET_free (slv->relays); 399 GNUNET_free (slv->relays);
394 if (NULL != slv->member) 400 if (NULL != slv->member)
395 GNUNET_MULTICAST_member_part (slv->member); 401 GNUNET_MULTICAST_member_part (slv->member);
402 GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, ch);
396 } 403 }
397 404
398 GNUNET_free (ch); 405 GNUNET_free (ch);
@@ -975,7 +982,7 @@ request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
975 NULL, NULL)) 982 NULL, NULL))
976 { 983 {
977 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 984 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
978 "%p Dropping message with invalid parts " 985 "%p Dropping request with invalid parts "
979 "received from multicast.\n", ch); 986 "received from multicast.\n", ch);
980 GNUNET_break_op (0); 987 GNUNET_break_op (0);
981 break; 988 break;
@@ -1017,6 +1024,7 @@ master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
1017{ 1024{
1018 struct Master *mst = cls; 1025 struct Master *mst = cls;
1019 struct Channel *ch = &mst->channel; 1026 struct Channel *ch = &mst->channel;
1027
1020 struct CountersResult *res = GNUNET_malloc (sizeof (*res)); 1028 struct CountersResult *res = GNUNET_malloc (sizeof (*res));
1021 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); 1029 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1022 res->header.size = htons (sizeof (*res)); 1030 res->header.size = htons (sizeof (*res));
@@ -1031,12 +1039,20 @@ master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
1031 mst->max_group_generation = max_group_generation; 1039 mst->max_group_generation = max_group_generation;
1032 mst->origin 1040 mst->origin
1033 = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, 1041 = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key,
1034 max_fragment_id + 1, 1042 max_fragment_id,
1035 join_cb, membership_test_cb, 1043 join_cb, membership_test_cb,
1036 replay_fragment_cb, replay_message_cb, 1044 replay_fragment_cb, replay_message_cb,
1037 request_cb, message_cb, ch); 1045 request_cb, message_cb, ch);
1038 ch->ready = GNUNET_YES; 1046 ch->ready = GNUNET_YES;
1039 } 1047 }
1048 else
1049 {
1050 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1051 "%p GNUNET_PSYCSTORE_counters_get() "
1052 "returned %d for channel %s.\n",
1053 ch, result, GNUNET_h2s (&ch->pub_key_hash));
1054 }
1055
1040 GNUNET_SERVER_notification_context_add (nc, ch->client); 1056 GNUNET_SERVER_notification_context_add (nc, ch->client);
1041 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header, 1057 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
1042 GNUNET_NO); 1058 GNUNET_NO);
@@ -1054,6 +1070,7 @@ slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
1054{ 1070{
1055 struct Slave *slv = cls; 1071 struct Slave *slv = cls;
1056 struct Channel *ch = &slv->channel; 1072 struct Channel *ch = &slv->channel;
1073
1057 struct CountersResult *res = GNUNET_malloc (sizeof (*res)); 1074 struct CountersResult *res = GNUNET_malloc (sizeof (*res));
1058 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); 1075 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1059 res->header.size = htons (sizeof (*res)); 1076 res->header.size = htons (sizeof (*res));
@@ -1065,7 +1082,7 @@ slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
1065 ch->max_message_id = max_message_id; 1082 ch->max_message_id = max_message_id;
1066 ch->max_state_message_id = max_state_message_id; 1083 ch->max_state_message_id = max_state_message_id;
1067 slv->member 1084 slv->member
1068 = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->slave_key, 1085 = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->priv_key,
1069 &slv->origin, 1086 &slv->origin,
1070 slv->relay_count, slv->relays, 1087 slv->relay_count, slv->relays,
1071 slv->join_req, join_cb, 1088 slv->join_req, join_cb,
@@ -1074,6 +1091,13 @@ slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
1074 message_cb, ch); 1091 message_cb, ch);
1075 ch->ready = GNUNET_YES; 1092 ch->ready = GNUNET_YES;
1076 } 1093 }
1094 else
1095 {
1096 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1097 "%p GNUNET_PSYCSTORE_counters_get() "
1098 "returned %d for channel %s.\n",
1099 ch, result, GNUNET_h2s (&ch->pub_key_hash));
1100 }
1077 1101
1078 GNUNET_SERVER_notification_context_add (nc, ch->client); 1102 GNUNET_SERVER_notification_context_add (nc, ch->client);
1079 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header, 1103 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
@@ -1118,9 +1142,9 @@ handle_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1118 1142
1119 GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, master_counters_cb, mst); 1143 GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, master_counters_cb, mst);
1120 1144
1121 GNUNET_SERVER_client_set_user_context (client, &mst->channel); 1145 GNUNET_CONTAINER_multihashmap_put (masters, &ch->pub_key_hash, ch,
1122 GNUNET_CONTAINER_multihashmap_put (clients, &ch->pub_key_hash, mst,
1123 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 1146 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1147 GNUNET_SERVER_client_set_user_context (client, ch);
1124 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1148 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1125} 1149}
1126 1150
@@ -1135,7 +1159,7 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1135 const struct SlaveJoinRequest *req 1159 const struct SlaveJoinRequest *req
1136 = (const struct SlaveJoinRequest *) msg; 1160 = (const struct SlaveJoinRequest *) msg;
1137 struct Slave *slv = GNUNET_new (struct Slave); 1161 struct Slave *slv = GNUNET_new (struct Slave);
1138 slv->slave_key = req->slave_key; 1162 slv->priv_key = req->slave_key;
1139 slv->origin = req->origin; 1163 slv->origin = req->origin;
1140 slv->relay_count = ntohl (req->relay_count); 1164 slv->relay_count = ntohl (req->relay_count);
1141 if (0 < slv->relay_count) 1165 if (0 < slv->relay_count)
@@ -1163,6 +1187,8 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1163 1187
1164 GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, slave_counters_cb, slv); 1188 GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, slave_counters_cb, slv);
1165 1189
1190 GNUNET_CONTAINER_multihashmap_put (slaves, &ch->pub_key_hash, ch,
1191 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1166 GNUNET_SERVER_client_set_user_context (client, &slv->channel); 1192 GNUNET_SERVER_client_set_user_context (client, &slv->channel);
1167 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1193 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1168} 1194}
@@ -1183,8 +1209,7 @@ send_message_ack (struct Channel *ch)
1183 res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK); 1209 res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
1184 1210
1185 GNUNET_SERVER_notification_context_add (nc, ch->client); 1211 GNUNET_SERVER_notification_context_add (nc, ch->client);
1186 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res, 1212 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res, GNUNET_NO);
1187 GNUNET_NO);
1188} 1213}
1189 1214
1190 1215
@@ -1554,7 +1579,8 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
1554 cfg = c; 1579 cfg = c;
1555 store = GNUNET_PSYCSTORE_connect (cfg); 1580 store = GNUNET_PSYCSTORE_connect (cfg);
1556 stats = GNUNET_STATISTICS_create ("psyc", cfg); 1581 stats = GNUNET_STATISTICS_create ("psyc", cfg);
1557 clients = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); 1582 masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1583 slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1558 recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); 1584 recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1559 nc = GNUNET_SERVER_notification_context_create (server, 1); 1585 nc = GNUNET_SERVER_notification_context_create (server, 1);
1560 GNUNET_SERVER_add_handlers (server, handlers); 1586 GNUNET_SERVER_add_handlers (server, handlers);
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index 22f1da069..85f86ceaa 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -41,11 +41,10 @@
41 41
42#define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__) 42#define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__)
43 43
44struct OperationHandle 44struct MessageQueue
45{ 45{
46 struct OperationHandle *prev; 46 struct MessageQueue *prev;
47 struct OperationHandle *next; 47 struct MessageQueue *next;
48 struct GNUNET_MessageHeader *msg;
49}; 48};
50 49
51 50
@@ -87,19 +86,19 @@ struct GNUNET_PSYC_Channel
87 struct GNUNET_CLIENT_TransmitHandle *th; 86 struct GNUNET_CLIENT_TransmitHandle *th;
88 87
89 /** 88 /**
90 * Head of operations to transmit. 89 * Head of messages to transmit to the service.
91 */ 90 */
92 struct OperationHandle *tmit_head; 91 struct MessageQueue *tmit_head;
93 92
94 /** 93 /**
95 * Tail of operations to transmit. 94 * Tail of operations to transmit to the service.
96 */ 95 */
97 struct OperationHandle *tmit_tail; 96 struct MessageQueue *tmit_tail;
98 97
99 /** 98 /**
100 * Message being transmitted to the PSYC service. 99 * Message currently being transmitted to the service.
101 */ 100 */
102 struct OperationHandle *tmit_msg; 101 struct MessageQueue *tmit_msg;
103 102
104 /** 103 /**
105 * Message to send on reconnect. 104 * Message to send on reconnect.
@@ -201,8 +200,6 @@ struct GNUNET_PSYC_Master
201 struct GNUNET_PSYC_Channel ch; 200 struct GNUNET_PSYC_Channel ch;
202 201
203 GNUNET_PSYC_MasterStartCallback start_cb; 202 GNUNET_PSYC_MasterStartCallback start_cb;
204
205 uint64_t max_message_id;
206}; 203};
207 204
208 205
@@ -214,8 +211,6 @@ struct GNUNET_PSYC_Slave
214 struct GNUNET_PSYC_Channel ch; 211 struct GNUNET_PSYC_Channel ch;
215 212
216 GNUNET_PSYC_SlaveJoinCallback join_cb; 213 GNUNET_PSYC_SlaveJoinCallback join_cb;
217
218 uint64_t max_message_id;
219}; 214};
220 215
221 216
@@ -269,30 +264,30 @@ channel_transmit_data (struct GNUNET_PSYC_Channel *ch);
269/** 264/**
270 * Reschedule a connect attempt to the service. 265 * Reschedule a connect attempt to the service.
271 * 266 *
272 * @param c channel to reconnect 267 * @param ch Channel to reconnect.
273 */ 268 */
274static void 269static void
275reschedule_connect (struct GNUNET_PSYC_Channel *c) 270reschedule_connect (struct GNUNET_PSYC_Channel *ch)
276{ 271{
277 GNUNET_assert (c->reconnect_task == GNUNET_SCHEDULER_NO_TASK); 272 GNUNET_assert (ch->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
278 273
279 if (NULL != c->th) 274 if (NULL != ch->th)
280 { 275 {
281 GNUNET_CLIENT_notify_transmit_ready_cancel (c->th); 276 GNUNET_CLIENT_notify_transmit_ready_cancel (ch->th);
282 c->th = NULL; 277 ch->th = NULL;
283 } 278 }
284 if (NULL != c->client) 279 if (NULL != ch->client)
285 { 280 {
286 GNUNET_CLIENT_disconnect (c->client); 281 GNUNET_CLIENT_disconnect (ch->client);
287 c->client = NULL; 282 ch->client = NULL;
288 } 283 }
289 c->in_receive = GNUNET_NO; 284 ch->in_receive = GNUNET_NO;
290 LOG (GNUNET_ERROR_TYPE_DEBUG, 285 LOG (GNUNET_ERROR_TYPE_DEBUG,
291 "Scheduling task to reconnect to PSYC service in %s.\n", 286 "Scheduling task to reconnect to PSYC service in %s.\n",
292 GNUNET_STRINGS_relative_time_to_string (c->reconnect_delay, GNUNET_YES)); 287 GNUNET_STRINGS_relative_time_to_string (ch->reconnect_delay, GNUNET_YES));
293 c->reconnect_task = 288 ch->reconnect_task =
294 GNUNET_SCHEDULER_add_delayed (c->reconnect_delay, &reconnect, c); 289 GNUNET_SCHEDULER_add_delayed (ch->reconnect_delay, &reconnect, ch);
295 c->reconnect_delay = GNUNET_TIME_STD_BACKOFF (c->reconnect_delay); 290 ch->reconnect_delay = GNUNET_TIME_STD_BACKOFF (ch->reconnect_delay);
296} 291}
297 292
298 293
@@ -306,7 +301,7 @@ transmit_next (struct GNUNET_PSYC_Channel *ch);
306 301
307 302
308/** 303/**
309 * Reset data stored related to the last received message. 304 * Reset stored data related to the last received message.
310 */ 305 */
311static void 306static void
312recv_reset (struct GNUNET_PSYC_Channel *ch) 307recv_reset (struct GNUNET_PSYC_Channel *ch)
@@ -356,51 +351,53 @@ queue_message (struct GNUNET_PSYC_Channel *ch,
356 "Queueing message of type %u and size %u (end: %u)).\n", 351 "Queueing message of type %u and size %u (end: %u)).\n",
357 ntohs (msg->type), size, end); 352 ntohs (msg->type), size, end);
358 353
359 struct OperationHandle *op = ch->tmit_msg; 354 struct MessageQueue *mq = ch->tmit_msg;
360 if (NULL != op) 355 struct GNUNET_MessageHeader *qmsg = NULL;
356 if (NULL != mq)
361 { 357 {
358 qmsg = (struct GNUNET_MessageHeader *) &mq[1];
362 if (NULL == msg 359 if (NULL == msg
363 || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < op->msg->size + size) 360 || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < qmsg->size + size)
364 { 361 {
365 /* End of message or buffer is full, add it to transmission queue 362 /* End of message or buffer is full, add it to transmission queue
366 * and start with empty buffer */ 363 * and start with empty buffer */
367 op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); 364 qmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
368 op->msg->size = htons (op->msg->size); 365 qmsg->size = htons (qmsg->size);
369 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); 366 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq);
370 ch->tmit_msg = op = NULL; 367 ch->tmit_msg = mq = NULL;
371 ch->tmit_ack_pending++; 368 ch->tmit_ack_pending++;
372 } 369 }
373 else 370 else
374 { 371 {
375 /* Message fits in current buffer, append */ 372 /* Message fits in current buffer, append */
376 ch->tmit_msg = op 373 ch->tmit_msg
377 = GNUNET_realloc (op, sizeof (*op) + op->msg->size + size); 374 = mq = GNUNET_realloc (mq, sizeof (*mq) + qmsg->size + size);
378 op->msg = (struct GNUNET_MessageHeader *) &op[1]; 375 qmsg = (struct GNUNET_MessageHeader *) &mq[1];
379 memcpy ((char *) op->msg + op->msg->size, msg, size); 376 memcpy ((char *) qmsg + qmsg->size, msg, size);
380 op->msg->size += size; 377 qmsg->size += size;
381 } 378 }
382 } 379 }
383 380
384 if (NULL == op && NULL != msg) 381 if (NULL == mq && NULL != msg)
385 { 382 {
386 /* Empty buffer, copy over message. */ 383 /* Empty buffer, copy over message. */
387 ch->tmit_msg = op 384 ch->tmit_msg
388 = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg) + size); 385 = mq = GNUNET_malloc (sizeof (*mq) + sizeof (*qmsg) + size);
389 op->msg = (struct GNUNET_MessageHeader *) &op[1]; 386 qmsg = (struct GNUNET_MessageHeader *) &mq[1];
390 op->msg->size = sizeof (*op->msg) + size; 387 qmsg->size = sizeof (*qmsg) + size;
391 memcpy (&op->msg[1], msg, size); 388 memcpy (&qmsg[1], msg, size);
392 } 389 }
393 390
394 if (NULL != op 391 if (NULL != mq
395 && (GNUNET_YES == end 392 && (GNUNET_YES == end
396 || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD 393 || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
397 < op->msg->size + sizeof (struct GNUNET_MessageHeader)))) 394 < qmsg->size + sizeof (struct GNUNET_MessageHeader))))
398 { 395 {
399 /* End of message or buffer is full, add it to transmission queue. */ 396 /* End of message or buffer is full, add it to transmission queue. */
400 op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); 397 qmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
401 op->msg->size = htons (op->msg->size); 398 qmsg->size = htons (qmsg->size);
402 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); 399 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq);
403 ch->tmit_msg = op = NULL; 400 ch->tmit_msg = mq = NULL;
404 ch->tmit_ack_pending++; 401 ch->tmit_ack_pending++;
405 } 402 }
406 403
@@ -577,6 +574,7 @@ channel_transmit_data (struct GNUNET_PSYC_Channel *ch)
577 * @param notify_data Function to call to obtain fragments of the data. 574 * @param notify_data Function to call to obtain fragments of the data.
578 * @param notify_cls Closure for @a notify_mod and @a notify_data. 575 * @param notify_cls Closure for @a notify_mod and @a notify_data.
579 * @param flags Flags for the message being transmitted. 576 * @param flags Flags for the message being transmitted.
577 *
580 * @return Transmission handle, NULL on error (i.e. more than one request queued). 578 * @return Transmission handle, NULL on error (i.e. more than one request queued).
581 */ 579 */
582static struct GNUNET_PSYC_ChannelTransmitHandle * 580static struct GNUNET_PSYC_ChannelTransmitHandle *
@@ -593,14 +591,14 @@ channel_transmit (struct GNUNET_PSYC_Channel *ch,
593 591
594 size_t size = strlen (method_name) + 1; 592 size_t size = strlen (method_name) + 1;
595 struct GNUNET_PSYC_MessageMethod *pmeth; 593 struct GNUNET_PSYC_MessageMethod *pmeth;
596 struct OperationHandle *op; 594 struct GNUNET_MessageHeader *qmsg;
597 595 struct MessageQueue *
598 ch->tmit_msg = op = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg) 596 mq = ch->tmit_msg = GNUNET_malloc (sizeof (*mq) + sizeof (*qmsg)
599 + sizeof (*pmeth) + size); 597 + sizeof (*pmeth) + size);
600 op->msg = (struct GNUNET_MessageHeader *) &op[1]; 598 qmsg = (struct GNUNET_MessageHeader *) &mq[1];
601 op->msg->size = sizeof (*op->msg) + sizeof (*pmeth) + size; 599 qmsg->size = sizeof (*qmsg) + sizeof (*pmeth) + size;
602 600
603 pmeth = (struct GNUNET_PSYC_MessageMethod *) &op->msg[1]; 601 pmeth = (struct GNUNET_PSYC_MessageMethod *) &qmsg[1];
604 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); 602 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
605 pmeth->header.size = htons (sizeof (*pmeth) + size); 603 pmeth->header.size = htons (sizeof (*pmeth) + size);
606 pmeth->flags = htonl (flags); 604 pmeth->flags = htonl (flags);
@@ -928,7 +926,7 @@ message_handler (void *cls,
928 926
929 if (NULL == msg) 927 if (NULL == msg)
930 { 928 {
931 // timeout / disconnected from server, reconnect 929 // timeout / disconnected from service, reconnect
932 reschedule_connect (ch); 930 reschedule_connect (ch);
933 return; 931 return;
934 } 932 }
@@ -970,17 +968,15 @@ message_handler (void *cls,
970 case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK: 968 case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK:
971 { 969 {
972 struct CountersResult *cres = (struct CountersResult *) msg; 970 struct CountersResult *cres = (struct CountersResult *) msg;
973 mst->max_message_id = GNUNET_ntohll (cres->max_message_id);
974 if (NULL != mst->start_cb) 971 if (NULL != mst->start_cb)
975 mst->start_cb (ch->cb_cls, mst->max_message_id); 972 mst->start_cb (ch->cb_cls, GNUNET_ntohll (cres->max_message_id));
976 break; 973 break;
977 } 974 }
978 case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: 975 case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
979 { 976 {
980 struct CountersResult *cres = (struct CountersResult *) msg; 977 struct CountersResult *cres = (struct CountersResult *) msg;
981 slv->max_message_id = GNUNET_ntohll (cres->max_message_id);
982 if (NULL != slv->join_cb) 978 if (NULL != slv->join_cb)
983 slv->join_cb (ch->cb_cls, slv->max_message_id); 979 slv->join_cb (ch->cb_cls, GNUNET_ntohll (cres->max_message_id));
984 break; 980 break;
985 } 981 }
986 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK: 982 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
@@ -1005,31 +1001,32 @@ message_handler (void *cls,
1005/** 1001/**
1006 * Transmit next message to service. 1002 * Transmit next message to service.
1007 * 1003 *
1008 * @param cls The 'struct GNUNET_PSYC_Channel'. 1004 * @param cls The struct GNUNET_PSYC_Channel.
1009 * @param size Number of bytes available in buf. 1005 * @param size Number of bytes available in @a buf.
1010 * @param buf Where to copy the message. 1006 * @param buf Where to copy the message.
1011 * @return Number of bytes copied to buf. 1007 *
1008 * @return Number of bytes copied to @a buf.
1012 */ 1009 */
1013static size_t 1010static size_t
1014send_next_message (void *cls, size_t size, void *buf) 1011send_next_message (void *cls, size_t size, void *buf)
1015{ 1012{
1016 struct GNUNET_PSYC_Channel *ch = cls;
1017 struct OperationHandle *op = ch->tmit_head;
1018 size_t ret;
1019 LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n"); 1013 LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n");
1020 ch->th = NULL; 1014 struct GNUNET_PSYC_Channel *ch = cls;
1021 if (NULL == op->msg) 1015 struct MessageQueue *mq = ch->tmit_head;
1016 if (NULL == mq)
1022 return 0; 1017 return 0;
1023 ret = ntohs (op->msg->size); 1018 struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1];
1019 size_t ret = ntohs (qmsg->size);
1020 ch->th = NULL;
1024 if (ret > size) 1021 if (ret > size)
1025 { 1022 {
1026 reschedule_connect (ch); 1023 reschedule_connect (ch);
1027 return 0; 1024 return 0;
1028 } 1025 }
1029 memcpy (buf, op->msg, ret); 1026 memcpy (buf, qmsg, ret);
1030 1027
1031 GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, op); 1028 GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, mq);
1032 GNUNET_free (op); 1029 GNUNET_free (mq);
1033 1030
1034 if (NULL != ch->tmit_head) 1031 if (NULL != ch->tmit_head)
1035 transmit_next (ch); 1032 transmit_next (ch);
@@ -1056,12 +1053,13 @@ transmit_next (struct GNUNET_PSYC_Channel *ch)
1056 if (NULL != ch->th || NULL == ch->client) 1053 if (NULL != ch->th || NULL == ch->client)
1057 return; 1054 return;
1058 1055
1059 struct OperationHandle *op = ch->tmit_head; 1056 struct MessageQueue *mq = ch->tmit_head;
1060 if (NULL == op) 1057 if (NULL == mq)
1061 return; 1058 return;
1059 struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1];
1062 1060
1063 ch->th = GNUNET_CLIENT_notify_transmit_ready (ch->client, 1061 ch->th = GNUNET_CLIENT_notify_transmit_ready (ch->client,
1064 ntohs (op->msg->size), 1062 ntohs (qmsg->size),
1065 GNUNET_TIME_UNIT_FOREVER_REL, 1063 GNUNET_TIME_UNIT_FOREVER_REL,
1066 GNUNET_NO, 1064 GNUNET_NO,
1067 &send_next_message, 1065 &send_next_message,
@@ -1087,15 +1085,14 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1087 GNUNET_assert (NULL == ch->client); 1085 GNUNET_assert (NULL == ch->client);
1088 ch->client = GNUNET_CLIENT_connect ("psyc", ch->cfg); 1086 ch->client = GNUNET_CLIENT_connect ("psyc", ch->cfg);
1089 GNUNET_assert (NULL != ch->client); 1087 GNUNET_assert (NULL != ch->client);
1088 uint16_t reconn_size = ntohs (ch->reconnect_msg->size);
1090 1089
1091 if (NULL == ch->tmit_head || 1090 if (NULL == ch->tmit_head ||
1092 ch->tmit_head->msg->type != ch->reconnect_msg->type) 1091 0 != memcmp (&ch->tmit_head[1], ch->reconnect_msg, reconn_size))
1093 { 1092 {
1094 uint16_t reconn_size = ntohs (ch->reconnect_msg->size); 1093 struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + reconn_size);
1095 struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + reconn_size); 1094 memcpy (&mq[1], ch->reconnect_msg, reconn_size);
1096 memcpy (&op[1], ch->reconnect_msg, reconn_size); 1095 GNUNET_CONTAINER_DLL_insert (ch->tmit_head, ch->tmit_tail, mq);
1097 op->msg = (struct GNUNET_MessageHeader *) &op[1];
1098 GNUNET_CONTAINER_DLL_insert (ch->tmit_head, ch->tmit_tail, op);
1099 } 1096 }
1100 transmit_next (ch); 1097 transmit_next (ch);
1101} 1098}
@@ -1104,7 +1101,7 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1104/** 1101/**
1105 * Disconnect from the PSYC service. 1102 * Disconnect from the PSYC service.
1106 * 1103 *
1107 * @param c Channel handle to disconnect 1104 * @param c Channel handle to disconnect.
1108 */ 1105 */
1109static void 1106static void
1110disconnect (void *c) 1107disconnect (void *c)
@@ -1167,6 +1164,7 @@ disconnect (void *c)
1167 * @param join_cb Function to invoke when a peer wants to join. 1164 * @param join_cb Function to invoke when a peer wants to join.
1168 * @param master_started_cb Function to invoke after the channel master started. 1165 * @param master_started_cb Function to invoke after the channel master started.
1169 * @param cls Closure for @a master_started_cb and @a join_cb. 1166 * @param cls Closure for @a master_started_cb and @a join_cb.
1167 *
1170 * @return Handle for the channel master, NULL on error. 1168 * @return Handle for the channel master, NULL on error.
1171 */ 1169 */
1172struct GNUNET_PSYC_Master * 1170struct GNUNET_PSYC_Master *
@@ -1187,17 +1185,16 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
1187 req->channel_key = *channel_key; 1185 req->channel_key = *channel_key;
1188 req->policy = policy; 1186 req->policy = policy;
1189 1187
1188 mst->start_cb = master_started_cb;
1189 ch->message_cb = message_cb;
1190 ch->join_cb = join_cb;
1191 ch->cb_cls = cls;
1190 ch->cfg = cfg; 1192 ch->cfg = cfg;
1191 ch->is_master = GNUNET_YES; 1193 ch->is_master = GNUNET_YES;
1192 ch->reconnect_msg = (struct GNUNET_MessageHeader *) req; 1194 ch->reconnect_msg = (struct GNUNET_MessageHeader *) req;
1193 ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; 1195 ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
1194 ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst); 1196 ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst);
1195 1197
1196 ch->message_cb = message_cb;
1197 ch->join_cb = join_cb;
1198 ch->cb_cls = cls;
1199 mst->start_cb = master_started_cb;
1200
1201 return mst; 1198 return mst;
1202} 1199}
1203 1200
@@ -1260,6 +1257,7 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
1260 * @param notify_data Function to call to obtain fragments of the data. 1257 * @param notify_data Function to call to obtain fragments of the data.
1261 * @param notify_cls Closure for @a notify_mod and @a notify_data. 1258 * @param notify_cls Closure for @a notify_mod and @a notify_data.
1262 * @param flags Flags for the message being transmitted. 1259 * @param flags Flags for the message being transmitted.
1260 *
1263 * @return Transmission handle, NULL on error (i.e. more than one request queued). 1261 * @return Transmission handle, NULL on error (i.e. more than one request queued).
1264 */ 1262 */
1265struct GNUNET_PSYC_MasterTransmitHandle * 1263struct GNUNET_PSYC_MasterTransmitHandle *
@@ -1330,6 +1328,7 @@ GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th)
1330 * @param env Environment containing transient variables for the request, or NULL. 1328 * @param env Environment containing transient variables for the request, or NULL.
1331 * @param data Payload for the join message. 1329 * @param data Payload for the join message.
1332 * @param data_size Number of bytes in @a data. 1330 * @param data_size Number of bytes in @a data.
1331 *
1333 * @return Handle for the slave, NULL on error. 1332 * @return Handle for the slave, NULL on error.
1334 */ 1333 */
1335struct GNUNET_PSYC_Slave * 1334struct GNUNET_PSYC_Slave *
@@ -1361,6 +1360,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1361 req->relay_count = htonl (relay_count); 1360 req->relay_count = htonl (relay_count);
1362 memcpy (&req[1], relays, relay_count * sizeof (*relays)); 1361 memcpy (&req[1], relays, relay_count * sizeof (*relays));
1363 1362
1363 slv->join_cb = slave_joined_cb;
1364 ch->message_cb = message_cb; 1364 ch->message_cb = message_cb;
1365 ch->join_cb = join_cb; 1365 ch->join_cb = join_cb;
1366 ch->cb_cls = cls; 1366 ch->cb_cls = cls;
@@ -1371,7 +1371,6 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1371 ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; 1371 ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
1372 ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv); 1372 ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv);
1373 1373
1374 slv->join_cb = slave_joined_cb;
1375 return slv; 1374 return slv;
1376} 1375}
1377 1376
@@ -1401,6 +1400,7 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave)
1401 * @param notify_data Function to call to obtain fragments of the data. 1400 * @param notify_data Function to call to obtain fragments of the data.
1402 * @param notify_cls Closure for @a notify. 1401 * @param notify_cls Closure for @a notify.
1403 * @param flags Flags for the message being transmitted. 1402 * @param flags Flags for the message being transmitted.
1403 *
1404 * @return Transmission handle, NULL on error (i.e. more than one request 1404 * @return Transmission handle, NULL on error (i.e. more than one request
1405 * queued). 1405 * queued).
1406 */ 1406 */
@@ -1447,6 +1447,7 @@ GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th)
1447 * APIs. 1447 * APIs.
1448 * 1448 *
1449 * @param master Channel master handle. 1449 * @param master Channel master handle.
1450 *
1450 * @return Channel handle, valid for as long as @a master is valid. 1451 * @return Channel handle, valid for as long as @a master is valid.
1451 */ 1452 */
1452struct GNUNET_PSYC_Channel * 1453struct GNUNET_PSYC_Channel *
@@ -1460,6 +1461,7 @@ GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master)
1460 * Convert @a slave to a @e channel handle to access the @e channel APIs. 1461 * Convert @a slave to a @e channel handle to access the @e channel APIs.
1461 * 1462 *
1462 * @param slave Slave handle. 1463 * @param slave Slave handle.
1464 *
1463 * @return Channel handle, valid for as long as @a slave is valid. 1465 * @return Channel handle, valid for as long as @a slave is valid.
1464 */ 1466 */
1465struct GNUNET_PSYC_Channel * 1467struct GNUNET_PSYC_Channel *
@@ -1497,18 +1499,16 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel,
1497 uint64_t effective_since) 1499 uint64_t effective_since)
1498{ 1500{
1499 struct ChannelSlaveAdd *slvadd; 1501 struct ChannelSlaveAdd *slvadd;
1500 struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*slvadd)); 1502 struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*slvadd));
1501
1502 slvadd = (struct ChannelSlaveAdd *) &op[1];
1503 op->msg = (struct GNUNET_MessageHeader *) slvadd;
1504 1503
1504 slvadd = (struct ChannelSlaveAdd *) &mq[1];
1505 slvadd->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD); 1505 slvadd->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD);
1506 slvadd->header.size = htons (sizeof (*slvadd)); 1506 slvadd->header.size = htons (sizeof (*slvadd));
1507 slvadd->announced_at = GNUNET_htonll (announced_at); 1507 slvadd->announced_at = GNUNET_htonll (announced_at);
1508 slvadd->effective_since = GNUNET_htonll (effective_since); 1508 slvadd->effective_since = GNUNET_htonll (effective_since);
1509 GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head, 1509 GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head,
1510 channel->tmit_tail, 1510 channel->tmit_tail,
1511 op); 1511 mq);
1512 transmit_next (channel); 1512 transmit_next (channel);
1513} 1513}
1514 1514
@@ -1540,16 +1540,15 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel,
1540 uint64_t announced_at) 1540 uint64_t announced_at)
1541{ 1541{
1542 struct ChannelSlaveRemove *slvrm; 1542 struct ChannelSlaveRemove *slvrm;
1543 struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*slvrm)); 1543 struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*slvrm));
1544 1544
1545 slvrm = (struct ChannelSlaveRemove *) &op[1]; 1545 slvrm = (struct ChannelSlaveRemove *) &mq[1];
1546 op->msg = (struct GNUNET_MessageHeader *) slvrm;
1547 slvrm->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM); 1546 slvrm->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM);
1548 slvrm->header.size = htons (sizeof (*slvrm)); 1547 slvrm->header.size = htons (sizeof (*slvrm));
1549 slvrm->announced_at = GNUNET_htonll (announced_at); 1548 slvrm->announced_at = GNUNET_htonll (announced_at);
1550 GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head, 1549 GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head,
1551 channel->tmit_tail, 1550 channel->tmit_tail,
1552 op); 1551 mq);
1553 transmit_next (channel); 1552 transmit_next (channel);
1554} 1553}
1555 1554
@@ -1573,6 +1572,7 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel,
1573 * has been called, the client must not call 1572 * has been called, the client must not call
1574 * GNUNET_PSYC_channel_story_tell_cancel() anymore. 1573 * GNUNET_PSYC_channel_story_tell_cancel() anymore.
1575 * @param cls Closure for the callbacks. 1574 * @param cls Closure for the callbacks.
1575 *
1576 * @return Handle to cancel story telling operation. 1576 * @return Handle to cancel story telling operation.
1577 */ 1577 */
1578struct GNUNET_PSYC_Story * 1578struct GNUNET_PSYC_Story *
@@ -1615,6 +1615,7 @@ GNUNET_PSYC_channel_story_tell_cancel (struct GNUNET_PSYC_Story *story)
1615 * @param cb Function called once when a matching state variable is found. 1615 * @param cb Function called once when a matching state variable is found.
1616 * Not called if there's no matching state variable. 1616 * Not called if there's no matching state variable.
1617 * @param cb_cls Closure for the callbacks. 1617 * @param cb_cls Closure for the callbacks.
1618 *
1618 * @return Handle that can be used to cancel the query operation. 1619 * @return Handle that can be used to cancel the query operation.
1619 */ 1620 */
1620struct GNUNET_PSYC_StateQuery * 1621struct GNUNET_PSYC_StateQuery *
@@ -1641,6 +1642,7 @@ GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *channel,
1641 * @param name_prefix Prefix of the state variable name to match. 1642 * @param name_prefix Prefix of the state variable name to match.
1642 * @param cb Function to call with the matching state variables. 1643 * @param cb Function to call with the matching state variables.
1643 * @param cb_cls Closure for the callbacks. 1644 * @param cb_cls Closure for the callbacks.
1645 *
1644 * @return Handle that can be used to cancel the query operation. 1646 * @return Handle that can be used to cancel the query operation.
1645 */ 1647 */
1646struct GNUNET_PSYC_StateQuery * 1648struct GNUNET_PSYC_StateQuery *
diff --git a/src/psyc/test_psyc.conf b/src/psyc/test_psyc.conf
index 7a1eb8404..70a408ae3 100644
--- a/src/psyc/test_psyc.conf
+++ b/src/psyc/test_psyc.conf
@@ -1,17 +1,2 @@
1[arm] 1[arm]
2UNIXPATH = $GNUNET_RUNTIME_DIR/test-gnunet-service-arm.sock 2DEFAULTSERVICES = psyc psycstore multicast
3DEFAULTSERVICES = psyc
4
5[psyc]
6AUTOSTART = YES
7BINARY = gnunet-service-psyc
8UNIXPATH = $GNUNET_RUNTIME_DIR/test-gnunet-service-psyc.sock
9UNIX_MATCH_UID = NO
10UNIX_MATCH_GID = YES
11
12[psycstore]
13AUTOSTART = YES
14BINARY = gnunet-service-psycstore
15UNIXPATH = $GNUNET_RUNTIME_DIR/test-gnunet-service-psycstore.sock
16UNIX_MATCH_UID = NO
17UNIX_MATCH_GID = YES