diff options
author | Gabor X Toth <*@tg-x.net> | 2014-05-13 12:08:14 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2014-05-13 12:08:14 +0000 |
commit | 093f0291be26fa3dfc6fc98a536028ef99517832 (patch) | |
tree | c10078bfe4136f940183d8dfde85617ab75acf46 /src/psyc/psyc_api.c | |
parent | 783fc956a05c0f321fa63fbcaeab00bc1865a069 (diff) | |
download | gnunet-093f0291be26fa3dfc6fc98a536028ef99517832.tar.gz gnunet-093f0291be26fa3dfc6fc98a536028ef99517832.zip |
multicast: send messages between client lib & service
Diffstat (limited to 'src/psyc/psyc_api.c')
-rw-r--r-- | src/psyc/psyc_api.c | 210 |
1 files changed, 106 insertions, 104 deletions
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c index 22f1da069..85f86ceaa 100644 --- a/src/psyc/psyc_api.c +++ b/src/psyc/psyc_api.c | |||
@@ -41,11 +41,10 @@ | |||
41 | 41 | ||
42 | #define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__) | 42 | #define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__) |
43 | 43 | ||
44 | struct OperationHandle | 44 | struct MessageQueue |
45 | { | 45 | { |
46 | struct OperationHandle *prev; | 46 | struct MessageQueue *prev; |
47 | struct OperationHandle *next; | 47 | struct MessageQueue *next; |
48 | struct GNUNET_MessageHeader *msg; | ||
49 | }; | 48 | }; |
50 | 49 | ||
51 | 50 | ||
@@ -87,19 +86,19 @@ struct GNUNET_PSYC_Channel | |||
87 | struct GNUNET_CLIENT_TransmitHandle *th; | 86 | struct GNUNET_CLIENT_TransmitHandle *th; |
88 | 87 | ||
89 | /** | 88 | /** |
90 | * Head of operations to transmit. | 89 | * Head of messages to transmit to the service. |
91 | */ | 90 | */ |
92 | struct OperationHandle *tmit_head; | 91 | struct MessageQueue *tmit_head; |
93 | 92 | ||
94 | /** | 93 | /** |
95 | * Tail of operations to transmit. | 94 | * Tail of operations to transmit to the service. |
96 | */ | 95 | */ |
97 | struct OperationHandle *tmit_tail; | 96 | struct MessageQueue *tmit_tail; |
98 | 97 | ||
99 | /** | 98 | /** |
100 | * Message being transmitted to the PSYC service. | 99 | * Message currently being transmitted to the service. |
101 | */ | 100 | */ |
102 | struct OperationHandle *tmit_msg; | 101 | struct MessageQueue *tmit_msg; |
103 | 102 | ||
104 | /** | 103 | /** |
105 | * Message to send on reconnect. | 104 | * Message to send on reconnect. |
@@ -201,8 +200,6 @@ struct GNUNET_PSYC_Master | |||
201 | struct GNUNET_PSYC_Channel ch; | 200 | struct GNUNET_PSYC_Channel ch; |
202 | 201 | ||
203 | GNUNET_PSYC_MasterStartCallback start_cb; | 202 | GNUNET_PSYC_MasterStartCallback start_cb; |
204 | |||
205 | uint64_t max_message_id; | ||
206 | }; | 203 | }; |
207 | 204 | ||
208 | 205 | ||
@@ -214,8 +211,6 @@ struct GNUNET_PSYC_Slave | |||
214 | struct GNUNET_PSYC_Channel ch; | 211 | struct GNUNET_PSYC_Channel ch; |
215 | 212 | ||
216 | GNUNET_PSYC_SlaveJoinCallback join_cb; | 213 | GNUNET_PSYC_SlaveJoinCallback join_cb; |
217 | |||
218 | uint64_t max_message_id; | ||
219 | }; | 214 | }; |
220 | 215 | ||
221 | 216 | ||
@@ -269,30 +264,30 @@ channel_transmit_data (struct GNUNET_PSYC_Channel *ch); | |||
269 | /** | 264 | /** |
270 | * Reschedule a connect attempt to the service. | 265 | * Reschedule a connect attempt to the service. |
271 | * | 266 | * |
272 | * @param c channel to reconnect | 267 | * @param ch Channel to reconnect. |
273 | */ | 268 | */ |
274 | static void | 269 | static void |
275 | reschedule_connect (struct GNUNET_PSYC_Channel *c) | 270 | reschedule_connect (struct GNUNET_PSYC_Channel *ch) |
276 | { | 271 | { |
277 | GNUNET_assert (c->reconnect_task == GNUNET_SCHEDULER_NO_TASK); | 272 | GNUNET_assert (ch->reconnect_task == GNUNET_SCHEDULER_NO_TASK); |
278 | 273 | ||
279 | if (NULL != c->th) | 274 | if (NULL != ch->th) |
280 | { | 275 | { |
281 | GNUNET_CLIENT_notify_transmit_ready_cancel (c->th); | 276 | GNUNET_CLIENT_notify_transmit_ready_cancel (ch->th); |
282 | c->th = NULL; | 277 | ch->th = NULL; |
283 | } | 278 | } |
284 | if (NULL != c->client) | 279 | if (NULL != ch->client) |
285 | { | 280 | { |
286 | GNUNET_CLIENT_disconnect (c->client); | 281 | GNUNET_CLIENT_disconnect (ch->client); |
287 | c->client = NULL; | 282 | ch->client = NULL; |
288 | } | 283 | } |
289 | c->in_receive = GNUNET_NO; | 284 | ch->in_receive = GNUNET_NO; |
290 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 285 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
291 | "Scheduling task to reconnect to PSYC service in %s.\n", | 286 | "Scheduling task to reconnect to PSYC service in %s.\n", |
292 | GNUNET_STRINGS_relative_time_to_string (c->reconnect_delay, GNUNET_YES)); | 287 | GNUNET_STRINGS_relative_time_to_string (ch->reconnect_delay, GNUNET_YES)); |
293 | c->reconnect_task = | 288 | ch->reconnect_task = |
294 | GNUNET_SCHEDULER_add_delayed (c->reconnect_delay, &reconnect, c); | 289 | GNUNET_SCHEDULER_add_delayed (ch->reconnect_delay, &reconnect, ch); |
295 | c->reconnect_delay = GNUNET_TIME_STD_BACKOFF (c->reconnect_delay); | 290 | ch->reconnect_delay = GNUNET_TIME_STD_BACKOFF (ch->reconnect_delay); |
296 | } | 291 | } |
297 | 292 | ||
298 | 293 | ||
@@ -306,7 +301,7 @@ transmit_next (struct GNUNET_PSYC_Channel *ch); | |||
306 | 301 | ||
307 | 302 | ||
308 | /** | 303 | /** |
309 | * Reset data stored related to the last received message. | 304 | * Reset stored data related to the last received message. |
310 | */ | 305 | */ |
311 | static void | 306 | static void |
312 | recv_reset (struct GNUNET_PSYC_Channel *ch) | 307 | recv_reset (struct GNUNET_PSYC_Channel *ch) |
@@ -356,51 +351,53 @@ queue_message (struct GNUNET_PSYC_Channel *ch, | |||
356 | "Queueing message of type %u and size %u (end: %u)).\n", | 351 | "Queueing message of type %u and size %u (end: %u)).\n", |
357 | ntohs (msg->type), size, end); | 352 | ntohs (msg->type), size, end); |
358 | 353 | ||
359 | struct OperationHandle *op = ch->tmit_msg; | 354 | struct MessageQueue *mq = ch->tmit_msg; |
360 | if (NULL != op) | 355 | struct GNUNET_MessageHeader *qmsg = NULL; |
356 | if (NULL != mq) | ||
361 | { | 357 | { |
358 | qmsg = (struct GNUNET_MessageHeader *) &mq[1]; | ||
362 | if (NULL == msg | 359 | if (NULL == msg |
363 | || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < op->msg->size + size) | 360 | || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < qmsg->size + size) |
364 | { | 361 | { |
365 | /* End of message or buffer is full, add it to transmission queue | 362 | /* End of message or buffer is full, add it to transmission queue |
366 | * and start with empty buffer */ | 363 | * and start with empty buffer */ |
367 | op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | 364 | qmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); |
368 | op->msg->size = htons (op->msg->size); | 365 | qmsg->size = htons (qmsg->size); |
369 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | 366 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq); |
370 | ch->tmit_msg = op = NULL; | 367 | ch->tmit_msg = mq = NULL; |
371 | ch->tmit_ack_pending++; | 368 | ch->tmit_ack_pending++; |
372 | } | 369 | } |
373 | else | 370 | else |
374 | { | 371 | { |
375 | /* Message fits in current buffer, append */ | 372 | /* Message fits in current buffer, append */ |
376 | ch->tmit_msg = op | 373 | ch->tmit_msg |
377 | = GNUNET_realloc (op, sizeof (*op) + op->msg->size + size); | 374 | = mq = GNUNET_realloc (mq, sizeof (*mq) + qmsg->size + size); |
378 | op->msg = (struct GNUNET_MessageHeader *) &op[1]; | 375 | qmsg = (struct GNUNET_MessageHeader *) &mq[1]; |
379 | memcpy ((char *) op->msg + op->msg->size, msg, size); | 376 | memcpy ((char *) qmsg + qmsg->size, msg, size); |
380 | op->msg->size += size; | 377 | qmsg->size += size; |
381 | } | 378 | } |
382 | } | 379 | } |
383 | 380 | ||
384 | if (NULL == op && NULL != msg) | 381 | if (NULL == mq && NULL != msg) |
385 | { | 382 | { |
386 | /* Empty buffer, copy over message. */ | 383 | /* Empty buffer, copy over message. */ |
387 | ch->tmit_msg = op | 384 | ch->tmit_msg |
388 | = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg) + size); | 385 | = mq = GNUNET_malloc (sizeof (*mq) + sizeof (*qmsg) + size); |
389 | op->msg = (struct GNUNET_MessageHeader *) &op[1]; | 386 | qmsg = (struct GNUNET_MessageHeader *) &mq[1]; |
390 | op->msg->size = sizeof (*op->msg) + size; | 387 | qmsg->size = sizeof (*qmsg) + size; |
391 | memcpy (&op->msg[1], msg, size); | 388 | memcpy (&qmsg[1], msg, size); |
392 | } | 389 | } |
393 | 390 | ||
394 | if (NULL != op | 391 | if (NULL != mq |
395 | && (GNUNET_YES == end | 392 | && (GNUNET_YES == end |
396 | || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD | 393 | || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD |
397 | < op->msg->size + sizeof (struct GNUNET_MessageHeader)))) | 394 | < qmsg->size + sizeof (struct GNUNET_MessageHeader)))) |
398 | { | 395 | { |
399 | /* End of message or buffer is full, add it to transmission queue. */ | 396 | /* End of message or buffer is full, add it to transmission queue. */ |
400 | op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | 397 | qmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); |
401 | op->msg->size = htons (op->msg->size); | 398 | qmsg->size = htons (qmsg->size); |
402 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | 399 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq); |
403 | ch->tmit_msg = op = NULL; | 400 | ch->tmit_msg = mq = NULL; |
404 | ch->tmit_ack_pending++; | 401 | ch->tmit_ack_pending++; |
405 | } | 402 | } |
406 | 403 | ||
@@ -577,6 +574,7 @@ channel_transmit_data (struct GNUNET_PSYC_Channel *ch) | |||
577 | * @param notify_data Function to call to obtain fragments of the data. | 574 | * @param notify_data Function to call to obtain fragments of the data. |
578 | * @param notify_cls Closure for @a notify_mod and @a notify_data. | 575 | * @param notify_cls Closure for @a notify_mod and @a notify_data. |
579 | * @param flags Flags for the message being transmitted. | 576 | * @param flags Flags for the message being transmitted. |
577 | * | ||
580 | * @return Transmission handle, NULL on error (i.e. more than one request queued). | 578 | * @return Transmission handle, NULL on error (i.e. more than one request queued). |
581 | */ | 579 | */ |
582 | static struct GNUNET_PSYC_ChannelTransmitHandle * | 580 | static struct GNUNET_PSYC_ChannelTransmitHandle * |
@@ -593,14 +591,14 @@ channel_transmit (struct GNUNET_PSYC_Channel *ch, | |||
593 | 591 | ||
594 | size_t size = strlen (method_name) + 1; | 592 | size_t size = strlen (method_name) + 1; |
595 | struct GNUNET_PSYC_MessageMethod *pmeth; | 593 | struct GNUNET_PSYC_MessageMethod *pmeth; |
596 | struct OperationHandle *op; | 594 | struct GNUNET_MessageHeader *qmsg; |
597 | 595 | struct MessageQueue * | |
598 | ch->tmit_msg = op = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg) | 596 | mq = ch->tmit_msg = GNUNET_malloc (sizeof (*mq) + sizeof (*qmsg) |
599 | + sizeof (*pmeth) + size); | 597 | + sizeof (*pmeth) + size); |
600 | op->msg = (struct GNUNET_MessageHeader *) &op[1]; | 598 | qmsg = (struct GNUNET_MessageHeader *) &mq[1]; |
601 | op->msg->size = sizeof (*op->msg) + sizeof (*pmeth) + size; | 599 | qmsg->size = sizeof (*qmsg) + sizeof (*pmeth) + size; |
602 | 600 | ||
603 | pmeth = (struct GNUNET_PSYC_MessageMethod *) &op->msg[1]; | 601 | pmeth = (struct GNUNET_PSYC_MessageMethod *) &qmsg[1]; |
604 | pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); | 602 | pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); |
605 | pmeth->header.size = htons (sizeof (*pmeth) + size); | 603 | pmeth->header.size = htons (sizeof (*pmeth) + size); |
606 | pmeth->flags = htonl (flags); | 604 | pmeth->flags = htonl (flags); |
@@ -928,7 +926,7 @@ message_handler (void *cls, | |||
928 | 926 | ||
929 | if (NULL == msg) | 927 | if (NULL == msg) |
930 | { | 928 | { |
931 | // timeout / disconnected from server, reconnect | 929 | // timeout / disconnected from service, reconnect |
932 | reschedule_connect (ch); | 930 | reschedule_connect (ch); |
933 | return; | 931 | return; |
934 | } | 932 | } |
@@ -970,17 +968,15 @@ message_handler (void *cls, | |||
970 | case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK: | 968 | case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK: |
971 | { | 969 | { |
972 | struct CountersResult *cres = (struct CountersResult *) msg; | 970 | struct CountersResult *cres = (struct CountersResult *) msg; |
973 | mst->max_message_id = GNUNET_ntohll (cres->max_message_id); | ||
974 | if (NULL != mst->start_cb) | 971 | if (NULL != mst->start_cb) |
975 | mst->start_cb (ch->cb_cls, mst->max_message_id); | 972 | mst->start_cb (ch->cb_cls, GNUNET_ntohll (cres->max_message_id)); |
976 | break; | 973 | break; |
977 | } | 974 | } |
978 | case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: | 975 | case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: |
979 | { | 976 | { |
980 | struct CountersResult *cres = (struct CountersResult *) msg; | 977 | struct CountersResult *cres = (struct CountersResult *) msg; |
981 | slv->max_message_id = GNUNET_ntohll (cres->max_message_id); | ||
982 | if (NULL != slv->join_cb) | 978 | if (NULL != slv->join_cb) |
983 | slv->join_cb (ch->cb_cls, slv->max_message_id); | 979 | slv->join_cb (ch->cb_cls, GNUNET_ntohll (cres->max_message_id)); |
984 | break; | 980 | break; |
985 | } | 981 | } |
986 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK: | 982 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK: |
@@ -1005,31 +1001,32 @@ message_handler (void *cls, | |||
1005 | /** | 1001 | /** |
1006 | * Transmit next message to service. | 1002 | * Transmit next message to service. |
1007 | * | 1003 | * |
1008 | * @param cls The 'struct GNUNET_PSYC_Channel'. | 1004 | * @param cls The struct GNUNET_PSYC_Channel. |
1009 | * @param size Number of bytes available in buf. | 1005 | * @param size Number of bytes available in @a buf. |
1010 | * @param buf Where to copy the message. | 1006 | * @param buf Where to copy the message. |
1011 | * @return Number of bytes copied to buf. | 1007 | * |
1008 | * @return Number of bytes copied to @a buf. | ||
1012 | */ | 1009 | */ |
1013 | static size_t | 1010 | static size_t |
1014 | send_next_message (void *cls, size_t size, void *buf) | 1011 | send_next_message (void *cls, size_t size, void *buf) |
1015 | { | 1012 | { |
1016 | struct GNUNET_PSYC_Channel *ch = cls; | ||
1017 | struct OperationHandle *op = ch->tmit_head; | ||
1018 | size_t ret; | ||
1019 | LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n"); | 1013 | LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n"); |
1020 | ch->th = NULL; | 1014 | struct GNUNET_PSYC_Channel *ch = cls; |
1021 | if (NULL == op->msg) | 1015 | struct MessageQueue *mq = ch->tmit_head; |
1016 | if (NULL == mq) | ||
1022 | return 0; | 1017 | return 0; |
1023 | ret = ntohs (op->msg->size); | 1018 | struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1]; |
1019 | size_t ret = ntohs (qmsg->size); | ||
1020 | ch->th = NULL; | ||
1024 | if (ret > size) | 1021 | if (ret > size) |
1025 | { | 1022 | { |
1026 | reschedule_connect (ch); | 1023 | reschedule_connect (ch); |
1027 | return 0; | 1024 | return 0; |
1028 | } | 1025 | } |
1029 | memcpy (buf, op->msg, ret); | 1026 | memcpy (buf, qmsg, ret); |
1030 | 1027 | ||
1031 | GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, op); | 1028 | GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, mq); |
1032 | GNUNET_free (op); | 1029 | GNUNET_free (mq); |
1033 | 1030 | ||
1034 | if (NULL != ch->tmit_head) | 1031 | if (NULL != ch->tmit_head) |
1035 | transmit_next (ch); | 1032 | transmit_next (ch); |
@@ -1056,12 +1053,13 @@ transmit_next (struct GNUNET_PSYC_Channel *ch) | |||
1056 | if (NULL != ch->th || NULL == ch->client) | 1053 | if (NULL != ch->th || NULL == ch->client) |
1057 | return; | 1054 | return; |
1058 | 1055 | ||
1059 | struct OperationHandle *op = ch->tmit_head; | 1056 | struct MessageQueue *mq = ch->tmit_head; |
1060 | if (NULL == op) | 1057 | if (NULL == mq) |
1061 | return; | 1058 | return; |
1059 | struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1]; | ||
1062 | 1060 | ||
1063 | ch->th = GNUNET_CLIENT_notify_transmit_ready (ch->client, | 1061 | ch->th = GNUNET_CLIENT_notify_transmit_ready (ch->client, |
1064 | ntohs (op->msg->size), | 1062 | ntohs (qmsg->size), |
1065 | GNUNET_TIME_UNIT_FOREVER_REL, | 1063 | GNUNET_TIME_UNIT_FOREVER_REL, |
1066 | GNUNET_NO, | 1064 | GNUNET_NO, |
1067 | &send_next_message, | 1065 | &send_next_message, |
@@ -1087,15 +1085,14 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
1087 | GNUNET_assert (NULL == ch->client); | 1085 | GNUNET_assert (NULL == ch->client); |
1088 | ch->client = GNUNET_CLIENT_connect ("psyc", ch->cfg); | 1086 | ch->client = GNUNET_CLIENT_connect ("psyc", ch->cfg); |
1089 | GNUNET_assert (NULL != ch->client); | 1087 | GNUNET_assert (NULL != ch->client); |
1088 | uint16_t reconn_size = ntohs (ch->reconnect_msg->size); | ||
1090 | 1089 | ||
1091 | if (NULL == ch->tmit_head || | 1090 | if (NULL == ch->tmit_head || |
1092 | ch->tmit_head->msg->type != ch->reconnect_msg->type) | 1091 | 0 != memcmp (&ch->tmit_head[1], ch->reconnect_msg, reconn_size)) |
1093 | { | 1092 | { |
1094 | uint16_t reconn_size = ntohs (ch->reconnect_msg->size); | 1093 | struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + reconn_size); |
1095 | struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + reconn_size); | 1094 | memcpy (&mq[1], ch->reconnect_msg, reconn_size); |
1096 | memcpy (&op[1], ch->reconnect_msg, reconn_size); | 1095 | GNUNET_CONTAINER_DLL_insert (ch->tmit_head, ch->tmit_tail, mq); |
1097 | op->msg = (struct GNUNET_MessageHeader *) &op[1]; | ||
1098 | GNUNET_CONTAINER_DLL_insert (ch->tmit_head, ch->tmit_tail, op); | ||
1099 | } | 1096 | } |
1100 | transmit_next (ch); | 1097 | transmit_next (ch); |
1101 | } | 1098 | } |
@@ -1104,7 +1101,7 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
1104 | /** | 1101 | /** |
1105 | * Disconnect from the PSYC service. | 1102 | * Disconnect from the PSYC service. |
1106 | * | 1103 | * |
1107 | * @param c Channel handle to disconnect | 1104 | * @param c Channel handle to disconnect. |
1108 | */ | 1105 | */ |
1109 | static void | 1106 | static void |
1110 | disconnect (void *c) | 1107 | disconnect (void *c) |
@@ -1167,6 +1164,7 @@ disconnect (void *c) | |||
1167 | * @param join_cb Function to invoke when a peer wants to join. | 1164 | * @param join_cb Function to invoke when a peer wants to join. |
1168 | * @param master_started_cb Function to invoke after the channel master started. | 1165 | * @param master_started_cb Function to invoke after the channel master started. |
1169 | * @param cls Closure for @a master_started_cb and @a join_cb. | 1166 | * @param cls Closure for @a master_started_cb and @a join_cb. |
1167 | * | ||
1170 | * @return Handle for the channel master, NULL on error. | 1168 | * @return Handle for the channel master, NULL on error. |
1171 | */ | 1169 | */ |
1172 | struct GNUNET_PSYC_Master * | 1170 | struct GNUNET_PSYC_Master * |
@@ -1187,17 +1185,16 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
1187 | req->channel_key = *channel_key; | 1185 | req->channel_key = *channel_key; |
1188 | req->policy = policy; | 1186 | req->policy = policy; |
1189 | 1187 | ||
1188 | mst->start_cb = master_started_cb; | ||
1189 | ch->message_cb = message_cb; | ||
1190 | ch->join_cb = join_cb; | ||
1191 | ch->cb_cls = cls; | ||
1190 | ch->cfg = cfg; | 1192 | ch->cfg = cfg; |
1191 | ch->is_master = GNUNET_YES; | 1193 | ch->is_master = GNUNET_YES; |
1192 | ch->reconnect_msg = (struct GNUNET_MessageHeader *) req; | 1194 | ch->reconnect_msg = (struct GNUNET_MessageHeader *) req; |
1193 | ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; | 1195 | ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; |
1194 | ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst); | 1196 | ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst); |
1195 | 1197 | ||
1196 | ch->message_cb = message_cb; | ||
1197 | ch->join_cb = join_cb; | ||
1198 | ch->cb_cls = cls; | ||
1199 | mst->start_cb = master_started_cb; | ||
1200 | |||
1201 | return mst; | 1198 | return mst; |
1202 | } | 1199 | } |
1203 | 1200 | ||
@@ -1260,6 +1257,7 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, | |||
1260 | * @param notify_data Function to call to obtain fragments of the data. | 1257 | * @param notify_data Function to call to obtain fragments of the data. |
1261 | * @param notify_cls Closure for @a notify_mod and @a notify_data. | 1258 | * @param notify_cls Closure for @a notify_mod and @a notify_data. |
1262 | * @param flags Flags for the message being transmitted. | 1259 | * @param flags Flags for the message being transmitted. |
1260 | * | ||
1263 | * @return Transmission handle, NULL on error (i.e. more than one request queued). | 1261 | * @return Transmission handle, NULL on error (i.e. more than one request queued). |
1264 | */ | 1262 | */ |
1265 | struct GNUNET_PSYC_MasterTransmitHandle * | 1263 | struct GNUNET_PSYC_MasterTransmitHandle * |
@@ -1330,6 +1328,7 @@ GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th) | |||
1330 | * @param env Environment containing transient variables for the request, or NULL. | 1328 | * @param env Environment containing transient variables for the request, or NULL. |
1331 | * @param data Payload for the join message. | 1329 | * @param data Payload for the join message. |
1332 | * @param data_size Number of bytes in @a data. | 1330 | * @param data_size Number of bytes in @a data. |
1331 | * | ||
1333 | * @return Handle for the slave, NULL on error. | 1332 | * @return Handle for the slave, NULL on error. |
1334 | */ | 1333 | */ |
1335 | struct GNUNET_PSYC_Slave * | 1334 | struct GNUNET_PSYC_Slave * |
@@ -1361,6 +1360,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
1361 | req->relay_count = htonl (relay_count); | 1360 | req->relay_count = htonl (relay_count); |
1362 | memcpy (&req[1], relays, relay_count * sizeof (*relays)); | 1361 | memcpy (&req[1], relays, relay_count * sizeof (*relays)); |
1363 | 1362 | ||
1363 | slv->join_cb = slave_joined_cb; | ||
1364 | ch->message_cb = message_cb; | 1364 | ch->message_cb = message_cb; |
1365 | ch->join_cb = join_cb; | 1365 | ch->join_cb = join_cb; |
1366 | ch->cb_cls = cls; | 1366 | ch->cb_cls = cls; |
@@ -1371,7 +1371,6 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
1371 | ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; | 1371 | ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; |
1372 | ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv); | 1372 | ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv); |
1373 | 1373 | ||
1374 | slv->join_cb = slave_joined_cb; | ||
1375 | return slv; | 1374 | return slv; |
1376 | } | 1375 | } |
1377 | 1376 | ||
@@ -1401,6 +1400,7 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave) | |||
1401 | * @param notify_data Function to call to obtain fragments of the data. | 1400 | * @param notify_data Function to call to obtain fragments of the data. |
1402 | * @param notify_cls Closure for @a notify. | 1401 | * @param notify_cls Closure for @a notify. |
1403 | * @param flags Flags for the message being transmitted. | 1402 | * @param flags Flags for the message being transmitted. |
1403 | * | ||
1404 | * @return Transmission handle, NULL on error (i.e. more than one request | 1404 | * @return Transmission handle, NULL on error (i.e. more than one request |
1405 | * queued). | 1405 | * queued). |
1406 | */ | 1406 | */ |
@@ -1447,6 +1447,7 @@ GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th) | |||
1447 | * APIs. | 1447 | * APIs. |
1448 | * | 1448 | * |
1449 | * @param master Channel master handle. | 1449 | * @param master Channel master handle. |
1450 | * | ||
1450 | * @return Channel handle, valid for as long as @a master is valid. | 1451 | * @return Channel handle, valid for as long as @a master is valid. |
1451 | */ | 1452 | */ |
1452 | struct GNUNET_PSYC_Channel * | 1453 | struct GNUNET_PSYC_Channel * |
@@ -1460,6 +1461,7 @@ GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master) | |||
1460 | * Convert @a slave to a @e channel handle to access the @e channel APIs. | 1461 | * Convert @a slave to a @e channel handle to access the @e channel APIs. |
1461 | * | 1462 | * |
1462 | * @param slave Slave handle. | 1463 | * @param slave Slave handle. |
1464 | * | ||
1463 | * @return Channel handle, valid for as long as @a slave is valid. | 1465 | * @return Channel handle, valid for as long as @a slave is valid. |
1464 | */ | 1466 | */ |
1465 | struct GNUNET_PSYC_Channel * | 1467 | struct GNUNET_PSYC_Channel * |
@@ -1497,18 +1499,16 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel, | |||
1497 | uint64_t effective_since) | 1499 | uint64_t effective_since) |
1498 | { | 1500 | { |
1499 | struct ChannelSlaveAdd *slvadd; | 1501 | struct ChannelSlaveAdd *slvadd; |
1500 | struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*slvadd)); | 1502 | struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*slvadd)); |
1501 | |||
1502 | slvadd = (struct ChannelSlaveAdd *) &op[1]; | ||
1503 | op->msg = (struct GNUNET_MessageHeader *) slvadd; | ||
1504 | 1503 | ||
1504 | slvadd = (struct ChannelSlaveAdd *) &mq[1]; | ||
1505 | slvadd->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD); | 1505 | slvadd->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD); |
1506 | slvadd->header.size = htons (sizeof (*slvadd)); | 1506 | slvadd->header.size = htons (sizeof (*slvadd)); |
1507 | slvadd->announced_at = GNUNET_htonll (announced_at); | 1507 | slvadd->announced_at = GNUNET_htonll (announced_at); |
1508 | slvadd->effective_since = GNUNET_htonll (effective_since); | 1508 | slvadd->effective_since = GNUNET_htonll (effective_since); |
1509 | GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head, | 1509 | GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head, |
1510 | channel->tmit_tail, | 1510 | channel->tmit_tail, |
1511 | op); | 1511 | mq); |
1512 | transmit_next (channel); | 1512 | transmit_next (channel); |
1513 | } | 1513 | } |
1514 | 1514 | ||
@@ -1540,16 +1540,15 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel, | |||
1540 | uint64_t announced_at) | 1540 | uint64_t announced_at) |
1541 | { | 1541 | { |
1542 | struct ChannelSlaveRemove *slvrm; | 1542 | struct ChannelSlaveRemove *slvrm; |
1543 | struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*slvrm)); | 1543 | struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*slvrm)); |
1544 | 1544 | ||
1545 | slvrm = (struct ChannelSlaveRemove *) &op[1]; | 1545 | slvrm = (struct ChannelSlaveRemove *) &mq[1]; |
1546 | op->msg = (struct GNUNET_MessageHeader *) slvrm; | ||
1547 | slvrm->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM); | 1546 | slvrm->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM); |
1548 | slvrm->header.size = htons (sizeof (*slvrm)); | 1547 | slvrm->header.size = htons (sizeof (*slvrm)); |
1549 | slvrm->announced_at = GNUNET_htonll (announced_at); | 1548 | slvrm->announced_at = GNUNET_htonll (announced_at); |
1550 | GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head, | 1549 | GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head, |
1551 | channel->tmit_tail, | 1550 | channel->tmit_tail, |
1552 | op); | 1551 | mq); |
1553 | transmit_next (channel); | 1552 | transmit_next (channel); |
1554 | } | 1553 | } |
1555 | 1554 | ||
@@ -1573,6 +1572,7 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel, | |||
1573 | * has been called, the client must not call | 1572 | * has been called, the client must not call |
1574 | * GNUNET_PSYC_channel_story_tell_cancel() anymore. | 1573 | * GNUNET_PSYC_channel_story_tell_cancel() anymore. |
1575 | * @param cls Closure for the callbacks. | 1574 | * @param cls Closure for the callbacks. |
1575 | * | ||
1576 | * @return Handle to cancel story telling operation. | 1576 | * @return Handle to cancel story telling operation. |
1577 | */ | 1577 | */ |
1578 | struct GNUNET_PSYC_Story * | 1578 | struct GNUNET_PSYC_Story * |
@@ -1615,6 +1615,7 @@ GNUNET_PSYC_channel_story_tell_cancel (struct GNUNET_PSYC_Story *story) | |||
1615 | * @param cb Function called once when a matching state variable is found. | 1615 | * @param cb Function called once when a matching state variable is found. |
1616 | * Not called if there's no matching state variable. | 1616 | * Not called if there's no matching state variable. |
1617 | * @param cb_cls Closure for the callbacks. | 1617 | * @param cb_cls Closure for the callbacks. |
1618 | * | ||
1618 | * @return Handle that can be used to cancel the query operation. | 1619 | * @return Handle that can be used to cancel the query operation. |
1619 | */ | 1620 | */ |
1620 | struct GNUNET_PSYC_StateQuery * | 1621 | struct GNUNET_PSYC_StateQuery * |
@@ -1641,6 +1642,7 @@ GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *channel, | |||
1641 | * @param name_prefix Prefix of the state variable name to match. | 1642 | * @param name_prefix Prefix of the state variable name to match. |
1642 | * @param cb Function to call with the matching state variables. | 1643 | * @param cb Function to call with the matching state variables. |
1643 | * @param cb_cls Closure for the callbacks. | 1644 | * @param cb_cls Closure for the callbacks. |
1645 | * | ||
1644 | * @return Handle that can be used to cancel the query operation. | 1646 | * @return Handle that can be used to cancel the query operation. |
1645 | */ | 1647 | */ |
1646 | struct GNUNET_PSYC_StateQuery * | 1648 | struct GNUNET_PSYC_StateQuery * |