core.go (11206B)
1 // This file is part of gnunet-go, a GNUnet-implementation in Golang. 2 // Copyright (C) 2019-2022 Bernd Fix >Y< 3 // 4 // gnunet-go is free software: you can redistribute it and/or modify it 5 // under the terms of the GNU Affero General Public License as published 6 // by the Free Software Foundation, either version 3 of the License, 7 // or (at your option) any later version. 8 // 9 // gnunet-go is distributed in the hope that it will be useful, but 10 // WITHOUT ANY WARRANTY; without even the implied warranty of 11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12 // Affero General Public License for more details. 13 // 14 // You should have received a copy of the GNU Affero General Public License 15 // along with this program. If not, see <http://www.gnu.org/licenses/>. 16 // 17 // SPDX-License-Identifier: AGPL3.0-or-later 18 19 package core 20 21 import ( 22 "context" 23 "errors" 24 "gnunet/config" 25 "gnunet/crypto" 26 "gnunet/message" 27 "gnunet/transport" 28 "gnunet/util" 29 "net" 30 "strings" 31 "time" 32 33 "github.com/bfix/gospel/logger" 34 ) 35 36 // ---------------------------------------------------------------------- 37 // Core-related error codes 38 var ( 39 ErrCoreNoUpnpDyn = errors.New("no dynamic port with UPnP") 40 ErrCoreNoEndpAddr = errors.New("no endpoint for address") 41 ErrCoreNotSent = errors.New("message not sent") 42 ) 43 44 // CtxKey is a value-context key 45 type CtxKey string 46 47 // ---------------------------------------------------------------------- 48 // EndpointRef is a reference to an endpoint instance managed by core. 49 type EndpointRef struct { 50 id string // endpoint identifier in configuration 51 ep transport.Endpoint // reference to endpoint 52 addr *util.Address // public endpoint address 53 upnpID string // UPNP identifier (empty if unused) 54 } 55 56 // ---------------------------------------------------------------------- 57 // Core service 58 type Core struct { 59 // local peer instance 60 local *Peer 61 62 // incoming messages from transport 63 incoming chan *transport.Message 64 65 // reference to transport implementation 66 trans *transport.Transport 67 68 // registered signal listeners 69 listeners map[string]*Listener 70 71 // list of known peers with addresses 72 peers *util.PeerAddrList 73 74 // list of connected peers 75 connected *util.Map[string, bool] 76 77 // List of registered endpoints 78 endpoints map[string]*EndpointRef 79 } 80 81 //---------------------------------------------------------------------- 82 83 // NewCore creates and runs a new core instance. 84 func NewCore(ctx context.Context, node *config.NodeConfig) (c *Core, err error) { 85 // instantiate peer 86 var peer *Peer 87 if peer, err = NewLocalPeer(node); err != nil { 88 return 89 } 90 logger.Printf(logger.INFO, "[core] Local node is %s", peer.GetID().Short()) 91 92 // create new core instance 93 incoming := make(chan *transport.Message) 94 c = &Core{ 95 local: peer, 96 incoming: incoming, 97 listeners: make(map[string]*Listener), 98 trans: transport.NewTransport(ctx, node.Name, incoming), 99 peers: util.NewPeerAddrList(), 100 connected: util.NewMap[string, bool](), 101 endpoints: make(map[string]*EndpointRef), 102 } 103 // add all local peer endpoints to transport. 104 for _, epCfg := range node.Endpoints { 105 var ( 106 upnpID string // upnp identifier 107 local *util.Address // local address 108 remote *util.Address // remote address 109 ep transport.Endpoint // endpoint reference 110 ) 111 // handle special addresses: 112 if strings.HasPrefix(epCfg.Address, "upnp:") { 113 // don't allow dynamic port assignment 114 if epCfg.Port == 0 { 115 err = ErrCoreNoUpnpDyn 116 return 117 } 118 // handle UPNP port forwarding 119 protocol := transport.EpProtocol(epCfg.Network) 120 var localA, remoteA string 121 if upnpID, remoteA, localA, err = c.trans.ForwardOpen(protocol, epCfg.Address[5:], epCfg.Port); err != nil { 122 return 123 } 124 // parse local and remote addresses 125 if local, err = util.ParseAddress(epCfg.Network + "://" + localA); err != nil { 126 return 127 } 128 if remote, err = util.ParseAddress(epCfg.Network + "://" + remoteA); err != nil { 129 return 130 } 131 } else { 132 // direct address specification: 133 if local, err = util.ParseAddress(epCfg.Addr()); err != nil { 134 return 135 } 136 remote = local 137 upnpID = "" 138 } 139 // add endpoint for address 140 if ep, err = c.trans.AddEndpoint(ctx, local); err != nil { 141 return 142 } 143 // if port is set to 0, replace it with port assigned dynamically. 144 // only applies to direct listening addresses! 145 if epCfg.Port == 0 && local == remote { 146 addr := ep.Address() 147 if remote, err = util.ParseAddress(addr.Network() + "://" + addr.String()); err != nil { 148 return 149 } 150 } 151 // save endpoint reference 152 c.endpoints[epCfg.ID] = &EndpointRef{ 153 id: epCfg.ID, 154 ep: ep, 155 addr: remote, 156 upnpID: upnpID, 157 } 158 } 159 // run message pump 160 go c.pump(ctx) 161 return 162 } 163 164 // message pump for core 165 func (c *Core) pump(ctx context.Context) { 166 // wait for incoming messages 167 for { 168 select { 169 // get (next) message from transport 170 case tm := <-c.incoming: 171 logger.Printf(logger.DBG, "[core] Message received from %s: %s", tm.Peer.Short(), tm.Msg) 172 173 // check if peer is already connected (has an entry in PeerAddrist) 174 _, connected := c.connected.Get(tm.Peer.String(), 0) 175 if !connected { 176 // no: mark connected 177 c.connected.Put(tm.Peer.String(), true, 0) 178 // generate EV_CONNECT event 179 c.dispatch(&Event{ 180 ID: EV_CONNECT, 181 Peer: tm.Peer, 182 }) 183 // grace period for connection signal 184 time.Sleep(time.Second) 185 } 186 187 // set default responder (core) if no custom responder 188 // is defined by the receiving endpoint. 189 resp := tm.Resp 190 if resp == nil { 191 resp = &transport.TransportResponder{ 192 Peer: tm.Peer, 193 SendFcn: c.Send, 194 } 195 } 196 // generate EV_MESSAGE event 197 c.dispatch(&Event{ 198 ID: EV_MESSAGE, 199 Peer: tm.Peer, 200 Msg: tm.Msg, 201 Resp: resp, 202 }) 203 204 // wait for termination 205 case <-ctx.Done(): 206 return 207 } 208 } 209 } 210 211 // Shutdown all core-related processes. 212 func (c *Core) Shutdown() { 213 c.trans.Shutdown() 214 c.local.Shutdown() 215 } 216 217 //---------------------------------------------------------------------- 218 219 // Send is a function that allows the local peer to send a protocol 220 // message to a remote peer. 221 func (c *Core) Send(ctx context.Context, peer *util.PeerID, msg message.Message) (err error) { 222 // assemble log label 223 label := "core" 224 if v := ctx.Value(CtxKey("label")); v != nil { 225 if s, ok := v.(string); ok && len(s) > 0 { 226 label = s 227 } 228 } 229 230 // TODO: select best endpoint protocol for transport; now fixed to IP+UDP 231 netw := "ip+udp" 232 233 // try all addresses for peer 234 aList := c.peers.Get(peer, netw) 235 maybe := false // message may be sent... 236 for _, addr := range aList { 237 logger.Printf(logger.INFO, "[%s] Trying to send to %s", label, addr.URI()) 238 // send message to address 239 if err = c.SendToAddr(ctx, addr, msg); err != nil { 240 // if it is possible that the message was not sent, try next address 241 if err != transport.ErrEndpMaybeSent { 242 logger.Printf(logger.WARN, "[%s] Failed to send to %s: %s", label, addr.URI(), err.Error()) 243 } else { 244 maybe = true 245 } 246 continue 247 } 248 // one successful send is enough 249 return 250 } 251 if maybe { 252 err = nil 253 } else { 254 err = ErrCoreNotSent 255 } 256 return 257 } 258 259 // SendToAddr message directly to address 260 func (c *Core) SendToAddr(ctx context.Context, addr *util.Address, msg message.Message) error { 261 // assemble transport message 262 tm := transport.NewTransportMessage(c.PeerID(), msg) 263 // send on transport 264 return c.trans.Send(ctx, addr, tm) 265 } 266 267 // Learn (new) addresses for peer 268 func (c *Core) Learn(ctx context.Context, peer *util.PeerID, addrs []*util.Address, label string) (newPeer bool) { 269 logger.Printf(logger.DBG, "[%s] Learning %v for %s", label, addrs, peer.Short()) 270 271 // learn all addresses for peer 272 newPeer = false 273 for _, addr := range addrs { 274 // filter out addresses we can't handle (including local addresses) 275 if !transport.CanHandleAddress(addr) { 276 continue 277 } 278 // learn address 279 logger.Printf(logger.INFO, "[%s] Learning %s for %s (expires %s)", 280 label, addr.URI(), peer.Short(), addr.Expire) 281 newPeer = (c.peers.Add(peer, addr) == 1) || newPeer 282 } 283 return 284 } 285 286 // Addresses returns the list of listening endpoint addresses 287 func (c *Core) Addresses() (list []*util.Address, err error) { 288 for _, epRef := range c.endpoints { 289 list = append(list, epRef.addr) 290 } 291 return 292 } 293 294 //---------------------------------------------------------------------- 295 296 // Peer returns the local peer 297 func (c *Core) Peer() *Peer { 298 return c.local 299 } 300 301 // PeerID returns the peer id of the local node. 302 func (c *Core) PeerID() *util.PeerID { 303 return c.local.GetID() 304 } 305 306 //---------------------------------------------------------------------- 307 308 // Sign a signable onject with private peer key 309 func (c *Core) Sign(obj crypto.Signable) error { 310 sd := obj.SignedData() 311 sig, err := c.local.prv.EdSign(sd) 312 if err != nil { 313 return err 314 } 315 return obj.SetSignature(util.NewPeerSignature(sig.Bytes())) 316 } 317 318 //---------------------------------------------------------------------- 319 320 // TryConnect is a function which allows the local peer to attempt the 321 // establishment of a connection to another peer using an address. 322 // When the connection attempt is successful, information on the new 323 // peer is offered through the PEER_CONNECTED signal. 324 func (c *Core) TryConnect(peer *util.PeerID, addr net.Addr) error { 325 // TODO: 326 return nil 327 } 328 329 // Hold is a function which tells the underlay to keep a hold on to a 330 // connection to a peer P. Underlays are usually limited in the number 331 // of active connections. With this function the DHT can indicate to the 332 // underlay which connections should preferably be preserved. 333 func (c *Core) Hold(peer *util.PeerID) { 334 // TODO: 335 } 336 337 // Drop is a function which tells the underlay to drop the connection to a 338 // peer P. This function is only there for symmetry and used during the 339 // peer's shutdown to release all of the remaining HOLDs. As R5N always 340 // prefers the longest-lived connections, it would never drop an active 341 // connection that it has called HOLD() on before. Nevertheless, underlay 342 // implementations should not rely on this always being true. A call to 343 // DROP() also does not imply that the underlay must close the connection: 344 // it merely removes the preference to preserve the connection that was 345 // established by HOLD(). 346 func (c *Core) Drop(peer *util.PeerID) { 347 // TODO: 348 } 349 350 //---------------------------------------------------------------------- 351 // Event listener and event dispatch. 352 //---------------------------------------------------------------------- 353 354 // Register a named event listener. 355 func (c *Core) Register(name string, l *Listener) { 356 c.listeners[name] = l 357 } 358 359 // Unregister named event listener. 360 func (c *Core) Unregister(name string) *Listener { 361 if l, ok := c.listeners[name]; ok { 362 delete(c.listeners, name) 363 return l 364 } 365 return nil 366 } 367 368 // internal: dispatch event to listeners 369 func (c *Core) dispatch(ev *Event) { 370 // dispatch event to listeners 371 for _, l := range c.listeners { 372 if l.filter.CheckEvent(ev.ID) { 373 if ev.ID == EV_MESSAGE { 374 mt := ev.Msg.Type() 375 if mt != 0 && !l.filter.CheckMsgType(mt) { 376 // skip event 377 return 378 } 379 } 380 go func() { 381 l.ch <- ev 382 }() 383 } 384 } 385 }