aboutsummaryrefslogtreecommitdiff
path: root/src/psyc
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2013-10-10 18:08:53 +0000
committerGabor X Toth <*@tg-x.net>2013-10-10 18:08:53 +0000
commit1b5d4aba9d8bbcb62c93f96782e3567f6f79d0cb (patch)
tree3cd28bfee831af0417c2dcbb543c03481517ad00 /src/psyc
parent67a8e21eedb6d35fec76841d4a1a6b4b41b37879 (diff)
downloadgnunet-1b5d4aba9d8bbcb62c93f96782e3567f6f79d0cb.tar.gz
gnunet-1b5d4aba9d8bbcb62c93f96782e3567f6f79d0cb.zip
PSYC: master msg transmission
Diffstat (limited to 'src/psyc')
-rw-r--r--src/psyc/gnunet-service-psyc.c271
-rw-r--r--src/psyc/psyc.h7
-rw-r--r--src/psyc/psyc_api.c218
-rw-r--r--src/psyc/test_psyc.c90
4 files changed, 472 insertions, 114 deletions
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
index 7f5189ab8..d3f203ebf 100644
--- a/src/psyc/gnunet-service-psyc.c
+++ b/src/psyc/gnunet-service-psyc.c
@@ -34,8 +34,6 @@
34#include "gnunet_psyc_service.h" 34#include "gnunet_psyc_service.h"
35#include "psyc.h" 35#include "psyc.h"
36 36
37#define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__)
38
39 37
40/** 38/**
41 * Handle to our current configuration. 39 * Handle to our current configuration.
@@ -58,6 +56,11 @@ static struct GNUNET_SERVER_NotificationContext *nc;
58static struct GNUNET_PSYCSTORE_Handle *store; 56static struct GNUNET_PSYCSTORE_Handle *store;
59 57
60/** 58/**
59 * channel's pub_key_hash -> struct Channel
60 */
61static struct GNUNET_CONTAINER_MultiHashMap *clients;
62
63/**
61 * Message in the transmission queue. 64 * Message in the transmission queue.
62 */ 65 */
63struct TransmitMessage 66struct TransmitMessage
@@ -81,6 +84,7 @@ struct Channel
81 struct TransmitMessage *tmit_tail; 84 struct TransmitMessage *tmit_tail;
82 85
83 char *tmit_buf; 86 char *tmit_buf;
87 GNUNET_SCHEDULER_TaskIdentifier tmit_task;
84 uint32_t tmit_mod_count; 88 uint32_t tmit_mod_count;
85 uint32_t tmit_mod_recvd; 89 uint32_t tmit_mod_recvd;
86 uint16_t tmit_size; 90 uint16_t tmit_size;
@@ -96,8 +100,9 @@ struct Channel
96struct Master 100struct Master
97{ 101{
98 struct Channel channel; 102 struct Channel channel;
99 struct GNUNET_CRYPTO_EccPrivateKey private_key; 103 struct GNUNET_CRYPTO_EccPrivateKey priv_key;
100 struct GNUNET_CRYPTO_EccPublicSignKey public_key; 104 struct GNUNET_CRYPTO_EccPublicSignKey pub_key;
105 struct GNUNET_HashCode pub_key_hash;
101 106
102 struct GNUNET_MULTICAST_Origin *origin; 107 struct GNUNET_MULTICAST_Origin *origin;
103 struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle; 108 struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle;
@@ -120,13 +125,20 @@ struct Slave
120{ 125{
121 struct Channel channel; 126 struct Channel channel;
122 struct GNUNET_CRYPTO_EccPrivateKey slave_key; 127 struct GNUNET_CRYPTO_EccPrivateKey slave_key;
123 struct GNUNET_CRYPTO_EccPublicSignKey channel_key; 128 struct GNUNET_CRYPTO_EccPublicSignKey chan_key;
129 struct GNUNET_HashCode chan_key_hash;
124 130
125 struct GNUNET_MULTICAST_Member *member; 131 struct GNUNET_MULTICAST_Member *member;
126 struct GNUNET_MULTICAST_MemberRequestHandle *tmit_handle; 132 struct GNUNET_MULTICAST_MemberRequestHandle *tmit_handle;
127 133
134 struct GNUNET_PeerIdentity origin;
135 struct GNUNET_PeerIdentity *relays;
136 struct GNUNET_MessageHeader *join_req;
137
128 uint64_t max_message_id; 138 uint64_t max_message_id;
129 uint64_t max_request_id; 139 uint64_t max_request_id;
140
141 uint32_t relay_count;
130}; 142};
131 143
132 144
@@ -166,41 +178,151 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
166 178
167 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p disconnected\n", client); 179 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p disconnected\n", client);
168 180
169 struct Channel *ch = GNUNET_SERVER_client_get_user_context (client, 181 struct Channel *ch
170 struct Channel); 182 = GNUNET_SERVER_client_get_user_context (client, struct Channel);
171 GNUNET_assert (NULL != ch); 183 if (NULL == ch)
184 {
185 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
186 "User context is NULL in client_disconnect()\n");
187 GNUNET_break (0);
188 return;
189 }
172 190
173 if (NULL != ch->tmit_buf) 191 if (NULL != ch->tmit_buf)
174 { 192 {
175 GNUNET_free (ch->tmit_buf); 193 GNUNET_free (ch->tmit_buf);
176 ch->tmit_buf = NULL; 194 ch->tmit_buf = NULL;
177 } 195 }
196
197 if (ch->is_master)
198 {
199 struct Master *mst = (struct Master *) ch;
200 if (NULL != mst->origin)
201 GNUNET_MULTICAST_origin_stop (mst->origin);
202 }
203 else
204 {
205 struct Slave *slv = (struct Slave *) ch;
206 if (NULL != slv->join_req)
207 GNUNET_free (slv->join_req);
208 if (NULL != slv->relays)
209 GNUNET_free (slv->relays);
210 if (NULL != slv->member)
211 GNUNET_MULTICAST_member_part (slv->member);
212 }
213
178 GNUNET_free (ch); 214 GNUNET_free (ch);
179} 215}
180 216
217void
218join_cb (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *member_key,
219 const struct GNUNET_MessageHeader *join_req,
220 struct GNUNET_MULTICAST_JoinHandle *jh)
221{
222
223}
181 224
182void 225void
183counters_cb (void *cls, uint64_t max_fragment_id, uint64_t max_message_id, 226membership_test_cb (void *cls,
184 uint64_t max_group_generation, uint64_t max_state_message_id) 227 const struct GNUNET_CRYPTO_EccPublicSignKey *member_key,
228 uint64_t message_id, uint64_t group_generation,
229 struct GNUNET_MULTICAST_MembershipTestHandle *mth)
185{ 230{
186 struct Channel *ch = cls; 231
232}
233
234void
235replay_fragment_cb (void *cls,
236 const struct GNUNET_CRYPTO_EccPublicSignKey *member_key,
237 uint64_t fragment_id, uint64_t flags,
238 struct GNUNET_MULTICAST_ReplayHandle *rh)
239{
240
241}
242
243void
244replay_message_cb (void *cls,
245 const struct GNUNET_CRYPTO_EccPublicSignKey *member_key,
246 uint64_t message_id,
247 uint64_t fragment_offset,
248 uint64_t flags,
249 struct GNUNET_MULTICAST_ReplayHandle *rh)
250{
251
252}
253
254void
255request_cb (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *member_key,
256 const struct GNUNET_MessageHeader *req,
257 enum GNUNET_MULTICAST_MessageFlags flags)
258{
259
260}
261
262void
263message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
264{
265 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
266 "Received message of type %u from multicast.\n",
267 ntohs (msg->type));
268}
269
270void
271master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
272 uint64_t max_message_id, uint64_t max_group_generation,
273 uint64_t max_state_message_id)
274{
275 struct Master *mst = cls;
276 struct Channel *ch = &mst->channel;
187 struct CountersResult *res = GNUNET_malloc (sizeof (*res)); 277 struct CountersResult *res = GNUNET_malloc (sizeof (*res));
278 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
188 res->header.size = htons (sizeof (*res)); 279 res->header.size = htons (sizeof (*res));
280 res->result_code = htonl (result);
189 res->max_message_id = GNUNET_htonll (max_message_id); 281 res->max_message_id = GNUNET_htonll (max_message_id);
190 282
191 if (ch->is_master) 283 if (GNUNET_OK == result || GNUNET_NO == result)
192 { 284 {
193 struct Master *mst = cls;
194 mst->max_message_id = max_message_id; 285 mst->max_message_id = max_message_id;
195 mst->max_state_message_id = max_state_message_id; 286 mst->max_state_message_id = max_state_message_id;
196 mst->max_group_generation = max_group_generation; 287 mst->max_group_generation = max_group_generation;
197 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); 288 mst->origin
289 = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key,
290 max_fragment_id + 1,
291 join_cb, membership_test_cb,
292 replay_fragment_cb, replay_message_cb,
293 request_cb, message_cb, ch);
198 } 294 }
199 else 295 GNUNET_SERVER_notification_context_add (nc, ch->client);
296 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
297 GNUNET_NO);
298 GNUNET_free (res);
299}
300
301
302void
303slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
304 uint64_t max_message_id, uint64_t max_group_generation,
305 uint64_t max_state_message_id)
306{
307 struct Slave *slv = cls;
308 struct Channel *ch = &slv->channel;
309 struct CountersResult *res = GNUNET_malloc (sizeof (*res));
310 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
311 res->header.size = htons (sizeof (*res));
312 res->result_code = htonl (result);
313 res->max_message_id = GNUNET_htonll (max_message_id);
314
315 if (GNUNET_OK == result || GNUNET_NO == result)
200 { 316 {
201 struct Slave *slv = cls;
202 slv->max_message_id = max_message_id; 317 slv->max_message_id = max_message_id;
203 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); 318 slv->member
319 = GNUNET_MULTICAST_member_join (cfg, &slv->chan_key, &slv->slave_key,
320 &slv->origin,
321 slv->relay_count, slv->relays,
322 slv->join_req, join_cb,
323 membership_test_cb,
324 replay_fragment_cb, replay_message_cb,
325 message_cb, ch);
204 } 326 }
205 327
206 GNUNET_SERVER_notification_context_add (nc, ch->client); 328 GNUNET_SERVER_notification_context_add (nc, ch->client);
@@ -220,14 +342,17 @@ handle_master_start (void *cls, struct GNUNET_SERVER_Client *client,
220 mst->channel.client = client; 342 mst->channel.client = client;
221 mst->channel.is_master = GNUNET_YES; 343 mst->channel.is_master = GNUNET_YES;
222 mst->policy = ntohl (req->policy); 344 mst->policy = ntohl (req->policy);
223 mst->private_key = req->channel_key; 345 mst->priv_key = req->channel_key;
224 GNUNET_CRYPTO_ecc_key_get_public_for_signature (&mst->private_key, 346 GNUNET_CRYPTO_ecc_key_get_public_for_signature (&mst->priv_key,
225 &mst->public_key); 347 &mst->pub_key);
348 GNUNET_CRYPTO_hash (&mst->pub_key, sizeof (mst->pub_key), &mst->pub_key_hash);
226 349
227 GNUNET_PSYCSTORE_counters_get (store, &mst->public_key, 350 GNUNET_PSYCSTORE_counters_get (store, &mst->pub_key,
228 counters_cb, mst); 351 master_counters_cb, mst);
229 352
230 GNUNET_SERVER_client_set_user_context (client, mst); 353 GNUNET_SERVER_client_set_user_context (client, &mst->channel);
354 GNUNET_CONTAINER_multihashmap_put (clients, &mst->pub_key_hash, mst,
355 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
231 GNUNET_SERVER_receive_done (client, GNUNET_OK); 356 GNUNET_SERVER_receive_done (client, GNUNET_OK);
232} 357}
233 358
@@ -241,13 +366,25 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
241 struct Slave *slv = GNUNET_new (struct Slave); 366 struct Slave *slv = GNUNET_new (struct Slave);
242 slv->channel.client = client; 367 slv->channel.client = client;
243 slv->channel.is_master = GNUNET_NO; 368 slv->channel.is_master = GNUNET_NO;
244 slv->channel_key = req->channel_key;
245 slv->slave_key = req->slave_key; 369 slv->slave_key = req->slave_key;
246 370 slv->chan_key = req->channel_key;
247 GNUNET_PSYCSTORE_counters_get (store, &slv->channel_key, 371 GNUNET_CRYPTO_hash (&slv->chan_key, sizeof (slv->chan_key),
248 counters_cb, slv); 372 &slv->chan_key_hash);
249 373 slv->origin = req->origin;
250 GNUNET_SERVER_client_set_user_context (client, slv); 374 slv->relay_count = ntohl (req->relay_count);
375
376 const struct GNUNET_PeerIdentity *relays
377 = (const struct GNUNET_PeerIdentity *) &req[1];
378 slv->relays
379 = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
380 uint32_t i;
381 for (i = 0; i < slv->relay_count; i++)
382 memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
383
384 GNUNET_PSYCSTORE_counters_get (store, &slv->chan_key,
385 slave_counters_cb, slv);
386
387 GNUNET_SERVER_client_set_user_context (client, &slv->channel);
251 GNUNET_SERVER_receive_done (client, GNUNET_OK); 388 GNUNET_SERVER_receive_done (client, GNUNET_OK);
252} 389}
253 390
@@ -268,34 +405,40 @@ send_transmit_ack (struct Channel *ch)
268 405
269 406
270static int 407static int
271transmit_notify (void *cls, uint64_t fragment_id, size_t *data_size, void *data) 408transmit_notify (void *cls, size_t *data_size, void *data)
272{ 409{
410 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify()\n");
273 struct Channel *ch = cls; 411 struct Channel *ch = cls;
274 struct TransmitMessage *msg = ch->tmit_head; 412 struct TransmitMessage *msg = ch->tmit_head;
275 413
276 if (NULL == msg || *data_size < msg->size) 414 if (NULL == msg || *data_size < ntohs (msg->size))
277 { 415 {
278 *data_size = 0; 416 *data_size = 0;
279 return GNUNET_NO; 417 return GNUNET_NO;
280 } 418 }
281 419
282 memcpy (data, msg->buf, msg->size); 420 *data_size = ntohs (msg->size);
283 *data_size = msg->size; 421 memcpy (data, msg->buf, *data_size);
284 422
285 GNUNET_free (ch->tmit_buf); 423 GNUNET_free (ch->tmit_buf);
424 ch->tmit_buf = NULL;
286 GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg); 425 GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg);
287 426
288 return (GNUNET_YES == ch->in_transmit) ? GNUNET_NO : GNUNET_YES; 427 return (GNUNET_YES == ch->in_transmit) ? GNUNET_NO : GNUNET_YES;
289} 428}
290 429
291 430
292static int 431static void
293master_transmit_message (struct Master *mst) 432master_transmit_message (void *cls,
433 const struct GNUNET_SCHEDULER_TaskContext *tc)
294{ 434{
435 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "master_transmit_message()\n");
436 struct Master *mst = cls;
437 mst->channel.tmit_task = 0;
295 if (NULL == mst->tmit_handle) 438 if (NULL == mst->tmit_handle)
296 { 439 {
297 mst->tmit_handle 440 mst->tmit_handle
298 = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id, 441 = GNUNET_MULTICAST_origin_to_all (mst->origin, ++mst->max_message_id,
299 mst->max_group_generation, 442 mst->max_group_generation,
300 transmit_notify, mst); 443 transmit_notify, mst);
301 } 444 }
@@ -303,24 +446,25 @@ master_transmit_message (struct Master *mst)
303 { 446 {
304 GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle); 447 GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
305 } 448 }
306 return GNUNET_OK;
307} 449}
308 450
309 451
310static int 452static void
311slave_transmit_message (struct Slave *slv) 453slave_transmit_message (void *cls,
454 const struct GNUNET_SCHEDULER_TaskContext *tc)
312{ 455{
456 struct Slave *slv = cls;
457 slv->channel.tmit_task = 0;
313 if (NULL == slv->tmit_handle) 458 if (NULL == slv->tmit_handle)
314 { 459 {
315 slv->tmit_handle 460 slv->tmit_handle
316 = GNUNET_MULTICAST_member_to_origin(slv->member, slv->max_request_id, 461 = GNUNET_MULTICAST_member_to_origin(slv->member, ++slv->max_request_id,
317 transmit_notify, slv); 462 transmit_notify, slv);
318 } 463 }
319 else 464 else
320 { 465 {
321 GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle); 466 GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
322 } 467 }
323 return GNUNET_OK;
324} 468}
325 469
326 470
@@ -328,6 +472,7 @@ static int
328buffer_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg) 472buffer_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg)
329{ 473{
330 uint16_t size = ntohs (msg->size); 474 uint16_t size = ntohs (msg->size);
475 struct GNUNET_TIME_Relative tmit_delay = GNUNET_TIME_UNIT_ZERO;
331 476
332 if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < size) 477 if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < size)
333 return GNUNET_SYSERR; 478 return GNUNET_SYSERR;
@@ -353,12 +498,17 @@ buffer_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg)
353 tmit_msg->size = size; 498 tmit_msg->size = size;
354 tmit_msg->status = ch->tmit_status; 499 tmit_msg->status = ch->tmit_status;
355 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); 500 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
356 501 tmit_delay = GNUNET_TIME_UNIT_ZERO;
357 ch->is_master
358 ? master_transmit_message ((struct Master *) ch)
359 : slave_transmit_message ((struct Slave *) ch);
360 } 502 }
361 503
504 if (0 != ch->tmit_task)
505 GNUNET_SCHEDULER_cancel (ch->tmit_task);
506
507 ch->tmit_task
508 = ch->is_master
509 ? GNUNET_SCHEDULER_add_delayed (tmit_delay, master_transmit_message, ch)
510 : GNUNET_SCHEDULER_add_delayed (tmit_delay, slave_transmit_message, ch);
511
362 return GNUNET_OK; 512 return GNUNET_OK;
363} 513}
364 514
@@ -368,8 +518,8 @@ handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client,
368{ 518{
369 const struct GNUNET_PSYC_MessageMethod *meth 519 const struct GNUNET_PSYC_MessageMethod *meth
370 = (const struct GNUNET_PSYC_MessageMethod *) msg; 520 = (const struct GNUNET_PSYC_MessageMethod *) msg;
371 struct Channel *ch = GNUNET_SERVER_client_get_user_context (client, 521 struct Channel *ch
372 struct Channel); 522 = GNUNET_SERVER_client_get_user_context (client, struct Channel);
373 GNUNET_assert (NULL != ch); 523 GNUNET_assert (NULL != ch);
374 524
375 if (GNUNET_NO != ch->in_transmit) 525 if (GNUNET_NO != ch->in_transmit)
@@ -378,6 +528,7 @@ handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client,
378 return; 528 return;
379 } 529 }
380 530
531 ch->in_transmit = GNUNET_YES;
381 ch->tmit_buf = NULL; 532 ch->tmit_buf = NULL;
382 ch->tmit_size = 0; 533 ch->tmit_size = 0;
383 ch->tmit_mod_recvd = 0; 534 ch->tmit_mod_recvd = 0;
@@ -388,6 +539,8 @@ handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client,
388 539
389 if (0 == ch->tmit_mod_count) 540 if (0 == ch->tmit_mod_count)
390 send_transmit_ack (ch); 541 send_transmit_ack (ch);
542
543 GNUNET_SERVER_receive_done (client, GNUNET_OK);
391}; 544};
392 545
393 546
@@ -397,8 +550,8 @@ handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client,
397{ 550{
398 const struct GNUNET_PSYC_MessageModifier *mod 551 const struct GNUNET_PSYC_MessageModifier *mod
399 = (const struct GNUNET_PSYC_MessageModifier *) msg; 552 = (const struct GNUNET_PSYC_MessageModifier *) msg;
400 struct Channel *ch = GNUNET_SERVER_client_get_user_context (client, 553 struct Channel *ch
401 struct Channel); 554 = GNUNET_SERVER_client_get_user_context (client, struct Channel);
402 GNUNET_assert (NULL != ch); 555 GNUNET_assert (NULL != ch);
403 556
404 ch->tmit_mod_recvd++; 557 ch->tmit_mod_recvd++;
@@ -406,6 +559,8 @@ handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client,
406 559
407 if (ch->tmit_mod_recvd == ch->tmit_mod_count) 560 if (ch->tmit_mod_recvd == ch->tmit_mod_count)
408 send_transmit_ack (ch); 561 send_transmit_ack (ch);
562
563 GNUNET_SERVER_receive_done (client, GNUNET_OK);
409}; 564};
410 565
411 566
@@ -415,13 +570,18 @@ handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client,
415{ 570{
416 const struct GNUNET_PSYC_MessageData *data 571 const struct GNUNET_PSYC_MessageData *data
417 = (const struct GNUNET_PSYC_MessageData *) msg; 572 = (const struct GNUNET_PSYC_MessageData *) msg;
418 struct Channel *ch = GNUNET_SERVER_client_get_user_context (client, 573 struct Channel *ch
419 struct Channel); 574 = GNUNET_SERVER_client_get_user_context (client, struct Channel);
420 GNUNET_assert (NULL != ch); 575 GNUNET_assert (NULL != ch);
421 576
422 ch->tmit_status = data->status; 577 ch->tmit_status = ntohs (data->status);
423 buffer_message (ch, msg); 578 buffer_message (ch, msg);
424 send_transmit_ack (ch); 579 send_transmit_ack (ch);
580
581 if (GNUNET_PSYC_DATA_CONT != ch->tmit_status)
582 ch->in_transmit = GNUNET_NO;
583
584 GNUNET_SERVER_receive_done (client, GNUNET_OK);
425}; 585};
426 586
427 587
@@ -444,13 +604,13 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
444 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 }, 604 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
445 605
446 { &handle_transmit_method, NULL, 606 { &handle_transmit_method, NULL,
447 GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_METHOD, 0 }, 607 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD, 0 },
448 608
449 { &handle_transmit_modifier, NULL, 609 { &handle_transmit_modifier, NULL,
450 GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_MODIFIER, 0 }, 610 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER, 0 },
451 611
452 { &handle_transmit_data, NULL, 612 { &handle_transmit_data, NULL,
453 GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_DATA, 0 }, 613 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA, 0 },
454 614
455 { NULL, NULL, 0, 0 } 615 { NULL, NULL, 0, 0 }
456 }; 616 };
@@ -458,6 +618,7 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
458 cfg = c; 618 cfg = c;
459 store = GNUNET_PSYCSTORE_connect (cfg); 619 store = GNUNET_PSYCSTORE_connect (cfg);
460 stats = GNUNET_STATISTICS_create ("psyc", cfg); 620 stats = GNUNET_STATISTICS_create ("psyc", cfg);
621 clients = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
461 nc = GNUNET_SERVER_notification_context_create (server, 1); 622 nc = GNUNET_SERVER_notification_context_create (server, 1);
462 GNUNET_SERVER_add_handlers (server, handlers); 623 GNUNET_SERVER_add_handlers (server, handlers);
463 GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL); 624 GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
diff --git a/src/psyc/psyc.h b/src/psyc/psyc.h
index 35e9ae800..6a8826337 100644
--- a/src/psyc/psyc.h
+++ b/src/psyc/psyc.h
@@ -65,6 +65,11 @@ struct CountersResult
65 */ 65 */
66 struct GNUNET_MessageHeader header; 66 struct GNUNET_MessageHeader header;
67 67
68 /**
69 * Status code for the operation.
70 */
71 int32_t result_code GNUNET_PACKED;
72
68 uint64_t max_message_id; 73 uint64_t max_message_id;
69}; 74};
70 75
@@ -121,6 +126,8 @@ struct SlaveJoinRequest
121 126
122 struct GNUNET_CRYPTO_EccPrivateKey slave_key; 127 struct GNUNET_CRYPTO_EccPrivateKey slave_key;
123 128
129 struct GNUNET_PeerIdentity origin;
130
124 /* Followed by struct GNUNET_PeerIdentity relays[relay_count] */ 131 /* Followed by struct GNUNET_PeerIdentity relays[relay_count] */
125}; 132};
126 133
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index abe7bb028..4178d920b 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -106,6 +106,28 @@ struct GNUNET_PSYC_Channel
106 * Are we currently transmitting a message? 106 * Are we currently transmitting a message?
107 */ 107 */
108 int in_transmit; 108 int in_transmit;
109
110 /**
111 * Is this a master or slave channel?
112 */
113 int is_master;
114
115 /**
116 * Buffer space available for transmitting the next data fragment.
117 */
118 uint16_t tmit_buf_avail;
119};
120
121
122/**
123 * Handle for a pending PSYC transmission operation.
124 */
125struct GNUNET_PSYC_MasterTransmitHandle
126{
127 struct GNUNET_PSYC_Master *master;
128 GNUNET_PSYC_MasterTransmitNotify notify;
129 void *notify_cls;
130 enum GNUNET_PSYC_DataStatus status;
109}; 131};
110 132
111 133
@@ -116,6 +138,8 @@ struct GNUNET_PSYC_Master
116{ 138{
117 struct GNUNET_PSYC_Channel ch; 139 struct GNUNET_PSYC_Channel ch;
118 140
141 struct GNUNET_PSYC_MasterTransmitHandle *tmit;
142
119 GNUNET_PSYC_MasterStartCallback start_cb; 143 GNUNET_PSYC_MasterStartCallback start_cb;
120 144
121 uint64_t max_message_id; 145 uint64_t max_message_id;
@@ -146,19 +170,6 @@ struct GNUNET_PSYC_JoinHandle
146/** 170/**
147 * Handle for a pending PSYC transmission operation. 171 * Handle for a pending PSYC transmission operation.
148 */ 172 */
149struct GNUNET_PSYC_MasterTransmitHandle
150{
151 struct GNUNET_PSYC_Master *master;
152 const struct GNUNET_ENV_Environment *env;
153 GNUNET_PSYC_MasterTransmitNotify notify;
154 void *notify_cls;
155 enum GNUNET_PSYC_MasterTransmitFlags flags;
156};
157
158
159/**
160 * Handle for a pending PSYC transmission operation.
161 */
162struct GNUNET_PSYC_SlaveTransmitHandle 173struct GNUNET_PSYC_SlaveTransmitHandle
163{ 174{
164 175
@@ -184,10 +195,10 @@ struct GNUNET_PSYC_StateQuery
184 195
185 196
186/** 197/**
187 * Try again to connect to the PSYCstore service. 198 * Try again to connect to the PSYC service.
188 * 199 *
189 * @param cls handle to the PSYCstore service. 200 * @param cls Handle to the PSYC service.
190 * @param tc scheduler context 201 * @param tc Scheduler context
191 */ 202 */
192static void 203static void
193reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); 204reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
@@ -215,7 +226,7 @@ reschedule_connect (struct GNUNET_PSYC_Channel *c)
215 } 226 }
216 c->in_receive = GNUNET_NO; 227 c->in_receive = GNUNET_NO;
217 LOG (GNUNET_ERROR_TYPE_DEBUG, 228 LOG (GNUNET_ERROR_TYPE_DEBUG,
218 "Scheduling task to reconnect to PSYCstore service in %s.\n", 229 "Scheduling task to reconnect to PSYC service in %s.\n",
219 GNUNET_STRINGS_relative_time_to_string (c->reconnect_delay, GNUNET_YES)); 230 GNUNET_STRINGS_relative_time_to_string (c->reconnect_delay, GNUNET_YES));
220 c->reconnect_task = 231 c->reconnect_task =
221 GNUNET_SCHEDULER_add_delayed (c->reconnect_delay, &reconnect, c); 232 GNUNET_SCHEDULER_add_delayed (c->reconnect_delay, &reconnect, c);
@@ -226,12 +237,56 @@ reschedule_connect (struct GNUNET_PSYC_Channel *c)
226/** 237/**
227 * Schedule transmission of the next message from our queue. 238 * Schedule transmission of the next message from our queue.
228 * 239 *
229 * @param h PSYCstore handle 240 * @param h PSYC handle
230 */ 241 */
231static void 242static void
232transmit_next (struct GNUNET_PSYC_Channel *c); 243transmit_next (struct GNUNET_PSYC_Channel *c);
233 244
234 245
246void
247master_transmit_data (struct GNUNET_PSYC_Master *mst)
248{
249 struct GNUNET_PSYC_Channel *ch = &mst->ch;
250 size_t data_size = ch->tmit_buf_avail;
251 struct GNUNET_PSYC_MessageData *pdata;
252 struct OperationHandle *op
253 = GNUNET_malloc (sizeof (*op) + sizeof (*pdata) + data_size);
254 pdata = (struct GNUNET_PSYC_MessageData *) &op[1];
255 op->msg = (struct GNUNET_MessageHeader *) pdata;
256 pdata->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
257
258 switch (mst->tmit->notify (mst->tmit->notify_cls, &data_size, &pdata[1]))
259 {
260 case GNUNET_NO:
261 mst->tmit->status = GNUNET_PSYC_DATA_CONT;
262 break;
263
264 case GNUNET_YES:
265 mst->tmit->status = GNUNET_PSYC_DATA_END;
266 break;
267
268 default:
269 mst->tmit->status = GNUNET_PSYC_DATA_CANCEL;
270 data_size = 0;
271 LOG (GNUNET_ERROR_TYPE_ERROR, "MasterTransmitNotify returned error\n");
272 }
273
274 if ((GNUNET_PSYC_DATA_CONT == mst->tmit->status && 0 == data_size))
275 {
276 /* Transmission paused, nothing to send. */
277 GNUNET_free (op);
278 }
279 else
280 {
281 GNUNET_assert (data_size <= ch->tmit_buf_avail);
282 pdata->header.size = htons (sizeof (*pdata) + data_size);
283 pdata->status = htons (mst->tmit->status);
284 GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op);
285 transmit_next (ch);
286 }
287}
288
289
235/** 290/**
236 * Type of a function to call when we receive a message 291 * Type of a function to call when we receive a message
237 * from the service. 292 * from the service.
@@ -253,8 +308,8 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
253 } 308 }
254 uint16_t size_eq = 0; 309 uint16_t size_eq = 0;
255 uint16_t size_min = 0; 310 uint16_t size_min = 0;
256 const uint16_t size = ntohs (msg->size); 311 uint16_t size = ntohs (msg->size);
257 const uint16_t type = ntohs (msg->type); 312 uint16_t type = ntohs (msg->type);
258 313
259 LOG (GNUNET_ERROR_TYPE_DEBUG, 314 LOG (GNUNET_ERROR_TYPE_DEBUG,
260 "Received message of type %d from PSYC service\n", type); 315 "Received message of type %d from PSYC service\n", type);
@@ -265,6 +320,9 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
265 case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: 320 case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
266 size_eq = sizeof (struct CountersResult); 321 size_eq = sizeof (struct CountersResult);
267 break; 322 break;
323 case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK:
324 size_eq = sizeof (struct TransmitAck);
325 break;
268 } 326 }
269 327
270 if (! ((0 < size_eq && size == size_eq) 328 if (! ((0 < size_eq && size == size_eq)
@@ -276,6 +334,7 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
276 } 334 }
277 335
278 struct CountersResult *cres; 336 struct CountersResult *cres;
337 struct TransmitAck *tack;
279 338
280 switch (type) 339 switch (type)
281 { 340 {
@@ -294,17 +353,39 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
294 mst->join_ack_cb (ch->cb_cls, mst->max_message_id); 353 mst->join_ack_cb (ch->cb_cls, mst->max_message_id);
295#endif 354#endif
296 break; 355 break;
356
357 case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK:
358 tack = (struct TransmitAck *) msg;
359 if (ch->is_master)
360 {
361 GNUNET_assert (NULL != mst->tmit);
362 if (GNUNET_PSYC_DATA_CONT != mst->tmit->status
363 || NULL == mst->tmit->notify)
364 {
365 GNUNET_free (mst->tmit);
366 mst->tmit = NULL;
367 }
368 else
369 {
370 ch->tmit_buf_avail = ntohs (tack->buf_avail);
371 master_transmit_data (mst);
372 }
373 }
374 else
375 {
376 /* TODO: slave */
377 }
378 break;
297 } 379 }
298 380
299 GNUNET_CLIENT_receive (ch->client, &message_handler, ch, 381 GNUNET_CLIENT_receive (ch->client, &message_handler, ch,
300 GNUNET_TIME_UNIT_FOREVER_REL); 382 GNUNET_TIME_UNIT_FOREVER_REL);
301} 383}
302 384
303
304/** 385/**
305 * Transmit next message to service. 386 * Transmit next message to service.
306 * 387 *
307 * @param cls The 'struct GNUNET_PSYCSTORE_Handle'. 388 * @param cls The 'struct GNUNET_PSYC_Channel'.
308 * @param size Number of bytes available in buf. 389 * @param size Number of bytes available in buf.
309 * @param buf Where to copy the message. 390 * @param buf Where to copy the message.
310 * @return Number of bytes copied to buf. 391 * @return Number of bytes copied to buf.
@@ -326,7 +407,7 @@ send_next_message (void *cls, size_t size, void *buf)
326 return 0; 407 return 0;
327 } 408 }
328 LOG (GNUNET_ERROR_TYPE_DEBUG, 409 LOG (GNUNET_ERROR_TYPE_DEBUG,
329 "Sending message of type %d to PSYCstore service\n", 410 "Sending message of type %d to PSYC service\n",
330 ntohs (op->msg->type)); 411 ntohs (op->msg->type));
331 memcpy (buf, op->msg, ret); 412 memcpy (buf, op->msg, ret);
332 413
@@ -349,7 +430,7 @@ send_next_message (void *cls, size_t size, void *buf)
349/** 430/**
350 * Schedule transmission of the next message from our queue. 431 * Schedule transmission of the next message from our queue.
351 * 432 *
352 * @param h PSYCstore handle. 433 * @param h PSYC handle.
353 */ 434 */
354static void 435static void
355transmit_next (struct GNUNET_PSYC_Channel *ch) 436transmit_next (struct GNUNET_PSYC_Channel *ch)
@@ -391,14 +472,12 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
391 if (NULL == ch->transmit_head || 472 if (NULL == ch->transmit_head ||
392 ch->transmit_head->msg->type != ch->reconnect_msg->type) 473 ch->transmit_head->msg->type != ch->reconnect_msg->type)
393 { 474 {
394 struct OperationHandle *op 475 uint16_t reconn_size = ntohs (ch->reconnect_msg->size);
395 = GNUNET_malloc (sizeof (struct OperationHandle) 476 struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + reconn_size);
396 + ntohs (ch->reconnect_msg->size)); 477 memcpy (&op[1], ch->reconnect_msg, reconn_size);
397 memcpy (&op[1], ch->reconnect_msg, ntohs (ch->reconnect_msg->size));
398 op->msg = (struct GNUNET_MessageHeader *) &op[1]; 478 op->msg = (struct GNUNET_MessageHeader *) &op[1];
399 GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op); 479 GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op);
400 } 480 }
401
402 transmit_next (ch); 481 transmit_next (ch);
403} 482}
404 483
@@ -414,7 +493,12 @@ disconnect (void *c)
414{ 493{
415 struct GNUNET_PSYC_Channel *ch = c; 494 struct GNUNET_PSYC_Channel *ch = c;
416 GNUNET_assert (NULL != ch); 495 GNUNET_assert (NULL != ch);
417 GNUNET_assert (ch->transmit_head == ch->transmit_tail); 496 if (ch->transmit_head != ch->transmit_tail)
497 {
498 LOG (GNUNET_ERROR_TYPE_ERROR,
499 "Disconnecting while there are still outstanding messages!\n");
500 GNUNET_break (0);
501 }
418 if (ch->reconnect_task != GNUNET_SCHEDULER_NO_TASK) 502 if (ch->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
419 { 503 {
420 GNUNET_SCHEDULER_cancel (ch->reconnect_task); 504 GNUNET_SCHEDULER_cancel (ch->reconnect_task);
@@ -431,7 +515,10 @@ disconnect (void *c)
431 ch->client = NULL; 515 ch->client = NULL;
432 } 516 }
433 if (NULL != ch->reconnect_msg) 517 if (NULL != ch->reconnect_msg)
518 {
519 GNUNET_free (ch->reconnect_msg);
434 ch->reconnect_msg = NULL; 520 ch->reconnect_msg = NULL;
521 }
435} 522}
436 523
437 524
@@ -475,12 +562,13 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
475 struct GNUNET_PSYC_Channel *ch = &mst->ch; 562 struct GNUNET_PSYC_Channel *ch = &mst->ch;
476 struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req)); 563 struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req));
477 564
478 req->header.size = htons (sizeof (*req) + sizeof (*channel_key)); 565 req->header.size = htons (sizeof (*req));
479 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START); 566 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START);
480 req->channel_key = *channel_key; 567 req->channel_key = *channel_key;
481 req->policy = policy; 568 req->policy = policy;
482 569
483 ch->cfg = cfg; 570 ch->cfg = cfg;
571 ch->is_master = GNUNET_YES;
484 ch->reconnect_msg = (struct GNUNET_MessageHeader *) req; 572 ch->reconnect_msg = (struct GNUNET_MessageHeader *) req;
485 ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; 573 ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
486 ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst); 574 ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst);
@@ -532,7 +620,7 @@ GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *mst)
532void 620void
533GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, 621GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
534 int is_admitted, 622 int is_admitted,
535 unsigned int relay_count, 623 uint32_t relay_count,
536 const struct GNUNET_PeerIdentity *relays, 624 const struct GNUNET_PeerIdentity *relays,
537 const char *method_name, 625 const char *method_name,
538 const struct GNUNET_ENV_Environment *env, 626 const struct GNUNET_ENV_Environment *env,
@@ -556,13 +644,13 @@ send_modifier (void *cls, struct GNUNET_ENV_Modifier *mod)
556 pmod = (struct GNUNET_PSYC_MessageModifier *) &op[1]; 644 pmod = (struct GNUNET_PSYC_MessageModifier *) &op[1];
557 op->msg = (struct GNUNET_MessageHeader *) pmod; 645 op->msg = (struct GNUNET_MessageHeader *) pmod;
558 646
559 pmod->header.type = GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_MODIFIER; 647 pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
560 pmod->header.size = htons (sizeof (*pmod) + name_size + mod->value_size); 648 pmod->header.size = htons (sizeof (*pmod) + name_size + mod->value_size);
561 pmod->name_size = htons (name_size); 649 pmod->name_size = htons (name_size);
562 memcpy (&pmod[1], mod->name, name_size); 650 memcpy (&pmod[1], mod->name, name_size);
563 memcpy ((void *) &pmod[1] + name_size, mod->value, mod->value_size); 651 memcpy ((char *) &pmod[1] + name_size, mod->value, mod->value_size);
564 652
565 GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op); 653 GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op);
566 return GNUNET_YES; 654 return GNUNET_YES;
567} 655}
568 656
@@ -594,29 +682,41 @@ GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *mst,
594 return NULL; 682 return NULL;
595 ch->in_transmit = GNUNET_YES; 683 ch->in_transmit = GNUNET_YES;
596 684
685 size_t size = strlen (method_name) + 1;
597 struct GNUNET_PSYC_MessageMethod *pmeth; 686 struct GNUNET_PSYC_MessageMethod *pmeth;
598 struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*pmeth)); 687 struct OperationHandle *op
688 = GNUNET_malloc (sizeof (*op) + sizeof (*pmeth) + size);
599 pmeth = (struct GNUNET_PSYC_MessageMethod *) &op[1]; 689 pmeth = (struct GNUNET_PSYC_MessageMethod *) &op[1];
600 op->msg = (struct GNUNET_MessageHeader *) pmeth; 690 op->msg = (struct GNUNET_MessageHeader *) pmeth;
601 691
602 pmeth->header.type = GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_METHOD; 692 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
603 size_t size = strlen (method_name) + 1;
604 pmeth->header.size = htons (sizeof (*pmeth) + size); 693 pmeth->header.size = htons (sizeof (*pmeth) + size);
605 pmeth->flags = htonl (flags); 694 pmeth->flags = htonl (flags);
606 pmeth->mod_count 695 pmeth->mod_count = GNUNET_ntohll (GNUNET_ENV_environment_get_mod_count (env));
607 = GNUNET_ntohll (GNUNET_ENV_environment_get_mod_count (env));
608 memcpy (&pmeth[1], method_name, size); 696 memcpy (&pmeth[1], method_name, size);
609 697
610 GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op); 698 GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op);
611
612 GNUNET_ENV_environment_iterate (env, send_modifier, mst); 699 GNUNET_ENV_environment_iterate (env, send_modifier, mst);
700 transmit_next (ch);
701
702 mst->tmit = GNUNET_malloc (sizeof (*mst->tmit));
703 mst->tmit->master = mst;
704 mst->tmit->notify = notify;
705 mst->tmit->notify_cls = notify_cls;
706 mst->tmit->status = GNUNET_PSYC_DATA_CONT;
707 return mst->tmit;
708}
709
613 710
614 struct GNUNET_PSYC_MasterTransmitHandle *th = GNUNET_malloc (sizeof (*th)); 711/**
615 th->master = mst; 712 * Resume transmission to the channel.
616 th->env = env; 713 *
617 th->notify = notify; 714 * @param th Handle of the request that is being resumed.
618 th->notify_cls = notify_cls; 715 */
619 return th; 716void
717GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
718{
719 master_transmit_data (th->master);
620} 720}
621 721
622 722
@@ -671,7 +771,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
671 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key, 771 const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
672 const struct GNUNET_CRYPTO_EccPrivateKey *slave_key, 772 const struct GNUNET_CRYPTO_EccPrivateKey *slave_key,
673 const struct GNUNET_PeerIdentity *origin, 773 const struct GNUNET_PeerIdentity *origin,
674 size_t relay_count, 774 uint32_t relay_count,
675 const struct GNUNET_PeerIdentity *relays, 775 const struct GNUNET_PeerIdentity *relays,
676 GNUNET_PSYC_Method method, 776 GNUNET_PSYC_Method method,
677 GNUNET_PSYC_JoinCallback join_cb, 777 GNUNET_PSYC_JoinCallback join_cb,
@@ -680,7 +780,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
680 const char *method_name, 780 const char *method_name,
681 const struct GNUNET_ENV_Environment *env, 781 const struct GNUNET_ENV_Environment *env,
682 const void *data, 782 const void *data,
683 size_t data_size) 783 uint16_t data_size)
684{ 784{
685 struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv)); 785 struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv));
686 struct GNUNET_PSYC_Channel *ch = &slv->ch; 786 struct GNUNET_PSYC_Channel *ch = &slv->ch;
@@ -692,10 +792,12 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
692 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN); 792 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN);
693 req->channel_key = *channel_key; 793 req->channel_key = *channel_key;
694 req->slave_key = *slave_key; 794 req->slave_key = *slave_key;
795 req->origin = *origin;
695 req->relay_count = relay_count; 796 req->relay_count = relay_count;
696 memcpy (&req[1], relays, relay_count * sizeof (*relays)); 797 memcpy (&req[1], relays, relay_count * sizeof (*relays));
697 798
698 ch->cfg = cfg; 799 ch->cfg = cfg;
800 ch->is_master = GNUNET_NO;
699 ch->reconnect_msg = (struct GNUNET_MessageHeader *) req; 801 ch->reconnect_msg = (struct GNUNET_MessageHeader *) req;
700 ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; 802 ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
701 ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv); 803 ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv);
@@ -746,6 +848,18 @@ GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave,
746 848
747 849
748/** 850/**
851 * Resume transmission to the master.
852 *
853 * @param th Handle of the request that is being resumed.
854 */
855void
856GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
857{
858
859}
860
861
862/**
749 * Abort transmission request to master. 863 * Abort transmission request to master.
750 * 864 *
751 * @param th Handle of the request that is being aborted. 865 * @param th Handle of the request that is being aborted.
@@ -822,7 +936,7 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *ch,
822 slvadd->announced_at = GNUNET_htonll (announced_at); 936 slvadd->announced_at = GNUNET_htonll (announced_at);
823 slvadd->effective_since = GNUNET_htonll (effective_since); 937 slvadd->effective_since = GNUNET_htonll (effective_since);
824 938
825 GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op); 939 GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op);
826 transmit_next (ch); 940 transmit_next (ch);
827} 941}
828 942
@@ -863,7 +977,7 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *ch,
863 slvrm->header.size = htons (sizeof (*slvrm)); 977 slvrm->header.size = htons (sizeof (*slvrm));
864 slvrm->announced_at = GNUNET_htonll (announced_at); 978 slvrm->announced_at = GNUNET_htonll (announced_at);
865 979
866 GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op); 980 GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op);
867 transmit_next (ch); 981 transmit_next (ch);
868} 982}
869 983
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c
index b5bc6d135..1d7035a87 100644
--- a/src/psyc/test_psyc.c
+++ b/src/psyc/test_psyc.c
@@ -19,8 +19,8 @@
19 */ 19 */
20 20
21/** 21/**
22 * @file psycstore/test_psycstore.c 22 * @file psyc/test_psyc.c
23 * @brief Test for the PSYCstore service. 23 * @brief Test for the PSYC service.
24 * @author Gabor X Toth 24 * @author Gabor X Toth
25 * @author Christian Grothoff 25 * @author Christian Grothoff
26 */ 26 */
@@ -30,6 +30,7 @@
30#include "gnunet_common.h" 30#include "gnunet_common.h"
31#include "gnunet_util_lib.h" 31#include "gnunet_util_lib.h"
32#include "gnunet_testing_lib.h" 32#include "gnunet_testing_lib.h"
33#include "gnunet_env_lib.h"
33#include "gnunet_psyc_service.h" 34#include "gnunet_psyc_service.h"
34 35
35#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10) 36#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10)
@@ -59,6 +60,8 @@ static struct GNUNET_CRYPTO_EccPrivateKey *slave_key;
59static struct GNUNET_CRYPTO_EccPublicSignKey channel_pub_key; 60static struct GNUNET_CRYPTO_EccPublicSignKey channel_pub_key;
60static struct GNUNET_CRYPTO_EccPublicSignKey slave_pub_key; 61static struct GNUNET_CRYPTO_EccPublicSignKey slave_pub_key;
61 62
63struct GNUNET_PSYC_MasterTransmitHandle *mth;
64
62/** 65/**
63 * Clean up all resources used. 66 * Clean up all resources used.
64 */ 67 */
@@ -120,11 +123,14 @@ end ()
120 123
121static int 124static int
122method (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key, 125method (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key,
123 uint64_t message_id, const char *method_name, 126 uint64_t message_id, const char *name,
124 size_t modifier_count, const struct GNUNET_ENV_Modifier *modifiers, 127 size_t modifier_count, const struct GNUNET_ENV_Modifier *modifiers,
125 uint64_t data_offset, const void *data, size_t data_size, 128 uint64_t data_offset, const void *data, size_t data_size,
126 enum GNUNET_PSYC_MessageFlags flags) 129 enum GNUNET_PSYC_MessageFlags flags)
127{ 130{
131 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
132 "Method: %s, modifiers: %lu, flags: %u\n%.*s\n",
133 name, modifier_count, flags, data_size, data);
128 return GNUNET_OK; 134 return GNUNET_OK;
129} 135}
130 136
@@ -138,11 +144,72 @@ join (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key,
138 return GNUNET_OK; 144 return GNUNET_OK;
139} 145}
140 146
147struct TransmitClosure
148{
149 struct GNUNET_PSYC_MasterTransmitHandle *handle;
150 uint8_t n;
151 uint8_t fragment_count;
152 char *fragments[16];
153 uint16_t fragment_sizes[16];
154};
155
156
157static void
158transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
159{
160 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Transmit resume\n");
161 struct TransmitClosure *tmit = cls;
162 GNUNET_PSYC_master_transmit_resume (tmit->handle);
163}
164
165
166static int
167transmit_notify (void *cls, size_t *data_size, void *data)
168{
169 struct TransmitClosure *tmit = cls;
170 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
171 "Transmit notify: %lu bytes\n", *data_size);
172
173 if (tmit->fragment_count <= tmit->n)
174 return GNUNET_YES;
175
176 GNUNET_assert (tmit->fragment_sizes[tmit->n] <= *data_size);
177
178 *data_size = tmit->fragment_sizes[tmit->n];
179 memcpy (data, tmit->fragments[tmit->n], *data_size);
180 tmit->n++;
181
182 if (tmit->n == tmit->fragment_count - 1)
183 {
184 /* Send last fragment later. */
185 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, &transmit_resume,
186 tmit);
187 *data_size = 0;
188 return GNUNET_NO;
189 }
190 return tmit->n <= tmit->fragment_count ? GNUNET_NO : GNUNET_YES;
191}
141 192
142void 193void
143master_started (void *cls, uint64_t max_message_id) 194master_started (void *cls, uint64_t max_message_id)
144{ 195{
145 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master started: %lu\n", max_message_id); 196 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master started: %lu\n", max_message_id);
197
198 struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create ();
199 GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN,
200 "_foo", "bar baz", 7);
201 GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN,
202 "_foo_bar", "foo bar baz", 11);
203
204 struct TransmitClosure *tmit = GNUNET_new (struct TransmitClosure);
205 tmit->fragment_count = 2;
206 tmit->fragments[0] = "foo bar";
207 tmit->fragment_sizes[0] = 7;
208 tmit->fragments[1] = "baz!";
209 tmit->fragment_sizes[1] = 4;
210 tmit->handle
211 = GNUNET_PSYC_master_transmit (mst, "_test", env, transmit_notify, tmit,
212 GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN);
146} 213}
147 214
148 215
@@ -157,7 +224,7 @@ slave_joined (void *cls, uint64_t max_message_id)
157 * Main function of the test, run from scheduler. 224 * Main function of the test, run from scheduler.
158 * 225 *
159 * @param cls NULL 226 * @param cls NULL
160 * @param cfg configuration we use (also to connect to PSYCstore service) 227 * @param cfg configuration we use (also to connect to PSYC service)
161 * @param peer handle to access more of the peer (not used) 228 * @param peer handle to access more of the peer (not used)
162 */ 229 */
163static void 230static void
@@ -182,9 +249,18 @@ run (void *cls,
182 mst = GNUNET_PSYC_master_start (cfg, channel_key, 249 mst = GNUNET_PSYC_master_start (cfg, channel_key,
183 GNUNET_PSYC_CHANNEL_PRIVATE, 250 GNUNET_PSYC_CHANNEL_PRIVATE,
184 &method, &join, &master_started, NULL); 251 &method, &join, &master_started, NULL);
185 252 return;
186 slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, 253 struct GNUNET_PeerIdentity origin;
187 &method, &join, &slave_joined, NULL); 254 struct GNUNET_PeerIdentity relays[16];
255 struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create ();
256 GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN,
257 "_foo", "bar baz", 7);
258 GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN,
259 "_foo_bar", "foo bar baz", 11);
260 slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin,
261 16, relays, &method, &join, &slave_joined,
262 NULL, "_request_join", env, "some data", 9);
263 GNUNET_ENV_environment_destroy (env);
188} 264}
189 265
190 266