aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2013-10-06 22:54:21 +0000
committerFlorian Dold <florian.dold@gmail.com>2013-10-06 22:54:21 +0000
commit14ddd612091d3a894901bdf6213db7487178f6e2 (patch)
tree30c529c77e87723d2d2798209f4697792cff21fe /src/core
parent9f7e7d369a842fe1621bc4b9af64ac3e5f609bde (diff)
downloadgnunet-14ddd612091d3a894901bdf6213db7487178f6e2.tar.gz
gnunet-14ddd612091d3a894901bdf6213db7487178f6e2.zip
MQ for CORE
Diffstat (limited to 'src/core')
-rw-r--r--src/core/Makefile.am10
-rw-r--r--src/core/core_api.c129
2 files changed, 138 insertions, 1 deletions
diff --git a/src/core/Makefile.am b/src/core/Makefile.am
index e0c34f54c..9a68925b2 100644
--- a/src/core/Makefile.am
+++ b/src/core/Makefile.am
@@ -60,7 +60,7 @@ gnunet_core_DEPENDENCIES = \
60 libgnunetcore.la 60 libgnunetcore.la
61 61
62if HAVE_TESTING 62if HAVE_TESTING
63 TESTING_TESTS = test_core_api_send_to_self 63 TESTING_TESTS = test_core_api_send_to_self test_core_api_mq
64endif 64endif
65 65
66check_PROGRAMS = \ 66check_PROGRAMS = \
@@ -98,6 +98,14 @@ test_core_api_send_to_self_LDADD = \
98 $(top_builddir)/src/transport/libgnunettransport.la \ 98 $(top_builddir)/src/transport/libgnunettransport.la \
99 $(top_builddir)/src/util/libgnunetutil.la 99 $(top_builddir)/src/util/libgnunetutil.la
100 100
101test_core_api_mq_SOURCES = \
102 test_core_api_mq.c
103test_core_api_mq_LDADD = \
104 $(top_builddir)/src/core/libgnunetcore.la \
105 $(top_builddir)/src/testing/libgnunettesting.la \
106 $(top_builddir)/src/transport/libgnunettransport.la \
107 $(top_builddir)/src/util/libgnunetutil.la
108
101test_core_api_start_only_SOURCES = \ 109test_core_api_start_only_SOURCES = \
102 test_core_api_start_only.c 110 test_core_api_start_only.c
103test_core_api_start_only_LDADD = \ 111test_core_api_start_only_LDADD = \
diff --git a/src/core/core_api.c b/src/core/core_api.c
index 7aa3e0519..34c235bbd 100644
--- a/src/core/core_api.c
+++ b/src/core/core_api.c
@@ -25,6 +25,7 @@
25 * @author Christian Grothoff 25 * @author Christian Grothoff
26 */ 26 */
27#include "platform.h" 27#include "platform.h"
28#include "gnunet_util_lib.h"
28#include "gnunet_constants.h" 29#include "gnunet_constants.h"
29#include "gnunet_core_service.h" 30#include "gnunet_core_service.h"
30#include "core.h" 31#include "core.h"
@@ -144,6 +145,13 @@ struct PeerRecord
144 145
145}; 146};
146 147
148struct CoreMQState
149{
150 struct GNUNET_PeerIdentity target;
151 struct GNUNET_CORE_Handle *core;
152 struct GNUNET_CORE_TransmitHandle *th;
153};
154
147 155
148/** 156/**
149 * Type of function called upon completion. 157 * Type of function called upon completion.
@@ -1387,4 +1395,125 @@ GNUNET_CORE_is_peer_connected_sync (const struct GNUNET_CORE_Handle *h,
1387} 1395}
1388 1396
1389 1397
1398/**
1399 * Function called to notify a client about the connection
1400 * begin ready to queue more data. "buf" will be
1401 * NULL and "size" zero if the connection was closed for
1402 * writing in the meantime.
1403 *
1404 * @param cls closure
1405 * @param size number of bytes available in buf
1406 * @param buf where the callee should write the message
1407 * @return number of bytes written to buf
1408 */
1409static size_t
1410core_mq_ntr (void *cls, size_t size,
1411 void *buf)
1412{
1413 struct GNUNET_MQ_Handle *mq = cls;
1414 struct CoreMQState *mqs = GNUNET_MQ_impl_state (mq);
1415 const struct GNUNET_MessageHeader *mh = GNUNET_MQ_impl_current (mq);
1416 size_t msg_size = ntohs (mh->size);
1417 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "core-mq", "ntr called (size %u, type %u)\n",
1418 msg_size, ntohs (mh->type));
1419 mqs->th = NULL;
1420 if (NULL == buf)
1421 {
1422 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "core-mq", "send error\n");
1423 GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE);
1424 return 0;
1425 }
1426 memcpy (buf, mh, msg_size);
1427 GNUNET_MQ_impl_send_commit (mq);
1428 GNUNET_MQ_impl_send_continue (mq);
1429 return msg_size;
1430}
1431
1432
1433/**
1434 * Signature of functions implementing the
1435 * sending functionality of a message queue.
1436 *
1437 * @param mq the message queue
1438 * @param msg the message to send
1439 * @param impl_state state of the implementation
1440 */
1441static void
1442core_mq_send (struct GNUNET_MQ_Handle *mq,
1443 const struct GNUNET_MessageHeader *msg,
1444 void *impl_state)
1445{
1446 struct CoreMQState *mqs = impl_state;
1447 GNUNET_assert (NULL == mqs->th);
1448 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "core-mq", "Sending queued message (size %u)\n",
1449 ntohs (msg->size));
1450 mqs->th = GNUNET_CORE_notify_transmit_ready (mqs->core, GNUNET_YES, 0,
1451 GNUNET_TIME_UNIT_FOREVER_REL,
1452 &mqs->target,
1453 ntohs (msg->size), core_mq_ntr, mq);
1454}
1455
1456
1457/**
1458 * Signature of functions implementing the
1459 * destruction of a message queue.
1460 * Implementations must not free @a mq, but should
1461 * take care of @a impl_state.
1462 *
1463 * @param mq the message queue to destroy
1464 * @param impl_state state of the implementation
1465 */
1466static void
1467core_mq_destroy (struct GNUNET_MQ_Handle *mq, void *impl_state)
1468{
1469 struct CoreMQState *mqs = impl_state;
1470 if (NULL != mqs->th)
1471 {
1472 GNUNET_CORE_notify_transmit_ready_cancel (mqs->th);
1473 mqs->th = NULL;
1474 }
1475 GNUNET_free (mqs);
1476}
1477
1478
1479/**
1480 * Implementation function that cancels the currently sent message.
1481 *
1482 * @param mq message queue
1483 * @param impl_state state specific to the implementation
1484 */
1485static void
1486core_mq_cancel (struct GNUNET_MQ_Handle *mq, void *impl_state)
1487{
1488 struct CoreMQState *mqs = impl_state;
1489 GNUNET_assert (NULL != mqs->th);
1490 GNUNET_CORE_notify_transmit_ready_cancel (mqs->th);
1491}
1492
1493
1494/**
1495 * Create a message queue for sending messages to a peer with CORE.
1496 * Messages may only be queued with #GNUNET_MQ_send once the init callback has
1497 * been called for the given handle.
1498 * There must only be one queue per peer for each core handle.
1499 * The message queue can only be used to transmit messages,
1500 * not to receive them.
1501 *
1502 * @param h the core handle
1503 * @param target the target peer for this queue, may not be NULL
1504 * @return a message queue for sending messages over the core handle
1505 * to the target peer
1506 */
1507struct GNUNET_MQ_Handle *
1508GNUNET_CORE_mq_create (struct GNUNET_CORE_Handle *h,
1509 const struct GNUNET_PeerIdentity *target)
1510{
1511 struct CoreMQState *mqs = GNUNET_new (struct CoreMQState);
1512 mqs->core = h;
1513 mqs->target = *target;
1514 return GNUNET_MQ_queue_for_callbacks (core_mq_send, core_mq_destroy,
1515 core_mq_cancel, mqs,
1516 NULL, NULL, NULL);
1517}
1518
1390/* end of core_api.c */ 1519/* end of core_api.c */