diff options
author | Gabor X Toth <*@tg-x.net> | 2014-05-29 16:35:55 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2014-05-29 16:35:55 +0000 |
commit | 9955561e1b204ccf23fbf841f409bd3ef79be88c (patch) | |
tree | 0271c23ae9f1dad72266a0e6073d696e5afca027 /src/psyc/psyc_api.c | |
parent | a5877668ba805c5e0efe622e6ce4c58ff5609bf9 (diff) | |
download | gnunet-9955561e1b204ccf23fbf841f409bd3ef79be88c.tar.gz gnunet-9955561e1b204ccf23fbf841f409bd3ef79be88c.zip |
psyc, multicast: reorg code, use new client manager & psyc util lib
Diffstat (limited to 'src/psyc/psyc_api.c')
-rw-r--r-- | src/psyc/psyc_api.c | 1295 |
1 files changed, 221 insertions, 1074 deletions
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c index 7ec9d21b7..bfb6f43fb 100644 --- a/src/psyc/psyc_api.c +++ b/src/psyc/psyc_api.c | |||
@@ -37,29 +37,11 @@ | |||
37 | #include "gnunet_env_lib.h" | 37 | #include "gnunet_env_lib.h" |
38 | #include "gnunet_multicast_service.h" | 38 | #include "gnunet_multicast_service.h" |
39 | #include "gnunet_psyc_service.h" | 39 | #include "gnunet_psyc_service.h" |
40 | #include "gnunet_psyc_util_lib.h" | ||
40 | #include "psyc.h" | 41 | #include "psyc.h" |
41 | 42 | ||
42 | #define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__) | 43 | #define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__) |
43 | 44 | ||
44 | struct MessageQueue | ||
45 | { | ||
46 | struct MessageQueue *prev; | ||
47 | struct MessageQueue *next; | ||
48 | /* Followed by struct GNUNET_MessageHeader msg */ | ||
49 | }; | ||
50 | |||
51 | |||
52 | /** | ||
53 | * Handle for a pending PSYC transmission operation. | ||
54 | */ | ||
55 | struct GNUNET_PSYC_ChannelTransmitHandle | ||
56 | { | ||
57 | struct GNUNET_PSYC_Channel *ch; | ||
58 | GNUNET_PSYC_TransmitNotifyModifier notify_mod; | ||
59 | GNUNET_PSYC_TransmitNotifyData notify_data; | ||
60 | void *notify_cls; | ||
61 | enum MessageState state; | ||
62 | }; | ||
63 | 45 | ||
64 | /** | 46 | /** |
65 | * Handle to access PSYC channel operations for both the master and slaves. | 47 | * Handle to access PSYC channel operations for both the master and slaves. |
@@ -67,109 +49,29 @@ struct GNUNET_PSYC_ChannelTransmitHandle | |||
67 | struct GNUNET_PSYC_Channel | 49 | struct GNUNET_PSYC_Channel |
68 | { | 50 | { |
69 | /** | 51 | /** |
70 | * Transmission handle; | ||
71 | */ | ||
72 | struct GNUNET_PSYC_ChannelTransmitHandle tmit; | ||
73 | |||
74 | /** | ||
75 | * Configuration to use. | 52 | * Configuration to use. |
76 | */ | 53 | */ |
77 | const struct GNUNET_CONFIGURATION_Handle *cfg; | 54 | const struct GNUNET_CONFIGURATION_Handle *cfg; |
78 | 55 | ||
79 | /** | 56 | /** |
80 | * Socket (if available). | 57 | * Client connection to the service. |
81 | */ | ||
82 | struct GNUNET_CLIENT_Connection *client; | ||
83 | |||
84 | /** | ||
85 | * Currently pending transmission request, or NULL for none. | ||
86 | */ | 58 | */ |
87 | struct GNUNET_CLIENT_TransmitHandle *th; | 59 | struct GNUNET_CLIENT_MANAGER_Connection *client; |
88 | 60 | ||
89 | /** | 61 | /** |
90 | * Head of messages to transmit to the service. | 62 | * Transmission handle; |
91 | */ | ||
92 | struct MessageQueue *tmit_head; | ||
93 | |||
94 | /** | ||
95 | * Tail of operations to transmit to the service. | ||
96 | */ | 63 | */ |
97 | struct MessageQueue *tmit_tail; | 64 | struct GNUNET_PSYC_TransmitHandle *tmit; |
98 | 65 | ||
99 | /** | 66 | /** |
100 | * Message currently being transmitted to the service. | 67 | * Receipt handle; |
101 | */ | 68 | */ |
102 | struct MessageQueue *tmit_msg; | 69 | struct GNUNET_PSYC_ReceiveHandle *recv; |
103 | 70 | ||
104 | /** | 71 | /** |
105 | * Message to send on reconnect. | 72 | * Message to send on reconnect. |
106 | */ | 73 | */ |
107 | struct GNUNET_MessageHeader *reconnect_msg; | 74 | struct GNUNET_MessageHeader *connect_msg; |
108 | |||
109 | /** | ||
110 | * Task doing exponential back-off trying to reconnect. | ||
111 | */ | ||
112 | GNUNET_SCHEDULER_TaskIdentifier reconnect_task; | ||
113 | |||
114 | /** | ||
115 | * Time for next connect retry. | ||
116 | */ | ||
117 | struct GNUNET_TIME_Relative reconnect_delay; | ||
118 | |||
119 | /** | ||
120 | * Message part callback. | ||
121 | */ | ||
122 | GNUNET_PSYC_MessageCallback message_cb; | ||
123 | |||
124 | /** | ||
125 | * Message part callback for historic message. | ||
126 | */ | ||
127 | GNUNET_PSYC_MessageCallback hist_message_cb; | ||
128 | |||
129 | /** | ||
130 | * Closure for @a message_cb. | ||
131 | */ | ||
132 | void *cb_cls; | ||
133 | |||
134 | /** | ||
135 | * ID of the message being received from the PSYC service. | ||
136 | */ | ||
137 | uint64_t recv_message_id; | ||
138 | |||
139 | /** | ||
140 | * Public key of the slave from which a message is being received. | ||
141 | */ | ||
142 | struct GNUNET_CRYPTO_EddsaPublicKey recv_slave_key; | ||
143 | |||
144 | /** | ||
145 | * State of the currently being received message from the PSYC service. | ||
146 | */ | ||
147 | enum MessageState recv_state; | ||
148 | |||
149 | /** | ||
150 | * Flags for the currently being received message from the PSYC service. | ||
151 | */ | ||
152 | enum GNUNET_PSYC_MessageFlags recv_flags; | ||
153 | |||
154 | /** | ||
155 | * Expected value size for the modifier being received from the PSYC service. | ||
156 | */ | ||
157 | uint32_t recv_mod_value_size_expected; | ||
158 | |||
159 | /** | ||
160 | * Actual value size for the modifier being received from the PSYC service. | ||
161 | */ | ||
162 | uint32_t recv_mod_value_size; | ||
163 | |||
164 | /** | ||
165 | * Is transmission paused? | ||
166 | */ | ||
167 | uint8_t tmit_paused; | ||
168 | |||
169 | /** | ||
170 | * Are we still waiting for a PSYC_TRANSMIT_ACK? | ||
171 | */ | ||
172 | uint8_t tmit_ack_pending; | ||
173 | 75 | ||
174 | /** | 76 | /** |
175 | * Are we polling for incoming messages right now? | 77 | * Are we polling for incoming messages right now? |
@@ -177,11 +79,6 @@ struct GNUNET_PSYC_Channel | |||
177 | uint8_t in_receive; | 79 | uint8_t in_receive; |
178 | 80 | ||
179 | /** | 81 | /** |
180 | * Are we currently transmitting a message? | ||
181 | */ | ||
182 | uint8_t in_transmit; | ||
183 | |||
184 | /** | ||
185 | * Is this a master or slave channel? | 82 | * Is this a master or slave channel? |
186 | */ | 83 | */ |
187 | uint8_t is_master; | 84 | uint8_t is_master; |
@@ -193,7 +90,7 @@ struct GNUNET_PSYC_Channel | |||
193 | */ | 90 | */ |
194 | struct GNUNET_PSYC_Master | 91 | struct GNUNET_PSYC_Master |
195 | { | 92 | { |
196 | struct GNUNET_PSYC_Channel ch; | 93 | struct GNUNET_PSYC_Channel chn; |
197 | 94 | ||
198 | GNUNET_PSYC_MasterStartCallback start_cb; | 95 | GNUNET_PSYC_MasterStartCallback start_cb; |
199 | 96 | ||
@@ -201,6 +98,11 @@ struct GNUNET_PSYC_Master | |||
201 | * Join request callback. | 98 | * Join request callback. |
202 | */ | 99 | */ |
203 | GNUNET_PSYC_JoinRequestCallback join_req_cb; | 100 | GNUNET_PSYC_JoinRequestCallback join_req_cb; |
101 | |||
102 | /** | ||
103 | * Closure for the callbacks. | ||
104 | */ | ||
105 | void *cb_cls; | ||
204 | }; | 106 | }; |
205 | 107 | ||
206 | 108 | ||
@@ -209,11 +111,16 @@ struct GNUNET_PSYC_Master | |||
209 | */ | 111 | */ |
210 | struct GNUNET_PSYC_Slave | 112 | struct GNUNET_PSYC_Slave |
211 | { | 113 | { |
212 | struct GNUNET_PSYC_Channel ch; | 114 | struct GNUNET_PSYC_Channel chn; |
213 | 115 | ||
214 | GNUNET_PSYC_SlaveConnectCallback connect_cb; | 116 | GNUNET_PSYC_SlaveConnectCallback connect_cb; |
215 | 117 | ||
216 | GNUNET_PSYC_JoinDecisionCallback join_dcsn_cb; | 118 | GNUNET_PSYC_JoinDecisionCallback join_dcsn_cb; |
119 | |||
120 | /** | ||
121 | * Closure for the callbacks. | ||
122 | */ | ||
123 | void *cb_cls; | ||
217 | }; | 124 | }; |
218 | 125 | ||
219 | 126 | ||
@@ -258,934 +165,170 @@ struct GNUNET_PSYC_StateQuery | |||
258 | 165 | ||
259 | 166 | ||
260 | static void | 167 | static void |
261 | reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); | 168 | channel_send_connect_msg (struct GNUNET_PSYC_Channel *chn) |
262 | |||
263 | |||
264 | static void | ||
265 | channel_transmit_data (struct GNUNET_PSYC_Channel *ch); | ||
266 | |||
267 | |||
268 | /** | ||
269 | * Reschedule a connect attempt to the service. | ||
270 | * | ||
271 | * @param ch Channel to reconnect. | ||
272 | */ | ||
273 | static void | ||
274 | reschedule_connect (struct GNUNET_PSYC_Channel *ch) | ||
275 | { | 169 | { |
276 | GNUNET_assert (ch->reconnect_task == GNUNET_SCHEDULER_NO_TASK); | 170 | uint16_t cmsg_size = ntohs (chn->connect_msg->size); |
277 | 171 | struct GNUNET_MessageHeader * cmsg = GNUNET_malloc (cmsg_size); | |
278 | if (NULL != ch->th) | 172 | memcpy (cmsg, chn->connect_msg, cmsg_size); |
279 | { | 173 | GNUNET_CLIENT_MANAGER_transmit_now (chn->client, cmsg); |
280 | GNUNET_CLIENT_notify_transmit_ready_cancel (ch->th); | ||
281 | ch->th = NULL; | ||
282 | } | ||
283 | if (NULL != ch->client) | ||
284 | { | ||
285 | GNUNET_CLIENT_disconnect (ch->client); | ||
286 | ch->client = NULL; | ||
287 | } | ||
288 | ch->in_receive = GNUNET_NO; | ||
289 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
290 | "Scheduling task to reconnect to PSYC service in %s.\n", | ||
291 | GNUNET_STRINGS_relative_time_to_string (ch->reconnect_delay, GNUNET_YES)); | ||
292 | ch->reconnect_task = | ||
293 | GNUNET_SCHEDULER_add_delayed (ch->reconnect_delay, &reconnect, ch); | ||
294 | ch->reconnect_delay = GNUNET_TIME_STD_BACKOFF (ch->reconnect_delay); | ||
295 | } | 174 | } |
296 | 175 | ||
297 | 176 | ||
298 | /** | ||
299 | * Schedule transmission of the next message from our queue. | ||
300 | * | ||
301 | * @param ch PSYC channel handle | ||
302 | */ | ||
303 | static void | ||
304 | transmit_next (struct GNUNET_PSYC_Channel *ch); | ||
305 | |||
306 | |||
307 | /** | ||
308 | * Reset stored data related to the last received message. | ||
309 | */ | ||
310 | static void | 177 | static void |
311 | recv_reset (struct GNUNET_PSYC_Channel *ch) | 178 | channel_recv_disconnect (void *cls, |
179 | struct GNUNET_CLIENT_MANAGER_Connection *client, | ||
180 | const struct GNUNET_MessageHeader *msg) | ||
312 | { | 181 | { |
313 | ch->recv_state = MSG_STATE_START; | 182 | struct GNUNET_PSYC_Channel * |
314 | ch->recv_flags = 0; | 183 | chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); |
315 | ch->recv_message_id = 0; | 184 | GNUNET_CLIENT_MANAGER_reconnect (client); |
316 | //FIXME: ch->recv_slave_key = { 0 }; | 185 | channel_send_connect_msg (chn); |
317 | ch->recv_mod_value_size = 0; | ||
318 | ch->recv_mod_value_size_expected = 0; | ||
319 | } | 186 | } |
320 | 187 | ||
321 | 188 | ||
322 | static void | 189 | static void |
323 | recv_error (struct GNUNET_PSYC_Channel *ch) | 190 | channel_recv_message (void *cls, |
191 | struct GNUNET_CLIENT_MANAGER_Connection *client, | ||
192 | const struct GNUNET_MessageHeader *msg) | ||
324 | { | 193 | { |
325 | GNUNET_PSYC_MessageCallback message_cb | 194 | struct GNUNET_PSYC_Channel * |
326 | = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC | 195 | chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); |
327 | ? ch->hist_message_cb | 196 | GNUNET_PSYC_receive_message (chn->recv, |
328 | : ch->message_cb; | 197 | (const struct GNUNET_PSYC_MessageHeader *) msg); |
329 | |||
330 | if (NULL != message_cb) | ||
331 | message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, NULL); | ||
332 | |||
333 | recv_reset (ch); | ||
334 | } | 198 | } |
335 | 199 | ||
336 | 200 | ||
337 | /** | ||
338 | * Queue a message part for transmission to the PSYC service. | ||
339 | * | ||
340 | * The message part is added to the current message buffer. | ||
341 | * When this buffer is full, it is added to the transmission queue. | ||
342 | * | ||
343 | * @param ch Channel struct for the client. | ||
344 | * @param msg Modifier message part, or NULL when there's no more modifiers. | ||
345 | * @param end End of message. | ||
346 | */ | ||
347 | static void | 201 | static void |
348 | queue_message (struct GNUNET_PSYC_Channel *ch, | 202 | channel_recv_message_ack (void *cls, |
349 | const struct GNUNET_MessageHeader *msg, | 203 | struct GNUNET_CLIENT_MANAGER_Connection *client, |
350 | uint8_t end) | 204 | const struct GNUNET_MessageHeader *msg) |
351 | { | 205 | { |
352 | uint16_t size = msg ? ntohs (msg->size) : 0; | 206 | struct GNUNET_PSYC_Channel * |
353 | 207 | chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); | |
354 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 208 | GNUNET_PSYC_transmit_got_ack (chn->tmit); |
355 | "Queueing message of type %u and size %u (end: %u)).\n", | ||
356 | ntohs (msg->type), size, end); | ||
357 | |||
358 | struct MessageQueue *mq = ch->tmit_msg; | ||
359 | struct GNUNET_MessageHeader *qmsg = NULL; | ||
360 | if (NULL != mq) | ||
361 | { | ||
362 | qmsg = (struct GNUNET_MessageHeader *) &mq[1]; | ||
363 | if (NULL == msg | ||
364 | || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < qmsg->size + size) | ||
365 | { | ||
366 | /* End of message or buffer is full, add it to transmission queue | ||
367 | * and start with empty buffer */ | ||
368 | qmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | ||
369 | qmsg->size = htons (qmsg->size); | ||
370 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq); | ||
371 | ch->tmit_msg = mq = NULL; | ||
372 | ch->tmit_ack_pending++; | ||
373 | } | ||
374 | else | ||
375 | { | ||
376 | /* Message fits in current buffer, append */ | ||
377 | ch->tmit_msg | ||
378 | = mq = GNUNET_realloc (mq, sizeof (*mq) + qmsg->size + size); | ||
379 | qmsg = (struct GNUNET_MessageHeader *) &mq[1]; | ||
380 | memcpy ((char *) qmsg + qmsg->size, msg, size); | ||
381 | qmsg->size += size; | ||
382 | } | ||
383 | } | ||
384 | |||
385 | if (NULL == mq && NULL != msg) | ||
386 | { | ||
387 | /* Empty buffer, copy over message. */ | ||
388 | ch->tmit_msg | ||
389 | = mq = GNUNET_malloc (sizeof (*mq) + sizeof (*qmsg) + size); | ||
390 | qmsg = (struct GNUNET_MessageHeader *) &mq[1]; | ||
391 | qmsg->size = sizeof (*qmsg) + size; | ||
392 | memcpy (&qmsg[1], msg, size); | ||
393 | } | ||
394 | |||
395 | if (NULL != mq | ||
396 | && (GNUNET_YES == end | ||
397 | || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD | ||
398 | < qmsg->size + sizeof (struct GNUNET_MessageHeader)))) | ||
399 | { | ||
400 | /* End of message or buffer is full, add it to transmission queue. */ | ||
401 | qmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | ||
402 | qmsg->size = htons (qmsg->size); | ||
403 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq); | ||
404 | ch->tmit_msg = mq = NULL; | ||
405 | ch->tmit_ack_pending++; | ||
406 | } | ||
407 | |||
408 | if (GNUNET_YES == end) | ||
409 | ch->in_transmit = GNUNET_NO; | ||
410 | |||
411 | transmit_next (ch); | ||
412 | } | 209 | } |
413 | 210 | ||
414 | 211 | ||
415 | /** | ||
416 | * Request a modifier from a client to transmit. | ||
417 | * | ||
418 | * @param mst Master handle. | ||
419 | */ | ||
420 | static void | 212 | static void |
421 | channel_transmit_mod (struct GNUNET_PSYC_Channel *ch) | 213 | master_recv_start_ack (void *cls, |
214 | struct GNUNET_CLIENT_MANAGER_Connection *client, | ||
215 | const struct GNUNET_MessageHeader *msg) | ||
422 | { | 216 | { |
423 | uint16_t max_data_size, data_size; | 217 | struct GNUNET_PSYC_Master * |
424 | char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; | 218 | mst = GNUNET_CLIENT_MANAGER_get_user_context_ (client, |
425 | struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; | 219 | sizeof (struct GNUNET_PSYC_Channel)); |
426 | int notify_ret; | ||
427 | |||
428 | switch (ch->tmit.state) | ||
429 | { | ||
430 | case MSG_STATE_MODIFIER: | ||
431 | { | ||
432 | struct GNUNET_PSYC_MessageModifier *mod | ||
433 | = (struct GNUNET_PSYC_MessageModifier *) msg; | ||
434 | max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; | ||
435 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); | ||
436 | msg->size = sizeof (struct GNUNET_PSYC_MessageModifier); | ||
437 | notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, &data_size, &mod[1], | ||
438 | &mod->oper, &mod->value_size); | ||
439 | mod->name_size = strnlen ((char *) &mod[1], data_size); | ||
440 | if (mod->name_size < data_size) | ||
441 | { | ||
442 | mod->value_size = htonl (mod->value_size); | ||
443 | mod->name_size = htons (mod->name_size); | ||
444 | } | ||
445 | else if (0 < data_size) | ||
446 | { | ||
447 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n"); | ||
448 | notify_ret = GNUNET_SYSERR; | ||
449 | } | ||
450 | break; | ||
451 | } | ||
452 | case MSG_STATE_MOD_CONT: | ||
453 | { | ||
454 | max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; | ||
455 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); | ||
456 | msg->size = sizeof (struct GNUNET_MessageHeader); | ||
457 | notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, | ||
458 | &data_size, &msg[1], NULL, NULL); | ||
459 | break; | ||
460 | } | ||
461 | default: | ||
462 | GNUNET_assert (0); | ||
463 | } | ||
464 | |||
465 | switch (notify_ret) | ||
466 | { | ||
467 | case GNUNET_NO: | ||
468 | if (0 == data_size) | ||
469 | { /* Transmission paused, nothing to send. */ | ||
470 | ch->tmit_paused = GNUNET_YES; | ||
471 | return; | ||
472 | } | ||
473 | ch->tmit.state = MSG_STATE_MOD_CONT; | ||
474 | break; | ||
475 | |||
476 | case GNUNET_YES: | ||
477 | if (0 == data_size) | ||
478 | { | ||
479 | /* End of modifiers. */ | ||
480 | ch->tmit.state = MSG_STATE_DATA; | ||
481 | if (0 == ch->tmit_ack_pending) | ||
482 | channel_transmit_data (ch); | ||
483 | |||
484 | return; | ||
485 | } | ||
486 | ch->tmit.state = MSG_STATE_MODIFIER; | ||
487 | break; | ||
488 | |||
489 | default: | ||
490 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
491 | "MasterTransmitNotifyModifier returned error " | ||
492 | "when requesting a modifier.\n"); | ||
493 | |||
494 | ch->tmit.state = MSG_STATE_CANCEL; | ||
495 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); | ||
496 | msg->size = htons (sizeof (*msg)); | ||
497 | |||
498 | queue_message (ch, msg, GNUNET_YES); | ||
499 | return; | ||
500 | } | ||
501 | |||
502 | if (0 < data_size) | ||
503 | { | ||
504 | GNUNET_assert (data_size <= max_data_size); | ||
505 | msg->size = htons (msg->size + data_size); | ||
506 | queue_message (ch, msg, GNUNET_NO); | ||
507 | } | ||
508 | |||
509 | channel_transmit_mod (ch); | ||
510 | } | ||
511 | 220 | ||
512 | 221 | struct CountersResult *cres = (struct CountersResult *) msg; | |
513 | /** | 222 | if (NULL != mst->start_cb) |
514 | * Request data from a client to transmit. | 223 | mst->start_cb (mst->cb_cls, GNUNET_ntohll (cres->max_message_id)); |
515 | * | ||
516 | * @param mst Master handle. | ||
517 | */ | ||
518 | static void | ||
519 | channel_transmit_data (struct GNUNET_PSYC_Channel *ch) | ||
520 | { | ||
521 | uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD; | ||
522 | char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; | ||
523 | struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; | ||
524 | |||
525 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); | ||
526 | |||
527 | int notify_ret = ch->tmit.notify_data (ch->tmit.notify_cls, | ||
528 | &data_size, &msg[1]); | ||
529 | switch (notify_ret) | ||
530 | { | ||
531 | case GNUNET_NO: | ||
532 | if (0 == data_size) | ||
533 | { | ||
534 | /* Transmission paused, nothing to send. */ | ||
535 | ch->tmit_paused = GNUNET_YES; | ||
536 | return; | ||
537 | } | ||
538 | break; | ||
539 | |||
540 | case GNUNET_YES: | ||
541 | ch->tmit.state = MSG_STATE_END; | ||
542 | break; | ||
543 | |||
544 | default: | ||
545 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
546 | "MasterTransmitNotify returned error when requesting data.\n"); | ||
547 | |||
548 | ch->tmit.state = MSG_STATE_CANCEL; | ||
549 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); | ||
550 | msg->size = htons (sizeof (*msg)); | ||
551 | queue_message (ch, msg, GNUNET_YES); | ||
552 | return; | ||
553 | } | ||
554 | |||
555 | if (0 < data_size) | ||
556 | { | ||
557 | GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD); | ||
558 | msg->size = htons (sizeof (*msg) + data_size); | ||
559 | queue_message (ch, msg, !notify_ret); | ||
560 | } | ||
561 | |||
562 | /* End of message. */ | ||
563 | if (GNUNET_YES == notify_ret) | ||
564 | { | ||
565 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END); | ||
566 | msg->size = htons (sizeof (*msg)); | ||
567 | queue_message (ch, msg, GNUNET_YES); | ||
568 | } | ||
569 | } | 224 | } |
570 | 225 | ||
571 | 226 | ||
572 | /** | ||
573 | * Send a message to a channel. | ||
574 | * | ||
575 | * @param ch Handle to the PSYC channel. | ||
576 | * @param method_name Which method should be invoked. | ||
577 | * @param notify_mod Function to call to obtain modifiers. | ||
578 | * @param notify_data Function to call to obtain fragments of the data. | ||
579 | * @param notify_cls Closure for @a notify_mod and @a notify_data. | ||
580 | * @param flags Flags for the message being transmitted. | ||
581 | * | ||
582 | * @return Transmission handle, NULL on error (i.e. more than one request queued). | ||
583 | */ | ||
584 | static struct GNUNET_PSYC_ChannelTransmitHandle * | ||
585 | channel_transmit (struct GNUNET_PSYC_Channel *ch, | ||
586 | const char *method_name, | ||
587 | GNUNET_PSYC_TransmitNotifyModifier notify_mod, | ||
588 | GNUNET_PSYC_TransmitNotifyData notify_data, | ||
589 | void *notify_cls, | ||
590 | uint32_t flags) | ||
591 | { | ||
592 | if (GNUNET_NO != ch->in_transmit) | ||
593 | return NULL; | ||
594 | ch->in_transmit = GNUNET_YES; | ||
595 | |||
596 | size_t size = strlen (method_name) + 1; | ||
597 | struct GNUNET_PSYC_MessageMethod *pmeth; | ||
598 | struct GNUNET_MessageHeader *qmsg; | ||
599 | struct MessageQueue * | ||
600 | mq = ch->tmit_msg = GNUNET_malloc (sizeof (*mq) + sizeof (*qmsg) | ||
601 | + sizeof (*pmeth) + size); | ||
602 | qmsg = (struct GNUNET_MessageHeader *) &mq[1]; | ||
603 | qmsg->size = sizeof (*qmsg) + sizeof (*pmeth) + size; | ||
604 | |||
605 | pmeth = (struct GNUNET_PSYC_MessageMethod *) &qmsg[1]; | ||
606 | pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); | ||
607 | pmeth->header.size = htons (sizeof (*pmeth) + size); | ||
608 | pmeth->flags = htonl (flags); | ||
609 | memcpy (&pmeth[1], method_name, size); | ||
610 | |||
611 | ch->tmit.ch = ch; | ||
612 | ch->tmit.notify_mod = notify_mod; | ||
613 | ch->tmit.notify_data = notify_data; | ||
614 | ch->tmit.notify_cls = notify_cls; | ||
615 | ch->tmit.state = MSG_STATE_MODIFIER; | ||
616 | |||
617 | channel_transmit_mod (ch); | ||
618 | return &ch->tmit; | ||
619 | } | ||
620 | |||
621 | |||
622 | /** | ||
623 | * Resume transmission to the channel. | ||
624 | * | ||
625 | * @param th Handle of the request that is being resumed. | ||
626 | */ | ||
627 | static void | 227 | static void |
628 | channel_transmit_resume (struct GNUNET_PSYC_ChannelTransmitHandle *th) | 228 | master_recv_join_request (void *cls, |
229 | struct GNUNET_CLIENT_MANAGER_Connection *client, | ||
230 | const struct GNUNET_MessageHeader *msg) | ||
629 | { | 231 | { |
630 | struct GNUNET_PSYC_Channel *ch = th->ch; | 232 | struct GNUNET_PSYC_Master * |
631 | if (0 == ch->tmit_ack_pending) | 233 | mst = GNUNET_CLIENT_MANAGER_get_user_context_ (client, |
632 | { | 234 | sizeof (struct GNUNET_PSYC_Channel)); |
633 | ch->tmit_paused = GNUNET_NO; | ||
634 | channel_transmit_data (ch); | ||
635 | } | ||
636 | } | ||
637 | 235 | ||
236 | const struct MasterJoinRequest *req = (const struct MasterJoinRequest *) msg; | ||
638 | 237 | ||
639 | /** | 238 | struct GNUNET_PSYC_MessageHeader *pmsg = NULL; |
640 | * Abort transmission request to channel. | 239 | if (ntohs (req->header.size) <= sizeof (*req) + sizeof (*pmsg)) |
641 | * | 240 | pmsg = (struct GNUNET_PSYC_MessageHeader *) &req[1]; |
642 | * @param th Handle of the request that is being aborted. | ||
643 | */ | ||
644 | static void | ||
645 | channel_transmit_cancel (struct GNUNET_PSYC_ChannelTransmitHandle *th) | ||
646 | { | ||
647 | struct GNUNET_PSYC_Channel *ch = th->ch; | ||
648 | if (GNUNET_NO == ch->in_transmit) | ||
649 | return; | ||
650 | } | ||
651 | 241 | ||
242 | struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); | ||
243 | jh->mst = mst; | ||
244 | jh->slave_key = req->slave_key; | ||
652 | 245 | ||
653 | /** | 246 | if (NULL != mst->join_req_cb) |
654 | * Handle incoming message from the PSYC service. | 247 | mst->join_req_cb (mst->cb_cls, &req->slave_key, pmsg, jh); |
655 | * | ||
656 | * @param ch The channel the message is sent to. | ||
657 | * @param pmsg The message. | ||
658 | */ | ||
659 | static void | ||
660 | handle_psyc_message (struct GNUNET_PSYC_Channel *ch, | ||
661 | const struct GNUNET_PSYC_MessageHeader *msg) | ||
662 | { | ||
663 | uint16_t size = ntohs (msg->header.size); | ||
664 | uint32_t flags = ntohl (msg->flags); | ||
665 | |||
666 | GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, | ||
667 | (struct GNUNET_MessageHeader *) msg); | ||
668 | |||
669 | if (MSG_STATE_START == ch->recv_state) | ||
670 | { | ||
671 | ch->recv_message_id = GNUNET_ntohll (msg->message_id); | ||
672 | ch->recv_flags = flags; | ||
673 | ch->recv_slave_key = msg->slave_key; | ||
674 | ch->recv_mod_value_size = 0; | ||
675 | ch->recv_mod_value_size_expected = 0; | ||
676 | } | ||
677 | else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id) | ||
678 | { | ||
679 | // FIXME | ||
680 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
681 | "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n", | ||
682 | GNUNET_ntohll (msg->message_id), ch->recv_message_id); | ||
683 | GNUNET_break_op (0); | ||
684 | recv_error (ch); | ||
685 | return; | ||
686 | } | ||
687 | else if (flags != ch->recv_flags) | ||
688 | { | ||
689 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
690 | "Unexpected message flags. Got: %lu, expected: %lu\n", | ||
691 | flags, ch->recv_flags); | ||
692 | GNUNET_break_op (0); | ||
693 | recv_error (ch); | ||
694 | return; | ||
695 | } | ||
696 | |||
697 | uint16_t pos = 0, psize = 0, ptype, size_eq, size_min; | ||
698 | |||
699 | for (pos = 0; sizeof (*msg) + pos < size; pos += psize) | ||
700 | { | ||
701 | const struct GNUNET_MessageHeader *pmsg | ||
702 | = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos); | ||
703 | psize = ntohs (pmsg->size); | ||
704 | ptype = ntohs (pmsg->type); | ||
705 | size_eq = size_min = 0; | ||
706 | |||
707 | if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size) | ||
708 | { | ||
709 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
710 | "Dropping message of type %u with invalid size %u.\n", | ||
711 | ptype, psize); | ||
712 | recv_error (ch); | ||
713 | return; | ||
714 | } | ||
715 | |||
716 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
717 | "Received message part from PSYC.\n"); | ||
718 | GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg); | ||
719 | |||
720 | switch (ptype) | ||
721 | { | ||
722 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: | ||
723 | size_min = sizeof (struct GNUNET_PSYC_MessageMethod); | ||
724 | break; | ||
725 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | ||
726 | size_min = sizeof (struct GNUNET_PSYC_MessageModifier); | ||
727 | break; | ||
728 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: | ||
729 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: | ||
730 | size_min = sizeof (struct GNUNET_MessageHeader); | ||
731 | break; | ||
732 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: | ||
733 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: | ||
734 | size_eq = sizeof (struct GNUNET_MessageHeader); | ||
735 | break; | ||
736 | default: | ||
737 | GNUNET_break_op (0); | ||
738 | recv_error (ch); | ||
739 | return; | ||
740 | } | ||
741 | |||
742 | if (! ((0 < size_eq && psize == size_eq) | ||
743 | || (0 < size_min && size_min <= psize))) | ||
744 | { | ||
745 | GNUNET_break_op (0); | ||
746 | recv_error (ch); | ||
747 | return; | ||
748 | } | ||
749 | |||
750 | switch (ptype) | ||
751 | { | ||
752 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: | ||
753 | { | ||
754 | struct GNUNET_PSYC_MessageMethod *meth | ||
755 | = (struct GNUNET_PSYC_MessageMethod *) pmsg; | ||
756 | |||
757 | if (MSG_STATE_START != ch->recv_state) | ||
758 | { | ||
759 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
760 | "Dropping out of order message method (%u).\n", | ||
761 | ch->recv_state); | ||
762 | /* It is normal to receive an incomplete message right after connecting, | ||
763 | * but should not happen later. | ||
764 | * FIXME: add a check for this condition. | ||
765 | */ | ||
766 | GNUNET_break_op (0); | ||
767 | recv_error (ch); | ||
768 | return; | ||
769 | } | ||
770 | |||
771 | if ('\0' != *((char *) meth + psize - 1)) | ||
772 | { | ||
773 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
774 | "Dropping message with malformed method. " | ||
775 | "Message ID: %" PRIu64 "\n", ch->recv_message_id); | ||
776 | GNUNET_break_op (0); | ||
777 | recv_error (ch); | ||
778 | return; | ||
779 | } | ||
780 | ch->recv_state = MSG_STATE_METHOD; | ||
781 | break; | ||
782 | } | ||
783 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | ||
784 | { | ||
785 | if (!(MSG_STATE_METHOD == ch->recv_state | ||
786 | || MSG_STATE_MODIFIER == ch->recv_state | ||
787 | || MSG_STATE_MOD_CONT == ch->recv_state)) | ||
788 | { | ||
789 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
790 | "Dropping out of order message modifier (%u).\n", | ||
791 | ch->recv_state); | ||
792 | GNUNET_break_op (0); | ||
793 | recv_error (ch); | ||
794 | return; | ||
795 | } | ||
796 | |||
797 | struct GNUNET_PSYC_MessageModifier *mod | ||
798 | = (struct GNUNET_PSYC_MessageModifier *) pmsg; | ||
799 | |||
800 | uint16_t name_size = ntohs (mod->name_size); | ||
801 | ch->recv_mod_value_size_expected = ntohl (mod->value_size); | ||
802 | ch->recv_mod_value_size = psize - sizeof (*mod) - name_size - 1; | ||
803 | |||
804 | if (psize < sizeof (*mod) + name_size + 1 | ||
805 | || '\0' != *((char *) &mod[1] + name_size) | ||
806 | || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) | ||
807 | { | ||
808 | LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n"); | ||
809 | GNUNET_break_op (0); | ||
810 | recv_error (ch); | ||
811 | return; | ||
812 | } | ||
813 | ch->recv_state = MSG_STATE_MODIFIER; | ||
814 | break; | ||
815 | } | ||
816 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: | ||
817 | { | ||
818 | ch->recv_mod_value_size += psize - sizeof (*pmsg); | ||
819 | |||
820 | if (!(MSG_STATE_MODIFIER == ch->recv_state | ||
821 | || MSG_STATE_MOD_CONT == ch->recv_state) | ||
822 | || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) | ||
823 | { | ||
824 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
825 | "Dropping out of order message modifier continuation " | ||
826 | "!(%u == %u || %u == %u) || %lu < %lu.\n", | ||
827 | MSG_STATE_MODIFIER, ch->recv_state, | ||
828 | MSG_STATE_MOD_CONT, ch->recv_state, | ||
829 | ch->recv_mod_value_size_expected, ch->recv_mod_value_size); | ||
830 | GNUNET_break_op (0); | ||
831 | recv_error (ch); | ||
832 | return; | ||
833 | } | ||
834 | break; | ||
835 | } | ||
836 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: | ||
837 | { | ||
838 | if (ch->recv_state < MSG_STATE_METHOD | ||
839 | || ch->recv_mod_value_size_expected != ch->recv_mod_value_size) | ||
840 | { | ||
841 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
842 | "Dropping out of order message data fragment " | ||
843 | "(%u < %u || %lu != %lu).\n", | ||
844 | ch->recv_state, MSG_STATE_METHOD, | ||
845 | ch->recv_mod_value_size_expected, ch->recv_mod_value_size); | ||
846 | |||
847 | GNUNET_break_op (0); | ||
848 | recv_error (ch); | ||
849 | return; | ||
850 | } | ||
851 | ch->recv_state = MSG_STATE_DATA; | ||
852 | break; | ||
853 | } | ||
854 | } | ||
855 | |||
856 | GNUNET_PSYC_MessageCallback message_cb | ||
857 | = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC | ||
858 | ? ch->hist_message_cb | ||
859 | : ch->message_cb; | ||
860 | |||
861 | if (NULL != message_cb) | ||
862 | message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, pmsg); | ||
863 | |||
864 | switch (ptype) | ||
865 | { | ||
866 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: | ||
867 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: | ||
868 | recv_reset (ch); | ||
869 | break; | ||
870 | } | ||
871 | } | ||
872 | } | 248 | } |
873 | 249 | ||
874 | 250 | ||
875 | /** | ||
876 | * Handle incoming message acknowledgement from the PSYC service. | ||
877 | * | ||
878 | * @param ch The channel the acknowledgement is sent to. | ||
879 | */ | ||
880 | static void | 251 | static void |
881 | handle_psyc_message_ack (struct GNUNET_PSYC_Channel *ch) | 252 | slave_recv_join_ack (void *cls, |
253 | struct GNUNET_CLIENT_MANAGER_Connection *client, | ||
254 | const struct GNUNET_MessageHeader *msg) | ||
882 | { | 255 | { |
883 | if (0 == ch->tmit_ack_pending) | 256 | struct GNUNET_PSYC_Slave * |
884 | { | 257 | slv = GNUNET_CLIENT_MANAGER_get_user_context_ (client, |
885 | LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n"); | 258 | sizeof (struct GNUNET_PSYC_Channel)); |
886 | GNUNET_break (0); | 259 | struct CountersResult *cres = (struct CountersResult *) msg; |
887 | return; | 260 | if (NULL != slv->connect_cb) |
888 | } | 261 | slv->connect_cb (slv->cb_cls, GNUNET_ntohll (cres->max_message_id)); |
889 | ch->tmit_ack_pending--; | ||
890 | |||
891 | switch (ch->tmit.state) | ||
892 | { | ||
893 | case MSG_STATE_MODIFIER: | ||
894 | case MSG_STATE_MOD_CONT: | ||
895 | if (GNUNET_NO == ch->tmit_paused) | ||
896 | channel_transmit_mod (ch); | ||
897 | break; | ||
898 | |||
899 | case MSG_STATE_DATA: | ||
900 | if (GNUNET_NO == ch->tmit_paused) | ||
901 | channel_transmit_data (ch); | ||
902 | break; | ||
903 | |||
904 | case MSG_STATE_END: | ||
905 | case MSG_STATE_CANCEL: | ||
906 | break; | ||
907 | |||
908 | default: | ||
909 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
910 | "Ignoring message ACK in state %u.\n", ch->tmit.state); | ||
911 | } | ||
912 | } | 262 | } |
913 | 263 | ||
914 | 264 | ||
915 | static void | 265 | static void |
916 | handle_psyc_join_request (struct GNUNET_PSYC_Master *mst, | 266 | slave_recv_join_decision (void *cls, |
917 | const struct MasterJoinRequest *req) | 267 | struct GNUNET_CLIENT_MANAGER_Connection *client, |
268 | const struct GNUNET_MessageHeader *msg) | ||
918 | { | 269 | { |
919 | struct GNUNET_PSYC_MessageHeader *msg = NULL; | 270 | struct GNUNET_PSYC_Slave * |
920 | if (ntohs (req->header.size) <= sizeof (*req) + sizeof (*msg)) | 271 | slv = GNUNET_CLIENT_MANAGER_get_user_context_ (client, |
921 | msg = (struct GNUNET_PSYC_MessageHeader *) &req[1]; | 272 | sizeof (struct GNUNET_PSYC_Channel)); |
273 | const struct SlaveJoinDecision * | ||
274 | dcsn = (const struct SlaveJoinDecision *) msg; | ||
922 | 275 | ||
923 | struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); | 276 | struct GNUNET_PSYC_MessageHeader *pmsg = NULL; |
924 | jh->mst = mst; | 277 | if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*pmsg)) |
925 | jh->slave_key = req->slave_key; | 278 | pmsg = (struct GNUNET_PSYC_MessageHeader *) &dcsn[1]; |
926 | 279 | ||
927 | if (NULL != mst->join_req_cb) | 280 | struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); |
928 | mst->join_req_cb (mst->ch.cb_cls, &req->slave_key, msg, jh); | 281 | if (NULL != slv->join_dcsn_cb) |
282 | slv->join_dcsn_cb (slv->cb_cls, ntohl (dcsn->is_admitted), pmsg); | ||
929 | } | 283 | } |
930 | 284 | ||
931 | 285 | ||
932 | static void | 286 | static struct GNUNET_CLIENT_MANAGER_MessageHandler master_handlers[] = |
933 | handle_psyc_join_decision (struct GNUNET_PSYC_Slave *slv, | ||
934 | const struct SlaveJoinDecision *dcsn) | ||
935 | { | 287 | { |
936 | struct GNUNET_PSYC_MessageHeader *msg = NULL; | 288 | { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, |
937 | if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*msg)) | ||
938 | msg = (struct GNUNET_PSYC_MessageHeader *) &dcsn[1]; | ||
939 | 289 | ||
940 | struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); | 290 | { &channel_recv_message, NULL, |
941 | if (NULL != slv->join_dcsn_cb) | 291 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, |
942 | slv->join_dcsn_cb (slv->ch.cb_cls, ntohl (dcsn->is_admitted), msg); | 292 | sizeof (struct GNUNET_PSYC_MessageHeader), GNUNET_YES }, |
943 | } | ||
944 | 293 | ||
294 | { &channel_recv_message_ack, NULL, | ||
295 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK, | ||
296 | sizeof (struct GNUNET_MessageHeader), GNUNET_NO }, | ||
945 | 297 | ||
946 | /** | 298 | { &master_recv_start_ack, NULL, |
947 | * Type of a function to call when we receive a message | 299 | GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK, |
948 | * from the service. | 300 | sizeof (struct CountersResult), GNUNET_NO }, |
949 | * | ||
950 | * @param cls closure | ||
951 | * @param msg message received, NULL on timeout or fatal error | ||
952 | */ | ||
953 | static void | ||
954 | message_handler (void *cls, | ||
955 | const struct GNUNET_MessageHeader *msg) | ||
956 | { | ||
957 | struct GNUNET_PSYC_Channel *ch = cls; | ||
958 | struct GNUNET_PSYC_Master *mst = cls; | ||
959 | struct GNUNET_PSYC_Slave *slv = cls; | ||
960 | |||
961 | if (NULL == msg) | ||
962 | { | ||
963 | // timeout / disconnected from service, reconnect | ||
964 | reschedule_connect (ch); | ||
965 | return; | ||
966 | } | ||
967 | uint16_t size_eq = 0; | ||
968 | uint16_t size_min = 0; | ||
969 | uint16_t size = ntohs (msg->size); | ||
970 | uint16_t type = ntohs (msg->type); | ||
971 | |||
972 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
973 | "Received message of type %d and size %u from PSYC service\n", | ||
974 | type, size); | ||
975 | |||
976 | switch (type) | ||
977 | { | ||
978 | case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK: | ||
979 | case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: | ||
980 | size_eq = sizeof (struct CountersResult); | ||
981 | break; | ||
982 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE: | ||
983 | size_min = sizeof (struct GNUNET_PSYC_MessageHeader); | ||
984 | break; | ||
985 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK: | ||
986 | size_eq = sizeof (struct GNUNET_MessageHeader); | ||
987 | break; | ||
988 | case GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST: | ||
989 | size_min = sizeof (struct MasterJoinRequest); | ||
990 | break; | ||
991 | case GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION: | ||
992 | size_min = sizeof (struct SlaveJoinDecision); | ||
993 | break; | ||
994 | default: | ||
995 | GNUNET_break_op (0); | ||
996 | return; | ||
997 | } | ||
998 | |||
999 | if (! ((0 < size_eq && size == size_eq) | ||
1000 | || (0 < size_min && size_min <= size))) | ||
1001 | { | ||
1002 | GNUNET_break_op (0); | ||
1003 | return; | ||
1004 | } | ||
1005 | |||
1006 | switch (type) | ||
1007 | { | ||
1008 | case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK: | ||
1009 | { | ||
1010 | struct CountersResult *cres = (struct CountersResult *) msg; | ||
1011 | if (NULL != mst->start_cb) | ||
1012 | mst->start_cb (ch->cb_cls, GNUNET_ntohll (cres->max_message_id)); | ||
1013 | break; | ||
1014 | } | ||
1015 | case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: | ||
1016 | { | ||
1017 | struct CountersResult *cres = (struct CountersResult *) msg; | ||
1018 | if (NULL != slv->connect_cb) | ||
1019 | slv->connect_cb (ch->cb_cls, GNUNET_ntohll (cres->max_message_id)); | ||
1020 | break; | ||
1021 | } | ||
1022 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK: | ||
1023 | { | ||
1024 | handle_psyc_message_ack (ch); | ||
1025 | break; | ||
1026 | } | ||
1027 | |||
1028 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE: | ||
1029 | handle_psyc_message (ch, (const struct GNUNET_PSYC_MessageHeader *) msg); | ||
1030 | break; | ||
1031 | |||
1032 | case GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST: | ||
1033 | handle_psyc_join_request ((struct GNUNET_PSYC_Master *) ch, | ||
1034 | (const struct MasterJoinRequest *) msg); | ||
1035 | break; | ||
1036 | |||
1037 | case GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION: | ||
1038 | handle_psyc_join_decision ((struct GNUNET_PSYC_Slave *) ch, | ||
1039 | (const struct SlaveJoinDecision *) msg); | ||
1040 | break; | ||
1041 | } | ||
1042 | |||
1043 | if (NULL != ch->client) | ||
1044 | { | ||
1045 | GNUNET_CLIENT_receive (ch->client, &message_handler, ch, | ||
1046 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
1047 | } | ||
1048 | } | ||
1049 | 301 | ||
302 | { &master_recv_join_request, NULL, | ||
303 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST, | ||
304 | sizeof (struct MasterJoinRequest), GNUNET_YES }, | ||
1050 | 305 | ||
1051 | /** | 306 | { NULL, NULL, 0, 0, GNUNET_NO } |
1052 | * Transmit next message to service. | 307 | }; |
1053 | * | ||
1054 | * @param cls The struct GNUNET_PSYC_Channel. | ||
1055 | * @param size Number of bytes available in @a buf. | ||
1056 | * @param buf Where to copy the message. | ||
1057 | * | ||
1058 | * @return Number of bytes copied to @a buf. | ||
1059 | */ | ||
1060 | static size_t | ||
1061 | send_next_message (void *cls, size_t size, void *buf) | ||
1062 | { | ||
1063 | LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n"); | ||
1064 | struct GNUNET_PSYC_Channel *ch = cls; | ||
1065 | struct MessageQueue *mq = ch->tmit_head; | ||
1066 | if (NULL == mq) | ||
1067 | return 0; | ||
1068 | struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1]; | ||
1069 | size_t ret = ntohs (qmsg->size); | ||
1070 | ch->th = NULL; | ||
1071 | if (ret > size) | ||
1072 | { | ||
1073 | reschedule_connect (ch); | ||
1074 | return 0; | ||
1075 | } | ||
1076 | memcpy (buf, qmsg, ret); | ||
1077 | |||
1078 | GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, mq); | ||
1079 | GNUNET_free (mq); | ||
1080 | |||
1081 | if (NULL != ch->tmit_head) | ||
1082 | transmit_next (ch); | ||
1083 | |||
1084 | if (GNUNET_NO == ch->in_receive) | ||
1085 | { | ||
1086 | ch->in_receive = GNUNET_YES; | ||
1087 | GNUNET_CLIENT_receive (ch->client, &message_handler, ch, | ||
1088 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
1089 | } | ||
1090 | return ret; | ||
1091 | } | ||
1092 | 308 | ||
1093 | 309 | ||
1094 | /** | 310 | static struct GNUNET_CLIENT_MANAGER_MessageHandler slave_handlers[] = |
1095 | * Schedule transmission of the next message from our queue. | ||
1096 | * | ||
1097 | * @param ch PSYC handle. | ||
1098 | */ | ||
1099 | static void | ||
1100 | transmit_next (struct GNUNET_PSYC_Channel *ch) | ||
1101 | { | 311 | { |
1102 | LOG (GNUNET_ERROR_TYPE_DEBUG, "transmit_next()\n"); | 312 | { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, |
1103 | if (NULL != ch->th || NULL == ch->client) | ||
1104 | return; | ||
1105 | |||
1106 | struct MessageQueue *mq = ch->tmit_head; | ||
1107 | if (NULL == mq) | ||
1108 | return; | ||
1109 | struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1]; | ||
1110 | |||
1111 | ch->th = GNUNET_CLIENT_notify_transmit_ready (ch->client, | ||
1112 | ntohs (qmsg->size), | ||
1113 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
1114 | GNUNET_NO, | ||
1115 | &send_next_message, | ||
1116 | ch); | ||
1117 | } | ||
1118 | 313 | ||
314 | { &channel_recv_message, NULL, | ||
315 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, | ||
316 | sizeof (struct GNUNET_PSYC_MessageHeader), GNUNET_YES }, | ||
1119 | 317 | ||
1120 | /** | 318 | { &channel_recv_message_ack, NULL, |
1121 | * Try again to connect to the PSYC service. | 319 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK, |
1122 | * | 320 | sizeof (struct GNUNET_MessageHeader), GNUNET_NO }, |
1123 | * @param cls Channel handle. | ||
1124 | * @param tc Scheduler context. | ||
1125 | */ | ||
1126 | static void | ||
1127 | reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
1128 | { | ||
1129 | struct GNUNET_PSYC_Channel *ch = cls; | ||
1130 | |||
1131 | recv_reset (ch); | ||
1132 | ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK; | ||
1133 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1134 | "Connecting to PSYC service.\n"); | ||
1135 | GNUNET_assert (NULL == ch->client); | ||
1136 | ch->client = GNUNET_CLIENT_connect ("psyc", ch->cfg); | ||
1137 | GNUNET_assert (NULL != ch->client); | ||
1138 | uint16_t reconn_size = ntohs (ch->reconnect_msg->size); | ||
1139 | |||
1140 | if (NULL == ch->tmit_head || | ||
1141 | 0 != memcmp (&ch->tmit_head[1], ch->reconnect_msg, reconn_size)) | ||
1142 | { | ||
1143 | struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + reconn_size); | ||
1144 | memcpy (&mq[1], ch->reconnect_msg, reconn_size); | ||
1145 | GNUNET_CONTAINER_DLL_insert (ch->tmit_head, ch->tmit_tail, mq); | ||
1146 | } | ||
1147 | transmit_next (ch); | ||
1148 | } | ||
1149 | 321 | ||
322 | { &slave_recv_join_ack, NULL, | ||
323 | GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK, | ||
324 | sizeof (struct CountersResult), GNUNET_NO }, | ||
1150 | 325 | ||
1151 | /** | 326 | { &slave_recv_join_decision, NULL, |
1152 | * Disconnect from the PSYC service. | 327 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, |
1153 | * | 328 | sizeof (struct SlaveJoinDecision), GNUNET_YES }, |
1154 | * @param c Channel handle to disconnect. | 329 | |
1155 | */ | 330 | { NULL, NULL, 0, 0, GNUNET_NO } |
1156 | static void | 331 | }; |
1157 | disconnect (void *c) | ||
1158 | { | ||
1159 | struct GNUNET_PSYC_Channel *ch = c; | ||
1160 | |||
1161 | GNUNET_assert (NULL != ch); | ||
1162 | if (ch->tmit_head != ch->tmit_tail) | ||
1163 | { | ||
1164 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
1165 | "Disconnecting while there are still outstanding messages!\n"); | ||
1166 | GNUNET_break (0); | ||
1167 | } | ||
1168 | if (ch->reconnect_task != GNUNET_SCHEDULER_NO_TASK) | ||
1169 | { | ||
1170 | GNUNET_SCHEDULER_cancel (ch->reconnect_task); | ||
1171 | ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK; | ||
1172 | } | ||
1173 | if (NULL != ch->th) | ||
1174 | { | ||
1175 | GNUNET_CLIENT_notify_transmit_ready_cancel (ch->th); | ||
1176 | ch->th = NULL; | ||
1177 | } | ||
1178 | if (NULL != ch->client) | ||
1179 | { | ||
1180 | GNUNET_CLIENT_disconnect (ch->client); | ||
1181 | ch->client = NULL; | ||
1182 | } | ||
1183 | if (NULL != ch->reconnect_msg) | ||
1184 | { | ||
1185 | GNUNET_free (ch->reconnect_msg); | ||
1186 | ch->reconnect_msg = NULL; | ||
1187 | } | ||
1188 | } | ||
1189 | 332 | ||
1190 | 333 | ||
1191 | /** | 334 | /** |
@@ -1227,24 +370,29 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
1227 | void *cls) | 370 | void *cls) |
1228 | { | 371 | { |
1229 | struct GNUNET_PSYC_Master *mst = GNUNET_malloc (sizeof (*mst)); | 372 | struct GNUNET_PSYC_Master *mst = GNUNET_malloc (sizeof (*mst)); |
1230 | struct GNUNET_PSYC_Channel *ch = &mst->ch; | 373 | struct GNUNET_PSYC_Channel *chn = &mst->chn; |
1231 | struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req)); | ||
1232 | 374 | ||
375 | struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req)); | ||
1233 | req->header.size = htons (sizeof (*req)); | 376 | req->header.size = htons (sizeof (*req)); |
1234 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START); | 377 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START); |
1235 | req->channel_key = *channel_key; | 378 | req->channel_key = *channel_key; |
1236 | req->policy = policy; | 379 | req->policy = policy; |
1237 | 380 | ||
381 | chn->connect_msg = (struct GNUNET_MessageHeader *) req; | ||
382 | chn->cfg = cfg; | ||
383 | chn->is_master = GNUNET_YES; | ||
384 | |||
1238 | mst->start_cb = start_cb; | 385 | mst->start_cb = start_cb; |
1239 | mst->join_req_cb = join_request_cb; | 386 | mst->join_req_cb = join_request_cb; |
1240 | ch->message_cb = message_cb; | 387 | mst->cb_cls = cls; |
1241 | ch->cb_cls = cls; | 388 | |
1242 | ch->cfg = cfg; | 389 | chn->client = GNUNET_CLIENT_MANAGER_connect (cfg, "psyc", master_handlers); |
1243 | ch->is_master = GNUNET_YES; | 390 | GNUNET_CLIENT_MANAGER_set_user_context_ (chn->client, mst, sizeof (*chn)); |
1244 | ch->reconnect_msg = (struct GNUNET_MessageHeader *) req; | 391 | |
1245 | ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; | 392 | chn->tmit = GNUNET_PSYC_transmit_create (chn->client); |
1246 | ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst); | 393 | chn->recv = GNUNET_PSYC_receive_create (message_cb, message_cb, cls); |
1247 | 394 | ||
395 | channel_send_connect_msg (chn); | ||
1248 | return mst; | 396 | return mst; |
1249 | } | 397 | } |
1250 | 398 | ||
@@ -1253,12 +401,13 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
1253 | * Stop a PSYC master channel. | 401 | * Stop a PSYC master channel. |
1254 | * | 402 | * |
1255 | * @param master PSYC channel master to stop. | 403 | * @param master PSYC channel master to stop. |
404 | * @param keep_active FIXME | ||
1256 | */ | 405 | */ |
1257 | void | 406 | void |
1258 | GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master) | 407 | GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *mst) |
1259 | { | 408 | { |
1260 | disconnect (master); | 409 | GNUNET_CLIENT_MANAGER_disconnect (mst->chn.client, GNUNET_YES); |
1261 | GNUNET_free (master); | 410 | GNUNET_free (mst); |
1262 | } | 411 | } |
1263 | 412 | ||
1264 | 413 | ||
@@ -1292,7 +441,7 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, | |||
1292 | const struct GNUNET_PeerIdentity *relays, | 441 | const struct GNUNET_PeerIdentity *relays, |
1293 | const struct GNUNET_PSYC_MessageHeader *join_resp) | 442 | const struct GNUNET_PSYC_MessageHeader *join_resp) |
1294 | { | 443 | { |
1295 | struct GNUNET_PSYC_Channel *ch = &jh->mst->ch; | 444 | struct GNUNET_PSYC_Channel *chn = &jh->mst->chn; |
1296 | struct MasterJoinDecision *dcsn; | 445 | struct MasterJoinDecision *dcsn; |
1297 | uint16_t join_resp_size | 446 | uint16_t join_resp_size |
1298 | = (NULL != join_resp) ? ntohs (join_resp->header.size) : 0; | 447 | = (NULL != join_resp) ? ntohs (join_resp->header.size) : 0; |
@@ -1302,9 +451,7 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, | |||
1302 | < sizeof (*dcsn) + relay_size + join_resp_size) | 451 | < sizeof (*dcsn) + relay_size + join_resp_size) |
1303 | return GNUNET_SYSERR; | 452 | return GNUNET_SYSERR; |
1304 | 453 | ||
1305 | struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*dcsn) | 454 | dcsn = GNUNET_malloc (sizeof (*dcsn) + relay_size + join_resp_size); |
1306 | + relay_size + join_resp_size); | ||
1307 | dcsn = (struct MasterJoinDecision *) &mq[1]; | ||
1308 | dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size); | 455 | dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size); |
1309 | dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION); | 456 | dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION); |
1310 | dcsn->is_admitted = htonl (is_admitted); | 457 | dcsn->is_admitted = htonl (is_admitted); |
@@ -1313,8 +460,7 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, | |||
1313 | if (0 < join_resp_size) | 460 | if (0 < join_resp_size) |
1314 | memcpy (&dcsn[1], join_resp, join_resp_size); | 461 | memcpy (&dcsn[1], join_resp, join_resp_size); |
1315 | 462 | ||
1316 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq); | 463 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &dcsn->header); |
1317 | transmit_next (ch); | ||
1318 | return GNUNET_OK; | 464 | return GNUNET_OK; |
1319 | } | 465 | } |
1320 | 466 | ||
@@ -1332,40 +478,59 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, | |||
1332 | * @return Transmission handle, NULL on error (i.e. more than one request queued). | 478 | * @return Transmission handle, NULL on error (i.e. more than one request queued). |
1333 | */ | 479 | */ |
1334 | struct GNUNET_PSYC_MasterTransmitHandle * | 480 | struct GNUNET_PSYC_MasterTransmitHandle * |
1335 | GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, | 481 | GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *mst, |
1336 | const char *method_name, | 482 | const char *method_name, |
1337 | GNUNET_PSYC_TransmitNotifyModifier notify_mod, | 483 | GNUNET_PSYC_TransmitNotifyModifier notify_mod, |
1338 | GNUNET_PSYC_TransmitNotifyData notify_data, | 484 | GNUNET_PSYC_TransmitNotifyData notify_data, |
1339 | void *notify_cls, | 485 | void *notify_cls, |
1340 | enum GNUNET_PSYC_MasterTransmitFlags flags) | 486 | enum GNUNET_PSYC_MasterTransmitFlags flags) |
1341 | { | 487 | { |
1342 | return (struct GNUNET_PSYC_MasterTransmitHandle *) | 488 | if (GNUNET_OK |
1343 | channel_transmit (&master->ch, method_name, notify_mod, notify_data, | 489 | == GNUNET_PSYC_transmit_message (mst->chn.tmit, method_name, NULL, |
1344 | notify_cls, flags); | 490 | notify_mod, notify_data, notify_cls, |
491 | flags)) | ||
492 | return (struct GNUNET_PSYC_MasterTransmitHandle *) mst->chn.tmit; | ||
493 | else | ||
494 | return NULL; | ||
1345 | } | 495 | } |
1346 | 496 | ||
1347 | 497 | ||
1348 | /** | 498 | /** |
1349 | * Resume transmission to the channel. | 499 | * Resume transmission to the channel. |
1350 | * | 500 | * |
1351 | * @param th Handle of the request that is being resumed. | 501 | * @param tmit Handle of the request that is being resumed. |
1352 | */ | 502 | */ |
1353 | void | 503 | void |
1354 | GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) | 504 | GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *tmit) |
1355 | { | 505 | { |
1356 | channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); | 506 | GNUNET_PSYC_transmit_resume ((struct GNUNET_PSYC_TransmitHandle *) tmit); |
1357 | } | 507 | } |
1358 | 508 | ||
1359 | 509 | ||
1360 | /** | 510 | /** |
1361 | * Abort transmission request to the channel. | 511 | * Abort transmission request to the channel. |
1362 | * | 512 | * |
1363 | * @param th Handle of the request that is being aborted. | 513 | * @param tmit Handle of the request that is being aborted. |
1364 | */ | 514 | */ |
1365 | void | 515 | void |
1366 | GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th) | 516 | GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *tmit) |
1367 | { | 517 | { |
1368 | channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); | 518 | GNUNET_PSYC_transmit_cancel ((struct GNUNET_PSYC_TransmitHandle *) tmit); |
519 | } | ||
520 | |||
521 | |||
522 | /** | ||
523 | * Convert a channel @a master to a @e channel handle to access the @e channel | ||
524 | * APIs. | ||
525 | * | ||
526 | * @param master Channel master handle. | ||
527 | * | ||
528 | * @return Channel handle, valid for as long as @a master is valid. | ||
529 | */ | ||
530 | struct GNUNET_PSYC_Channel * | ||
531 | GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master) | ||
532 | { | ||
533 | return &master->chn; | ||
1369 | } | 534 | } |
1370 | 535 | ||
1371 | 536 | ||
@@ -1420,7 +585,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
1420 | uint16_t data_size) | 585 | uint16_t data_size) |
1421 | { | 586 | { |
1422 | struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv)); | 587 | struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv)); |
1423 | struct GNUNET_PSYC_Channel *ch = &slv->ch; | 588 | struct GNUNET_PSYC_Channel *chn = &slv->chn; |
1424 | struct SlaveJoinRequest *req | 589 | struct SlaveJoinRequest *req |
1425 | = GNUNET_malloc (sizeof (*req) + relay_count * sizeof (*relays)); | 590 | = GNUNET_malloc (sizeof (*req) + relay_count * sizeof (*relays)); |
1426 | req->header.size = htons (sizeof (*req) | 591 | req->header.size = htons (sizeof (*req) |
@@ -1432,17 +597,21 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
1432 | req->relay_count = htonl (relay_count); | 597 | req->relay_count = htonl (relay_count); |
1433 | memcpy (&req[1], relays, relay_count * sizeof (*relays)); | 598 | memcpy (&req[1], relays, relay_count * sizeof (*relays)); |
1434 | 599 | ||
600 | chn->connect_msg = (struct GNUNET_MessageHeader *) req; | ||
601 | chn->cfg = cfg; | ||
602 | chn->is_master = GNUNET_NO; | ||
603 | |||
1435 | slv->connect_cb = connect_cb; | 604 | slv->connect_cb = connect_cb; |
1436 | slv->join_dcsn_cb = join_decision_cb; | 605 | slv->join_dcsn_cb = join_decision_cb; |
1437 | ch->message_cb = message_cb; | 606 | slv->cb_cls = cls; |
1438 | ch->cb_cls = cls; | ||
1439 | 607 | ||
1440 | ch->cfg = cfg; | 608 | chn->client = GNUNET_CLIENT_MANAGER_connect (cfg, "psyc", slave_handlers); |
1441 | ch->is_master = GNUNET_NO; | 609 | GNUNET_CLIENT_MANAGER_set_user_context_ (chn->client, slv, sizeof (*chn)); |
1442 | ch->reconnect_msg = (struct GNUNET_MessageHeader *) req; | ||
1443 | ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; | ||
1444 | ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv); | ||
1445 | 610 | ||
611 | chn->recv = GNUNET_PSYC_receive_create (message_cb, message_cb, cls); | ||
612 | chn->tmit = GNUNET_PSYC_transmit_create (chn->client); | ||
613 | |||
614 | channel_send_connect_msg (chn); | ||
1446 | return slv; | 615 | return slv; |
1447 | } | 616 | } |
1448 | 617 | ||
@@ -1456,10 +625,10 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
1456 | * @param slave Slave handle. | 625 | * @param slave Slave handle. |
1457 | */ | 626 | */ |
1458 | void | 627 | void |
1459 | GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave) | 628 | GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slv) |
1460 | { | 629 | { |
1461 | disconnect (slave); | 630 | GNUNET_CLIENT_MANAGER_disconnect (slv->chn.client, GNUNET_YES); |
1462 | GNUNET_free (slave); | 631 | GNUNET_free (slv); |
1463 | } | 632 | } |
1464 | 633 | ||
1465 | 634 | ||
@@ -1477,69 +646,59 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave) | |||
1477 | * queued). | 646 | * queued). |
1478 | */ | 647 | */ |
1479 | struct GNUNET_PSYC_SlaveTransmitHandle * | 648 | struct GNUNET_PSYC_SlaveTransmitHandle * |
1480 | GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave, | 649 | GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slv, |
1481 | const char *method_name, | 650 | const char *method_name, |
1482 | GNUNET_PSYC_TransmitNotifyModifier notify_mod, | 651 | GNUNET_PSYC_TransmitNotifyModifier notify_mod, |
1483 | GNUNET_PSYC_TransmitNotifyData notify_data, | 652 | GNUNET_PSYC_TransmitNotifyData notify_data, |
1484 | void *notify_cls, | 653 | void *notify_cls, |
1485 | enum GNUNET_PSYC_SlaveTransmitFlags flags) | 654 | enum GNUNET_PSYC_SlaveTransmitFlags flags) |
655 | |||
1486 | { | 656 | { |
1487 | return (struct GNUNET_PSYC_SlaveTransmitHandle *) | 657 | if (GNUNET_OK |
1488 | channel_transmit (&slave->ch, method_name, | 658 | == GNUNET_PSYC_transmit_message (slv->chn.tmit, method_name, NULL, |
1489 | notify_mod, notify_data, notify_cls, flags); | 659 | notify_mod, notify_data, notify_cls, |
660 | flags)) | ||
661 | return (struct GNUNET_PSYC_SlaveTransmitHandle *) slv->chn.tmit; | ||
662 | else | ||
663 | return NULL; | ||
1490 | } | 664 | } |
1491 | 665 | ||
1492 | 666 | ||
1493 | /** | 667 | /** |
1494 | * Resume transmission to the master. | 668 | * Resume transmission to the master. |
1495 | * | 669 | * |
1496 | * @param th Handle of the request that is being resumed. | 670 | * @param tmit Handle of the request that is being resumed. |
1497 | */ | 671 | */ |
1498 | void | 672 | void |
1499 | GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_SlaveTransmitHandle *th) | 673 | GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_SlaveTransmitHandle *tmit) |
1500 | { | 674 | { |
1501 | channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); | 675 | GNUNET_PSYC_transmit_resume ((struct GNUNET_PSYC_TransmitHandle *) tmit); |
1502 | } | 676 | } |
1503 | 677 | ||
1504 | 678 | ||
1505 | /** | 679 | /** |
1506 | * Abort transmission request to master. | 680 | * Abort transmission request to master. |
1507 | * | 681 | * |
1508 | * @param th Handle of the request that is being aborted. | 682 | * @param tmit Handle of the request that is being aborted. |
1509 | */ | 683 | */ |
1510 | void | 684 | void |
1511 | GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th) | 685 | GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *tmit) |
1512 | { | 686 | { |
1513 | channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); | 687 | GNUNET_PSYC_transmit_cancel ((struct GNUNET_PSYC_TransmitHandle *) tmit); |
1514 | } | ||
1515 | |||
1516 | |||
1517 | /** | ||
1518 | * Convert a channel @a master to a @e channel handle to access the @e channel | ||
1519 | * APIs. | ||
1520 | * | ||
1521 | * @param master Channel master handle. | ||
1522 | * | ||
1523 | * @return Channel handle, valid for as long as @a master is valid. | ||
1524 | */ | ||
1525 | struct GNUNET_PSYC_Channel * | ||
1526 | GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master) | ||
1527 | { | ||
1528 | return &master->ch; | ||
1529 | } | 688 | } |
1530 | 689 | ||
1531 | 690 | ||
1532 | /** | 691 | /** |
1533 | * Convert @a slave to a @e channel handle to access the @e channel APIs. | 692 | * Convert @a slave to a @e channel handle to access the @e channel APIs. |
1534 | * | 693 | * |
1535 | * @param slave Slave handle. | 694 | * @param slv Slave handle. |
1536 | * | 695 | * |
1537 | * @return Channel handle, valid for as long as @a slave is valid. | 696 | * @return Channel handle, valid for as long as @a slave is valid. |
1538 | */ | 697 | */ |
1539 | struct GNUNET_PSYC_Channel * | 698 | struct GNUNET_PSYC_Channel * |
1540 | GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave) | 699 | GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slv) |
1541 | { | 700 | { |
1542 | return &slave->ch; | 701 | return &slv->chn; |
1543 | } | 702 | } |
1544 | 703 | ||
1545 | 704 | ||
@@ -1565,23 +724,17 @@ GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave) | |||
1565 | * @param effective_since Addition of slave is in effect since this message ID. | 724 | * @param effective_since Addition of slave is in effect since this message ID. |
1566 | */ | 725 | */ |
1567 | void | 726 | void |
1568 | GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel, | 727 | GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn, |
1569 | const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, | 728 | const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, |
1570 | uint64_t announced_at, | 729 | uint64_t announced_at, |
1571 | uint64_t effective_since) | 730 | uint64_t effective_since) |
1572 | { | 731 | { |
1573 | struct ChannelSlaveAdd *slvadd; | 732 | struct ChannelSlaveAdd *add = GNUNET_malloc (sizeof (*add)); |
1574 | struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*slvadd)); | 733 | add->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD); |
1575 | 734 | add->header.size = htons (sizeof (*add)); | |
1576 | slvadd = (struct ChannelSlaveAdd *) &mq[1]; | 735 | add->announced_at = GNUNET_htonll (announced_at); |
1577 | slvadd->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD); | 736 | add->effective_since = GNUNET_htonll (effective_since); |
1578 | slvadd->header.size = htons (sizeof (*slvadd)); | 737 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &add->header); |
1579 | slvadd->announced_at = GNUNET_htonll (announced_at); | ||
1580 | slvadd->effective_since = GNUNET_htonll (effective_since); | ||
1581 | GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head, | ||
1582 | channel->tmit_tail, | ||
1583 | mq); | ||
1584 | transmit_next (channel); | ||
1585 | } | 738 | } |
1586 | 739 | ||
1587 | 740 | ||
@@ -1607,21 +760,15 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel, | |||
1607 | * @param announced_at ID of the message that announced the membership change. | 760 | * @param announced_at ID of the message that announced the membership change. |
1608 | */ | 761 | */ |
1609 | void | 762 | void |
1610 | GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel, | 763 | GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn, |
1611 | const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, | 764 | const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, |
1612 | uint64_t announced_at) | 765 | uint64_t announced_at) |
1613 | { | 766 | { |
1614 | struct ChannelSlaveRemove *slvrm; | 767 | struct ChannelSlaveRemove *rm = GNUNET_malloc (sizeof (*rm)); |
1615 | struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*slvrm)); | 768 | rm->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM); |
1616 | 769 | rm->header.size = htons (sizeof (*rm)); | |
1617 | slvrm = (struct ChannelSlaveRemove *) &mq[1]; | 770 | rm->announced_at = GNUNET_htonll (announced_at); |
1618 | slvrm->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM); | 771 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &rm->header); |
1619 | slvrm->header.size = htons (sizeof (*slvrm)); | ||
1620 | slvrm->announced_at = GNUNET_htonll (announced_at); | ||
1621 | GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head, | ||
1622 | channel->tmit_tail, | ||
1623 | mq); | ||
1624 | transmit_next (channel); | ||
1625 | } | 772 | } |
1626 | 773 | ||
1627 | 774 | ||