aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/include/gnunet_protocols.h50
-rw-r--r--src/include/gnunet_psyc_service.h5
-rw-r--r--src/include/gnunet_signatures.h7
-rw-r--r--src/multicast/gnunet-service-multicast.c399
-rw-r--r--src/multicast/multicast.h8
-rw-r--r--src/multicast/multicast_api.c99
-rw-r--r--src/psyc/gnunet-service-psyc.c481
-rw-r--r--src/psyc/psyc.h15
-rw-r--r--src/psyc/psyc_api.c40
-rw-r--r--src/psyc/test_psyc.c6
10 files changed, 817 insertions, 293 deletions
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index aeeeed9ea..e943b9c29 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -2338,70 +2338,60 @@ extern "C"
2338#define GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START 750 2338#define GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START 750
2339 2339
2340/** 2340/**
2341 * C->S: Stop the origin.
2342 */
2343#define GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_STOP 751
2344
2345/**
2346 * C->S: Join group as a member. 2341 * C->S: Join group as a member.
2347 */ 2342 */
2348#define GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN 752 2343#define GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN 751
2349
2350/**
2351 * C->S: Part the group.
2352 */
2353#define GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_PART 753
2354
2355/**
2356 * C<->S<->T: Multicast message from the origin to all members.
2357 */
2358#define GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE 754
2359
2360/**
2361 * C<->S<->T: Unicast request from a group member to the origin.
2362 */
2363#define GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST 755
2364 2344
2365/** 2345/**
2366 * C<--S<->T: A peer wants to join the group. 2346 * C<--S<->T: A peer wants to join the group.
2367 * 2347 *
2368 * Unicast message to the origin or another group member. 2348 * Unicast message to the origin or another group member.
2369 */ 2349 */
2370#define GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST 2350#define GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST 752
2371 2351
2372/** 2352/**
2373 * C<->S<->T: Response to a join request. 2353 * C<->S<->T: Response to a join request.
2374 * 2354 *
2375 * Unicast message from a group member to the peer wanting to join. 2355 * Unicast message from a group member to the peer wanting to join.
2376 */ 2356 */
2377#define GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION 2357#define GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION 753
2378 2358
2379/** 2359/**
2380 * A peer wants to part the group. 2360 * A peer wants to part the group.
2381 */ 2361 */
2382#define GNUNET_MESSAGE_TYPE_MULTICAST_PART_REQUEST 2362#define GNUNET_MESSAGE_TYPE_MULTICAST_PART_REQUEST 754
2383 2363
2384/** 2364/**
2385 * Acknowledgement sent in response to a part request. 2365 * Acknowledgement sent in response to a part request.
2386 * 2366 *
2387 * Unicast message from a group member to the peer wanting to part. 2367 * Unicast message from a group member to the peer wanting to part.
2388 */ 2368 */
2389#define GNUNET_MESSAGE_TYPE_MULTICAST_PART_ACK 2369#define GNUNET_MESSAGE_TYPE_MULTICAST_PART_ACK 755
2390 2370
2391/** 2371/**
2392 * Group terminated. 2372 * Group terminated.
2393 */ 2373 */
2394#define GNUNET_MESSAGE_TYPE_MULTICAST_GROUP_END 2374#define GNUNET_MESSAGE_TYPE_MULTICAST_GROUP_END 756
2395 2375
2396/** 2376/**
2397 * 2377 * C<->S<->T: Multicast message from the origin to all members.
2398 */ 2378 */
2399#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST 2379#define GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE 757
2400 2380
2401/** 2381/**
2402 * 2382 * C<->S<->T: Unicast request from a group member to the origin.
2383 */
2384#define GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST 758
2385
2386/**
2387 * C<->S<->T: Replay request from a group member to another member.
2388 */
2389#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST 759
2390
2391/**
2392 * C<->S<->T: Cancellation of a replay request.
2403 */ 2393 */
2404#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST_CANCEL 2394#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST_CANCEL 760
2405 2395
2406 2396
2407 2397
diff --git a/src/include/gnunet_psyc_service.h b/src/include/gnunet_psyc_service.h
index 928e05242..48c1107dc 100644
--- a/src/include/gnunet_psyc_service.h
+++ b/src/include/gnunet_psyc_service.h
@@ -620,9 +620,7 @@ typedef void
620 * @param message_cb Function to invoke on message parts received from the 620 * @param message_cb Function to invoke on message parts received from the
621 * channel, typically at least contains method handlers for @e join and 621 * channel, typically at least contains method handlers for @e join and
622 * @e part. 622 * @e part.
623 * @param join_cb function invoked once we have joined with the current 623 * @param slave_joined_cb Function invoked once we have joined the channel.
624 * message ID of the channel
625 * @param slave_joined_cb Function to invoke when a peer wants to join.
626 * @param cls Closure for @a message_cb and @a slave_joined_cb. 624 * @param cls Closure for @a message_cb and @a slave_joined_cb.
627 * @param method_name Method name for the join request. 625 * @param method_name Method name for the join request.
628 * @param env Environment containing transient variables for the request, or NULL. 626 * @param env Environment containing transient variables for the request, or NULL.
@@ -638,7 +636,6 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
638 uint32_t relay_count, 636 uint32_t relay_count,
639 const struct GNUNET_PeerIdentity *relays, 637 const struct GNUNET_PeerIdentity *relays,
640 GNUNET_PSYC_MessageCallback message_cb, 638 GNUNET_PSYC_MessageCallback message_cb,
641 GNUNET_PSYC_JoinCallback join_cb,
642 GNUNET_PSYC_SlaveJoinCallback slave_joined_cb, 639 GNUNET_PSYC_SlaveJoinCallback slave_joined_cb,
643 void *cls, 640 void *cls,
644 const char *method_name, 641 const char *method_name,
diff --git a/src/include/gnunet_signatures.h b/src/include/gnunet_signatures.h
index fafe6e5ae..d875aeb6b 100644
--- a/src/include/gnunet_signatures.h
+++ b/src/include/gnunet_signatures.h
@@ -137,7 +137,7 @@ extern "C"
137#define GNUNET_SIGNATURE_PURPOSE_REGEX_ACCEPT 18 137#define GNUNET_SIGNATURE_PURPOSE_REGEX_ACCEPT 18
138 138
139/** 139/**
140 * Signature of a multicast message. 140 * Signature of a multicast message sent by the origin.
141 */ 141 */
142#define GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE 19 142#define GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE 19
143 143
@@ -166,6 +166,11 @@ extern "C"
166 */ 166 */
167#define GNUNET_SIGNATURE_PURPOSE_SECRETSHARING_DECRYPTION 23 167#define GNUNET_SIGNATURE_PURPOSE_SECRETSHARING_DECRYPTION 23
168 168
169/**
170 * Signature of a multicast request sent by a member.
171 */
172#define GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST 24
173
169 174
170#if 0 /* keep Emacsens' auto-indent happy */ 175#if 0 /* keep Emacsens' auto-indent happy */
171{ 176{
diff --git a/src/multicast/gnunet-service-multicast.c b/src/multicast/gnunet-service-multicast.c
index 0265660e1..0394ee19e 100644
--- a/src/multicast/gnunet-service-multicast.c
+++ b/src/multicast/gnunet-service-multicast.c
@@ -46,22 +46,40 @@ static struct GNUNET_SERVER_NotificationContext *nc;
46 46
47/** 47/**
48 * All connected origins. 48 * All connected origins.
49 * Group's pub_key_hash -> struct Group 49 * Group's pub_key_hash -> struct Origin
50 */ 50 */
51static struct GNUNET_CONTAINER_MultiHashMap *origins; 51static struct GNUNET_CONTAINER_MultiHashMap *origins;
52 52
53/** 53/**
54 * All connected members. 54 * All connected members.
55 * Group's pub_key_hash -> struct Group 55 * Group's pub_key_hash -> struct Member
56 */ 56 */
57static struct GNUNET_CONTAINER_MultiHashMap *members; 57static struct GNUNET_CONTAINER_MultiHashMap *members;
58 58
59/** 59/**
60 * Connected members per group.
61 * Group's pub_key_hash -> Member's pub_key -> struct Member
62 */
63static struct GNUNET_CONTAINER_MultiHashMap *group_members;
64
65
66/**
67 * List of connected clients.
68 */
69struct ClientList
70{
71 struct ClientList *prev;
72 struct ClientList *next;
73 struct GNUNET_SERVER_Client *client;
74};
75
76/**
60 * Common part of the client context for both an origin and member. 77 * Common part of the client context for both an origin and member.
61 */ 78 */
62struct Group 79struct Group
63{ 80{
64 struct GNUNET_SERVER_Client *client; 81 struct ClientList *clients_head;
82 struct ClientList *clients_tail;
65 83
66 /** 84 /**
67 * Public key of the group. 85 * Public key of the group.
@@ -117,6 +135,29 @@ struct Member
117 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key; 135 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
118 136
119 /** 137 /**
138 * Public key of the member.
139 */
140 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
141
142 /**
143 * Hash of @a pub_key.
144 */
145 struct GNUNET_HashCode pub_key_hash;
146
147 /**
148 * Join request sent to the origin / members.
149 */
150 struct GNUNET_MULTICAST_JoinRequest *join_request;
151
152 /**
153 * Join decision sent in reply to our request.
154 *
155 * Only a positive decision is stored here, in case of a negative decision the
156 * client is disconnected.
157 */
158 struct MulticastJoinDecisionMessage *join_decision;
159
160 /**
120 * Last request fragment ID sent to the origin. 161 * Last request fragment ID sent to the origin.
121 */ 162 */
122 uint64_t max_fragment_id; 163 uint64_t max_fragment_id;
@@ -135,23 +176,161 @@ cleanup_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
135 /* FIXME: do clean up here */ 176 /* FIXME: do clean up here */
136} 177}
137 178
179/**
180 * Clean up origin data structures after a client disconnected.
181 */
182static void
183cleanup_origin (struct Origin *orig)
184{
185 struct Group *grp = &orig->grp;
186 GNUNET_CONTAINER_multihashmap_remove (origins, &grp->pub_key_hash, orig);
187}
188
189
190/**
191 * Clean up member data structures after a client disconnected.
192 */
193static void
194cleanup_member (struct Member *mem)
195{
196 struct Group *grp = &mem->grp;
197 struct GNUNET_CONTAINER_MultiHashMap *
198 grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members,
199 &grp->pub_key_hash);
200 GNUNET_assert (NULL != grp_mem);
201 GNUNET_CONTAINER_multihashmap_remove (grp_mem, &mem->pub_key_hash, mem);
202
203 if (0 == GNUNET_CONTAINER_multihashmap_size (grp_mem))
204 {
205 GNUNET_CONTAINER_multihashmap_remove (group_members, &grp->pub_key_hash,
206 grp_mem);
207 GNUNET_CONTAINER_multihashmap_destroy (grp_mem);
208 }
209 GNUNET_CONTAINER_multihashmap_remove (members, &grp->pub_key_hash, mem);
210}
211
212
213/**
214 * Clean up group data structures after a client disconnected.
215 */
216static void
217cleanup_group (struct Group *grp)
218{
219 (GNUNET_YES == grp->is_origin)
220 ? cleanup_origin ((struct Origin *) grp)
221 : cleanup_member ((struct Member *) grp);
222
223 GNUNET_free (grp);
224}
225
226
227/**
228 * Called whenever a client is disconnected.
229 *
230 * Frees our resources associated with that client.
231 *
232 * @param cls Closure.
233 * @param client Client handle.
234 */
235static void
236client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
237{
238 if (NULL == client)
239 return;
240
241 struct Group *grp
242 = GNUNET_SERVER_client_get_user_context (client, struct Group);
243
244 if (NULL == grp)
245 {
246 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
247 "%p User context is NULL in client_disconnect()\n", grp);
248 GNUNET_assert (0);
249 return;
250 }
251
252 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
253 "%p Client (%s) disconnected from group %s\n",
254 grp, (GNUNET_YES == grp->is_origin) ? "origin" : "member",
255 GNUNET_h2s (&grp->pub_key_hash));
256
257 struct ClientList *cl = grp->clients_head;
258 while (NULL != cl)
259 {
260 if (cl->client == client)
261 {
262 GNUNET_CONTAINER_DLL_remove (grp->clients_head, grp->clients_tail, cl);
263 GNUNET_free (cl);
264 break;
265 }
266 cl = cl->next;
267 }
268
269 if (NULL == grp->clients_head)
270 { /* Last client disconnected. */
271#if FIXME
272 if (NULL != grp->tmit_head)
273 { /* Send pending messages via CADET before cleanup. */
274 transmit_message (grp);
275 }
276 else
277#endif
278 {
279 cleanup_group (grp);
280 }
281 }
282}
283
138 284
139/** 285/**
140 * Iterator callback for sending a message to clients. 286 * Send message to all clients connected to the group.
287 */
288static void
289message_to_clients (const struct Group *grp,
290 const struct GNUNET_MessageHeader *msg)
291{
292 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
293 "%p Sending message to clients.\n", grp);
294
295 struct ClientList *cl = grp->clients_head;
296 while (NULL != cl)
297 {
298 GNUNET_SERVER_notification_context_add (nc, cl->client);
299 GNUNET_SERVER_notification_context_unicast (nc, cl->client, msg, GNUNET_NO);
300 cl = cl->next;
301 }
302}
303
304
305/**
306 * Iterator callback for sending a message to origin clients.
141 */ 307 */
142static int 308static int
143message_callback (void *cls, const struct GNUNET_HashCode *pub_key_hash, 309origin_message_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
144 void *group) 310 void *origin)
145{ 311{
146 const struct GNUNET_MessageHeader *msg = cls; 312 const struct GNUNET_MessageHeader *msg = cls;
147 struct Group *grp = group; 313 struct Member *orig = origin;
148 314
149 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 315 message_to_clients (&orig->grp, msg);
150 "%p Sending message to client.\n", grp); 316 return GNUNET_YES;
317}
151 318
152 GNUNET_SERVER_notification_context_add (nc, grp->client);
153 GNUNET_SERVER_notification_context_unicast (nc, grp->client, msg, GNUNET_NO);
154 319
320/**
321 * Iterator callback for sending a message to member clients.
322 */
323static int
324member_message_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
325 void *member)
326{
327 const struct GNUNET_MessageHeader *msg = cls;
328 struct Member *mem = member;
329
330 if (NULL != mem->join_decision)
331 { /* Only send message to admitted members */
332 message_to_clients (&mem->grp, msg);
333 }
155 return GNUNET_YES; 334 return GNUNET_YES;
156} 335}
157 336
@@ -167,10 +346,10 @@ message_to_group (struct Group *grp, const struct GNUNET_MessageHeader *msg)
167{ 346{
168 if (origins != NULL) 347 if (origins != NULL)
169 GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash, 348 GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
170 message_callback, (void *) msg); 349 origin_message_cb, (void *) msg);
171 if (members != NULL) 350 if (members != NULL)
172 GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash, 351 GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash,
173 message_callback, (void *) msg); 352 member_message_cb, (void *) msg);
174} 353}
175 354
176 355
@@ -185,7 +364,7 @@ message_to_origin (struct Group *grp, const struct GNUNET_MessageHeader *msg)
185{ 364{
186 if (origins != NULL) 365 if (origins != NULL)
187 GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash, 366 GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
188 message_callback, (void *) msg); 367 origin_message_cb, (void *) msg);
189} 368}
190 369
191 370
@@ -199,38 +378,47 @@ handle_origin_start (void *cls, struct GNUNET_SERVER_Client *client,
199 const struct MulticastOriginStartMessage * 378 const struct MulticastOriginStartMessage *
200 msg = (const struct MulticastOriginStartMessage *) m; 379 msg = (const struct MulticastOriginStartMessage *) m;
201 380
202 struct Origin *orig = GNUNET_new (struct Origin); 381 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
203 orig->priv_key = msg->group_key; 382 struct GNUNET_HashCode pub_key_hash;
204 383
205 struct Group *grp = &orig->grp; 384 GNUNET_CRYPTO_eddsa_key_get_public (&msg->group_key, &pub_key);
206 grp->is_origin = GNUNET_YES; 385 GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
207 grp->client = client; 386
387 struct Origin *
388 orig = GNUNET_CONTAINER_multihashmap_get (origins, &pub_key_hash);
389 struct Group *grp;
390
391 if (NULL == orig)
392 {
393 orig = GNUNET_new (struct Origin);
394 orig->priv_key = msg->group_key;
395 grp = &orig->grp;
396 grp->is_origin = GNUNET_YES;
397 grp->pub_key = pub_key;
398 grp->pub_key_hash = pub_key_hash;
399
400 GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig,
401 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
402 }
403 else
404 {
405 grp = &orig->grp;
406 }
208 407
209 GNUNET_CRYPTO_eddsa_key_get_public (&orig->priv_key, &grp->pub_key); 408 struct ClientList *cl = GNUNET_new (struct ClientList);
210 GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key), &grp->pub_key_hash); 409 cl->client = client;
410 GNUNET_CONTAINER_DLL_insert (grp->clients_head, grp->clients_tail, cl);
211 411
212 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 412 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
213 "%p Client connected as origin to group %s.\n", 413 "%p Client connected as origin to group %s.\n",
214 orig, GNUNET_h2s (&grp->pub_key_hash)); 414 orig, GNUNET_h2s (&grp->pub_key_hash));
215 415
216 GNUNET_SERVER_client_set_user_context (client, grp); 416 GNUNET_SERVER_client_set_user_context (client, grp);
217 GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig,
218 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
219 GNUNET_SERVER_receive_done (client, GNUNET_OK); 417 GNUNET_SERVER_receive_done (client, GNUNET_OK);
220} 418}
221 419
222 420
223/** 421/**
224 * Handle a client stopping an origin.
225 */
226static void
227handle_origin_stop (void *cls, struct GNUNET_SERVER_Client *client,
228 const struct GNUNET_MessageHeader *msg)
229{
230}
231
232
233/**
234 * Handle a connecting client joining a group. 422 * Handle a connecting client joining a group.
235 */ 423 */
236static void 424static void
@@ -240,34 +428,113 @@ handle_member_join (void *cls, struct GNUNET_SERVER_Client *client,
240 struct MulticastMemberJoinMessage * 428 struct MulticastMemberJoinMessage *
241 msg = (struct MulticastMemberJoinMessage *) m; 429 msg = (struct MulticastMemberJoinMessage *) m;
242 430
243 struct Member *mem = GNUNET_new (struct Member); 431 struct GNUNET_CRYPTO_EddsaPublicKey mem_pub_key;
244 mem->priv_key = msg->member_key; 432 struct GNUNET_HashCode pub_key_hash, mem_pub_key_hash;
245 433
246 struct Group *grp = &mem->grp; 434 GNUNET_CRYPTO_eddsa_key_get_public (&msg->member_key, &mem_pub_key);
247 grp->is_origin = GNUNET_NO; 435 GNUNET_CRYPTO_hash (&mem_pub_key, sizeof (mem_pub_key), &mem_pub_key_hash);
248 grp->client = client; 436 GNUNET_CRYPTO_hash (&msg->group_key, sizeof (msg->group_key), &pub_key_hash);
249 grp->pub_key = msg->group_key; 437
250 GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key), &grp->pub_key_hash); 438 struct GNUNET_CONTAINER_MultiHashMap *
439 grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members, &pub_key_hash);
440 struct Member *mem = NULL;
441 struct Group *grp;
442
443 if (NULL == grp_mem)
444 {
445 grp_mem = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
446 GNUNET_CONTAINER_multihashmap_put (group_members, &pub_key_hash, grp_mem,
447 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
448 }
449 else
450 {
451 mem = GNUNET_CONTAINER_multihashmap_get (grp_mem, &mem_pub_key_hash);
452 }
453
454 if (NULL == mem)
455 {
456 mem = GNUNET_new (struct Member);
457 mem->priv_key = msg->member_key;
458 mem->pub_key = mem_pub_key;
459 mem->pub_key_hash = mem_pub_key_hash;
460
461 grp = &mem->grp;
462 grp->is_origin = GNUNET_NO;
463 grp->pub_key = msg->group_key;
464 grp->pub_key_hash = pub_key_hash;
465
466 GNUNET_CONTAINER_multihashmap_put (grp_mem, &mem_pub_key_hash, mem,
467 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
468 GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem,
469 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
470 }
471 else
472 {
473 grp = &mem->grp;
474 }
475
476 struct ClientList *cl = GNUNET_new (struct ClientList);
477 cl->client = client;
478 GNUNET_CONTAINER_DLL_insert (grp->clients_head, grp->clients_tail, cl);
251 479
252 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 480 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
253 "%p Client connected as member to group %s.\n", 481 "%p Client connected as member to group %s.\n",
254 mem, GNUNET_h2s (&grp->pub_key_hash)); 482 mem, GNUNET_h2s (&grp->pub_key_hash));
255 483
256 GNUNET_SERVER_client_set_user_context (client, grp); 484 GNUNET_SERVER_client_set_user_context (client, grp);
257 GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem,
258 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
259 GNUNET_SERVER_receive_done (client, GNUNET_OK);
260}
261
262
263/**
264 * Handle a client parting a group.
265 */
266static void
267handle_member_part (void *cls, struct GNUNET_SERVER_Client *client,
268 const struct GNUNET_MessageHeader *msg)
269{
270 485
486 if (NULL != mem->join_decision)
487 { /* Already got a join decision, send it to client. */
488 GNUNET_SERVER_notification_context_add (nc, client);
489 GNUNET_SERVER_notification_context_unicast (nc, client,
490 (struct GNUNET_MessageHeader *)
491 mem->join_decision,
492 GNUNET_NO);
493 }
494 else if (grp->clients_head == grp->clients_tail)
495 { /* First client, send join request. */
496 struct GNUNET_PeerIdentity *relays = (struct GNUNET_PeerIdentity *) &msg[1];
497 uint32_t relay_count = ntohs (msg->relay_count);
498 struct GNUNET_MessageHeader *
499 join_req = ((struct GNUNET_MessageHeader *)
500 ((char *) &msg[1]) + relay_count * sizeof (*relays));
501 uint16_t join_req_size = ntohs (join_req->size);
502
503 struct MulticastJoinRequestMessage *
504 req = GNUNET_malloc (sizeof (*req) + join_req_size);
505 req->header.size = htons (sizeof (*req) + join_req_size);
506 req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST);
507 req->group_key = grp->pub_key;
508 GNUNET_CRYPTO_eddsa_key_get_public (&mem->priv_key, &req->member_key);
509 memcpy (&req[1], join_req, join_req_size);
510
511 req->purpose.size = htonl (sizeof (*req) + join_req_size
512 - sizeof (req->header)
513 - sizeof (req->signature));
514 req->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST);
515
516 if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&mem->priv_key, &req->purpose,
517 &req->signature))
518 {
519 /* FIXME: handle error */
520 GNUNET_assert (0);
521 }
522
523 if (NULL != mem->join_request)
524 GNUNET_free (mem->join_request);
525 mem->join_request = req;
526
527 if (GNUNET_YES
528 == GNUNET_CONTAINER_multihashmap_contains (origins, &grp->pub_key_hash))
529 { /* Local origin */
530 message_to_origin (grp, (struct GNUNET_MessageHeader *) mem->join_request);
531 }
532 else
533 {
534 /* FIXME: send join request to remote origin / members */
535 }
536 }
537 GNUNET_SERVER_receive_done (client, GNUNET_OK);
271} 538}
272 539
273 540
@@ -296,7 +563,7 @@ handle_multicast_message (void *cls, struct GNUNET_SERVER_Client *client,
296 &msg->signature)) 563 &msg->signature))
297 { 564 {
298 /* FIXME: handle error */ 565 /* FIXME: handle error */
299 return; 566 GNUNET_assert (0);
300 } 567 }
301 568
302 /* FIXME: send to remote members */ 569 /* FIXME: send to remote members */
@@ -327,18 +594,24 @@ handle_multicast_request (void *cls, struct GNUNET_SERVER_Client *client,
327 - sizeof (req->header) 594 - sizeof (req->header)
328 - sizeof (req->member_key) 595 - sizeof (req->member_key)
329 - sizeof (req->signature)); 596 - sizeof (req->signature));
330 req->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE); 597 req->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST);
331 598
332 if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&mem->priv_key, &req->purpose, 599 if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&mem->priv_key, &req->purpose,
333 &req->signature)) 600 &req->signature))
334 { 601 {
335 /* FIXME: handle error */ 602 /* FIXME: handle error */
336 return; 603 GNUNET_assert (0);
337 } 604 }
338 605
339 /* FIXME: send to remote origin */ 606 if (GNUNET_YES
340 607 == GNUNET_CONTAINER_multihashmap_contains (origins, &grp->pub_key_hash))
341 message_to_origin (grp, m); 608 { /* Local origin */
609 message_to_origin (grp, m);
610 }
611 else
612 {
613 /* FIXME: send to remote origin */
614 }
342 GNUNET_SERVER_receive_done (client, GNUNET_OK); 615 GNUNET_SERVER_receive_done (client, GNUNET_OK);
343} 616}
344 617
@@ -357,15 +630,9 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
357 { &handle_origin_start, NULL, 630 { &handle_origin_start, NULL,
358 GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START, 0 }, 631 GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START, 0 },
359 632
360 { &handle_origin_stop, NULL,
361 GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_STOP, 0 },
362
363 { &handle_member_join, NULL, 633 { &handle_member_join, NULL,
364 GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN, 0 }, 634 GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN, 0 },
365 635
366 { &handle_member_part, NULL,
367 GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_PART, 0 },
368
369 { &handle_multicast_message, NULL, 636 { &handle_multicast_message, NULL,
370 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 0 }, 637 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 0 },
371 638
@@ -379,9 +646,11 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
379 stats = GNUNET_STATISTICS_create ("multicast", cfg); 646 stats = GNUNET_STATISTICS_create ("multicast", cfg);
380 origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); 647 origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
381 members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); 648 members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
649 group_members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
382 nc = GNUNET_SERVER_notification_context_create (server, 1); 650 nc = GNUNET_SERVER_notification_context_create (server, 1);
383 651
384 GNUNET_SERVER_add_handlers (server, handlers); 652 GNUNET_SERVER_add_handlers (server, handlers);
653 GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
385 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &cleanup_task, 654 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &cleanup_task,
386 NULL); 655 NULL);
387} 656}
diff --git a/src/multicast/multicast.h b/src/multicast/multicast.h
index daa79e260..5b0fc647c 100644
--- a/src/multicast/multicast.h
+++ b/src/multicast/multicast.h
@@ -33,7 +33,7 @@ GNUNET_NETWORK_STRUCT_BEGIN
33/** 33/**
34 * Header of a join request sent to the origin or another member. 34 * Header of a join request sent to the origin or another member.
35 */ 35 */
36struct GNUNET_MULTICAST_JoinRequest 36struct MulticastJoinRequestMessage
37{ 37{
38 /** 38 /**
39 * Header for the join request. 39 * Header for the join request.
@@ -67,7 +67,7 @@ struct GNUNET_MULTICAST_JoinRequest
67 */ 67 */
68 struct GNUNET_PeerIdentity member_peer; 68 struct GNUNET_PeerIdentity member_peer;
69 69
70 /* Followed by request body. */ 70 /* Followed by struct GNUNET_MessageHeader join_request */
71}; 71};
72 72
73 73
@@ -97,9 +97,9 @@ struct MulticastJoinDecisionMessage
97 */ 97 */
98 uint32_t relay_count; 98 uint32_t relay_count;
99 99
100 /* followed by 'relay_count' peer identities */ 100 /* Followed by relay_count peer identities */
101 101
102 /* followed by the join response message */ 102 /* Followed by the join response message */
103 103
104}; 104};
105 105
diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c
index d42f438ae..84dac0545 100644
--- a/src/multicast/multicast_api.c
+++ b/src/multicast/multicast_api.c
@@ -196,6 +196,17 @@ struct GNUNET_MULTICAST_Member
196 */ 196 */
197struct GNUNET_MULTICAST_JoinHandle 197struct GNUNET_MULTICAST_JoinHandle
198{ 198{
199 struct GNUNET_MULTICAST_Group *group;
200
201 /**
202 * Public key of the joining member.
203 */
204 struct GNUNET_CRYPTO_EddsaPublicKey member_key;
205
206 /**
207 * Peer identity of the joining member.
208 */
209 struct GNUNET_PeerIdentity member_peer;
199}; 210};
200 211
201 212
@@ -437,8 +448,7 @@ disconnect (void *g)
437 * Iterator callback for calling message callbacks for all groups. 448 * Iterator callback for calling message callbacks for all groups.
438 */ 449 */
439static int 450static int
440message_callback (void *cls, const struct GNUNET_HashCode *pub_key_hash, 451message_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, void *group)
441 void *group)
442{ 452{
443 const struct GNUNET_MessageHeader *msg = cls; 453 const struct GNUNET_MessageHeader *msg = cls;
444 struct GNUNET_MULTICAST_Group *grp = group; 454 struct GNUNET_MULTICAST_Group *grp = group;
@@ -456,32 +466,10 @@ message_callback (void *cls, const struct GNUNET_HashCode *pub_key_hash,
456 466
457 467
458/** 468/**
459 * Handle a multicast message from the service.
460 *
461 * Call message callbacks of all origins and members of the destination group.
462 *
463 * @param grp Destination group of the message.
464 * @param msg The message.
465 */
466static void
467handle_multicast_message (struct GNUNET_MULTICAST_Group *grp,
468 const struct GNUNET_MULTICAST_MessageHeader *msg)
469{
470 if (origins != NULL)
471 GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
472 message_callback, (void *) msg);
473 if (members != NULL)
474 GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash,
475 message_callback, (void *) msg);
476}
477
478
479/**
480 * Iterator callback for calling request callbacks of origins. 469 * Iterator callback for calling request callbacks of origins.
481 */ 470 */
482static int 471static int
483request_callback (void *cls, const struct GNUNET_HashCode *chan_key_hash, 472request_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, void *origin)
484 void *origin)
485{ 473{
486 const struct GNUNET_MULTICAST_RequestHeader *req = cls; 474 const struct GNUNET_MULTICAST_RequestHeader *req = cls;
487 struct GNUNET_MULTICAST_Origin *orig = origin; 475 struct GNUNET_MULTICAST_Origin *orig = origin;
@@ -497,20 +485,26 @@ request_callback (void *cls, const struct GNUNET_HashCode *chan_key_hash,
497 485
498 486
499/** 487/**
500 * Handle a multicast request from the service. 488 * Iterator callback for calling join request callbacks of origins.
501 *
502 * Call request callbacks of all origins of the destination group.
503 *
504 * @param grp Destination group of the message.
505 * @param msg The message.
506 */ 489 */
507static void 490static int
508handle_multicast_request (struct GNUNET_MULTICAST_Group *grp, 491join_request_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
509 const struct GNUNET_MULTICAST_RequestHeader *req) 492 void *group)
510{ 493{
511 if (NULL != origins) 494 const struct MulticastJoinRequestMessage *req = cls;
512 GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash, 495 struct GNUNET_MULTICAST_Group *grp = group;
513 request_callback, (void *) req); 496
497 struct GNUNET_MULTICAST_JoinHandle *jh = GNUNET_malloc (sizeof (*jh));
498 jh->group = grp;
499 jh->member_key = req->member_key;
500 jh->member_peer = req->member_peer;
501
502 const struct GNUNET_MessageHeader *msg = NULL;
503 if (sizeof (*req) + sizeof (*msg) <= ntohs (req->header.size))
504 msg =(const struct GNUNET_MessageHeader *) &req[1];
505
506 grp->join_cb (grp->cb_cls, &req->member_key, msg, jh);
507 return GNUNET_YES;
514} 508}
515 509
516 510
@@ -551,22 +545,31 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
551 size_min = sizeof (struct GNUNET_MULTICAST_RequestHeader); 545 size_min = sizeof (struct GNUNET_MULTICAST_RequestHeader);
552 break; 546 break;
553 547
548 case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST:
549 size_min = sizeof (struct MulticastJoinRequestMessage);
550 break;
551
554 default: 552 default:
555 GNUNET_break_op (0); 553 GNUNET_break_op (0);
556 return; 554 type = 0;
557 } 555 }
558 556
559 if (! ((0 < size_eq && size == size_eq) 557 if (! ((0 < size_eq && size == size_eq)
560 || (0 < size_min && size_min <= size))) 558 || (0 < size_min && size_min <= size)))
561 { 559 {
562 GNUNET_break_op (0); 560 GNUNET_break_op (0);
563 return; 561 type = 0;
564 } 562 }
565 563
566 switch (type) 564 switch (type)
567 { 565 {
568 case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE: 566 case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
569 handle_multicast_message (grp, (struct GNUNET_MULTICAST_MessageHeader *) msg); 567 if (origins != NULL)
568 GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
569 message_cb, (void *) msg);
570 if (members != NULL)
571 GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash,
572 message_cb, (void *) msg);
570 break; 573 break;
571 574
572 case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST: 575 case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
@@ -576,12 +579,19 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
576 break; 579 break;
577 } 580 }
578 581
579 handle_multicast_request (grp, (struct GNUNET_MULTICAST_RequestHeader *) msg); 582 if (NULL != origins)
583 GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
584 request_cb, (void *) msg);
580 break; 585 break;
581 586
582 default: 587 case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST:
583 GNUNET_break_op (0); 588 if (NULL != origins)
584 return; 589 GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
590 join_request_cb, (void *) msg);
591 if (NULL != members)
592 GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash,
593 join_request_cb, (void *) msg);
594 break;
585 } 595 }
586 596
587 if (NULL != grp->client) 597 if (NULL != grp->client)
@@ -621,6 +631,7 @@ GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *jh,
621 const struct GNUNET_PeerIdentity *relays, 631 const struct GNUNET_PeerIdentity *relays,
622 const struct GNUNET_MessageHeader *join_response) 632 const struct GNUNET_MessageHeader *join_response)
623{ 633{
634 GNUNET_free (jh);
624 return NULL; 635 return NULL;
625} 636}
626 637
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
index 70322adaa..765371d77 100644
--- a/src/psyc/gnunet-service-psyc.c
+++ b/src/psyc/gnunet-service-psyc.c
@@ -59,16 +59,22 @@ static struct GNUNET_PSYCSTORE_Handle *store;
59 59
60/** 60/**
61 * All connected masters. 61 * All connected masters.
62 * Channel's pub_key_hash -> struct Channel 62 * Channel's pub_key_hash -> struct Master
63 */ 63 */
64static struct GNUNET_CONTAINER_MultiHashMap *masters; 64static struct GNUNET_CONTAINER_MultiHashMap *masters;
65 65
66/** 66/**
67 * All connected slaves. 67 * All connected slaves.
68 * Channel's pub_key_hash -> struct Channel 68 * Channel's pub_key_hash -> struct Slave
69 */ 69 */
70static struct GNUNET_CONTAINER_MultiHashMap *slaves; 70static struct GNUNET_CONTAINER_MultiHashMap *slaves;
71 71
72/**
73 * Connected slaves per channel.
74 * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
75 */
76static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
77
72 78
73/** 79/**
74 * Message in the transmission queue. 80 * Message in the transmission queue.
@@ -78,6 +84,8 @@ struct TransmitMessage
78 struct TransmitMessage *prev; 84 struct TransmitMessage *prev;
79 struct TransmitMessage *next; 85 struct TransmitMessage *next;
80 86
87 struct GNUNET_SERVER_Client *client;
88
81 /** 89 /**
82 * ID assigned to the message. 90 * ID assigned to the message.
83 */ 91 */
@@ -164,11 +172,23 @@ struct FragmentQueue
164 172
165 173
166/** 174/**
175 * List of connected clients.
176 */
177struct ClientList
178{
179 struct ClientList *prev;
180 struct ClientList *next;
181 struct GNUNET_SERVER_Client *client;
182};
183
184
185/**
167 * Common part of the client context for both a channel master and slave. 186 * Common part of the client context for both a channel master and slave.
168 */ 187 */
169struct Channel 188struct Channel
170{ 189{
171 struct GNUNET_SERVER_Client *client; 190 struct ClientList *clients_head;
191 struct ClientList *clients_tail;
172 192
173 struct TransmitMessage *tmit_head; 193 struct TransmitMessage *tmit_head;
174 struct TransmitMessage *tmit_tail; 194 struct TransmitMessage *tmit_tail;
@@ -316,6 +336,16 @@ struct Slave
316 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key; 336 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
317 337
318 /** 338 /**
339 * Public key of the slave.
340 */
341 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
342
343 /**
344 * Hash of @a pub_key.
345 */
346 struct GNUNET_HashCode pub_key_hash;
347
348 /**
319 * Handle for the multicast member. 349 * Handle for the multicast member.
320 */ 350 */
321 struct GNUNET_MULTICAST_Member *member; 351 struct GNUNET_MULTICAST_Member *member;
@@ -378,30 +408,62 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
378} 408}
379 409
380 410
411/**
412 * Clean up master data structures after a client disconnected.
413 */
381static void 414static void
382client_cleanup (struct Channel *ch) 415cleanup_master (struct Master *mst)
383{ 416{
384 /* FIXME: fragment_cache_clear */ 417 struct Channel *ch = &mst->channel;
385 418
386 if (ch->is_master) 419 if (NULL != mst->origin)
387 { 420 GNUNET_MULTICAST_origin_stop (mst->origin);
388 struct Master *mst = (struct Master *) ch; 421 GNUNET_CONTAINER_multihashmap_remove (masters, &ch->pub_key_hash, ch);
389 if (NULL != mst->origin) 422}
390 GNUNET_MULTICAST_origin_stop (mst->origin); 423
391 GNUNET_CONTAINER_multihashmap_remove (masters, &ch->pub_key_hash, ch); 424
392 } 425/**
393 else 426 * Clean up slave data structures after a client disconnected.
427 */
428static void
429cleanup_slave (struct Slave *slv)
430{
431 struct Channel *ch = &slv->channel;
432 struct GNUNET_CONTAINER_MultiHashMap *
433 ch_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
434 &ch->pub_key_hash);
435 GNUNET_assert (NULL != ch_slv);
436 GNUNET_CONTAINER_multihashmap_remove (ch_slv, &slv->pub_key_hash, slv);
437
438 if (0 == GNUNET_CONTAINER_multihashmap_size (ch_slv))
394 { 439 {
395 struct Slave *slv = (struct Slave *) ch; 440 GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &ch->pub_key_hash,
396 if (NULL != slv->join_req) 441 ch_slv);
397 GNUNET_free (slv->join_req); 442 GNUNET_CONTAINER_multihashmap_destroy (ch_slv);
398 if (NULL != slv->relays)
399 GNUNET_free (slv->relays);
400 if (NULL != slv->member)
401 GNUNET_MULTICAST_member_part (slv->member);
402 GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, ch);
403 } 443 }
444 GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, slv);
445
446 if (NULL != slv->join_req)
447 GNUNET_free (slv->join_req);
448 if (NULL != slv->relays)
449 GNUNET_free (slv->relays);
450 if (NULL != slv->member)
451 GNUNET_MULTICAST_member_part (slv->member);
452 GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, ch);
453}
404 454
455
456/**
457 * Clean up channel data structures after a client disconnected.
458 */
459static void
460cleanup_channel (struct Channel *ch)
461{
462 /* FIXME: fragment_cache_clear */
463
464 (GNUNET_YES == ch->is_master)
465 ? cleanup_master ((struct Master *) ch)
466 : cleanup_slave ((struct Slave *) ch);
405 GNUNET_free (ch); 467 GNUNET_free (ch);
406} 468}
407 469
@@ -421,7 +483,10 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
421 483
422 struct Channel *ch 484 struct Channel *ch
423 = GNUNET_SERVER_client_get_user_context (client, struct Channel); 485 = GNUNET_SERVER_client_get_user_context (client, struct Channel);
424 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Client disconnected\n", ch); 486 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
487 "%p Client (%s) disconnected from channel %s\n",
488 ch, (GNUNET_YES == ch->is_master) ? "master" : "slave",
489 GNUNET_h2s (&ch->pub_key_hash));
425 490
426 if (NULL == ch) 491 if (NULL == ch)
427 { 492 {
@@ -431,29 +496,112 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
431 return; 496 return;
432 } 497 }
433 498
434 ch->disconnected = GNUNET_YES; 499 struct ClientList *cl = ch->clients_head;
500 while (NULL != cl)
501 {
502 if (cl->client == client)
503 {
504 GNUNET_CONTAINER_DLL_remove (ch->clients_head, ch->clients_tail, cl);
505 GNUNET_free (cl);
506 break;
507 }
508 cl = cl->next;
509 }
510
511 if (NULL == ch->clients_head)
512 { /* Last client disconnected. */
513 if (NULL != ch->tmit_head)
514 { /* Send pending messages to multicast before cleanup. */
515 transmit_message (ch);
516 }
517 else
518 {
519 cleanup_channel (ch);
520 }
521 }
522}
523
435 524
436 /* Send pending messages to multicast before cleanup. */ 525/**
437 if (NULL != ch->tmit_head) 526 * Send message to all clients connected to the channel.
527 */
528static void
529msg_to_clients (const struct Channel *ch,
530 const struct GNUNET_MessageHeader *msg)
531{
532 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
533 "%p Sending message to clients.\n", ch);
534
535 struct ClientList *cl = ch->clients_head;
536 while (NULL != cl)
438 { 537 {
439 transmit_message (ch); 538 GNUNET_SERVER_notification_context_add (nc, cl->client);
539 GNUNET_SERVER_notification_context_unicast (nc, cl->client, msg, GNUNET_NO);
540 cl = cl->next;
541 }
542}
543
544
545/**
546 * Closure for join_mem_test_cb()
547 */
548struct JoinMemTestCls
549{
550 struct Channel *ch;
551 struct GNUNET_MULTICAST_JoinHandle *jh;
552 struct MasterJoinRequest *master_join_req;
553};
554
555
556/**
557 * Membership test result callback used for join requests.m
558 */
559static void
560join_mem_test_cb (void *cls, int64_t result, const char *err_msg)
561{
562 struct JoinMemTestCls *jcls = cls;
563
564 if (GNUNET_NO == result && GNUNET_YES == jcls->ch->is_master)
565 { /* Pass on join request to client if this is a master channel */
566 msg_to_clients (jcls->ch,
567 (struct GNUNET_MessageHeader *) jcls->master_join_req);
440 } 568 }
441 else 569 else
442 { 570 {
443 client_cleanup (ch); 571 // FIXME: relays
572 GNUNET_MULTICAST_join_decision(jcls->jh, result, 0, NULL, NULL);
444 } 573 }
574 GNUNET_free (jcls->master_join_req);
575 GNUNET_free (jcls);
445} 576}
446 577
447 578
448/** 579/**
449 * Master receives a join request from a slave. 580 * Incoming join request from multicast.
450 */ 581 */
451static void 582static void
452join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, 583join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
453 const struct GNUNET_MessageHeader *join_req, 584 const struct GNUNET_MessageHeader *join_req,
454 struct GNUNET_MULTICAST_JoinHandle *jh) 585 struct GNUNET_MULTICAST_JoinHandle *jh)
455{ 586{
456 587 struct Channel *ch = cls;
588 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", ch);
589
590 uint16_t join_req_size = (NULL != join_req) ? ntohs (join_req->size) : 0;
591 struct MasterJoinRequest *req = GNUNET_malloc (sizeof (*req) + join_req_size);
592 req->header.size = htons (sizeof (*req) + join_req_size);
593 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
594 req->slave_key = *slave_key;
595 memcpy (&req[1], join_req, join_req_size);
596
597 struct JoinMemTestCls *jcls = GNUNET_malloc (sizeof (*jcls));
598 jcls->ch = ch;
599 jcls->jh = jh;
600 jcls->master_join_req = req;
601
602 GNUNET_PSYCSTORE_membership_test (store, &ch->pub_key, slave_key,
603 ch->max_message_id, 0,
604 &join_mem_test_cb, jcls);
457} 605}
458 606
459 607
@@ -474,6 +622,7 @@ replay_fragment_cb (void *cls,
474 struct GNUNET_MULTICAST_ReplayHandle *rh) 622 struct GNUNET_MULTICAST_ReplayHandle *rh)
475 623
476{ 624{
625
477} 626}
478 627
479 628
@@ -497,35 +646,6 @@ fragment_store_result (void *cls, int64_t result, const char *err_msg)
497} 646}
498 647
499 648
500static void
501message_to_client (struct Channel *ch,
502 const struct GNUNET_MULTICAST_MessageHeader *mmsg)
503{
504 uint16_t size = ntohs (mmsg->header.size);
505 struct GNUNET_PSYC_MessageHeader *pmsg;
506 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
507
508 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
509 "%p Sending message to client. "
510 "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
511 ch, GNUNET_ntohll (mmsg->fragment_id),
512 GNUNET_ntohll (mmsg->message_id));
513
514 pmsg = GNUNET_malloc (psize);
515 pmsg->header.size = htons (psize);
516 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
517 pmsg->message_id = mmsg->message_id;
518
519 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
520
521 GNUNET_SERVER_notification_context_add (nc, ch->client);
522 GNUNET_SERVER_notification_context_unicast (nc, ch->client,
523 (const struct GNUNET_MessageHeader *) pmsg,
524 GNUNET_NO);
525 GNUNET_free (pmsg);
526}
527
528
529/** 649/**
530 * Convert an uint64_t in network byte order to a HashCode 650 * Convert an uint64_t in network byte order to a HashCode
531 * that can be used as key in a MultiHashMap 651 * that can be used as key in a MultiHashMap
@@ -564,6 +684,34 @@ hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
564 684
565 685
566/** 686/**
687 * Send multicast message to all clients connected to the channel.
688 */
689static void
690mmsg_to_clients (struct Channel *ch,
691 const struct GNUNET_MULTICAST_MessageHeader *mmsg)
692{
693 uint16_t size = ntohs (mmsg->header.size);
694 struct GNUNET_PSYC_MessageHeader *pmsg;
695 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
696
697 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
698 "%p Sending message to client. "
699 "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
700 ch, GNUNET_ntohll (mmsg->fragment_id),
701 GNUNET_ntohll (mmsg->message_id));
702
703 pmsg = GNUNET_malloc (psize);
704 pmsg->header.size = htons (psize);
705 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
706 pmsg->message_id = mmsg->message_id;
707
708 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
709 msg_to_clients (ch, (const struct GNUNET_MessageHeader *) pmsg);
710 GNUNET_free (pmsg);
711}
712
713
714/**
567 * Insert a multicast message fragment into the queue belonging to the message. 715 * Insert a multicast message fragment into the queue belonging to the message.
568 * 716 *
569 * @param ch Channel. 717 * @param ch Channel.
@@ -752,7 +900,7 @@ fragment_queue_run (struct Channel *ch, uint64_t msg_id,
752 { 900 {
753 if (GNUNET_NO == drop) 901 if (GNUNET_NO == drop)
754 { 902 {
755 message_to_client (ch, cache_entry->mmsg); 903 mmsg_to_clients (ch, cache_entry->mmsg);
756 } 904 }
757 if (cache_entry->ref_count <= 1) 905 if (cache_entry->ref_count <= 1)
758 { 906 {
@@ -997,11 +1145,7 @@ request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
997 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST); 1145 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
998 1146
999 memcpy (&pmsg[1], &req[1], size - sizeof (*req)); 1147 memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1000 1148 msg_to_clients (ch, (const struct GNUNET_MessageHeader *) pmsg);
1001 GNUNET_SERVER_notification_context_add (nc, ch->client);
1002 GNUNET_SERVER_notification_context_unicast (nc, ch->client,
1003 (const struct GNUNET_MessageHeader *) pmsg,
1004 GNUNET_NO);
1005 GNUNET_free (pmsg); 1149 GNUNET_free (pmsg);
1006 break; 1150 break;
1007 } 1151 }
@@ -1025,11 +1169,11 @@ master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
1025 struct Master *mst = cls; 1169 struct Master *mst = cls;
1026 struct Channel *ch = &mst->channel; 1170 struct Channel *ch = &mst->channel;
1027 1171
1028 struct CountersResult *res = GNUNET_malloc (sizeof (*res)); 1172 struct CountersResult res;
1029 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); 1173 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1030 res->header.size = htons (sizeof (*res)); 1174 res.header.size = htons (sizeof (res));
1031 res->result_code = htonl (result); 1175 res.result_code = htonl (result);
1032 res->max_message_id = GNUNET_htonll (max_message_id); 1176 res.max_message_id = GNUNET_htonll (max_message_id);
1033 1177
1034 if (GNUNET_OK == result || GNUNET_NO == result) 1178 if (GNUNET_OK == result || GNUNET_NO == result)
1035 { 1179 {
@@ -1053,10 +1197,7 @@ master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
1053 ch, result, GNUNET_h2s (&ch->pub_key_hash)); 1197 ch, result, GNUNET_h2s (&ch->pub_key_hash));
1054 } 1198 }
1055 1199
1056 GNUNET_SERVER_notification_context_add (nc, ch->client); 1200 msg_to_clients (ch, &res.header);
1057 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
1058 GNUNET_NO);
1059 GNUNET_free (res);
1060} 1201}
1061 1202
1062 1203
@@ -1071,11 +1212,11 @@ slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
1071 struct Slave *slv = cls; 1212 struct Slave *slv = cls;
1072 struct Channel *ch = &slv->channel; 1213 struct Channel *ch = &slv->channel;
1073 1214
1074 struct CountersResult *res = GNUNET_malloc (sizeof (*res)); 1215 struct CountersResult res;
1075 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); 1216 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1076 res->header.size = htons (sizeof (*res)); 1217 res.header.size = htons (sizeof (res));
1077 res->result_code = htonl (result); 1218 res.result_code = htonl (result);
1078 res->max_message_id = GNUNET_htonll (max_message_id); 1219 res.max_message_id = GNUNET_htonll (max_message_id);
1079 1220
1080 if (GNUNET_OK == result || GNUNET_NO == result) 1221 if (GNUNET_OK == result || GNUNET_NO == result)
1081 { 1222 {
@@ -1099,10 +1240,7 @@ slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
1099 ch, result, GNUNET_h2s (&ch->pub_key_hash)); 1240 ch, result, GNUNET_h2s (&ch->pub_key_hash));
1100 } 1241 }
1101 1242
1102 GNUNET_SERVER_notification_context_add (nc, ch->client); 1243 msg_to_clients (ch, &res.header);
1103 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
1104 GNUNET_NO);
1105 GNUNET_free (res);
1106} 1244}
1107 1245
1108 1246
@@ -1125,25 +1263,55 @@ handle_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1125 const struct MasterStartRequest *req 1263 const struct MasterStartRequest *req
1126 = (const struct MasterStartRequest *) msg; 1264 = (const struct MasterStartRequest *) msg;
1127 1265
1128 struct Master *mst = GNUNET_new (struct Master); 1266 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1129 mst->policy = ntohl (req->policy); 1267 struct GNUNET_HashCode pub_key_hash;
1130 mst->priv_key = req->channel_key;
1131 1268
1132 struct Channel *ch = &mst->channel; 1269 GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1133 ch->client = client; 1270 GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1134 ch->is_master = GNUNET_YES; 1271
1135 GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key, &ch->pub_key); 1272 struct Master *
1136 GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key), &ch->pub_key_hash); 1273 mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1137 channel_init (ch); 1274 struct Channel *ch;
1275
1276 if (NULL == mst)
1277 {
1278 mst = GNUNET_new (struct Master);
1279 mst->policy = ntohl (req->policy);
1280 mst->priv_key = req->channel_key;
1281
1282 ch = &mst->channel;
1283 ch->is_master = GNUNET_YES;
1284 ch->pub_key = pub_key;
1285 ch->pub_key_hash = pub_key_hash;
1286 channel_init (ch);
1287
1288 GNUNET_CONTAINER_multihashmap_put (masters, &ch->pub_key_hash, ch,
1289 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1290 GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, master_counters_cb, mst);
1291 }
1292 else
1293 {
1294 ch = &mst->channel;
1295
1296 struct CountersResult res;
1297 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1298 res.header.size = htons (sizeof (res));
1299 res.result_code = htonl (GNUNET_OK);
1300 res.max_message_id = GNUNET_htonll (mst->max_message_id);
1301
1302 GNUNET_SERVER_notification_context_add (nc, client);
1303 GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1304 GNUNET_NO);
1305 }
1138 1306
1139 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1307 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1140 "%p Master connected to channel %s.\n", 1308 "%p Client connected as master to channel %s.\n",
1141 mst, GNUNET_h2s (&ch->pub_key_hash)); 1309 mst, GNUNET_h2s (&ch->pub_key_hash));
1142 1310
1143 GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, master_counters_cb, mst); 1311 struct ClientList *cl = GNUNET_new (struct ClientList);
1312 cl->client = client;
1313 GNUNET_CONTAINER_DLL_insert (ch->clients_head, ch->clients_tail, cl);
1144 1314
1145 GNUNET_CONTAINER_multihashmap_put (masters, &ch->pub_key_hash, ch,
1146 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1147 GNUNET_SERVER_client_set_user_context (client, ch); 1315 GNUNET_SERVER_client_set_user_context (client, ch);
1148 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1316 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1149} 1317}
@@ -1158,37 +1326,82 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1158{ 1326{
1159 const struct SlaveJoinRequest *req 1327 const struct SlaveJoinRequest *req
1160 = (const struct SlaveJoinRequest *) msg; 1328 = (const struct SlaveJoinRequest *) msg;
1161 struct Slave *slv = GNUNET_new (struct Slave); 1329
1162 slv->priv_key = req->slave_key; 1330 struct GNUNET_CRYPTO_EddsaPublicKey slv_pub_key;
1163 slv->origin = req->origin; 1331 struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash;
1164 slv->relay_count = ntohl (req->relay_count); 1332
1165 if (0 < slv->relay_count) 1333 GNUNET_CRYPTO_eddsa_key_get_public (&req->slave_key, &slv_pub_key);
1334 GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_key_hash);
1335 GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash);
1336
1337 struct GNUNET_CONTAINER_MultiHashMap *
1338 ch_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1339 struct Slave *slv = NULL;
1340 struct Channel *ch;
1341
1342 if (NULL == ch_slv)
1166 { 1343 {
1167 const struct GNUNET_PeerIdentity *relays 1344 ch_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1168 = (const struct GNUNET_PeerIdentity *) &req[1]; 1345 GNUNET_CONTAINER_multihashmap_put (channel_slaves, &pub_key_hash, ch_slv,
1169 slv->relays 1346 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1170 = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity)); 1347 }
1171 uint32_t i; 1348 else
1172 for (i = 0; i < slv->relay_count; i++) 1349 {
1173 memcpy (&slv->relays[i], &relays[i], sizeof (*relays)); 1350 slv = GNUNET_CONTAINER_multihashmap_get (ch_slv, &slv_pub_key_hash);
1174 } 1351 }
1175 1352
1176 struct Channel *ch = &slv->channel; 1353 if (NULL == slv)
1177 ch->client = client; 1354 {
1178 ch->is_master = GNUNET_NO; 1355 slv = GNUNET_new (struct Slave);
1179 ch->pub_key = req->channel_key; 1356 slv->priv_key = req->slave_key;
1180 GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key), 1357 slv->origin = req->origin;
1181 &ch->pub_key_hash); 1358 slv->relay_count = ntohl (req->relay_count);
1182 channel_init (ch); 1359 if (0 < slv->relay_count)
1360 {
1361 const struct GNUNET_PeerIdentity *relays
1362 = (const struct GNUNET_PeerIdentity *) &req[1];
1363 slv->relays
1364 = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
1365 uint32_t i;
1366 for (i = 0; i < slv->relay_count; i++)
1367 memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
1368 }
1369
1370 ch = &slv->channel;
1371 ch->is_master = GNUNET_NO;
1372 ch->pub_key = req->channel_key;
1373 ch->pub_key_hash = pub_key_hash;
1374 channel_init (ch);
1375
1376 GNUNET_CONTAINER_multihashmap_put (ch_slv, &slv_pub_key_hash, ch,
1377 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1378 GNUNET_CONTAINER_multihashmap_put (slaves, &ch->pub_key_hash, ch,
1379 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1380 GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, slave_counters_cb, slv);
1381 }
1382 else
1383 {
1384 ch = &slv->channel;
1385
1386 struct CountersResult res;
1387 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1388 res.header.size = htons (sizeof (res));
1389 res.result_code = htonl (GNUNET_OK);
1390 res.max_message_id = GNUNET_htonll (ch->max_message_id);
1391
1392 GNUNET_SERVER_notification_context_add (nc, client);
1393 GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1394 GNUNET_NO);
1395 }
1183 1396
1184 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1397 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1185 "%p Slave connected to channel %s.\n", 1398 "%p Client connected as slave to channel %s.\n",
1186 slv, GNUNET_h2s (&ch->pub_key_hash)); 1399 slv, GNUNET_h2s (&ch->pub_key_hash));
1187 1400
1188 GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, slave_counters_cb, slv); 1401 struct ClientList *cl = GNUNET_new (struct ClientList);
1402 cl->client = client;
1403 GNUNET_CONTAINER_DLL_insert (ch->clients_head, ch->clients_tail, cl);
1189 1404
1190 GNUNET_CONTAINER_multihashmap_put (slaves, &ch->pub_key_hash, ch,
1191 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1192 GNUNET_SERVER_client_set_user_context (client, &slv->channel); 1405 GNUNET_SERVER_client_set_user_context (client, &slv->channel);
1193 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1406 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1194} 1407}
@@ -1202,14 +1415,15 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1202 * @param ch The channel struct for the client. 1415 * @param ch The channel struct for the client.
1203 */ 1416 */
1204static void 1417static void
1205send_message_ack (struct Channel *ch) 1418send_message_ack (struct Channel *ch, struct GNUNET_SERVER_Client *client)
1206{ 1419{
1207 struct GNUNET_MessageHeader res; 1420 struct GNUNET_MessageHeader res;
1208 res.size = htons (sizeof (res)); 1421 res.size = htons (sizeof (res));
1209 res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK); 1422 res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
1210 1423
1211 GNUNET_SERVER_notification_context_add (nc, ch->client); 1424 /* FIXME */
1212 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res, GNUNET_NO); 1425 GNUNET_SERVER_notification_context_add (nc, client);
1426 GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
1213} 1427}
1214 1428
1215 1429
@@ -1236,12 +1450,13 @@ transmit_notify (void *cls, size_t *data_size, void *data)
1236 *data_size = tmit_msg->size; 1450 *data_size = tmit_msg->size;
1237 memcpy (data, &tmit_msg[1], *data_size); 1451 memcpy (data, &tmit_msg[1], *data_size);
1238 1452
1453 int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES;
1454 if (NULL != tmit_msg->client)
1455 send_message_ack (ch, tmit_msg->client);
1456
1239 GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg); 1457 GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg);
1240 GNUNET_free (tmit_msg); 1458 GNUNET_free (tmit_msg);
1241 1459
1242 int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES;
1243 send_message_ack (ch);
1244
1245 if (0 == ch->tmit_task) 1460 if (0 == ch->tmit_task)
1246 { 1461 {
1247 if (NULL != ch->tmit_head) 1462 if (NULL != ch->tmit_head)
@@ -1251,7 +1466,7 @@ transmit_notify (void *cls, size_t *data_size, void *data)
1251 else if (ch->disconnected) 1466 else if (ch->disconnected)
1252 { 1467 {
1253 /* FIXME: handle partial message (when still in_transmit) */ 1468 /* FIXME: handle partial message (when still in_transmit) */
1254 client_cleanup (ch); 1469 cleanup_channel (ch);
1255 } 1470 }
1256 } 1471 }
1257 1472
@@ -1394,12 +1609,15 @@ slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg,
1394 1609
1395 1610
1396static void 1611static void
1397queue_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg, 1612queue_message (struct Channel *ch,
1613 struct GNUNET_SERVER_Client *client,
1614 const struct GNUNET_MessageHeader *msg,
1398 uint16_t first_ptype, uint16_t last_ptype) 1615 uint16_t first_ptype, uint16_t last_ptype)
1399{ 1616{
1400 uint16_t size = ntohs (msg->size) - sizeof (*msg); 1617 uint16_t size = ntohs (msg->size) - sizeof (*msg);
1401 struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size); 1618 struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size);
1402 memcpy (&tmit_msg[1], &msg[1], size); 1619 memcpy (&tmit_msg[1], &msg[1], size);
1620 tmit_msg->client = client;
1403 tmit_msg->size = size; 1621 tmit_msg->size = size;
1404 tmit_msg->state = ch->tmit_state; 1622 tmit_msg->state = ch->tmit_state;
1405 1623
@@ -1414,7 +1632,7 @@ queue_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg,
1414 1632
1415 1633
1416static void 1634static void
1417transmit_error (struct Channel *ch) 1635transmit_error (struct Channel *ch, struct GNUNET_SERVER_Client *client)
1418{ 1636{
1419 uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL; 1637 uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
1420 1638
@@ -1422,7 +1640,7 @@ transmit_error (struct Channel *ch)
1422 msg.size = ntohs (sizeof (msg)); 1640 msg.size = ntohs (sizeof (msg));
1423 msg.type = ntohs (type); 1641 msg.type = ntohs (type);
1424 1642
1425 queue_message (ch, &msg, type, type); 1643 queue_message (ch, client, &msg, type, type);
1426 transmit_message (ch); 1644 transmit_message (ch);
1427 1645
1428 /* FIXME: cleanup */ 1646 /* FIXME: cleanup */
@@ -1458,7 +1676,7 @@ handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
1458 { 1676 {
1459 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large\n", ch); 1677 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large\n", ch);
1460 GNUNET_break (0); 1678 GNUNET_break (0);
1461 transmit_error (ch); 1679 transmit_error (ch, client);
1462 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 1680 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1463 return; 1681 return;
1464 } 1682 }
@@ -1472,12 +1690,12 @@ handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
1472 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1690 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1473 "%p Received invalid message part from client.\n", ch); 1691 "%p Received invalid message part from client.\n", ch);
1474 GNUNET_break (0); 1692 GNUNET_break (0);
1475 transmit_error (ch); 1693 transmit_error (ch, client);
1476 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 1694 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1477 return; 1695 return;
1478 } 1696 }
1479 1697
1480 queue_message (ch, msg, first_ptype, last_ptype); 1698 queue_message (ch, client, msg, first_ptype, last_ptype);
1481 transmit_message (ch); 1699 transmit_message (ch);
1482 1700
1483 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1701 GNUNET_SERVER_receive_done (client, GNUNET_OK);
@@ -1581,6 +1799,7 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
1581 stats = GNUNET_STATISTICS_create ("psyc", cfg); 1799 stats = GNUNET_STATISTICS_create ("psyc", cfg);
1582 masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); 1800 masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1583 slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); 1801 slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1802 channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1584 recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); 1803 recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1585 nc = GNUNET_SERVER_notification_context_create (server, 1); 1804 nc = GNUNET_SERVER_notification_context_create (server, 1);
1586 GNUNET_SERVER_add_handlers (server, handlers); 1805 GNUNET_SERVER_add_handlers (server, handlers);
diff --git a/src/psyc/psyc.h b/src/psyc/psyc.h
index f2d386548..ab7b35d40 100644
--- a/src/psyc/psyc.h
+++ b/src/psyc/psyc.h
@@ -227,6 +227,21 @@ struct OperationResult
227}; 227};
228 228
229 229
230struct MasterJoinRequest
231{
232 /**
233 * Types:
234 * - GNUNET_MESSAGE_TYPE_PSYC_MASTER_JOIN_REQUEST
235 */
236 struct GNUNET_MessageHeader header;
237 /**
238 * Public key of the joining slave.
239 */
240 struct GNUNET_CRYPTO_EddsaPublicKey slave_key;
241
242 /* Followed by struct GNUNET_MessageHeader join_request */
243};
244
230GNUNET_NETWORK_STRUCT_END 245GNUNET_NETWORK_STRUCT_END
231 246
232#endif 247#endif
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index 85f86ceaa..62f099166 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -126,12 +126,7 @@ struct GNUNET_PSYC_Channel
126 GNUNET_PSYC_MessageCallback hist_message_cb; 126 GNUNET_PSYC_MessageCallback hist_message_cb;
127 127
128 /** 128 /**
129 * Join handler callback. 129 * Closure for @a message_cb.
130 */
131 GNUNET_PSYC_JoinCallback join_cb;
132
133 /**
134 * Closure for @a message_cb and @a join_cb.
135 */ 130 */
136 void *cb_cls; 131 void *cb_cls;
137 132
@@ -200,6 +195,11 @@ struct GNUNET_PSYC_Master
200 struct GNUNET_PSYC_Channel ch; 195 struct GNUNET_PSYC_Channel ch;
201 196
202 GNUNET_PSYC_MasterStartCallback start_cb; 197 GNUNET_PSYC_MasterStartCallback start_cb;
198
199 /**
200 * Join handler callback.
201 */
202 GNUNET_PSYC_JoinCallback join_cb;
203}; 203};
204 204
205 205
@@ -908,6 +908,18 @@ handle_psyc_message_ack (struct GNUNET_PSYC_Channel *ch)
908} 908}
909 909
910 910
911static void
912handle_psyc_join_request (struct GNUNET_PSYC_Master *mst,
913 const struct MasterJoinRequest *req)
914{
915 // FIXME: extract join message from req[1]
916 const char *method_name = "_fixme";
917 struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh));
918 mst->join_cb (mst->ch.cb_cls, &req->slave_key, method_name,
919 0, NULL, NULL, 0, jh);
920}
921
922
911/** 923/**
912 * Type of a function to call when we receive a message 924 * Type of a function to call when we receive a message
913 * from the service. 925 * from the service.
@@ -951,6 +963,9 @@ message_handler (void *cls,
951 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK: 963 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
952 size_eq = sizeof (struct GNUNET_MessageHeader); 964 size_eq = sizeof (struct GNUNET_MessageHeader);
953 break; 965 break;
966 case GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST:
967 size_min = sizeof (struct MasterJoinRequest);
968 break;
954 default: 969 default:
955 GNUNET_break_op (0); 970 GNUNET_break_op (0);
956 return; 971 return;
@@ -988,6 +1003,11 @@ message_handler (void *cls,
988 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE: 1003 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
989 handle_psyc_message (ch, (const struct GNUNET_PSYC_MessageHeader *) msg); 1004 handle_psyc_message (ch, (const struct GNUNET_PSYC_MessageHeader *) msg);
990 break; 1005 break;
1006
1007 case GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST:
1008 handle_psyc_join_request ((struct GNUNET_PSYC_Master *) ch,
1009 (const struct MasterJoinRequest *) msg);
1010 break;
991 } 1011 }
992 1012
993 if (NULL != ch->client) 1013 if (NULL != ch->client)
@@ -1186,8 +1206,8 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
1186 req->policy = policy; 1206 req->policy = policy;
1187 1207
1188 mst->start_cb = master_started_cb; 1208 mst->start_cb = master_started_cb;
1209 mst->join_cb = join_cb;
1189 ch->message_cb = message_cb; 1210 ch->message_cb = message_cb;
1190 ch->join_cb = join_cb;
1191 ch->cb_cls = cls; 1211 ch->cb_cls = cls;
1192 ch->cfg = cfg; 1212 ch->cfg = cfg;
1193 ch->is_master = GNUNET_YES; 1213 ch->is_master = GNUNET_YES;
@@ -1320,9 +1340,7 @@ GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th)
1320 * @param message_cb Function to invoke on message parts received from the 1340 * @param message_cb Function to invoke on message parts received from the
1321 * channel, typically at least contains method handlers for @e join and 1341 * channel, typically at least contains method handlers for @e join and
1322 * @e part. 1342 * @e part.
1323 * @param join_cb function invoked once we have joined with the current 1343 * @param slave_joined_cb Function invoked once we have joined the channel.
1324 * message ID of the channel
1325 * @param slave_joined_cb Function to invoke when a peer wants to join.
1326 * @param cls Closure for @a message_cb and @a slave_joined_cb. 1344 * @param cls Closure for @a message_cb and @a slave_joined_cb.
1327 * @param method_name Method name for the join request. 1345 * @param method_name Method name for the join request.
1328 * @param env Environment containing transient variables for the request, or NULL. 1346 * @param env Environment containing transient variables for the request, or NULL.
@@ -1339,7 +1357,6 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1339 uint32_t relay_count, 1357 uint32_t relay_count,
1340 const struct GNUNET_PeerIdentity *relays, 1358 const struct GNUNET_PeerIdentity *relays,
1341 GNUNET_PSYC_MessageCallback message_cb, 1359 GNUNET_PSYC_MessageCallback message_cb,
1342 GNUNET_PSYC_JoinCallback join_cb,
1343 GNUNET_PSYC_SlaveJoinCallback slave_joined_cb, 1360 GNUNET_PSYC_SlaveJoinCallback slave_joined_cb,
1344 void *cls, 1361 void *cls,
1345 const char *method_name, 1362 const char *method_name,
@@ -1362,7 +1379,6 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1362 1379
1363 slv->join_cb = slave_joined_cb; 1380 slv->join_cb = slave_joined_cb;
1364 ch->message_cb = message_cb; 1381 ch->message_cb = message_cb;
1365 ch->join_cb = join_cb;
1366 ch->cb_cls = cls; 1382 ch->cb_cls = cls;
1367 1383
1368 ch->cfg = cfg; 1384 ch->cfg = cfg;
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c
index f58ecb7f6..cef8a5dcf 100644
--- a/src/psyc/test_psyc.c
+++ b/src/psyc/test_psyc.c
@@ -130,6 +130,7 @@ end_badly (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
130{ 130{
131 res = 1; 131 res = 1;
132 cleanup (); 132 cleanup ();
133 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Test FAILED.\n");
133} 134}
134 135
135 136
@@ -144,6 +145,7 @@ end_normally (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
144{ 145{
145 res = 0; 146 res = 0;
146 cleanup (); 147 cleanup ();
148 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Test PASSED.\n");
147} 149}
148 150
149 151
@@ -181,7 +183,7 @@ master_message (void *cls, uint64_t message_id, uint32_t flags,
181 183
182 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 184 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
183 "Master got message part of type %u and size %u " 185 "Master got message part of type %u and size %u "
184 "belonging to message ID %llu with flags %u\n", 186 "belonging to message ID %llu with flags %bu\n",
185 type, size, message_id, flags); 187 type, size, message_id, flags);
186 188
187 switch (test) 189 switch (test)
@@ -225,7 +227,7 @@ slave_message (void *cls, uint64_t message_id, uint32_t flags,
225 227
226 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 228 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
227 "Slave got message part of type %u and size %u " 229 "Slave got message part of type %u and size %u "
228 "belonging to message ID %llu with flags %u\n", 230 "belonging to message ID %llu with flags %bu\n",
229 type, size, message_id, flags); 231 type, size, message_id, flags);
230 232
231 switch (test) 233 switch (test)