diff options
author | Christian Grothoff <christian@grothoff.org> | 2016-07-30 22:01:43 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2016-07-30 22:01:43 +0000 |
commit | e9dbabbabb9edd41bb7d7cb907826abdd8edb257 (patch) | |
tree | 70c9655d5aa4b68aa5b840507af4959a21f54f4c /src/core | |
parent | 33533a0e9955dcadc0857d824aa976e3fe028527 (diff) | |
download | gnunet-e9dbabbabb9edd41bb7d7cb907826abdd8edb257.tar.gz gnunet-e9dbabbabb9edd41bb7d7cb907826abdd8edb257.zip |
draft of new core API MQ-based implementation
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/Makefile.am | 1 | ||||
-rw-r--r-- | src/core/core_api.c | 38 | ||||
-rw-r--r-- | src/core/core_api_2.c | 824 |
3 files changed, 825 insertions, 38 deletions
diff --git a/src/core/Makefile.am b/src/core/Makefile.am index 3437aa43a..d22c3e01f 100644 --- a/src/core/Makefile.am +++ b/src/core/Makefile.am | |||
@@ -23,6 +23,7 @@ lib_LTLIBRARIES = \ | |||
23 | 23 | ||
24 | libgnunetcore_la_SOURCES = \ | 24 | libgnunetcore_la_SOURCES = \ |
25 | core_api.c core.h \ | 25 | core_api.c core.h \ |
26 | core_api_2.c \ | ||
26 | core_api_mq.c \ | 27 | core_api_mq.c \ |
27 | core_api_monitor_peers.c | 28 | core_api_monitor_peers.c |
28 | libgnunetcore_la_LIBADD = \ | 29 | libgnunetcore_la_LIBADD = \ |
diff --git a/src/core/core_api.c b/src/core/core_api.c index acdac5fa4..caf614afc 100644 --- a/src/core/core_api.c +++ b/src/core/core_api.c | |||
@@ -917,44 +917,6 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
917 | 917 | ||
918 | 918 | ||
919 | /** | 919 | /** |
920 | * Connect to the core service. Note that the connection may complete | ||
921 | * (or fail) asynchronously. This function primarily causes the given | ||
922 | * callback notification functions to be invoked whenever the | ||
923 | * specified event happens. The maximum number of queued | ||
924 | * notifications (queue length) is per client; the queue is shared | ||
925 | * across all types of notifications. So a slow client that registers | ||
926 | * for @a outbound_notify also risks missing @a inbound_notify messages. | ||
927 | * Certain events (such as connect/disconnect notifications) are not | ||
928 | * subject to queue size limitations. | ||
929 | * | ||
930 | * @param cfg configuration to use | ||
931 | * @param cls closure for the various callbacks that follow (including handlers in the handlers array) | ||
932 | * @param init callback to call once we have successfully | ||
933 | * connected to the core service | ||
934 | * @param connects function to call on peer connect, can be NULL | ||
935 | * @param disconnects function to call on peer disconnect / timeout, can be NULL | ||
936 | * @param handlers callbacks for messages we care about, NULL-terminated | ||
937 | * note that the core is allowed to drop notifications about inbound | ||
938 | * messages if the client does not process them fast enough (for this | ||
939 | * notification type, a bounded queue is used) | ||
940 | * @return handle to the core service (only useful for disconnect until @a init is called), | ||
941 | * NULL on error (in this case, init is never called) | ||
942 | */ | ||
943 | struct GNUNET_CORE_Handle * | ||
944 | GNUNET_CORE_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg, | ||
945 | void *cls, | ||
946 | GNUNET_CORE_StartupCallback init, | ||
947 | GNUNET_CORE_ConnecTEventHandler connects, | ||
948 | GNUNET_CORE_DisconnecTEventHandler disconnects, | ||
949 | const struct GNUNET_MQ_MessageHandler *handlers) | ||
950 | { | ||
951 | GNUNET_break (0); // not implemented | ||
952 | // NOTE: re-enable core-related tests in ats-tests/ once implemented! | ||
953 | return NULL; | ||
954 | } | ||
955 | |||
956 | |||
957 | /** | ||
958 | * Disconnect from the core service. This function can only | 920 | * Disconnect from the core service. This function can only |
959 | * be called *after* all pending #GNUNET_CORE_notify_transmit_ready() | 921 | * be called *after* all pending #GNUNET_CORE_notify_transmit_ready() |
960 | * requests have been explicitly canceled. | 922 | * requests have been explicitly canceled. |
diff --git a/src/core/core_api_2.c b/src/core/core_api_2.c new file mode 100644 index 000000000..d45c98e93 --- /dev/null +++ b/src/core/core_api_2.c | |||
@@ -0,0 +1,824 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet. | ||
3 | Copyright (C) 2009-2016 GNUnet e.V. | ||
4 | |||
5 | GNUnet is free software; you can redistribute it and/or modify | ||
6 | it under the terms of the GNU General Public License as published | ||
7 | by the Free Software Foundation; either version 3, or (at your | ||
8 | option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU General Public License | ||
16 | along with GNUnet; see the file COPYING. If not, write to the | ||
17 | Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, | ||
18 | Boston, MA 02110-1301, USA. | ||
19 | */ | ||
20 | /** | ||
21 | * @file core/core_api_2.c | ||
22 | * @brief core service; this is the main API for encrypted P2P | ||
23 | * communications | ||
24 | * @author Christian Grothoff | ||
25 | */ | ||
26 | #include "platform.h" | ||
27 | #include "gnunet_util_lib.h" | ||
28 | #include "gnunet_constants.h" | ||
29 | #include "gnunet_core_service.h" | ||
30 | #include "core.h" | ||
31 | |||
32 | #define LOG(kind,...) GNUNET_log_from (kind, "core-api",__VA_ARGS__) | ||
33 | |||
34 | |||
35 | /** | ||
36 | * Information we track for each peer. | ||
37 | */ | ||
38 | struct PeerRecord | ||
39 | { | ||
40 | |||
41 | /** | ||
42 | * Corresponding CORE handle. | ||
43 | */ | ||
44 | struct GNUNET_CORE_Handle *h; | ||
45 | |||
46 | /** | ||
47 | * Message queue for the peer. | ||
48 | */ | ||
49 | struct GNUNET_MQ_Handle *mq; | ||
50 | |||
51 | /** | ||
52 | * Message we are currently trying to pass to the CORE service | ||
53 | * for this peer (from @e mq). | ||
54 | */ | ||
55 | struct GNUNET_MQ_Envelope *env; | ||
56 | |||
57 | /** | ||
58 | * Value the client returned when we connected, used | ||
59 | * as the closure in various places. | ||
60 | */ | ||
61 | void *client_cls; | ||
62 | |||
63 | /** | ||
64 | * Peer the record is about. | ||
65 | */ | ||
66 | struct GNUNET_PeerIdentity peer; | ||
67 | |||
68 | /** | ||
69 | * SendMessageRequest ID generator for this peer. | ||
70 | */ | ||
71 | uint16_t smr_id_gen; | ||
72 | |||
73 | }; | ||
74 | |||
75 | |||
76 | /** | ||
77 | * Context for the core service connection. | ||
78 | */ | ||
79 | struct GNUNET_CORE_Handle | ||
80 | { | ||
81 | |||
82 | /** | ||
83 | * Configuration we're using. | ||
84 | */ | ||
85 | const struct GNUNET_CONFIGURATION_Handle *cfg; | ||
86 | |||
87 | /** | ||
88 | * Closure for the various callbacks. | ||
89 | */ | ||
90 | void *cls; | ||
91 | |||
92 | /** | ||
93 | * Function to call once we've handshaked with the core service. | ||
94 | */ | ||
95 | GNUNET_CORE_StartupCallback init; | ||
96 | |||
97 | /** | ||
98 | * Function to call whenever we're notified about a peer connecting. | ||
99 | */ | ||
100 | GNUNET_CORE_ConnecTEventHandler connects; | ||
101 | |||
102 | /** | ||
103 | * Function to call whenever we're notified about a peer disconnecting. | ||
104 | */ | ||
105 | GNUNET_CORE_DisconnecTEventHandler disconnects; | ||
106 | |||
107 | /** | ||
108 | * Function handlers for messages of particular type. | ||
109 | */ | ||
110 | struct GNUNET_MQ_MessageHandler *handlers; | ||
111 | |||
112 | /** | ||
113 | * Our message queue for transmissions to the service. | ||
114 | */ | ||
115 | struct GNUNET_MQ_Handle *mq; | ||
116 | |||
117 | /** | ||
118 | * Hash map listing all of the peers that we are currently | ||
119 | * connected to. | ||
120 | */ | ||
121 | struct GNUNET_CONTAINER_MultiPeerMap *peers; | ||
122 | |||
123 | /** | ||
124 | * Identity of this peer. | ||
125 | */ | ||
126 | struct GNUNET_PeerIdentity me; | ||
127 | |||
128 | /** | ||
129 | * ID of reconnect task (if any). | ||
130 | */ | ||
131 | struct GNUNET_SCHEDULER_Task *reconnect_task; | ||
132 | |||
133 | /** | ||
134 | * Current delay we use for re-trying to connect to core. | ||
135 | */ | ||
136 | struct GNUNET_TIME_Relative retry_backoff; | ||
137 | |||
138 | /** | ||
139 | * Number of entries in the handlers array. | ||
140 | */ | ||
141 | unsigned int hcnt; | ||
142 | |||
143 | /** | ||
144 | * Did we ever get INIT? | ||
145 | */ | ||
146 | int have_init; | ||
147 | |||
148 | }; | ||
149 | |||
150 | |||
151 | /** | ||
152 | * Our current client connection went down. Clean it up | ||
153 | * and try to reconnect! | ||
154 | * | ||
155 | * @param h our handle to the core service | ||
156 | */ | ||
157 | static void | ||
158 | reconnect (struct GNUNET_CORE_Handle *h); | ||
159 | |||
160 | |||
161 | /** | ||
162 | * Task schedule to try to re-connect to core. | ||
163 | * | ||
164 | * @param cls the `struct GNUNET_CORE_Handle` | ||
165 | * @param tc task context | ||
166 | */ | ||
167 | static void | ||
168 | reconnect_task (void *cls) | ||
169 | { | ||
170 | struct GNUNET_CORE_Handle *h = cls; | ||
171 | |||
172 | h->reconnect_task = NULL; | ||
173 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
174 | "Connecting to CORE service after delay\n"); | ||
175 | reconnect (h); | ||
176 | } | ||
177 | |||
178 | |||
179 | /** | ||
180 | * Notify clients about disconnect and free the entry for connected | ||
181 | * peer. | ||
182 | * | ||
183 | * @param cls the `struct GNUNET_CORE_Handle *` | ||
184 | * @param key the peer identity (not used) | ||
185 | * @param value the `struct PeerRecord` to free. | ||
186 | * @return #GNUNET_YES (continue) | ||
187 | */ | ||
188 | static int | ||
189 | disconnect_and_free_peer_entry (void *cls, | ||
190 | const struct GNUNET_PeerIdentity *key, | ||
191 | void *value) | ||
192 | { | ||
193 | struct GNUNET_CORE_Handle *h = cls; | ||
194 | struct PeerRecord *pr = value; | ||
195 | |||
196 | GNUNET_assert (pr->h == h); | ||
197 | if (NULL != h->disconnects) | ||
198 | h->disconnects (h->cls, | ||
199 | &pr->peer, | ||
200 | pr->client_cls); | ||
201 | GNUNET_assert (GNUNET_YES == | ||
202 | GNUNET_CONTAINER_multipeermap_remove (h->peers, | ||
203 | key, | ||
204 | pr)); | ||
205 | GNUNET_MQ_destroy (pr->mq); | ||
206 | GNUNET_assert (NULL == pr->mq); | ||
207 | GNUNET_free (pr); | ||
208 | return GNUNET_YES; | ||
209 | } | ||
210 | |||
211 | |||
212 | /** | ||
213 | * Close down any existing connection to the CORE service and | ||
214 | * try re-establishing it later. | ||
215 | * | ||
216 | * @param h our handle | ||
217 | */ | ||
218 | static void | ||
219 | reconnect_later (struct GNUNET_CORE_Handle *h) | ||
220 | { | ||
221 | GNUNET_assert (NULL == h->reconnect_task); | ||
222 | if (NULL != h->mq) | ||
223 | { | ||
224 | GNUNET_MQ_destroy (h->mq); | ||
225 | h->mq = NULL; | ||
226 | } | ||
227 | GNUNET_assert (NULL == h->reconnect_task); | ||
228 | h->reconnect_task = | ||
229 | GNUNET_SCHEDULER_add_delayed (h->retry_backoff, | ||
230 | &reconnect_task, | ||
231 | h); | ||
232 | GNUNET_CONTAINER_multipeermap_iterate (h->peers, | ||
233 | &disconnect_and_free_peer_entry, | ||
234 | h); | ||
235 | h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff); | ||
236 | } | ||
237 | |||
238 | |||
239 | /** | ||
240 | * Error handler for the message queue to the CORE service. | ||
241 | * On errors, we reconnect. | ||
242 | * | ||
243 | * @param cls closure, a `struct GNUNET_CORE_Handle *` | ||
244 | * @param error error code | ||
245 | */ | ||
246 | static void | ||
247 | handle_mq_error (void *cls, | ||
248 | enum GNUNET_MQ_Error error) | ||
249 | { | ||
250 | struct GNUNET_CORE_Handle *h = cls; | ||
251 | |||
252 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
253 | "MQ ERROR: %d\n", | ||
254 | error); | ||
255 | reconnect_later (h); | ||
256 | } | ||
257 | |||
258 | |||
259 | /** | ||
260 | * Implement sending functionality of a message queue for | ||
261 | * us sending messages to a peer. | ||
262 | * | ||
263 | * @param mq the message queue | ||
264 | * @param msg the message to send | ||
265 | * @param impl_state state of the implementation | ||
266 | */ | ||
267 | static void | ||
268 | core_mq_send_impl (struct GNUNET_MQ_Handle *mq, | ||
269 | const struct GNUNET_MessageHeader *msg, | ||
270 | void *impl_state) | ||
271 | { | ||
272 | struct PeerRecord *pr = impl_state; | ||
273 | struct GNUNET_CORE_Handle *h = pr->h; | ||
274 | struct SendMessageRequest *smr; | ||
275 | struct SendMessage *sm; | ||
276 | struct GNUNET_MQ_Envelope *env; | ||
277 | uint16_t msize; | ||
278 | int cork | ||
279 | = GNUNET_NO; // FIXME | ||
280 | enum GNUNET_CORE_Priority priority | ||
281 | = GNUNET_CORE_PRIO_BEST_EFFORT; // FIXME | ||
282 | |||
283 | GNUNET_assert (NULL == pr->env); | ||
284 | msize = ntohs (msg->size); | ||
285 | if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct SendMessage)) | ||
286 | { | ||
287 | GNUNET_break (0); | ||
288 | GNUNET_MQ_impl_send_continue (mq); | ||
289 | return; | ||
290 | } | ||
291 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
292 | "Asking core for transmission of %u bytes to `%s'\n", | ||
293 | (unsigned int) msize, | ||
294 | GNUNET_i2s (&pr->peer)); | ||
295 | env = GNUNET_MQ_msg (smr, | ||
296 | GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST); | ||
297 | smr->priority = htonl ((uint32_t) priority); | ||
298 | // smr->deadline = GNUNET_TIME_absolute_hton (deadline); | ||
299 | smr->peer = pr->peer; | ||
300 | smr->reserved = htonl (0); | ||
301 | smr->size = htons (msize); | ||
302 | smr->smr_id = htons (++pr->smr_id_gen); | ||
303 | GNUNET_MQ_send (h->mq, | ||
304 | env); | ||
305 | pr->env = GNUNET_MQ_msg_nested_mh (sm, | ||
306 | GNUNET_MESSAGE_TYPE_CORE_SEND, | ||
307 | msg); | ||
308 | sm->priority = htonl ((uint32_t) priority); | ||
309 | // sm->deadline = GNUNET_TIME_absolute_hton (deadline); | ||
310 | sm->peer = pr->peer; | ||
311 | sm->cork = htonl ((uint32_t) cork); | ||
312 | sm->reserved = htonl (0); | ||
313 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
314 | "Calling get_message with buffer of %u bytes (%s)\n", | ||
315 | (unsigned int) msize, | ||
316 | cork ? "corked" : "uncorked"); | ||
317 | } | ||
318 | |||
319 | |||
320 | /** | ||
321 | * Handle destruction of a message queue. Implementations must not | ||
322 | * free @a mq, but should take care of @a impl_state. | ||
323 | * | ||
324 | * @param mq the message queue to destroy | ||
325 | * @param impl_state state of the implementation | ||
326 | */ | ||
327 | static void | ||
328 | core_mq_destroy_impl (struct GNUNET_MQ_Handle *mq, | ||
329 | void *impl_state) | ||
330 | { | ||
331 | struct PeerRecord *pr = impl_state; | ||
332 | |||
333 | GNUNET_assert (mq == pr->mq); | ||
334 | pr->mq = NULL; | ||
335 | } | ||
336 | |||
337 | |||
338 | /** | ||
339 | * Implementation function that cancels the currently sent message. | ||
340 | * Should basically undo whatever #mq_send_impl() did. | ||
341 | * | ||
342 | * @param mq message queue | ||
343 | * @param impl_state state specific to the implementation | ||
344 | */ | ||
345 | static void | ||
346 | core_mq_cancel_impl (struct GNUNET_MQ_Handle *mq, | ||
347 | void *impl_state) | ||
348 | { | ||
349 | struct PeerRecord *pr = impl_state; | ||
350 | |||
351 | GNUNET_assert (NULL != pr->env); | ||
352 | GNUNET_MQ_send_cancel (pr->env); | ||
353 | pr->env = NULL; | ||
354 | } | ||
355 | |||
356 | |||
357 | /** | ||
358 | * We had an error processing a message we forwarded from a peer to | ||
359 | * the CORE service. We should just complain about it but otherwise | ||
360 | * continue processing. | ||
361 | * | ||
362 | * @param cls closure | ||
363 | * @param error error code | ||
364 | */ | ||
365 | static void | ||
366 | core_mq_error_handler (void *cls, | ||
367 | enum GNUNET_MQ_Error error) | ||
368 | { | ||
369 | /* struct PeerRecord *pr = cls; */ | ||
370 | |||
371 | GNUNET_break_op (0); | ||
372 | } | ||
373 | |||
374 | |||
375 | /** | ||
376 | * Add the given peer to the list of our connected peers | ||
377 | * and create the respective data structures and notify | ||
378 | * the application. | ||
379 | * | ||
380 | * @param h the core handle | ||
381 | * @param peer the peer that is connecting to us | ||
382 | */ | ||
383 | static void | ||
384 | connect_peer (struct GNUNET_CORE_Handle *h, | ||
385 | const struct GNUNET_PeerIdentity *peer) | ||
386 | { | ||
387 | struct PeerRecord *pr; | ||
388 | |||
389 | pr = GNUNET_new (struct PeerRecord); | ||
390 | pr->peer = *peer; | ||
391 | pr->h = h; | ||
392 | GNUNET_assert (GNUNET_YES == | ||
393 | GNUNET_CONTAINER_multipeermap_put (h->peers, | ||
394 | &pr->peer, | ||
395 | pr, | ||
396 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | ||
397 | pr->mq = GNUNET_MQ_queue_for_callbacks (&core_mq_send_impl, | ||
398 | &core_mq_destroy_impl, | ||
399 | &core_mq_cancel_impl, | ||
400 | pr, | ||
401 | h->handlers, | ||
402 | &core_mq_error_handler, | ||
403 | pr); | ||
404 | if (NULL != h->connects) | ||
405 | { | ||
406 | pr->client_cls = h->connects (h->cls, | ||
407 | &pr->peer, | ||
408 | pr->mq); | ||
409 | GNUNET_MQ_set_handlers_closure (pr->mq, | ||
410 | pr->client_cls); | ||
411 | } | ||
412 | } | ||
413 | |||
414 | |||
415 | /** | ||
416 | * Handle init reply message received from CORE service. Notify | ||
417 | * application that we are now connected to the CORE. Also fake | ||
418 | * loopback connection. | ||
419 | * | ||
420 | * @param cls the `struct GNUNET_CORE_Handle` | ||
421 | * @param m the init reply | ||
422 | */ | ||
423 | static void | ||
424 | handle_init_reply (void *cls, | ||
425 | const struct InitReplyMessage *m) | ||
426 | { | ||
427 | struct GNUNET_CORE_Handle *h = cls; | ||
428 | GNUNET_CORE_StartupCallback init; | ||
429 | |||
430 | GNUNET_break (0 == ntohl (m->reserved)); | ||
431 | h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS; | ||
432 | if (NULL != (init = h->init)) | ||
433 | { | ||
434 | /* mark so we don't call init on reconnect */ | ||
435 | h->init = NULL; | ||
436 | h->me = m->my_identity; | ||
437 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
438 | "Connected to core service of peer `%s'.\n", | ||
439 | GNUNET_i2s (&h->me)); | ||
440 | h->have_init = GNUNET_YES; | ||
441 | init (h->cls, | ||
442 | &h->me); | ||
443 | } | ||
444 | else | ||
445 | { | ||
446 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
447 | "Successfully reconnected to core service.\n"); | ||
448 | if (GNUNET_NO == h->have_init) | ||
449 | { | ||
450 | h->me = m->my_identity; | ||
451 | h->have_init = GNUNET_YES; | ||
452 | } | ||
453 | else | ||
454 | { | ||
455 | GNUNET_break (0 == memcmp (&h->me, | ||
456 | &m->my_identity, | ||
457 | sizeof (struct GNUNET_PeerIdentity))); | ||
458 | } | ||
459 | } | ||
460 | /* fake 'connect to self' */ | ||
461 | connect_peer (h, | ||
462 | &h->me); | ||
463 | } | ||
464 | |||
465 | |||
466 | /** | ||
467 | * Handle connect message received from CORE service. | ||
468 | * Notify the application about the new connection. | ||
469 | * | ||
470 | * @param cls the `struct GNUNET_CORE_Handle` | ||
471 | * @param cnm the connect message | ||
472 | */ | ||
473 | static void | ||
474 | handle_connect_notify (void *cls, | ||
475 | const struct ConnectNotifyMessage *cnm) | ||
476 | { | ||
477 | struct GNUNET_CORE_Handle *h = cls; | ||
478 | struct PeerRecord *pr; | ||
479 | |||
480 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
481 | "Received notification about connection from `%s'.\n", | ||
482 | GNUNET_i2s (&cnm->peer)); | ||
483 | if (0 == memcmp (&h->me, | ||
484 | &cnm->peer, | ||
485 | sizeof (struct GNUNET_PeerIdentity))) | ||
486 | { | ||
487 | /* connect to self!? */ | ||
488 | GNUNET_break (0); | ||
489 | return; | ||
490 | } | ||
491 | pr = GNUNET_CONTAINER_multipeermap_get (h->peers, | ||
492 | &cnm->peer); | ||
493 | if (NULL != pr) | ||
494 | { | ||
495 | GNUNET_break (0); | ||
496 | reconnect_later (h); | ||
497 | return; | ||
498 | } | ||
499 | connect_peer (h, | ||
500 | &cnm->peer); | ||
501 | } | ||
502 | |||
503 | |||
504 | /** | ||
505 | * Handle disconnect message received from CORE service. | ||
506 | * Notify the application about the lost connection. | ||
507 | * | ||
508 | * @param cls the `struct GNUNET_CORE_Handle` | ||
509 | * @param dnm message about the disconnect event | ||
510 | */ | ||
511 | static void | ||
512 | handle_disconnect_notify (void *cls, | ||
513 | const struct DisconnectNotifyMessage *dnm) | ||
514 | { | ||
515 | struct GNUNET_CORE_Handle *h = cls; | ||
516 | struct PeerRecord *pr; | ||
517 | |||
518 | if (0 == memcmp (&h->me, | ||
519 | &dnm->peer, | ||
520 | sizeof (struct GNUNET_PeerIdentity))) | ||
521 | { | ||
522 | /* disconnect from self!? */ | ||
523 | GNUNET_break (0); | ||
524 | return; | ||
525 | } | ||
526 | GNUNET_break (0 == ntohl (dnm->reserved)); | ||
527 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
528 | "Received notification about disconnect from `%s'.\n", | ||
529 | GNUNET_i2s (&dnm->peer)); | ||
530 | pr = GNUNET_CONTAINER_multipeermap_get (h->peers, | ||
531 | &dnm->peer); | ||
532 | if (NULL == pr) | ||
533 | { | ||
534 | GNUNET_break (0); | ||
535 | reconnect_later (h); | ||
536 | return; | ||
537 | } | ||
538 | disconnect_and_free_peer_entry (h, | ||
539 | &pr->peer, | ||
540 | pr); | ||
541 | } | ||
542 | |||
543 | |||
544 | /** | ||
545 | * Check that message received from CORE service is well-formed. | ||
546 | * | ||
547 | * @param cls the `struct GNUNET_CORE_Handle` | ||
548 | * @param ntm the message we got | ||
549 | * @return #GNUNET_OK if the message is well-formed | ||
550 | */ | ||
551 | static int | ||
552 | check_notify_inbound (void *cls, | ||
553 | const struct NotifyTrafficMessage *ntm) | ||
554 | { | ||
555 | uint16_t msize; | ||
556 | const struct GNUNET_MessageHeader *em; | ||
557 | |||
558 | msize = ntohs (ntm->header.size) - sizeof (struct NotifyTrafficMessage); | ||
559 | if (msize < sizeof (struct GNUNET_MessageHeader)) | ||
560 | { | ||
561 | GNUNET_break (0); | ||
562 | return GNUNET_SYSERR; | ||
563 | } | ||
564 | em = (const struct GNUNET_MessageHeader *) &ntm[1]; | ||
565 | if (msize != ntohs (em->size)) | ||
566 | { | ||
567 | GNUNET_break (0); | ||
568 | return GNUNET_SYSERR; | ||
569 | } | ||
570 | return GNUNET_OK; | ||
571 | } | ||
572 | |||
573 | |||
574 | /** | ||
575 | * Handle inbound message received from CORE service. If applicable, | ||
576 | * notify the application. | ||
577 | * | ||
578 | * @param cls the `struct GNUNET_CORE_Handle` | ||
579 | * @param ntm the message we got from CORE. | ||
580 | */ | ||
581 | static void | ||
582 | handle_notify_inbound (void *cls, | ||
583 | const struct NotifyTrafficMessage *ntm) | ||
584 | { | ||
585 | struct GNUNET_CORE_Handle *h = cls; | ||
586 | const struct GNUNET_MessageHeader *em; | ||
587 | struct PeerRecord *pr; | ||
588 | |||
589 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
590 | "Received inbound message from `%s'.\n", | ||
591 | GNUNET_i2s (&ntm->peer)); | ||
592 | em = (const struct GNUNET_MessageHeader *) &ntm[1]; | ||
593 | pr = GNUNET_CONTAINER_multipeermap_get (h->peers, | ||
594 | &ntm->peer); | ||
595 | if (NULL == pr) | ||
596 | { | ||
597 | GNUNET_break (0); | ||
598 | reconnect_later (h); | ||
599 | return; | ||
600 | } | ||
601 | GNUNET_MQ_inject_message (pr->mq, | ||
602 | em); | ||
603 | } | ||
604 | |||
605 | |||
606 | /** | ||
607 | * Handle message received from CORE service notifying us that we are | ||
608 | * now allowed to send a message to a peer. If that message is still | ||
609 | * pending, put it into the queue to be transmitted. | ||
610 | * | ||
611 | * @param cls the `struct GNUNET_CORE_Handle` | ||
612 | * @param smr the message we got | ||
613 | */ | ||
614 | static void | ||
615 | handle_send_ready (void *cls, | ||
616 | const struct SendMessageReady *smr) | ||
617 | { | ||
618 | struct GNUNET_CORE_Handle *h = cls; | ||
619 | struct PeerRecord *pr; | ||
620 | |||
621 | pr = GNUNET_CONTAINER_multipeermap_get (h->peers, | ||
622 | &smr->peer); | ||
623 | if (NULL == pr) | ||
624 | { | ||
625 | GNUNET_break (0); | ||
626 | reconnect_later (h); | ||
627 | return; | ||
628 | } | ||
629 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
630 | "Received notification about transmission readiness to `%s'.\n", | ||
631 | GNUNET_i2s (&smr->peer)); | ||
632 | if (NULL == pr->env) | ||
633 | { | ||
634 | /* request must have been cancelled between the original request | ||
635 | * and the response from CORE, ignore CORE's readiness */ | ||
636 | return; | ||
637 | } | ||
638 | if (ntohs (smr->smr_id) != pr->smr_id_gen) | ||
639 | { | ||
640 | /* READY message is for expired or cancelled message, | ||
641 | * ignore! (we should have already sent another request) */ | ||
642 | return; | ||
643 | } | ||
644 | |||
645 | /* ok, all good, send message out! */ | ||
646 | GNUNET_MQ_send (h->mq, | ||
647 | pr->env); | ||
648 | pr->env = NULL; | ||
649 | GNUNET_MQ_impl_send_continue (pr->mq); | ||
650 | } | ||
651 | |||
652 | |||
653 | /** | ||
654 | * Our current client connection went down. Clean it up and try to | ||
655 | * reconnect! | ||
656 | * | ||
657 | * @param h our handle to the core service | ||
658 | */ | ||
659 | static void | ||
660 | reconnect (struct GNUNET_CORE_Handle *h) | ||
661 | { | ||
662 | GNUNET_MQ_hd_fixed_size (init_reply, | ||
663 | GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY, | ||
664 | struct InitReplyMessage); | ||
665 | GNUNET_MQ_hd_fixed_size (connect_notify, | ||
666 | GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT, | ||
667 | struct ConnectNotifyMessage); | ||
668 | GNUNET_MQ_hd_fixed_size (disconnect_notify, | ||
669 | GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT, | ||
670 | struct DisconnectNotifyMessage); | ||
671 | GNUNET_MQ_hd_var_size (notify_inbound, | ||
672 | GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND, | ||
673 | struct NotifyTrafficMessage); | ||
674 | GNUNET_MQ_hd_fixed_size (send_ready, | ||
675 | GNUNET_MESSAGE_TYPE_CORE_SEND_READY, | ||
676 | struct SendMessageReady); | ||
677 | struct GNUNET_MQ_MessageHandler handlers[] = { | ||
678 | make_init_reply_handler (h), | ||
679 | make_connect_notify_handler (h), | ||
680 | make_disconnect_notify_handler (h), | ||
681 | make_notify_inbound_handler (h), | ||
682 | make_send_ready_handler (h), | ||
683 | GNUNET_MQ_handler_end () | ||
684 | }; | ||
685 | struct InitMessage *init; | ||
686 | struct GNUNET_MQ_Envelope *env; | ||
687 | uint16_t *ts; | ||
688 | |||
689 | GNUNET_assert (NULL == h->mq); | ||
690 | h->mq = GNUNET_CLIENT_connecT (h->cfg, | ||
691 | "core", | ||
692 | handlers, | ||
693 | &handle_mq_error, | ||
694 | h); | ||
695 | if (NULL == h->mq) | ||
696 | { | ||
697 | reconnect_later (h); | ||
698 | return; | ||
699 | } | ||
700 | env = GNUNET_MQ_msg_extra (init, | ||
701 | sizeof (uint16_t) * h->hcnt, | ||
702 | GNUNET_MESSAGE_TYPE_CORE_INIT); | ||
703 | LOG (GNUNET_ERROR_TYPE_INFO, | ||
704 | "(Re)connecting to CORE service\n"); | ||
705 | init->options = htonl (0); | ||
706 | ts = (uint16_t *) &init[1]; | ||
707 | for (unsigned int hpos = 0; hpos < h->hcnt; hpos++) | ||
708 | ts[hpos] = htons (h->handlers[hpos].type); | ||
709 | GNUNET_MQ_send (h->mq, | ||
710 | env); | ||
711 | } | ||
712 | |||
713 | |||
714 | /** | ||
715 | * Connect to the core service. Note that the connection may complete | ||
716 | * (or fail) asynchronously. | ||
717 | * | ||
718 | * @param cfg configuration to use | ||
719 | * @param cls closure for the various callbacks that follow (including handlers in the handlers array) | ||
720 | * @param init callback to call once we have successfully | ||
721 | * connected to the core service | ||
722 | * @param connects function to call on peer connect, can be NULL | ||
723 | * @param disconnects function to call on peer disconnect / timeout, can be NULL | ||
724 | * @param handlers callbacks for messages we care about, NULL-terminated | ||
725 | * @return handle to the core service (only useful for disconnect until @a init is called); | ||
726 | * NULL on error (in this case, init is never called) | ||
727 | */ | ||
728 | struct GNUNET_CORE_Handle * | ||
729 | GNUNET_CORE_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg, | ||
730 | void *cls, | ||
731 | GNUNET_CORE_StartupCallback init, | ||
732 | GNUNET_CORE_ConnecTEventHandler connects, | ||
733 | GNUNET_CORE_DisconnecTEventHandler disconnects, | ||
734 | const struct GNUNET_MQ_MessageHandler *handlers) | ||
735 | { | ||
736 | struct GNUNET_CORE_Handle *h; | ||
737 | unsigned int hcnt; | ||
738 | |||
739 | h = GNUNET_new (struct GNUNET_CORE_Handle); | ||
740 | h->cfg = cfg; | ||
741 | h->cls = cls; | ||
742 | h->init = init; | ||
743 | h->connects = connects; | ||
744 | h->disconnects = disconnects; | ||
745 | h->peers = GNUNET_CONTAINER_multipeermap_create (128, | ||
746 | GNUNET_NO); | ||
747 | hcnt = 0; | ||
748 | if (NULL != handlers) | ||
749 | while (NULL != handlers[hcnt].cb) | ||
750 | hcnt++; | ||
751 | h->handlers = GNUNET_new_array (hcnt + 1, | ||
752 | struct GNUNET_MQ_MessageHandler); | ||
753 | if (NULL != handlers) | ||
754 | GNUNET_memcpy (h->handlers, | ||
755 | handlers, | ||
756 | hcnt * sizeof (struct GNUNET_MQ_MessageHandler)); | ||
757 | h->hcnt = hcnt; | ||
758 | GNUNET_assert (hcnt < | ||
759 | (GNUNET_SERVER_MAX_MESSAGE_SIZE - | ||
760 | sizeof (struct InitMessage)) / sizeof (uint16_t)); | ||
761 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
762 | "Connecting to CORE service\n"); | ||
763 | reconnect (h); | ||
764 | if (NULL == h->mq) | ||
765 | { | ||
766 | GNUNET_CORE_disconnect (h); | ||
767 | return NULL; | ||
768 | } | ||
769 | return h; | ||
770 | } | ||
771 | |||
772 | |||
773 | /** | ||
774 | * Disconnect from the core service. | ||
775 | * | ||
776 | * @param handle connection to core to disconnect | ||
777 | */ | ||
778 | void | ||
779 | GNUNET_CORE_disconnecT (struct GNUNET_CORE_Handle *handle) | ||
780 | { | ||
781 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
782 | "Disconnecting from CORE service\n"); | ||
783 | GNUNET_CONTAINER_multipeermap_iterate (handle->peers, | ||
784 | &disconnect_and_free_peer_entry, | ||
785 | handle); | ||
786 | GNUNET_CONTAINER_multipeermap_destroy (handle->peers); | ||
787 | handle->peers = NULL; | ||
788 | if (NULL != handle->reconnect_task) | ||
789 | { | ||
790 | GNUNET_SCHEDULER_cancel (handle->reconnect_task); | ||
791 | handle->reconnect_task = NULL; | ||
792 | } | ||
793 | if (NULL != handle->mq) | ||
794 | { | ||
795 | GNUNET_MQ_destroy (handle->mq); | ||
796 | handle->mq = NULL; | ||
797 | } | ||
798 | GNUNET_free (handle->handlers); | ||
799 | GNUNET_free (handle); | ||
800 | } | ||
801 | |||
802 | |||
803 | /** | ||
804 | * Obtain the message queue for a connected peer. | ||
805 | * | ||
806 | * @param h the core handle | ||
807 | * @param pid the identity of the peer to check if it has been connected to us | ||
808 | * @return NULL if peer is not connected | ||
809 | */ | ||
810 | struct GNUNET_MQ_Handle * | ||
811 | GNUNET_CORE_get_mq (const struct GNUNET_CORE_Handle *h, | ||
812 | const struct GNUNET_PeerIdentity *pid) | ||
813 | { | ||
814 | struct PeerRecord *pr; | ||
815 | |||
816 | pr = GNUNET_CONTAINER_multipeermap_get (h->peers, | ||
817 | pid); | ||
818 | if (NULL == pr) | ||
819 | return NULL; | ||
820 | return pr->mq; | ||
821 | } | ||
822 | |||
823 | |||
824 | /* end of core_api.c */ | ||