diff options
author | Gabor X Toth <*@tg-x.net> | 2016-08-17 21:26:41 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2016-08-17 21:26:41 +0000 |
commit | 667cc67f8224ccf4ff391b125a614cf90cf5917e (patch) | |
tree | ae2048a6525ab2521ad989afa795d7a2f0833af6 /src/psyc/psyc_api.c | |
parent | 720a38ea7519f5a1a820157f056b150ab3d4abd5 (diff) | |
download | gnunet-667cc67f8224ccf4ff391b125a614cf90cf5917e.tar.gz gnunet-667cc67f8224ccf4ff391b125a614cf90cf5917e.zip |
psyc, social: switch to MQ
Diffstat (limited to 'src/psyc/psyc_api.c')
-rw-r--r-- | src/psyc/psyc_api.c | 662 |
1 files changed, 403 insertions, 259 deletions
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c index 515a2731a..2f6a15bab 100644 --- a/src/psyc/psyc_api.c +++ b/src/psyc/psyc_api.c | |||
@@ -55,7 +55,27 @@ struct GNUNET_PSYC_Channel | |||
55 | /** | 55 | /** |
56 | * Client connection to the service. | 56 | * Client connection to the service. |
57 | */ | 57 | */ |
58 | struct GNUNET_CLIENT_MANAGER_Connection *client; | 58 | struct GNUNET_MQ_Handle *mq; |
59 | |||
60 | /** | ||
61 | * Message to send on connect. | ||
62 | */ | ||
63 | struct GNUNET_MQ_Envelope *connect_env; | ||
64 | |||
65 | /** | ||
66 | * Time to wait until we try to reconnect on failure. | ||
67 | */ | ||
68 | struct GNUNET_TIME_Relative reconnect_delay; | ||
69 | |||
70 | /** | ||
71 | * Task for reconnecting when the listener fails. | ||
72 | */ | ||
73 | struct GNUNET_SCHEDULER_Task *reconnect_task; | ||
74 | |||
75 | /** | ||
76 | * Async operations. | ||
77 | */ | ||
78 | struct GNUNET_OP_Handle *op; | ||
59 | 79 | ||
60 | /** | 80 | /** |
61 | * Transmission handle; | 81 | * Transmission handle; |
@@ -68,11 +88,6 @@ struct GNUNET_PSYC_Channel | |||
68 | struct GNUNET_PSYC_ReceiveHandle *recv; | 88 | struct GNUNET_PSYC_ReceiveHandle *recv; |
69 | 89 | ||
70 | /** | 90 | /** |
71 | * Message to send on reconnect. | ||
72 | */ | ||
73 | struct GNUNET_MessageHeader *connect_msg; | ||
74 | |||
75 | /** | ||
76 | * Function called after disconnected from the service. | 91 | * Function called after disconnected from the service. |
77 | */ | 92 | */ |
78 | GNUNET_ContinuationCallback disconnect_cb; | 93 | GNUNET_ContinuationCallback disconnect_cb; |
@@ -219,41 +234,21 @@ struct GNUNET_PSYC_StateRequest | |||
219 | }; | 234 | }; |
220 | 235 | ||
221 | 236 | ||
222 | static void | 237 | static int |
223 | channel_send_connect_msg (struct GNUNET_PSYC_Channel *chn) | 238 | check_channel_result (void *cls, |
224 | { | 239 | const struct GNUNET_OperationResultMessage *res) |
225 | uint16_t cmsg_size = ntohs (chn->connect_msg->size); | ||
226 | struct GNUNET_MessageHeader *cmsg = GNUNET_malloc (cmsg_size); | ||
227 | GNUNET_memcpy (cmsg, chn->connect_msg, cmsg_size); | ||
228 | GNUNET_CLIENT_MANAGER_transmit_now (chn->client, cmsg); | ||
229 | GNUNET_free (cmsg); | ||
230 | } | ||
231 | |||
232 | |||
233 | static void | ||
234 | channel_recv_disconnect (void *cls, | ||
235 | struct GNUNET_CLIENT_MANAGER_Connection *client, | ||
236 | const struct GNUNET_MessageHeader *msg) | ||
237 | { | 240 | { |
238 | struct GNUNET_PSYC_Channel * | 241 | return GNUNET_OK; |
239 | chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); | ||
240 | GNUNET_CLIENT_MANAGER_reconnect (client); | ||
241 | channel_send_connect_msg (chn); | ||
242 | } | 242 | } |
243 | 243 | ||
244 | 244 | ||
245 | static void | 245 | static void |
246 | channel_recv_result (void *cls, | 246 | handle_channel_result (void *cls, |
247 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 247 | const struct GNUNET_OperationResultMessage *res) |
248 | const struct GNUNET_MessageHeader *msg) | ||
249 | { | 248 | { |
250 | struct GNUNET_PSYC_Channel * | 249 | struct GNUNET_PSYC_Channel *chn = cls; |
251 | chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); | ||
252 | 250 | ||
253 | const struct GNUNET_OperationResultMessage * | 251 | uint16_t size = ntohs (res->header.size); |
254 | res = (const struct GNUNET_OperationResultMessage *) msg; | ||
255 | |||
256 | uint16_t size = ntohs (msg->size); | ||
257 | if (size < sizeof (*res)) | 252 | if (size < sizeof (*res)) |
258 | { /* Error, message too small. */ | 253 | { /* Error, message too small. */ |
259 | GNUNET_break (0); | 254 | GNUNET_break (0); |
@@ -262,9 +257,9 @@ channel_recv_result (void *cls, | |||
262 | 257 | ||
263 | uint16_t data_size = size - sizeof (*res); | 258 | uint16_t data_size = size - sizeof (*res); |
264 | const char *data = (0 < data_size) ? (void *) &res[1] : NULL; | 259 | const char *data = (0 < data_size) ? (void *) &res[1] : NULL; |
265 | GNUNET_CLIENT_MANAGER_op_result (chn->client, GNUNET_ntohll (res->op_id), | 260 | GNUNET_OP_result (chn->op, GNUNET_ntohll (res->op_id), |
266 | GNUNET_ntohll (res->result_code), | 261 | GNUNET_ntohll (res->result_code), |
267 | data, data_size); | 262 | data, data_size, NULL); |
268 | } | 263 | } |
269 | 264 | ||
270 | 265 | ||
@@ -301,18 +296,30 @@ op_recv_state_result (void *cls, int64_t result, | |||
301 | } | 296 | } |
302 | 297 | ||
303 | 298 | ||
304 | static void | 299 | static int |
305 | channel_recv_history_result (void *cls, | 300 | check_channel_history_result (void *cls, |
306 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 301 | const struct GNUNET_OperationResultMessage *res) |
307 | const struct GNUNET_MessageHeader *msg) | ||
308 | { | 302 | { |
309 | struct GNUNET_PSYC_Channel * | 303 | struct GNUNET_PSYC_MessageHeader * |
310 | chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); | 304 | pmsg = (struct GNUNET_PSYC_MessageHeader *) GNUNET_MQ_extract_nested_mh (res); |
305 | uint16_t size = ntohs (res->header.size); | ||
311 | 306 | ||
312 | const struct GNUNET_OperationResultMessage * | 307 | if (NULL == pmsg || size < sizeof (*res) + sizeof (*pmsg)) |
313 | res = (const struct GNUNET_OperationResultMessage *) msg; | 308 | { /* Error, message too small. */ |
309 | GNUNET_break_op (0); | ||
310 | return GNUNET_SYSERR; | ||
311 | } | ||
312 | return GNUNET_OK; | ||
313 | } | ||
314 | |||
315 | |||
316 | static void | ||
317 | handle_channel_history_result (void *cls, | ||
318 | const struct GNUNET_OperationResultMessage *res) | ||
319 | { | ||
320 | struct GNUNET_PSYC_Channel *chn = cls; | ||
314 | struct GNUNET_PSYC_MessageHeader * | 321 | struct GNUNET_PSYC_MessageHeader * |
315 | pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1]; | 322 | pmsg = (struct GNUNET_PSYC_MessageHeader *) GNUNET_MQ_extract_nested_mh (res); |
316 | 323 | ||
317 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 324 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
318 | "%p Received historic fragment for message #%" PRIu64 ".\n", | 325 | "%p Received historic fragment for message #%" PRIu64 ".\n", |
@@ -321,9 +328,9 @@ channel_recv_history_result (void *cls, | |||
321 | GNUNET_ResultCallback result_cb = NULL; | 328 | GNUNET_ResultCallback result_cb = NULL; |
322 | struct GNUNET_PSYC_HistoryRequest *hist = NULL; | 329 | struct GNUNET_PSYC_HistoryRequest *hist = NULL; |
323 | 330 | ||
324 | if (GNUNET_YES != GNUNET_CLIENT_MANAGER_op_find (chn->client, | 331 | if (GNUNET_YES != GNUNET_OP_get (chn->op, |
325 | GNUNET_ntohll (res->op_id), | 332 | GNUNET_ntohll (res->op_id), |
326 | &result_cb, (void *) &hist)) | 333 | &result_cb, (void *) &hist, NULL)) |
327 | { /* Operation not found. */ | 334 | { /* Operation not found. */ |
328 | LOG (GNUNET_ERROR_TYPE_WARNING, | 335 | LOG (GNUNET_ERROR_TYPE_WARNING, |
329 | "%p Replay operation not found for historic fragment of message #%" | 336 | "%p Replay operation not found for historic fragment of message #%" |
@@ -332,47 +339,47 @@ channel_recv_history_result (void *cls, | |||
332 | return; | 339 | return; |
333 | } | 340 | } |
334 | 341 | ||
335 | uint16_t size = ntohs (msg->size); | ||
336 | if (size < sizeof (*res) + sizeof (*pmsg)) | ||
337 | { /* Error, message too small. */ | ||
338 | GNUNET_break (0); | ||
339 | return; | ||
340 | } | ||
341 | |||
342 | GNUNET_PSYC_receive_message (hist->recv, | 342 | GNUNET_PSYC_receive_message (hist->recv, |
343 | (const struct GNUNET_PSYC_MessageHeader *) pmsg); | 343 | (const struct GNUNET_PSYC_MessageHeader *) pmsg); |
344 | } | 344 | } |
345 | 345 | ||
346 | 346 | ||
347 | static void | 347 | static int |
348 | channel_recv_state_result (void *cls, | 348 | check_channel_state_result (void *cls, |
349 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 349 | const struct GNUNET_OperationResultMessage *res) |
350 | const struct GNUNET_MessageHeader *msg) | ||
351 | { | 350 | { |
352 | struct GNUNET_PSYC_Channel * | 351 | const struct GNUNET_MessageHeader *mod = GNUNET_MQ_extract_nested_mh (res); |
353 | chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); | 352 | uint16_t mod_size = ntohs (mod->size); |
353 | uint16_t size = ntohs (res->header.size); | ||
354 | |||
355 | if (NULL == mod || size - sizeof (*res) != mod_size) | ||
356 | { | ||
357 | GNUNET_break_op (0); | ||
358 | return GNUNET_SYSERR; | ||
359 | } | ||
360 | return GNUNET_OK; | ||
361 | } | ||
362 | |||
354 | 363 | ||
355 | const struct GNUNET_OperationResultMessage * | 364 | static void |
356 | res = (const struct GNUNET_OperationResultMessage *) msg; | 365 | handle_channel_state_result (void *cls, |
366 | const struct GNUNET_OperationResultMessage *res) | ||
367 | { | ||
368 | struct GNUNET_PSYC_Channel *chn = cls; | ||
357 | 369 | ||
358 | GNUNET_ResultCallback result_cb = NULL; | 370 | GNUNET_ResultCallback result_cb = NULL; |
359 | struct GNUNET_PSYC_StateRequest *sr = NULL; | 371 | struct GNUNET_PSYC_StateRequest *sr = NULL; |
360 | 372 | ||
361 | if (GNUNET_YES != GNUNET_CLIENT_MANAGER_op_find (chn->client, | 373 | if (GNUNET_YES != GNUNET_OP_get (chn->op, |
362 | GNUNET_ntohll (res->op_id), | 374 | GNUNET_ntohll (res->op_id), |
363 | &result_cb, (void *) &sr)) | 375 | &result_cb, (void *) &sr, NULL)) |
364 | { /* Operation not found. */ | 376 | { /* Operation not found. */ |
365 | return; | 377 | return; |
366 | } | 378 | } |
367 | 379 | ||
368 | const struct GNUNET_MessageHeader * | 380 | const struct GNUNET_MessageHeader *mod = GNUNET_MQ_extract_nested_mh (res); |
369 | mod = (struct GNUNET_MessageHeader *) &res[1]; | ||
370 | uint16_t mod_size = ntohs (mod->size); | 381 | uint16_t mod_size = ntohs (mod->size); |
371 | if (ntohs (msg->size) - sizeof (*res) != mod_size) | 382 | |
372 | { | ||
373 | GNUNET_break (0); | ||
374 | return; | ||
375 | } | ||
376 | switch (ntohs (mod->type)) | 383 | switch (ntohs (mod->type)) |
377 | { | 384 | { |
378 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | 385 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: |
@@ -401,40 +408,40 @@ channel_recv_state_result (void *cls, | |||
401 | } | 408 | } |
402 | 409 | ||
403 | 410 | ||
411 | static int | ||
412 | check_channel_message (void *cls, | ||
413 | const struct GNUNET_PSYC_MessageHeader *pmsg) | ||
414 | { | ||
415 | return GNUNET_OK; | ||
416 | } | ||
417 | |||
418 | |||
404 | static void | 419 | static void |
405 | channel_recv_message (void *cls, | 420 | handle_channel_message (void *cls, |
406 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 421 | const struct GNUNET_PSYC_MessageHeader *pmsg) |
407 | const struct GNUNET_MessageHeader *msg) | ||
408 | { | 422 | { |
409 | struct GNUNET_PSYC_Channel * | 423 | struct GNUNET_PSYC_Channel *chn = cls; |
410 | chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); | 424 | |
411 | GNUNET_PSYC_receive_message (chn->recv, | 425 | GNUNET_PSYC_receive_message (chn->recv, pmsg); |
412 | (const struct GNUNET_PSYC_MessageHeader *) msg); | ||
413 | } | 426 | } |
414 | 427 | ||
415 | 428 | ||
416 | static void | 429 | static void |
417 | channel_recv_message_ack (void *cls, | 430 | handle_channel_message_ack (void *cls, |
418 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 431 | const struct GNUNET_MessageHeader *msg) |
419 | const struct GNUNET_MessageHeader *msg) | ||
420 | { | 432 | { |
421 | struct GNUNET_PSYC_Channel * | 433 | struct GNUNET_PSYC_Channel *chn = cls; |
422 | chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); | 434 | |
423 | GNUNET_PSYC_transmit_got_ack (chn->tmit); | 435 | GNUNET_PSYC_transmit_got_ack (chn->tmit); |
424 | } | 436 | } |
425 | 437 | ||
426 | 438 | ||
427 | static void | 439 | static void |
428 | master_recv_start_ack (void *cls, | 440 | handle_master_start_ack (void *cls, |
429 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 441 | const struct GNUNET_PSYC_CountersResultMessage *cres) |
430 | const struct GNUNET_MessageHeader *msg) | ||
431 | { | 442 | { |
432 | struct GNUNET_PSYC_Master * | 443 | struct GNUNET_PSYC_Master *mst = cls; |
433 | mst = GNUNET_CLIENT_MANAGER_get_user_context_ (client, | ||
434 | sizeof (struct GNUNET_PSYC_Channel)); | ||
435 | 444 | ||
436 | struct GNUNET_PSYC_CountersResultMessage * | ||
437 | cres = (struct GNUNET_PSYC_CountersResultMessage *) msg; | ||
438 | int32_t result = ntohl (cres->result_code); | 445 | int32_t result = ntohl (cres->result_code); |
439 | if (GNUNET_OK != result && GNUNET_NO != result) | 446 | if (GNUNET_OK != result && GNUNET_NO != result) |
440 | { | 447 | { |
@@ -447,23 +454,27 @@ master_recv_start_ack (void *cls, | |||
447 | } | 454 | } |
448 | 455 | ||
449 | 456 | ||
457 | static int | ||
458 | check_master_join_request (void *cls, | ||
459 | const struct GNUNET_PSYC_JoinRequestMessage *req) | ||
460 | { | ||
461 | return GNUNET_OK; | ||
462 | } | ||
463 | |||
464 | |||
450 | static void | 465 | static void |
451 | master_recv_join_request (void *cls, | 466 | handle_master_join_request (void *cls, |
452 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 467 | const struct GNUNET_PSYC_JoinRequestMessage *req) |
453 | const struct GNUNET_MessageHeader *msg) | ||
454 | { | 468 | { |
455 | struct GNUNET_PSYC_Master * | 469 | struct GNUNET_PSYC_Master *mst = cls; |
456 | mst = GNUNET_CLIENT_MANAGER_get_user_context_ (client, | 470 | |
457 | sizeof (struct GNUNET_PSYC_Channel)); | ||
458 | if (NULL == mst->join_req_cb) | 471 | if (NULL == mst->join_req_cb) |
459 | return; | 472 | return; |
460 | 473 | ||
461 | const struct GNUNET_PSYC_JoinRequestMessage * | ||
462 | req = (const struct GNUNET_PSYC_JoinRequestMessage *) msg; | ||
463 | const struct GNUNET_PSYC_Message *join_msg = NULL; | 474 | const struct GNUNET_PSYC_Message *join_msg = NULL; |
464 | if (sizeof (*req) + sizeof (*join_msg) <= ntohs (req->header.size)) | 475 | if (sizeof (*req) + sizeof (*join_msg) <= ntohs (req->header.size)) |
465 | { | 476 | { |
466 | join_msg = (struct GNUNET_PSYC_Message *) &req[1]; | 477 | join_msg = (struct GNUNET_PSYC_Message *) GNUNET_MQ_extract_nested_mh (req); |
467 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 478 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
468 | "Received join_msg of type %u and size %u.\n", | 479 | "Received join_msg of type %u and size %u.\n", |
469 | ntohs (join_msg->header.type), ntohs (join_msg->header.size)); | 480 | ntohs (join_msg->header.type), ntohs (join_msg->header.size)); |
@@ -479,15 +490,11 @@ master_recv_join_request (void *cls, | |||
479 | 490 | ||
480 | 491 | ||
481 | static void | 492 | static void |
482 | slave_recv_join_ack (void *cls, | 493 | handle_slave_join_ack (void *cls, |
483 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 494 | const struct GNUNET_PSYC_CountersResultMessage *cres) |
484 | const struct GNUNET_MessageHeader *msg) | ||
485 | { | 495 | { |
486 | struct GNUNET_PSYC_Slave * | 496 | struct GNUNET_PSYC_Slave *slv = cls; |
487 | slv = GNUNET_CLIENT_MANAGER_get_user_context_ (client, | 497 | |
488 | sizeof (struct GNUNET_PSYC_Channel)); | ||
489 | struct GNUNET_PSYC_CountersResultMessage * | ||
490 | cres = (struct GNUNET_PSYC_CountersResultMessage *) msg; | ||
491 | int32_t result = ntohl (cres->result_code); | 498 | int32_t result = ntohl (cres->result_code); |
492 | if (GNUNET_YES != result && GNUNET_NO != result) | 499 | if (GNUNET_YES != result && GNUNET_NO != result) |
493 | { | 500 | { |
@@ -500,16 +507,19 @@ slave_recv_join_ack (void *cls, | |||
500 | } | 507 | } |
501 | 508 | ||
502 | 509 | ||
510 | static int | ||
511 | check_slave_join_decision (void *cls, | ||
512 | const struct GNUNET_PSYC_JoinDecisionMessage *dcsn) | ||
513 | { | ||
514 | return GNUNET_OK; | ||
515 | } | ||
516 | |||
517 | |||
503 | static void | 518 | static void |
504 | slave_recv_join_decision (void *cls, | 519 | handle_slave_join_decision (void *cls, |
505 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 520 | const struct GNUNET_PSYC_JoinDecisionMessage *dcsn) |
506 | const struct GNUNET_MessageHeader *msg) | ||
507 | { | 521 | { |
508 | struct GNUNET_PSYC_Slave * | 522 | struct GNUNET_PSYC_Slave *slv = cls; |
509 | slv = GNUNET_CLIENT_MANAGER_get_user_context_ (client, | ||
510 | sizeof (struct GNUNET_PSYC_Channel)); | ||
511 | const struct GNUNET_PSYC_JoinDecisionMessage * | ||
512 | dcsn = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg; | ||
513 | 523 | ||
514 | struct GNUNET_PSYC_Message *pmsg = NULL; | 524 | struct GNUNET_PSYC_Message *pmsg = NULL; |
515 | if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*pmsg)) | 525 | if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*pmsg)) |
@@ -520,104 +530,164 @@ slave_recv_join_decision (void *cls, | |||
520 | } | 530 | } |
521 | 531 | ||
522 | 532 | ||
523 | static struct GNUNET_CLIENT_MANAGER_MessageHandler master_handlers[] = | 533 | static void |
534 | channel_cleanup (struct GNUNET_PSYC_Channel *chn) | ||
524 | { | 535 | { |
525 | { &channel_recv_message, NULL, | 536 | if (NULL != chn->tmit) |
526 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, | 537 | { |
527 | sizeof (struct GNUNET_PSYC_MessageHeader), GNUNET_YES }, | 538 | GNUNET_PSYC_transmit_destroy (chn->tmit); |
528 | 539 | chn->tmit = NULL; | |
529 | { &channel_recv_message_ack, NULL, | 540 | } |
530 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK, | 541 | if (NULL != chn->recv) |
531 | sizeof (struct GNUNET_MessageHeader), GNUNET_NO }, | 542 | { |
532 | 543 | GNUNET_PSYC_receive_destroy (chn->recv); | |
533 | { &master_recv_start_ack, NULL, | 544 | chn->recv = NULL; |
534 | GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK, | 545 | } |
535 | sizeof (struct GNUNET_PSYC_CountersResultMessage), GNUNET_NO }, | 546 | if (NULL != chn->connect_env) |
536 | 547 | { | |
537 | { &master_recv_join_request, NULL, | 548 | GNUNET_MQ_discard (chn->connect_env); |
538 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST, | 549 | chn->connect_env = NULL; |
539 | sizeof (struct GNUNET_PSYC_JoinRequestMessage), GNUNET_YES }, | 550 | } |
540 | 551 | if (NULL != chn->disconnect_cb) | |
541 | { &channel_recv_history_result, NULL, | 552 | { |
542 | GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT, | 553 | chn->disconnect_cb (chn->disconnect_cls); |
543 | sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, | 554 | chn->disconnect_cb = NULL; |
544 | 555 | } | |
545 | { &channel_recv_state_result, NULL, | 556 | } |
546 | GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT, | ||
547 | sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, | ||
548 | |||
549 | { &channel_recv_result, NULL, | ||
550 | GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE, | ||
551 | sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, | ||
552 | 557 | ||
553 | { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, | ||
554 | 558 | ||
555 | { NULL, NULL, 0, 0, GNUNET_NO } | 559 | static void |
556 | }; | 560 | master_cleanup (void *cls) |
561 | { | ||
562 | struct GNUNET_PSYC_Master *mst = cls; | ||
563 | channel_cleanup (&mst->chn); | ||
564 | GNUNET_free (mst); | ||
565 | } | ||
557 | 566 | ||
558 | 567 | ||
559 | static struct GNUNET_CLIENT_MANAGER_MessageHandler slave_handlers[] = | 568 | static void |
569 | slave_cleanup (void *cls) | ||
560 | { | 570 | { |
561 | { &channel_recv_message, NULL, | 571 | struct GNUNET_PSYC_Slave *slv = cls; |
562 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, | 572 | channel_cleanup (&slv->chn); |
563 | sizeof (struct GNUNET_PSYC_MessageHeader), GNUNET_YES }, | 573 | GNUNET_free (slv); |
564 | 574 | } | |
565 | { &channel_recv_message_ack, NULL, | ||
566 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK, | ||
567 | sizeof (struct GNUNET_MessageHeader), GNUNET_NO }, | ||
568 | 575 | ||
569 | { &slave_recv_join_ack, NULL, | ||
570 | GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK, | ||
571 | sizeof (struct GNUNET_PSYC_CountersResultMessage), GNUNET_NO }, | ||
572 | 576 | ||
573 | { &slave_recv_join_decision, NULL, | 577 | static void |
574 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, | 578 | channel_disconnect (struct GNUNET_PSYC_Channel *chn, |
575 | sizeof (struct GNUNET_PSYC_JoinDecisionMessage), GNUNET_YES }, | 579 | GNUNET_ContinuationCallback cb, |
580 | void *cls) | ||
581 | { | ||
582 | chn->is_disconnecting = GNUNET_YES; | ||
583 | chn->disconnect_cb = cb; | ||
584 | chn->disconnect_cls = cls; | ||
576 | 585 | ||
577 | { &channel_recv_history_result, NULL, | 586 | // FIXME: wait till queued messages are sent |
578 | GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT, | 587 | if (NULL != chn->mq) |
579 | sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, | 588 | { |
589 | GNUNET_MQ_destroy (chn->mq); | ||
590 | chn->mq = NULL; | ||
591 | } | ||
592 | } | ||
580 | 593 | ||
581 | { &channel_recv_state_result, NULL, | ||
582 | GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT, | ||
583 | sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, | ||
584 | 594 | ||
585 | { &channel_recv_result, NULL, | 595 | /*** MASTER ***/ |
586 | GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE, | ||
587 | sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, | ||
588 | 596 | ||
589 | { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, | ||
590 | 597 | ||
591 | { NULL, NULL, 0, 0, GNUNET_NO } | 598 | static void |
592 | }; | 599 | master_connect (struct GNUNET_PSYC_Master *mst); |
593 | 600 | ||
594 | 601 | ||
595 | static void | 602 | static void |
596 | channel_cleanup (struct GNUNET_PSYC_Channel *chn) | 603 | master_reconnect (void *cls) |
597 | { | 604 | { |
598 | GNUNET_PSYC_transmit_destroy (chn->tmit); | 605 | master_connect (cls); |
599 | GNUNET_PSYC_receive_destroy (chn->recv); | ||
600 | GNUNET_free (chn->connect_msg); | ||
601 | if (NULL != chn->disconnect_cb) | ||
602 | chn->disconnect_cb (chn->disconnect_cls); | ||
603 | } | 606 | } |
604 | 607 | ||
605 | 608 | ||
609 | /** | ||
610 | * Master client disconnected from service. | ||
611 | * | ||
612 | * Reconnect after backoff period. | ||
613 | */ | ||
606 | static void | 614 | static void |
607 | master_cleanup (void *cls) | 615 | master_disconnected (void *cls, enum GNUNET_MQ_Error error) |
608 | { | 616 | { |
609 | struct GNUNET_PSYC_Master *mst = cls; | 617 | struct GNUNET_PSYC_Master *mst = cls; |
610 | channel_cleanup (&mst->chn); | 618 | struct GNUNET_PSYC_Channel *chn = &mst->chn; |
611 | GNUNET_free (mst); | 619 | |
620 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
621 | "Master client disconnected (%d), re-connecting\n", | ||
622 | (int) error); | ||
623 | if (NULL != chn->mq) | ||
624 | { | ||
625 | GNUNET_MQ_destroy (chn->mq); | ||
626 | chn->mq = NULL; | ||
627 | } | ||
628 | if (NULL != chn->tmit) | ||
629 | { | ||
630 | GNUNET_PSYC_transmit_destroy (chn->tmit); | ||
631 | chn->tmit = NULL; | ||
632 | } | ||
633 | |||
634 | chn->reconnect_task = GNUNET_SCHEDULER_add_delayed (chn->reconnect_delay, | ||
635 | master_reconnect, | ||
636 | mst); | ||
637 | chn->reconnect_delay = GNUNET_TIME_STD_BACKOFF (chn->reconnect_delay); | ||
612 | } | 638 | } |
613 | 639 | ||
614 | 640 | ||
615 | static void | 641 | static void |
616 | slave_cleanup (void *cls) | 642 | master_connect (struct GNUNET_PSYC_Master *mst) |
617 | { | 643 | { |
618 | struct GNUNET_PSYC_Slave *slv = cls; | 644 | struct GNUNET_PSYC_Channel *chn = &mst->chn; |
619 | channel_cleanup (&slv->chn); | 645 | |
620 | GNUNET_free (slv); | 646 | GNUNET_MQ_hd_fixed_size (master_start_ack, |
647 | GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK, | ||
648 | struct GNUNET_PSYC_CountersResultMessage); | ||
649 | |||
650 | GNUNET_MQ_hd_var_size (master_join_request, | ||
651 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST, | ||
652 | struct GNUNET_PSYC_JoinRequestMessage); | ||
653 | |||
654 | GNUNET_MQ_hd_var_size (channel_message, | ||
655 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, | ||
656 | struct GNUNET_PSYC_MessageHeader); | ||
657 | |||
658 | GNUNET_MQ_hd_fixed_size (channel_message_ack, | ||
659 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK, | ||
660 | struct GNUNET_MessageHeader); | ||
661 | |||
662 | GNUNET_MQ_hd_var_size (channel_history_result, | ||
663 | GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT, | ||
664 | struct GNUNET_OperationResultMessage); | ||
665 | |||
666 | GNUNET_MQ_hd_var_size (channel_state_result, | ||
667 | GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT, | ||
668 | struct GNUNET_OperationResultMessage); | ||
669 | |||
670 | GNUNET_MQ_hd_var_size (channel_result, | ||
671 | GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE, | ||
672 | struct GNUNET_OperationResultMessage); | ||
673 | |||
674 | struct GNUNET_MQ_MessageHandler handlers[] = { | ||
675 | make_master_start_ack_handler (mst), | ||
676 | make_master_join_request_handler (mst), | ||
677 | make_channel_message_handler (chn), | ||
678 | make_channel_message_ack_handler (chn), | ||
679 | make_channel_history_result_handler (chn), | ||
680 | make_channel_state_result_handler (chn), | ||
681 | make_channel_result_handler (chn), | ||
682 | GNUNET_MQ_handler_end () | ||
683 | }; | ||
684 | |||
685 | chn->mq = GNUNET_CLIENT_connecT (chn->cfg, "psyc", | ||
686 | handlers, master_disconnected, mst); | ||
687 | GNUNET_assert (NULL != chn->mq); | ||
688 | chn->tmit = GNUNET_PSYC_transmit_create (chn->mq); | ||
689 | |||
690 | GNUNET_MQ_send_copy (chn->mq, chn->connect_env); | ||
621 | } | 691 | } |
622 | 692 | ||
623 | 693 | ||
@@ -664,26 +734,23 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
664 | struct GNUNET_PSYC_Channel *chn = &mst->chn; | 734 | struct GNUNET_PSYC_Channel *chn = &mst->chn; |
665 | 735 | ||
666 | struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req)); | 736 | struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req)); |
667 | req->header.size = htons (sizeof (*req)); | 737 | chn->connect_env = GNUNET_MQ_msg (req, |
668 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START); | 738 | GNUNET_MESSAGE_TYPE_PSYC_MASTER_START); |
669 | req->channel_key = *channel_key; | 739 | req->channel_key = *channel_key; |
670 | req->policy = policy; | 740 | req->policy = policy; |
671 | 741 | ||
672 | chn->connect_msg = &req->header; | ||
673 | chn->cfg = cfg; | 742 | chn->cfg = cfg; |
674 | chn->is_master = GNUNET_YES; | 743 | chn->is_master = GNUNET_YES; |
744 | chn->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; | ||
745 | |||
746 | chn->op = GNUNET_OP_create (); | ||
747 | chn->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls); | ||
675 | 748 | ||
676 | mst->start_cb = start_cb; | 749 | mst->start_cb = start_cb; |
677 | mst->join_req_cb = join_request_cb; | 750 | mst->join_req_cb = join_request_cb; |
678 | mst->cb_cls = cls; | 751 | mst->cb_cls = cls; |
679 | 752 | ||
680 | chn->client = GNUNET_CLIENT_MANAGER_connect (cfg, "psyc", master_handlers); | 753 | master_connect (mst); |
681 | GNUNET_CLIENT_MANAGER_set_user_context_ (chn->client, mst, sizeof (*chn)); | ||
682 | |||
683 | chn->tmit = GNUNET_PSYC_transmit_create (chn->client); | ||
684 | chn->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls); | ||
685 | |||
686 | channel_send_connect_msg (chn); | ||
687 | return mst; | 754 | return mst; |
688 | } | 755 | } |
689 | 756 | ||
@@ -704,12 +771,8 @@ GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *mst, | |||
704 | 771 | ||
705 | /* FIXME: send msg to service */ | 772 | /* FIXME: send msg to service */ |
706 | 773 | ||
707 | chn->is_disconnecting = GNUNET_YES; | 774 | channel_disconnect (chn, stop_cb, stop_cls); |
708 | chn->disconnect_cb = stop_cb; | 775 | master_cleanup (mst); |
709 | chn->disconnect_cls = stop_cls; | ||
710 | |||
711 | GNUNET_CLIENT_MANAGER_disconnect (mst->chn.client, GNUNET_YES, | ||
712 | &master_cleanup, mst); | ||
713 | } | 776 | } |
714 | 777 | ||
715 | 778 | ||
@@ -753,17 +816,16 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, | |||
753 | < sizeof (*dcsn) + relay_size + join_resp_size) | 816 | < sizeof (*dcsn) + relay_size + join_resp_size) |
754 | return GNUNET_SYSERR; | 817 | return GNUNET_SYSERR; |
755 | 818 | ||
756 | dcsn = GNUNET_malloc (sizeof (*dcsn) + relay_size + join_resp_size); | 819 | struct GNUNET_MQ_Envelope * |
757 | dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size); | 820 | env = GNUNET_MQ_msg_extra (dcsn, relay_size + join_resp_size, |
758 | dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION); | 821 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION); |
759 | dcsn->is_admitted = htonl (is_admitted); | 822 | dcsn->is_admitted = htonl (is_admitted); |
760 | dcsn->slave_pub_key = jh->slave_pub_key; | 823 | dcsn->slave_pub_key = jh->slave_pub_key; |
761 | 824 | ||
762 | if (0 < join_resp_size) | 825 | if (0 < join_resp_size) |
763 | GNUNET_memcpy (&dcsn[1], join_resp, join_resp_size); | 826 | GNUNET_memcpy (&dcsn[1], join_resp, join_resp_size); |
764 | 827 | ||
765 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &dcsn->header); | 828 | GNUNET_MQ_send (chn->mq, env); |
766 | GNUNET_free (dcsn); | ||
767 | GNUNET_free (jh); | 829 | GNUNET_free (jh); |
768 | return GNUNET_OK; | 830 | return GNUNET_OK; |
769 | } | 831 | } |
@@ -838,6 +900,104 @@ GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master) | |||
838 | } | 900 | } |
839 | 901 | ||
840 | 902 | ||
903 | /*** SLAVE ***/ | ||
904 | |||
905 | |||
906 | static void | ||
907 | slave_connect (struct GNUNET_PSYC_Slave *slv); | ||
908 | |||
909 | |||
910 | static void | ||
911 | slave_reconnect (void *cls) | ||
912 | { | ||
913 | slave_connect (cls); | ||
914 | } | ||
915 | |||
916 | |||
917 | /** | ||
918 | * Slave client disconnected from service. | ||
919 | * | ||
920 | * Reconnect after backoff period. | ||
921 | */ | ||
922 | static void | ||
923 | slave_disconnected (void *cls, enum GNUNET_MQ_Error error) | ||
924 | { | ||
925 | struct GNUNET_PSYC_Slave *slv = cls; | ||
926 | struct GNUNET_PSYC_Channel *chn = &slv->chn; | ||
927 | |||
928 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
929 | "Slave client disconnected (%d), re-connecting\n", | ||
930 | (int) error); | ||
931 | if (NULL != chn->mq) | ||
932 | { | ||
933 | GNUNET_MQ_destroy (chn->mq); | ||
934 | chn->mq = NULL; | ||
935 | } | ||
936 | if (NULL != chn->tmit) | ||
937 | { | ||
938 | GNUNET_PSYC_transmit_destroy (chn->tmit); | ||
939 | chn->tmit = NULL; | ||
940 | } | ||
941 | chn->reconnect_task = GNUNET_SCHEDULER_add_delayed (chn->reconnect_delay, | ||
942 | slave_reconnect, | ||
943 | slv); | ||
944 | chn->reconnect_delay = GNUNET_TIME_STD_BACKOFF (chn->reconnect_delay); | ||
945 | } | ||
946 | |||
947 | |||
948 | static void | ||
949 | slave_connect (struct GNUNET_PSYC_Slave *slv) | ||
950 | { | ||
951 | struct GNUNET_PSYC_Channel *chn = &slv->chn; | ||
952 | |||
953 | GNUNET_MQ_hd_fixed_size (slave_join_ack, | ||
954 | GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK, | ||
955 | struct GNUNET_PSYC_CountersResultMessage); | ||
956 | |||
957 | GNUNET_MQ_hd_var_size (slave_join_decision, | ||
958 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, | ||
959 | struct GNUNET_PSYC_JoinDecisionMessage); | ||
960 | |||
961 | GNUNET_MQ_hd_var_size (channel_message, | ||
962 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, | ||
963 | struct GNUNET_PSYC_MessageHeader); | ||
964 | |||
965 | GNUNET_MQ_hd_fixed_size (channel_message_ack, | ||
966 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK, | ||
967 | struct GNUNET_MessageHeader); | ||
968 | |||
969 | GNUNET_MQ_hd_var_size (channel_history_result, | ||
970 | GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT, | ||
971 | struct GNUNET_OperationResultMessage); | ||
972 | |||
973 | GNUNET_MQ_hd_var_size (channel_state_result, | ||
974 | GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT, | ||
975 | struct GNUNET_OperationResultMessage); | ||
976 | |||
977 | GNUNET_MQ_hd_var_size (channel_result, | ||
978 | GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE, | ||
979 | struct GNUNET_OperationResultMessage); | ||
980 | |||
981 | struct GNUNET_MQ_MessageHandler handlers[] = { | ||
982 | make_slave_join_ack_handler (slv), | ||
983 | make_slave_join_decision_handler (slv), | ||
984 | make_channel_message_handler (chn), | ||
985 | make_channel_message_ack_handler (chn), | ||
986 | make_channel_history_result_handler (chn), | ||
987 | make_channel_state_result_handler (chn), | ||
988 | make_channel_result_handler (chn), | ||
989 | GNUNET_MQ_handler_end () | ||
990 | }; | ||
991 | |||
992 | chn->mq = GNUNET_CLIENT_connecT (chn->cfg, "psyc", | ||
993 | handlers, slave_disconnected, slv); | ||
994 | GNUNET_assert (NULL != chn->mq); | ||
995 | chn->tmit = GNUNET_PSYC_transmit_create (chn->mq); | ||
996 | |||
997 | GNUNET_MQ_send_copy (chn->mq, chn->connect_env); | ||
998 | } | ||
999 | |||
1000 | |||
841 | /** | 1001 | /** |
842 | * Join a PSYC channel. | 1002 | * Join a PSYC channel. |
843 | * | 1003 | * |
@@ -892,15 +1052,14 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
892 | struct GNUNET_PSYC_Channel *chn = &slv->chn; | 1052 | struct GNUNET_PSYC_Channel *chn = &slv->chn; |
893 | uint16_t relay_size = relay_count * sizeof (*relays); | 1053 | uint16_t relay_size = relay_count * sizeof (*relays); |
894 | uint16_t join_msg_size; | 1054 | uint16_t join_msg_size; |
895 | struct SlaveJoinRequest *req; | ||
896 | |||
897 | if (NULL == join_msg) | 1055 | if (NULL == join_msg) |
898 | join_msg_size = 0; | 1056 | join_msg_size = 0; |
899 | else | 1057 | else |
900 | join_msg_size = ntohs (join_msg->header.size); | 1058 | join_msg_size = ntohs (join_msg->header.size); |
901 | req = GNUNET_malloc (sizeof (*req) + relay_size + join_msg_size); | 1059 | |
902 | req->header.size = htons (sizeof (*req) + relay_size + join_msg_size); | 1060 | struct SlaveJoinRequest *req; |
903 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN); | 1061 | chn->connect_env = GNUNET_MQ_msg_extra (req, relay_size + join_msg_size, |
1062 | GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN); | ||
904 | req->channel_pub_key = *channel_pub_key; | 1063 | req->channel_pub_key = *channel_pub_key; |
905 | req->slave_key = *slave_key; | 1064 | req->slave_key = *slave_key; |
906 | req->origin = *origin; | 1065 | req->origin = *origin; |
@@ -913,21 +1072,18 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
913 | if (NULL != join_msg) | 1072 | if (NULL != join_msg) |
914 | GNUNET_memcpy ((char *) &req[1] + relay_size, join_msg, join_msg_size); | 1073 | GNUNET_memcpy ((char *) &req[1] + relay_size, join_msg, join_msg_size); |
915 | 1074 | ||
916 | chn->connect_msg = &req->header; | ||
917 | chn->cfg = cfg; | 1075 | chn->cfg = cfg; |
918 | chn->is_master = GNUNET_NO; | 1076 | chn->is_master = GNUNET_NO; |
1077 | chn->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; | ||
1078 | |||
1079 | chn->op = GNUNET_OP_create (); | ||
1080 | chn->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls); | ||
919 | 1081 | ||
920 | slv->connect_cb = connect_cb; | 1082 | slv->connect_cb = connect_cb; |
921 | slv->join_dcsn_cb = join_decision_cb; | 1083 | slv->join_dcsn_cb = join_decision_cb; |
922 | slv->cb_cls = cls; | 1084 | slv->cb_cls = cls; |
923 | 1085 | ||
924 | chn->client = GNUNET_CLIENT_MANAGER_connect (cfg, "psyc", slave_handlers); | 1086 | slave_connect (slv); |
925 | GNUNET_CLIENT_MANAGER_set_user_context_ (chn->client, slv, sizeof (*chn)); | ||
926 | |||
927 | chn->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls); | ||
928 | chn->tmit = GNUNET_PSYC_transmit_create (chn->client); | ||
929 | |||
930 | channel_send_connect_msg (chn); | ||
931 | return slv; | 1087 | return slv; |
932 | } | 1088 | } |
933 | 1089 | ||
@@ -950,12 +1106,8 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slv, | |||
950 | 1106 | ||
951 | /* FIXME: send msg to service */ | 1107 | /* FIXME: send msg to service */ |
952 | 1108 | ||
953 | chn->is_disconnecting = GNUNET_YES; | 1109 | channel_disconnect (chn, part_cb, part_cls); |
954 | chn->disconnect_cb = part_cb; | 1110 | slave_cleanup (slv); |
955 | chn->disconnect_cls = part_cls; | ||
956 | |||
957 | GNUNET_CLIENT_MANAGER_disconnect (slv->chn.client, GNUNET_YES, | ||
958 | &slave_cleanup, slv); | ||
959 | } | 1111 | } |
960 | 1112 | ||
961 | 1113 | ||
@@ -1069,18 +1221,16 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn, | |||
1069 | GNUNET_ResultCallback result_cb, | 1221 | GNUNET_ResultCallback result_cb, |
1070 | void *cls) | 1222 | void *cls) |
1071 | { | 1223 | { |
1072 | struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req)); | 1224 | struct ChannelMembershipStoreRequest *req; |
1073 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE); | 1225 | struct GNUNET_MQ_Envelope * |
1074 | req->header.size = htons (sizeof (*req)); | 1226 | env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE); |
1075 | req->slave_pub_key = *slave_pub_key; | 1227 | req->slave_pub_key = *slave_pub_key; |
1076 | req->announced_at = GNUNET_htonll (announced_at); | 1228 | req->announced_at = GNUNET_htonll (announced_at); |
1077 | req->effective_since = GNUNET_htonll (effective_since); | 1229 | req->effective_since = GNUNET_htonll (effective_since); |
1078 | req->did_join = GNUNET_YES; | 1230 | req->did_join = GNUNET_YES; |
1079 | req->op_id = GNUNET_htonll (GNUNET_CLIENT_MANAGER_op_add (chn->client, | 1231 | req->op_id = GNUNET_htonll (GNUNET_OP_add (chn->op, result_cb, cls, NULL)); |
1080 | result_cb, cls)); | ||
1081 | 1232 | ||
1082 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); | 1233 | GNUNET_MQ_send (chn->mq, env); |
1083 | GNUNET_free (req); | ||
1084 | } | 1234 | } |
1085 | 1235 | ||
1086 | 1236 | ||
@@ -1122,17 +1272,15 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn, | |||
1122 | GNUNET_ResultCallback result_cb, | 1272 | GNUNET_ResultCallback result_cb, |
1123 | void *cls) | 1273 | void *cls) |
1124 | { | 1274 | { |
1125 | struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req)); | 1275 | struct ChannelMembershipStoreRequest *req; |
1126 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE); | 1276 | struct GNUNET_MQ_Envelope * |
1127 | req->header.size = htons (sizeof (*req)); | 1277 | env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE); |
1128 | req->slave_pub_key = *slave_pub_key; | 1278 | req->slave_pub_key = *slave_pub_key; |
1129 | req->announced_at = GNUNET_htonll (announced_at); | 1279 | req->announced_at = GNUNET_htonll (announced_at); |
1130 | req->did_join = GNUNET_NO; | 1280 | req->did_join = GNUNET_NO; |
1131 | req->op_id = GNUNET_htonll (GNUNET_CLIENT_MANAGER_op_add (chn->client, | 1281 | req->op_id = GNUNET_htonll (GNUNET_OP_add (chn->op, result_cb, cls, NULL)); |
1132 | result_cb, cls)); | ||
1133 | 1282 | ||
1134 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); | 1283 | GNUNET_MQ_send (chn->mq, env); |
1135 | GNUNET_free (req); | ||
1136 | } | 1284 | } |
1137 | 1285 | ||
1138 | 1286 | ||
@@ -1154,17 +1302,17 @@ channel_history_replay (struct GNUNET_PSYC_Channel *chn, | |||
1154 | hist->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls); | 1302 | hist->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls); |
1155 | hist->result_cb = result_cb; | 1303 | hist->result_cb = result_cb; |
1156 | hist->cls = cls; | 1304 | hist->cls = cls; |
1157 | hist->op_id = GNUNET_CLIENT_MANAGER_op_add (chn->client, | 1305 | hist->op_id = GNUNET_OP_add (chn->op, op_recv_history_result, hist, NULL); |
1158 | &op_recv_history_result, hist); | ||
1159 | 1306 | ||
1160 | GNUNET_assert (NULL != method_prefix); | 1307 | GNUNET_assert (NULL != method_prefix); |
1161 | uint16_t method_size = strnlen (method_prefix, | 1308 | uint16_t method_size = strnlen (method_prefix, |
1162 | GNUNET_SERVER_MAX_MESSAGE_SIZE | 1309 | GNUNET_SERVER_MAX_MESSAGE_SIZE |
1163 | - sizeof (*req)) + 1; | 1310 | - sizeof (*req)) + 1; |
1164 | GNUNET_assert ('\0' == method_prefix[method_size - 1]); | 1311 | GNUNET_assert ('\0' == method_prefix[method_size - 1]); |
1165 | req = GNUNET_malloc (sizeof (*req) + method_size); | 1312 | |
1166 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY); | 1313 | struct GNUNET_MQ_Envelope * |
1167 | req->header.size = htons (sizeof (*req) + method_size); | 1314 | env = GNUNET_MQ_msg_extra (req, method_size, |
1315 | GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY); | ||
1168 | req->start_message_id = GNUNET_htonll (start_message_id); | 1316 | req->start_message_id = GNUNET_htonll (start_message_id); |
1169 | req->end_message_id = GNUNET_htonll (end_message_id); | 1317 | req->end_message_id = GNUNET_htonll (end_message_id); |
1170 | req->message_limit = GNUNET_htonll (message_limit); | 1318 | req->message_limit = GNUNET_htonll (message_limit); |
@@ -1172,8 +1320,7 @@ channel_history_replay (struct GNUNET_PSYC_Channel *chn, | |||
1172 | req->op_id = GNUNET_htonll (hist->op_id); | 1320 | req->op_id = GNUNET_htonll (hist->op_id); |
1173 | GNUNET_memcpy (&req[1], method_prefix, method_size); | 1321 | GNUNET_memcpy (&req[1], method_prefix, method_size); |
1174 | 1322 | ||
1175 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); | 1323 | GNUNET_MQ_send (chn->mq, env); |
1176 | GNUNET_free (req); | ||
1177 | return hist; | 1324 | return hist; |
1178 | } | 1325 | } |
1179 | 1326 | ||
@@ -1263,7 +1410,7 @@ GNUNET_PSYC_channel_history_replay_cancel (struct GNUNET_PSYC_Channel *channel, | |||
1263 | struct GNUNET_PSYC_HistoryRequest *hist) | 1410 | struct GNUNET_PSYC_HistoryRequest *hist) |
1264 | { | 1411 | { |
1265 | GNUNET_PSYC_receive_destroy (hist->recv); | 1412 | GNUNET_PSYC_receive_destroy (hist->recv); |
1266 | GNUNET_CLIENT_MANAGER_op_cancel (hist->chn->client, hist->op_id); | 1413 | GNUNET_OP_remove (hist->chn->op, hist->op_id); |
1267 | GNUNET_free (hist); | 1414 | GNUNET_free (hist); |
1268 | } | 1415 | } |
1269 | 1416 | ||
@@ -1301,20 +1448,17 @@ channel_state_get (struct GNUNET_PSYC_Channel *chn, | |||
1301 | sr->var_cb = var_cb; | 1448 | sr->var_cb = var_cb; |
1302 | sr->result_cb = result_cb; | 1449 | sr->result_cb = result_cb; |
1303 | sr->cls = cls; | 1450 | sr->cls = cls; |
1304 | sr->op_id = GNUNET_CLIENT_MANAGER_op_add (chn->client, | 1451 | sr->op_id = GNUNET_OP_add (chn->op, op_recv_state_result, sr, NULL); |
1305 | &op_recv_state_result, sr); | ||
1306 | 1452 | ||
1307 | GNUNET_assert (NULL != name); | 1453 | GNUNET_assert (NULL != name); |
1308 | size_t name_size = strnlen (name, GNUNET_SERVER_MAX_MESSAGE_SIZE | 1454 | size_t name_size = strnlen (name, GNUNET_SERVER_MAX_MESSAGE_SIZE |
1309 | - sizeof (*req)) + 1; | 1455 | - sizeof (*req)) + 1; |
1310 | req = GNUNET_malloc (sizeof (*req) + name_size); | 1456 | struct GNUNET_MQ_Envelope * |
1311 | req->header.type = htons (type); | 1457 | env = GNUNET_MQ_msg_extra (req, name_size, type); |
1312 | req->header.size = htons (sizeof (*req) + name_size); | ||
1313 | req->op_id = GNUNET_htonll (sr->op_id); | 1458 | req->op_id = GNUNET_htonll (sr->op_id); |
1314 | GNUNET_memcpy (&req[1], name, name_size); | 1459 | GNUNET_memcpy (&req[1], name, name_size); |
1315 | 1460 | ||
1316 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); | 1461 | GNUNET_MQ_send (chn->mq, env); |
1317 | GNUNET_free (req); | ||
1318 | return sr; | 1462 | return sr; |
1319 | } | 1463 | } |
1320 | 1464 | ||
@@ -1397,7 +1541,7 @@ GNUNET_PSYC_channel_state_get_prefix (struct GNUNET_PSYC_Channel *chn, | |||
1397 | void | 1541 | void |
1398 | GNUNET_PSYC_channel_state_get_cancel (struct GNUNET_PSYC_StateRequest *sr) | 1542 | GNUNET_PSYC_channel_state_get_cancel (struct GNUNET_PSYC_StateRequest *sr) |
1399 | { | 1543 | { |
1400 | GNUNET_CLIENT_MANAGER_op_cancel (sr->chn->client, sr->op_id); | 1544 | GNUNET_OP_remove (sr->chn->op, sr->op_id); |
1401 | GNUNET_free (sr); | 1545 | GNUNET_free (sr); |
1402 | } | 1546 | } |
1403 | 1547 | ||