diff options
author | Christian Grothoff <christian@grothoff.org> | 2016-06-23 19:14:56 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2016-06-23 19:14:56 +0000 |
commit | 44ad80664bfcb495eb82599680bde78ccb02cd6d (patch) | |
tree | 417ec4ada2e30193733e5d1fe76180697d56ea65 /src/core/core_api_monitor_peers.c | |
parent | 7cc4fa45f062262d8d35940c04891c54be2e92bb (diff) | |
download | gnunet-44ad80664bfcb495eb82599680bde78ccb02cd6d.tar.gz gnunet-44ad80664bfcb495eb82599680bde78ccb02cd6d.zip |
converting core monitor to MQ
Diffstat (limited to 'src/core/core_api_monitor_peers.c')
-rw-r--r-- | src/core/core_api_monitor_peers.c | 171 |
1 files changed, 61 insertions, 110 deletions
diff --git a/src/core/core_api_monitor_peers.c b/src/core/core_api_monitor_peers.c index 419670d7c..bafcd3e94 100644 --- a/src/core/core_api_monitor_peers.c +++ b/src/core/core_api_monitor_peers.c | |||
@@ -1,6 +1,6 @@ | |||
1 | /* | 1 | /* |
2 | This file is part of GNUnet. | 2 | This file is part of GNUnet. |
3 | Copyright (C) 2009-2014 GNUnet e.V. | 3 | Copyright (C) 2009-2014, 2016 GNUnet e.V. |
4 | 4 | ||
5 | GNUnet is free software; you can redistribute it and/or modify | 5 | GNUnet is free software; you can redistribute it and/or modify |
6 | it under the terms of the GNU General Public License as published | 6 | it under the terms of the GNU General Public License as published |
@@ -43,12 +43,7 @@ struct GNUNET_CORE_MonitorHandle | |||
43 | /** | 43 | /** |
44 | * Our connection to the service. | 44 | * Our connection to the service. |
45 | */ | 45 | */ |
46 | struct GNUNET_CLIENT_Connection *client; | 46 | struct GNUNET_MQ_Handle *mq; |
47 | |||
48 | /** | ||
49 | * Handle for transmitting a request. | ||
50 | */ | ||
51 | struct GNUNET_CLIENT_TransmitHandle *th; | ||
52 | 47 | ||
53 | /** | 48 | /** |
54 | * Function called with the peer. | 49 | * Function called with the peer. |
@@ -64,47 +59,30 @@ struct GNUNET_CORE_MonitorHandle | |||
64 | 59 | ||
65 | 60 | ||
66 | /** | 61 | /** |
67 | * Transmits the monitor request to the CORE service. | 62 | * Protocol error, reconnect to CORE service and notify |
68 | * | 63 | * client. |
69 | * Function is called to notify a client about the socket begin ready | ||
70 | * to queue more data. @a buf will be NULL and @a size zero if the | ||
71 | * socket was closed for writing in the meantime. | ||
72 | * | 64 | * |
73 | * @param cls closure, our `struct GNUNET_CORE_MonitorHandle *` | 65 | * @param mh monitoring session to reconnect to CORE |
74 | * @param size number of bytes available in @a buf | ||
75 | * @param buf where the callee should write the message | ||
76 | * @return number of bytes written to @a buf | ||
77 | */ | 66 | */ |
78 | static size_t | 67 | static void |
79 | transmit_monitor_request (void *cls, | 68 | reconnect (struct GNUNET_CORE_MonitorHandle *mh); |
80 | size_t size, | ||
81 | void *buf); | ||
82 | 69 | ||
83 | 70 | ||
84 | /** | 71 | /** |
85 | * Protocol error, reconnect to CORE service and notify | 72 | * Generic error handler, called with the appropriate error code and |
86 | * client. | 73 | * the same closure specified at the creation of the message queue. |
74 | * Not every message queue implementation supports an error handler. | ||
87 | * | 75 | * |
88 | * @param mh monitoring session to reconnect to CORE | 76 | * @param cls closure, a `struct GNUNET_CORE_MonitorHandle *` |
77 | * @param error error code | ||
89 | */ | 78 | */ |
90 | static void | 79 | static void |
91 | reconnect (struct GNUNET_CORE_MonitorHandle *mh) | 80 | handle_mq_error (void *cls, |
81 | enum GNUNET_MQ_Error error) | ||
92 | { | 82 | { |
93 | GNUNET_CLIENT_disconnect (mh->client); | 83 | struct GNUNET_CORE_MonitorHandle *mh = cls; |
94 | /* FIXME: use backoff? */ | 84 | |
95 | mh->client = GNUNET_CLIENT_connect ("core", mh->cfg); | 85 | reconnect (mh); |
96 | GNUNET_assert (NULL != mh->client); | ||
97 | mh->th = | ||
98 | GNUNET_CLIENT_notify_transmit_ready (mh->client, | ||
99 | sizeof (struct GNUNET_MessageHeader), | ||
100 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
101 | GNUNET_YES, | ||
102 | &transmit_monitor_request, mh); | ||
103 | /* notify callback about reconnect */ | ||
104 | mh->peer_cb (mh->peer_cb_cls, | ||
105 | NULL, | ||
106 | GNUNET_CORE_KX_CORE_DISCONNECT, | ||
107 | GNUNET_TIME_UNIT_FOREVER_ABS); | ||
108 | } | 86 | } |
109 | 87 | ||
110 | 88 | ||
@@ -112,34 +90,14 @@ reconnect (struct GNUNET_CORE_MonitorHandle *mh) | |||
112 | * Receive reply from CORE service with information about a peer. | 90 | * Receive reply from CORE service with information about a peer. |
113 | * | 91 | * |
114 | * @param cls our `struct GNUNET_CORE_MonitorHandle *` | 92 | * @param cls our `struct GNUNET_CORE_MonitorHandle *` |
115 | * @param msg NULL on error or last entry | 93 | * @param mon_message monitor message |
116 | */ | 94 | */ |
117 | static void | 95 | static void |
118 | receive_info (void *cls, | 96 | handle_receive_info (void *cls, |
119 | const struct GNUNET_MessageHeader *msg) | 97 | const struct MonitorNotifyMessage *mon_message) |
120 | { | 98 | { |
121 | struct GNUNET_CORE_MonitorHandle *mh = cls; | 99 | struct GNUNET_CORE_MonitorHandle *mh = cls; |
122 | const struct MonitorNotifyMessage *mon_message; | ||
123 | uint16_t msize; | ||
124 | 100 | ||
125 | if (NULL == msg) | ||
126 | { | ||
127 | reconnect (mh); | ||
128 | return; | ||
129 | } | ||
130 | msize = ntohs (msg->size); | ||
131 | /* Handle incorrect message type or size, disconnect and clean up */ | ||
132 | if ((ntohs (msg->type) != GNUNET_MESSAGE_TYPE_CORE_MONITOR_NOTIFY) || | ||
133 | (sizeof (struct MonitorNotifyMessage) != msize)) | ||
134 | { | ||
135 | GNUNET_break (0); | ||
136 | reconnect (mh); | ||
137 | return; | ||
138 | } | ||
139 | mon_message = (const struct MonitorNotifyMessage *) msg; | ||
140 | GNUNET_CLIENT_receive (mh->client, | ||
141 | &receive_info, mh, | ||
142 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
143 | mh->peer_cb (mh->peer_cb_cls, | 101 | mh->peer_cb (mh->peer_cb_cls, |
144 | &mon_message->peer, | 102 | &mon_message->peer, |
145 | (enum GNUNET_CORE_KxState) ntohl (mon_message->state), | 103 | (enum GNUNET_CORE_KxState) ntohl (mon_message->state), |
@@ -148,40 +106,43 @@ receive_info (void *cls, | |||
148 | 106 | ||
149 | 107 | ||
150 | /** | 108 | /** |
151 | * Transmits the monitor request to the CORE service. | 109 | * Protocol error, reconnect to CORE service and notify |
152 | * | 110 | * client. |
153 | * Function is called to notify a client about the socket begin ready | ||
154 | * to queue more data. @a buf will be NULL and @a size zero if the | ||
155 | * socket was closed for writing in the meantime. | ||
156 | * | 111 | * |
157 | * @param cls closure, our `struct GNUNET_CORE_MonitorHandle *` | 112 | * @param mh monitoring session to reconnect to CORE |
158 | * @param size number of bytes available in @a buf | ||
159 | * @param buf where the callee should write the message | ||
160 | * @return number of bytes written to @a buf | ||
161 | */ | 113 | */ |
162 | static size_t | 114 | static void |
163 | transmit_monitor_request (void *cls, | 115 | reconnect (struct GNUNET_CORE_MonitorHandle *mh) |
164 | size_t size, | ||
165 | void *buf) | ||
166 | { | 116 | { |
167 | struct GNUNET_CORE_MonitorHandle *mh = cls; | 117 | GNUNET_MQ_hd_fixed_size (receive_info, |
118 | GNUNET_MESSAGE_TYPE_CORE_MONITOR_NOTIFY, | ||
119 | struct MonitorNotifyMessage); | ||
120 | struct GNUNET_MQ_MessageHandler handlers[] = { | ||
121 | make_receive_info_handler (mh), | ||
122 | GNUNET_MQ_handler_end () | ||
123 | }; | ||
124 | struct GNUNET_MQ_Envelope *env; | ||
168 | struct GNUNET_MessageHeader *msg; | 125 | struct GNUNET_MessageHeader *msg; |
169 | int msize; | ||
170 | 126 | ||
171 | mh->th = NULL; | 127 | if (NULL != mh->mq) |
172 | msize = sizeof (struct GNUNET_MessageHeader); | 128 | GNUNET_MQ_destroy (mh->mq); |
173 | if ((size < msize) || (NULL == buf)) | 129 | /* FIXME: use backoff? */ |
174 | { | 130 | mh->mq = GNUNET_CLIENT_connecT (mh->cfg, |
175 | reconnect (mh); | 131 | "core", |
176 | return 0; | 132 | handlers, |
177 | } | 133 | &handle_mq_error, |
178 | msg = (struct GNUNET_MessageHeader *) buf; | 134 | mh); |
179 | msg->size = htons (msize); | 135 | if (NULL == mh->mq) |
180 | msg->type = htons (GNUNET_MESSAGE_TYPE_CORE_MONITOR_PEERS); | 136 | return; |
181 | GNUNET_CLIENT_receive (mh->client, | 137 | /* notify callback about reconnect */ |
182 | &receive_info, mh, | 138 | mh->peer_cb (mh->peer_cb_cls, |
183 | GNUNET_TIME_UNIT_FOREVER_REL); | 139 | NULL, |
184 | return msize; | 140 | GNUNET_CORE_KX_CORE_DISCONNECT, |
141 | GNUNET_TIME_UNIT_FOREVER_ABS); | ||
142 | env = GNUNET_MQ_msg (msg, | ||
143 | GNUNET_MESSAGE_TYPE_CORE_MONITOR_PEERS); | ||
144 | GNUNET_MQ_send (mh->mq, | ||
145 | env); | ||
185 | } | 146 | } |
186 | 147 | ||
187 | 148 | ||
@@ -207,23 +168,18 @@ GNUNET_CORE_monitor_start (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
207 | void *peer_cb_cls) | 168 | void *peer_cb_cls) |
208 | { | 169 | { |
209 | struct GNUNET_CORE_MonitorHandle *mh; | 170 | struct GNUNET_CORE_MonitorHandle *mh; |
210 | struct GNUNET_CLIENT_Connection *client; | ||
211 | 171 | ||
212 | GNUNET_assert (NULL != peer_cb); | 172 | GNUNET_assert (NULL != peer_cb); |
213 | client = GNUNET_CLIENT_connect ("core", cfg); | ||
214 | if (NULL == client) | ||
215 | return NULL; | ||
216 | mh = GNUNET_new (struct GNUNET_CORE_MonitorHandle); | 173 | mh = GNUNET_new (struct GNUNET_CORE_MonitorHandle); |
217 | mh->cfg = cfg; | 174 | mh->cfg = cfg; |
218 | mh->client = client; | ||
219 | mh->peer_cb = peer_cb; | 175 | mh->peer_cb = peer_cb; |
220 | mh->peer_cb_cls = peer_cb_cls; | 176 | mh->peer_cb_cls = peer_cb_cls; |
221 | mh->th = | 177 | reconnect (mh); |
222 | GNUNET_CLIENT_notify_transmit_ready (client, | 178 | if (NULL == mh->mq) |
223 | sizeof (struct GNUNET_MessageHeader), | 179 | { |
224 | GNUNET_TIME_UNIT_FOREVER_REL, | 180 | GNUNET_free (mh); |
225 | GNUNET_YES, | 181 | return NULL; |
226 | &transmit_monitor_request, mh); | 182 | } |
227 | return mh; | 183 | return mh; |
228 | } | 184 | } |
229 | 185 | ||
@@ -236,15 +192,10 @@ GNUNET_CORE_monitor_start (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
236 | void | 192 | void |
237 | GNUNET_CORE_monitor_stop (struct GNUNET_CORE_MonitorHandle *mh) | 193 | GNUNET_CORE_monitor_stop (struct GNUNET_CORE_MonitorHandle *mh) |
238 | { | 194 | { |
239 | if (NULL != mh->th) | 195 | if (NULL != mh->mq) |
240 | { | ||
241 | GNUNET_CLIENT_notify_transmit_ready_cancel (mh->th); | ||
242 | mh->th = NULL; | ||
243 | } | ||
244 | if (NULL != mh->client) | ||
245 | { | 196 | { |
246 | GNUNET_CLIENT_disconnect (mh->client); | 197 | GNUNET_MQ_destroy (mh->mq); |
247 | mh->client = NULL; | 198 | mh->mq = NULL; |
248 | } | 199 | } |
249 | GNUNET_free (mh); | 200 | GNUNET_free (mh); |
250 | } | 201 | } |