gnunet-go

GNUnet Bindings for Go
Log | Files | Refs | README | LICENSE

commit 7a6ae8f61ee7efde161db98462259ea9bbb23386
parent de577428a69a0002c3194afdf0562cf5a4dc1bdc
Author: Bernd Fix <brf@hoi-polloi.org>
Date:   Tue,  7 Jun 2022 09:31:22 +0200

Improved transport and module code.

Diffstat:
Asrc/gnunet/cmd/gnunet-service-dht-go/main.go | 148+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Dsrc/gnunet/cmd/gnunet-service-dht-test-go/main.go | 152-------------------------------------------------------------------------------
Msrc/gnunet/cmd/peer_mockup/main.go | 19+++++++++++--------
Msrc/gnunet/config/config.go | 30+++++++++++++++++++++++++++---
Msrc/gnunet/config/config_test.go | 19++++++++++++-------
Msrc/gnunet/config/gnunet-config.json | 119++++++++++++++++++++++++++++++++++++++++++-------------------------------------
Msrc/gnunet/core/core.go | 202++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------------
Msrc/gnunet/core/core_test.go | 203++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------
Msrc/gnunet/core/peer.go | 49++++++++-----------------------------------------
Msrc/gnunet/core/peer_test.go | 23+++++++++++++++++++----
Msrc/gnunet/go.mod | 2++
Msrc/gnunet/go.sum | 2++
Msrc/gnunet/message/msg_hello.go | 2+-
Dsrc/gnunet/modules.go | 84-------------------------------------------------------------------------------
Msrc/gnunet/service/dht/blocks/hello.go | 2+-
Msrc/gnunet/service/dht/module.go | 18++++++++++++++++--
Msrc/gnunet/service/dht/routingtable.go | 57+++++++++++++++++++++++++++++++++++++++++++--------------
Msrc/gnunet/service/dht/routingtable_test.go | 8++++++--
Msrc/gnunet/service/gns/module.go | 18++++++++++++++++++
Msrc/gnunet/service/module.go | 36++++++++++++++++++++++++++++++++++++
Msrc/gnunet/service/namecache/module.go | 26+++++++++++++++++++++-----
Msrc/gnunet/service/revocation/module.go | 18++++++++++++++++--
Msrc/gnunet/transport/endpoint.go | 168+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------------------
Msrc/gnunet/transport/reader_writer.go | 5+----
Msrc/gnunet/transport/transport.go | 123+++++++++++++++++++++++++++++++++++++++++++++----------------------------------
Msrc/gnunet/util/address.go | 21++++++++++++---------
Msrc/gnunet/util/database.go | 4++--
Msrc/gnunet/util/misc.go | 80+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------
Msrc/gnunet/util/peer_id.go | 26+++++++++++---------------
29 files changed, 1053 insertions(+), 611 deletions(-)

diff --git a/src/gnunet/cmd/gnunet-service-dht-go/main.go b/src/gnunet/cmd/gnunet-service-dht-go/main.go @@ -0,0 +1,148 @@ +// This file is part of gnunet-go, a GNUnet-implementation in Golang. +// Copyright (C) 2019-2022 Bernd Fix >Y< +// +// gnunet-go is free software: you can redistribute it and/or modify it +// under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// gnunet-go is distributed in the hope that it will be useful, but +// WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// +// SPDX-License-Identifier: AGPL3.0-or-later + +package main + +import ( + "context" + "flag" + "os" + "os/signal" + "strings" + "syscall" + "time" + + "gnunet/config" + "gnunet/core" + "gnunet/rpc" + "gnunet/service" + "gnunet/service/dht" + + "github.com/bfix/gospel/logger" +) + +func main() { + defer func() { + logger.Println(logger.INFO, "[dht] Bye.") + // flush last messages + logger.Flush() + }() + logger.Println(logger.INFO, "[dht] Starting service...") + + var ( + cfgFile string + socket string + param string + err error + logLevel int + rpcEndp string + ) + // handle command line arguments + flag.StringVar(&cfgFile, "c", "gnunet-config.json", "GNUnet configuration file") + flag.StringVar(&socket, "s", "", "GNS service socket") + flag.StringVar(&param, "p", "", "socket parameters (<key>=<value>,...)") + flag.IntVar(&logLevel, "L", logger.INFO, "DHT log level (default: INFO)") + flag.StringVar(&rpcEndp, "R", "", "JSON-RPC endpoint (default: none)") + flag.Parse() + + // read configuration file and set missing arguments. + if err = config.ParseConfig(cfgFile); err != nil { + logger.Printf(logger.ERROR, "[dht] Invalid configuration file: %s\n", err.Error()) + return + } + + // apply configuration + logger.SetLogLevel(logLevel) + if len(socket) == 0 { + socket = config.Cfg.GNS.Service.Socket + } + params := make(map[string]string) + if len(param) == 0 { + for _, p := range strings.Split(param, ",") { + kv := strings.SplitN(p, "=", 2) + params[kv[0]] = kv[1] + } + } else { + params = config.Cfg.GNS.Service.Params + } + + // instantiate core service + ctx, cancel := context.WithCancel(context.Background()) + var c *core.Core + if c, err = core.NewCore(ctx, config.Cfg.Local); err != nil { + logger.Printf(logger.ERROR, "[dht] core failed: %s\n", err.Error()) + return + } + defer c.Shutdown() + + // start a new DHT service + dht := dht.NewService(ctx, c) + srv := service.NewSocketHandler("dht", dht) + if err = srv.Start(ctx, socket, params); err != nil { + logger.Printf(logger.ERROR, "[dht] Failed to start DHT service: '%s'", err.Error()) + return + } + + // start JSON-RPC server on request + if len(rpcEndp) > 0 { + parts := strings.Split(rpcEndp, ":") + if parts[0] != "tcp" { + logger.Println(logger.ERROR, "[dht] RPC must have a TCP/IP endpoint") + return + } + config.Cfg.RPC.Endpoint = parts[1] + if err = rpc.Start(ctx); err != nil { + logger.Printf(logger.ERROR, "[dht] RPC failed to start: %s", err.Error()) + return + } + rpc.Register(dht) + } + + // handle OS signals + sigCh := make(chan os.Signal, 5) + signal.Notify(sigCh) + + // heart beat + tick := time.NewTicker(5 * time.Minute) + +loop: + for { + select { + // handle OS signals + case sig := <-sigCh: + switch sig { + case syscall.SIGKILL, syscall.SIGINT, syscall.SIGTERM: + logger.Printf(logger.INFO, "[dht] Terminating service (on signal '%s')\n", sig) + break loop + case syscall.SIGHUP: + logger.Println(logger.INFO, "[dht] SIGHUP") + case syscall.SIGURG: + // TODO: https://github.com/golang/go/issues/37942 + default: + logger.Println(logger.INFO, "[dht] Unhandled signal: "+sig.String()) + } + // handle heart beat + case now := <-tick.C: + logger.Println(logger.INFO, "[dht] Heart beat at "+now.String()) + } + } + + // terminating service + cancel() + srv.Stop() +} diff --git a/src/gnunet/cmd/gnunet-service-dht-test-go/main.go b/src/gnunet/cmd/gnunet-service-dht-test-go/main.go @@ -1,152 +0,0 @@ -// This file is part of gnunet-go, a GNUnet-implementation in Golang. -// Copyright (C) 2019-2022 Bernd Fix >Y< -// -// gnunet-go is free software: you can redistribute it and/or modify it -// under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, -// or (at your option) any later version. -// -// gnunet-go is distributed in the hope that it will be useful, but -// WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -// Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see <http://www.gnu.org/licenses/>. -// -// SPDX-License-Identifier: AGPL3.0-or-later - -package main - -import ( - "context" - "flag" - "os" - "os/signal" - "strings" - "syscall" - "time" - - "gnunet/config" - "gnunet/core" - "gnunet/rpc" - "gnunet/service" - "gnunet/service/dht" - - "github.com/bfix/gospel/logger" -) - -func main() { - defer func() { - logger.Println(logger.INFO, "[dht] Bye.") - // flush last messages - logger.Flush() - }() - logger.Println(logger.INFO, "[dht] Starting service...") - - var ( - cfgFile string - socket string - param string - err error - logLevel int - rpcEndp string - ) - // handle command line arguments - flag.StringVar(&cfgFile, "c", "gnunet-config.json", "GNUnet configuration file") - flag.StringVar(&socket, "s", "", "GNS service socket") - flag.StringVar(&param, "p", "", "socket parameters (<key>=<value>,...)") - flag.IntVar(&logLevel, "L", logger.INFO, "DHT log level (default: INFO)") - flag.StringVar(&rpcEndp, "R", "", "JSON-RPC endpoint (default: none)") - flag.Parse() - - // read configuration file and set missing arguments. - if err = config.ParseConfig(cfgFile); err != nil { - logger.Printf(logger.ERROR, "[dht] Invalid configuration file: %s\n", err.Error()) - return - } - - // apply configuration - logger.SetLogLevel(logLevel) - if len(socket) == 0 { - socket = config.Cfg.GNS.Service.Socket - } - params := make(map[string]string) - if len(param) == 0 { - for _, p := range strings.Split(param, ",") { - kv := strings.SplitN(p, "=", 2) - params[kv[0]] = kv[1] - } - } else { - params = config.Cfg.GNS.Service.Params - } - - // instantiate core service - ctx, cancel := context.WithCancel(context.Background()) - var local *core.Peer - if local, err = core.NewLocalPeer(config.Cfg.Local); err != nil { - logger.Printf(logger.ERROR, "[dht] No local peer: %s\n", err.Error()) - return - } - var c *core.Core - if c, err = core.NewCore(ctx, local); err != nil { - logger.Printf(logger.ERROR, "[dht] core failed: %s\n", err.Error()) - return - } - - // start a new DHT service - dht := dht.NewService(ctx, c) - srv := service.NewSocketHandler("dht", dht) - if err = srv.Start(ctx, socket, params); err != nil { - logger.Printf(logger.ERROR, "[dht] Failed to start DHT service: '%s'", err.Error()) - return - } - - // start JSON-RPC server on request - if len(rpcEndp) > 0 { - parts := strings.Split(rpcEndp, ":") - if parts[0] != "tcp" { - logger.Println(logger.ERROR, "[dht] RPC must have a TCP/IP endpoint") - return - } - config.Cfg.RPC.Endpoint = parts[1] - if err = rpc.Start(ctx); err != nil { - logger.Printf(logger.ERROR, "[dht] RPC failed to start: %s", err.Error()) - return - } - rpc.Register(dht) - } - - // handle OS signals - sigCh := make(chan os.Signal, 5) - signal.Notify(sigCh) - - // heart beat - tick := time.NewTicker(5 * time.Minute) - -loop: - for { - select { - // handle OS signals - case sig := <-sigCh: - switch sig { - case syscall.SIGKILL, syscall.SIGINT, syscall.SIGTERM: - logger.Printf(logger.INFO, "[dht] Terminating service (on signal '%s')\n", sig) - break loop - case syscall.SIGHUP: - logger.Println(logger.INFO, "[dht] SIGHUP") - case syscall.SIGURG: - // TODO: https://github.com/golang/go/issues/37942 - default: - logger.Println(logger.INFO, "[dht] Unhandled signal: "+sig.String()) - } - // handle heart beat - case now := <-tick.C: - logger.Println(logger.INFO, "[dht] Heart beat at "+now.String()) - } - } - - // terminating service - cancel() - srv.Stop() -} diff --git a/src/gnunet/cmd/peer_mockup/main.go b/src/gnunet/cmd/peer_mockup/main.go @@ -22,13 +22,19 @@ var ( // configuration for local node localCfg = &config.NodeConfig{ PrivateSeed: "YGoe6XFH3XdvFRl+agx9gIzPTvxA229WFdkazEMdcOs=", - Endpoints: []string{ - "udp:127.0.0.1:2086", + Endpoints: []*config.EndpointConfig{ + { + ID: "local", + Network: "udp", + Address: "127.0.0.1", + Port: 2086, + TTL: 86400, + }, }, } // configuration for remote node remoteCfg = "3GXXMNb5YpIUO7ejIR2Yy0Cf5texuLfDjHkXcqbPxkc=" - remoteAddr = "udp:172.17.0.5:2086" + remoteAddr = "udp://172.17.0.5:2086" // top-level variables used accross functions local *core.Peer // local peer (with private key) @@ -50,14 +56,11 @@ func main() { flag.Parse() // setup peer and core instances - if local, err = core.NewLocalPeer(localCfg); err != nil { - fmt.Println("local failed: " + err.Error()) - return - } - if c, err = core.NewCore(ctx, local); err != nil { + if c, err = core.NewCore(ctx, localCfg); err != nil { fmt.Println("core failed: " + err.Error()) return } + local = c.Peer() if remote, err = core.NewPeer(remoteCfg); err != nil { fmt.Println("remote failed: " + err.Error()) return diff --git a/src/gnunet/config/config.go b/src/gnunet/config/config.go @@ -20,6 +20,7 @@ package config import ( "encoding/json" + "fmt" "io/ioutil" "reflect" "regexp" @@ -32,10 +33,26 @@ import ( // Configuration for local node //---------------------------------------------------------------------- +// EndpointConfig holds parameters for local network listeners. +type EndpointConfig struct { + ID string `json:"id"` // endpoint identifier + Network string `json:"network"` // network protocol to use on endpoint + Address string `json:"address"` // address to listen on + Port int `json:"port"` // port for listening to network + TTL int `json:"ttl"` // time-to-live for address (in seconds) +} + +// Addr returns an address string for endpoint configuration; it does NOT +// handle special cases like UPNP and such. +func (c *EndpointConfig) Addr() string { + return fmt.Sprintf("%s://%s:%d", c.Network, c.Address, c.Port) +} + // NodeConfig holds parameters for the local node instance type NodeConfig struct { - PrivateSeed string `json:"privateSeed"` // Node private key seed (base64) - Endpoints []string `json:"endpoints"` // list of endpoints available + Name string `json:"name"` // (short) name for local node + PrivateSeed string `json:"privateSeed"` // Node private key seed (base64) + Endpoints []*EndpointConfig `json:"endpoints"` // list of endpoints available } //---------------------------------------------------------------------- @@ -139,9 +156,16 @@ func ParseConfig(fileName string) (err error) { if err != nil { return } + return ParseConfigBytes(file, true) +} + +// ParseConfigBytes reads a configuration from binary data. The data is +// a JSON-encoded content. If 'subst' is true, the configuration strings +// are subsituted +func ParseConfigBytes(data []byte, subst bool) (err error) { // unmarshal to Config data structure Cfg = new(Config) - if err = json.Unmarshal(file, Cfg); err == nil { + if err = json.Unmarshal(data, Cfg); err == nil { // process all string-based config settings and apply // string substitutions. applySubstitutions(Cfg, Cfg.Env) diff --git a/src/gnunet/config/config_test.go b/src/gnunet/config/config_test.go @@ -20,6 +20,7 @@ package config import ( "encoding/json" + "io/ioutil" "testing" "github.com/bfix/gospel/logger" @@ -27,14 +28,18 @@ import ( func TestConfigRead(t *testing.T) { logger.SetLogLevel(logger.WARN) - if err := ParseConfig("./gnunet-config.json"); err != nil { + + // read configuration file + data, err := ioutil.ReadFile("./gnunet-config.json") + if err != nil { + t.Fatal(err) + } + // parse configuration + if err := ParseConfigBytes(data, false); err != nil { t.Fatal(err) } - if testing.Verbose() { - data, err := json.Marshal(Cfg) - if err != nil { - t.Fatal(err) - } - t.Log("cfg=" + string(data)) + // write configuration + if _, err = json.Marshal(Cfg); err != nil { + t.Fatal(err) } } diff --git a/src/gnunet/config/gnunet-config.json b/src/gnunet/config/gnunet-config.json @@ -1,58 +1,65 @@ { - "local": { - "privateSeed": "YGoe6XFH3XdvFRl+agx9gIzPTvxA229WFdkazEMdcOs=", - "endpoints": [ - "r5n+ip+udp:127.0.0.1:6666" - ] - }, - "bootstrap": { - "nodes": [ - "gnunet://hello/7KTBJ90340HF1Q2GB0A57E2XJER4FDHX8HP5GHEB9125VPWPD27G/BNMDFN6HJCPWSPNBSEC06MC1K8QN1Z2DHRQSRXDTFR7FTBD4JHNBJ2RJAAEZ31FWG1Q3PMN3PXGZQ3Q7NTNEKQZFA7TE2Y46FM8E20R/1653499308?r5n%2Bip%2Budp%3A127.0.0.1%3A7654" - ] - }, - "environ": { - "TMP": "/tmp", - "RT_SYS": "${TMP}/gnunet-system-runtime" - }, - "dht": { - "service": { - "socket": "${RT_SYS}/gnunet-service-dht.sock", - "params": { - "perm": "0770" - } - }, - "storage": "dht_file_store+/var/lib/gnunet/dht/store", - "cache": "dht_file_cache+/var/lib/gnunet/dht/cache+1000" - }, - "gns": { - "service": { - "socket": "${RT_SYS}/gnunet-service-gns-go.sock", - "params": { - "perm": "0770" - } - }, - "dhtReplLevel": 10, - "maxDepth": 250 - }, - "namecache": { - "service": { - "socket": "${RT_SYS}/gnunet-service-namecache.sock", - "params": { - "perm": "0770" - } - }, - "storage": "dht_file_cache:/var/lib/gnunet/namecache:1000" - }, - "revocation": { - "service": { - "socket": "${RT_SYS}/gnunet-service-revocation-go.sock", - "params": { - "perm": "0770" - } - }, - "storage": "redis:localhost:6397::15" - }, - "rpc": { - "endpoint": "tcp:127.0.0.1:80" - } + "bootstrap": { + "nodes": [ + "gnunet://hello/7KTBJ90340HF1Q2GB0A57E2XJER4FDHX8HP5GHEB9125VPWPD27G/BNMDFN6HJCPWSPNBSEC06MC1K8QN1Z2DHRQSRXDTFR7FTBD4JHNBJ2RJAAEZ31FWG1Q3PMN3PXGZQ3Q7NTNEKQZFA7TE2Y46FM8E20R/1653499308?r5n%2Bip%2Budp%3A127.0.0.1%3A7654" + ] + }, + "local": { + "name": "ygng", + "privateSeed": "YGoe6XFH3XdvFRl+agx9gIzPTvxA229WFdkazEMdcOs=", + "endpoints": [ + { + "id": "test", + "network": "ip+udp", + "address": "upnp:192.168.178.1", + "port": 6666, + "ttl": 86400 + } + ] + }, + "environ": { + "TMP": "/tmp", + "RT_SYS": "${TMP}/gnunet-system-runtime" + }, + "dht": { + "service": { + "socket": "${RT_SYS}/gnunet-service-dht.sock", + "params": { + "perm": "0770" + } + }, + "storage": "dht_file_store+/var/lib/gnunet/dht/store", + "cache": "dht_file_cache+/var/lib/gnunet/dht/cache+1000" + }, + "gns": { + "service": { + "socket": "${RT_SYS}/gnunet-service-gns-go.sock", + "params": { + "perm": "0770" + } + }, + "dhtReplLevel": 10, + "maxDepth": 250 + }, + "namecache": { + "service": { + "socket": "${RT_SYS}/gnunet-service-namecache.sock", + "params": { + "perm": "0770" + } + }, + "storage": "dht_file_cache:/var/lib/gnunet/namecache:1000" + }, + "revocation": { + "service": { + "socket": "${RT_SYS}/gnunet-service-revocation-go.sock", + "params": { + "perm": "0770" + } + }, + "storage": "redis:localhost:6397::15" + }, + "rpc": { + "endpoint": "tcp:127.0.0.1:80" + } } \ No newline at end of file diff --git a/src/gnunet/core/core.go b/src/gnunet/core/core.go @@ -20,16 +20,34 @@ package core import ( "context" + "errors" + "gnunet/config" "gnunet/message" "gnunet/service/dht/blocks" "gnunet/transport" "gnunet/util" "net" + "strings" "time" +) - "github.com/bfix/gospel/data" +//---------------------------------------------------------------------- +// Core-related error codes +var ( + ErrCoreNoUpnpDyn = errors.New("no dynamic port with UPnP") + ErrCoreNoEndpAddr = errors.New("no endpoint for address") ) +//---------------------------------------------------------------------- +// EndpointRef is a reference to an endpoint instance managed by core. +type EndpointRef struct { + id string // endpoint identifier in configuration + ep transport.Endpoint // reference to endpoint + addr *util.Address // public endpoint address + upnpId string // UPNP identifier (empty if unused) +} + +//---------------------------------------------------------------------- // Core service type Core struct { // local peer instance @@ -41,31 +59,91 @@ type Core struct { // reference to transport implementation trans *transport.Transport - // registered listeners + // registered signal listeners listeners map[string]*Listener // list of known peers with addresses peers *util.PeerAddrList + + // List of registered endpoints + endpoints map[string]*EndpointRef } //---------------------------------------------------------------------- // NewCore creates and runs a new core instance. -func NewCore(ctx context.Context, local *Peer) (c *Core, err error) { +func NewCore(ctx context.Context, node *config.NodeConfig) (c *Core, err error) { + + // instantiate peer + var peer *Peer + if peer, err = NewLocalPeer(node); err != nil { + return + } // create new core instance incoming := make(chan *transport.TransportMessage) c = &Core{ - local: local, + local: peer, incoming: incoming, listeners: make(map[string]*Listener), - trans: transport.NewTransport(ctx, incoming), + trans: transport.NewTransport(ctx, node.Name, incoming), peers: util.NewPeerAddrList(), + endpoints: make(map[string]*EndpointRef), } // add all local peer endpoints to transport. - for _, addr := range local.addrList { - if _, err = c.trans.AddEndpoint(ctx, addr); err != nil { + for _, epCfg := range node.Endpoints { + var ( + upnpId string // upnp identifier + local *util.Address // local address + remote *util.Address // remote address + ep transport.Endpoint // endpoint reference + ) + // handle special addresses: + if strings.HasPrefix(epCfg.Address, "upnp:") { + // don't allow dynamic port assignment + if epCfg.Port == 0 { + err = ErrCoreNoUpnpDyn + return + } + // handle UPNP port forwarding + protocol := transport.EpProtocol(epCfg.Network) + var localA, remoteA string + if upnpId, remoteA, localA, err = c.trans.ForwardOpen(protocol, epCfg.Address[5:], epCfg.Port); err != nil { + return + } + // parse local and remote addresses + if local, err = util.ParseAddress(epCfg.Network + "://" + localA); err != nil { + return + } + if remote, err = util.ParseAddress(epCfg.Network + "://" + remoteA); err != nil { + return + } + } else { + // direct address specification: + if local, err = util.ParseAddress(epCfg.Addr()); err != nil { + return + } + remote = local + upnpId = "" + } + // add endpoint for address + if ep, err = c.trans.AddEndpoint(ctx, local); err != nil { return } + // if port is set to 0, replace it with port assigned dynamically. + // only applies to direct listening addresses! + if epCfg.Port == 0 && local == remote { + addr := ep.Address() + if remote, err = util.ParseAddress(addr.Network() + "://" + addr.String()); err != nil { + return + } + } + // save endpoint reference + c.endpoints[epCfg.ID] = &EndpointRef{ + id: epCfg.ID, + ep: ep, + addr: remote, + upnpId: upnpId, + } } // run message pump go func() { @@ -77,32 +155,31 @@ func NewCore(ctx context.Context, local *Peer) (c *Core, err error) { var ev *Event // inspect message for peer state events - m, err := tm.Message() - if err == nil { - switch msg := m.(type) { - case *message.HelloMsg: - // keep peer addresses - for _, addr := range msg.Addresses { - a := &util.Address{ - Netw: addr.Transport, - Address: addr.Address, - Expires: addr.ExpireOn, - } - c.Learn(ctx, msg.PeerID, a) + switch msg := tm.Msg.(type) { + case *message.HelloMsg: + // keep peer addresses + for _, addr := range msg.Addresses { + a := &util.Address{ + Netw: addr.Transport, + Address: addr.Address, + Expires: addr.ExpireOn, } - // generate EV_CONNECT event - ev = new(Event) - ev.ID = EV_CONNECT - ev.Peer = tm.Peer - ev.Msg = msg - c.dispatch(ev) + c.Learn(ctx, msg.PeerID, a) + } + // generate EV_CONNECT event + ev = &Event{ + ID: EV_CONNECT, + Peer: tm.Peer, + Msg: msg, } + c.dispatch(ev) } // generate EV_MESSAGE event - ev = new(Event) - ev.ID = EV_MESSAGE - ev.Peer = tm.Peer - ev.Msg, _ = tm.Message() + ev = &Event{ + ID: EV_MESSAGE, + Peer: tm.Peer, + Msg: tm.Msg, + } c.dispatch(ev) // wait for termination @@ -114,29 +191,63 @@ func NewCore(ctx context.Context, local *Peer) (c *Core, err error) { return } +// Shutdown all core-related processes. +func (c *Core) Shutdown() { + c.trans.Shutdown() + c.local.Shutdown() +} + //---------------------------------------------------------------------- // Send is a function that allows the local peer to send a protocol // message to a remote peer. func (c *Core) Send(ctx context.Context, peer *util.PeerID, msg message.Message) error { - // TODO: select best endpoint protocol for transport; now fixed to UDP - netw := "udp" - addr := c.peers.Get(peer.String(), netw) - payload, err := data.Marshal(msg) - if err != nil { - return err + // TODO: select best endpoint protocol for transport; now fixed to IP+UDP + netw := "ip+udp" + addrs := c.peers.Get(peer.String(), netw) + if len(addrs) == 0 { + return ErrCoreNoEndpAddr } - tm := transport.NewTransportMessage(c.PeerID(), payload) + // TODO: select best address; curently selects first + addr := addrs[0] + + // select best endpoint for transport + var ep transport.Endpoint + for _, epCfg := range c.endpoints { + if epCfg.addr.Network() == netw { + if ep == nil { + ep = epCfg.ep + } + // TODO: compare endpoints, select better one: + // if ep.Better(epCfg.ep) { + // ep = epCfg.ep + // } + } + } + // check we have an endpoint to send on + if ep == nil { + return ErrCoreNoEndpAddr + } + // assemble transport message + tm := transport.NewTransportMessage(c.PeerID(), msg) + // send on transport return c.trans.Send(ctx, addr, tm) } // Learn a (new) address for peer func (c *Core) Learn(ctx context.Context, peer *util.PeerID, addr *util.Address) (err error) { if c.peers.Add(peer.String(), addr) == 1 { + // we added a previously unknown peer: send a HELLO + + // collect endpoint addresses + addrList := make([]*util.Address, 0) + for _, epRef := range c.endpoints { + addrList = append(addrList, epRef.addr) + } // new peer id: send HELLO message to newly added peer node := c.local var hello *blocks.HelloBlock - hello, err = node.HelloData(time.Hour) + hello, err = node.HelloData(time.Hour, addrList) if err != nil { return } @@ -150,11 +261,28 @@ func (c *Core) Learn(ctx context.Context, peer *util.PeerID, addr *util.Address) return } +// Addresses returns the list of listening endpoint addresses +func (c *Core) Addresses() (list []*util.Address, err error) { + for _, epRef := range c.endpoints { + list = append(list, epRef.addr) + } + return +} + +//---------------------------------------------------------------------- + +// Peer returns the local peer +func (c *Core) Peer() *Peer { + return c.local +} + // PeerID returns the peer id of the local node. func (c *Core) PeerID() *util.PeerID { return c.local.GetID() } +//---------------------------------------------------------------------- + // TryConnect is a function which allows the local peer to attempt the // establishment of a connection to another peer using an address. // When the connection attempt is successful, information on the new diff --git a/src/gnunet/core/core_test.go b/src/gnunet/core/core_test.go @@ -20,23 +20,149 @@ package core import ( "context" + "encoding/hex" "gnunet/config" "gnunet/util" "testing" "time" ) -var ( - peer1Cfg = &config.NodeConfig{ - PrivateSeed: "iYK1wSi5XtCP774eNFk1LYXqKlOPEpwKBw+2/bMkE24=", - Endpoints: []string{"udp://127.0.0.1:20861"}, +//---------------------------------------------------------------------- +// Two node GNUnet (smallest and simplest network) +//---------------------------------------------------------------------- + +// TestCoreSimple test a two node network +func TestCoreSimple(t *testing.T) { + + var ( + peer1Cfg = &config.NodeConfig{ + Name: "p1", + PrivateSeed: "iYK1wSi5XtCP774eNFk1LYXqKlOPEpwKBw+2/bMkE24=", + Endpoints: []*config.EndpointConfig{ + { + ID: "p1", + Network: "ip+udp", + Address: "127.0.0.1", + Port: 0, + TTL: 86400, + }, + }, + } + + peer2Cfg = &config.NodeConfig{ + Name: "p2", + PrivateSeed: "Bv9umksEO51jjWWrOGEH+4r8wl9Vi+LItpdBpTOi2PE=", + Endpoints: []*config.EndpointConfig{ + { + ID: "p2", + Network: "ip+udp", + Address: "127.0.0.1", + Port: 20862, + TTL: 86400, + }, + }, + } + ) + + // setup execution context + ctx, cancel := context.WithCancel(context.Background()) + defer func() { + cancel() + time.Sleep(time.Second) + }() + + // create and run nodes + node1, err := NewTestNode(t, ctx, peer1Cfg) + if err != nil { + t.Fatal(err) + } + node2, err := NewTestNode(t, ctx, peer2Cfg) + if err != nil { + t.Fatal(err) } - peer2Cfg = &config.NodeConfig{ - PrivateSeed: "Bv9umksEO51jjWWrOGEH+4r8wl9Vi+LItpdBpTOi2PE=", - Endpoints: []string{"udp://127.0.0.1:20862"}, + // learn peer addresses (triggers HELLO) + list, err := node2.core.Addresses() + if err != nil { + t.Fatal(err) } -) + for _, addr := range list { + node1.Learn(ctx, node2.peer.GetID(), util.NewAddressWrap(addr)) + } + + // wait for 5 seconds + time.Sleep(5 * time.Second) +} + +//---------------------------------------------------------------------- +// Two node GNUnet both running locally, but exchanging messages over +// the internet (UPNP router required). +//---------------------------------------------------------------------- + +// TestCoreSimple test a two node network +func TestCoreUPNP(t *testing.T) { + + // configuration data + var ( + peer1Cfg = &config.NodeConfig{ + Name: "p1", + PrivateSeed: "iYK1wSi5XtCP774eNFk1LYXqKlOPEpwKBw+2/bMkE24=", + Endpoints: []*config.EndpointConfig{ + { + ID: "p1", + Network: "ip+udp", + Address: "upnp:", + Port: 2086, + TTL: 86400, + }, + }, + } + peer2Cfg = &config.NodeConfig{ + Name: "p2", + PrivateSeed: "Bv9umksEO51jjWWrOGEH+4r8wl9Vi+LItpdBpTOi2PE=", + Endpoints: []*config.EndpointConfig{ + { + ID: "p2", + Network: "ip+udp", + Address: "upnp:", + Port: 1080, + TTL: 86400, + }, + }, + } + ) + + // setup execution context + ctx, cancel := context.WithCancel(context.Background()) + defer func() { + cancel() + time.Sleep(time.Second) + }() + + // create and run nodes + node1, err := NewTestNode(t, ctx, peer1Cfg) + if err != nil { + t.Fatal(err) + } + defer node1.Shutdown() + node2, err := NewTestNode(t, ctx, peer2Cfg) + if err != nil { + t.Fatal(err) + } + defer node2.Shutdown() + + // learn peer addresses (triggers HELLO) + list, err := node2.core.Addresses() + if err != nil { + t.Fatal(err) + } + for _, addr := range list { + node1.Learn(ctx, node2.peer.GetID(), util.NewAddressWrap(addr)) + } + + // sleep a bit + time.Sleep(3 * time.Second) +} //---------------------------------------------------------------------- // create and run a node with given spec @@ -50,9 +176,15 @@ type TestNode struct { addr *util.Address } +func (n *TestNode) Shutdown() { + n.core.Shutdown() +} + func (n *TestNode) Learn(ctx context.Context, peer *util.PeerID, addr *util.Address) { n.t.Logf("[%d] Learning %s for %s", n.id, addr.StringAll(), peer.String()) - n.core.Learn(ctx, peer, addr) + if err := n.core.Learn(ctx, peer, addr); err != nil { + n.t.Log("Learn: " + err.Error()) + } } func NewTestNode(t *testing.T, ctx context.Context, cfg *config.NodeConfig) (node *TestNode, err error) { @@ -62,18 +194,25 @@ func NewTestNode(t *testing.T, ctx context.Context, cfg *config.NodeConfig) (nod node.t = t node.id = util.NextID() + // create core service + if node.core, err = NewCore(ctx, cfg); err != nil { + return + } + node.peer = node.core.Peer() + // create peer object if node.peer, err = NewLocalPeer(cfg); err != nil { return } t.Logf("[%d] Node %s starting", node.id, node.peer.GetID()) + t.Logf("[%d] --> %s", node.id, hex.EncodeToString(node.peer.GetID().Key)) - // create core service - if node.core, err = NewCore(ctx, node.peer); err != nil { - return + list, err := node.core.Addresses() + if err != nil { + t.Fatal(err) } - for _, addr := range node.core.trans.Endpoints() { - s := addr.Network() + ":" + addr.String() + for _, addr := range list { + s := addr.Network() + "://" + addr.String() if node.addr, err = util.ParseAddress(s); err != nil { continue } @@ -82,7 +221,7 @@ func NewTestNode(t *testing.T, ctx context.Context, cfg *config.NodeConfig) (nod // register as event listener incoming := make(chan *Event) - node.core.Register("test", NewListener(incoming, nil)) + node.core.Register(cfg.Name, NewListener(incoming, nil)) // heart beat tick := time.NewTicker(5 * time.Minute) @@ -100,6 +239,7 @@ func NewTestNode(t *testing.T, ctx context.Context, cfg *config.NodeConfig) (nod t.Logf("[%d] <<< Peer %s diconnected", node.id, ev.Peer) case EV_MESSAGE: t.Logf("[%d] <<< Msg from %s of type %d", node.id, ev.Peer, ev.Msg.Header().MsgType) + t.Logf("[%d] <<< --> %s", node.id, ev.Msg.String()) } // handle termination signal @@ -115,36 +255,3 @@ func NewTestNode(t *testing.T, ctx context.Context, cfg *config.NodeConfig) (nod }() return } - -//---------------------------------------------------------------------- -// Two node GNUnet (smallest and simplest network) -//---------------------------------------------------------------------- - -// TestCoreSimple test a two node network -func TestCoreSimple(t *testing.T) { - - // setup execution context - ctx, cancel := context.WithCancel(context.Background()) - defer func() { - cancel() - time.Sleep(time.Second) - }() - - // create and run nodes - node1, err := NewTestNode(t, ctx, peer1Cfg) - if err != nil { - t.Fatal(err) - } - node2, err := NewTestNode(t, ctx, peer2Cfg) - if err != nil { - t.Fatal(err) - } - - // learn peer addresses (triggers HELLO) - for _, addr := range node2.core.trans.Endpoints() { - node1.Learn(ctx, node2.peer.GetID(), util.NewAddressWrap(addr)) - } - - // wait for 5 seconds - time.Sleep(5 * time.Second) -} diff --git a/src/gnunet/core/peer.go b/src/gnunet/core/peer.go @@ -48,7 +48,6 @@ type Peer struct { prv *ed25519.PrivateKey // node private key (long-term signing key) pub *ed25519.PublicKey // node public key (=identifier) idString string // node identifier as string - addrList []*util.Address // list of addresses associated with node ephPrv *ed25519.PrivateKey // ephemeral signing key ephMsg *message.EphemeralKeyMsg // ephemeral signing key message } @@ -73,16 +72,6 @@ func NewLocalPeer(cfg *config.NodeConfig) (p *Peer, err error) { if err != nil { return } - // set the endpoint addresses for local node - p.addrList = make([]*util.Address, len(cfg.Endpoints)) - var addr *util.Address - for i, a := range cfg.Endpoints { - if addr, err = util.ParseAddress(a); err != nil { - return - } - addr.Expires = util.NewAbsoluteTime(time.Now().Add(12 * time.Hour)) - p.addrList[i] = addr - } return } @@ -98,36 +87,24 @@ func NewPeer(peerID string) (p *Peer, err error) { p.prv = nil p.pub = ed25519.NewPublicKeyFromBytes(data) p.idString = util.EncodeBinaryToString(p.pub.Bytes()) - p.addrList = make([]*util.Address, 0) return } +// Shutdown peer-related processes. +func (p *Peer) Shutdown() {} + //---------------------------------------------------------------------- //---------------------------------------------------------------------- -// Address returns a peer address for the given transport protocol -func (p *Peer) Address(transport string) *util.Address { - for _, addr := range p.addrList { - // skip expired entries - if addr.Expires.Expired() { - continue - } - // filter by transport protocol - if len(transport) > 0 && transport != addr.Netw { - continue - } - return addr - } - return nil -} - -// HelloData returns the current HELLO data for the peer -func (p *Peer) HelloData(ttl time.Duration) (h *blocks.HelloBlock, err error) { +// HelloData returns the current HELLO data for the peer. The list of listening +// endpoint addresses re passed in from core to reflect the actual active +// endpoints. +func (p *Peer) HelloData(ttl time.Duration, a []*util.Address) (h *blocks.HelloBlock, err error) { // assemble HELLO data h = new(blocks.HelloBlock) h.PeerID = p.GetID() h.Expire = util.NewAbsoluteTime(time.Now().Add(ttl)) - h.SetAddresses(p.addrList) + h.SetAddresses(a) // sign data err = h.Sign(p.prv) @@ -171,16 +148,6 @@ func (p *Peer) GetIDString() string { return p.idString } -// GetAddressList returns a list of addresses associated with this peer. -func (p *Peer) GetAddressList() []*util.Address { - return p.addrList -} - -// AddAddress adds a new address for a node. -func (p *Peer) AddAddress(a *util.Address) { - p.addrList = append(p.addrList, a) -} - // Sign a message with the (long-term) private key. func (p *Peer) Sign(msg []byte) (*ed25519.EdSignature, error) { if p.prv == nil { diff --git a/src/gnunet/core/peer_test.go b/src/gnunet/core/peer_test.go @@ -21,6 +21,7 @@ package core import ( "gnunet/config" "gnunet/service/dht/blocks" + "gnunet/util" "testing" "time" ) @@ -29,8 +30,13 @@ import ( var ( cfg = &config.NodeConfig{ PrivateSeed: "YGoe6XFH3XdvFRl+agx9gIzPTvxA229WFdkazEMdcOs=", - Endpoints: []string{ - "r5n+ip+udp://127.0.0.1:6666", + Endpoints: []*config.EndpointConfig{ + { + ID: "test", + Network: "r5n+ip+udp", + Address: "127.0.0.1", + Port: 6666, + }, }, } TTL = 6 * time.Hour @@ -44,8 +50,17 @@ func TestPeerHello(t *testing.T) { t.Fatal(err) } - // get HELLO data for the node - h, err := node.HelloData(TTL) + // get HELLO data for the node: + // This hack will only work for direct listening addresses + addrList := make([]*util.Address, 0) + for _, epRef := range cfg.Endpoints { + addr, err := util.ParseAddress(epRef.Addr()) + if err != nil { + t.Fatal(err) + } + addrList = append(addrList, addr) + } + h, err := node.HelloData(TTL, addrList) // convert to URL and back u := h.URL() diff --git a/src/gnunet/go.mod b/src/gnunet/go.mod @@ -15,9 +15,11 @@ require ( require ( github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/huin/goupnp v1.0.0 // indirect golang.org/x/mod v0.4.2 // indirect golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 // indirect golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect + golang.org/x/text v0.3.7 // indirect golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect ) diff --git a/src/gnunet/go.sum b/src/gnunet/go.sum @@ -11,6 +11,7 @@ github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfC github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/huin/goupnp v1.0.0 h1:wg75sLpL6DZqwHQN6E1Cfk6mtfzS45z8OV+ic+DtHRo= github.com/huin/goupnp v1.0.0/go.mod h1:n9v9KO1tAxYH82qOn+UTIFQDmx5n1Zxd/ClZDMX7Bnc= github.com/huin/goutil v0.0.0-20170803182201-1ca381bf3150/go.mod h1:PpLOETDnJ0o3iZrZfqZzyLl6l7F3c6L1oWn7OICBi6o= github.com/mattn/go-sqlite3 v1.14.13 h1:1tj15ngiFfcZzii7yd82foL+ks+ouQcj8j/TPq3fk1I= @@ -55,6 +56,7 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2 h1:BonxutuHCTL0rBDnZlKjpGIQFTjyUVTexFOdWkB6Fg0= diff --git a/src/gnunet/message/msg_hello.go b/src/gnunet/message/msg_hello.go @@ -59,7 +59,7 @@ func NewHelloAddress(a *util.Address) *HelloAddress { // String returns a human-readable representation of the message. func (a *HelloAddress) String() string { return fmt.Sprintf("Address{%s,expire=%s}", - util.AddressString(a.Transport, a.Address), a.ExpireOn) + util.URI(a.Transport, a.Address), a.ExpireOn) } // HelloMsg is a message send by peers to announce their presence diff --git a/src/gnunet/modules.go b/src/gnunet/modules.go @@ -1,84 +0,0 @@ -// This file is part of gnunet-go, a GNUnet-implementation in Golang. -// Copyright (C) 2019, 2020 Bernd Fix >Y< -// -// gnunet-go is free software: you can redistribute it and/or modify it -// under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, -// or (at your option) any later version. -// -// gnunet-go is distributed in the hope that it will be useful, but -// WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -// Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see <http://www.gnu.org/licenses/>. -// -// SPDX-License-Identifier: AGPL3.0-or-later - -//====================================================================== -// Standalone (all-in-one) implementation of GNUnet: -// ------------------------------------------------- -// Instead of running GNUnet services like GNS or DHT in separate -// processes communicating (exchanging messages) with each other over -// Unix Domain Sockets, the standalone implementation combines all -// service modules into a single binary running go-routines to -// concurrently performing their tasks. -//====================================================================== - -package gnunet - -import ( - "gnunet/service/dht" - "gnunet/service/gns" - "gnunet/service/namecache" - "gnunet/service/revocation" - "net/rpc" -) - -// Instances holds a list of all GNUnet service modules -type Instances struct { - GNS *gns.Module - Namecache *namecache.NamecacheModule - DHT *dht.Module - Revocation *revocation.Module -} - -// Register modules for JSON-RPC -func (inst Instances) Register() { - rpc.Register(inst.GNS) - rpc.Register(inst.Namecache) - rpc.Register(inst.DHT) - rpc.Register(inst.Revocation) -} - -// Local reference to instance list. The list is initialized -// by core. -var ( - Modules Instances -) - -/* TODO: implement -// Initialize instance list and link module functions as required. -// This function is called by core on start-up. -func Init(ctx context.Context) { - - // Namecache (no calls to other modules) - Modules.Namecache = namecache.NewModule(ctx, c) - - // DHT (no calls to other modules) - Modules.DHT = dht.NewModule(ctx, c) - - // Revocation (no calls to other modules) - Modules.Revocation = revocation.NewModule(ctx, c) - - // GNS (calls Namecache, DHT and Identity) - gns := gns.NewModule(ctx, c) - Modules.GNS = gns - gns.LookupLocal = Modules.Namecache.Get - gns.StoreLocal = Modules.Namecache.Put - gns.LookupRemote = Modules.DHT.Get - gns.RevocationQuery = Modules.Revocation.Query - gns.RevocationRevoke = Modules.Revocation.Revoke -} -*/ diff --git a/src/gnunet/service/dht/blocks/hello.go b/src/gnunet/service/dht/blocks/hello.go @@ -177,7 +177,7 @@ func (h *HelloBlock) URL() string { if i > 0 { u += "&" } - u += url.QueryEscape(a.String()) + u += url.QueryEscape(a.URI()) } return u } diff --git a/src/gnunet/service/dht/module.go b/src/gnunet/service/dht/module.go @@ -81,7 +81,7 @@ func NewModule(ctx context.Context, c *core.Core) (m *Module) { //---------------------------------------------------------------------- -// Get a block from the DHT +// Get a block from the DHT ["dht:get"] func (nc *Module) Get(ctx context.Context, query blocks.Query) (block blocks.Block, err error) { // check if we have the requested block in cache or permanent storage. @@ -100,7 +100,7 @@ func (nc *Module) Get(ctx context.Context, query blocks.Query) (block blocks.Blo return nil, nil } -// Put a block into the DHT +// Put a block into the DHT ["dht:put"] func (nc *Module) Put(ctx context.Context, key blocks.Query, block blocks.Block) error { return nil } @@ -142,6 +142,20 @@ func (m *Module) heartbeat(ctx context.Context) { //---------------------------------------------------------------------- +// Export functions +func (m *Module) Export(fcn map[string]any) { + // add exported functions from module + fcn["dht:get"] = m.Get + fcn["dht:put"] = m.Put +} + +// Import functions +func (m *Module) Import(fcm map[string]any) { + // nothing to import now. +} + +//---------------------------------------------------------------------- + // RPC returns the route and handler function for a JSON-RPC request func (m *Module) RPC() (string, func(http.ResponseWriter, *http.Request)) { return "/gns/", func(wrt http.ResponseWriter, req *http.Request) { diff --git a/src/gnunet/service/dht/routingtable.go b/src/gnunet/service/dht/routingtable.go @@ -101,7 +101,7 @@ type RoutingTable struct { ref *PeerAddress // reference address for distance buckets []*Bucket // list of buckets list map[*PeerAddress]struct{} // keep list of peers - rwlock sync.RWMutex // lock for write operations + mtx sync.RWMutex // lock for write operations l2nse float64 // log2 of estimated network size inProcess bool // flag if Process() is running } @@ -131,8 +131,8 @@ func NewRoutingTable(ref *PeerAddress) *RoutingTable { // Returns true if the entry was added, false otherwise. func (rt *RoutingTable) Add(p *PeerAddress) bool { // ensure one write and no readers - rt.rwlock.Lock() - defer rt.rwlock.Unlock() + rt.lock(false) + defer rt.unlock(false) // check if peer is already known if _, ok := rt.list[p]; ok { @@ -153,8 +153,8 @@ func (rt *RoutingTable) Add(p *PeerAddress) bool { // Returns true if the entry was removed, false otherwise. func (rt *RoutingTable) Remove(p *PeerAddress) bool { // ensure one write and no readers - rt.rwlock.Lock() - defer rt.rwlock.Unlock() + rt.lock(false) + defer rt.unlock(false) // compute distance (bucket index) and remove entry from bucket _, idx := p.Distance(rt.ref) @@ -170,10 +170,15 @@ func (rt *RoutingTable) Remove(p *PeerAddress) bool { //---------------------------------------------------------------------- // Process a function f in the locked context of a routing table -func (rt *RoutingTable) Process(f func() error) error { - // ensure one write and no readers - rt.rwlock.Lock() - defer rt.rwlock.Unlock() +func (rt *RoutingTable) Process(f func() error, readonly bool) error { + // handle locking + rt.lock(readonly) + rt.inProcess = true + defer func() { + rt.inProcess = false + rt.unlock(readonly) + }() + // call function in unlocked context return f() } @@ -184,8 +189,8 @@ func (rt *RoutingTable) Process(f func() error) error { // SelectClosestPeer for a given peer address and bloomfilter. func (rt *RoutingTable) SelectClosestPeer(p *PeerAddress, bf *PeerBloomFilter) (n *PeerAddress) { // no writer allowed - rt.rwlock.RLock() - defer rt.rwlock.RUnlock() + rt.mtx.RLock() + defer rt.mtx.RUnlock() // find closest address var dist *math.Int @@ -204,8 +209,8 @@ func (rt *RoutingTable) SelectClosestPeer(p *PeerAddress, bf *PeerBloomFilter) ( // included in the bloomfilter) func (rt *RoutingTable) SelectRandomPeer(bf *PeerBloomFilter) *PeerAddress { // no writer allowed - rt.rwlock.RLock() - defer rt.rwlock.RUnlock() + rt.mtx.RLock() + defer rt.mtx.RUnlock() // select random entry from list if size := len(rt.list); size > 0 { @@ -279,11 +284,35 @@ func (rt *RoutingTable) heartbeat(ctx context.Context) { } } return nil - }); err != nil { + }, false); err != nil { logger.Println(logger.ERROR, "[dht] RT heartbeat: "+err.Error()) } } +//---------------------------------------------------------------------- + +// lock with given mode (if not in processing function) +func (rt *RoutingTable) lock(readonly bool) { + if !rt.inProcess { + if readonly { + rt.mtx.RLock() + } else { + rt.mtx.Lock() + } + } +} + +// lock with given mode (if not in processing function) +func (rt *RoutingTable) unlock(readonly bool) { + if !rt.inProcess { + if readonly { + rt.mtx.RUnlock() + } else { + rt.mtx.Unlock() + } + } +} + //====================================================================== // Routing table buckets //====================================================================== diff --git a/src/gnunet/service/dht/routingtable_test.go b/src/gnunet/service/dht/routingtable_test.go @@ -45,8 +45,12 @@ type Entry struct { var ( cfg = &config.NodeConfig{ PrivateSeed: "YGoe6XFH3XdvFRl+agx9gIzPTvxA229WFdkazEMdcOs=", - Endpoints: []string{ - "r5n+ip+udp://127.0.0.1:6666", + Endpoints: []*config.EndpointConfig{ + { + Network: "r5n+ip+udp", + Address: "127.0.0.1", + Port: 6666, + }, }, } ) diff --git a/src/gnunet/service/gns/module.go b/src/gnunet/service/gns/module.go @@ -98,6 +98,7 @@ type Module struct { RevocationRevoke func(ctx context.Context, rd *revocation.RevData) (success bool, err error) } +// NewModule instantiates a new GNS module. func NewModule(ctx context.Context, c *core.Core) (m *Module) { m = &Module{ ModuleImpl: *service.NewModuleImpl(), @@ -128,6 +129,23 @@ func (m *Module) event(ctx context.Context, ev *core.Event) { //---------------------------------------------------------------------- +// Export functions +func (m *Module) Export(fcn map[string]any) { + // add exported functions from module +} + +// Import functions +func (m *Module) Import(fcn map[string]any) { + // resolve imports from other modules + m.LookupLocal = fcn["namecache:get"].(func(ctx context.Context, query *blocks.GNSQuery) (*blocks.GNSBlock, error)) + m.StoreLocal = fcn["namecache:put"].(func(ctx context.Context, query *blocks.GNSQuery, block *blocks.GNSBlock) error) + m.LookupRemote = fcn["dht:get"].(func(ctx context.Context, query blocks.Query) (blocks.Block, error)) + m.RevocationQuery = fcn["rev:query"].(func(ctx context.Context, zkey *crypto.ZoneKey) (valid bool, err error)) + m.RevocationRevoke = fcn["rev:revoke"].(func(ctx context.Context, rd *revocation.RevData) (success bool, err error)) +} + +//---------------------------------------------------------------------- + // Resolve a GNS name with multiple labels. If pkey is not nil, the name // is interpreted as "relative to current zone". func (m *Module) Resolve( diff --git a/src/gnunet/service/module.go b/src/gnunet/service/module.go @@ -26,7 +26,43 @@ import ( ) // Module is an interface for GNUnet service modules (workers). +// +// Modules can call other GNUnet services; these services can be used by +// sending messages to the respective service socket (the default way) or by +// calling the module functions directly (if the other module is compiled +// along with the calling module into one binary). The latter method requires +// calls to m.Export() and m.Import() to link the modules together (see +// example): +// +// // create module instances +// gnsMod = gns.NewModule(ctx, core) +// dhtMod = dht.NewModule(ctx, core) +// ncMod = namecache.NewModule(ctx, core) +// revMod = revocation.NewModule(ctx, core) +// +// // export module functions +// fcn := make(map[string]any) +// gnsMod.Export(fcn) +// dhtMod.Export(fcn) +// ncMod.Export(fcn) +// revMod.Export(fcn) +// +// // import (link) module functions +// gnsMod.Import(fcn) +// dhtMod.Import(fcn) +// ncMod.Import(fcn) +// revMod.Import(fcn) +// +// Exported and imported module function are identified by name defined in the +// Export() function. Import() functions that access functions in other modules +// need to use the same name for linking. type Module interface { + // Export functions by name + Export(map[string]any) + + // Import functions by name + Import(map[string]any) + // RPC returns the route and handler for JSON-RPC requests RPC() (string, func(http.ResponseWriter, *http.Request)) diff --git a/src/gnunet/service/namecache/module.go b/src/gnunet/service/namecache/module.go @@ -35,23 +35,39 @@ import ( //---------------------------------------------------------------------- // Namecache handles the transient storage of GNS blocks under the query key. -type NamecacheModule struct { +type Module struct { service.ModuleImpl cache service.DHTStore // transient block cache } // NewModule creates a new module instance. -func NewModule(ctx context.Context, c *core.Core) (m *NamecacheModule) { - m = &NamecacheModule{ +func NewModule(ctx context.Context, c *core.Core) (m *Module) { + m = &Module{ ModuleImpl: *service.NewModuleImpl(), } m.cache, _ = service.NewDHTStore(config.Cfg.Namecache.Storage) return } +//---------------------------------------------------------------------- + +// Export functions +func (m *Module) Export(fcn map[string]any) { + // add exported functions from module + fcn["namecache:get"] = m.Get + fcn["namecache:put"] = m.Put +} + +// Import functions +func (m *Module) Import(fcm map[string]any) { + // nothing to import now. +} + +//---------------------------------------------------------------------- + // Get an entry from the cache if available. -func (m *NamecacheModule) Get(ctx context.Context, query *blocks.GNSQuery) (block *blocks.GNSBlock, err error) { +func (m *Module) Get(ctx context.Context, query *blocks.GNSQuery) (block *blocks.GNSBlock, err error) { var b blocks.Block b, err = m.cache.Get(query) err = blocks.Unwrap(b, block) @@ -59,6 +75,6 @@ func (m *NamecacheModule) Get(ctx context.Context, query *blocks.GNSQuery) (bloc } // Put entry into the cache. -func (m *NamecacheModule) Put(ctx context.Context, query *blocks.GNSQuery, block *blocks.GNSBlock) error { +func (m *Module) Put(ctx context.Context, query *blocks.GNSQuery, block *blocks.GNSBlock) error { return m.cache.Put(query, block) } diff --git a/src/gnunet/service/revocation/module.go b/src/gnunet/service/revocation/module.go @@ -98,8 +98,22 @@ func (m *Module) event(ctx context.Context, ev *core.Event) { //---------------------------------------------------------------------- +// Export functions +func (m *Module) Export(fcn map[string]any) { + // add exported functions from module + fcn["rev:query"] = m.Query + fcn["rev:revoke"] = m.Revoke +} + +// Import functions +func (m *Module) Import(fcm map[string]any) { + // nothing to import now. +} + +//---------------------------------------------------------------------- + // Query return true if the pkey is valid (not revoked) and false -// if the pkey has been revoked. +// if the pkey has been revoked ["rev:query"] func (m *Module) Query(ctx context.Context, zkey *crypto.ZoneKey) (valid bool, err error) { // fast check first: is the key in the bloomfilter? data := zkey.Bytes() @@ -118,7 +132,7 @@ func (m *Module) Query(ctx context.Context, zkey *crypto.ZoneKey) (valid bool, e return false, nil } -// Revoke a key with given revocation data +// Revoke a key with given revocation data ["rev:revoke"] func (m *Module) Revoke(ctx context.Context, rd *RevData) (success bool, err error) { // verify the revocation data diff, rc := rd.Verify(true) diff --git a/src/gnunet/transport/endpoint.go b/src/gnunet/transport/endpoint.go @@ -30,6 +30,10 @@ import ( var ( ErrEndpNotAvailable = errors.New("no endpoint for address available") ErrEndpProtocolMismatch = errors.New("transport protocol mismatch") + ErrEndpProtocolUnknown = errors.New("unknown transport protocol") + ErrEndpExists = errors.New("endpoint exists") + ErrEndpNoAddress = errors.New("no address for endpoint") + ErrEndpNoConnection = errors.New("no connection on endpoint") ) // Endpoint represents a local endpoint that can send and receive messages. @@ -83,9 +87,12 @@ type PaketEndpoint struct { func (ep *PaketEndpoint) Run(ctx context.Context, hdlr chan *TransportMessage) (err error) { // create listener var lc net.ListenConfig - if ep.conn, err = lc.ListenPacket(ctx, ep.addr.Network(), ep.addr.String()); err != nil { + xproto := ep.addr.Network() + if ep.conn, err = lc.ListenPacket(ctx, EpProtocol(xproto), ep.addr.String()); err != nil { return } + // use the actual listening address + ep.addr = util.NewAddress(xproto, ep.conn.LocalAddr().String()) // run watch dog for termination go func() { @@ -95,27 +102,15 @@ func (ep *PaketEndpoint) Run(ctx context.Context, hdlr chan *TransportMessage) ( // run go routine to handle messages from clients go func() { for { - // read next message from packet - n, _, err := ep.conn.ReadFrom(ep.buf) + // read next message + tm, err := ep.read() if err != nil { break } - rdr := bytes.NewBuffer(util.Clone(ep.buf[:n])) - msg, err := ReadMessageDirect(rdr, ep.buf) - if err != nil { - break - } - // check for transport message - if msg.Header().MsgType == message.DUMMY { - // set transient attributes - tm := msg.(*TransportMessage) - tm.endp = ep.id - tm.conn = 0 - // send to handler - go func() { - hdlr <- tm - }() - } + // send transport message to handler + go func() { + hdlr <- tm + }() } // connection ended. ep.conn.Close() @@ -123,29 +118,73 @@ func (ep *PaketEndpoint) Run(ctx context.Context, hdlr chan *TransportMessage) ( return } +// Read a transport message from endpoint based on extended protocol +func (ep *PaketEndpoint) read() (tm *TransportMessage, err error) { + // read next packet (assuming that it contains one complete message) + var n int + if n, _, err = ep.conn.ReadFrom(ep.buf); err != nil { + return + } + // parse transport message based on extended protocol + var ( + peer *util.PeerID + msg message.Message + ) + switch ep.addr.Network() { + case "ip+udp": + // parse peer id and message in sequence + peer = util.NewPeerID(ep.buf[:32]) + rdr := bytes.NewBuffer(util.Clone(ep.buf[32:n])) + if msg, err = ReadMessageDirect(rdr, ep.buf); err != nil { + return + } + default: + panic(ErrEndpProtocolUnknown) + } + // return transport message + return &TransportMessage{ + Peer: peer, + Msg: msg, + }, nil +} + // Send message to address from endpoint func (ep *PaketEndpoint) Send(ctx context.Context, addr net.Addr, msg *TransportMessage) (err error) { + // check for valid connection + if ep.conn == nil { + return ErrEndpNoConnection + } + // resolve target address var a *net.UDPAddr - a, err = net.ResolveUDPAddr(addr.Network(), addr.String()) + a, err = net.ResolveUDPAddr(EpProtocol(addr.Network()), addr.String()) + + // get message content (TransportMessage) var buf []byte if buf, err = msg.Bytes(); err != nil { return } + // handle extended protocol: + switch ep.addr.Network() { + case "ip+udp": + // no modifications required + + default: + // unknown protocol + return ErrEndpProtocolUnknown + } _, err = ep.conn.WriteTo(buf, a) return } // Address returms the func (ep *PaketEndpoint) Address() net.Addr { - if ep.conn != nil { - return ep.conn.LocalAddr() - } return ep.addr } // CanSendTo returns true if the endpoint can sent to address -func (ep *PaketEndpoint) CanSendTo(addr net.Addr) bool { - return epMode(addr.Network()) == "packet" +func (ep *PaketEndpoint) CanSendTo(addr net.Addr) (ok bool) { + ok = EpProtocol(addr.Network()) == EpProtocol(ep.addr.Network()) + return } // ID returns the endpoint identifier @@ -153,6 +192,7 @@ func (ep *PaketEndpoint) ID() int { return ep.id } +// create a new packet endpoint for protcol and address func newPacketEndpoint(addr net.Addr) (ep *PaketEndpoint, err error) { // check for matching protocol if epMode(addr.Network()) != "packet" { @@ -185,9 +225,13 @@ type StreamEndpoint struct { func (ep *StreamEndpoint) Run(ctx context.Context, hdlr chan *TransportMessage) (err error) { // create listener var lc net.ListenConfig - if ep.listener, err = lc.Listen(ctx, ep.addr.Network(), ep.addr.String()); err != nil { + xproto := ep.addr.Network() + if ep.listener, err = lc.Listen(ctx, EpProtocol(xproto), ep.addr.String()); err != nil { return } + // get actual listening address + ep.addr = util.NewAddress(xproto, ep.listener.Addr().String()) + // run watch dog for termination go func() { <-ctx.Done() @@ -206,21 +250,14 @@ func (ep *StreamEndpoint) Run(ctx context.Context, hdlr chan *TransportMessage) go func() { for { // read next message from connection - msg, err := ReadMessage(ctx, conn, ep.buf) + tm, err := ep.read(ctx, conn) if err != nil { break } - // check for transport message - if msg.Header().MsgType == message.DUMMY { - // set transient attributes - tm := msg.(*TransportMessage) - tm.endp = ep.id - tm.conn = session - // send to handler - go func() { - hdlr <- tm - }() - } + // send transport message to handler + go func() { + hdlr <- tm + }() } // connection ended. conn.Close() @@ -231,6 +268,34 @@ func (ep *StreamEndpoint) Run(ctx context.Context, hdlr chan *TransportMessage) return } +// Read a transport message from endpoint based on extended protocol +func (ep *StreamEndpoint) read(ctx context.Context, conn net.Conn) (tm *TransportMessage, err error) { + // parse transport message based on extended protocol + var ( + peer *util.PeerID + msg message.Message + ) + switch ep.addr.Network() { + case "ip+udp": + // parse peer id + peer = util.NewPeerID(nil) + if _, err = conn.Read(peer.Key); err != nil { + return + } + // read next message from connection + if msg, err = ReadMessage(ctx, conn, ep.buf); err != nil { + break + } + default: + panic(ErrEndpProtocolUnknown) + } + // return transport message + return &TransportMessage{ + Peer: peer, + Msg: msg, + }, nil +} + // Send message to address from endpoint func (ep *StreamEndpoint) Send(ctx context.Context, addr net.Addr, msg *TransportMessage) error { return nil @@ -238,9 +303,6 @@ func (ep *StreamEndpoint) Send(ctx context.Context, addr net.Addr, msg *Transpor // Address returns the actual listening endpoint address func (ep *StreamEndpoint) Address() net.Addr { - if ep.listener != nil { - return ep.listener.Addr() - } return ep.addr } @@ -254,6 +316,7 @@ func (ep *StreamEndpoint) ID() int { return ep.id } +// create a new endpoint based on extended protocol and address func newStreamEndpoint(addr net.Addr) (ep *StreamEndpoint, err error) { // check for matching protocol if epMode(addr.Network()) != "stream" { @@ -270,10 +333,29 @@ func newStreamEndpoint(addr net.Addr) (ep *StreamEndpoint, err error) { return } +//---------------------------------------------------------------------- +// derive endpoint mode (packet/stream) and transport protocol from +// net.Adddr.Network() strings +//---------------------------------------------------------------------- + +// EpProtocol returns the transport protocol for a given network string +// that can include extended protocol information like "r5n+ip+udp" +func EpProtocol(netw string) string { + switch netw { + case "udp", "udp4", "udp6", "ip+udp": + return "udp" + case "tcp", "tcp4", "tcp6": + return "tcp" + case "unix": + return "unix" + } + return "" +} + // epMode returns the endpoint mode (packet or stream) for a given network func epMode(netw string) string { - switch netw { - case "udp", "udp4", "udp6", "r5n+ip+udp": + switch EpProtocol(netw) { + case "udp": return "packet" case "tcp", "unix": return "stream" diff --git a/src/gnunet/transport/reader_writer.go b/src/gnunet/transport/reader_writer.go @@ -113,10 +113,7 @@ func ReadMessage(ctx context.Context, rdr io.ReadCloser, buf []byte) (msg messag if err = get(4, int(mh.MsgSize)-4); err != nil { return nil, err } - // handle transport message case - if mh.MsgType == message.DUMMY { - msg = NewTransportMessage(nil, nil) - } else if msg, err = message.NewEmptyMessage(mh.MsgType); err != nil { + if msg, err = message.NewEmptyMessage(mh.MsgType); err != nil { return nil, err } if msg == nil { diff --git a/src/gnunet/transport/transport.go b/src/gnunet/transport/transport.go @@ -25,6 +25,8 @@ import ( "gnunet/message" "gnunet/util" "net" + + "github.com/bfix/gospel/network" ) // Trnsport layer error codes @@ -41,32 +43,19 @@ var ( // Msg is the exchanged GNUnet message. The packet itself satisfies the // message.Message interface. type TransportMessage struct { - Hdr *message.Header `` // message header - Peer *util.PeerID `` // remote peer - Payload []byte `size:"*"` // GNUnet message - - // package-local attributes (transient) - msg message.Message - endp int // id of endpoint (incoming message) - conn int // id of connection (optional, incoming message) -} - -func (msg *TransportMessage) Header() *message.Header { - return msg.Hdr -} - -func (msg *TransportMessage) Message() (m message.Message, err error) { - if m = msg.msg; m == nil { - rdr := bytes.NewBuffer(msg.Payload) - m, err = ReadMessageDirect(rdr, nil) - } - return + Peer *util.PeerID // remote peer + Msg message.Message // GNUnet message } // Bytes returns the binary representation of a transport message func (msg *TransportMessage) Bytes() ([]byte, error) { buf := new(bytes.Buffer) - err := WriteMessageDirect(buf, msg) + // serialize peer id + if _, err := buf.Write(msg.Peer.Key); err != nil { + return nil, err + } + // serialize message + err := WriteMessageDirect(buf, msg.Msg) return buf.Bytes(), err } @@ -76,21 +65,13 @@ func (msg *TransportMessage) String() string { } // NewTransportMessage creates a message suitable for transfer -func NewTransportMessage(peer *util.PeerID, payload []byte) (tm *TransportMessage) { +func NewTransportMessage(peer *util.PeerID, msg message.Message) (tm *TransportMessage) { if peer == nil { peer = util.NewPeerID(nil) } - msize := 0 - if payload != nil { - msize = len(payload) - } tm = &TransportMessage{ - Hdr: &message.Header{ - MsgSize: uint16(36 + msize), - MsgType: message.DUMMY, - }, - Peer: peer, - Payload: payload, + Peer: peer, + Msg: msg, } return } @@ -100,28 +81,41 @@ func NewTransportMessage(peer *util.PeerID, payload []byte) (tm *TransportMessag // Transport enables network-oriented (like IP, UDP, TCP or UDS) // message exchange on multiple endpoints. type Transport struct { - incoming chan *TransportMessage // messages as received from the network - endpoints map[int]Endpoint // list of available endpoints + incoming chan *TransportMessage // messages as received from the network + endpoints *util.Map[int, Endpoint] // list of available endpoints + upnp *network.PortMapper // UPnP mapper (optional) } // NewTransport creates and runs a new transport layer implementation. -func NewTransport(ctx context.Context, ch chan *TransportMessage) (t *Transport) { +func NewTransport(ctx context.Context, tag string, ch chan *TransportMessage) (t *Transport) { // create transport instance + mngr, err := network.NewPortMapper(tag) + if err != nil { + mngr = nil + } return &Transport{ incoming: ch, - endpoints: make(map[int]Endpoint), + endpoints: util.NewMap[int, Endpoint](), + upnp: mngr, + } +} + +// Shutdown transport-related processes +func (t *Transport) Shutdown() { + if t.upnp != nil { + t.upnp.Close() } } // Send a message over suitable endpoint func (t *Transport) Send(ctx context.Context, addr net.Addr, msg *TransportMessage) (err error) { - for _, ep := range t.endpoints { + // use the first endpoint able to handle address + return t.endpoints.ProcessRange(func(_ int, ep Endpoint) error { if ep.CanSendTo(addr) { - err = ep.Send(ctx, addr, msg) - break + return ep.Send(ctx, addr, msg) } - } - return + return nil + }, true) } //---------------------------------------------------------------------- @@ -130,22 +124,45 @@ func (t *Transport) Send(ctx context.Context, addr net.Addr, msg *TransportMessa // AddEndpoint instantiates and run a new endpoint handler for the // given address (must map to a network interface). -func (t *Transport) AddEndpoint(ctx context.Context, addr net.Addr) (a net.Addr, err error) { - // register endpoint - var ep Endpoint +func (t *Transport) AddEndpoint(ctx context.Context, addr *util.Address) (ep Endpoint, err error) { + // check for valid address + if addr == nil { + err = ErrEndpNoAddress + return + } + // check if endpoint is already available + as := addr.Network() + "://" + addr.String() + if err = t.endpoints.ProcessRange(func(_ int, ep Endpoint) error { + ae := ep.Address().Network() + "://" + ep.Address().String() + if as == ae { + return ErrEndpExists + } + return nil + }, true); err != nil { + return + } + // register new endpoint if ep, err = NewEndpoint(addr); err != nil { return } - t.endpoints[ep.ID()] = ep + // add endpoint to list and run it + t.endpoints.Put(ep.ID(), ep) ep.Run(ctx, t.incoming) - return ep.Address(), nil + return } -// Endpoints returns a list of listening addresses managed by transport. -func (t *Transport) Endpoints() (list []net.Addr) { - list = make([]net.Addr, 0) - for _, ep := range t.endpoints { - list = append(list, ep.Address()) - } - return +//---------------------------------------------------------------------- +// UPnP handling +//---------------------------------------------------------------------- + +// ForwardOpen returns a local address for listening that will receive traffic +// from a port forward handled by UPnP on the router. +func (t *Transport) ForwardOpen(protocol, param string, port int) (id, local, remote string, err error) { + // no parameters currently defined, so just do the assignment. + return t.upnp.Assign(protocol, port) +} + +// ForwardClose closes a specific port forwarding +func (t *Transport) ForwardClose(id string) error { + return t.upnp.Unassign(id) } diff --git a/src/gnunet/util/address.go b/src/gnunet/util/address.go @@ -34,11 +34,11 @@ type Address struct { } // NewAddress returns a new Address for the given transport and specs -func NewAddress(transport string, addr []byte) *Address { +func NewAddress(transport string, addr string) *Address { return &Address{ Netw: transport, Options: 0, - Address: Clone(addr), + Address: Clone([]byte(addr)), Expires: AbsoluteTimeNever(), } } @@ -61,7 +61,7 @@ func ParseAddress(s string) (addr *Address, err error) { err = fmt.Errorf("invalid address format: '%s'", s) return } - addr = NewAddress(p[0], []byte(strings.Trim(p[1], "/"))) + addr = NewAddress(p[0], strings.Trim(p[1], "/")) return } @@ -91,8 +91,11 @@ func (a *Address) Network() string { //---------------------------------------------------------------------- -// AddressString returns a string representaion of an address. -func AddressString(network string, addr []byte) string { +// URI returns a string representaion of an address. +func (a *Address) URI() string { + return URI(a.Netw, a.Address) +} +func URI(network string, addr []byte) string { return network + "://" + string(addr) } @@ -151,13 +154,13 @@ func (a *PeerAddrList) Add(id string, addr *Address) (mode int) { list = append(list, addr) a.list.Put(id, list) return nil - }) + }, false) } return } // Get address for peer -func (a *PeerAddrList) Get(id string, transport string) *Address { +func (a *PeerAddrList) Get(id string, transport string) (res []*Address) { list, ok := a.list.Get(id) if ok { for _, addr := range list { @@ -171,10 +174,10 @@ func (a *PeerAddrList) Get(id string, transport string) *Address { // skip other transports continue } - return addr + res = append(res, addr) } } - return nil + return } // Delete a list entry by key. diff --git a/src/gnunet/util/database.go b/src/gnunet/util/database.go @@ -121,7 +121,7 @@ func (p *dbPool) remove(key string) error { p.insts.Delete(key) } return - }) + }, false) } // Connect to a SQL database (various types and flavors): @@ -180,6 +180,6 @@ func (p *dbPool) Connect(spec string) (db *DbConn, err error) { db = new(DbConn) db.conn, err = inst.db.Conn(p.ctx) return err - }) + }, false) return } diff --git a/src/gnunet/util/misc.go b/src/gnunet/util/misc.go @@ -70,43 +70,87 @@ func NewMap[K comparable, V any]() *Map[K, V] { } } +//---------------------------------------------------------------------- + // Process a function in the locked map context. Calls -// to other map functions in 'f' will use additional locks. -func (m *Map[K, V]) Process(f func() error) error { - m.mtx.Lock() - defer m.mtx.Unlock() +// to other map functions in 'f' will skip their locks. +func (m *Map[K, V]) Process(f func() error, readonly bool) error { + // handle locking + m.lock(readonly) + m.inProcess = true + defer func() { + m.inProcess = false + m.unlock(readonly) + }() + // function call in unlocked environment + return f() +} + +// Process a ranged function in the locked map context. Calls +// to other map functions in 'f' will skip their locks. +func (m *Map[K, V]) ProcessRange(f func(key K, value V) error, readonly bool) error { + // handle locking + m.lock(readonly) m.inProcess = true - err := f() - m.inProcess = false - return err + defer func() { + m.inProcess = false + m.unlock(readonly) + }() + // range over map and call function. + for key, value := range m.list { + if err := f(key, value); err != nil { + return err + } + } + return nil } +//---------------------------------------------------------------------- + // Put value into map under given key. func (m *Map[K, V]) Put(key K, value V) { - if !m.inProcess { - m.mtx.Lock() - defer m.mtx.Unlock() - } + m.lock(false) + defer m.unlock(false) m.list[key] = value } // Get value with iven key from map. func (m *Map[K, V]) Get(key K) (value V, ok bool) { - if !m.inProcess { - m.mtx.RLock() - defer m.mtx.RUnlock() - } + m.lock(true) + defer m.unlock(true) value, ok = m.list[key] return } // Delete key/value pair from map. func (m *Map[K, V]) Delete(key K) { + m.lock(false) + defer m.unlock(false) + delete(m.list, key) +} + +//---------------------------------------------------------------------- + +// lock with given mode (if not in processing function) +func (m *Map[K, V]) lock(readonly bool) { if !m.inProcess { - m.mtx.Lock() - defer m.mtx.Unlock() + if readonly { + m.mtx.RLock() + } else { + m.mtx.Lock() + } + } +} + +// lock with given mode (if not in processing function) +func (m *Map[K, V]) unlock(readonly bool) { + if !m.inProcess { + if readonly { + m.mtx.RUnlock() + } else { + m.mtx.Unlock() + } } - delete(m.list, key) } //---------------------------------------------------------------------- diff --git a/src/gnunet/util/peer_id.go b/src/gnunet/util/peer_id.go @@ -25,23 +25,19 @@ type PeerID struct { Key []byte `size:"32"` } -// NewPeerID creates a new object from the data. -func NewPeerID(data []byte) *PeerID { - if data == nil { - data = make([]byte, 32) - } else { - size := len(data) - if size > 32 { - data = data[:32] - } else if size < 32 { - buf := make([]byte, 32) - CopyAlignedBlock(buf, data) - data = buf - } +// NewPeerID creates a new peer id from data. +func NewPeerID(data []byte) (p *PeerID) { + p = &PeerID{ + Key: make([]byte, 32), } - return &PeerID{ - Key: data, + if data != nil { + if len(data) < 32 { + CopyAlignedBlock(p.Key, data) + } else { + copy(p.Key, data[:32]) + } } + return } // Equals returns true if two peer IDs match.