aboutsummaryrefslogtreecommitdiff
path: root/src/service/transport/transport-testing-communicator.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/service/transport/transport-testing-communicator.c')
-rw-r--r--src/service/transport/transport-testing-communicator.c1251
1 files changed, 1251 insertions, 0 deletions
diff --git a/src/service/transport/transport-testing-communicator.c b/src/service/transport/transport-testing-communicator.c
new file mode 100644
index 000000000..9ee70fe7b
--- /dev/null
+++ b/src/service/transport/transport-testing-communicator.c
@@ -0,0 +1,1251 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2019 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file transport/transport-testing-communicator.c
23 * @brief functions related to testing-tng
24 * @author Christian Grothoff
25 * @author Julius Bünger
26 */
27#include "platform.h"
28#include "gnunet_util_lib.h"
29#include "gnunet_protocols.h"
30#include "gnunet_constants.h"
31#include "transport-testing-communicator.h"
32#include "gnunet_signatures.h"
33#include "transport.h"
34#include "gnunet_hello_uri_lib.h"
35#include <inttypes.h>
36
37#define LOG(kind, ...) GNUNET_log_from (kind, "transport-testing2", __VA_ARGS__)
38
39struct MyClient
40{
41 struct MyClient *prev;
42 struct MyClient *next;
43 /**
44 * @brief Handle to the client
45 */
46 struct GNUNET_SERVICE_Client *client;
47
48 /**
49 * @brief Handle to the client
50 */
51 struct GNUNET_MQ_Handle *c_mq;
52
53 /**
54 * The TCH
55 */
56 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc;
57
58};
59
60/**
61 * @brief Queue of a communicator and some context
62 */
63struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue
64{
65 /**
66 * @brief Handle to the TransportCommunicator
67 */
68 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h;
69
70 /**
71 * @brief Envelope to a message that requests the opening of the queue.
72 *
73 * If the client already requests queue(s), but the communicator is not yet
74 * connected, we cannot send the request to open the queue. Save it until the
75 * communicator becomes available and send it then.
76 */
77 struct GNUNET_MQ_Envelope *open_queue_env;
78
79 /**
80 * @brief Peer ID of the peer on the other side of the queue
81 */
82 struct GNUNET_PeerIdentity peer_id;
83
84 /**
85 * @brief Queue ID
86 */
87 uint32_t qid;
88
89 /**
90 * @brief Current message id
91 */
92 uint64_t mid;
93
94 /**
95 * An `enum GNUNET_NetworkType` in NBO.
96 */
97 uint32_t nt;
98
99 /**
100 * Maximum transmission unit. UINT32_MAX for unlimited.
101 */
102 uint32_t mtu;
103
104 /**
105 * Queue length. UINT64_MAX for unlimited.
106 */
107 uint64_t q_len;
108
109 /**
110 * Queue prio
111 */
112 uint32_t priority;
113
114 /**
115 * An `enum GNUNET_TRANSPORT_ConnectionStatus` in NBO.
116 */
117 uint32_t cs;
118
119 /**
120 * @brief Next element inside a DLL
121 */
122 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *next;
123
124 /**
125 * @brief Previous element inside a DLL
126 */
127 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *prev;
128};
129
130
131/**
132 * @brief Handle/Context to a single transmission
133 */
134struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorTransmission
135{
136};
137
138
139/**
140 * @brief Check whether incoming msg indicating available communicator is
141 * correct
142 *
143 * @param cls Closure
144 * @param msg Message struct
145 *
146 * @return GNUNET_YES in case message is correct
147 */
148static int
149check_communicator_available (
150 void *cls,
151 const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *msg)
152{
153 uint16_t size;
154
155 size = ntohs (msg->header.size) - sizeof(*msg);
156 if (0 == size)
157 return GNUNET_OK; /* receive-only communicator */
158 GNUNET_MQ_check_zero_termination (msg);
159 return GNUNET_OK;
160}
161
162
163/**
164 * @brief Handle new communicator
165 *
166 * Store characteristics of communicator, call respective client callback.
167 *
168 * @param cls Closure - communicator handle
169 * @param msg Message struct
170 */
171static void
172handle_communicator_available (
173 void *cls,
174 const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *msg)
175{
176 struct MyClient *client = cls;
177 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h =
178 client->tc;
179 uint16_t size;
180 tc_h->c_mq = client->c_mq;
181
182 size = ntohs (msg->header.size) - sizeof(*msg);
183 if (0 == size)
184 {
185 GNUNET_SERVICE_client_continue (client->client);
186 return; /* receive-only communicator */
187 }
188 tc_h->c_characteristics = ntohl (msg->cc);
189 GNUNET_free (tc_h->c_addr_prefix);
190 tc_h->c_addr_prefix = GNUNET_strdup ((const char *) &msg[1]);
191 if (NULL != tc_h->communicator_available_cb)
192 {
193 LOG (GNUNET_ERROR_TYPE_DEBUG, "calling communicator_available_cb()\n");
194 tc_h->communicator_available_cb (tc_h->cb_cls,
195 tc_h,
196 tc_h->c_characteristics,
197 tc_h->c_addr_prefix);
198 }
199 GNUNET_SERVICE_client_continue (client->client);
200 LOG (GNUNET_ERROR_TYPE_DEBUG, "finished communicator_available_cb()\n");
201
202}
203
204
205/**
206 * Incoming message. Test message is well-formed.
207 *
208 * @param cls the client
209 * @param msg the send message that was sent
210 * @return #GNUNET_OK if message is well-formed
211 */
212static int
213check_communicator_backchannel (void *cls,
214 const struct
215 GNUNET_TRANSPORT_CommunicatorBackchannel *msg)
216{
217 // struct TransportClient *tc = cls;
218
219 // if (CT_COMMUNICATOR != tc->type)
220 // {
221 // GNUNET_break (0);
222 // return GNUNET_SYSERR;
223 // }
224 // GNUNET_MQ_check_boxed_message (msg);
225 return GNUNET_OK;
226}
227
228
229/**
230 * @brief Receive an incoming message.
231 *
232 * Pass the message to the client.
233 *
234 * @param cls Closure - communicator handle
235 * @param bc_msg Message
236 */
237static void
238handle_communicator_backchannel (void *cls,
239 const struct
240 GNUNET_TRANSPORT_CommunicatorBackchannel *
241 bc_msg)
242{
243 struct MyClient *client = cls;
244 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h =
245 client->tc;
246 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *other_tc_h;
247 struct GNUNET_MessageHeader *msg;
248 msg = (struct GNUNET_MessageHeader *) &bc_msg[1];
249 uint16_t isize = ntohs (msg->size);
250 const char *target_communicator = ((const char *) msg) + isize;
251 struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *cbi;
252 struct GNUNET_MQ_Envelope *env;
253
254 LOG (GNUNET_ERROR_TYPE_DEBUG,
255 "Received backchannel message\n");
256 if (tc_h->bc_enabled != GNUNET_YES)
257 {
258 GNUNET_SERVICE_client_continue (client->client);
259 return;
260 }
261 /* Find client providing this communicator */
262 /* Finally, deliver backchannel message to communicator */
263 LOG (GNUNET_ERROR_TYPE_DEBUG,
264 "Delivering backchannel message of type %u to %s\n",
265 ntohs (msg->type),
266 target_communicator);
267 other_tc_h = tc_h->bc_cb (tc_h, msg, (struct
268 GNUNET_PeerIdentity*) &bc_msg->pid);
269 env = GNUNET_MQ_msg_extra (
270 cbi,
271 isize,
272 GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING);
273 cbi->pid = tc_h->peer_id;
274 memcpy (&cbi[1], msg, isize);
275
276
277 GNUNET_MQ_send (other_tc_h->c_mq, env);
278 GNUNET_SERVICE_client_continue (client->client);
279}
280
281
282/**
283 * Address of our peer added. Test message is well-formed.
284 *
285 * @param cls the client
286 * @param msg the send message that was sent
287 * @return #GNUNET_OK if message is well-formed
288 */
289static int
290check_add_address (void *cls,
291 const struct GNUNET_TRANSPORT_AddAddressMessage *msg)
292{
293 // if (CT_COMMUNICATOR != tc->type)
294 // {
295 // GNUNET_break (0);
296 // return GNUNET_SYSERR;
297 // }
298 GNUNET_MQ_check_zero_termination (msg);
299 return GNUNET_OK;
300}
301
302
303/**
304 * @brief The communicator informs about an address.
305 *
306 * Store address and call client callback.
307 *
308 * @param cls Closure - communicator handle
309 * @param msg Message
310 */
311static void
312handle_add_address (void *cls,
313 const struct GNUNET_TRANSPORT_AddAddressMessage *msg)
314{
315 struct MyClient *client = cls;
316 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h =
317 client->tc;
318 uint16_t size;
319 size = ntohs (msg->header.size) - sizeof(*msg);
320 LOG (GNUNET_ERROR_TYPE_DEBUG, "received add address cb %u\n", size);
321 if (0 == size)
322 return; /* receive-only communicator */
323 LOG (GNUNET_ERROR_TYPE_DEBUG, "received add address cb %u\n", size);
324 GNUNET_free (tc_h->c_address);
325 tc_h->c_address = GNUNET_strdup ((const char *) &msg[1]);
326 if (NULL != tc_h->add_address_cb)
327 {
328 LOG (GNUNET_ERROR_TYPE_DEBUG, "calling add_address_cb()\n");
329 tc_h->add_address_cb (tc_h->cb_cls,
330 tc_h,
331 tc_h->c_address,
332 GNUNET_TIME_relative_ntoh (msg->expiration),
333 msg->aid,
334 ntohl (msg->nt));
335 }
336 GNUNET_SERVICE_client_continue (client->client);
337}
338
339
340/**
341 * Incoming message. Test message is well-formed.
342 *
343 * @param cls the client
344 * @param msg the send message that was sent
345 * @return #GNUNET_OK if message is well-formed
346 */
347static int
348check_incoming_msg (void *cls,
349 const struct GNUNET_TRANSPORT_IncomingMessage *msg)
350{
351 // struct TransportClient *tc = cls;
352
353 // if (CT_COMMUNICATOR != tc->type)
354 // {
355 // GNUNET_break (0);
356 // return GNUNET_SYSERR;
357 // }
358 GNUNET_MQ_check_boxed_message (msg);
359 return GNUNET_OK;
360}
361
362
363/**
364 * @brief Receive an incoming message.
365 *
366 * Pass the message to the client.
367 *
368 * @param cls Closure - communicator handle
369 * @param inc_msg Message
370 */
371static void
372handle_incoming_msg (void *cls,
373 const struct GNUNET_TRANSPORT_IncomingMessage *inc_msg)
374{
375 struct MyClient *client = cls;
376 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h =
377 client->tc;
378 struct GNUNET_MessageHeader *msg;
379 msg = (struct GNUNET_MessageHeader *) &inc_msg[1];
380 size_t payload_len = ntohs (msg->size) - sizeof (struct
381 GNUNET_MessageHeader);
382 if (NULL != tc_h->incoming_msg_cb)
383 {
384 tc_h->incoming_msg_cb (tc_h->cb_cls,
385 tc_h,
386 (char*) &msg[1],
387 payload_len);
388 }
389 else
390 {
391 LOG (GNUNET_ERROR_TYPE_WARNING,
392 "Incoming message from communicator but no handler!\n");
393 }
394 if (GNUNET_YES == ntohl (inc_msg->fc_on))
395 {
396 /* send ACK when done to communicator for flow control! */
397 struct GNUNET_MQ_Envelope *env;
398 struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
399
400 env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK);
401 GNUNET_assert (NULL != env);
402 ack->reserved = htonl (0);
403 ack->fc_id = inc_msg->fc_id;
404 ack->sender = inc_msg->sender;
405 GNUNET_MQ_send (tc_h->c_mq, env);
406 }
407
408 GNUNET_SERVICE_client_continue (client->client);
409}
410
411
412/**
413 * @brief Communicator informs that it tries to establish requested queue
414 *
415 * @param cls Closure - communicator handle
416 * @param msg Message
417 */
418static void
419handle_queue_create_ok (void *cls,
420 const struct GNUNET_TRANSPORT_CreateQueueResponse *msg)
421{
422 struct MyClient *client = cls;
423 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h =
424 client->tc;
425
426 if (NULL != tc_h->queue_create_reply_cb)
427 {
428 tc_h->queue_create_reply_cb (tc_h->cb_cls, tc_h, GNUNET_YES);
429 }
430 GNUNET_SERVICE_client_continue (client->client);
431}
432
433
434/**
435 * @brief Communicator informs that it won't try establishing requested queue.
436 *
437 * It will not do so probably because the address is bougus (see comment to
438 * #GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL)
439 *
440 * @param cls Closure - communicator handle
441 * @param msg Message
442 */
443static void
444handle_queue_create_fail (
445 void *cls,
446 const struct GNUNET_TRANSPORT_CreateQueueResponse *msg)
447{
448 struct MyClient *client = cls;
449 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h =
450 client->tc;
451
452 if (NULL != tc_h->queue_create_reply_cb)
453 {
454 tc_h->queue_create_reply_cb (tc_h->cb_cls, tc_h, GNUNET_NO);
455 }
456 GNUNET_SERVICE_client_continue (client->client);
457}
458
459
460/**
461 * New queue became available. Check message.
462 *
463 * @param cls the client
464 * @param aqm the send message that was sent
465 */
466static int
467check_add_queue_message (void *cls,
468 const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
469{
470 GNUNET_MQ_check_zero_termination (aqm);
471 return GNUNET_OK;
472}
473
474
475/**
476 * @brief Handle new queue
477 *
478 * Store context and call client callback.
479 *
480 * @param cls Closure - communicator handle
481 * @param msg Message struct
482 */
483static void
484handle_add_queue_message (void *cls,
485 const struct GNUNET_TRANSPORT_AddQueueMessage *msg)
486{
487 struct MyClient *client = cls;
488 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h =
489 client->tc;
490 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
491
492 LOG (GNUNET_ERROR_TYPE_DEBUG,
493 "Got queue with ID %u\n", msg->qid);
494 for (tc_queue = tc_h->queue_head; NULL != tc_queue; tc_queue = tc_queue->next)
495 {
496 if (tc_queue->qid == msg->qid)
497 break;
498 }
499 if (NULL == tc_queue)
500 {
501 tc_queue =
502 GNUNET_new (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue);
503 tc_queue->tc_h = tc_h;
504 tc_queue->qid = msg->qid;
505 tc_queue->peer_id = msg->receiver;
506 GNUNET_CONTAINER_DLL_insert (tc_h->queue_head, tc_h->queue_tail, tc_queue);
507 }
508 GNUNET_assert (tc_queue->qid == msg->qid);
509 GNUNET_assert (0 == GNUNET_memcmp (&tc_queue->peer_id, &msg->receiver));
510 tc_queue->nt = msg->nt;
511 tc_queue->mtu = ntohl (msg->mtu);
512 tc_queue->cs = msg->cs;
513 tc_queue->priority = ntohl (msg->priority);
514 tc_queue->q_len = GNUNET_ntohll (msg->q_len);
515 if (NULL != tc_h->add_queue_cb)
516 {
517 tc_h->add_queue_cb (tc_h->cb_cls, tc_h, tc_queue, tc_queue->mtu);
518 }
519 GNUNET_SERVICE_client_continue (client->client);
520}
521
522
523/**
524 * @brief Handle new queue
525 *
526 * Store context and call client callback.
527 *
528 * @param cls Closure - communicator handle
529 * @param msg Message struct
530 */
531static void
532handle_update_queue_message (void *cls,
533 const struct
534 GNUNET_TRANSPORT_UpdateQueueMessage *msg)
535{
536 struct MyClient *client = cls;
537 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h =
538 client->tc;
539 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
540
541 LOG (GNUNET_ERROR_TYPE_DEBUG,
542 "Received queue update message for %u with q_len %" PRIu64 "\n",
543 msg->qid, GNUNET_ntohll (msg->q_len));
544 tc_queue = tc_h->queue_head;
545 if (NULL != tc_queue)
546 {
547 while (tc_queue->qid != msg->qid)
548 {
549 tc_queue = tc_queue->next;
550 }
551 }
552 if (NULL == tc_queue)
553 {
554 GNUNET_SERVICE_client_continue (client->client);
555 return;
556 }
557 GNUNET_assert (tc_queue->qid == msg->qid);
558 GNUNET_assert (0 == GNUNET_memcmp (&tc_queue->peer_id, &msg->receiver));
559 tc_queue->nt = msg->nt;
560 tc_queue->mtu = ntohl (msg->mtu);
561 tc_queue->cs = msg->cs;
562 tc_queue->priority = ntohl (msg->priority);
563 // Uncomment this for alternativ 1 of backchannel functionality
564 tc_queue->q_len += GNUNET_ntohll (msg->q_len);
565 // Until here for alternativ 1
566 // Uncomment this for alternativ 2 of backchannel functionality
567 // tc_queue->q_len = GNUNET_ntohll (msg->q_len);
568 // Until here for alternativ 2
569 GNUNET_SERVICE_client_continue (client->client);
570}
571
572
573/**
574 * @brief Shut down the service
575 *
576 * @param cls Closure - Handle to the service
577 */
578static void
579shutdown_service (void *cls)
580{
581 struct GNUNET_SERVICE_Handle *h = cls;
582
583 LOG (GNUNET_ERROR_TYPE_DEBUG,
584 "Shutting down service!\n");
585
586 GNUNET_SERVICE_stop (h);
587}
588
589
590/**
591 * @brief Callback called when new Client (Communicator) connects
592 *
593 * @param cls Closure - TransporCommmunicator Handle
594 * @param client Client
595 * @param mq Messagequeue
596 *
597 * @return TransportCommunicator Handle
598 */
599static void *
600connect_cb (void *cls,
601 struct GNUNET_SERVICE_Client *client,
602 struct GNUNET_MQ_Handle *mq)
603{
604 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = cls;
605 struct MyClient *new_c;
606
607 LOG (GNUNET_ERROR_TYPE_DEBUG, "Client %p connected to %p.\n",
608 client, tc_h);
609 new_c = GNUNET_new (struct MyClient);
610 new_c->client = client;
611 new_c->c_mq = mq;
612 new_c->tc = tc_h;
613 GNUNET_CONTAINER_DLL_insert (tc_h->client_head,
614 tc_h->client_tail,
615 new_c);
616
617 if (NULL == tc_h->queue_head)
618 return new_c;
619 /* Iterate over queues. They are yet to be opened. Request opening. */
620 for (struct
621 GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue_iter =
622 tc_h->queue_head;
623 NULL != tc_queue_iter;
624 tc_queue_iter = tc_queue_iter->next)
625 {
626 if (NULL == tc_queue_iter->open_queue_env)
627 continue;
628 /* Send the previously created mq envelope to request the creation of the
629 * queue. */
630 GNUNET_MQ_send (tc_h->c_mq,
631 tc_queue_iter->open_queue_env);
632 tc_queue_iter->open_queue_env = NULL;
633 }
634 return new_c;
635}
636
637
638/**
639 * @brief Callback called when Client disconnects
640 *
641 * @param cls Closure - TransportCommunicator Handle
642 * @param client Client
643 * @param internal_cls TransporCommmunicator Handle
644 */
645static void
646disconnect_cb (void *cls,
647 struct GNUNET_SERVICE_Client *client,
648 void *internal_cls)
649{
650 struct MyClient *cl = cls;
651 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h = cls;
652
653 for (cl = tc_h->client_head; NULL != cl; cl = cl->next)
654 {
655 if (cl->client != client)
656 continue;
657 GNUNET_CONTAINER_DLL_remove (tc_h->client_head,
658 tc_h->client_tail,
659 cl);
660 if (cl->c_mq == tc_h->c_mq)
661 tc_h->c_mq = NULL;
662 GNUNET_free (cl);
663 break;
664 }
665 LOG (GNUNET_ERROR_TYPE_DEBUG, "Client disconnected.\n");
666}
667
668
669/**
670 * Message was transmitted. Process the request.
671 *
672 * @param cls the client
673 * @param sma the send message that was sent
674 */
675static void
676handle_send_message_ack (void *cls,
677 const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
678{
679 struct MyClient *client = cls;
680 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h =
681 client->tc;
682 static int mtr = 0;
683 mtr++;
684 if (tc_h->cont != NULL)
685 tc_h->cont (tc_h->cont_cls);
686 GNUNET_SERVICE_client_continue (client->client);
687}
688
689
690/**
691 * @brief Start the communicator part of the transport service
692 *
693 * @param communicator_available Callback to be called when a new communicator
694 * becomes available
695 * @param cfg Configuration
696 */
697static void
698transport_communicator_start (
699 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h)
700{
701 struct GNUNET_MQ_MessageHandler mh[] = {
702 GNUNET_MQ_hd_var_size (communicator_available,
703 GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR,
704 struct GNUNET_TRANSPORT_CommunicatorAvailableMessage,
705 tc_h),
706 GNUNET_MQ_hd_var_size (communicator_backchannel,
707 GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL,
708 struct GNUNET_TRANSPORT_CommunicatorBackchannel,
709 tc_h),
710 GNUNET_MQ_hd_var_size (add_address,
711 GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS,
712 struct GNUNET_TRANSPORT_AddAddressMessage,
713 tc_h),
714 // GNUNET_MQ_hd_fixed_size (del_address,
715 // GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS,
716 // struct GNUNET_TRANSPORT_DelAddressMessage,
717 // NULL),
718 GNUNET_MQ_hd_var_size (incoming_msg,
719 GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG,
720 struct GNUNET_TRANSPORT_IncomingMessage,
721 tc_h),
722 GNUNET_MQ_hd_fixed_size (queue_create_ok,
723 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK,
724 struct GNUNET_TRANSPORT_CreateQueueResponse,
725 tc_h),
726 GNUNET_MQ_hd_fixed_size (queue_create_fail,
727 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL,
728 struct GNUNET_TRANSPORT_CreateQueueResponse,
729 tc_h),
730 GNUNET_MQ_hd_var_size (add_queue_message,
731 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
732 struct GNUNET_TRANSPORT_AddQueueMessage,
733 tc_h),
734 GNUNET_MQ_hd_fixed_size (update_queue_message,
735 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_UPDATE,
736 struct GNUNET_TRANSPORT_UpdateQueueMessage,
737 tc_h),
738 // GNUNET_MQ_hd_fixed_size (del_queue_message,
739 // GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN,
740 // struct GNUNET_TRANSPORT_DelQueueMessage,
741 // NULL),
742 GNUNET_MQ_hd_fixed_size (send_message_ack,
743 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK,
744 struct GNUNET_TRANSPORT_SendMessageToAck,
745 tc_h),
746 GNUNET_MQ_handler_end ()
747 };
748
749
750 tc_h->sh = GNUNET_SERVICE_start ("transport",
751 tc_h->cfg,
752 &connect_cb,
753 &disconnect_cb,
754 tc_h,
755 mh);
756 GNUNET_assert (NULL != tc_h->sh);
757}
758
759
760/**
761 * @brief Task run at shutdown to kill communicator and clean up
762 *
763 * @param cls Closure - Process of communicator
764 */
765static void
766shutdown_process (struct GNUNET_OS_Process *proc)
767{
768 if (0 != GNUNET_OS_process_kill (proc, SIGTERM))
769 {
770 LOG (GNUNET_ERROR_TYPE_WARNING,
771 "Error shutting down process with SIGERM, trying SIGKILL\n");
772 if (0 != GNUNET_OS_process_kill (proc, SIGKILL))
773 {
774 LOG (GNUNET_ERROR_TYPE_ERROR,
775 "Error shutting down process with SIGERM and SIGKILL\n");
776 }
777 }
778 GNUNET_break (GNUNET_OK == GNUNET_OS_process_wait (proc));
779 GNUNET_OS_process_destroy (proc);
780}
781
782
783/**
784 * @brief Task run at shutdown to kill the statistics process
785 *
786 * @param cls Closure - Process of communicator
787 */
788static void
789shutdown_statistics (void *cls)
790{
791 struct GNUNET_OS_Process *proc = cls;
792 shutdown_process (proc);
793}
794
795
796/**
797 * @brief Task run at shutdown to kill the peerstore process
798 *
799 * @param cls Closure - Process of communicator
800 */
801static void
802shutdown_peerstore (void *cls)
803{
804 struct GNUNET_OS_Process *proc = cls;
805 shutdown_process (proc);
806}
807
808
809/**
810 * @brief Task run at shutdown to kill a communicator process
811 *
812 * @param cls Closure - Process of communicator
813 */
814static void
815shutdown_communicator (void *cls)
816{
817 struct GNUNET_OS_Process *proc = cls;
818 shutdown_process (proc);
819}
820
821
822/**
823 * @brief Start the communicator
824 *
825 * @param cfgname Name of the communicator
826 */
827static void
828communicator_start (
829 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h,
830 const char *binary_name)
831{
832 char *binary;
833 char *loprefix;
834 char *section_name;
835
836 LOG (GNUNET_ERROR_TYPE_DEBUG, "communicator_start\n");
837
838 section_name = strchr (binary_name, '-');
839 section_name++;
840
841 if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (tc_h->cfg,
842 section_name,
843 "PREFIX",
844 &loprefix))
845 loprefix = GNUNET_strdup ("");
846
847
848 binary = GNUNET_OS_get_libexec_binary_path (binary_name);
849 tc_h->c_proc = GNUNET_OS_start_process_s (GNUNET_OS_INHERIT_STD_OUT_AND_ERR,
850 NULL,
851 loprefix,
852 binary,
853 binary_name,
854 "-c",
855 tc_h->cfg_filename,
856 NULL);
857 GNUNET_free (loprefix);
858 if (NULL == tc_h->c_proc)
859 {
860 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start communicator!");
861 return;
862 }
863 LOG (GNUNET_ERROR_TYPE_INFO, "started communicator\n");
864 GNUNET_free (binary);
865}
866
867
868/**
869 * @brief Task run at shutdown to kill communicator and clean up
870 *
871 * @param cls Closure - Process of communicator
872 */
873static void
874shutdown_nat (void *cls)
875{
876 struct GNUNET_OS_Process *proc = cls;
877 shutdown_process (proc);
878}
879
880
881/**
882 * @brief Task run at shutdown to kill the resolver process
883 *
884 * @param cls Closure - Process of communicator
885 */
886static void
887shutdown_resolver (void *cls)
888{
889 struct GNUNET_OS_Process *proc = cls;
890 shutdown_process (proc);
891}
892
893
894/**
895 * @brief Start Resolver
896 *
897 */
898static void
899resolver_start (struct
900 GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h)
901{
902 char *binary;
903
904 LOG (GNUNET_ERROR_TYPE_DEBUG, "resolver_start\n");
905 binary = GNUNET_OS_get_libexec_binary_path ("gnunet-service-resolver");
906 tc_h->resolver_proc = GNUNET_OS_start_process (
907 GNUNET_OS_INHERIT_STD_OUT_AND_ERR
908 | GNUNET_OS_USE_PIPE_CONTROL,
909 NULL,
910 NULL,
911 NULL,
912 binary,
913 "gnunet-service-resolver",
914 "-c",
915 tc_h->cfg_filename,
916 NULL);
917 if (NULL == tc_h->resolver_proc)
918 {
919 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start resolver service!");
920 return;
921 }
922 LOG (GNUNET_ERROR_TYPE_INFO, "started resolver service\n");
923 GNUNET_free (binary);
924
925}
926
927
928/**
929 * @brief Start Statistics
930 *
931 */
932static void
933statistics_start (
934 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h)
935{
936 char *binary;
937
938 binary = GNUNET_OS_get_libexec_binary_path ("gnunet-service-statistics");
939 tc_h->stat_proc = GNUNET_OS_start_process (GNUNET_OS_INHERIT_STD_OUT_AND_ERR,
940 NULL,
941 NULL,
942 NULL,
943 binary,
944 "gnunet-service-statistics",
945 "-c",
946 tc_h->cfg_filename,
947 NULL);
948 if (NULL == tc_h->stat_proc)
949 {
950 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start Statistics!");
951 return;
952 }
953 LOG (GNUNET_ERROR_TYPE_INFO, "started Statistics\n");
954 GNUNET_free (binary);
955}
956
957
958/**
959 * @brief Start Peerstore
960 *
961 */
962static void
963peerstore_start (
964 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h)
965{
966 char *binary;
967
968 binary = GNUNET_OS_get_libexec_binary_path ("gnunet-service-peerstore");
969 tc_h->ps_proc = GNUNET_OS_start_process (GNUNET_OS_INHERIT_STD_OUT_AND_ERR,
970 NULL,
971 NULL,
972 NULL,
973 binary,
974 "gnunet-service-peerstore",
975 "-c",
976 tc_h->cfg_filename,
977 NULL);
978 if (NULL == tc_h->ps_proc)
979 {
980 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start Peerstore!");
981 return;
982 }
983 LOG (GNUNET_ERROR_TYPE_INFO, "started Peerstore\n");
984 GNUNET_free (binary);
985}
986
987
988/**
989 * @brief Start NAT
990 *
991 */
992static void
993nat_start (
994 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h)
995{
996 char *binary;
997
998 LOG (GNUNET_ERROR_TYPE_DEBUG, "nat_start\n");
999 binary = GNUNET_OS_get_libexec_binary_path ("gnunet-service-nat");
1000 tc_h->nat_proc = GNUNET_OS_start_process (GNUNET_OS_INHERIT_STD_OUT_AND_ERR
1001 | GNUNET_OS_USE_PIPE_CONTROL,
1002 NULL,
1003 NULL,
1004 NULL,
1005 binary,
1006 "gnunet-service-nat",
1007 "-c",
1008 tc_h->cfg_filename,
1009 NULL);
1010 if (NULL == tc_h->nat_proc)
1011 {
1012 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start NAT!");
1013 return;
1014 }
1015 LOG (GNUNET_ERROR_TYPE_INFO, "started NAT\n");
1016 GNUNET_free (binary);
1017}
1018
1019
1020/**
1021 * @brief Start communicator part of transport service and communicator
1022 *
1023 * @param service_name Name of the service
1024 * @param cfg Configuration handle
1025 * @param communicator_available_cb Callback that is called when a new
1026 * @param add_address_cb Callback that is called when a new
1027 * communicator becomes available
1028 * @param cb_cls Closure to @a communicator_available_cb and @a
1029 *
1030 * @return Handle to the communicator duo
1031 */
1032struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *
1033GNUNET_TRANSPORT_TESTING_transport_communicator_service_start (
1034 const char *service_name,
1035 const char *binary_name,
1036 const char *cfg_filename,
1037 const struct GNUNET_PeerIdentity *peer_id,
1038 GNUNET_TRANSPORT_TESTING_CommunicatorAvailableCallback
1039 communicator_available_cb,
1040 GNUNET_TRANSPORT_TESTING_AddAddressCallback add_address_cb,
1041 GNUNET_TRANSPORT_TESTING_QueueCreateReplyCallback queue_create_reply_cb,
1042 GNUNET_TRANSPORT_TESTING_AddQueueCallback add_queue_cb,
1043 GNUNET_TRANSPORT_TESTING_IncomingMessageCallback incoming_message_cb,
1044 GNUNET_TRANSPORT_TESTING_BackchannelCallback bc_cb,
1045 void *cb_cls)
1046{
1047 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h;
1048
1049 LOG (GNUNET_ERROR_TYPE_DEBUG,
1050 "Starting new transport/communicator combo with config %s\n",
1051 cfg_filename);
1052 tc_h =
1053 GNUNET_new (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle);
1054 tc_h->cfg_filename = GNUNET_strdup (cfg_filename);
1055 tc_h->cfg = GNUNET_CONFIGURATION_create ();
1056 if ((GNUNET_SYSERR == GNUNET_CONFIGURATION_load (tc_h->cfg, cfg_filename)))
1057 {
1058 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1059 _ ("Malformed configuration file `%s', exit ...\n"),
1060 cfg_filename);
1061 GNUNET_free (tc_h->cfg_filename);
1062 GNUNET_CONFIGURATION_destroy (tc_h->cfg);
1063 GNUNET_free (tc_h);
1064 return NULL;
1065 }
1066 tc_h->bc_enabled = GNUNET_CONFIGURATION_get_value_yesno (tc_h->cfg,
1067 "communicator-test",
1068 "BACKCHANNEL_ENABLED");
1069 tc_h->communicator_available_cb = communicator_available_cb;
1070 tc_h->add_address_cb = add_address_cb;
1071 tc_h->queue_create_reply_cb = queue_create_reply_cb;
1072 tc_h->add_queue_cb = add_queue_cb;
1073 tc_h->incoming_msg_cb = incoming_message_cb;
1074 tc_h->bc_cb = bc_cb;
1075 tc_h->peer_id = *peer_id;
1076 tc_h->cb_cls = cb_cls;
1077
1078 /* Start communicator part of service */
1079 transport_communicator_start (tc_h);
1080 /* Start NAT */
1081 nat_start (tc_h);
1082 /* Start resolver service */
1083 resolver_start (tc_h);
1084 /* Start peerstore service */
1085 peerstore_start (tc_h);
1086 /* Start statistic service */
1087 statistics_start (tc_h);
1088 /* Schedule start communicator */
1089 communicator_start (tc_h,
1090 binary_name);
1091 return tc_h;
1092}
1093
1094
1095void
1096GNUNET_TRANSPORT_TESTING_transport_communicator_service_stop (
1097 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h)
1098{
1099 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *queue;
1100 shutdown_communicator (tc_h->c_proc);
1101 shutdown_service (tc_h->sh);
1102 shutdown_nat (tc_h->nat_proc);
1103 shutdown_resolver (tc_h->resolver_proc);
1104 shutdown_peerstore (tc_h->ps_proc);
1105 shutdown_statistics (tc_h->stat_proc);
1106 GNUNET_CONFIGURATION_destroy (tc_h->cfg);
1107 while (NULL != (queue = tc_h->queue_head))
1108 {
1109 GNUNET_CONTAINER_DLL_remove (tc_h->queue_head, tc_h->queue_tail, queue);
1110 GNUNET_free (queue);
1111 }
1112 GNUNET_free (tc_h->c_address);
1113 GNUNET_free (tc_h->cfg_filename);
1114 GNUNET_free (tc_h->c_addr_prefix);
1115 GNUNET_free (tc_h);
1116}
1117
1118
1119/**
1120 * @brief Instruct communicator to open a queue
1121 *
1122 * @param tc_h Handle to communicator which shall open queue
1123 * @param peer_id Towards which peer
1124 * @param address For which address
1125 */
1126void
1127GNUNET_TRANSPORT_TESTING_transport_communicator_open_queue (
1128 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h,
1129 const struct GNUNET_PeerIdentity *peer_id,
1130 const char *address)
1131{
1132 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
1133 static uint32_t idgen;
1134 char *prefix;
1135 struct GNUNET_TRANSPORT_CreateQueue *msg;
1136 struct GNUNET_MQ_Envelope *env;
1137 size_t alen;
1138
1139 tc_queue =
1140 GNUNET_new (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue);
1141 tc_queue->tc_h = tc_h;
1142 prefix = GNUNET_HELLO_address_to_prefix (address);
1143 if (NULL == prefix)
1144 {
1145 GNUNET_break (0); /* We got an invalid address!? */
1146 GNUNET_free (tc_queue);
1147 return;
1148 }
1149 GNUNET_free (prefix);
1150 alen = strlen (address) + 1;
1151 env =
1152 GNUNET_MQ_msg_extra (msg, alen, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE);
1153 msg->request_id = htonl (idgen++);
1154 tc_queue->qid = msg->request_id;
1155 msg->receiver = *peer_id;
1156 tc_queue->peer_id = *peer_id;
1157 memcpy (&msg[1], address, alen);
1158 if (NULL != tc_h->c_mq)
1159 {
1160 LOG (GNUNET_ERROR_TYPE_DEBUG,
1161 "Sending queue create immediately\n");
1162 GNUNET_MQ_send (tc_h->c_mq, env);
1163 }
1164 else
1165 {
1166 tc_queue->open_queue_env = env;
1167 }
1168 GNUNET_CONTAINER_DLL_insert (tc_h->queue_head, tc_h->queue_tail, tc_queue);
1169}
1170
1171
1172void
1173GNUNET_TRANSPORT_TESTING_transport_communicator_send
1174 (struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h,
1175 GNUNET_SCHEDULER_TaskCallback cont,
1176 void *cont_cls,
1177 const void *payload,
1178 size_t payload_size)
1179{
1180 struct GNUNET_MessageHeader *mh;
1181 struct GNUNET_TRANSPORT_SendMessageTo *msg;
1182 struct GNUNET_MQ_Envelope *env;
1183 size_t inbox_size;
1184 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
1185 struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue_tmp;
1186 static struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *last_queue;
1187 tc_queue = NULL;
1188
1189 for (tc_queue_tmp = tc_h->queue_head;
1190 NULL != tc_queue_tmp;
1191 tc_queue_tmp = tc_queue_tmp->next)
1192 {
1193 if (tc_queue_tmp->q_len <= 0)
1194 continue;
1195 if (NULL == tc_queue)
1196 {
1197 LOG (GNUNET_ERROR_TYPE_DEBUG,
1198 "Selecting queue with prio %u, len %" PRIu64 " and MTU %u\n",
1199 tc_queue_tmp->priority,
1200 tc_queue_tmp->q_len,
1201 tc_queue_tmp->mtu);
1202 tc_queue = tc_queue_tmp;
1203 continue;
1204 }
1205 if (tc_queue->priority < tc_queue_tmp->priority)
1206 {
1207 LOG (GNUNET_ERROR_TYPE_DEBUG,
1208 "Selecting queue with prio %u, len %" PRIu64 " and MTU %u\n",
1209 tc_queue_tmp->priority,
1210 tc_queue_tmp->q_len,
1211 tc_queue_tmp->mtu);
1212 tc_queue = tc_queue_tmp;
1213 }
1214 }
1215 if (last_queue != tc_queue)
1216 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1217 "Selected sending queue changed to %u with length %lu and MTU %u\n",
1218 ntohl (tc_queue->qid), (unsigned long) tc_queue->q_len, tc_queue->mtu);
1219 GNUNET_assert (NULL != tc_queue);
1220 last_queue = tc_queue;
1221 // Uncomment this for alternativ 1 of backchannel functionality
1222 if (tc_queue->q_len != GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED)
1223 tc_queue->q_len--;
1224 // Until here for alternativ 1
1225 static int msg_count = 0;
1226 msg_count++;
1227 if (msg_count % 100 == 0)
1228 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1229 "Sending %u-th (%lu-th for queue) message on queue %u\n",
1230 msg_count, (unsigned long) tc_queue->mid, ntohl (tc_queue->qid));
1231 inbox_size = sizeof (struct GNUNET_MessageHeader) + payload_size;
1232 env = GNUNET_MQ_msg_extra (msg,
1233 inbox_size,
1234 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG);
1235 GNUNET_assert (NULL != env);
1236 msg->qid = tc_queue->qid;
1237 msg->mid = tc_queue->mid++;
1238 msg->receiver = tc_queue->peer_id;
1239 mh = (struct GNUNET_MessageHeader *) &msg[1];
1240 mh->size = htons (inbox_size);
1241 mh->type = GNUNET_MESSAGE_TYPE_DUMMY;
1242 memcpy (&mh[1],
1243 payload,
1244 payload_size);
1245 if (NULL != cont)
1246 GNUNET_MQ_notify_sent (env,
1247 cont,
1248 cont_cls);
1249 GNUNET_MQ_send (tc_queue->tc_h->c_mq,
1250 env);
1251}