diff options
author | Bernd Fix <brf@hoi-polloi.org> | 2022-06-07 09:31:22 +0200 |
---|---|---|
committer | Bernd Fix <brf@hoi-polloi.org> | 2022-06-07 09:31:22 +0200 |
commit | 7a6ae8f61ee7efde161db98462259ea9bbb23386 (patch) | |
tree | 990de3aab9cec74f74185883a5ca3d2941b18dff /src | |
parent | de577428a69a0002c3194afdf0562cf5a4dc1bdc (diff) | |
download | gnunet-go-7a6ae8f61ee7efde161db98462259ea9bbb23386.tar.gz gnunet-go-7a6ae8f61ee7efde161db98462259ea9bbb23386.zip |
Improved transport and module code.v0.1.24
Diffstat (limited to 'src')
28 files changed, 907 insertions, 465 deletions
diff --git a/src/gnunet/cmd/gnunet-service-dht-test-go/main.go b/src/gnunet/cmd/gnunet-service-dht-go/main.go index cd2bd1a..3b525fd 100644 --- a/src/gnunet/cmd/gnunet-service-dht-test-go/main.go +++ b/src/gnunet/cmd/gnunet-service-dht-go/main.go | |||
@@ -83,16 +83,12 @@ func main() { | |||
83 | 83 | ||
84 | // instantiate core service | 84 | // instantiate core service |
85 | ctx, cancel := context.WithCancel(context.Background()) | 85 | ctx, cancel := context.WithCancel(context.Background()) |
86 | var local *core.Peer | ||
87 | if local, err = core.NewLocalPeer(config.Cfg.Local); err != nil { | ||
88 | logger.Printf(logger.ERROR, "[dht] No local peer: %s\n", err.Error()) | ||
89 | return | ||
90 | } | ||
91 | var c *core.Core | 86 | var c *core.Core |
92 | if c, err = core.NewCore(ctx, local); err != nil { | 87 | if c, err = core.NewCore(ctx, config.Cfg.Local); err != nil { |
93 | logger.Printf(logger.ERROR, "[dht] core failed: %s\n", err.Error()) | 88 | logger.Printf(logger.ERROR, "[dht] core failed: %s\n", err.Error()) |
94 | return | 89 | return |
95 | } | 90 | } |
91 | defer c.Shutdown() | ||
96 | 92 | ||
97 | // start a new DHT service | 93 | // start a new DHT service |
98 | dht := dht.NewService(ctx, c) | 94 | dht := dht.NewService(ctx, c) |
diff --git a/src/gnunet/cmd/peer_mockup/main.go b/src/gnunet/cmd/peer_mockup/main.go index 58f4baf..96f62da 100644 --- a/src/gnunet/cmd/peer_mockup/main.go +++ b/src/gnunet/cmd/peer_mockup/main.go | |||
@@ -22,13 +22,19 @@ var ( | |||
22 | // configuration for local node | 22 | // configuration for local node |
23 | localCfg = &config.NodeConfig{ | 23 | localCfg = &config.NodeConfig{ |
24 | PrivateSeed: "YGoe6XFH3XdvFRl+agx9gIzPTvxA229WFdkazEMdcOs=", | 24 | PrivateSeed: "YGoe6XFH3XdvFRl+agx9gIzPTvxA229WFdkazEMdcOs=", |
25 | Endpoints: []string{ | 25 | Endpoints: []*config.EndpointConfig{ |
26 | "udp:127.0.0.1:2086", | 26 | { |
27 | ID: "local", | ||
28 | Network: "udp", | ||
29 | Address: "127.0.0.1", | ||
30 | Port: 2086, | ||
31 | TTL: 86400, | ||
32 | }, | ||
27 | }, | 33 | }, |
28 | } | 34 | } |
29 | // configuration for remote node | 35 | // configuration for remote node |
30 | remoteCfg = "3GXXMNb5YpIUO7ejIR2Yy0Cf5texuLfDjHkXcqbPxkc=" | 36 | remoteCfg = "3GXXMNb5YpIUO7ejIR2Yy0Cf5texuLfDjHkXcqbPxkc=" |
31 | remoteAddr = "udp:172.17.0.5:2086" | 37 | remoteAddr = "udp://172.17.0.5:2086" |
32 | 38 | ||
33 | // top-level variables used accross functions | 39 | // top-level variables used accross functions |
34 | local *core.Peer // local peer (with private key) | 40 | local *core.Peer // local peer (with private key) |
@@ -50,14 +56,11 @@ func main() { | |||
50 | flag.Parse() | 56 | flag.Parse() |
51 | 57 | ||
52 | // setup peer and core instances | 58 | // setup peer and core instances |
53 | if local, err = core.NewLocalPeer(localCfg); err != nil { | 59 | if c, err = core.NewCore(ctx, localCfg); err != nil { |
54 | fmt.Println("local failed: " + err.Error()) | ||
55 | return | ||
56 | } | ||
57 | if c, err = core.NewCore(ctx, local); err != nil { | ||
58 | fmt.Println("core failed: " + err.Error()) | 60 | fmt.Println("core failed: " + err.Error()) |
59 | return | 61 | return |
60 | } | 62 | } |
63 | local = c.Peer() | ||
61 | if remote, err = core.NewPeer(remoteCfg); err != nil { | 64 | if remote, err = core.NewPeer(remoteCfg); err != nil { |
62 | fmt.Println("remote failed: " + err.Error()) | 65 | fmt.Println("remote failed: " + err.Error()) |
63 | return | 66 | return |
diff --git a/src/gnunet/config/config.go b/src/gnunet/config/config.go index 41a65f0..3687a3e 100644 --- a/src/gnunet/config/config.go +++ b/src/gnunet/config/config.go | |||
@@ -20,6 +20,7 @@ package config | |||
20 | 20 | ||
21 | import ( | 21 | import ( |
22 | "encoding/json" | 22 | "encoding/json" |
23 | "fmt" | ||
23 | "io/ioutil" | 24 | "io/ioutil" |
24 | "reflect" | 25 | "reflect" |
25 | "regexp" | 26 | "regexp" |
@@ -32,10 +33,26 @@ import ( | |||
32 | // Configuration for local node | 33 | // Configuration for local node |
33 | //---------------------------------------------------------------------- | 34 | //---------------------------------------------------------------------- |
34 | 35 | ||
36 | // EndpointConfig holds parameters for local network listeners. | ||
37 | type EndpointConfig struct { | ||
38 | ID string `json:"id"` // endpoint identifier | ||
39 | Network string `json:"network"` // network protocol to use on endpoint | ||
40 | Address string `json:"address"` // address to listen on | ||
41 | Port int `json:"port"` // port for listening to network | ||
42 | TTL int `json:"ttl"` // time-to-live for address (in seconds) | ||
43 | } | ||
44 | |||
45 | // Addr returns an address string for endpoint configuration; it does NOT | ||
46 | // handle special cases like UPNP and such. | ||
47 | func (c *EndpointConfig) Addr() string { | ||
48 | return fmt.Sprintf("%s://%s:%d", c.Network, c.Address, c.Port) | ||
49 | } | ||
50 | |||
35 | // NodeConfig holds parameters for the local node instance | 51 | // NodeConfig holds parameters for the local node instance |
36 | type NodeConfig struct { | 52 | type NodeConfig struct { |
37 | PrivateSeed string `json:"privateSeed"` // Node private key seed (base64) | 53 | Name string `json:"name"` // (short) name for local node |
38 | Endpoints []string `json:"endpoints"` // list of endpoints available | 54 | PrivateSeed string `json:"privateSeed"` // Node private key seed (base64) |
55 | Endpoints []*EndpointConfig `json:"endpoints"` // list of endpoints available | ||
39 | } | 56 | } |
40 | 57 | ||
41 | //---------------------------------------------------------------------- | 58 | //---------------------------------------------------------------------- |
@@ -139,9 +156,16 @@ func ParseConfig(fileName string) (err error) { | |||
139 | if err != nil { | 156 | if err != nil { |
140 | return | 157 | return |
141 | } | 158 | } |
159 | return ParseConfigBytes(file, true) | ||
160 | } | ||
161 | |||
162 | // ParseConfigBytes reads a configuration from binary data. The data is | ||
163 | // a JSON-encoded content. If 'subst' is true, the configuration strings | ||
164 | // are subsituted | ||
165 | func ParseConfigBytes(data []byte, subst bool) (err error) { | ||
142 | // unmarshal to Config data structure | 166 | // unmarshal to Config data structure |
143 | Cfg = new(Config) | 167 | Cfg = new(Config) |
144 | if err = json.Unmarshal(file, Cfg); err == nil { | 168 | if err = json.Unmarshal(data, Cfg); err == nil { |
145 | // process all string-based config settings and apply | 169 | // process all string-based config settings and apply |
146 | // string substitutions. | 170 | // string substitutions. |
147 | applySubstitutions(Cfg, Cfg.Env) | 171 | applySubstitutions(Cfg, Cfg.Env) |
diff --git a/src/gnunet/config/config_test.go b/src/gnunet/config/config_test.go index 3afd3a7..d40bc19 100644 --- a/src/gnunet/config/config_test.go +++ b/src/gnunet/config/config_test.go | |||
@@ -20,6 +20,7 @@ package config | |||
20 | 20 | ||
21 | import ( | 21 | import ( |
22 | "encoding/json" | 22 | "encoding/json" |
23 | "io/ioutil" | ||
23 | "testing" | 24 | "testing" |
24 | 25 | ||
25 | "github.com/bfix/gospel/logger" | 26 | "github.com/bfix/gospel/logger" |
@@ -27,14 +28,18 @@ import ( | |||
27 | 28 | ||
28 | func TestConfigRead(t *testing.T) { | 29 | func TestConfigRead(t *testing.T) { |
29 | logger.SetLogLevel(logger.WARN) | 30 | logger.SetLogLevel(logger.WARN) |
30 | if err := ParseConfig("./gnunet-config.json"); err != nil { | 31 | |
32 | // read configuration file | ||
33 | data, err := ioutil.ReadFile("./gnunet-config.json") | ||
34 | if err != nil { | ||
35 | t.Fatal(err) | ||
36 | } | ||
37 | // parse configuration | ||
38 | if err := ParseConfigBytes(data, false); err != nil { | ||
31 | t.Fatal(err) | 39 | t.Fatal(err) |
32 | } | 40 | } |
33 | if testing.Verbose() { | 41 | // write configuration |
34 | data, err := json.Marshal(Cfg) | 42 | if _, err = json.Marshal(Cfg); err != nil { |
35 | if err != nil { | 43 | t.Fatal(err) |
36 | t.Fatal(err) | ||
37 | } | ||
38 | t.Log("cfg=" + string(data)) | ||
39 | } | 44 | } |
40 | } | 45 | } |
diff --git a/src/gnunet/config/gnunet-config.json b/src/gnunet/config/gnunet-config.json index 941cf21..82606d7 100644 --- a/src/gnunet/config/gnunet-config.json +++ b/src/gnunet/config/gnunet-config.json | |||
@@ -1,58 +1,65 @@ | |||
1 | { | 1 | { |
2 | "local": { | 2 | "bootstrap": { |
3 | "privateSeed": "YGoe6XFH3XdvFRl+agx9gIzPTvxA229WFdkazEMdcOs=", | 3 | "nodes": [ |
4 | "endpoints": [ | 4 | "gnunet://hello/7KTBJ90340HF1Q2GB0A57E2XJER4FDHX8HP5GHEB9125VPWPD27G/BNMDFN6HJCPWSPNBSEC06MC1K8QN1Z2DHRQSRXDTFR7FTBD4JHNBJ2RJAAEZ31FWG1Q3PMN3PXGZQ3Q7NTNEKQZFA7TE2Y46FM8E20R/1653499308?r5n%2Bip%2Budp%3A127.0.0.1%3A7654" |
5 | "r5n+ip+udp:127.0.0.1:6666" | 5 | ] |
6 | ] | 6 | }, |
7 | }, | 7 | "local": { |
8 | "bootstrap": { | 8 | "name": "ygng", |
9 | "nodes": [ | 9 | "privateSeed": "YGoe6XFH3XdvFRl+agx9gIzPTvxA229WFdkazEMdcOs=", |
10 | "gnunet://hello/7KTBJ90340HF1Q2GB0A57E2XJER4FDHX8HP5GHEB9125VPWPD27G/BNMDFN6HJCPWSPNBSEC06MC1K8QN1Z2DHRQSRXDTFR7FTBD4JHNBJ2RJAAEZ31FWG1Q3PMN3PXGZQ3Q7NTNEKQZFA7TE2Y46FM8E20R/1653499308?r5n%2Bip%2Budp%3A127.0.0.1%3A7654" | 10 | "endpoints": [ |
11 | ] | 11 | { |
12 | }, | 12 | "id": "test", |
13 | "environ": { | 13 | "network": "ip+udp", |
14 | "TMP": "/tmp", | 14 | "address": "upnp:192.168.178.1", |
15 | "RT_SYS": "${TMP}/gnunet-system-runtime" | 15 | "port": 6666, |
16 | }, | 16 | "ttl": 86400 |
17 | "dht": { | 17 | } |
18 | "service": { | 18 | ] |
19 | "socket": "${RT_SYS}/gnunet-service-dht.sock", | 19 | }, |
20 | "params": { | 20 | "environ": { |
21 | "perm": "0770" | 21 | "TMP": "/tmp", |
22 | } | 22 | "RT_SYS": "${TMP}/gnunet-system-runtime" |
23 | }, | 23 | }, |
24 | "storage": "dht_file_store+/var/lib/gnunet/dht/store", | 24 | "dht": { |
25 | "cache": "dht_file_cache+/var/lib/gnunet/dht/cache+1000" | 25 | "service": { |
26 | }, | 26 | "socket": "${RT_SYS}/gnunet-service-dht.sock", |
27 | "gns": { | 27 | "params": { |
28 | "service": { | 28 | "perm": "0770" |
29 | "socket": "${RT_SYS}/gnunet-service-gns-go.sock", | 29 | } |
30 | "params": { | 30 | }, |
31 | "perm": "0770" | 31 | "storage": "dht_file_store+/var/lib/gnunet/dht/store", |
32 | } | 32 | "cache": "dht_file_cache+/var/lib/gnunet/dht/cache+1000" |
33 | }, | 33 | }, |
34 | "dhtReplLevel": 10, | 34 | "gns": { |
35 | "maxDepth": 250 | 35 | "service": { |
36 | }, | 36 | "socket": "${RT_SYS}/gnunet-service-gns-go.sock", |
37 | "namecache": { | 37 | "params": { |
38 | "service": { | 38 | "perm": "0770" |
39 | "socket": "${RT_SYS}/gnunet-service-namecache.sock", | 39 | } |
40 | "params": { | 40 | }, |
41 | "perm": "0770" | 41 | "dhtReplLevel": 10, |
42 | } | 42 | "maxDepth": 250 |
43 | }, | 43 | }, |
44 | "storage": "dht_file_cache:/var/lib/gnunet/namecache:1000" | 44 | "namecache": { |
45 | }, | 45 | "service": { |
46 | "revocation": { | 46 | "socket": "${RT_SYS}/gnunet-service-namecache.sock", |
47 | "service": { | 47 | "params": { |
48 | "socket": "${RT_SYS}/gnunet-service-revocation-go.sock", | 48 | "perm": "0770" |
49 | "params": { | 49 | } |
50 | "perm": "0770" | 50 | }, |
51 | } | 51 | "storage": "dht_file_cache:/var/lib/gnunet/namecache:1000" |
52 | }, | 52 | }, |
53 | "storage": "redis:localhost:6397::15" | 53 | "revocation": { |
54 | }, | 54 | "service": { |
55 | "rpc": { | 55 | "socket": "${RT_SYS}/gnunet-service-revocation-go.sock", |
56 | "endpoint": "tcp:127.0.0.1:80" | 56 | "params": { |
57 | } | 57 | "perm": "0770" |
58 | } | ||
59 | }, | ||
60 | "storage": "redis:localhost:6397::15" | ||
61 | }, | ||
62 | "rpc": { | ||
63 | "endpoint": "tcp:127.0.0.1:80" | ||
64 | } | ||
58 | } \ No newline at end of file | 65 | } \ No newline at end of file |
diff --git a/src/gnunet/core/core.go b/src/gnunet/core/core.go index c3bf355..7582891 100644 --- a/src/gnunet/core/core.go +++ b/src/gnunet/core/core.go | |||
@@ -20,16 +20,34 @@ package core | |||
20 | 20 | ||
21 | import ( | 21 | import ( |
22 | "context" | 22 | "context" |
23 | "errors" | ||
24 | "gnunet/config" | ||
23 | "gnunet/message" | 25 | "gnunet/message" |
24 | "gnunet/service/dht/blocks" | 26 | "gnunet/service/dht/blocks" |
25 | "gnunet/transport" | 27 | "gnunet/transport" |
26 | "gnunet/util" | 28 | "gnunet/util" |
27 | "net" | 29 | "net" |
30 | "strings" | ||
28 | "time" | 31 | "time" |
32 | ) | ||
29 | 33 | ||
30 | "github.com/bfix/gospel/data" | 34 | //---------------------------------------------------------------------- |
35 | // Core-related error codes | ||
36 | var ( | ||
37 | ErrCoreNoUpnpDyn = errors.New("no dynamic port with UPnP") | ||
38 | ErrCoreNoEndpAddr = errors.New("no endpoint for address") | ||
31 | ) | 39 | ) |
32 | 40 | ||
41 | //---------------------------------------------------------------------- | ||
42 | // EndpointRef is a reference to an endpoint instance managed by core. | ||
43 | type EndpointRef struct { | ||
44 | id string // endpoint identifier in configuration | ||
45 | ep transport.Endpoint // reference to endpoint | ||
46 | addr *util.Address // public endpoint address | ||
47 | upnpId string // UPNP identifier (empty if unused) | ||
48 | } | ||
49 | |||
50 | //---------------------------------------------------------------------- | ||
33 | // Core service | 51 | // Core service |
34 | type Core struct { | 52 | type Core struct { |
35 | // local peer instance | 53 | // local peer instance |
@@ -41,31 +59,91 @@ type Core struct { | |||
41 | // reference to transport implementation | 59 | // reference to transport implementation |
42 | trans *transport.Transport | 60 | trans *transport.Transport |
43 | 61 | ||
44 | // registered listeners | 62 | // registered signal listeners |
45 | listeners map[string]*Listener | 63 | listeners map[string]*Listener |
46 | 64 | ||
47 | // list of known peers with addresses | 65 | // list of known peers with addresses |
48 | peers *util.PeerAddrList | 66 | peers *util.PeerAddrList |
67 | |||
68 | // List of registered endpoints | ||
69 | endpoints map[string]*EndpointRef | ||
49 | } | 70 | } |
50 | 71 | ||
51 | //---------------------------------------------------------------------- | 72 | //---------------------------------------------------------------------- |
52 | 73 | ||
53 | // NewCore creates and runs a new core instance. | 74 | // NewCore creates and runs a new core instance. |
54 | func NewCore(ctx context.Context, local *Peer) (c *Core, err error) { | 75 | func NewCore(ctx context.Context, node *config.NodeConfig) (c *Core, err error) { |
76 | |||
77 | // instantiate peer | ||
78 | var peer *Peer | ||
79 | if peer, err = NewLocalPeer(node); err != nil { | ||
80 | return | ||
81 | } | ||
55 | // create new core instance | 82 | // create new core instance |
56 | incoming := make(chan *transport.TransportMessage) | 83 | incoming := make(chan *transport.TransportMessage) |
57 | c = &Core{ | 84 | c = &Core{ |
58 | local: local, | 85 | local: peer, |
59 | incoming: incoming, | 86 | incoming: incoming, |
60 | listeners: make(map[string]*Listener), | 87 | listeners: make(map[string]*Listener), |
61 | trans: transport.NewTransport(ctx, incoming), | 88 | trans: transport.NewTransport(ctx, node.Name, incoming), |
62 | peers: util.NewPeerAddrList(), | 89 | peers: util.NewPeerAddrList(), |
90 | endpoints: make(map[string]*EndpointRef), | ||
63 | } | 91 | } |
64 | // add all local peer endpoints to transport. | 92 | // add all local peer endpoints to transport. |
65 | for _, addr := range local.addrList { | 93 | for _, epCfg := range node.Endpoints { |
66 | if _, err = c.trans.AddEndpoint(ctx, addr); err != nil { | 94 | var ( |
95 | upnpId string // upnp identifier | ||
96 | local *util.Address // local address | ||
97 | remote *util.Address // remote address | ||
98 | ep transport.Endpoint // endpoint reference | ||
99 | ) | ||
100 | // handle special addresses: | ||
101 | if strings.HasPrefix(epCfg.Address, "upnp:") { | ||
102 | // don't allow dynamic port assignment | ||
103 | if epCfg.Port == 0 { | ||
104 | err = ErrCoreNoUpnpDyn | ||
105 | return | ||
106 | } | ||
107 | // handle UPNP port forwarding | ||
108 | protocol := transport.EpProtocol(epCfg.Network) | ||
109 | var localA, remoteA string | ||
110 | if upnpId, remoteA, localA, err = c.trans.ForwardOpen(protocol, epCfg.Address[5:], epCfg.Port); err != nil { | ||
111 | return | ||
112 | } | ||
113 | // parse local and remote addresses | ||
114 | if local, err = util.ParseAddress(epCfg.Network + "://" + localA); err != nil { | ||
115 | return | ||
116 | } | ||
117 | if remote, err = util.ParseAddress(epCfg.Network + "://" + remoteA); err != nil { | ||
118 | return | ||
119 | } | ||
120 | } else { | ||
121 | // direct address specification: | ||
122 | if local, err = util.ParseAddress(epCfg.Addr()); err != nil { | ||
123 | return | ||
124 | } | ||
125 | remote = local | ||
126 | upnpId = "" | ||
127 | } | ||
128 | // add endpoint for address | ||
129 | if ep, err = c.trans.AddEndpoint(ctx, local); err != nil { | ||
67 | return | 130 | return |
68 | } | 131 | } |
132 | // if port is set to 0, replace it with port assigned dynamically. | ||
133 | // only applies to direct listening addresses! | ||
134 | if epCfg.Port == 0 && local == remote { | ||
135 | addr := ep.Address() | ||
136 | if remote, err = util.ParseAddress(addr.Network() + "://" + addr.String()); err != nil { | ||
137 | return | ||
138 | } | ||
139 | } | ||
140 | // save endpoint reference | ||
141 | c.endpoints[epCfg.ID] = &EndpointRef{ | ||
142 | id: epCfg.ID, | ||
143 | ep: ep, | ||
144 | addr: remote, | ||
145 | upnpId: upnpId, | ||
146 | } | ||
69 | } | 147 | } |
70 | // run message pump | 148 | // run message pump |
71 | go func() { | 149 | go func() { |
@@ -77,32 +155,31 @@ func NewCore(ctx context.Context, local *Peer) (c *Core, err error) { | |||
77 | var ev *Event | 155 | var ev *Event |
78 | 156 | ||
79 | // inspect message for peer state events | 157 | // inspect message for peer state events |
80 | m, err := tm.Message() | 158 | switch msg := tm.Msg.(type) { |
81 | if err == nil { | 159 | case *message.HelloMsg: |
82 | switch msg := m.(type) { | 160 | // keep peer addresses |
83 | case *message.HelloMsg: | 161 | for _, addr := range msg.Addresses { |
84 | // keep peer addresses | 162 | a := &util.Address{ |
85 | for _, addr := range msg.Addresses { | 163 | Netw: addr.Transport, |
86 | a := &util.Address{ | 164 | Address: addr.Address, |
87 | Netw: addr.Transport, | 165 | Expires: addr.ExpireOn, |
88 | Address: addr.Address, | ||
89 | Expires: addr.ExpireOn, | ||
90 | } | ||
91 | c.Learn(ctx, msg.PeerID, a) | ||
92 | } | 166 | } |
93 | // generate EV_CONNECT event | 167 | c.Learn(ctx, msg.PeerID, a) |
94 | ev = new(Event) | 168 | } |
95 | ev.ID = EV_CONNECT | 169 | // generate EV_CONNECT event |
96 | ev.Peer = tm.Peer | 170 | ev = &Event{ |
97 | ev.Msg = msg | 171 | ID: EV_CONNECT, |
98 | c.dispatch(ev) | 172 | Peer: tm.Peer, |
173 | Msg: msg, | ||
99 | } | 174 | } |
175 | c.dispatch(ev) | ||
100 | } | 176 | } |
101 | // generate EV_MESSAGE event | 177 | // generate EV_MESSAGE event |
102 | ev = new(Event) | 178 | ev = &Event{ |
103 | ev.ID = EV_MESSAGE | 179 | ID: EV_MESSAGE, |
104 | ev.Peer = tm.Peer | 180 | Peer: tm.Peer, |
105 | ev.Msg, _ = tm.Message() | 181 | Msg: tm.Msg, |
182 | } | ||
106 | c.dispatch(ev) | 183 | c.dispatch(ev) |
107 | 184 | ||
108 | // wait for termination | 185 | // wait for termination |
@@ -114,29 +191,63 @@ func NewCore(ctx context.Context, local *Peer) (c *Core, err error) { | |||
114 | return | 191 | return |
115 | } | 192 | } |
116 | 193 | ||
194 | // Shutdown all core-related processes. | ||
195 | func (c *Core) Shutdown() { | ||
196 | c.trans.Shutdown() | ||
197 | c.local.Shutdown() | ||
198 | } | ||
199 | |||
117 | //---------------------------------------------------------------------- | 200 | //---------------------------------------------------------------------- |
118 | 201 | ||
119 | // Send is a function that allows the local peer to send a protocol | 202 | // Send is a function that allows the local peer to send a protocol |
120 | // message to a remote peer. | 203 | // message to a remote peer. |
121 | func (c *Core) Send(ctx context.Context, peer *util.PeerID, msg message.Message) error { | 204 | func (c *Core) Send(ctx context.Context, peer *util.PeerID, msg message.Message) error { |
122 | // TODO: select best endpoint protocol for transport; now fixed to UDP | 205 | // TODO: select best endpoint protocol for transport; now fixed to IP+UDP |
123 | netw := "udp" | 206 | netw := "ip+udp" |
124 | addr := c.peers.Get(peer.String(), netw) | 207 | addrs := c.peers.Get(peer.String(), netw) |
125 | payload, err := data.Marshal(msg) | 208 | if len(addrs) == 0 { |
126 | if err != nil { | 209 | return ErrCoreNoEndpAddr |
127 | return err | ||
128 | } | 210 | } |
129 | tm := transport.NewTransportMessage(c.PeerID(), payload) | 211 | // TODO: select best address; curently selects first |
212 | addr := addrs[0] | ||
213 | |||
214 | // select best endpoint for transport | ||
215 | var ep transport.Endpoint | ||
216 | for _, epCfg := range c.endpoints { | ||
217 | if epCfg.addr.Network() == netw { | ||
218 | if ep == nil { | ||
219 | ep = epCfg.ep | ||
220 | } | ||
221 | // TODO: compare endpoints, select better one: | ||
222 | // if ep.Better(epCfg.ep) { | ||
223 | // ep = epCfg.ep | ||
224 | // } | ||
225 | } | ||
226 | } | ||
227 | // check we have an endpoint to send on | ||
228 | if ep == nil { | ||
229 | return ErrCoreNoEndpAddr | ||
230 | } | ||
231 | // assemble transport message | ||
232 | tm := transport.NewTransportMessage(c.PeerID(), msg) | ||
233 | // send on transport | ||
130 | return c.trans.Send(ctx, addr, tm) | 234 | return c.trans.Send(ctx, addr, tm) |
131 | } | 235 | } |
132 | 236 | ||
133 | // Learn a (new) address for peer | 237 | // Learn a (new) address for peer |
134 | func (c *Core) Learn(ctx context.Context, peer *util.PeerID, addr *util.Address) (err error) { | 238 | func (c *Core) Learn(ctx context.Context, peer *util.PeerID, addr *util.Address) (err error) { |
135 | if c.peers.Add(peer.String(), addr) == 1 { | 239 | if c.peers.Add(peer.String(), addr) == 1 { |
240 | // we added a previously unknown peer: send a HELLO | ||
241 | |||
242 | // collect endpoint addresses | ||
243 | addrList := make([]*util.Address, 0) | ||
244 | for _, epRef := range c.endpoints { | ||
245 | addrList = append(addrList, epRef.addr) | ||
246 | } | ||
136 | // new peer id: send HELLO message to newly added peer | 247 | // new peer id: send HELLO message to newly added peer |
137 | node := c.local | 248 | node := c.local |
138 | var hello *blocks.HelloBlock | 249 | var hello *blocks.HelloBlock |
139 | hello, err = node.HelloData(time.Hour) | 250 | hello, err = node.HelloData(time.Hour, addrList) |
140 | if err != nil { | 251 | if err != nil { |
141 | return | 252 | return |
142 | } | 253 | } |
@@ -150,11 +261,28 @@ func (c *Core) Learn(ctx context.Context, peer *util.PeerID, addr *util.Address) | |||
150 | return | 261 | return |
151 | } | 262 | } |
152 | 263 | ||
264 | // Addresses returns the list of listening endpoint addresses | ||
265 | func (c *Core) Addresses() (list []*util.Address, err error) { | ||
266 | for _, epRef := range c.endpoints { | ||
267 | list = append(list, epRef.addr) | ||
268 | } | ||
269 | return | ||
270 | } | ||
271 | |||
272 | //---------------------------------------------------------------------- | ||
273 | |||
274 | // Peer returns the local peer | ||
275 | func (c *Core) Peer() *Peer { | ||
276 | return c.local | ||
277 | } | ||
278 | |||
153 | // PeerID returns the peer id of the local node. | 279 | // PeerID returns the peer id of the local node. |
154 | func (c *Core) PeerID() *util.PeerID { | 280 | func (c *Core) PeerID() *util.PeerID { |
155 | return c.local.GetID() | 281 | return c.local.GetID() |
156 | } | 282 | } |
157 | 283 | ||
284 | //---------------------------------------------------------------------- | ||
285 | |||
158 | // TryConnect is a function which allows the local peer to attempt the | 286 | // TryConnect is a function which allows the local peer to attempt the |
159 | // establishment of a connection to another peer using an address. | 287 | // establishment of a connection to another peer using an address. |
160 | // When the connection attempt is successful, information on the new | 288 | // When the connection attempt is successful, information on the new |
diff --git a/src/gnunet/core/core_test.go b/src/gnunet/core/core_test.go index 102abf1..d8a6277 100644 --- a/src/gnunet/core/core_test.go +++ b/src/gnunet/core/core_test.go | |||
@@ -20,23 +20,149 @@ package core | |||
20 | 20 | ||
21 | import ( | 21 | import ( |
22 | "context" | 22 | "context" |
23 | "encoding/hex" | ||
23 | "gnunet/config" | 24 | "gnunet/config" |
24 | "gnunet/util" | 25 | "gnunet/util" |
25 | "testing" | 26 | "testing" |
26 | "time" | 27 | "time" |
27 | ) | 28 | ) |
28 | 29 | ||
29 | var ( | 30 | //---------------------------------------------------------------------- |
30 | peer1Cfg = &config.NodeConfig{ | 31 | // Two node GNUnet (smallest and simplest network) |
31 | PrivateSeed: "iYK1wSi5XtCP774eNFk1LYXqKlOPEpwKBw+2/bMkE24=", | 32 | //---------------------------------------------------------------------- |
32 | Endpoints: []string{"udp://127.0.0.1:20861"}, | 33 | |
34 | // TestCoreSimple test a two node network | ||
35 | func TestCoreSimple(t *testing.T) { | ||
36 | |||
37 | var ( | ||
38 | peer1Cfg = &config.NodeConfig{ | ||
39 | Name: "p1", | ||
40 | PrivateSeed: "iYK1wSi5XtCP774eNFk1LYXqKlOPEpwKBw+2/bMkE24=", | ||
41 | Endpoints: []*config.EndpointConfig{ | ||
42 | { | ||
43 | ID: "p1", | ||
44 | Network: "ip+udp", | ||
45 | Address: "127.0.0.1", | ||
46 | Port: 0, | ||
47 | TTL: 86400, | ||
48 | }, | ||
49 | }, | ||
50 | } | ||
51 | |||
52 | peer2Cfg = &config.NodeConfig{ | ||
53 | Name: "p2", | ||
54 | PrivateSeed: "Bv9umksEO51jjWWrOGEH+4r8wl9Vi+LItpdBpTOi2PE=", | ||
55 | Endpoints: []*config.EndpointConfig{ | ||
56 | { | ||
57 | ID: "p2", | ||
58 | Network: "ip+udp", | ||
59 | Address: "127.0.0.1", | ||
60 | Port: 20862, | ||
61 | TTL: 86400, | ||
62 | }, | ||
63 | }, | ||
64 | } | ||
65 | ) | ||
66 | |||
67 | // setup execution context | ||
68 | ctx, cancel := context.WithCancel(context.Background()) | ||
69 | defer func() { | ||
70 | cancel() | ||
71 | time.Sleep(time.Second) | ||
72 | }() | ||
73 | |||
74 | // create and run nodes | ||
75 | node1, err := NewTestNode(t, ctx, peer1Cfg) | ||
76 | if err != nil { | ||
77 | t.Fatal(err) | ||
78 | } | ||
79 | node2, err := NewTestNode(t, ctx, peer2Cfg) | ||
80 | if err != nil { | ||
81 | t.Fatal(err) | ||
33 | } | 82 | } |
34 | 83 | ||
35 | peer2Cfg = &config.NodeConfig{ | 84 | // learn peer addresses (triggers HELLO) |
36 | PrivateSeed: "Bv9umksEO51jjWWrOGEH+4r8wl9Vi+LItpdBpTOi2PE=", | 85 | list, err := node2.core.Addresses() |
37 | Endpoints: []string{"udp://127.0.0.1:20862"}, | 86 | if err != nil { |
87 | t.Fatal(err) | ||
38 | } | 88 | } |
39 | ) | 89 | for _, addr := range list { |
90 | node1.Learn(ctx, node2.peer.GetID(), util.NewAddressWrap(addr)) | ||
91 | } | ||
92 | |||
93 | // wait for 5 seconds | ||
94 | time.Sleep(5 * time.Second) | ||
95 | } | ||
96 | |||
97 | //---------------------------------------------------------------------- | ||
98 | // Two node GNUnet both running locally, but exchanging messages over | ||
99 | // the internet (UPNP router required). | ||
100 | //---------------------------------------------------------------------- | ||
101 | |||
102 | // TestCoreSimple test a two node network | ||
103 | func TestCoreUPNP(t *testing.T) { | ||
104 | |||
105 | // configuration data | ||
106 | var ( | ||
107 | peer1Cfg = &config.NodeConfig{ | ||
108 | Name: "p1", | ||
109 | PrivateSeed: "iYK1wSi5XtCP774eNFk1LYXqKlOPEpwKBw+2/bMkE24=", | ||
110 | Endpoints: []*config.EndpointConfig{ | ||
111 | { | ||
112 | ID: "p1", | ||
113 | Network: "ip+udp", | ||
114 | Address: "upnp:", | ||
115 | Port: 2086, | ||
116 | TTL: 86400, | ||
117 | }, | ||
118 | }, | ||
119 | } | ||
120 | peer2Cfg = &config.NodeConfig{ | ||
121 | Name: "p2", | ||
122 | PrivateSeed: "Bv9umksEO51jjWWrOGEH+4r8wl9Vi+LItpdBpTOi2PE=", | ||
123 | Endpoints: []*config.EndpointConfig{ | ||
124 | { | ||
125 | ID: "p2", | ||
126 | Network: "ip+udp", | ||
127 | Address: "upnp:", | ||
128 | Port: 1080, | ||
129 | TTL: 86400, | ||
130 | }, | ||
131 | }, | ||
132 | } | ||
133 | ) | ||
134 | |||
135 | // setup execution context | ||
136 | ctx, cancel := context.WithCancel(context.Background()) | ||
137 | defer func() { | ||
138 | cancel() | ||
139 | time.Sleep(time.Second) | ||
140 | }() | ||
141 | |||
142 | // create and run nodes | ||
143 | node1, err := NewTestNode(t, ctx, peer1Cfg) | ||
144 | if err != nil { | ||
145 | t.Fatal(err) | ||
146 | } | ||
147 | defer node1.Shutdown() | ||
148 | node2, err := NewTestNode(t, ctx, peer2Cfg) | ||
149 | if err != nil { | ||
150 | t.Fatal(err) | ||
151 | } | ||
152 | defer node2.Shutdown() | ||
153 | |||
154 | // learn peer addresses (triggers HELLO) | ||
155 | list, err := node2.core.Addresses() | ||
156 | if err != nil { | ||
157 | t.Fatal(err) | ||
158 | } | ||
159 | for _, addr := range list { | ||
160 | node1.Learn(ctx, node2.peer.GetID(), util.NewAddressWrap(addr)) | ||
161 | } | ||
162 | |||
163 | // sleep a bit | ||
164 | time.Sleep(3 * time.Second) | ||
165 | } | ||
40 | 166 | ||
41 | //---------------------------------------------------------------------- | 167 | //---------------------------------------------------------------------- |
42 | // create and run a node with given spec | 168 | // create and run a node with given spec |
@@ -50,9 +176,15 @@ type TestNode struct { | |||
50 | addr *util.Address | 176 | addr *util.Address |
51 | } | 177 | } |
52 | 178 | ||
179 | func (n *TestNode) Shutdown() { | ||
180 | n.core.Shutdown() | ||
181 | } | ||
182 | |||
53 | func (n *TestNode) Learn(ctx context.Context, peer *util.PeerID, addr *util.Address) { | 183 | func (n *TestNode) Learn(ctx context.Context, peer *util.PeerID, addr *util.Address) { |
54 | n.t.Logf("[%d] Learning %s for %s", n.id, addr.StringAll(), peer.String()) | 184 | n.t.Logf("[%d] Learning %s for %s", n.id, addr.StringAll(), peer.String()) |
55 | n.core.Learn(ctx, peer, addr) | 185 | if err := n.core.Learn(ctx, peer, addr); err != nil { |
186 | n.t.Log("Learn: " + err.Error()) | ||
187 | } | ||
56 | } | 188 | } |
57 | 189 | ||
58 | func NewTestNode(t *testing.T, ctx context.Context, cfg *config.NodeConfig) (node *TestNode, err error) { | 190 | func NewTestNode(t *testing.T, ctx context.Context, cfg *config.NodeConfig) (node *TestNode, err error) { |
@@ -62,18 +194,25 @@ func NewTestNode(t *testing.T, ctx context.Context, cfg *config.NodeConfig) (nod | |||
62 | node.t = t | 194 | node.t = t |
63 | node.id = util.NextID() | 195 | node.id = util.NextID() |
64 | 196 | ||
197 | // create core service | ||
198 | if node.core, err = NewCore(ctx, cfg); err != nil { | ||
199 | return | ||
200 | } | ||
201 | node.peer = node.core.Peer() | ||
202 | |||
65 | // create peer object | 203 | // create peer object |
66 | if node.peer, err = NewLocalPeer(cfg); err != nil { | 204 | if node.peer, err = NewLocalPeer(cfg); err != nil { |
67 | return | 205 | return |
68 | } | 206 | } |
69 | t.Logf("[%d] Node %s starting", node.id, node.peer.GetID()) | 207 | t.Logf("[%d] Node %s starting", node.id, node.peer.GetID()) |
208 | t.Logf("[%d] --> %s", node.id, hex.EncodeToString(node.peer.GetID().Key)) | ||
70 | 209 | ||
71 | // create core service | 210 | list, err := node.core.Addresses() |
72 | if node.core, err = NewCore(ctx, node.peer); err != nil { | 211 | if err != nil { |
73 | return | 212 | t.Fatal(err) |
74 | } | 213 | } |
75 | for _, addr := range node.core.trans.Endpoints() { | 214 | for _, addr := range list { |
76 | s := addr.Network() + ":" + addr.String() | 215 | s := addr.Network() + "://" + addr.String() |
77 | if node.addr, err = util.ParseAddress(s); err != nil { | 216 | if node.addr, err = util.ParseAddress(s); err != nil { |
78 | continue | 217 | continue |
79 | } | 218 | } |
@@ -82,7 +221,7 @@ func NewTestNode(t *testing.T, ctx context.Context, cfg *config.NodeConfig) (nod | |||
82 | 221 | ||
83 | // register as event listener | 222 | // register as event listener |
84 | incoming := make(chan *Event) | 223 | incoming := make(chan *Event) |
85 | node.core.Register("test", NewListener(incoming, nil)) | 224 | node.core.Register(cfg.Name, NewListener(incoming, nil)) |
86 | 225 | ||
87 | // heart beat | 226 | // heart beat |
88 | tick := time.NewTicker(5 * time.Minute) | 227 | tick := time.NewTicker(5 * time.Minute) |
@@ -100,6 +239,7 @@ func NewTestNode(t *testing.T, ctx context.Context, cfg *config.NodeConfig) (nod | |||
100 | t.Logf("[%d] <<< Peer %s diconnected", node.id, ev.Peer) | 239 | t.Logf("[%d] <<< Peer %s diconnected", node.id, ev.Peer) |
101 | case EV_MESSAGE: | 240 | case EV_MESSAGE: |
102 | t.Logf("[%d] <<< Msg from %s of type %d", node.id, ev.Peer, ev.Msg.Header().MsgType) | 241 | t.Logf("[%d] <<< Msg from %s of type %d", node.id, ev.Peer, ev.Msg.Header().MsgType) |
242 | t.Logf("[%d] <<< --> %s", node.id, ev.Msg.String()) | ||
103 | } | 243 | } |
104 | 244 | ||
105 | // handle termination signal | 245 | // handle termination signal |
@@ -115,36 +255,3 @@ func NewTestNode(t *testing.T, ctx context.Context, cfg *config.NodeConfig) (nod | |||
115 | }() | 255 | }() |
116 | return | 256 | return |
117 | } | 257 | } |
118 | |||
119 | //---------------------------------------------------------------------- | ||
120 | // Two node GNUnet (smallest and simplest network) | ||
121 | //---------------------------------------------------------------------- | ||
122 | |||
123 | // TestCoreSimple test a two node network | ||
124 | func TestCoreSimple(t *testing.T) { | ||
125 | |||
126 | // setup execution context | ||
127 | ctx, cancel := context.WithCancel(context.Background()) | ||
128 | defer func() { | ||
129 | cancel() | ||
130 | time.Sleep(time.Second) | ||
131 | }() | ||
132 | |||
133 | // create and run nodes | ||
134 | node1, err := NewTestNode(t, ctx, peer1Cfg) | ||
135 | if err != nil { | ||
136 | t.Fatal(err) | ||
137 | } | ||
138 | node2, err := NewTestNode(t, ctx, peer2Cfg) | ||
139 | if err != nil { | ||
140 | t.Fatal(err) | ||
141 | } | ||
142 | |||
143 | // learn peer addresses (triggers HELLO) | ||
144 | for _, addr := range node2.core.trans.Endpoints() { | ||
145 | node1.Learn(ctx, node2.peer.GetID(), util.NewAddressWrap(addr)) | ||
146 | } | ||
147 | |||
148 | // wait for 5 seconds | ||
149 | time.Sleep(5 * time.Second) | ||
150 | } | ||
diff --git a/src/gnunet/core/peer.go b/src/gnunet/core/peer.go index 2b6fe74..86ed43a 100644 --- a/src/gnunet/core/peer.go +++ b/src/gnunet/core/peer.go | |||
@@ -48,7 +48,6 @@ type Peer struct { | |||
48 | prv *ed25519.PrivateKey // node private key (long-term signing key) | 48 | prv *ed25519.PrivateKey // node private key (long-term signing key) |
49 | pub *ed25519.PublicKey // node public key (=identifier) | 49 | pub *ed25519.PublicKey // node public key (=identifier) |
50 | idString string // node identifier as string | 50 | idString string // node identifier as string |
51 | addrList []*util.Address // list of addresses associated with node | ||
52 | ephPrv *ed25519.PrivateKey // ephemeral signing key | 51 | ephPrv *ed25519.PrivateKey // ephemeral signing key |
53 | ephMsg *message.EphemeralKeyMsg // ephemeral signing key message | 52 | ephMsg *message.EphemeralKeyMsg // ephemeral signing key message |
54 | } | 53 | } |
@@ -73,16 +72,6 @@ func NewLocalPeer(cfg *config.NodeConfig) (p *Peer, err error) { | |||
73 | if err != nil { | 72 | if err != nil { |
74 | return | 73 | return |
75 | } | 74 | } |
76 | // set the endpoint addresses for local node | ||
77 | p.addrList = make([]*util.Address, len(cfg.Endpoints)) | ||
78 | var addr *util.Address | ||
79 | for i, a := range cfg.Endpoints { | ||
80 | if addr, err = util.ParseAddress(a); err != nil { | ||
81 | return | ||
82 | } | ||
83 | addr.Expires = util.NewAbsoluteTime(time.Now().Add(12 * time.Hour)) | ||
84 | p.addrList[i] = addr | ||
85 | } | ||
86 | return | 75 | return |
87 | } | 76 | } |
88 | 77 | ||
@@ -98,36 +87,24 @@ func NewPeer(peerID string) (p *Peer, err error) { | |||
98 | p.prv = nil | 87 | p.prv = nil |
99 | p.pub = ed25519.NewPublicKeyFromBytes(data) | 88 | p.pub = ed25519.NewPublicKeyFromBytes(data) |
100 | p.idString = util.EncodeBinaryToString(p.pub.Bytes()) | 89 | p.idString = util.EncodeBinaryToString(p.pub.Bytes()) |
101 | p.addrList = make([]*util.Address, 0) | ||
102 | return | 90 | return |
103 | } | 91 | } |
104 | 92 | ||
93 | // Shutdown peer-related processes. | ||
94 | func (p *Peer) Shutdown() {} | ||
95 | |||
105 | //---------------------------------------------------------------------- | 96 | //---------------------------------------------------------------------- |
106 | //---------------------------------------------------------------------- | 97 | //---------------------------------------------------------------------- |
107 | 98 | ||
108 | // Address returns a peer address for the given transport protocol | 99 | // HelloData returns the current HELLO data for the peer. The list of listening |
109 | func (p *Peer) Address(transport string) *util.Address { | 100 | // endpoint addresses re passed in from core to reflect the actual active |
110 | for _, addr := range p.addrList { | 101 | // endpoints. |
111 | // skip expired entries | 102 | func (p *Peer) HelloData(ttl time.Duration, a []*util.Address) (h *blocks.HelloBlock, err error) { |
112 | if addr.Expires.Expired() { | ||
113 | continue | ||
114 | } | ||
115 | // filter by transport protocol | ||
116 | if len(transport) > 0 && transport != addr.Netw { | ||
117 | continue | ||
118 | } | ||
119 | return addr | ||
120 | } | ||
121 | return nil | ||
122 | } | ||
123 | |||
124 | // HelloData returns the current HELLO data for the peer | ||
125 | func (p *Peer) HelloData(ttl time.Duration) (h *blocks.HelloBlock, err error) { | ||
126 | // assemble HELLO data | 103 | // assemble HELLO data |
127 | h = new(blocks.HelloBlock) | 104 | h = new(blocks.HelloBlock) |
128 | h.PeerID = p.GetID() | 105 | h.PeerID = p.GetID() |
129 | h.Expire = util.NewAbsoluteTime(time.Now().Add(ttl)) | 106 | h.Expire = util.NewAbsoluteTime(time.Now().Add(ttl)) |
130 | h.SetAddresses(p.addrList) | 107 | h.SetAddresses(a) |
131 | 108 | ||
132 | // sign data | 109 | // sign data |
133 | err = h.Sign(p.prv) | 110 | err = h.Sign(p.prv) |
@@ -171,16 +148,6 @@ func (p *Peer) GetIDString() string { | |||
171 | return p.idString | 148 | return p.idString |
172 | } | 149 | } |
173 | 150 | ||
174 | // GetAddressList returns a list of addresses associated with this peer. | ||
175 | func (p *Peer) GetAddressList() []*util.Address { | ||
176 | return p.addrList | ||
177 | } | ||
178 | |||
179 | // AddAddress adds a new address for a node. | ||
180 | func (p *Peer) AddAddress(a *util.Address) { | ||
181 | p.addrList = append(p.addrList, a) | ||
182 | } | ||
183 | |||
184 | // Sign a message with the (long-term) private key. | 151 | // Sign a message with the (long-term) private key. |
185 | func (p *Peer) Sign(msg []byte) (*ed25519.EdSignature, error) { | 152 | func (p *Peer) Sign(msg []byte) (*ed25519.EdSignature, error) { |
186 | if p.prv == nil { | 153 | if p.prv == nil { |
diff --git a/src/gnunet/core/peer_test.go b/src/gnunet/core/peer_test.go index 28b328f..70a5c5f 100644 --- a/src/gnunet/core/peer_test.go +++ b/src/gnunet/core/peer_test.go | |||
@@ -21,6 +21,7 @@ package core | |||
21 | import ( | 21 | import ( |
22 | "gnunet/config" | 22 | "gnunet/config" |
23 | "gnunet/service/dht/blocks" | 23 | "gnunet/service/dht/blocks" |
24 | "gnunet/util" | ||
24 | "testing" | 25 | "testing" |
25 | "time" | 26 | "time" |
26 | ) | 27 | ) |
@@ -29,8 +30,13 @@ import ( | |||
29 | var ( | 30 | var ( |
30 | cfg = &config.NodeConfig{ | 31 | cfg = &config.NodeConfig{ |
31 | PrivateSeed: "YGoe6XFH3XdvFRl+agx9gIzPTvxA229WFdkazEMdcOs=", | 32 | PrivateSeed: "YGoe6XFH3XdvFRl+agx9gIzPTvxA229WFdkazEMdcOs=", |
32 | Endpoints: []string{ | 33 | Endpoints: []*config.EndpointConfig{ |
33 | "r5n+ip+udp://127.0.0.1:6666", | 34 | { |
35 | ID: "test", | ||
36 | Network: "r5n+ip+udp", | ||
37 | Address: "127.0.0.1", | ||
38 | Port: 6666, | ||
39 | }, | ||
34 | }, | 40 | }, |
35 | } | 41 | } |
36 | TTL = 6 * time.Hour | 42 | TTL = 6 * time.Hour |
@@ -44,8 +50,17 @@ func TestPeerHello(t *testing.T) { | |||
44 | t.Fatal(err) | 50 | t.Fatal(err) |
45 | } | 51 | } |
46 | 52 | ||
47 | // get HELLO data for the node | 53 | // get HELLO data for the node: |
48 | h, err := node.HelloData(TTL) | 54 | // This hack will only work for direct listening addresses |
55 | addrList := make([]*util.Address, 0) | ||
56 | for _, epRef := range cfg.Endpoints { | ||
57 | addr, err := util.ParseAddress(epRef.Addr()) | ||
58 | if err != nil { | ||
59 | t.Fatal(err) | ||
60 | } | ||
61 | addrList = append(addrList, addr) | ||
62 | } | ||
63 | h, err := node.HelloData(TTL, addrList) | ||
49 | 64 | ||
50 | // convert to URL and back | 65 | // convert to URL and back |
51 | u := h.URL() | 66 | u := h.URL() |
diff --git a/src/gnunet/go.mod b/src/gnunet/go.mod index ad203ca..eb17b30 100644 --- a/src/gnunet/go.mod +++ b/src/gnunet/go.mod | |||
@@ -15,9 +15,11 @@ require ( | |||
15 | require ( | 15 | require ( |
16 | github.com/cespare/xxhash/v2 v2.1.2 // indirect | 16 | github.com/cespare/xxhash/v2 v2.1.2 // indirect |
17 | github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect | 17 | github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect |
18 | github.com/huin/goupnp v1.0.0 // indirect | ||
18 | golang.org/x/mod v0.4.2 // indirect | 19 | golang.org/x/mod v0.4.2 // indirect |
19 | golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 // indirect | 20 | golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 // indirect |
20 | golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect | 21 | golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect |
22 | golang.org/x/text v0.3.7 // indirect | ||
21 | golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2 // indirect | 23 | golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2 // indirect |
22 | golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect | 24 | golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect |
23 | ) | 25 | ) |
diff --git a/src/gnunet/go.sum b/src/gnunet/go.sum index f2baf8e..17c2d00 100644 --- a/src/gnunet/go.sum +++ b/src/gnunet/go.sum | |||
@@ -11,6 +11,7 @@ github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfC | |||
11 | github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= | 11 | github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= |
12 | github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= | 12 | github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= |
13 | github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= | 13 | github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= |
14 | github.com/huin/goupnp v1.0.0 h1:wg75sLpL6DZqwHQN6E1Cfk6mtfzS45z8OV+ic+DtHRo= | ||
14 | github.com/huin/goupnp v1.0.0/go.mod h1:n9v9KO1tAxYH82qOn+UTIFQDmx5n1Zxd/ClZDMX7Bnc= | 15 | github.com/huin/goupnp v1.0.0/go.mod h1:n9v9KO1tAxYH82qOn+UTIFQDmx5n1Zxd/ClZDMX7Bnc= |
15 | github.com/huin/goutil v0.0.0-20170803182201-1ca381bf3150/go.mod h1:PpLOETDnJ0o3iZrZfqZzyLl6l7F3c6L1oWn7OICBi6o= | 16 | github.com/huin/goutil v0.0.0-20170803182201-1ca381bf3150/go.mod h1:PpLOETDnJ0o3iZrZfqZzyLl6l7F3c6L1oWn7OICBi6o= |
16 | github.com/mattn/go-sqlite3 v1.14.13 h1:1tj15ngiFfcZzii7yd82foL+ks+ouQcj8j/TPq3fk1I= | 17 | github.com/mattn/go-sqlite3 v1.14.13 h1:1tj15ngiFfcZzii7yd82foL+ks+ouQcj8j/TPq3fk1I= |
@@ -55,6 +56,7 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= | |||
55 | golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= | 56 | golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= |
56 | golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= | 57 | golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= |
57 | golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= | 58 | golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= |
59 | golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= | ||
58 | golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= | 60 | golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= |
59 | golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= | 61 | golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= |
60 | golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2 h1:BonxutuHCTL0rBDnZlKjpGIQFTjyUVTexFOdWkB6Fg0= | 62 | golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2 h1:BonxutuHCTL0rBDnZlKjpGIQFTjyUVTexFOdWkB6Fg0= |
diff --git a/src/gnunet/message/msg_hello.go b/src/gnunet/message/msg_hello.go index 18fe9d5..25ef98a 100644 --- a/src/gnunet/message/msg_hello.go +++ b/src/gnunet/message/msg_hello.go | |||
@@ -59,7 +59,7 @@ func NewHelloAddress(a *util.Address) *HelloAddress { | |||
59 | // String returns a human-readable representation of the message. | 59 | // String returns a human-readable representation of the message. |
60 | func (a *HelloAddress) String() string { | 60 | func (a *HelloAddress) String() string { |
61 | return fmt.Sprintf("Address{%s,expire=%s}", | 61 | return fmt.Sprintf("Address{%s,expire=%s}", |
62 | util.AddressString(a.Transport, a.Address), a.ExpireOn) | 62 | util.URI(a.Transport, a.Address), a.ExpireOn) |
63 | } | 63 | } |
64 | 64 | ||
65 | // HelloMsg is a message send by peers to announce their presence | 65 | // HelloMsg is a message send by peers to announce their presence |
diff --git a/src/gnunet/modules.go b/src/gnunet/modules.go deleted file mode 100644 index e47699f..0000000 --- a/src/gnunet/modules.go +++ /dev/null | |||
@@ -1,84 +0,0 @@ | |||
1 | // This file is part of gnunet-go, a GNUnet-implementation in Golang. | ||
2 | // Copyright (C) 2019, 2020 Bernd Fix >Y< | ||
3 | // | ||
4 | // gnunet-go is free software: you can redistribute it and/or modify it | ||
5 | // under the terms of the GNU Affero General Public License as published | ||
6 | // by the Free Software Foundation, either version 3 of the License, | ||
7 | // or (at your option) any later version. | ||
8 | // | ||
9 | // gnunet-go is distributed in the hope that it will be useful, but | ||
10 | // WITHOUT ANY WARRANTY; without even the implied warranty of | ||
11 | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
12 | // Affero General Public License for more details. | ||
13 | // | ||
14 | // You should have received a copy of the GNU Affero General Public License | ||
15 | // along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
16 | // | ||
17 | // SPDX-License-Identifier: AGPL3.0-or-later | ||
18 | |||
19 | //====================================================================== | ||
20 | // Standalone (all-in-one) implementation of GNUnet: | ||
21 | // ------------------------------------------------- | ||
22 | // Instead of running GNUnet services like GNS or DHT in separate | ||
23 | // processes communicating (exchanging messages) with each other over | ||
24 | // Unix Domain Sockets, the standalone implementation combines all | ||
25 | // service modules into a single binary running go-routines to | ||
26 | // concurrently performing their tasks. | ||
27 | //====================================================================== | ||
28 | |||
29 | package gnunet | ||
30 | |||
31 | import ( | ||
32 | "gnunet/service/dht" | ||
33 | "gnunet/service/gns" | ||
34 | "gnunet/service/namecache" | ||
35 | "gnunet/service/revocation" | ||
36 | "net/rpc" | ||
37 | ) | ||
38 | |||
39 | // Instances holds a list of all GNUnet service modules | ||
40 | type Instances struct { | ||
41 | GNS *gns.Module | ||
42 | Namecache *namecache.NamecacheModule | ||
43 | DHT *dht.Module | ||
44 | Revocation *revocation.Module | ||
45 | } | ||
46 | |||
47 | // Register modules for JSON-RPC | ||
48 | func (inst Instances) Register() { | ||
49 | rpc.Register(inst.GNS) | ||
50 | rpc.Register(inst.Namecache) | ||
51 | rpc.Register(inst.DHT) | ||
52 | rpc.Register(inst.Revocation) | ||
53 | } | ||
54 | |||
55 | // Local reference to instance list. The list is initialized | ||
56 | // by core. | ||
57 | var ( | ||
58 | Modules Instances | ||
59 | ) | ||
60 | |||
61 | /* TODO: implement | ||
62 | // Initialize instance list and link module functions as required. | ||
63 | // This function is called by core on start-up. | ||
64 | func Init(ctx context.Context) { | ||
65 | |||
66 | // Namecache (no calls to other modules) | ||
67 | Modules.Namecache = namecache.NewModule(ctx, c) | ||
68 | |||
69 | // DHT (no calls to other modules) | ||
70 | Modules.DHT = dht.NewModule(ctx, c) | ||
71 | |||
72 | // Revocation (no calls to other modules) | ||
73 | Modules.Revocation = revocation.NewModule(ctx, c) | ||
74 | |||
75 | // GNS (calls Namecache, DHT and Identity) | ||
76 | gns := gns.NewModule(ctx, c) | ||
77 | Modules.GNS = gns | ||
78 | gns.LookupLocal = Modules.Namecache.Get | ||
79 | gns.StoreLocal = Modules.Namecache.Put | ||
80 | gns.LookupRemote = Modules.DHT.Get | ||
81 | gns.RevocationQuery = Modules.Revocation.Query | ||
82 | gns.RevocationRevoke = Modules.Revocation.Revoke | ||
83 | } | ||
84 | */ | ||
diff --git a/src/gnunet/service/dht/blocks/hello.go b/src/gnunet/service/dht/blocks/hello.go index 77fc2ae..eb3bf2a 100644 --- a/src/gnunet/service/dht/blocks/hello.go +++ b/src/gnunet/service/dht/blocks/hello.go | |||
@@ -177,7 +177,7 @@ func (h *HelloBlock) URL() string { | |||
177 | if i > 0 { | 177 | if i > 0 { |
178 | u += "&" | 178 | u += "&" |
179 | } | 179 | } |
180 | u += url.QueryEscape(a.String()) | 180 | u += url.QueryEscape(a.URI()) |
181 | } | 181 | } |
182 | return u | 182 | return u |
183 | } | 183 | } |
diff --git a/src/gnunet/service/dht/module.go b/src/gnunet/service/dht/module.go index 5f04d54..3339aa2 100644 --- a/src/gnunet/service/dht/module.go +++ b/src/gnunet/service/dht/module.go | |||
@@ -81,7 +81,7 @@ func NewModule(ctx context.Context, c *core.Core) (m *Module) { | |||
81 | 81 | ||
82 | //---------------------------------------------------------------------- | 82 | //---------------------------------------------------------------------- |
83 | 83 | ||
84 | // Get a block from the DHT | 84 | // Get a block from the DHT ["dht:get"] |
85 | func (nc *Module) Get(ctx context.Context, query blocks.Query) (block blocks.Block, err error) { | 85 | func (nc *Module) Get(ctx context.Context, query blocks.Query) (block blocks.Block, err error) { |
86 | 86 | ||
87 | // check if we have the requested block in cache or permanent storage. | 87 | // check if we have the requested block in cache or permanent storage. |
@@ -100,7 +100,7 @@ func (nc *Module) Get(ctx context.Context, query blocks.Query) (block blocks.Blo | |||
100 | return nil, nil | 100 | return nil, nil |
101 | } | 101 | } |
102 | 102 | ||
103 | // Put a block into the DHT | 103 | // Put a block into the DHT ["dht:put"] |
104 | func (nc *Module) Put(ctx context.Context, key blocks.Query, block blocks.Block) error { | 104 | func (nc *Module) Put(ctx context.Context, key blocks.Query, block blocks.Block) error { |
105 | return nil | 105 | return nil |
106 | } | 106 | } |
@@ -142,6 +142,20 @@ func (m *Module) heartbeat(ctx context.Context) { | |||
142 | 142 | ||
143 | //---------------------------------------------------------------------- | 143 | //---------------------------------------------------------------------- |
144 | 144 | ||
145 | // Export functions | ||
146 | func (m *Module) Export(fcn map[string]any) { | ||
147 | // add exported functions from module | ||
148 | fcn["dht:get"] = m.Get | ||
149 | fcn["dht:put"] = m.Put | ||
150 | } | ||
151 | |||
152 | // Import functions | ||
153 | func (m *Module) Import(fcm map[string]any) { | ||
154 | // nothing to import now. | ||
155 | } | ||
156 | |||
157 | //---------------------------------------------------------------------- | ||
158 | |||
145 | // RPC returns the route and handler function for a JSON-RPC request | 159 | // RPC returns the route and handler function for a JSON-RPC request |
146 | func (m *Module) RPC() (string, func(http.ResponseWriter, *http.Request)) { | 160 | func (m *Module) RPC() (string, func(http.ResponseWriter, *http.Request)) { |
147 | return "/gns/", func(wrt http.ResponseWriter, req *http.Request) { | 161 | return "/gns/", func(wrt http.ResponseWriter, req *http.Request) { |
diff --git a/src/gnunet/service/dht/routingtable.go b/src/gnunet/service/dht/routingtable.go index 0078b71..2933e6a 100644 --- a/src/gnunet/service/dht/routingtable.go +++ b/src/gnunet/service/dht/routingtable.go | |||
@@ -101,7 +101,7 @@ type RoutingTable struct { | |||
101 | ref *PeerAddress // reference address for distance | 101 | ref *PeerAddress // reference address for distance |
102 | buckets []*Bucket // list of buckets | 102 | buckets []*Bucket // list of buckets |
103 | list map[*PeerAddress]struct{} // keep list of peers | 103 | list map[*PeerAddress]struct{} // keep list of peers |
104 | rwlock sync.RWMutex // lock for write operations | 104 | mtx sync.RWMutex // lock for write operations |
105 | l2nse float64 // log2 of estimated network size | 105 | l2nse float64 // log2 of estimated network size |
106 | inProcess bool // flag if Process() is running | 106 | inProcess bool // flag if Process() is running |
107 | } | 107 | } |
@@ -131,8 +131,8 @@ func NewRoutingTable(ref *PeerAddress) *RoutingTable { | |||
131 | // Returns true if the entry was added, false otherwise. | 131 | // Returns true if the entry was added, false otherwise. |
132 | func (rt *RoutingTable) Add(p *PeerAddress) bool { | 132 | func (rt *RoutingTable) Add(p *PeerAddress) bool { |
133 | // ensure one write and no readers | 133 | // ensure one write and no readers |
134 | rt.rwlock.Lock() | 134 | rt.lock(false) |
135 | defer rt.rwlock.Unlock() | 135 | defer rt.unlock(false) |
136 | 136 | ||
137 | // check if peer is already known | 137 | // check if peer is already known |
138 | if _, ok := rt.list[p]; ok { | 138 | if _, ok := rt.list[p]; ok { |
@@ -153,8 +153,8 @@ func (rt *RoutingTable) Add(p *PeerAddress) bool { | |||
153 | // Returns true if the entry was removed, false otherwise. | 153 | // Returns true if the entry was removed, false otherwise. |
154 | func (rt *RoutingTable) Remove(p *PeerAddress) bool { | 154 | func (rt *RoutingTable) Remove(p *PeerAddress) bool { |
155 | // ensure one write and no readers | 155 | // ensure one write and no readers |
156 | rt.rwlock.Lock() | 156 | rt.lock(false) |
157 | defer rt.rwlock.Unlock() | 157 | defer rt.unlock(false) |
158 | 158 | ||
159 | // compute distance (bucket index) and remove entry from bucket | 159 | // compute distance (bucket index) and remove entry from bucket |
160 | _, idx := p.Distance(rt.ref) | 160 | _, idx := p.Distance(rt.ref) |
@@ -170,10 +170,15 @@ func (rt *RoutingTable) Remove(p *PeerAddress) bool { | |||
170 | //---------------------------------------------------------------------- | 170 | //---------------------------------------------------------------------- |
171 | 171 | ||
172 | // Process a function f in the locked context of a routing table | 172 | // Process a function f in the locked context of a routing table |
173 | func (rt *RoutingTable) Process(f func() error) error { | 173 | func (rt *RoutingTable) Process(f func() error, readonly bool) error { |
174 | // ensure one write and no readers | 174 | // handle locking |
175 | rt.rwlock.Lock() | 175 | rt.lock(readonly) |
176 | defer rt.rwlock.Unlock() | 176 | rt.inProcess = true |
177 | defer func() { | ||
178 | rt.inProcess = false | ||
179 | rt.unlock(readonly) | ||
180 | }() | ||
181 | // call function in unlocked context | ||
177 | return f() | 182 | return f() |
178 | } | 183 | } |
179 | 184 | ||
@@ -184,8 +189,8 @@ func (rt *RoutingTable) Process(f func() error) error { | |||
184 | // SelectClosestPeer for a given peer address and bloomfilter. | 189 | // SelectClosestPeer for a given peer address and bloomfilter. |
185 | func (rt *RoutingTable) SelectClosestPeer(p *PeerAddress, bf *PeerBloomFilter) (n *PeerAddress) { | 190 | func (rt *RoutingTable) SelectClosestPeer(p *PeerAddress, bf *PeerBloomFilter) (n *PeerAddress) { |
186 | // no writer allowed | 191 | // no writer allowed |
187 | rt.rwlock.RLock() | 192 | rt.mtx.RLock() |
188 | defer rt.rwlock.RUnlock() | 193 | defer rt.mtx.RUnlock() |
189 | 194 | ||
190 | // find closest address | 195 | // find closest address |
191 | var dist *math.Int | 196 | var dist *math.Int |
@@ -204,8 +209,8 @@ func (rt *RoutingTable) SelectClosestPeer(p *PeerAddress, bf *PeerBloomFilter) ( | |||
204 | // included in the bloomfilter) | 209 | // included in the bloomfilter) |
205 | func (rt *RoutingTable) SelectRandomPeer(bf *PeerBloomFilter) *PeerAddress { | 210 | func (rt *RoutingTable) SelectRandomPeer(bf *PeerBloomFilter) *PeerAddress { |
206 | // no writer allowed | 211 | // no writer allowed |
207 | rt.rwlock.RLock() | 212 | rt.mtx.RLock() |
208 | defer rt.rwlock.RUnlock() | 213 | defer rt.mtx.RUnlock() |
209 | 214 | ||
210 | // select random entry from list | 215 | // select random entry from list |
211 | if size := len(rt.list); size > 0 { | 216 | if size := len(rt.list); size > 0 { |
@@ -279,11 +284,35 @@ func (rt *RoutingTable) heartbeat(ctx context.Context) { | |||
279 | } | 284 | } |
280 | } | 285 | } |
281 | return nil | 286 | return nil |
282 | }); err != nil { | 287 | }, false); err != nil { |
283 | logger.Println(logger.ERROR, "[dht] RT heartbeat: "+err.Error()) | 288 | logger.Println(logger.ERROR, "[dht] RT heartbeat: "+err.Error()) |
284 | } | 289 | } |
285 | } | 290 | } |
286 | 291 | ||
292 | //---------------------------------------------------------------------- | ||
293 | |||
294 | // lock with given mode (if not in processing function) | ||
295 | func (rt *RoutingTable) lock(readonly bool) { | ||
296 | if !rt.inProcess { | ||
297 | if readonly { | ||
298 | rt.mtx.RLock() | ||
299 | } else { | ||
300 | rt.mtx.Lock() | ||
301 | } | ||
302 | } | ||
303 | } | ||
304 | |||
305 | // lock with given mode (if not in processing function) | ||
306 | func (rt *RoutingTable) unlock(readonly bool) { | ||
307 | if !rt.inProcess { | ||
308 | if readonly { | ||
309 | rt.mtx.RUnlock() | ||
310 | } else { | ||
311 | rt.mtx.Unlock() | ||
312 | } | ||
313 | } | ||
314 | } | ||
315 | |||
287 | //====================================================================== | 316 | //====================================================================== |
288 | // Routing table buckets | 317 | // Routing table buckets |
289 | //====================================================================== | 318 | //====================================================================== |
diff --git a/src/gnunet/service/dht/routingtable_test.go b/src/gnunet/service/dht/routingtable_test.go index 659f9d4..33c4b7f 100644 --- a/src/gnunet/service/dht/routingtable_test.go +++ b/src/gnunet/service/dht/routingtable_test.go | |||
@@ -45,8 +45,12 @@ type Entry struct { | |||
45 | var ( | 45 | var ( |
46 | cfg = &config.NodeConfig{ | 46 | cfg = &config.NodeConfig{ |
47 | PrivateSeed: "YGoe6XFH3XdvFRl+agx9gIzPTvxA229WFdkazEMdcOs=", | 47 | PrivateSeed: "YGoe6XFH3XdvFRl+agx9gIzPTvxA229WFdkazEMdcOs=", |
48 | Endpoints: []string{ | 48 | Endpoints: []*config.EndpointConfig{ |
49 | "r5n+ip+udp://127.0.0.1:6666", | 49 | { |
50 | Network: "r5n+ip+udp", | ||
51 | Address: "127.0.0.1", | ||
52 | Port: 6666, | ||
53 | }, | ||
50 | }, | 54 | }, |
51 | } | 55 | } |
52 | ) | 56 | ) |
diff --git a/src/gnunet/service/gns/module.go b/src/gnunet/service/gns/module.go index 4878aa0..93f9bca 100644 --- a/src/gnunet/service/gns/module.go +++ b/src/gnunet/service/gns/module.go | |||
@@ -98,6 +98,7 @@ type Module struct { | |||
98 | RevocationRevoke func(ctx context.Context, rd *revocation.RevData) (success bool, err error) | 98 | RevocationRevoke func(ctx context.Context, rd *revocation.RevData) (success bool, err error) |
99 | } | 99 | } |
100 | 100 | ||
101 | // NewModule instantiates a new GNS module. | ||
101 | func NewModule(ctx context.Context, c *core.Core) (m *Module) { | 102 | func NewModule(ctx context.Context, c *core.Core) (m *Module) { |
102 | m = &Module{ | 103 | m = &Module{ |
103 | ModuleImpl: *service.NewModuleImpl(), | 104 | ModuleImpl: *service.NewModuleImpl(), |
@@ -128,6 +129,23 @@ func (m *Module) event(ctx context.Context, ev *core.Event) { | |||
128 | 129 | ||
129 | //---------------------------------------------------------------------- | 130 | //---------------------------------------------------------------------- |
130 | 131 | ||
132 | // Export functions | ||
133 | func (m *Module) Export(fcn map[string]any) { | ||
134 | // add exported functions from module | ||
135 | } | ||
136 | |||
137 | // Import functions | ||
138 | func (m *Module) Import(fcn map[string]any) { | ||
139 | // resolve imports from other modules | ||
140 | m.LookupLocal = fcn["namecache:get"].(func(ctx context.Context, query *blocks.GNSQuery) (*blocks.GNSBlock, error)) | ||
141 | m.StoreLocal = fcn["namecache:put"].(func(ctx context.Context, query *blocks.GNSQuery, block *blocks.GNSBlock) error) | ||
142 | m.LookupRemote = fcn["dht:get"].(func(ctx context.Context, query blocks.Query) (blocks.Block, error)) | ||
143 | m.RevocationQuery = fcn["rev:query"].(func(ctx context.Context, zkey *crypto.ZoneKey) (valid bool, err error)) | ||
144 | m.RevocationRevoke = fcn["rev:revoke"].(func(ctx context.Context, rd *revocation.RevData) (success bool, err error)) | ||
145 | } | ||
146 | |||
147 | //---------------------------------------------------------------------- | ||
148 | |||
131 | // Resolve a GNS name with multiple labels. If pkey is not nil, the name | 149 | // Resolve a GNS name with multiple labels. If pkey is not nil, the name |
132 | // is interpreted as "relative to current zone". | 150 | // is interpreted as "relative to current zone". |
133 | func (m *Module) Resolve( | 151 | func (m *Module) Resolve( |
diff --git a/src/gnunet/service/module.go b/src/gnunet/service/module.go index 5f95975..4109f16 100644 --- a/src/gnunet/service/module.go +++ b/src/gnunet/service/module.go | |||
@@ -26,7 +26,43 @@ import ( | |||
26 | ) | 26 | ) |
27 | 27 | ||
28 | // Module is an interface for GNUnet service modules (workers). | 28 | // Module is an interface for GNUnet service modules (workers). |
29 | // | ||
30 | // Modules can call other GNUnet services; these services can be used by | ||
31 | // sending messages to the respective service socket (the default way) or by | ||
32 | // calling the module functions directly (if the other module is compiled | ||
33 | // along with the calling module into one binary). The latter method requires | ||
34 | // calls to m.Export() and m.Import() to link the modules together (see | ||
35 | // example): | ||
36 | // | ||
37 | // // create module instances | ||
38 | // gnsMod = gns.NewModule(ctx, core) | ||
39 | // dhtMod = dht.NewModule(ctx, core) | ||
40 | // ncMod = namecache.NewModule(ctx, core) | ||
41 | // revMod = revocation.NewModule(ctx, core) | ||
42 | // | ||
43 | // // export module functions | ||
44 | // fcn := make(map[string]any) | ||
45 | // gnsMod.Export(fcn) | ||
46 | // dhtMod.Export(fcn) | ||
47 | // ncMod.Export(fcn) | ||
48 | // revMod.Export(fcn) | ||
49 | // | ||
50 | // // import (link) module functions | ||
51 | // gnsMod.Import(fcn) | ||
52 | // dhtMod.Import(fcn) | ||
53 | // ncMod.Import(fcn) | ||
54 | // revMod.Import(fcn) | ||
55 | // | ||
56 | // Exported and imported module function are identified by name defined in the | ||
57 | // Export() function. Import() functions that access functions in other modules | ||
58 | // need to use the same name for linking. | ||
29 | type Module interface { | 59 | type Module interface { |
60 | // Export functions by name | ||
61 | Export(map[string]any) | ||
62 | |||
63 | // Import functions by name | ||
64 | Import(map[string]any) | ||
65 | |||
30 | // RPC returns the route and handler for JSON-RPC requests | 66 | // RPC returns the route and handler for JSON-RPC requests |
31 | RPC() (string, func(http.ResponseWriter, *http.Request)) | 67 | RPC() (string, func(http.ResponseWriter, *http.Request)) |
32 | 68 | ||
diff --git a/src/gnunet/service/namecache/module.go b/src/gnunet/service/namecache/module.go index 9d5bca1..b9aaad0 100644 --- a/src/gnunet/service/namecache/module.go +++ b/src/gnunet/service/namecache/module.go | |||
@@ -35,23 +35,39 @@ import ( | |||
35 | //---------------------------------------------------------------------- | 35 | //---------------------------------------------------------------------- |
36 | 36 | ||
37 | // Namecache handles the transient storage of GNS blocks under the query key. | 37 | // Namecache handles the transient storage of GNS blocks under the query key. |
38 | type NamecacheModule struct { | 38 | type Module struct { |
39 | service.ModuleImpl | 39 | service.ModuleImpl |
40 | 40 | ||
41 | cache service.DHTStore // transient block cache | 41 | cache service.DHTStore // transient block cache |
42 | } | 42 | } |
43 | 43 | ||
44 | // NewModule creates a new module instance. | 44 | // NewModule creates a new module instance. |
45 | func NewModule(ctx context.Context, c *core.Core) (m *NamecacheModule) { | 45 | func NewModule(ctx context.Context, c *core.Core) (m *Module) { |
46 | m = &NamecacheModule{ | 46 | m = &Module{ |
47 | ModuleImpl: *service.NewModuleImpl(), | 47 | ModuleImpl: *service.NewModuleImpl(), |
48 | } | 48 | } |
49 | m.cache, _ = service.NewDHTStore(config.Cfg.Namecache.Storage) | 49 | m.cache, _ = service.NewDHTStore(config.Cfg.Namecache.Storage) |
50 | return | 50 | return |
51 | } | 51 | } |
52 | 52 | ||
53 | //---------------------------------------------------------------------- | ||
54 | |||
55 | // Export functions | ||
56 | func (m *Module) Export(fcn map[string]any) { | ||
57 | // add exported functions from module | ||
58 | fcn["namecache:get"] = m.Get | ||
59 | fcn["namecache:put"] = m.Put | ||
60 | } | ||
61 | |||
62 | // Import functions | ||
63 | func (m *Module) Import(fcm map[string]any) { | ||
64 | // nothing to import now. | ||
65 | } | ||
66 | |||
67 | //---------------------------------------------------------------------- | ||
68 | |||
53 | // Get an entry from the cache if available. | 69 | // Get an entry from the cache if available. |
54 | func (m *NamecacheModule) Get(ctx context.Context, query *blocks.GNSQuery) (block *blocks.GNSBlock, err error) { | 70 | func (m *Module) Get(ctx context.Context, query *blocks.GNSQuery) (block *blocks.GNSBlock, err error) { |
55 | var b blocks.Block | 71 | var b blocks.Block |
56 | b, err = m.cache.Get(query) | 72 | b, err = m.cache.Get(query) |
57 | err = blocks.Unwrap(b, block) | 73 | err = blocks.Unwrap(b, block) |
@@ -59,6 +75,6 @@ func (m *NamecacheModule) Get(ctx context.Context, query *blocks.GNSQuery) (bloc | |||
59 | } | 75 | } |
60 | 76 | ||
61 | // Put entry into the cache. | 77 | // Put entry into the cache. |
62 | func (m *NamecacheModule) Put(ctx context.Context, query *blocks.GNSQuery, block *blocks.GNSBlock) error { | 78 | func (m *Module) Put(ctx context.Context, query *blocks.GNSQuery, block *blocks.GNSBlock) error { |
63 | return m.cache.Put(query, block) | 79 | return m.cache.Put(query, block) |
64 | } | 80 | } |
diff --git a/src/gnunet/service/revocation/module.go b/src/gnunet/service/revocation/module.go index 1f0ab48..eade16b 100644 --- a/src/gnunet/service/revocation/module.go +++ b/src/gnunet/service/revocation/module.go | |||
@@ -98,8 +98,22 @@ func (m *Module) event(ctx context.Context, ev *core.Event) { | |||
98 | 98 | ||
99 | //---------------------------------------------------------------------- | 99 | //---------------------------------------------------------------------- |
100 | 100 | ||
101 | // Export functions | ||
102 | func (m *Module) Export(fcn map[string]any) { | ||
103 | // add exported functions from module | ||
104 | fcn["rev:query"] = m.Query | ||
105 | fcn["rev:revoke"] = m.Revoke | ||
106 | } | ||
107 | |||
108 | // Import functions | ||
109 | func (m *Module) Import(fcm map[string]any) { | ||
110 | // nothing to import now. | ||
111 | } | ||
112 | |||
113 | //---------------------------------------------------------------------- | ||
114 | |||
101 | // Query return true if the pkey is valid (not revoked) and false | 115 | // Query return true if the pkey is valid (not revoked) and false |
102 | // if the pkey has been revoked. | 116 | // if the pkey has been revoked ["rev:query"] |
103 | func (m *Module) Query(ctx context.Context, zkey *crypto.ZoneKey) (valid bool, err error) { | 117 | func (m *Module) Query(ctx context.Context, zkey *crypto.ZoneKey) (valid bool, err error) { |
104 | // fast check first: is the key in the bloomfilter? | 118 | // fast check first: is the key in the bloomfilter? |
105 | data := zkey.Bytes() | 119 | data := zkey.Bytes() |
@@ -118,7 +132,7 @@ func (m *Module) Query(ctx context.Context, zkey *crypto.ZoneKey) (valid bool, e | |||
118 | return false, nil | 132 | return false, nil |
119 | } | 133 | } |
120 | 134 | ||
121 | // Revoke a key with given revocation data | 135 | // Revoke a key with given revocation data ["rev:revoke"] |
122 | func (m *Module) Revoke(ctx context.Context, rd *RevData) (success bool, err error) { | 136 | func (m *Module) Revoke(ctx context.Context, rd *RevData) (success bool, err error) { |
123 | // verify the revocation data | 137 | // verify the revocation data |
124 | diff, rc := rd.Verify(true) | 138 | diff, rc := rd.Verify(true) |
diff --git a/src/gnunet/transport/endpoint.go b/src/gnunet/transport/endpoint.go index b54ee4e..5dabfa2 100644 --- a/src/gnunet/transport/endpoint.go +++ b/src/gnunet/transport/endpoint.go | |||
@@ -30,6 +30,10 @@ import ( | |||
30 | var ( | 30 | var ( |
31 | ErrEndpNotAvailable = errors.New("no endpoint for address available") | 31 | ErrEndpNotAvailable = errors.New("no endpoint for address available") |
32 | ErrEndpProtocolMismatch = errors.New("transport protocol mismatch") | 32 | ErrEndpProtocolMismatch = errors.New("transport protocol mismatch") |
33 | ErrEndpProtocolUnknown = errors.New("unknown transport protocol") | ||
34 | ErrEndpExists = errors.New("endpoint exists") | ||
35 | ErrEndpNoAddress = errors.New("no address for endpoint") | ||
36 | ErrEndpNoConnection = errors.New("no connection on endpoint") | ||
33 | ) | 37 | ) |
34 | 38 | ||
35 | // Endpoint represents a local endpoint that can send and receive messages. | 39 | // Endpoint represents a local endpoint that can send and receive messages. |
@@ -83,9 +87,12 @@ type PaketEndpoint struct { | |||
83 | func (ep *PaketEndpoint) Run(ctx context.Context, hdlr chan *TransportMessage) (err error) { | 87 | func (ep *PaketEndpoint) Run(ctx context.Context, hdlr chan *TransportMessage) (err error) { |
84 | // create listener | 88 | // create listener |
85 | var lc net.ListenConfig | 89 | var lc net.ListenConfig |
86 | if ep.conn, err = lc.ListenPacket(ctx, ep.addr.Network(), ep.addr.String()); err != nil { | 90 | xproto := ep.addr.Network() |
91 | if ep.conn, err = lc.ListenPacket(ctx, EpProtocol(xproto), ep.addr.String()); err != nil { | ||
87 | return | 92 | return |
88 | } | 93 | } |
94 | // use the actual listening address | ||
95 | ep.addr = util.NewAddress(xproto, ep.conn.LocalAddr().String()) | ||
89 | 96 | ||
90 | // run watch dog for termination | 97 | // run watch dog for termination |
91 | go func() { | 98 | go func() { |
@@ -95,27 +102,15 @@ func (ep *PaketEndpoint) Run(ctx context.Context, hdlr chan *TransportMessage) ( | |||
95 | // run go routine to handle messages from clients | 102 | // run go routine to handle messages from clients |
96 | go func() { | 103 | go func() { |
97 | for { | 104 | for { |
98 | // read next message from packet | 105 | // read next message |
99 | n, _, err := ep.conn.ReadFrom(ep.buf) | 106 | tm, err := ep.read() |
100 | if err != nil { | 107 | if err != nil { |
101 | break | 108 | break |
102 | } | 109 | } |
103 | rdr := bytes.NewBuffer(util.Clone(ep.buf[:n])) | 110 | // send transport message to handler |
104 | msg, err := ReadMessageDirect(rdr, ep.buf) | 111 | go func() { |
105 | if err != nil { | 112 | hdlr <- tm |
106 | break | 113 | }() |
107 | } | ||
108 | // check for transport message | ||
109 | if msg.Header().MsgType == message.DUMMY { | ||
110 | // set transient attributes | ||
111 | tm := msg.(*TransportMessage) | ||
112 | tm.endp = ep.id | ||
113 | tm.conn = 0 | ||
114 | // send to handler | ||
115 | go func() { | ||
116 | hdlr <- tm | ||
117 | }() | ||
118 | } | ||
119 | } | 114 | } |
120 | // connection ended. | 115 | // connection ended. |
121 | ep.conn.Close() | 116 | ep.conn.Close() |
@@ -123,29 +118,73 @@ func (ep *PaketEndpoint) Run(ctx context.Context, hdlr chan *TransportMessage) ( | |||
123 | return | 118 | return |
124 | } | 119 | } |
125 | 120 | ||
121 | // Read a transport message from endpoint based on extended protocol | ||
122 | func (ep *PaketEndpoint) read() (tm *TransportMessage, err error) { | ||
123 | // read next packet (assuming that it contains one complete message) | ||
124 | var n int | ||
125 | if n, _, err = ep.conn.ReadFrom(ep.buf); err != nil { | ||
126 | return | ||
127 | } | ||
128 | // parse transport message based on extended protocol | ||
129 | var ( | ||
130 | peer *util.PeerID | ||
131 | msg message.Message | ||
132 | ) | ||
133 | switch ep.addr.Network() { | ||
134 | case "ip+udp": | ||
135 | // parse peer id and message in sequence | ||
136 | peer = util.NewPeerID(ep.buf[:32]) | ||
137 | rdr := bytes.NewBuffer(util.Clone(ep.buf[32:n])) | ||
138 | if msg, err = ReadMessageDirect(rdr, ep.buf); err != nil { | ||
139 | return | ||
140 | } | ||
141 | default: | ||
142 | panic(ErrEndpProtocolUnknown) | ||
143 | } | ||
144 | // return transport message | ||
145 | return &TransportMessage{ | ||
146 | Peer: peer, | ||
147 | Msg: msg, | ||
148 | }, nil | ||
149 | } | ||
150 | |||
126 | // Send message to address from endpoint | 151 | // Send message to address from endpoint |
127 | func (ep *PaketEndpoint) Send(ctx context.Context, addr net.Addr, msg *TransportMessage) (err error) { | 152 | func (ep *PaketEndpoint) Send(ctx context.Context, addr net.Addr, msg *TransportMessage) (err error) { |
153 | // check for valid connection | ||
154 | if ep.conn == nil { | ||
155 | return ErrEndpNoConnection | ||
156 | } | ||
157 | // resolve target address | ||
128 | var a *net.UDPAddr | 158 | var a *net.UDPAddr |
129 | a, err = net.ResolveUDPAddr(addr.Network(), addr.String()) | 159 | a, err = net.ResolveUDPAddr(EpProtocol(addr.Network()), addr.String()) |
160 | |||
161 | // get message content (TransportMessage) | ||
130 | var buf []byte | 162 | var buf []byte |
131 | if buf, err = msg.Bytes(); err != nil { | 163 | if buf, err = msg.Bytes(); err != nil { |
132 | return | 164 | return |
133 | } | 165 | } |
166 | // handle extended protocol: | ||
167 | switch ep.addr.Network() { | ||
168 | case "ip+udp": | ||
169 | // no modifications required | ||
170 | |||
171 | default: | ||
172 | // unknown protocol | ||
173 | return ErrEndpProtocolUnknown | ||
174 | } | ||
134 | _, err = ep.conn.WriteTo(buf, a) | 175 | _, err = ep.conn.WriteTo(buf, a) |
135 | return | 176 | return |
136 | } | 177 | } |
137 | 178 | ||
138 | // Address returms the | 179 | // Address returms the |
139 | func (ep *PaketEndpoint) Address() net.Addr { | 180 | func (ep *PaketEndpoint) Address() net.Addr { |
140 | if ep.conn != nil { | ||
141 | return ep.conn.LocalAddr() | ||
142 | } | ||
143 | return ep.addr | 181 | return ep.addr |
144 | } | 182 | } |
145 | 183 | ||
146 | // CanSendTo returns true if the endpoint can sent to address | 184 | // CanSendTo returns true if the endpoint can sent to address |
147 | func (ep *PaketEndpoint) CanSendTo(addr net.Addr) bool { | 185 | func (ep *PaketEndpoint) CanSendTo(addr net.Addr) (ok bool) { |
148 | return epMode(addr.Network()) == "packet" | 186 | ok = EpProtocol(addr.Network()) == EpProtocol(ep.addr.Network()) |
187 | return | ||
149 | } | 188 | } |
150 | 189 | ||
151 | // ID returns the endpoint identifier | 190 | // ID returns the endpoint identifier |
@@ -153,6 +192,7 @@ func (ep *PaketEndpoint) ID() int { | |||
153 | return ep.id | 192 | return ep.id |
154 | } | 193 | } |
155 | 194 | ||
195 | // create a new packet endpoint for protcol and address | ||
156 | func newPacketEndpoint(addr net.Addr) (ep *PaketEndpoint, err error) { | 196 | func newPacketEndpoint(addr net.Addr) (ep *PaketEndpoint, err error) { |
157 | // check for matching protocol | 197 | // check for matching protocol |
158 | if epMode(addr.Network()) != "packet" { | 198 | if epMode(addr.Network()) != "packet" { |
@@ -185,9 +225,13 @@ type StreamEndpoint struct { | |||
185 | func (ep *StreamEndpoint) Run(ctx context.Context, hdlr chan *TransportMessage) (err error) { | 225 | func (ep *StreamEndpoint) Run(ctx context.Context, hdlr chan *TransportMessage) (err error) { |
186 | // create listener | 226 | // create listener |
187 | var lc net.ListenConfig | 227 | var lc net.ListenConfig |
188 | if ep.listener, err = lc.Listen(ctx, ep.addr.Network(), ep.addr.String()); err != nil { | 228 | xproto := ep.addr.Network() |
229 | if ep.listener, err = lc.Listen(ctx, EpProtocol(xproto), ep.addr.String()); err != nil { | ||
189 | return | 230 | return |
190 | } | 231 | } |
232 | // get actual listening address | ||
233 | ep.addr = util.NewAddress(xproto, ep.listener.Addr().String()) | ||
234 | |||
191 | // run watch dog for termination | 235 | // run watch dog for termination |
192 | go func() { | 236 | go func() { |
193 | <-ctx.Done() | 237 | <-ctx.Done() |
@@ -206,21 +250,14 @@ func (ep *StreamEndpoint) Run(ctx context.Context, hdlr chan *TransportMessage) | |||
206 | go func() { | 250 | go func() { |
207 | for { | 251 | for { |
208 | // read next message from connection | 252 | // read next message from connection |
209 | msg, err := ReadMessage(ctx, conn, ep.buf) | 253 | tm, err := ep.read(ctx, conn) |
210 | if err != nil { | 254 | if err != nil { |
211 | break | 255 | break |
212 | } | 256 | } |
213 | // check for transport message | 257 | // send transport message to handler |
214 | if msg.Header().MsgType == message.DUMMY { | 258 | go func() { |
215 | // set transient attributes | 259 | hdlr <- tm |
216 | tm := msg.(*TransportMessage) | 260 | }() |
217 | tm.endp = ep.id | ||
218 | tm.conn = session | ||
219 | // send to handler | ||
220 | go func() { | ||
221 | hdlr <- tm | ||
222 | }() | ||
223 | } | ||
224 | } | 261 | } |
225 | // connection ended. | 262 | // connection ended. |
226 | conn.Close() | 263 | conn.Close() |
@@ -231,6 +268,34 @@ func (ep *StreamEndpoint) Run(ctx context.Context, hdlr chan *TransportMessage) | |||
231 | return | 268 | return |
232 | } | 269 | } |
233 | 270 | ||
271 | // Read a transport message from endpoint based on extended protocol | ||
272 | func (ep *StreamEndpoint) read(ctx context.Context, conn net.Conn) (tm *TransportMessage, err error) { | ||
273 | // parse transport message based on extended protocol | ||
274 | var ( | ||
275 | peer *util.PeerID | ||
276 | msg message.Message | ||
277 | ) | ||
278 | switch ep.addr.Network() { | ||
279 | case "ip+udp": | ||
280 | // parse peer id | ||
281 | peer = util.NewPeerID(nil) | ||
282 | if _, err = conn.Read(peer.Key); err != nil { | ||
283 | return | ||
284 | } | ||
285 | // read next message from connection | ||
286 | if msg, err = ReadMessage(ctx, conn, ep.buf); err != nil { | ||
287 | break | ||
288 | } | ||
289 | default: | ||
290 | panic(ErrEndpProtocolUnknown) | ||
291 | } | ||
292 | // return transport message | ||
293 | return &TransportMessage{ | ||
294 | Peer: peer, | ||
295 | Msg: msg, | ||
296 | }, nil | ||
297 | } | ||
298 | |||
234 | // Send message to address from endpoint | 299 | // Send message to address from endpoint |
235 | func (ep *StreamEndpoint) Send(ctx context.Context, addr net.Addr, msg *TransportMessage) error { | 300 | func (ep *StreamEndpoint) Send(ctx context.Context, addr net.Addr, msg *TransportMessage) error { |
236 | return nil | 301 | return nil |
@@ -238,9 +303,6 @@ func (ep *StreamEndpoint) Send(ctx context.Context, addr net.Addr, msg *Transpor | |||
238 | 303 | ||
239 | // Address returns the actual listening endpoint address | 304 | // Address returns the actual listening endpoint address |
240 | func (ep *StreamEndpoint) Address() net.Addr { | 305 | func (ep *StreamEndpoint) Address() net.Addr { |
241 | if ep.listener != nil { | ||
242 | return ep.listener.Addr() | ||
243 | } | ||
244 | return ep.addr | 306 | return ep.addr |
245 | } | 307 | } |
246 | 308 | ||
@@ -254,6 +316,7 @@ func (ep *StreamEndpoint) ID() int { | |||
254 | return ep.id | 316 | return ep.id |
255 | } | 317 | } |
256 | 318 | ||
319 | // create a new endpoint based on extended protocol and address | ||
257 | func newStreamEndpoint(addr net.Addr) (ep *StreamEndpoint, err error) { | 320 | func newStreamEndpoint(addr net.Addr) (ep *StreamEndpoint, err error) { |
258 | // check for matching protocol | 321 | // check for matching protocol |
259 | if epMode(addr.Network()) != "stream" { | 322 | if epMode(addr.Network()) != "stream" { |
@@ -270,10 +333,29 @@ func newStreamEndpoint(addr net.Addr) (ep *StreamEndpoint, err error) { | |||
270 | return | 333 | return |
271 | } | 334 | } |
272 | 335 | ||
336 | //---------------------------------------------------------------------- | ||
337 | // derive endpoint mode (packet/stream) and transport protocol from | ||
338 | // net.Adddr.Network() strings | ||
339 | //---------------------------------------------------------------------- | ||
340 | |||
341 | // EpProtocol returns the transport protocol for a given network string | ||
342 | // that can include extended protocol information like "r5n+ip+udp" | ||
343 | func EpProtocol(netw string) string { | ||
344 | switch netw { | ||
345 | case "udp", "udp4", "udp6", "ip+udp": | ||
346 | return "udp" | ||
347 | case "tcp", "tcp4", "tcp6": | ||
348 | return "tcp" | ||
349 | case "unix": | ||
350 | return "unix" | ||
351 | } | ||
352 | return "" | ||
353 | } | ||
354 | |||
273 | // epMode returns the endpoint mode (packet or stream) for a given network | 355 | // epMode returns the endpoint mode (packet or stream) for a given network |
274 | func epMode(netw string) string { | 356 | func epMode(netw string) string { |
275 | switch netw { | 357 | switch EpProtocol(netw) { |
276 | case "udp", "udp4", "udp6", "r5n+ip+udp": | 358 | case "udp": |
277 | return "packet" | 359 | return "packet" |
278 | case "tcp", "unix": | 360 | case "tcp", "unix": |
279 | return "stream" | 361 | return "stream" |
diff --git a/src/gnunet/transport/reader_writer.go b/src/gnunet/transport/reader_writer.go index 2e5f14a..db3527e 100644 --- a/src/gnunet/transport/reader_writer.go +++ b/src/gnunet/transport/reader_writer.go | |||
@@ -113,10 +113,7 @@ func ReadMessage(ctx context.Context, rdr io.ReadCloser, buf []byte) (msg messag | |||
113 | if err = get(4, int(mh.MsgSize)-4); err != nil { | 113 | if err = get(4, int(mh.MsgSize)-4); err != nil { |
114 | return nil, err | 114 | return nil, err |
115 | } | 115 | } |
116 | // handle transport message case | 116 | if msg, err = message.NewEmptyMessage(mh.MsgType); err != nil { |
117 | if mh.MsgType == message.DUMMY { | ||
118 | msg = NewTransportMessage(nil, nil) | ||
119 | } else if msg, err = message.NewEmptyMessage(mh.MsgType); err != nil { | ||
120 | return nil, err | 117 | return nil, err |
121 | } | 118 | } |
122 | if msg == nil { | 119 | if msg == nil { |
diff --git a/src/gnunet/transport/transport.go b/src/gnunet/transport/transport.go index 14def98..d1e8249 100644 --- a/src/gnunet/transport/transport.go +++ b/src/gnunet/transport/transport.go | |||
@@ -25,6 +25,8 @@ import ( | |||
25 | "gnunet/message" | 25 | "gnunet/message" |
26 | "gnunet/util" | 26 | "gnunet/util" |
27 | "net" | 27 | "net" |
28 | |||
29 | "github.com/bfix/gospel/network" | ||
28 | ) | 30 | ) |
29 | 31 | ||
30 | // Trnsport layer error codes | 32 | // Trnsport layer error codes |
@@ -41,32 +43,19 @@ var ( | |||
41 | // Msg is the exchanged GNUnet message. The packet itself satisfies the | 43 | // Msg is the exchanged GNUnet message. The packet itself satisfies the |
42 | // message.Message interface. | 44 | // message.Message interface. |
43 | type TransportMessage struct { | 45 | type TransportMessage struct { |
44 | Hdr *message.Header `` // message header | 46 | Peer *util.PeerID // remote peer |
45 | Peer *util.PeerID `` // remote peer | 47 | Msg message.Message // GNUnet message |
46 | Payload []byte `size:"*"` // GNUnet message | ||
47 | |||
48 | // package-local attributes (transient) | ||
49 | msg message.Message | ||
50 | endp int // id of endpoint (incoming message) | ||
51 | conn int // id of connection (optional, incoming message) | ||
52 | } | ||
53 | |||
54 | func (msg *TransportMessage) Header() *message.Header { | ||
55 | return msg.Hdr | ||
56 | } | ||
57 | |||
58 | func (msg *TransportMessage) Message() (m message.Message, err error) { | ||
59 | if m = msg.msg; m == nil { | ||
60 | rdr := bytes.NewBuffer(msg.Payload) | ||
61 | m, err = ReadMessageDirect(rdr, nil) | ||
62 | } | ||
63 | return | ||
64 | } | 48 | } |
65 | 49 | ||
66 | // Bytes returns the binary representation of a transport message | 50 | // Bytes returns the binary representation of a transport message |
67 | func (msg *TransportMessage) Bytes() ([]byte, error) { | 51 | func (msg *TransportMessage) Bytes() ([]byte, error) { |
68 | buf := new(bytes.Buffer) | 52 | buf := new(bytes.Buffer) |
69 | err := WriteMessageDirect(buf, msg) | 53 | // serialize peer id |
54 | if _, err := buf.Write(msg.Peer.Key); err != nil { | ||
55 | return nil, err | ||
56 | } | ||
57 | // serialize message | ||
58 | err := WriteMessageDirect(buf, msg.Msg) | ||
70 | return buf.Bytes(), err | 59 | return buf.Bytes(), err |
71 | } | 60 | } |
72 | 61 | ||
@@ -76,21 +65,13 @@ func (msg *TransportMessage) String() string { | |||
76 | } | 65 | } |
77 | 66 | ||
78 | // NewTransportMessage creates a message suitable for transfer | 67 | // NewTransportMessage creates a message suitable for transfer |
79 | func NewTransportMessage(peer *util.PeerID, payload []byte) (tm *TransportMessage) { | 68 | func NewTransportMessage(peer *util.PeerID, msg message.Message) (tm *TransportMessage) { |
80 | if peer == nil { | 69 | if peer == nil { |
81 | peer = util.NewPeerID(nil) | 70 | peer = util.NewPeerID(nil) |
82 | } | 71 | } |
83 | msize := 0 | ||
84 | if payload != nil { | ||
85 | msize = len(payload) | ||
86 | } | ||
87 | tm = &TransportMessage{ | 72 | tm = &TransportMessage{ |
88 | Hdr: &message.Header{ | 73 | Peer: peer, |
89 | MsgSize: uint16(36 + msize), | 74 | Msg: msg, |
90 | MsgType: message.DUMMY, | ||
91 | }, | ||
92 | Peer: peer, | ||
93 | Payload: payload, | ||
94 | } | 75 | } |
95 | return | 76 | return |
96 | } | 77 | } |
@@ -100,28 +81,41 @@ func NewTransportMessage(peer *util.PeerID, payload []byte) (tm *TransportMessag | |||
100 | // Transport enables network-oriented (like IP, UDP, TCP or UDS) | 81 | // Transport enables network-oriented (like IP, UDP, TCP or UDS) |
101 | // message exchange on multiple endpoints. | 82 | // message exchange on multiple endpoints. |
102 | type Transport struct { | 83 | type Transport struct { |
103 | incoming chan *TransportMessage // messages as received from the network | 84 | incoming chan *TransportMessage // messages as received from the network |
104 | endpoints map[int]Endpoint // list of available endpoints | 85 | endpoints *util.Map[int, Endpoint] // list of available endpoints |
86 | upnp *network.PortMapper // UPnP mapper (optional) | ||
105 | } | 87 | } |
106 | 88 | ||
107 | // NewTransport creates and runs a new transport layer implementation. | 89 | // NewTransport creates and runs a new transport layer implementation. |
108 | func NewTransport(ctx context.Context, ch chan *TransportMessage) (t *Transport) { | 90 | func NewTransport(ctx context.Context, tag string, ch chan *TransportMessage) (t *Transport) { |
109 | // create transport instance | 91 | // create transport instance |
92 | mngr, err := network.NewPortMapper(tag) | ||
93 | if err != nil { | ||
94 | mngr = nil | ||
95 | } | ||
110 | return &Transport{ | 96 | return &Transport{ |
111 | incoming: ch, | 97 | incoming: ch, |
112 | endpoints: make(map[int]Endpoint), | 98 | endpoints: util.NewMap[int, Endpoint](), |
99 | upnp: mngr, | ||
100 | } | ||
101 | } | ||
102 | |||
103 | // Shutdown transport-related processes | ||
104 | func (t *Transport) Shutdown() { | ||
105 | if t.upnp != nil { | ||
106 | t.upnp.Close() | ||
113 | } | 107 | } |
114 | } | 108 | } |
115 | 109 | ||
116 | // Send a message over suitable endpoint | 110 | // Send a message over suitable endpoint |
117 | func (t *Transport) Send(ctx context.Context, addr net.Addr, msg *TransportMessage) (err error) { | 111 | func (t *Transport) Send(ctx context.Context, addr net.Addr, msg *TransportMessage) (err error) { |
118 | for _, ep := range t.endpoints { | 112 | // use the first endpoint able to handle address |
113 | return t.endpoints.ProcessRange(func(_ int, ep Endpoint) error { | ||
119 | if ep.CanSendTo(addr) { | 114 | if ep.CanSendTo(addr) { |
120 | err = ep.Send(ctx, addr, msg) | 115 | return ep.Send(ctx, addr, msg) |
121 | break | ||
122 | } | 116 | } |
123 | } | 117 | return nil |
124 | return | 118 | }, true) |
125 | } | 119 | } |
126 | 120 | ||
127 | //---------------------------------------------------------------------- | 121 | //---------------------------------------------------------------------- |
@@ -130,22 +124,45 @@ func (t *Transport) Send(ctx context.Context, addr net.Addr, msg *TransportMessa | |||
130 | 124 | ||
131 | // AddEndpoint instantiates and run a new endpoint handler for the | 125 | // AddEndpoint instantiates and run a new endpoint handler for the |
132 | // given address (must map to a network interface). | 126 | // given address (must map to a network interface). |
133 | func (t *Transport) AddEndpoint(ctx context.Context, addr net.Addr) (a net.Addr, err error) { | 127 | func (t *Transport) AddEndpoint(ctx context.Context, addr *util.Address) (ep Endpoint, err error) { |
134 | // register endpoint | 128 | // check for valid address |
135 | var ep Endpoint | 129 | if addr == nil { |
130 | err = ErrEndpNoAddress | ||
131 | return | ||
132 | } | ||
133 | // check if endpoint is already available | ||
134 | as := addr.Network() + "://" + addr.String() | ||
135 | if err = t.endpoints.ProcessRange(func(_ int, ep Endpoint) error { | ||
136 | ae := ep.Address().Network() + "://" + ep.Address().String() | ||
137 | if as == ae { | ||
138 | return ErrEndpExists | ||
139 | } | ||
140 | return nil | ||
141 | }, true); err != nil { | ||
142 | return | ||
143 | } | ||
144 | // register new endpoint | ||
136 | if ep, err = NewEndpoint(addr); err != nil { | 145 | if ep, err = NewEndpoint(addr); err != nil { |
137 | return | 146 | return |
138 | } | 147 | } |
139 | t.endpoints[ep.ID()] = ep | 148 | // add endpoint to list and run it |
149 | t.endpoints.Put(ep.ID(), ep) | ||
140 | ep.Run(ctx, t.incoming) | 150 | ep.Run(ctx, t.incoming) |
141 | return ep.Address(), nil | 151 | return |
142 | } | 152 | } |
143 | 153 | ||
144 | // Endpoints returns a list of listening addresses managed by transport. | 154 | //---------------------------------------------------------------------- |
145 | func (t *Transport) Endpoints() (list []net.Addr) { | 155 | // UPnP handling |
146 | list = make([]net.Addr, 0) | 156 | //---------------------------------------------------------------------- |
147 | for _, ep := range t.endpoints { | 157 | |
148 | list = append(list, ep.Address()) | 158 | // ForwardOpen returns a local address for listening that will receive traffic |
149 | } | 159 | // from a port forward handled by UPnP on the router. |
150 | return | 160 | func (t *Transport) ForwardOpen(protocol, param string, port int) (id, local, remote string, err error) { |
161 | // no parameters currently defined, so just do the assignment. | ||
162 | return t.upnp.Assign(protocol, port) | ||
163 | } | ||
164 | |||
165 | // ForwardClose closes a specific port forwarding | ||
166 | func (t *Transport) ForwardClose(id string) error { | ||
167 | return t.upnp.Unassign(id) | ||
151 | } | 168 | } |
diff --git a/src/gnunet/util/address.go b/src/gnunet/util/address.go index 106e671..a56e95f 100644 --- a/src/gnunet/util/address.go +++ b/src/gnunet/util/address.go | |||
@@ -34,11 +34,11 @@ type Address struct { | |||
34 | } | 34 | } |
35 | 35 | ||
36 | // NewAddress returns a new Address for the given transport and specs | 36 | // NewAddress returns a new Address for the given transport and specs |
37 | func NewAddress(transport string, addr []byte) *Address { | 37 | func NewAddress(transport string, addr string) *Address { |
38 | return &Address{ | 38 | return &Address{ |
39 | Netw: transport, | 39 | Netw: transport, |
40 | Options: 0, | 40 | Options: 0, |
41 | Address: Clone(addr), | 41 | Address: Clone([]byte(addr)), |
42 | Expires: AbsoluteTimeNever(), | 42 | Expires: AbsoluteTimeNever(), |
43 | } | 43 | } |
44 | } | 44 | } |
@@ -61,7 +61,7 @@ func ParseAddress(s string) (addr *Address, err error) { | |||
61 | err = fmt.Errorf("invalid address format: '%s'", s) | 61 | err = fmt.Errorf("invalid address format: '%s'", s) |
62 | return | 62 | return |
63 | } | 63 | } |
64 | addr = NewAddress(p[0], []byte(strings.Trim(p[1], "/"))) | 64 | addr = NewAddress(p[0], strings.Trim(p[1], "/")) |
65 | return | 65 | return |
66 | } | 66 | } |
67 | 67 | ||
@@ -91,8 +91,11 @@ func (a *Address) Network() string { | |||
91 | 91 | ||
92 | //---------------------------------------------------------------------- | 92 | //---------------------------------------------------------------------- |
93 | 93 | ||
94 | // AddressString returns a string representaion of an address. | 94 | // URI returns a string representaion of an address. |
95 | func AddressString(network string, addr []byte) string { | 95 | func (a *Address) URI() string { |
96 | return URI(a.Netw, a.Address) | ||
97 | } | ||
98 | func URI(network string, addr []byte) string { | ||
96 | return network + "://" + string(addr) | 99 | return network + "://" + string(addr) |
97 | } | 100 | } |
98 | 101 | ||
@@ -151,13 +154,13 @@ func (a *PeerAddrList) Add(id string, addr *Address) (mode int) { | |||
151 | list = append(list, addr) | 154 | list = append(list, addr) |
152 | a.list.Put(id, list) | 155 | a.list.Put(id, list) |
153 | return nil | 156 | return nil |
154 | }) | 157 | }, false) |
155 | } | 158 | } |
156 | return | 159 | return |
157 | } | 160 | } |
158 | 161 | ||
159 | // Get address for peer | 162 | // Get address for peer |
160 | func (a *PeerAddrList) Get(id string, transport string) *Address { | 163 | func (a *PeerAddrList) Get(id string, transport string) (res []*Address) { |
161 | list, ok := a.list.Get(id) | 164 | list, ok := a.list.Get(id) |
162 | if ok { | 165 | if ok { |
163 | for _, addr := range list { | 166 | for _, addr := range list { |
@@ -171,10 +174,10 @@ func (a *PeerAddrList) Get(id string, transport string) *Address { | |||
171 | // skip other transports | 174 | // skip other transports |
172 | continue | 175 | continue |
173 | } | 176 | } |
174 | return addr | 177 | res = append(res, addr) |
175 | } | 178 | } |
176 | } | 179 | } |
177 | return nil | 180 | return |
178 | } | 181 | } |
179 | 182 | ||
180 | // Delete a list entry by key. | 183 | // Delete a list entry by key. |
diff --git a/src/gnunet/util/database.go b/src/gnunet/util/database.go index a1198fd..f1d1e5f 100644 --- a/src/gnunet/util/database.go +++ b/src/gnunet/util/database.go | |||
@@ -121,7 +121,7 @@ func (p *dbPool) remove(key string) error { | |||
121 | p.insts.Delete(key) | 121 | p.insts.Delete(key) |
122 | } | 122 | } |
123 | return | 123 | return |
124 | }) | 124 | }, false) |
125 | } | 125 | } |
126 | 126 | ||
127 | // Connect to a SQL database (various types and flavors): | 127 | // Connect to a SQL database (various types and flavors): |
@@ -180,6 +180,6 @@ func (p *dbPool) Connect(spec string) (db *DbConn, err error) { | |||
180 | db = new(DbConn) | 180 | db = new(DbConn) |
181 | db.conn, err = inst.db.Conn(p.ctx) | 181 | db.conn, err = inst.db.Conn(p.ctx) |
182 | return err | 182 | return err |
183 | }) | 183 | }, false) |
184 | return | 184 | return |
185 | } | 185 | } |
diff --git a/src/gnunet/util/misc.go b/src/gnunet/util/misc.go index 7240757..4443737 100644 --- a/src/gnunet/util/misc.go +++ b/src/gnunet/util/misc.go | |||
@@ -70,43 +70,87 @@ func NewMap[K comparable, V any]() *Map[K, V] { | |||
70 | } | 70 | } |
71 | } | 71 | } |
72 | 72 | ||
73 | //---------------------------------------------------------------------- | ||
74 | |||
73 | // Process a function in the locked map context. Calls | 75 | // Process a function in the locked map context. Calls |
74 | // to other map functions in 'f' will use additional locks. | 76 | // to other map functions in 'f' will skip their locks. |
75 | func (m *Map[K, V]) Process(f func() error) error { | 77 | func (m *Map[K, V]) Process(f func() error, readonly bool) error { |
76 | m.mtx.Lock() | 78 | // handle locking |
77 | defer m.mtx.Unlock() | 79 | m.lock(readonly) |
80 | m.inProcess = true | ||
81 | defer func() { | ||
82 | m.inProcess = false | ||
83 | m.unlock(readonly) | ||
84 | }() | ||
85 | // function call in unlocked environment | ||
86 | return f() | ||
87 | } | ||
88 | |||
89 | // Process a ranged function in the locked map context. Calls | ||
90 | // to other map functions in 'f' will skip their locks. | ||
91 | func (m *Map[K, V]) ProcessRange(f func(key K, value V) error, readonly bool) error { | ||
92 | // handle locking | ||
93 | m.lock(readonly) | ||
78 | m.inProcess = true | 94 | m.inProcess = true |
79 | err := f() | 95 | defer func() { |
80 | m.inProcess = false | 96 | m.inProcess = false |
81 | return err | 97 | m.unlock(readonly) |
98 | }() | ||
99 | // range over map and call function. | ||
100 | for key, value := range m.list { | ||
101 | if err := f(key, value); err != nil { | ||
102 | return err | ||
103 | } | ||
104 | } | ||
105 | return nil | ||
82 | } | 106 | } |
83 | 107 | ||
108 | //---------------------------------------------------------------------- | ||
109 | |||
84 | // Put value into map under given key. | 110 | // Put value into map under given key. |
85 | func (m *Map[K, V]) Put(key K, value V) { | 111 | func (m *Map[K, V]) Put(key K, value V) { |
86 | if !m.inProcess { | 112 | m.lock(false) |
87 | m.mtx.Lock() | 113 | defer m.unlock(false) |
88 | defer m.mtx.Unlock() | ||
89 | } | ||
90 | m.list[key] = value | 114 | m.list[key] = value |
91 | } | 115 | } |
92 | 116 | ||
93 | // Get value with iven key from map. | 117 | // Get value with iven key from map. |
94 | func (m *Map[K, V]) Get(key K) (value V, ok bool) { | 118 | func (m *Map[K, V]) Get(key K) (value V, ok bool) { |
95 | if !m.inProcess { | 119 | m.lock(true) |
96 | m.mtx.RLock() | 120 | defer m.unlock(true) |
97 | defer m.mtx.RUnlock() | ||
98 | } | ||
99 | value, ok = m.list[key] | 121 | value, ok = m.list[key] |
100 | return | 122 | return |
101 | } | 123 | } |
102 | 124 | ||
103 | // Delete key/value pair from map. | 125 | // Delete key/value pair from map. |
104 | func (m *Map[K, V]) Delete(key K) { | 126 | func (m *Map[K, V]) Delete(key K) { |
127 | m.lock(false) | ||
128 | defer m.unlock(false) | ||
129 | delete(m.list, key) | ||
130 | } | ||
131 | |||
132 | //---------------------------------------------------------------------- | ||
133 | |||
134 | // lock with given mode (if not in processing function) | ||
135 | func (m *Map[K, V]) lock(readonly bool) { | ||
105 | if !m.inProcess { | 136 | if !m.inProcess { |
106 | m.mtx.Lock() | 137 | if readonly { |
107 | defer m.mtx.Unlock() | 138 | m.mtx.RLock() |
139 | } else { | ||
140 | m.mtx.Lock() | ||
141 | } | ||
142 | } | ||
143 | } | ||
144 | |||
145 | // lock with given mode (if not in processing function) | ||
146 | func (m *Map[K, V]) unlock(readonly bool) { | ||
147 | if !m.inProcess { | ||
148 | if readonly { | ||
149 | m.mtx.RUnlock() | ||
150 | } else { | ||
151 | m.mtx.Unlock() | ||
152 | } | ||
108 | } | 153 | } |
109 | delete(m.list, key) | ||
110 | } | 154 | } |
111 | 155 | ||
112 | //---------------------------------------------------------------------- | 156 | //---------------------------------------------------------------------- |
diff --git a/src/gnunet/util/peer_id.go b/src/gnunet/util/peer_id.go index a8202a1..a74958d 100644 --- a/src/gnunet/util/peer_id.go +++ b/src/gnunet/util/peer_id.go | |||
@@ -25,23 +25,19 @@ type PeerID struct { | |||
25 | Key []byte `size:"32"` | 25 | Key []byte `size:"32"` |
26 | } | 26 | } |
27 | 27 | ||
28 | // NewPeerID creates a new object from the data. | 28 | // NewPeerID creates a new peer id from data. |
29 | func NewPeerID(data []byte) *PeerID { | 29 | func NewPeerID(data []byte) (p *PeerID) { |
30 | if data == nil { | 30 | p = &PeerID{ |
31 | data = make([]byte, 32) | 31 | Key: make([]byte, 32), |
32 | } else { | ||
33 | size := len(data) | ||
34 | if size > 32 { | ||
35 | data = data[:32] | ||
36 | } else if size < 32 { | ||
37 | buf := make([]byte, 32) | ||
38 | CopyAlignedBlock(buf, data) | ||
39 | data = buf | ||
40 | } | ||
41 | } | 32 | } |
42 | return &PeerID{ | 33 | if data != nil { |
43 | Key: data, | 34 | if len(data) < 32 { |
35 | CopyAlignedBlock(p.Key, data) | ||
36 | } else { | ||
37 | copy(p.Key, data[:32]) | ||
38 | } | ||
44 | } | 39 | } |
40 | return | ||
45 | } | 41 | } |
46 | 42 | ||
47 | // Equals returns true if two peer IDs match. | 43 | // Equals returns true if two peer IDs match. |