gnunet-go

GNUnet Bindings for Go
Log | Files | Refs | README | LICENSE

core.go (11206B)


      1 // This file is part of gnunet-go, a GNUnet-implementation in Golang.
      2 // Copyright (C) 2019-2022 Bernd Fix  >Y<
      3 //
      4 // gnunet-go is free software: you can redistribute it and/or modify it
      5 // under the terms of the GNU Affero General Public License as published
      6 // by the Free Software Foundation, either version 3 of the License,
      7 // or (at your option) any later version.
      8 //
      9 // gnunet-go is distributed in the hope that it will be useful, but
     10 // WITHOUT ANY WARRANTY; without even the implied warranty of
     11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
     12 // Affero General Public License for more details.
     13 //
     14 // You should have received a copy of the GNU Affero General Public License
     15 // along with this program.  If not, see <http://www.gnu.org/licenses/>.
     16 //
// SPDX-License-Identifier: AGPL-3.0-or-later
     18 
     19 package core
     20 
     21 import (
     22 	"context"
     23 	"errors"
     24 	"gnunet/config"
     25 	"gnunet/crypto"
     26 	"gnunet/message"
     27 	"gnunet/transport"
     28 	"gnunet/util"
     29 	"net"
     30 	"strings"
     31 	"time"
     32 
     33 	"github.com/bfix/gospel/logger"
     34 )
     35 
     36 // ----------------------------------------------------------------------
     37 // Core-related error codes
     38 var (
     39 	ErrCoreNoUpnpDyn  = errors.New("no dynamic port with UPnP")
     40 	ErrCoreNoEndpAddr = errors.New("no endpoint for address")
     41 	ErrCoreNotSent    = errors.New("message not sent")
     42 )
     43 
     44 // CtxKey is a value-context key
     45 type CtxKey string
     46 
     47 // ----------------------------------------------------------------------
     48 // EndpointRef is a reference to an endpoint instance managed by core.
     49 type EndpointRef struct {
     50 	id     string             // endpoint identifier in configuration
     51 	ep     transport.Endpoint // reference to endpoint
     52 	addr   *util.Address      // public endpoint address
     53 	upnpID string             // UPNP identifier (empty if unused)
     54 }
     55 
     56 // ----------------------------------------------------------------------
     57 // Core service
     58 type Core struct {
     59 	// local peer instance
     60 	local *Peer
     61 
     62 	// incoming messages from transport
     63 	incoming chan *transport.Message
     64 
     65 	// reference to transport implementation
     66 	trans *transport.Transport
     67 
     68 	// registered signal listeners
     69 	listeners map[string]*Listener
     70 
     71 	// list of known peers with addresses
     72 	peers *util.PeerAddrList
     73 
     74 	// list of connected peers
     75 	connected *util.Map[string, bool]
     76 
     77 	// List of registered endpoints
     78 	endpoints map[string]*EndpointRef
     79 }
     80 
     81 //----------------------------------------------------------------------
     82 
     83 // NewCore creates and runs a new core instance.
     84 func NewCore(ctx context.Context, node *config.NodeConfig) (c *Core, err error) {
     85 	// instantiate peer
     86 	var peer *Peer
     87 	if peer, err = NewLocalPeer(node); err != nil {
     88 		return
     89 	}
     90 	logger.Printf(logger.INFO, "[core] Local node is %s", peer.GetID().Short())
     91 
     92 	// create new core instance
     93 	incoming := make(chan *transport.Message)
     94 	c = &Core{
     95 		local:     peer,
     96 		incoming:  incoming,
     97 		listeners: make(map[string]*Listener),
     98 		trans:     transport.NewTransport(ctx, node.Name, incoming),
     99 		peers:     util.NewPeerAddrList(),
    100 		connected: util.NewMap[string, bool](),
    101 		endpoints: make(map[string]*EndpointRef),
    102 	}
    103 	// add all local peer endpoints to transport.
    104 	for _, epCfg := range node.Endpoints {
    105 		var (
    106 			upnpID string             // upnp identifier
    107 			local  *util.Address      // local address
    108 			remote *util.Address      // remote address
    109 			ep     transport.Endpoint // endpoint reference
    110 		)
    111 		// handle special addresses:
    112 		if strings.HasPrefix(epCfg.Address, "upnp:") {
    113 			// don't allow dynamic port assignment
    114 			if epCfg.Port == 0 {
    115 				err = ErrCoreNoUpnpDyn
    116 				return
    117 			}
    118 			// handle UPNP port forwarding
    119 			protocol := transport.EpProtocol(epCfg.Network)
    120 			var localA, remoteA string
    121 			if upnpID, remoteA, localA, err = c.trans.ForwardOpen(protocol, epCfg.Address[5:], epCfg.Port); err != nil {
    122 				return
    123 			}
    124 			// parse local and remote addresses
    125 			if local, err = util.ParseAddress(epCfg.Network + "://" + localA); err != nil {
    126 				return
    127 			}
    128 			if remote, err = util.ParseAddress(epCfg.Network + "://" + remoteA); err != nil {
    129 				return
    130 			}
    131 		} else {
    132 			// direct address specification:
    133 			if local, err = util.ParseAddress(epCfg.Addr()); err != nil {
    134 				return
    135 			}
    136 			remote = local
    137 			upnpID = ""
    138 		}
    139 		// add endpoint for address
    140 		if ep, err = c.trans.AddEndpoint(ctx, local); err != nil {
    141 			return
    142 		}
    143 		// if port is set to 0, replace it with port assigned dynamically.
    144 		// only applies to direct listening addresses!
    145 		if epCfg.Port == 0 && local == remote {
    146 			addr := ep.Address()
    147 			if remote, err = util.ParseAddress(addr.Network() + "://" + addr.String()); err != nil {
    148 				return
    149 			}
    150 		}
    151 		// save endpoint reference
    152 		c.endpoints[epCfg.ID] = &EndpointRef{
    153 			id:     epCfg.ID,
    154 			ep:     ep,
    155 			addr:   remote,
    156 			upnpID: upnpID,
    157 		}
    158 	}
    159 	// run message pump
    160 	go c.pump(ctx)
    161 	return
    162 }
    163 
    164 // message pump for core
    165 func (c *Core) pump(ctx context.Context) {
    166 	// wait for incoming messages
    167 	for {
    168 		select {
    169 		// get (next) message from transport
    170 		case tm := <-c.incoming:
    171 			logger.Printf(logger.DBG, "[core] Message received from %s: %s", tm.Peer.Short(), tm.Msg)
    172 
    173 			// check if peer is already connected (has an entry in PeerAddrist)
    174 			_, connected := c.connected.Get(tm.Peer.String(), 0)
    175 			if !connected {
    176 				// no: mark connected
    177 				c.connected.Put(tm.Peer.String(), true, 0)
    178 				// generate EV_CONNECT event
    179 				c.dispatch(&Event{
    180 					ID:   EV_CONNECT,
    181 					Peer: tm.Peer,
    182 				})
    183 				// grace period for connection signal
    184 				time.Sleep(time.Second)
    185 			}
    186 
    187 			// set default responder (core) if no custom responder
    188 			// is defined by the receiving endpoint.
    189 			resp := tm.Resp
    190 			if resp == nil {
    191 				resp = &transport.TransportResponder{
    192 					Peer:    tm.Peer,
    193 					SendFcn: c.Send,
    194 				}
    195 			}
    196 			// generate EV_MESSAGE event
    197 			c.dispatch(&Event{
    198 				ID:   EV_MESSAGE,
    199 				Peer: tm.Peer,
    200 				Msg:  tm.Msg,
    201 				Resp: resp,
    202 			})
    203 
    204 		// wait for termination
    205 		case <-ctx.Done():
    206 			return
    207 		}
    208 	}
    209 }
    210 
    211 // Shutdown all core-related processes.
    212 func (c *Core) Shutdown() {
    213 	c.trans.Shutdown()
    214 	c.local.Shutdown()
    215 }
    216 
    217 //----------------------------------------------------------------------
    218 
    219 // Send is a function that allows the local peer to send a protocol
    220 // message to a remote peer.
    221 func (c *Core) Send(ctx context.Context, peer *util.PeerID, msg message.Message) (err error) {
    222 	// assemble log label
    223 	label := "core"
    224 	if v := ctx.Value(CtxKey("label")); v != nil {
    225 		if s, ok := v.(string); ok && len(s) > 0 {
    226 			label = s
    227 		}
    228 	}
    229 
    230 	// TODO: select best endpoint protocol for transport; now fixed to IP+UDP
    231 	netw := "ip+udp"
    232 
    233 	// try all addresses for peer
    234 	aList := c.peers.Get(peer, netw)
    235 	maybe := false // message may be sent...
    236 	for _, addr := range aList {
    237 		logger.Printf(logger.INFO, "[%s] Trying to send to %s", label, addr.URI())
    238 		// send message to address
    239 		if err = c.SendToAddr(ctx, addr, msg); err != nil {
    240 			// if it is possible that the message was not sent, try next address
    241 			if err != transport.ErrEndpMaybeSent {
    242 				logger.Printf(logger.WARN, "[%s] Failed to send to %s: %s", label, addr.URI(), err.Error())
    243 			} else {
    244 				maybe = true
    245 			}
    246 			continue
    247 		}
    248 		// one successful send is enough
    249 		return
    250 	}
    251 	if maybe {
    252 		err = nil
    253 	} else {
    254 		err = ErrCoreNotSent
    255 	}
    256 	return
    257 }
    258 
    259 // SendToAddr message directly to address
    260 func (c *Core) SendToAddr(ctx context.Context, addr *util.Address, msg message.Message) error {
    261 	// assemble transport message
    262 	tm := transport.NewTransportMessage(c.PeerID(), msg)
    263 	// send on transport
    264 	return c.trans.Send(ctx, addr, tm)
    265 }
    266 
    267 // Learn (new) addresses for peer
    268 func (c *Core) Learn(ctx context.Context, peer *util.PeerID, addrs []*util.Address, label string) (newPeer bool) {
    269 	logger.Printf(logger.DBG, "[%s] Learning %v for %s", label, addrs, peer.Short())
    270 
    271 	// learn all addresses for peer
    272 	newPeer = false
    273 	for _, addr := range addrs {
    274 		// filter out addresses we can't handle (including local addresses)
    275 		if !transport.CanHandleAddress(addr) {
    276 			continue
    277 		}
    278 		// learn address
    279 		logger.Printf(logger.INFO, "[%s] Learning %s for %s (expires %s)",
    280 			label, addr.URI(), peer.Short(), addr.Expire)
    281 		newPeer = (c.peers.Add(peer, addr) == 1) || newPeer
    282 	}
    283 	return
    284 }
    285 
    286 // Addresses returns the list of listening endpoint addresses
    287 func (c *Core) Addresses() (list []*util.Address, err error) {
    288 	for _, epRef := range c.endpoints {
    289 		list = append(list, epRef.addr)
    290 	}
    291 	return
    292 }
    293 
    294 //----------------------------------------------------------------------
    295 
    296 // Peer returns the local peer
    297 func (c *Core) Peer() *Peer {
    298 	return c.local
    299 }
    300 
    301 // PeerID returns the peer id of the local node.
    302 func (c *Core) PeerID() *util.PeerID {
    303 	return c.local.GetID()
    304 }
    305 
    306 //----------------------------------------------------------------------
    307 
    308 // Sign a signable onject with private peer key
    309 func (c *Core) Sign(obj crypto.Signable) error {
    310 	sd := obj.SignedData()
    311 	sig, err := c.local.prv.EdSign(sd)
    312 	if err != nil {
    313 		return err
    314 	}
    315 	return obj.SetSignature(util.NewPeerSignature(sig.Bytes()))
    316 }
    317 
    318 //----------------------------------------------------------------------
    319 
    320 // TryConnect is a function which allows the local peer to attempt the
    321 // establishment of a connection to another peer using an address.
    322 // When the connection attempt is successful, information on the new
    323 // peer is offered through the PEER_CONNECTED signal.
    324 func (c *Core) TryConnect(peer *util.PeerID, addr net.Addr) error {
    325 	// TODO:
    326 	return nil
    327 }
    328 
    329 // Hold is a function which tells the underlay to keep a hold on to a
    330 // connection to a peer P. Underlays are usually limited in the number
    331 // of active connections. With this function the DHT can indicate to the
    332 // underlay which connections should preferably be preserved.
    333 func (c *Core) Hold(peer *util.PeerID) {
    334 	// TODO:
    335 }
    336 
    337 // Drop is a function which tells the underlay to drop the connection to a
    338 // peer P. This function is only there for symmetry and used during the
    339 // peer's shutdown to release all of the remaining HOLDs. As R5N always
    340 // prefers the longest-lived connections, it would never drop an active
    341 // connection that it has called HOLD() on before. Nevertheless, underlay
    342 // implementations should not rely on this always being true. A call to
    343 // DROP() also does not imply that the underlay must close the connection:
    344 // it merely removes the preference to preserve the connection that was
    345 // established by HOLD().
    346 func (c *Core) Drop(peer *util.PeerID) {
    347 	// TODO:
    348 }
    349 
    350 //----------------------------------------------------------------------
    351 // Event listener and event dispatch.
    352 //----------------------------------------------------------------------
    353 
    354 // Register a named event listener.
    355 func (c *Core) Register(name string, l *Listener) {
    356 	c.listeners[name] = l
    357 }
    358 
    359 // Unregister named event listener.
    360 func (c *Core) Unregister(name string) *Listener {
    361 	if l, ok := c.listeners[name]; ok {
    362 		delete(c.listeners, name)
    363 		return l
    364 	}
    365 	return nil
    366 }
    367 
    368 // internal: dispatch event to listeners
    369 func (c *Core) dispatch(ev *Event) {
    370 	// dispatch event to listeners
    371 	for _, l := range c.listeners {
    372 		if l.filter.CheckEvent(ev.ID) {
    373 			if ev.ID == EV_MESSAGE {
    374 				mt := ev.Msg.Type()
    375 				if mt != 0 && !l.filter.CheckMsgType(mt) {
    376 					// skip event
    377 					return
    378 				}
    379 			}
    380 			go func() {
    381 				l.ch <- ev
    382 			}()
    383 		}
    384 	}
    385 }