diff options
author | Gabor X Toth <*@tg-x.net> | 2014-03-06 23:46:45 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2014-03-06 23:46:45 +0000 |
commit | 8a0b8a4604526e5f832c4971f9c3b1b48d79bea4 (patch) | |
tree | dfd18a61272a18381fe9ce9b09849a965480a303 /src/psyc | |
parent | a21beab58c1d2abc747359a98326f19aaad4e8cd (diff) | |
download | gnunet-8a0b8a4604526e5f832c4971f9c3b1b48d79bea4.tar.gz gnunet-8a0b8a4604526e5f832c4971f9c3b1b48d79bea4.zip |
PSYC: implement slave to master requests, tests, fixes, reorg
Multicast lib: handle member to origin requests.
Keep track of members and origins and call their callbacks when necessary.
Diffstat (limited to 'src/psyc')
-rw-r--r-- | src/psyc/Makefile.am | 6 | ||||
-rw-r--r-- | src/psyc/gnunet-service-psyc.c | 341 | ||||
-rw-r--r-- | src/psyc/psyc.h | 11 | ||||
-rw-r--r-- | src/psyc/psyc_api.c | 370 | ||||
-rw-r--r-- | src/psyc/psyc_common.c | 100 | ||||
-rw-r--r-- | src/psyc/test_psyc.c | 231 |
6 files changed, 729 insertions, 330 deletions
diff --git a/src/psyc/Makefile.am b/src/psyc/Makefile.am index 7860b3995..162b42b2b 100644 --- a/src/psyc/Makefile.am +++ b/src/psyc/Makefile.am | |||
@@ -21,7 +21,7 @@ lib_LTLIBRARIES = libgnunetpsyc.la | |||
21 | 21 | ||
22 | libgnunetpsyc_la_SOURCES = \ | 22 | libgnunetpsyc_la_SOURCES = \ |
23 | psyc_api.c \ | 23 | psyc_api.c \ |
24 | psyc.h | 24 | psyc_common.c |
25 | libgnunetpsyc_la_LIBADD = \ | 25 | libgnunetpsyc_la_LIBADD = \ |
26 | $(top_builddir)/src/util/libgnunetutil.la \ | 26 | $(top_builddir)/src/util/libgnunetutil.la \ |
27 | $(top_builddir)/src/env/libgnunetenv.la \ | 27 | $(top_builddir)/src/env/libgnunetenv.la \ |
@@ -39,7 +39,8 @@ libexec_PROGRAMS = \ | |||
39 | gnunet-service-psyc | 39 | gnunet-service-psyc |
40 | 40 | ||
41 | gnunet_service_psyc_SOURCES = \ | 41 | gnunet_service_psyc_SOURCES = \ |
42 | gnunet-service-psyc.c | 42 | gnunet-service-psyc.c \ |
43 | psyc_common.c | ||
43 | gnunet_service_psyc_LDADD = \ | 44 | gnunet_service_psyc_LDADD = \ |
44 | $(top_builddir)/src/statistics/libgnunetstatistics.la \ | 45 | $(top_builddir)/src/statistics/libgnunetstatistics.la \ |
45 | $(top_builddir)/src/util/libgnunetutil.la \ | 46 | $(top_builddir)/src/util/libgnunetutil.la \ |
@@ -51,6 +52,7 @@ gnunet_service_psyc_DEPENDENCIES = \ | |||
51 | $(top_builddir)/src/util/libgnunetutil.la \ | 52 | $(top_builddir)/src/util/libgnunetutil.la \ |
52 | $(top_builddir)/src/multicast/libgnunetmulticast.la \ | 53 | $(top_builddir)/src/multicast/libgnunetmulticast.la \ |
53 | $(top_builddir)/src/psycstore/libgnunetpsycstore.la | 54 | $(top_builddir)/src/psycstore/libgnunetpsycstore.la |
55 | gnunet_service_psyc_CFLAGS = $(AM_CFLAGS) | ||
54 | 56 | ||
55 | 57 | ||
56 | if HAVE_TESTING | 58 | if HAVE_TESTING |
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c index e5de7dcda..dcb5031f1 100644 --- a/src/psyc/gnunet-service-psyc.c +++ b/src/psyc/gnunet-service-psyc.c | |||
@@ -56,6 +56,7 @@ static struct GNUNET_SERVER_NotificationContext *nc; | |||
56 | static struct GNUNET_PSYCSTORE_Handle *store; | 56 | static struct GNUNET_PSYCSTORE_Handle *store; |
57 | 57 | ||
58 | /** | 58 | /** |
59 | * All connected masters and slaves. | ||
59 | * Channel's pub_key_hash -> struct Channel | 60 | * Channel's pub_key_hash -> struct Channel |
60 | */ | 61 | */ |
61 | static struct GNUNET_CONTAINER_MultiHashMap *clients; | 62 | static struct GNUNET_CONTAINER_MultiHashMap *clients; |
@@ -105,6 +106,15 @@ struct Channel | |||
105 | 106 | ||
106 | uint8_t in_transmit; | 107 | uint8_t in_transmit; |
107 | uint8_t is_master; | 108 | uint8_t is_master; |
109 | |||
110 | /** | ||
111 | * Ready to receive messages from client. | ||
112 | */ | ||
113 | uint8_t ready; | ||
114 | |||
115 | /** | ||
116 | * Client disconnected. | ||
117 | */ | ||
108 | uint8_t disconnected; | 118 | uint8_t disconnected; |
109 | }; | 119 | }; |
110 | 120 | ||
@@ -116,7 +126,6 @@ struct Master | |||
116 | struct Channel channel; | 126 | struct Channel channel; |
117 | struct GNUNET_CRYPTO_EddsaPrivateKey priv_key; | 127 | struct GNUNET_CRYPTO_EddsaPrivateKey priv_key; |
118 | struct GNUNET_CRYPTO_EddsaPublicKey pub_key; | 128 | struct GNUNET_CRYPTO_EddsaPublicKey pub_key; |
119 | struct GNUNET_HashCode pub_key_hash; | ||
120 | 129 | ||
121 | struct GNUNET_MULTICAST_Origin *origin; | 130 | struct GNUNET_MULTICAST_Origin *origin; |
122 | struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle; | 131 | struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle; |
@@ -144,6 +153,8 @@ struct Master | |||
144 | * @see enum GNUNET_PSYC_Policy | 153 | * @see enum GNUNET_PSYC_Policy |
145 | */ | 154 | */ |
146 | uint32_t policy; | 155 | uint32_t policy; |
156 | |||
157 | struct GNUNET_HashCode pub_key_hash; | ||
147 | }; | 158 | }; |
148 | 159 | ||
149 | 160 | ||
@@ -155,24 +166,26 @@ struct Slave | |||
155 | struct Channel channel; | 166 | struct Channel channel; |
156 | struct GNUNET_CRYPTO_EddsaPrivateKey slave_key; | 167 | struct GNUNET_CRYPTO_EddsaPrivateKey slave_key; |
157 | struct GNUNET_CRYPTO_EddsaPublicKey chan_key; | 168 | struct GNUNET_CRYPTO_EddsaPublicKey chan_key; |
158 | struct GNUNET_HashCode chan_key_hash; | ||
159 | 169 | ||
160 | struct GNUNET_MULTICAST_Member *member; | 170 | struct GNUNET_MULTICAST_Member *member; |
161 | struct GNUNET_MULTICAST_MemberRequestHandle *tmit_handle; | 171 | struct GNUNET_MULTICAST_MemberRequestHandle *tmit_handle; |
162 | 172 | ||
163 | struct GNUNET_PeerIdentity origin; | 173 | struct GNUNET_PeerIdentity origin; |
174 | |||
175 | uint32_t relay_count; | ||
164 | struct GNUNET_PeerIdentity *relays; | 176 | struct GNUNET_PeerIdentity *relays; |
177 | |||
165 | struct GNUNET_MessageHeader *join_req; | 178 | struct GNUNET_MessageHeader *join_req; |
166 | 179 | ||
167 | uint64_t max_message_id; | 180 | uint64_t max_message_id; |
168 | uint64_t max_request_id; | 181 | uint64_t max_request_id; |
169 | 182 | ||
170 | uint32_t relay_count; | 183 | struct GNUNET_HashCode chan_key_hash; |
171 | }; | 184 | }; |
172 | 185 | ||
173 | 186 | ||
174 | static inline void | 187 | static inline void |
175 | transmit_message (struct Channel *ch); | 188 | transmit_message (struct Channel *ch, uint8_t inc_msg_id); |
176 | 189 | ||
177 | 190 | ||
178 | /** | 191 | /** |
@@ -235,14 +248,14 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) | |||
235 | if (NULL == client) | 248 | if (NULL == client) |
236 | return; | 249 | return; |
237 | 250 | ||
238 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p disconnected\n", client); | ||
239 | |||
240 | struct Channel *ch | 251 | struct Channel *ch |
241 | = GNUNET_SERVER_client_get_user_context (client, struct Channel); | 252 | = GNUNET_SERVER_client_get_user_context (client, struct Channel); |
253 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Client disconnected\n", ch); | ||
254 | |||
242 | if (NULL == ch) | 255 | if (NULL == ch) |
243 | { | 256 | { |
244 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 257 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
245 | "User context is NULL in client_disconnect()\n"); | 258 | "%p User context is NULL in client_disconnect()\n", ch); |
246 | GNUNET_break (0); | 259 | GNUNET_break (0); |
247 | return; | 260 | return; |
248 | } | 261 | } |
@@ -252,7 +265,7 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) | |||
252 | /* Send pending messages to multicast before cleanup. */ | 265 | /* Send pending messages to multicast before cleanup. */ |
253 | if (NULL != ch->tmit_head) | 266 | if (NULL != ch->tmit_head) |
254 | { | 267 | { |
255 | transmit_message (ch); | 268 | transmit_message (ch, GNUNET_NO); |
256 | } | 269 | } |
257 | else | 270 | else |
258 | { | 271 | { |
@@ -311,53 +324,22 @@ fragment_store_result (void *cls, int64_t result, const char *err_msg) | |||
311 | 324 | ||
312 | 325 | ||
313 | /** | 326 | /** |
314 | * Iterator callback for sending a message to a client. | ||
315 | * | ||
316 | * @see message_cb() | ||
317 | */ | ||
318 | static int | ||
319 | message_to_client (void *cls, const struct GNUNET_HashCode *chan_key_hash, | ||
320 | void *chan) | ||
321 | { | ||
322 | const struct GNUNET_MessageHeader *msg = cls; | ||
323 | struct Channel *ch = chan; | ||
324 | |||
325 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
326 | "Sending message of type %u and size %u to client 0x%zx.\n", | ||
327 | ntohs (msg->type), ntohs (msg->size), ch->client); | ||
328 | |||
329 | GNUNET_SERVER_notification_context_add (nc, ch->client); | ||
330 | GNUNET_SERVER_notification_context_unicast (nc, ch->client, msg, GNUNET_NO); | ||
331 | |||
332 | return GNUNET_YES; | ||
333 | } | ||
334 | |||
335 | |||
336 | /** | ||
337 | * Incoming message fragment from multicast. | 327 | * Incoming message fragment from multicast. |
338 | * | 328 | * |
339 | * Store it using PSYCstore and send it to all clients of the channel. | 329 | * Store it using PSYCstore and send it to the client of the channel. |
340 | */ | 330 | */ |
341 | static void | 331 | static void |
342 | message_cb (void *cls, const struct GNUNET_MessageHeader *msg) | 332 | message_cb (struct Channel *ch, |
333 | const struct GNUNET_CRYPTO_EddsaPublicKey *chan_key, | ||
334 | const struct GNUNET_HashCode *chan_key_hash, | ||
335 | const struct GNUNET_MessageHeader *msg) | ||
343 | { | 336 | { |
344 | uint16_t type = ntohs (msg->type); | 337 | uint16_t type = ntohs (msg->type); |
345 | uint16_t size = ntohs (msg->size); | 338 | uint16_t size = ntohs (msg->size); |
346 | 339 | ||
347 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 340 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
348 | "Received message of type %u and size %u from multicast.\n", | 341 | "%p Received message of type %u and size %u from multicast.\n", |
349 | type, size); | 342 | ch, type, size); |
350 | |||
351 | struct Channel *ch = cls; | ||
352 | struct Master *mst = cls; | ||
353 | struct Slave *slv = cls; | ||
354 | |||
355 | /* const struct GNUNET_MULTICAST_MessageHeader *mmsg | ||
356 | = (const struct GNUNET_MULTICAST_MessageHeader *) msg; */ | ||
357 | struct GNUNET_CRYPTO_EddsaPublicKey *chan_key | ||
358 | = ch->is_master ? &mst->pub_key : &slv->chan_key; | ||
359 | struct GNUNET_HashCode *chan_key_hash | ||
360 | = ch->is_master ? &mst->pub_key_hash : &slv->chan_key_hash; | ||
361 | 343 | ||
362 | switch (type) | 344 | switch (type) |
363 | { | 345 | { |
@@ -378,29 +360,19 @@ message_cb (void *cls, const struct GNUNET_MessageHeader *msg) | |||
378 | 360 | ||
379 | const struct GNUNET_MULTICAST_MessageHeader *mmsg | 361 | const struct GNUNET_MULTICAST_MessageHeader *mmsg |
380 | = (const struct GNUNET_MULTICAST_MessageHeader *) msg; | 362 | = (const struct GNUNET_MULTICAST_MessageHeader *) msg; |
381 | struct GNUNET_PSYC_MessageHeader *pmsg; | ||
382 | |||
383 | uint16_t size = ntohs (msg->size); | ||
384 | uint16_t psize = 0; | ||
385 | uint16_t pos = 0; | ||
386 | 363 | ||
387 | for (pos = 0; sizeof (*mmsg) + pos < size; pos += psize) | 364 | if (GNUNET_YES != GNUNET_PSYC_check_message_parts (size - sizeof (*mmsg), |
365 | (const char *) &mmsg[1])) | ||
388 | { | 366 | { |
389 | const struct GNUNET_MessageHeader *pmsg | 367 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
390 | = (const struct GNUNET_MessageHeader *) ((char *) &mmsg[1] + pos); | 368 | "%p Received message with invalid parts from multicast. " |
391 | psize = ntohs (pmsg->size); | 369 | "Dropping message.\n", ch); |
392 | if (psize < sizeof (*pmsg) || sizeof (*mmsg) + pos + psize > size) | 370 | GNUNET_break_op (0); |
393 | { | 371 | break; |
394 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
395 | "Received invalid message part of type %u and size %u " | ||
396 | "from multicast. Not sending to clients.\n", | ||
397 | ntohs (pmsg->type), psize); | ||
398 | GNUNET_break_op (0); | ||
399 | return; | ||
400 | } | ||
401 | } | 372 | } |
402 | 373 | ||
403 | psize = sizeof (*pmsg) + size - sizeof (*mmsg); | 374 | struct GNUNET_PSYC_MessageHeader *pmsg; |
375 | uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg); | ||
404 | pmsg = GNUNET_malloc (psize); | 376 | pmsg = GNUNET_malloc (psize); |
405 | pmsg->header.size = htons (psize); | 377 | pmsg->header.size = htons (psize); |
406 | pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | 378 | pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); |
@@ -408,39 +380,116 @@ message_cb (void *cls, const struct GNUNET_MessageHeader *msg) | |||
408 | 380 | ||
409 | memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); | 381 | memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); |
410 | 382 | ||
411 | GNUNET_CONTAINER_multihashmap_get_multiple (clients, chan_key_hash, | 383 | GNUNET_SERVER_notification_context_add (nc, ch->client); |
412 | message_to_client, | 384 | GNUNET_SERVER_notification_context_unicast (nc, ch->client, |
413 | (void *) pmsg); | 385 | (const struct GNUNET_MessageHeader *) pmsg, |
386 | GNUNET_NO); | ||
414 | GNUNET_free (pmsg); | 387 | GNUNET_free (pmsg); |
415 | break; | 388 | break; |
416 | } | 389 | } |
417 | default: | 390 | default: |
418 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 391 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
419 | "Discarding unknown message of type %u and size %u.\n", | 392 | "%p Dropping unknown message of type %u and size %u.\n", |
420 | type, size); | 393 | ch, type, size); |
421 | } | 394 | } |
422 | } | 395 | } |
423 | 396 | ||
424 | 397 | ||
425 | /** | 398 | /** |
426 | * Send a request received from multicast to a client. | 399 | * Incoming message fragment from multicast for a master. |
427 | */ | 400 | */ |
428 | static int | 401 | static void |
429 | request_to_client (void *cls, const struct GNUNET_HashCode *chan_key_hash, | 402 | master_message_cb (void *cls, const struct GNUNET_MessageHeader *msg) |
430 | void *chan) | 403 | { |
404 | struct Master *mst = cls; | ||
405 | GNUNET_assert (NULL != mst); | ||
406 | |||
407 | struct GNUNET_CRYPTO_EddsaPublicKey *chan_key = &mst->pub_key; | ||
408 | struct GNUNET_HashCode *chan_key_hash = &mst->pub_key_hash; | ||
409 | |||
410 | message_cb (&mst->channel, chan_key, chan_key_hash, msg); | ||
411 | } | ||
412 | |||
413 | |||
414 | /** | ||
415 | * Incoming message fragment from multicast for a slave. | ||
416 | */ | ||
417 | static void | ||
418 | slave_message_cb (void *cls, const struct GNUNET_MessageHeader *msg) | ||
431 | { | 419 | { |
432 | /* TODO */ | 420 | struct Slave *slv = cls; |
421 | GNUNET_assert (NULL != slv); | ||
422 | |||
423 | struct GNUNET_CRYPTO_EddsaPublicKey *chan_key = &slv->chan_key; | ||
424 | struct GNUNET_HashCode *chan_key_hash = &slv->chan_key_hash; | ||
433 | 425 | ||
434 | return GNUNET_YES; | 426 | message_cb (&slv->channel, chan_key, chan_key_hash, msg); |
435 | } | 427 | } |
436 | 428 | ||
437 | 429 | ||
430 | /** | ||
431 | * Incoming request fragment from multicast for a master. | ||
432 | * | ||
433 | * @param cls Master. | ||
434 | * @param member_key Sending member's public key. | ||
435 | * @param msg The message. | ||
436 | * @param flags Request flags. | ||
437 | */ | ||
438 | static void | 438 | static void |
439 | request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, | 439 | request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, |
440 | const struct GNUNET_MessageHeader *req, | 440 | const struct GNUNET_MessageHeader *msg, |
441 | enum GNUNET_MULTICAST_MessageFlags flags) | 441 | enum GNUNET_MULTICAST_MessageFlags flags) |
442 | { | 442 | { |
443 | struct Master *mst = cls; | ||
444 | struct Channel *ch = &mst->channel; | ||
443 | 445 | ||
446 | uint16_t type = ntohs (msg->type); | ||
447 | uint16_t size = ntohs (msg->size); | ||
448 | |||
449 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
450 | "%p Received request of type %u and size %u from multicast.\n", | ||
451 | ch, type, size); | ||
452 | |||
453 | switch (type) | ||
454 | { | ||
455 | case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST: | ||
456 | { | ||
457 | const struct GNUNET_MULTICAST_RequestHeader *req | ||
458 | = (const struct GNUNET_MULTICAST_RequestHeader *) msg; | ||
459 | |||
460 | if (GNUNET_YES != GNUNET_PSYC_check_message_parts (size - sizeof (*req), | ||
461 | (const char *) &req[1])) | ||
462 | { | ||
463 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
464 | "%p Dropping message with invalid parts " | ||
465 | "received from multicast.\n", ch); | ||
466 | GNUNET_break_op (0); | ||
467 | break; | ||
468 | } | ||
469 | |||
470 | struct GNUNET_PSYC_MessageHeader *pmsg; | ||
471 | uint16_t psize = sizeof (*pmsg) + size - sizeof (*req); | ||
472 | pmsg = GNUNET_malloc (psize); | ||
473 | pmsg->header.size = htons (psize); | ||
474 | pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | ||
475 | pmsg->message_id = req->request_id; | ||
476 | pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST); | ||
477 | |||
478 | memcpy (&pmsg[1], &req[1], size - sizeof (*req)); | ||
479 | |||
480 | GNUNET_SERVER_notification_context_add (nc, ch->client); | ||
481 | GNUNET_SERVER_notification_context_unicast (nc, ch->client, | ||
482 | (const struct GNUNET_MessageHeader *) pmsg, | ||
483 | GNUNET_NO); | ||
484 | GNUNET_free (pmsg); | ||
485 | break; | ||
486 | } | ||
487 | default: | ||
488 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
489 | "%p Dropping unknown request of type %u and size %u.\n", | ||
490 | ch, type, size); | ||
491 | GNUNET_break_op (0); | ||
492 | } | ||
444 | } | 493 | } |
445 | 494 | ||
446 | 495 | ||
@@ -470,7 +519,8 @@ master_counters_cb (void *cls, int result, uint64_t max_fragment_id, | |||
470 | max_fragment_id + 1, | 519 | max_fragment_id + 1, |
471 | join_cb, membership_test_cb, | 520 | join_cb, membership_test_cb, |
472 | replay_fragment_cb, replay_message_cb, | 521 | replay_fragment_cb, replay_message_cb, |
473 | request_cb, message_cb, ch); | 522 | request_cb, master_message_cb, ch); |
523 | ch->ready = GNUNET_YES; | ||
474 | } | 524 | } |
475 | GNUNET_SERVER_notification_context_add (nc, ch->client); | 525 | GNUNET_SERVER_notification_context_add (nc, ch->client); |
476 | GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header, | 526 | GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header, |
@@ -505,7 +555,8 @@ slave_counters_cb (void *cls, int result, uint64_t max_fragment_id, | |||
505 | slv->join_req, join_cb, | 555 | slv->join_req, join_cb, |
506 | membership_test_cb, | 556 | membership_test_cb, |
507 | replay_fragment_cb, replay_message_cb, | 557 | replay_fragment_cb, replay_message_cb, |
508 | message_cb, ch); | 558 | slave_message_cb, ch); |
559 | ch->ready = GNUNET_YES; | ||
509 | } | 560 | } |
510 | 561 | ||
511 | GNUNET_SERVER_notification_context_add (nc, ch->client); | 562 | GNUNET_SERVER_notification_context_add (nc, ch->client); |
@@ -529,9 +580,11 @@ handle_master_start (void *cls, struct GNUNET_SERVER_Client *client, | |||
529 | mst->channel.is_master = GNUNET_YES; | 580 | mst->channel.is_master = GNUNET_YES; |
530 | mst->policy = ntohl (req->policy); | 581 | mst->policy = ntohl (req->policy); |
531 | mst->priv_key = req->channel_key; | 582 | mst->priv_key = req->channel_key; |
532 | GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key, | 583 | GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key, &mst->pub_key); |
533 | &mst->pub_key); | ||
534 | GNUNET_CRYPTO_hash (&mst->pub_key, sizeof (mst->pub_key), &mst->pub_key_hash); | 584 | GNUNET_CRYPTO_hash (&mst->pub_key, sizeof (mst->pub_key), &mst->pub_key_hash); |
585 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
586 | "%p Master connected to channel %s.\n", | ||
587 | mst, GNUNET_h2s (&mst->pub_key_hash)); | ||
535 | 588 | ||
536 | GNUNET_PSYCSTORE_counters_get (store, &mst->pub_key, | 589 | GNUNET_PSYCSTORE_counters_get (store, &mst->pub_key, |
537 | master_counters_cb, mst); | 590 | master_counters_cb, mst); |
@@ -561,14 +614,20 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client, | |||
561 | &slv->chan_key_hash); | 614 | &slv->chan_key_hash); |
562 | slv->origin = req->origin; | 615 | slv->origin = req->origin; |
563 | slv->relay_count = ntohl (req->relay_count); | 616 | slv->relay_count = ntohl (req->relay_count); |
617 | if (0 < slv->relay_count) | ||
618 | { | ||
619 | const struct GNUNET_PeerIdentity *relays | ||
620 | = (const struct GNUNET_PeerIdentity *) &req[1]; | ||
621 | slv->relays | ||
622 | = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity)); | ||
623 | uint32_t i; | ||
624 | for (i = 0; i < slv->relay_count; i++) | ||
625 | memcpy (&slv->relays[i], &relays[i], sizeof (*relays)); | ||
626 | } | ||
564 | 627 | ||
565 | const struct GNUNET_PeerIdentity *relays | 628 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
566 | = (const struct GNUNET_PeerIdentity *) &req[1]; | 629 | "%p Slave connected to channel %s.\n", |
567 | slv->relays | 630 | slv, GNUNET_h2s (&slv->chan_key_hash)); |
568 | = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity)); | ||
569 | uint32_t i; | ||
570 | for (i = 0; i < slv->relay_count; i++) | ||
571 | memcpy (&slv->relays[i], &relays[i], sizeof (*relays)); | ||
572 | 631 | ||
573 | GNUNET_PSYCSTORE_counters_get (store, &slv->chan_key, | 632 | GNUNET_PSYCSTORE_counters_get (store, &slv->chan_key, |
574 | slave_counters_cb, slv); | 633 | slave_counters_cb, slv); |
@@ -609,13 +668,14 @@ transmit_notify (void *cls, size_t *data_size, void *data) | |||
609 | 668 | ||
610 | if (NULL == tmit_msg || *data_size < tmit_msg->size) | 669 | if (NULL == tmit_msg || *data_size < tmit_msg->size) |
611 | { | 670 | { |
612 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify: nothing to send.\n"); | 671 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
672 | "%p transmit_notify: nothing to send.\n", ch); | ||
613 | *data_size = 0; | 673 | *data_size = 0; |
614 | return GNUNET_NO; | 674 | return GNUNET_NO; |
615 | } | 675 | } |
616 | 676 | ||
617 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 677 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
618 | "transmit_notify: sending %u bytes.\n", tmit_msg->size); | 678 | "%p transmit_notify: sending %u bytes.\n", ch, tmit_msg->size); |
619 | 679 | ||
620 | *data_size = tmit_msg->size; | 680 | *data_size = tmit_msg->size; |
621 | memcpy (data, tmit_msg->buf, *data_size); | 681 | memcpy (data, tmit_msg->buf, *data_size); |
@@ -630,7 +690,7 @@ transmit_notify (void *cls, size_t *data_size, void *data) | |||
630 | { | 690 | { |
631 | if (NULL != ch->tmit_head) | 691 | if (NULL != ch->tmit_head) |
632 | { | 692 | { |
633 | transmit_message (ch); | 693 | transmit_message (ch, GNUNET_NO); |
634 | } | 694 | } |
635 | else if (ch->disconnected) | 695 | else if (ch->disconnected) |
636 | { | 696 | { |
@@ -644,19 +704,55 @@ transmit_notify (void *cls, size_t *data_size, void *data) | |||
644 | 704 | ||
645 | 705 | ||
646 | /** | 706 | /** |
707 | * Callback for the transmit functions of multicast. | ||
708 | */ | ||
709 | static int | ||
710 | master_transmit_notify (void *cls, size_t *data_size, void *data) | ||
711 | { | ||
712 | int ret = transmit_notify (cls, data_size, data); | ||
713 | |||
714 | if (GNUNET_YES == ret) | ||
715 | { | ||
716 | struct Master *mst = cls; | ||
717 | mst->tmit_handle = NULL; | ||
718 | } | ||
719 | return ret; | ||
720 | } | ||
721 | |||
722 | |||
723 | /** | ||
724 | * Callback for the transmit functions of multicast. | ||
725 | */ | ||
726 | static int | ||
727 | slave_transmit_notify (void *cls, size_t *data_size, void *data) | ||
728 | { | ||
729 | int ret = transmit_notify (cls, data_size, data); | ||
730 | |||
731 | if (GNUNET_YES == ret) | ||
732 | { | ||
733 | struct Slave *slv = cls; | ||
734 | slv->tmit_handle = NULL; | ||
735 | } | ||
736 | return ret; | ||
737 | } | ||
738 | |||
739 | |||
740 | /** | ||
647 | * Transmit a message from a channel master to the multicast group. | 741 | * Transmit a message from a channel master to the multicast group. |
648 | */ | 742 | */ |
649 | static void | 743 | static void |
650 | master_transmit_message (struct Master *mst) | 744 | master_transmit_message (struct Master *mst, uint8_t inc_msg_id) |
651 | { | 745 | { |
652 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "master_transmit_message()\n"); | 746 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_transmit_message()\n", mst); |
653 | mst->channel.tmit_task = 0; | 747 | mst->channel.tmit_task = 0; |
654 | if (NULL == mst->tmit_handle) | 748 | if (NULL == mst->tmit_handle) |
655 | { | 749 | { |
750 | if (GNUNET_YES == inc_msg_id) | ||
751 | mst->max_message_id++; | ||
656 | mst->tmit_handle | 752 | mst->tmit_handle |
657 | = GNUNET_MULTICAST_origin_to_all (mst->origin, ++mst->max_message_id, | 753 | = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id, |
658 | mst->max_group_generation, | 754 | mst->max_group_generation, |
659 | transmit_notify, mst); | 755 | master_transmit_notify, mst); |
660 | } | 756 | } |
661 | else | 757 | else |
662 | { | 758 | { |
@@ -669,14 +765,16 @@ master_transmit_message (struct Master *mst) | |||
669 | * Transmit a message from a channel slave to the multicast group. | 765 | * Transmit a message from a channel slave to the multicast group. |
670 | */ | 766 | */ |
671 | static void | 767 | static void |
672 | slave_transmit_message (struct Slave *slv) | 768 | slave_transmit_message (struct Slave *slv, uint8_t inc_msg_id) |
673 | { | 769 | { |
674 | slv->channel.tmit_task = 0; | 770 | slv->channel.tmit_task = 0; |
675 | if (NULL == slv->tmit_handle) | 771 | if (NULL == slv->tmit_handle) |
676 | { | 772 | { |
773 | if (GNUNET_YES == inc_msg_id) | ||
774 | slv->max_message_id++; | ||
677 | slv->tmit_handle | 775 | slv->tmit_handle |
678 | = GNUNET_MULTICAST_member_to_origin(slv->member, ++slv->max_request_id, | 776 | = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id, |
679 | transmit_notify, slv); | 777 | slave_transmit_notify, slv); |
680 | } | 778 | } |
681 | else | 779 | else |
682 | { | 780 | { |
@@ -686,11 +784,11 @@ slave_transmit_message (struct Slave *slv) | |||
686 | 784 | ||
687 | 785 | ||
688 | static inline void | 786 | static inline void |
689 | transmit_message (struct Channel *ch) | 787 | transmit_message (struct Channel *ch, uint8_t inc_msg_id) |
690 | { | 788 | { |
691 | ch->is_master | 789 | ch->is_master |
692 | ? master_transmit_message ((struct Master *) ch) | 790 | ? master_transmit_message ((struct Master *) ch, inc_msg_id) |
693 | : slave_transmit_message ((struct Slave *) ch); | 791 | : slave_transmit_message ((struct Slave *) ch, inc_msg_id); |
694 | } | 792 | } |
695 | 793 | ||
696 | 794 | ||
@@ -708,10 +806,9 @@ transmit_error (struct Channel *ch) | |||
708 | tmit_msg->size = sizeof (*msg); | 806 | tmit_msg->size = sizeof (*msg); |
709 | tmit_msg->state = ch->tmit_state; | 807 | tmit_msg->state = ch->tmit_state; |
710 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); | 808 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); |
711 | transmit_message (ch); | 809 | transmit_message (ch, GNUNET_NO); |
712 | 810 | ||
713 | /* FIXME: cleanup */ | 811 | /* FIXME: cleanup */ |
714 | GNUNET_SERVER_client_disconnect (ch->client); | ||
715 | } | 812 | } |
716 | 813 | ||
717 | 814 | ||
@@ -720,40 +817,60 @@ transmit_error (struct Channel *ch) | |||
720 | */ | 817 | */ |
721 | static void | 818 | static void |
722 | handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, | 819 | handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, |
723 | const struct GNUNET_MessageHeader *msg) | 820 | const struct GNUNET_MessageHeader *msg) |
724 | { | 821 | { |
725 | struct Channel *ch | 822 | struct Channel *ch |
726 | = GNUNET_SERVER_client_get_user_context (client, struct Channel); | 823 | = GNUNET_SERVER_client_get_user_context (client, struct Channel); |
727 | GNUNET_assert (NULL != ch); | 824 | GNUNET_assert (NULL != ch); |
728 | 825 | ||
826 | if (GNUNET_YES != ch->ready) | ||
827 | { | ||
828 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
829 | "%p Ignoring message from client, channel is not ready yet.\n", | ||
830 | ch); | ||
831 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); | ||
832 | return; | ||
833 | } | ||
834 | |||
835 | uint8_t inc_msg_id = GNUNET_NO; | ||
729 | uint16_t size = ntohs (msg->size); | 836 | uint16_t size = ntohs (msg->size); |
730 | uint16_t psize = 0, pos = 0; | 837 | uint16_t psize = 0, ptype = 0, pos = 0; |
731 | 838 | ||
732 | if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg)) | 839 | if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg)) |
733 | { | 840 | { |
734 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Message payload too large\n"); | 841 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large\n", ch); |
735 | GNUNET_break (0); | 842 | GNUNET_break (0); |
736 | transmit_error (ch); | 843 | transmit_error (ch); |
844 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); | ||
737 | return; | 845 | return; |
738 | } | 846 | } |
739 | 847 | ||
848 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
849 | "%p Received message from client.\n", ch); | ||
850 | GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg); | ||
851 | |||
740 | for (pos = 0; sizeof (*msg) + pos < size; pos += psize) | 852 | for (pos = 0; sizeof (*msg) + pos < size; pos += psize) |
741 | { | 853 | { |
742 | const struct GNUNET_MessageHeader *pmsg | 854 | const struct GNUNET_MessageHeader *pmsg |
743 | = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos); | 855 | = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos); |
744 | psize = ntohs (pmsg->size); | 856 | psize = ntohs (pmsg->size); |
745 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 857 | ptype = ntohs (pmsg->type); |
746 | "Received message part of type %u and size %u " | ||
747 | "from client.\n", ntohs (pmsg->type), psize); | ||
748 | if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size) | 858 | if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size) |
749 | { | 859 | { |
750 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 860 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
751 | "Received invalid message part of type %u and size %u " | 861 | "%p Received invalid message part of type %u and size %u " |
752 | "from client.\n", ntohs (pmsg->type), psize); | 862 | "from client.\n", ch, ptype, psize); |
753 | GNUNET_break (0); | 863 | GNUNET_break (0); |
754 | transmit_error (ch); | 864 | transmit_error (ch); |
865 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); | ||
755 | return; | 866 | return; |
756 | } | 867 | } |
868 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
869 | "%p Received message part from client.\n", ch); | ||
870 | GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg); | ||
871 | |||
872 | if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == ptype) | ||
873 | inc_msg_id = GNUNET_YES; | ||
757 | } | 874 | } |
758 | 875 | ||
759 | size -= sizeof (*msg); | 876 | size -= sizeof (*msg); |
@@ -763,7 +880,7 @@ handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, | |||
763 | tmit_msg->size = size; | 880 | tmit_msg->size = size; |
764 | tmit_msg->state = ch->tmit_state; | 881 | tmit_msg->state = ch->tmit_state; |
765 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); | 882 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); |
766 | transmit_message (ch); | 883 | transmit_message (ch, inc_msg_id); |
767 | 884 | ||
768 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 885 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
769 | }; | 886 | }; |
diff --git a/src/psyc/psyc.h b/src/psyc/psyc.h index 90c07480a..1ffda5d08 100644 --- a/src/psyc/psyc.h +++ b/src/psyc/psyc.h | |||
@@ -27,7 +27,16 @@ | |||
27 | #ifndef PSYC_H | 27 | #ifndef PSYC_H |
28 | #define PSYC_H | 28 | #define PSYC_H |
29 | 29 | ||
30 | #include "gnunet_common.h" | 30 | #include "platform.h" |
31 | #include "gnunet_psyc_service.h" | ||
32 | |||
33 | |||
34 | int | ||
35 | GNUNET_PSYC_check_message_parts (uint16_t data_size, const char *data); | ||
36 | |||
37 | void | ||
38 | GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind, | ||
39 | const struct GNUNET_MessageHeader *msg); | ||
31 | 40 | ||
32 | 41 | ||
33 | enum MessageState | 42 | enum MessageState |
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c index 20394bbce..8a1c9ffaa 100644 --- a/src/psyc/psyc_api.c +++ b/src/psyc/psyc_api.c | |||
@@ -48,12 +48,30 @@ struct OperationHandle | |||
48 | struct GNUNET_MessageHeader *msg; | 48 | struct GNUNET_MessageHeader *msg; |
49 | }; | 49 | }; |
50 | 50 | ||
51 | |||
52 | /** | ||
53 | * Handle for a pending PSYC transmission operation. | ||
54 | */ | ||
55 | struct GNUNET_PSYC_ChannelTransmitHandle | ||
56 | { | ||
57 | struct GNUNET_PSYC_Channel *ch; | ||
58 | GNUNET_PSYC_TransmitNotifyModifier notify_mod; | ||
59 | GNUNET_PSYC_TransmitNotifyData notify_data; | ||
60 | void *notify_cls; | ||
61 | enum MessageState state; | ||
62 | }; | ||
63 | |||
51 | /** | 64 | /** |
52 | * Handle to access PSYC channel operations for both the master and slaves. | 65 | * Handle to access PSYC channel operations for both the master and slaves. |
53 | */ | 66 | */ |
54 | struct GNUNET_PSYC_Channel | 67 | struct GNUNET_PSYC_Channel |
55 | { | 68 | { |
56 | /** | 69 | /** |
70 | * Transmission handle; | ||
71 | */ | ||
72 | struct GNUNET_PSYC_ChannelTransmitHandle tmit; | ||
73 | |||
74 | /** | ||
57 | * Configuration to use. | 75 | * Configuration to use. |
58 | */ | 76 | */ |
59 | const struct GNUNET_CONFIGURATION_Handle *cfg; | 77 | const struct GNUNET_CONFIGURATION_Handle *cfg; |
@@ -124,6 +142,11 @@ struct GNUNET_PSYC_Channel | |||
124 | uint64_t recv_message_id; | 142 | uint64_t recv_message_id; |
125 | 143 | ||
126 | /** | 144 | /** |
145 | * Public key of the slave from which a message is being received. | ||
146 | */ | ||
147 | struct GNUNET_CRYPTO_EddsaPublicKey recv_slave_key; | ||
148 | |||
149 | /** | ||
127 | * State of the currently being received message from the PSYC service. | 150 | * State of the currently being received message from the PSYC service. |
128 | */ | 151 | */ |
129 | enum MessageState recv_state; | 152 | enum MessageState recv_state; |
@@ -171,27 +194,12 @@ struct GNUNET_PSYC_Channel | |||
171 | 194 | ||
172 | 195 | ||
173 | /** | 196 | /** |
174 | * Handle for a pending PSYC transmission operation. | ||
175 | */ | ||
176 | struct GNUNET_PSYC_MasterTransmitHandle | ||
177 | { | ||
178 | struct GNUNET_PSYC_Master *master; | ||
179 | GNUNET_PSYC_MasterTransmitNotifyModifier notify_mod; | ||
180 | GNUNET_PSYC_MasterTransmitNotify notify_data; | ||
181 | void *notify_cls; | ||
182 | enum MessageState state; | ||
183 | }; | ||
184 | |||
185 | |||
186 | /** | ||
187 | * Handle for the master of a PSYC channel. | 197 | * Handle for the master of a PSYC channel. |
188 | */ | 198 | */ |
189 | struct GNUNET_PSYC_Master | 199 | struct GNUNET_PSYC_Master |
190 | { | 200 | { |
191 | struct GNUNET_PSYC_Channel ch; | 201 | struct GNUNET_PSYC_Channel ch; |
192 | 202 | ||
193 | struct GNUNET_PSYC_MasterTransmitHandle *tmit; | ||
194 | |||
195 | GNUNET_PSYC_MasterStartCallback start_cb; | 203 | GNUNET_PSYC_MasterStartCallback start_cb; |
196 | 204 | ||
197 | uint64_t max_message_id; | 205 | uint64_t max_message_id; |
@@ -204,6 +212,10 @@ struct GNUNET_PSYC_Master | |||
204 | struct GNUNET_PSYC_Slave | 212 | struct GNUNET_PSYC_Slave |
205 | { | 213 | { |
206 | struct GNUNET_PSYC_Channel ch; | 214 | struct GNUNET_PSYC_Channel ch; |
215 | |||
216 | GNUNET_PSYC_SlaveJoinCallback join_cb; | ||
217 | |||
218 | uint64_t max_message_id; | ||
207 | }; | 219 | }; |
208 | 220 | ||
209 | 221 | ||
@@ -251,7 +263,7 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); | |||
251 | 263 | ||
252 | 264 | ||
253 | static void | 265 | static void |
254 | master_transmit_data (struct GNUNET_PSYC_Master *mst); | 266 | channel_transmit_data (struct GNUNET_PSYC_Channel *ch); |
255 | 267 | ||
256 | 268 | ||
257 | /** | 269 | /** |
@@ -302,7 +314,8 @@ recv_reset (struct GNUNET_PSYC_Channel *ch) | |||
302 | ch->recv_state = MSG_STATE_START; | 314 | ch->recv_state = MSG_STATE_START; |
303 | ch->recv_flags = 0; | 315 | ch->recv_flags = 0; |
304 | ch->recv_message_id = 0; | 316 | ch->recv_message_id = 0; |
305 | ch->recv_mod_value_size =0; | 317 | //FIXME: ch->recv_slave_key = { 0 }; |
318 | ch->recv_mod_value_size = 0; | ||
306 | ch->recv_mod_value_size_expected = 0; | 319 | ch->recv_mod_value_size_expected = 0; |
307 | } | 320 | } |
308 | 321 | ||
@@ -379,8 +392,9 @@ queue_message (struct GNUNET_PSYC_Channel *ch, | |||
379 | } | 392 | } |
380 | 393 | ||
381 | if (NULL != op | 394 | if (NULL != op |
382 | && (end || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD | 395 | && (GNUNET_YES == end |
383 | < op->msg->size + sizeof (struct GNUNET_MessageHeader)))) | 396 | || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD |
397 | < op->msg->size + sizeof (struct GNUNET_MessageHeader)))) | ||
384 | { | 398 | { |
385 | /* End of message or buffer is full, add it to transmission queue. */ | 399 | /* End of message or buffer is full, add it to transmission queue. */ |
386 | op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | 400 | op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); |
@@ -390,6 +404,9 @@ queue_message (struct GNUNET_PSYC_Channel *ch, | |||
390 | ch->tmit_ack_pending++; | 404 | ch->tmit_ack_pending++; |
391 | } | 405 | } |
392 | 406 | ||
407 | if (GNUNET_YES == end) | ||
408 | ch->in_transmit = GNUNET_NO; | ||
409 | |||
393 | transmit_next (ch); | 410 | transmit_next (ch); |
394 | } | 411 | } |
395 | 412 | ||
@@ -400,15 +417,14 @@ queue_message (struct GNUNET_PSYC_Channel *ch, | |||
400 | * @param mst Master handle. | 417 | * @param mst Master handle. |
401 | */ | 418 | */ |
402 | static void | 419 | static void |
403 | master_transmit_mod (struct GNUNET_PSYC_Master *mst) | 420 | channel_transmit_mod (struct GNUNET_PSYC_Channel *ch) |
404 | { | 421 | { |
405 | struct GNUNET_PSYC_Channel *ch = &mst->ch; | ||
406 | uint16_t max_data_size, data_size; | 422 | uint16_t max_data_size, data_size; |
407 | char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; | 423 | char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; |
408 | struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; | 424 | struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; |
409 | int notify_ret; | 425 | int notify_ret; |
410 | 426 | ||
411 | switch (mst->tmit->state) | 427 | switch (ch->tmit.state) |
412 | { | 428 | { |
413 | case MSG_STATE_MODIFIER: | 429 | case MSG_STATE_MODIFIER: |
414 | { | 430 | { |
@@ -417,12 +433,11 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst) | |||
417 | max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; | 433 | max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; |
418 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); | 434 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); |
419 | msg->size = sizeof (struct GNUNET_PSYC_MessageModifier); | 435 | msg->size = sizeof (struct GNUNET_PSYC_MessageModifier); |
420 | notify_ret = mst->tmit->notify_mod (mst->tmit->notify_cls, | 436 | notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, |
421 | &data_size, &mod[1], &mod->oper); | 437 | &data_size, &mod[1], &mod->oper); |
422 | mod->name_size = strnlen ((char *) &mod[1], data_size); | 438 | mod->name_size = strnlen ((char *) &mod[1], data_size); |
423 | if (mod->name_size < data_size) | 439 | if (mod->name_size < data_size) |
424 | { | 440 | { |
425 | mod->oper = htons (mod->oper); | ||
426 | mod->value_size = htons (data_size - 1 - mod->name_size); | 441 | mod->value_size = htons (data_size - 1 - mod->name_size); |
427 | mod->name_size = htons (mod->name_size); | 442 | mod->name_size = htons (mod->name_size); |
428 | } | 443 | } |
@@ -438,8 +453,8 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst) | |||
438 | max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; | 453 | max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; |
439 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); | 454 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); |
440 | msg->size = sizeof (struct GNUNET_MessageHeader); | 455 | msg->size = sizeof (struct GNUNET_MessageHeader); |
441 | notify_ret = mst->tmit->notify_mod (mst->tmit->notify_cls, | 456 | notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, |
442 | &data_size, &msg[1], NULL); | 457 | &data_size, &msg[1], NULL); |
443 | break; | 458 | break; |
444 | } | 459 | } |
445 | default: | 460 | default: |
@@ -454,27 +469,28 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst) | |||
454 | ch->tmit_paused = GNUNET_YES; | 469 | ch->tmit_paused = GNUNET_YES; |
455 | return; | 470 | return; |
456 | } | 471 | } |
457 | mst->tmit->state = MSG_STATE_MOD_CONT; | 472 | ch->tmit.state = MSG_STATE_MOD_CONT; |
458 | break; | 473 | break; |
459 | 474 | ||
460 | case GNUNET_YES: | 475 | case GNUNET_YES: |
461 | if (0 == data_size) | 476 | if (0 == data_size) |
462 | { | 477 | { |
463 | /* End of modifiers. */ | 478 | /* End of modifiers. */ |
464 | mst->tmit->state = MSG_STATE_DATA; | 479 | ch->tmit.state = MSG_STATE_DATA; |
465 | if (0 == ch->tmit_ack_pending) | 480 | if (0 == ch->tmit_ack_pending) |
466 | master_transmit_data (mst); | 481 | channel_transmit_data (ch); |
467 | 482 | ||
468 | return; | 483 | return; |
469 | } | 484 | } |
470 | mst->tmit->state = MSG_STATE_MODIFIER; | 485 | ch->tmit.state = MSG_STATE_MODIFIER; |
471 | break; | 486 | break; |
472 | 487 | ||
473 | default: | 488 | default: |
474 | LOG (GNUNET_ERROR_TYPE_ERROR, | 489 | LOG (GNUNET_ERROR_TYPE_ERROR, |
475 | "MasterTransmitNotify returned error when requesting a modifier.\n"); | 490 | "MasterTransmitNotifyModifier returned error " |
491 | "when requesting a modifier.\n"); | ||
476 | 492 | ||
477 | mst->tmit->state = MSG_STATE_START; | 493 | ch->tmit.state = MSG_STATE_CANCEL; |
478 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); | 494 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); |
479 | msg->size = htons (sizeof (*msg)); | 495 | msg->size = htons (sizeof (*msg)); |
480 | 496 | ||
@@ -489,7 +505,7 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst) | |||
489 | queue_message (ch, msg, GNUNET_NO); | 505 | queue_message (ch, msg, GNUNET_NO); |
490 | } | 506 | } |
491 | 507 | ||
492 | master_transmit_mod (mst); | 508 | channel_transmit_mod (ch); |
493 | } | 509 | } |
494 | 510 | ||
495 | 511 | ||
@@ -499,17 +515,16 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst) | |||
499 | * @param mst Master handle. | 515 | * @param mst Master handle. |
500 | */ | 516 | */ |
501 | static void | 517 | static void |
502 | master_transmit_data (struct GNUNET_PSYC_Master *mst) | 518 | channel_transmit_data (struct GNUNET_PSYC_Channel *ch) |
503 | { | 519 | { |
504 | struct GNUNET_PSYC_Channel *ch = &mst->ch; | ||
505 | uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD; | 520 | uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD; |
506 | char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; | 521 | char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; |
507 | struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; | 522 | struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; |
508 | 523 | ||
509 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); | 524 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); |
510 | 525 | ||
511 | int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls, | 526 | int notify_ret = ch->tmit.notify_data (ch->tmit.notify_cls, |
512 | &data_size, &msg[1]); | 527 | &data_size, &msg[1]); |
513 | switch (notify_ret) | 528 | switch (notify_ret) |
514 | { | 529 | { |
515 | case GNUNET_NO: | 530 | case GNUNET_NO: |
@@ -522,14 +537,14 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst) | |||
522 | break; | 537 | break; |
523 | 538 | ||
524 | case GNUNET_YES: | 539 | case GNUNET_YES: |
525 | mst->tmit->state = MSG_STATE_START; | 540 | ch->tmit.state = MSG_STATE_END; |
526 | break; | 541 | break; |
527 | 542 | ||
528 | default: | 543 | default: |
529 | LOG (GNUNET_ERROR_TYPE_ERROR, | 544 | LOG (GNUNET_ERROR_TYPE_ERROR, |
530 | "MasterTransmitNotify returned error when requesting data.\n"); | 545 | "MasterTransmitNotify returned error when requesting data.\n"); |
531 | 546 | ||
532 | mst->tmit->state = MSG_STATE_START; | 547 | ch->tmit.state = MSG_STATE_CANCEL; |
533 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); | 548 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); |
534 | msg->size = htons (sizeof (*msg)); | 549 | msg->size = htons (sizeof (*msg)); |
535 | queue_message (ch, msg, GNUNET_YES); | 550 | queue_message (ch, msg, GNUNET_YES); |
@@ -554,6 +569,86 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst) | |||
554 | 569 | ||
555 | 570 | ||
556 | /** | 571 | /** |
572 | * Send a message to a channel. | ||
573 | * | ||
574 | * @param ch Handle to the PSYC channel. | ||
575 | * @param method_name Which method should be invoked. | ||
576 | * @param notify_mod Function to call to obtain modifiers. | ||
577 | * @param notify_data Function to call to obtain fragments of the data. | ||
578 | * @param notify_cls Closure for @a notify_mod and @a notify_data. | ||
579 | * @param flags Flags for the message being transmitted. | ||
580 | * @return Transmission handle, NULL on error (i.e. more than one request queued). | ||
581 | */ | ||
582 | static struct GNUNET_PSYC_ChannelTransmitHandle * | ||
583 | channel_transmit (struct GNUNET_PSYC_Channel *ch, | ||
584 | const char *method_name, | ||
585 | GNUNET_PSYC_TransmitNotifyModifier notify_mod, | ||
586 | GNUNET_PSYC_TransmitNotifyData notify_data, | ||
587 | void *notify_cls, | ||
588 | uint32_t flags) | ||
589 | { | ||
590 | if (GNUNET_NO != ch->in_transmit) | ||
591 | return NULL; | ||
592 | ch->in_transmit = GNUNET_YES; | ||
593 | |||
594 | size_t size = strlen (method_name) + 1; | ||
595 | struct GNUNET_PSYC_MessageMethod *pmeth; | ||
596 | struct OperationHandle *op; | ||
597 | |||
598 | ch->tmit_msg = op = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg) | ||
599 | + sizeof (*pmeth) + size); | ||
600 | op->msg = (struct GNUNET_MessageHeader *) &op[1]; | ||
601 | op->msg->size = sizeof (*op->msg) + sizeof (*pmeth) + size; | ||
602 | |||
603 | pmeth = (struct GNUNET_PSYC_MessageMethod *) &op->msg[1]; | ||
604 | pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); | ||
605 | pmeth->header.size = htons (sizeof (*pmeth) + size); | ||
606 | pmeth->flags = htonl (flags); | ||
607 | memcpy (&pmeth[1], method_name, size); | ||
608 | |||
609 | ch->tmit.ch = ch; | ||
610 | ch->tmit.notify_mod = notify_mod; | ||
611 | ch->tmit.notify_data = notify_data; | ||
612 | ch->tmit.notify_cls = notify_cls; | ||
613 | ch->tmit.state = MSG_STATE_MODIFIER; | ||
614 | |||
615 | channel_transmit_mod (ch); | ||
616 | return &ch->tmit; | ||
617 | } | ||
618 | |||
619 | |||
620 | /** | ||
621 | * Resume transmission to the channel. | ||
622 | * | ||
623 | * @param th Handle of the request that is being resumed. | ||
624 | */ | ||
625 | static void | ||
626 | channel_transmit_resume (struct GNUNET_PSYC_ChannelTransmitHandle *th) | ||
627 | { | ||
628 | struct GNUNET_PSYC_Channel *ch = th->ch; | ||
629 | if (0 == ch->tmit_ack_pending) | ||
630 | { | ||
631 | ch->tmit_paused = GNUNET_NO; | ||
632 | channel_transmit_data (ch); | ||
633 | } | ||
634 | } | ||
635 | |||
636 | |||
637 | /** | ||
638 | * Abort transmission request to channel. | ||
639 | * | ||
640 | * @param th Handle of the request that is being aborted. | ||
641 | */ | ||
642 | static void | ||
643 | channel_transmit_cancel (struct GNUNET_PSYC_ChannelTransmitHandle *th) | ||
644 | { | ||
645 | struct GNUNET_PSYC_Channel *ch = th->ch; | ||
646 | if (GNUNET_NO == ch->in_transmit) | ||
647 | return; | ||
648 | } | ||
649 | |||
650 | |||
651 | /** | ||
557 | * Handle incoming message from the PSYC service. | 652 | * Handle incoming message from the PSYC service. |
558 | * | 653 | * |
559 | * @param ch The channel the message is sent to. | 654 | * @param ch The channel the message is sent to. |
@@ -564,14 +659,20 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, | |||
564 | const struct GNUNET_PSYC_MessageHeader *msg) | 659 | const struct GNUNET_PSYC_MessageHeader *msg) |
565 | { | 660 | { |
566 | uint16_t size = ntohs (msg->header.size); | 661 | uint16_t size = ntohs (msg->header.size); |
662 | uint32_t flags = ntohl (msg->flags); | ||
663 | |||
664 | GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, | ||
665 | (struct GNUNET_MessageHeader *) msg); | ||
567 | 666 | ||
568 | if (MSG_STATE_START == ch->recv_state) | 667 | if (MSG_STATE_START == ch->recv_state) |
569 | { | 668 | { |
570 | ch->recv_message_id = GNUNET_ntohll (msg->message_id); | 669 | ch->recv_message_id = GNUNET_ntohll (msg->message_id); |
571 | ch->recv_flags = ntohl (msg->flags); | 670 | ch->recv_flags = flags; |
671 | ch->recv_slave_key = msg->slave_key; | ||
572 | } | 672 | } |
573 | else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id) | 673 | else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id) |
574 | { | 674 | { |
675 | // FIXME | ||
575 | LOG (GNUNET_ERROR_TYPE_WARNING, | 676 | LOG (GNUNET_ERROR_TYPE_WARNING, |
576 | "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n", | 677 | "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n", |
577 | GNUNET_ntohll (msg->message_id), ch->recv_message_id); | 678 | GNUNET_ntohll (msg->message_id), ch->recv_message_id); |
@@ -579,11 +680,11 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, | |||
579 | recv_error (ch); | 680 | recv_error (ch); |
580 | return; | 681 | return; |
581 | } | 682 | } |
582 | else if (ntohl (msg->flags) != ch->recv_flags) | 683 | else if (flags != ch->recv_flags) |
583 | { | 684 | { |
584 | LOG (GNUNET_ERROR_TYPE_WARNING, | 685 | LOG (GNUNET_ERROR_TYPE_WARNING, |
585 | "Unexpected message flags. Got: %lu, expected: %lu\n", | 686 | "Unexpected message flags. Got: %lu, expected: %lu\n", |
586 | ntohl (msg->flags), ch->recv_flags); | 687 | flags, ch->recv_flags); |
587 | GNUNET_break_op (0); | 688 | GNUNET_break_op (0); |
588 | recv_error (ch); | 689 | recv_error (ch); |
589 | return; | 690 | return; |
@@ -599,10 +700,6 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, | |||
599 | ptype = ntohs (pmsg->type); | 700 | ptype = ntohs (pmsg->type); |
600 | size_eq = size_min = 0; | 701 | size_eq = size_min = 0; |
601 | 702 | ||
602 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
603 | "Received message part of type %u and size %u from PSYC.\n", | ||
604 | ptype, psize); | ||
605 | |||
606 | if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size) | 703 | if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size) |
607 | { | 704 | { |
608 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 705 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
@@ -612,6 +709,10 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, | |||
612 | return; | 709 | return; |
613 | } | 710 | } |
614 | 711 | ||
712 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
713 | "Received message part from PSYC.\n"); | ||
714 | GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg); | ||
715 | |||
615 | switch (ptype) | 716 | switch (ptype) |
616 | { | 717 | { |
617 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: | 718 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: |
@@ -758,6 +859,46 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, | |||
758 | 859 | ||
759 | 860 | ||
760 | /** | 861 | /** |
862 | * Handle incoming message acknowledgement from the PSYC service. | ||
863 | * | ||
864 | * @param ch The channel the acknowledgement is sent to. | ||
865 | */ | ||
866 | static void | ||
867 | handle_psyc_message_ack (struct GNUNET_PSYC_Channel *ch) | ||
868 | { | ||
869 | if (0 == ch->tmit_ack_pending) | ||
870 | { | ||
871 | LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n"); | ||
872 | GNUNET_break (0); | ||
873 | return; | ||
874 | } | ||
875 | ch->tmit_ack_pending--; | ||
876 | |||
877 | switch (ch->tmit.state) | ||
878 | { | ||
879 | case MSG_STATE_MODIFIER: | ||
880 | case MSG_STATE_MOD_CONT: | ||
881 | if (GNUNET_NO == ch->tmit_paused) | ||
882 | channel_transmit_mod (ch); | ||
883 | break; | ||
884 | |||
885 | case MSG_STATE_DATA: | ||
886 | if (GNUNET_NO == ch->tmit_paused) | ||
887 | channel_transmit_data (ch); | ||
888 | break; | ||
889 | |||
890 | case MSG_STATE_END: | ||
891 | case MSG_STATE_CANCEL: | ||
892 | break; | ||
893 | |||
894 | default: | ||
895 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
896 | "Ignoring message ACK in state %u.\n", ch->tmit.state); | ||
897 | } | ||
898 | } | ||
899 | |||
900 | |||
901 | /** | ||
761 | * Type of a function to call when we receive a message | 902 | * Type of a function to call when we receive a message |
762 | * from the service. | 903 | * from the service. |
763 | * | 904 | * |
@@ -775,7 +916,7 @@ message_handler (void *cls, | |||
775 | 916 | ||
776 | if (NULL == msg) | 917 | if (NULL == msg) |
777 | { | 918 | { |
778 | GNUNET_break (0); | 919 | // timeout / disconnected from server, reconnect |
779 | reschedule_connect (ch); | 920 | reschedule_connect (ch); |
780 | return; | 921 | return; |
781 | } | 922 | } |
@@ -824,63 +965,15 @@ message_handler (void *cls, | |||
824 | } | 965 | } |
825 | case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: | 966 | case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: |
826 | { | 967 | { |
827 | #if TODO | ||
828 | struct CountersResult *cres = (struct CountersResult *) msg; | 968 | struct CountersResult *cres = (struct CountersResult *) msg; |
829 | slv->max_message_id = GNUNET_ntohll (cres->max_message_id); | 969 | slv->max_message_id = GNUNET_ntohll (cres->max_message_id); |
830 | if (NULL != slv->join_ack_cb) | 970 | if (NULL != slv->join_cb) |
831 | mst->join_ack_cb (ch->cb_cls, mst->max_message_id); | 971 | slv->join_cb (ch->cb_cls, slv->max_message_id); |
832 | #endif | ||
833 | break; | 972 | break; |
834 | } | 973 | } |
835 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK: | 974 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK: |
836 | { | 975 | { |
837 | if (0 == ch->tmit_ack_pending) | 976 | handle_psyc_message_ack (ch); |
838 | { | ||
839 | LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n"); | ||
840 | GNUNET_break (0); | ||
841 | break; | ||
842 | } | ||
843 | ch->tmit_ack_pending--; | ||
844 | |||
845 | if (ch->is_master) | ||
846 | { | ||
847 | GNUNET_assert (NULL != mst->tmit); | ||
848 | switch (mst->tmit->state) | ||
849 | { | ||
850 | case MSG_STATE_MODIFIER: | ||
851 | case MSG_STATE_MOD_CONT: | ||
852 | if (GNUNET_NO == ch->tmit_paused) | ||
853 | master_transmit_mod (mst); | ||
854 | break; | ||
855 | |||
856 | case MSG_STATE_DATA: | ||
857 | if (GNUNET_NO == ch->tmit_paused) | ||
858 | master_transmit_data (mst); | ||
859 | break; | ||
860 | |||
861 | case MSG_STATE_END: | ||
862 | case MSG_STATE_CANCEL: | ||
863 | if (NULL != mst->tmit) | ||
864 | { | ||
865 | GNUNET_free (mst->tmit); | ||
866 | mst->tmit = NULL; | ||
867 | } | ||
868 | else | ||
869 | { | ||
870 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
871 | "Ignoring message ACK, there's no transmission going on.\n"); | ||
872 | GNUNET_break (0); | ||
873 | } | ||
874 | break; | ||
875 | default: | ||
876 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
877 | "Ignoring message ACK in state %u.\n", mst->tmit->state); | ||
878 | } | ||
879 | } | ||
880 | else | ||
881 | { | ||
882 | /* TODO: slave */ | ||
883 | } | ||
884 | break; | 977 | break; |
885 | } | 978 | } |
886 | 979 | ||
@@ -1106,8 +1199,6 @@ void | |||
1106 | GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master) | 1199 | GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master) |
1107 | { | 1200 | { |
1108 | disconnect (master); | 1201 | disconnect (master); |
1109 | if (NULL != master->tmit) | ||
1110 | GNUNET_free (master->tmit); | ||
1111 | GNUNET_free (master); | 1202 | GNUNET_free (master); |
1112 | } | 1203 | } |
1113 | 1204 | ||
@@ -1162,41 +1253,14 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, | |||
1162 | struct GNUNET_PSYC_MasterTransmitHandle * | 1253 | struct GNUNET_PSYC_MasterTransmitHandle * |
1163 | GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, | 1254 | GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, |
1164 | const char *method_name, | 1255 | const char *method_name, |
1165 | GNUNET_PSYC_MasterTransmitNotifyModifier notify_mod, | 1256 | GNUNET_PSYC_TransmitNotifyModifier notify_mod, |
1166 | GNUNET_PSYC_MasterTransmitNotify notify_data, | 1257 | GNUNET_PSYC_TransmitNotifyData notify_data, |
1167 | void *notify_cls, | 1258 | void *notify_cls, |
1168 | enum GNUNET_PSYC_MasterTransmitFlags flags) | 1259 | enum GNUNET_PSYC_MasterTransmitFlags flags) |
1169 | { | 1260 | { |
1170 | GNUNET_assert (NULL != master); | 1261 | return (struct GNUNET_PSYC_MasterTransmitHandle *) |
1171 | struct GNUNET_PSYC_Channel *ch = &master->ch; | 1262 | channel_transmit (&master->ch, method_name, notify_mod, notify_data, |
1172 | if (GNUNET_NO != ch->in_transmit) | 1263 | notify_cls, flags); |
1173 | return NULL; | ||
1174 | ch->in_transmit = GNUNET_YES; | ||
1175 | |||
1176 | size_t size = strlen (method_name) + 1; | ||
1177 | struct GNUNET_PSYC_MessageMethod *pmeth; | ||
1178 | struct OperationHandle *op; | ||
1179 | |||
1180 | ch->tmit_msg = op = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg) | ||
1181 | + sizeof (*pmeth) + size); | ||
1182 | op->msg = (struct GNUNET_MessageHeader *) &op[1]; | ||
1183 | op->msg->size = sizeof (*op->msg) + sizeof (*pmeth) + size; | ||
1184 | |||
1185 | pmeth = (struct GNUNET_PSYC_MessageMethod *) &op->msg[1]; | ||
1186 | pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); | ||
1187 | pmeth->header.size = htons (sizeof (*pmeth) + size); | ||
1188 | pmeth->flags = htonl (flags); | ||
1189 | memcpy (&pmeth[1], method_name, size); | ||
1190 | |||
1191 | master->tmit = GNUNET_malloc (sizeof (*master->tmit)); | ||
1192 | master->tmit->master = master; | ||
1193 | master->tmit->notify_mod = notify_mod; | ||
1194 | master->tmit->notify_data = notify_data; | ||
1195 | master->tmit->notify_cls = notify_cls; | ||
1196 | master->tmit->state = MSG_STATE_MODIFIER; | ||
1197 | |||
1198 | master_transmit_mod (master); | ||
1199 | return master->tmit; | ||
1200 | } | 1264 | } |
1201 | 1265 | ||
1202 | 1266 | ||
@@ -1208,12 +1272,7 @@ GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, | |||
1208 | void | 1272 | void |
1209 | GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) | 1273 | GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) |
1210 | { | 1274 | { |
1211 | struct GNUNET_PSYC_Channel *ch = &th->master->ch; | 1275 | channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); |
1212 | if (0 == ch->tmit_ack_pending) | ||
1213 | { | ||
1214 | ch->tmit_paused = GNUNET_NO; | ||
1215 | master_transmit_data (th->master); | ||
1216 | } | ||
1217 | } | 1276 | } |
1218 | 1277 | ||
1219 | 1278 | ||
@@ -1225,10 +1284,7 @@ GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) | |||
1225 | void | 1284 | void |
1226 | GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th) | 1285 | GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th) |
1227 | { | 1286 | { |
1228 | struct GNUNET_PSYC_Master *master = th->master; | 1287 | channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); |
1229 | struct GNUNET_PSYC_Channel *ch = &master->ch; | ||
1230 | if (GNUNET_NO != ch->in_transmit) | ||
1231 | return; | ||
1232 | } | 1288 | } |
1233 | 1289 | ||
1234 | 1290 | ||
@@ -1282,15 +1338,15 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
1282 | { | 1338 | { |
1283 | struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv)); | 1339 | struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv)); |
1284 | struct GNUNET_PSYC_Channel *ch = &slv->ch; | 1340 | struct GNUNET_PSYC_Channel *ch = &slv->ch; |
1285 | struct SlaveJoinRequest *req = GNUNET_malloc (sizeof (*req) | 1341 | struct SlaveJoinRequest *req |
1286 | + relay_count * sizeof (*relays)); | 1342 | = GNUNET_malloc (sizeof (*req) + relay_count * sizeof (*relays)); |
1287 | req->header.size = htons (sizeof (*req) | 1343 | req->header.size = htons (sizeof (*req) |
1288 | + relay_count * sizeof (*relays)); | 1344 | + relay_count * sizeof (*relays)); |
1289 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN); | 1345 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN); |
1290 | req->channel_key = *channel_key; | 1346 | req->channel_key = *channel_key; |
1291 | req->slave_key = *slave_key; | 1347 | req->slave_key = *slave_key; |
1292 | req->origin = *origin; | 1348 | req->origin = *origin; |
1293 | req->relay_count = relay_count; | 1349 | req->relay_count = htonl (relay_count); |
1294 | memcpy (&req[1], relays, relay_count * sizeof (*relays)); | 1350 | memcpy (&req[1], relays, relay_count * sizeof (*relays)); |
1295 | 1351 | ||
1296 | ch->message_cb = message_cb; | 1352 | ch->message_cb = message_cb; |
@@ -1303,6 +1359,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
1303 | ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; | 1359 | ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; |
1304 | ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv); | 1360 | ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv); |
1305 | 1361 | ||
1362 | slv->join_cb = slave_joined_cb; | ||
1306 | return slv; | 1363 | return slv; |
1307 | } | 1364 | } |
1308 | 1365 | ||
@@ -1328,9 +1385,8 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave) | |||
1328 | * | 1385 | * |
1329 | * @param slave Slave handle. | 1386 | * @param slave Slave handle. |
1330 | * @param method_name Which (PSYC) method should be invoked (on host). | 1387 | * @param method_name Which (PSYC) method should be invoked (on host). |
1331 | * @param env Environment containing transient variables for the message, or | 1388 | * @param notify_mod Function to call to obtain modifiers. |
1332 | * NULL. | 1389 | * @param notify_data Function to call to obtain fragments of the data. |
1333 | * @param notify Function to call when we are allowed to transmit (to get data). | ||
1334 | * @param notify_cls Closure for @a notify. | 1390 | * @param notify_cls Closure for @a notify. |
1335 | * @param flags Flags for the message being transmitted. | 1391 | * @param flags Flags for the message being transmitted. |
1336 | * @return Transmission handle, NULL on error (i.e. more than one request | 1392 | * @return Transmission handle, NULL on error (i.e. more than one request |
@@ -1339,12 +1395,14 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave) | |||
1339 | struct GNUNET_PSYC_SlaveTransmitHandle * | 1395 | struct GNUNET_PSYC_SlaveTransmitHandle * |
1340 | GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave, | 1396 | GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave, |
1341 | const char *method_name, | 1397 | const char *method_name, |
1342 | const struct GNUNET_ENV_Environment *env, | 1398 | GNUNET_PSYC_TransmitNotifyModifier notify_mod, |
1343 | GNUNET_PSYC_SlaveTransmitNotify notify, | 1399 | GNUNET_PSYC_TransmitNotifyData notify_data, |
1344 | void *notify_cls, | 1400 | void *notify_cls, |
1345 | enum GNUNET_PSYC_SlaveTransmitFlags flags) | 1401 | enum GNUNET_PSYC_SlaveTransmitFlags flags) |
1346 | { | 1402 | { |
1347 | return NULL; | 1403 | return (struct GNUNET_PSYC_SlaveTransmitHandle *) |
1404 | channel_transmit (&slave->ch, method_name, | ||
1405 | notify_mod, notify_data, notify_cls, flags); | ||
1348 | } | 1406 | } |
1349 | 1407 | ||
1350 | 1408 | ||
@@ -1356,7 +1414,7 @@ GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave, | |||
1356 | void | 1414 | void |
1357 | GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) | 1415 | GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) |
1358 | { | 1416 | { |
1359 | 1417 | channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); | |
1360 | } | 1418 | } |
1361 | 1419 | ||
1362 | 1420 | ||
@@ -1368,7 +1426,7 @@ GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) | |||
1368 | void | 1426 | void |
1369 | GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th) | 1427 | GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th) |
1370 | { | 1428 | { |
1371 | 1429 | channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); | |
1372 | } | 1430 | } |
1373 | 1431 | ||
1374 | 1432 | ||
@@ -1382,7 +1440,7 @@ GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th) | |||
1382 | struct GNUNET_PSYC_Channel * | 1440 | struct GNUNET_PSYC_Channel * |
1383 | GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master) | 1441 | GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master) |
1384 | { | 1442 | { |
1385 | return (struct GNUNET_PSYC_Channel *) master; | 1443 | return &master->ch; |
1386 | } | 1444 | } |
1387 | 1445 | ||
1388 | 1446 | ||
@@ -1395,7 +1453,7 @@ GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master) | |||
1395 | struct GNUNET_PSYC_Channel * | 1453 | struct GNUNET_PSYC_Channel * |
1396 | GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave) | 1454 | GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave) |
1397 | { | 1455 | { |
1398 | return (struct GNUNET_PSYC_Channel *) slave; | 1456 | return &slave->ch; |
1399 | } | 1457 | } |
1400 | 1458 | ||
1401 | 1459 | ||
diff --git a/src/psyc/psyc_common.c b/src/psyc/psyc_common.c new file mode 100644 index 000000000..7368011fc --- /dev/null +++ b/src/psyc/psyc_common.c | |||
@@ -0,0 +1,100 @@ | |||
1 | /* | ||
2 | * This file is part of GNUnet | ||
3 | * (C) 2013 Christian Grothoff (and other contributing authors) | ||
4 | * | ||
5 | * GNUnet is free software; you can redistribute it and/or modify | ||
6 | * it under the terms of the GNU General Public License as published | ||
7 | * by the Free Software Foundation; either version 3, or (at your | ||
8 | * option) any later version. | ||
9 | * | ||
10 | * GNUnet is distributed in the hope that it will be useful, but | ||
11 | * WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | * General Public License for more details. | ||
14 | * | ||
15 | * You should have received a copy of the GNU General Public License | ||
16 | * along with GNUnet; see the file COPYING. If not, write to the | ||
17 | * Free Software Foundation, Inc., 59 Temple Place - Suite 330, | ||
18 | * Boston, MA 02111-1307, USA. | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @file psyc/psyc_common.c | ||
23 | * @brief Common functions for PSYC | ||
24 | * @author Gabor X Toth | ||
25 | */ | ||
26 | |||
27 | #include <inttypes.h> | ||
28 | #include "psyc.h" | ||
29 | |||
30 | /** | ||
31 | * Check if @a data contains a series of valid message parts. | ||
32 | * | ||
33 | * @param data_size Size of @a data. | ||
34 | * @param data Data. | ||
35 | * | ||
36 | * @return GNUNET_YES or GNUNET_NO | ||
37 | */ | ||
38 | int | ||
39 | GNUNET_PSYC_check_message_parts (uint16_t data_size, const char *data) | ||
40 | { | ||
41 | const struct GNUNET_MessageHeader *pmsg; | ||
42 | uint16_t psize = 0; | ||
43 | uint16_t pos = 0; | ||
44 | |||
45 | for (pos = 0; data_size + pos < data_size; pos += psize) | ||
46 | { | ||
47 | pmsg = (const struct GNUNET_MessageHeader *) (data + pos); | ||
48 | psize = ntohs (pmsg->size); | ||
49 | if (psize < sizeof (*pmsg) || data_size + pos + psize > data_size) | ||
50 | { | ||
51 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
52 | "Invalid message part of type %u and size %u.", | ||
53 | ntohs (pmsg->type), psize); | ||
54 | return GNUNET_NO; | ||
55 | } | ||
56 | } | ||
57 | return GNUNET_YES; | ||
58 | } | ||
59 | |||
60 | |||
61 | void | ||
62 | GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind, | ||
63 | const struct GNUNET_MessageHeader *msg) | ||
64 | { | ||
65 | uint16_t size = ntohs (msg->size); | ||
66 | uint16_t type = ntohs (msg->type); | ||
67 | GNUNET_log (kind, "Message of type %d and size %u:\n", type, size); | ||
68 | switch (type) | ||
69 | { | ||
70 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE: | ||
71 | { | ||
72 | struct GNUNET_PSYC_MessageHeader *pmsg | ||
73 | = (struct GNUNET_PSYC_MessageHeader *) msg; | ||
74 | GNUNET_log (kind, "\tID: %" PRIu64 "\tflags: %" PRIu32 "\n", | ||
75 | GNUNET_ntohll (pmsg->message_id), ntohl (pmsg->flags)); | ||
76 | break; | ||
77 | } | ||
78 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: | ||
79 | { | ||
80 | struct GNUNET_PSYC_MessageMethod *meth | ||
81 | = (struct GNUNET_PSYC_MessageMethod *) msg; | ||
82 | GNUNET_log (kind, "\t%.*s\n", size - sizeof (*meth), &meth[1]); | ||
83 | break; | ||
84 | } | ||
85 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | ||
86 | { | ||
87 | struct GNUNET_PSYC_MessageModifier *mod | ||
88 | = (struct GNUNET_PSYC_MessageModifier *) msg; | ||
89 | uint16_t name_size = ntohs (mod->name_size); | ||
90 | char oper = ' ' < mod->oper ? mod->oper : ' '; | ||
91 | GNUNET_log (kind, "\t%c%.*s\t%.*s\n", oper, name_size, &mod[1], | ||
92 | ntohs (mod->value_size), ((char *) &mod[1]) + name_size + 1); | ||
93 | break; | ||
94 | } | ||
95 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: | ||
96 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: | ||
97 | GNUNET_log (kind, "\t%.*s\n", size - sizeof (*msg), &msg[1]); | ||
98 | break; | ||
99 | } | ||
100 | } | ||
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c index 33684b125..88947be60 100644 --- a/src/psyc/test_psyc.c +++ b/src/psyc/test_psyc.c | |||
@@ -37,7 +37,7 @@ | |||
37 | 37 | ||
38 | #define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10) | 38 | #define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10) |
39 | 39 | ||
40 | #define DEBUG_SERVICE 0 | 40 | #define DEBUG_SERVICE 1 |
41 | 41 | ||
42 | 42 | ||
43 | /** | 43 | /** |
@@ -66,7 +66,8 @@ struct GNUNET_PSYC_MasterTransmitHandle *mth; | |||
66 | 66 | ||
67 | struct TransmitClosure | 67 | struct TransmitClosure |
68 | { | 68 | { |
69 | struct GNUNET_PSYC_MasterTransmitHandle *handle; | 69 | struct GNUNET_PSYC_MasterTransmitHandle *mst_tmit; |
70 | struct GNUNET_PSYC_SlaveTransmitHandle *slv_tmit; | ||
70 | struct GNUNET_ENV_Environment *env; | 71 | struct GNUNET_ENV_Environment *env; |
71 | char *data[16]; | 72 | char *data[16]; |
72 | const char *mod_value; | 73 | const char *mod_value; |
@@ -78,12 +79,30 @@ struct TransmitClosure | |||
78 | 79 | ||
79 | struct TransmitClosure *tmit; | 80 | struct TransmitClosure *tmit; |
80 | 81 | ||
82 | |||
83 | enum | ||
84 | { | ||
85 | TEST_NONE, | ||
86 | TEST_SLAVE_TRANSMIT, | ||
87 | TEST_MASTER_TRANSMIT, | ||
88 | } test; | ||
89 | |||
90 | |||
91 | static void | ||
92 | master_transmit (); | ||
93 | |||
94 | |||
81 | /** | 95 | /** |
82 | * Clean up all resources used. | 96 | * Clean up all resources used. |
83 | */ | 97 | */ |
84 | static void | 98 | static void |
85 | cleanup () | 99 | cleanup () |
86 | { | 100 | { |
101 | if (NULL != slv) | ||
102 | { | ||
103 | GNUNET_PSYC_slave_part (slv); | ||
104 | slv = NULL; | ||
105 | } | ||
87 | if (NULL != mst) | 106 | if (NULL != mst) |
88 | { | 107 | { |
89 | GNUNET_PSYC_master_stop (mst); | 108 | GNUNET_PSYC_master_stop (mst); |
@@ -133,6 +152,8 @@ end_normally (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
133 | static void | 152 | static void |
134 | end () | 153 | end () |
135 | { | 154 | { |
155 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Ending tests.\n"); | ||
156 | |||
136 | if (end_badly_task != GNUNET_SCHEDULER_NO_TASK) | 157 | if (end_badly_task != GNUNET_SCHEDULER_NO_TASK) |
137 | { | 158 | { |
138 | GNUNET_SCHEDULER_cancel (end_badly_task); | 159 | GNUNET_SCHEDULER_cancel (end_badly_task); |
@@ -144,8 +165,8 @@ end () | |||
144 | 165 | ||
145 | 166 | ||
146 | static void | 167 | static void |
147 | message (void *cls, uint64_t message_id, uint32_t flags, | 168 | master_message (void *cls, uint64_t message_id, uint32_t flags, |
148 | const struct GNUNET_MessageHeader *msg) | 169 | const struct GNUNET_MessageHeader *msg) |
149 | { | 170 | { |
150 | if (NULL == msg) | 171 | if (NULL == msg) |
151 | { | 172 | { |
@@ -158,12 +179,64 @@ message (void *cls, uint64_t message_id, uint32_t flags, | |||
158 | uint16_t size = ntohs (msg->size); | 179 | uint16_t size = ntohs (msg->size); |
159 | 180 | ||
160 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 181 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
161 | "Got message part of type %u and size %u " | 182 | "Master got message part of type %u and size %u " |
162 | "belonging to message ID %llu with flags %u\n", | 183 | "belonging to message ID %llu with flags %u\n", |
163 | type, size, message_id, flags); | 184 | type, size, message_id, flags); |
164 | 185 | ||
165 | if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type) | 186 | switch (test) |
166 | end (); | 187 | { |
188 | case TEST_SLAVE_TRANSMIT: | ||
189 | if (GNUNET_PSYC_MESSAGE_REQUEST != flags) | ||
190 | { | ||
191 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
192 | "Unexpected request flags: %lu\n", flags); | ||
193 | GNUNET_assert (0); | ||
194 | return; | ||
195 | } | ||
196 | // FIXME: check rest of message | ||
197 | |||
198 | if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type) | ||
199 | master_transmit (); | ||
200 | break; | ||
201 | |||
202 | case TEST_MASTER_TRANSMIT: | ||
203 | break; | ||
204 | |||
205 | default: | ||
206 | GNUNET_assert (0); | ||
207 | } | ||
208 | } | ||
209 | |||
210 | |||
211 | static void | ||
212 | slave_message (void *cls, uint64_t message_id, uint32_t flags, | ||
213 | const struct GNUNET_MessageHeader *msg) | ||
214 | { | ||
215 | if (NULL == msg) | ||
216 | { | ||
217 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
218 | "Error while receiving message %llu\n", message_id); | ||
219 | return; | ||
220 | } | ||
221 | |||
222 | uint16_t type = ntohs (msg->type); | ||
223 | uint16_t size = ntohs (msg->size); | ||
224 | |||
225 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
226 | "Slave got message part of type %u and size %u " | ||
227 | "belonging to message ID %llu with flags %u\n", | ||
228 | type, size, message_id, flags); | ||
229 | |||
230 | switch (test) | ||
231 | { | ||
232 | case TEST_MASTER_TRANSMIT: | ||
233 | if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type) | ||
234 | end (); | ||
235 | break; | ||
236 | |||
237 | default: | ||
238 | GNUNET_assert (0); | ||
239 | } | ||
167 | } | 240 | } |
168 | 241 | ||
169 | 242 | ||
@@ -175,7 +248,9 @@ join_request (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, | |||
175 | struct GNUNET_PSYC_JoinHandle *jh) | 248 | struct GNUNET_PSYC_JoinHandle *jh) |
176 | { | 249 | { |
177 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 250 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
178 | "Got join request."); | 251 | "Got join request: %s (%zu vars)", method_name, variable_count); |
252 | GNUNET_PSYC_join_decision (jh, GNUNET_YES, 0, NULL, "_notice_join", NULL, | ||
253 | "you're in", 9); | ||
179 | } | 254 | } |
180 | 255 | ||
181 | 256 | ||
@@ -185,7 +260,7 @@ transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
185 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission resumed.\n"); | 260 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission resumed.\n"); |
186 | struct TransmitClosure *tmit = cls; | 261 | struct TransmitClosure *tmit = cls; |
187 | tmit->paused = GNUNET_NO; | 262 | tmit->paused = GNUNET_NO; |
188 | GNUNET_PSYC_master_transmit_resume (tmit->handle); | 263 | GNUNET_PSYC_master_transmit_resume (tmit->mst_tmit); |
189 | } | 264 | } |
190 | 265 | ||
191 | 266 | ||
@@ -204,35 +279,8 @@ tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper) | |||
204 | uint16_t name_size = 0; | 279 | uint16_t name_size = 0; |
205 | size_t value_size = 0; | 280 | size_t value_size = 0; |
206 | 281 | ||
207 | if (NULL != tmit->mod_value && 0 < tmit->mod_value_size) | 282 | if (NULL != oper) |
208 | { /* Modifier continuation */ | 283 | { /* New modifier */ |
209 | value = tmit->mod_value; | ||
210 | if (tmit->mod_value_size <= *data_size) | ||
211 | { | ||
212 | value_size = tmit->mod_value_size; | ||
213 | tmit->mod_value = NULL; | ||
214 | } | ||
215 | else | ||
216 | { | ||
217 | value_size = *data_size; | ||
218 | tmit->mod_value += value_size; | ||
219 | } | ||
220 | tmit->mod_value_size -= value_size; | ||
221 | |||
222 | if (*data_size < value_size) | ||
223 | { | ||
224 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
225 | "value larger than buffer: %u < %zu\n", | ||
226 | *data_size, value_size); | ||
227 | *data_size = 0; | ||
228 | return GNUNET_NO; | ||
229 | } | ||
230 | |||
231 | *data_size = value_size; | ||
232 | memcpy (data, value, value_size); | ||
233 | } | ||
234 | else if (NULL != oper) | ||
235 | { | ||
236 | if (GNUNET_NO == GNUNET_ENV_environment_shift (tmit->env, &op, &name, | 284 | if (GNUNET_NO == GNUNET_ENV_environment_shift (tmit->env, &op, &name, |
237 | (void *) &value, &value_size)) | 285 | (void *) &value, &value_size)) |
238 | { /* No more modifiers, continue with data */ | 286 | { /* No more modifiers, continue with data */ |
@@ -259,6 +307,33 @@ tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper) | |||
259 | ((char *)data)[name_size] = '\0'; | 307 | ((char *)data)[name_size] = '\0'; |
260 | memcpy ((char *)data + name_size + 1, value, value_size); | 308 | memcpy ((char *)data + name_size + 1, value, value_size); |
261 | } | 309 | } |
310 | else if (NULL != tmit->mod_value && 0 < tmit->mod_value_size) | ||
311 | { /* Modifier continuation */ | ||
312 | value = tmit->mod_value; | ||
313 | if (tmit->mod_value_size <= *data_size) | ||
314 | { | ||
315 | value_size = tmit->mod_value_size; | ||
316 | tmit->mod_value = NULL; | ||
317 | } | ||
318 | else | ||
319 | { | ||
320 | value_size = *data_size; | ||
321 | tmit->mod_value += value_size; | ||
322 | } | ||
323 | tmit->mod_value_size -= value_size; | ||
324 | |||
325 | if (*data_size < value_size) | ||
326 | { | ||
327 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
328 | "value larger than buffer: %u < %zu\n", | ||
329 | *data_size, value_size); | ||
330 | *data_size = 0; | ||
331 | return GNUNET_NO; | ||
332 | } | ||
333 | |||
334 | *data_size = value_size; | ||
335 | memcpy (data, value, value_size); | ||
336 | } | ||
262 | 337 | ||
263 | return 0 == tmit->mod_value_size ? GNUNET_YES : GNUNET_NO; | 338 | return 0 == tmit->mod_value_size ? GNUNET_YES : GNUNET_NO; |
264 | } | 339 | } |
@@ -268,6 +343,12 @@ static int | |||
268 | tmit_notify_data (void *cls, uint16_t *data_size, void *data) | 343 | tmit_notify_data (void *cls, uint16_t *data_size, void *data) |
269 | { | 344 | { |
270 | struct TransmitClosure *tmit = cls; | 345 | struct TransmitClosure *tmit = cls; |
346 | if (0 == tmit->data_count) | ||
347 | { | ||
348 | *data_size = 0; | ||
349 | return GNUNET_YES; | ||
350 | } | ||
351 | |||
271 | uint16_t size = strlen (tmit->data[tmit->n]); | 352 | uint16_t size = strlen (tmit->data[tmit->n]); |
272 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 353 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
273 | "Transmit notify data: %lu bytes available, " | 354 | "Transmit notify data: %lu bytes available, " |
@@ -300,10 +381,52 @@ tmit_notify_data (void *cls, uint16_t *data_size, void *data) | |||
300 | 381 | ||
301 | 382 | ||
302 | static void | 383 | static void |
303 | master_started (void *cls, uint64_t max_message_id) | 384 | slave_joined (void *cls, uint64_t max_message_id) |
304 | { | 385 | { |
305 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 386 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave joined: %lu\n", max_message_id); |
306 | "Master started: %" PRIu64 "\n", max_message_id); | 387 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave sending request to master.\n"); |
388 | |||
389 | test = TEST_SLAVE_TRANSMIT; | ||
390 | |||
391 | tmit = GNUNET_new (struct TransmitClosure); | ||
392 | tmit->env = GNUNET_ENV_environment_create (); | ||
393 | GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN, | ||
394 | "_abc", "abc def", 7); | ||
395 | GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN, | ||
396 | "_abc_def", "abc def ghi", 11); | ||
397 | tmit->n = 0; | ||
398 | tmit->data[0] = "slave test"; | ||
399 | tmit->data_count = 1; | ||
400 | tmit->slv_tmit | ||
401 | = GNUNET_PSYC_slave_transmit (slv, "_request_test", tmit_notify_mod, | ||
402 | tmit_notify_data, tmit, | ||
403 | GNUNET_PSYC_SLAVE_TRANSMIT_NONE); | ||
404 | } | ||
405 | |||
406 | static void | ||
407 | slave_join () | ||
408 | { | ||
409 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Joining slave.\n"); | ||
410 | |||
411 | struct GNUNET_PeerIdentity origin; | ||
412 | struct GNUNET_PeerIdentity relays[16]; | ||
413 | struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create (); | ||
414 | GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN, | ||
415 | "_foo", "bar baz", 7); | ||
416 | GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN, | ||
417 | "_foo_bar", "foo bar baz", 11); | ||
418 | slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin, | ||
419 | 16, relays, &slave_message, &join_request, &slave_joined, | ||
420 | NULL, "_request_join", env, "some data", 9); | ||
421 | GNUNET_ENV_environment_destroy (env); | ||
422 | } | ||
423 | |||
424 | |||
425 | static void | ||
426 | master_transmit () | ||
427 | { | ||
428 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master sending message to all.\n"); | ||
429 | test = TEST_MASTER_TRANSMIT; | ||
307 | 430 | ||
308 | tmit = GNUNET_new (struct TransmitClosure); | 431 | tmit = GNUNET_new (struct TransmitClosure); |
309 | tmit->env = GNUNET_ENV_environment_create (); | 432 | tmit->env = GNUNET_ENV_environment_create (); |
@@ -315,17 +438,19 @@ master_started (void *cls, uint64_t max_message_id) | |||
315 | tmit->data[1] = "foo bar"; | 438 | tmit->data[1] = "foo bar"; |
316 | tmit->data[2] = "foo bar baz"; | 439 | tmit->data[2] = "foo bar baz"; |
317 | tmit->data_count = 3; | 440 | tmit->data_count = 3; |
318 | tmit->handle | 441 | tmit->mst_tmit |
319 | = GNUNET_PSYC_master_transmit (mst, "_test", tmit_notify_mod, | 442 | = GNUNET_PSYC_master_transmit (mst, "_notice_test", tmit_notify_mod, |
320 | tmit_notify_data, tmit, | 443 | tmit_notify_data, tmit, |
321 | GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN); | 444 | GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN); |
322 | } | 445 | } |
323 | 446 | ||
324 | 447 | ||
325 | static void | 448 | static void |
326 | slave_joined (void *cls, uint64_t max_message_id) | 449 | master_started (void *cls, uint64_t max_message_id) |
327 | { | 450 | { |
328 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave joined: %lu\n", max_message_id); | 451 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
452 | "Master started: %" PRIu64 "\n", max_message_id); | ||
453 | slave_join (); | ||
329 | } | 454 | } |
330 | 455 | ||
331 | 456 | ||
@@ -355,21 +480,9 @@ run (void *cls, | |||
355 | GNUNET_CRYPTO_eddsa_key_get_public (channel_key, &channel_pub_key); | 480 | GNUNET_CRYPTO_eddsa_key_get_public (channel_key, &channel_pub_key); |
356 | GNUNET_CRYPTO_eddsa_key_get_public (slave_key, &slave_pub_key); | 481 | GNUNET_CRYPTO_eddsa_key_get_public (slave_key, &slave_pub_key); |
357 | 482 | ||
483 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Starting master.\n"); | ||
358 | mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE, | 484 | mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE, |
359 | &message, &join_request, &master_started, NULL); | 485 | &master_message, &join_request, &master_started, NULL); |
360 | return; /* FIXME: test slave */ | ||
361 | |||
362 | struct GNUNET_PeerIdentity origin; | ||
363 | struct GNUNET_PeerIdentity relays[16]; | ||
364 | struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create (); | ||
365 | GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN, | ||
366 | "_foo", "bar baz", 7); | ||
367 | GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN, | ||
368 | "_foo_bar", "foo bar baz", 11); | ||
369 | slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin, | ||
370 | 16, relays, &message, &join_request, &slave_joined, | ||
371 | NULL, "_request_join", env, "some data", 9); | ||
372 | GNUNET_ENV_environment_destroy (env); | ||
373 | } | 486 | } |
374 | 487 | ||
375 | 488 | ||