aboutsummaryrefslogtreecommitdiff
path: root/src/psyc
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2016-08-17 21:26:41 +0000
committerGabor X Toth <*@tg-x.net>2016-08-17 21:26:41 +0000
commit667cc67f8224ccf4ff391b125a614cf90cf5917e (patch)
treeae2048a6525ab2521ad989afa795d7a2f0833af6 /src/psyc
parent720a38ea7519f5a1a820157f056b150ab3d4abd5 (diff)
downloadgnunet-667cc67f8224ccf4ff391b125a614cf90cf5917e.tar.gz
gnunet-667cc67f8224ccf4ff391b125a614cf90cf5917e.zip
psyc, social: switch to MQ
Diffstat (limited to 'src/psyc')
-rw-r--r--src/psyc/psyc_api.c662
1 files changed, 403 insertions, 259 deletions
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index 515a2731a..2f6a15bab 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -55,7 +55,27 @@ struct GNUNET_PSYC_Channel
55 /** 55 /**
56 * Client connection to the service. 56 * Client connection to the service.
57 */ 57 */
58 struct GNUNET_CLIENT_MANAGER_Connection *client; 58 struct GNUNET_MQ_Handle *mq;
59
60 /**
61 * Message to send on connect.
62 */
63 struct GNUNET_MQ_Envelope *connect_env;
64
65 /**
66 * Time to wait until we try to reconnect on failure.
67 */
68 struct GNUNET_TIME_Relative reconnect_delay;
69
70 /**
71 * Task for reconnecting when the listener fails.
72 */
73 struct GNUNET_SCHEDULER_Task *reconnect_task;
74
75 /**
76 * Async operations.
77 */
78 struct GNUNET_OP_Handle *op;
59 79
60 /** 80 /**
61 * Transmission handle; 81 * Transmission handle;
@@ -68,11 +88,6 @@ struct GNUNET_PSYC_Channel
68 struct GNUNET_PSYC_ReceiveHandle *recv; 88 struct GNUNET_PSYC_ReceiveHandle *recv;
69 89
70 /** 90 /**
71 * Message to send on reconnect.
72 */
73 struct GNUNET_MessageHeader *connect_msg;
74
75 /**
76 * Function called after disconnected from the service. 91 * Function called after disconnected from the service.
77 */ 92 */
78 GNUNET_ContinuationCallback disconnect_cb; 93 GNUNET_ContinuationCallback disconnect_cb;
@@ -219,41 +234,21 @@ struct GNUNET_PSYC_StateRequest
219}; 234};
220 235
221 236
222static void 237static int
223channel_send_connect_msg (struct GNUNET_PSYC_Channel *chn) 238check_channel_result (void *cls,
224{ 239 const struct GNUNET_OperationResultMessage *res)
225 uint16_t cmsg_size = ntohs (chn->connect_msg->size);
226 struct GNUNET_MessageHeader *cmsg = GNUNET_malloc (cmsg_size);
227 GNUNET_memcpy (cmsg, chn->connect_msg, cmsg_size);
228 GNUNET_CLIENT_MANAGER_transmit_now (chn->client, cmsg);
229 GNUNET_free (cmsg);
230}
231
232
233static void
234channel_recv_disconnect (void *cls,
235 struct GNUNET_CLIENT_MANAGER_Connection *client,
236 const struct GNUNET_MessageHeader *msg)
237{ 240{
238 struct GNUNET_PSYC_Channel * 241 return GNUNET_OK;
239 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn));
240 GNUNET_CLIENT_MANAGER_reconnect (client);
241 channel_send_connect_msg (chn);
242} 242}
243 243
244 244
245static void 245static void
246channel_recv_result (void *cls, 246handle_channel_result (void *cls,
247 struct GNUNET_CLIENT_MANAGER_Connection *client, 247 const struct GNUNET_OperationResultMessage *res)
248 const struct GNUNET_MessageHeader *msg)
249{ 248{
250 struct GNUNET_PSYC_Channel * 249 struct GNUNET_PSYC_Channel *chn = cls;
251 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn));
252 250
253 const struct GNUNET_OperationResultMessage * 251 uint16_t size = ntohs (res->header.size);
254 res = (const struct GNUNET_OperationResultMessage *) msg;
255
256 uint16_t size = ntohs (msg->size);
257 if (size < sizeof (*res)) 252 if (size < sizeof (*res))
258 { /* Error, message too small. */ 253 { /* Error, message too small. */
259 GNUNET_break (0); 254 GNUNET_break (0);
@@ -262,9 +257,9 @@ channel_recv_result (void *cls,
262 257
263 uint16_t data_size = size - sizeof (*res); 258 uint16_t data_size = size - sizeof (*res);
264 const char *data = (0 < data_size) ? (void *) &res[1] : NULL; 259 const char *data = (0 < data_size) ? (void *) &res[1] : NULL;
265 GNUNET_CLIENT_MANAGER_op_result (chn->client, GNUNET_ntohll (res->op_id), 260 GNUNET_OP_result (chn->op, GNUNET_ntohll (res->op_id),
266 GNUNET_ntohll (res->result_code), 261 GNUNET_ntohll (res->result_code),
267 data, data_size); 262 data, data_size, NULL);
268} 263}
269 264
270 265
@@ -301,18 +296,30 @@ op_recv_state_result (void *cls, int64_t result,
301} 296}
302 297
303 298
304static void 299static int
305channel_recv_history_result (void *cls, 300check_channel_history_result (void *cls,
306 struct GNUNET_CLIENT_MANAGER_Connection *client, 301 const struct GNUNET_OperationResultMessage *res)
307 const struct GNUNET_MessageHeader *msg)
308{ 302{
309 struct GNUNET_PSYC_Channel * 303 struct GNUNET_PSYC_MessageHeader *
310 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); 304 pmsg = (struct GNUNET_PSYC_MessageHeader *) GNUNET_MQ_extract_nested_mh (res);
305 uint16_t size = ntohs (res->header.size);
311 306
312 const struct GNUNET_OperationResultMessage * 307 if (NULL == pmsg || size < sizeof (*res) + sizeof (*pmsg))
313 res = (const struct GNUNET_OperationResultMessage *) msg; 308 { /* Error, message too small. */
309 GNUNET_break_op (0);
310 return GNUNET_SYSERR;
311 }
312 return GNUNET_OK;
313}
314
315
316static void
317handle_channel_history_result (void *cls,
318 const struct GNUNET_OperationResultMessage *res)
319{
320 struct GNUNET_PSYC_Channel *chn = cls;
314 struct GNUNET_PSYC_MessageHeader * 321 struct GNUNET_PSYC_MessageHeader *
315 pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1]; 322 pmsg = (struct GNUNET_PSYC_MessageHeader *) GNUNET_MQ_extract_nested_mh (res);
316 323
317 LOG (GNUNET_ERROR_TYPE_DEBUG, 324 LOG (GNUNET_ERROR_TYPE_DEBUG,
318 "%p Received historic fragment for message #%" PRIu64 ".\n", 325 "%p Received historic fragment for message #%" PRIu64 ".\n",
@@ -321,9 +328,9 @@ channel_recv_history_result (void *cls,
321 GNUNET_ResultCallback result_cb = NULL; 328 GNUNET_ResultCallback result_cb = NULL;
322 struct GNUNET_PSYC_HistoryRequest *hist = NULL; 329 struct GNUNET_PSYC_HistoryRequest *hist = NULL;
323 330
324 if (GNUNET_YES != GNUNET_CLIENT_MANAGER_op_find (chn->client, 331 if (GNUNET_YES != GNUNET_OP_get (chn->op,
325 GNUNET_ntohll (res->op_id), 332 GNUNET_ntohll (res->op_id),
326 &result_cb, (void *) &hist)) 333 &result_cb, (void *) &hist, NULL))
327 { /* Operation not found. */ 334 { /* Operation not found. */
328 LOG (GNUNET_ERROR_TYPE_WARNING, 335 LOG (GNUNET_ERROR_TYPE_WARNING,
329 "%p Replay operation not found for historic fragment of message #%" 336 "%p Replay operation not found for historic fragment of message #%"
@@ -332,47 +339,47 @@ channel_recv_history_result (void *cls,
332 return; 339 return;
333 } 340 }
334 341
335 uint16_t size = ntohs (msg->size);
336 if (size < sizeof (*res) + sizeof (*pmsg))
337 { /* Error, message too small. */
338 GNUNET_break (0);
339 return;
340 }
341
342 GNUNET_PSYC_receive_message (hist->recv, 342 GNUNET_PSYC_receive_message (hist->recv,
343 (const struct GNUNET_PSYC_MessageHeader *) pmsg); 343 (const struct GNUNET_PSYC_MessageHeader *) pmsg);
344} 344}
345 345
346 346
347static void 347static int
348channel_recv_state_result (void *cls, 348check_channel_state_result (void *cls,
349 struct GNUNET_CLIENT_MANAGER_Connection *client, 349 const struct GNUNET_OperationResultMessage *res)
350 const struct GNUNET_MessageHeader *msg)
351{ 350{
352 struct GNUNET_PSYC_Channel * 351 const struct GNUNET_MessageHeader *mod = GNUNET_MQ_extract_nested_mh (res);
353 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); 352 uint16_t mod_size = ntohs (mod->size);
353 uint16_t size = ntohs (res->header.size);
354
355 if (NULL == mod || size - sizeof (*res) != mod_size)
356 {
357 GNUNET_break_op (0);
358 return GNUNET_SYSERR;
359 }
360 return GNUNET_OK;
361}
362
354 363
355 const struct GNUNET_OperationResultMessage * 364static void
356 res = (const struct GNUNET_OperationResultMessage *) msg; 365handle_channel_state_result (void *cls,
366 const struct GNUNET_OperationResultMessage *res)
367{
368 struct GNUNET_PSYC_Channel *chn = cls;
357 369
358 GNUNET_ResultCallback result_cb = NULL; 370 GNUNET_ResultCallback result_cb = NULL;
359 struct GNUNET_PSYC_StateRequest *sr = NULL; 371 struct GNUNET_PSYC_StateRequest *sr = NULL;
360 372
361 if (GNUNET_YES != GNUNET_CLIENT_MANAGER_op_find (chn->client, 373 if (GNUNET_YES != GNUNET_OP_get (chn->op,
362 GNUNET_ntohll (res->op_id), 374 GNUNET_ntohll (res->op_id),
363 &result_cb, (void *) &sr)) 375 &result_cb, (void *) &sr, NULL))
364 { /* Operation not found. */ 376 { /* Operation not found. */
365 return; 377 return;
366 } 378 }
367 379
368 const struct GNUNET_MessageHeader * 380 const struct GNUNET_MessageHeader *mod = GNUNET_MQ_extract_nested_mh (res);
369 mod = (struct GNUNET_MessageHeader *) &res[1];
370 uint16_t mod_size = ntohs (mod->size); 381 uint16_t mod_size = ntohs (mod->size);
371 if (ntohs (msg->size) - sizeof (*res) != mod_size) 382
372 {
373 GNUNET_break (0);
374 return;
375 }
376 switch (ntohs (mod->type)) 383 switch (ntohs (mod->type))
377 { 384 {
378 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: 385 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
@@ -401,40 +408,40 @@ channel_recv_state_result (void *cls,
401} 408}
402 409
403 410
411static int
412check_channel_message (void *cls,
413 const struct GNUNET_PSYC_MessageHeader *pmsg)
414{
415 return GNUNET_OK;
416}
417
418
404static void 419static void
405channel_recv_message (void *cls, 420handle_channel_message (void *cls,
406 struct GNUNET_CLIENT_MANAGER_Connection *client, 421 const struct GNUNET_PSYC_MessageHeader *pmsg)
407 const struct GNUNET_MessageHeader *msg)
408{ 422{
409 struct GNUNET_PSYC_Channel * 423 struct GNUNET_PSYC_Channel *chn = cls;
410 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); 424
411 GNUNET_PSYC_receive_message (chn->recv, 425 GNUNET_PSYC_receive_message (chn->recv, pmsg);
412 (const struct GNUNET_PSYC_MessageHeader *) msg);
413} 426}
414 427
415 428
416static void 429static void
417channel_recv_message_ack (void *cls, 430handle_channel_message_ack (void *cls,
418 struct GNUNET_CLIENT_MANAGER_Connection *client, 431 const struct GNUNET_MessageHeader *msg)
419 const struct GNUNET_MessageHeader *msg)
420{ 432{
421 struct GNUNET_PSYC_Channel * 433 struct GNUNET_PSYC_Channel *chn = cls;
422 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); 434
423 GNUNET_PSYC_transmit_got_ack (chn->tmit); 435 GNUNET_PSYC_transmit_got_ack (chn->tmit);
424} 436}
425 437
426 438
427static void 439static void
428master_recv_start_ack (void *cls, 440handle_master_start_ack (void *cls,
429 struct GNUNET_CLIENT_MANAGER_Connection *client, 441 const struct GNUNET_PSYC_CountersResultMessage *cres)
430 const struct GNUNET_MessageHeader *msg)
431{ 442{
432 struct GNUNET_PSYC_Master * 443 struct GNUNET_PSYC_Master *mst = cls;
433 mst = GNUNET_CLIENT_MANAGER_get_user_context_ (client,
434 sizeof (struct GNUNET_PSYC_Channel));
435 444
436 struct GNUNET_PSYC_CountersResultMessage *
437 cres = (struct GNUNET_PSYC_CountersResultMessage *) msg;
438 int32_t result = ntohl (cres->result_code); 445 int32_t result = ntohl (cres->result_code);
439 if (GNUNET_OK != result && GNUNET_NO != result) 446 if (GNUNET_OK != result && GNUNET_NO != result)
440 { 447 {
@@ -447,23 +454,27 @@ master_recv_start_ack (void *cls,
447} 454}
448 455
449 456
457static int
458check_master_join_request (void *cls,
459 const struct GNUNET_PSYC_JoinRequestMessage *req)
460{
461 return GNUNET_OK;
462}
463
464
450static void 465static void
451master_recv_join_request (void *cls, 466handle_master_join_request (void *cls,
452 struct GNUNET_CLIENT_MANAGER_Connection *client, 467 const struct GNUNET_PSYC_JoinRequestMessage *req)
453 const struct GNUNET_MessageHeader *msg)
454{ 468{
455 struct GNUNET_PSYC_Master * 469 struct GNUNET_PSYC_Master *mst = cls;
456 mst = GNUNET_CLIENT_MANAGER_get_user_context_ (client, 470
457 sizeof (struct GNUNET_PSYC_Channel));
458 if (NULL == mst->join_req_cb) 471 if (NULL == mst->join_req_cb)
459 return; 472 return;
460 473
461 const struct GNUNET_PSYC_JoinRequestMessage *
462 req = (const struct GNUNET_PSYC_JoinRequestMessage *) msg;
463 const struct GNUNET_PSYC_Message *join_msg = NULL; 474 const struct GNUNET_PSYC_Message *join_msg = NULL;
464 if (sizeof (*req) + sizeof (*join_msg) <= ntohs (req->header.size)) 475 if (sizeof (*req) + sizeof (*join_msg) <= ntohs (req->header.size))
465 { 476 {
466 join_msg = (struct GNUNET_PSYC_Message *) &req[1]; 477 join_msg = (struct GNUNET_PSYC_Message *) GNUNET_MQ_extract_nested_mh (req);
467 LOG (GNUNET_ERROR_TYPE_DEBUG, 478 LOG (GNUNET_ERROR_TYPE_DEBUG,
468 "Received join_msg of type %u and size %u.\n", 479 "Received join_msg of type %u and size %u.\n",
469 ntohs (join_msg->header.type), ntohs (join_msg->header.size)); 480 ntohs (join_msg->header.type), ntohs (join_msg->header.size));
@@ -479,15 +490,11 @@ master_recv_join_request (void *cls,
479 490
480 491
481static void 492static void
482slave_recv_join_ack (void *cls, 493handle_slave_join_ack (void *cls,
483 struct GNUNET_CLIENT_MANAGER_Connection *client, 494 const struct GNUNET_PSYC_CountersResultMessage *cres)
484 const struct GNUNET_MessageHeader *msg)
485{ 495{
486 struct GNUNET_PSYC_Slave * 496 struct GNUNET_PSYC_Slave *slv = cls;
487 slv = GNUNET_CLIENT_MANAGER_get_user_context_ (client, 497
488 sizeof (struct GNUNET_PSYC_Channel));
489 struct GNUNET_PSYC_CountersResultMessage *
490 cres = (struct GNUNET_PSYC_CountersResultMessage *) msg;
491 int32_t result = ntohl (cres->result_code); 498 int32_t result = ntohl (cres->result_code);
492 if (GNUNET_YES != result && GNUNET_NO != result) 499 if (GNUNET_YES != result && GNUNET_NO != result)
493 { 500 {
@@ -500,16 +507,19 @@ slave_recv_join_ack (void *cls,
500} 507}
501 508
502 509
510static int
511check_slave_join_decision (void *cls,
512 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
513{
514 return GNUNET_OK;
515}
516
517
503static void 518static void
504slave_recv_join_decision (void *cls, 519handle_slave_join_decision (void *cls,
505 struct GNUNET_CLIENT_MANAGER_Connection *client, 520 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
506 const struct GNUNET_MessageHeader *msg)
507{ 521{
508 struct GNUNET_PSYC_Slave * 522 struct GNUNET_PSYC_Slave *slv = cls;
509 slv = GNUNET_CLIENT_MANAGER_get_user_context_ (client,
510 sizeof (struct GNUNET_PSYC_Channel));
511 const struct GNUNET_PSYC_JoinDecisionMessage *
512 dcsn = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg;
513 523
514 struct GNUNET_PSYC_Message *pmsg = NULL; 524 struct GNUNET_PSYC_Message *pmsg = NULL;
515 if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*pmsg)) 525 if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*pmsg))
@@ -520,104 +530,164 @@ slave_recv_join_decision (void *cls,
520} 530}
521 531
522 532
523static struct GNUNET_CLIENT_MANAGER_MessageHandler master_handlers[] = 533static void
534channel_cleanup (struct GNUNET_PSYC_Channel *chn)
524{ 535{
525 { &channel_recv_message, NULL, 536 if (NULL != chn->tmit)
526 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 537 {
527 sizeof (struct GNUNET_PSYC_MessageHeader), GNUNET_YES }, 538 GNUNET_PSYC_transmit_destroy (chn->tmit);
528 539 chn->tmit = NULL;
529 { &channel_recv_message_ack, NULL, 540 }
530 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK, 541 if (NULL != chn->recv)
531 sizeof (struct GNUNET_MessageHeader), GNUNET_NO }, 542 {
532 543 GNUNET_PSYC_receive_destroy (chn->recv);
533 { &master_recv_start_ack, NULL, 544 chn->recv = NULL;
534 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK, 545 }
535 sizeof (struct GNUNET_PSYC_CountersResultMessage), GNUNET_NO }, 546 if (NULL != chn->connect_env)
536 547 {
537 { &master_recv_join_request, NULL, 548 GNUNET_MQ_discard (chn->connect_env);
538 GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST, 549 chn->connect_env = NULL;
539 sizeof (struct GNUNET_PSYC_JoinRequestMessage), GNUNET_YES }, 550 }
540 551 if (NULL != chn->disconnect_cb)
541 { &channel_recv_history_result, NULL, 552 {
542 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT, 553 chn->disconnect_cb (chn->disconnect_cls);
543 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, 554 chn->disconnect_cb = NULL;
544 555 }
545 { &channel_recv_state_result, NULL, 556}
546 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT,
547 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
548
549 { &channel_recv_result, NULL,
550 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE,
551 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
552 557
553 { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO },
554 558
555 { NULL, NULL, 0, 0, GNUNET_NO } 559static void
556}; 560master_cleanup (void *cls)
561{
562 struct GNUNET_PSYC_Master *mst = cls;
563 channel_cleanup (&mst->chn);
564 GNUNET_free (mst);
565}
557 566
558 567
559static struct GNUNET_CLIENT_MANAGER_MessageHandler slave_handlers[] = 568static void
569slave_cleanup (void *cls)
560{ 570{
561 { &channel_recv_message, NULL, 571 struct GNUNET_PSYC_Slave *slv = cls;
562 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 572 channel_cleanup (&slv->chn);
563 sizeof (struct GNUNET_PSYC_MessageHeader), GNUNET_YES }, 573 GNUNET_free (slv);
564 574}
565 { &channel_recv_message_ack, NULL,
566 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK,
567 sizeof (struct GNUNET_MessageHeader), GNUNET_NO },
568 575
569 { &slave_recv_join_ack, NULL,
570 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK,
571 sizeof (struct GNUNET_PSYC_CountersResultMessage), GNUNET_NO },
572 576
573 { &slave_recv_join_decision, NULL, 577static void
574 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 578channel_disconnect (struct GNUNET_PSYC_Channel *chn,
575 sizeof (struct GNUNET_PSYC_JoinDecisionMessage), GNUNET_YES }, 579 GNUNET_ContinuationCallback cb,
580 void *cls)
581{
582 chn->is_disconnecting = GNUNET_YES;
583 chn->disconnect_cb = cb;
584 chn->disconnect_cls = cls;
576 585
577 { &channel_recv_history_result, NULL, 586 // FIXME: wait till queued messages are sent
578 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT, 587 if (NULL != chn->mq)
579 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, 588 {
589 GNUNET_MQ_destroy (chn->mq);
590 chn->mq = NULL;
591 }
592}
580 593
581 { &channel_recv_state_result, NULL,
582 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT,
583 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
584 594
585 { &channel_recv_result, NULL, 595/*** MASTER ***/
586 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE,
587 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
588 596
589 { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO },
590 597
591 { NULL, NULL, 0, 0, GNUNET_NO } 598static void
592}; 599master_connect (struct GNUNET_PSYC_Master *mst);
593 600
594 601
595static void 602static void
596channel_cleanup (struct GNUNET_PSYC_Channel *chn) 603master_reconnect (void *cls)
597{ 604{
598 GNUNET_PSYC_transmit_destroy (chn->tmit); 605 master_connect (cls);
599 GNUNET_PSYC_receive_destroy (chn->recv);
600 GNUNET_free (chn->connect_msg);
601 if (NULL != chn->disconnect_cb)
602 chn->disconnect_cb (chn->disconnect_cls);
603} 606}
604 607
605 608
609/**
610 * Master client disconnected from service.
611 *
612 * Reconnect after backoff period.
613 */
606static void 614static void
607master_cleanup (void *cls) 615master_disconnected (void *cls, enum GNUNET_MQ_Error error)
608{ 616{
609 struct GNUNET_PSYC_Master *mst = cls; 617 struct GNUNET_PSYC_Master *mst = cls;
610 channel_cleanup (&mst->chn); 618 struct GNUNET_PSYC_Channel *chn = &mst->chn;
611 GNUNET_free (mst); 619
620 LOG (GNUNET_ERROR_TYPE_DEBUG,
621 "Master client disconnected (%d), re-connecting\n",
622 (int) error);
623 if (NULL != chn->mq)
624 {
625 GNUNET_MQ_destroy (chn->mq);
626 chn->mq = NULL;
627 }
628 if (NULL != chn->tmit)
629 {
630 GNUNET_PSYC_transmit_destroy (chn->tmit);
631 chn->tmit = NULL;
632 }
633
634 chn->reconnect_task = GNUNET_SCHEDULER_add_delayed (chn->reconnect_delay,
635 master_reconnect,
636 mst);
637 chn->reconnect_delay = GNUNET_TIME_STD_BACKOFF (chn->reconnect_delay);
612} 638}
613 639
614 640
615static void 641static void
616slave_cleanup (void *cls) 642master_connect (struct GNUNET_PSYC_Master *mst)
617{ 643{
618 struct GNUNET_PSYC_Slave *slv = cls; 644 struct GNUNET_PSYC_Channel *chn = &mst->chn;
619 channel_cleanup (&slv->chn); 645
620 GNUNET_free (slv); 646 GNUNET_MQ_hd_fixed_size (master_start_ack,
647 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK,
648 struct GNUNET_PSYC_CountersResultMessage);
649
650 GNUNET_MQ_hd_var_size (master_join_request,
651 GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST,
652 struct GNUNET_PSYC_JoinRequestMessage);
653
654 GNUNET_MQ_hd_var_size (channel_message,
655 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
656 struct GNUNET_PSYC_MessageHeader);
657
658 GNUNET_MQ_hd_fixed_size (channel_message_ack,
659 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK,
660 struct GNUNET_MessageHeader);
661
662 GNUNET_MQ_hd_var_size (channel_history_result,
663 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT,
664 struct GNUNET_OperationResultMessage);
665
666 GNUNET_MQ_hd_var_size (channel_state_result,
667 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT,
668 struct GNUNET_OperationResultMessage);
669
670 GNUNET_MQ_hd_var_size (channel_result,
671 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE,
672 struct GNUNET_OperationResultMessage);
673
674 struct GNUNET_MQ_MessageHandler handlers[] = {
675 make_master_start_ack_handler (mst),
676 make_master_join_request_handler (mst),
677 make_channel_message_handler (chn),
678 make_channel_message_ack_handler (chn),
679 make_channel_history_result_handler (chn),
680 make_channel_state_result_handler (chn),
681 make_channel_result_handler (chn),
682 GNUNET_MQ_handler_end ()
683 };
684
685 chn->mq = GNUNET_CLIENT_connecT (chn->cfg, "psyc",
686 handlers, master_disconnected, mst);
687 GNUNET_assert (NULL != chn->mq);
688 chn->tmit = GNUNET_PSYC_transmit_create (chn->mq);
689
690 GNUNET_MQ_send_copy (chn->mq, chn->connect_env);
621} 691}
622 692
623 693
@@ -664,26 +734,23 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
664 struct GNUNET_PSYC_Channel *chn = &mst->chn; 734 struct GNUNET_PSYC_Channel *chn = &mst->chn;
665 735
666 struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req)); 736 struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req));
667 req->header.size = htons (sizeof (*req)); 737 chn->connect_env = GNUNET_MQ_msg (req,
668 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START); 738 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START);
669 req->channel_key = *channel_key; 739 req->channel_key = *channel_key;
670 req->policy = policy; 740 req->policy = policy;
671 741
672 chn->connect_msg = &req->header;
673 chn->cfg = cfg; 742 chn->cfg = cfg;
674 chn->is_master = GNUNET_YES; 743 chn->is_master = GNUNET_YES;
744 chn->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
745
746 chn->op = GNUNET_OP_create ();
747 chn->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls);
675 748
676 mst->start_cb = start_cb; 749 mst->start_cb = start_cb;
677 mst->join_req_cb = join_request_cb; 750 mst->join_req_cb = join_request_cb;
678 mst->cb_cls = cls; 751 mst->cb_cls = cls;
679 752
680 chn->client = GNUNET_CLIENT_MANAGER_connect (cfg, "psyc", master_handlers); 753 master_connect (mst);
681 GNUNET_CLIENT_MANAGER_set_user_context_ (chn->client, mst, sizeof (*chn));
682
683 chn->tmit = GNUNET_PSYC_transmit_create (chn->client);
684 chn->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls);
685
686 channel_send_connect_msg (chn);
687 return mst; 754 return mst;
688} 755}
689 756
@@ -704,12 +771,8 @@ GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *mst,
704 771
705 /* FIXME: send msg to service */ 772 /* FIXME: send msg to service */
706 773
707 chn->is_disconnecting = GNUNET_YES; 774 channel_disconnect (chn, stop_cb, stop_cls);
708 chn->disconnect_cb = stop_cb; 775 master_cleanup (mst);
709 chn->disconnect_cls = stop_cls;
710
711 GNUNET_CLIENT_MANAGER_disconnect (mst->chn.client, GNUNET_YES,
712 &master_cleanup, mst);
713} 776}
714 777
715 778
@@ -753,17 +816,16 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
753 < sizeof (*dcsn) + relay_size + join_resp_size) 816 < sizeof (*dcsn) + relay_size + join_resp_size)
754 return GNUNET_SYSERR; 817 return GNUNET_SYSERR;
755 818
756 dcsn = GNUNET_malloc (sizeof (*dcsn) + relay_size + join_resp_size); 819 struct GNUNET_MQ_Envelope *
757 dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size); 820 env = GNUNET_MQ_msg_extra (dcsn, relay_size + join_resp_size,
758 dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION); 821 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
759 dcsn->is_admitted = htonl (is_admitted); 822 dcsn->is_admitted = htonl (is_admitted);
760 dcsn->slave_pub_key = jh->slave_pub_key; 823 dcsn->slave_pub_key = jh->slave_pub_key;
761 824
762 if (0 < join_resp_size) 825 if (0 < join_resp_size)
763 GNUNET_memcpy (&dcsn[1], join_resp, join_resp_size); 826 GNUNET_memcpy (&dcsn[1], join_resp, join_resp_size);
764 827
765 GNUNET_CLIENT_MANAGER_transmit (chn->client, &dcsn->header); 828 GNUNET_MQ_send (chn->mq, env);
766 GNUNET_free (dcsn);
767 GNUNET_free (jh); 829 GNUNET_free (jh);
768 return GNUNET_OK; 830 return GNUNET_OK;
769} 831}
@@ -838,6 +900,104 @@ GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master)
838} 900}
839 901
840 902
903/*** SLAVE ***/
904
905
906static void
907slave_connect (struct GNUNET_PSYC_Slave *slv);
908
909
910static void
911slave_reconnect (void *cls)
912{
913 slave_connect (cls);
914}
915
916
917/**
918 * Slave client disconnected from service.
919 *
920 * Reconnect after backoff period.
921 */
922static void
923slave_disconnected (void *cls, enum GNUNET_MQ_Error error)
924{
925 struct GNUNET_PSYC_Slave *slv = cls;
926 struct GNUNET_PSYC_Channel *chn = &slv->chn;
927
928 LOG (GNUNET_ERROR_TYPE_DEBUG,
929 "Slave client disconnected (%d), re-connecting\n",
930 (int) error);
931 if (NULL != chn->mq)
932 {
933 GNUNET_MQ_destroy (chn->mq);
934 chn->mq = NULL;
935 }
936 if (NULL != chn->tmit)
937 {
938 GNUNET_PSYC_transmit_destroy (chn->tmit);
939 chn->tmit = NULL;
940 }
941 chn->reconnect_task = GNUNET_SCHEDULER_add_delayed (chn->reconnect_delay,
942 slave_reconnect,
943 slv);
944 chn->reconnect_delay = GNUNET_TIME_STD_BACKOFF (chn->reconnect_delay);
945}
946
947
948static void
949slave_connect (struct GNUNET_PSYC_Slave *slv)
950{
951 struct GNUNET_PSYC_Channel *chn = &slv->chn;
952
953 GNUNET_MQ_hd_fixed_size (slave_join_ack,
954 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK,
955 struct GNUNET_PSYC_CountersResultMessage);
956
957 GNUNET_MQ_hd_var_size (slave_join_decision,
958 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
959 struct GNUNET_PSYC_JoinDecisionMessage);
960
961 GNUNET_MQ_hd_var_size (channel_message,
962 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
963 struct GNUNET_PSYC_MessageHeader);
964
965 GNUNET_MQ_hd_fixed_size (channel_message_ack,
966 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK,
967 struct GNUNET_MessageHeader);
968
969 GNUNET_MQ_hd_var_size (channel_history_result,
970 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT,
971 struct GNUNET_OperationResultMessage);
972
973 GNUNET_MQ_hd_var_size (channel_state_result,
974 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT,
975 struct GNUNET_OperationResultMessage);
976
977 GNUNET_MQ_hd_var_size (channel_result,
978 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE,
979 struct GNUNET_OperationResultMessage);
980
981 struct GNUNET_MQ_MessageHandler handlers[] = {
982 make_slave_join_ack_handler (slv),
983 make_slave_join_decision_handler (slv),
984 make_channel_message_handler (chn),
985 make_channel_message_ack_handler (chn),
986 make_channel_history_result_handler (chn),
987 make_channel_state_result_handler (chn),
988 make_channel_result_handler (chn),
989 GNUNET_MQ_handler_end ()
990 };
991
992 chn->mq = GNUNET_CLIENT_connecT (chn->cfg, "psyc",
993 handlers, slave_disconnected, slv);
994 GNUNET_assert (NULL != chn->mq);
995 chn->tmit = GNUNET_PSYC_transmit_create (chn->mq);
996
997 GNUNET_MQ_send_copy (chn->mq, chn->connect_env);
998}
999
1000
841/** 1001/**
842 * Join a PSYC channel. 1002 * Join a PSYC channel.
843 * 1003 *
@@ -892,15 +1052,14 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
892 struct GNUNET_PSYC_Channel *chn = &slv->chn; 1052 struct GNUNET_PSYC_Channel *chn = &slv->chn;
893 uint16_t relay_size = relay_count * sizeof (*relays); 1053 uint16_t relay_size = relay_count * sizeof (*relays);
894 uint16_t join_msg_size; 1054 uint16_t join_msg_size;
895 struct SlaveJoinRequest *req;
896
897 if (NULL == join_msg) 1055 if (NULL == join_msg)
898 join_msg_size = 0; 1056 join_msg_size = 0;
899 else 1057 else
900 join_msg_size = ntohs (join_msg->header.size); 1058 join_msg_size = ntohs (join_msg->header.size);
901 req = GNUNET_malloc (sizeof (*req) + relay_size + join_msg_size); 1059
902 req->header.size = htons (sizeof (*req) + relay_size + join_msg_size); 1060 struct SlaveJoinRequest *req;
903 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN); 1061 chn->connect_env = GNUNET_MQ_msg_extra (req, relay_size + join_msg_size,
1062 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN);
904 req->channel_pub_key = *channel_pub_key; 1063 req->channel_pub_key = *channel_pub_key;
905 req->slave_key = *slave_key; 1064 req->slave_key = *slave_key;
906 req->origin = *origin; 1065 req->origin = *origin;
@@ -913,21 +1072,18 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
913 if (NULL != join_msg) 1072 if (NULL != join_msg)
914 GNUNET_memcpy ((char *) &req[1] + relay_size, join_msg, join_msg_size); 1073 GNUNET_memcpy ((char *) &req[1] + relay_size, join_msg, join_msg_size);
915 1074
916 chn->connect_msg = &req->header;
917 chn->cfg = cfg; 1075 chn->cfg = cfg;
918 chn->is_master = GNUNET_NO; 1076 chn->is_master = GNUNET_NO;
1077 chn->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
1078
1079 chn->op = GNUNET_OP_create ();
1080 chn->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls);
919 1081
920 slv->connect_cb = connect_cb; 1082 slv->connect_cb = connect_cb;
921 slv->join_dcsn_cb = join_decision_cb; 1083 slv->join_dcsn_cb = join_decision_cb;
922 slv->cb_cls = cls; 1084 slv->cb_cls = cls;
923 1085
924 chn->client = GNUNET_CLIENT_MANAGER_connect (cfg, "psyc", slave_handlers); 1086 slave_connect (slv);
925 GNUNET_CLIENT_MANAGER_set_user_context_ (chn->client, slv, sizeof (*chn));
926
927 chn->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls);
928 chn->tmit = GNUNET_PSYC_transmit_create (chn->client);
929
930 channel_send_connect_msg (chn);
931 return slv; 1087 return slv;
932} 1088}
933 1089
@@ -950,12 +1106,8 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slv,
950 1106
951 /* FIXME: send msg to service */ 1107 /* FIXME: send msg to service */
952 1108
953 chn->is_disconnecting = GNUNET_YES; 1109 channel_disconnect (chn, part_cb, part_cls);
954 chn->disconnect_cb = part_cb; 1110 slave_cleanup (slv);
955 chn->disconnect_cls = part_cls;
956
957 GNUNET_CLIENT_MANAGER_disconnect (slv->chn.client, GNUNET_YES,
958 &slave_cleanup, slv);
959} 1111}
960 1112
961 1113
@@ -1069,18 +1221,16 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn,
1069 GNUNET_ResultCallback result_cb, 1221 GNUNET_ResultCallback result_cb,
1070 void *cls) 1222 void *cls)
1071{ 1223{
1072 struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req)); 1224 struct ChannelMembershipStoreRequest *req;
1073 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE); 1225 struct GNUNET_MQ_Envelope *
1074 req->header.size = htons (sizeof (*req)); 1226 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE);
1075 req->slave_pub_key = *slave_pub_key; 1227 req->slave_pub_key = *slave_pub_key;
1076 req->announced_at = GNUNET_htonll (announced_at); 1228 req->announced_at = GNUNET_htonll (announced_at);
1077 req->effective_since = GNUNET_htonll (effective_since); 1229 req->effective_since = GNUNET_htonll (effective_since);
1078 req->did_join = GNUNET_YES; 1230 req->did_join = GNUNET_YES;
1079 req->op_id = GNUNET_htonll (GNUNET_CLIENT_MANAGER_op_add (chn->client, 1231 req->op_id = GNUNET_htonll (GNUNET_OP_add (chn->op, result_cb, cls, NULL));
1080 result_cb, cls));
1081 1232
1082 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); 1233 GNUNET_MQ_send (chn->mq, env);
1083 GNUNET_free (req);
1084} 1234}
1085 1235
1086 1236
@@ -1122,17 +1272,15 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn,
1122 GNUNET_ResultCallback result_cb, 1272 GNUNET_ResultCallback result_cb,
1123 void *cls) 1273 void *cls)
1124{ 1274{
1125 struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req)); 1275 struct ChannelMembershipStoreRequest *req;
1126 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE); 1276 struct GNUNET_MQ_Envelope *
1127 req->header.size = htons (sizeof (*req)); 1277 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE);
1128 req->slave_pub_key = *slave_pub_key; 1278 req->slave_pub_key = *slave_pub_key;
1129 req->announced_at = GNUNET_htonll (announced_at); 1279 req->announced_at = GNUNET_htonll (announced_at);
1130 req->did_join = GNUNET_NO; 1280 req->did_join = GNUNET_NO;
1131 req->op_id = GNUNET_htonll (GNUNET_CLIENT_MANAGER_op_add (chn->client, 1281 req->op_id = GNUNET_htonll (GNUNET_OP_add (chn->op, result_cb, cls, NULL));
1132 result_cb, cls));
1133 1282
1134 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); 1283 GNUNET_MQ_send (chn->mq, env);
1135 GNUNET_free (req);
1136} 1284}
1137 1285
1138 1286
@@ -1154,17 +1302,17 @@ channel_history_replay (struct GNUNET_PSYC_Channel *chn,
1154 hist->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls); 1302 hist->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls);
1155 hist->result_cb = result_cb; 1303 hist->result_cb = result_cb;
1156 hist->cls = cls; 1304 hist->cls = cls;
1157 hist->op_id = GNUNET_CLIENT_MANAGER_op_add (chn->client, 1305 hist->op_id = GNUNET_OP_add (chn->op, op_recv_history_result, hist, NULL);
1158 &op_recv_history_result, hist);
1159 1306
1160 GNUNET_assert (NULL != method_prefix); 1307 GNUNET_assert (NULL != method_prefix);
1161 uint16_t method_size = strnlen (method_prefix, 1308 uint16_t method_size = strnlen (method_prefix,
1162 GNUNET_SERVER_MAX_MESSAGE_SIZE 1309 GNUNET_SERVER_MAX_MESSAGE_SIZE
1163 - sizeof (*req)) + 1; 1310 - sizeof (*req)) + 1;
1164 GNUNET_assert ('\0' == method_prefix[method_size - 1]); 1311 GNUNET_assert ('\0' == method_prefix[method_size - 1]);
1165 req = GNUNET_malloc (sizeof (*req) + method_size); 1312
1166 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY); 1313 struct GNUNET_MQ_Envelope *
1167 req->header.size = htons (sizeof (*req) + method_size); 1314 env = GNUNET_MQ_msg_extra (req, method_size,
1315 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY);
1168 req->start_message_id = GNUNET_htonll (start_message_id); 1316 req->start_message_id = GNUNET_htonll (start_message_id);
1169 req->end_message_id = GNUNET_htonll (end_message_id); 1317 req->end_message_id = GNUNET_htonll (end_message_id);
1170 req->message_limit = GNUNET_htonll (message_limit); 1318 req->message_limit = GNUNET_htonll (message_limit);
@@ -1172,8 +1320,7 @@ channel_history_replay (struct GNUNET_PSYC_Channel *chn,
1172 req->op_id = GNUNET_htonll (hist->op_id); 1320 req->op_id = GNUNET_htonll (hist->op_id);
1173 GNUNET_memcpy (&req[1], method_prefix, method_size); 1321 GNUNET_memcpy (&req[1], method_prefix, method_size);
1174 1322
1175 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); 1323 GNUNET_MQ_send (chn->mq, env);
1176 GNUNET_free (req);
1177 return hist; 1324 return hist;
1178} 1325}
1179 1326
@@ -1263,7 +1410,7 @@ GNUNET_PSYC_channel_history_replay_cancel (struct GNUNET_PSYC_Channel *channel,
1263 struct GNUNET_PSYC_HistoryRequest *hist) 1410 struct GNUNET_PSYC_HistoryRequest *hist)
1264{ 1411{
1265 GNUNET_PSYC_receive_destroy (hist->recv); 1412 GNUNET_PSYC_receive_destroy (hist->recv);
1266 GNUNET_CLIENT_MANAGER_op_cancel (hist->chn->client, hist->op_id); 1413 GNUNET_OP_remove (hist->chn->op, hist->op_id);
1267 GNUNET_free (hist); 1414 GNUNET_free (hist);
1268} 1415}
1269 1416
@@ -1301,20 +1448,17 @@ channel_state_get (struct GNUNET_PSYC_Channel *chn,
1301 sr->var_cb = var_cb; 1448 sr->var_cb = var_cb;
1302 sr->result_cb = result_cb; 1449 sr->result_cb = result_cb;
1303 sr->cls = cls; 1450 sr->cls = cls;
1304 sr->op_id = GNUNET_CLIENT_MANAGER_op_add (chn->client, 1451 sr->op_id = GNUNET_OP_add (chn->op, op_recv_state_result, sr, NULL);
1305 &op_recv_state_result, sr);
1306 1452
1307 GNUNET_assert (NULL != name); 1453 GNUNET_assert (NULL != name);
1308 size_t name_size = strnlen (name, GNUNET_SERVER_MAX_MESSAGE_SIZE 1454 size_t name_size = strnlen (name, GNUNET_SERVER_MAX_MESSAGE_SIZE
1309 - sizeof (*req)) + 1; 1455 - sizeof (*req)) + 1;
1310 req = GNUNET_malloc (sizeof (*req) + name_size); 1456 struct GNUNET_MQ_Envelope *
1311 req->header.type = htons (type); 1457 env = GNUNET_MQ_msg_extra (req, name_size, type);
1312 req->header.size = htons (sizeof (*req) + name_size);
1313 req->op_id = GNUNET_htonll (sr->op_id); 1458 req->op_id = GNUNET_htonll (sr->op_id);
1314 GNUNET_memcpy (&req[1], name, name_size); 1459 GNUNET_memcpy (&req[1], name, name_size);
1315 1460
1316 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); 1461 GNUNET_MQ_send (chn->mq, env);
1317 GNUNET_free (req);
1318 return sr; 1462 return sr;
1319} 1463}
1320 1464
@@ -1397,7 +1541,7 @@ GNUNET_PSYC_channel_state_get_prefix (struct GNUNET_PSYC_Channel *chn,
1397void 1541void
1398GNUNET_PSYC_channel_state_get_cancel (struct GNUNET_PSYC_StateRequest *sr) 1542GNUNET_PSYC_channel_state_get_cancel (struct GNUNET_PSYC_StateRequest *sr)
1399{ 1543{
1400 GNUNET_CLIENT_MANAGER_op_cancel (sr->chn->client, sr->op_id); 1544 GNUNET_OP_remove (sr->chn->op, sr->op_id);
1401 GNUNET_free (sr); 1545 GNUNET_free (sr);
1402} 1546}
1403 1547