aboutsummaryrefslogtreecommitdiff
path: root/src/core/core_api_monitor_peers.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-06-23 19:14:56 +0000
committerChristian Grothoff <christian@grothoff.org>2016-06-23 19:14:56 +0000
commit44ad80664bfcb495eb82599680bde78ccb02cd6d (patch)
tree417ec4ada2e30193733e5d1fe76180697d56ea65 /src/core/core_api_monitor_peers.c
parent7cc4fa45f062262d8d35940c04891c54be2e92bb (diff)
downloadgnunet-44ad80664bfcb495eb82599680bde78ccb02cd6d.tar.gz
gnunet-44ad80664bfcb495eb82599680bde78ccb02cd6d.zip
converting core monitor to MQ
Diffstat (limited to 'src/core/core_api_monitor_peers.c')
-rw-r--r--src/core/core_api_monitor_peers.c171
1 files changed, 61 insertions, 110 deletions
diff --git a/src/core/core_api_monitor_peers.c b/src/core/core_api_monitor_peers.c
index 419670d7c..bafcd3e94 100644
--- a/src/core/core_api_monitor_peers.c
+++ b/src/core/core_api_monitor_peers.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 Copyright (C) 2009-2014 GNUnet e.V. 3 Copyright (C) 2009-2014, 2016 GNUnet e.V.
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -43,12 +43,7 @@ struct GNUNET_CORE_MonitorHandle
43 /** 43 /**
44 * Our connection to the service. 44 * Our connection to the service.
45 */ 45 */
46 struct GNUNET_CLIENT_Connection *client; 46 struct GNUNET_MQ_Handle *mq;
47
48 /**
49 * Handle for transmitting a request.
50 */
51 struct GNUNET_CLIENT_TransmitHandle *th;
52 47
53 /** 48 /**
54 * Function called with the peer. 49 * Function called with the peer.
@@ -64,47 +59,30 @@ struct GNUNET_CORE_MonitorHandle
64 59
65 60
66/** 61/**
67 * Transmits the monitor request to the CORE service. 62 * Protocol error, reconnect to CORE service and notify
68 * 63 * client.
69 * Function is called to notify a client about the socket begin ready
70 * to queue more data. @a buf will be NULL and @a size zero if the
71 * socket was closed for writing in the meantime.
72 * 64 *
73 * @param cls closure, our `struct GNUNET_CORE_MonitorHandle *` 65 * @param mh monitoring session to reconnect to CORE
74 * @param size number of bytes available in @a buf
75 * @param buf where the callee should write the message
76 * @return number of bytes written to @a buf
77 */ 66 */
78static size_t 67static void
79transmit_monitor_request (void *cls, 68reconnect (struct GNUNET_CORE_MonitorHandle *mh);
80 size_t size,
81 void *buf);
82 69
83 70
84/** 71/**
85 * Protocol error, reconnect to CORE service and notify 72 * Generic error handler, called with the appropriate error code and
86 * client. 73 * the same closure specified at the creation of the message queue.
74 * Not every message queue implementation supports an error handler.
87 * 75 *
88 * @param mh monitoring session to reconnect to CORE 76 * @param cls closure, a `struct GNUNET_CORE_MonitorHandle *`
77 * @param error error code
89 */ 78 */
90static void 79static void
91reconnect (struct GNUNET_CORE_MonitorHandle *mh) 80handle_mq_error (void *cls,
81 enum GNUNET_MQ_Error error)
92{ 82{
93 GNUNET_CLIENT_disconnect (mh->client); 83 struct GNUNET_CORE_MonitorHandle *mh = cls;
94 /* FIXME: use backoff? */ 84
95 mh->client = GNUNET_CLIENT_connect ("core", mh->cfg); 85 reconnect (mh);
96 GNUNET_assert (NULL != mh->client);
97 mh->th =
98 GNUNET_CLIENT_notify_transmit_ready (mh->client,
99 sizeof (struct GNUNET_MessageHeader),
100 GNUNET_TIME_UNIT_FOREVER_REL,
101 GNUNET_YES,
102 &transmit_monitor_request, mh);
103 /* notify callback about reconnect */
104 mh->peer_cb (mh->peer_cb_cls,
105 NULL,
106 GNUNET_CORE_KX_CORE_DISCONNECT,
107 GNUNET_TIME_UNIT_FOREVER_ABS);
108} 86}
109 87
110 88
@@ -112,34 +90,14 @@ reconnect (struct GNUNET_CORE_MonitorHandle *mh)
112 * Receive reply from CORE service with information about a peer. 90 * Receive reply from CORE service with information about a peer.
113 * 91 *
114 * @param cls our `struct GNUNET_CORE_MonitorHandle *` 92 * @param cls our `struct GNUNET_CORE_MonitorHandle *`
115 * @param msg NULL on error or last entry 93 * @param mon_message monitor message
116 */ 94 */
117static void 95static void
118receive_info (void *cls, 96handle_receive_info (void *cls,
119 const struct GNUNET_MessageHeader *msg) 97 const struct MonitorNotifyMessage *mon_message)
120{ 98{
121 struct GNUNET_CORE_MonitorHandle *mh = cls; 99 struct GNUNET_CORE_MonitorHandle *mh = cls;
122 const struct MonitorNotifyMessage *mon_message;
123 uint16_t msize;
124 100
125 if (NULL == msg)
126 {
127 reconnect (mh);
128 return;
129 }
130 msize = ntohs (msg->size);
131 /* Handle incorrect message type or size, disconnect and clean up */
132 if ((ntohs (msg->type) != GNUNET_MESSAGE_TYPE_CORE_MONITOR_NOTIFY) ||
133 (sizeof (struct MonitorNotifyMessage) != msize))
134 {
135 GNUNET_break (0);
136 reconnect (mh);
137 return;
138 }
139 mon_message = (const struct MonitorNotifyMessage *) msg;
140 GNUNET_CLIENT_receive (mh->client,
141 &receive_info, mh,
142 GNUNET_TIME_UNIT_FOREVER_REL);
143 mh->peer_cb (mh->peer_cb_cls, 101 mh->peer_cb (mh->peer_cb_cls,
144 &mon_message->peer, 102 &mon_message->peer,
145 (enum GNUNET_CORE_KxState) ntohl (mon_message->state), 103 (enum GNUNET_CORE_KxState) ntohl (mon_message->state),
@@ -148,40 +106,43 @@ receive_info (void *cls,
148 106
149 107
150/** 108/**
151 * Transmits the monitor request to the CORE service. 109 * Protocol error, reconnect to CORE service and notify
152 * 110 * client.
153 * Function is called to notify a client about the socket begin ready
154 * to queue more data. @a buf will be NULL and @a size zero if the
155 * socket was closed for writing in the meantime.
156 * 111 *
157 * @param cls closure, our `struct GNUNET_CORE_MonitorHandle *` 112 * @param mh monitoring session to reconnect to CORE
158 * @param size number of bytes available in @a buf
159 * @param buf where the callee should write the message
160 * @return number of bytes written to @a buf
161 */ 113 */
162static size_t 114static void
163transmit_monitor_request (void *cls, 115reconnect (struct GNUNET_CORE_MonitorHandle *mh)
164 size_t size,
165 void *buf)
166{ 116{
167 struct GNUNET_CORE_MonitorHandle *mh = cls; 117 GNUNET_MQ_hd_fixed_size (receive_info,
118 GNUNET_MESSAGE_TYPE_CORE_MONITOR_NOTIFY,
119 struct MonitorNotifyMessage);
120 struct GNUNET_MQ_MessageHandler handlers[] = {
121 make_receive_info_handler (mh),
122 GNUNET_MQ_handler_end ()
123 };
124 struct GNUNET_MQ_Envelope *env;
168 struct GNUNET_MessageHeader *msg; 125 struct GNUNET_MessageHeader *msg;
169 int msize;
170 126
171 mh->th = NULL; 127 if (NULL != mh->mq)
172 msize = sizeof (struct GNUNET_MessageHeader); 128 GNUNET_MQ_destroy (mh->mq);
173 if ((size < msize) || (NULL == buf)) 129 /* FIXME: use backoff? */
174 { 130 mh->mq = GNUNET_CLIENT_connecT (mh->cfg,
175 reconnect (mh); 131 "core",
176 return 0; 132 handlers,
177 } 133 &handle_mq_error,
178 msg = (struct GNUNET_MessageHeader *) buf; 134 mh);
179 msg->size = htons (msize); 135 if (NULL == mh->mq)
180 msg->type = htons (GNUNET_MESSAGE_TYPE_CORE_MONITOR_PEERS); 136 return;
181 GNUNET_CLIENT_receive (mh->client, 137 /* notify callback about reconnect */
182 &receive_info, mh, 138 mh->peer_cb (mh->peer_cb_cls,
183 GNUNET_TIME_UNIT_FOREVER_REL); 139 NULL,
184 return msize; 140 GNUNET_CORE_KX_CORE_DISCONNECT,
141 GNUNET_TIME_UNIT_FOREVER_ABS);
142 env = GNUNET_MQ_msg (msg,
143 GNUNET_MESSAGE_TYPE_CORE_MONITOR_PEERS);
144 GNUNET_MQ_send (mh->mq,
145 env);
185} 146}
186 147
187 148
@@ -207,23 +168,18 @@ GNUNET_CORE_monitor_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
207 void *peer_cb_cls) 168 void *peer_cb_cls)
208{ 169{
209 struct GNUNET_CORE_MonitorHandle *mh; 170 struct GNUNET_CORE_MonitorHandle *mh;
210 struct GNUNET_CLIENT_Connection *client;
211 171
212 GNUNET_assert (NULL != peer_cb); 172 GNUNET_assert (NULL != peer_cb);
213 client = GNUNET_CLIENT_connect ("core", cfg);
214 if (NULL == client)
215 return NULL;
216 mh = GNUNET_new (struct GNUNET_CORE_MonitorHandle); 173 mh = GNUNET_new (struct GNUNET_CORE_MonitorHandle);
217 mh->cfg = cfg; 174 mh->cfg = cfg;
218 mh->client = client;
219 mh->peer_cb = peer_cb; 175 mh->peer_cb = peer_cb;
220 mh->peer_cb_cls = peer_cb_cls; 176 mh->peer_cb_cls = peer_cb_cls;
221 mh->th = 177 reconnect (mh);
222 GNUNET_CLIENT_notify_transmit_ready (client, 178 if (NULL == mh->mq)
223 sizeof (struct GNUNET_MessageHeader), 179 {
224 GNUNET_TIME_UNIT_FOREVER_REL, 180 GNUNET_free (mh);
225 GNUNET_YES, 181 return NULL;
226 &transmit_monitor_request, mh); 182 }
227 return mh; 183 return mh;
228} 184}
229 185
@@ -236,15 +192,10 @@ GNUNET_CORE_monitor_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
236void 192void
237GNUNET_CORE_monitor_stop (struct GNUNET_CORE_MonitorHandle *mh) 193GNUNET_CORE_monitor_stop (struct GNUNET_CORE_MonitorHandle *mh)
238{ 194{
239 if (NULL != mh->th) 195 if (NULL != mh->mq)
240 {
241 GNUNET_CLIENT_notify_transmit_ready_cancel (mh->th);
242 mh->th = NULL;
243 }
244 if (NULL != mh->client)
245 { 196 {
246 GNUNET_CLIENT_disconnect (mh->client); 197 GNUNET_MQ_destroy (mh->mq);
247 mh->client = NULL; 198 mh->mq = NULL;
248 } 199 }
249 GNUNET_free (mh); 200 GNUNET_free (mh);
250} 201}