diff options
Diffstat (limited to 'src/psyc/psyc_api.c')
-rw-r--r-- | src/psyc/psyc_api.c | 103 |
1 files changed, 68 insertions, 35 deletions
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c index c93d8b383..d8f4c98bc 100644 --- a/src/psyc/psyc_api.c +++ b/src/psyc/psyc_api.c | |||
@@ -260,6 +260,10 @@ handle_channel_result (void *cls, | |||
260 | GNUNET_OP_result (chn->op, GNUNET_ntohll (res->op_id), | 260 | GNUNET_OP_result (chn->op, GNUNET_ntohll (res->op_id), |
261 | GNUNET_ntohll (res->result_code), | 261 | GNUNET_ntohll (res->result_code), |
262 | data, data_size, NULL); | 262 | data, data_size, NULL); |
263 | |||
264 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
265 | "handle_channel_result: Received result message with OP ID %" PRIu64 "\n", | ||
266 | GNUNET_ntohll (res->op_id)); | ||
263 | } | 267 | } |
264 | 268 | ||
265 | 269 | ||
@@ -555,6 +559,9 @@ handle_slave_join_decision (void *cls, | |||
555 | static void | 559 | static void |
556 | channel_cleanup (struct GNUNET_PSYC_Channel *chn) | 560 | channel_cleanup (struct GNUNET_PSYC_Channel *chn) |
557 | { | 561 | { |
562 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
563 | "cleaning up channel %p\n", | ||
564 | chn); | ||
558 | if (NULL != chn->tmit) | 565 | if (NULL != chn->tmit) |
559 | { | 566 | { |
560 | GNUNET_PSYC_transmit_destroy (chn->tmit); | 567 | GNUNET_PSYC_transmit_destroy (chn->tmit); |
@@ -562,6 +569,7 @@ channel_cleanup (struct GNUNET_PSYC_Channel *chn) | |||
562 | } | 569 | } |
563 | if (NULL != chn->recv) | 570 | if (NULL != chn->recv) |
564 | { | 571 | { |
572 | |||
565 | GNUNET_PSYC_receive_destroy (chn->recv); | 573 | GNUNET_PSYC_receive_destroy (chn->recv); |
566 | chn->recv = NULL; | 574 | chn->recv = NULL; |
567 | } | 575 | } |
@@ -585,30 +593,12 @@ channel_cleanup (struct GNUNET_PSYC_Channel *chn) | |||
585 | 593 | ||
586 | 594 | ||
587 | static void | 595 | static void |
588 | channel_disconnect (struct GNUNET_PSYC_Channel *chn, | 596 | handle_channel_part_ack (void *cls, |
589 | GNUNET_ContinuationCallback cb, | 597 | const struct GNUNET_MessageHeader *msg) |
590 | void *cls) | ||
591 | { | 598 | { |
592 | chn->is_disconnecting = GNUNET_YES; | 599 | struct GNUNET_PSYC_Channel *chn = cls; |
593 | chn->disconnect_cb = cb; | ||
594 | chn->disconnect_cls = cls; | ||
595 | 600 | ||
596 | if (NULL != chn->mq) | 601 | channel_cleanup (chn); |
597 | { | ||
598 | struct GNUNET_MQ_Envelope *env = GNUNET_MQ_get_last_envelope (chn->mq); | ||
599 | if (NULL != env) | ||
600 | { | ||
601 | GNUNET_MQ_notify_sent (env, (GNUNET_SCHEDULER_TaskCallback) channel_cleanup, chn); | ||
602 | } | ||
603 | else | ||
604 | { | ||
605 | channel_cleanup (chn); | ||
606 | } | ||
607 | } | ||
608 | else | ||
609 | { | ||
610 | channel_cleanup (chn); | ||
611 | } | ||
612 | } | 602 | } |
613 | 603 | ||
614 | 604 | ||
@@ -671,6 +661,10 @@ master_connect (struct GNUNET_PSYC_Master *mst) | |||
671 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST, | 661 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST, |
672 | struct GNUNET_PSYC_JoinRequestMessage, | 662 | struct GNUNET_PSYC_JoinRequestMessage, |
673 | mst), | 663 | mst), |
664 | GNUNET_MQ_hd_fixed_size (channel_part_ack, | ||
665 | GNUNET_MESSAGE_TYPE_PSYC_PART_ACK, | ||
666 | struct GNUNET_MessageHeader, | ||
667 | chn), | ||
674 | GNUNET_MQ_hd_var_size (channel_message, | 668 | GNUNET_MQ_hd_var_size (channel_message, |
675 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, | 669 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, |
676 | struct GNUNET_PSYC_MessageHeader, | 670 | struct GNUNET_PSYC_MessageHeader, |
@@ -694,8 +688,11 @@ master_connect (struct GNUNET_PSYC_Master *mst) | |||
694 | GNUNET_MQ_handler_end () | 688 | GNUNET_MQ_handler_end () |
695 | }; | 689 | }; |
696 | 690 | ||
697 | chn->mq = GNUNET_CLIENT_connect (chn->cfg, "psyc", | 691 | chn->mq = GNUNET_CLIENT_connect (chn->cfg, |
698 | handlers, master_disconnected, mst); | 692 | "psyc", |
693 | handlers, | ||
694 | &master_disconnected, | ||
695 | mst); | ||
699 | GNUNET_assert (NULL != chn->mq); | 696 | GNUNET_assert (NULL != chn->mq); |
700 | chn->tmit = GNUNET_PSYC_transmit_create (chn->mq); | 697 | chn->tmit = GNUNET_PSYC_transmit_create (chn->mq); |
701 | 698 | ||
@@ -780,10 +777,13 @@ GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *mst, | |||
780 | void *stop_cls) | 777 | void *stop_cls) |
781 | { | 778 | { |
782 | struct GNUNET_PSYC_Channel *chn = &mst->chn; | 779 | struct GNUNET_PSYC_Channel *chn = &mst->chn; |
780 | struct GNUNET_MQ_Envelope *env; | ||
783 | 781 | ||
784 | /* FIXME: send msg to service */ | 782 | chn->is_disconnecting = GNUNET_YES; |
785 | 783 | chn->disconnect_cb = stop_cb; | |
786 | channel_disconnect (chn, stop_cb, stop_cls); | 784 | chn->disconnect_cls = stop_cls; |
785 | env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_PSYC_PART_REQUEST); | ||
786 | GNUNET_MQ_send (chn->mq, env); | ||
787 | } | 787 | } |
788 | 788 | ||
789 | 789 | ||
@@ -931,7 +931,8 @@ slave_reconnect (void *cls) | |||
931 | * Reconnect after backoff period. | 931 | * Reconnect after backoff period. |
932 | */ | 932 | */ |
933 | static void | 933 | static void |
934 | slave_disconnected (void *cls, enum GNUNET_MQ_Error error) | 934 | slave_disconnected (void *cls, |
935 | enum GNUNET_MQ_Error error) | ||
935 | { | 936 | { |
936 | struct GNUNET_PSYC_Slave *slv = cls; | 937 | struct GNUNET_PSYC_Slave *slv = cls; |
937 | struct GNUNET_PSYC_Channel *chn = &slv->chn; | 938 | struct GNUNET_PSYC_Channel *chn = &slv->chn; |
@@ -950,7 +951,7 @@ slave_disconnected (void *cls, enum GNUNET_MQ_Error error) | |||
950 | chn->mq = NULL; | 951 | chn->mq = NULL; |
951 | } | 952 | } |
952 | chn->reconnect_task = GNUNET_SCHEDULER_add_delayed (chn->reconnect_delay, | 953 | chn->reconnect_task = GNUNET_SCHEDULER_add_delayed (chn->reconnect_delay, |
953 | slave_reconnect, | 954 | &slave_reconnect, |
954 | slv); | 955 | slv); |
955 | chn->reconnect_delay = GNUNET_TIME_STD_BACKOFF (chn->reconnect_delay); | 956 | chn->reconnect_delay = GNUNET_TIME_STD_BACKOFF (chn->reconnect_delay); |
956 | } | 957 | } |
@@ -970,6 +971,10 @@ slave_connect (struct GNUNET_PSYC_Slave *slv) | |||
970 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, | 971 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, |
971 | struct GNUNET_PSYC_JoinDecisionMessage, | 972 | struct GNUNET_PSYC_JoinDecisionMessage, |
972 | slv), | 973 | slv), |
974 | GNUNET_MQ_hd_fixed_size (channel_part_ack, | ||
975 | GNUNET_MESSAGE_TYPE_PSYC_PART_ACK, | ||
976 | struct GNUNET_MessageHeader, | ||
977 | chn), | ||
973 | GNUNET_MQ_hd_var_size (channel_message, | 978 | GNUNET_MQ_hd_var_size (channel_message, |
974 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, | 979 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, |
975 | struct GNUNET_PSYC_MessageHeader, | 980 | struct GNUNET_PSYC_MessageHeader, |
@@ -993,9 +998,19 @@ slave_connect (struct GNUNET_PSYC_Slave *slv) | |||
993 | GNUNET_MQ_handler_end () | 998 | GNUNET_MQ_handler_end () |
994 | }; | 999 | }; |
995 | 1000 | ||
996 | chn->mq = GNUNET_CLIENT_connect (chn->cfg, "psyc", | 1001 | chn->mq = GNUNET_CLIENT_connect (chn->cfg, |
997 | handlers, slave_disconnected, slv); | 1002 | "psyc", |
998 | GNUNET_assert (NULL != chn->mq); | 1003 | handlers, |
1004 | &slave_disconnected, | ||
1005 | slv); | ||
1006 | if (NULL == chn->mq) | ||
1007 | { | ||
1008 | chn->reconnect_task = GNUNET_SCHEDULER_add_delayed (chn->reconnect_delay, | ||
1009 | &slave_reconnect, | ||
1010 | slv); | ||
1011 | chn->reconnect_delay = GNUNET_TIME_STD_BACKOFF (chn->reconnect_delay); | ||
1012 | return; | ||
1013 | } | ||
999 | chn->tmit = GNUNET_PSYC_transmit_create (chn->mq); | 1014 | chn->tmit = GNUNET_PSYC_transmit_create (chn->mq); |
1000 | 1015 | ||
1001 | GNUNET_MQ_send_copy (chn->mq, chn->connect_env); | 1016 | GNUNET_MQ_send_copy (chn->mq, chn->connect_env); |
@@ -1107,10 +1122,13 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slv, | |||
1107 | void *part_cls) | 1122 | void *part_cls) |
1108 | { | 1123 | { |
1109 | struct GNUNET_PSYC_Channel *chn = &slv->chn; | 1124 | struct GNUNET_PSYC_Channel *chn = &slv->chn; |
1125 | struct GNUNET_MQ_Envelope *env; | ||
1110 | 1126 | ||
1111 | /* FIXME: send msg to service */ | 1127 | chn->is_disconnecting = GNUNET_YES; |
1112 | 1128 | chn->disconnect_cb = part_cb; | |
1113 | channel_disconnect (chn, part_cb, part_cls); | 1129 | chn->disconnect_cls = part_cls; |
1130 | env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_PSYC_PART_REQUEST); | ||
1131 | GNUNET_MQ_send (chn->mq, env); | ||
1114 | } | 1132 | } |
1115 | 1133 | ||
1116 | 1134 | ||
@@ -1233,6 +1251,9 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn, | |||
1233 | req->did_join = GNUNET_YES; | 1251 | req->did_join = GNUNET_YES; |
1234 | req->op_id = GNUNET_htonll (GNUNET_OP_add (chn->op, result_cb, cls, NULL)); | 1252 | req->op_id = GNUNET_htonll (GNUNET_OP_add (chn->op, result_cb, cls, NULL)); |
1235 | 1253 | ||
1254 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1255 | "GNUNET_PSYC_channel_slave_add, OP ID: %" PRIu64 "\n", | ||
1256 | GNUNET_ntohll (req->op_id)); | ||
1236 | GNUNET_MQ_send (chn->mq, env); | 1257 | GNUNET_MQ_send (chn->mq, env); |
1237 | } | 1258 | } |
1238 | 1259 | ||
@@ -1283,6 +1304,9 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn, | |||
1283 | req->did_join = GNUNET_NO; | 1304 | req->did_join = GNUNET_NO; |
1284 | req->op_id = GNUNET_htonll (GNUNET_OP_add (chn->op, result_cb, cls, NULL)); | 1305 | req->op_id = GNUNET_htonll (GNUNET_OP_add (chn->op, result_cb, cls, NULL)); |
1285 | 1306 | ||
1307 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1308 | "GNUNET_PSYC_channel_slave_remove, OP ID: %" PRIu64 "\n", | ||
1309 | GNUNET_ntohll (req->op_id)); | ||
1286 | GNUNET_MQ_send (chn->mq, env); | 1310 | GNUNET_MQ_send (chn->mq, env); |
1287 | } | 1311 | } |
1288 | 1312 | ||
@@ -1321,6 +1345,10 @@ channel_history_replay (struct GNUNET_PSYC_Channel *chn, | |||
1321 | req->message_limit = GNUNET_htonll (message_limit); | 1345 | req->message_limit = GNUNET_htonll (message_limit); |
1322 | req->flags = htonl (flags); | 1346 | req->flags = htonl (flags); |
1323 | req->op_id = GNUNET_htonll (hist->op_id); | 1347 | req->op_id = GNUNET_htonll (hist->op_id); |
1348 | |||
1349 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1350 | "channel_history_replay, OP ID: %" PRIu64 "\n", | ||
1351 | GNUNET_ntohll (req->op_id)); | ||
1324 | GNUNET_memcpy (&req[1], method_prefix, method_size); | 1352 | GNUNET_memcpy (&req[1], method_prefix, method_size); |
1325 | 1353 | ||
1326 | GNUNET_MQ_send (chn->mq, env); | 1354 | GNUNET_MQ_send (chn->mq, env); |
@@ -1459,6 +1487,11 @@ channel_state_get (struct GNUNET_PSYC_Channel *chn, | |||
1459 | struct GNUNET_MQ_Envelope * | 1487 | struct GNUNET_MQ_Envelope * |
1460 | env = GNUNET_MQ_msg_extra (req, name_size, type); | 1488 | env = GNUNET_MQ_msg_extra (req, name_size, type); |
1461 | req->op_id = GNUNET_htonll (sr->op_id); | 1489 | req->op_id = GNUNET_htonll (sr->op_id); |
1490 | |||
1491 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1492 | "channel_state_get, OP ID: %" PRIu64 "\n", | ||
1493 | GNUNET_ntohll (req->op_id)); | ||
1494 | |||
1462 | GNUNET_memcpy (&req[1], name, name_size); | 1495 | GNUNET_memcpy (&req[1], name, name_size); |
1463 | 1496 | ||
1464 | GNUNET_MQ_send (chn->mq, env); | 1497 | GNUNET_MQ_send (chn->mq, env); |