aboutsummaryrefslogtreecommitdiff
path: root/src/psyc/psyc_api.c
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2014-05-29 16:35:55 +0000
committerGabor X Toth <*@tg-x.net>2014-05-29 16:35:55 +0000
commit9955561e1b204ccf23fbf841f409bd3ef79be88c (patch)
tree0271c23ae9f1dad72266a0e6073d696e5afca027 /src/psyc/psyc_api.c
parenta5877668ba805c5e0efe622e6ce4c58ff5609bf9 (diff)
downloadgnunet-9955561e1b204ccf23fbf841f409bd3ef79be88c.tar.gz
gnunet-9955561e1b204ccf23fbf841f409bd3ef79be88c.zip
psyc, multicast: reorg code, use new client manager & psyc util lib
Diffstat (limited to 'src/psyc/psyc_api.c')
-rw-r--r--src/psyc/psyc_api.c1295
1 files changed, 221 insertions, 1074 deletions
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index 7ec9d21b7..bfb6f43fb 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -37,29 +37,11 @@
37#include "gnunet_env_lib.h" 37#include "gnunet_env_lib.h"
38#include "gnunet_multicast_service.h" 38#include "gnunet_multicast_service.h"
39#include "gnunet_psyc_service.h" 39#include "gnunet_psyc_service.h"
40#include "gnunet_psyc_util_lib.h"
40#include "psyc.h" 41#include "psyc.h"
41 42
42#define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__) 43#define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__)
43 44
44struct MessageQueue
45{
46 struct MessageQueue *prev;
47 struct MessageQueue *next;
48 /* Followed by struct GNUNET_MessageHeader msg */
49};
50
51
52/**
53 * Handle for a pending PSYC transmission operation.
54 */
55struct GNUNET_PSYC_ChannelTransmitHandle
56{
57 struct GNUNET_PSYC_Channel *ch;
58 GNUNET_PSYC_TransmitNotifyModifier notify_mod;
59 GNUNET_PSYC_TransmitNotifyData notify_data;
60 void *notify_cls;
61 enum MessageState state;
62};
63 45
64/** 46/**
65 * Handle to access PSYC channel operations for both the master and slaves. 47 * Handle to access PSYC channel operations for both the master and slaves.
@@ -67,109 +49,29 @@ struct GNUNET_PSYC_ChannelTransmitHandle
67struct GNUNET_PSYC_Channel 49struct GNUNET_PSYC_Channel
68{ 50{
69 /** 51 /**
70 * Transmission handle;
71 */
72 struct GNUNET_PSYC_ChannelTransmitHandle tmit;
73
74 /**
75 * Configuration to use. 52 * Configuration to use.
76 */ 53 */
77 const struct GNUNET_CONFIGURATION_Handle *cfg; 54 const struct GNUNET_CONFIGURATION_Handle *cfg;
78 55
79 /** 56 /**
80 * Socket (if available). 57 * Client connection to the service.
81 */
82 struct GNUNET_CLIENT_Connection *client;
83
84 /**
85 * Currently pending transmission request, or NULL for none.
86 */ 58 */
87 struct GNUNET_CLIENT_TransmitHandle *th; 59 struct GNUNET_CLIENT_MANAGER_Connection *client;
88 60
89 /** 61 /**
90 * Head of messages to transmit to the service. 62 * Transmission handle;
91 */
92 struct MessageQueue *tmit_head;
93
94 /**
95 * Tail of operations to transmit to the service.
96 */ 63 */
97 struct MessageQueue *tmit_tail; 64 struct GNUNET_PSYC_TransmitHandle *tmit;
98 65
99 /** 66 /**
100 * Message currently being transmitted to the service. 67 * Receipt handle;
101 */ 68 */
102 struct MessageQueue *tmit_msg; 69 struct GNUNET_PSYC_ReceiveHandle *recv;
103 70
104 /** 71 /**
105 * Message to send on reconnect. 72 * Message to send on reconnect.
106 */ 73 */
107 struct GNUNET_MessageHeader *reconnect_msg; 74 struct GNUNET_MessageHeader *connect_msg;
108
109 /**
110 * Task doing exponential back-off trying to reconnect.
111 */
112 GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
113
114 /**
115 * Time for next connect retry.
116 */
117 struct GNUNET_TIME_Relative reconnect_delay;
118
119 /**
120 * Message part callback.
121 */
122 GNUNET_PSYC_MessageCallback message_cb;
123
124 /**
125 * Message part callback for historic message.
126 */
127 GNUNET_PSYC_MessageCallback hist_message_cb;
128
129 /**
130 * Closure for @a message_cb.
131 */
132 void *cb_cls;
133
134 /**
135 * ID of the message being received from the PSYC service.
136 */
137 uint64_t recv_message_id;
138
139 /**
140 * Public key of the slave from which a message is being received.
141 */
142 struct GNUNET_CRYPTO_EddsaPublicKey recv_slave_key;
143
144 /**
145 * State of the currently being received message from the PSYC service.
146 */
147 enum MessageState recv_state;
148
149 /**
150 * Flags for the currently being received message from the PSYC service.
151 */
152 enum GNUNET_PSYC_MessageFlags recv_flags;
153
154 /**
155 * Expected value size for the modifier being received from the PSYC service.
156 */
157 uint32_t recv_mod_value_size_expected;
158
159 /**
160 * Actual value size for the modifier being received from the PSYC service.
161 */
162 uint32_t recv_mod_value_size;
163
164 /**
165 * Is transmission paused?
166 */
167 uint8_t tmit_paused;
168
169 /**
170 * Are we still waiting for a PSYC_TRANSMIT_ACK?
171 */
172 uint8_t tmit_ack_pending;
173 75
174 /** 76 /**
175 * Are we polling for incoming messages right now? 77 * Are we polling for incoming messages right now?
@@ -177,11 +79,6 @@ struct GNUNET_PSYC_Channel
177 uint8_t in_receive; 79 uint8_t in_receive;
178 80
179 /** 81 /**
180 * Are we currently transmitting a message?
181 */
182 uint8_t in_transmit;
183
184 /**
185 * Is this a master or slave channel? 82 * Is this a master or slave channel?
186 */ 83 */
187 uint8_t is_master; 84 uint8_t is_master;
@@ -193,7 +90,7 @@ struct GNUNET_PSYC_Channel
193 */ 90 */
194struct GNUNET_PSYC_Master 91struct GNUNET_PSYC_Master
195{ 92{
196 struct GNUNET_PSYC_Channel ch; 93 struct GNUNET_PSYC_Channel chn;
197 94
198 GNUNET_PSYC_MasterStartCallback start_cb; 95 GNUNET_PSYC_MasterStartCallback start_cb;
199 96
@@ -201,6 +98,11 @@ struct GNUNET_PSYC_Master
201 * Join request callback. 98 * Join request callback.
202 */ 99 */
203 GNUNET_PSYC_JoinRequestCallback join_req_cb; 100 GNUNET_PSYC_JoinRequestCallback join_req_cb;
101
102 /**
103 * Closure for the callbacks.
104 */
105 void *cb_cls;
204}; 106};
205 107
206 108
@@ -209,11 +111,16 @@ struct GNUNET_PSYC_Master
209 */ 111 */
210struct GNUNET_PSYC_Slave 112struct GNUNET_PSYC_Slave
211{ 113{
212 struct GNUNET_PSYC_Channel ch; 114 struct GNUNET_PSYC_Channel chn;
213 115
214 GNUNET_PSYC_SlaveConnectCallback connect_cb; 116 GNUNET_PSYC_SlaveConnectCallback connect_cb;
215 117
216 GNUNET_PSYC_JoinDecisionCallback join_dcsn_cb; 118 GNUNET_PSYC_JoinDecisionCallback join_dcsn_cb;
119
120 /**
121 * Closure for the callbacks.
122 */
123 void *cb_cls;
217}; 124};
218 125
219 126
@@ -258,934 +165,170 @@ struct GNUNET_PSYC_StateQuery
258 165
259 166
260static void 167static void
261reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); 168channel_send_connect_msg (struct GNUNET_PSYC_Channel *chn)
262
263
264static void
265channel_transmit_data (struct GNUNET_PSYC_Channel *ch);
266
267
268/**
269 * Reschedule a connect attempt to the service.
270 *
271 * @param ch Channel to reconnect.
272 */
273static void
274reschedule_connect (struct GNUNET_PSYC_Channel *ch)
275{ 169{
276 GNUNET_assert (ch->reconnect_task == GNUNET_SCHEDULER_NO_TASK); 170 uint16_t cmsg_size = ntohs (chn->connect_msg->size);
277 171 struct GNUNET_MessageHeader * cmsg = GNUNET_malloc (cmsg_size);
278 if (NULL != ch->th) 172 memcpy (cmsg, chn->connect_msg, cmsg_size);
279 { 173 GNUNET_CLIENT_MANAGER_transmit_now (chn->client, cmsg);
280 GNUNET_CLIENT_notify_transmit_ready_cancel (ch->th);
281 ch->th = NULL;
282 }
283 if (NULL != ch->client)
284 {
285 GNUNET_CLIENT_disconnect (ch->client);
286 ch->client = NULL;
287 }
288 ch->in_receive = GNUNET_NO;
289 LOG (GNUNET_ERROR_TYPE_DEBUG,
290 "Scheduling task to reconnect to PSYC service in %s.\n",
291 GNUNET_STRINGS_relative_time_to_string (ch->reconnect_delay, GNUNET_YES));
292 ch->reconnect_task =
293 GNUNET_SCHEDULER_add_delayed (ch->reconnect_delay, &reconnect, ch);
294 ch->reconnect_delay = GNUNET_TIME_STD_BACKOFF (ch->reconnect_delay);
295} 174}
296 175
297 176
298/**
299 * Schedule transmission of the next message from our queue.
300 *
301 * @param ch PSYC channel handle
302 */
303static void
304transmit_next (struct GNUNET_PSYC_Channel *ch);
305
306
307/**
308 * Reset stored data related to the last received message.
309 */
310static void 177static void
311recv_reset (struct GNUNET_PSYC_Channel *ch) 178channel_recv_disconnect (void *cls,
179 struct GNUNET_CLIENT_MANAGER_Connection *client,
180 const struct GNUNET_MessageHeader *msg)
312{ 181{
313 ch->recv_state = MSG_STATE_START; 182 struct GNUNET_PSYC_Channel *
314 ch->recv_flags = 0; 183 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn));
315 ch->recv_message_id = 0; 184 GNUNET_CLIENT_MANAGER_reconnect (client);
316 //FIXME: ch->recv_slave_key = { 0 }; 185 channel_send_connect_msg (chn);
317 ch->recv_mod_value_size = 0;
318 ch->recv_mod_value_size_expected = 0;
319} 186}
320 187
321 188
322static void 189static void
323recv_error (struct GNUNET_PSYC_Channel *ch) 190channel_recv_message (void *cls,
191 struct GNUNET_CLIENT_MANAGER_Connection *client,
192 const struct GNUNET_MessageHeader *msg)
324{ 193{
325 GNUNET_PSYC_MessageCallback message_cb 194 struct GNUNET_PSYC_Channel *
326 = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC 195 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn));
327 ? ch->hist_message_cb 196 GNUNET_PSYC_receive_message (chn->recv,
328 : ch->message_cb; 197 (const struct GNUNET_PSYC_MessageHeader *) msg);
329
330 if (NULL != message_cb)
331 message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, NULL);
332
333 recv_reset (ch);
334} 198}
335 199
336 200
337/**
338 * Queue a message part for transmission to the PSYC service.
339 *
340 * The message part is added to the current message buffer.
341 * When this buffer is full, it is added to the transmission queue.
342 *
343 * @param ch Channel struct for the client.
344 * @param msg Modifier message part, or NULL when there's no more modifiers.
345 * @param end End of message.
346 */
347static void 201static void
348queue_message (struct GNUNET_PSYC_Channel *ch, 202channel_recv_message_ack (void *cls,
349 const struct GNUNET_MessageHeader *msg, 203 struct GNUNET_CLIENT_MANAGER_Connection *client,
350 uint8_t end) 204 const struct GNUNET_MessageHeader *msg)
351{ 205{
352 uint16_t size = msg ? ntohs (msg->size) : 0; 206 struct GNUNET_PSYC_Channel *
353 207 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn));
354 LOG (GNUNET_ERROR_TYPE_DEBUG, 208 GNUNET_PSYC_transmit_got_ack (chn->tmit);
355 "Queueing message of type %u and size %u (end: %u)).\n",
356 ntohs (msg->type), size, end);
357
358 struct MessageQueue *mq = ch->tmit_msg;
359 struct GNUNET_MessageHeader *qmsg = NULL;
360 if (NULL != mq)
361 {
362 qmsg = (struct GNUNET_MessageHeader *) &mq[1];
363 if (NULL == msg
364 || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < qmsg->size + size)
365 {
366 /* End of message or buffer is full, add it to transmission queue
367 * and start with empty buffer */
368 qmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
369 qmsg->size = htons (qmsg->size);
370 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq);
371 ch->tmit_msg = mq = NULL;
372 ch->tmit_ack_pending++;
373 }
374 else
375 {
376 /* Message fits in current buffer, append */
377 ch->tmit_msg
378 = mq = GNUNET_realloc (mq, sizeof (*mq) + qmsg->size + size);
379 qmsg = (struct GNUNET_MessageHeader *) &mq[1];
380 memcpy ((char *) qmsg + qmsg->size, msg, size);
381 qmsg->size += size;
382 }
383 }
384
385 if (NULL == mq && NULL != msg)
386 {
387 /* Empty buffer, copy over message. */
388 ch->tmit_msg
389 = mq = GNUNET_malloc (sizeof (*mq) + sizeof (*qmsg) + size);
390 qmsg = (struct GNUNET_MessageHeader *) &mq[1];
391 qmsg->size = sizeof (*qmsg) + size;
392 memcpy (&qmsg[1], msg, size);
393 }
394
395 if (NULL != mq
396 && (GNUNET_YES == end
397 || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
398 < qmsg->size + sizeof (struct GNUNET_MessageHeader))))
399 {
400 /* End of message or buffer is full, add it to transmission queue. */
401 qmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
402 qmsg->size = htons (qmsg->size);
403 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq);
404 ch->tmit_msg = mq = NULL;
405 ch->tmit_ack_pending++;
406 }
407
408 if (GNUNET_YES == end)
409 ch->in_transmit = GNUNET_NO;
410
411 transmit_next (ch);
412} 209}
413 210
414 211
415/**
416 * Request a modifier from a client to transmit.
417 *
418 * @param mst Master handle.
419 */
420static void 212static void
421channel_transmit_mod (struct GNUNET_PSYC_Channel *ch) 213master_recv_start_ack (void *cls,
214 struct GNUNET_CLIENT_MANAGER_Connection *client,
215 const struct GNUNET_MessageHeader *msg)
422{ 216{
423 uint16_t max_data_size, data_size; 217 struct GNUNET_PSYC_Master *
424 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; 218 mst = GNUNET_CLIENT_MANAGER_get_user_context_ (client,
425 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; 219 sizeof (struct GNUNET_PSYC_Channel));
426 int notify_ret;
427
428 switch (ch->tmit.state)
429 {
430 case MSG_STATE_MODIFIER:
431 {
432 struct GNUNET_PSYC_MessageModifier *mod
433 = (struct GNUNET_PSYC_MessageModifier *) msg;
434 max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
435 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
436 msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
437 notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, &data_size, &mod[1],
438 &mod->oper, &mod->value_size);
439 mod->name_size = strnlen ((char *) &mod[1], data_size);
440 if (mod->name_size < data_size)
441 {
442 mod->value_size = htonl (mod->value_size);
443 mod->name_size = htons (mod->name_size);
444 }
445 else if (0 < data_size)
446 {
447 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n");
448 notify_ret = GNUNET_SYSERR;
449 }
450 break;
451 }
452 case MSG_STATE_MOD_CONT:
453 {
454 max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
455 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
456 msg->size = sizeof (struct GNUNET_MessageHeader);
457 notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls,
458 &data_size, &msg[1], NULL, NULL);
459 break;
460 }
461 default:
462 GNUNET_assert (0);
463 }
464
465 switch (notify_ret)
466 {
467 case GNUNET_NO:
468 if (0 == data_size)
469 { /* Transmission paused, nothing to send. */
470 ch->tmit_paused = GNUNET_YES;
471 return;
472 }
473 ch->tmit.state = MSG_STATE_MOD_CONT;
474 break;
475
476 case GNUNET_YES:
477 if (0 == data_size)
478 {
479 /* End of modifiers. */
480 ch->tmit.state = MSG_STATE_DATA;
481 if (0 == ch->tmit_ack_pending)
482 channel_transmit_data (ch);
483
484 return;
485 }
486 ch->tmit.state = MSG_STATE_MODIFIER;
487 break;
488
489 default:
490 LOG (GNUNET_ERROR_TYPE_ERROR,
491 "MasterTransmitNotifyModifier returned error "
492 "when requesting a modifier.\n");
493
494 ch->tmit.state = MSG_STATE_CANCEL;
495 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
496 msg->size = htons (sizeof (*msg));
497
498 queue_message (ch, msg, GNUNET_YES);
499 return;
500 }
501
502 if (0 < data_size)
503 {
504 GNUNET_assert (data_size <= max_data_size);
505 msg->size = htons (msg->size + data_size);
506 queue_message (ch, msg, GNUNET_NO);
507 }
508
509 channel_transmit_mod (ch);
510}
511 220
512 221 struct CountersResult *cres = (struct CountersResult *) msg;
513/** 222 if (NULL != mst->start_cb)
514 * Request data from a client to transmit. 223 mst->start_cb (mst->cb_cls, GNUNET_ntohll (cres->max_message_id));
515 *
516 * @param mst Master handle.
517 */
518static void
519channel_transmit_data (struct GNUNET_PSYC_Channel *ch)
520{
521 uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
522 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
523 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
524
525 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
526
527 int notify_ret = ch->tmit.notify_data (ch->tmit.notify_cls,
528 &data_size, &msg[1]);
529 switch (notify_ret)
530 {
531 case GNUNET_NO:
532 if (0 == data_size)
533 {
534 /* Transmission paused, nothing to send. */
535 ch->tmit_paused = GNUNET_YES;
536 return;
537 }
538 break;
539
540 case GNUNET_YES:
541 ch->tmit.state = MSG_STATE_END;
542 break;
543
544 default:
545 LOG (GNUNET_ERROR_TYPE_ERROR,
546 "MasterTransmitNotify returned error when requesting data.\n");
547
548 ch->tmit.state = MSG_STATE_CANCEL;
549 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
550 msg->size = htons (sizeof (*msg));
551 queue_message (ch, msg, GNUNET_YES);
552 return;
553 }
554
555 if (0 < data_size)
556 {
557 GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
558 msg->size = htons (sizeof (*msg) + data_size);
559 queue_message (ch, msg, !notify_ret);
560 }
561
562 /* End of message. */
563 if (GNUNET_YES == notify_ret)
564 {
565 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
566 msg->size = htons (sizeof (*msg));
567 queue_message (ch, msg, GNUNET_YES);
568 }
569} 224}
570 225
571 226
572/**
573 * Send a message to a channel.
574 *
575 * @param ch Handle to the PSYC channel.
576 * @param method_name Which method should be invoked.
577 * @param notify_mod Function to call to obtain modifiers.
578 * @param notify_data Function to call to obtain fragments of the data.
579 * @param notify_cls Closure for @a notify_mod and @a notify_data.
580 * @param flags Flags for the message being transmitted.
581 *
582 * @return Transmission handle, NULL on error (i.e. more than one request queued).
583 */
584static struct GNUNET_PSYC_ChannelTransmitHandle *
585channel_transmit (struct GNUNET_PSYC_Channel *ch,
586 const char *method_name,
587 GNUNET_PSYC_TransmitNotifyModifier notify_mod,
588 GNUNET_PSYC_TransmitNotifyData notify_data,
589 void *notify_cls,
590 uint32_t flags)
591{
592 if (GNUNET_NO != ch->in_transmit)
593 return NULL;
594 ch->in_transmit = GNUNET_YES;
595
596 size_t size = strlen (method_name) + 1;
597 struct GNUNET_PSYC_MessageMethod *pmeth;
598 struct GNUNET_MessageHeader *qmsg;
599 struct MessageQueue *
600 mq = ch->tmit_msg = GNUNET_malloc (sizeof (*mq) + sizeof (*qmsg)
601 + sizeof (*pmeth) + size);
602 qmsg = (struct GNUNET_MessageHeader *) &mq[1];
603 qmsg->size = sizeof (*qmsg) + sizeof (*pmeth) + size;
604
605 pmeth = (struct GNUNET_PSYC_MessageMethod *) &qmsg[1];
606 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
607 pmeth->header.size = htons (sizeof (*pmeth) + size);
608 pmeth->flags = htonl (flags);
609 memcpy (&pmeth[1], method_name, size);
610
611 ch->tmit.ch = ch;
612 ch->tmit.notify_mod = notify_mod;
613 ch->tmit.notify_data = notify_data;
614 ch->tmit.notify_cls = notify_cls;
615 ch->tmit.state = MSG_STATE_MODIFIER;
616
617 channel_transmit_mod (ch);
618 return &ch->tmit;
619}
620
621
622/**
623 * Resume transmission to the channel.
624 *
625 * @param th Handle of the request that is being resumed.
626 */
627static void 227static void
628channel_transmit_resume (struct GNUNET_PSYC_ChannelTransmitHandle *th) 228master_recv_join_request (void *cls,
229 struct GNUNET_CLIENT_MANAGER_Connection *client,
230 const struct GNUNET_MessageHeader *msg)
629{ 231{
630 struct GNUNET_PSYC_Channel *ch = th->ch; 232 struct GNUNET_PSYC_Master *
631 if (0 == ch->tmit_ack_pending) 233 mst = GNUNET_CLIENT_MANAGER_get_user_context_ (client,
632 { 234 sizeof (struct GNUNET_PSYC_Channel));
633 ch->tmit_paused = GNUNET_NO;
634 channel_transmit_data (ch);
635 }
636}
637 235
236 const struct MasterJoinRequest *req = (const struct MasterJoinRequest *) msg;
638 237
639/** 238 struct GNUNET_PSYC_MessageHeader *pmsg = NULL;
640 * Abort transmission request to channel. 239 if (ntohs (req->header.size) <= sizeof (*req) + sizeof (*pmsg))
641 * 240 pmsg = (struct GNUNET_PSYC_MessageHeader *) &req[1];
642 * @param th Handle of the request that is being aborted.
643 */
644static void
645channel_transmit_cancel (struct GNUNET_PSYC_ChannelTransmitHandle *th)
646{
647 struct GNUNET_PSYC_Channel *ch = th->ch;
648 if (GNUNET_NO == ch->in_transmit)
649 return;
650}
651 241
242 struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh));
243 jh->mst = mst;
244 jh->slave_key = req->slave_key;
652 245
653/** 246 if (NULL != mst->join_req_cb)
654 * Handle incoming message from the PSYC service. 247 mst->join_req_cb (mst->cb_cls, &req->slave_key, pmsg, jh);
655 *
656 * @param ch The channel the message is sent to.
657 * @param pmsg The message.
658 */
659static void
660handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
661 const struct GNUNET_PSYC_MessageHeader *msg)
662{
663 uint16_t size = ntohs (msg->header.size);
664 uint32_t flags = ntohl (msg->flags);
665
666 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
667 (struct GNUNET_MessageHeader *) msg);
668
669 if (MSG_STATE_START == ch->recv_state)
670 {
671 ch->recv_message_id = GNUNET_ntohll (msg->message_id);
672 ch->recv_flags = flags;
673 ch->recv_slave_key = msg->slave_key;
674 ch->recv_mod_value_size = 0;
675 ch->recv_mod_value_size_expected = 0;
676 }
677 else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id)
678 {
679 // FIXME
680 LOG (GNUNET_ERROR_TYPE_WARNING,
681 "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
682 GNUNET_ntohll (msg->message_id), ch->recv_message_id);
683 GNUNET_break_op (0);
684 recv_error (ch);
685 return;
686 }
687 else if (flags != ch->recv_flags)
688 {
689 LOG (GNUNET_ERROR_TYPE_WARNING,
690 "Unexpected message flags. Got: %lu, expected: %lu\n",
691 flags, ch->recv_flags);
692 GNUNET_break_op (0);
693 recv_error (ch);
694 return;
695 }
696
697 uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
698
699 for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
700 {
701 const struct GNUNET_MessageHeader *pmsg
702 = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
703 psize = ntohs (pmsg->size);
704 ptype = ntohs (pmsg->type);
705 size_eq = size_min = 0;
706
707 if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
708 {
709 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
710 "Dropping message of type %u with invalid size %u.\n",
711 ptype, psize);
712 recv_error (ch);
713 return;
714 }
715
716 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
717 "Received message part from PSYC.\n");
718 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
719
720 switch (ptype)
721 {
722 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
723 size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
724 break;
725 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
726 size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
727 break;
728 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
729 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
730 size_min = sizeof (struct GNUNET_MessageHeader);
731 break;
732 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
733 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
734 size_eq = sizeof (struct GNUNET_MessageHeader);
735 break;
736 default:
737 GNUNET_break_op (0);
738 recv_error (ch);
739 return;
740 }
741
742 if (! ((0 < size_eq && psize == size_eq)
743 || (0 < size_min && size_min <= psize)))
744 {
745 GNUNET_break_op (0);
746 recv_error (ch);
747 return;
748 }
749
750 switch (ptype)
751 {
752 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
753 {
754 struct GNUNET_PSYC_MessageMethod *meth
755 = (struct GNUNET_PSYC_MessageMethod *) pmsg;
756
757 if (MSG_STATE_START != ch->recv_state)
758 {
759 LOG (GNUNET_ERROR_TYPE_WARNING,
760 "Dropping out of order message method (%u).\n",
761 ch->recv_state);
762 /* It is normal to receive an incomplete message right after connecting,
763 * but should not happen later.
764 * FIXME: add a check for this condition.
765 */
766 GNUNET_break_op (0);
767 recv_error (ch);
768 return;
769 }
770
771 if ('\0' != *((char *) meth + psize - 1))
772 {
773 LOG (GNUNET_ERROR_TYPE_WARNING,
774 "Dropping message with malformed method. "
775 "Message ID: %" PRIu64 "\n", ch->recv_message_id);
776 GNUNET_break_op (0);
777 recv_error (ch);
778 return;
779 }
780 ch->recv_state = MSG_STATE_METHOD;
781 break;
782 }
783 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
784 {
785 if (!(MSG_STATE_METHOD == ch->recv_state
786 || MSG_STATE_MODIFIER == ch->recv_state
787 || MSG_STATE_MOD_CONT == ch->recv_state))
788 {
789 LOG (GNUNET_ERROR_TYPE_WARNING,
790 "Dropping out of order message modifier (%u).\n",
791 ch->recv_state);
792 GNUNET_break_op (0);
793 recv_error (ch);
794 return;
795 }
796
797 struct GNUNET_PSYC_MessageModifier *mod
798 = (struct GNUNET_PSYC_MessageModifier *) pmsg;
799
800 uint16_t name_size = ntohs (mod->name_size);
801 ch->recv_mod_value_size_expected = ntohl (mod->value_size);
802 ch->recv_mod_value_size = psize - sizeof (*mod) - name_size - 1;
803
804 if (psize < sizeof (*mod) + name_size + 1
805 || '\0' != *((char *) &mod[1] + name_size)
806 || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
807 {
808 LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
809 GNUNET_break_op (0);
810 recv_error (ch);
811 return;
812 }
813 ch->recv_state = MSG_STATE_MODIFIER;
814 break;
815 }
816 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
817 {
818 ch->recv_mod_value_size += psize - sizeof (*pmsg);
819
820 if (!(MSG_STATE_MODIFIER == ch->recv_state
821 || MSG_STATE_MOD_CONT == ch->recv_state)
822 || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
823 {
824 LOG (GNUNET_ERROR_TYPE_WARNING,
825 "Dropping out of order message modifier continuation "
826 "!(%u == %u || %u == %u) || %lu < %lu.\n",
827 MSG_STATE_MODIFIER, ch->recv_state,
828 MSG_STATE_MOD_CONT, ch->recv_state,
829 ch->recv_mod_value_size_expected, ch->recv_mod_value_size);
830 GNUNET_break_op (0);
831 recv_error (ch);
832 return;
833 }
834 break;
835 }
836 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
837 {
838 if (ch->recv_state < MSG_STATE_METHOD
839 || ch->recv_mod_value_size_expected != ch->recv_mod_value_size)
840 {
841 LOG (GNUNET_ERROR_TYPE_WARNING,
842 "Dropping out of order message data fragment "
843 "(%u < %u || %lu != %lu).\n",
844 ch->recv_state, MSG_STATE_METHOD,
845 ch->recv_mod_value_size_expected, ch->recv_mod_value_size);
846
847 GNUNET_break_op (0);
848 recv_error (ch);
849 return;
850 }
851 ch->recv_state = MSG_STATE_DATA;
852 break;
853 }
854 }
855
856 GNUNET_PSYC_MessageCallback message_cb
857 = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
858 ? ch->hist_message_cb
859 : ch->message_cb;
860
861 if (NULL != message_cb)
862 message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, pmsg);
863
864 switch (ptype)
865 {
866 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
867 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
868 recv_reset (ch);
869 break;
870 }
871 }
872} 248}
873 249
874 250
875/**
876 * Handle incoming message acknowledgement from the PSYC service.
877 *
878 * @param ch The channel the acknowledgement is sent to.
879 */
880static void 251static void
881handle_psyc_message_ack (struct GNUNET_PSYC_Channel *ch) 252slave_recv_join_ack (void *cls,
253 struct GNUNET_CLIENT_MANAGER_Connection *client,
254 const struct GNUNET_MessageHeader *msg)
882{ 255{
883 if (0 == ch->tmit_ack_pending) 256 struct GNUNET_PSYC_Slave *
884 { 257 slv = GNUNET_CLIENT_MANAGER_get_user_context_ (client,
885 LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n"); 258 sizeof (struct GNUNET_PSYC_Channel));
886 GNUNET_break (0); 259 struct CountersResult *cres = (struct CountersResult *) msg;
887 return; 260 if (NULL != slv->connect_cb)
888 } 261 slv->connect_cb (slv->cb_cls, GNUNET_ntohll (cres->max_message_id));
889 ch->tmit_ack_pending--;
890
891 switch (ch->tmit.state)
892 {
893 case MSG_STATE_MODIFIER:
894 case MSG_STATE_MOD_CONT:
895 if (GNUNET_NO == ch->tmit_paused)
896 channel_transmit_mod (ch);
897 break;
898
899 case MSG_STATE_DATA:
900 if (GNUNET_NO == ch->tmit_paused)
901 channel_transmit_data (ch);
902 break;
903
904 case MSG_STATE_END:
905 case MSG_STATE_CANCEL:
906 break;
907
908 default:
909 LOG (GNUNET_ERROR_TYPE_DEBUG,
910 "Ignoring message ACK in state %u.\n", ch->tmit.state);
911 }
912} 262}
913 263
914 264
915static void 265static void
916handle_psyc_join_request (struct GNUNET_PSYC_Master *mst, 266slave_recv_join_decision (void *cls,
917 const struct MasterJoinRequest *req) 267 struct GNUNET_CLIENT_MANAGER_Connection *client,
268 const struct GNUNET_MessageHeader *msg)
918{ 269{
919 struct GNUNET_PSYC_MessageHeader *msg = NULL; 270 struct GNUNET_PSYC_Slave *
920 if (ntohs (req->header.size) <= sizeof (*req) + sizeof (*msg)) 271 slv = GNUNET_CLIENT_MANAGER_get_user_context_ (client,
921 msg = (struct GNUNET_PSYC_MessageHeader *) &req[1]; 272 sizeof (struct GNUNET_PSYC_Channel));
273 const struct SlaveJoinDecision *
274 dcsn = (const struct SlaveJoinDecision *) msg;
922 275
923 struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); 276 struct GNUNET_PSYC_MessageHeader *pmsg = NULL;
924 jh->mst = mst; 277 if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*pmsg))
925 jh->slave_key = req->slave_key; 278 pmsg = (struct GNUNET_PSYC_MessageHeader *) &dcsn[1];
926 279
927 if (NULL != mst->join_req_cb) 280 struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh));
928 mst->join_req_cb (mst->ch.cb_cls, &req->slave_key, msg, jh); 281 if (NULL != slv->join_dcsn_cb)
282 slv->join_dcsn_cb (slv->cb_cls, ntohl (dcsn->is_admitted), pmsg);
929} 283}
930 284
931 285
932static void 286static struct GNUNET_CLIENT_MANAGER_MessageHandler master_handlers[] =
933handle_psyc_join_decision (struct GNUNET_PSYC_Slave *slv,
934 const struct SlaveJoinDecision *dcsn)
935{ 287{
936 struct GNUNET_PSYC_MessageHeader *msg = NULL; 288 { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO },
937 if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*msg))
938 msg = (struct GNUNET_PSYC_MessageHeader *) &dcsn[1];
939 289
940 struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); 290 { &channel_recv_message, NULL,
941 if (NULL != slv->join_dcsn_cb) 291 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
942 slv->join_dcsn_cb (slv->ch.cb_cls, ntohl (dcsn->is_admitted), msg); 292 sizeof (struct GNUNET_PSYC_MessageHeader), GNUNET_YES },
943}
944 293
294 { &channel_recv_message_ack, NULL,
295 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK,
296 sizeof (struct GNUNET_MessageHeader), GNUNET_NO },
945 297
946/** 298 { &master_recv_start_ack, NULL,
947 * Type of a function to call when we receive a message 299 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK,
948 * from the service. 300 sizeof (struct CountersResult), GNUNET_NO },
949 *
950 * @param cls closure
951 * @param msg message received, NULL on timeout or fatal error
952 */
953static void
954message_handler (void *cls,
955 const struct GNUNET_MessageHeader *msg)
956{
957 struct GNUNET_PSYC_Channel *ch = cls;
958 struct GNUNET_PSYC_Master *mst = cls;
959 struct GNUNET_PSYC_Slave *slv = cls;
960
961 if (NULL == msg)
962 {
963 // timeout / disconnected from service, reconnect
964 reschedule_connect (ch);
965 return;
966 }
967 uint16_t size_eq = 0;
968 uint16_t size_min = 0;
969 uint16_t size = ntohs (msg->size);
970 uint16_t type = ntohs (msg->type);
971
972 LOG (GNUNET_ERROR_TYPE_DEBUG,
973 "Received message of type %d and size %u from PSYC service\n",
974 type, size);
975
976 switch (type)
977 {
978 case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK:
979 case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
980 size_eq = sizeof (struct CountersResult);
981 break;
982 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
983 size_min = sizeof (struct GNUNET_PSYC_MessageHeader);
984 break;
985 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
986 size_eq = sizeof (struct GNUNET_MessageHeader);
987 break;
988 case GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST:
989 size_min = sizeof (struct MasterJoinRequest);
990 break;
991 case GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION:
992 size_min = sizeof (struct SlaveJoinDecision);
993 break;
994 default:
995 GNUNET_break_op (0);
996 return;
997 }
998
999 if (! ((0 < size_eq && size == size_eq)
1000 || (0 < size_min && size_min <= size)))
1001 {
1002 GNUNET_break_op (0);
1003 return;
1004 }
1005
1006 switch (type)
1007 {
1008 case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK:
1009 {
1010 struct CountersResult *cres = (struct CountersResult *) msg;
1011 if (NULL != mst->start_cb)
1012 mst->start_cb (ch->cb_cls, GNUNET_ntohll (cres->max_message_id));
1013 break;
1014 }
1015 case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
1016 {
1017 struct CountersResult *cres = (struct CountersResult *) msg;
1018 if (NULL != slv->connect_cb)
1019 slv->connect_cb (ch->cb_cls, GNUNET_ntohll (cres->max_message_id));
1020 break;
1021 }
1022 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
1023 {
1024 handle_psyc_message_ack (ch);
1025 break;
1026 }
1027
1028 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
1029 handle_psyc_message (ch, (const struct GNUNET_PSYC_MessageHeader *) msg);
1030 break;
1031
1032 case GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST:
1033 handle_psyc_join_request ((struct GNUNET_PSYC_Master *) ch,
1034 (const struct MasterJoinRequest *) msg);
1035 break;
1036
1037 case GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION:
1038 handle_psyc_join_decision ((struct GNUNET_PSYC_Slave *) ch,
1039 (const struct SlaveJoinDecision *) msg);
1040 break;
1041 }
1042
1043 if (NULL != ch->client)
1044 {
1045 GNUNET_CLIENT_receive (ch->client, &message_handler, ch,
1046 GNUNET_TIME_UNIT_FOREVER_REL);
1047 }
1048}
1049 301
302 { &master_recv_join_request, NULL,
303 GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST,
304 sizeof (struct MasterJoinRequest), GNUNET_YES },
1050 305
1051/** 306 { NULL, NULL, 0, 0, GNUNET_NO }
1052 * Transmit next message to service. 307};
1053 *
1054 * @param cls The struct GNUNET_PSYC_Channel.
1055 * @param size Number of bytes available in @a buf.
1056 * @param buf Where to copy the message.
1057 *
1058 * @return Number of bytes copied to @a buf.
1059 */
1060static size_t
1061send_next_message (void *cls, size_t size, void *buf)
1062{
1063 LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n");
1064 struct GNUNET_PSYC_Channel *ch = cls;
1065 struct MessageQueue *mq = ch->tmit_head;
1066 if (NULL == mq)
1067 return 0;
1068 struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1];
1069 size_t ret = ntohs (qmsg->size);
1070 ch->th = NULL;
1071 if (ret > size)
1072 {
1073 reschedule_connect (ch);
1074 return 0;
1075 }
1076 memcpy (buf, qmsg, ret);
1077
1078 GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, mq);
1079 GNUNET_free (mq);
1080
1081 if (NULL != ch->tmit_head)
1082 transmit_next (ch);
1083
1084 if (GNUNET_NO == ch->in_receive)
1085 {
1086 ch->in_receive = GNUNET_YES;
1087 GNUNET_CLIENT_receive (ch->client, &message_handler, ch,
1088 GNUNET_TIME_UNIT_FOREVER_REL);
1089 }
1090 return ret;
1091}
1092 308
1093 309
1094/** 310static struct GNUNET_CLIENT_MANAGER_MessageHandler slave_handlers[] =
1095 * Schedule transmission of the next message from our queue.
1096 *
1097 * @param ch PSYC handle.
1098 */
1099static void
1100transmit_next (struct GNUNET_PSYC_Channel *ch)
1101{ 311{
1102 LOG (GNUNET_ERROR_TYPE_DEBUG, "transmit_next()\n"); 312 { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO },
1103 if (NULL != ch->th || NULL == ch->client)
1104 return;
1105
1106 struct MessageQueue *mq = ch->tmit_head;
1107 if (NULL == mq)
1108 return;
1109 struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1];
1110
1111 ch->th = GNUNET_CLIENT_notify_transmit_ready (ch->client,
1112 ntohs (qmsg->size),
1113 GNUNET_TIME_UNIT_FOREVER_REL,
1114 GNUNET_NO,
1115 &send_next_message,
1116 ch);
1117}
1118 313
314 { &channel_recv_message, NULL,
315 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
316 sizeof (struct GNUNET_PSYC_MessageHeader), GNUNET_YES },
1119 317
1120/** 318 { &channel_recv_message_ack, NULL,
1121 * Try again to connect to the PSYC service. 319 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK,
1122 * 320 sizeof (struct GNUNET_MessageHeader), GNUNET_NO },
1123 * @param cls Channel handle.
1124 * @param tc Scheduler context.
1125 */
1126static void
1127reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1128{
1129 struct GNUNET_PSYC_Channel *ch = cls;
1130
1131 recv_reset (ch);
1132 ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1133 LOG (GNUNET_ERROR_TYPE_DEBUG,
1134 "Connecting to PSYC service.\n");
1135 GNUNET_assert (NULL == ch->client);
1136 ch->client = GNUNET_CLIENT_connect ("psyc", ch->cfg);
1137 GNUNET_assert (NULL != ch->client);
1138 uint16_t reconn_size = ntohs (ch->reconnect_msg->size);
1139
1140 if (NULL == ch->tmit_head ||
1141 0 != memcmp (&ch->tmit_head[1], ch->reconnect_msg, reconn_size))
1142 {
1143 struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + reconn_size);
1144 memcpy (&mq[1], ch->reconnect_msg, reconn_size);
1145 GNUNET_CONTAINER_DLL_insert (ch->tmit_head, ch->tmit_tail, mq);
1146 }
1147 transmit_next (ch);
1148}
1149 321
322 { &slave_recv_join_ack, NULL,
323 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK,
324 sizeof (struct CountersResult), GNUNET_NO },
1150 325
1151/** 326 { &slave_recv_join_decision, NULL,
1152 * Disconnect from the PSYC service. 327 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
1153 * 328 sizeof (struct SlaveJoinDecision), GNUNET_YES },
1154 * @param c Channel handle to disconnect. 329
1155 */ 330 { NULL, NULL, 0, 0, GNUNET_NO }
1156static void 331};
1157disconnect (void *c)
1158{
1159 struct GNUNET_PSYC_Channel *ch = c;
1160
1161 GNUNET_assert (NULL != ch);
1162 if (ch->tmit_head != ch->tmit_tail)
1163 {
1164 LOG (GNUNET_ERROR_TYPE_ERROR,
1165 "Disconnecting while there are still outstanding messages!\n");
1166 GNUNET_break (0);
1167 }
1168 if (ch->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
1169 {
1170 GNUNET_SCHEDULER_cancel (ch->reconnect_task);
1171 ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1172 }
1173 if (NULL != ch->th)
1174 {
1175 GNUNET_CLIENT_notify_transmit_ready_cancel (ch->th);
1176 ch->th = NULL;
1177 }
1178 if (NULL != ch->client)
1179 {
1180 GNUNET_CLIENT_disconnect (ch->client);
1181 ch->client = NULL;
1182 }
1183 if (NULL != ch->reconnect_msg)
1184 {
1185 GNUNET_free (ch->reconnect_msg);
1186 ch->reconnect_msg = NULL;
1187 }
1188}
1189 332
1190 333
1191/** 334/**
@@ -1227,24 +370,29 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
1227 void *cls) 370 void *cls)
1228{ 371{
1229 struct GNUNET_PSYC_Master *mst = GNUNET_malloc (sizeof (*mst)); 372 struct GNUNET_PSYC_Master *mst = GNUNET_malloc (sizeof (*mst));
1230 struct GNUNET_PSYC_Channel *ch = &mst->ch; 373 struct GNUNET_PSYC_Channel *chn = &mst->chn;
1231 struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req));
1232 374
375 struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req));
1233 req->header.size = htons (sizeof (*req)); 376 req->header.size = htons (sizeof (*req));
1234 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START); 377 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START);
1235 req->channel_key = *channel_key; 378 req->channel_key = *channel_key;
1236 req->policy = policy; 379 req->policy = policy;
1237 380
381 chn->connect_msg = (struct GNUNET_MessageHeader *) req;
382 chn->cfg = cfg;
383 chn->is_master = GNUNET_YES;
384
1238 mst->start_cb = start_cb; 385 mst->start_cb = start_cb;
1239 mst->join_req_cb = join_request_cb; 386 mst->join_req_cb = join_request_cb;
1240 ch->message_cb = message_cb; 387 mst->cb_cls = cls;
1241 ch->cb_cls = cls; 388
1242 ch->cfg = cfg; 389 chn->client = GNUNET_CLIENT_MANAGER_connect (cfg, "psyc", master_handlers);
1243 ch->is_master = GNUNET_YES; 390 GNUNET_CLIENT_MANAGER_set_user_context_ (chn->client, mst, sizeof (*chn));
1244 ch->reconnect_msg = (struct GNUNET_MessageHeader *) req; 391
1245 ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; 392 chn->tmit = GNUNET_PSYC_transmit_create (chn->client);
1246 ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst); 393 chn->recv = GNUNET_PSYC_receive_create (message_cb, message_cb, cls);
1247 394
395 channel_send_connect_msg (chn);
1248 return mst; 396 return mst;
1249} 397}
1250 398
@@ -1253,12 +401,13 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
1253 * Stop a PSYC master channel. 401 * Stop a PSYC master channel.
1254 * 402 *
1255 * @param master PSYC channel master to stop. 403 * @param master PSYC channel master to stop.
404 * @param keep_active FIXME
1256 */ 405 */
1257void 406void
1258GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master) 407GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *mst)
1259{ 408{
1260 disconnect (master); 409 GNUNET_CLIENT_MANAGER_disconnect (mst->chn.client, GNUNET_YES);
1261 GNUNET_free (master); 410 GNUNET_free (mst);
1262} 411}
1263 412
1264 413
@@ -1292,7 +441,7 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
1292 const struct GNUNET_PeerIdentity *relays, 441 const struct GNUNET_PeerIdentity *relays,
1293 const struct GNUNET_PSYC_MessageHeader *join_resp) 442 const struct GNUNET_PSYC_MessageHeader *join_resp)
1294{ 443{
1295 struct GNUNET_PSYC_Channel *ch = &jh->mst->ch; 444 struct GNUNET_PSYC_Channel *chn = &jh->mst->chn;
1296 struct MasterJoinDecision *dcsn; 445 struct MasterJoinDecision *dcsn;
1297 uint16_t join_resp_size 446 uint16_t join_resp_size
1298 = (NULL != join_resp) ? ntohs (join_resp->header.size) : 0; 447 = (NULL != join_resp) ? ntohs (join_resp->header.size) : 0;
@@ -1302,9 +451,7 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
1302 < sizeof (*dcsn) + relay_size + join_resp_size) 451 < sizeof (*dcsn) + relay_size + join_resp_size)
1303 return GNUNET_SYSERR; 452 return GNUNET_SYSERR;
1304 453
1305 struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*dcsn) 454 dcsn = GNUNET_malloc (sizeof (*dcsn) + relay_size + join_resp_size);
1306 + relay_size + join_resp_size);
1307 dcsn = (struct MasterJoinDecision *) &mq[1];
1308 dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size); 455 dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size);
1309 dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION); 456 dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
1310 dcsn->is_admitted = htonl (is_admitted); 457 dcsn->is_admitted = htonl (is_admitted);
@@ -1313,8 +460,7 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
1313 if (0 < join_resp_size) 460 if (0 < join_resp_size)
1314 memcpy (&dcsn[1], join_resp, join_resp_size); 461 memcpy (&dcsn[1], join_resp, join_resp_size);
1315 462
1316 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq); 463 GNUNET_CLIENT_MANAGER_transmit (chn->client, &dcsn->header);
1317 transmit_next (ch);
1318 return GNUNET_OK; 464 return GNUNET_OK;
1319} 465}
1320 466
@@ -1332,40 +478,59 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
1332 * @return Transmission handle, NULL on error (i.e. more than one request queued). 478 * @return Transmission handle, NULL on error (i.e. more than one request queued).
1333 */ 479 */
1334struct GNUNET_PSYC_MasterTransmitHandle * 480struct GNUNET_PSYC_MasterTransmitHandle *
1335GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, 481GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *mst,
1336 const char *method_name, 482 const char *method_name,
1337 GNUNET_PSYC_TransmitNotifyModifier notify_mod, 483 GNUNET_PSYC_TransmitNotifyModifier notify_mod,
1338 GNUNET_PSYC_TransmitNotifyData notify_data, 484 GNUNET_PSYC_TransmitNotifyData notify_data,
1339 void *notify_cls, 485 void *notify_cls,
1340 enum GNUNET_PSYC_MasterTransmitFlags flags) 486 enum GNUNET_PSYC_MasterTransmitFlags flags)
1341{ 487{
1342 return (struct GNUNET_PSYC_MasterTransmitHandle *) 488 if (GNUNET_OK
1343 channel_transmit (&master->ch, method_name, notify_mod, notify_data, 489 == GNUNET_PSYC_transmit_message (mst->chn.tmit, method_name, NULL,
1344 notify_cls, flags); 490 notify_mod, notify_data, notify_cls,
491 flags))
492 return (struct GNUNET_PSYC_MasterTransmitHandle *) mst->chn.tmit;
493 else
494 return NULL;
1345} 495}
1346 496
1347 497
1348/** 498/**
1349 * Resume transmission to the channel. 499 * Resume transmission to the channel.
1350 * 500 *
1351 * @param th Handle of the request that is being resumed. 501 * @param tmit Handle of the request that is being resumed.
1352 */ 502 */
1353void 503void
1354GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) 504GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *tmit)
1355{ 505{
1356 channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); 506 GNUNET_PSYC_transmit_resume ((struct GNUNET_PSYC_TransmitHandle *) tmit);
1357} 507}
1358 508
1359 509
1360/** 510/**
1361 * Abort transmission request to the channel. 511 * Abort transmission request to the channel.
1362 * 512 *
1363 * @param th Handle of the request that is being aborted. 513 * @param tmit Handle of the request that is being aborted.
1364 */ 514 */
1365void 515void
1366GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th) 516GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *tmit)
1367{ 517{
1368 channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); 518 GNUNET_PSYC_transmit_cancel ((struct GNUNET_PSYC_TransmitHandle *) tmit);
519}
520
521
522/**
523 * Convert a channel @a master to a @e channel handle to access the @e channel
524 * APIs.
525 *
526 * @param master Channel master handle.
527 *
528 * @return Channel handle, valid for as long as @a master is valid.
529 */
530struct GNUNET_PSYC_Channel *
531GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master)
532{
533 return &master->chn;
1369} 534}
1370 535
1371 536
@@ -1420,7 +585,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1420 uint16_t data_size) 585 uint16_t data_size)
1421{ 586{
1422 struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv)); 587 struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv));
1423 struct GNUNET_PSYC_Channel *ch = &slv->ch; 588 struct GNUNET_PSYC_Channel *chn = &slv->chn;
1424 struct SlaveJoinRequest *req 589 struct SlaveJoinRequest *req
1425 = GNUNET_malloc (sizeof (*req) + relay_count * sizeof (*relays)); 590 = GNUNET_malloc (sizeof (*req) + relay_count * sizeof (*relays));
1426 req->header.size = htons (sizeof (*req) 591 req->header.size = htons (sizeof (*req)
@@ -1432,17 +597,21 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1432 req->relay_count = htonl (relay_count); 597 req->relay_count = htonl (relay_count);
1433 memcpy (&req[1], relays, relay_count * sizeof (*relays)); 598 memcpy (&req[1], relays, relay_count * sizeof (*relays));
1434 599
600 chn->connect_msg = (struct GNUNET_MessageHeader *) req;
601 chn->cfg = cfg;
602 chn->is_master = GNUNET_NO;
603
1435 slv->connect_cb = connect_cb; 604 slv->connect_cb = connect_cb;
1436 slv->join_dcsn_cb = join_decision_cb; 605 slv->join_dcsn_cb = join_decision_cb;
1437 ch->message_cb = message_cb; 606 slv->cb_cls = cls;
1438 ch->cb_cls = cls;
1439 607
1440 ch->cfg = cfg; 608 chn->client = GNUNET_CLIENT_MANAGER_connect (cfg, "psyc", slave_handlers);
1441 ch->is_master = GNUNET_NO; 609 GNUNET_CLIENT_MANAGER_set_user_context_ (chn->client, slv, sizeof (*chn));
1442 ch->reconnect_msg = (struct GNUNET_MessageHeader *) req;
1443 ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
1444 ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv);
1445 610
611 chn->recv = GNUNET_PSYC_receive_create (message_cb, message_cb, cls);
612 chn->tmit = GNUNET_PSYC_transmit_create (chn->client);
613
614 channel_send_connect_msg (chn);
1446 return slv; 615 return slv;
1447} 616}
1448 617
@@ -1456,10 +625,10 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1456 * @param slave Slave handle. 625 * @param slave Slave handle.
1457 */ 626 */
1458void 627void
1459GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave) 628GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slv)
1460{ 629{
1461 disconnect (slave); 630 GNUNET_CLIENT_MANAGER_disconnect (slv->chn.client, GNUNET_YES);
1462 GNUNET_free (slave); 631 GNUNET_free (slv);
1463} 632}
1464 633
1465 634
@@ -1477,69 +646,59 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave)
1477 * queued). 646 * queued).
1478 */ 647 */
1479struct GNUNET_PSYC_SlaveTransmitHandle * 648struct GNUNET_PSYC_SlaveTransmitHandle *
1480GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave, 649GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slv,
1481 const char *method_name, 650 const char *method_name,
1482 GNUNET_PSYC_TransmitNotifyModifier notify_mod, 651 GNUNET_PSYC_TransmitNotifyModifier notify_mod,
1483 GNUNET_PSYC_TransmitNotifyData notify_data, 652 GNUNET_PSYC_TransmitNotifyData notify_data,
1484 void *notify_cls, 653 void *notify_cls,
1485 enum GNUNET_PSYC_SlaveTransmitFlags flags) 654 enum GNUNET_PSYC_SlaveTransmitFlags flags)
655
1486{ 656{
1487 return (struct GNUNET_PSYC_SlaveTransmitHandle *) 657 if (GNUNET_OK
1488 channel_transmit (&slave->ch, method_name, 658 == GNUNET_PSYC_transmit_message (slv->chn.tmit, method_name, NULL,
1489 notify_mod, notify_data, notify_cls, flags); 659 notify_mod, notify_data, notify_cls,
660 flags))
661 return (struct GNUNET_PSYC_SlaveTransmitHandle *) slv->chn.tmit;
662 else
663 return NULL;
1490} 664}
1491 665
1492 666
1493/** 667/**
1494 * Resume transmission to the master. 668 * Resume transmission to the master.
1495 * 669 *
1496 * @param th Handle of the request that is being resumed. 670 * @param tmit Handle of the request that is being resumed.
1497 */ 671 */
1498void 672void
1499GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_SlaveTransmitHandle *th) 673GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_SlaveTransmitHandle *tmit)
1500{ 674{
1501 channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); 675 GNUNET_PSYC_transmit_resume ((struct GNUNET_PSYC_TransmitHandle *) tmit);
1502} 676}
1503 677
1504 678
1505/** 679/**
1506 * Abort transmission request to master. 680 * Abort transmission request to master.
1507 * 681 *
1508 * @param th Handle of the request that is being aborted. 682 * @param tmit Handle of the request that is being aborted.
1509 */ 683 */
1510void 684void
1511GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th) 685GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *tmit)
1512{ 686{
1513 channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); 687 GNUNET_PSYC_transmit_cancel ((struct GNUNET_PSYC_TransmitHandle *) tmit);
1514}
1515
1516
1517/**
1518 * Convert a channel @a master to a @e channel handle to access the @e channel
1519 * APIs.
1520 *
1521 * @param master Channel master handle.
1522 *
1523 * @return Channel handle, valid for as long as @a master is valid.
1524 */
1525struct GNUNET_PSYC_Channel *
1526GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master)
1527{
1528 return &master->ch;
1529} 688}
1530 689
1531 690
1532/** 691/**
1533 * Convert @a slave to a @e channel handle to access the @e channel APIs. 692 * Convert @a slave to a @e channel handle to access the @e channel APIs.
1534 * 693 *
1535 * @param slave Slave handle. 694 * @param slv Slave handle.
1536 * 695 *
1537 * @return Channel handle, valid for as long as @a slave is valid. 696 * @return Channel handle, valid for as long as @a slave is valid.
1538 */ 697 */
1539struct GNUNET_PSYC_Channel * 698struct GNUNET_PSYC_Channel *
1540GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave) 699GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slv)
1541{ 700{
1542 return &slave->ch; 701 return &slv->chn;
1543} 702}
1544 703
1545 704
@@ -1565,23 +724,17 @@ GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave)
1565 * @param effective_since Addition of slave is in effect since this message ID. 724 * @param effective_since Addition of slave is in effect since this message ID.
1566 */ 725 */
1567void 726void
1568GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel, 727GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn,
1569 const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, 728 const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
1570 uint64_t announced_at, 729 uint64_t announced_at,
1571 uint64_t effective_since) 730 uint64_t effective_since)
1572{ 731{
1573 struct ChannelSlaveAdd *slvadd; 732 struct ChannelSlaveAdd *add = GNUNET_malloc (sizeof (*add));
1574 struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*slvadd)); 733 add->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD);
1575 734 add->header.size = htons (sizeof (*add));
1576 slvadd = (struct ChannelSlaveAdd *) &mq[1]; 735 add->announced_at = GNUNET_htonll (announced_at);
1577 slvadd->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD); 736 add->effective_since = GNUNET_htonll (effective_since);
1578 slvadd->header.size = htons (sizeof (*slvadd)); 737 GNUNET_CLIENT_MANAGER_transmit (chn->client, &add->header);
1579 slvadd->announced_at = GNUNET_htonll (announced_at);
1580 slvadd->effective_since = GNUNET_htonll (effective_since);
1581 GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head,
1582 channel->tmit_tail,
1583 mq);
1584 transmit_next (channel);
1585} 738}
1586 739
1587 740
@@ -1607,21 +760,15 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel,
1607 * @param announced_at ID of the message that announced the membership change. 760 * @param announced_at ID of the message that announced the membership change.
1608 */ 761 */
1609void 762void
1610GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel, 763GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn,
1611 const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, 764 const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
1612 uint64_t announced_at) 765 uint64_t announced_at)
1613{ 766{
1614 struct ChannelSlaveRemove *slvrm; 767 struct ChannelSlaveRemove *rm = GNUNET_malloc (sizeof (*rm));
1615 struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*slvrm)); 768 rm->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM);
1616 769 rm->header.size = htons (sizeof (*rm));
1617 slvrm = (struct ChannelSlaveRemove *) &mq[1]; 770 rm->announced_at = GNUNET_htonll (announced_at);
1618 slvrm->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM); 771 GNUNET_CLIENT_MANAGER_transmit (chn->client, &rm->header);
1619 slvrm->header.size = htons (sizeof (*slvrm));
1620 slvrm->announced_at = GNUNET_htonll (announced_at);
1621 GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head,
1622 channel->tmit_tail,
1623 mq);
1624 transmit_next (channel);
1625} 772}
1626 773
1627 774