aboutsummaryrefslogtreecommitdiff
path: root/src/psyc/psyc_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/psyc/psyc_api.c')
-rw-r--r--src/psyc/psyc_api.c103
1 files changed, 68 insertions, 35 deletions
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index c93d8b383..d8f4c98bc 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -260,6 +260,10 @@ handle_channel_result (void *cls,
260 GNUNET_OP_result (chn->op, GNUNET_ntohll (res->op_id), 260 GNUNET_OP_result (chn->op, GNUNET_ntohll (res->op_id),
261 GNUNET_ntohll (res->result_code), 261 GNUNET_ntohll (res->result_code),
262 data, data_size, NULL); 262 data, data_size, NULL);
263
264 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
265 "handle_channel_result: Received result message with OP ID %" PRIu64 "\n",
266 GNUNET_ntohll (res->op_id));
263} 267}
264 268
265 269
@@ -555,6 +559,9 @@ handle_slave_join_decision (void *cls,
555static void 559static void
556channel_cleanup (struct GNUNET_PSYC_Channel *chn) 560channel_cleanup (struct GNUNET_PSYC_Channel *chn)
557{ 561{
562 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
563 "cleaning up channel %p\n",
564 chn);
558 if (NULL != chn->tmit) 565 if (NULL != chn->tmit)
559 { 566 {
560 GNUNET_PSYC_transmit_destroy (chn->tmit); 567 GNUNET_PSYC_transmit_destroy (chn->tmit);
@@ -562,6 +569,7 @@ channel_cleanup (struct GNUNET_PSYC_Channel *chn)
562 } 569 }
563 if (NULL != chn->recv) 570 if (NULL != chn->recv)
564 { 571 {
572
565 GNUNET_PSYC_receive_destroy (chn->recv); 573 GNUNET_PSYC_receive_destroy (chn->recv);
566 chn->recv = NULL; 574 chn->recv = NULL;
567 } 575 }
@@ -585,30 +593,12 @@ channel_cleanup (struct GNUNET_PSYC_Channel *chn)
585 593
586 594
587static void 595static void
588channel_disconnect (struct GNUNET_PSYC_Channel *chn, 596handle_channel_part_ack (void *cls,
589 GNUNET_ContinuationCallback cb, 597 const struct GNUNET_MessageHeader *msg)
590 void *cls)
591{ 598{
592 chn->is_disconnecting = GNUNET_YES; 599 struct GNUNET_PSYC_Channel *chn = cls;
593 chn->disconnect_cb = cb;
594 chn->disconnect_cls = cls;
595 600
596 if (NULL != chn->mq) 601 channel_cleanup (chn);
597 {
598 struct GNUNET_MQ_Envelope *env = GNUNET_MQ_get_last_envelope (chn->mq);
599 if (NULL != env)
600 {
601 GNUNET_MQ_notify_sent (env, (GNUNET_SCHEDULER_TaskCallback) channel_cleanup, chn);
602 }
603 else
604 {
605 channel_cleanup (chn);
606 }
607 }
608 else
609 {
610 channel_cleanup (chn);
611 }
612} 602}
613 603
614 604
@@ -671,6 +661,10 @@ master_connect (struct GNUNET_PSYC_Master *mst)
671 GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST, 661 GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST,
672 struct GNUNET_PSYC_JoinRequestMessage, 662 struct GNUNET_PSYC_JoinRequestMessage,
673 mst), 663 mst),
664 GNUNET_MQ_hd_fixed_size (channel_part_ack,
665 GNUNET_MESSAGE_TYPE_PSYC_PART_ACK,
666 struct GNUNET_MessageHeader,
667 chn),
674 GNUNET_MQ_hd_var_size (channel_message, 668 GNUNET_MQ_hd_var_size (channel_message,
675 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 669 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
676 struct GNUNET_PSYC_MessageHeader, 670 struct GNUNET_PSYC_MessageHeader,
@@ -694,8 +688,11 @@ master_connect (struct GNUNET_PSYC_Master *mst)
694 GNUNET_MQ_handler_end () 688 GNUNET_MQ_handler_end ()
695 }; 689 };
696 690
697 chn->mq = GNUNET_CLIENT_connect (chn->cfg, "psyc", 691 chn->mq = GNUNET_CLIENT_connect (chn->cfg,
698 handlers, master_disconnected, mst); 692 "psyc",
693 handlers,
694 &master_disconnected,
695 mst);
699 GNUNET_assert (NULL != chn->mq); 696 GNUNET_assert (NULL != chn->mq);
700 chn->tmit = GNUNET_PSYC_transmit_create (chn->mq); 697 chn->tmit = GNUNET_PSYC_transmit_create (chn->mq);
701 698
@@ -780,10 +777,13 @@ GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *mst,
780 void *stop_cls) 777 void *stop_cls)
781{ 778{
782 struct GNUNET_PSYC_Channel *chn = &mst->chn; 779 struct GNUNET_PSYC_Channel *chn = &mst->chn;
780 struct GNUNET_MQ_Envelope *env;
783 781
784 /* FIXME: send msg to service */ 782 chn->is_disconnecting = GNUNET_YES;
785 783 chn->disconnect_cb = stop_cb;
786 channel_disconnect (chn, stop_cb, stop_cls); 784 chn->disconnect_cls = stop_cls;
785 env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_PSYC_PART_REQUEST);
786 GNUNET_MQ_send (chn->mq, env);
787} 787}
788 788
789 789
@@ -931,7 +931,8 @@ slave_reconnect (void *cls)
931 * Reconnect after backoff period. 931 * Reconnect after backoff period.
932 */ 932 */
933static void 933static void
934slave_disconnected (void *cls, enum GNUNET_MQ_Error error) 934slave_disconnected (void *cls,
935 enum GNUNET_MQ_Error error)
935{ 936{
936 struct GNUNET_PSYC_Slave *slv = cls; 937 struct GNUNET_PSYC_Slave *slv = cls;
937 struct GNUNET_PSYC_Channel *chn = &slv->chn; 938 struct GNUNET_PSYC_Channel *chn = &slv->chn;
@@ -950,7 +951,7 @@ slave_disconnected (void *cls, enum GNUNET_MQ_Error error)
950 chn->mq = NULL; 951 chn->mq = NULL;
951 } 952 }
952 chn->reconnect_task = GNUNET_SCHEDULER_add_delayed (chn->reconnect_delay, 953 chn->reconnect_task = GNUNET_SCHEDULER_add_delayed (chn->reconnect_delay,
953 slave_reconnect, 954 &slave_reconnect,
954 slv); 955 slv);
955 chn->reconnect_delay = GNUNET_TIME_STD_BACKOFF (chn->reconnect_delay); 956 chn->reconnect_delay = GNUNET_TIME_STD_BACKOFF (chn->reconnect_delay);
956} 957}
@@ -970,6 +971,10 @@ slave_connect (struct GNUNET_PSYC_Slave *slv)
970 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 971 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
971 struct GNUNET_PSYC_JoinDecisionMessage, 972 struct GNUNET_PSYC_JoinDecisionMessage,
972 slv), 973 slv),
974 GNUNET_MQ_hd_fixed_size (channel_part_ack,
975 GNUNET_MESSAGE_TYPE_PSYC_PART_ACK,
976 struct GNUNET_MessageHeader,
977 chn),
973 GNUNET_MQ_hd_var_size (channel_message, 978 GNUNET_MQ_hd_var_size (channel_message,
974 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 979 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
975 struct GNUNET_PSYC_MessageHeader, 980 struct GNUNET_PSYC_MessageHeader,
@@ -993,9 +998,19 @@ slave_connect (struct GNUNET_PSYC_Slave *slv)
993 GNUNET_MQ_handler_end () 998 GNUNET_MQ_handler_end ()
994 }; 999 };
995 1000
996 chn->mq = GNUNET_CLIENT_connect (chn->cfg, "psyc", 1001 chn->mq = GNUNET_CLIENT_connect (chn->cfg,
997 handlers, slave_disconnected, slv); 1002 "psyc",
998 GNUNET_assert (NULL != chn->mq); 1003 handlers,
1004 &slave_disconnected,
1005 slv);
1006 if (NULL == chn->mq)
1007 {
1008 chn->reconnect_task = GNUNET_SCHEDULER_add_delayed (chn->reconnect_delay,
1009 &slave_reconnect,
1010 slv);
1011 chn->reconnect_delay = GNUNET_TIME_STD_BACKOFF (chn->reconnect_delay);
1012 return;
1013 }
999 chn->tmit = GNUNET_PSYC_transmit_create (chn->mq); 1014 chn->tmit = GNUNET_PSYC_transmit_create (chn->mq);
1000 1015
1001 GNUNET_MQ_send_copy (chn->mq, chn->connect_env); 1016 GNUNET_MQ_send_copy (chn->mq, chn->connect_env);
@@ -1107,10 +1122,13 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slv,
1107 void *part_cls) 1122 void *part_cls)
1108{ 1123{
1109 struct GNUNET_PSYC_Channel *chn = &slv->chn; 1124 struct GNUNET_PSYC_Channel *chn = &slv->chn;
1125 struct GNUNET_MQ_Envelope *env;
1110 1126
1111 /* FIXME: send msg to service */ 1127 chn->is_disconnecting = GNUNET_YES;
1112 1128 chn->disconnect_cb = part_cb;
1113 channel_disconnect (chn, part_cb, part_cls); 1129 chn->disconnect_cls = part_cls;
1130 env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_PSYC_PART_REQUEST);
1131 GNUNET_MQ_send (chn->mq, env);
1114} 1132}
1115 1133
1116 1134
@@ -1233,6 +1251,9 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn,
1233 req->did_join = GNUNET_YES; 1251 req->did_join = GNUNET_YES;
1234 req->op_id = GNUNET_htonll (GNUNET_OP_add (chn->op, result_cb, cls, NULL)); 1252 req->op_id = GNUNET_htonll (GNUNET_OP_add (chn->op, result_cb, cls, NULL));
1235 1253
1254 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1255 "GNUNET_PSYC_channel_slave_add, OP ID: %" PRIu64 "\n",
1256 GNUNET_ntohll (req->op_id));
1236 GNUNET_MQ_send (chn->mq, env); 1257 GNUNET_MQ_send (chn->mq, env);
1237} 1258}
1238 1259
@@ -1283,6 +1304,9 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn,
1283 req->did_join = GNUNET_NO; 1304 req->did_join = GNUNET_NO;
1284 req->op_id = GNUNET_htonll (GNUNET_OP_add (chn->op, result_cb, cls, NULL)); 1305 req->op_id = GNUNET_htonll (GNUNET_OP_add (chn->op, result_cb, cls, NULL));
1285 1306
1307 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1308 "GNUNET_PSYC_channel_slave_remove, OP ID: %" PRIu64 "\n",
1309 GNUNET_ntohll (req->op_id));
1286 GNUNET_MQ_send (chn->mq, env); 1310 GNUNET_MQ_send (chn->mq, env);
1287} 1311}
1288 1312
@@ -1321,6 +1345,10 @@ channel_history_replay (struct GNUNET_PSYC_Channel *chn,
1321 req->message_limit = GNUNET_htonll (message_limit); 1345 req->message_limit = GNUNET_htonll (message_limit);
1322 req->flags = htonl (flags); 1346 req->flags = htonl (flags);
1323 req->op_id = GNUNET_htonll (hist->op_id); 1347 req->op_id = GNUNET_htonll (hist->op_id);
1348
1349 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1350 "channel_history_replay, OP ID: %" PRIu64 "\n",
1351 GNUNET_ntohll (req->op_id));
1324 GNUNET_memcpy (&req[1], method_prefix, method_size); 1352 GNUNET_memcpy (&req[1], method_prefix, method_size);
1325 1353
1326 GNUNET_MQ_send (chn->mq, env); 1354 GNUNET_MQ_send (chn->mq, env);
@@ -1459,6 +1487,11 @@ channel_state_get (struct GNUNET_PSYC_Channel *chn,
1459 struct GNUNET_MQ_Envelope * 1487 struct GNUNET_MQ_Envelope *
1460 env = GNUNET_MQ_msg_extra (req, name_size, type); 1488 env = GNUNET_MQ_msg_extra (req, name_size, type);
1461 req->op_id = GNUNET_htonll (sr->op_id); 1489 req->op_id = GNUNET_htonll (sr->op_id);
1490
1491 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1492 "channel_state_get, OP ID: %" PRIu64 "\n",
1493 GNUNET_ntohll (req->op_id));
1494
1462 GNUNET_memcpy (&req[1], name, name_size); 1495 GNUNET_memcpy (&req[1], name, name_size);
1463 1496
1464 GNUNET_MQ_send (chn->mq, env); 1497 GNUNET_MQ_send (chn->mq, env);