diff options
author | Florian Dold <florian.dold@gmail.com> | 2013-10-06 22:54:21 +0000 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2013-10-06 22:54:21 +0000 |
commit | 14ddd612091d3a894901bdf6213db7487178f6e2 (patch) | |
tree | 30c529c77e87723d2d2798209f4697792cff21fe /src/core | |
parent | 9f7e7d369a842fe1621bc4b9af64ac3e5f609bde (diff) | |
download | gnunet-14ddd612091d3a894901bdf6213db7487178f6e2.tar.gz gnunet-14ddd612091d3a894901bdf6213db7487178f6e2.zip |
MQ for CORE
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/Makefile.am | 10 | ||||
-rw-r--r-- | src/core/core_api.c | 129 |
2 files changed, 138 insertions, 1 deletions
diff --git a/src/core/Makefile.am b/src/core/Makefile.am index e0c34f54c..9a68925b2 100644 --- a/src/core/Makefile.am +++ b/src/core/Makefile.am | |||
@@ -60,7 +60,7 @@ gnunet_core_DEPENDENCIES = \ | |||
60 | libgnunetcore.la | 60 | libgnunetcore.la |
61 | 61 | ||
62 | if HAVE_TESTING | 62 | if HAVE_TESTING |
63 | TESTING_TESTS = test_core_api_send_to_self | 63 | TESTING_TESTS = test_core_api_send_to_self test_core_api_mq |
64 | endif | 64 | endif |
65 | 65 | ||
66 | check_PROGRAMS = \ | 66 | check_PROGRAMS = \ |
@@ -98,6 +98,14 @@ test_core_api_send_to_self_LDADD = \ | |||
98 | $(top_builddir)/src/transport/libgnunettransport.la \ | 98 | $(top_builddir)/src/transport/libgnunettransport.la \ |
99 | $(top_builddir)/src/util/libgnunetutil.la | 99 | $(top_builddir)/src/util/libgnunetutil.la |
100 | 100 | ||
101 | test_core_api_mq_SOURCES = \ | ||
102 | test_core_api_mq.c | ||
103 | test_core_api_mq_LDADD = \ | ||
104 | $(top_builddir)/src/core/libgnunetcore.la \ | ||
105 | $(top_builddir)/src/testing/libgnunettesting.la \ | ||
106 | $(top_builddir)/src/transport/libgnunettransport.la \ | ||
107 | $(top_builddir)/src/util/libgnunetutil.la | ||
108 | |||
101 | test_core_api_start_only_SOURCES = \ | 109 | test_core_api_start_only_SOURCES = \ |
102 | test_core_api_start_only.c | 110 | test_core_api_start_only.c |
103 | test_core_api_start_only_LDADD = \ | 111 | test_core_api_start_only_LDADD = \ |
diff --git a/src/core/core_api.c b/src/core/core_api.c index 7aa3e0519..34c235bbd 100644 --- a/src/core/core_api.c +++ b/src/core/core_api.c | |||
@@ -25,6 +25,7 @@ | |||
25 | * @author Christian Grothoff | 25 | * @author Christian Grothoff |
26 | */ | 26 | */ |
27 | #include "platform.h" | 27 | #include "platform.h" |
28 | #include "gnunet_util_lib.h" | ||
28 | #include "gnunet_constants.h" | 29 | #include "gnunet_constants.h" |
29 | #include "gnunet_core_service.h" | 30 | #include "gnunet_core_service.h" |
30 | #include "core.h" | 31 | #include "core.h" |
@@ -144,6 +145,13 @@ struct PeerRecord | |||
144 | 145 | ||
145 | }; | 146 | }; |
146 | 147 | ||
148 | struct CoreMQState | ||
149 | { | ||
150 | struct GNUNET_PeerIdentity target; | ||
151 | struct GNUNET_CORE_Handle *core; | ||
152 | struct GNUNET_CORE_TransmitHandle *th; | ||
153 | }; | ||
154 | |||
147 | 155 | ||
148 | /** | 156 | /** |
149 | * Type of function called upon completion. | 157 | * Type of function called upon completion. |
@@ -1387,4 +1395,125 @@ GNUNET_CORE_is_peer_connected_sync (const struct GNUNET_CORE_Handle *h, | |||
1387 | } | 1395 | } |
1388 | 1396 | ||
1389 | 1397 | ||
1398 | /** | ||
1399 | * Function called to notify a client about the connection | ||
1400 | * begin ready to queue more data. "buf" will be | ||
1401 | * NULL and "size" zero if the connection was closed for | ||
1402 | * writing in the meantime. | ||
1403 | * | ||
1404 | * @param cls closure | ||
1405 | * @param size number of bytes available in buf | ||
1406 | * @param buf where the callee should write the message | ||
1407 | * @return number of bytes written to buf | ||
1408 | */ | ||
1409 | static size_t | ||
1410 | core_mq_ntr (void *cls, size_t size, | ||
1411 | void *buf) | ||
1412 | { | ||
1413 | struct GNUNET_MQ_Handle *mq = cls; | ||
1414 | struct CoreMQState *mqs = GNUNET_MQ_impl_state (mq); | ||
1415 | const struct GNUNET_MessageHeader *mh = GNUNET_MQ_impl_current (mq); | ||
1416 | size_t msg_size = ntohs (mh->size); | ||
1417 | GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "core-mq", "ntr called (size %u, type %u)\n", | ||
1418 | msg_size, ntohs (mh->type)); | ||
1419 | mqs->th = NULL; | ||
1420 | if (NULL == buf) | ||
1421 | { | ||
1422 | GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "core-mq", "send error\n"); | ||
1423 | GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE); | ||
1424 | return 0; | ||
1425 | } | ||
1426 | memcpy (buf, mh, msg_size); | ||
1427 | GNUNET_MQ_impl_send_commit (mq); | ||
1428 | GNUNET_MQ_impl_send_continue (mq); | ||
1429 | return msg_size; | ||
1430 | } | ||
1431 | |||
1432 | |||
1433 | /** | ||
1434 | * Signature of functions implementing the | ||
1435 | * sending functionality of a message queue. | ||
1436 | * | ||
1437 | * @param mq the message queue | ||
1438 | * @param msg the message to send | ||
1439 | * @param impl_state state of the implementation | ||
1440 | */ | ||
1441 | static void | ||
1442 | core_mq_send (struct GNUNET_MQ_Handle *mq, | ||
1443 | const struct GNUNET_MessageHeader *msg, | ||
1444 | void *impl_state) | ||
1445 | { | ||
1446 | struct CoreMQState *mqs = impl_state; | ||
1447 | GNUNET_assert (NULL == mqs->th); | ||
1448 | GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "core-mq", "Sending queued message (size %u)\n", | ||
1449 | ntohs (msg->size)); | ||
1450 | mqs->th = GNUNET_CORE_notify_transmit_ready (mqs->core, GNUNET_YES, 0, | ||
1451 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
1452 | &mqs->target, | ||
1453 | ntohs (msg->size), core_mq_ntr, mq); | ||
1454 | } | ||
1455 | |||
1456 | |||
1457 | /** | ||
1458 | * Signature of functions implementing the | ||
1459 | * destruction of a message queue. | ||
1460 | * Implementations must not free @a mq, but should | ||
1461 | * take care of @a impl_state. | ||
1462 | * | ||
1463 | * @param mq the message queue to destroy | ||
1464 | * @param impl_state state of the implementation | ||
1465 | */ | ||
1466 | static void | ||
1467 | core_mq_destroy (struct GNUNET_MQ_Handle *mq, void *impl_state) | ||
1468 | { | ||
1469 | struct CoreMQState *mqs = impl_state; | ||
1470 | if (NULL != mqs->th) | ||
1471 | { | ||
1472 | GNUNET_CORE_notify_transmit_ready_cancel (mqs->th); | ||
1473 | mqs->th = NULL; | ||
1474 | } | ||
1475 | GNUNET_free (mqs); | ||
1476 | } | ||
1477 | |||
1478 | |||
1479 | /** | ||
1480 | * Implementation function that cancels the currently sent message. | ||
1481 | * | ||
1482 | * @param mq message queue | ||
1483 | * @param impl_state state specific to the implementation | ||
1484 | */ | ||
1485 | static void | ||
1486 | core_mq_cancel (struct GNUNET_MQ_Handle *mq, void *impl_state) | ||
1487 | { | ||
1488 | struct CoreMQState *mqs = impl_state; | ||
1489 | GNUNET_assert (NULL != mqs->th); | ||
1490 | GNUNET_CORE_notify_transmit_ready_cancel (mqs->th); | ||
1491 | } | ||
1492 | |||
1493 | |||
1494 | /** | ||
1495 | * Create a message queue for sending messages to a peer with CORE. | ||
1496 | * Messages may only be queued with #GNUNET_MQ_send once the init callback has | ||
1497 | * been called for the given handle. | ||
1498 | * There must only be one queue per peer for each core handle. | ||
1499 | * The message queue can only be used to transmit messages, | ||
1500 | * not to receive them. | ||
1501 | * | ||
1502 | * @param h the core handle | ||
1503 | * @param target the target peer for this queue, may not be NULL | ||
1504 | * @return a message queue for sending messages over the core handle | ||
1505 | * to the target peer | ||
1506 | */ | ||
1507 | struct GNUNET_MQ_Handle * | ||
1508 | GNUNET_CORE_mq_create (struct GNUNET_CORE_Handle *h, | ||
1509 | const struct GNUNET_PeerIdentity *target) | ||
1510 | { | ||
1511 | struct CoreMQState *mqs = GNUNET_new (struct CoreMQState); | ||
1512 | mqs->core = h; | ||
1513 | mqs->target = *target; | ||
1514 | return GNUNET_MQ_queue_for_callbacks (core_mq_send, core_mq_destroy, | ||
1515 | core_mq_cancel, mqs, | ||
1516 | NULL, NULL, NULL); | ||
1517 | } | ||
1518 | |||
1390 | /* end of core_api.c */ | 1519 | /* end of core_api.c */ |