aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBernd Fix <brf@hoi-polloi.org>2022-06-07 09:31:22 +0200
committerBernd Fix <brf@hoi-polloi.org>2022-06-07 09:31:22 +0200
commit7a6ae8f61ee7efde161db98462259ea9bbb23386 (patch)
tree990de3aab9cec74f74185883a5ca3d2941b18dff /src
parentde577428a69a0002c3194afdf0562cf5a4dc1bdc (diff)
downloadgnunet-go-7a6ae8f61ee7efde161db98462259ea9bbb23386.tar.gz
gnunet-go-7a6ae8f61ee7efde161db98462259ea9bbb23386.zip
Improved transport and module code.v0.1.24
Diffstat (limited to 'src')
-rw-r--r--src/gnunet/cmd/gnunet-service-dht-go/main.go (renamed from src/gnunet/cmd/gnunet-service-dht-test-go/main.go)8
-rw-r--r--src/gnunet/cmd/peer_mockup/main.go19
-rw-r--r--src/gnunet/config/config.go30
-rw-r--r--src/gnunet/config/config_test.go19
-rw-r--r--src/gnunet/config/gnunet-config.json119
-rw-r--r--src/gnunet/core/core.go202
-rw-r--r--src/gnunet/core/core_test.go203
-rw-r--r--src/gnunet/core/peer.go49
-rw-r--r--src/gnunet/core/peer_test.go23
-rw-r--r--src/gnunet/go.mod2
-rw-r--r--src/gnunet/go.sum2
-rw-r--r--src/gnunet/message/msg_hello.go2
-rw-r--r--src/gnunet/modules.go84
-rw-r--r--src/gnunet/service/dht/blocks/hello.go2
-rw-r--r--src/gnunet/service/dht/module.go18
-rw-r--r--src/gnunet/service/dht/routingtable.go57
-rw-r--r--src/gnunet/service/dht/routingtable_test.go8
-rw-r--r--src/gnunet/service/gns/module.go18
-rw-r--r--src/gnunet/service/module.go36
-rw-r--r--src/gnunet/service/namecache/module.go26
-rw-r--r--src/gnunet/service/revocation/module.go18
-rw-r--r--src/gnunet/transport/endpoint.go168
-rw-r--r--src/gnunet/transport/reader_writer.go5
-rw-r--r--src/gnunet/transport/transport.go123
-rw-r--r--src/gnunet/util/address.go21
-rw-r--r--src/gnunet/util/database.go4
-rw-r--r--src/gnunet/util/misc.go80
-rw-r--r--src/gnunet/util/peer_id.go26
28 files changed, 907 insertions, 465 deletions
diff --git a/src/gnunet/cmd/gnunet-service-dht-test-go/main.go b/src/gnunet/cmd/gnunet-service-dht-go/main.go
index cd2bd1a..3b525fd 100644
--- a/src/gnunet/cmd/gnunet-service-dht-test-go/main.go
+++ b/src/gnunet/cmd/gnunet-service-dht-go/main.go
@@ -83,16 +83,12 @@ func main() {
83 83
84 // instantiate core service 84 // instantiate core service
85 ctx, cancel := context.WithCancel(context.Background()) 85 ctx, cancel := context.WithCancel(context.Background())
86 var local *core.Peer
87 if local, err = core.NewLocalPeer(config.Cfg.Local); err != nil {
88 logger.Printf(logger.ERROR, "[dht] No local peer: %s\n", err.Error())
89 return
90 }
91 var c *core.Core 86 var c *core.Core
92 if c, err = core.NewCore(ctx, local); err != nil { 87 if c, err = core.NewCore(ctx, config.Cfg.Local); err != nil {
93 logger.Printf(logger.ERROR, "[dht] core failed: %s\n", err.Error()) 88 logger.Printf(logger.ERROR, "[dht] core failed: %s\n", err.Error())
94 return 89 return
95 } 90 }
91 defer c.Shutdown()
96 92
97 // start a new DHT service 93 // start a new DHT service
98 dht := dht.NewService(ctx, c) 94 dht := dht.NewService(ctx, c)
diff --git a/src/gnunet/cmd/peer_mockup/main.go b/src/gnunet/cmd/peer_mockup/main.go
index 58f4baf..96f62da 100644
--- a/src/gnunet/cmd/peer_mockup/main.go
+++ b/src/gnunet/cmd/peer_mockup/main.go
@@ -22,13 +22,19 @@ var (
22 // configuration for local node 22 // configuration for local node
23 localCfg = &config.NodeConfig{ 23 localCfg = &config.NodeConfig{
24 PrivateSeed: "YGoe6XFH3XdvFRl+agx9gIzPTvxA229WFdkazEMdcOs=", 24 PrivateSeed: "YGoe6XFH3XdvFRl+agx9gIzPTvxA229WFdkazEMdcOs=",
25 Endpoints: []string{ 25 Endpoints: []*config.EndpointConfig{
26 "udp:127.0.0.1:2086", 26 {
27 ID: "local",
28 Network: "udp",
29 Address: "127.0.0.1",
30 Port: 2086,
31 TTL: 86400,
32 },
27 }, 33 },
28 } 34 }
29 // configuration for remote node 35 // configuration for remote node
30 remoteCfg = "3GXXMNb5YpIUO7ejIR2Yy0Cf5texuLfDjHkXcqbPxkc=" 36 remoteCfg = "3GXXMNb5YpIUO7ejIR2Yy0Cf5texuLfDjHkXcqbPxkc="
31 remoteAddr = "udp:172.17.0.5:2086" 37 remoteAddr = "udp://172.17.0.5:2086"
32 38
33 // top-level variables used accross functions 39 // top-level variables used accross functions
34 local *core.Peer // local peer (with private key) 40 local *core.Peer // local peer (with private key)
@@ -50,14 +56,11 @@ func main() {
50 flag.Parse() 56 flag.Parse()
51 57
52 // setup peer and core instances 58 // setup peer and core instances
53 if local, err = core.NewLocalPeer(localCfg); err != nil { 59 if c, err = core.NewCore(ctx, localCfg); err != nil {
54 fmt.Println("local failed: " + err.Error())
55 return
56 }
57 if c, err = core.NewCore(ctx, local); err != nil {
58 fmt.Println("core failed: " + err.Error()) 60 fmt.Println("core failed: " + err.Error())
59 return 61 return
60 } 62 }
63 local = c.Peer()
61 if remote, err = core.NewPeer(remoteCfg); err != nil { 64 if remote, err = core.NewPeer(remoteCfg); err != nil {
62 fmt.Println("remote failed: " + err.Error()) 65 fmt.Println("remote failed: " + err.Error())
63 return 66 return
diff --git a/src/gnunet/config/config.go b/src/gnunet/config/config.go
index 41a65f0..3687a3e 100644
--- a/src/gnunet/config/config.go
+++ b/src/gnunet/config/config.go
@@ -20,6 +20,7 @@ package config
20 20
21import ( 21import (
22 "encoding/json" 22 "encoding/json"
23 "fmt"
23 "io/ioutil" 24 "io/ioutil"
24 "reflect" 25 "reflect"
25 "regexp" 26 "regexp"
@@ -32,10 +33,26 @@ import (
32// Configuration for local node 33// Configuration for local node
33//---------------------------------------------------------------------- 34//----------------------------------------------------------------------
34 35
36// EndpointConfig holds parameters for local network listeners.
37type EndpointConfig struct {
38 ID string `json:"id"` // endpoint identifier
39 Network string `json:"network"` // network protocol to use on endpoint
40 Address string `json:"address"` // address to listen on
41 Port int `json:"port"` // port for listening to network
42 TTL int `json:"ttl"` // time-to-live for address (in seconds)
43}
44
45// Addr returns an address string for endpoint configuration; it does NOT
46// handle special cases like UPNP and such.
47func (c *EndpointConfig) Addr() string {
48 return fmt.Sprintf("%s://%s:%d", c.Network, c.Address, c.Port)
49}
50
35// NodeConfig holds parameters for the local node instance 51// NodeConfig holds parameters for the local node instance
36type NodeConfig struct { 52type NodeConfig struct {
37 PrivateSeed string `json:"privateSeed"` // Node private key seed (base64) 53 Name string `json:"name"` // (short) name for local node
38 Endpoints []string `json:"endpoints"` // list of endpoints available 54 PrivateSeed string `json:"privateSeed"` // Node private key seed (base64)
55 Endpoints []*EndpointConfig `json:"endpoints"` // list of endpoints available
39} 56}
40 57
41//---------------------------------------------------------------------- 58//----------------------------------------------------------------------
@@ -139,9 +156,16 @@ func ParseConfig(fileName string) (err error) {
139 if err != nil { 156 if err != nil {
140 return 157 return
141 } 158 }
159 return ParseConfigBytes(file, true)
160}
161
162// ParseConfigBytes reads a configuration from binary data. The data is
163// a JSON-encoded content. If 'subst' is true, the configuration strings
164// are subsituted
165func ParseConfigBytes(data []byte, subst bool) (err error) {
142 // unmarshal to Config data structure 166 // unmarshal to Config data structure
143 Cfg = new(Config) 167 Cfg = new(Config)
144 if err = json.Unmarshal(file, Cfg); err == nil { 168 if err = json.Unmarshal(data, Cfg); err == nil {
145 // process all string-based config settings and apply 169 // process all string-based config settings and apply
146 // string substitutions. 170 // string substitutions.
147 applySubstitutions(Cfg, Cfg.Env) 171 applySubstitutions(Cfg, Cfg.Env)
diff --git a/src/gnunet/config/config_test.go b/src/gnunet/config/config_test.go
index 3afd3a7..d40bc19 100644
--- a/src/gnunet/config/config_test.go
+++ b/src/gnunet/config/config_test.go
@@ -20,6 +20,7 @@ package config
20 20
21import ( 21import (
22 "encoding/json" 22 "encoding/json"
23 "io/ioutil"
23 "testing" 24 "testing"
24 25
25 "github.com/bfix/gospel/logger" 26 "github.com/bfix/gospel/logger"
@@ -27,14 +28,18 @@ import (
27 28
28func TestConfigRead(t *testing.T) { 29func TestConfigRead(t *testing.T) {
29 logger.SetLogLevel(logger.WARN) 30 logger.SetLogLevel(logger.WARN)
30 if err := ParseConfig("./gnunet-config.json"); err != nil { 31
32 // read configuration file
33 data, err := ioutil.ReadFile("./gnunet-config.json")
34 if err != nil {
35 t.Fatal(err)
36 }
37 // parse configuration
38 if err := ParseConfigBytes(data, false); err != nil {
31 t.Fatal(err) 39 t.Fatal(err)
32 } 40 }
33 if testing.Verbose() { 41 // write configuration
34 data, err := json.Marshal(Cfg) 42 if _, err = json.Marshal(Cfg); err != nil {
35 if err != nil { 43 t.Fatal(err)
36 t.Fatal(err)
37 }
38 t.Log("cfg=" + string(data))
39 } 44 }
40} 45}
diff --git a/src/gnunet/config/gnunet-config.json b/src/gnunet/config/gnunet-config.json
index 941cf21..82606d7 100644
--- a/src/gnunet/config/gnunet-config.json
+++ b/src/gnunet/config/gnunet-config.json
@@ -1,58 +1,65 @@
1{ 1{
2 "local": { 2 "bootstrap": {
3 "privateSeed": "YGoe6XFH3XdvFRl+agx9gIzPTvxA229WFdkazEMdcOs=", 3 "nodes": [
4 "endpoints": [ 4 "gnunet://hello/7KTBJ90340HF1Q2GB0A57E2XJER4FDHX8HP5GHEB9125VPWPD27G/BNMDFN6HJCPWSPNBSEC06MC1K8QN1Z2DHRQSRXDTFR7FTBD4JHNBJ2RJAAEZ31FWG1Q3PMN3PXGZQ3Q7NTNEKQZFA7TE2Y46FM8E20R/1653499308?r5n%2Bip%2Budp%3A127.0.0.1%3A7654"
5 "r5n+ip+udp:127.0.0.1:6666" 5 ]
6 ] 6 },
7 }, 7 "local": {
8 "bootstrap": { 8 "name": "ygng",
9 "nodes": [ 9 "privateSeed": "YGoe6XFH3XdvFRl+agx9gIzPTvxA229WFdkazEMdcOs=",
10 "gnunet://hello/7KTBJ90340HF1Q2GB0A57E2XJER4FDHX8HP5GHEB9125VPWPD27G/BNMDFN6HJCPWSPNBSEC06MC1K8QN1Z2DHRQSRXDTFR7FTBD4JHNBJ2RJAAEZ31FWG1Q3PMN3PXGZQ3Q7NTNEKQZFA7TE2Y46FM8E20R/1653499308?r5n%2Bip%2Budp%3A127.0.0.1%3A7654" 10 "endpoints": [
11 ] 11 {
12 }, 12 "id": "test",
13 "environ": { 13 "network": "ip+udp",
14 "TMP": "/tmp", 14 "address": "upnp:192.168.178.1",
15 "RT_SYS": "${TMP}/gnunet-system-runtime" 15 "port": 6666,
16 }, 16 "ttl": 86400
17 "dht": { 17 }
18 "service": { 18 ]
19 "socket": "${RT_SYS}/gnunet-service-dht.sock", 19 },
20 "params": { 20 "environ": {
21 "perm": "0770" 21 "TMP": "/tmp",
22 } 22 "RT_SYS": "${TMP}/gnunet-system-runtime"
23 }, 23 },
24 "storage": "dht_file_store+/var/lib/gnunet/dht/store", 24 "dht": {
25 "cache": "dht_file_cache+/var/lib/gnunet/dht/cache+1000" 25 "service": {
26 }, 26 "socket": "${RT_SYS}/gnunet-service-dht.sock",
27 "gns": { 27 "params": {
28 "service": { 28 "perm": "0770"
29 "socket": "${RT_SYS}/gnunet-service-gns-go.sock", 29 }
30 "params": { 30 },
31 "perm": "0770" 31 "storage": "dht_file_store+/var/lib/gnunet/dht/store",
32 } 32 "cache": "dht_file_cache+/var/lib/gnunet/dht/cache+1000"
33 }, 33 },
34 "dhtReplLevel": 10, 34 "gns": {
35 "maxDepth": 250 35 "service": {
36 }, 36 "socket": "${RT_SYS}/gnunet-service-gns-go.sock",
37 "namecache": { 37 "params": {
38 "service": { 38 "perm": "0770"
39 "socket": "${RT_SYS}/gnunet-service-namecache.sock", 39 }
40 "params": { 40 },
41 "perm": "0770" 41 "dhtReplLevel": 10,
42 } 42 "maxDepth": 250
43 }, 43 },
44 "storage": "dht_file_cache:/var/lib/gnunet/namecache:1000" 44 "namecache": {
45 }, 45 "service": {
46 "revocation": { 46 "socket": "${RT_SYS}/gnunet-service-namecache.sock",
47 "service": { 47 "params": {
48 "socket": "${RT_SYS}/gnunet-service-revocation-go.sock", 48 "perm": "0770"
49 "params": { 49 }
50 "perm": "0770" 50 },
51 } 51 "storage": "dht_file_cache:/var/lib/gnunet/namecache:1000"
52 }, 52 },
53 "storage": "redis:localhost:6397::15" 53 "revocation": {
54 }, 54 "service": {
55 "rpc": { 55 "socket": "${RT_SYS}/gnunet-service-revocation-go.sock",
56 "endpoint": "tcp:127.0.0.1:80" 56 "params": {
57 } 57 "perm": "0770"
58 }
59 },
60 "storage": "redis:localhost:6397::15"
61 },
62 "rpc": {
63 "endpoint": "tcp:127.0.0.1:80"
64 }
58} \ No newline at end of file 65} \ No newline at end of file
diff --git a/src/gnunet/core/core.go b/src/gnunet/core/core.go
index c3bf355..7582891 100644
--- a/src/gnunet/core/core.go
+++ b/src/gnunet/core/core.go
@@ -20,16 +20,34 @@ package core
20 20
21import ( 21import (
22 "context" 22 "context"
23 "errors"
24 "gnunet/config"
23 "gnunet/message" 25 "gnunet/message"
24 "gnunet/service/dht/blocks" 26 "gnunet/service/dht/blocks"
25 "gnunet/transport" 27 "gnunet/transport"
26 "gnunet/util" 28 "gnunet/util"
27 "net" 29 "net"
30 "strings"
28 "time" 31 "time"
32)
29 33
30 "github.com/bfix/gospel/data" 34//----------------------------------------------------------------------
35// Core-related error codes
36var (
37 ErrCoreNoUpnpDyn = errors.New("no dynamic port with UPnP")
38 ErrCoreNoEndpAddr = errors.New("no endpoint for address")
31) 39)
32 40
41//----------------------------------------------------------------------
42// EndpointRef is a reference to an endpoint instance managed by core.
43type EndpointRef struct {
44 id string // endpoint identifier in configuration
45 ep transport.Endpoint // reference to endpoint
46 addr *util.Address // public endpoint address
47 upnpId string // UPNP identifier (empty if unused)
48}
49
50//----------------------------------------------------------------------
33// Core service 51// Core service
34type Core struct { 52type Core struct {
35 // local peer instance 53 // local peer instance
@@ -41,31 +59,91 @@ type Core struct {
41 // reference to transport implementation 59 // reference to transport implementation
42 trans *transport.Transport 60 trans *transport.Transport
43 61
44 // registered listeners 62 // registered signal listeners
45 listeners map[string]*Listener 63 listeners map[string]*Listener
46 64
47 // list of known peers with addresses 65 // list of known peers with addresses
48 peers *util.PeerAddrList 66 peers *util.PeerAddrList
67
68 // List of registered endpoints
69 endpoints map[string]*EndpointRef
49} 70}
50 71
51//---------------------------------------------------------------------- 72//----------------------------------------------------------------------
52 73
53// NewCore creates and runs a new core instance. 74// NewCore creates and runs a new core instance.
54func NewCore(ctx context.Context, local *Peer) (c *Core, err error) { 75func NewCore(ctx context.Context, node *config.NodeConfig) (c *Core, err error) {
76
77 // instantiate peer
78 var peer *Peer
79 if peer, err = NewLocalPeer(node); err != nil {
80 return
81 }
55 // create new core instance 82 // create new core instance
56 incoming := make(chan *transport.TransportMessage) 83 incoming := make(chan *transport.TransportMessage)
57 c = &Core{ 84 c = &Core{
58 local: local, 85 local: peer,
59 incoming: incoming, 86 incoming: incoming,
60 listeners: make(map[string]*Listener), 87 listeners: make(map[string]*Listener),
61 trans: transport.NewTransport(ctx, incoming), 88 trans: transport.NewTransport(ctx, node.Name, incoming),
62 peers: util.NewPeerAddrList(), 89 peers: util.NewPeerAddrList(),
90 endpoints: make(map[string]*EndpointRef),
63 } 91 }
64 // add all local peer endpoints to transport. 92 // add all local peer endpoints to transport.
65 for _, addr := range local.addrList { 93 for _, epCfg := range node.Endpoints {
66 if _, err = c.trans.AddEndpoint(ctx, addr); err != nil { 94 var (
95 upnpId string // upnp identifier
96 local *util.Address // local address
97 remote *util.Address // remote address
98 ep transport.Endpoint // endpoint reference
99 )
100 // handle special addresses:
101 if strings.HasPrefix(epCfg.Address, "upnp:") {
102 // don't allow dynamic port assignment
103 if epCfg.Port == 0 {
104 err = ErrCoreNoUpnpDyn
105 return
106 }
107 // handle UPNP port forwarding
108 protocol := transport.EpProtocol(epCfg.Network)
109 var localA, remoteA string
110 if upnpId, remoteA, localA, err = c.trans.ForwardOpen(protocol, epCfg.Address[5:], epCfg.Port); err != nil {
111 return
112 }
113 // parse local and remote addresses
114 if local, err = util.ParseAddress(epCfg.Network + "://" + localA); err != nil {
115 return
116 }
117 if remote, err = util.ParseAddress(epCfg.Network + "://" + remoteA); err != nil {
118 return
119 }
120 } else {
121 // direct address specification:
122 if local, err = util.ParseAddress(epCfg.Addr()); err != nil {
123 return
124 }
125 remote = local
126 upnpId = ""
127 }
128 // add endpoint for address
129 if ep, err = c.trans.AddEndpoint(ctx, local); err != nil {
67 return 130 return
68 } 131 }
132 // if port is set to 0, replace it with port assigned dynamically.
133 // only applies to direct listening addresses!
134 if epCfg.Port == 0 && local == remote {
135 addr := ep.Address()
136 if remote, err = util.ParseAddress(addr.Network() + "://" + addr.String()); err != nil {
137 return
138 }
139 }
140 // save endpoint reference
141 c.endpoints[epCfg.ID] = &EndpointRef{
142 id: epCfg.ID,
143 ep: ep,
144 addr: remote,
145 upnpId: upnpId,
146 }
69 } 147 }
70 // run message pump 148 // run message pump
71 go func() { 149 go func() {
@@ -77,32 +155,31 @@ func NewCore(ctx context.Context, local *Peer) (c *Core, err error) {
77 var ev *Event 155 var ev *Event
78 156
79 // inspect message for peer state events 157 // inspect message for peer state events
80 m, err := tm.Message() 158 switch msg := tm.Msg.(type) {
81 if err == nil { 159 case *message.HelloMsg:
82 switch msg := m.(type) { 160 // keep peer addresses
83 case *message.HelloMsg: 161 for _, addr := range msg.Addresses {
84 // keep peer addresses 162 a := &util.Address{
85 for _, addr := range msg.Addresses { 163 Netw: addr.Transport,
86 a := &util.Address{ 164 Address: addr.Address,
87 Netw: addr.Transport, 165 Expires: addr.ExpireOn,
88 Address: addr.Address,
89 Expires: addr.ExpireOn,
90 }
91 c.Learn(ctx, msg.PeerID, a)
92 } 166 }
93 // generate EV_CONNECT event 167 c.Learn(ctx, msg.PeerID, a)
94 ev = new(Event) 168 }
95 ev.ID = EV_CONNECT 169 // generate EV_CONNECT event
96 ev.Peer = tm.Peer 170 ev = &Event{
97 ev.Msg = msg 171 ID: EV_CONNECT,
98 c.dispatch(ev) 172 Peer: tm.Peer,
173 Msg: msg,
99 } 174 }
175 c.dispatch(ev)
100 } 176 }
101 // generate EV_MESSAGE event 177 // generate EV_MESSAGE event
102 ev = new(Event) 178 ev = &Event{
103 ev.ID = EV_MESSAGE 179 ID: EV_MESSAGE,
104 ev.Peer = tm.Peer 180 Peer: tm.Peer,
105 ev.Msg, _ = tm.Message() 181 Msg: tm.Msg,
182 }
106 c.dispatch(ev) 183 c.dispatch(ev)
107 184
108 // wait for termination 185 // wait for termination
@@ -114,29 +191,63 @@ func NewCore(ctx context.Context, local *Peer) (c *Core, err error) {
114 return 191 return
115} 192}
116 193
194// Shutdown all core-related processes.
195func (c *Core) Shutdown() {
196 c.trans.Shutdown()
197 c.local.Shutdown()
198}
199
117//---------------------------------------------------------------------- 200//----------------------------------------------------------------------
118 201
119// Send is a function that allows the local peer to send a protocol 202// Send is a function that allows the local peer to send a protocol
120// message to a remote peer. 203// message to a remote peer.
121func (c *Core) Send(ctx context.Context, peer *util.PeerID, msg message.Message) error { 204func (c *Core) Send(ctx context.Context, peer *util.PeerID, msg message.Message) error {
122 // TODO: select best endpoint protocol for transport; now fixed to UDP 205 // TODO: select best endpoint protocol for transport; now fixed to IP+UDP
123 netw := "udp" 206 netw := "ip+udp"
124 addr := c.peers.Get(peer.String(), netw) 207 addrs := c.peers.Get(peer.String(), netw)
125 payload, err := data.Marshal(msg) 208 if len(addrs) == 0 {
126 if err != nil { 209 return ErrCoreNoEndpAddr
127 return err
128 } 210 }
129 tm := transport.NewTransportMessage(c.PeerID(), payload) 211 // TODO: select best address; curently selects first
212 addr := addrs[0]
213
214 // select best endpoint for transport
215 var ep transport.Endpoint
216 for _, epCfg := range c.endpoints {
217 if epCfg.addr.Network() == netw {
218 if ep == nil {
219 ep = epCfg.ep
220 }
221 // TODO: compare endpoints, select better one:
222 // if ep.Better(epCfg.ep) {
223 // ep = epCfg.ep
224 // }
225 }
226 }
227 // check we have an endpoint to send on
228 if ep == nil {
229 return ErrCoreNoEndpAddr
230 }
231 // assemble transport message
232 tm := transport.NewTransportMessage(c.PeerID(), msg)
233 // send on transport
130 return c.trans.Send(ctx, addr, tm) 234 return c.trans.Send(ctx, addr, tm)
131} 235}
132 236
133// Learn a (new) address for peer 237// Learn a (new) address for peer
134func (c *Core) Learn(ctx context.Context, peer *util.PeerID, addr *util.Address) (err error) { 238func (c *Core) Learn(ctx context.Context, peer *util.PeerID, addr *util.Address) (err error) {
135 if c.peers.Add(peer.String(), addr) == 1 { 239 if c.peers.Add(peer.String(), addr) == 1 {
240 // we added a previously unknown peer: send a HELLO
241
242 // collect endpoint addresses
243 addrList := make([]*util.Address, 0)
244 for _, epRef := range c.endpoints {
245 addrList = append(addrList, epRef.addr)
246 }
136 // new peer id: send HELLO message to newly added peer 247 // new peer id: send HELLO message to newly added peer
137 node := c.local 248 node := c.local
138 var hello *blocks.HelloBlock 249 var hello *blocks.HelloBlock
139 hello, err = node.HelloData(time.Hour) 250 hello, err = node.HelloData(time.Hour, addrList)
140 if err != nil { 251 if err != nil {
141 return 252 return
142 } 253 }
@@ -150,11 +261,28 @@ func (c *Core) Learn(ctx context.Context, peer *util.PeerID, addr *util.Address)
150 return 261 return
151} 262}
152 263
264// Addresses returns the list of listening endpoint addresses
265func (c *Core) Addresses() (list []*util.Address, err error) {
266 for _, epRef := range c.endpoints {
267 list = append(list, epRef.addr)
268 }
269 return
270}
271
272//----------------------------------------------------------------------
273
274// Peer returns the local peer
275func (c *Core) Peer() *Peer {
276 return c.local
277}
278
153// PeerID returns the peer id of the local node. 279// PeerID returns the peer id of the local node.
154func (c *Core) PeerID() *util.PeerID { 280func (c *Core) PeerID() *util.PeerID {
155 return c.local.GetID() 281 return c.local.GetID()
156} 282}
157 283
284//----------------------------------------------------------------------
285
158// TryConnect is a function which allows the local peer to attempt the 286// TryConnect is a function which allows the local peer to attempt the
159// establishment of a connection to another peer using an address. 287// establishment of a connection to another peer using an address.
160// When the connection attempt is successful, information on the new 288// When the connection attempt is successful, information on the new
diff --git a/src/gnunet/core/core_test.go b/src/gnunet/core/core_test.go
index 102abf1..d8a6277 100644
--- a/src/gnunet/core/core_test.go
+++ b/src/gnunet/core/core_test.go
@@ -20,23 +20,149 @@ package core
20 20
21import ( 21import (
22 "context" 22 "context"
23 "encoding/hex"
23 "gnunet/config" 24 "gnunet/config"
24 "gnunet/util" 25 "gnunet/util"
25 "testing" 26 "testing"
26 "time" 27 "time"
27) 28)
28 29
29var ( 30//----------------------------------------------------------------------
30 peer1Cfg = &config.NodeConfig{ 31// Two node GNUnet (smallest and simplest network)
31 PrivateSeed: "iYK1wSi5XtCP774eNFk1LYXqKlOPEpwKBw+2/bMkE24=", 32//----------------------------------------------------------------------
32 Endpoints: []string{"udp://127.0.0.1:20861"}, 33
34// TestCoreSimple test a two node network
35func TestCoreSimple(t *testing.T) {
36
37 var (
38 peer1Cfg = &config.NodeConfig{
39 Name: "p1",
40 PrivateSeed: "iYK1wSi5XtCP774eNFk1LYXqKlOPEpwKBw+2/bMkE24=",
41 Endpoints: []*config.EndpointConfig{
42 {
43 ID: "p1",
44 Network: "ip+udp",
45 Address: "127.0.0.1",
46 Port: 0,
47 TTL: 86400,
48 },
49 },
50 }
51
52 peer2Cfg = &config.NodeConfig{
53 Name: "p2",
54 PrivateSeed: "Bv9umksEO51jjWWrOGEH+4r8wl9Vi+LItpdBpTOi2PE=",
55 Endpoints: []*config.EndpointConfig{
56 {
57 ID: "p2",
58 Network: "ip+udp",
59 Address: "127.0.0.1",
60 Port: 20862,
61 TTL: 86400,
62 },
63 },
64 }
65 )
66
67 // setup execution context
68 ctx, cancel := context.WithCancel(context.Background())
69 defer func() {
70 cancel()
71 time.Sleep(time.Second)
72 }()
73
74 // create and run nodes
75 node1, err := NewTestNode(t, ctx, peer1Cfg)
76 if err != nil {
77 t.Fatal(err)
78 }
79 node2, err := NewTestNode(t, ctx, peer2Cfg)
80 if err != nil {
81 t.Fatal(err)
33 } 82 }
34 83
35 peer2Cfg = &config.NodeConfig{ 84 // learn peer addresses (triggers HELLO)
36 PrivateSeed: "Bv9umksEO51jjWWrOGEH+4r8wl9Vi+LItpdBpTOi2PE=", 85 list, err := node2.core.Addresses()
37 Endpoints: []string{"udp://127.0.0.1:20862"}, 86 if err != nil {
87 t.Fatal(err)
38 } 88 }
39) 89 for _, addr := range list {
90 node1.Learn(ctx, node2.peer.GetID(), util.NewAddressWrap(addr))
91 }
92
93 // wait for 5 seconds
94 time.Sleep(5 * time.Second)
95}
96
97//----------------------------------------------------------------------
98// Two node GNUnet both running locally, but exchanging messages over
99// the internet (UPNP router required).
100//----------------------------------------------------------------------
101
102// TestCoreSimple test a two node network
103func TestCoreUPNP(t *testing.T) {
104
105 // configuration data
106 var (
107 peer1Cfg = &config.NodeConfig{
108 Name: "p1",
109 PrivateSeed: "iYK1wSi5XtCP774eNFk1LYXqKlOPEpwKBw+2/bMkE24=",
110 Endpoints: []*config.EndpointConfig{
111 {
112 ID: "p1",
113 Network: "ip+udp",
114 Address: "upnp:",
115 Port: 2086,
116 TTL: 86400,
117 },
118 },
119 }
120 peer2Cfg = &config.NodeConfig{
121 Name: "p2",
122 PrivateSeed: "Bv9umksEO51jjWWrOGEH+4r8wl9Vi+LItpdBpTOi2PE=",
123 Endpoints: []*config.EndpointConfig{
124 {
125 ID: "p2",
126 Network: "ip+udp",
127 Address: "upnp:",
128 Port: 1080,
129 TTL: 86400,
130 },
131 },
132 }
133 )
134
135 // setup execution context
136 ctx, cancel := context.WithCancel(context.Background())
137 defer func() {
138 cancel()
139 time.Sleep(time.Second)
140 }()
141
142 // create and run nodes
143 node1, err := NewTestNode(t, ctx, peer1Cfg)
144 if err != nil {
145 t.Fatal(err)
146 }
147 defer node1.Shutdown()
148 node2, err := NewTestNode(t, ctx, peer2Cfg)
149 if err != nil {
150 t.Fatal(err)
151 }
152 defer node2.Shutdown()
153
154 // learn peer addresses (triggers HELLO)
155 list, err := node2.core.Addresses()
156 if err != nil {
157 t.Fatal(err)
158 }
159 for _, addr := range list {
160 node1.Learn(ctx, node2.peer.GetID(), util.NewAddressWrap(addr))
161 }
162
163 // sleep a bit
164 time.Sleep(3 * time.Second)
165}
40 166
41//---------------------------------------------------------------------- 167//----------------------------------------------------------------------
42// create and run a node with given spec 168// create and run a node with given spec
@@ -50,9 +176,15 @@ type TestNode struct {
50 addr *util.Address 176 addr *util.Address
51} 177}
52 178
179func (n *TestNode) Shutdown() {
180 n.core.Shutdown()
181}
182
53func (n *TestNode) Learn(ctx context.Context, peer *util.PeerID, addr *util.Address) { 183func (n *TestNode) Learn(ctx context.Context, peer *util.PeerID, addr *util.Address) {
54 n.t.Logf("[%d] Learning %s for %s", n.id, addr.StringAll(), peer.String()) 184 n.t.Logf("[%d] Learning %s for %s", n.id, addr.StringAll(), peer.String())
55 n.core.Learn(ctx, peer, addr) 185 if err := n.core.Learn(ctx, peer, addr); err != nil {
186 n.t.Log("Learn: " + err.Error())
187 }
56} 188}
57 189
58func NewTestNode(t *testing.T, ctx context.Context, cfg *config.NodeConfig) (node *TestNode, err error) { 190func NewTestNode(t *testing.T, ctx context.Context, cfg *config.NodeConfig) (node *TestNode, err error) {
@@ -62,18 +194,25 @@ func NewTestNode(t *testing.T, ctx context.Context, cfg *config.NodeConfig) (nod
62 node.t = t 194 node.t = t
63 node.id = util.NextID() 195 node.id = util.NextID()
64 196
197 // create core service
198 if node.core, err = NewCore(ctx, cfg); err != nil {
199 return
200 }
201 node.peer = node.core.Peer()
202
65 // create peer object 203 // create peer object
66 if node.peer, err = NewLocalPeer(cfg); err != nil { 204 if node.peer, err = NewLocalPeer(cfg); err != nil {
67 return 205 return
68 } 206 }
69 t.Logf("[%d] Node %s starting", node.id, node.peer.GetID()) 207 t.Logf("[%d] Node %s starting", node.id, node.peer.GetID())
208 t.Logf("[%d] --> %s", node.id, hex.EncodeToString(node.peer.GetID().Key))
70 209
71 // create core service 210 list, err := node.core.Addresses()
72 if node.core, err = NewCore(ctx, node.peer); err != nil { 211 if err != nil {
73 return 212 t.Fatal(err)
74 } 213 }
75 for _, addr := range node.core.trans.Endpoints() { 214 for _, addr := range list {
76 s := addr.Network() + ":" + addr.String() 215 s := addr.Network() + "://" + addr.String()
77 if node.addr, err = util.ParseAddress(s); err != nil { 216 if node.addr, err = util.ParseAddress(s); err != nil {
78 continue 217 continue
79 } 218 }
@@ -82,7 +221,7 @@ func NewTestNode(t *testing.T, ctx context.Context, cfg *config.NodeConfig) (nod
82 221
83 // register as event listener 222 // register as event listener
84 incoming := make(chan *Event) 223 incoming := make(chan *Event)
85 node.core.Register("test", NewListener(incoming, nil)) 224 node.core.Register(cfg.Name, NewListener(incoming, nil))
86 225
87 // heart beat 226 // heart beat
88 tick := time.NewTicker(5 * time.Minute) 227 tick := time.NewTicker(5 * time.Minute)
@@ -100,6 +239,7 @@ func NewTestNode(t *testing.T, ctx context.Context, cfg *config.NodeConfig) (nod
100 t.Logf("[%d] <<< Peer %s diconnected", node.id, ev.Peer) 239 t.Logf("[%d] <<< Peer %s diconnected", node.id, ev.Peer)
101 case EV_MESSAGE: 240 case EV_MESSAGE:
102 t.Logf("[%d] <<< Msg from %s of type %d", node.id, ev.Peer, ev.Msg.Header().MsgType) 241 t.Logf("[%d] <<< Msg from %s of type %d", node.id, ev.Peer, ev.Msg.Header().MsgType)
242 t.Logf("[%d] <<< --> %s", node.id, ev.Msg.String())
103 } 243 }
104 244
105 // handle termination signal 245 // handle termination signal
@@ -115,36 +255,3 @@ func NewTestNode(t *testing.T, ctx context.Context, cfg *config.NodeConfig) (nod
115 }() 255 }()
116 return 256 return
117} 257}
118
119//----------------------------------------------------------------------
120// Two node GNUnet (smallest and simplest network)
121//----------------------------------------------------------------------
122
123// TestCoreSimple test a two node network
124func TestCoreSimple(t *testing.T) {
125
126 // setup execution context
127 ctx, cancel := context.WithCancel(context.Background())
128 defer func() {
129 cancel()
130 time.Sleep(time.Second)
131 }()
132
133 // create and run nodes
134 node1, err := NewTestNode(t, ctx, peer1Cfg)
135 if err != nil {
136 t.Fatal(err)
137 }
138 node2, err := NewTestNode(t, ctx, peer2Cfg)
139 if err != nil {
140 t.Fatal(err)
141 }
142
143 // learn peer addresses (triggers HELLO)
144 for _, addr := range node2.core.trans.Endpoints() {
145 node1.Learn(ctx, node2.peer.GetID(), util.NewAddressWrap(addr))
146 }
147
148 // wait for 5 seconds
149 time.Sleep(5 * time.Second)
150}
diff --git a/src/gnunet/core/peer.go b/src/gnunet/core/peer.go
index 2b6fe74..86ed43a 100644
--- a/src/gnunet/core/peer.go
+++ b/src/gnunet/core/peer.go
@@ -48,7 +48,6 @@ type Peer struct {
48 prv *ed25519.PrivateKey // node private key (long-term signing key) 48 prv *ed25519.PrivateKey // node private key (long-term signing key)
49 pub *ed25519.PublicKey // node public key (=identifier) 49 pub *ed25519.PublicKey // node public key (=identifier)
50 idString string // node identifier as string 50 idString string // node identifier as string
51 addrList []*util.Address // list of addresses associated with node
52 ephPrv *ed25519.PrivateKey // ephemeral signing key 51 ephPrv *ed25519.PrivateKey // ephemeral signing key
53 ephMsg *message.EphemeralKeyMsg // ephemeral signing key message 52 ephMsg *message.EphemeralKeyMsg // ephemeral signing key message
54} 53}
@@ -73,16 +72,6 @@ func NewLocalPeer(cfg *config.NodeConfig) (p *Peer, err error) {
73 if err != nil { 72 if err != nil {
74 return 73 return
75 } 74 }
76 // set the endpoint addresses for local node
77 p.addrList = make([]*util.Address, len(cfg.Endpoints))
78 var addr *util.Address
79 for i, a := range cfg.Endpoints {
80 if addr, err = util.ParseAddress(a); err != nil {
81 return
82 }
83 addr.Expires = util.NewAbsoluteTime(time.Now().Add(12 * time.Hour))
84 p.addrList[i] = addr
85 }
86 return 75 return
87} 76}
88 77
@@ -98,36 +87,24 @@ func NewPeer(peerID string) (p *Peer, err error) {
98 p.prv = nil 87 p.prv = nil
99 p.pub = ed25519.NewPublicKeyFromBytes(data) 88 p.pub = ed25519.NewPublicKeyFromBytes(data)
100 p.idString = util.EncodeBinaryToString(p.pub.Bytes()) 89 p.idString = util.EncodeBinaryToString(p.pub.Bytes())
101 p.addrList = make([]*util.Address, 0)
102 return 90 return
103} 91}
104 92
93// Shutdown peer-related processes.
94func (p *Peer) Shutdown() {}
95
105//---------------------------------------------------------------------- 96//----------------------------------------------------------------------
106//---------------------------------------------------------------------- 97//----------------------------------------------------------------------
107 98
108// Address returns a peer address for the given transport protocol 99// HelloData returns the current HELLO data for the peer. The list of listening
109func (p *Peer) Address(transport string) *util.Address { 100// endpoint addresses re passed in from core to reflect the actual active
110 for _, addr := range p.addrList { 101// endpoints.
111 // skip expired entries 102func (p *Peer) HelloData(ttl time.Duration, a []*util.Address) (h *blocks.HelloBlock, err error) {
112 if addr.Expires.Expired() {
113 continue
114 }
115 // filter by transport protocol
116 if len(transport) > 0 && transport != addr.Netw {
117 continue
118 }
119 return addr
120 }
121 return nil
122}
123
124// HelloData returns the current HELLO data for the peer
125func (p *Peer) HelloData(ttl time.Duration) (h *blocks.HelloBlock, err error) {
126 // assemble HELLO data 103 // assemble HELLO data
127 h = new(blocks.HelloBlock) 104 h = new(blocks.HelloBlock)
128 h.PeerID = p.GetID() 105 h.PeerID = p.GetID()
129 h.Expire = util.NewAbsoluteTime(time.Now().Add(ttl)) 106 h.Expire = util.NewAbsoluteTime(time.Now().Add(ttl))
130 h.SetAddresses(p.addrList) 107 h.SetAddresses(a)
131 108
132 // sign data 109 // sign data
133 err = h.Sign(p.prv) 110 err = h.Sign(p.prv)
@@ -171,16 +148,6 @@ func (p *Peer) GetIDString() string {
171 return p.idString 148 return p.idString
172} 149}
173 150
174// GetAddressList returns a list of addresses associated with this peer.
175func (p *Peer) GetAddressList() []*util.Address {
176 return p.addrList
177}
178
179// AddAddress adds a new address for a node.
180func (p *Peer) AddAddress(a *util.Address) {
181 p.addrList = append(p.addrList, a)
182}
183
184// Sign a message with the (long-term) private key. 151// Sign a message with the (long-term) private key.
185func (p *Peer) Sign(msg []byte) (*ed25519.EdSignature, error) { 152func (p *Peer) Sign(msg []byte) (*ed25519.EdSignature, error) {
186 if p.prv == nil { 153 if p.prv == nil {
diff --git a/src/gnunet/core/peer_test.go b/src/gnunet/core/peer_test.go
index 28b328f..70a5c5f 100644
--- a/src/gnunet/core/peer_test.go
+++ b/src/gnunet/core/peer_test.go
@@ -21,6 +21,7 @@ package core
21import ( 21import (
22 "gnunet/config" 22 "gnunet/config"
23 "gnunet/service/dht/blocks" 23 "gnunet/service/dht/blocks"
24 "gnunet/util"
24 "testing" 25 "testing"
25 "time" 26 "time"
26) 27)
@@ -29,8 +30,13 @@ import (
29var ( 30var (
30 cfg = &config.NodeConfig{ 31 cfg = &config.NodeConfig{
31 PrivateSeed: "YGoe6XFH3XdvFRl+agx9gIzPTvxA229WFdkazEMdcOs=", 32 PrivateSeed: "YGoe6XFH3XdvFRl+agx9gIzPTvxA229WFdkazEMdcOs=",
32 Endpoints: []string{ 33 Endpoints: []*config.EndpointConfig{
33 "r5n+ip+udp://127.0.0.1:6666", 34 {
35 ID: "test",
36 Network: "r5n+ip+udp",
37 Address: "127.0.0.1",
38 Port: 6666,
39 },
34 }, 40 },
35 } 41 }
36 TTL = 6 * time.Hour 42 TTL = 6 * time.Hour
@@ -44,8 +50,17 @@ func TestPeerHello(t *testing.T) {
44 t.Fatal(err) 50 t.Fatal(err)
45 } 51 }
46 52
47 // get HELLO data for the node 53 // get HELLO data for the node:
48 h, err := node.HelloData(TTL) 54 // This hack will only work for direct listening addresses
55 addrList := make([]*util.Address, 0)
56 for _, epRef := range cfg.Endpoints {
57 addr, err := util.ParseAddress(epRef.Addr())
58 if err != nil {
59 t.Fatal(err)
60 }
61 addrList = append(addrList, addr)
62 }
63 h, err := node.HelloData(TTL, addrList)
49 64
50 // convert to URL and back 65 // convert to URL and back
51 u := h.URL() 66 u := h.URL()
diff --git a/src/gnunet/go.mod b/src/gnunet/go.mod
index ad203ca..eb17b30 100644
--- a/src/gnunet/go.mod
+++ b/src/gnunet/go.mod
@@ -15,9 +15,11 @@ require (
15require ( 15require (
16 github.com/cespare/xxhash/v2 v2.1.2 // indirect 16 github.com/cespare/xxhash/v2 v2.1.2 // indirect
17 github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect 17 github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
18 github.com/huin/goupnp v1.0.0 // indirect
18 golang.org/x/mod v0.4.2 // indirect 19 golang.org/x/mod v0.4.2 // indirect
19 golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 // indirect 20 golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 // indirect
20 golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect 21 golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
22 golang.org/x/text v0.3.7 // indirect
21 golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2 // indirect 23 golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2 // indirect
22 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect 24 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
23) 25)
diff --git a/src/gnunet/go.sum b/src/gnunet/go.sum
index f2baf8e..17c2d00 100644
--- a/src/gnunet/go.sum
+++ b/src/gnunet/go.sum
@@ -11,6 +11,7 @@ github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfC
11github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= 11github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
12github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= 12github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
13github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= 13github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
14github.com/huin/goupnp v1.0.0 h1:wg75sLpL6DZqwHQN6E1Cfk6mtfzS45z8OV+ic+DtHRo=
14github.com/huin/goupnp v1.0.0/go.mod h1:n9v9KO1tAxYH82qOn+UTIFQDmx5n1Zxd/ClZDMX7Bnc= 15github.com/huin/goupnp v1.0.0/go.mod h1:n9v9KO1tAxYH82qOn+UTIFQDmx5n1Zxd/ClZDMX7Bnc=
15github.com/huin/goutil v0.0.0-20170803182201-1ca381bf3150/go.mod h1:PpLOETDnJ0o3iZrZfqZzyLl6l7F3c6L1oWn7OICBi6o= 16github.com/huin/goutil v0.0.0-20170803182201-1ca381bf3150/go.mod h1:PpLOETDnJ0o3iZrZfqZzyLl6l7F3c6L1oWn7OICBi6o=
16github.com/mattn/go-sqlite3 v1.14.13 h1:1tj15ngiFfcZzii7yd82foL+ks+ouQcj8j/TPq3fk1I= 17github.com/mattn/go-sqlite3 v1.14.13 h1:1tj15ngiFfcZzii7yd82foL+ks+ouQcj8j/TPq3fk1I=
@@ -55,6 +56,7 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
55golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= 56golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
56golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= 57golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
57golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= 58golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
59golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
58golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= 60golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
59golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= 61golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
60golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2 h1:BonxutuHCTL0rBDnZlKjpGIQFTjyUVTexFOdWkB6Fg0= 62golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2 h1:BonxutuHCTL0rBDnZlKjpGIQFTjyUVTexFOdWkB6Fg0=
diff --git a/src/gnunet/message/msg_hello.go b/src/gnunet/message/msg_hello.go
index 18fe9d5..25ef98a 100644
--- a/src/gnunet/message/msg_hello.go
+++ b/src/gnunet/message/msg_hello.go
@@ -59,7 +59,7 @@ func NewHelloAddress(a *util.Address) *HelloAddress {
59// String returns a human-readable representation of the message. 59// String returns a human-readable representation of the message.
60func (a *HelloAddress) String() string { 60func (a *HelloAddress) String() string {
61 return fmt.Sprintf("Address{%s,expire=%s}", 61 return fmt.Sprintf("Address{%s,expire=%s}",
62 util.AddressString(a.Transport, a.Address), a.ExpireOn) 62 util.URI(a.Transport, a.Address), a.ExpireOn)
63} 63}
64 64
65// HelloMsg is a message send by peers to announce their presence 65// HelloMsg is a message send by peers to announce their presence
diff --git a/src/gnunet/modules.go b/src/gnunet/modules.go
deleted file mode 100644
index e47699f..0000000
--- a/src/gnunet/modules.go
+++ /dev/null
@@ -1,84 +0,0 @@
1// This file is part of gnunet-go, a GNUnet-implementation in Golang.
2// Copyright (C) 2019, 2020 Bernd Fix >Y<
3//
4// gnunet-go is free software: you can redistribute it and/or modify it
5// under the terms of the GNU Affero General Public License as published
6// by the Free Software Foundation, either version 3 of the License,
7// or (at your option) any later version.
8//
9// gnunet-go is distributed in the hope that it will be useful, but
10// WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12// Affero General Public License for more details.
13//
14// You should have received a copy of the GNU Affero General Public License
15// along with this program. If not, see <http://www.gnu.org/licenses/>.
16//
17// SPDX-License-Identifier: AGPL3.0-or-later
18
19//======================================================================
20// Standalone (all-in-one) implementation of GNUnet:
21// -------------------------------------------------
22// Instead of running GNUnet services like GNS or DHT in separate
23// processes communicating (exchanging messages) with each other over
24// Unix Domain Sockets, the standalone implementation combines all
25// service modules into a single binary running go-routines to
26// concurrently performing their tasks.
27//======================================================================
28
29package gnunet
30
31import (
32 "gnunet/service/dht"
33 "gnunet/service/gns"
34 "gnunet/service/namecache"
35 "gnunet/service/revocation"
36 "net/rpc"
37)
38
39// Instances holds a list of all GNUnet service modules
40type Instances struct {
41 GNS *gns.Module
42 Namecache *namecache.NamecacheModule
43 DHT *dht.Module
44 Revocation *revocation.Module
45}
46
47// Register modules for JSON-RPC
48func (inst Instances) Register() {
49 rpc.Register(inst.GNS)
50 rpc.Register(inst.Namecache)
51 rpc.Register(inst.DHT)
52 rpc.Register(inst.Revocation)
53}
54
55// Local reference to instance list. The list is initialized
56// by core.
57var (
58 Modules Instances
59)
60
61/* TODO: implement
62// Initialize instance list and link module functions as required.
63// This function is called by core on start-up.
64func Init(ctx context.Context) {
65
66 // Namecache (no calls to other modules)
67 Modules.Namecache = namecache.NewModule(ctx, c)
68
69 // DHT (no calls to other modules)
70 Modules.DHT = dht.NewModule(ctx, c)
71
72 // Revocation (no calls to other modules)
73 Modules.Revocation = revocation.NewModule(ctx, c)
74
75 // GNS (calls Namecache, DHT and Identity)
76 gns := gns.NewModule(ctx, c)
77 Modules.GNS = gns
78 gns.LookupLocal = Modules.Namecache.Get
79 gns.StoreLocal = Modules.Namecache.Put
80 gns.LookupRemote = Modules.DHT.Get
81 gns.RevocationQuery = Modules.Revocation.Query
82 gns.RevocationRevoke = Modules.Revocation.Revoke
83}
84*/
diff --git a/src/gnunet/service/dht/blocks/hello.go b/src/gnunet/service/dht/blocks/hello.go
index 77fc2ae..eb3bf2a 100644
--- a/src/gnunet/service/dht/blocks/hello.go
+++ b/src/gnunet/service/dht/blocks/hello.go
@@ -177,7 +177,7 @@ func (h *HelloBlock) URL() string {
177 if i > 0 { 177 if i > 0 {
178 u += "&" 178 u += "&"
179 } 179 }
180 u += url.QueryEscape(a.String()) 180 u += url.QueryEscape(a.URI())
181 } 181 }
182 return u 182 return u
183} 183}
diff --git a/src/gnunet/service/dht/module.go b/src/gnunet/service/dht/module.go
index 5f04d54..3339aa2 100644
--- a/src/gnunet/service/dht/module.go
+++ b/src/gnunet/service/dht/module.go
@@ -81,7 +81,7 @@ func NewModule(ctx context.Context, c *core.Core) (m *Module) {
81 81
82//---------------------------------------------------------------------- 82//----------------------------------------------------------------------
83 83
84// Get a block from the DHT 84// Get a block from the DHT ["dht:get"]
85func (nc *Module) Get(ctx context.Context, query blocks.Query) (block blocks.Block, err error) { 85func (nc *Module) Get(ctx context.Context, query blocks.Query) (block blocks.Block, err error) {
86 86
87 // check if we have the requested block in cache or permanent storage. 87 // check if we have the requested block in cache or permanent storage.
@@ -100,7 +100,7 @@ func (nc *Module) Get(ctx context.Context, query blocks.Query) (block blocks.Blo
100 return nil, nil 100 return nil, nil
101} 101}
102 102
103// Put a block into the DHT 103// Put a block into the DHT ["dht:put"]
104func (nc *Module) Put(ctx context.Context, key blocks.Query, block blocks.Block) error { 104func (nc *Module) Put(ctx context.Context, key blocks.Query, block blocks.Block) error {
105 return nil 105 return nil
106} 106}
@@ -142,6 +142,20 @@ func (m *Module) heartbeat(ctx context.Context) {
142 142
143//---------------------------------------------------------------------- 143//----------------------------------------------------------------------
144 144
145// Export functions
146func (m *Module) Export(fcn map[string]any) {
147 // add exported functions from module
148 fcn["dht:get"] = m.Get
149 fcn["dht:put"] = m.Put
150}
151
152// Import functions
153func (m *Module) Import(fcm map[string]any) {
154 // nothing to import now.
155}
156
157//----------------------------------------------------------------------
158
145// RPC returns the route and handler function for a JSON-RPC request 159// RPC returns the route and handler function for a JSON-RPC request
146func (m *Module) RPC() (string, func(http.ResponseWriter, *http.Request)) { 160func (m *Module) RPC() (string, func(http.ResponseWriter, *http.Request)) {
147 return "/gns/", func(wrt http.ResponseWriter, req *http.Request) { 161 return "/gns/", func(wrt http.ResponseWriter, req *http.Request) {
diff --git a/src/gnunet/service/dht/routingtable.go b/src/gnunet/service/dht/routingtable.go
index 0078b71..2933e6a 100644
--- a/src/gnunet/service/dht/routingtable.go
+++ b/src/gnunet/service/dht/routingtable.go
@@ -101,7 +101,7 @@ type RoutingTable struct {
101 ref *PeerAddress // reference address for distance 101 ref *PeerAddress // reference address for distance
102 buckets []*Bucket // list of buckets 102 buckets []*Bucket // list of buckets
103 list map[*PeerAddress]struct{} // keep list of peers 103 list map[*PeerAddress]struct{} // keep list of peers
104 rwlock sync.RWMutex // lock for write operations 104 mtx sync.RWMutex // lock for write operations
105 l2nse float64 // log2 of estimated network size 105 l2nse float64 // log2 of estimated network size
106 inProcess bool // flag if Process() is running 106 inProcess bool // flag if Process() is running
107} 107}
@@ -131,8 +131,8 @@ func NewRoutingTable(ref *PeerAddress) *RoutingTable {
131// Returns true if the entry was added, false otherwise. 131// Returns true if the entry was added, false otherwise.
132func (rt *RoutingTable) Add(p *PeerAddress) bool { 132func (rt *RoutingTable) Add(p *PeerAddress) bool {
133 // ensure one write and no readers 133 // ensure one write and no readers
134 rt.rwlock.Lock() 134 rt.lock(false)
135 defer rt.rwlock.Unlock() 135 defer rt.unlock(false)
136 136
137 // check if peer is already known 137 // check if peer is already known
138 if _, ok := rt.list[p]; ok { 138 if _, ok := rt.list[p]; ok {
@@ -153,8 +153,8 @@ func (rt *RoutingTable) Add(p *PeerAddress) bool {
153// Returns true if the entry was removed, false otherwise. 153// Returns true if the entry was removed, false otherwise.
154func (rt *RoutingTable) Remove(p *PeerAddress) bool { 154func (rt *RoutingTable) Remove(p *PeerAddress) bool {
155 // ensure one write and no readers 155 // ensure one write and no readers
156 rt.rwlock.Lock() 156 rt.lock(false)
157 defer rt.rwlock.Unlock() 157 defer rt.unlock(false)
158 158
159 // compute distance (bucket index) and remove entry from bucket 159 // compute distance (bucket index) and remove entry from bucket
160 _, idx := p.Distance(rt.ref) 160 _, idx := p.Distance(rt.ref)
@@ -170,10 +170,15 @@ func (rt *RoutingTable) Remove(p *PeerAddress) bool {
170//---------------------------------------------------------------------- 170//----------------------------------------------------------------------
171 171
172// Process a function f in the locked context of a routing table 172// Process a function f in the locked context of a routing table
173func (rt *RoutingTable) Process(f func() error) error { 173func (rt *RoutingTable) Process(f func() error, readonly bool) error {
174 // ensure one write and no readers 174 // handle locking
175 rt.rwlock.Lock() 175 rt.lock(readonly)
176 defer rt.rwlock.Unlock() 176 rt.inProcess = true
177 defer func() {
178 rt.inProcess = false
179 rt.unlock(readonly)
180 }()
181 // call function in unlocked context
177 return f() 182 return f()
178} 183}
179 184
@@ -184,8 +189,8 @@ func (rt *RoutingTable) Process(f func() error) error {
184// SelectClosestPeer for a given peer address and bloomfilter. 189// SelectClosestPeer for a given peer address and bloomfilter.
185func (rt *RoutingTable) SelectClosestPeer(p *PeerAddress, bf *PeerBloomFilter) (n *PeerAddress) { 190func (rt *RoutingTable) SelectClosestPeer(p *PeerAddress, bf *PeerBloomFilter) (n *PeerAddress) {
186 // no writer allowed 191 // no writer allowed
187 rt.rwlock.RLock() 192 rt.mtx.RLock()
188 defer rt.rwlock.RUnlock() 193 defer rt.mtx.RUnlock()
189 194
190 // find closest address 195 // find closest address
191 var dist *math.Int 196 var dist *math.Int
@@ -204,8 +209,8 @@ func (rt *RoutingTable) SelectClosestPeer(p *PeerAddress, bf *PeerBloomFilter) (
204// included in the bloomfilter) 209// included in the bloomfilter)
205func (rt *RoutingTable) SelectRandomPeer(bf *PeerBloomFilter) *PeerAddress { 210func (rt *RoutingTable) SelectRandomPeer(bf *PeerBloomFilter) *PeerAddress {
206 // no writer allowed 211 // no writer allowed
207 rt.rwlock.RLock() 212 rt.mtx.RLock()
208 defer rt.rwlock.RUnlock() 213 defer rt.mtx.RUnlock()
209 214
210 // select random entry from list 215 // select random entry from list
211 if size := len(rt.list); size > 0 { 216 if size := len(rt.list); size > 0 {
@@ -279,11 +284,35 @@ func (rt *RoutingTable) heartbeat(ctx context.Context) {
279 } 284 }
280 } 285 }
281 return nil 286 return nil
282 }); err != nil { 287 }, false); err != nil {
283 logger.Println(logger.ERROR, "[dht] RT heartbeat: "+err.Error()) 288 logger.Println(logger.ERROR, "[dht] RT heartbeat: "+err.Error())
284 } 289 }
285} 290}
286 291
292//----------------------------------------------------------------------
293
294// lock with given mode (if not in processing function)
295func (rt *RoutingTable) lock(readonly bool) {
296 if !rt.inProcess {
297 if readonly {
298 rt.mtx.RLock()
299 } else {
300 rt.mtx.Lock()
301 }
302 }
303}
304
305// lock with given mode (if not in processing function)
306func (rt *RoutingTable) unlock(readonly bool) {
307 if !rt.inProcess {
308 if readonly {
309 rt.mtx.RUnlock()
310 } else {
311 rt.mtx.Unlock()
312 }
313 }
314}
315
287//====================================================================== 316//======================================================================
288// Routing table buckets 317// Routing table buckets
289//====================================================================== 318//======================================================================
diff --git a/src/gnunet/service/dht/routingtable_test.go b/src/gnunet/service/dht/routingtable_test.go
index 659f9d4..33c4b7f 100644
--- a/src/gnunet/service/dht/routingtable_test.go
+++ b/src/gnunet/service/dht/routingtable_test.go
@@ -45,8 +45,12 @@ type Entry struct {
45var ( 45var (
46 cfg = &config.NodeConfig{ 46 cfg = &config.NodeConfig{
47 PrivateSeed: "YGoe6XFH3XdvFRl+agx9gIzPTvxA229WFdkazEMdcOs=", 47 PrivateSeed: "YGoe6XFH3XdvFRl+agx9gIzPTvxA229WFdkazEMdcOs=",
48 Endpoints: []string{ 48 Endpoints: []*config.EndpointConfig{
49 "r5n+ip+udp://127.0.0.1:6666", 49 {
50 Network: "r5n+ip+udp",
51 Address: "127.0.0.1",
52 Port: 6666,
53 },
50 }, 54 },
51 } 55 }
52) 56)
diff --git a/src/gnunet/service/gns/module.go b/src/gnunet/service/gns/module.go
index 4878aa0..93f9bca 100644
--- a/src/gnunet/service/gns/module.go
+++ b/src/gnunet/service/gns/module.go
@@ -98,6 +98,7 @@ type Module struct {
98 RevocationRevoke func(ctx context.Context, rd *revocation.RevData) (success bool, err error) 98 RevocationRevoke func(ctx context.Context, rd *revocation.RevData) (success bool, err error)
99} 99}
100 100
101// NewModule instantiates a new GNS module.
101func NewModule(ctx context.Context, c *core.Core) (m *Module) { 102func NewModule(ctx context.Context, c *core.Core) (m *Module) {
102 m = &Module{ 103 m = &Module{
103 ModuleImpl: *service.NewModuleImpl(), 104 ModuleImpl: *service.NewModuleImpl(),
@@ -128,6 +129,23 @@ func (m *Module) event(ctx context.Context, ev *core.Event) {
128 129
129//---------------------------------------------------------------------- 130//----------------------------------------------------------------------
130 131
132// Export functions
133func (m *Module) Export(fcn map[string]any) {
134 // add exported functions from module
135}
136
137// Import functions
138func (m *Module) Import(fcn map[string]any) {
139 // resolve imports from other modules
140 m.LookupLocal = fcn["namecache:get"].(func(ctx context.Context, query *blocks.GNSQuery) (*blocks.GNSBlock, error))
141 m.StoreLocal = fcn["namecache:put"].(func(ctx context.Context, query *blocks.GNSQuery, block *blocks.GNSBlock) error)
142 m.LookupRemote = fcn["dht:get"].(func(ctx context.Context, query blocks.Query) (blocks.Block, error))
143 m.RevocationQuery = fcn["rev:query"].(func(ctx context.Context, zkey *crypto.ZoneKey) (valid bool, err error))
144 m.RevocationRevoke = fcn["rev:revoke"].(func(ctx context.Context, rd *revocation.RevData) (success bool, err error))
145}
146
147//----------------------------------------------------------------------
148
131// Resolve a GNS name with multiple labels. If pkey is not nil, the name 149// Resolve a GNS name with multiple labels. If pkey is not nil, the name
132// is interpreted as "relative to current zone". 150// is interpreted as "relative to current zone".
133func (m *Module) Resolve( 151func (m *Module) Resolve(
diff --git a/src/gnunet/service/module.go b/src/gnunet/service/module.go
index 5f95975..4109f16 100644
--- a/src/gnunet/service/module.go
+++ b/src/gnunet/service/module.go
@@ -26,7 +26,43 @@ import (
26) 26)
27 27
28// Module is an interface for GNUnet service modules (workers). 28// Module is an interface for GNUnet service modules (workers).
29//
30// Modules can call other GNUnet services; these services can be used by
31// sending messages to the respective service socket (the default way) or by
32// calling the module functions directly (if the other module is compiled
33// along with the calling module into one binary). The latter method requires
34// calls to m.Export() and m.Import() to link the modules together (see
35// example):
36//
37// // create module instances
38// gnsMod = gns.NewModule(ctx, core)
39// dhtMod = dht.NewModule(ctx, core)
40// ncMod = namecache.NewModule(ctx, core)
41// revMod = revocation.NewModule(ctx, core)
42//
43// // export module functions
44// fcn := make(map[string]any)
45// gnsMod.Export(fcn)
46// dhtMod.Export(fcn)
47// ncMod.Export(fcn)
48// revMod.Export(fcn)
49//
50// // import (link) module functions
51// gnsMod.Import(fcn)
52// dhtMod.Import(fcn)
53// ncMod.Import(fcn)
54// revMod.Import(fcn)
55//
56// Exported and imported module function are identified by name defined in the
57// Export() function. Import() functions that access functions in other modules
58// need to use the same name for linking.
29type Module interface { 59type Module interface {
60 // Export functions by name
61 Export(map[string]any)
62
63 // Import functions by name
64 Import(map[string]any)
65
30 // RPC returns the route and handler for JSON-RPC requests 66 // RPC returns the route and handler for JSON-RPC requests
31 RPC() (string, func(http.ResponseWriter, *http.Request)) 67 RPC() (string, func(http.ResponseWriter, *http.Request))
32 68
diff --git a/src/gnunet/service/namecache/module.go b/src/gnunet/service/namecache/module.go
index 9d5bca1..b9aaad0 100644
--- a/src/gnunet/service/namecache/module.go
+++ b/src/gnunet/service/namecache/module.go
@@ -35,23 +35,39 @@ import (
35//---------------------------------------------------------------------- 35//----------------------------------------------------------------------
36 36
37// Namecache handles the transient storage of GNS blocks under the query key. 37// Namecache handles the transient storage of GNS blocks under the query key.
38type NamecacheModule struct { 38type Module struct {
39 service.ModuleImpl 39 service.ModuleImpl
40 40
41 cache service.DHTStore // transient block cache 41 cache service.DHTStore // transient block cache
42} 42}
43 43
44// NewModule creates a new module instance. 44// NewModule creates a new module instance.
45func NewModule(ctx context.Context, c *core.Core) (m *NamecacheModule) { 45func NewModule(ctx context.Context, c *core.Core) (m *Module) {
46 m = &NamecacheModule{ 46 m = &Module{
47 ModuleImpl: *service.NewModuleImpl(), 47 ModuleImpl: *service.NewModuleImpl(),
48 } 48 }
49 m.cache, _ = service.NewDHTStore(config.Cfg.Namecache.Storage) 49 m.cache, _ = service.NewDHTStore(config.Cfg.Namecache.Storage)
50 return 50 return
51} 51}
52 52
53//----------------------------------------------------------------------
54
55// Export functions
56func (m *Module) Export(fcn map[string]any) {
57 // add exported functions from module
58 fcn["namecache:get"] = m.Get
59 fcn["namecache:put"] = m.Put
60}
61
62// Import functions
63func (m *Module) Import(fcm map[string]any) {
64 // nothing to import now.
65}
66
67//----------------------------------------------------------------------
68
53// Get an entry from the cache if available. 69// Get an entry from the cache if available.
54func (m *NamecacheModule) Get(ctx context.Context, query *blocks.GNSQuery) (block *blocks.GNSBlock, err error) { 70func (m *Module) Get(ctx context.Context, query *blocks.GNSQuery) (block *blocks.GNSBlock, err error) {
55 var b blocks.Block 71 var b blocks.Block
56 b, err = m.cache.Get(query) 72 b, err = m.cache.Get(query)
57 err = blocks.Unwrap(b, block) 73 err = blocks.Unwrap(b, block)
@@ -59,6 +75,6 @@ func (m *NamecacheModule) Get(ctx context.Context, query *blocks.GNSQuery) (bloc
59} 75}
60 76
61// Put entry into the cache. 77// Put entry into the cache.
62func (m *NamecacheModule) Put(ctx context.Context, query *blocks.GNSQuery, block *blocks.GNSBlock) error { 78func (m *Module) Put(ctx context.Context, query *blocks.GNSQuery, block *blocks.GNSBlock) error {
63 return m.cache.Put(query, block) 79 return m.cache.Put(query, block)
64} 80}
diff --git a/src/gnunet/service/revocation/module.go b/src/gnunet/service/revocation/module.go
index 1f0ab48..eade16b 100644
--- a/src/gnunet/service/revocation/module.go
+++ b/src/gnunet/service/revocation/module.go
@@ -98,8 +98,22 @@ func (m *Module) event(ctx context.Context, ev *core.Event) {
98 98
99//---------------------------------------------------------------------- 99//----------------------------------------------------------------------
100 100
101// Export functions
102func (m *Module) Export(fcn map[string]any) {
103 // add exported functions from module
104 fcn["rev:query"] = m.Query
105 fcn["rev:revoke"] = m.Revoke
106}
107
108// Import functions
109func (m *Module) Import(fcm map[string]any) {
110 // nothing to import now.
111}
112
113//----------------------------------------------------------------------
114
101// Query return true if the pkey is valid (not revoked) and false 115// Query return true if the pkey is valid (not revoked) and false
102// if the pkey has been revoked. 116// if the pkey has been revoked ["rev:query"]
103func (m *Module) Query(ctx context.Context, zkey *crypto.ZoneKey) (valid bool, err error) { 117func (m *Module) Query(ctx context.Context, zkey *crypto.ZoneKey) (valid bool, err error) {
104 // fast check first: is the key in the bloomfilter? 118 // fast check first: is the key in the bloomfilter?
105 data := zkey.Bytes() 119 data := zkey.Bytes()
@@ -118,7 +132,7 @@ func (m *Module) Query(ctx context.Context, zkey *crypto.ZoneKey) (valid bool, e
118 return false, nil 132 return false, nil
119} 133}
120 134
121// Revoke a key with given revocation data 135// Revoke a key with given revocation data ["rev:revoke"]
122func (m *Module) Revoke(ctx context.Context, rd *RevData) (success bool, err error) { 136func (m *Module) Revoke(ctx context.Context, rd *RevData) (success bool, err error) {
123 // verify the revocation data 137 // verify the revocation data
124 diff, rc := rd.Verify(true) 138 diff, rc := rd.Verify(true)
diff --git a/src/gnunet/transport/endpoint.go b/src/gnunet/transport/endpoint.go
index b54ee4e..5dabfa2 100644
--- a/src/gnunet/transport/endpoint.go
+++ b/src/gnunet/transport/endpoint.go
@@ -30,6 +30,10 @@ import (
30var ( 30var (
31 ErrEndpNotAvailable = errors.New("no endpoint for address available") 31 ErrEndpNotAvailable = errors.New("no endpoint for address available")
32 ErrEndpProtocolMismatch = errors.New("transport protocol mismatch") 32 ErrEndpProtocolMismatch = errors.New("transport protocol mismatch")
33 ErrEndpProtocolUnknown = errors.New("unknown transport protocol")
34 ErrEndpExists = errors.New("endpoint exists")
35 ErrEndpNoAddress = errors.New("no address for endpoint")
36 ErrEndpNoConnection = errors.New("no connection on endpoint")
33) 37)
34 38
35// Endpoint represents a local endpoint that can send and receive messages. 39// Endpoint represents a local endpoint that can send and receive messages.
@@ -83,9 +87,12 @@ type PaketEndpoint struct {
83func (ep *PaketEndpoint) Run(ctx context.Context, hdlr chan *TransportMessage) (err error) { 87func (ep *PaketEndpoint) Run(ctx context.Context, hdlr chan *TransportMessage) (err error) {
84 // create listener 88 // create listener
85 var lc net.ListenConfig 89 var lc net.ListenConfig
86 if ep.conn, err = lc.ListenPacket(ctx, ep.addr.Network(), ep.addr.String()); err != nil { 90 xproto := ep.addr.Network()
91 if ep.conn, err = lc.ListenPacket(ctx, EpProtocol(xproto), ep.addr.String()); err != nil {
87 return 92 return
88 } 93 }
94 // use the actual listening address
95 ep.addr = util.NewAddress(xproto, ep.conn.LocalAddr().String())
89 96
90 // run watch dog for termination 97 // run watch dog for termination
91 go func() { 98 go func() {
@@ -95,27 +102,15 @@ func (ep *PaketEndpoint) Run(ctx context.Context, hdlr chan *TransportMessage) (
95 // run go routine to handle messages from clients 102 // run go routine to handle messages from clients
96 go func() { 103 go func() {
97 for { 104 for {
98 // read next message from packet 105 // read next message
99 n, _, err := ep.conn.ReadFrom(ep.buf) 106 tm, err := ep.read()
100 if err != nil { 107 if err != nil {
101 break 108 break
102 } 109 }
103 rdr := bytes.NewBuffer(util.Clone(ep.buf[:n])) 110 // send transport message to handler
104 msg, err := ReadMessageDirect(rdr, ep.buf) 111 go func() {
105 if err != nil { 112 hdlr <- tm
106 break 113 }()
107 }
108 // check for transport message
109 if msg.Header().MsgType == message.DUMMY {
110 // set transient attributes
111 tm := msg.(*TransportMessage)
112 tm.endp = ep.id
113 tm.conn = 0
114 // send to handler
115 go func() {
116 hdlr <- tm
117 }()
118 }
119 } 114 }
120 // connection ended. 115 // connection ended.
121 ep.conn.Close() 116 ep.conn.Close()
@@ -123,29 +118,73 @@ func (ep *PaketEndpoint) Run(ctx context.Context, hdlr chan *TransportMessage) (
123 return 118 return
124} 119}
125 120
121// Read a transport message from endpoint based on extended protocol
122func (ep *PaketEndpoint) read() (tm *TransportMessage, err error) {
123 // read next packet (assuming that it contains one complete message)
124 var n int
125 if n, _, err = ep.conn.ReadFrom(ep.buf); err != nil {
126 return
127 }
128 // parse transport message based on extended protocol
129 var (
130 peer *util.PeerID
131 msg message.Message
132 )
133 switch ep.addr.Network() {
134 case "ip+udp":
135 // parse peer id and message in sequence
136 peer = util.NewPeerID(ep.buf[:32])
137 rdr := bytes.NewBuffer(util.Clone(ep.buf[32:n]))
138 if msg, err = ReadMessageDirect(rdr, ep.buf); err != nil {
139 return
140 }
141 default:
142 panic(ErrEndpProtocolUnknown)
143 }
144 // return transport message
145 return &TransportMessage{
146 Peer: peer,
147 Msg: msg,
148 }, nil
149}
150
126// Send message to address from endpoint 151// Send message to address from endpoint
127func (ep *PaketEndpoint) Send(ctx context.Context, addr net.Addr, msg *TransportMessage) (err error) { 152func (ep *PaketEndpoint) Send(ctx context.Context, addr net.Addr, msg *TransportMessage) (err error) {
153 // check for valid connection
154 if ep.conn == nil {
155 return ErrEndpNoConnection
156 }
157 // resolve target address
128 var a *net.UDPAddr 158 var a *net.UDPAddr
129 a, err = net.ResolveUDPAddr(addr.Network(), addr.String()) 159 a, err = net.ResolveUDPAddr(EpProtocol(addr.Network()), addr.String())
160
161 // get message content (TransportMessage)
130 var buf []byte 162 var buf []byte
131 if buf, err = msg.Bytes(); err != nil { 163 if buf, err = msg.Bytes(); err != nil {
132 return 164 return
133 } 165 }
166 // handle extended protocol:
167 switch ep.addr.Network() {
168 case "ip+udp":
169 // no modifications required
170
171 default:
172 // unknown protocol
173 return ErrEndpProtocolUnknown
174 }
134 _, err = ep.conn.WriteTo(buf, a) 175 _, err = ep.conn.WriteTo(buf, a)
135 return 176 return
136} 177}
137 178
138// Address returms the 179// Address returms the
139func (ep *PaketEndpoint) Address() net.Addr { 180func (ep *PaketEndpoint) Address() net.Addr {
140 if ep.conn != nil {
141 return ep.conn.LocalAddr()
142 }
143 return ep.addr 181 return ep.addr
144} 182}
145 183
146// CanSendTo returns true if the endpoint can sent to address 184// CanSendTo returns true if the endpoint can sent to address
147func (ep *PaketEndpoint) CanSendTo(addr net.Addr) bool { 185func (ep *PaketEndpoint) CanSendTo(addr net.Addr) (ok bool) {
148 return epMode(addr.Network()) == "packet" 186 ok = EpProtocol(addr.Network()) == EpProtocol(ep.addr.Network())
187 return
149} 188}
150 189
151// ID returns the endpoint identifier 190// ID returns the endpoint identifier
@@ -153,6 +192,7 @@ func (ep *PaketEndpoint) ID() int {
153 return ep.id 192 return ep.id
154} 193}
155 194
195// create a new packet endpoint for protcol and address
156func newPacketEndpoint(addr net.Addr) (ep *PaketEndpoint, err error) { 196func newPacketEndpoint(addr net.Addr) (ep *PaketEndpoint, err error) {
157 // check for matching protocol 197 // check for matching protocol
158 if epMode(addr.Network()) != "packet" { 198 if epMode(addr.Network()) != "packet" {
@@ -185,9 +225,13 @@ type StreamEndpoint struct {
185func (ep *StreamEndpoint) Run(ctx context.Context, hdlr chan *TransportMessage) (err error) { 225func (ep *StreamEndpoint) Run(ctx context.Context, hdlr chan *TransportMessage) (err error) {
186 // create listener 226 // create listener
187 var lc net.ListenConfig 227 var lc net.ListenConfig
188 if ep.listener, err = lc.Listen(ctx, ep.addr.Network(), ep.addr.String()); err != nil { 228 xproto := ep.addr.Network()
229 if ep.listener, err = lc.Listen(ctx, EpProtocol(xproto), ep.addr.String()); err != nil {
189 return 230 return
190 } 231 }
232 // get actual listening address
233 ep.addr = util.NewAddress(xproto, ep.listener.Addr().String())
234
191 // run watch dog for termination 235 // run watch dog for termination
192 go func() { 236 go func() {
193 <-ctx.Done() 237 <-ctx.Done()
@@ -206,21 +250,14 @@ func (ep *StreamEndpoint) Run(ctx context.Context, hdlr chan *TransportMessage)
206 go func() { 250 go func() {
207 for { 251 for {
208 // read next message from connection 252 // read next message from connection
209 msg, err := ReadMessage(ctx, conn, ep.buf) 253 tm, err := ep.read(ctx, conn)
210 if err != nil { 254 if err != nil {
211 break 255 break
212 } 256 }
213 // check for transport message 257 // send transport message to handler
214 if msg.Header().MsgType == message.DUMMY { 258 go func() {
215 // set transient attributes 259 hdlr <- tm
216 tm := msg.(*TransportMessage) 260 }()
217 tm.endp = ep.id
218 tm.conn = session
219 // send to handler
220 go func() {
221 hdlr <- tm
222 }()
223 }
224 } 261 }
225 // connection ended. 262 // connection ended.
226 conn.Close() 263 conn.Close()
@@ -231,6 +268,34 @@ func (ep *StreamEndpoint) Run(ctx context.Context, hdlr chan *TransportMessage)
231 return 268 return
232} 269}
233 270
271// Read a transport message from endpoint based on extended protocol
272func (ep *StreamEndpoint) read(ctx context.Context, conn net.Conn) (tm *TransportMessage, err error) {
273 // parse transport message based on extended protocol
274 var (
275 peer *util.PeerID
276 msg message.Message
277 )
278 switch ep.addr.Network() {
279 case "ip+udp":
280 // parse peer id
281 peer = util.NewPeerID(nil)
282 if _, err = conn.Read(peer.Key); err != nil {
283 return
284 }
285 // read next message from connection
286 if msg, err = ReadMessage(ctx, conn, ep.buf); err != nil {
287 break
288 }
289 default:
290 panic(ErrEndpProtocolUnknown)
291 }
292 // return transport message
293 return &TransportMessage{
294 Peer: peer,
295 Msg: msg,
296 }, nil
297}
298
234// Send message to address from endpoint 299// Send message to address from endpoint
235func (ep *StreamEndpoint) Send(ctx context.Context, addr net.Addr, msg *TransportMessage) error { 300func (ep *StreamEndpoint) Send(ctx context.Context, addr net.Addr, msg *TransportMessage) error {
236 return nil 301 return nil
@@ -238,9 +303,6 @@ func (ep *StreamEndpoint) Send(ctx context.Context, addr net.Addr, msg *Transpor
238 303
239// Address returns the actual listening endpoint address 304// Address returns the actual listening endpoint address
240func (ep *StreamEndpoint) Address() net.Addr { 305func (ep *StreamEndpoint) Address() net.Addr {
241 if ep.listener != nil {
242 return ep.listener.Addr()
243 }
244 return ep.addr 306 return ep.addr
245} 307}
246 308
@@ -254,6 +316,7 @@ func (ep *StreamEndpoint) ID() int {
254 return ep.id 316 return ep.id
255} 317}
256 318
319// create a new endpoint based on extended protocol and address
257func newStreamEndpoint(addr net.Addr) (ep *StreamEndpoint, err error) { 320func newStreamEndpoint(addr net.Addr) (ep *StreamEndpoint, err error) {
258 // check for matching protocol 321 // check for matching protocol
259 if epMode(addr.Network()) != "stream" { 322 if epMode(addr.Network()) != "stream" {
@@ -270,10 +333,29 @@ func newStreamEndpoint(addr net.Addr) (ep *StreamEndpoint, err error) {
270 return 333 return
271} 334}
272 335
336//----------------------------------------------------------------------
337// derive endpoint mode (packet/stream) and transport protocol from
338// net.Adddr.Network() strings
339//----------------------------------------------------------------------
340
341// EpProtocol returns the transport protocol for a given network string
342// that can include extended protocol information like "r5n+ip+udp"
343func EpProtocol(netw string) string {
344 switch netw {
345 case "udp", "udp4", "udp6", "ip+udp":
346 return "udp"
347 case "tcp", "tcp4", "tcp6":
348 return "tcp"
349 case "unix":
350 return "unix"
351 }
352 return ""
353}
354
273// epMode returns the endpoint mode (packet or stream) for a given network 355// epMode returns the endpoint mode (packet or stream) for a given network
274func epMode(netw string) string { 356func epMode(netw string) string {
275 switch netw { 357 switch EpProtocol(netw) {
276 case "udp", "udp4", "udp6", "r5n+ip+udp": 358 case "udp":
277 return "packet" 359 return "packet"
278 case "tcp", "unix": 360 case "tcp", "unix":
279 return "stream" 361 return "stream"
diff --git a/src/gnunet/transport/reader_writer.go b/src/gnunet/transport/reader_writer.go
index 2e5f14a..db3527e 100644
--- a/src/gnunet/transport/reader_writer.go
+++ b/src/gnunet/transport/reader_writer.go
@@ -113,10 +113,7 @@ func ReadMessage(ctx context.Context, rdr io.ReadCloser, buf []byte) (msg messag
113 if err = get(4, int(mh.MsgSize)-4); err != nil { 113 if err = get(4, int(mh.MsgSize)-4); err != nil {
114 return nil, err 114 return nil, err
115 } 115 }
116 // handle transport message case 116 if msg, err = message.NewEmptyMessage(mh.MsgType); err != nil {
117 if mh.MsgType == message.DUMMY {
118 msg = NewTransportMessage(nil, nil)
119 } else if msg, err = message.NewEmptyMessage(mh.MsgType); err != nil {
120 return nil, err 117 return nil, err
121 } 118 }
122 if msg == nil { 119 if msg == nil {
diff --git a/src/gnunet/transport/transport.go b/src/gnunet/transport/transport.go
index 14def98..d1e8249 100644
--- a/src/gnunet/transport/transport.go
+++ b/src/gnunet/transport/transport.go
@@ -25,6 +25,8 @@ import (
25 "gnunet/message" 25 "gnunet/message"
26 "gnunet/util" 26 "gnunet/util"
27 "net" 27 "net"
28
29 "github.com/bfix/gospel/network"
28) 30)
29 31
30// Trnsport layer error codes 32// Trnsport layer error codes
@@ -41,32 +43,19 @@ var (
41// Msg is the exchanged GNUnet message. The packet itself satisfies the 43// Msg is the exchanged GNUnet message. The packet itself satisfies the
42// message.Message interface. 44// message.Message interface.
43type TransportMessage struct { 45type TransportMessage struct {
44 Hdr *message.Header `` // message header 46 Peer *util.PeerID // remote peer
45 Peer *util.PeerID `` // remote peer 47 Msg message.Message // GNUnet message
46 Payload []byte `size:"*"` // GNUnet message
47
48 // package-local attributes (transient)
49 msg message.Message
50 endp int // id of endpoint (incoming message)
51 conn int // id of connection (optional, incoming message)
52}
53
54func (msg *TransportMessage) Header() *message.Header {
55 return msg.Hdr
56}
57
58func (msg *TransportMessage) Message() (m message.Message, err error) {
59 if m = msg.msg; m == nil {
60 rdr := bytes.NewBuffer(msg.Payload)
61 m, err = ReadMessageDirect(rdr, nil)
62 }
63 return
64} 48}
65 49
66// Bytes returns the binary representation of a transport message 50// Bytes returns the binary representation of a transport message
67func (msg *TransportMessage) Bytes() ([]byte, error) { 51func (msg *TransportMessage) Bytes() ([]byte, error) {
68 buf := new(bytes.Buffer) 52 buf := new(bytes.Buffer)
69 err := WriteMessageDirect(buf, msg) 53 // serialize peer id
54 if _, err := buf.Write(msg.Peer.Key); err != nil {
55 return nil, err
56 }
57 // serialize message
58 err := WriteMessageDirect(buf, msg.Msg)
70 return buf.Bytes(), err 59 return buf.Bytes(), err
71} 60}
72 61
@@ -76,21 +65,13 @@ func (msg *TransportMessage) String() string {
76} 65}
77 66
78// NewTransportMessage creates a message suitable for transfer 67// NewTransportMessage creates a message suitable for transfer
79func NewTransportMessage(peer *util.PeerID, payload []byte) (tm *TransportMessage) { 68func NewTransportMessage(peer *util.PeerID, msg message.Message) (tm *TransportMessage) {
80 if peer == nil { 69 if peer == nil {
81 peer = util.NewPeerID(nil) 70 peer = util.NewPeerID(nil)
82 } 71 }
83 msize := 0
84 if payload != nil {
85 msize = len(payload)
86 }
87 tm = &TransportMessage{ 72 tm = &TransportMessage{
88 Hdr: &message.Header{ 73 Peer: peer,
89 MsgSize: uint16(36 + msize), 74 Msg: msg,
90 MsgType: message.DUMMY,
91 },
92 Peer: peer,
93 Payload: payload,
94 } 75 }
95 return 76 return
96} 77}
@@ -100,28 +81,41 @@ func NewTransportMessage(peer *util.PeerID, payload []byte) (tm *TransportMessag
100// Transport enables network-oriented (like IP, UDP, TCP or UDS) 81// Transport enables network-oriented (like IP, UDP, TCP or UDS)
101// message exchange on multiple endpoints. 82// message exchange on multiple endpoints.
102type Transport struct { 83type Transport struct {
103 incoming chan *TransportMessage // messages as received from the network 84 incoming chan *TransportMessage // messages as received from the network
104 endpoints map[int]Endpoint // list of available endpoints 85 endpoints *util.Map[int, Endpoint] // list of available endpoints
86 upnp *network.PortMapper // UPnP mapper (optional)
105} 87}
106 88
107// NewTransport creates and runs a new transport layer implementation. 89// NewTransport creates and runs a new transport layer implementation.
108func NewTransport(ctx context.Context, ch chan *TransportMessage) (t *Transport) { 90func NewTransport(ctx context.Context, tag string, ch chan *TransportMessage) (t *Transport) {
109 // create transport instance 91 // create transport instance
92 mngr, err := network.NewPortMapper(tag)
93 if err != nil {
94 mngr = nil
95 }
110 return &Transport{ 96 return &Transport{
111 incoming: ch, 97 incoming: ch,
112 endpoints: make(map[int]Endpoint), 98 endpoints: util.NewMap[int, Endpoint](),
99 upnp: mngr,
100 }
101}
102
103// Shutdown transport-related processes
104func (t *Transport) Shutdown() {
105 if t.upnp != nil {
106 t.upnp.Close()
113 } 107 }
114} 108}
115 109
116// Send a message over suitable endpoint 110// Send a message over suitable endpoint
117func (t *Transport) Send(ctx context.Context, addr net.Addr, msg *TransportMessage) (err error) { 111func (t *Transport) Send(ctx context.Context, addr net.Addr, msg *TransportMessage) (err error) {
118 for _, ep := range t.endpoints { 112 // use the first endpoint able to handle address
113 return t.endpoints.ProcessRange(func(_ int, ep Endpoint) error {
119 if ep.CanSendTo(addr) { 114 if ep.CanSendTo(addr) {
120 err = ep.Send(ctx, addr, msg) 115 return ep.Send(ctx, addr, msg)
121 break
122 } 116 }
123 } 117 return nil
124 return 118 }, true)
125} 119}
126 120
127//---------------------------------------------------------------------- 121//----------------------------------------------------------------------
@@ -130,22 +124,45 @@ func (t *Transport) Send(ctx context.Context, addr net.Addr, msg *TransportMessa
130 124
131// AddEndpoint instantiates and run a new endpoint handler for the 125// AddEndpoint instantiates and run a new endpoint handler for the
132// given address (must map to a network interface). 126// given address (must map to a network interface).
133func (t *Transport) AddEndpoint(ctx context.Context, addr net.Addr) (a net.Addr, err error) { 127func (t *Transport) AddEndpoint(ctx context.Context, addr *util.Address) (ep Endpoint, err error) {
134 // register endpoint 128 // check for valid address
135 var ep Endpoint 129 if addr == nil {
130 err = ErrEndpNoAddress
131 return
132 }
133 // check if endpoint is already available
134 as := addr.Network() + "://" + addr.String()
135 if err = t.endpoints.ProcessRange(func(_ int, ep Endpoint) error {
136 ae := ep.Address().Network() + "://" + ep.Address().String()
137 if as == ae {
138 return ErrEndpExists
139 }
140 return nil
141 }, true); err != nil {
142 return
143 }
144 // register new endpoint
136 if ep, err = NewEndpoint(addr); err != nil { 145 if ep, err = NewEndpoint(addr); err != nil {
137 return 146 return
138 } 147 }
139 t.endpoints[ep.ID()] = ep 148 // add endpoint to list and run it
149 t.endpoints.Put(ep.ID(), ep)
140 ep.Run(ctx, t.incoming) 150 ep.Run(ctx, t.incoming)
141 return ep.Address(), nil 151 return
142} 152}
143 153
144// Endpoints returns a list of listening addresses managed by transport. 154//----------------------------------------------------------------------
145func (t *Transport) Endpoints() (list []net.Addr) { 155// UPnP handling
146 list = make([]net.Addr, 0) 156//----------------------------------------------------------------------
147 for _, ep := range t.endpoints { 157
148 list = append(list, ep.Address()) 158// ForwardOpen returns a local address for listening that will receive traffic
149 } 159// from a port forward handled by UPnP on the router.
150 return 160func (t *Transport) ForwardOpen(protocol, param string, port int) (id, local, remote string, err error) {
161 // no parameters currently defined, so just do the assignment.
162 return t.upnp.Assign(protocol, port)
163}
164
165// ForwardClose closes a specific port forwarding
166func (t *Transport) ForwardClose(id string) error {
167 return t.upnp.Unassign(id)
151} 168}
diff --git a/src/gnunet/util/address.go b/src/gnunet/util/address.go
index 106e671..a56e95f 100644
--- a/src/gnunet/util/address.go
+++ b/src/gnunet/util/address.go
@@ -34,11 +34,11 @@ type Address struct {
34} 34}
35 35
36// NewAddress returns a new Address for the given transport and specs 36// NewAddress returns a new Address for the given transport and specs
37func NewAddress(transport string, addr []byte) *Address { 37func NewAddress(transport string, addr string) *Address {
38 return &Address{ 38 return &Address{
39 Netw: transport, 39 Netw: transport,
40 Options: 0, 40 Options: 0,
41 Address: Clone(addr), 41 Address: Clone([]byte(addr)),
42 Expires: AbsoluteTimeNever(), 42 Expires: AbsoluteTimeNever(),
43 } 43 }
44} 44}
@@ -61,7 +61,7 @@ func ParseAddress(s string) (addr *Address, err error) {
61 err = fmt.Errorf("invalid address format: '%s'", s) 61 err = fmt.Errorf("invalid address format: '%s'", s)
62 return 62 return
63 } 63 }
64 addr = NewAddress(p[0], []byte(strings.Trim(p[1], "/"))) 64 addr = NewAddress(p[0], strings.Trim(p[1], "/"))
65 return 65 return
66} 66}
67 67
@@ -91,8 +91,11 @@ func (a *Address) Network() string {
91 91
92//---------------------------------------------------------------------- 92//----------------------------------------------------------------------
93 93
94// AddressString returns a string representaion of an address. 94// URI returns a string representaion of an address.
95func AddressString(network string, addr []byte) string { 95func (a *Address) URI() string {
96 return URI(a.Netw, a.Address)
97}
98func URI(network string, addr []byte) string {
96 return network + "://" + string(addr) 99 return network + "://" + string(addr)
97} 100}
98 101
@@ -151,13 +154,13 @@ func (a *PeerAddrList) Add(id string, addr *Address) (mode int) {
151 list = append(list, addr) 154 list = append(list, addr)
152 a.list.Put(id, list) 155 a.list.Put(id, list)
153 return nil 156 return nil
154 }) 157 }, false)
155 } 158 }
156 return 159 return
157} 160}
158 161
159// Get address for peer 162// Get address for peer
160func (a *PeerAddrList) Get(id string, transport string) *Address { 163func (a *PeerAddrList) Get(id string, transport string) (res []*Address) {
161 list, ok := a.list.Get(id) 164 list, ok := a.list.Get(id)
162 if ok { 165 if ok {
163 for _, addr := range list { 166 for _, addr := range list {
@@ -171,10 +174,10 @@ func (a *PeerAddrList) Get(id string, transport string) *Address {
171 // skip other transports 174 // skip other transports
172 continue 175 continue
173 } 176 }
174 return addr 177 res = append(res, addr)
175 } 178 }
176 } 179 }
177 return nil 180 return
178} 181}
179 182
180// Delete a list entry by key. 183// Delete a list entry by key.
diff --git a/src/gnunet/util/database.go b/src/gnunet/util/database.go
index a1198fd..f1d1e5f 100644
--- a/src/gnunet/util/database.go
+++ b/src/gnunet/util/database.go
@@ -121,7 +121,7 @@ func (p *dbPool) remove(key string) error {
121 p.insts.Delete(key) 121 p.insts.Delete(key)
122 } 122 }
123 return 123 return
124 }) 124 }, false)
125} 125}
126 126
127// Connect to a SQL database (various types and flavors): 127// Connect to a SQL database (various types and flavors):
@@ -180,6 +180,6 @@ func (p *dbPool) Connect(spec string) (db *DbConn, err error) {
180 db = new(DbConn) 180 db = new(DbConn)
181 db.conn, err = inst.db.Conn(p.ctx) 181 db.conn, err = inst.db.Conn(p.ctx)
182 return err 182 return err
183 }) 183 }, false)
184 return 184 return
185} 185}
diff --git a/src/gnunet/util/misc.go b/src/gnunet/util/misc.go
index 7240757..4443737 100644
--- a/src/gnunet/util/misc.go
+++ b/src/gnunet/util/misc.go
@@ -70,43 +70,87 @@ func NewMap[K comparable, V any]() *Map[K, V] {
70 } 70 }
71} 71}
72 72
73//----------------------------------------------------------------------
74
73// Process a function in the locked map context. Calls 75// Process a function in the locked map context. Calls
74// to other map functions in 'f' will use additional locks. 76// to other map functions in 'f' will skip their locks.
75func (m *Map[K, V]) Process(f func() error) error { 77func (m *Map[K, V]) Process(f func() error, readonly bool) error {
76 m.mtx.Lock() 78 // handle locking
77 defer m.mtx.Unlock() 79 m.lock(readonly)
80 m.inProcess = true
81 defer func() {
82 m.inProcess = false
83 m.unlock(readonly)
84 }()
85 // function call in unlocked environment
86 return f()
87}
88
89// Process a ranged function in the locked map context. Calls
90// to other map functions in 'f' will skip their locks.
91func (m *Map[K, V]) ProcessRange(f func(key K, value V) error, readonly bool) error {
92 // handle locking
93 m.lock(readonly)
78 m.inProcess = true 94 m.inProcess = true
79 err := f() 95 defer func() {
80 m.inProcess = false 96 m.inProcess = false
81 return err 97 m.unlock(readonly)
98 }()
99 // range over map and call function.
100 for key, value := range m.list {
101 if err := f(key, value); err != nil {
102 return err
103 }
104 }
105 return nil
82} 106}
83 107
108//----------------------------------------------------------------------
109
84// Put value into map under given key. 110// Put value into map under given key.
85func (m *Map[K, V]) Put(key K, value V) { 111func (m *Map[K, V]) Put(key K, value V) {
86 if !m.inProcess { 112 m.lock(false)
87 m.mtx.Lock() 113 defer m.unlock(false)
88 defer m.mtx.Unlock()
89 }
90 m.list[key] = value 114 m.list[key] = value
91} 115}
92 116
93// Get value with iven key from map. 117// Get value with iven key from map.
94func (m *Map[K, V]) Get(key K) (value V, ok bool) { 118func (m *Map[K, V]) Get(key K) (value V, ok bool) {
95 if !m.inProcess { 119 m.lock(true)
96 m.mtx.RLock() 120 defer m.unlock(true)
97 defer m.mtx.RUnlock()
98 }
99 value, ok = m.list[key] 121 value, ok = m.list[key]
100 return 122 return
101} 123}
102 124
103// Delete key/value pair from map. 125// Delete key/value pair from map.
104func (m *Map[K, V]) Delete(key K) { 126func (m *Map[K, V]) Delete(key K) {
127 m.lock(false)
128 defer m.unlock(false)
129 delete(m.list, key)
130}
131
132//----------------------------------------------------------------------
133
134// lock with given mode (if not in processing function)
135func (m *Map[K, V]) lock(readonly bool) {
105 if !m.inProcess { 136 if !m.inProcess {
106 m.mtx.Lock() 137 if readonly {
107 defer m.mtx.Unlock() 138 m.mtx.RLock()
139 } else {
140 m.mtx.Lock()
141 }
142 }
143}
144
145// lock with given mode (if not in processing function)
146func (m *Map[K, V]) unlock(readonly bool) {
147 if !m.inProcess {
148 if readonly {
149 m.mtx.RUnlock()
150 } else {
151 m.mtx.Unlock()
152 }
108 } 153 }
109 delete(m.list, key)
110} 154}
111 155
112//---------------------------------------------------------------------- 156//----------------------------------------------------------------------
diff --git a/src/gnunet/util/peer_id.go b/src/gnunet/util/peer_id.go
index a8202a1..a74958d 100644
--- a/src/gnunet/util/peer_id.go
+++ b/src/gnunet/util/peer_id.go
@@ -25,23 +25,19 @@ type PeerID struct {
25 Key []byte `size:"32"` 25 Key []byte `size:"32"`
26} 26}
27 27
28// NewPeerID creates a new object from the data. 28// NewPeerID creates a new peer id from data.
29func NewPeerID(data []byte) *PeerID { 29func NewPeerID(data []byte) (p *PeerID) {
30 if data == nil { 30 p = &PeerID{
31 data = make([]byte, 32) 31 Key: make([]byte, 32),
32 } else {
33 size := len(data)
34 if size > 32 {
35 data = data[:32]
36 } else if size < 32 {
37 buf := make([]byte, 32)
38 CopyAlignedBlock(buf, data)
39 data = buf
40 }
41 } 32 }
42 return &PeerID{ 33 if data != nil {
43 Key: data, 34 if len(data) < 32 {
35 CopyAlignedBlock(p.Key, data)
36 } else {
37 copy(p.Key, data[:32])
38 }
44 } 39 }
40 return
45} 41}
46 42
47// Equals returns true if two peer IDs match. 43// Equals returns true if two peer IDs match.