aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-07-30 22:01:43 +0000
committerChristian Grothoff <christian@grothoff.org>2016-07-30 22:01:43 +0000
commite9dbabbabb9edd41bb7d7cb907826abdd8edb257 (patch)
tree70c9655d5aa4b68aa5b840507af4959a21f54f4c /src/core
parent33533a0e9955dcadc0857d824aa976e3fe028527 (diff)
downloadgnunet-e9dbabbabb9edd41bb7d7cb907826abdd8edb257.tar.gz
gnunet-e9dbabbabb9edd41bb7d7cb907826abdd8edb257.zip
draft of new core API MQ-based implementation
Diffstat (limited to 'src/core')
-rw-r--r--src/core/Makefile.am1
-rw-r--r--src/core/core_api.c38
-rw-r--r--src/core/core_api_2.c824
3 files changed, 825 insertions, 38 deletions
diff --git a/src/core/Makefile.am b/src/core/Makefile.am
index 3437aa43a..d22c3e01f 100644
--- a/src/core/Makefile.am
+++ b/src/core/Makefile.am
@@ -23,6 +23,7 @@ lib_LTLIBRARIES = \
23 23
24libgnunetcore_la_SOURCES = \ 24libgnunetcore_la_SOURCES = \
25 core_api.c core.h \ 25 core_api.c core.h \
26 core_api_2.c \
26 core_api_mq.c \ 27 core_api_mq.c \
27 core_api_monitor_peers.c 28 core_api_monitor_peers.c
28libgnunetcore_la_LIBADD = \ 29libgnunetcore_la_LIBADD = \
diff --git a/src/core/core_api.c b/src/core/core_api.c
index acdac5fa4..caf614afc 100644
--- a/src/core/core_api.c
+++ b/src/core/core_api.c
@@ -917,44 +917,6 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
917 917
918 918
919/** 919/**
920 * Connect to the core service. Note that the connection may complete
921 * (or fail) asynchronously. This function primarily causes the given
922 * callback notification functions to be invoked whenever the
923 * specified event happens. The maximum number of queued
924 * notifications (queue length) is per client; the queue is shared
925 * across all types of notifications. So a slow client that registers
926 * for @a outbound_notify also risks missing @a inbound_notify messages.
927 * Certain events (such as connect/disconnect notifications) are not
928 * subject to queue size limitations.
929 *
930 * @param cfg configuration to use
931 * @param cls closure for the various callbacks that follow (including handlers in the handlers array)
932 * @param init callback to call once we have successfully
933 * connected to the core service
934 * @param connects function to call on peer connect, can be NULL
935 * @param disconnects function to call on peer disconnect / timeout, can be NULL
936 * @param handlers callbacks for messages we care about, NULL-terminated
937 * note that the core is allowed to drop notifications about inbound
938 * messages if the client does not process them fast enough (for this
939 * notification type, a bounded queue is used)
940 * @return handle to the core service (only useful for disconnect until @a init is called),
941 * NULL on error (in this case, init is never called)
942 */
943struct GNUNET_CORE_Handle *
944GNUNET_CORE_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg,
945 void *cls,
946 GNUNET_CORE_StartupCallback init,
947 GNUNET_CORE_ConnecTEventHandler connects,
948 GNUNET_CORE_DisconnecTEventHandler disconnects,
949 const struct GNUNET_MQ_MessageHandler *handlers)
950{
951 GNUNET_break (0); // not implemented
952 // NOTE: re-enable core-related tests in ats-tests/ once implemented!
953 return NULL;
954}
955
956
957/**
958 * Disconnect from the core service. This function can only 920 * Disconnect from the core service. This function can only
959 * be called *after* all pending #GNUNET_CORE_notify_transmit_ready() 921 * be called *after* all pending #GNUNET_CORE_notify_transmit_ready()
960 * requests have been explicitly canceled. 922 * requests have been explicitly canceled.
diff --git a/src/core/core_api_2.c b/src/core/core_api_2.c
new file mode 100644
index 000000000..d45c98e93
--- /dev/null
+++ b/src/core/core_api_2.c
@@ -0,0 +1,824 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2009-2016 GNUnet e.V.
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
19*/
20/**
21 * @file core/core_api_2.c
22 * @brief core service; this is the main API for encrypted P2P
23 * communications
24 * @author Christian Grothoff
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28#include "gnunet_constants.h"
29#include "gnunet_core_service.h"
30#include "core.h"
31
32#define LOG(kind,...) GNUNET_log_from (kind, "core-api",__VA_ARGS__)
33
34
35/**
36 * Information we track for each peer.
37 */
38struct PeerRecord
39{
40
41 /**
42 * Corresponding CORE handle.
43 */
44 struct GNUNET_CORE_Handle *h;
45
46 /**
47 * Message queue for the peer.
48 */
49 struct GNUNET_MQ_Handle *mq;
50
51 /**
52 * Message we are currently trying to pass to the CORE service
53 * for this peer (from @e mq).
54 */
55 struct GNUNET_MQ_Envelope *env;
56
57 /**
58 * Value the client returned when we connected, used
59 * as the closure in various places.
60 */
61 void *client_cls;
62
63 /**
64 * Peer the record is about.
65 */
66 struct GNUNET_PeerIdentity peer;
67
68 /**
69 * SendMessageRequest ID generator for this peer.
70 */
71 uint16_t smr_id_gen;
72
73};
74
75
76/**
77 * Context for the core service connection.
78 */
79struct GNUNET_CORE_Handle
80{
81
82 /**
83 * Configuration we're using.
84 */
85 const struct GNUNET_CONFIGURATION_Handle *cfg;
86
87 /**
88 * Closure for the various callbacks.
89 */
90 void *cls;
91
92 /**
93 * Function to call once we've handshaked with the core service.
94 */
95 GNUNET_CORE_StartupCallback init;
96
97 /**
98 * Function to call whenever we're notified about a peer connecting.
99 */
100 GNUNET_CORE_ConnecTEventHandler connects;
101
102 /**
103 * Function to call whenever we're notified about a peer disconnecting.
104 */
105 GNUNET_CORE_DisconnecTEventHandler disconnects;
106
107 /**
108 * Function handlers for messages of particular type.
109 */
110 struct GNUNET_MQ_MessageHandler *handlers;
111
112 /**
113 * Our message queue for transmissions to the service.
114 */
115 struct GNUNET_MQ_Handle *mq;
116
117 /**
118 * Hash map listing all of the peers that we are currently
119 * connected to.
120 */
121 struct GNUNET_CONTAINER_MultiPeerMap *peers;
122
123 /**
124 * Identity of this peer.
125 */
126 struct GNUNET_PeerIdentity me;
127
128 /**
129 * ID of reconnect task (if any).
130 */
131 struct GNUNET_SCHEDULER_Task *reconnect_task;
132
133 /**
134 * Current delay we use for re-trying to connect to core.
135 */
136 struct GNUNET_TIME_Relative retry_backoff;
137
138 /**
139 * Number of entries in the handlers array.
140 */
141 unsigned int hcnt;
142
143 /**
144 * Did we ever get INIT?
145 */
146 int have_init;
147
148};
149
150
151/**
152 * Our current client connection went down. Clean it up
153 * and try to reconnect!
154 *
155 * @param h our handle to the core service
156 */
157static void
158reconnect (struct GNUNET_CORE_Handle *h);
159
160
161/**
162 * Task schedule to try to re-connect to core.
163 *
164 * @param cls the `struct GNUNET_CORE_Handle`
165 * @param tc task context
166 */
167static void
168reconnect_task (void *cls)
169{
170 struct GNUNET_CORE_Handle *h = cls;
171
172 h->reconnect_task = NULL;
173 LOG (GNUNET_ERROR_TYPE_DEBUG,
174 "Connecting to CORE service after delay\n");
175 reconnect (h);
176}
177
178
179/**
180 * Notify clients about disconnect and free the entry for connected
181 * peer.
182 *
183 * @param cls the `struct GNUNET_CORE_Handle *`
184 * @param key the peer identity (not used)
185 * @param value the `struct PeerRecord` to free.
186 * @return #GNUNET_YES (continue)
187 */
188static int
189disconnect_and_free_peer_entry (void *cls,
190 const struct GNUNET_PeerIdentity *key,
191 void *value)
192{
193 struct GNUNET_CORE_Handle *h = cls;
194 struct PeerRecord *pr = value;
195
196 GNUNET_assert (pr->h == h);
197 if (NULL != h->disconnects)
198 h->disconnects (h->cls,
199 &pr->peer,
200 pr->client_cls);
201 GNUNET_assert (GNUNET_YES ==
202 GNUNET_CONTAINER_multipeermap_remove (h->peers,
203 key,
204 pr));
205 GNUNET_MQ_destroy (pr->mq);
206 GNUNET_assert (NULL == pr->mq);
207 GNUNET_free (pr);
208 return GNUNET_YES;
209}
210
211
212/**
213 * Close down any existing connection to the CORE service and
214 * try re-establishing it later.
215 *
216 * @param h our handle
217 */
218static void
219reconnect_later (struct GNUNET_CORE_Handle *h)
220{
221 GNUNET_assert (NULL == h->reconnect_task);
222 if (NULL != h->mq)
223 {
224 GNUNET_MQ_destroy (h->mq);
225 h->mq = NULL;
226 }
227 GNUNET_assert (NULL == h->reconnect_task);
228 h->reconnect_task =
229 GNUNET_SCHEDULER_add_delayed (h->retry_backoff,
230 &reconnect_task,
231 h);
232 GNUNET_CONTAINER_multipeermap_iterate (h->peers,
233 &disconnect_and_free_peer_entry,
234 h);
235 h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff);
236}
237
238
239/**
240 * Error handler for the message queue to the CORE service.
241 * On errors, we reconnect.
242 *
243 * @param cls closure, a `struct GNUNET_CORE_Handle *`
244 * @param error error code
245 */
246static void
247handle_mq_error (void *cls,
248 enum GNUNET_MQ_Error error)
249{
250 struct GNUNET_CORE_Handle *h = cls;
251
252 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
253 "MQ ERROR: %d\n",
254 error);
255 reconnect_later (h);
256}
257
258
259/**
260 * Implement sending functionality of a message queue for
261 * us sending messages to a peer.
262 *
263 * @param mq the message queue
264 * @param msg the message to send
265 * @param impl_state state of the implementation
266 */
267static void
268core_mq_send_impl (struct GNUNET_MQ_Handle *mq,
269 const struct GNUNET_MessageHeader *msg,
270 void *impl_state)
271{
272 struct PeerRecord *pr = impl_state;
273 struct GNUNET_CORE_Handle *h = pr->h;
274 struct SendMessageRequest *smr;
275 struct SendMessage *sm;
276 struct GNUNET_MQ_Envelope *env;
277 uint16_t msize;
278 int cork
279 = GNUNET_NO; // FIXME
280 enum GNUNET_CORE_Priority priority
281 = GNUNET_CORE_PRIO_BEST_EFFORT; // FIXME
282
283 GNUNET_assert (NULL == pr->env);
284 msize = ntohs (msg->size);
285 if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct SendMessage))
286 {
287 GNUNET_break (0);
288 GNUNET_MQ_impl_send_continue (mq);
289 return;
290 }
291 LOG (GNUNET_ERROR_TYPE_DEBUG,
292 "Asking core for transmission of %u bytes to `%s'\n",
293 (unsigned int) msize,
294 GNUNET_i2s (&pr->peer));
295 env = GNUNET_MQ_msg (smr,
296 GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST);
297 smr->priority = htonl ((uint32_t) priority);
298 // smr->deadline = GNUNET_TIME_absolute_hton (deadline);
299 smr->peer = pr->peer;
300 smr->reserved = htonl (0);
301 smr->size = htons (msize);
302 smr->smr_id = htons (++pr->smr_id_gen);
303 GNUNET_MQ_send (h->mq,
304 env);
305 pr->env = GNUNET_MQ_msg_nested_mh (sm,
306 GNUNET_MESSAGE_TYPE_CORE_SEND,
307 msg);
308 sm->priority = htonl ((uint32_t) priority);
309 // sm->deadline = GNUNET_TIME_absolute_hton (deadline);
310 sm->peer = pr->peer;
311 sm->cork = htonl ((uint32_t) cork);
312 sm->reserved = htonl (0);
313 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
314 "Calling get_message with buffer of %u bytes (%s)\n",
315 (unsigned int) msize,
316 cork ? "corked" : "uncorked");
317}
318
319
320/**
321 * Handle destruction of a message queue. Implementations must not
322 * free @a mq, but should take care of @a impl_state.
323 *
324 * @param mq the message queue to destroy
325 * @param impl_state state of the implementation
326 */
327static void
328core_mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
329 void *impl_state)
330{
331 struct PeerRecord *pr = impl_state;
332
333 GNUNET_assert (mq == pr->mq);
334 pr->mq = NULL;
335}
336
337
338/**
339 * Implementation function that cancels the currently sent message.
340 * Should basically undo whatever #mq_send_impl() did.
341 *
342 * @param mq message queue
343 * @param impl_state state specific to the implementation
344 */
345static void
346core_mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
347 void *impl_state)
348{
349 struct PeerRecord *pr = impl_state;
350
351 GNUNET_assert (NULL != pr->env);
352 GNUNET_MQ_send_cancel (pr->env);
353 pr->env = NULL;
354}
355
356
357/**
358 * We had an error processing a message we forwarded from a peer to
359 * the CORE service. We should just complain about it but otherwise
360 * continue processing.
361 *
362 * @param cls closure
363 * @param error error code
364 */
365static void
366core_mq_error_handler (void *cls,
367 enum GNUNET_MQ_Error error)
368{
369 /* struct PeerRecord *pr = cls; */
370
371 GNUNET_break_op (0);
372}
373
374
375/**
376 * Add the given peer to the list of our connected peers
377 * and create the respective data structures and notify
378 * the application.
379 *
380 * @param h the core handle
381 * @param peer the peer that is connecting to us
382 */
383static void
384connect_peer (struct GNUNET_CORE_Handle *h,
385 const struct GNUNET_PeerIdentity *peer)
386{
387 struct PeerRecord *pr;
388
389 pr = GNUNET_new (struct PeerRecord);
390 pr->peer = *peer;
391 pr->h = h;
392 GNUNET_assert (GNUNET_YES ==
393 GNUNET_CONTAINER_multipeermap_put (h->peers,
394 &pr->peer,
395 pr,
396 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
397 pr->mq = GNUNET_MQ_queue_for_callbacks (&core_mq_send_impl,
398 &core_mq_destroy_impl,
399 &core_mq_cancel_impl,
400 pr,
401 h->handlers,
402 &core_mq_error_handler,
403 pr);
404 if (NULL != h->connects)
405 {
406 pr->client_cls = h->connects (h->cls,
407 &pr->peer,
408 pr->mq);
409 GNUNET_MQ_set_handlers_closure (pr->mq,
410 pr->client_cls);
411 }
412}
413
414
415/**
416 * Handle init reply message received from CORE service. Notify
417 * application that we are now connected to the CORE. Also fake
418 * loopback connection.
419 *
420 * @param cls the `struct GNUNET_CORE_Handle`
421 * @param m the init reply
422 */
423static void
424handle_init_reply (void *cls,
425 const struct InitReplyMessage *m)
426{
427 struct GNUNET_CORE_Handle *h = cls;
428 GNUNET_CORE_StartupCallback init;
429
430 GNUNET_break (0 == ntohl (m->reserved));
431 h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
432 if (NULL != (init = h->init))
433 {
434 /* mark so we don't call init on reconnect */
435 h->init = NULL;
436 h->me = m->my_identity;
437 LOG (GNUNET_ERROR_TYPE_DEBUG,
438 "Connected to core service of peer `%s'.\n",
439 GNUNET_i2s (&h->me));
440 h->have_init = GNUNET_YES;
441 init (h->cls,
442 &h->me);
443 }
444 else
445 {
446 LOG (GNUNET_ERROR_TYPE_DEBUG,
447 "Successfully reconnected to core service.\n");
448 if (GNUNET_NO == h->have_init)
449 {
450 h->me = m->my_identity;
451 h->have_init = GNUNET_YES;
452 }
453 else
454 {
455 GNUNET_break (0 == memcmp (&h->me,
456 &m->my_identity,
457 sizeof (struct GNUNET_PeerIdentity)));
458 }
459 }
460 /* fake 'connect to self' */
461 connect_peer (h,
462 &h->me);
463}
464
465
466/**
467 * Handle connect message received from CORE service.
468 * Notify the application about the new connection.
469 *
470 * @param cls the `struct GNUNET_CORE_Handle`
471 * @param cnm the connect message
472 */
473static void
474handle_connect_notify (void *cls,
475 const struct ConnectNotifyMessage *cnm)
476{
477 struct GNUNET_CORE_Handle *h = cls;
478 struct PeerRecord *pr;
479
480 LOG (GNUNET_ERROR_TYPE_DEBUG,
481 "Received notification about connection from `%s'.\n",
482 GNUNET_i2s (&cnm->peer));
483 if (0 == memcmp (&h->me,
484 &cnm->peer,
485 sizeof (struct GNUNET_PeerIdentity)))
486 {
487 /* connect to self!? */
488 GNUNET_break (0);
489 return;
490 }
491 pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
492 &cnm->peer);
493 if (NULL != pr)
494 {
495 GNUNET_break (0);
496 reconnect_later (h);
497 return;
498 }
499 connect_peer (h,
500 &cnm->peer);
501}
502
503
504/**
505 * Handle disconnect message received from CORE service.
506 * Notify the application about the lost connection.
507 *
508 * @param cls the `struct GNUNET_CORE_Handle`
509 * @param dnm message about the disconnect event
510 */
511static void
512handle_disconnect_notify (void *cls,
513 const struct DisconnectNotifyMessage *dnm)
514{
515 struct GNUNET_CORE_Handle *h = cls;
516 struct PeerRecord *pr;
517
518 if (0 == memcmp (&h->me,
519 &dnm->peer,
520 sizeof (struct GNUNET_PeerIdentity)))
521 {
522 /* disconnect from self!? */
523 GNUNET_break (0);
524 return;
525 }
526 GNUNET_break (0 == ntohl (dnm->reserved));
527 LOG (GNUNET_ERROR_TYPE_DEBUG,
528 "Received notification about disconnect from `%s'.\n",
529 GNUNET_i2s (&dnm->peer));
530 pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
531 &dnm->peer);
532 if (NULL == pr)
533 {
534 GNUNET_break (0);
535 reconnect_later (h);
536 return;
537 }
538 disconnect_and_free_peer_entry (h,
539 &pr->peer,
540 pr);
541}
542
543
544/**
545 * Check that message received from CORE service is well-formed.
546 *
547 * @param cls the `struct GNUNET_CORE_Handle`
548 * @param ntm the message we got
549 * @return #GNUNET_OK if the message is well-formed
550 */
551static int
552check_notify_inbound (void *cls,
553 const struct NotifyTrafficMessage *ntm)
554{
555 uint16_t msize;
556 const struct GNUNET_MessageHeader *em;
557
558 msize = ntohs (ntm->header.size) - sizeof (struct NotifyTrafficMessage);
559 if (msize < sizeof (struct GNUNET_MessageHeader))
560 {
561 GNUNET_break (0);
562 return GNUNET_SYSERR;
563 }
564 em = (const struct GNUNET_MessageHeader *) &ntm[1];
565 if (msize != ntohs (em->size))
566 {
567 GNUNET_break (0);
568 return GNUNET_SYSERR;
569 }
570 return GNUNET_OK;
571}
572
573
574/**
575 * Handle inbound message received from CORE service. If applicable,
576 * notify the application.
577 *
578 * @param cls the `struct GNUNET_CORE_Handle`
579 * @param ntm the message we got from CORE.
580 */
581static void
582handle_notify_inbound (void *cls,
583 const struct NotifyTrafficMessage *ntm)
584{
585 struct GNUNET_CORE_Handle *h = cls;
586 const struct GNUNET_MessageHeader *em;
587 struct PeerRecord *pr;
588
589 LOG (GNUNET_ERROR_TYPE_DEBUG,
590 "Received inbound message from `%s'.\n",
591 GNUNET_i2s (&ntm->peer));
592 em = (const struct GNUNET_MessageHeader *) &ntm[1];
593 pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
594 &ntm->peer);
595 if (NULL == pr)
596 {
597 GNUNET_break (0);
598 reconnect_later (h);
599 return;
600 }
601 GNUNET_MQ_inject_message (pr->mq,
602 em);
603}
604
605
606/**
607 * Handle message received from CORE service notifying us that we are
608 * now allowed to send a message to a peer. If that message is still
609 * pending, put it into the queue to be transmitted.
610 *
611 * @param cls the `struct GNUNET_CORE_Handle`
612 * @param smr the message we got
613 */
614static void
615handle_send_ready (void *cls,
616 const struct SendMessageReady *smr)
617{
618 struct GNUNET_CORE_Handle *h = cls;
619 struct PeerRecord *pr;
620
621 pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
622 &smr->peer);
623 if (NULL == pr)
624 {
625 GNUNET_break (0);
626 reconnect_later (h);
627 return;
628 }
629 LOG (GNUNET_ERROR_TYPE_DEBUG,
630 "Received notification about transmission readiness to `%s'.\n",
631 GNUNET_i2s (&smr->peer));
632 if (NULL == pr->env)
633 {
634 /* request must have been cancelled between the original request
635 * and the response from CORE, ignore CORE's readiness */
636 return;
637 }
638 if (ntohs (smr->smr_id) != pr->smr_id_gen)
639 {
640 /* READY message is for expired or cancelled message,
641 * ignore! (we should have already sent another request) */
642 return;
643 }
644
645 /* ok, all good, send message out! */
646 GNUNET_MQ_send (h->mq,
647 pr->env);
648 pr->env = NULL;
649 GNUNET_MQ_impl_send_continue (pr->mq);
650}
651
652
653/**
654 * Our current client connection went down. Clean it up and try to
655 * reconnect!
656 *
657 * @param h our handle to the core service
658 */
659static void
660reconnect (struct GNUNET_CORE_Handle *h)
661{
662 GNUNET_MQ_hd_fixed_size (init_reply,
663 GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY,
664 struct InitReplyMessage);
665 GNUNET_MQ_hd_fixed_size (connect_notify,
666 GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT,
667 struct ConnectNotifyMessage);
668 GNUNET_MQ_hd_fixed_size (disconnect_notify,
669 GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT,
670 struct DisconnectNotifyMessage);
671 GNUNET_MQ_hd_var_size (notify_inbound,
672 GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND,
673 struct NotifyTrafficMessage);
674 GNUNET_MQ_hd_fixed_size (send_ready,
675 GNUNET_MESSAGE_TYPE_CORE_SEND_READY,
676 struct SendMessageReady);
677 struct GNUNET_MQ_MessageHandler handlers[] = {
678 make_init_reply_handler (h),
679 make_connect_notify_handler (h),
680 make_disconnect_notify_handler (h),
681 make_notify_inbound_handler (h),
682 make_send_ready_handler (h),
683 GNUNET_MQ_handler_end ()
684 };
685 struct InitMessage *init;
686 struct GNUNET_MQ_Envelope *env;
687 uint16_t *ts;
688
689 GNUNET_assert (NULL == h->mq);
690 h->mq = GNUNET_CLIENT_connecT (h->cfg,
691 "core",
692 handlers,
693 &handle_mq_error,
694 h);
695 if (NULL == h->mq)
696 {
697 reconnect_later (h);
698 return;
699 }
700 env = GNUNET_MQ_msg_extra (init,
701 sizeof (uint16_t) * h->hcnt,
702 GNUNET_MESSAGE_TYPE_CORE_INIT);
703 LOG (GNUNET_ERROR_TYPE_INFO,
704 "(Re)connecting to CORE service\n");
705 init->options = htonl (0);
706 ts = (uint16_t *) &init[1];
707 for (unsigned int hpos = 0; hpos < h->hcnt; hpos++)
708 ts[hpos] = htons (h->handlers[hpos].type);
709 GNUNET_MQ_send (h->mq,
710 env);
711}
712
713
714/**
715 * Connect to the core service. Note that the connection may complete
716 * (or fail) asynchronously.
717 *
718 * @param cfg configuration to use
719 * @param cls closure for the various callbacks that follow (including handlers in the handlers array)
720 * @param init callback to call once we have successfully
721 * connected to the core service
722 * @param connects function to call on peer connect, can be NULL
723 * @param disconnects function to call on peer disconnect / timeout, can be NULL
724 * @param handlers callbacks for messages we care about, NULL-terminated
725 * @return handle to the core service (only useful for disconnect until @a init is called);
726 * NULL on error (in this case, init is never called)
727 */
728struct GNUNET_CORE_Handle *
729GNUNET_CORE_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg,
730 void *cls,
731 GNUNET_CORE_StartupCallback init,
732 GNUNET_CORE_ConnecTEventHandler connects,
733 GNUNET_CORE_DisconnecTEventHandler disconnects,
734 const struct GNUNET_MQ_MessageHandler *handlers)
735{
736 struct GNUNET_CORE_Handle *h;
737 unsigned int hcnt;
738
739 h = GNUNET_new (struct GNUNET_CORE_Handle);
740 h->cfg = cfg;
741 h->cls = cls;
742 h->init = init;
743 h->connects = connects;
744 h->disconnects = disconnects;
745 h->peers = GNUNET_CONTAINER_multipeermap_create (128,
746 GNUNET_NO);
747 hcnt = 0;
748 if (NULL != handlers)
749 while (NULL != handlers[hcnt].cb)
750 hcnt++;
751 h->handlers = GNUNET_new_array (hcnt + 1,
752 struct GNUNET_MQ_MessageHandler);
753 if (NULL != handlers)
754 GNUNET_memcpy (h->handlers,
755 handlers,
756 hcnt * sizeof (struct GNUNET_MQ_MessageHandler));
757 h->hcnt = hcnt;
758 GNUNET_assert (hcnt <
759 (GNUNET_SERVER_MAX_MESSAGE_SIZE -
760 sizeof (struct InitMessage)) / sizeof (uint16_t));
761 LOG (GNUNET_ERROR_TYPE_DEBUG,
762 "Connecting to CORE service\n");
763 reconnect (h);
764 if (NULL == h->mq)
765 {
766 GNUNET_CORE_disconnect (h);
767 return NULL;
768 }
769 return h;
770}
771
772
773/**
774 * Disconnect from the core service.
775 *
776 * @param handle connection to core to disconnect
777 */
778void
779GNUNET_CORE_disconnecT (struct GNUNET_CORE_Handle *handle)
780{
781 LOG (GNUNET_ERROR_TYPE_DEBUG,
782 "Disconnecting from CORE service\n");
783 GNUNET_CONTAINER_multipeermap_iterate (handle->peers,
784 &disconnect_and_free_peer_entry,
785 handle);
786 GNUNET_CONTAINER_multipeermap_destroy (handle->peers);
787 handle->peers = NULL;
788 if (NULL != handle->reconnect_task)
789 {
790 GNUNET_SCHEDULER_cancel (handle->reconnect_task);
791 handle->reconnect_task = NULL;
792 }
793 if (NULL != handle->mq)
794 {
795 GNUNET_MQ_destroy (handle->mq);
796 handle->mq = NULL;
797 }
798 GNUNET_free (handle->handlers);
799 GNUNET_free (handle);
800}
801
802
803/**
804 * Obtain the message queue for a connected peer.
805 *
806 * @param h the core handle
807 * @param pid the identity of the peer to check if it has been connected to us
808 * @return NULL if peer is not connected
809 */
810struct GNUNET_MQ_Handle *
811GNUNET_CORE_get_mq (const struct GNUNET_CORE_Handle *h,
812 const struct GNUNET_PeerIdentity *pid)
813{
814 struct PeerRecord *pr;
815
816 pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
817 pid);
818 if (NULL == pr)
819 return NULL;
820 return pr->mq;
821}
822
823
824/* end of core_api.c */