gnunet-go

GNUnet Bindings for Go
Log | Files | Refs | README | LICENSE

endpoint.go (11515B)


      1 // This file is part of gnunet-go, a GNUnet-implementation in Golang.
      2 // Copyright (C) 2022 Bernd Fix  >Y<
      3 //
      4 // gnunet-go is free software: you can redistribute it and/or modify it
      5 // under the terms of the GNU Affero General Public License as published
      6 // by the Free Software Foundation, either version 3 of the License,
      7 // or (at your option) any later version.
      8 //
      9 // gnunet-go is distributed in the hope that it will be useful, but
     10 // WITHOUT ANY WARRANTY; without even the implied warranty of
     11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
     12 // Affero General Public License for more details.
     13 //
     14 // You should have received a copy of the GNU Affero General Public License
     15 // along with this program.  If not, see <http://www.gnu.org/licenses/>.
     16 //
     17 // SPDX-License-Identifier: AGPL-3.0-or-later
     18 
     19 package transport
     20 
     21 import (
     22 	"bytes"
     23 	"context"
     24 	"errors"
     25 	"gnunet/message"
     26 	"gnunet/util"
     27 	"io"
     28 	"net"
     29 	"sync"
     30 	"time"
     31 
     32 	"github.com/bfix/gospel/logger"
     33 )
     34 
// Sentinel errors reported by endpoint creation and endpoint I/O.
var (
	// creation / lookup of endpoints
	ErrEndpNotAvailable     = errors.New("no endpoint for address available")
	ErrEndpProtocolMismatch = errors.New("transport protocol mismatch")
	ErrEndpProtocolUnknown  = errors.New("unknown transport protocol")
	ErrEndpExists           = errors.New("endpoint exists")
	ErrEndpNoAddress        = errors.New("no address for endpoint")
	// sending / receiving on endpoints
	ErrEndpNoConnection     = errors.New("no connection on endpoint")
	ErrEndpMaybeSent        = errors.New("message may have been sent - can't know")
	ErrEndpWriteShort       = errors.New("write too short")
	ErrEndpReadShort        = errors.New("read too short")
)
     46 
// Endpoint represents a local endpoint that can send and receive messages.
// Implementations need to manage the relations between peer IDs and
// remote endpoints for TCP and UDP traffic.
type Endpoint interface {
	// Run starts the endpoint and forwards received transport messages
	// to the given channel. The context controls the endpoint lifetime.
	Run(context.Context, chan *Message) error

	// Send transmits a transport message to the given address via this
	// endpoint.
	Send(context.Context, net.Addr, *Message) error

	// Address returns the listening address for the endpoint.
	Address() net.Addr

	// CanSendTo returns true if the endpoint can send to the address.
	CanSendTo(net.Addr) bool

	// ID returns the endpoint identifier.
	ID() int
}
     66 
     67 //----------------------------------------------------------------------
     68 
     69 // NewEndpoint returns a suitable endpoint for the address.
     70 func NewEndpoint(addr net.Addr) (ep Endpoint, err error) {
     71 	switch epMode(addr.Network()) {
     72 	case "packet":
     73 		ep, err = newPacketEndpoint(addr)
     74 	case "stream":
     75 		ep, err = newStreamEndpoint(addr)
     76 	default:
     77 		err = ErrEndpNotAvailable
     78 	}
     79 	return
     80 }
     81 
     82 //----------------------------------------------------------------------
     83 // Packet-oriented endpoint
     84 //----------------------------------------------------------------------
     85 
// PaketEndpoint is an endpoint for packet-oriented network protocols.
// The embedded mutex is held by Send so that only one sender is active
// at a time.
type PaketEndpoint struct {
	sync.Mutex

	id   int            // endpoint identifier
	netw string         // network identifier ("udp", "udp4", "udp6", ...); set by Run
	addr net.Addr       // endpoint address
	conn net.PacketConn // packet connection
	buf  []byte         // buffer for read/write operations
}
     96 
     97 // Run packet endpoint: send incoming messages to the handler.
     98 func (ep *PaketEndpoint) Run(ctx context.Context, hdlr chan *Message) (err error) {
     99 	// create listener
    100 	var lc net.ListenConfig
    101 	xproto := ep.addr.Network()
    102 	if ep.conn, err = lc.ListenPacket(ctx, EpProtocol(xproto), ep.addr.String()); err != nil {
    103 		return
    104 	}
    105 	// use the actual listening address
    106 	ep.addr = util.NewAddress(xproto, ep.conn.LocalAddr().String())
    107 
    108 	// save more information to detect compatible send-to addresses
    109 	ep.netw = ep.conn.LocalAddr().Network()
    110 
    111 	// run watch dog for termination
    112 	active := true
    113 	go func() {
    114 		<-ctx.Done()
    115 		active = false
    116 		ep.conn.Close()
    117 	}()
    118 	// run go routine to handle messages from clients
    119 	go func() {
    120 		for {
    121 			// read next message
    122 			tm, err := ep.read()
    123 			if err != nil {
    124 				// leave go routine if already dead or closed by client
    125 				if !active || err == io.EOF {
    126 					break
    127 				}
    128 				logger.Println(logger.WARN, "[pkt_ep] read failed: "+err.Error())
    129 				// gracefully ignore failed messages
    130 				continue
    131 			}
    132 			// label message
    133 			tm.Label = ep.addr.String()
    134 			// send transport message to handler
    135 			go func() {
    136 				hdlr <- tm
    137 			}()
    138 		}
    139 		// connection ended.
    140 		ep.conn.Close()
    141 	}()
    142 	return
    143 }
    144 
    145 // Read a transport message from endpoint based on extended protocol
    146 func (ep *PaketEndpoint) read() (tm *Message, err error) {
    147 	// read next packet (assuming that it contains one complete message)
    148 	var n int
    149 	if n, _, err = ep.conn.ReadFrom(ep.buf); err != nil {
    150 		return
    151 	}
    152 	// parse transport message based on extended protocol
    153 	var (
    154 		peer *util.PeerID
    155 		msg  message.Message
    156 	)
    157 	switch ep.addr.Network() {
    158 	case "ip+udp":
    159 		// check for minimum size (32 byte peer id + 4 byte header)
    160 		if n < 36 {
    161 			err = ErrEndpReadShort
    162 			return
    163 		}
    164 		// parse peer id and message in sequence
    165 		peer = util.NewPeerID(ep.buf[:32])
    166 		rdr := bytes.NewBuffer(util.Clone(ep.buf[32:n]))
    167 		if msg, err = ReadMessageDirect(rdr, ep.buf); err != nil {
    168 			return
    169 		}
    170 	default:
    171 		panic(ErrEndpProtocolUnknown)
    172 	}
    173 	// return transport message
    174 	return &Message{
    175 		Peer:  peer,
    176 		Msg:   msg,
    177 		Resp:  nil,
    178 		Label: "",
    179 	}, nil
    180 }
    181 
    182 // Send message to address from endpoint
    183 func (ep *PaketEndpoint) Send(ctx context.Context, addr net.Addr, msg *Message) (err error) {
    184 	// only one sender at a time
    185 	ep.Lock()
    186 	defer ep.Unlock()
    187 
    188 	// check for valid connection
    189 	if ep.conn == nil {
    190 		return ErrEndpNoConnection
    191 	}
    192 
    193 	// resolve target address
    194 	var a *net.UDPAddr
    195 	if a, err = net.ResolveUDPAddr(EpProtocol(addr.Network()), addr.String()); err != nil {
    196 		return
    197 	}
    198 
    199 	// get message content (TransportMessage)
    200 	var buf []byte
    201 	if buf, err = msg.Bytes(); err != nil {
    202 		return
    203 	}
    204 
    205 	// handle extended protocol:
    206 	switch ep.addr.Network() {
    207 	case "ip+udp":
    208 		// no modifications required
    209 
    210 	default:
    211 		// unknown protocol
    212 		return ErrEndpProtocolUnknown
    213 	}
    214 
    215 	// timeout after 1 second
    216 	if err = ep.conn.SetWriteDeadline(time.Now().Add(time.Second)); err != nil {
    217 		logger.Println(logger.DBG, "[pkt_ep] SetWriteDeadline failed: "+err.Error())
    218 		return
    219 	}
    220 	var n int
    221 	n, err = ep.conn.WriteTo(buf, a)
    222 	if n != len(buf) {
    223 		err = ErrEndpWriteShort
    224 	}
    225 	return ErrEndpMaybeSent
    226 }
    227 
// Address returns the address of the endpoint. Run replaces the address
// with the actual listening address once the listener is set up.
func (ep *PaketEndpoint) Address() net.Addr {
	return ep.addr
}
    232 
    233 // CanSendTo returns true if the endpoint can sent to address
    234 func (ep *PaketEndpoint) CanSendTo(addr net.Addr) (ok bool) {
    235 	ok = EpProtocol(addr.Network()) == EpProtocol(ep.addr.Network())
    236 	if ok {
    237 		// try to convert addr to compatible type
    238 		switch ep.netw {
    239 		case "udp", "udp4", "udp6":
    240 			var err error
    241 			if _, err = net.ResolveUDPAddr(ep.netw, addr.String()); err != nil {
    242 				ok = false
    243 			}
    244 		default:
    245 			logger.Printf(logger.WARN, "[pkt_ep] unknown network %s", ep.netw)
    246 			ok = false
    247 		}
    248 	} else {
    249 		logger.Printf(logger.DBG, "[pkt_ep] protocol mismatch %s -- %s", EpProtocol(addr.Network()), EpProtocol(ep.addr.Network()))
    250 	}
    251 	return
    252 }
    253 
// ID returns the endpoint identifier assigned when the endpoint was created.
func (ep *PaketEndpoint) ID() int {
	return ep.id
}
    258 
    259 // create a new packet endpoint for protcol and address
    260 func newPacketEndpoint(addr net.Addr) (ep *PaketEndpoint, err error) {
    261 	// check for matching protocol
    262 	if epMode(addr.Network()) != "packet" {
    263 		err = ErrEndpProtocolMismatch
    264 		return
    265 	}
    266 	// create endpoint
    267 	ep = &PaketEndpoint{
    268 		id:   util.NextID(),
    269 		addr: addr,
    270 		buf:  make([]byte, 65536),
    271 	}
    272 	return
    273 }
    274 
    275 //----------------------------------------------------------------------
    276 // Stream-oriented endpoint
    277 //----------------------------------------------------------------------
    278 
// StreamEndpoint is an endpoint for stream-oriented network protocols.
// It accepts client connections on a listener and tracks them by a
// session identifier.
type StreamEndpoint struct {
	id       int                      // endpoint identifier
	addr     net.Addr                 // listening address
	listener net.Listener             // listener instance
	conns    *util.Map[int, net.Conn] // active connections (keyed by session id)
	buf      []byte                   // read/write buffer
}
    287 
    288 // Run packet endpoint: send incoming messages to the handler.
    289 func (ep *StreamEndpoint) Run(ctx context.Context, hdlr chan *Message) (err error) {
    290 	// create listener
    291 	var lc net.ListenConfig
    292 	xproto := ep.addr.Network()
    293 	if ep.listener, err = lc.Listen(ctx, EpProtocol(xproto), ep.addr.String()); err != nil {
    294 		return
    295 	}
    296 	// get actual listening address
    297 	ep.addr = util.NewAddress(xproto, ep.listener.Addr().String())
    298 
    299 	// run watch dog for termination
    300 	go func() {
    301 		<-ctx.Done()
    302 		ep.listener.Close()
    303 	}()
    304 	// run go routine to handle messages from clients
    305 	go func() {
    306 		for {
    307 			// get next client connection
    308 			conn, err := ep.listener.Accept()
    309 			if err != nil {
    310 				return
    311 			}
    312 			session := util.NextID()
    313 			ep.conns.Put(session, conn, 0)
    314 			go func() {
    315 				for {
    316 					// read next message from connection
    317 					tm, err := ep.read(ctx, conn)
    318 					if err != nil {
    319 						break
    320 					}
    321 					// send transport message to handler
    322 					go func() {
    323 						hdlr <- tm
    324 					}()
    325 				}
    326 				// connection ended.
    327 				conn.Close()
    328 				ep.conns.Delete(session, 0)
    329 			}()
    330 		}
    331 	}()
    332 	return
    333 }
    334 
    335 // Read a transport message from endpoint based on extended protocol
    336 func (ep *StreamEndpoint) read(ctx context.Context, conn net.Conn) (tm *Message, err error) {
    337 	// parse transport message based on extended protocol
    338 	var (
    339 		peer *util.PeerID
    340 		msg  message.Message
    341 	)
    342 	switch ep.addr.Network() {
    343 	case "ip+udp":
    344 		// parse peer id
    345 		peer = util.NewPeerID(nil)
    346 		if _, err = conn.Read(peer.Data); err != nil {
    347 			return
    348 		}
    349 		// read next message from connection
    350 		if msg, err = ReadMessage(ctx, conn, ep.buf); err != nil {
    351 			break
    352 		}
    353 	default:
    354 		panic(ErrEndpProtocolUnknown)
    355 	}
    356 	// return transport message
    357 	return &Message{
    358 		Peer: peer,
    359 		Msg:  msg,
    360 	}, nil
    361 }
    362 
// Send message to address from endpoint.
//
// NOTE(review): this is currently a stub. All arguments are ignored,
// nothing is transmitted and nil is returned, so callers cannot tell
// that the message was not sent.
func (ep *StreamEndpoint) Send(ctx context.Context, addr net.Addr, msg *Message) error {
	return nil
}
    367 
// Address returns the endpoint address. Run replaces the address with the
// actual listening address once the listener is set up.
func (ep *StreamEndpoint) Address() net.Addr {
	return ep.addr
}
    372 
// CanSendTo returns true if the endpoint can send to the address. Only
// the endpoint mode of the address network is checked: any stream-oriented
// network is accepted, regardless of the endpoint's own network.
func (ep *StreamEndpoint) CanSendTo(addr net.Addr) bool {
	return epMode(addr.Network()) == "stream"
}
    377 
// ID returns the endpoint identifier assigned when the endpoint was created.
func (ep *StreamEndpoint) ID() int {
	return ep.id
}
    382 
    383 // create a new endpoint based on extended protocol and address
    384 func newStreamEndpoint(addr net.Addr) (ep *StreamEndpoint, err error) {
    385 	// check for matching protocol
    386 	if epMode(addr.Network()) != "stream" {
    387 		err = ErrEndpProtocolMismatch
    388 		return
    389 	}
    390 	// create endpoint
    391 	ep = &StreamEndpoint{
    392 		id:    util.NextID(),
    393 		addr:  addr,
    394 		conns: util.NewMap[int, net.Conn](),
    395 		buf:   make([]byte, 65536),
    396 	}
    397 	return
    398 }
    399 
    400 //----------------------------------------------------------------------
    401 // derive endpoint mode (packet/stream) and transport protocol from
    402 // net.Addr.Network() strings
    403 //----------------------------------------------------------------------
    404 
    405 // EpProtocol returns the transport protocol for a given network string
    406 // that can include extended protocol information like "r5n+ip+udp"
    407 func EpProtocol(netw string) string {
    408 	switch netw {
    409 	case "udp", "udp4", "udp6", "ip+udp":
    410 		return "udp"
    411 	case "tcp", "tcp4", "tcp6":
    412 		return "tcp"
    413 	case "unix":
    414 		return "unix"
    415 	}
    416 	return ""
    417 }
    418 
    419 // epMode returns the endpoint mode (packet or stream) for a given network
    420 func epMode(netw string) string {
    421 	switch EpProtocol(netw) {
    422 	case "udp":
    423 		return "packet"
    424 	case "tcp", "unix":
    425 		return "stream"
    426 	}
    427 	return ""
    428 }