aboutsummaryrefslogtreecommitdiff
path: root/src/psyc/psyc_api.c
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2014-05-13 12:08:14 +0000
committerGabor X Toth <*@tg-x.net>2014-05-13 12:08:14 +0000
commit093f0291be26fa3dfc6fc98a536028ef99517832 (patch)
treec10078bfe4136f940183d8dfde85617ab75acf46 /src/psyc/psyc_api.c
parent783fc956a05c0f321fa63fbcaeab00bc1865a069 (diff)
downloadgnunet-093f0291be26fa3dfc6fc98a536028ef99517832.tar.gz
gnunet-093f0291be26fa3dfc6fc98a536028ef99517832.zip
multicast: send messages between client lib & service
Diffstat (limited to 'src/psyc/psyc_api.c')
-rw-r--r--src/psyc/psyc_api.c210
1 files changed, 106 insertions, 104 deletions
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index 22f1da069..85f86ceaa 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -41,11 +41,10 @@
41 41
42#define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__) 42#define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__)
43 43
44struct OperationHandle 44struct MessageQueue
45{ 45{
46 struct OperationHandle *prev; 46 struct MessageQueue *prev;
47 struct OperationHandle *next; 47 struct MessageQueue *next;
48 struct GNUNET_MessageHeader *msg;
49}; 48};
50 49
51 50
@@ -87,19 +86,19 @@ struct GNUNET_PSYC_Channel
87 struct GNUNET_CLIENT_TransmitHandle *th; 86 struct GNUNET_CLIENT_TransmitHandle *th;
88 87
89 /** 88 /**
90 * Head of operations to transmit. 89 * Head of messages to transmit to the service.
91 */ 90 */
92 struct OperationHandle *tmit_head; 91 struct MessageQueue *tmit_head;
93 92
94 /** 93 /**
95 * Tail of operations to transmit. 94 * Tail of operations to transmit to the service.
96 */ 95 */
97 struct OperationHandle *tmit_tail; 96 struct MessageQueue *tmit_tail;
98 97
99 /** 98 /**
100 * Message being transmitted to the PSYC service. 99 * Message currently being transmitted to the service.
101 */ 100 */
102 struct OperationHandle *tmit_msg; 101 struct MessageQueue *tmit_msg;
103 102
104 /** 103 /**
105 * Message to send on reconnect. 104 * Message to send on reconnect.
@@ -201,8 +200,6 @@ struct GNUNET_PSYC_Master
201 struct GNUNET_PSYC_Channel ch; 200 struct GNUNET_PSYC_Channel ch;
202 201
203 GNUNET_PSYC_MasterStartCallback start_cb; 202 GNUNET_PSYC_MasterStartCallback start_cb;
204
205 uint64_t max_message_id;
206}; 203};
207 204
208 205
@@ -214,8 +211,6 @@ struct GNUNET_PSYC_Slave
214 struct GNUNET_PSYC_Channel ch; 211 struct GNUNET_PSYC_Channel ch;
215 212
216 GNUNET_PSYC_SlaveJoinCallback join_cb; 213 GNUNET_PSYC_SlaveJoinCallback join_cb;
217
218 uint64_t max_message_id;
219}; 214};
220 215
221 216
@@ -269,30 +264,30 @@ channel_transmit_data (struct GNUNET_PSYC_Channel *ch);
269/** 264/**
270 * Reschedule a connect attempt to the service. 265 * Reschedule a connect attempt to the service.
271 * 266 *
272 * @param c channel to reconnect 267 * @param ch Channel to reconnect.
273 */ 268 */
274static void 269static void
275reschedule_connect (struct GNUNET_PSYC_Channel *c) 270reschedule_connect (struct GNUNET_PSYC_Channel *ch)
276{ 271{
277 GNUNET_assert (c->reconnect_task == GNUNET_SCHEDULER_NO_TASK); 272 GNUNET_assert (ch->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
278 273
279 if (NULL != c->th) 274 if (NULL != ch->th)
280 { 275 {
281 GNUNET_CLIENT_notify_transmit_ready_cancel (c->th); 276 GNUNET_CLIENT_notify_transmit_ready_cancel (ch->th);
282 c->th = NULL; 277 ch->th = NULL;
283 } 278 }
284 if (NULL != c->client) 279 if (NULL != ch->client)
285 { 280 {
286 GNUNET_CLIENT_disconnect (c->client); 281 GNUNET_CLIENT_disconnect (ch->client);
287 c->client = NULL; 282 ch->client = NULL;
288 } 283 }
289 c->in_receive = GNUNET_NO; 284 ch->in_receive = GNUNET_NO;
290 LOG (GNUNET_ERROR_TYPE_DEBUG, 285 LOG (GNUNET_ERROR_TYPE_DEBUG,
291 "Scheduling task to reconnect to PSYC service in %s.\n", 286 "Scheduling task to reconnect to PSYC service in %s.\n",
292 GNUNET_STRINGS_relative_time_to_string (c->reconnect_delay, GNUNET_YES)); 287 GNUNET_STRINGS_relative_time_to_string (ch->reconnect_delay, GNUNET_YES));
293 c->reconnect_task = 288 ch->reconnect_task =
294 GNUNET_SCHEDULER_add_delayed (c->reconnect_delay, &reconnect, c); 289 GNUNET_SCHEDULER_add_delayed (ch->reconnect_delay, &reconnect, ch);
295 c->reconnect_delay = GNUNET_TIME_STD_BACKOFF (c->reconnect_delay); 290 ch->reconnect_delay = GNUNET_TIME_STD_BACKOFF (ch->reconnect_delay);
296} 291}
297 292
298 293
@@ -306,7 +301,7 @@ transmit_next (struct GNUNET_PSYC_Channel *ch);
306 301
307 302
308/** 303/**
309 * Reset data stored related to the last received message. 304 * Reset stored data related to the last received message.
310 */ 305 */
311static void 306static void
312recv_reset (struct GNUNET_PSYC_Channel *ch) 307recv_reset (struct GNUNET_PSYC_Channel *ch)
@@ -356,51 +351,53 @@ queue_message (struct GNUNET_PSYC_Channel *ch,
356 "Queueing message of type %u and size %u (end: %u)).\n", 351 "Queueing message of type %u and size %u (end: %u)).\n",
357 ntohs (msg->type), size, end); 352 ntohs (msg->type), size, end);
358 353
359 struct OperationHandle *op = ch->tmit_msg; 354 struct MessageQueue *mq = ch->tmit_msg;
360 if (NULL != op) 355 struct GNUNET_MessageHeader *qmsg = NULL;
356 if (NULL != mq)
361 { 357 {
358 qmsg = (struct GNUNET_MessageHeader *) &mq[1];
362 if (NULL == msg 359 if (NULL == msg
363 || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < op->msg->size + size) 360 || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < qmsg->size + size)
364 { 361 {
365 /* End of message or buffer is full, add it to transmission queue 362 /* End of message or buffer is full, add it to transmission queue
366 * and start with empty buffer */ 363 * and start with empty buffer */
367 op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); 364 qmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
368 op->msg->size = htons (op->msg->size); 365 qmsg->size = htons (qmsg->size);
369 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); 366 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq);
370 ch->tmit_msg = op = NULL; 367 ch->tmit_msg = mq = NULL;
371 ch->tmit_ack_pending++; 368 ch->tmit_ack_pending++;
372 } 369 }
373 else 370 else
374 { 371 {
375 /* Message fits in current buffer, append */ 372 /* Message fits in current buffer, append */
376 ch->tmit_msg = op 373 ch->tmit_msg
377 = GNUNET_realloc (op, sizeof (*op) + op->msg->size + size); 374 = mq = GNUNET_realloc (mq, sizeof (*mq) + qmsg->size + size);
378 op->msg = (struct GNUNET_MessageHeader *) &op[1]; 375 qmsg = (struct GNUNET_MessageHeader *) &mq[1];
379 memcpy ((char *) op->msg + op->msg->size, msg, size); 376 memcpy ((char *) qmsg + qmsg->size, msg, size);
380 op->msg->size += size; 377 qmsg->size += size;
381 } 378 }
382 } 379 }
383 380
384 if (NULL == op && NULL != msg) 381 if (NULL == mq && NULL != msg)
385 { 382 {
386 /* Empty buffer, copy over message. */ 383 /* Empty buffer, copy over message. */
387 ch->tmit_msg = op 384 ch->tmit_msg
388 = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg) + size); 385 = mq = GNUNET_malloc (sizeof (*mq) + sizeof (*qmsg) + size);
389 op->msg = (struct GNUNET_MessageHeader *) &op[1]; 386 qmsg = (struct GNUNET_MessageHeader *) &mq[1];
390 op->msg->size = sizeof (*op->msg) + size; 387 qmsg->size = sizeof (*qmsg) + size;
391 memcpy (&op->msg[1], msg, size); 388 memcpy (&qmsg[1], msg, size);
392 } 389 }
393 390
394 if (NULL != op 391 if (NULL != mq
395 && (GNUNET_YES == end 392 && (GNUNET_YES == end
396 || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD 393 || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
397 < op->msg->size + sizeof (struct GNUNET_MessageHeader)))) 394 < qmsg->size + sizeof (struct GNUNET_MessageHeader))))
398 { 395 {
399 /* End of message or buffer is full, add it to transmission queue. */ 396 /* End of message or buffer is full, add it to transmission queue. */
400 op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); 397 qmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
401 op->msg->size = htons (op->msg->size); 398 qmsg->size = htons (qmsg->size);
402 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); 399 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq);
403 ch->tmit_msg = op = NULL; 400 ch->tmit_msg = mq = NULL;
404 ch->tmit_ack_pending++; 401 ch->tmit_ack_pending++;
405 } 402 }
406 403
@@ -577,6 +574,7 @@ channel_transmit_data (struct GNUNET_PSYC_Channel *ch)
577 * @param notify_data Function to call to obtain fragments of the data. 574 * @param notify_data Function to call to obtain fragments of the data.
578 * @param notify_cls Closure for @a notify_mod and @a notify_data. 575 * @param notify_cls Closure for @a notify_mod and @a notify_data.
579 * @param flags Flags for the message being transmitted. 576 * @param flags Flags for the message being transmitted.
577 *
580 * @return Transmission handle, NULL on error (i.e. more than one request queued). 578 * @return Transmission handle, NULL on error (i.e. more than one request queued).
581 */ 579 */
582static struct GNUNET_PSYC_ChannelTransmitHandle * 580static struct GNUNET_PSYC_ChannelTransmitHandle *
@@ -593,14 +591,14 @@ channel_transmit (struct GNUNET_PSYC_Channel *ch,
593 591
594 size_t size = strlen (method_name) + 1; 592 size_t size = strlen (method_name) + 1;
595 struct GNUNET_PSYC_MessageMethod *pmeth; 593 struct GNUNET_PSYC_MessageMethod *pmeth;
596 struct OperationHandle *op; 594 struct GNUNET_MessageHeader *qmsg;
597 595 struct MessageQueue *
598 ch->tmit_msg = op = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg) 596 mq = ch->tmit_msg = GNUNET_malloc (sizeof (*mq) + sizeof (*qmsg)
599 + sizeof (*pmeth) + size); 597 + sizeof (*pmeth) + size);
600 op->msg = (struct GNUNET_MessageHeader *) &op[1]; 598 qmsg = (struct GNUNET_MessageHeader *) &mq[1];
601 op->msg->size = sizeof (*op->msg) + sizeof (*pmeth) + size; 599 qmsg->size = sizeof (*qmsg) + sizeof (*pmeth) + size;
602 600
603 pmeth = (struct GNUNET_PSYC_MessageMethod *) &op->msg[1]; 601 pmeth = (struct GNUNET_PSYC_MessageMethod *) &qmsg[1];
604 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); 602 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
605 pmeth->header.size = htons (sizeof (*pmeth) + size); 603 pmeth->header.size = htons (sizeof (*pmeth) + size);
606 pmeth->flags = htonl (flags); 604 pmeth->flags = htonl (flags);
@@ -928,7 +926,7 @@ message_handler (void *cls,
928 926
929 if (NULL == msg) 927 if (NULL == msg)
930 { 928 {
931 // timeout / disconnected from server, reconnect 929 // timeout / disconnected from service, reconnect
932 reschedule_connect (ch); 930 reschedule_connect (ch);
933 return; 931 return;
934 } 932 }
@@ -970,17 +968,15 @@ message_handler (void *cls,
970 case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK: 968 case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK:
971 { 969 {
972 struct CountersResult *cres = (struct CountersResult *) msg; 970 struct CountersResult *cres = (struct CountersResult *) msg;
973 mst->max_message_id = GNUNET_ntohll (cres->max_message_id);
974 if (NULL != mst->start_cb) 971 if (NULL != mst->start_cb)
975 mst->start_cb (ch->cb_cls, mst->max_message_id); 972 mst->start_cb (ch->cb_cls, GNUNET_ntohll (cres->max_message_id));
976 break; 973 break;
977 } 974 }
978 case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: 975 case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
979 { 976 {
980 struct CountersResult *cres = (struct CountersResult *) msg; 977 struct CountersResult *cres = (struct CountersResult *) msg;
981 slv->max_message_id = GNUNET_ntohll (cres->max_message_id);
982 if (NULL != slv->join_cb) 978 if (NULL != slv->join_cb)
983 slv->join_cb (ch->cb_cls, slv->max_message_id); 979 slv->join_cb (ch->cb_cls, GNUNET_ntohll (cres->max_message_id));
984 break; 980 break;
985 } 981 }
986 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK: 982 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
@@ -1005,31 +1001,32 @@ message_handler (void *cls,
1005/** 1001/**
1006 * Transmit next message to service. 1002 * Transmit next message to service.
1007 * 1003 *
1008 * @param cls The 'struct GNUNET_PSYC_Channel'. 1004 * @param cls The struct GNUNET_PSYC_Channel.
1009 * @param size Number of bytes available in buf. 1005 * @param size Number of bytes available in @a buf.
1010 * @param buf Where to copy the message. 1006 * @param buf Where to copy the message.
1011 * @return Number of bytes copied to buf. 1007 *
1008 * @return Number of bytes copied to @a buf.
1012 */ 1009 */
1013static size_t 1010static size_t
1014send_next_message (void *cls, size_t size, void *buf) 1011send_next_message (void *cls, size_t size, void *buf)
1015{ 1012{
1016 struct GNUNET_PSYC_Channel *ch = cls;
1017 struct OperationHandle *op = ch->tmit_head;
1018 size_t ret;
1019 LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n"); 1013 LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n");
1020 ch->th = NULL; 1014 struct GNUNET_PSYC_Channel *ch = cls;
1021 if (NULL == op->msg) 1015 struct MessageQueue *mq = ch->tmit_head;
1016 if (NULL == mq)
1022 return 0; 1017 return 0;
1023 ret = ntohs (op->msg->size); 1018 struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1];
1019 size_t ret = ntohs (qmsg->size);
1020 ch->th = NULL;
1024 if (ret > size) 1021 if (ret > size)
1025 { 1022 {
1026 reschedule_connect (ch); 1023 reschedule_connect (ch);
1027 return 0; 1024 return 0;
1028 } 1025 }
1029 memcpy (buf, op->msg, ret); 1026 memcpy (buf, qmsg, ret);
1030 1027
1031 GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, op); 1028 GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, mq);
1032 GNUNET_free (op); 1029 GNUNET_free (mq);
1033 1030
1034 if (NULL != ch->tmit_head) 1031 if (NULL != ch->tmit_head)
1035 transmit_next (ch); 1032 transmit_next (ch);
@@ -1056,12 +1053,13 @@ transmit_next (struct GNUNET_PSYC_Channel *ch)
1056 if (NULL != ch->th || NULL == ch->client) 1053 if (NULL != ch->th || NULL == ch->client)
1057 return; 1054 return;
1058 1055
1059 struct OperationHandle *op = ch->tmit_head; 1056 struct MessageQueue *mq = ch->tmit_head;
1060 if (NULL == op) 1057 if (NULL == mq)
1061 return; 1058 return;
1059 struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1];
1062 1060
1063 ch->th = GNUNET_CLIENT_notify_transmit_ready (ch->client, 1061 ch->th = GNUNET_CLIENT_notify_transmit_ready (ch->client,
1064 ntohs (op->msg->size), 1062 ntohs (qmsg->size),
1065 GNUNET_TIME_UNIT_FOREVER_REL, 1063 GNUNET_TIME_UNIT_FOREVER_REL,
1066 GNUNET_NO, 1064 GNUNET_NO,
1067 &send_next_message, 1065 &send_next_message,
@@ -1087,15 +1085,14 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1087 GNUNET_assert (NULL == ch->client); 1085 GNUNET_assert (NULL == ch->client);
1088 ch->client = GNUNET_CLIENT_connect ("psyc", ch->cfg); 1086 ch->client = GNUNET_CLIENT_connect ("psyc", ch->cfg);
1089 GNUNET_assert (NULL != ch->client); 1087 GNUNET_assert (NULL != ch->client);
1088 uint16_t reconn_size = ntohs (ch->reconnect_msg->size);
1090 1089
1091 if (NULL == ch->tmit_head || 1090 if (NULL == ch->tmit_head ||
1092 ch->tmit_head->msg->type != ch->reconnect_msg->type) 1091 0 != memcmp (&ch->tmit_head[1], ch->reconnect_msg, reconn_size))
1093 { 1092 {
1094 uint16_t reconn_size = ntohs (ch->reconnect_msg->size); 1093 struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + reconn_size);
1095 struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + reconn_size); 1094 memcpy (&mq[1], ch->reconnect_msg, reconn_size);
1096 memcpy (&op[1], ch->reconnect_msg, reconn_size); 1095 GNUNET_CONTAINER_DLL_insert (ch->tmit_head, ch->tmit_tail, mq);
1097 op->msg = (struct GNUNET_MessageHeader *) &op[1];
1098 GNUNET_CONTAINER_DLL_insert (ch->tmit_head, ch->tmit_tail, op);
1099 } 1096 }
1100 transmit_next (ch); 1097 transmit_next (ch);
1101} 1098}
@@ -1104,7 +1101,7 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1104/** 1101/**
1105 * Disconnect from the PSYC service. 1102 * Disconnect from the PSYC service.
1106 * 1103 *
1107 * @param c Channel handle to disconnect 1104 * @param c Channel handle to disconnect.
1108 */ 1105 */
1109static void 1106static void
1110disconnect (void *c) 1107disconnect (void *c)
@@ -1167,6 +1164,7 @@ disconnect (void *c)
1167 * @param join_cb Function to invoke when a peer wants to join. 1164 * @param join_cb Function to invoke when a peer wants to join.
1168 * @param master_started_cb Function to invoke after the channel master started. 1165 * @param master_started_cb Function to invoke after the channel master started.
1169 * @param cls Closure for @a master_started_cb and @a join_cb. 1166 * @param cls Closure for @a master_started_cb and @a join_cb.
1167 *
1170 * @return Handle for the channel master, NULL on error. 1168 * @return Handle for the channel master, NULL on error.
1171 */ 1169 */
1172struct GNUNET_PSYC_Master * 1170struct GNUNET_PSYC_Master *
@@ -1187,17 +1185,16 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
1187 req->channel_key = *channel_key; 1185 req->channel_key = *channel_key;
1188 req->policy = policy; 1186 req->policy = policy;
1189 1187
1188 mst->start_cb = master_started_cb;
1189 ch->message_cb = message_cb;
1190 ch->join_cb = join_cb;
1191 ch->cb_cls = cls;
1190 ch->cfg = cfg; 1192 ch->cfg = cfg;
1191 ch->is_master = GNUNET_YES; 1193 ch->is_master = GNUNET_YES;
1192 ch->reconnect_msg = (struct GNUNET_MessageHeader *) req; 1194 ch->reconnect_msg = (struct GNUNET_MessageHeader *) req;
1193 ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; 1195 ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
1194 ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst); 1196 ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst);
1195 1197
1196 ch->message_cb = message_cb;
1197 ch->join_cb = join_cb;
1198 ch->cb_cls = cls;
1199 mst->start_cb = master_started_cb;
1200
1201 return mst; 1198 return mst;
1202} 1199}
1203 1200
@@ -1260,6 +1257,7 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
1260 * @param notify_data Function to call to obtain fragments of the data. 1257 * @param notify_data Function to call to obtain fragments of the data.
1261 * @param notify_cls Closure for @a notify_mod and @a notify_data. 1258 * @param notify_cls Closure for @a notify_mod and @a notify_data.
1262 * @param flags Flags for the message being transmitted. 1259 * @param flags Flags for the message being transmitted.
1260 *
1263 * @return Transmission handle, NULL on error (i.e. more than one request queued). 1261 * @return Transmission handle, NULL on error (i.e. more than one request queued).
1264 */ 1262 */
1265struct GNUNET_PSYC_MasterTransmitHandle * 1263struct GNUNET_PSYC_MasterTransmitHandle *
@@ -1330,6 +1328,7 @@ GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th)
1330 * @param env Environment containing transient variables for the request, or NULL. 1328 * @param env Environment containing transient variables for the request, or NULL.
1331 * @param data Payload for the join message. 1329 * @param data Payload for the join message.
1332 * @param data_size Number of bytes in @a data. 1330 * @param data_size Number of bytes in @a data.
1331 *
1333 * @return Handle for the slave, NULL on error. 1332 * @return Handle for the slave, NULL on error.
1334 */ 1333 */
1335struct GNUNET_PSYC_Slave * 1334struct GNUNET_PSYC_Slave *
@@ -1361,6 +1360,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1361 req->relay_count = htonl (relay_count); 1360 req->relay_count = htonl (relay_count);
1362 memcpy (&req[1], relays, relay_count * sizeof (*relays)); 1361 memcpy (&req[1], relays, relay_count * sizeof (*relays));
1363 1362
1363 slv->join_cb = slave_joined_cb;
1364 ch->message_cb = message_cb; 1364 ch->message_cb = message_cb;
1365 ch->join_cb = join_cb; 1365 ch->join_cb = join_cb;
1366 ch->cb_cls = cls; 1366 ch->cb_cls = cls;
@@ -1371,7 +1371,6 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1371 ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; 1371 ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
1372 ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv); 1372 ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv);
1373 1373
1374 slv->join_cb = slave_joined_cb;
1375 return slv; 1374 return slv;
1376} 1375}
1377 1376
@@ -1401,6 +1400,7 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave)
1401 * @param notify_data Function to call to obtain fragments of the data. 1400 * @param notify_data Function to call to obtain fragments of the data.
1402 * @param notify_cls Closure for @a notify. 1401 * @param notify_cls Closure for @a notify.
1403 * @param flags Flags for the message being transmitted. 1402 * @param flags Flags for the message being transmitted.
1403 *
1404 * @return Transmission handle, NULL on error (i.e. more than one request 1404 * @return Transmission handle, NULL on error (i.e. more than one request
1405 * queued). 1405 * queued).
1406 */ 1406 */
@@ -1447,6 +1447,7 @@ GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th)
1447 * APIs. 1447 * APIs.
1448 * 1448 *
1449 * @param master Channel master handle. 1449 * @param master Channel master handle.
1450 *
1450 * @return Channel handle, valid for as long as @a master is valid. 1451 * @return Channel handle, valid for as long as @a master is valid.
1451 */ 1452 */
1452struct GNUNET_PSYC_Channel * 1453struct GNUNET_PSYC_Channel *
@@ -1460,6 +1461,7 @@ GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master)
1460 * Convert @a slave to a @e channel handle to access the @e channel APIs. 1461 * Convert @a slave to a @e channel handle to access the @e channel APIs.
1461 * 1462 *
1462 * @param slave Slave handle. 1463 * @param slave Slave handle.
1464 *
1463 * @return Channel handle, valid for as long as @a slave is valid. 1465 * @return Channel handle, valid for as long as @a slave is valid.
1464 */ 1466 */
1465struct GNUNET_PSYC_Channel * 1467struct GNUNET_PSYC_Channel *
@@ -1497,18 +1499,16 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel,
1497 uint64_t effective_since) 1499 uint64_t effective_since)
1498{ 1500{
1499 struct ChannelSlaveAdd *slvadd; 1501 struct ChannelSlaveAdd *slvadd;
1500 struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*slvadd)); 1502 struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*slvadd));
1501
1502 slvadd = (struct ChannelSlaveAdd *) &op[1];
1503 op->msg = (struct GNUNET_MessageHeader *) slvadd;
1504 1503
1504 slvadd = (struct ChannelSlaveAdd *) &mq[1];
1505 slvadd->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD); 1505 slvadd->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD);
1506 slvadd->header.size = htons (sizeof (*slvadd)); 1506 slvadd->header.size = htons (sizeof (*slvadd));
1507 slvadd->announced_at = GNUNET_htonll (announced_at); 1507 slvadd->announced_at = GNUNET_htonll (announced_at);
1508 slvadd->effective_since = GNUNET_htonll (effective_since); 1508 slvadd->effective_since = GNUNET_htonll (effective_since);
1509 GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head, 1509 GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head,
1510 channel->tmit_tail, 1510 channel->tmit_tail,
1511 op); 1511 mq);
1512 transmit_next (channel); 1512 transmit_next (channel);
1513} 1513}
1514 1514
@@ -1540,16 +1540,15 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel,
1540 uint64_t announced_at) 1540 uint64_t announced_at)
1541{ 1541{
1542 struct ChannelSlaveRemove *slvrm; 1542 struct ChannelSlaveRemove *slvrm;
1543 struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*slvrm)); 1543 struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*slvrm));
1544 1544
1545 slvrm = (struct ChannelSlaveRemove *) &op[1]; 1545 slvrm = (struct ChannelSlaveRemove *) &mq[1];
1546 op->msg = (struct GNUNET_MessageHeader *) slvrm;
1547 slvrm->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM); 1546 slvrm->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM);
1548 slvrm->header.size = htons (sizeof (*slvrm)); 1547 slvrm->header.size = htons (sizeof (*slvrm));
1549 slvrm->announced_at = GNUNET_htonll (announced_at); 1548 slvrm->announced_at = GNUNET_htonll (announced_at);
1550 GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head, 1549 GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head,
1551 channel->tmit_tail, 1550 channel->tmit_tail,
1552 op); 1551 mq);
1553 transmit_next (channel); 1552 transmit_next (channel);
1554} 1553}
1555 1554
@@ -1573,6 +1572,7 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel,
1573 * has been called, the client must not call 1572 * has been called, the client must not call
1574 * GNUNET_PSYC_channel_story_tell_cancel() anymore. 1573 * GNUNET_PSYC_channel_story_tell_cancel() anymore.
1575 * @param cls Closure for the callbacks. 1574 * @param cls Closure for the callbacks.
1575 *
1576 * @return Handle to cancel story telling operation. 1576 * @return Handle to cancel story telling operation.
1577 */ 1577 */
1578struct GNUNET_PSYC_Story * 1578struct GNUNET_PSYC_Story *
@@ -1615,6 +1615,7 @@ GNUNET_PSYC_channel_story_tell_cancel (struct GNUNET_PSYC_Story *story)
1615 * @param cb Function called once when a matching state variable is found. 1615 * @param cb Function called once when a matching state variable is found.
1616 * Not called if there's no matching state variable. 1616 * Not called if there's no matching state variable.
1617 * @param cb_cls Closure for the callbacks. 1617 * @param cb_cls Closure for the callbacks.
1618 *
1618 * @return Handle that can be used to cancel the query operation. 1619 * @return Handle that can be used to cancel the query operation.
1619 */ 1620 */
1620struct GNUNET_PSYC_StateQuery * 1621struct GNUNET_PSYC_StateQuery *
@@ -1641,6 +1642,7 @@ GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *channel,
1641 * @param name_prefix Prefix of the state variable name to match. 1642 * @param name_prefix Prefix of the state variable name to match.
1642 * @param cb Function to call with the matching state variables. 1643 * @param cb Function to call with the matching state variables.
1643 * @param cb_cls Closure for the callbacks. 1644 * @param cb_cls Closure for the callbacks.
1645 *
1644 * @return Handle that can be used to cancel the query operation. 1646 * @return Handle that can be used to cancel the query operation.
1645 */ 1647 */
1646struct GNUNET_PSYC_StateQuery * 1648struct GNUNET_PSYC_StateQuery *