aboutsummaryrefslogtreecommitdiff
path: root/src/psyc
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2014-03-06 23:46:45 +0000
committerGabor X Toth <*@tg-x.net>2014-03-06 23:46:45 +0000
commit8a0b8a4604526e5f832c4971f9c3b1b48d79bea4 (patch)
treedfd18a61272a18381fe9ce9b09849a965480a303 /src/psyc
parenta21beab58c1d2abc747359a98326f19aaad4e8cd (diff)
downloadgnunet-8a0b8a4604526e5f832c4971f9c3b1b48d79bea4.tar.gz
gnunet-8a0b8a4604526e5f832c4971f9c3b1b48d79bea4.zip
PSYC: implement slave to master requests, tests, fixes, reorg
Multicast lib: handle member to origin requests. Keep track of members and origins and call their callbacks when necessary.
Diffstat (limited to 'src/psyc')
-rw-r--r--src/psyc/Makefile.am6
-rw-r--r--src/psyc/gnunet-service-psyc.c341
-rw-r--r--src/psyc/psyc.h11
-rw-r--r--src/psyc/psyc_api.c370
-rw-r--r--src/psyc/psyc_common.c100
-rw-r--r--src/psyc/test_psyc.c231
6 files changed, 729 insertions, 330 deletions
diff --git a/src/psyc/Makefile.am b/src/psyc/Makefile.am
index 7860b3995..162b42b2b 100644
--- a/src/psyc/Makefile.am
+++ b/src/psyc/Makefile.am
@@ -21,7 +21,7 @@ lib_LTLIBRARIES = libgnunetpsyc.la
21 21
22libgnunetpsyc_la_SOURCES = \ 22libgnunetpsyc_la_SOURCES = \
23 psyc_api.c \ 23 psyc_api.c \
24 psyc.h 24 psyc_common.c
25libgnunetpsyc_la_LIBADD = \ 25libgnunetpsyc_la_LIBADD = \
26 $(top_builddir)/src/util/libgnunetutil.la \ 26 $(top_builddir)/src/util/libgnunetutil.la \
27 $(top_builddir)/src/env/libgnunetenv.la \ 27 $(top_builddir)/src/env/libgnunetenv.la \
@@ -39,7 +39,8 @@ libexec_PROGRAMS = \
39 gnunet-service-psyc 39 gnunet-service-psyc
40 40
41gnunet_service_psyc_SOURCES = \ 41gnunet_service_psyc_SOURCES = \
42 gnunet-service-psyc.c 42 gnunet-service-psyc.c \
43 psyc_common.c
43gnunet_service_psyc_LDADD = \ 44gnunet_service_psyc_LDADD = \
44 $(top_builddir)/src/statistics/libgnunetstatistics.la \ 45 $(top_builddir)/src/statistics/libgnunetstatistics.la \
45 $(top_builddir)/src/util/libgnunetutil.la \ 46 $(top_builddir)/src/util/libgnunetutil.la \
@@ -51,6 +52,7 @@ gnunet_service_psyc_DEPENDENCIES = \
51 $(top_builddir)/src/util/libgnunetutil.la \ 52 $(top_builddir)/src/util/libgnunetutil.la \
52 $(top_builddir)/src/multicast/libgnunetmulticast.la \ 53 $(top_builddir)/src/multicast/libgnunetmulticast.la \
53 $(top_builddir)/src/psycstore/libgnunetpsycstore.la 54 $(top_builddir)/src/psycstore/libgnunetpsycstore.la
55gnunet_service_psyc_CFLAGS = $(AM_CFLAGS)
54 56
55 57
56if HAVE_TESTING 58if HAVE_TESTING
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
index e5de7dcda..dcb5031f1 100644
--- a/src/psyc/gnunet-service-psyc.c
+++ b/src/psyc/gnunet-service-psyc.c
@@ -56,6 +56,7 @@ static struct GNUNET_SERVER_NotificationContext *nc;
56static struct GNUNET_PSYCSTORE_Handle *store; 56static struct GNUNET_PSYCSTORE_Handle *store;
57 57
58/** 58/**
59 * All connected masters and slaves.
59 * Channel's pub_key_hash -> struct Channel 60 * Channel's pub_key_hash -> struct Channel
60 */ 61 */
61static struct GNUNET_CONTAINER_MultiHashMap *clients; 62static struct GNUNET_CONTAINER_MultiHashMap *clients;
@@ -105,6 +106,15 @@ struct Channel
105 106
106 uint8_t in_transmit; 107 uint8_t in_transmit;
107 uint8_t is_master; 108 uint8_t is_master;
109
110 /**
111 * Ready to receive messages from client.
112 */
113 uint8_t ready;
114
115 /**
116 * Client disconnected.
117 */
108 uint8_t disconnected; 118 uint8_t disconnected;
109}; 119};
110 120
@@ -116,7 +126,6 @@ struct Master
116 struct Channel channel; 126 struct Channel channel;
117 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key; 127 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
118 struct GNUNET_CRYPTO_EddsaPublicKey pub_key; 128 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
119 struct GNUNET_HashCode pub_key_hash;
120 129
121 struct GNUNET_MULTICAST_Origin *origin; 130 struct GNUNET_MULTICAST_Origin *origin;
122 struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle; 131 struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle;
@@ -144,6 +153,8 @@ struct Master
144 * @see enum GNUNET_PSYC_Policy 153 * @see enum GNUNET_PSYC_Policy
145 */ 154 */
146 uint32_t policy; 155 uint32_t policy;
156
157 struct GNUNET_HashCode pub_key_hash;
147}; 158};
148 159
149 160
@@ -155,24 +166,26 @@ struct Slave
155 struct Channel channel; 166 struct Channel channel;
156 struct GNUNET_CRYPTO_EddsaPrivateKey slave_key; 167 struct GNUNET_CRYPTO_EddsaPrivateKey slave_key;
157 struct GNUNET_CRYPTO_EddsaPublicKey chan_key; 168 struct GNUNET_CRYPTO_EddsaPublicKey chan_key;
158 struct GNUNET_HashCode chan_key_hash;
159 169
160 struct GNUNET_MULTICAST_Member *member; 170 struct GNUNET_MULTICAST_Member *member;
161 struct GNUNET_MULTICAST_MemberRequestHandle *tmit_handle; 171 struct GNUNET_MULTICAST_MemberRequestHandle *tmit_handle;
162 172
163 struct GNUNET_PeerIdentity origin; 173 struct GNUNET_PeerIdentity origin;
174
175 uint32_t relay_count;
164 struct GNUNET_PeerIdentity *relays; 176 struct GNUNET_PeerIdentity *relays;
177
165 struct GNUNET_MessageHeader *join_req; 178 struct GNUNET_MessageHeader *join_req;
166 179
167 uint64_t max_message_id; 180 uint64_t max_message_id;
168 uint64_t max_request_id; 181 uint64_t max_request_id;
169 182
170 uint32_t relay_count; 183 struct GNUNET_HashCode chan_key_hash;
171}; 184};
172 185
173 186
174static inline void 187static inline void
175transmit_message (struct Channel *ch); 188transmit_message (struct Channel *ch, uint8_t inc_msg_id);
176 189
177 190
178/** 191/**
@@ -235,14 +248,14 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
235 if (NULL == client) 248 if (NULL == client)
236 return; 249 return;
237 250
238 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p disconnected\n", client);
239
240 struct Channel *ch 251 struct Channel *ch
241 = GNUNET_SERVER_client_get_user_context (client, struct Channel); 252 = GNUNET_SERVER_client_get_user_context (client, struct Channel);
253 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Client disconnected\n", ch);
254
242 if (NULL == ch) 255 if (NULL == ch)
243 { 256 {
244 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 257 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
245 "User context is NULL in client_disconnect()\n"); 258 "%p User context is NULL in client_disconnect()\n", ch);
246 GNUNET_break (0); 259 GNUNET_break (0);
247 return; 260 return;
248 } 261 }
@@ -252,7 +265,7 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
252 /* Send pending messages to multicast before cleanup. */ 265 /* Send pending messages to multicast before cleanup. */
253 if (NULL != ch->tmit_head) 266 if (NULL != ch->tmit_head)
254 { 267 {
255 transmit_message (ch); 268 transmit_message (ch, GNUNET_NO);
256 } 269 }
257 else 270 else
258 { 271 {
@@ -311,53 +324,22 @@ fragment_store_result (void *cls, int64_t result, const char *err_msg)
311 324
312 325
313/** 326/**
314 * Iterator callback for sending a message to a client.
315 *
316 * @see message_cb()
317 */
318static int
319message_to_client (void *cls, const struct GNUNET_HashCode *chan_key_hash,
320 void *chan)
321{
322 const struct GNUNET_MessageHeader *msg = cls;
323 struct Channel *ch = chan;
324
325 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
326 "Sending message of type %u and size %u to client 0x%zx.\n",
327 ntohs (msg->type), ntohs (msg->size), ch->client);
328
329 GNUNET_SERVER_notification_context_add (nc, ch->client);
330 GNUNET_SERVER_notification_context_unicast (nc, ch->client, msg, GNUNET_NO);
331
332 return GNUNET_YES;
333}
334
335
336/**
337 * Incoming message fragment from multicast. 327 * Incoming message fragment from multicast.
338 * 328 *
339 * Store it using PSYCstore and send it to all clients of the channel. 329 * Store it using PSYCstore and send it to the client of the channel.
340 */ 330 */
341static void 331static void
342message_cb (void *cls, const struct GNUNET_MessageHeader *msg) 332message_cb (struct Channel *ch,
333 const struct GNUNET_CRYPTO_EddsaPublicKey *chan_key,
334 const struct GNUNET_HashCode *chan_key_hash,
335 const struct GNUNET_MessageHeader *msg)
343{ 336{
344 uint16_t type = ntohs (msg->type); 337 uint16_t type = ntohs (msg->type);
345 uint16_t size = ntohs (msg->size); 338 uint16_t size = ntohs (msg->size);
346 339
347 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 340 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
348 "Received message of type %u and size %u from multicast.\n", 341 "%p Received message of type %u and size %u from multicast.\n",
349 type, size); 342 ch, type, size);
350
351 struct Channel *ch = cls;
352 struct Master *mst = cls;
353 struct Slave *slv = cls;
354
355 /* const struct GNUNET_MULTICAST_MessageHeader *mmsg
356 = (const struct GNUNET_MULTICAST_MessageHeader *) msg; */
357 struct GNUNET_CRYPTO_EddsaPublicKey *chan_key
358 = ch->is_master ? &mst->pub_key : &slv->chan_key;
359 struct GNUNET_HashCode *chan_key_hash
360 = ch->is_master ? &mst->pub_key_hash : &slv->chan_key_hash;
361 343
362 switch (type) 344 switch (type)
363 { 345 {
@@ -378,29 +360,19 @@ message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
378 360
379 const struct GNUNET_MULTICAST_MessageHeader *mmsg 361 const struct GNUNET_MULTICAST_MessageHeader *mmsg
380 = (const struct GNUNET_MULTICAST_MessageHeader *) msg; 362 = (const struct GNUNET_MULTICAST_MessageHeader *) msg;
381 struct GNUNET_PSYC_MessageHeader *pmsg;
382
383 uint16_t size = ntohs (msg->size);
384 uint16_t psize = 0;
385 uint16_t pos = 0;
386 363
387 for (pos = 0; sizeof (*mmsg) + pos < size; pos += psize) 364 if (GNUNET_YES != GNUNET_PSYC_check_message_parts (size - sizeof (*mmsg),
365 (const char *) &mmsg[1]))
388 { 366 {
389 const struct GNUNET_MessageHeader *pmsg 367 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
390 = (const struct GNUNET_MessageHeader *) ((char *) &mmsg[1] + pos); 368 "%p Received message with invalid parts from multicast. "
391 psize = ntohs (pmsg->size); 369 "Dropping message.\n", ch);
392 if (psize < sizeof (*pmsg) || sizeof (*mmsg) + pos + psize > size) 370 GNUNET_break_op (0);
393 { 371 break;
394 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
395 "Received invalid message part of type %u and size %u "
396 "from multicast. Not sending to clients.\n",
397 ntohs (pmsg->type), psize);
398 GNUNET_break_op (0);
399 return;
400 }
401 } 372 }
402 373
403 psize = sizeof (*pmsg) + size - sizeof (*mmsg); 374 struct GNUNET_PSYC_MessageHeader *pmsg;
375 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
404 pmsg = GNUNET_malloc (psize); 376 pmsg = GNUNET_malloc (psize);
405 pmsg->header.size = htons (psize); 377 pmsg->header.size = htons (psize);
406 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); 378 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
@@ -408,39 +380,116 @@ message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
408 380
409 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); 381 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
410 382
411 GNUNET_CONTAINER_multihashmap_get_multiple (clients, chan_key_hash, 383 GNUNET_SERVER_notification_context_add (nc, ch->client);
412 message_to_client, 384 GNUNET_SERVER_notification_context_unicast (nc, ch->client,
413 (void *) pmsg); 385 (const struct GNUNET_MessageHeader *) pmsg,
386 GNUNET_NO);
414 GNUNET_free (pmsg); 387 GNUNET_free (pmsg);
415 break; 388 break;
416 } 389 }
417 default: 390 default:
418 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 391 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
419 "Discarding unknown message of type %u and size %u.\n", 392 "%p Dropping unknown message of type %u and size %u.\n",
420 type, size); 393 ch, type, size);
421 } 394 }
422} 395}
423 396
424 397
425/** 398/**
426 * Send a request received from multicast to a client. 399 * Incoming message fragment from multicast for a master.
427 */ 400 */
428static int 401static void
429request_to_client (void *cls, const struct GNUNET_HashCode *chan_key_hash, 402master_message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
430 void *chan) 403{
404 struct Master *mst = cls;
405 GNUNET_assert (NULL != mst);
406
407 struct GNUNET_CRYPTO_EddsaPublicKey *chan_key = &mst->pub_key;
408 struct GNUNET_HashCode *chan_key_hash = &mst->pub_key_hash;
409
410 message_cb (&mst->channel, chan_key, chan_key_hash, msg);
411}
412
413
414/**
415 * Incoming message fragment from multicast for a slave.
416 */
417static void
418slave_message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
431{ 419{
432 /* TODO */ 420 struct Slave *slv = cls;
421 GNUNET_assert (NULL != slv);
422
423 struct GNUNET_CRYPTO_EddsaPublicKey *chan_key = &slv->chan_key;
424 struct GNUNET_HashCode *chan_key_hash = &slv->chan_key_hash;
433 425
434 return GNUNET_YES; 426 message_cb (&slv->channel, chan_key, chan_key_hash, msg);
435} 427}
436 428
437 429
430/**
431 * Incoming request fragment from multicast for a master.
432 *
433 * @param cls Master.
434 * @param member_key Sending member's public key.
435 * @param msg The message.
436 * @param flags Request flags.
437 */
438static void 438static void
439request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, 439request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
440 const struct GNUNET_MessageHeader *req, 440 const struct GNUNET_MessageHeader *msg,
441 enum GNUNET_MULTICAST_MessageFlags flags) 441 enum GNUNET_MULTICAST_MessageFlags flags)
442{ 442{
443 struct Master *mst = cls;
444 struct Channel *ch = &mst->channel;
443 445
446 uint16_t type = ntohs (msg->type);
447 uint16_t size = ntohs (msg->size);
448
449 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
450 "%p Received request of type %u and size %u from multicast.\n",
451 ch, type, size);
452
453 switch (type)
454 {
455 case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
456 {
457 const struct GNUNET_MULTICAST_RequestHeader *req
458 = (const struct GNUNET_MULTICAST_RequestHeader *) msg;
459
460 if (GNUNET_YES != GNUNET_PSYC_check_message_parts (size - sizeof (*req),
461 (const char *) &req[1]))
462 {
463 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
464 "%p Dropping message with invalid parts "
465 "received from multicast.\n", ch);
466 GNUNET_break_op (0);
467 break;
468 }
469
470 struct GNUNET_PSYC_MessageHeader *pmsg;
471 uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
472 pmsg = GNUNET_malloc (psize);
473 pmsg->header.size = htons (psize);
474 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
475 pmsg->message_id = req->request_id;
476 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
477
478 memcpy (&pmsg[1], &req[1], size - sizeof (*req));
479
480 GNUNET_SERVER_notification_context_add (nc, ch->client);
481 GNUNET_SERVER_notification_context_unicast (nc, ch->client,
482 (const struct GNUNET_MessageHeader *) pmsg,
483 GNUNET_NO);
484 GNUNET_free (pmsg);
485 break;
486 }
487 default:
488 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
489 "%p Dropping unknown request of type %u and size %u.\n",
490 ch, type, size);
491 GNUNET_break_op (0);
492 }
444} 493}
445 494
446 495
@@ -470,7 +519,8 @@ master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
470 max_fragment_id + 1, 519 max_fragment_id + 1,
471 join_cb, membership_test_cb, 520 join_cb, membership_test_cb,
472 replay_fragment_cb, replay_message_cb, 521 replay_fragment_cb, replay_message_cb,
473 request_cb, message_cb, ch); 522 request_cb, master_message_cb, ch);
523 ch->ready = GNUNET_YES;
474 } 524 }
475 GNUNET_SERVER_notification_context_add (nc, ch->client); 525 GNUNET_SERVER_notification_context_add (nc, ch->client);
476 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header, 526 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
@@ -505,7 +555,8 @@ slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
505 slv->join_req, join_cb, 555 slv->join_req, join_cb,
506 membership_test_cb, 556 membership_test_cb,
507 replay_fragment_cb, replay_message_cb, 557 replay_fragment_cb, replay_message_cb,
508 message_cb, ch); 558 slave_message_cb, ch);
559 ch->ready = GNUNET_YES;
509 } 560 }
510 561
511 GNUNET_SERVER_notification_context_add (nc, ch->client); 562 GNUNET_SERVER_notification_context_add (nc, ch->client);
@@ -529,9 +580,11 @@ handle_master_start (void *cls, struct GNUNET_SERVER_Client *client,
529 mst->channel.is_master = GNUNET_YES; 580 mst->channel.is_master = GNUNET_YES;
530 mst->policy = ntohl (req->policy); 581 mst->policy = ntohl (req->policy);
531 mst->priv_key = req->channel_key; 582 mst->priv_key = req->channel_key;
532 GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key, 583 GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key, &mst->pub_key);
533 &mst->pub_key);
534 GNUNET_CRYPTO_hash (&mst->pub_key, sizeof (mst->pub_key), &mst->pub_key_hash); 584 GNUNET_CRYPTO_hash (&mst->pub_key, sizeof (mst->pub_key), &mst->pub_key_hash);
585 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
586 "%p Master connected to channel %s.\n",
587 mst, GNUNET_h2s (&mst->pub_key_hash));
535 588
536 GNUNET_PSYCSTORE_counters_get (store, &mst->pub_key, 589 GNUNET_PSYCSTORE_counters_get (store, &mst->pub_key,
537 master_counters_cb, mst); 590 master_counters_cb, mst);
@@ -561,14 +614,20 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
561 &slv->chan_key_hash); 614 &slv->chan_key_hash);
562 slv->origin = req->origin; 615 slv->origin = req->origin;
563 slv->relay_count = ntohl (req->relay_count); 616 slv->relay_count = ntohl (req->relay_count);
617 if (0 < slv->relay_count)
618 {
619 const struct GNUNET_PeerIdentity *relays
620 = (const struct GNUNET_PeerIdentity *) &req[1];
621 slv->relays
622 = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
623 uint32_t i;
624 for (i = 0; i < slv->relay_count; i++)
625 memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
626 }
564 627
565 const struct GNUNET_PeerIdentity *relays 628 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
566 = (const struct GNUNET_PeerIdentity *) &req[1]; 629 "%p Slave connected to channel %s.\n",
567 slv->relays 630 slv, GNUNET_h2s (&slv->chan_key_hash));
568 = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
569 uint32_t i;
570 for (i = 0; i < slv->relay_count; i++)
571 memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
572 631
573 GNUNET_PSYCSTORE_counters_get (store, &slv->chan_key, 632 GNUNET_PSYCSTORE_counters_get (store, &slv->chan_key,
574 slave_counters_cb, slv); 633 slave_counters_cb, slv);
@@ -609,13 +668,14 @@ transmit_notify (void *cls, size_t *data_size, void *data)
609 668
610 if (NULL == tmit_msg || *data_size < tmit_msg->size) 669 if (NULL == tmit_msg || *data_size < tmit_msg->size)
611 { 670 {
612 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify: nothing to send.\n"); 671 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
672 "%p transmit_notify: nothing to send.\n", ch);
613 *data_size = 0; 673 *data_size = 0;
614 return GNUNET_NO; 674 return GNUNET_NO;
615 } 675 }
616 676
617 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 677 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
618 "transmit_notify: sending %u bytes.\n", tmit_msg->size); 678 "%p transmit_notify: sending %u bytes.\n", ch, tmit_msg->size);
619 679
620 *data_size = tmit_msg->size; 680 *data_size = tmit_msg->size;
621 memcpy (data, tmit_msg->buf, *data_size); 681 memcpy (data, tmit_msg->buf, *data_size);
@@ -630,7 +690,7 @@ transmit_notify (void *cls, size_t *data_size, void *data)
630 { 690 {
631 if (NULL != ch->tmit_head) 691 if (NULL != ch->tmit_head)
632 { 692 {
633 transmit_message (ch); 693 transmit_message (ch, GNUNET_NO);
634 } 694 }
635 else if (ch->disconnected) 695 else if (ch->disconnected)
636 { 696 {
@@ -644,19 +704,55 @@ transmit_notify (void *cls, size_t *data_size, void *data)
644 704
645 705
646/** 706/**
707 * Callback for the transmit functions of multicast.
708 */
709static int
710master_transmit_notify (void *cls, size_t *data_size, void *data)
711{
712 int ret = transmit_notify (cls, data_size, data);
713
714 if (GNUNET_YES == ret)
715 {
716 struct Master *mst = cls;
717 mst->tmit_handle = NULL;
718 }
719 return ret;
720}
721
722
723/**
724 * Callback for the transmit functions of multicast.
725 */
726static int
727slave_transmit_notify (void *cls, size_t *data_size, void *data)
728{
729 int ret = transmit_notify (cls, data_size, data);
730
731 if (GNUNET_YES == ret)
732 {
733 struct Slave *slv = cls;
734 slv->tmit_handle = NULL;
735 }
736 return ret;
737}
738
739
740/**
647 * Transmit a message from a channel master to the multicast group. 741 * Transmit a message from a channel master to the multicast group.
648 */ 742 */
649static void 743static void
650master_transmit_message (struct Master *mst) 744master_transmit_message (struct Master *mst, uint8_t inc_msg_id)
651{ 745{
652 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "master_transmit_message()\n"); 746 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_transmit_message()\n", mst);
653 mst->channel.tmit_task = 0; 747 mst->channel.tmit_task = 0;
654 if (NULL == mst->tmit_handle) 748 if (NULL == mst->tmit_handle)
655 { 749 {
750 if (GNUNET_YES == inc_msg_id)
751 mst->max_message_id++;
656 mst->tmit_handle 752 mst->tmit_handle
657 = GNUNET_MULTICAST_origin_to_all (mst->origin, ++mst->max_message_id, 753 = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
658 mst->max_group_generation, 754 mst->max_group_generation,
659 transmit_notify, mst); 755 master_transmit_notify, mst);
660 } 756 }
661 else 757 else
662 { 758 {
@@ -669,14 +765,16 @@ master_transmit_message (struct Master *mst)
669 * Transmit a message from a channel slave to the multicast group. 765 * Transmit a message from a channel slave to the multicast group.
670 */ 766 */
671static void 767static void
672slave_transmit_message (struct Slave *slv) 768slave_transmit_message (struct Slave *slv, uint8_t inc_msg_id)
673{ 769{
674 slv->channel.tmit_task = 0; 770 slv->channel.tmit_task = 0;
675 if (NULL == slv->tmit_handle) 771 if (NULL == slv->tmit_handle)
676 { 772 {
773 if (GNUNET_YES == inc_msg_id)
774 slv->max_message_id++;
677 slv->tmit_handle 775 slv->tmit_handle
678 = GNUNET_MULTICAST_member_to_origin(slv->member, ++slv->max_request_id, 776 = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
679 transmit_notify, slv); 777 slave_transmit_notify, slv);
680 } 778 }
681 else 779 else
682 { 780 {
@@ -686,11 +784,11 @@ slave_transmit_message (struct Slave *slv)
686 784
687 785
688static inline void 786static inline void
689transmit_message (struct Channel *ch) 787transmit_message (struct Channel *ch, uint8_t inc_msg_id)
690{ 788{
691 ch->is_master 789 ch->is_master
692 ? master_transmit_message ((struct Master *) ch) 790 ? master_transmit_message ((struct Master *) ch, inc_msg_id)
693 : slave_transmit_message ((struct Slave *) ch); 791 : slave_transmit_message ((struct Slave *) ch, inc_msg_id);
694} 792}
695 793
696 794
@@ -708,10 +806,9 @@ transmit_error (struct Channel *ch)
708 tmit_msg->size = sizeof (*msg); 806 tmit_msg->size = sizeof (*msg);
709 tmit_msg->state = ch->tmit_state; 807 tmit_msg->state = ch->tmit_state;
710 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); 808 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
711 transmit_message (ch); 809 transmit_message (ch, GNUNET_NO);
712 810
713 /* FIXME: cleanup */ 811 /* FIXME: cleanup */
714 GNUNET_SERVER_client_disconnect (ch->client);
715} 812}
716 813
717 814
@@ -720,40 +817,60 @@ transmit_error (struct Channel *ch)
720 */ 817 */
721static void 818static void
722handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, 819handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
723 const struct GNUNET_MessageHeader *msg) 820 const struct GNUNET_MessageHeader *msg)
724{ 821{
725 struct Channel *ch 822 struct Channel *ch
726 = GNUNET_SERVER_client_get_user_context (client, struct Channel); 823 = GNUNET_SERVER_client_get_user_context (client, struct Channel);
727 GNUNET_assert (NULL != ch); 824 GNUNET_assert (NULL != ch);
728 825
826 if (GNUNET_YES != ch->ready)
827 {
828 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
829 "%p Ignoring message from client, channel is not ready yet.\n",
830 ch);
831 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
832 return;
833 }
834
835 uint8_t inc_msg_id = GNUNET_NO;
729 uint16_t size = ntohs (msg->size); 836 uint16_t size = ntohs (msg->size);
730 uint16_t psize = 0, pos = 0; 837 uint16_t psize = 0, ptype = 0, pos = 0;
731 838
732 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg)) 839 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
733 { 840 {
734 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Message payload too large\n"); 841 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large\n", ch);
735 GNUNET_break (0); 842 GNUNET_break (0);
736 transmit_error (ch); 843 transmit_error (ch);
844 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
737 return; 845 return;
738 } 846 }
739 847
848 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
849 "%p Received message from client.\n", ch);
850 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
851
740 for (pos = 0; sizeof (*msg) + pos < size; pos += psize) 852 for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
741 { 853 {
742 const struct GNUNET_MessageHeader *pmsg 854 const struct GNUNET_MessageHeader *pmsg
743 = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos); 855 = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
744 psize = ntohs (pmsg->size); 856 psize = ntohs (pmsg->size);
745 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 857 ptype = ntohs (pmsg->type);
746 "Received message part of type %u and size %u "
747 "from client.\n", ntohs (pmsg->type), psize);
748 if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size) 858 if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
749 { 859 {
750 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 860 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
751 "Received invalid message part of type %u and size %u " 861 "%p Received invalid message part of type %u and size %u "
752 "from client.\n", ntohs (pmsg->type), psize); 862 "from client.\n", ch, ptype, psize);
753 GNUNET_break (0); 863 GNUNET_break (0);
754 transmit_error (ch); 864 transmit_error (ch);
865 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
755 return; 866 return;
756 } 867 }
868 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
869 "%p Received message part from client.\n", ch);
870 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
871
872 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == ptype)
873 inc_msg_id = GNUNET_YES;
757 } 874 }
758 875
759 size -= sizeof (*msg); 876 size -= sizeof (*msg);
@@ -763,7 +880,7 @@ handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
763 tmit_msg->size = size; 880 tmit_msg->size = size;
764 tmit_msg->state = ch->tmit_state; 881 tmit_msg->state = ch->tmit_state;
765 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); 882 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
766 transmit_message (ch); 883 transmit_message (ch, inc_msg_id);
767 884
768 GNUNET_SERVER_receive_done (client, GNUNET_OK); 885 GNUNET_SERVER_receive_done (client, GNUNET_OK);
769}; 886};
diff --git a/src/psyc/psyc.h b/src/psyc/psyc.h
index 90c07480a..1ffda5d08 100644
--- a/src/psyc/psyc.h
+++ b/src/psyc/psyc.h
@@ -27,7 +27,16 @@
27#ifndef PSYC_H 27#ifndef PSYC_H
28#define PSYC_H 28#define PSYC_H
29 29
30#include "gnunet_common.h" 30#include "platform.h"
31#include "gnunet_psyc_service.h"
32
33
34int
35GNUNET_PSYC_check_message_parts (uint16_t data_size, const char *data);
36
37void
38GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
39 const struct GNUNET_MessageHeader *msg);
31 40
32 41
33enum MessageState 42enum MessageState
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index 20394bbce..8a1c9ffaa 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -48,12 +48,30 @@ struct OperationHandle
48 struct GNUNET_MessageHeader *msg; 48 struct GNUNET_MessageHeader *msg;
49}; 49};
50 50
51
52/**
53 * Handle for a pending PSYC transmission operation.
54 */
55struct GNUNET_PSYC_ChannelTransmitHandle
56{
57 struct GNUNET_PSYC_Channel *ch;
58 GNUNET_PSYC_TransmitNotifyModifier notify_mod;
59 GNUNET_PSYC_TransmitNotifyData notify_data;
60 void *notify_cls;
61 enum MessageState state;
62};
63
51/** 64/**
52 * Handle to access PSYC channel operations for both the master and slaves. 65 * Handle to access PSYC channel operations for both the master and slaves.
53 */ 66 */
54struct GNUNET_PSYC_Channel 67struct GNUNET_PSYC_Channel
55{ 68{
56 /** 69 /**
70 * Transmission handle;
71 */
72 struct GNUNET_PSYC_ChannelTransmitHandle tmit;
73
74 /**
57 * Configuration to use. 75 * Configuration to use.
58 */ 76 */
59 const struct GNUNET_CONFIGURATION_Handle *cfg; 77 const struct GNUNET_CONFIGURATION_Handle *cfg;
@@ -124,6 +142,11 @@ struct GNUNET_PSYC_Channel
124 uint64_t recv_message_id; 142 uint64_t recv_message_id;
125 143
126 /** 144 /**
145 * Public key of the slave from which a message is being received.
146 */
147 struct GNUNET_CRYPTO_EddsaPublicKey recv_slave_key;
148
149 /**
127 * State of the currently being received message from the PSYC service. 150 * State of the currently being received message from the PSYC service.
128 */ 151 */
129 enum MessageState recv_state; 152 enum MessageState recv_state;
@@ -171,27 +194,12 @@ struct GNUNET_PSYC_Channel
171 194
172 195
173/** 196/**
174 * Handle for a pending PSYC transmission operation.
175 */
176struct GNUNET_PSYC_MasterTransmitHandle
177{
178 struct GNUNET_PSYC_Master *master;
179 GNUNET_PSYC_MasterTransmitNotifyModifier notify_mod;
180 GNUNET_PSYC_MasterTransmitNotify notify_data;
181 void *notify_cls;
182 enum MessageState state;
183};
184
185
186/**
187 * Handle for the master of a PSYC channel. 197 * Handle for the master of a PSYC channel.
188 */ 198 */
189struct GNUNET_PSYC_Master 199struct GNUNET_PSYC_Master
190{ 200{
191 struct GNUNET_PSYC_Channel ch; 201 struct GNUNET_PSYC_Channel ch;
192 202
193 struct GNUNET_PSYC_MasterTransmitHandle *tmit;
194
195 GNUNET_PSYC_MasterStartCallback start_cb; 203 GNUNET_PSYC_MasterStartCallback start_cb;
196 204
197 uint64_t max_message_id; 205 uint64_t max_message_id;
@@ -204,6 +212,10 @@ struct GNUNET_PSYC_Master
204struct GNUNET_PSYC_Slave 212struct GNUNET_PSYC_Slave
205{ 213{
206 struct GNUNET_PSYC_Channel ch; 214 struct GNUNET_PSYC_Channel ch;
215
216 GNUNET_PSYC_SlaveJoinCallback join_cb;
217
218 uint64_t max_message_id;
207}; 219};
208 220
209 221
@@ -251,7 +263,7 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
251 263
252 264
253static void 265static void
254master_transmit_data (struct GNUNET_PSYC_Master *mst); 266channel_transmit_data (struct GNUNET_PSYC_Channel *ch);
255 267
256 268
257/** 269/**
@@ -302,7 +314,8 @@ recv_reset (struct GNUNET_PSYC_Channel *ch)
302 ch->recv_state = MSG_STATE_START; 314 ch->recv_state = MSG_STATE_START;
303 ch->recv_flags = 0; 315 ch->recv_flags = 0;
304 ch->recv_message_id = 0; 316 ch->recv_message_id = 0;
305 ch->recv_mod_value_size =0; 317 //FIXME: ch->recv_slave_key = { 0 };
318 ch->recv_mod_value_size = 0;
306 ch->recv_mod_value_size_expected = 0; 319 ch->recv_mod_value_size_expected = 0;
307} 320}
308 321
@@ -379,8 +392,9 @@ queue_message (struct GNUNET_PSYC_Channel *ch,
379 } 392 }
380 393
381 if (NULL != op 394 if (NULL != op
382 && (end || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD 395 && (GNUNET_YES == end
383 < op->msg->size + sizeof (struct GNUNET_MessageHeader)))) 396 || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
397 < op->msg->size + sizeof (struct GNUNET_MessageHeader))))
384 { 398 {
385 /* End of message or buffer is full, add it to transmission queue. */ 399 /* End of message or buffer is full, add it to transmission queue. */
386 op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); 400 op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
@@ -390,6 +404,9 @@ queue_message (struct GNUNET_PSYC_Channel *ch,
390 ch->tmit_ack_pending++; 404 ch->tmit_ack_pending++;
391 } 405 }
392 406
407 if (GNUNET_YES == end)
408 ch->in_transmit = GNUNET_NO;
409
393 transmit_next (ch); 410 transmit_next (ch);
394} 411}
395 412
@@ -400,15 +417,14 @@ queue_message (struct GNUNET_PSYC_Channel *ch,
400 * @param mst Master handle. 417 * @param mst Master handle.
401 */ 418 */
402static void 419static void
403master_transmit_mod (struct GNUNET_PSYC_Master *mst) 420channel_transmit_mod (struct GNUNET_PSYC_Channel *ch)
404{ 421{
405 struct GNUNET_PSYC_Channel *ch = &mst->ch;
406 uint16_t max_data_size, data_size; 422 uint16_t max_data_size, data_size;
407 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; 423 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
408 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; 424 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
409 int notify_ret; 425 int notify_ret;
410 426
411 switch (mst->tmit->state) 427 switch (ch->tmit.state)
412 { 428 {
413 case MSG_STATE_MODIFIER: 429 case MSG_STATE_MODIFIER:
414 { 430 {
@@ -417,12 +433,11 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst)
417 max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; 433 max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
418 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); 434 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
419 msg->size = sizeof (struct GNUNET_PSYC_MessageModifier); 435 msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
420 notify_ret = mst->tmit->notify_mod (mst->tmit->notify_cls, 436 notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls,
421 &data_size, &mod[1], &mod->oper); 437 &data_size, &mod[1], &mod->oper);
422 mod->name_size = strnlen ((char *) &mod[1], data_size); 438 mod->name_size = strnlen ((char *) &mod[1], data_size);
423 if (mod->name_size < data_size) 439 if (mod->name_size < data_size)
424 { 440 {
425 mod->oper = htons (mod->oper);
426 mod->value_size = htons (data_size - 1 - mod->name_size); 441 mod->value_size = htons (data_size - 1 - mod->name_size);
427 mod->name_size = htons (mod->name_size); 442 mod->name_size = htons (mod->name_size);
428 } 443 }
@@ -438,8 +453,8 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst)
438 max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; 453 max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
439 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); 454 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
440 msg->size = sizeof (struct GNUNET_MessageHeader); 455 msg->size = sizeof (struct GNUNET_MessageHeader);
441 notify_ret = mst->tmit->notify_mod (mst->tmit->notify_cls, 456 notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls,
442 &data_size, &msg[1], NULL); 457 &data_size, &msg[1], NULL);
443 break; 458 break;
444 } 459 }
445 default: 460 default:
@@ -454,27 +469,28 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst)
454 ch->tmit_paused = GNUNET_YES; 469 ch->tmit_paused = GNUNET_YES;
455 return; 470 return;
456 } 471 }
457 mst->tmit->state = MSG_STATE_MOD_CONT; 472 ch->tmit.state = MSG_STATE_MOD_CONT;
458 break; 473 break;
459 474
460 case GNUNET_YES: 475 case GNUNET_YES:
461 if (0 == data_size) 476 if (0 == data_size)
462 { 477 {
463 /* End of modifiers. */ 478 /* End of modifiers. */
464 mst->tmit->state = MSG_STATE_DATA; 479 ch->tmit.state = MSG_STATE_DATA;
465 if (0 == ch->tmit_ack_pending) 480 if (0 == ch->tmit_ack_pending)
466 master_transmit_data (mst); 481 channel_transmit_data (ch);
467 482
468 return; 483 return;
469 } 484 }
470 mst->tmit->state = MSG_STATE_MODIFIER; 485 ch->tmit.state = MSG_STATE_MODIFIER;
471 break; 486 break;
472 487
473 default: 488 default:
474 LOG (GNUNET_ERROR_TYPE_ERROR, 489 LOG (GNUNET_ERROR_TYPE_ERROR,
475 "MasterTransmitNotify returned error when requesting a modifier.\n"); 490 "MasterTransmitNotifyModifier returned error "
491 "when requesting a modifier.\n");
476 492
477 mst->tmit->state = MSG_STATE_START; 493 ch->tmit.state = MSG_STATE_CANCEL;
478 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); 494 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
479 msg->size = htons (sizeof (*msg)); 495 msg->size = htons (sizeof (*msg));
480 496
@@ -489,7 +505,7 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst)
489 queue_message (ch, msg, GNUNET_NO); 505 queue_message (ch, msg, GNUNET_NO);
490 } 506 }
491 507
492 master_transmit_mod (mst); 508 channel_transmit_mod (ch);
493} 509}
494 510
495 511
@@ -499,17 +515,16 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst)
499 * @param mst Master handle. 515 * @param mst Master handle.
500 */ 516 */
501static void 517static void
502master_transmit_data (struct GNUNET_PSYC_Master *mst) 518channel_transmit_data (struct GNUNET_PSYC_Channel *ch)
503{ 519{
504 struct GNUNET_PSYC_Channel *ch = &mst->ch;
505 uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD; 520 uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
506 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; 521 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
507 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; 522 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
508 523
509 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); 524 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
510 525
511 int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls, 526 int notify_ret = ch->tmit.notify_data (ch->tmit.notify_cls,
512 &data_size, &msg[1]); 527 &data_size, &msg[1]);
513 switch (notify_ret) 528 switch (notify_ret)
514 { 529 {
515 case GNUNET_NO: 530 case GNUNET_NO:
@@ -522,14 +537,14 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst)
522 break; 537 break;
523 538
524 case GNUNET_YES: 539 case GNUNET_YES:
525 mst->tmit->state = MSG_STATE_START; 540 ch->tmit.state = MSG_STATE_END;
526 break; 541 break;
527 542
528 default: 543 default:
529 LOG (GNUNET_ERROR_TYPE_ERROR, 544 LOG (GNUNET_ERROR_TYPE_ERROR,
530 "MasterTransmitNotify returned error when requesting data.\n"); 545 "MasterTransmitNotify returned error when requesting data.\n");
531 546
532 mst->tmit->state = MSG_STATE_START; 547 ch->tmit.state = MSG_STATE_CANCEL;
533 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); 548 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
534 msg->size = htons (sizeof (*msg)); 549 msg->size = htons (sizeof (*msg));
535 queue_message (ch, msg, GNUNET_YES); 550 queue_message (ch, msg, GNUNET_YES);
@@ -554,6 +569,86 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst)
554 569
555 570
556/** 571/**
572 * Send a message to a channel.
573 *
574 * @param ch Handle to the PSYC channel.
575 * @param method_name Which method should be invoked.
576 * @param notify_mod Function to call to obtain modifiers.
577 * @param notify_data Function to call to obtain fragments of the data.
578 * @param notify_cls Closure for @a notify_mod and @a notify_data.
579 * @param flags Flags for the message being transmitted.
580 * @return Transmission handle, NULL on error (i.e. more than one request queued).
581 */
582static struct GNUNET_PSYC_ChannelTransmitHandle *
583channel_transmit (struct GNUNET_PSYC_Channel *ch,
584 const char *method_name,
585 GNUNET_PSYC_TransmitNotifyModifier notify_mod,
586 GNUNET_PSYC_TransmitNotifyData notify_data,
587 void *notify_cls,
588 uint32_t flags)
589{
590 if (GNUNET_NO != ch->in_transmit)
591 return NULL;
592 ch->in_transmit = GNUNET_YES;
593
594 size_t size = strlen (method_name) + 1;
595 struct GNUNET_PSYC_MessageMethod *pmeth;
596 struct OperationHandle *op;
597
598 ch->tmit_msg = op = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg)
599 + sizeof (*pmeth) + size);
600 op->msg = (struct GNUNET_MessageHeader *) &op[1];
601 op->msg->size = sizeof (*op->msg) + sizeof (*pmeth) + size;
602
603 pmeth = (struct GNUNET_PSYC_MessageMethod *) &op->msg[1];
604 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
605 pmeth->header.size = htons (sizeof (*pmeth) + size);
606 pmeth->flags = htonl (flags);
607 memcpy (&pmeth[1], method_name, size);
608
609 ch->tmit.ch = ch;
610 ch->tmit.notify_mod = notify_mod;
611 ch->tmit.notify_data = notify_data;
612 ch->tmit.notify_cls = notify_cls;
613 ch->tmit.state = MSG_STATE_MODIFIER;
614
615 channel_transmit_mod (ch);
616 return &ch->tmit;
617}
618
619
620/**
621 * Resume transmission to the channel.
622 *
623 * @param th Handle of the request that is being resumed.
624 */
625static void
626channel_transmit_resume (struct GNUNET_PSYC_ChannelTransmitHandle *th)
627{
628 struct GNUNET_PSYC_Channel *ch = th->ch;
629 if (0 == ch->tmit_ack_pending)
630 {
631 ch->tmit_paused = GNUNET_NO;
632 channel_transmit_data (ch);
633 }
634}
635
636
637/**
638 * Abort transmission request to channel.
639 *
640 * @param th Handle of the request that is being aborted.
641 */
642static void
643channel_transmit_cancel (struct GNUNET_PSYC_ChannelTransmitHandle *th)
644{
645 struct GNUNET_PSYC_Channel *ch = th->ch;
646 if (GNUNET_NO == ch->in_transmit)
647 return;
648}
649
650
651/**
557 * Handle incoming message from the PSYC service. 652 * Handle incoming message from the PSYC service.
558 * 653 *
559 * @param ch The channel the message is sent to. 654 * @param ch The channel the message is sent to.
@@ -564,14 +659,20 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
564 const struct GNUNET_PSYC_MessageHeader *msg) 659 const struct GNUNET_PSYC_MessageHeader *msg)
565{ 660{
566 uint16_t size = ntohs (msg->header.size); 661 uint16_t size = ntohs (msg->header.size);
662 uint32_t flags = ntohl (msg->flags);
663
664 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
665 (struct GNUNET_MessageHeader *) msg);
567 666
568 if (MSG_STATE_START == ch->recv_state) 667 if (MSG_STATE_START == ch->recv_state)
569 { 668 {
570 ch->recv_message_id = GNUNET_ntohll (msg->message_id); 669 ch->recv_message_id = GNUNET_ntohll (msg->message_id);
571 ch->recv_flags = ntohl (msg->flags); 670 ch->recv_flags = flags;
671 ch->recv_slave_key = msg->slave_key;
572 } 672 }
573 else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id) 673 else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id)
574 { 674 {
675 // FIXME
575 LOG (GNUNET_ERROR_TYPE_WARNING, 676 LOG (GNUNET_ERROR_TYPE_WARNING,
576 "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n", 677 "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
577 GNUNET_ntohll (msg->message_id), ch->recv_message_id); 678 GNUNET_ntohll (msg->message_id), ch->recv_message_id);
@@ -579,11 +680,11 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
579 recv_error (ch); 680 recv_error (ch);
580 return; 681 return;
581 } 682 }
582 else if (ntohl (msg->flags) != ch->recv_flags) 683 else if (flags != ch->recv_flags)
583 { 684 {
584 LOG (GNUNET_ERROR_TYPE_WARNING, 685 LOG (GNUNET_ERROR_TYPE_WARNING,
585 "Unexpected message flags. Got: %lu, expected: %lu\n", 686 "Unexpected message flags. Got: %lu, expected: %lu\n",
586 ntohl (msg->flags), ch->recv_flags); 687 flags, ch->recv_flags);
587 GNUNET_break_op (0); 688 GNUNET_break_op (0);
588 recv_error (ch); 689 recv_error (ch);
589 return; 690 return;
@@ -599,10 +700,6 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
599 ptype = ntohs (pmsg->type); 700 ptype = ntohs (pmsg->type);
600 size_eq = size_min = 0; 701 size_eq = size_min = 0;
601 702
602 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
603 "Received message part of type %u and size %u from PSYC.\n",
604 ptype, psize);
605
606 if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size) 703 if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
607 { 704 {
608 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 705 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
@@ -612,6 +709,10 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
612 return; 709 return;
613 } 710 }
614 711
712 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
713 "Received message part from PSYC.\n");
714 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
715
615 switch (ptype) 716 switch (ptype)
616 { 717 {
617 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: 718 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
@@ -758,6 +859,46 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
758 859
759 860
760/** 861/**
862 * Handle incoming message acknowledgement from the PSYC service.
863 *
864 * @param ch The channel the acknowledgement is sent to.
865 */
866static void
867handle_psyc_message_ack (struct GNUNET_PSYC_Channel *ch)
868{
869 if (0 == ch->tmit_ack_pending)
870 {
871 LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
872 GNUNET_break (0);
873 return;
874 }
875 ch->tmit_ack_pending--;
876
877 switch (ch->tmit.state)
878 {
879 case MSG_STATE_MODIFIER:
880 case MSG_STATE_MOD_CONT:
881 if (GNUNET_NO == ch->tmit_paused)
882 channel_transmit_mod (ch);
883 break;
884
885 case MSG_STATE_DATA:
886 if (GNUNET_NO == ch->tmit_paused)
887 channel_transmit_data (ch);
888 break;
889
890 case MSG_STATE_END:
891 case MSG_STATE_CANCEL:
892 break;
893
894 default:
895 LOG (GNUNET_ERROR_TYPE_DEBUG,
896 "Ignoring message ACK in state %u.\n", ch->tmit.state);
897 }
898}
899
900
901/**
761 * Type of a function to call when we receive a message 902 * Type of a function to call when we receive a message
762 * from the service. 903 * from the service.
763 * 904 *
@@ -775,7 +916,7 @@ message_handler (void *cls,
775 916
776 if (NULL == msg) 917 if (NULL == msg)
777 { 918 {
778 GNUNET_break (0); 919 // timeout / disconnected from server, reconnect
779 reschedule_connect (ch); 920 reschedule_connect (ch);
780 return; 921 return;
781 } 922 }
@@ -824,63 +965,15 @@ message_handler (void *cls,
824 } 965 }
825 case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: 966 case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
826 { 967 {
827#if TODO
828 struct CountersResult *cres = (struct CountersResult *) msg; 968 struct CountersResult *cres = (struct CountersResult *) msg;
829 slv->max_message_id = GNUNET_ntohll (cres->max_message_id); 969 slv->max_message_id = GNUNET_ntohll (cres->max_message_id);
830 if (NULL != slv->join_ack_cb) 970 if (NULL != slv->join_cb)
831 mst->join_ack_cb (ch->cb_cls, mst->max_message_id); 971 slv->join_cb (ch->cb_cls, slv->max_message_id);
832#endif
833 break; 972 break;
834 } 973 }
835 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK: 974 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
836 { 975 {
837 if (0 == ch->tmit_ack_pending) 976 handle_psyc_message_ack (ch);
838 {
839 LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
840 GNUNET_break (0);
841 break;
842 }
843 ch->tmit_ack_pending--;
844
845 if (ch->is_master)
846 {
847 GNUNET_assert (NULL != mst->tmit);
848 switch (mst->tmit->state)
849 {
850 case MSG_STATE_MODIFIER:
851 case MSG_STATE_MOD_CONT:
852 if (GNUNET_NO == ch->tmit_paused)
853 master_transmit_mod (mst);
854 break;
855
856 case MSG_STATE_DATA:
857 if (GNUNET_NO == ch->tmit_paused)
858 master_transmit_data (mst);
859 break;
860
861 case MSG_STATE_END:
862 case MSG_STATE_CANCEL:
863 if (NULL != mst->tmit)
864 {
865 GNUNET_free (mst->tmit);
866 mst->tmit = NULL;
867 }
868 else
869 {
870 LOG (GNUNET_ERROR_TYPE_WARNING,
871 "Ignoring message ACK, there's no transmission going on.\n");
872 GNUNET_break (0);
873 }
874 break;
875 default:
876 LOG (GNUNET_ERROR_TYPE_DEBUG,
877 "Ignoring message ACK in state %u.\n", mst->tmit->state);
878 }
879 }
880 else
881 {
882 /* TODO: slave */
883 }
884 break; 977 break;
885 } 978 }
886 979
@@ -1106,8 +1199,6 @@ void
1106GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master) 1199GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master)
1107{ 1200{
1108 disconnect (master); 1201 disconnect (master);
1109 if (NULL != master->tmit)
1110 GNUNET_free (master->tmit);
1111 GNUNET_free (master); 1202 GNUNET_free (master);
1112} 1203}
1113 1204
@@ -1162,41 +1253,14 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
1162struct GNUNET_PSYC_MasterTransmitHandle * 1253struct GNUNET_PSYC_MasterTransmitHandle *
1163GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, 1254GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
1164 const char *method_name, 1255 const char *method_name,
1165 GNUNET_PSYC_MasterTransmitNotifyModifier notify_mod, 1256 GNUNET_PSYC_TransmitNotifyModifier notify_mod,
1166 GNUNET_PSYC_MasterTransmitNotify notify_data, 1257 GNUNET_PSYC_TransmitNotifyData notify_data,
1167 void *notify_cls, 1258 void *notify_cls,
1168 enum GNUNET_PSYC_MasterTransmitFlags flags) 1259 enum GNUNET_PSYC_MasterTransmitFlags flags)
1169{ 1260{
1170 GNUNET_assert (NULL != master); 1261 return (struct GNUNET_PSYC_MasterTransmitHandle *)
1171 struct GNUNET_PSYC_Channel *ch = &master->ch; 1262 channel_transmit (&master->ch, method_name, notify_mod, notify_data,
1172 if (GNUNET_NO != ch->in_transmit) 1263 notify_cls, flags);
1173 return NULL;
1174 ch->in_transmit = GNUNET_YES;
1175
1176 size_t size = strlen (method_name) + 1;
1177 struct GNUNET_PSYC_MessageMethod *pmeth;
1178 struct OperationHandle *op;
1179
1180 ch->tmit_msg = op = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg)
1181 + sizeof (*pmeth) + size);
1182 op->msg = (struct GNUNET_MessageHeader *) &op[1];
1183 op->msg->size = sizeof (*op->msg) + sizeof (*pmeth) + size;
1184
1185 pmeth = (struct GNUNET_PSYC_MessageMethod *) &op->msg[1];
1186 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
1187 pmeth->header.size = htons (sizeof (*pmeth) + size);
1188 pmeth->flags = htonl (flags);
1189 memcpy (&pmeth[1], method_name, size);
1190
1191 master->tmit = GNUNET_malloc (sizeof (*master->tmit));
1192 master->tmit->master = master;
1193 master->tmit->notify_mod = notify_mod;
1194 master->tmit->notify_data = notify_data;
1195 master->tmit->notify_cls = notify_cls;
1196 master->tmit->state = MSG_STATE_MODIFIER;
1197
1198 master_transmit_mod (master);
1199 return master->tmit;
1200} 1264}
1201 1265
1202 1266
@@ -1208,12 +1272,7 @@ GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
1208void 1272void
1209GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) 1273GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
1210{ 1274{
1211 struct GNUNET_PSYC_Channel *ch = &th->master->ch; 1275 channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
1212 if (0 == ch->tmit_ack_pending)
1213 {
1214 ch->tmit_paused = GNUNET_NO;
1215 master_transmit_data (th->master);
1216 }
1217} 1276}
1218 1277
1219 1278
@@ -1225,10 +1284,7 @@ GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
1225void 1284void
1226GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th) 1285GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th)
1227{ 1286{
1228 struct GNUNET_PSYC_Master *master = th->master; 1287 channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
1229 struct GNUNET_PSYC_Channel *ch = &master->ch;
1230 if (GNUNET_NO != ch->in_transmit)
1231 return;
1232} 1288}
1233 1289
1234 1290
@@ -1282,15 +1338,15 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1282{ 1338{
1283 struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv)); 1339 struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv));
1284 struct GNUNET_PSYC_Channel *ch = &slv->ch; 1340 struct GNUNET_PSYC_Channel *ch = &slv->ch;
1285 struct SlaveJoinRequest *req = GNUNET_malloc (sizeof (*req) 1341 struct SlaveJoinRequest *req
1286 + relay_count * sizeof (*relays)); 1342 = GNUNET_malloc (sizeof (*req) + relay_count * sizeof (*relays));
1287 req->header.size = htons (sizeof (*req) 1343 req->header.size = htons (sizeof (*req)
1288 + relay_count * sizeof (*relays)); 1344 + relay_count * sizeof (*relays));
1289 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN); 1345 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN);
1290 req->channel_key = *channel_key; 1346 req->channel_key = *channel_key;
1291 req->slave_key = *slave_key; 1347 req->slave_key = *slave_key;
1292 req->origin = *origin; 1348 req->origin = *origin;
1293 req->relay_count = relay_count; 1349 req->relay_count = htonl (relay_count);
1294 memcpy (&req[1], relays, relay_count * sizeof (*relays)); 1350 memcpy (&req[1], relays, relay_count * sizeof (*relays));
1295 1351
1296 ch->message_cb = message_cb; 1352 ch->message_cb = message_cb;
@@ -1303,6 +1359,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1303 ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; 1359 ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
1304 ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv); 1360 ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv);
1305 1361
1362 slv->join_cb = slave_joined_cb;
1306 return slv; 1363 return slv;
1307} 1364}
1308 1365
@@ -1328,9 +1385,8 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave)
1328 * 1385 *
1329 * @param slave Slave handle. 1386 * @param slave Slave handle.
1330 * @param method_name Which (PSYC) method should be invoked (on host). 1387 * @param method_name Which (PSYC) method should be invoked (on host).
1331 * @param env Environment containing transient variables for the message, or 1388 * @param notify_mod Function to call to obtain modifiers.
1332 * NULL. 1389 * @param notify_data Function to call to obtain fragments of the data.
1333 * @param notify Function to call when we are allowed to transmit (to get data).
1334 * @param notify_cls Closure for @a notify. 1390 * @param notify_cls Closure for @a notify.
1335 * @param flags Flags for the message being transmitted. 1391 * @param flags Flags for the message being transmitted.
1336 * @return Transmission handle, NULL on error (i.e. more than one request 1392 * @return Transmission handle, NULL on error (i.e. more than one request
@@ -1339,12 +1395,14 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave)
1339struct GNUNET_PSYC_SlaveTransmitHandle * 1395struct GNUNET_PSYC_SlaveTransmitHandle *
1340GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave, 1396GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave,
1341 const char *method_name, 1397 const char *method_name,
1342 const struct GNUNET_ENV_Environment *env, 1398 GNUNET_PSYC_TransmitNotifyModifier notify_mod,
1343 GNUNET_PSYC_SlaveTransmitNotify notify, 1399 GNUNET_PSYC_TransmitNotifyData notify_data,
1344 void *notify_cls, 1400 void *notify_cls,
1345 enum GNUNET_PSYC_SlaveTransmitFlags flags) 1401 enum GNUNET_PSYC_SlaveTransmitFlags flags)
1346{ 1402{
1347 return NULL; 1403 return (struct GNUNET_PSYC_SlaveTransmitHandle *)
1404 channel_transmit (&slave->ch, method_name,
1405 notify_mod, notify_data, notify_cls, flags);
1348} 1406}
1349 1407
1350 1408
@@ -1356,7 +1414,7 @@ GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave,
1356void 1414void
1357GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) 1415GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
1358{ 1416{
1359 1417 channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
1360} 1418}
1361 1419
1362 1420
@@ -1368,7 +1426,7 @@ GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
1368void 1426void
1369GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th) 1427GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th)
1370{ 1428{
1371 1429 channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
1372} 1430}
1373 1431
1374 1432
@@ -1382,7 +1440,7 @@ GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th)
1382struct GNUNET_PSYC_Channel * 1440struct GNUNET_PSYC_Channel *
1383GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master) 1441GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master)
1384{ 1442{
1385 return (struct GNUNET_PSYC_Channel *) master; 1443 return &master->ch;
1386} 1444}
1387 1445
1388 1446
@@ -1395,7 +1453,7 @@ GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master)
1395struct GNUNET_PSYC_Channel * 1453struct GNUNET_PSYC_Channel *
1396GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave) 1454GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave)
1397{ 1455{
1398 return (struct GNUNET_PSYC_Channel *) slave; 1456 return &slave->ch;
1399} 1457}
1400 1458
1401 1459
diff --git a/src/psyc/psyc_common.c b/src/psyc/psyc_common.c
new file mode 100644
index 000000000..7368011fc
--- /dev/null
+++ b/src/psyc/psyc_common.c
@@ -0,0 +1,100 @@
1/*
2 * This file is part of GNUnet
3 * (C) 2013 Christian Grothoff (and other contributing authors)
4 *
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
9 *
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 * Boston, MA 02111-1307, USA.
19 */
20
21/**
22 * @file psyc/psyc_common.c
23 * @brief Common functions for PSYC
24 * @author Gabor X Toth
25 */
26
27#include <inttypes.h>
28#include "psyc.h"
29
30/**
31 * Check if @a data contains a series of valid message parts.
32 *
33 * @param data_size Size of @a data.
34 * @param data Data.
35 *
36 * @return GNUNET_YES or GNUNET_NO
37 */
38int
39GNUNET_PSYC_check_message_parts (uint16_t data_size, const char *data)
40{
41 const struct GNUNET_MessageHeader *pmsg;
42 uint16_t psize = 0;
43 uint16_t pos = 0;
44
45 for (pos = 0; data_size + pos < data_size; pos += psize)
46 {
47 pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
48 psize = ntohs (pmsg->size);
49 if (psize < sizeof (*pmsg) || data_size + pos + psize > data_size)
50 {
51 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
52 "Invalid message part of type %u and size %u.",
53 ntohs (pmsg->type), psize);
54 return GNUNET_NO;
55 }
56 }
57 return GNUNET_YES;
58}
59
60
61void
62GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
63 const struct GNUNET_MessageHeader *msg)
64{
65 uint16_t size = ntohs (msg->size);
66 uint16_t type = ntohs (msg->type);
67 GNUNET_log (kind, "Message of type %d and size %u:\n", type, size);
68 switch (type)
69 {
70 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
71 {
72 struct GNUNET_PSYC_MessageHeader *pmsg
73 = (struct GNUNET_PSYC_MessageHeader *) msg;
74 GNUNET_log (kind, "\tID: %" PRIu64 "\tflags: %" PRIu32 "\n",
75 GNUNET_ntohll (pmsg->message_id), ntohl (pmsg->flags));
76 break;
77 }
78 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
79 {
80 struct GNUNET_PSYC_MessageMethod *meth
81 = (struct GNUNET_PSYC_MessageMethod *) msg;
82 GNUNET_log (kind, "\t%.*s\n", size - sizeof (*meth), &meth[1]);
83 break;
84 }
85 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
86 {
87 struct GNUNET_PSYC_MessageModifier *mod
88 = (struct GNUNET_PSYC_MessageModifier *) msg;
89 uint16_t name_size = ntohs (mod->name_size);
90 char oper = ' ' < mod->oper ? mod->oper : ' ';
91 GNUNET_log (kind, "\t%c%.*s\t%.*s\n", oper, name_size, &mod[1],
92 ntohs (mod->value_size), ((char *) &mod[1]) + name_size + 1);
93 break;
94 }
95 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
96 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
97 GNUNET_log (kind, "\t%.*s\n", size - sizeof (*msg), &msg[1]);
98 break;
99 }
100}
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c
index 33684b125..88947be60 100644
--- a/src/psyc/test_psyc.c
+++ b/src/psyc/test_psyc.c
@@ -37,7 +37,7 @@
37 37
38#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10) 38#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10)
39 39
40#define DEBUG_SERVICE 0 40#define DEBUG_SERVICE 1
41 41
42 42
43/** 43/**
@@ -66,7 +66,8 @@ struct GNUNET_PSYC_MasterTransmitHandle *mth;
66 66
67struct TransmitClosure 67struct TransmitClosure
68{ 68{
69 struct GNUNET_PSYC_MasterTransmitHandle *handle; 69 struct GNUNET_PSYC_MasterTransmitHandle *mst_tmit;
70 struct GNUNET_PSYC_SlaveTransmitHandle *slv_tmit;
70 struct GNUNET_ENV_Environment *env; 71 struct GNUNET_ENV_Environment *env;
71 char *data[16]; 72 char *data[16];
72 const char *mod_value; 73 const char *mod_value;
@@ -78,12 +79,30 @@ struct TransmitClosure
78 79
79struct TransmitClosure *tmit; 80struct TransmitClosure *tmit;
80 81
82
83enum
84{
85 TEST_NONE,
86 TEST_SLAVE_TRANSMIT,
87 TEST_MASTER_TRANSMIT,
88} test;
89
90
91static void
92master_transmit ();
93
94
81/** 95/**
82 * Clean up all resources used. 96 * Clean up all resources used.
83 */ 97 */
84static void 98static void
85cleanup () 99cleanup ()
86{ 100{
101 if (NULL != slv)
102 {
103 GNUNET_PSYC_slave_part (slv);
104 slv = NULL;
105 }
87 if (NULL != mst) 106 if (NULL != mst)
88 { 107 {
89 GNUNET_PSYC_master_stop (mst); 108 GNUNET_PSYC_master_stop (mst);
@@ -133,6 +152,8 @@ end_normally (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
133static void 152static void
134end () 153end ()
135{ 154{
155 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Ending tests.\n");
156
136 if (end_badly_task != GNUNET_SCHEDULER_NO_TASK) 157 if (end_badly_task != GNUNET_SCHEDULER_NO_TASK)
137 { 158 {
138 GNUNET_SCHEDULER_cancel (end_badly_task); 159 GNUNET_SCHEDULER_cancel (end_badly_task);
@@ -144,8 +165,8 @@ end ()
144 165
145 166
146static void 167static void
147message (void *cls, uint64_t message_id, uint32_t flags, 168master_message (void *cls, uint64_t message_id, uint32_t flags,
148 const struct GNUNET_MessageHeader *msg) 169 const struct GNUNET_MessageHeader *msg)
149{ 170{
150 if (NULL == msg) 171 if (NULL == msg)
151 { 172 {
@@ -158,12 +179,64 @@ message (void *cls, uint64_t message_id, uint32_t flags,
158 uint16_t size = ntohs (msg->size); 179 uint16_t size = ntohs (msg->size);
159 180
160 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 181 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
161 "Got message part of type %u and size %u " 182 "Master got message part of type %u and size %u "
162 "belonging to message ID %llu with flags %u\n", 183 "belonging to message ID %llu with flags %u\n",
163 type, size, message_id, flags); 184 type, size, message_id, flags);
164 185
165 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type) 186 switch (test)
166 end (); 187 {
188 case TEST_SLAVE_TRANSMIT:
189 if (GNUNET_PSYC_MESSAGE_REQUEST != flags)
190 {
191 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
192 "Unexpected request flags: %lu\n", flags);
193 GNUNET_assert (0);
194 return;
195 }
196 // FIXME: check rest of message
197
198 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type)
199 master_transmit ();
200 break;
201
202 case TEST_MASTER_TRANSMIT:
203 break;
204
205 default:
206 GNUNET_assert (0);
207 }
208}
209
210
211static void
212slave_message (void *cls, uint64_t message_id, uint32_t flags,
213 const struct GNUNET_MessageHeader *msg)
214{
215 if (NULL == msg)
216 {
217 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
218 "Error while receiving message %llu\n", message_id);
219 return;
220 }
221
222 uint16_t type = ntohs (msg->type);
223 uint16_t size = ntohs (msg->size);
224
225 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
226 "Slave got message part of type %u and size %u "
227 "belonging to message ID %llu with flags %u\n",
228 type, size, message_id, flags);
229
230 switch (test)
231 {
232 case TEST_MASTER_TRANSMIT:
233 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type)
234 end ();
235 break;
236
237 default:
238 GNUNET_assert (0);
239 }
167} 240}
168 241
169 242
@@ -175,7 +248,9 @@ join_request (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
175 struct GNUNET_PSYC_JoinHandle *jh) 248 struct GNUNET_PSYC_JoinHandle *jh)
176{ 249{
177 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 250 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
178 "Got join request."); 251 "Got join request: %s (%zu vars)", method_name, variable_count);
252 GNUNET_PSYC_join_decision (jh, GNUNET_YES, 0, NULL, "_notice_join", NULL,
253 "you're in", 9);
179} 254}
180 255
181 256
@@ -185,7 +260,7 @@ transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
185 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission resumed.\n"); 260 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission resumed.\n");
186 struct TransmitClosure *tmit = cls; 261 struct TransmitClosure *tmit = cls;
187 tmit->paused = GNUNET_NO; 262 tmit->paused = GNUNET_NO;
188 GNUNET_PSYC_master_transmit_resume (tmit->handle); 263 GNUNET_PSYC_master_transmit_resume (tmit->mst_tmit);
189} 264}
190 265
191 266
@@ -204,35 +279,8 @@ tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper)
204 uint16_t name_size = 0; 279 uint16_t name_size = 0;
205 size_t value_size = 0; 280 size_t value_size = 0;
206 281
207 if (NULL != tmit->mod_value && 0 < tmit->mod_value_size) 282 if (NULL != oper)
208 { /* Modifier continuation */ 283 { /* New modifier */
209 value = tmit->mod_value;
210 if (tmit->mod_value_size <= *data_size)
211 {
212 value_size = tmit->mod_value_size;
213 tmit->mod_value = NULL;
214 }
215 else
216 {
217 value_size = *data_size;
218 tmit->mod_value += value_size;
219 }
220 tmit->mod_value_size -= value_size;
221
222 if (*data_size < value_size)
223 {
224 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
225 "value larger than buffer: %u < %zu\n",
226 *data_size, value_size);
227 *data_size = 0;
228 return GNUNET_NO;
229 }
230
231 *data_size = value_size;
232 memcpy (data, value, value_size);
233 }
234 else if (NULL != oper)
235 {
236 if (GNUNET_NO == GNUNET_ENV_environment_shift (tmit->env, &op, &name, 284 if (GNUNET_NO == GNUNET_ENV_environment_shift (tmit->env, &op, &name,
237 (void *) &value, &value_size)) 285 (void *) &value, &value_size))
238 { /* No more modifiers, continue with data */ 286 { /* No more modifiers, continue with data */
@@ -259,6 +307,33 @@ tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper)
259 ((char *)data)[name_size] = '\0'; 307 ((char *)data)[name_size] = '\0';
260 memcpy ((char *)data + name_size + 1, value, value_size); 308 memcpy ((char *)data + name_size + 1, value, value_size);
261 } 309 }
310 else if (NULL != tmit->mod_value && 0 < tmit->mod_value_size)
311 { /* Modifier continuation */
312 value = tmit->mod_value;
313 if (tmit->mod_value_size <= *data_size)
314 {
315 value_size = tmit->mod_value_size;
316 tmit->mod_value = NULL;
317 }
318 else
319 {
320 value_size = *data_size;
321 tmit->mod_value += value_size;
322 }
323 tmit->mod_value_size -= value_size;
324
325 if (*data_size < value_size)
326 {
327 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
328 "value larger than buffer: %u < %zu\n",
329 *data_size, value_size);
330 *data_size = 0;
331 return GNUNET_NO;
332 }
333
334 *data_size = value_size;
335 memcpy (data, value, value_size);
336 }
262 337
263 return 0 == tmit->mod_value_size ? GNUNET_YES : GNUNET_NO; 338 return 0 == tmit->mod_value_size ? GNUNET_YES : GNUNET_NO;
264} 339}
@@ -268,6 +343,12 @@ static int
268tmit_notify_data (void *cls, uint16_t *data_size, void *data) 343tmit_notify_data (void *cls, uint16_t *data_size, void *data)
269{ 344{
270 struct TransmitClosure *tmit = cls; 345 struct TransmitClosure *tmit = cls;
346 if (0 == tmit->data_count)
347 {
348 *data_size = 0;
349 return GNUNET_YES;
350 }
351
271 uint16_t size = strlen (tmit->data[tmit->n]); 352 uint16_t size = strlen (tmit->data[tmit->n]);
272 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 353 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
273 "Transmit notify data: %lu bytes available, " 354 "Transmit notify data: %lu bytes available, "
@@ -300,10 +381,52 @@ tmit_notify_data (void *cls, uint16_t *data_size, void *data)
300 381
301 382
302static void 383static void
303master_started (void *cls, uint64_t max_message_id) 384slave_joined (void *cls, uint64_t max_message_id)
304{ 385{
305 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 386 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave joined: %lu\n", max_message_id);
306 "Master started: %" PRIu64 "\n", max_message_id); 387 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave sending request to master.\n");
388
389 test = TEST_SLAVE_TRANSMIT;
390
391 tmit = GNUNET_new (struct TransmitClosure);
392 tmit->env = GNUNET_ENV_environment_create ();
393 GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
394 "_abc", "abc def", 7);
395 GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
396 "_abc_def", "abc def ghi", 11);
397 tmit->n = 0;
398 tmit->data[0] = "slave test";
399 tmit->data_count = 1;
400 tmit->slv_tmit
401 = GNUNET_PSYC_slave_transmit (slv, "_request_test", tmit_notify_mod,
402 tmit_notify_data, tmit,
403 GNUNET_PSYC_SLAVE_TRANSMIT_NONE);
404}
405
406static void
407slave_join ()
408{
409 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Joining slave.\n");
410
411 struct GNUNET_PeerIdentity origin;
412 struct GNUNET_PeerIdentity relays[16];
413 struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create ();
414 GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN,
415 "_foo", "bar baz", 7);
416 GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN,
417 "_foo_bar", "foo bar baz", 11);
418 slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin,
419 16, relays, &slave_message, &join_request, &slave_joined,
420 NULL, "_request_join", env, "some data", 9);
421 GNUNET_ENV_environment_destroy (env);
422}
423
424
425static void
426master_transmit ()
427{
428 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master sending message to all.\n");
429 test = TEST_MASTER_TRANSMIT;
307 430
308 tmit = GNUNET_new (struct TransmitClosure); 431 tmit = GNUNET_new (struct TransmitClosure);
309 tmit->env = GNUNET_ENV_environment_create (); 432 tmit->env = GNUNET_ENV_environment_create ();
@@ -315,17 +438,19 @@ master_started (void *cls, uint64_t max_message_id)
315 tmit->data[1] = "foo bar"; 438 tmit->data[1] = "foo bar";
316 tmit->data[2] = "foo bar baz"; 439 tmit->data[2] = "foo bar baz";
317 tmit->data_count = 3; 440 tmit->data_count = 3;
318 tmit->handle 441 tmit->mst_tmit
319 = GNUNET_PSYC_master_transmit (mst, "_test", tmit_notify_mod, 442 = GNUNET_PSYC_master_transmit (mst, "_notice_test", tmit_notify_mod,
320 tmit_notify_data, tmit, 443 tmit_notify_data, tmit,
321 GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN); 444 GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN);
322} 445}
323 446
324 447
325static void 448static void
326slave_joined (void *cls, uint64_t max_message_id) 449master_started (void *cls, uint64_t max_message_id)
327{ 450{
328 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave joined: %lu\n", max_message_id); 451 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
452 "Master started: %" PRIu64 "\n", max_message_id);
453 slave_join ();
329} 454}
330 455
331 456
@@ -355,21 +480,9 @@ run (void *cls,
355 GNUNET_CRYPTO_eddsa_key_get_public (channel_key, &channel_pub_key); 480 GNUNET_CRYPTO_eddsa_key_get_public (channel_key, &channel_pub_key);
356 GNUNET_CRYPTO_eddsa_key_get_public (slave_key, &slave_pub_key); 481 GNUNET_CRYPTO_eddsa_key_get_public (slave_key, &slave_pub_key);
357 482
483 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Starting master.\n");
358 mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE, 484 mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE,
359 &message, &join_request, &master_started, NULL); 485 &master_message, &join_request, &master_started, NULL);
360 return; /* FIXME: test slave */
361
362 struct GNUNET_PeerIdentity origin;
363 struct GNUNET_PeerIdentity relays[16];
364 struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create ();
365 GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN,
366 "_foo", "bar baz", 7);
367 GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN,
368 "_foo_bar", "foo bar baz", 11);
369 slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin,
370 16, relays, &message, &join_request, &slave_joined,
371 NULL, "_request_join", env, "some data", 9);
372 GNUNET_ENV_environment_destroy (env);
373} 486}
374 487
375 488