diff options
author | Christian Grothoff <christian@grothoff.org> | 2014-12-13 19:59:40 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2014-12-13 19:59:40 +0000 |
commit | a4473b58bdd25d7e3531cbc50086e75ae62b769b (patch) | |
tree | 5d6f6968c2340dc122adc9b2246d4d0efc9aa67f | |
parent | 88bc4c05422973f2dc2d1b3ed6fbf1d39c773ffd (diff) | |
download | gnunet-a4473b58bdd25d7e3531cbc50086e75ae62b769b.tar.gz gnunet-a4473b58bdd25d7e3531cbc50086e75ae62b769b.zip |
-split off MQ API into separate C file to reduce visibility of symbols; reindentation, stylistic fixes
-rw-r--r-- | src/core/Makefile.am | 7 | ||||
-rw-r--r-- | src/core/core_api.c | 294 | ||||
-rw-r--r-- | src/core/core_api_mq.c | 191 |
3 files changed, 301 insertions, 191 deletions
diff --git a/src/core/Makefile.am b/src/core/Makefile.am index aac1d5fcc..1ca9a7c33 100644 --- a/src/core/Makefile.am +++ b/src/core/Makefile.am | |||
@@ -22,6 +22,7 @@ lib_LTLIBRARIES = \ | |||
22 | 22 | ||
23 | libgnunetcore_la_SOURCES = \ | 23 | libgnunetcore_la_SOURCES = \ |
24 | core_api.c core.h \ | 24 | core_api.c core.h \ |
25 | core_api_mq.c \ | ||
25 | core_api_monitor_peers.c | 26 | core_api_monitor_peers.c |
26 | libgnunetcore_la_LIBADD = \ | 27 | libgnunetcore_la_LIBADD = \ |
27 | $(top_builddir)/src/util/libgnunetutil.la \ | 28 | $(top_builddir)/src/util/libgnunetutil.la \ |
@@ -32,7 +33,7 @@ libgnunetcore_la_LDFLAGS = \ | |||
32 | 33 | ||
33 | 34 | ||
34 | libexec_PROGRAMS = \ | 35 | libexec_PROGRAMS = \ |
35 | gnunet-service-core | 36 | gnunet-service-core |
36 | 37 | ||
37 | bin_PROGRAMS = \ | 38 | bin_PROGRAMS = \ |
38 | gnunet-core | 39 | gnunet-core |
@@ -52,7 +53,7 @@ gnunet_service_core_LDADD = \ | |||
52 | 53 | ||
53 | 54 | ||
54 | gnunet_core_SOURCES = \ | 55 | gnunet_core_SOURCES = \ |
55 | gnunet-core.c | 56 | gnunet-core.c |
56 | gnunet_core_LDADD = \ | 57 | gnunet_core_LDADD = \ |
57 | libgnunetcore.la \ | 58 | libgnunetcore.la \ |
58 | $(top_builddir)/src/util/libgnunetutil.la | 59 | $(top_builddir)/src/util/libgnunetutil.la |
@@ -80,7 +81,7 @@ test_core_api_SOURCES = \ | |||
80 | test_core_api_LDADD = \ | 81 | test_core_api_LDADD = \ |
81 | libgnunetcore.la \ | 82 | libgnunetcore.la \ |
82 | $(top_builddir)/src/transport/libgnunettransport.la \ | 83 | $(top_builddir)/src/transport/libgnunettransport.la \ |
83 | $(top_builddir)/src/util/libgnunetutil.la | 84 | $(top_builddir)/src/util/libgnunetutil.la |
84 | 85 | ||
85 | test_core_api_reliability_SOURCES = \ | 86 | test_core_api_reliability_SOURCES = \ |
86 | test_core_api_reliability.c | 87 | test_core_api_reliability.c |
diff --git a/src/core/core_api.c b/src/core/core_api.c index 21bc758ff..d03236a24 100644 --- a/src/core/core_api.c +++ b/src/core/core_api.c | |||
@@ -1,6 +1,6 @@ | |||
1 | /* | 1 | /* |
2 | This file is part of GNUnet. | 2 | This file is part of GNUnet. |
3 | (C) 2009, 2010 Christian Grothoff (and other contributing authors) | 3 | (C) 2009-2014 Christian Grothoff (and other contributing authors) |
4 | 4 | ||
5 | GNUnet is free software; you can redistribute it and/or modify | 5 | GNUnet is free software; you can redistribute it and/or modify |
6 | it under the terms of the GNU General Public License as published | 6 | it under the terms of the GNU General Public License as published |
@@ -17,7 +17,6 @@ | |||
17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, | 17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, |
18 | Boston, MA 02111-1307, USA. | 18 | Boston, MA 02111-1307, USA. |
19 | */ | 19 | */ |
20 | |||
21 | /** | 20 | /** |
22 | * @file core/core_api.c | 21 | * @file core/core_api.c |
23 | * @brief core service; this is the main API for encrypted P2P | 22 | * @brief core service; this is the main API for encrypted P2P |
@@ -59,7 +58,7 @@ struct GNUNET_CORE_TransmitHandle | |||
59 | GNUNET_CONNECTION_TransmitReadyNotify get_message; | 58 | GNUNET_CONNECTION_TransmitReadyNotify get_message; |
60 | 59 | ||
61 | /** | 60 | /** |
62 | * Closure for get_message. | 61 | * Closure for @e get_message. |
63 | */ | 62 | */ |
64 | void *get_message_cls; | 63 | void *get_message_cls; |
65 | 64 | ||
@@ -74,6 +73,11 @@ struct GNUNET_CORE_TransmitHandle | |||
74 | enum GNUNET_CORE_Priority priority; | 73 | enum GNUNET_CORE_Priority priority; |
75 | 74 | ||
76 | /** | 75 | /** |
76 | * Is corking allowed? | ||
77 | */ | ||
78 | int cork; | ||
79 | |||
80 | /** | ||
77 | * Size of this request. | 81 | * Size of this request. |
78 | */ | 82 | */ |
79 | uint16_t msize; | 83 | uint16_t msize; |
@@ -83,11 +87,6 @@ struct GNUNET_CORE_TransmitHandle | |||
83 | */ | 87 | */ |
84 | uint16_t smr_id; | 88 | uint16_t smr_id; |
85 | 89 | ||
86 | /** | ||
87 | * Is corking allowed? | ||
88 | */ | ||
89 | int cork; | ||
90 | |||
91 | }; | 90 | }; |
92 | 91 | ||
93 | 92 | ||
@@ -112,11 +111,6 @@ struct PeerRecord | |||
112 | struct PeerRecord *next; | 111 | struct PeerRecord *next; |
113 | 112 | ||
114 | /** | 113 | /** |
115 | * Peer the record is about. | ||
116 | */ | ||
117 | struct GNUNET_PeerIdentity peer; | ||
118 | |||
119 | /** | ||
120 | * Corresponding core handle. | 114 | * Corresponding core handle. |
121 | */ | 115 | */ |
122 | struct GNUNET_CORE_Handle *ch; | 116 | struct GNUNET_CORE_Handle *ch; |
@@ -128,6 +122,11 @@ struct PeerRecord | |||
128 | struct GNUNET_CORE_TransmitHandle th; | 122 | struct GNUNET_CORE_TransmitHandle th; |
129 | 123 | ||
130 | /** | 124 | /** |
125 | * Peer the record is about. | ||
126 | */ | ||
127 | struct GNUNET_PeerIdentity peer; | ||
128 | |||
129 | /** | ||
131 | * ID of timeout task for the 'pending_head' handle | 130 | * ID of timeout task for the 'pending_head' handle |
132 | * which is the one with the smallest timeout. | 131 | * which is the one with the smallest timeout. |
133 | */ | 132 | */ |
@@ -145,25 +144,20 @@ struct PeerRecord | |||
145 | 144 | ||
146 | }; | 145 | }; |
147 | 146 | ||
148 | struct CoreMQState | ||
149 | { | ||
150 | struct GNUNET_PeerIdentity target; | ||
151 | struct GNUNET_CORE_Handle *core; | ||
152 | struct GNUNET_CORE_TransmitHandle *th; | ||
153 | }; | ||
154 | |||
155 | 147 | ||
156 | /** | 148 | /** |
157 | * Type of function called upon completion. | 149 | * Type of function called upon completion. |
158 | * | 150 | * |
159 | * @param cls closure | 151 | * @param cls closure |
160 | * @param success GNUNET_OK on success (which for request_connect | 152 | * @param success #GNUNET_OK on success (which for request_connect |
161 | * ONLY means that we transmitted the connect request to CORE, | 153 | * ONLY means that we transmitted the connect request to CORE, |
162 | * it does not mean that we are actually now connected!); | 154 | * it does not mean that we are actually now connected!); |
163 | * GNUNET_NO on timeout, | 155 | * #GNUNET_NO on timeout, |
164 | * GNUNET_SYSERR if core was shut down | 156 | * #GNUNET_SYSERR if core was shut down |
165 | */ | 157 | */ |
166 | typedef void (*GNUNET_CORE_ControlContinuation) (void *cls, int success); | 158 | typedef void |
159 | (*GNUNET_CORE_ControlContinuation) (void *cls, | ||
160 | int success); | ||
167 | 161 | ||
168 | 162 | ||
169 | /** | 163 | /** |
@@ -345,7 +339,8 @@ reconnect (struct GNUNET_CORE_Handle *h); | |||
345 | * @param tc task context | 339 | * @param tc task context |
346 | */ | 340 | */ |
347 | static void | 341 | static void |
348 | reconnect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | 342 | reconnect_task (void *cls, |
343 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
349 | { | 344 | { |
350 | struct GNUNET_CORE_Handle *h = cls; | 345 | struct GNUNET_CORE_Handle *h = cls; |
351 | 346 | ||
@@ -384,10 +379,15 @@ disconnect_and_free_peer_entry (void *cls, | |||
384 | GNUNET_SCHEDULER_cancel (pr->ntr_task); | 379 | GNUNET_SCHEDULER_cancel (pr->ntr_task); |
385 | pr->ntr_task = GNUNET_SCHEDULER_NO_TASK; | 380 | pr->ntr_task = GNUNET_SCHEDULER_NO_TASK; |
386 | } | 381 | } |
387 | if ((NULL != pr->prev) || (NULL != pr->next) || (h->ready_peer_head == pr)) | 382 | if ( (NULL != pr->prev) || |
388 | GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr); | 383 | (NULL != pr->next) || |
384 | (h->ready_peer_head == pr) ) | ||
385 | GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, | ||
386 | h->ready_peer_tail, | ||
387 | pr); | ||
389 | if (NULL != h->disconnects) | 388 | if (NULL != h->disconnects) |
390 | h->disconnects (h->cls, &pr->peer); | 389 | h->disconnects (h->cls, |
390 | &pr->peer); | ||
391 | /* all requests should have been cancelled, clean up anyway, just in case */ | 391 | /* all requests should have been cancelled, clean up anyway, just in case */ |
392 | th = &pr->th; | 392 | th = &pr->th; |
393 | if (NULL != th->peer) | 393 | if (NULL != th->peer) |
@@ -439,7 +439,8 @@ reconnect_later (struct GNUNET_CORE_Handle *h) | |||
439 | while (NULL != (cm = h->control_pending_head)) | 439 | while (NULL != (cm = h->control_pending_head)) |
440 | { | 440 | { |
441 | GNUNET_CONTAINER_DLL_remove (h->control_pending_head, | 441 | GNUNET_CONTAINER_DLL_remove (h->control_pending_head, |
442 | h->control_pending_tail, cm); | 442 | h->control_pending_tail, |
443 | cm); | ||
443 | if (NULL != cm->th) | 444 | if (NULL != cm->th) |
444 | cm->th->cm = NULL; | 445 | cm->th->cm = NULL; |
445 | if (NULL != cm->cont) | 446 | if (NULL != cm->cont) |
@@ -449,8 +450,10 @@ reconnect_later (struct GNUNET_CORE_Handle *h) | |||
449 | GNUNET_CONTAINER_multipeermap_iterate (h->peers, | 450 | GNUNET_CONTAINER_multipeermap_iterate (h->peers, |
450 | &disconnect_and_free_peer_entry, h); | 451 | &disconnect_and_free_peer_entry, h); |
451 | while (NULL != (pr = h->ready_peer_head)) | 452 | while (NULL != (pr = h->ready_peer_head)) |
452 | GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr); | 453 | GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, |
453 | GNUNET_assert (h->control_pending_head == NULL); | 454 | h->ready_peer_tail, |
455 | pr); | ||
456 | GNUNET_assert (NULL == h->control_pending_head); | ||
454 | h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff); | 457 | h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff); |
455 | } | 458 | } |
456 | 459 | ||
@@ -463,7 +466,8 @@ reconnect_later (struct GNUNET_CORE_Handle *h) | |||
463 | * @param ignore_currently_down transmit message even if not initialized? | 466 | * @param ignore_currently_down transmit message even if not initialized? |
464 | */ | 467 | */ |
465 | static void | 468 | static void |
466 | trigger_next_request (struct GNUNET_CORE_Handle *h, int ignore_currently_down); | 469 | trigger_next_request (struct GNUNET_CORE_Handle *h, |
470 | int ignore_currently_down); | ||
467 | 471 | ||
468 | 472 | ||
469 | /** | 473 | /** |
@@ -507,9 +511,10 @@ request_next_transmission (struct PeerRecord *pr) | |||
507 | return; /* already done */ | 511 | return; /* already done */ |
508 | GNUNET_assert (pr->prev == NULL); | 512 | GNUNET_assert (pr->prev == NULL); |
509 | GNUNET_assert (pr->next == NULL); | 513 | GNUNET_assert (pr->next == NULL); |
510 | pr->timeout_task = | 514 | pr->timeout_task |
511 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining | 515 | = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (th->timeout), |
512 | (th->timeout), &transmission_timeout, pr); | 516 | &transmission_timeout, |
517 | pr); | ||
513 | cm = GNUNET_malloc (sizeof (struct ControlMessage) + | 518 | cm = GNUNET_malloc (sizeof (struct ControlMessage) + |
514 | sizeof (struct SendMessageRequest)); | 519 | sizeof (struct SendMessageRequest)); |
515 | th->cm = cm; | 520 | th->cm = cm; |
@@ -555,12 +560,16 @@ transmission_timeout (void *cls, | |||
555 | } | 560 | } |
556 | th = &pr->th; | 561 | th = &pr->th; |
557 | th->peer = NULL; | 562 | th->peer = NULL; |
558 | if ((NULL != pr->prev) || (NULL != pr->next) || (pr == h->ready_peer_head)) | 563 | if ( (NULL != pr->prev) || |
564 | (NULL != pr->next) || | ||
565 | (pr == h->ready_peer_head) ) | ||
559 | { | 566 | { |
560 | /* the request that was 'approved' by core was | 567 | /* the request that was 'approved' by core was |
561 | * canceled before it could be transmitted; remove | 568 | * canceled before it could be transmitted; remove |
562 | * us from the 'ready' list */ | 569 | * us from the 'ready' list */ |
563 | GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr); | 570 | GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, |
571 | h->ready_peer_tail, | ||
572 | pr); | ||
564 | } | 573 | } |
565 | if (NULL != th->cm) | 574 | if (NULL != th->cm) |
566 | { | 575 | { |
@@ -587,7 +596,9 @@ transmission_timeout (void *cls, | |||
587 | * @return number of bytes written to @a buf | 596 | * @return number of bytes written to @a buf |
588 | */ | 597 | */ |
589 | static size_t | 598 | static size_t |
590 | transmit_message (void *cls, size_t size, void *buf) | 599 | transmit_message (void *cls, |
600 | size_t size, | ||
601 | void *buf) | ||
591 | { | 602 | { |
592 | struct GNUNET_CORE_Handle *h = cls; | 603 | struct GNUNET_CORE_Handle *h = cls; |
593 | struct ControlMessage *cm; | 604 | struct ControlMessage *cm; |
@@ -619,7 +630,8 @@ transmit_message (void *cls, size_t size, void *buf) | |||
619 | } | 630 | } |
620 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 631 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
621 | "Transmitting control message with %u bytes of type %u to core.\n", | 632 | "Transmitting control message with %u bytes of type %u to core.\n", |
622 | (unsigned int) msize, (unsigned int) ntohs (hdr->type)); | 633 | (unsigned int) msize, |
634 | (unsigned int) ntohs (hdr->type)); | ||
623 | memcpy (buf, hdr, msize); | 635 | memcpy (buf, hdr, msize); |
624 | GNUNET_CONTAINER_DLL_remove (h->control_pending_head, | 636 | GNUNET_CONTAINER_DLL_remove (h->control_pending_head, |
625 | h->control_pending_tail, cm); | 637 | h->control_pending_tail, cm); |
@@ -641,7 +653,9 @@ transmit_message (void *cls, size_t size, void *buf) | |||
641 | trigger_next_request (h, GNUNET_NO); | 653 | trigger_next_request (h, GNUNET_NO); |
642 | return 0; | 654 | return 0; |
643 | } | 655 | } |
644 | GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr); | 656 | GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, |
657 | h->ready_peer_tail, | ||
658 | pr); | ||
645 | th->peer = NULL; | 659 | th->peer = NULL; |
646 | if (GNUNET_SCHEDULER_NO_TASK != pr->timeout_task) | 660 | if (GNUNET_SCHEDULER_NO_TASK != pr->timeout_task) |
647 | { | 661 | { |
@@ -650,7 +664,8 @@ transmit_message (void *cls, size_t size, void *buf) | |||
650 | } | 664 | } |
651 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 665 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
652 | "Transmitting SEND request to `%s' with %u bytes.\n", | 666 | "Transmitting SEND request to `%s' with %u bytes.\n", |
653 | GNUNET_i2s (&pr->peer), (unsigned int) th->msize); | 667 | GNUNET_i2s (&pr->peer), |
668 | (unsigned int) th->msize); | ||
654 | sm = (struct SendMessage *) buf; | 669 | sm = (struct SendMessage *) buf; |
655 | sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND); | 670 | sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND); |
656 | sm->priority = htonl ((uint32_t) th->priority); | 671 | sm->priority = htonl ((uint32_t) th->priority); |
@@ -660,11 +675,13 @@ transmit_message (void *cls, size_t size, void *buf) | |||
660 | sm->reserved = htonl (0); | 675 | sm->reserved = htonl (0); |
661 | ret = | 676 | ret = |
662 | th->get_message (th->get_message_cls, | 677 | th->get_message (th->get_message_cls, |
663 | size - sizeof (struct SendMessage), &sm[1]); | 678 | size - sizeof (struct SendMessage), |
679 | &sm[1]); | ||
664 | 680 | ||
665 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 681 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
666 | "Transmitting SEND request to `%s' yielded %u bytes.\n", | 682 | "Transmitting SEND request to `%s' yielded %u bytes.\n", |
667 | GNUNET_i2s (&pr->peer), ret); | 683 | GNUNET_i2s (&pr->peer), |
684 | ret); | ||
668 | if (0 == ret) | 685 | if (0 == ret) |
669 | { | 686 | { |
670 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 687 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
@@ -693,8 +710,7 @@ transmit_message (void *cls, size_t size, void *buf) | |||
693 | 710 | ||
694 | 711 | ||
695 | /** | 712 | /** |
696 | * Check the list of pending requests, send the next | 713 | * Check the list of pending requests, send the next one to the core. |
697 | * one to the core. | ||
698 | * | 714 | * |
699 | * @param h core handle | 715 | * @param h core handle |
700 | * @param ignore_currently_down transmit message even if not initialized? | 716 | * @param ignore_currently_down transmit message even if not initialized? |
@@ -705,7 +721,8 @@ trigger_next_request (struct GNUNET_CORE_Handle *h, | |||
705 | { | 721 | { |
706 | uint16_t msize; | 722 | uint16_t msize; |
707 | 723 | ||
708 | if ((GNUNET_YES == h->currently_down) && (ignore_currently_down == GNUNET_NO)) | 724 | if ( (GNUNET_YES == h->currently_down) && |
725 | (GNUNET_NO == ignore_currently_down) ) | ||
709 | { | 726 | { |
710 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 727 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
711 | "Core connection down, not processing queue\n"); | 728 | "Core connection down, not processing queue\n"); |
@@ -713,7 +730,8 @@ trigger_next_request (struct GNUNET_CORE_Handle *h, | |||
713 | } | 730 | } |
714 | if (NULL != h->cth) | 731 | if (NULL != h->cth) |
715 | { | 732 | { |
716 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Request pending, not processing queue\n"); | 733 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
734 | "Request pending, not processing queue\n"); | ||
717 | return; | 735 | return; |
718 | } | 736 | } |
719 | if (NULL != h->control_pending_head) | 737 | if (NULL != h->control_pending_head) |
@@ -732,7 +750,8 @@ trigger_next_request (struct GNUNET_CORE_Handle *h, | |||
732 | h->cth = | 750 | h->cth = |
733 | GNUNET_CLIENT_notify_transmit_ready (h->client, msize, | 751 | GNUNET_CLIENT_notify_transmit_ready (h->client, msize, |
734 | GNUNET_TIME_UNIT_FOREVER_REL, | 752 | GNUNET_TIME_UNIT_FOREVER_REL, |
735 | GNUNET_NO, &transmit_message, h); | 753 | GNUNET_NO, |
754 | &transmit_message, h); | ||
736 | } | 755 | } |
737 | 756 | ||
738 | 757 | ||
@@ -837,7 +856,9 @@ main_notify_handler (void *cls, | |||
837 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 856 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
838 | "Received notification about connection from `%s'.\n", | 857 | "Received notification about connection from `%s'.\n", |
839 | GNUNET_i2s (&cnm->peer)); | 858 | GNUNET_i2s (&cnm->peer)); |
840 | if (0 == memcmp (&h->me, &cnm->peer, sizeof (struct GNUNET_PeerIdentity))) | 859 | if (0 == memcmp (&h->me, |
860 | &cnm->peer, | ||
861 | sizeof (struct GNUNET_PeerIdentity))) | ||
841 | { | 862 | { |
842 | /* connect to self!? */ | 863 | /* connect to self!? */ |
843 | GNUNET_break (0); | 864 | GNUNET_break (0); |
@@ -868,7 +889,9 @@ main_notify_handler (void *cls, | |||
868 | return; | 889 | return; |
869 | } | 890 | } |
870 | dnm = (const struct DisconnectNotifyMessage *) msg; | 891 | dnm = (const struct DisconnectNotifyMessage *) msg; |
871 | if (0 == memcmp (&h->me, &dnm->peer, sizeof (struct GNUNET_PeerIdentity))) | 892 | if (0 == memcmp (&h->me, |
893 | &dnm->peer, | ||
894 | sizeof (struct GNUNET_PeerIdentity))) | ||
872 | { | 895 | { |
873 | /* connection to self!? */ | 896 | /* connection to self!? */ |
874 | GNUNET_break (0); | 897 | GNUNET_break (0); |
@@ -993,7 +1016,8 @@ main_notify_handler (void *cls, | |||
993 | return; | 1016 | return; |
994 | } | 1017 | } |
995 | smr = (const struct SendMessageReady *) msg; | 1018 | smr = (const struct SendMessageReady *) msg; |
996 | pr = GNUNET_CONTAINER_multipeermap_get (h->peers, &smr->peer); | 1019 | pr = GNUNET_CONTAINER_multipeermap_get (h->peers, |
1020 | &smr->peer); | ||
997 | if (NULL == pr) | 1021 | if (NULL == pr) |
998 | { | 1022 | { |
999 | GNUNET_break (0); | 1023 | GNUNET_break (0); |
@@ -1017,21 +1041,26 @@ main_notify_handler (void *cls, | |||
1017 | * ignore! (we should have already sent another request) */ | 1041 | * ignore! (we should have already sent another request) */ |
1018 | break; | 1042 | break; |
1019 | } | 1043 | } |
1020 | if ((NULL != pr->prev) || (NULL != pr->next) || (h->ready_peer_head == pr)) | 1044 | if ( (NULL != pr->prev) || |
1045 | (NULL != pr->next) || | ||
1046 | (h->ready_peer_head == pr) ) | ||
1021 | { | 1047 | { |
1022 | /* we should not already be on the ready list... */ | 1048 | /* we should not already be on the ready list... */ |
1023 | GNUNET_break (0); | 1049 | GNUNET_break (0); |
1024 | reconnect_later (h); | 1050 | reconnect_later (h); |
1025 | return; | 1051 | return; |
1026 | } | 1052 | } |
1027 | GNUNET_CONTAINER_DLL_insert (h->ready_peer_head, h->ready_peer_tail, pr); | 1053 | GNUNET_CONTAINER_DLL_insert (h->ready_peer_head, |
1054 | h->ready_peer_tail, | ||
1055 | pr); | ||
1028 | trigger_next_request (h, GNUNET_NO); | 1056 | trigger_next_request (h, GNUNET_NO); |
1029 | break; | 1057 | break; |
1030 | default: | 1058 | default: |
1031 | reconnect_later (h); | 1059 | reconnect_later (h); |
1032 | return; | 1060 | return; |
1033 | } | 1061 | } |
1034 | GNUNET_CLIENT_receive (h->client, &main_notify_handler, h, | 1062 | GNUNET_CLIENT_receive (h->client, |
1063 | &main_notify_handler, h, | ||
1035 | GNUNET_TIME_UNIT_FOREVER_REL); | 1064 | GNUNET_TIME_UNIT_FOREVER_REL); |
1036 | } | 1065 | } |
1037 | 1066 | ||
@@ -1058,7 +1087,8 @@ init_done_task (void *cls, int success) | |||
1058 | reconnect_later (h); | 1087 | reconnect_later (h); |
1059 | return; | 1088 | return; |
1060 | } | 1089 | } |
1061 | GNUNET_CLIENT_receive (h->client, &main_notify_handler, h, | 1090 | GNUNET_CLIENT_receive (h->client, |
1091 | &main_notify_handler, h, | ||
1062 | GNUNET_TIME_UNIT_FOREVER_REL); | 1092 | GNUNET_TIME_UNIT_FOREVER_REL); |
1063 | } | 1093 | } |
1064 | 1094 | ||
@@ -1096,14 +1126,14 @@ reconnect (struct GNUNET_CORE_Handle *h) | |||
1096 | init->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT); | 1126 | init->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT); |
1097 | init->header.size = htons (msize); | 1127 | init->header.size = htons (msize); |
1098 | opt = 0; | 1128 | opt = 0; |
1099 | if (h->inbound_notify != NULL) | 1129 | if (NULL != h->inbound_notify) |
1100 | { | 1130 | { |
1101 | if (h->inbound_hdr_only) | 1131 | if (h->inbound_hdr_only) |
1102 | opt |= GNUNET_CORE_OPTION_SEND_HDR_INBOUND; | 1132 | opt |= GNUNET_CORE_OPTION_SEND_HDR_INBOUND; |
1103 | else | 1133 | else |
1104 | opt |= GNUNET_CORE_OPTION_SEND_FULL_INBOUND; | 1134 | opt |= GNUNET_CORE_OPTION_SEND_FULL_INBOUND; |
1105 | } | 1135 | } |
1106 | if (h->outbound_notify != NULL) | 1136 | if (NULL != h->outbound_notify) |
1107 | { | 1137 | { |
1108 | if (h->outbound_hdr_only) | 1138 | if (h->outbound_hdr_only) |
1109 | opt |= GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND; | 1139 | opt |= GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND; |
@@ -1118,7 +1148,8 @@ reconnect (struct GNUNET_CORE_Handle *h) | |||
1118 | ts = (uint16_t *) & init[1]; | 1148 | ts = (uint16_t *) & init[1]; |
1119 | for (hpos = 0; hpos < h->hcnt; hpos++) | 1149 | for (hpos = 0; hpos < h->hcnt; hpos++) |
1120 | ts[hpos] = htons (h->handlers[hpos].type); | 1150 | ts[hpos] = htons (h->handlers[hpos].type); |
1121 | GNUNET_CONTAINER_DLL_insert (h->control_pending_head, h->control_pending_tail, | 1151 | GNUNET_CONTAINER_DLL_insert (h->control_pending_head, |
1152 | h->control_pending_tail, | ||
1122 | cm); | 1153 | cm); |
1123 | trigger_next_request (h, GNUNET_YES); | 1154 | trigger_next_request (h, GNUNET_YES); |
1124 | } | 1155 | } |
@@ -1202,8 +1233,8 @@ GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle) | |||
1202 | struct ControlMessage *cm; | 1233 | struct ControlMessage *cm; |
1203 | 1234 | ||
1204 | GNUNET_assert (NULL != handle); | 1235 | GNUNET_assert (NULL != handle); |
1205 | 1236 | LOG (GNUNET_ERROR_TYPE_DEBUG, | |
1206 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting from CORE service\n"); | 1237 | "Disconnecting from CORE service\n"); |
1207 | if (NULL != handle->cth) | 1238 | if (NULL != handle->cth) |
1208 | { | 1239 | { |
1209 | GNUNET_CLIENT_notify_transmit_ready_cancel (handle->cth); | 1240 | GNUNET_CLIENT_notify_transmit_ready_cancel (handle->cth); |
@@ -1212,7 +1243,8 @@ GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle) | |||
1212 | while (NULL != (cm = handle->control_pending_head)) | 1243 | while (NULL != (cm = handle->control_pending_head)) |
1213 | { | 1244 | { |
1214 | GNUNET_CONTAINER_DLL_remove (handle->control_pending_head, | 1245 | GNUNET_CONTAINER_DLL_remove (handle->control_pending_head, |
1215 | handle->control_pending_tail, cm); | 1246 | handle->control_pending_tail, |
1247 | cm); | ||
1216 | if (NULL != cm->th) | 1248 | if (NULL != cm->th) |
1217 | cm->th->cm = NULL; | 1249 | cm->th->cm = NULL; |
1218 | if (NULL != cm->cont) | 1250 | if (NULL != cm->cont) |
@@ -1276,7 +1308,7 @@ run_request_next_transmission (void *cls, | |||
1276 | * will be called with NULL on timeout; clients MUST cancel | 1308 | * will be called with NULL on timeout; clients MUST cancel |
1277 | * all pending transmission requests DURING the disconnect | 1309 | * all pending transmission requests DURING the disconnect |
1278 | * handler | 1310 | * handler |
1279 | * @param notify_cls closure for notify | 1311 | * @param notify_cls closure for @a notify |
1280 | * @return non-NULL if the notify callback was queued, | 1312 | * @return non-NULL if the notify callback was queued, |
1281 | * NULL if we can not even queue the request (request already pending); | 1313 | * NULL if we can not even queue the request (request already pending); |
1282 | * if NULL is returned, @a notify will NOT be called. | 1314 | * if NULL is returned, @a notify will NOT be called. |
@@ -1304,7 +1336,8 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, | |||
1304 | "Asking core for transmission of %u bytes to `%s'\n", | 1336 | "Asking core for transmission of %u bytes to `%s'\n", |
1305 | (unsigned int) notify_size, | 1337 | (unsigned int) notify_size, |
1306 | GNUNET_i2s (target)); | 1338 | GNUNET_i2s (target)); |
1307 | pr = GNUNET_CONTAINER_multipeermap_get (handle->peers, target); | 1339 | pr = GNUNET_CONTAINER_multipeermap_get (handle->peers, |
1340 | target); | ||
1308 | if (NULL == pr) | 1341 | if (NULL == pr) |
1309 | { | 1342 | { |
1310 | /* attempt to send to peer that is not connected */ | 1343 | /* attempt to send to peer that is not connected */ |
@@ -1360,16 +1393,21 @@ GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle *th) | |||
1360 | { | 1393 | { |
1361 | /* we're currently in the control queue, remove */ | 1394 | /* we're currently in the control queue, remove */ |
1362 | GNUNET_CONTAINER_DLL_remove (h->control_pending_head, | 1395 | GNUNET_CONTAINER_DLL_remove (h->control_pending_head, |
1363 | h->control_pending_tail, th->cm); | 1396 | h->control_pending_tail, |
1397 | th->cm); | ||
1364 | GNUNET_free (th->cm); | 1398 | GNUNET_free (th->cm); |
1365 | th->cm = NULL; | 1399 | th->cm = NULL; |
1366 | } | 1400 | } |
1367 | if ((NULL != pr->prev) || (NULL != pr->next) || (pr == h->ready_peer_head)) | 1401 | if ( (NULL != pr->prev) || |
1402 | (NULL != pr->next) || | ||
1403 | (pr == h->ready_peer_head) ) | ||
1368 | { | 1404 | { |
1369 | /* the request that was 'approved' by core was | 1405 | /* the request that was 'approved' by core was |
1370 | * canceled before it could be transmitted; remove | 1406 | * canceled before it could be transmitted; remove |
1371 | * us from the 'ready' list */ | 1407 | * us from the 'ready' list */ |
1372 | GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr); | 1408 | GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, |
1409 | h->ready_peer_tail, | ||
1410 | pr); | ||
1373 | } | 1411 | } |
1374 | if (GNUNET_SCHEDULER_NO_TASK != pr->ntr_task) | 1412 | if (GNUNET_SCHEDULER_NO_TASK != pr->ntr_task) |
1375 | { | 1413 | { |
@@ -1383,7 +1421,7 @@ GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle *th) | |||
1383 | * Check if the given peer is currently connected. This function is for special | 1421 | * Check if the given peer is currently connected. This function is for special |
1384 | * cirumstances (GNUNET_TESTBED uses it), normal users of the CORE API are | 1422 | * cirumstances (GNUNET_TESTBED uses it), normal users of the CORE API are |
1385 | * expected to track which peers are connected based on the connect/disconnect | 1423 | * expected to track which peers are connected based on the connect/disconnect |
1386 | * callbacks from GNUNET_CORE_connect. This function is NOT part of the | 1424 | * callbacks from #GNUNET_CORE_connect(). This function is NOT part of the |
1387 | * 'versioned', 'official' API. The difference between this function and the | 1425 | * 'versioned', 'official' API. The difference between this function and the |
1388 | * function GNUNET_CORE_is_peer_connected() is that this one returns | 1426 | * function GNUNET_CORE_is_peer_connected() is that this one returns |
1389 | * synchronously after looking in the CORE API cache. The function | 1427 | * synchronously after looking in the CORE API cache. The function |
@@ -1392,7 +1430,7 @@ GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle *th) | |||
1392 | * | 1430 | * |
1393 | * @param h the core handle | 1431 | * @param h the core handle |
1394 | * @param pid the identity of the peer to check if it has been connected to us | 1432 | * @param pid the identity of the peer to check if it has been connected to us |
1395 | * @return GNUNET_YES if the peer is connected to us; GNUNET_NO if not | 1433 | * @return #GNUNET_YES if the peer is connected to us; #GNUNET_NO if not |
1396 | */ | 1434 | */ |
1397 | int | 1435 | int |
1398 | GNUNET_CORE_is_peer_connected_sync (const struct GNUNET_CORE_Handle *h, | 1436 | GNUNET_CORE_is_peer_connected_sync (const struct GNUNET_CORE_Handle *h, |
@@ -1404,124 +1442,4 @@ GNUNET_CORE_is_peer_connected_sync (const struct GNUNET_CORE_Handle *h, | |||
1404 | } | 1442 | } |
1405 | 1443 | ||
1406 | 1444 | ||
1407 | /** | ||
1408 | * Function called to notify a client about the connection | ||
1409 | * begin ready to queue more data. "buf" will be | ||
1410 | * NULL and "size" zero if the connection was closed for | ||
1411 | * writing in the meantime. | ||
1412 | * | ||
1413 | * @param cls closure | ||
1414 | * @param size number of bytes available in buf | ||
1415 | * @param buf where the callee should write the message | ||
1416 | * @return number of bytes written to buf | ||
1417 | */ | ||
1418 | static size_t | ||
1419 | core_mq_ntr (void *cls, size_t size, | ||
1420 | void *buf) | ||
1421 | { | ||
1422 | struct GNUNET_MQ_Handle *mq = cls; | ||
1423 | struct CoreMQState *mqs = GNUNET_MQ_impl_state (mq); | ||
1424 | const struct GNUNET_MessageHeader *mh = GNUNET_MQ_impl_current (mq); | ||
1425 | size_t msg_size = ntohs (mh->size); | ||
1426 | GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "core-mq", "ntr called (size %u, type %u)\n", | ||
1427 | msg_size, ntohs (mh->type)); | ||
1428 | mqs->th = NULL; | ||
1429 | if (NULL == buf) | ||
1430 | { | ||
1431 | GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "core-mq", "send error\n"); | ||
1432 | GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE); | ||
1433 | return 0; | ||
1434 | } | ||
1435 | memcpy (buf, mh, msg_size); | ||
1436 | GNUNET_MQ_impl_send_continue (mq); | ||
1437 | return msg_size; | ||
1438 | } | ||
1439 | |||
1440 | |||
1441 | /** | ||
1442 | * Signature of functions implementing the | ||
1443 | * sending functionality of a message queue. | ||
1444 | * | ||
1445 | * @param mq the message queue | ||
1446 | * @param msg the message to send | ||
1447 | * @param impl_state state of the implementation | ||
1448 | */ | ||
1449 | static void | ||
1450 | core_mq_send (struct GNUNET_MQ_Handle *mq, | ||
1451 | const struct GNUNET_MessageHeader *msg, | ||
1452 | void *impl_state) | ||
1453 | { | ||
1454 | struct CoreMQState *mqs = impl_state; | ||
1455 | GNUNET_assert (NULL == mqs->th); | ||
1456 | GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "core-mq", "Sending queued message (size %u)\n", | ||
1457 | ntohs (msg->size)); | ||
1458 | mqs->th = GNUNET_CORE_notify_transmit_ready (mqs->core, GNUNET_YES, 0, | ||
1459 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
1460 | &mqs->target, | ||
1461 | ntohs (msg->size), core_mq_ntr, mq); | ||
1462 | } | ||
1463 | |||
1464 | |||
1465 | /** | ||
1466 | * Signature of functions implementing the | ||
1467 | * destruction of a message queue. | ||
1468 | * Implementations must not free @a mq, but should | ||
1469 | * take care of @a impl_state. | ||
1470 | * | ||
1471 | * @param mq the message queue to destroy | ||
1472 | * @param impl_state state of the implementation | ||
1473 | */ | ||
1474 | static void | ||
1475 | core_mq_destroy (struct GNUNET_MQ_Handle *mq, void *impl_state) | ||
1476 | { | ||
1477 | struct CoreMQState *mqs = impl_state; | ||
1478 | if (NULL != mqs->th) | ||
1479 | { | ||
1480 | GNUNET_CORE_notify_transmit_ready_cancel (mqs->th); | ||
1481 | mqs->th = NULL; | ||
1482 | } | ||
1483 | GNUNET_free (mqs); | ||
1484 | } | ||
1485 | |||
1486 | |||
1487 | /** | ||
1488 | * Implementation function that cancels the currently sent message. | ||
1489 | * | ||
1490 | * @param mq message queue | ||
1491 | * @param impl_state state specific to the implementation | ||
1492 | */ | ||
1493 | static void | ||
1494 | core_mq_cancel (struct GNUNET_MQ_Handle *mq, void *impl_state) | ||
1495 | { | ||
1496 | struct CoreMQState *mqs = impl_state; | ||
1497 | GNUNET_assert (NULL != mqs->th); | ||
1498 | GNUNET_CORE_notify_transmit_ready_cancel (mqs->th); | ||
1499 | } | ||
1500 | |||
1501 | |||
1502 | /** | ||
1503 | * Create a message queue for sending messages to a peer with CORE. | ||
1504 | * Messages may only be queued with #GNUNET_MQ_send once the init callback has | ||
1505 | * been called for the given handle. | ||
1506 | * There must only be one queue per peer for each core handle. | ||
1507 | * The message queue can only be used to transmit messages, | ||
1508 | * not to receive them. | ||
1509 | * | ||
1510 | * @param h the core handle | ||
1511 | * @param target the target peer for this queue, may not be NULL | ||
1512 | * @return a message queue for sending messages over the core handle | ||
1513 | * to the target peer | ||
1514 | */ | ||
1515 | struct GNUNET_MQ_Handle * | ||
1516 | GNUNET_CORE_mq_create (struct GNUNET_CORE_Handle *h, | ||
1517 | const struct GNUNET_PeerIdentity *target) | ||
1518 | { | ||
1519 | struct CoreMQState *mqs = GNUNET_new (struct CoreMQState); | ||
1520 | mqs->core = h; | ||
1521 | mqs->target = *target; | ||
1522 | return GNUNET_MQ_queue_for_callbacks (core_mq_send, core_mq_destroy, | ||
1523 | core_mq_cancel, mqs, | ||
1524 | NULL, NULL, NULL); | ||
1525 | } | ||
1526 | |||
1527 | /* end of core_api.c */ | 1445 | /* end of core_api.c */ |
diff --git a/src/core/core_api_mq.c b/src/core/core_api_mq.c new file mode 100644 index 000000000..b95215115 --- /dev/null +++ b/src/core/core_api_mq.c | |||
@@ -0,0 +1,191 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet. | ||
3 | (C) 2009-2014 Christian Grothoff (and other contributing authors) | ||
4 | |||
5 | GNUnet is free software; you can redistribute it and/or modify | ||
6 | it under the terms of the GNU General Public License as published | ||
7 | by the Free Software Foundation; either version 3, or (at your | ||
8 | option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU General Public License | ||
16 | along with GNUnet; see the file COPYING. If not, write to the | ||
17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, | ||
18 | Boston, MA 02111-1307, USA. | ||
19 | */ | ||
20 | /** | ||
21 | * @file core/core_api_mq.c | ||
22 | * @brief MQ support for core service | ||
23 | * @author Christian Grothoff | ||
24 | * @author Florian Dold | ||
25 | */ | ||
26 | #include "platform.h" | ||
27 | #include "gnunet_util_lib.h" | ||
28 | #include "gnunet_constants.h" | ||
29 | #include "gnunet_core_service.h" | ||
30 | #include "core.h" | ||
31 | |||
32 | #define LOG(kind,...) GNUNET_log_from (kind, "core-api",__VA_ARGS__) | ||
33 | |||
34 | |||
35 | /** | ||
36 | * Internal state of a GNUNET-MQ queue for CORE. | ||
37 | */ | ||
38 | struct CoreMQState | ||
39 | { | ||
40 | /** | ||
41 | * Which peer does this queue target? | ||
42 | */ | ||
43 | struct GNUNET_PeerIdentity target; | ||
44 | |||
45 | /** | ||
46 | * Handle to the CORE service used by this MQ. | ||
47 | */ | ||
48 | struct GNUNET_CORE_Handle *core; | ||
49 | |||
50 | /** | ||
51 | * Transmission handle (if in use). | ||
52 | */ | ||
53 | struct GNUNET_CORE_TransmitHandle *th; | ||
54 | }; | ||
55 | |||
56 | |||
57 | /** | ||
58 | * Function called to notify a client about the connection | ||
59 | * begin ready to queue more data. @a buf will be | ||
60 | * NULL and @a size zero if the connection was closed for | ||
61 | * writing in the meantime. | ||
62 | * | ||
63 | * @param cls closure | ||
64 | * @param size number of bytes available in @a buf | ||
65 | * @param buf where the callee should write the message | ||
66 | * @return number of bytes written to @a buf | ||
67 | */ | ||
68 | static size_t | ||
69 | core_mq_ntr (void *cls, size_t size, | ||
70 | void *buf) | ||
71 | { | ||
72 | struct GNUNET_MQ_Handle *mq = cls; | ||
73 | struct CoreMQState *mqs = GNUNET_MQ_impl_state (mq); | ||
74 | const struct GNUNET_MessageHeader *mh = GNUNET_MQ_impl_current (mq); | ||
75 | size_t msg_size = ntohs (mh->size); | ||
76 | |||
77 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
78 | "ntr called (size %u, type %u)\n", | ||
79 | msg_size, | ||
80 | ntohs (mh->type)); | ||
81 | mqs->th = NULL; | ||
82 | if (NULL == buf) | ||
83 | { | ||
84 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
85 | "send error\n"); | ||
86 | GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE); | ||
87 | return 0; | ||
88 | } | ||
89 | memcpy (buf, mh, msg_size); | ||
90 | GNUNET_MQ_impl_send_continue (mq); | ||
91 | return msg_size; | ||
92 | } | ||
93 | |||
94 | |||
95 | /** | ||
96 | * Signature of functions implementing the | ||
97 | * sending functionality of a message queue. | ||
98 | * | ||
99 | * @param mq the message queue | ||
100 | * @param msg the message to send | ||
101 | * @param impl_state state of the implementation | ||
102 | */ | ||
103 | static void | ||
104 | core_mq_send (struct GNUNET_MQ_Handle *mq, | ||
105 | const struct GNUNET_MessageHeader *msg, | ||
106 | void *impl_state) | ||
107 | { | ||
108 | struct CoreMQState *mqs = impl_state; | ||
109 | |||
110 | GNUNET_assert (NULL == mqs->th); | ||
111 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
112 | "Sending queued message (size %u)\n", | ||
113 | ntohs (msg->size)); | ||
114 | mqs->th = GNUNET_CORE_notify_transmit_ready (mqs->core, GNUNET_YES, 0, | ||
115 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
116 | &mqs->target, | ||
117 | ntohs (msg->size), | ||
118 | &core_mq_ntr, mq); | ||
119 | } | ||
120 | |||
121 | |||
122 | /** | ||
123 | * Signature of functions implementing the | ||
124 | * destruction of a message queue. | ||
125 | * Implementations must not free @a mq, but should | ||
126 | * take care of @a impl_state. | ||
127 | * | ||
128 | * @param mq the message queue to destroy | ||
129 | * @param impl_state state of the implementation | ||
130 | */ | ||
131 | static void | ||
132 | core_mq_destroy (struct GNUNET_MQ_Handle *mq, | ||
133 | void *impl_state) | ||
134 | { | ||
135 | struct CoreMQState *mqs = impl_state; | ||
136 | |||
137 | if (NULL != mqs->th) | ||
138 | { | ||
139 | GNUNET_CORE_notify_transmit_ready_cancel (mqs->th); | ||
140 | mqs->th = NULL; | ||
141 | } | ||
142 | GNUNET_free (mqs); | ||
143 | } | ||
144 | |||
145 | |||
146 | /** | ||
147 | * Implementation function that cancels the currently sent message. | ||
148 | * | ||
149 | * @param mq message queue | ||
150 | * @param impl_state state specific to the implementation | ||
151 | */ | ||
152 | static void | ||
153 | core_mq_cancel (struct GNUNET_MQ_Handle *mq, | ||
154 | void *impl_state) | ||
155 | { | ||
156 | struct CoreMQState *mqs = impl_state; | ||
157 | |||
158 | GNUNET_assert (NULL != mqs->th); | ||
159 | GNUNET_CORE_notify_transmit_ready_cancel (mqs->th); | ||
160 | } | ||
161 | |||
162 | |||
163 | /** | ||
164 | * Create a message queue for sending messages to a peer with CORE. | ||
165 | * Messages may only be queued with #GNUNET_MQ_send once the init callback has | ||
166 | * been called for the given handle. | ||
167 | * There must only be one queue per peer for each core handle. | ||
168 | * The message queue can only be used to transmit messages, | ||
169 | * not to receive them. | ||
170 | * | ||
171 | * @param h the core handle | ||
172 | * @param target the target peer for this queue, may not be NULL | ||
173 | * @return a message queue for sending messages over the core handle | ||
174 | * to the target peer | ||
175 | */ | ||
176 | struct GNUNET_MQ_Handle * | ||
177 | GNUNET_CORE_mq_create (struct GNUNET_CORE_Handle *h, | ||
178 | const struct GNUNET_PeerIdentity *target) | ||
179 | { | ||
180 | struct CoreMQState *mqs = GNUNET_new (struct CoreMQState); | ||
181 | |||
182 | mqs->core = h; | ||
183 | mqs->target = *target; | ||
184 | return GNUNET_MQ_queue_for_callbacks (&core_mq_send, | ||
185 | &core_mq_destroy, | ||
186 | &core_mq_cancel, | ||
187 | mqs, | ||
188 | NULL, NULL, NULL); | ||
189 | } | ||
190 | |||
191 | /* end of core_api_mq.c */ | ||