endpoint.go (11515B)
// This file is part of gnunet-go, a GNUnet-implementation in Golang.
// Copyright (C) 2022 Bernd Fix >Y<
//
// gnunet-go is free software: you can redistribute it and/or modify it
// under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, either version 3 of the License,
// or (at your option) any later version.
//
// gnunet-go is distributed in the hope that it will be useful, but
// WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
// Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
//
// SPDX-License-Identifier: AGPL3.0-or-later

package transport

import (
	"bytes"
	"context"
	"errors"
	"gnunet/message"
	"gnunet/util"
	"io"
	"net"
	"sync"
	"time"

	"github.com/bfix/gospel/logger"
)

// Sentinel error values used by the endpoint implementations in this file.
var (
	ErrEndpNotAvailable     = errors.New("no endpoint for address available")
	ErrEndpProtocolMismatch = errors.New("transport protocol mismatch")
	ErrEndpProtocolUnknown  = errors.New("unknown transport protocol")
	ErrEndpExists           = errors.New("endpoint exists")
	ErrEndpNoAddress        = errors.New("no address for endpoint")
	ErrEndpNoConnection     = errors.New("no connection on endpoint")
	ErrEndpMaybeSent        = errors.New("message may have been sent - can't know")
	ErrEndpWriteShort       = errors.New("write too short")
	ErrEndpReadShort        = errors.New("read too short")
)

// Endpoint represents a local endpoint that can send and receive messages.
// Implementations need to manage the relations between peer IDs and
// remote endpoints for TCP and UDP traffic.
50 type Endpoint interface { 51 // Run the endpoint and send received messages to channel 52 Run(context.Context, chan *Message) error 53 54 // Send message on endpoint 55 Send(context.Context, net.Addr, *Message) error 56 57 // Address returns the listening address for the endpoint 58 Address() net.Addr 59 60 // CanSendTo returns true if the endpoint can sent to address 61 CanSendTo(net.Addr) bool 62 63 // Return endpoint identifier 64 ID() int 65 } 66 67 //---------------------------------------------------------------------- 68 69 // NewEndpoint returns a suitable endpoint for the address. 70 func NewEndpoint(addr net.Addr) (ep Endpoint, err error) { 71 switch epMode(addr.Network()) { 72 case "packet": 73 ep, err = newPacketEndpoint(addr) 74 case "stream": 75 ep, err = newStreamEndpoint(addr) 76 default: 77 err = ErrEndpNotAvailable 78 } 79 return 80 } 81 82 //---------------------------------------------------------------------- 83 // Packet-oriented endpoint 84 //---------------------------------------------------------------------- 85 86 // PacketEndpoint for packet-oriented network protocols 87 type PaketEndpoint struct { 88 sync.Mutex 89 90 id int // endpoint identifier 91 netw string // network identifier ("udp", "udp4", "udp6", ...) 92 addr net.Addr // endpoint address 93 conn net.PacketConn // packet connection 94 buf []byte // buffer for read/write operations 95 } 96 97 // Run packet endpoint: send incoming messages to the handler. 
98 func (ep *PaketEndpoint) Run(ctx context.Context, hdlr chan *Message) (err error) { 99 // create listener 100 var lc net.ListenConfig 101 xproto := ep.addr.Network() 102 if ep.conn, err = lc.ListenPacket(ctx, EpProtocol(xproto), ep.addr.String()); err != nil { 103 return 104 } 105 // use the actual listening address 106 ep.addr = util.NewAddress(xproto, ep.conn.LocalAddr().String()) 107 108 // save more information to detect compatible send-to addresses 109 ep.netw = ep.conn.LocalAddr().Network() 110 111 // run watch dog for termination 112 active := true 113 go func() { 114 <-ctx.Done() 115 active = false 116 ep.conn.Close() 117 }() 118 // run go routine to handle messages from clients 119 go func() { 120 for { 121 // read next message 122 tm, err := ep.read() 123 if err != nil { 124 // leave go routine if already dead or closed by client 125 if !active || err == io.EOF { 126 break 127 } 128 logger.Println(logger.WARN, "[pkt_ep] read failed: "+err.Error()) 129 // gracefully ignore failed messages 130 continue 131 } 132 // label message 133 tm.Label = ep.addr.String() 134 // send transport message to handler 135 go func() { 136 hdlr <- tm 137 }() 138 } 139 // connection ended. 
140 ep.conn.Close() 141 }() 142 return 143 } 144 145 // Read a transport message from endpoint based on extended protocol 146 func (ep *PaketEndpoint) read() (tm *Message, err error) { 147 // read next packet (assuming that it contains one complete message) 148 var n int 149 if n, _, err = ep.conn.ReadFrom(ep.buf); err != nil { 150 return 151 } 152 // parse transport message based on extended protocol 153 var ( 154 peer *util.PeerID 155 msg message.Message 156 ) 157 switch ep.addr.Network() { 158 case "ip+udp": 159 // check for minimum size (32 byte peer id + 4 byte header) 160 if n < 36 { 161 err = ErrEndpReadShort 162 return 163 } 164 // parse peer id and message in sequence 165 peer = util.NewPeerID(ep.buf[:32]) 166 rdr := bytes.NewBuffer(util.Clone(ep.buf[32:n])) 167 if msg, err = ReadMessageDirect(rdr, ep.buf); err != nil { 168 return 169 } 170 default: 171 panic(ErrEndpProtocolUnknown) 172 } 173 // return transport message 174 return &Message{ 175 Peer: peer, 176 Msg: msg, 177 Resp: nil, 178 Label: "", 179 }, nil 180 } 181 182 // Send message to address from endpoint 183 func (ep *PaketEndpoint) Send(ctx context.Context, addr net.Addr, msg *Message) (err error) { 184 // only one sender at a time 185 ep.Lock() 186 defer ep.Unlock() 187 188 // check for valid connection 189 if ep.conn == nil { 190 return ErrEndpNoConnection 191 } 192 193 // resolve target address 194 var a *net.UDPAddr 195 if a, err = net.ResolveUDPAddr(EpProtocol(addr.Network()), addr.String()); err != nil { 196 return 197 } 198 199 // get message content (TransportMessage) 200 var buf []byte 201 if buf, err = msg.Bytes(); err != nil { 202 return 203 } 204 205 // handle extended protocol: 206 switch ep.addr.Network() { 207 case "ip+udp": 208 // no modifications required 209 210 default: 211 // unknown protocol 212 return ErrEndpProtocolUnknown 213 } 214 215 // timeout after 1 second 216 if err = ep.conn.SetWriteDeadline(time.Now().Add(time.Second)); err != nil { 217 logger.Println(logger.DBG, 
"[pkt_ep] SetWriteDeadline failed: "+err.Error()) 218 return 219 } 220 var n int 221 n, err = ep.conn.WriteTo(buf, a) 222 if n != len(buf) { 223 err = ErrEndpWriteShort 224 } 225 return ErrEndpMaybeSent 226 } 227 228 // Address returms the 229 func (ep *PaketEndpoint) Address() net.Addr { 230 return ep.addr 231 } 232 233 // CanSendTo returns true if the endpoint can sent to address 234 func (ep *PaketEndpoint) CanSendTo(addr net.Addr) (ok bool) { 235 ok = EpProtocol(addr.Network()) == EpProtocol(ep.addr.Network()) 236 if ok { 237 // try to convert addr to compatible type 238 switch ep.netw { 239 case "udp", "udp4", "udp6": 240 var err error 241 if _, err = net.ResolveUDPAddr(ep.netw, addr.String()); err != nil { 242 ok = false 243 } 244 default: 245 logger.Printf(logger.WARN, "[pkt_ep] unknown network %s", ep.netw) 246 ok = false 247 } 248 } else { 249 logger.Printf(logger.DBG, "[pkt_ep] protocol mismatch %s -- %s", EpProtocol(addr.Network()), EpProtocol(ep.addr.Network())) 250 } 251 return 252 } 253 254 // ID returns the endpoint identifier 255 func (ep *PaketEndpoint) ID() int { 256 return ep.id 257 } 258 259 // create a new packet endpoint for protcol and address 260 func newPacketEndpoint(addr net.Addr) (ep *PaketEndpoint, err error) { 261 // check for matching protocol 262 if epMode(addr.Network()) != "packet" { 263 err = ErrEndpProtocolMismatch 264 return 265 } 266 // create endpoint 267 ep = &PaketEndpoint{ 268 id: util.NextID(), 269 addr: addr, 270 buf: make([]byte, 65536), 271 } 272 return 273 } 274 275 //---------------------------------------------------------------------- 276 // Stream-oriented endpoint 277 //---------------------------------------------------------------------- 278 279 // StreamEndpoint for stream-oriented network protocols 280 type StreamEndpoint struct { 281 id int // endpoint identifier 282 addr net.Addr // listening address 283 listener net.Listener // listener instance 284 conns *util.Map[int, net.Conn] // active connections 285 
buf []byte // read/write buffer 286 } 287 288 // Run packet endpoint: send incoming messages to the handler. 289 func (ep *StreamEndpoint) Run(ctx context.Context, hdlr chan *Message) (err error) { 290 // create listener 291 var lc net.ListenConfig 292 xproto := ep.addr.Network() 293 if ep.listener, err = lc.Listen(ctx, EpProtocol(xproto), ep.addr.String()); err != nil { 294 return 295 } 296 // get actual listening address 297 ep.addr = util.NewAddress(xproto, ep.listener.Addr().String()) 298 299 // run watch dog for termination 300 go func() { 301 <-ctx.Done() 302 ep.listener.Close() 303 }() 304 // run go routine to handle messages from clients 305 go func() { 306 for { 307 // get next client connection 308 conn, err := ep.listener.Accept() 309 if err != nil { 310 return 311 } 312 session := util.NextID() 313 ep.conns.Put(session, conn, 0) 314 go func() { 315 for { 316 // read next message from connection 317 tm, err := ep.read(ctx, conn) 318 if err != nil { 319 break 320 } 321 // send transport message to handler 322 go func() { 323 hdlr <- tm 324 }() 325 } 326 // connection ended. 
327 conn.Close() 328 ep.conns.Delete(session, 0) 329 }() 330 } 331 }() 332 return 333 } 334 335 // Read a transport message from endpoint based on extended protocol 336 func (ep *StreamEndpoint) read(ctx context.Context, conn net.Conn) (tm *Message, err error) { 337 // parse transport message based on extended protocol 338 var ( 339 peer *util.PeerID 340 msg message.Message 341 ) 342 switch ep.addr.Network() { 343 case "ip+udp": 344 // parse peer id 345 peer = util.NewPeerID(nil) 346 if _, err = conn.Read(peer.Data); err != nil { 347 return 348 } 349 // read next message from connection 350 if msg, err = ReadMessage(ctx, conn, ep.buf); err != nil { 351 break 352 } 353 default: 354 panic(ErrEndpProtocolUnknown) 355 } 356 // return transport message 357 return &Message{ 358 Peer: peer, 359 Msg: msg, 360 }, nil 361 } 362 363 // Send message to address from endpoint 364 func (ep *StreamEndpoint) Send(ctx context.Context, addr net.Addr, msg *Message) error { 365 return nil 366 } 367 368 // Address returns the actual listening endpoint address 369 func (ep *StreamEndpoint) Address() net.Addr { 370 return ep.addr 371 } 372 373 // CanSendTo returns true if the endpoint can sent to address 374 func (ep *StreamEndpoint) CanSendTo(addr net.Addr) bool { 375 return epMode(addr.Network()) == "stream" 376 } 377 378 // ID returns the endpoint identifier 379 func (ep *StreamEndpoint) ID() int { 380 return ep.id 381 } 382 383 // create a new endpoint based on extended protocol and address 384 func newStreamEndpoint(addr net.Addr) (ep *StreamEndpoint, err error) { 385 // check for matching protocol 386 if epMode(addr.Network()) != "stream" { 387 err = ErrEndpProtocolMismatch 388 return 389 } 390 // create endpoint 391 ep = &StreamEndpoint{ 392 id: util.NextID(), 393 addr: addr, 394 conns: util.NewMap[int, net.Conn](), 395 buf: make([]byte, 65536), 396 } 397 return 398 } 399 400 //---------------------------------------------------------------------- 401 // derive endpoint mode 
// (packet/stream) and transport protocol from
// net.Addr.Network() strings
//----------------------------------------------------------------------

// EpProtocol maps a network string to its transport protocol ("udp",
// "tcp" or "unix"). Besides the plain network names, the extended name
// "ip+udp" is recognized as UDP. Any other network string yields the
// empty string.
func EpProtocol(netw string) string {
	var proto string
	switch netw {
	case "unix":
		proto = "unix"
	case "tcp", "tcp4", "tcp6":
		proto = "tcp"
	case "udp", "udp4", "udp6", "ip+udp":
		proto = "udp"
	}
	return proto
}

// epMode classifies a network string by endpoint mode: "packet" for UDP
// based networks, "stream" for TCP and Unix networks, and the empty
// string if the transport protocol is not known.
func epMode(netw string) string {
	proto := EpProtocol(netw)
	if proto == "udp" {
		return "packet"
	}
	if proto == "tcp" || proto == "unix" {
		return "stream"
	}
	return ""
}