diff options
author | Christian Grothoff <christian@grothoff.org> | 2019-01-28 09:29:42 +0100 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2019-01-28 09:29:42 +0100 |
commit | cadf559899f7dfaf24ed27cab923414058f207b3 (patch) | |
tree | c45c7117e6e63e3821f5415aab57583c28273bce /src/transport | |
parent | 64483767341fc741ea68249335ac1914c5a6cf1c (diff) | |
download | gnunet-cadf559899f7dfaf24ed27cab923414058f207b3.tar.gz gnunet-cadf559899f7dfaf24ed27cab923414058f207b3.zip |
more work on TCP communicator
Diffstat (limited to 'src/transport')
-rw-r--r-- | src/transport/gnunet-communicator-tcp.c | 596 |
1 files changed, 480 insertions, 116 deletions
diff --git a/src/transport/gnunet-communicator-tcp.c b/src/transport/gnunet-communicator-tcp.c index 5a397c296..a94559bd2 100644 --- a/src/transport/gnunet-communicator-tcp.c +++ b/src/transport/gnunet-communicator-tcp.c | |||
@@ -24,11 +24,14 @@ | |||
24 | * @author Christian Grothoff | 24 | * @author Christian Grothoff |
25 | * | 25 | * |
26 | * TODO: | 26 | * TODO: |
27 | * - lots of basic adaptations (see FIXMEs) | 27 | * - lots of basic adaptations (see FIXMEs), need NAT service |
28 | * - better message queue management | 28 | * to determine our own listen IPs! Parsing of bindto spec! |
29 | * - actually encrypt, hmac, decrypt | 29 | * - actual decryption and handling of boxes and rekeys! |
30 | * - actually transmit | 30 | * - message queue management: flow control towards CORE! |
31 | * - | 31 | * (stop reading from socket until MQ send to core is done; |
32 | * will need a counter as ONE read from socket may generate | ||
33 | * multiple messages en route to CORE; tricky bit: queue | ||
34 | * may die before we get MQ sent-done callbacks!) | ||
32 | */ | 35 | */ |
33 | #include "platform.h" | 36 | #include "platform.h" |
34 | #include "gnunet_util_lib.h" | 37 | #include "gnunet_util_lib.h" |
@@ -61,12 +64,24 @@ | |||
61 | #define REKEY_TIME_INTERVAL GNUNET_TIME_UNIT_DAYS | 64 | #define REKEY_TIME_INTERVAL GNUNET_TIME_UNIT_DAYS |
62 | 65 | ||
63 | /** | 66 | /** |
67 | * How long do we wait until we must have received the initial KX? | ||
68 | */ | ||
69 | #define PROTO_QUEUE_TIMEOUT GNUNET_TIME_UNIT_MINUTES | ||
70 | |||
71 | /** | ||
64 | * How often do we rekey based on number of bytes transmitted? | 72 | * How often do we rekey based on number of bytes transmitted? |
65 | * (additionally randomized). | 73 | * (additionally randomized). |
66 | */ | 74 | */ |
67 | #define REKEY_MAX_BYTES (1024LLU * 1024 * 1024 * 4LLU) | 75 | #define REKEY_MAX_BYTES (1024LLU * 1024 * 1024 * 4LLU) |
68 | 76 | ||
69 | /** | 77 | /** |
78 | * Size of the initial key exchange message sent first in both | ||
79 | * directions. | ||
80 | */ | ||
81 | #define INITIAL_KX_SIZE (sizeof (struct GNUNET_CRYPTO_EcdhePublicKey)+sizeof (struct TCPConfirmation)) | ||
82 | |||
83 | |||
84 | /** | ||
70 | * Address prefix used by the communicator. | 85 | * Address prefix used by the communicator. |
71 | */ | 86 | */ |
72 | #define COMMUNICATOR_ADDRESS_PREFIX "tcp" | 87 | #define COMMUNICATOR_ADDRESS_PREFIX "tcp" |
@@ -400,6 +415,61 @@ struct Queue | |||
400 | 415 | ||
401 | 416 | ||
402 | /** | 417 | /** |
418 | * Handle for an incoming connection where we do not yet have enough | ||
419 | * information to setup a full queue. | ||
420 | */ | ||
421 | struct ProtoQueue | ||
422 | { | ||
423 | |||
424 | /** | ||
425 | * Kept in a DLL. | ||
426 | */ | ||
427 | struct ProtoQueue *next; | ||
428 | |||
429 | /** | ||
430 | * Kept in a DLL. | ||
431 | */ | ||
432 | struct ProtoQueue *prev; | ||
433 | |||
434 | /** | ||
435 | * socket that we transmit all data with on this queue | ||
436 | */ | ||
437 | struct GNUNET_NETWORK_Handle *sock; | ||
438 | |||
439 | /** | ||
440 | * ID of read task for this connection. | ||
441 | */ | ||
442 | struct GNUNET_SCHEDULER_Task *read_task; | ||
443 | |||
444 | /** | ||
445 | * Address of the other peer. | ||
446 | */ | ||
447 | struct sockaddr *address; | ||
448 | |||
449 | /** | ||
450 | * Length of the address. | ||
451 | */ | ||
452 | socklen_t address_len; | ||
453 | |||
454 | /** | ||
455 | * Timeout for this protoqueue. | ||
456 | */ | ||
457 | struct GNUNET_TIME_Absolute timeout; | ||
458 | |||
459 | /** | ||
460 | * Buffer for reading all the information we need to upgrade from | ||
461 | * protoqueue to queue. | ||
462 | */ | ||
463 | char ibuf[INITIAL_KX_SIZE]; | ||
464 | |||
465 | /** | ||
466 | * Current offset for reading into @e ibuf. | ||
467 | */ | ||
468 | size_t ibuf_off; | ||
469 | }; | ||
470 | |||
471 | |||
472 | /** | ||
403 | * ID of listen task | 473 | * ID of listen task |
404 | */ | 474 | */ |
405 | static struct GNUNET_SCHEDULER_Task *listen_task; | 475 | static struct GNUNET_SCHEDULER_Task *listen_task; |
@@ -454,6 +524,16 @@ static struct GNUNET_CRYPTO_EddsaPrivateKey *my_private_key; | |||
454 | */ | 524 | */ |
455 | static const struct GNUNET_CONFIGURATION_Handle *cfg; | 525 | static const struct GNUNET_CONFIGURATION_Handle *cfg; |
456 | 526 | ||
527 | /** | ||
528 | * Protoqueues DLL head. | ||
529 | */ | ||
530 | static struct ProtoQueue *proto_head; | ||
531 | |||
532 | /** | ||
533 | * Protoqueues DLL tail. | ||
534 | */ | ||
535 | static struct ProtoQueue *proto_tail; | ||
536 | |||
457 | 537 | ||
458 | /** | 538 | /** |
459 | * We have been notified that our listen socket has something to | 539 | * We have been notified that our listen socket has something to |
@@ -514,7 +594,6 @@ queue_destroy (struct Queue *queue) | |||
514 | listen_sock, | 594 | listen_sock, |
515 | &listen_cb, | 595 | &listen_cb, |
516 | NULL); | 596 | NULL); |
517 | |||
518 | } | 597 | } |
519 | 598 | ||
520 | 599 | ||
@@ -559,8 +638,44 @@ hmac (struct GNUNET_HashCode *hmac_secret, | |||
559 | static void | 638 | static void |
560 | queue_finish (struct Queue *queue) | 639 | queue_finish (struct Queue *queue) |
561 | { | 640 | { |
562 | // FIXME: try to send 'finish' message first!? | 641 | struct TCPFinish fin; |
563 | queue_destroy (queue); | 642 | |
643 | memset (&fin, | ||
644 | 0, | ||
645 | sizeof (fin)); | ||
646 | fin.header.size = htons (sizeof (fin)); | ||
647 | fin.header.type = htons (GNUNET_MESSAGE_TYPE_COMMUNICATOR_TCP_FINISH); | ||
648 | hmac (&queue->out_hmac, | ||
649 | &fin, | ||
650 | sizeof (fin), | ||
651 | &fin.hmac); | ||
652 | /* if there is any message left in pwrite_buf, we | ||
653 | overwrite it (possibly dropping the last message | ||
654 | from CORE hard here) */ | ||
655 | memcpy (queue->pwrite_buf, | ||
656 | &fin, | ||
657 | sizeof (fin)); | ||
658 | queue->pwrite_off = sizeof (fin); | ||
659 | /* This flag will ensure that #queue_write() no longer | ||
660 | notifies CORE about the possibility of sending | ||
661 | more data, and that #queue_write() will call | ||
662 | #queue_destroy() once the @c fin was fully written. */ | ||
663 | queue->finishing = GNUNET_YES; | ||
664 | } | ||
665 | |||
666 | |||
667 | /** | ||
668 | * Increment queue timeout due to activity. We do not immediately | ||
669 | * notify the monitor here as that might generate excessive | ||
670 | * signalling. | ||
671 | * | ||
672 | * @param queue queue for which the timeout should be rescheduled | ||
673 | */ | ||
674 | static void | ||
675 | reschedule_queue_timeout (struct Queue *queue) | ||
676 | { | ||
677 | queue->timeout | ||
678 | = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); | ||
564 | } | 679 | } |
565 | 680 | ||
566 | 681 | ||
@@ -577,16 +692,29 @@ queue_read (void *cls) | |||
577 | ssize_t rcvd; | 692 | ssize_t rcvd; |
578 | 693 | ||
579 | queue->read_task = NULL; | 694 | queue->read_task = NULL; |
580 | /* FIXME: perform read! */ | ||
581 | rcvd = GNUNET_NETWORK_socket_recv (queue->sock, | 695 | rcvd = GNUNET_NETWORK_socket_recv (queue->sock, |
582 | &queue->cread_buf[queue->cread_off], | 696 | &queue->cread_buf[queue->cread_off], |
583 | BUF_SIZE - queue->cread_off); | 697 | BUF_SIZE - queue->cread_off); |
584 | if (-1 == rcvd) | 698 | if (-1 == rcvd) |
585 | { | 699 | { |
586 | // FIXME: error handling... | 700 | if ( (EAGAIN != errno) && |
701 | (EINTR != errno) ) | ||
702 | { | ||
703 | GNUNET_log_strerror (GNUNET_ERROR_TYPE_DEBUG, | ||
704 | "recv"); | ||
705 | queue_finish (queue); | ||
706 | return; | ||
707 | } | ||
708 | /* try again */ | ||
709 | queue->read_task | ||
710 | = GNUNET_SCHEDULER_add_read_net (left, | ||
711 | queue->sock, | ||
712 | &queue_read, | ||
713 | queue); | ||
714 | return; | ||
587 | } | 715 | } |
588 | if (0 != rcvd) | 716 | if (0 != rcvd) |
589 | /* update queue timeout */ | 717 | reschedule_queue_timeout (queue); |
590 | queue->cread_off += rcvd; | 718 | queue->cread_off += rcvd; |
591 | if (queue->pread_off < sizeof (queue->pread_buf)) | 719 | if (queue->pread_off < sizeof (queue->pread_buf)) |
592 | { | 720 | { |
@@ -621,22 +749,6 @@ queue_read (void *cls) | |||
621 | 749 | ||
622 | 750 | ||
623 | /** | 751 | /** |
624 | * Increment queue timeout due to activity. We do not immediately | ||
625 | * notify the monitor here as that might generate excessive | ||
626 | * signalling. | ||
627 | * | ||
628 | * @param queue queue for which the timeout should be rescheduled | ||
629 | */ | ||
630 | static void | ||
631 | reschedule_queue_timeout (struct Queue *queue) | ||
632 | { | ||
633 | GNUNET_assert (NULL != queue->read_task); | ||
634 | queue->timeout | ||
635 | = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); | ||
636 | } | ||
637 | |||
638 | |||
639 | /** | ||
640 | * Convert TCP bind specification to a `struct sockaddr *` | 752 | * Convert TCP bind specification to a `struct sockaddr *` |
641 | * | 753 | * |
642 | * @param bindto bind specification to convert | 754 | * @param bindto bind specification to convert |
@@ -856,7 +968,7 @@ queue_write (void *cls) | |||
856 | memmove (queue->cwrite_buf, | 968 | memmove (queue->cwrite_buf, |
857 | &queue->cwrite_buf[sent], | 969 | &queue->cwrite_buf[sent], |
858 | queue->cwrite_off - sent); | 970 | queue->cwrite_off - sent); |
859 | /* FIXME: update queue timeout */ | 971 | reschedule_queue_timeout (queue); |
860 | } | 972 | } |
861 | /* can we encrypt more? (always encrypt full messages, needed | 973 | /* can we encrypt more? (always encrypt full messages, needed |
862 | such that #mq_cancel() can work!) */ | 974 | such that #mq_cancel() can work!) */ |
@@ -889,6 +1001,13 @@ queue_write (void *cls) | |||
889 | queue->mq_awaits_continue = GNUNET_NO; | 1001 | queue->mq_awaits_continue = GNUNET_NO; |
890 | GNUNET_MQ_impl_send_continue (queue->mq); | 1002 | GNUNET_MQ_impl_send_continue (queue->mq); |
891 | } | 1003 | } |
1004 | /* did we just finish writing 'finish'? */ | ||
1005 | if ( (0 == queue->cwrite_off) && | ||
1006 | (GNUNET_YES == queue->finishing) ) | ||
1007 | { | ||
1008 | queue_destroy (queue); | ||
1009 | return; | ||
1010 | } | ||
892 | /* do we care to write more? */ | 1011 | /* do we care to write more? */ |
893 | if (0 < queue->cwrite_off) | 1012 | if (0 < queue->cwrite_off) |
894 | queue->write_task | 1013 | queue->write_task |
@@ -917,6 +1036,8 @@ mq_send (struct GNUNET_MQ_Handle *mq, | |||
917 | struct TCPBox box; | 1036 | struct TCPBox box; |
918 | 1037 | ||
919 | GNUNET_assert (mq == queue->mq); | 1038 | GNUNET_assert (mq == queue->mq); |
1039 | if (GNUNET_YES == queue->finishing) | ||
1040 | return; /* this queue is dying, drop msg */ | ||
920 | GNUNET_assert (0 == queue->pread_off); | 1041 | GNUNET_assert (0 == queue->pread_off); |
921 | box.header.type = htons (GNUNET_MESSAGE_TYPE_COMMUNICATOR_TCP_BOX); | 1042 | box.header.type = htons (GNUNET_MESSAGE_TYPE_COMMUNICATOR_TCP_BOX); |
922 | box.header.size = htons (msize); | 1043 | box.header.size = htons (msize); |
@@ -1005,31 +1126,16 @@ mq_error (void *cls, | |||
1005 | 1126 | ||
1006 | 1127 | ||
1007 | /** | 1128 | /** |
1008 | * Creates a new outbound queue the transport service will use to send | 1129 | * Add the given @a queue to our internal data structure. Setup the |
1009 | * data to another peer. | 1130 | * MQ processing and inform transport that the queue is ready. Must |
1131 | * be called after the KX for outgoing messages has been bootstrapped. | ||
1010 | * | 1132 | * |
1011 | * @param sock the queue's socket | 1133 | * @param queue queue to boot |
1012 | * @param target the target peer | 1134 | */ |
1013 | * @param cs inbound or outbound queue | 1135 | static void |
1014 | * @param in the address | 1136 | boot_queue (struct Queue *queue, |
1015 | * @param in_len number of bytes in @a in | 1137 | enum GNUNET_TRANSPORT_ConnectionStatus cs) |
1016 | * @return the queue or NULL of max connections exceeded | ||
1017 | */ | ||
1018 | static struct Queue * | ||
1019 | setup_queue (struct GNUNET_NETWORK_Handle *sock, | ||
1020 | const struct GNUNET_PeerIdentity *target, | ||
1021 | enum GNUNET_TRANSPORT_ConnectionStatus cs, | ||
1022 | const struct sockaddr *in, | ||
1023 | socklen_t in_len) | ||
1024 | { | 1138 | { |
1025 | struct Queue *queue; | ||
1026 | |||
1027 | queue = GNUNET_new (struct Queue); | ||
1028 | queue->target = *target; | ||
1029 | queue->address = GNUNET_memdup (in, | ||
1030 | in_len); | ||
1031 | queue->address_len = in_len; | ||
1032 | queue->sock = sock; | ||
1033 | queue->nt = 0; // FIXME: determine NT! | 1139 | queue->nt = 0; // FIXME: determine NT! |
1034 | (void) GNUNET_CONTAINER_multipeermap_put (queue_map, | 1140 | (void) GNUNET_CONTAINER_multipeermap_put (queue_map, |
1035 | &queue->target, | 1141 | &queue->target, |
@@ -1041,11 +1147,6 @@ setup_queue (struct GNUNET_NETWORK_Handle *sock, | |||
1041 | GNUNET_NO); | 1147 | GNUNET_NO); |
1042 | queue->timeout | 1148 | queue->timeout |
1043 | = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); | 1149 | = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); |
1044 | queue->read_task | ||
1045 | = GNUNET_SCHEDULER_add_read_net (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, | ||
1046 | queue->sock, | ||
1047 | &queue_read, | ||
1048 | queue); | ||
1049 | queue->mq | 1150 | queue->mq |
1050 | = GNUNET_MQ_queue_for_callbacks (&mq_send, | 1151 | = GNUNET_MQ_queue_for_callbacks (&mq_send, |
1051 | &mq_destroy, | 1152 | &mq_destroy, |
@@ -1086,19 +1187,217 @@ setup_queue (struct GNUNET_NETWORK_Handle *sock, | |||
1086 | queue->mq); | 1187 | queue->mq); |
1087 | GNUNET_free (foreign_addr); | 1188 | GNUNET_free (foreign_addr); |
1088 | } | 1189 | } |
1089 | return queue; | ||
1090 | } | 1190 | } |
1091 | 1191 | ||
1092 | 1192 | ||
1093 | /** | 1193 | /** |
1094 | * We have been notified that our listen socket has something to | 1194 | * Generate and transmit our ephemeral key and the signature for |
1095 | * read. Do the read and reschedule this function to be called again | 1195 | * the initial KX with the other peer. Must be called first, before |
1096 | * once more is available. | 1196 | * any other bytes are ever written to the output buffer. Note that |
1197 | * our cipher must already be initialized when calling this function. | ||
1198 | * Helper function for #start_initial_kx_out(). | ||
1097 | * | 1199 | * |
1098 | * @param cls NULL | 1200 | * @param queue queue to do KX for |
1201 | * @param epub our public key for the KX | ||
1099 | */ | 1202 | */ |
1100 | static void | 1203 | static void |
1101 | listen_cb (void *cls); | 1204 | transmit_kx (struct Queue *queue, |
1205 | const struct GNUNET_CRYPTO_EcdhePublicKey *epub) | ||
1206 | { | ||
1207 | struct TcpHandshakeSignature ths; | ||
1208 | struct TCPConfirmation tc; | ||
1209 | |||
1210 | memcpy (queue->cwrite_buf, | ||
1211 | epub, | ||
1212 | sizeof (*epub)); | ||
1213 | queue->cwrite_off = sizeof (epub); | ||
1214 | /* compute 'tc' and append in encrypted format to cwrite_buf */ | ||
1215 | tc.sender = my_identity; | ||
1216 | tc.monotonic_time = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg)); | ||
1217 | ths.purpose.purpose = htonl (GNUNET_SIGNATURE_COMMUNICATOR_TCP_HANDSHAKE); | ||
1218 | ths.purpose.size = htonl (sizeof (ths)); | ||
1219 | ths.sender = my_identity; | ||
1220 | ths.receiver = queue->target; | ||
1221 | ths.ephemeral = *epub; | ||
1222 | ths.monotonic_time = tc.monotonic_time; | ||
1223 | GNUNET_assert (GNUNET_OK == | ||
1224 | GNUNET_CRYPTO_eddsa_sign (my_private_key, | ||
1225 | &ths.purpose, | ||
1226 | &tc.sender_sig)); | ||
1227 | GNUNET_assert (0 == | ||
1228 | gcry_cipher_encrypt (queue->out_cipher, | ||
1229 | &queue->cwrite_buf[queue->cwrite_off], | ||
1230 | sizeof (tc), | ||
1231 | &tc, | ||
1232 | sizeof (tc))); | ||
1233 | queue->cwrite_off += sizeof (tc); | ||
1234 | } | ||
1235 | |||
1236 | |||
1237 | /** | ||
1238 | * Initialize our key material for outgoing transmissions and | ||
1239 | * inform the other peer about it. Must be called first before | ||
1240 | * any data is sent. | ||
1241 | * | ||
1242 | * @param queue the queue to setup | ||
1243 | */ | ||
1244 | static void | ||
1245 | start_initial_kx_out (struct Queue *queue) | ||
1246 | { | ||
1247 | struct GNUNET_CRYPTO_EcdhePublicKey epub; | ||
1248 | |||
1249 | GNUNET_assert (GNUNET_OK == | ||
1250 | GNUNET_CRYPTO_ecdhe_key_create2 (&queue->ephemeral)); | ||
1251 | GNUNET_CRYPTO_ecdhe_key_get_public (&queue->ephemeral, | ||
1252 | &epub); | ||
1253 | setup_out_cipher (queue); | ||
1254 | transmit_kx (queue, | ||
1255 | &epub); | ||
1256 | } | ||
1257 | |||
1258 | |||
1259 | /** | ||
1260 | * We have received the first bytes from the other side on a @a queue. | ||
1261 | * Decrypt the @a tc contained in @a ibuf and check the signature. | ||
1262 | * Note that #setup_in_cipher() must have already been called. | ||
1263 | * | ||
1264 | * @param queue queue to decrypt initial bytes from other peer for | ||
1265 | * @param tc[out] where to store the result | ||
1266 | * @param ibuf incoming data, of size | ||
1267 | * `INITIAL_KX_SIZE` | ||
1268 | * @return #GNUNET_OK if the signature was OK, #GNUNET_SYSERR if not | ||
1269 | */ | ||
1270 | static int | ||
1271 | decrypt_and_check_tc (struct Queue *queue, | ||
1272 | struct TCPConfirmation *tc, | ||
1273 | char *ibuf) | ||
1274 | { | ||
1275 | struct TcpHandshakeSignature ths; | ||
1276 | |||
1277 | GNUNET_assert (0 == | ||
1278 | gcry_cipher_decrypt (queue->in_cipher, | ||
1279 | tc, | ||
1280 | sizeof (*tc), | ||
1281 | &ibuf[sizeof (struct GNUNET_CRYPTO_EcdhePublicKey)], | ||
1282 | sizeof (tc))); | ||
1283 | ths.purpose.purpose = htonl (GNUNET_SIGNATURE_COMMUNICATOR_TCP_HANDSHAKE); | ||
1284 | ths.purpose.size = htonl (sizeof (ths)); | ||
1285 | ths.sender = tc->sender; | ||
1286 | ths.receiver = my_identity; | ||
1287 | memcpy (&ths.ephemeral, | ||
1288 | ibuf, | ||
1289 | sizeof (struct GNUNET_CRYPTO_EcdhePublicKey)); | ||
1290 | ths.monotonic_time = tc->monotonic_time; | ||
1291 | return GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_COMMUNICATOR_TCP_HANDSHAKE, | ||
1292 | &ths.purpose, | ||
1293 | &tc->sender_sig, | ||
1294 | &tc->sender.public_key); | ||
1295 | } | ||
1296 | |||
1297 | |||
1298 | /** | ||
1299 | * Closes socket and frees memory associated with @a pq. | ||
1300 | * | ||
1301 | * @param pq proto queue to free | ||
1302 | */ | ||
1303 | static void | ||
1304 | free_proto_queue (struct ProtoQueue *pq) | ||
1305 | { | ||
1306 | GNUNET_NETWORK_socket_close (pq->sock); | ||
1307 | GNUNET_free (pq->address); | ||
1308 | GNUNET_CONTAINER_DLL_remove (proto_head, | ||
1309 | proto_tail, | ||
1310 | pq); | ||
1311 | GNUNET_free (pq); | ||
1312 | } | ||
1313 | |||
1314 | |||
1315 | /** | ||
1316 | * Read from the socket of the proto queue until we have enough data | ||
1317 | * to upgrade to full queue. | ||
1318 | * | ||
1319 | * @param cls a `struct ProtoQueue` | ||
1320 | */ | ||
1321 | static void | ||
1322 | proto_read_kx (void *cls) | ||
1323 | { | ||
1324 | struct ProtoQueue *pq = cls; | ||
1325 | ssize_t rcvd; | ||
1326 | struct GNUNET_TIME_Relative left; | ||
1327 | struct Queue *queue; | ||
1328 | struct TCPConfirmation tc; | ||
1329 | |||
1330 | pq->read_task = NULL; | ||
1331 | left = GNUNET_TIME_absolute_get_remaining (pq->timeout); | ||
1332 | if (0 == left.rel_value_us) | ||
1333 | { | ||
1334 | free_proto_queue (pq); | ||
1335 | return; | ||
1336 | } | ||
1337 | rcvd = GNUNET_NETWORK_socket_recv (pq->sock, | ||
1338 | &pq->ibuf[pq->ibuf_off], | ||
1339 | sizeof (pq->ibuf) - pq->ibuf_off); | ||
1340 | if (-1 == rcvd) | ||
1341 | { | ||
1342 | if ( (EAGAIN != errno) && | ||
1343 | (EINTR != errno) ) | ||
1344 | { | ||
1345 | GNUNET_log_strerror (GNUNET_ERROR_TYPE_DEBUG, | ||
1346 | "recv"); | ||
1347 | free_proto_queue (pq); | ||
1348 | return; | ||
1349 | } | ||
1350 | /* try again */ | ||
1351 | pq->read_task = GNUNET_SCHEDULER_add_read_net (left, | ||
1352 | pq->sock, | ||
1353 | &proto_read_kx, | ||
1354 | pq); | ||
1355 | return; | ||
1356 | } | ||
1357 | pq->ibuf_off += rcvd; | ||
1358 | if (pq->ibuf_off > sizeof (pq->ibuf)) | ||
1359 | { | ||
1360 | /* read more */ | ||
1361 | pq->read_task = GNUNET_SCHEDULER_add_read_net (left, | ||
1362 | pq->sock, | ||
1363 | &proto_read_kx, | ||
1364 | pq); | ||
1365 | return; | ||
1366 | } | ||
1367 | /* we got all the data, let's find out who we are talking to! */ | ||
1368 | queue = GNUNET_new (struct Queue); | ||
1369 | setup_in_cipher ((const struct GNUNET_CRYPTO_EcdhePublicKey *) pq->ibuf, | ||
1370 | queue); | ||
1371 | if (GNUNET_OK != | ||
1372 | decrypt_and_check_tc (queue, | ||
1373 | &tc, | ||
1374 | pq->ibuf)) | ||
1375 | { | ||
1376 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
1377 | "Invalid TCP KX received from %s\n", | ||
1378 | GNUNET_a2s (queue->address, | ||
1379 | queue->address_len)); | ||
1380 | gcry_cipher_close (queue->in_cipher); | ||
1381 | GNUNET_free (queue); | ||
1382 | free_proto_queue (pq); | ||
1383 | return; | ||
1384 | } | ||
1385 | queue->address = pq->address; /* steals reference */ | ||
1386 | queue->address_len = pq->address_len; | ||
1387 | queue->target = tc.sender; | ||
1388 | start_initial_kx_out (queue); | ||
1389 | boot_queue (queue, | ||
1390 | GNUNET_TRANSPORT_CS_INBOUND); | ||
1391 | queue->read_task | ||
1392 | = GNUNET_SCHEDULER_add_read_net (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, | ||
1393 | queue->sock, | ||
1394 | &queue_read, | ||
1395 | queue); | ||
1396 | GNUNET_CONTAINER_DLL_remove (proto_head, | ||
1397 | proto_tail, | ||
1398 | pq); | ||
1399 | GNUNET_free (pq); | ||
1400 | } | ||
1102 | 1401 | ||
1103 | 1402 | ||
1104 | /** | 1403 | /** |
@@ -1111,10 +1410,10 @@ listen_cb (void *cls); | |||
1111 | static void | 1410 | static void |
1112 | listen_cb (void *cls) | 1411 | listen_cb (void *cls) |
1113 | { | 1412 | { |
1114 | struct Queue *queue; | ||
1115 | struct sockaddr_storage in; | 1413 | struct sockaddr_storage in; |
1116 | socklen_t addrlen; | 1414 | socklen_t addrlen; |
1117 | struct GNUNET_NETWORK_Handle *sock; | 1415 | struct GNUNET_NETWORK_Handle *sock; |
1416 | struct ProtoQueue *pq; | ||
1118 | 1417 | ||
1119 | listen_task = NULL; | 1418 | listen_task = NULL; |
1120 | GNUNET_assert (NULL != listen_sock); | 1419 | GNUNET_assert (NULL != listen_sock); |
@@ -1143,22 +1442,112 @@ listen_cb (void *cls) | |||
1143 | "accept"); | 1442 | "accept"); |
1144 | return; | 1443 | return; |
1145 | } | 1444 | } |
1146 | #if 0 | 1445 | pq = GNUNET_new (struct ProtoQueue); |
1147 | // FIXME: setup proto-queue first here, until we have received the starting | 1446 | pq->address_len = addrlen; |
1148 | // messages! | 1447 | pq->address = GNUNET_memdup (&in, |
1149 | queue = setup_queue (sock, | 1448 | addrlen); |
1150 | GNUNET_TRANSPORT_CS_INBOUND, | 1449 | pq->timeout = GNUNET_TIME_relative_to_absolute (PROTO_QUEUE_TIMEOUT); |
1151 | (struct sockaddr *) &in, | 1450 | pq->sock = sock; |
1152 | addrlen); | 1451 | pq->read_task = GNUNET_SCHEDULER_add_read_net (PROTO_QUEUE_TIMEOUT, |
1153 | if (NULL == queue) | 1452 | pq->sock, |
1453 | &proto_read_kx, | ||
1454 | pq); | ||
1455 | GNUNET_CONTAINER_DLL_insert (proto_head, | ||
1456 | proto_tail, | ||
1457 | pq); | ||
1458 | } | ||
1459 | |||
1460 | |||
1461 | /** | ||
1462 | * Read from the socket of the queue until we have enough data | ||
1463 | * to initialize the decryption logic and can switch to regular | ||
1464 | * reading. | ||
1465 | * | ||
1466 | * @param cls a `struct Queue` | ||
1467 | */ | ||
1468 | static void | ||
1469 | queue_read_kx (void *cls) | ||
1470 | { | ||
1471 | struct Queue *queue = cls; | ||
1472 | ssize_t rcvd; | ||
1473 | struct GNUNET_TIME_Relative left; | ||
1474 | struct TCPConfirmation tc; | ||
1475 | |||
1476 | queue->read_task = NULL; | ||
1477 | left = GNUNET_TIME_absolute_get_remaining (queue->timeout); | ||
1478 | if (0 == left.rel_value_us) | ||
1154 | { | 1479 | { |
1155 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 1480 | queue_destroy (queue); |
1156 | _("Maximum number of TCP connections exceeded, dropping incoming connection\n")); | 1481 | return; |
1482 | } | ||
1483 | rcvd = GNUNET_NETWORK_socket_recv (queue->sock, | ||
1484 | &queue->cread_buf[queue->cread_off], | ||
1485 | BUF_SIZE - queue->cread_off); | ||
1486 | if (-1 == rcvd) | ||
1487 | { | ||
1488 | if ( (EAGAIN != errno) && | ||
1489 | (EINTR != errno) ) | ||
1490 | { | ||
1491 | GNUNET_log_strerror (GNUNET_ERROR_TYPE_DEBUG, | ||
1492 | "recv"); | ||
1493 | queue_destroy (queue); | ||
1494 | return; | ||
1495 | } | ||
1496 | queue->read_task = GNUNET_SCHEDULER_add_read_net (left, | ||
1497 | queue->sock, | ||
1498 | &queue_read_kx, | ||
1499 | queue); | ||
1500 | return; | ||
1501 | } | ||
1502 | queue->cread_off += rcvd; | ||
1503 | if (queue->cread_off < | ||
1504 | INITIAL_KX_SIZE) | ||
1505 | { | ||
1506 | /* read more */ | ||
1507 | queue->read_task = GNUNET_SCHEDULER_add_read_net (left, | ||
1508 | queue->sock, | ||
1509 | &queue_read_kx, | ||
1510 | queue); | ||
1511 | return; | ||
1512 | } | ||
1513 | /* we got all the data, let's find out who we are talking to! */ | ||
1514 | setup_in_cipher ((const struct GNUNET_CRYPTO_EcdhePublicKey *) queue->cread_buf, | ||
1515 | queue); | ||
1516 | if (GNUNET_OK != | ||
1517 | decrypt_and_check_tc (queue, | ||
1518 | &tc, | ||
1519 | queue->cread_buf)) | ||
1520 | { | ||
1521 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
1522 | "Invalid TCP KX received from %s\n", | ||
1523 | GNUNET_a2s (queue->address, | ||
1524 | queue->address_len)); | ||
1525 | queue_destroy (queue); | ||
1526 | return; | ||
1527 | } | ||
1528 | if (0 != memcmp (&tc.sender, | ||
1529 | &queue->target, | ||
1530 | sizeof (struct GNUNET_PeerIdentity))) | ||
1531 | { | ||
1532 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
1533 | "Invalid sender in TCP KX received from %s\n", | ||
1534 | GNUNET_a2s (queue->address, | ||
1535 | queue->address_len)); | ||
1536 | queue_destroy (queue); | ||
1157 | return; | 1537 | return; |
1158 | } | 1538 | } |
1159 | #endif | ||
1160 | } | ||
1161 | 1539 | ||
1540 | /* update queue timeout */ | ||
1541 | reschedule_queue_timeout (queue); | ||
1542 | /* prepare to continue with regular read task immediately */ | ||
1543 | memmove (queue->cread_buf, | ||
1544 | &queue->cread_buf[INITIAL_KX_SIZE], | ||
1545 | queue->cread_off - (INITIAL_KX_SIZE)); | ||
1546 | queue->cread_off -= INITIAL_KX_SIZE; | ||
1547 | queue->read_task = GNUNET_SCHEDULER_add_now (&queue_read, | ||
1548 | queue); | ||
1549 | } | ||
1550 | |||
1162 | 1551 | ||
1163 | /** | 1552 | /** |
1164 | * Function called by the transport service to initialize a | 1553 | * Function called by the transport service to initialize a |
@@ -1187,10 +1576,7 @@ mq_init (void *cls, | |||
1187 | struct sockaddr *in; | 1576 | struct sockaddr *in; |
1188 | socklen_t in_len; | 1577 | socklen_t in_len; |
1189 | struct GNUNET_NETWORK_Handle *sock; | 1578 | struct GNUNET_NETWORK_Handle *sock; |
1190 | struct GNUNET_CRYPTO_EcdhePublicKey epub; | 1579 | |
1191 | struct TcpHandshakeSignature ths; | ||
1192 | struct TCPConfirmation tc; | ||
1193 | |||
1194 | if (0 != strncmp (address, | 1580 | if (0 != strncmp (address, |
1195 | COMMUNICATOR_ADDRESS_PREFIX "-", | 1581 | COMMUNICATOR_ADDRESS_PREFIX "-", |
1196 | strlen (COMMUNICATOR_ADDRESS_PREFIX "-"))) | 1582 | strlen (COMMUNICATOR_ADDRESS_PREFIX "-"))) |
@@ -1227,12 +1613,19 @@ mq_init (void *cls, | |||
1227 | GNUNET_free (in); | 1613 | GNUNET_free (in); |
1228 | return GNUNET_SYSERR; | 1614 | return GNUNET_SYSERR; |
1229 | } | 1615 | } |
1230 | queue = setup_queue (sock, | 1616 | |
1231 | peer, | 1617 | queue = GNUNET_new (struct Queue); |
1232 | GNUNET_TRANSPORT_CS_OUTBOUND, | 1618 | queue->target = *peer; |
1233 | in, | 1619 | queue->address = in; |
1234 | in_len); | 1620 | queue->address_len = in_len; |
1235 | GNUNET_free (in); | 1621 | queue->sock = sock; |
1622 | boot_queue (queue, | ||
1623 | GNUNET_TRANSPORT_CS_OUTBOUND); | ||
1624 | queue->read_task | ||
1625 | = GNUNET_SCHEDULER_add_read_net (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, | ||
1626 | queue->sock, | ||
1627 | &queue_read_kx, | ||
1628 | queue); | ||
1236 | if (NULL == queue) | 1629 | if (NULL == queue) |
1237 | { | 1630 | { |
1238 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | 1631 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, |
@@ -1242,37 +1635,8 @@ mq_init (void *cls, | |||
1242 | GNUNET_NETWORK_socket_close (sock); | 1635 | GNUNET_NETWORK_socket_close (sock); |
1243 | return GNUNET_NO; | 1636 | return GNUNET_NO; |
1244 | } | 1637 | } |
1245 | GNUNET_assert (GNUNET_OK == | 1638 | start_initial_kx_out (queue); |
1246 | GNUNET_CRYPTO_ecdhe_key_create2 (&queue->ephemeral)); | 1639 | return GNUNET_OK; |
1247 | GNUNET_CRYPTO_ecdhe_key_get_public (&queue->ephemeral, | ||
1248 | &epub); | ||
1249 | setup_out_cipher (queue); | ||
1250 | memcpy (queue->cwrite_buf, | ||
1251 | &epub, | ||
1252 | sizeof (epub)); | ||
1253 | queue->cwrite_off = sizeof (epub); | ||
1254 | /* compute 'tc' and append in encrypted format to cwrite_buf */ | ||
1255 | tc.sender = my_identity; | ||
1256 | tc.monotonic_time = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg)); | ||
1257 | ths.purpose.purpose = htonl (GNUNET_SIGNATURE_COMMUNICATOR_TCP_HANDSHAKE); | ||
1258 | ths.purpose.size = htonl (sizeof (ths)); | ||
1259 | ths.sender = my_identity; | ||
1260 | ths.receiver = queue->target; | ||
1261 | ths.ephemeral = epub; | ||
1262 | ths.monotonic_time = tc.monotonic_time; | ||
1263 | GNUNET_assert (GNUNET_OK == | ||
1264 | GNUNET_CRYPTO_eddsa_sign (my_private_key, | ||
1265 | &ths.purpose, | ||
1266 | &tc.sender_sig)); | ||
1267 | GNUNET_assert (0 == | ||
1268 | gcry_cipher_encrypt (queue->out_cipher, | ||
1269 | &queue->cwrite_buf[queue->cwrite_off], | ||
1270 | sizeof (tc), | ||
1271 | &tc, | ||
1272 | sizeof (tc))); | ||
1273 | queue->cwrite_off += sizeof (tc); | ||
1274 | |||
1275 | return GNUNET_OK; | ||
1276 | } | 1640 | } |
1277 | 1641 | ||
1278 | 1642 | ||