aboutsummaryrefslogtreecommitdiff
path: root/src/psyc
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2014-05-17 10:16:15 +0000
committerGabor X Toth <*@tg-x.net>2014-05-17 10:16:15 +0000
commit83c058a5ea11b6d7aa05cb71963c6063cb373603 (patch)
tree30921098e391d492cf52311983348232d41d9634 /src/psyc
parentd78b7d8a3f6cdaee244557667350da443ca76db5 (diff)
downloadgnunet-83c058a5ea11b6d7aa05cb71963c6063cb373603.tar.gz
gnunet-83c058a5ea11b6d7aa05cb71963c6063cb373603.zip
multicast, psyc: client connections, join requests
Diffstat (limited to 'src/psyc')
-rw-r--r--src/psyc/gnunet-service-psyc.c481
-rw-r--r--src/psyc/psyc.h15
-rw-r--r--src/psyc/psyc_api.c40
-rw-r--r--src/psyc/test_psyc.c6
4 files changed, 397 insertions, 145 deletions
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
index 70322adaa..765371d77 100644
--- a/src/psyc/gnunet-service-psyc.c
+++ b/src/psyc/gnunet-service-psyc.c
@@ -59,16 +59,22 @@ static struct GNUNET_PSYCSTORE_Handle *store;
59 59
60/** 60/**
61 * All connected masters. 61 * All connected masters.
62 * Channel's pub_key_hash -> struct Channel 62 * Channel's pub_key_hash -> struct Master
63 */ 63 */
64static struct GNUNET_CONTAINER_MultiHashMap *masters; 64static struct GNUNET_CONTAINER_MultiHashMap *masters;
65 65
66/** 66/**
67 * All connected slaves. 67 * All connected slaves.
68 * Channel's pub_key_hash -> struct Channel 68 * Channel's pub_key_hash -> struct Slave
69 */ 69 */
70static struct GNUNET_CONTAINER_MultiHashMap *slaves; 70static struct GNUNET_CONTAINER_MultiHashMap *slaves;
71 71
72/**
73 * Connected slaves per channel.
74 * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
75 */
76static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
77
72 78
73/** 79/**
74 * Message in the transmission queue. 80 * Message in the transmission queue.
@@ -78,6 +84,8 @@ struct TransmitMessage
78 struct TransmitMessage *prev; 84 struct TransmitMessage *prev;
79 struct TransmitMessage *next; 85 struct TransmitMessage *next;
80 86
87 struct GNUNET_SERVER_Client *client;
88
81 /** 89 /**
82 * ID assigned to the message. 90 * ID assigned to the message.
83 */ 91 */
@@ -164,11 +172,23 @@ struct FragmentQueue
164 172
165 173
166/** 174/**
175 * List of connected clients.
176 */
177struct ClientList
178{
179 struct ClientList *prev;
180 struct ClientList *next;
181 struct GNUNET_SERVER_Client *client;
182};
183
184
185/**
167 * Common part of the client context for both a channel master and slave. 186 * Common part of the client context for both a channel master and slave.
168 */ 187 */
169struct Channel 188struct Channel
170{ 189{
171 struct GNUNET_SERVER_Client *client; 190 struct ClientList *clients_head;
191 struct ClientList *clients_tail;
172 192
173 struct TransmitMessage *tmit_head; 193 struct TransmitMessage *tmit_head;
174 struct TransmitMessage *tmit_tail; 194 struct TransmitMessage *tmit_tail;
@@ -316,6 +336,16 @@ struct Slave
316 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key; 336 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
317 337
318 /** 338 /**
339 * Public key of the slave.
340 */
341 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
342
343 /**
344 * Hash of @a pub_key.
345 */
346 struct GNUNET_HashCode pub_key_hash;
347
348 /**
319 * Handle for the multicast member. 349 * Handle for the multicast member.
320 */ 350 */
321 struct GNUNET_MULTICAST_Member *member; 351 struct GNUNET_MULTICAST_Member *member;
@@ -378,30 +408,62 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
378} 408}
379 409
380 410
411/**
412 * Clean up master data structures after a client disconnected.
413 */
381static void 414static void
382client_cleanup (struct Channel *ch) 415cleanup_master (struct Master *mst)
383{ 416{
384 /* FIXME: fragment_cache_clear */ 417 struct Channel *ch = &mst->channel;
385 418
386 if (ch->is_master) 419 if (NULL != mst->origin)
387 { 420 GNUNET_MULTICAST_origin_stop (mst->origin);
388 struct Master *mst = (struct Master *) ch; 421 GNUNET_CONTAINER_multihashmap_remove (masters, &ch->pub_key_hash, ch);
389 if (NULL != mst->origin) 422}
390 GNUNET_MULTICAST_origin_stop (mst->origin); 423
391 GNUNET_CONTAINER_multihashmap_remove (masters, &ch->pub_key_hash, ch); 424
392 } 425/**
393 else 426 * Clean up slave data structures after a client disconnected.
427 */
428static void
429cleanup_slave (struct Slave *slv)
430{
431 struct Channel *ch = &slv->channel;
432 struct GNUNET_CONTAINER_MultiHashMap *
433 ch_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
434 &ch->pub_key_hash);
435 GNUNET_assert (NULL != ch_slv);
436 GNUNET_CONTAINER_multihashmap_remove (ch_slv, &slv->pub_key_hash, slv);
437
438 if (0 == GNUNET_CONTAINER_multihashmap_size (ch_slv))
394 { 439 {
395 struct Slave *slv = (struct Slave *) ch; 440 GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &ch->pub_key_hash,
396 if (NULL != slv->join_req) 441 ch_slv);
397 GNUNET_free (slv->join_req); 442 GNUNET_CONTAINER_multihashmap_destroy (ch_slv);
398 if (NULL != slv->relays)
399 GNUNET_free (slv->relays);
400 if (NULL != slv->member)
401 GNUNET_MULTICAST_member_part (slv->member);
402 GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, ch);
403 } 443 }
444 GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, slv);
445
446 if (NULL != slv->join_req)
447 GNUNET_free (slv->join_req);
448 if (NULL != slv->relays)
449 GNUNET_free (slv->relays);
450 if (NULL != slv->member)
451 GNUNET_MULTICAST_member_part (slv->member);
452 GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, ch);
453}
404 454
455
456/**
457 * Clean up channel data structures after a client disconnected.
458 */
459static void
460cleanup_channel (struct Channel *ch)
461{
462 /* FIXME: fragment_cache_clear */
463
464 (GNUNET_YES == ch->is_master)
465 ? cleanup_master ((struct Master *) ch)
466 : cleanup_slave ((struct Slave *) ch);
405 GNUNET_free (ch); 467 GNUNET_free (ch);
406} 468}
407 469
@@ -421,7 +483,10 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
421 483
422 struct Channel *ch 484 struct Channel *ch
423 = GNUNET_SERVER_client_get_user_context (client, struct Channel); 485 = GNUNET_SERVER_client_get_user_context (client, struct Channel);
424 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Client disconnected\n", ch); 486 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
487 "%p Client (%s) disconnected from channel %s\n",
488 ch, (GNUNET_YES == ch->is_master) ? "master" : "slave",
489 GNUNET_h2s (&ch->pub_key_hash));
425 490
426 if (NULL == ch) 491 if (NULL == ch)
427 { 492 {
@@ -431,29 +496,112 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
431 return; 496 return;
432 } 497 }
433 498
434 ch->disconnected = GNUNET_YES; 499 struct ClientList *cl = ch->clients_head;
500 while (NULL != cl)
501 {
502 if (cl->client == client)
503 {
504 GNUNET_CONTAINER_DLL_remove (ch->clients_head, ch->clients_tail, cl);
505 GNUNET_free (cl);
506 break;
507 }
508 cl = cl->next;
509 }
510
511 if (NULL == ch->clients_head)
512 { /* Last client disconnected. */
513 if (NULL != ch->tmit_head)
514 { /* Send pending messages to multicast before cleanup. */
515 transmit_message (ch);
516 }
517 else
518 {
519 cleanup_channel (ch);
520 }
521 }
522}
523
435 524
436 /* Send pending messages to multicast before cleanup. */ 525/**
437 if (NULL != ch->tmit_head) 526 * Send message to all clients connected to the channel.
527 */
528static void
529msg_to_clients (const struct Channel *ch,
530 const struct GNUNET_MessageHeader *msg)
531{
532 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
533 "%p Sending message to clients.\n", ch);
534
535 struct ClientList *cl = ch->clients_head;
536 while (NULL != cl)
438 { 537 {
439 transmit_message (ch); 538 GNUNET_SERVER_notification_context_add (nc, cl->client);
539 GNUNET_SERVER_notification_context_unicast (nc, cl->client, msg, GNUNET_NO);
540 cl = cl->next;
541 }
542}
543
544
545/**
546 * Closure for join_mem_test_cb()
547 */
548struct JoinMemTestCls
549{
550 struct Channel *ch;
551 struct GNUNET_MULTICAST_JoinHandle *jh;
552 struct MasterJoinRequest *master_join_req;
553};
554
555
556/**
557 * Membership test result callback used for join requests.m
558 */
559static void
560join_mem_test_cb (void *cls, int64_t result, const char *err_msg)
561{
562 struct JoinMemTestCls *jcls = cls;
563
564 if (GNUNET_NO == result && GNUNET_YES == jcls->ch->is_master)
565 { /* Pass on join request to client if this is a master channel */
566 msg_to_clients (jcls->ch,
567 (struct GNUNET_MessageHeader *) jcls->master_join_req);
440 } 568 }
441 else 569 else
442 { 570 {
443 client_cleanup (ch); 571 // FIXME: relays
572 GNUNET_MULTICAST_join_decision(jcls->jh, result, 0, NULL, NULL);
444 } 573 }
574 GNUNET_free (jcls->master_join_req);
575 GNUNET_free (jcls);
445} 576}
446 577
447 578
448/** 579/**
449 * Master receives a join request from a slave. 580 * Incoming join request from multicast.
450 */ 581 */
451static void 582static void
452join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, 583join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
453 const struct GNUNET_MessageHeader *join_req, 584 const struct GNUNET_MessageHeader *join_req,
454 struct GNUNET_MULTICAST_JoinHandle *jh) 585 struct GNUNET_MULTICAST_JoinHandle *jh)
455{ 586{
456 587 struct Channel *ch = cls;
588 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", ch);
589
590 uint16_t join_req_size = (NULL != join_req) ? ntohs (join_req->size) : 0;
591 struct MasterJoinRequest *req = GNUNET_malloc (sizeof (*req) + join_req_size);
592 req->header.size = htons (sizeof (*req) + join_req_size);
593 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
594 req->slave_key = *slave_key;
595 memcpy (&req[1], join_req, join_req_size);
596
597 struct JoinMemTestCls *jcls = GNUNET_malloc (sizeof (*jcls));
598 jcls->ch = ch;
599 jcls->jh = jh;
600 jcls->master_join_req = req;
601
602 GNUNET_PSYCSTORE_membership_test (store, &ch->pub_key, slave_key,
603 ch->max_message_id, 0,
604 &join_mem_test_cb, jcls);
457} 605}
458 606
459 607
@@ -474,6 +622,7 @@ replay_fragment_cb (void *cls,
474 struct GNUNET_MULTICAST_ReplayHandle *rh) 622 struct GNUNET_MULTICAST_ReplayHandle *rh)
475 623
476{ 624{
625
477} 626}
478 627
479 628
@@ -497,35 +646,6 @@ fragment_store_result (void *cls, int64_t result, const char *err_msg)
497} 646}
498 647
499 648
500static void
501message_to_client (struct Channel *ch,
502 const struct GNUNET_MULTICAST_MessageHeader *mmsg)
503{
504 uint16_t size = ntohs (mmsg->header.size);
505 struct GNUNET_PSYC_MessageHeader *pmsg;
506 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
507
508 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
509 "%p Sending message to client. "
510 "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
511 ch, GNUNET_ntohll (mmsg->fragment_id),
512 GNUNET_ntohll (mmsg->message_id));
513
514 pmsg = GNUNET_malloc (psize);
515 pmsg->header.size = htons (psize);
516 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
517 pmsg->message_id = mmsg->message_id;
518
519 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
520
521 GNUNET_SERVER_notification_context_add (nc, ch->client);
522 GNUNET_SERVER_notification_context_unicast (nc, ch->client,
523 (const struct GNUNET_MessageHeader *) pmsg,
524 GNUNET_NO);
525 GNUNET_free (pmsg);
526}
527
528
529/** 649/**
530 * Convert an uint64_t in network byte order to a HashCode 650 * Convert an uint64_t in network byte order to a HashCode
531 * that can be used as key in a MultiHashMap 651 * that can be used as key in a MultiHashMap
@@ -564,6 +684,34 @@ hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
564 684
565 685
566/** 686/**
687 * Send multicast message to all clients connected to the channel.
688 */
689static void
690mmsg_to_clients (struct Channel *ch,
691 const struct GNUNET_MULTICAST_MessageHeader *mmsg)
692{
693 uint16_t size = ntohs (mmsg->header.size);
694 struct GNUNET_PSYC_MessageHeader *pmsg;
695 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
696
697 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
698 "%p Sending message to client. "
699 "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
700 ch, GNUNET_ntohll (mmsg->fragment_id),
701 GNUNET_ntohll (mmsg->message_id));
702
703 pmsg = GNUNET_malloc (psize);
704 pmsg->header.size = htons (psize);
705 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
706 pmsg->message_id = mmsg->message_id;
707
708 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
709 msg_to_clients (ch, (const struct GNUNET_MessageHeader *) pmsg);
710 GNUNET_free (pmsg);
711}
712
713
714/**
567 * Insert a multicast message fragment into the queue belonging to the message. 715 * Insert a multicast message fragment into the queue belonging to the message.
568 * 716 *
569 * @param ch Channel. 717 * @param ch Channel.
@@ -752,7 +900,7 @@ fragment_queue_run (struct Channel *ch, uint64_t msg_id,
752 { 900 {
753 if (GNUNET_NO == drop) 901 if (GNUNET_NO == drop)
754 { 902 {
755 message_to_client (ch, cache_entry->mmsg); 903 mmsg_to_clients (ch, cache_entry->mmsg);
756 } 904 }
757 if (cache_entry->ref_count <= 1) 905 if (cache_entry->ref_count <= 1)
758 { 906 {
@@ -997,11 +1145,7 @@ request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
997 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST); 1145 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
998 1146
999 memcpy (&pmsg[1], &req[1], size - sizeof (*req)); 1147 memcpy (&pmsg[1], &req[1], size - sizeof (*req));
1000 1148 msg_to_clients (ch, (const struct GNUNET_MessageHeader *) pmsg);
1001 GNUNET_SERVER_notification_context_add (nc, ch->client);
1002 GNUNET_SERVER_notification_context_unicast (nc, ch->client,
1003 (const struct GNUNET_MessageHeader *) pmsg,
1004 GNUNET_NO);
1005 GNUNET_free (pmsg); 1149 GNUNET_free (pmsg);
1006 break; 1150 break;
1007 } 1151 }
@@ -1025,11 +1169,11 @@ master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
1025 struct Master *mst = cls; 1169 struct Master *mst = cls;
1026 struct Channel *ch = &mst->channel; 1170 struct Channel *ch = &mst->channel;
1027 1171
1028 struct CountersResult *res = GNUNET_malloc (sizeof (*res)); 1172 struct CountersResult res;
1029 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); 1173 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1030 res->header.size = htons (sizeof (*res)); 1174 res.header.size = htons (sizeof (res));
1031 res->result_code = htonl (result); 1175 res.result_code = htonl (result);
1032 res->max_message_id = GNUNET_htonll (max_message_id); 1176 res.max_message_id = GNUNET_htonll (max_message_id);
1033 1177
1034 if (GNUNET_OK == result || GNUNET_NO == result) 1178 if (GNUNET_OK == result || GNUNET_NO == result)
1035 { 1179 {
@@ -1053,10 +1197,7 @@ master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
1053 ch, result, GNUNET_h2s (&ch->pub_key_hash)); 1197 ch, result, GNUNET_h2s (&ch->pub_key_hash));
1054 } 1198 }
1055 1199
1056 GNUNET_SERVER_notification_context_add (nc, ch->client); 1200 msg_to_clients (ch, &res.header);
1057 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
1058 GNUNET_NO);
1059 GNUNET_free (res);
1060} 1201}
1061 1202
1062 1203
@@ -1071,11 +1212,11 @@ slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
1071 struct Slave *slv = cls; 1212 struct Slave *slv = cls;
1072 struct Channel *ch = &slv->channel; 1213 struct Channel *ch = &slv->channel;
1073 1214
1074 struct CountersResult *res = GNUNET_malloc (sizeof (*res)); 1215 struct CountersResult res;
1075 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); 1216 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1076 res->header.size = htons (sizeof (*res)); 1217 res.header.size = htons (sizeof (res));
1077 res->result_code = htonl (result); 1218 res.result_code = htonl (result);
1078 res->max_message_id = GNUNET_htonll (max_message_id); 1219 res.max_message_id = GNUNET_htonll (max_message_id);
1079 1220
1080 if (GNUNET_OK == result || GNUNET_NO == result) 1221 if (GNUNET_OK == result || GNUNET_NO == result)
1081 { 1222 {
@@ -1099,10 +1240,7 @@ slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
1099 ch, result, GNUNET_h2s (&ch->pub_key_hash)); 1240 ch, result, GNUNET_h2s (&ch->pub_key_hash));
1100 } 1241 }
1101 1242
1102 GNUNET_SERVER_notification_context_add (nc, ch->client); 1243 msg_to_clients (ch, &res.header);
1103 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
1104 GNUNET_NO);
1105 GNUNET_free (res);
1106} 1244}
1107 1245
1108 1246
@@ -1125,25 +1263,55 @@ handle_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1125 const struct MasterStartRequest *req 1263 const struct MasterStartRequest *req
1126 = (const struct MasterStartRequest *) msg; 1264 = (const struct MasterStartRequest *) msg;
1127 1265
1128 struct Master *mst = GNUNET_new (struct Master); 1266 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
1129 mst->policy = ntohl (req->policy); 1267 struct GNUNET_HashCode pub_key_hash;
1130 mst->priv_key = req->channel_key;
1131 1268
1132 struct Channel *ch = &mst->channel; 1269 GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
1133 ch->client = client; 1270 GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
1134 ch->is_master = GNUNET_YES; 1271
1135 GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key, &ch->pub_key); 1272 struct Master *
1136 GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key), &ch->pub_key_hash); 1273 mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
1137 channel_init (ch); 1274 struct Channel *ch;
1275
1276 if (NULL == mst)
1277 {
1278 mst = GNUNET_new (struct Master);
1279 mst->policy = ntohl (req->policy);
1280 mst->priv_key = req->channel_key;
1281
1282 ch = &mst->channel;
1283 ch->is_master = GNUNET_YES;
1284 ch->pub_key = pub_key;
1285 ch->pub_key_hash = pub_key_hash;
1286 channel_init (ch);
1287
1288 GNUNET_CONTAINER_multihashmap_put (masters, &ch->pub_key_hash, ch,
1289 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1290 GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, master_counters_cb, mst);
1291 }
1292 else
1293 {
1294 ch = &mst->channel;
1295
1296 struct CountersResult res;
1297 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1298 res.header.size = htons (sizeof (res));
1299 res.result_code = htonl (GNUNET_OK);
1300 res.max_message_id = GNUNET_htonll (mst->max_message_id);
1301
1302 GNUNET_SERVER_notification_context_add (nc, client);
1303 GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1304 GNUNET_NO);
1305 }
1138 1306
1139 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1307 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1140 "%p Master connected to channel %s.\n", 1308 "%p Client connected as master to channel %s.\n",
1141 mst, GNUNET_h2s (&ch->pub_key_hash)); 1309 mst, GNUNET_h2s (&ch->pub_key_hash));
1142 1310
1143 GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, master_counters_cb, mst); 1311 struct ClientList *cl = GNUNET_new (struct ClientList);
1312 cl->client = client;
1313 GNUNET_CONTAINER_DLL_insert (ch->clients_head, ch->clients_tail, cl);
1144 1314
1145 GNUNET_CONTAINER_multihashmap_put (masters, &ch->pub_key_hash, ch,
1146 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1147 GNUNET_SERVER_client_set_user_context (client, ch); 1315 GNUNET_SERVER_client_set_user_context (client, ch);
1148 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1316 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1149} 1317}
@@ -1158,37 +1326,82 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1158{ 1326{
1159 const struct SlaveJoinRequest *req 1327 const struct SlaveJoinRequest *req
1160 = (const struct SlaveJoinRequest *) msg; 1328 = (const struct SlaveJoinRequest *) msg;
1161 struct Slave *slv = GNUNET_new (struct Slave); 1329
1162 slv->priv_key = req->slave_key; 1330 struct GNUNET_CRYPTO_EddsaPublicKey slv_pub_key;
1163 slv->origin = req->origin; 1331 struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash;
1164 slv->relay_count = ntohl (req->relay_count); 1332
1165 if (0 < slv->relay_count) 1333 GNUNET_CRYPTO_eddsa_key_get_public (&req->slave_key, &slv_pub_key);
1334 GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_key_hash);
1335 GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash);
1336
1337 struct GNUNET_CONTAINER_MultiHashMap *
1338 ch_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
1339 struct Slave *slv = NULL;
1340 struct Channel *ch;
1341
1342 if (NULL == ch_slv)
1166 { 1343 {
1167 const struct GNUNET_PeerIdentity *relays 1344 ch_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1168 = (const struct GNUNET_PeerIdentity *) &req[1]; 1345 GNUNET_CONTAINER_multihashmap_put (channel_slaves, &pub_key_hash, ch_slv,
1169 slv->relays 1346 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1170 = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity)); 1347 }
1171 uint32_t i; 1348 else
1172 for (i = 0; i < slv->relay_count; i++) 1349 {
1173 memcpy (&slv->relays[i], &relays[i], sizeof (*relays)); 1350 slv = GNUNET_CONTAINER_multihashmap_get (ch_slv, &slv_pub_key_hash);
1174 } 1351 }
1175 1352
1176 struct Channel *ch = &slv->channel; 1353 if (NULL == slv)
1177 ch->client = client; 1354 {
1178 ch->is_master = GNUNET_NO; 1355 slv = GNUNET_new (struct Slave);
1179 ch->pub_key = req->channel_key; 1356 slv->priv_key = req->slave_key;
1180 GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key), 1357 slv->origin = req->origin;
1181 &ch->pub_key_hash); 1358 slv->relay_count = ntohl (req->relay_count);
1182 channel_init (ch); 1359 if (0 < slv->relay_count)
1360 {
1361 const struct GNUNET_PeerIdentity *relays
1362 = (const struct GNUNET_PeerIdentity *) &req[1];
1363 slv->relays
1364 = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
1365 uint32_t i;
1366 for (i = 0; i < slv->relay_count; i++)
1367 memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
1368 }
1369
1370 ch = &slv->channel;
1371 ch->is_master = GNUNET_NO;
1372 ch->pub_key = req->channel_key;
1373 ch->pub_key_hash = pub_key_hash;
1374 channel_init (ch);
1375
1376 GNUNET_CONTAINER_multihashmap_put (ch_slv, &slv_pub_key_hash, ch,
1377 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1378 GNUNET_CONTAINER_multihashmap_put (slaves, &ch->pub_key_hash, ch,
1379 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1380 GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, slave_counters_cb, slv);
1381 }
1382 else
1383 {
1384 ch = &slv->channel;
1385
1386 struct CountersResult res;
1387 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1388 res.header.size = htons (sizeof (res));
1389 res.result_code = htonl (GNUNET_OK);
1390 res.max_message_id = GNUNET_htonll (ch->max_message_id);
1391
1392 GNUNET_SERVER_notification_context_add (nc, client);
1393 GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
1394 GNUNET_NO);
1395 }
1183 1396
1184 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1397 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1185 "%p Slave connected to channel %s.\n", 1398 "%p Client connected as slave to channel %s.\n",
1186 slv, GNUNET_h2s (&ch->pub_key_hash)); 1399 slv, GNUNET_h2s (&ch->pub_key_hash));
1187 1400
1188 GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, slave_counters_cb, slv); 1401 struct ClientList *cl = GNUNET_new (struct ClientList);
1402 cl->client = client;
1403 GNUNET_CONTAINER_DLL_insert (ch->clients_head, ch->clients_tail, cl);
1189 1404
1190 GNUNET_CONTAINER_multihashmap_put (slaves, &ch->pub_key_hash, ch,
1191 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1192 GNUNET_SERVER_client_set_user_context (client, &slv->channel); 1405 GNUNET_SERVER_client_set_user_context (client, &slv->channel);
1193 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1406 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1194} 1407}
@@ -1202,14 +1415,15 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1202 * @param ch The channel struct for the client. 1415 * @param ch The channel struct for the client.
1203 */ 1416 */
1204static void 1417static void
1205send_message_ack (struct Channel *ch) 1418send_message_ack (struct Channel *ch, struct GNUNET_SERVER_Client *client)
1206{ 1419{
1207 struct GNUNET_MessageHeader res; 1420 struct GNUNET_MessageHeader res;
1208 res.size = htons (sizeof (res)); 1421 res.size = htons (sizeof (res));
1209 res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK); 1422 res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
1210 1423
1211 GNUNET_SERVER_notification_context_add (nc, ch->client); 1424 /* FIXME */
1212 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res, GNUNET_NO); 1425 GNUNET_SERVER_notification_context_add (nc, client);
1426 GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
1213} 1427}
1214 1428
1215 1429
@@ -1236,12 +1450,13 @@ transmit_notify (void *cls, size_t *data_size, void *data)
1236 *data_size = tmit_msg->size; 1450 *data_size = tmit_msg->size;
1237 memcpy (data, &tmit_msg[1], *data_size); 1451 memcpy (data, &tmit_msg[1], *data_size);
1238 1452
1453 int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES;
1454 if (NULL != tmit_msg->client)
1455 send_message_ack (ch, tmit_msg->client);
1456
1239 GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg); 1457 GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg);
1240 GNUNET_free (tmit_msg); 1458 GNUNET_free (tmit_msg);
1241 1459
1242 int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES;
1243 send_message_ack (ch);
1244
1245 if (0 == ch->tmit_task) 1460 if (0 == ch->tmit_task)
1246 { 1461 {
1247 if (NULL != ch->tmit_head) 1462 if (NULL != ch->tmit_head)
@@ -1251,7 +1466,7 @@ transmit_notify (void *cls, size_t *data_size, void *data)
1251 else if (ch->disconnected) 1466 else if (ch->disconnected)
1252 { 1467 {
1253 /* FIXME: handle partial message (when still in_transmit) */ 1468 /* FIXME: handle partial message (when still in_transmit) */
1254 client_cleanup (ch); 1469 cleanup_channel (ch);
1255 } 1470 }
1256 } 1471 }
1257 1472
@@ -1394,12 +1609,15 @@ slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg,
1394 1609
1395 1610
1396static void 1611static void
1397queue_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg, 1612queue_message (struct Channel *ch,
1613 struct GNUNET_SERVER_Client *client,
1614 const struct GNUNET_MessageHeader *msg,
1398 uint16_t first_ptype, uint16_t last_ptype) 1615 uint16_t first_ptype, uint16_t last_ptype)
1399{ 1616{
1400 uint16_t size = ntohs (msg->size) - sizeof (*msg); 1617 uint16_t size = ntohs (msg->size) - sizeof (*msg);
1401 struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size); 1618 struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size);
1402 memcpy (&tmit_msg[1], &msg[1], size); 1619 memcpy (&tmit_msg[1], &msg[1], size);
1620 tmit_msg->client = client;
1403 tmit_msg->size = size; 1621 tmit_msg->size = size;
1404 tmit_msg->state = ch->tmit_state; 1622 tmit_msg->state = ch->tmit_state;
1405 1623
@@ -1414,7 +1632,7 @@ queue_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg,
1414 1632
1415 1633
1416static void 1634static void
1417transmit_error (struct Channel *ch) 1635transmit_error (struct Channel *ch, struct GNUNET_SERVER_Client *client)
1418{ 1636{
1419 uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL; 1637 uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
1420 1638
@@ -1422,7 +1640,7 @@ transmit_error (struct Channel *ch)
1422 msg.size = ntohs (sizeof (msg)); 1640 msg.size = ntohs (sizeof (msg));
1423 msg.type = ntohs (type); 1641 msg.type = ntohs (type);
1424 1642
1425 queue_message (ch, &msg, type, type); 1643 queue_message (ch, client, &msg, type, type);
1426 transmit_message (ch); 1644 transmit_message (ch);
1427 1645
1428 /* FIXME: cleanup */ 1646 /* FIXME: cleanup */
@@ -1458,7 +1676,7 @@ handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
1458 { 1676 {
1459 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large\n", ch); 1677 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large\n", ch);
1460 GNUNET_break (0); 1678 GNUNET_break (0);
1461 transmit_error (ch); 1679 transmit_error (ch, client);
1462 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 1680 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1463 return; 1681 return;
1464 } 1682 }
@@ -1472,12 +1690,12 @@ handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
1472 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1690 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1473 "%p Received invalid message part from client.\n", ch); 1691 "%p Received invalid message part from client.\n", ch);
1474 GNUNET_break (0); 1692 GNUNET_break (0);
1475 transmit_error (ch); 1693 transmit_error (ch, client);
1476 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 1694 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1477 return; 1695 return;
1478 } 1696 }
1479 1697
1480 queue_message (ch, msg, first_ptype, last_ptype); 1698 queue_message (ch, client, msg, first_ptype, last_ptype);
1481 transmit_message (ch); 1699 transmit_message (ch);
1482 1700
1483 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1701 GNUNET_SERVER_receive_done (client, GNUNET_OK);
@@ -1581,6 +1799,7 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
1581 stats = GNUNET_STATISTICS_create ("psyc", cfg); 1799 stats = GNUNET_STATISTICS_create ("psyc", cfg);
1582 masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); 1800 masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1583 slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); 1801 slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1802 channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
1584 recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); 1803 recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1585 nc = GNUNET_SERVER_notification_context_create (server, 1); 1804 nc = GNUNET_SERVER_notification_context_create (server, 1);
1586 GNUNET_SERVER_add_handlers (server, handlers); 1805 GNUNET_SERVER_add_handlers (server, handlers);
diff --git a/src/psyc/psyc.h b/src/psyc/psyc.h
index f2d386548..ab7b35d40 100644
--- a/src/psyc/psyc.h
+++ b/src/psyc/psyc.h
@@ -227,6 +227,21 @@ struct OperationResult
227}; 227};
228 228
229 229
230struct MasterJoinRequest
231{
232 /**
233 * Types:
234 * - GNUNET_MESSAGE_TYPE_PSYC_MASTER_JOIN_REQUEST
235 */
236 struct GNUNET_MessageHeader header;
237 /**
238 * Public key of the joining slave.
239 */
240 struct GNUNET_CRYPTO_EddsaPublicKey slave_key;
241
242 /* Followed by struct GNUNET_MessageHeader join_request */
243};
244
230GNUNET_NETWORK_STRUCT_END 245GNUNET_NETWORK_STRUCT_END
231 246
232#endif 247#endif
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index 85f86ceaa..62f099166 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -126,12 +126,7 @@ struct GNUNET_PSYC_Channel
126 GNUNET_PSYC_MessageCallback hist_message_cb; 126 GNUNET_PSYC_MessageCallback hist_message_cb;
127 127
128 /** 128 /**
129 * Join handler callback. 129 * Closure for @a message_cb.
130 */
131 GNUNET_PSYC_JoinCallback join_cb;
132
133 /**
134 * Closure for @a message_cb and @a join_cb.
135 */ 130 */
136 void *cb_cls; 131 void *cb_cls;
137 132
@@ -200,6 +195,11 @@ struct GNUNET_PSYC_Master
200 struct GNUNET_PSYC_Channel ch; 195 struct GNUNET_PSYC_Channel ch;
201 196
202 GNUNET_PSYC_MasterStartCallback start_cb; 197 GNUNET_PSYC_MasterStartCallback start_cb;
198
199 /**
200 * Join handler callback.
201 */
202 GNUNET_PSYC_JoinCallback join_cb;
203}; 203};
204 204
205 205
@@ -908,6 +908,18 @@ handle_psyc_message_ack (struct GNUNET_PSYC_Channel *ch)
908} 908}
909 909
910 910
911static void
912handle_psyc_join_request (struct GNUNET_PSYC_Master *mst,
913 const struct MasterJoinRequest *req)
914{
915 // FIXME: extract join message from req[1]
916 const char *method_name = "_fixme";
917 struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh));
918 mst->join_cb (mst->ch.cb_cls, &req->slave_key, method_name,
919 0, NULL, NULL, 0, jh);
920}
921
922
911/** 923/**
912 * Type of a function to call when we receive a message 924 * Type of a function to call when we receive a message
913 * from the service. 925 * from the service.
@@ -951,6 +963,9 @@ message_handler (void *cls,
951 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK: 963 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
952 size_eq = sizeof (struct GNUNET_MessageHeader); 964 size_eq = sizeof (struct GNUNET_MessageHeader);
953 break; 965 break;
966 case GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST:
967 size_min = sizeof (struct MasterJoinRequest);
968 break;
954 default: 969 default:
955 GNUNET_break_op (0); 970 GNUNET_break_op (0);
956 return; 971 return;
@@ -988,6 +1003,11 @@ message_handler (void *cls,
988 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE: 1003 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
989 handle_psyc_message (ch, (const struct GNUNET_PSYC_MessageHeader *) msg); 1004 handle_psyc_message (ch, (const struct GNUNET_PSYC_MessageHeader *) msg);
990 break; 1005 break;
1006
1007 case GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST:
1008 handle_psyc_join_request ((struct GNUNET_PSYC_Master *) ch,
1009 (const struct MasterJoinRequest *) msg);
1010 break;
991 } 1011 }
992 1012
993 if (NULL != ch->client) 1013 if (NULL != ch->client)
@@ -1186,8 +1206,8 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
1186 req->policy = policy; 1206 req->policy = policy;
1187 1207
1188 mst->start_cb = master_started_cb; 1208 mst->start_cb = master_started_cb;
1209 mst->join_cb = join_cb;
1189 ch->message_cb = message_cb; 1210 ch->message_cb = message_cb;
1190 ch->join_cb = join_cb;
1191 ch->cb_cls = cls; 1211 ch->cb_cls = cls;
1192 ch->cfg = cfg; 1212 ch->cfg = cfg;
1193 ch->is_master = GNUNET_YES; 1213 ch->is_master = GNUNET_YES;
@@ -1320,9 +1340,7 @@ GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th)
1320 * @param message_cb Function to invoke on message parts received from the 1340 * @param message_cb Function to invoke on message parts received from the
1321 * channel, typically at least contains method handlers for @e join and 1341 * channel, typically at least contains method handlers for @e join and
1322 * @e part. 1342 * @e part.
1323 * @param join_cb function invoked once we have joined with the current 1343 * @param slave_joined_cb Function invoked once we have joined the channel.
1324 * message ID of the channel
1325 * @param slave_joined_cb Function to invoke when a peer wants to join.
1326 * @param cls Closure for @a message_cb and @a slave_joined_cb. 1344 * @param cls Closure for @a message_cb and @a slave_joined_cb.
1327 * @param method_name Method name for the join request. 1345 * @param method_name Method name for the join request.
1328 * @param env Environment containing transient variables for the request, or NULL. 1346 * @param env Environment containing transient variables for the request, or NULL.
@@ -1339,7 +1357,6 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1339 uint32_t relay_count, 1357 uint32_t relay_count,
1340 const struct GNUNET_PeerIdentity *relays, 1358 const struct GNUNET_PeerIdentity *relays,
1341 GNUNET_PSYC_MessageCallback message_cb, 1359 GNUNET_PSYC_MessageCallback message_cb,
1342 GNUNET_PSYC_JoinCallback join_cb,
1343 GNUNET_PSYC_SlaveJoinCallback slave_joined_cb, 1360 GNUNET_PSYC_SlaveJoinCallback slave_joined_cb,
1344 void *cls, 1361 void *cls,
1345 const char *method_name, 1362 const char *method_name,
@@ -1362,7 +1379,6 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1362 1379
1363 slv->join_cb = slave_joined_cb; 1380 slv->join_cb = slave_joined_cb;
1364 ch->message_cb = message_cb; 1381 ch->message_cb = message_cb;
1365 ch->join_cb = join_cb;
1366 ch->cb_cls = cls; 1382 ch->cb_cls = cls;
1367 1383
1368 ch->cfg = cfg; 1384 ch->cfg = cfg;
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c
index f58ecb7f6..cef8a5dcf 100644
--- a/src/psyc/test_psyc.c
+++ b/src/psyc/test_psyc.c
@@ -130,6 +130,7 @@ end_badly (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
130{ 130{
131 res = 1; 131 res = 1;
132 cleanup (); 132 cleanup ();
133 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Test FAILED.\n");
133} 134}
134 135
135 136
@@ -144,6 +145,7 @@ end_normally (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
144{ 145{
145 res = 0; 146 res = 0;
146 cleanup (); 147 cleanup ();
148 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Test PASSED.\n");
147} 149}
148 150
149 151
@@ -181,7 +183,7 @@ master_message (void *cls, uint64_t message_id, uint32_t flags,
181 183
182 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 184 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
183 "Master got message part of type %u and size %u " 185 "Master got message part of type %u and size %u "
184 "belonging to message ID %llu with flags %u\n", 186 "belonging to message ID %llu with flags %bu\n",
185 type, size, message_id, flags); 187 type, size, message_id, flags);
186 188
187 switch (test) 189 switch (test)
@@ -225,7 +227,7 @@ slave_message (void *cls, uint64_t message_id, uint32_t flags,
225 227
226 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 228 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
227 "Slave got message part of type %u and size %u " 229 "Slave got message part of type %u and size %u "
228 "belonging to message ID %llu with flags %u\n", 230 "belonging to message ID %llu with flags %bu\n",
229 type, size, message_id, flags); 231 type, size, message_id, flags);
230 232
231 switch (test) 233 switch (test)