diff options
author | Bernd Fix <brf@hoi-polloi.org> | 2022-08-15 14:02:05 +0200 |
---|---|---|
committer | Bernd Fix <brf@hoi-polloi.org> | 2022-08-15 14:02:05 +0200 |
commit | b62071330cd7e0445e89660d07b7aed098f80285 (patch) | |
tree | 85c1bbe055dff8bf027459121dc2fb958a846af4 | |
parent | 835c8e8b45487c1276426034b5afb915853b6eb1 (diff) | |
download | gnunet-go-b62071330cd7e0445e89660d07b7aed098f80285.tar.gz gnunet-go-b62071330cd7e0445e89660d07b7aed098f80285.zip |
Milestone 2+3 (NLnet funding)v0.1.30
34 files changed, 1010 insertions, 445 deletions
diff --git a/src/gnunet/cmd/gnunet-service-dht-go/main.go b/src/gnunet/cmd/gnunet-service-dht-go/main.go index d8244d9..28d2100 100644 --- a/src/gnunet/cmd/gnunet-service-dht-go/main.go +++ b/src/gnunet/cmd/gnunet-service-dht-go/main.go | |||
@@ -137,7 +137,7 @@ func main() { | |||
137 | // check for HELLO URL | 137 | // check for HELLO URL |
138 | if strings.HasPrefix(bs, "gnunet://hello/") { | 138 | if strings.HasPrefix(bs, "gnunet://hello/") { |
139 | var hb *blocks.HelloBlock | 139 | var hb *blocks.HelloBlock |
140 | if hb, err = blocks.ParseHelloURL(bs, true); err != nil { | 140 | if hb, err = blocks.ParseHelloBlockFromURL(bs, true); err != nil { |
141 | logger.Printf(logger.ERROR, "[dht] failed bootstrap HELLO URL %s: %s", bs, err.Error()) | 141 | logger.Printf(logger.ERROR, "[dht] failed bootstrap HELLO URL %s: %s", bs, err.Error()) |
142 | continue | 142 | continue |
143 | } | 143 | } |
diff --git a/src/gnunet/config/config.go b/src/gnunet/config/config.go index cfbf705..897926f 100644 --- a/src/gnunet/config/config.go +++ b/src/gnunet/config/config.go | |||
@@ -90,9 +90,9 @@ type ServiceConfig struct { | |||
90 | 90 | ||
91 | // GNSConfig contains parameters for the GNU Name System service | 91 | // GNSConfig contains parameters for the GNU Name System service |
92 | type GNSConfig struct { | 92 | type GNSConfig struct { |
93 | Service *ServiceConfig `json:"service"` // socket for GNS service | 93 | Service *ServiceConfig `json:"service"` // socket for GNS service |
94 | DHTReplLevel int `json:"dhtReplLevel"` // DHT replication level | 94 | ReplLevel int `json:"replLevel"` // DHT replication level |
95 | MaxDepth int `json:"maxDepth"` // maximum recursion depth in resolution | 95 | MaxDepth int `json:"maxDepth"` // maximum recursion depth in resolution |
96 | } | 96 | } |
97 | 97 | ||
98 | //---------------------------------------------------------------------- | 98 | //---------------------------------------------------------------------- |
@@ -109,7 +109,8 @@ type DHTConfig struct { | |||
109 | 109 | ||
110 | // RoutingConfig holds parameters for routing tables | 110 | // RoutingConfig holds parameters for routing tables |
111 | type RoutingConfig struct { | 111 | type RoutingConfig struct { |
112 | PeerTTL int `json:"peerTTL"` // time-out for peers in table | 112 | PeerTTL int `json:"peerTTL"` // time-out for peers in table |
113 | ReplLevel int `json:"replLevel"` // replication level | ||
113 | } | 114 | } |
114 | 115 | ||
115 | //---------------------------------------------------------------------- | 116 | //---------------------------------------------------------------------- |
diff --git a/src/gnunet/config/gnunet-config.json b/src/gnunet/config/gnunet-config.json index 27678ab..2052b33 100644 --- a/src/gnunet/config/gnunet-config.json +++ b/src/gnunet/config/gnunet-config.json | |||
@@ -37,7 +37,8 @@ | |||
37 | "maxGB": 10 | 37 | "maxGB": 10 |
38 | }, | 38 | }, |
39 | "routing": { | 39 | "routing": { |
40 | "peerTTL": 10800 | 40 | "peerTTL": 10800, |
41 | "replLevel": 5 | ||
41 | }, | 42 | }, |
42 | "heartbeat": 900 | 43 | "heartbeat": 900 |
43 | }, | 44 | }, |
@@ -48,7 +49,7 @@ | |||
48 | "perm": "0770" | 49 | "perm": "0770" |
49 | } | 50 | } |
50 | }, | 51 | }, |
51 | "dhtReplLevel": 10, | 52 | "replLevel": 10, |
52 | "maxDepth": 250 | 53 | "maxDepth": 250 |
53 | }, | 54 | }, |
54 | "namecache": { | 55 | "namecache": { |
diff --git a/src/gnunet/core/hello_test.go b/src/gnunet/core/hello_test.go index 0590c90..211250c 100644 --- a/src/gnunet/core/hello_test.go +++ b/src/gnunet/core/hello_test.go | |||
@@ -64,7 +64,7 @@ var ( | |||
64 | 64 | ||
65 | func TestHelloURLDirect(t *testing.T) { | 65 | func TestHelloURLDirect(t *testing.T) { |
66 | for _, hu := range helloURL { | 66 | for _, hu := range helloURL { |
67 | if _, err := blocks.ParseHelloURL(hu, false); err != nil { | 67 | if _, err := blocks.ParseHelloBlockFromURL(hu, false); err != nil { |
68 | t.Fatal(err) | 68 | t.Fatal(err) |
69 | } | 69 | } |
70 | } | 70 | } |
@@ -93,7 +93,7 @@ func TestHelloURL(t *testing.T) { | |||
93 | 93 | ||
94 | // convert to and from HELLO URL | 94 | // convert to and from HELLO URL |
95 | url1 := hd.URL() | 95 | url1 := hd.URL() |
96 | hd2, err := blocks.ParseHelloURL(url1, true) | 96 | hd2, err := blocks.ParseHelloBlockFromURL(url1, true) |
97 | if err != nil { | 97 | if err != nil { |
98 | t.Fatal(err) | 98 | t.Fatal(err) |
99 | } | 99 | } |
diff --git a/src/gnunet/core/peer_test.go b/src/gnunet/core/peer_test.go index 1be4d42..29fe801 100644 --- a/src/gnunet/core/peer_test.go +++ b/src/gnunet/core/peer_test.go | |||
@@ -67,7 +67,7 @@ func TestPeerHello(t *testing.T) { | |||
67 | // convert to URL and back | 67 | // convert to URL and back |
68 | u := h.URL() | 68 | u := h.URL() |
69 | t.Log(u) | 69 | t.Log(u) |
70 | h2, err := blocks.ParseHelloURL(u, true) | 70 | h2, err := blocks.ParseHelloBlockFromURL(u, true) |
71 | if err != nil { | 71 | if err != nil { |
72 | t.Fatal(err) | 72 | t.Fatal(err) |
73 | } | 73 | } |
diff --git a/src/gnunet/crypto/gns.go b/src/gnunet/crypto/gns.go index 27a52d3..39eb8c5 100644 --- a/src/gnunet/crypto/gns.go +++ b/src/gnunet/crypto/gns.go | |||
@@ -417,7 +417,9 @@ func NewZoneSignature(d []byte) (sig *ZoneSignature, err error) { | |||
417 | } | 417 | } |
418 | // set signature implementation | 418 | // set signature implementation |
419 | zs := impl.NewSignature() | 419 | zs := impl.NewSignature() |
420 | err = zs.Init(sig.Signature) | 420 | if err = zs.Init(sig.Signature); err != nil { |
421 | return | ||
422 | } | ||
421 | sig.impl = zs | 423 | sig.impl = zs |
422 | // set public key implementation | 424 | // set public key implementation |
423 | zk := impl.NewPublic() | 425 | zk := impl.NewPublic() |
diff --git a/src/gnunet/crypto/hash.go b/src/gnunet/crypto/hash.go index 49e57ff..a5716a0 100644 --- a/src/gnunet/crypto/hash.go +++ b/src/gnunet/crypto/hash.go | |||
@@ -58,7 +58,7 @@ func NewHashCode(data []byte) *HashCode { | |||
58 | hc := new(HashCode) | 58 | hc := new(HashCode) |
59 | size := hc.Size() | 59 | size := hc.Size() |
60 | v := make([]byte, size) | 60 | v := make([]byte, size) |
61 | if data != nil && len(data) > 0 { | 61 | if len(data) > 0 { |
62 | if uint(len(data)) < size { | 62 | if uint(len(data)) < size { |
63 | util.CopyAlignedBlock(v, data) | 63 | util.CopyAlignedBlock(v, data) |
64 | } else { | 64 | } else { |
diff --git a/src/gnunet/go.mod b/src/gnunet/go.mod index 8a5db53..9e01d7c 100644 --- a/src/gnunet/go.mod +++ b/src/gnunet/go.mod | |||
@@ -3,7 +3,7 @@ module gnunet | |||
3 | go 1.18 | 3 | go 1.18 |
4 | 4 | ||
5 | require ( | 5 | require ( |
6 | github.com/bfix/gospel v1.2.17 | 6 | github.com/bfix/gospel v1.2.18 |
7 | github.com/go-redis/redis/v8 v8.11.5 | 7 | github.com/go-redis/redis/v8 v8.11.5 |
8 | github.com/go-sql-driver/mysql v1.6.0 | 8 | github.com/go-sql-driver/mysql v1.6.0 |
9 | github.com/gorilla/mux v1.8.0 | 9 | github.com/gorilla/mux v1.8.0 |
@@ -24,4 +24,4 @@ require ( | |||
24 | golang.org/x/tools v0.1.11 // indirect | 24 | golang.org/x/tools v0.1.11 // indirect |
25 | ) | 25 | ) |
26 | 26 | ||
27 | // replace github.com/bfix/gospel v1.2.17 => ../gospel | 27 | //replace github.com/bfix/gospel v1.2.18 => ../gospel |
diff --git a/src/gnunet/go.sum b/src/gnunet/go.sum index 22c7e09..2a2a36a 100644 --- a/src/gnunet/go.sum +++ b/src/gnunet/go.sum | |||
@@ -1,5 +1,5 @@ | |||
1 | github.com/bfix/gospel v1.2.17 h1:Stvm+OiCA2GIWIhI/HKc6uaLDMtrJNxXgw/g+v9witw= | 1 | github.com/bfix/gospel v1.2.18 h1:X9hYudt5dvjYTGGmKC4T7qcLdb7ORblVD4kAC/ZYXdU= |
2 | github.com/bfix/gospel v1.2.17/go.mod h1:cdu63bA9ZdfeDoqZ+vnWOcbY9Puwdzmf5DMxMGMznRI= | 2 | github.com/bfix/gospel v1.2.18/go.mod h1:cdu63bA9ZdfeDoqZ+vnWOcbY9Puwdzmf5DMxMGMznRI= |
3 | github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= | 3 | github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= |
4 | github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= | 4 | github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= |
5 | github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= | 5 | github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= |
diff --git a/src/gnunet/message/msg_dht_p2p.go b/src/gnunet/message/msg_dht_p2p.go index ccc0406..638ff16 100644 --- a/src/gnunet/message/msg_dht_p2p.go +++ b/src/gnunet/message/msg_dht_p2p.go | |||
@@ -113,20 +113,20 @@ func (m *DHTP2PGetMsg) Update(pf *blocks.PeerFilter, rf blocks.ResultFilter, hop | |||
113 | 113 | ||
114 | // DHTP2PPutMsg wire layout | 114 | // DHTP2PPutMsg wire layout |
115 | type DHTP2PPutMsg struct { | 115 | type DHTP2PPutMsg struct { |
116 | MsgSize uint16 `order:"big"` // total size of message | 116 | MsgSize uint16 `order:"big"` // total size of message |
117 | MsgType uint16 `order:"big"` // DHT_P2P_PUT (146) | 117 | MsgType uint16 `order:"big"` // DHT_P2P_PUT (146) |
118 | BType uint32 `order:"big"` // block type | 118 | BType uint32 `order:"big"` // block type |
119 | Flags uint16 `order:"big"` // processing flags | 119 | Flags uint16 `order:"big"` // processing flags |
120 | HopCount uint16 `order:"big"` // message hops | 120 | HopCount uint16 `order:"big"` // message hops |
121 | ReplLvl uint16 `order:"big"` // replication level | 121 | ReplLvl uint16 `order:"big"` // replication level |
122 | PathL uint16 `order:"big"` // path length | 122 | PathL uint16 `order:"big"` // path length |
123 | Expiration util.AbsoluteTime `` // expiration date | 123 | Expiration util.AbsoluteTime `` // expiration date |
124 | PeerFilter *blocks.PeerFilter `` // peer bloomfilter | 124 | PeerFilter *blocks.PeerFilter `` // peer bloomfilter |
125 | Key *crypto.HashCode `` // query key to block | 125 | Key *crypto.HashCode `` // query key to block |
126 | TruncOrigin []byte `size:"(PESize)"` // truncated origin (if TRUNCATED flag set) | 126 | TruncOrigin *util.PeerID `opt:"(IsUsed)"` // truncated origin (if TRUNCATED flag set) |
127 | PutPath []*path.Entry `size:"PathL"` // PUT path | 127 | PutPath []*path.Entry `size:"PathL"` // PUT path |
128 | LastSig []byte `size:"(PESize)"` // signature of last hop (if RECORD_ROUTE flag is set) | 128 | LastSig *util.PeerSignature `opt:"(IsUsed)"` // signature of last hop (if RECORD_ROUTE flag is set) |
129 | Block []byte `size:"*"` // block data | 129 | Block []byte `size:"*"` // block data |
130 | } | 130 | } |
131 | 131 | ||
132 | // NewDHTP2PPutMsg creates an empty new DHTP2PPutMsg | 132 | // NewDHTP2PPutMsg creates an empty new DHTP2PPutMsg |
@@ -149,19 +149,15 @@ func NewDHTP2PPutMsg() *DHTP2PPutMsg { | |||
149 | } | 149 | } |
150 | } | 150 | } |
151 | 151 | ||
152 | // PESize calculates field sizes based on flags and attributes | 152 | // IsUsed returns true if an optional field is used |
153 | func (m *DHTP2PPutMsg) PESize(field string) uint { | 153 | func (m *DHTP2PPutMsg) IsUsed(field string) bool { |
154 | switch field { | 154 | switch field { |
155 | case "Origin": | 155 | case "Origin": |
156 | if m.Flags&enums.DHT_RO_TRUNCATED != 0 { | 156 | return m.Flags&enums.DHT_RO_TRUNCATED != 0 |
157 | return util.NewPeerID(nil).Size() | ||
158 | } | ||
159 | case "LastSig": | 157 | case "LastSig": |
160 | if m.Flags&enums.DHT_RO_RECORD_ROUTE != 0 { | 158 | return m.Flags&enums.DHT_RO_RECORD_ROUTE != 0 |
161 | return util.NewPeerSignature(nil).Size() | ||
162 | } | ||
163 | } | 159 | } |
164 | return 0 | 160 | return false |
165 | } | 161 | } |
166 | 162 | ||
167 | //---------------------------------------------------------------------- | 163 | //---------------------------------------------------------------------- |
@@ -171,13 +167,13 @@ func (m *DHTP2PPutMsg) Update(p *path.Path, pf *blocks.PeerFilter, hop uint16) * | |||
171 | msg := NewDHTP2PPutMsg() | 167 | msg := NewDHTP2PPutMsg() |
172 | msg.Flags = m.Flags | 168 | msg.Flags = m.Flags |
173 | msg.HopCount = hop | 169 | msg.HopCount = hop |
174 | msg.PathL = m.PathL | 170 | msg.PathL = p.NumList |
175 | msg.Expiration = m.Expiration | 171 | msg.Expiration = m.Expiration |
176 | msg.PeerFilter = pf | 172 | msg.PeerFilter = pf |
177 | msg.Key = m.Key.Clone() | 173 | msg.Key = m.Key.Clone() |
178 | msg.TruncOrigin = m.TruncOrigin | 174 | msg.TruncOrigin = p.TruncOrigin |
179 | msg.PutPath = util.Clone(m.PutPath) | 175 | msg.PutPath = util.Clone(p.List) |
180 | msg.LastSig = m.LastSig | 176 | msg.LastSig = p.LastSig |
181 | msg.Block = util.Clone(m.Block) | 177 | msg.Block = util.Clone(m.Block) |
182 | msg.SetPath(p) | 178 | msg.SetPath(p) |
183 | return msg | 179 | return msg |
@@ -199,11 +195,11 @@ func (m *DHTP2PPutMsg) Path(sender *util.PeerID) *path.Path { | |||
199 | 195 | ||
200 | // handle truncate origin | 196 | // handle truncate origin |
201 | if m.Flags&enums.DHT_RO_TRUNCATED == 1 { | 197 | if m.Flags&enums.DHT_RO_TRUNCATED == 1 { |
202 | if m.TruncOrigin == nil || len(m.TruncOrigin) == 0 { | 198 | if m.TruncOrigin == nil { |
203 | logger.Printf(logger.WARN, "[path] truncated but no origin - flag reset") | 199 | logger.Printf(logger.WARN, "[path] truncated but no origin - flag reset") |
204 | m.Flags &^= enums.DHT_RO_TRUNCATED | 200 | m.Flags &^= enums.DHT_RO_TRUNCATED |
205 | } else { | 201 | } else { |
206 | pth.TruncOrigin = util.NewPeerID(m.TruncOrigin) | 202 | pth.TruncOrigin = m.TruncOrigin |
207 | pth.Flags |= path.PathTruncated | 203 | pth.Flags |= path.PathTruncated |
208 | } | 204 | } |
209 | } | 205 | } |
@@ -213,12 +209,12 @@ func (m *DHTP2PPutMsg) Path(sender *util.PeerID) *path.Path { | |||
213 | pth.NumList = uint16(len(pth.List)) | 209 | pth.NumList = uint16(len(pth.List)) |
214 | 210 | ||
215 | // handle last hop signature | 211 | // handle last hop signature |
216 | if m.LastSig == nil || len(m.LastSig) == 0 { | 212 | if m.LastSig == nil { |
217 | logger.Printf(logger.WARN, "[path] - last hop signature missing - path reset") | 213 | logger.Printf(logger.WARN, "[path] - last hop signature missing - path reset") |
218 | return path.NewPath(crypto.Hash(m.Block), m.Expiration) | 214 | return path.NewPath(crypto.Hash(m.Block), m.Expiration) |
219 | } | 215 | } |
220 | pth.Flags |= path.PathLastHop | 216 | pth.Flags |= path.PathLastHop |
221 | pth.LastSig = util.NewPeerSignature(m.LastSig) | 217 | pth.LastSig = m.LastSig |
222 | pth.LastHop = sender | 218 | pth.LastHop = sender |
223 | return pth | 219 | return pth |
224 | } | 220 | } |
@@ -235,7 +231,13 @@ func (m *DHTP2PPutMsg) SetPath(p *path.Path) { | |||
235 | if len(m.PutPath) > 0 { | 231 | if len(m.PutPath) > 0 { |
236 | pes = m.PutPath[0].Size() | 232 | pes = m.PutPath[0].Size() |
237 | } | 233 | } |
238 | oldSize := uint(len(m.PutPath))*pes + m.PESize("Origin") + m.PESize("LastSig") | 234 | oldSize := uint(len(m.PutPath)) * pes |
235 | if m.TruncOrigin != nil { | ||
236 | oldSize += m.TruncOrigin.Size() | ||
237 | } | ||
238 | if m.LastSig != nil { | ||
239 | oldSize += m.LastSig.Size() | ||
240 | } | ||
239 | // if no new path is defined,... | 241 | // if no new path is defined,... |
240 | if p == nil { | 242 | if p == nil { |
241 | // ... remove existing path | 243 | // ... remove existing path |
@@ -254,12 +256,12 @@ func (m *DHTP2PPutMsg) SetPath(p *path.Path) { | |||
254 | if p.TruncOrigin != nil { | 256 | if p.TruncOrigin != nil { |
255 | // truncated path | 257 | // truncated path |
256 | m.Flags |= enums.DHT_RO_TRUNCATED | 258 | m.Flags |= enums.DHT_RO_TRUNCATED |
257 | m.TruncOrigin = p.TruncOrigin.Bytes() | 259 | m.TruncOrigin = p.TruncOrigin |
258 | } | 260 | } |
259 | m.PutPath = util.Clone(p.List) | 261 | m.PutPath = util.Clone(p.List) |
260 | m.PathL = uint16(len(m.PutPath)) | 262 | m.PathL = uint16(len(m.PutPath)) |
261 | if p.LastSig != nil { | 263 | if p.LastSig != nil { |
262 | m.LastSig = p.LastSig.Bytes() | 264 | m.LastSig = p.LastSig |
263 | } | 265 | } |
264 | } | 266 | } |
265 | 267 | ||
@@ -283,52 +285,174 @@ func (m *DHTP2PPutMsg) Header() *Header { | |||
283 | 285 | ||
284 | // DHTP2PResultMsg wire layout | 286 | // DHTP2PResultMsg wire layout |
285 | type DHTP2PResultMsg struct { | 287 | type DHTP2PResultMsg struct { |
286 | MsgSize uint16 `order:"big"` // total size of message | 288 | MsgSize uint16 `order:"big"` // total size of message |
287 | MsgType uint16 `order:"big"` // DHT_P2P_RESULT (148) | 289 | MsgType uint16 `order:"big"` // DHT_P2P_RESULT (148) |
288 | BType uint32 `order:"big"` // Block type of result | 290 | BType uint32 `order:"big"` // Block type of result |
289 | Reserved uint32 `order:"big"` // Reserved for further use | 291 | Flags uint32 `order:"big"` // Message flags |
290 | PutPathL uint16 `order:"big"` // size of PUTPATH field | 292 | PutPathL uint16 `order:"big"` // size of PUTPATH field |
291 | GetPathL uint16 `order:"big"` // size of GETPATH field | 293 | GetPathL uint16 `order:"big"` // size of GETPATH field |
292 | Expires util.AbsoluteTime `` // expiration date | 294 | Expires util.AbsoluteTime `` // expiration date |
293 | Query *crypto.HashCode `` // Query key for block | 295 | Query *crypto.HashCode `` // Query key for block |
294 | Origin []byte `size:"(PESize)"` // truncated origin (if TRUNCATED flag set) | 296 | TruncOrigin *util.PeerID `opt:"(IsUsed)"` // truncated origin (if TRUNCATED flag set) |
295 | PutPath []*path.Entry `size:"PutPathL"` // PUTPATH | 297 | PathList []*path.Entry `size:"(NumPath)"` // PATH |
296 | GetPath []*path.Entry `size:"GetPathL"` // GETPATH | 298 | LastSig *util.PeerSignature `size:"(IsUsed)"` // signature of last hop (if RECORD_ROUTE flag is set) |
297 | LastSig []byte `size:"(PESize)"` // signature of last hop (if RECORD_ROUTE flag is set) | 299 | Block []byte `size:"*"` // block data |
298 | Block []byte `size:"*"` // block data | ||
299 | } | 300 | } |
300 | 301 | ||
301 | // NewDHTP2PResultMsg creates a new empty DHTP2PResultMsg | 302 | // NewDHTP2PResultMsg creates a new empty DHTP2PResultMsg |
302 | func NewDHTP2PResultMsg() *DHTP2PResultMsg { | 303 | func NewDHTP2PResultMsg() *DHTP2PResultMsg { |
303 | return &DHTP2PResultMsg{ | 304 | return &DHTP2PResultMsg{ |
304 | MsgSize: 88, // size of empty message | 305 | MsgSize: 88, // size of empty message |
305 | MsgType: DHT_P2P_RESULT, // DHT_P2P_RESULT (148) | 306 | MsgType: DHT_P2P_RESULT, // DHT_P2P_RESULT (148) |
306 | BType: uint32(enums.BLOCK_TYPE_ANY), // type of returned block | 307 | BType: uint32(enums.BLOCK_TYPE_ANY), // type of returned block |
307 | Origin: nil, // no truncated origin | 308 | TruncOrigin: nil, // no truncated origin |
308 | PutPathL: 0, // empty putpath | 309 | PutPathL: 0, // empty putpath |
309 | PutPath: nil, // -"- | 310 | GetPathL: 0, // empty getpath |
310 | GetPathL: 0, // empty getpath | 311 | PathList: nil, // empty path list (put+get) |
311 | GetPath: nil, // -"- | 312 | LastSig: nil, // no recorded route |
312 | LastSig: nil, // no recorded route | 313 | Block: nil, // empty block |
313 | Block: nil, // empty block | ||
314 | } | 314 | } |
315 | } | 315 | } |
316 | 316 | ||
317 | // PESize calculates field sizes based on flags and attributes | 317 | // IsUsed returns if an optional field is present |
318 | func (m *DHTP2PResultMsg) PESize(field string) uint { | 318 | func (m *DHTP2PResultMsg) IsUsed(field string) bool { |
319 | switch field { | 319 | switch field { |
320 | case "Origin": | 320 | case "Origin": |
321 | //if m.Flags&enums.DHT_RO_TRUNCATED != 0 { | 321 | return m.Flags&enums.DHT_RO_TRUNCATED != 0 |
322 | return 32 | ||
323 | //} | ||
324 | case "LastSig": | 322 | case "LastSig": |
325 | //if m.Flags&enums.DHT_RO_RECORD_ROUTE != 0 { | 323 | return m.Flags&enums.DHT_RO_RECORD_ROUTE != 0 |
326 | return 64 | 324 | } |
327 | //} | 325 | return false |
326 | } | ||
327 | |||
328 | // NumPath returns the total number of entries in path | ||
329 | func (m *DHTP2PResultMsg) NumPath(field string) uint { | ||
330 | return uint(m.GetPathL + m.PutPathL) | ||
331 | } | ||
332 | |||
333 | //---------------------------------------------------------------------- | ||
334 | // Path handling (get/set path in message) | ||
335 | //---------------------------------------------------------------------- | ||
336 | |||
337 | // Path returns the current path from message | ||
338 | func (m *DHTP2PResultMsg) Path(sender *util.PeerID) *path.Path { | ||
339 | // create a "real" path list from message data | ||
340 | pth := path.NewPath(crypto.Hash(m.Block), m.Expires) | ||
341 | |||
342 | // return empty path if recording is switched off | ||
343 | if m.Flags&enums.DHT_RO_RECORD_ROUTE == 0 { | ||
344 | return pth | ||
345 | } | ||
346 | // handle truncate origin | ||
347 | if m.Flags&enums.DHT_RO_TRUNCATED == 1 { | ||
348 | if m.TruncOrigin == nil { | ||
349 | logger.Printf(logger.WARN, "[path] truncated but no origin - flag reset") | ||
350 | m.Flags &^= enums.DHT_RO_TRUNCATED | ||
351 | } else { | ||
352 | pth.TruncOrigin = m.TruncOrigin | ||
353 | pth.Flags |= path.PathTruncated | ||
354 | } | ||
355 | } | ||
356 | // copy path elements | ||
357 | pth.List = util.Clone(m.PathList) | ||
358 | pth.NumList = uint16(len(pth.List)) | ||
359 | |||
360 | // check consistent length values; adjust if mismatched | ||
361 | if m.GetPathL+m.PutPathL != pth.NumList { | ||
362 | logger.Printf(logger.WARN, "[path] Inconsistent PATH length -- adjusting...") | ||
363 | if sp := pth.NumList - m.PutPathL; sp > 0 { | ||
364 | pth.SplitPos = sp | ||
365 | } else { | ||
366 | pth.SplitPos = 0 | ||
367 | } | ||
368 | } else { | ||
369 | pth.SplitPos = pth.NumList - m.PutPathL | ||
370 | } | ||
371 | // handle last hop signature | ||
372 | if m.LastSig == nil { | ||
373 | logger.Printf(logger.WARN, "[path] - last hop signature missing - path reset") | ||
374 | return path.NewPath(crypto.Hash(m.Block), m.Expires) | ||
375 | } | ||
376 | pth.Flags |= path.PathLastHop | ||
377 | pth.LastSig = m.LastSig | ||
378 | pth.LastHop = sender | ||
379 | return pth | ||
380 | } | ||
381 | |||
382 | // Set path in message; corrects the message size accordingly | ||
383 | func (m *DHTP2PResultMsg) SetPath(p *path.Path) { | ||
384 | |||
385 | // return if recording is switched off (don't touch path) | ||
386 | if m.Flags&enums.DHT_RO_RECORD_ROUTE == 0 { | ||
387 | return | ||
388 | } | ||
389 | // compute old path size | ||
390 | var pes uint | ||
391 | if len(m.PathList) > 0 { | ||
392 | pes = m.PathList[0].Size() | ||
393 | } | ||
394 | oldSize := uint(len(m.PathList)) * pes | ||
395 | if m.TruncOrigin != nil { | ||
396 | oldSize += m.TruncOrigin.Size() | ||
397 | } | ||
398 | if m.LastSig != nil { | ||
399 | oldSize += m.LastSig.Size() | ||
400 | } | ||
401 | // if no new path is defined,... | ||
402 | if p == nil { | ||
403 | // ... remove existing path | ||
404 | m.TruncOrigin = nil | ||
405 | m.PathList = make([]*path.Entry, 0) | ||
406 | m.LastSig = nil | ||
407 | m.GetPathL = 0 | ||
408 | m.PutPathL = 0 | ||
409 | m.Flags &^= enums.DHT_RO_TRUNCATED | ||
410 | m.MsgSize -= uint16(oldSize) | ||
411 | return | ||
412 | } | ||
413 | // adjust message size | ||
414 | m.MsgSize += uint16(p.Size() - oldSize) | ||
415 | |||
416 | // transfer path data | ||
417 | if p.TruncOrigin != nil { | ||
418 | // truncated path | ||
419 | m.Flags |= enums.DHT_RO_TRUNCATED | ||
420 | m.TruncOrigin = p.TruncOrigin | ||
421 | } | ||
422 | m.PathList = util.Clone(p.List) | ||
423 | m.PutPathL = p.SplitPos | ||
424 | m.GetPathL = p.NumList - p.SplitPos | ||
425 | if p.LastSig != nil { | ||
426 | m.LastSig = p.LastSig | ||
427 | } | ||
428 | } | ||
429 | |||
430 | //---------------------------------------------------------------------- | ||
431 | |||
432 | // Update message (forwarding) | ||
433 | func (m *DHTP2PResultMsg) Update(pth *path.Path) *DHTP2PResultMsg { | ||
434 | // clone old message | ||
435 | msg := &DHTP2PResultMsg{ | ||
436 | MsgSize: m.MsgSize, | ||
437 | MsgType: m.MsgType, | ||
438 | BType: m.BType, | ||
439 | Flags: m.Flags, | ||
440 | PutPathL: m.PutPathL, | ||
441 | GetPathL: m.GetPathL, | ||
442 | Expires: m.Expires, | ||
443 | Query: m.Query.Clone(), | ||
444 | TruncOrigin: m.TruncOrigin, | ||
445 | PathList: util.Clone(m.PathList), | ||
446 | LastSig: m.LastSig, | ||
447 | Block: util.Clone(m.Block), | ||
328 | } | 448 | } |
329 | return 0 | 449 | // set new path |
450 | msg.SetPath(pth) | ||
451 | return msg | ||
330 | } | 452 | } |
331 | 453 | ||
454 | //---------------------------------------------------------------------- | ||
455 | |||
332 | // String returns a human-readable representation of the message. | 456 | // String returns a human-readable representation of the message. |
333 | func (m *DHTP2PResultMsg) String() string { | 457 | func (m *DHTP2PResultMsg) String() string { |
334 | return fmt.Sprintf("DHTP2PResultMsg{btype=%s,putl=%d,getl=%d}", | 458 | return fmt.Sprintf("DHTP2PResultMsg{btype=%s,putl=%d,getl=%d}", |
diff --git a/src/gnunet/service/connection.go b/src/gnunet/service/connection.go index d443160..fbc24d0 100644 --- a/src/gnunet/service/connection.go +++ b/src/gnunet/service/connection.go | |||
@@ -138,8 +138,8 @@ func (s *Connection) Receive(ctx context.Context) (message.Message, error) { | |||
138 | } | 138 | } |
139 | 139 | ||
140 | // Receiver returns the receiving client (string representation) | 140 | // Receiver returns the receiving client (string representation) |
141 | func (s *Connection) Receiver() string { | 141 | func (s *Connection) Receiver() *util.PeerID { |
142 | return fmt.Sprintf("uds:%d", s.id) | 142 | return nil |
143 | } | 143 | } |
144 | 144 | ||
145 | //---------------------------------------------------------------------- | 145 | //---------------------------------------------------------------------- |
diff --git a/src/gnunet/service/dht/blocks/filters.go b/src/gnunet/service/dht/blocks/filters.go index 26af0b3..f617479 100644 --- a/src/gnunet/service/dht/blocks/filters.go +++ b/src/gnunet/service/dht/blocks/filters.go | |||
@@ -22,6 +22,7 @@ import ( | |||
22 | "bytes" | 22 | "bytes" |
23 | "crypto/sha512" | 23 | "crypto/sha512" |
24 | "encoding/binary" | 24 | "encoding/binary" |
25 | "gnunet/crypto" | ||
25 | "gnunet/util" | 26 | "gnunet/util" |
26 | 27 | ||
27 | "github.com/bfix/gospel/logger" | 28 | "github.com/bfix/gospel/logger" |
@@ -105,9 +106,12 @@ type ResultFilter interface { | |||
105 | // Add entry to filter | 106 | // Add entry to filter |
106 | Add(Block) | 107 | Add(Block) |
107 | 108 | ||
108 | // Contains returns true if entry is filtered | 109 | // Contains returns true if block is filtered |
109 | Contains(Block) bool | 110 | Contains(Block) bool |
110 | 111 | ||
112 | // ContainsHash returns true if block hash is filtered | ||
113 | ContainsHash(bh *crypto.HashCode) bool | ||
114 | |||
111 | // Bytes returns the binary representation of a result filter | 115 | // Bytes returns the binary representation of a result filter |
112 | Bytes() []byte | 116 | Bytes() []byte |
113 | 117 | ||
@@ -140,9 +144,15 @@ func (rf *GenericResultFilter) Add(b Block) { | |||
140 | rf.bf.Add(b.Bytes()) | 144 | rf.bf.Add(b.Bytes()) |
141 | } | 145 | } |
142 | 146 | ||
143 | // Contains returns true if entry (binary representation) is filtered | 147 | // Contains returns true if a block is filtered |
144 | func (rf *GenericResultFilter) Contains(b Block) bool { | 148 | func (rf *GenericResultFilter) Contains(b Block) bool { |
145 | return rf.bf.Contains(b.Bytes()) | 149 | bh := crypto.Hash(b.Bytes()) |
150 | return rf.bf.Contains(bh.Bits) | ||
151 | } | ||
152 | |||
153 | // ContainsHash returns true if a block hash is filtered | ||
154 | func (rf *GenericResultFilter) ContainsHash(bh *crypto.HashCode) bool { | ||
155 | return rf.bf.Contains(bh.Bits) | ||
146 | } | 156 | } |
147 | 157 | ||
148 | // Bytes returns the binary representation of a result filter | 158 | // Bytes returns the binary representation of a result filter |
diff --git a/src/gnunet/service/dht/blocks/handlers.go b/src/gnunet/service/dht/blocks/handlers.go index 9df3867..c166504 100644 --- a/src/gnunet/service/dht/blocks/handlers.go +++ b/src/gnunet/service/dht/blocks/handlers.go | |||
@@ -29,15 +29,22 @@ type BlockHandler interface { | |||
29 | // Parse a block instance from binary data | 29 | // Parse a block instance from binary data |
30 | ParseBlock(buf []byte) (Block, error) | 30 | ParseBlock(buf []byte) (Block, error) |
31 | 31 | ||
32 | // ValidateBlockQuery is used to evaluate the request for a block as part of | 32 | // ValidateBlockQuery is used to evaluate the request for a block as part |
33 | // DHT-P2P-GET processing. Here, the block payload is unknown, but if possible | 33 | // of DHT-P2P-GET processing. Here, the block payload is unknown, but if |
34 | // the XQuery and Key SHOULD be verified. | 34 | // possible the XQuery and Key SHOULD be verified. |
35 | ValidateBlockQuery(key *crypto.HashCode, xquery []byte) bool | 35 | ValidateBlockQuery(key *crypto.HashCode, xquery []byte) bool |
36 | 36 | ||
37 | // ValidateBlockKey returns true if the block key is the same as the | 37 | // ValidateBlockKey returns true if the block key is the same as the |
38 | // query key used to access the block. | 38 | // query key used to access the block. |
39 | ValidateBlockKey(b Block, key *crypto.HashCode) bool | 39 | ValidateBlockKey(b Block, key *crypto.HashCode) bool |
40 | 40 | ||
41 | // DeriveBlockKey is used to synthesize the block key from the block | ||
42 | // payload as part of PutMessage and ResultMessage processing. The special | ||
43 | // return value of 'nil' implies that this block type does not permit | ||
44 | // deriving the key from the block. A Key may be returned for a block that | ||
45 | // is ill-formed. | ||
46 | DeriveBlockKey(b Block) *crypto.HashCode | ||
47 | |||
41 | // ValidateBlockStoreRequest is used to evaluate a block payload as part of | 48 | // ValidateBlockStoreRequest is used to evaluate a block payload as part of |
42 | // PutMessage and ResultMessage processing. | 49 | // PutMessage and ResultMessage processing. |
43 | ValidateBlockStoreRequest(b Block) bool | 50 | ValidateBlockStoreRequest(b Block) bool |
diff --git a/src/gnunet/service/dht/blocks/hello.go b/src/gnunet/service/dht/blocks/hello.go index 32f803f..082e914 100644 --- a/src/gnunet/service/dht/blocks/hello.go +++ b/src/gnunet/service/dht/blocks/hello.go | |||
@@ -30,6 +30,7 @@ import ( | |||
30 | "net/url" | 30 | "net/url" |
31 | "strconv" | 31 | "strconv" |
32 | "strings" | 32 | "strings" |
33 | "time" | ||
33 | 34 | ||
34 | "github.com/bfix/gospel/crypto/ed25519" | 35 | "github.com/bfix/gospel/crypto/ed25519" |
35 | "github.com/bfix/gospel/data" | 36 | "github.com/bfix/gospel/data" |
@@ -63,8 +64,21 @@ type HelloBlock struct { | |||
63 | addrs []*util.Address // cooked address data | 64 | addrs []*util.Address // cooked address data |
64 | } | 65 | } |
65 | 66 | ||
67 | // NewHelloBlock initializes a new HELLO block (unsigned) | ||
68 | func NewHelloBlock(peer *util.PeerID, addrs []*util.Address, ttl time.Duration) *HelloBlock { | ||
69 | hb := new(HelloBlock) | ||
70 | hb.PeerID = peer | ||
71 | // limit expiration to second precision (HELLO-URL compatibility) | ||
72 | hb.Expires = util.NewAbsoluteTimeEpoch(uint64(time.Now().Add(ttl).Unix())) | ||
73 | hb.SetAddresses(addrs) | ||
74 | return hb | ||
75 | } | ||
76 | |||
66 | // SetAddresses adds a bulk of addresses for this HELLO block. | 77 | // SetAddresses adds a bulk of addresses for this HELLO block. |
67 | func (h *HelloBlock) SetAddresses(a []*util.Address) { | 78 | func (h *HelloBlock) SetAddresses(a []*util.Address) { |
79 | if len(a) == 0 { | ||
80 | return | ||
81 | } | ||
68 | h.addrs = util.Clone(a) | 82 | h.addrs = util.Clone(a) |
69 | if err := h.finalize(); err != nil { | 83 | if err := h.finalize(); err != nil { |
70 | logger.Printf(logger.ERROR, "[HelloBlock.SetAddresses] failed: %s", err.Error()) | 84 | logger.Printf(logger.ERROR, "[HelloBlock.SetAddresses] failed: %s", err.Error()) |
@@ -76,10 +90,10 @@ func (h *HelloBlock) Addresses() []*util.Address { | |||
76 | return util.Clone(h.addrs) | 90 | return util.Clone(h.addrs) |
77 | } | 91 | } |
78 | 92 | ||
79 | // ParseHelloURL parses a HELLO URL of the following form: | 93 | // ParseHelloBlockFromURL parses a HELLO URL of the following form: |
80 | // gnunet://hello/<PeerID>/<signature>/<expire>?<addrs> | 94 | // gnunet://hello/<PeerID>/<signature>/<expire>?<addrs> |
81 | // The addresses are encoded. | 95 | // The addresses are encoded. |
82 | func ParseHelloURL(u string, checkExpiry bool) (h *HelloBlock, err error) { | 96 | func ParseHelloBlockFromURL(u string, checkExpiry bool) (h *HelloBlock, err error) { |
83 | // check and trim prefix | 97 | // check and trim prefix |
84 | if !strings.HasPrefix(u, helloPrefix) { | 98 | if !strings.HasPrefix(u, helloPrefix) { |
85 | err = fmt.Errorf("invalid HELLO-URL prefix: '%s'", u) | 99 | err = fmt.Errorf("invalid HELLO-URL prefix: '%s'", u) |
@@ -158,8 +172,8 @@ func ParseHelloURL(u string, checkExpiry bool) (h *HelloBlock, err error) { | |||
158 | return | 172 | return |
159 | } | 173 | } |
160 | 174 | ||
161 | // ParseHelloFromBytes converts a byte array into a HelloBlock instance. | 175 | // ParseHelloBlockFromBytes converts a byte array into a HelloBlock instance. |
162 | func ParseHelloFromBytes(buf []byte) (h *HelloBlock, err error) { | 176 | func ParseHelloBlockFromBytes(buf []byte) (h *HelloBlock, err error) { |
163 | h = new(HelloBlock) | 177 | h = new(HelloBlock) |
164 | if err = data.Unmarshal(h, buf); err == nil { | 178 | if err = data.Unmarshal(h, buf); err == nil { |
165 | err = h.finalize() | 179 | err = h.finalize() |
@@ -289,7 +303,7 @@ func (h *HelloBlock) SignedData() []byte { | |||
289 | err := binary.Write(buf, binary.BigEndian, size) | 303 | err := binary.Write(buf, binary.BigEndian, size) |
290 | if err == nil { | 304 | if err == nil { |
291 | if err = binary.Write(buf, binary.BigEndian, purpose); err == nil { | 305 | if err = binary.Write(buf, binary.BigEndian, purpose); err == nil { |
292 | if err = binary.Write(buf, binary.BigEndian, h.Expires.Epoch()*1000000); err == nil { | 306 | if err = binary.Write(buf, binary.BigEndian, h.Expires /*.Epoch()*1000000*/); err == nil { |
293 | if n, err = buf.Write(hAddr[:]); err == nil { | 307 | if n, err = buf.Write(hAddr[:]); err == nil { |
294 | if n != len(hAddr[:]) { | 308 | if n != len(hAddr[:]) { |
295 | err = errors.New("signed data size mismatch") | 309 | err = errors.New("signed data size mismatch") |
@@ -313,7 +327,7 @@ type HelloBlockHandler struct{} | |||
313 | 327 | ||
314 | // Parse a block instance from binary data | 328 | // Parse a block instance from binary data |
315 | func (bh *HelloBlockHandler) ParseBlock(buf []byte) (Block, error) { | 329 | func (bh *HelloBlockHandler) ParseBlock(buf []byte) (Block, error) { |
316 | return ParseHelloFromBytes(buf) | 330 | return ParseHelloBlockFromBytes(buf) |
317 | } | 331 | } |
318 | 332 | ||
319 | // ValidateHelloBlockQuery validates query parameters for a | 333 | // ValidateHelloBlockQuery validates query parameters for a |
@@ -326,20 +340,49 @@ func (bh *HelloBlockHandler) ValidateBlockQuery(key *crypto.HashCode, xquery []b | |||
326 | // ValidateBlockKey returns true if the block key is the same as the | 340 | // ValidateBlockKey returns true if the block key is the same as the |
327 | // query key used to access the block. | 341 | // query key used to access the block. |
328 | func (bh *HelloBlockHandler) ValidateBlockKey(b Block, key *crypto.HashCode) bool { | 342 | func (bh *HelloBlockHandler) ValidateBlockKey(b Block, key *crypto.HashCode) bool { |
343 | // check for matching keys | ||
344 | bkey := bh.DeriveBlockKey(b) | ||
345 | if bkey == nil { | ||
346 | logger.Println(logger.WARN, "[HelloHdlr] ValidateBlockKey: not a HELLO block") | ||
347 | return false | ||
348 | } | ||
349 | return key.Equals(bkey) | ||
350 | } | ||
351 | |||
352 | // DeriveBlockKey is used to synthesize the block key from the block | ||
353 | // payload as part of PutMessage and ResultMessage processing. The special | ||
354 | // return value of 'nil' implies that this block type does not permit | ||
355 | // deriving the key from the block. A Key may be returned for a block that | ||
356 | // is ill-formed. | ||
357 | func (bh *HelloBlockHandler) DeriveBlockKey(b Block) *crypto.HashCode { | ||
358 | // check for correct type | ||
329 | hb, ok := b.(*HelloBlock) | 359 | hb, ok := b.(*HelloBlock) |
330 | if !ok { | 360 | if !ok { |
331 | return false | 361 | logger.Println(logger.WARN, "[HelloHdlr] DeriveBlockKey: not a HELLO block") |
362 | return nil | ||
332 | } | 363 | } |
333 | // key must be the hash of the peer id | 364 | // key must be the hash of the peer id |
334 | bkey := crypto.Hash(hb.PeerID.Bytes()) | 365 | return crypto.Hash(hb.PeerID.Bytes()) |
335 | return key.Equals(bkey) | ||
336 | } | 366 | } |
337 | 367 | ||
338 | // ValidateBlockStoreRequest is used to evaluate a block payload as part of | 368 | // ValidateBlockStoreRequest is used to evaluate a block payload as part of |
339 | // PutMessage and ResultMessage processing. | 369 | // PutMessage and ResultMessage processing. |
370 | // To validate a block store request is to verify the EdDSA SIGNATURE over | ||
371 | // the hashed ADDRESSES against the public key from the peer ID field. If the | ||
372 | // signature is valid true is returned. | ||
340 | func (bh *HelloBlockHandler) ValidateBlockStoreRequest(b Block) bool { | 373 | func (bh *HelloBlockHandler) ValidateBlockStoreRequest(b Block) bool { |
341 | // TODO: verify block payload | 374 | // check for correct type |
342 | return true | 375 | hb, ok := b.(*HelloBlock) |
376 | if !ok { | ||
377 | logger.Println(logger.WARN, "[HelloHdlr] ValidateBlockStoreRequest: not a HELLO block") | ||
378 | return false | ||
379 | } | ||
380 | // verify signature | ||
381 | ok, err := hb.Verify() | ||
382 | if err != nil { | ||
383 | ok = false | ||
384 | } | ||
385 | return ok | ||
343 | } | 386 | } |
344 | 387 | ||
345 | // SetupResultFilter is used to setup an empty result filter. The arguments | 388 | // SetupResultFilter is used to setup an empty result filter. The arguments |
@@ -428,6 +471,11 @@ func (rf *HelloResultFilter) Contains(b Block) bool { | |||
428 | return false | 471 | return false |
429 | } | 472 | } |
430 | 473 | ||
474 | // ContainsHash checks if a block hash is contained in the result filter | ||
475 | func (rf *HelloResultFilter) ContainsHash(bh *crypto.HashCode) bool { | ||
476 | return rf.bf.Contains(bh.Bits) | ||
477 | } | ||
478 | |||
431 | // Bytes returns a binary representation of a HELLO result filter | 479 | // Bytes returns a binary representation of a HELLO result filter |
432 | func (rf *HelloResultFilter) Bytes() []byte { | 480 | func (rf *HelloResultFilter) Bytes() []byte { |
433 | return rf.bf.Bytes() | 481 | return rf.bf.Bytes() |
diff --git a/src/gnunet/service/dht/blocks/hello_test.go b/src/gnunet/service/dht/blocks/hello_test.go new file mode 100644 index 0000000..deb8041 --- /dev/null +++ b/src/gnunet/service/dht/blocks/hello_test.go | |||
@@ -0,0 +1,164 @@ | |||
1 | // This file is part of gnunet-go, a GNUnet-implementation in Golang. | ||
2 | // Copyright (C) 2019-2022 Bernd Fix >Y< | ||
3 | // | ||
4 | // gnunet-go is free software: you can redistribute it and/or modify it | ||
5 | // under the terms of the GNU Affero General Public License as published | ||
6 | // by the Free Software Foundation, either version 3 of the License, | ||
7 | // or (at your option) any later version. | ||
8 | // | ||
9 | // gnunet-go is distributed in the hope that it will be useful, but | ||
10 | // WITHOUT ANY WARRANTY; without even the implied warranty of | ||
11 | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
12 | // Affero General Public License for more details. | ||
13 | // | ||
14 | // You should have received a copy of the GNU Affero General Public License | ||
15 | // along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
16 | // | ||
17 | // SPDX-License-Identifier: AGPL3.0-or-later | ||
18 | |||
19 | package blocks | ||
20 | |||
21 | import ( | ||
22 | "bytes" | ||
23 | "encoding/base64" | ||
24 | "encoding/hex" | ||
25 | "gnunet/util" | ||
26 | "strings" | ||
27 | "testing" | ||
28 | "time" | ||
29 | |||
30 | "github.com/bfix/gospel/crypto/ed25519" | ||
31 | "github.com/bfix/gospel/data" | ||
32 | ) | ||
33 | |||
34 | var ( | ||
35 | block *HelloBlock | ||
36 | sk *ed25519.PrivateKey | ||
37 | ) | ||
38 | |||
39 | func setup(t *testing.T) { | ||
40 | t.Helper() | ||
41 | |||
42 | // check for initialized values | ||
43 | if block != nil { | ||
44 | return | ||
45 | } | ||
46 | // generate keys | ||
47 | var pk *ed25519.PublicKey | ||
48 | pk, sk = ed25519.NewKeypair() | ||
49 | peer := util.NewPeerID(pk.Bytes()) | ||
50 | |||
51 | // set addresses | ||
52 | addrs := []string{ | ||
53 | "ip+udp://172.17.0.6:2086", | ||
54 | "ip+udp://245.23.42.67:2086", | ||
55 | } | ||
56 | addrList := make([]*util.Address, 0) | ||
57 | for _, addr := range addrs { | ||
58 | frag := strings.Split(addr, "://") | ||
59 | e := util.NewAddress(frag[0], frag[1]) | ||
60 | if e == nil { | ||
61 | t.Fatal("invalid address: " + addr) | ||
62 | } | ||
63 | addrList = append(addrList, e) | ||
64 | } | ||
65 | |||
66 | // create new HELLO block | ||
67 | block = NewHelloBlock(peer, addrList, time.Hour) | ||
68 | |||
69 | // sign block. | ||
70 | sig, err := sk.EdSign(block.SignedData()) | ||
71 | if err != nil { | ||
72 | t.Fatal(err) | ||
73 | } | ||
74 | block.Signature = util.NewPeerSignature(sig.Bytes()) | ||
75 | } | ||
76 | |||
77 | func TestHelloVerify(t *testing.T) { | ||
78 | setup(t) | ||
79 | |||
80 | // verify signature | ||
81 | ok, err := block.Verify() | ||
82 | if err != nil { | ||
83 | t.Fatal(err) | ||
84 | } | ||
85 | if !ok { | ||
86 | t.Fatal("HELLO verify failed") | ||
87 | } | ||
88 | } | ||
89 | |||
90 | func TestHelloURL(t *testing.T) { | ||
91 | setup(t) | ||
92 | |||
93 | // create HELLO URL | ||
94 | url := block.URL() | ||
95 | t.Log(url) | ||
96 | |||
97 | // read back | ||
98 | tblk, err := ParseHelloBlockFromURL(url, true) | ||
99 | if err != nil { | ||
100 | t.Fatal(err) | ||
101 | } | ||
102 | // verify identical blocks | ||
103 | if !bytes.Equal(tblk.Bytes(), block.Bytes()) { | ||
104 | t.Log(hex.EncodeToString(tblk.Bytes())) | ||
105 | t.Log(hex.EncodeToString(block.Bytes())) | ||
106 | t.Fatal("URL readback failed") | ||
107 | } | ||
108 | } | ||
109 | |||
110 | func TestHelloBytes(t *testing.T) { | ||
111 | setup(t) | ||
112 | |||
113 | buf := block.Bytes() | ||
114 | tblk, err := ParseHelloBlockFromBytes(buf) | ||
115 | if err != nil { | ||
116 | t.Fatal(err) | ||
117 | } | ||
118 | // verify identical blocks | ||
119 | if !bytes.Equal(tblk.Bytes(), block.Bytes()) { | ||
120 | t.Log(hex.EncodeToString(tblk.Bytes())) | ||
121 | t.Log(hex.EncodeToString(block.Bytes())) | ||
122 | t.Fatal("Bytes readback failed") | ||
123 | } | ||
124 | } | ||
125 | |||
126 | func TestHelloDebug(t *testing.T) { | ||
127 | blkData := "QKObXJUbnnghRh9McDDjHaB9IIL6MhhEiQHc8VfO3QMABeZZJJhsA" + | ||
128 | "GlwK3VkcDovLzEyNy4wLjAuMToxMDAwMQBpcCt1ZHA6Ly8xNzIuMT" + | ||
129 | "cuMC40OjEwMDAxAGlwK3VkcDovL1s6OmZmZmY6MTcyLjE3LjAuNF06MTAwMDEA" | ||
130 | buf, err := base64.RawStdEncoding.DecodeString(blkData) | ||
131 | if err != nil { | ||
132 | t.Fatal(err) | ||
133 | } | ||
134 | hb, err := ParseHelloBlockFromBytes(buf) | ||
135 | if err != nil { | ||
136 | t.Fatal(err) | ||
137 | } | ||
138 | ok, err := hb.Verify() | ||
139 | if err != nil { | ||
140 | t.Fatal(err) | ||
141 | } | ||
142 | if !ok { | ||
143 | // trace problem | ||
144 | t.Log("Block: " + hex.EncodeToString(buf)) | ||
145 | t.Log("PeerID: " + hb.PeerID.String()) | ||
146 | t.Log(" -> " + hex.EncodeToString(hb.PeerID.Bytes())) | ||
147 | t.Logf("Expire: %d", hb.Expires.Val) | ||
148 | t.Logf(" -> " + hb.Expires.String()) | ||
149 | var exp util.AbsoluteTime | ||
150 | if err = data.Unmarshal(&exp, buf[32:40]); err != nil { | ||
151 | t.Fatal(err) | ||
152 | } | ||
153 | t.Logf(" -> " + exp.String()) | ||
154 | t.Log("AddrBin: " + hex.EncodeToString(hb.AddrBin)) | ||
155 | sd := hb.SignedData() | ||
156 | t.Log("SignedData: " + hex.EncodeToString(sd)) | ||
157 | t.Log("Addresses:") | ||
158 | for _, addr := range hb.Addresses() { | ||
159 | t.Logf("* " + addr.URI()) | ||
160 | } | ||
161 | t.Log("Signature: " + hex.EncodeToString(hb.Signature.Bytes())) | ||
162 | t.Fatal("debug HELLO verify failed") | ||
163 | } | ||
164 | } | ||
diff --git a/src/gnunet/service/dht/local.go b/src/gnunet/service/dht/local.go index 1e6b100..ba76892 100644 --- a/src/gnunet/service/dht/local.go +++ b/src/gnunet/service/dht/local.go | |||
@@ -19,7 +19,6 @@ | |||
19 | package dht | 19 | package dht |
20 | 20 | ||
21 | import ( | 21 | import ( |
22 | "errors" | ||
23 | "gnunet/enums" | 22 | "gnunet/enums" |
24 | "gnunet/service/dht/blocks" | 23 | "gnunet/service/dht/blocks" |
25 | "gnunet/service/store" | 24 | "gnunet/service/store" |
@@ -28,58 +27,37 @@ import ( | |||
28 | "github.com/bfix/gospel/math" | 27 | "github.com/bfix/gospel/math" |
29 | ) | 28 | ) |
30 | 29 | ||
31 | // getHelloCache tries to find the requested HELLO block in the HELLO cache | 30 | // lookupHelloCache tries to find the requested HELLO block in the HELLO cache |
32 | func (m *Module) getHelloCache(label string, addr *PeerAddress, rf blocks.ResultFilter) (entry *store.DHTEntry, dist *math.Int) { | 31 | func (m *Module) lookupHelloCache(label string, addr *PeerAddress, rf blocks.ResultFilter, approx bool) (results []*store.DHTResult) { |
33 | logger.Printf(logger.DBG, "[%s] GET message for HELLO: check cache", label) | 32 | logger.Printf(logger.DBG, "[%s] GET message for HELLO: check cache", label) |
34 | // find best cached HELLO | 33 | // find best cached HELLO |
35 | var block blocks.Block | 34 | return m.rtable.LookupHello(addr, rf, approx) |
36 | block, dist = m.rtable.BestHello(addr, rf) | ||
37 | |||
38 | // if block is filtered, skip it | ||
39 | if block != nil { | ||
40 | if !rf.Contains(block) { | ||
41 | entry = &store.DHTEntry{Blk: block} | ||
42 | } else { | ||
43 | logger.Printf(logger.DBG, "[%s] GET message for HELLO: matching DHT block is filtered", label) | ||
44 | entry = nil | ||
45 | dist = nil | ||
46 | } | ||
47 | } | ||
48 | return | ||
49 | } | 35 | } |
50 | 36 | ||
51 | // getLocalStorage tries to find the requested block in local storage | 37 | // getLocalStorage tries to find the requested block in local storage |
52 | func (m *Module) getLocalStorage(label string, query blocks.Query, rf blocks.ResultFilter) (entry *store.DHTEntry, dist *math.Int, err error) { | 38 | func (m *Module) getLocalStorage(label string, query blocks.Query, rf blocks.ResultFilter) (results []*store.DHTResult, err error) { |
53 | 39 | ||
54 | // query DHT store for exact match (9.4.3.3c) | 40 | // query DHT store for exact matches (9.4.3.3c) |
55 | if entry, err = m.store.Get(query); err != nil { | 41 | var entries []*store.DHTEntry |
42 | if entries, err = m.store.Get(label, query, rf); err != nil { | ||
56 | logger.Printf(logger.ERROR, "[%s] Failed to get DHT block from storage: %s", label, err.Error()) | 43 | logger.Printf(logger.ERROR, "[%s] Failed to get DHT block from storage: %s", label, err.Error()) |
57 | return | 44 | return |
58 | } | 45 | } |
59 | if entry != nil { | 46 | for _, entry := range entries { |
60 | dist = math.ZERO | 47 | // add entry to result list |
61 | // check if we are filtered out | 48 | result := &store.DHTResult{ |
62 | if rf.Contains(entry.Blk) { | 49 | Entry: entry, |
63 | logger.Printf(logger.DBG, "[%s] matching DHT block is filtered", label) | 50 | Dist: math.ZERO, |
64 | entry = nil | ||
65 | dist = nil | ||
66 | } | 51 | } |
52 | results = append(results, result) | ||
53 | // add to result filter | ||
54 | rf.Add(entry.Blk) | ||
67 | } | 55 | } |
68 | // if we have no exact match, find approximate block if requested | 56 | // if we have no exact match, find approximate block if requested |
69 | if entry == nil || query.Flags()&enums.DHT_RO_FIND_APPROXIMATE != 0 { | 57 | if len(results) == 0 || query.Flags()&enums.DHT_RO_FIND_APPROXIMATE != 0 { |
70 | // no exact match: find approximate (9.4.3.3b) | 58 | // no exact match: find approximate (9.4.3.3b) |
71 | match := func(e *store.DHTEntry) bool { | 59 | if results, err = m.store.GetApprox(label, query, rf); err != nil { |
72 | return rf.Contains(e.Blk) | 60 | logger.Printf(logger.ERROR, "[%s] Failed to get (approx.) DHT blocks from storage: %s", label, err.Error()) |
73 | } | ||
74 | var d any | ||
75 | entry, d, err = m.store.GetApprox(query, match) | ||
76 | var ok bool | ||
77 | dist, ok = d.(*math.Int) | ||
78 | if !ok { | ||
79 | err = errors.New("no approx distance") | ||
80 | } | ||
81 | if err != nil { | ||
82 | logger.Printf(logger.ERROR, "[%s] Failed to get (approx.) DHT block from storage: %s", label, err.Error()) | ||
83 | } | 61 | } |
84 | } | 62 | } |
85 | return | 63 | return |
diff --git a/src/gnunet/service/dht/messages.go b/src/gnunet/service/dht/messages.go index deb8461..76a92b6 100644 --- a/src/gnunet/service/dht/messages.go +++ b/src/gnunet/service/dht/messages.go | |||
@@ -20,6 +20,7 @@ package dht | |||
20 | 20 | ||
21 | import ( | 21 | import ( |
22 | "context" | 22 | "context" |
23 | "gnunet/crypto" | ||
23 | "gnunet/enums" | 24 | "gnunet/enums" |
24 | "gnunet/message" | 25 | "gnunet/message" |
25 | "gnunet/service/dht/blocks" | 26 | "gnunet/service/dht/blocks" |
@@ -29,7 +30,6 @@ import ( | |||
29 | "gnunet/util" | 30 | "gnunet/util" |
30 | 31 | ||
31 | "github.com/bfix/gospel/logger" | 32 | "github.com/bfix/gospel/logger" |
32 | "github.com/bfix/gospel/math" | ||
33 | ) | 33 | ) |
34 | 34 | ||
35 | //---------------------------------------------------------------------- | 35 | //---------------------------------------------------------------------- |
@@ -38,6 +38,7 @@ import ( | |||
38 | 38 | ||
39 | // HandleMessage handles a DHT request/response message. Responses are sent | 39 | // HandleMessage handles a DHT request/response message. Responses are sent |
40 | // to the specified responder. | 40 | // to the specified responder. |
41 | //nolint:gocyclo // life sometimes is complex... | ||
41 | func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn message.Message, back transport.Responder) bool { | 42 | func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn message.Message, back transport.Responder) bool { |
42 | // assemble log label | 43 | // assemble log label |
43 | label := "dht" | 44 | label := "dht" |
@@ -58,15 +59,18 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m | |||
58 | // process message | 59 | // process message |
59 | switch msg := msgIn.(type) { | 60 | switch msg := msgIn.(type) { |
60 | 61 | ||
62 | //================================================================== | ||
63 | // DHT-P2P-GET | ||
64 | //================================================================== | ||
61 | case *message.DHTP2PGetMsg: | 65 | case *message.DHTP2PGetMsg: |
62 | //-------------------------------------------------------------- | 66 | //-------------------------------------------------------------- |
63 | // DHT-P2P GET | 67 | // DHT-P2P GET |
64 | //-------------------------------------------------------------- | 68 | //-------------------------------------------------------------- |
65 | logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-GET message", label) | 69 | logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-GET message", label) |
66 | query := blocks.NewGenericQuery(msg.Query, enums.BlockType(msg.BType), msg.Flags) | ||
67 | 70 | ||
68 | var entry *store.DHTEntry | 71 | // assemble query and initialize (cache) results |
69 | var dist *math.Int | 72 | query := blocks.NewGenericQuery(msg.Query, enums.BlockType(msg.BType), msg.Flags) |
73 | var results []*store.DHTResult | ||
70 | 74 | ||
71 | //-------------------------------------------------------------- | 75 | //-------------------------------------------------------------- |
72 | // validate query (based on block type requested) (9.4.3.1) | 76 | // validate query (based on block type requested) (9.4.3.1) |
@@ -113,7 +117,8 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m | |||
113 | closest := m.rtable.IsClosestPeer(nil, addr, msg.PeerFilter, 0) | 117 | closest := m.rtable.IsClosestPeer(nil, addr, msg.PeerFilter, 0) |
114 | demux := int(msg.Flags)&enums.DHT_RO_DEMULTIPLEX_EVERYWHERE != 0 | 118 | demux := int(msg.Flags)&enums.DHT_RO_DEMULTIPLEX_EVERYWHERE != 0 |
115 | approx := int(msg.Flags)&enums.DHT_RO_FIND_APPROXIMATE != 0 | 119 | approx := int(msg.Flags)&enums.DHT_RO_FIND_APPROXIMATE != 0 |
116 | // actions | 120 | |
121 | // enforced actions | ||
117 | doResult := closest || (demux && approx) | 122 | doResult := closest || (demux && approx) |
118 | doForward := !closest || (demux && !approx) | 123 | doForward := !closest || (demux && !approx) |
119 | logger.Printf(logger.DBG, "[%s] GET message: closest=%v, demux=%v, approx=%v --> result=%v, forward=%v", | 124 | logger.Printf(logger.DBG, "[%s] GET message: closest=%v, demux=%v, approx=%v --> result=%v, forward=%v", |
@@ -122,53 +127,43 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m | |||
122 | //------------------------------------------------------ | 127 | //------------------------------------------------------ |
123 | // query for a HELLO? (9.4.3.3a) | 128 | // query for a HELLO? (9.4.3.3a) |
124 | if btype == enums.BLOCK_TYPE_DHT_URL_HELLO { | 129 | if btype == enums.BLOCK_TYPE_DHT_URL_HELLO { |
125 | // try to find result in HELLO cache | 130 | // try to find results in HELLO cache |
126 | entry, dist = m.getHelloCache(label, addr, rf) | 131 | results = m.lookupHelloCache(label, addr, rf, approx) |
127 | } | 132 | } |
133 | |||
128 | //-------------------------------------------------------------- | 134 | //-------------------------------------------------------------- |
129 | // find the closest block that has that is not filtered by the result | 135 | // query flags demand a result |
130 | // filter (in case we did not find an appropriate block in cache). | ||
131 | if doResult { | 136 | if doResult { |
132 | // save best-match values from cache | 137 | // if we don't have a result from cache or are in approx mode, |
133 | entryCache := entry | 138 | // try storage lookup |
134 | distCache := dist | 139 | if len(results) == 0 || approx { |
135 | dist = nil | 140 | // get results from local storage |
136 | 141 | lclResults, err := m.getLocalStorage(label, query, rf) | |
137 | // if we don't have an exact match, try storage lookup | 142 | if err == nil { |
138 | if entryCache == nil || (distCache != nil && !distCache.Equals(math.ZERO)) { | 143 | // append local results |
139 | // get entry from local storage | 144 | results = append(results, lclResults...) |
140 | var err error | ||
141 | if entry, dist, err = m.getLocalStorage(label, query, rf); err != nil { | ||
142 | entry = nil | ||
143 | dist = nil | ||
144 | } | ||
145 | // if we have a block from cache, check if it is better than the | ||
146 | // block found in the DHT | ||
147 | if entryCache != nil && dist != nil && distCache.Cmp(dist) < 0 { | ||
148 | entry = entryCache | ||
149 | dist = distCache | ||
150 | } | 145 | } |
151 | } | 146 | } |
152 | // if we have a block, send it as response | 147 | // if we have results, send them as response |
153 | if entry != nil { | 148 | for _, result := range results { |
149 | var pth *path.Path | ||
150 | // check if record the route | ||
151 | if msg.Flags&enums.DHT_RO_RECORD_ROUTE != 0 && result.Entry.Path != nil { | ||
152 | // update get path | ||
153 | pth = result.Entry.Path.Clone() | ||
154 | pth.SplitPos = pth.NumList | ||
155 | pe := pth.NewElement(pth.LastHop, local, back.Receiver()) | ||
156 | pth.Add(pe) | ||
157 | } | ||
158 | |||
154 | logger.Printf(logger.INFO, "[%s] sending DHT result message to caller", label) | 159 | logger.Printf(logger.INFO, "[%s] sending DHT result message to caller", label) |
155 | if err := m.sendResult(ctx, query, entry.Blk, back); err != nil { | 160 | if err := m.sendResult(ctx, query, result.Entry.Blk, pth, back); err != nil { |
156 | logger.Printf(logger.ERROR, "[%s] Failed to send DHT result message: %s", label, err.Error()) | 161 | logger.Printf(logger.ERROR, "[%s] Failed to send DHT result message: %s", label, err.Error()) |
157 | } | 162 | } |
158 | } | 163 | } |
159 | } | 164 | } |
160 | // check if we need to forward message based on filter result | 165 | //-------------------------------------------------------------- |
161 | if entry != nil && blockHdlr != nil { | 166 | // query flags demand a result |
162 | switch blockHdlr.FilterResult(entry.Blk, query.Key(), rf, msg.XQuery) { | ||
163 | case blocks.RF_LAST: | ||
164 | // no need for further results | ||
165 | case blocks.RF_MORE: | ||
166 | // possibly more results | ||
167 | doForward = true | ||
168 | case blocks.RF_DUPLICATE, blocks.RF_IRRELEVANT: | ||
169 | // do not forward | ||
170 | } | ||
171 | } | ||
172 | if doForward { | 167 | if doForward { |
173 | // build updated GET message | 168 | // build updated GET message |
174 | pf.Add(local) | 169 | pf.Add(local) |
@@ -195,6 +190,9 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m | |||
195 | } | 190 | } |
196 | logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-GET message done", label) | 191 | logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-GET message done", label) |
197 | 192 | ||
193 | //================================================================== | ||
194 | // DHT-P2P-PUT | ||
195 | //================================================================== | ||
198 | case *message.DHTP2PPutMsg: | 196 | case *message.DHTP2PPutMsg: |
199 | //---------------------------------------------------------- | 197 | //---------------------------------------------------------- |
200 | // DHT-P2P PUT | 198 | // DHT-P2P PUT |
@@ -267,29 +265,29 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m | |||
267 | logger.Printf(logger.ERROR, "[%s] failed to store DHT entry: %s", label, err.Error()) | 265 | logger.Printf(logger.ERROR, "[%s] failed to store DHT entry: %s", label, err.Error()) |
268 | } | 266 | } |
269 | } | 267 | } |
270 | |||
271 | //-------------------------------------------------------------- | 268 | //-------------------------------------------------------------- |
272 | // if the put is for a HELLO block, add the sender to the | 269 | // if the put is for a HELLO block, add the sender to the |
273 | // routing table (9.3.2.9) | 270 | // routing table (9.3.2.9) |
274 | if btype == enums.BLOCK_TYPE_DHT_HELLO { | 271 | if btype == enums.BLOCK_TYPE_DHT_HELLO { |
275 | // get addresses from HELLO block | 272 | // get addresses from HELLO block |
276 | hello, err := blocks.ParseHelloFromBytes(msg.Block) | 273 | hello, err := blocks.ParseHelloBlockFromBytes(msg.Block) |
277 | if err != nil { | 274 | if err != nil { |
278 | logger.Printf(logger.ERROR, "[%s] failed to parse HELLO block: %s", label, err.Error()) | 275 | logger.Printf(logger.ERROR, "[%s] failed to parse HELLO block: %s", label, err.Error()) |
279 | } else { | 276 | } else { |
280 | // check state of bucket for given address | 277 | // check state of bucket for given address |
281 | if m.rtable.Check(NewPeerAddress(sender)) == 0 { | 278 | if m.rtable.Check(NewPeerAddress(hello.PeerID)) == 0 { |
282 | // we could add the sender to the routing table | 279 | // we could add the sender to the routing table |
283 | for _, addr := range hello.Addresses() { | 280 | for _, addr := range hello.Addresses() { |
284 | if transport.CanHandleAddress(addr) { | 281 | if transport.CanHandleAddress(addr) { |
285 | // try to connect to peer (triggers EV_CONNECTED on success) | 282 | // try to connect to peer (triggers EV_CONNECTED on success) |
286 | m.core.TryConnect(sender, addr) | 283 | if err := m.core.TryConnect(sender, addr); err != nil { |
284 | logger.Printf(logger.ERROR, "[%s] try-connection to %s failed: %s", label, addr.URI(), err.Error()) | ||
285 | } | ||
287 | } | 286 | } |
288 | } | 287 | } |
289 | } | 288 | } |
290 | } | 289 | } |
291 | } | 290 | } |
292 | |||
293 | //-------------------------------------------------------------- | 291 | //-------------------------------------------------------------- |
294 | // check if we need to forward | 292 | // check if we need to forward |
295 | if !closest || demux { | 293 | if !closest || demux { |
@@ -325,24 +323,100 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m | |||
325 | } | 323 | } |
326 | logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-PUT message done", label) | 324 | logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-PUT message done", label) |
327 | 325 | ||
326 | //================================================================== | ||
327 | // DHT-P2P-RESULT | ||
328 | //================================================================== | ||
328 | case *message.DHTP2PResultMsg: | 329 | case *message.DHTP2PResultMsg: |
329 | //---------------------------------------------------------- | 330 | //---------------------------------------------------------- |
330 | // DHT-P2P RESULT | 331 | // DHT-P2P RESULT |
331 | //---------------------------------------------------------- | 332 | //---------------------------------------------------------- |
332 | logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-RESULT message", label) | 333 | logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-RESULT message for type %s", |
334 | label, enums.BlockType(msg.BType).String()) | ||
333 | 335 | ||
334 | // check task list for handler | 336 | //-------------------------------------------------------------- |
337 | // check if request is expired (9.5.2.1) | ||
338 | if msg.Expires.Expired() { | ||
339 | logger.Printf(logger.WARN, "[%s] DHT-P2P-RESULT message expired (%s)", | ||
340 | label, msg.Expires.String()) | ||
341 | return false | ||
342 | } | ||
343 | //-------------------------------------------------------------- | ||
344 | btype := enums.BlockType(msg.BType) | ||
345 | var blkKey *crypto.HashCode | ||
346 | blockHdlr, ok := blocks.BlockHandlers[btype] | ||
347 | if ok { | ||
348 | // reconstruct block instance | ||
349 | if block, err := blockHdlr.ParseBlock(msg.Block); err == nil { | ||
350 | // validate block (9.5.2.2) | ||
351 | if !blockHdlr.ValidateBlockStoreRequest(block) { | ||
352 | logger.Printf(logger.WARN, "[%s] DHT-P2P-RESULT invalid block -- discarded", label) | ||
353 | return false | ||
354 | } | ||
355 | // Compute block key (9.5.2.4) | ||
356 | blkKey = blockHdlr.DeriveBlockKey(block) | ||
357 | } | ||
358 | } else { | ||
359 | logger.Printf(logger.INFO, "[%s] No validator defined for block type %s", label, btype.String()) | ||
360 | blockHdlr = nil | ||
361 | } | ||
362 | //-------------------------------------------------------------- | ||
363 | // verify path (9.5.2.3) | ||
364 | var pth *path.Path | ||
365 | if msg.GetPathL+msg.PutPathL > 0 { | ||
366 | pth = msg.Path(sender) | ||
367 | pth.Verify(local) | ||
368 | } | ||
369 | //-------------------------------------------------------------- | ||
370 | // if the put is for a HELLO block, add the originator to the | ||
371 | // routing table (9.5.2.5) | ||
372 | if btype == enums.BLOCK_TYPE_DHT_HELLO { | ||
373 | // get addresses from HELLO block | ||
374 | hello, err := blocks.ParseHelloBlockFromBytes(msg.Block) | ||
375 | if err != nil { | ||
376 | logger.Printf(logger.ERROR, "[%s] failed to parse HELLO block: %s", label, err.Error()) | ||
377 | } else { | ||
378 | // check state of bucket for given address | ||
379 | if m.rtable.Check(NewPeerAddress(hello.PeerID)) == 0 { | ||
380 | // we could add the originator to the routing table | ||
381 | for _, addr := range hello.Addresses() { | ||
382 | if transport.CanHandleAddress(addr) { | ||
383 | // try to connect to peer (triggers EV_CONNECTED on success) | ||
384 | if err := m.core.TryConnect(sender, addr); err != nil { | ||
385 | logger.Printf(logger.ERROR, "[%s] try-connection to %s failed: %s", label, addr.URI(), err.Error()) | ||
386 | } | ||
387 | } | ||
388 | } | ||
389 | } | ||
390 | } | ||
391 | } | ||
392 | // message forwarding to responder | ||
335 | key := msg.Query.String() | 393 | key := msg.Query.String() |
336 | logger.Printf(logger.DBG, "[%s] DHT-P2P-RESULT key = %s", label, key) | 394 | logger.Printf(logger.DBG, "[%s] DHT-P2P-RESULT key = %s", label, key) |
337 | handled := false | 395 | handled := false |
338 | if list, ok := m.reshdlrs.Get(key); ok { | 396 | if list, ok := m.reshdlrs.Get(key); ok { |
339 | for _, rh := range list { | 397 | for _, rh := range list { |
340 | logger.Printf(logger.DBG, "[%s] Task #%d for DHT-P2P-RESULT found", label, rh.ID()) | 398 | logger.Printf(logger.DBG, "[%s] Task #%d for DHT-P2P-RESULT found", label, rh.ID()) |
341 | // handle the message | 399 | |
342 | go rh.Handle(ctx, msg) | 400 | //-------------------------------------------------------------- |
401 | // check task list for handler (9.5.2.6) | ||
402 | if rh.Flags()&enums.DHT_RO_FIND_APPROXIMATE == 0 && blkKey != nil && !blkKey.Equals(rh.Key()) { | ||
403 | // (9.5.2.6.a) derived key mismatch | ||
404 | logger.Printf(logger.ERROR, "[%s] derived block key / query key mismatch:", label) | ||
405 | logger.Printf(logger.ERROR, "[%s] --> %s != %s", label, blkKey.String(), rh.Key().String()) | ||
406 | return false | ||
407 | } | ||
408 | // (9.5.2.6.b+c) check block against query | ||
409 | /* | ||
410 | if blockHdlr != nil { | ||
411 | blockHdlr.FilterBlockResult(block, rh.Key()) | ||
412 | } | ||
413 | */ | ||
414 | |||
415 | //-------------------------------------------------------------- | ||
416 | // handle the message (forwarding) | ||
417 | go rh.Handle(ctx, msg, pth, sender, local) | ||
343 | handled = true | 418 | handled = true |
344 | } | 419 | } |
345 | return true | ||
346 | } | 420 | } |
347 | if !handled { | 421 | if !handled { |
348 | logger.Printf(logger.WARN, "[%s] DHT-P2P-RESULT not processed (no handler)", label) | 422 | logger.Printf(logger.WARN, "[%s] DHT-P2P-RESULT not processed (no handler)", label) |
@@ -350,6 +424,9 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m | |||
350 | logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-RESULT message done", label) | 424 | logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-RESULT message done", label) |
351 | return handled | 425 | return handled |
352 | 426 | ||
427 | //================================================================== | ||
428 | // DHT-P2P-HELLO | ||
429 | //================================================================== | ||
353 | case *message.DHTP2PHelloMsg: | 430 | case *message.DHTP2PHelloMsg: |
354 | //---------------------------------------------------------- | 431 | //---------------------------------------------------------- |
355 | // DHT-P2P HELLO | 432 | // DHT-P2P HELLO |
@@ -379,8 +456,8 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m | |||
379 | logger.Printf(logger.INFO, "[%s] Sending HELLO to %s: %s", label, sender, msgOut) | 456 | logger.Printf(logger.INFO, "[%s] Sending HELLO to %s: %s", label, sender, msgOut) |
380 | err = m.core.Send(ctx, sender, msgOut) | 457 | err = m.core.Send(ctx, sender, msgOut) |
381 | // no error if the message might have been sent | 458 | // no error if the message might have been sent |
382 | if err == transport.ErrEndpMaybeSent { | 459 | if err != nil && err != transport.ErrEndpMaybeSent { |
383 | err = nil | 460 | logger.Printf(logger.ERROR, "[%s] Failed to send HELLO message: %s", label, err.Error()) |
384 | } | 461 | } |
385 | } | 462 | } |
386 | 463 | ||
@@ -401,9 +478,9 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m | |||
401 | }) | 478 | }) |
402 | } | 479 | } |
403 | 480 | ||
404 | //-------------------------------------------------------------- | 481 | //================================================================== |
405 | // Legacy message types (not implemented) | 482 | // Legacy message types (not implemented) |
406 | //-------------------------------------------------------------- | 483 | //================================================================== |
407 | 484 | ||
408 | case *message.DHTClientPutMsg: | 485 | case *message.DHTClientPutMsg: |
409 | //---------------------------------------------------------- | 486 | //---------------------------------------------------------- |
@@ -446,7 +523,7 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m | |||
446 | } | 523 | } |
447 | 524 | ||
448 | // send a result back to caller | 525 | // send a result back to caller |
449 | func (m *Module) sendResult(ctx context.Context, query blocks.Query, blk blocks.Block, back transport.Responder) error { | 526 | func (m *Module) sendResult(ctx context.Context, query blocks.Query, blk blocks.Block, pth *path.Path, back transport.Responder) error { |
450 | // assemble result message | 527 | // assemble result message |
451 | out := message.NewDHTP2PResultMsg() | 528 | out := message.NewDHTP2PResultMsg() |
452 | out.BType = uint32(query.Type()) | 529 | out.BType = uint32(query.Type()) |
diff --git a/src/gnunet/service/dht/module.go b/src/gnunet/service/dht/module.go index 6207f94..5278b13 100644 --- a/src/gnunet/service/dht/module.go +++ b/src/gnunet/service/dht/module.go | |||
@@ -24,6 +24,7 @@ import ( | |||
24 | "gnunet/config" | 24 | "gnunet/config" |
25 | "gnunet/core" | 25 | "gnunet/core" |
26 | "gnunet/crypto" | 26 | "gnunet/crypto" |
27 | "gnunet/enums" | ||
27 | "gnunet/message" | 28 | "gnunet/message" |
28 | "gnunet/service" | 29 | "gnunet/service" |
29 | "gnunet/service/dht/blocks" | 30 | "gnunet/service/dht/blocks" |
@@ -33,7 +34,6 @@ import ( | |||
33 | "time" | 34 | "time" |
34 | 35 | ||
35 | "github.com/bfix/gospel/logger" | 36 | "github.com/bfix/gospel/logger" |
36 | "github.com/bfix/gospel/math" | ||
37 | ) | 37 | ) |
38 | 38 | ||
39 | //====================================================================== | 39 | //====================================================================== |
@@ -41,22 +41,51 @@ import ( | |||
41 | //====================================================================== | 41 | //====================================================================== |
42 | 42 | ||
43 | //---------------------------------------------------------------------- | 43 | //---------------------------------------------------------------------- |
44 | // Responder for local message handling | 44 | // Responder for local message handling (API, not message-based) |
45 | //---------------------------------------------------------------------- | 45 | //---------------------------------------------------------------------- |
46 | 46 | ||
47 | type LocalResponder struct { | 47 | // LocalBlockResponder is a message handler used to handle results for |
48 | ch chan blocks.Block // out-going channel for incoming blocks | 48 | // locally initiated GET calls |
49 | type LocalBlockResponder struct { | ||
50 | ch chan blocks.Block // out-going channel for incoming block results | ||
51 | rf blocks.ResultFilter // filter out duplicates | ||
49 | } | 52 | } |
50 | 53 | ||
51 | func (lr *LocalResponder) Send(ctx context.Context, msg message.Message) error { | 54 | // NewLocalBlockResponder returns a new instance |
55 | func NewLocalBlockResponder() *LocalBlockResponder { | ||
56 | return &LocalBlockResponder{ | ||
57 | ch: make(chan blocks.Block), | ||
58 | rf: blocks.NewGenericResultFilter(), | ||
59 | } | ||
60 | } | ||
61 | |||
62 | // C returns the back-channel | ||
63 | func (lr *LocalBlockResponder) C() <-chan blocks.Block { | ||
64 | return lr.ch | ||
65 | } | ||
66 | |||
67 | // Send interface method: dissect message and relay block if appropriate | ||
68 | func (lr *LocalBlockResponder) Send(ctx context.Context, msg message.Message) error { | ||
69 | // check if incoming message is a DHT-RESULT | ||
70 | switch res := msg.(type) { | ||
71 | case *message.DHTP2PResultMsg: | ||
72 | // deliver incoming blocks | ||
73 | go func() { | ||
74 | lr.ch <- blocks.NewGenericBlock(res.Block) | ||
75 | }() | ||
76 | default: | ||
77 | logger.Println(logger.WARN, "[local] not a DHT-RESULT -- skipped") | ||
78 | } | ||
52 | return nil | 79 | return nil |
53 | } | 80 | } |
54 | 81 | ||
55 | func (lr *LocalResponder) Receiver() string { | 82 | // Receiver is nil for local responders. |
56 | return "@" | 83 | func (lr *LocalBlockResponder) Receiver() *util.PeerID { |
84 | return nil | ||
57 | } | 85 | } |
58 | 86 | ||
59 | func (lr *LocalResponder) Close() { | 87 | // Close back-channel |
88 | func (lr *LocalBlockResponder) Close() { | ||
60 | close(lr.ch) | 89 | close(lr.ch) |
61 | } | 90 | } |
62 | 91 | ||
@@ -68,8 +97,9 @@ func (lr *LocalResponder) Close() { | |||
68 | type Module struct { | 97 | type Module struct { |
69 | service.ModuleImpl | 98 | service.ModuleImpl |
70 | 99 | ||
71 | store store.DHTStore // reference to the block storage mechanism | 100 | cfg *config.DHTConfig // configuraion parameters |
72 | core *core.Core // reference to core services | 101 | store *store.DHTStore // reference to the block storage mechanism |
102 | core *core.Core // reference to core services | ||
73 | 103 | ||
74 | rtable *RoutingTable // routing table | 104 | rtable *RoutingTable // routing table |
75 | lastHello *message.DHTP2PHelloMsg // last own HELLO message used; re-create if expired | 105 | lastHello *message.DHTP2PHelloMsg // last own HELLO message used; re-create if expired |
@@ -80,7 +110,7 @@ type Module struct { | |||
80 | // mechanism for persistence. | 110 | // mechanism for persistence. |
81 | func NewModule(ctx context.Context, c *core.Core, cfg *config.DHTConfig) (m *Module, err error) { | 111 | func NewModule(ctx context.Context, c *core.Core, cfg *config.DHTConfig) (m *Module, err error) { |
82 | // create permanent storage handler | 112 | // create permanent storage handler |
83 | var storage store.DHTStore | 113 | var storage *store.DHTStore |
84 | if storage, err = store.NewDHTStore(cfg.Storage); err != nil { | 114 | if storage, err = store.NewDHTStore(cfg.Storage); err != nil { |
85 | return | 115 | return |
86 | } | 116 | } |
@@ -90,6 +120,7 @@ func NewModule(ctx context.Context, c *core.Core, cfg *config.DHTConfig) (m *Mod | |||
90 | // return module instance | 120 | // return module instance |
91 | m = &Module{ | 121 | m = &Module{ |
92 | ModuleImpl: *service.NewModuleImpl(), | 122 | ModuleImpl: *service.NewModuleImpl(), |
123 | cfg: cfg, | ||
93 | store: storage, | 124 | store: storage, |
94 | core: c, | 125 | core: c, |
95 | rtable: rt, | 126 | rtable: rt, |
@@ -99,18 +130,58 @@ func NewModule(ctx context.Context, c *core.Core, cfg *config.DHTConfig) (m *Mod | |||
99 | pulse := time.Duration(cfg.Heartbeat) * time.Second | 130 | pulse := time.Duration(cfg.Heartbeat) * time.Second |
100 | listener := m.Run(ctx, m.event, m.Filter(), pulse, m.heartbeat) | 131 | listener := m.Run(ctx, m.event, m.Filter(), pulse, m.heartbeat) |
101 | c.Register("dht", listener) | 132 | c.Register("dht", listener) |
133 | |||
134 | // run periodic tasks (8.2. peer discovery) | ||
135 | ticker := time.NewTicker(5 * time.Minute) | ||
136 | key := crypto.Hash(m.core.PeerID().Data) | ||
137 | flags := uint16(enums.DHT_RO_FIND_APPROXIMATE | enums.DHT_RO_DEMULTIPLEX_EVERYWHERE) | ||
138 | var resCh <-chan blocks.Block | ||
139 | go func() { | ||
140 | for { | ||
141 | select { | ||
142 | // initiate peer discovery | ||
143 | case <-ticker.C: | ||
144 | // query DHT for our own HELLO block | ||
145 | query := blocks.NewGenericQuery(key, enums.BLOCK_TYPE_DHT_HELLO, flags) | ||
146 | resCh = m.Get(ctx, query) | ||
147 | |||
148 | // handle peer discover results | ||
149 | case res := <-resCh: | ||
150 | // check for correct type | ||
151 | btype := res.Type() | ||
152 | if btype == enums.BLOCK_TYPE_DHT_HELLO { | ||
153 | hb, ok := res.(*blocks.HelloBlock) | ||
154 | if !ok { | ||
155 | logger.Printf(logger.WARN, "[dht] peer discovery received invalid block data") | ||
156 | } else { | ||
157 | // cache HELLO block | ||
158 | m.rtable.CacheHello(hb) | ||
159 | // add sender to routing table | ||
160 | m.rtable.Add(NewPeerAddress(hb.PeerID), "dht") | ||
161 | } | ||
162 | } else { | ||
163 | logger.Printf(logger.WARN, "[dht] peer discovery received invalid block type %s", btype.String()) | ||
164 | } | ||
165 | |||
166 | // termination | ||
167 | case <-ctx.Done(): | ||
168 | ticker.Stop() | ||
169 | return | ||
170 | } | ||
171 | } | ||
172 | }() | ||
102 | return | 173 | return |
103 | } | 174 | } |
104 | 175 | ||
105 | //---------------------------------------------------------------------- | 176 | //---------------------------------------------------------------------- |
106 | // DHT methods for local use | 177 | // DHT methods for local use (API) |
107 | //---------------------------------------------------------------------- | 178 | //---------------------------------------------------------------------- |
108 | 179 | ||
109 | // Get blocks from the DHT ["dht:get"] | 180 | // Get blocks from the DHT ["dht:get"] |
110 | // Locally request blocks for a given query. The res channel will deliver the | 181 | // Locally request blocks for a given query. The res channel will deliver the |
111 | // returned results to the caller; the channel is closed if no further blocks | 182 | // returned results to the caller; the channel is closed if no further blocks |
112 | // are expected or the query times out. | 183 | // are expected or the query times out. |
113 | func (m *Module) Get(ctx context.Context, query blocks.Query) (res chan blocks.Block) { | 184 | func (m *Module) Get(ctx context.Context, query blocks.Query) <-chan blocks.Block { |
114 | // get the block handler for given block type to construct an empty | 185 | // get the block handler for given block type to construct an empty |
115 | // result filter. If no handler is defined, a default PassResultFilter | 186 | // result filter. If no handler is defined, a default PassResultFilter |
116 | // is created. | 187 | // is created. |
@@ -123,14 +194,14 @@ func (m *Module) Get(ctx context.Context, query blocks.Query) (res chan blocks.B | |||
123 | logger.Println(logger.WARN, "[dht] unknown result filter implementation -- skipped") | 194 | logger.Println(logger.WARN, "[dht] unknown result filter implementation -- skipped") |
124 | } | 195 | } |
125 | // get additional query parameters | 196 | // get additional query parameters |
126 | xquery, ok := util.GetParam[[]byte](query.Params(), "xquery") | 197 | xquery, _ := util.GetParam[[]byte](query.Params(), "xquery") |
127 | 198 | ||
128 | // assemble a new GET message | 199 | // assemble a new GET message |
129 | msg := message.NewDHTP2PGetMsg() | 200 | msg := message.NewDHTP2PGetMsg() |
130 | msg.BType = uint32(query.Type()) | 201 | msg.BType = uint32(query.Type()) |
131 | msg.Flags = query.Flags() | 202 | msg.Flags = query.Flags() |
132 | msg.HopCount = 0 | 203 | msg.HopCount = 0 |
133 | msg.ReplLevel = 10 | 204 | msg.ReplLevel = uint16(m.cfg.Routing.ReplLevel) |
134 | msg.PeerFilter = blocks.NewPeerFilter() | 205 | msg.PeerFilter = blocks.NewPeerFilter() |
135 | msg.ResFilter = rf.Bytes() | 206 | msg.ResFilter = rf.Bytes() |
136 | msg.RfSize = uint16(len(msg.ResFilter)) | 207 | msg.RfSize = uint16(len(msg.ResFilter)) |
@@ -138,15 +209,13 @@ func (m *Module) Get(ctx context.Context, query blocks.Query) (res chan blocks.B | |||
138 | msg.MsgSize += msg.RfSize + uint16(len(xquery)) | 209 | msg.MsgSize += msg.RfSize + uint16(len(xquery)) |
139 | 210 | ||
140 | // compose a response channel and handler | 211 | // compose a response channel and handler |
141 | res = make(chan blocks.Block) | 212 | hdlr := NewLocalBlockResponder() |
142 | hdlr := &LocalResponder{ | 213 | |
143 | ch: res, | ||
144 | } | ||
145 | // time-out handling | 214 | // time-out handling |
146 | ttl, ok := util.GetParam[time.Duration](query.Params(), "timeout") | 215 | ttl, ok := util.GetParam[time.Duration](query.Params(), "timeout") |
147 | if !ok { | 216 | if !ok { |
148 | // defaults to 10 minutes | 217 | // defaults to 10 minutes |
149 | ttl = 600 * time.Second | 218 | ttl = 10 * time.Minute |
150 | } | 219 | } |
151 | lctx, cancel := context.WithTimeout(ctx, ttl) | 220 | lctx, cancel := context.WithTimeout(ctx, ttl) |
152 | 221 | ||
@@ -159,23 +228,12 @@ func (m *Module) Get(ctx context.Context, query blocks.Query) (res chan blocks.B | |||
159 | hdlr.Close() | 228 | hdlr.Close() |
160 | cancel() | 229 | cancel() |
161 | }() | 230 | }() |
162 | return res | 231 | return hdlr.C() |
163 | } | 232 | } |
164 | 233 | ||
165 | // GetApprox returns the first block not excluded ["dht:getapprox"] | 234 | // GetApprox returns the first block not excluded ["dht:getapprox"] |
166 | func (m *Module) GetApprox(ctx context.Context, query blocks.Query, excl func(*store.DHTEntry) bool) (entry *store.DHTEntry, dist *math.Int, err error) { | 235 | func (m *Module) GetApprox(ctx context.Context, query blocks.Query, rf blocks.ResultFilter) (results []*store.DHTResult, err error) { |
167 | var val any | 236 | return m.store.GetApprox("dht", query, rf) |
168 | if entry, val, err = m.store.GetApprox(query, excl); err != nil { | ||
169 | return | ||
170 | } | ||
171 | hc, ok := val.(*crypto.HashCode) | ||
172 | if !ok { | ||
173 | err = errors.New("no approx result") | ||
174 | } | ||
175 | asked := NewQueryAddress(query.Key()) | ||
176 | found := NewQueryAddress(hc) | ||
177 | dist, _ = found.Distance(asked) | ||
178 | return | ||
179 | } | 237 | } |
180 | 238 | ||
181 | // Put a block into the DHT ["dht:put"] | 239 | // Put a block into the DHT ["dht:put"] |
@@ -191,7 +249,7 @@ func (m *Module) Put(ctx context.Context, query blocks.Query, block blocks.Block | |||
191 | msg.Flags = query.Flags() | 249 | msg.Flags = query.Flags() |
192 | msg.HopCount = 0 | 250 | msg.HopCount = 0 |
193 | msg.PeerFilter = blocks.NewPeerFilter() | 251 | msg.PeerFilter = blocks.NewPeerFilter() |
194 | msg.ReplLvl = 10 | 252 | msg.ReplLvl = uint16(m.cfg.Routing.ReplLevel) |
195 | msg.Expiration = expire | 253 | msg.Expiration = expire |
196 | msg.Block = block.Bytes() | 254 | msg.Block = block.Bytes() |
197 | msg.Key = query.Key().Clone() | 255 | msg.Key = query.Key().Clone() |
diff --git a/src/gnunet/service/dht/path/elements.go b/src/gnunet/service/dht/path/elements.go index 5e26e57..53d1cfd 100644 --- a/src/gnunet/service/dht/path/elements.go +++ b/src/gnunet/service/dht/path/elements.go | |||
@@ -38,8 +38,8 @@ var ( | |||
38 | //---------------------------------------------------------------------- | 38 | //---------------------------------------------------------------------- |
39 | // Entry is an element of the path list | 39 | // Entry is an element of the path list |
40 | type Entry struct { | 40 | type Entry struct { |
41 | Signer *util.PeerID // path element signer | ||
42 | Signature *util.PeerSignature // path element signature | 41 | Signature *util.PeerSignature // path element signature |
42 | Signer *util.PeerID // path element signer | ||
43 | } | 43 | } |
44 | 44 | ||
45 | // Size returns the size of a path element in wire format | 45 | // Size returns the size of a path element in wire format |
diff --git a/src/gnunet/service/dht/path/handling.go b/src/gnunet/service/dht/path/handling.go index 412f646..b225c82 100644 --- a/src/gnunet/service/dht/path/handling.go +++ b/src/gnunet/service/dht/path/handling.go | |||
@@ -35,8 +35,8 @@ import ( | |||
35 | 35 | ||
36 | // path flags | 36 | // path flags |
37 | const ( | 37 | const ( |
38 | PathTruncated = iota | 38 | PathTruncated = 1 |
39 | PathLastHop | 39 | PathLastHop = 2 |
40 | ) | 40 | ) |
41 | 41 | ||
42 | // Path is the complete list of verified hops a message travelled. | 42 | // Path is the complete list of verified hops a message travelled. |
@@ -48,6 +48,7 @@ type Path struct { | |||
48 | Expire util.AbsoluteTime `` // expiration time | 48 | Expire util.AbsoluteTime `` // expiration time |
49 | TruncOrigin *util.PeerID `opt:"(IsUsed)"` // truncated origin (optional) | 49 | TruncOrigin *util.PeerID `opt:"(IsUsed)"` // truncated origin (optional) |
50 | NumList uint16 `order:"big"` // number of list entries | 50 | NumList uint16 `order:"big"` // number of list entries |
51 | SplitPos uint16 `order:"big"` // optional split position | ||
51 | List []*Entry `size:"NumList"` // list of path entries | 52 | List []*Entry `size:"NumList"` // list of path entries |
52 | LastSig *util.PeerSignature `opt:"(Isused)"` // last hop signature | 53 | LastSig *util.PeerSignature `opt:"(Isused)"` // last hop signature |
53 | LastHop *util.PeerID `opt:"(IsUsed)"` // last hop peer id | 54 | LastHop *util.PeerID `opt:"(IsUsed)"` // last hop peer id |
@@ -72,6 +73,7 @@ func NewPath(bh *crypto.HashCode, expire util.AbsoluteTime) *Path { | |||
72 | Expire: expire, | 73 | Expire: expire, |
73 | TruncOrigin: nil, | 74 | TruncOrigin: nil, |
74 | NumList: 0, | 75 | NumList: 0, |
76 | SplitPos: 0, | ||
75 | List: make([]*Entry, 0), | 77 | List: make([]*Entry, 0), |
76 | LastSig: nil, | 78 | LastSig: nil, |
77 | LastHop: nil, | 79 | LastHop: nil, |
@@ -81,7 +83,7 @@ func NewPath(bh *crypto.HashCode, expire util.AbsoluteTime) *Path { | |||
81 | // NewPathFromBytes reconstructs a path instance from binary data. The layout | 83 | // NewPathFromBytes reconstructs a path instance from binary data. The layout |
82 | // of the data must match with the layout used in Path.Bytes(). | 84 | // of the data must match with the layout used in Path.Bytes(). |
83 | func NewPathFromBytes(buf []byte) (path *Path, err error) { | 85 | func NewPathFromBytes(buf []byte) (path *Path, err error) { |
84 | if buf == nil || len(buf) == 0 { | 86 | if len(buf) == 0 { |
85 | return | 87 | return |
86 | } | 88 | } |
87 | path = new(Path) | 89 | path = new(Path) |
@@ -116,6 +118,7 @@ func (p *Path) Clone() *Path { | |||
116 | Expire: p.Expire, | 118 | Expire: p.Expire, |
117 | TruncOrigin: p.TruncOrigin, | 119 | TruncOrigin: p.TruncOrigin, |
118 | NumList: p.NumList, | 120 | NumList: p.NumList, |
121 | SplitPos: p.SplitPos, | ||
119 | List: util.Clone(p.List), | 122 | List: util.Clone(p.List), |
120 | LastSig: p.LastSig, | 123 | LastSig: p.LastSig, |
121 | LastHop: p.LastHop, | 124 | LastHop: p.LastHop, |
diff --git a/src/gnunet/service/dht/resulthandler.go b/src/gnunet/service/dht/resulthandler.go index faab93e..85895f8 100644 --- a/src/gnunet/service/dht/resulthandler.go +++ b/src/gnunet/service/dht/resulthandler.go | |||
@@ -22,8 +22,10 @@ import ( | |||
22 | "bytes" | 22 | "bytes" |
23 | "context" | 23 | "context" |
24 | "gnunet/crypto" | 24 | "gnunet/crypto" |
25 | "gnunet/enums" | ||
25 | "gnunet/message" | 26 | "gnunet/message" |
26 | "gnunet/service/dht/blocks" | 27 | "gnunet/service/dht/blocks" |
28 | "gnunet/service/dht/path" | ||
27 | "gnunet/transport" | 29 | "gnunet/transport" |
28 | "gnunet/util" | 30 | "gnunet/util" |
29 | "time" | 31 | "time" |
@@ -52,7 +54,10 @@ type ResultHandler interface { | |||
52 | Done() bool | 54 | Done() bool |
53 | 55 | ||
54 | // Key returns the query/store key as string | 56 | // Key returns the query/store key as string |
55 | Key() string | 57 | Key() *crypto.HashCode |
58 | |||
59 | // Flags returns the query flags | ||
60 | Flags() uint16 | ||
56 | 61 | ||
57 | // Compare two result handlers | 62 | // Compare two result handlers |
58 | Compare(ResultHandler) int | 63 | Compare(ResultHandler) int |
@@ -61,7 +66,7 @@ type ResultHandler interface { | |||
61 | Merge(ResultHandler) bool | 66 | Merge(ResultHandler) bool |
62 | 67 | ||
63 | // Handle result message | 68 | // Handle result message |
64 | Handle(context.Context, *message.DHTP2PResultMsg) bool | 69 | Handle(ctx context.Context, msg *message.DHTP2PResultMsg, pth *path.Path, sender, local *util.PeerID) bool |
65 | } | 70 | } |
66 | 71 | ||
67 | // Compare return values | 72 | // Compare return values |
@@ -108,8 +113,13 @@ func (t *GenericResultHandler) ID() int { | |||
108 | } | 113 | } |
109 | 114 | ||
110 | // Key returns the key string | 115 | // Key returns the key string |
111 | func (t *GenericResultHandler) Key() string { | 116 | func (t *GenericResultHandler) Key() *crypto.HashCode { |
112 | return t.key.String() | 117 | return t.key |
118 | } | ||
119 | |||
120 | // Flags returns the query flags | ||
121 | func (t *GenericResultHandler) Flags() uint16 { | ||
122 | return t.flags | ||
113 | } | 123 | } |
114 | 124 | ||
115 | // Done returns true if the result handler is no longer active. | 125 | // Done returns true if the result handler is no longer active. |
@@ -176,15 +186,28 @@ func NewForwardResultHandler(msgIn message.Message, rf blocks.ResultFilter, back | |||
176 | } | 186 | } |
177 | 187 | ||
178 | // Handle incoming DHT-P2P-RESULT message | 188 | // Handle incoming DHT-P2P-RESULT message |
179 | func (t *ForwardResultHandler) Handle(ctx context.Context, msg *message.DHTP2PResultMsg) bool { | 189 | func (t *ForwardResultHandler) Handle(ctx context.Context, msg *message.DHTP2PResultMsg, pth *path.Path, sender, local *util.PeerID) bool { |
180 | // don't send result if it is filtered out | 190 | // don't send result if it is filtered out |
181 | if !t.Proceed(ctx, msg) { | 191 | if !t.Proceed(ctx, msg) { |
182 | logger.Printf(logger.DBG, "[dht-task-%d] result filtered out -- already known", t.id) | 192 | logger.Printf(logger.DBG, "[dht-task-%d] result filtered out -- already known", t.id) |
183 | return false | 193 | return false |
184 | } | 194 | } |
195 | // extend path if route is recorded | ||
196 | pp := pth.Clone() | ||
197 | if msg.Flags&enums.DHT_RO_RECORD_ROUTE != 0 { | ||
198 | // yes: add path element for remote receivers | ||
199 | if rcv := t.resp.Receiver(); rcv != nil { | ||
200 | pe := pp.NewElement(sender, local, rcv) | ||
201 | pp.Add(pe) | ||
202 | } | ||
203 | } | ||
204 | |||
205 | // build updated PUT message | ||
206 | msgOut := msg.Update(pp) | ||
207 | |||
185 | // send result message back to originator (result forwarding). | 208 | // send result message back to originator (result forwarding). |
186 | logger.Printf(logger.INFO, "[dht-task-%d] sending result back to originator", t.id) | 209 | logger.Printf(logger.INFO, "[dht-task-%d] sending result back to originator", t.id) |
187 | if err := t.resp.Send(ctx, msg); err != nil && err != transport.ErrEndpMaybeSent { | 210 | if err := t.resp.Send(ctx, msgOut); err != nil && err != transport.ErrEndpMaybeSent { |
188 | logger.Printf(logger.ERROR, "[dht-task-%d] sending result back to originator failed: %s", t.id, err.Error()) | 211 | logger.Printf(logger.ERROR, "[dht-task-%d] sending result back to originator failed: %s", t.id, err.Error()) |
189 | return false | 212 | return false |
190 | } | 213 | } |
@@ -200,7 +223,7 @@ func (t *ForwardResultHandler) Compare(h ResultHandler) int { | |||
200 | return RHC_DIFFER | 223 | return RHC_DIFFER |
201 | } | 224 | } |
202 | // check for same recipient | 225 | // check for same recipient |
203 | if ht.resp.Receiver() != t.resp.Receiver() { | 226 | if ht.resp.Receiver().Equals(t.resp.Receiver()) { |
204 | logger.Printf(logger.DBG, "[frh] recipients differ: %s -- %s", ht.resp.Receiver(), t.resp.Receiver()) | 227 | logger.Printf(logger.DBG, "[frh] recipients differ: %s -- %s", ht.resp.Receiver(), t.resp.Receiver()) |
205 | return RHC_DIFFER | 228 | return RHC_DIFFER |
206 | } | 229 | } |
@@ -237,7 +260,7 @@ func (t *ForwardResultHandler) Merge(h ResultHandler) bool { | |||
237 | //---------------------------------------------------------------------- | 260 | //---------------------------------------------------------------------- |
238 | 261 | ||
239 | // ResultHandlerFcn is the function prototype for custom handlers: | 262 | // ResultHandlerFcn is the function prototype for custom handlers: |
240 | type ResultHandlerFcn func(context.Context, *message.DHTP2PResultMsg, chan<- any) bool | 263 | type ResultHandlerFcn func(context.Context, *message.DHTP2PResultMsg, *path.Path, chan<- any) bool |
241 | 264 | ||
242 | // DirectResultHandler for local DHT-P2P-GET requests | 265 | // DirectResultHandler for local DHT-P2P-GET requests |
243 | type DirectResultHandler struct { | 266 | type DirectResultHandler struct { |
@@ -262,7 +285,7 @@ func NewDirectResultHandler(msgIn message.Message, rf blocks.ResultFilter, hdlr | |||
262 | } | 285 | } |
263 | 286 | ||
264 | // Handle incoming DHT-P2P-RESULT message | 287 | // Handle incoming DHT-P2P-RESULT message |
265 | func (t *DirectResultHandler) Handle(ctx context.Context, msg *message.DHTP2PResultMsg) bool { | 288 | func (t *DirectResultHandler) Handle(ctx context.Context, msg *message.DHTP2PResultMsg, pth *path.Path, sender, local *util.PeerID) bool { |
266 | // don't send result if it is filtered out | 289 | // don't send result if it is filtered out |
267 | if !t.Proceed(ctx, msg) { | 290 | if !t.Proceed(ctx, msg) { |
268 | logger.Printf(logger.DBG, "[dht-task-%d] result filtered out -- already known", t.id) | 291 | logger.Printf(logger.DBG, "[dht-task-%d] result filtered out -- already known", t.id) |
@@ -271,7 +294,7 @@ func (t *DirectResultHandler) Handle(ctx context.Context, msg *message.DHTP2PRes | |||
271 | // check for correct message type and handler function | 294 | // check for correct message type and handler function |
272 | if t.hdlr != nil { | 295 | if t.hdlr != nil { |
273 | logger.Printf(logger.INFO, "[dht-task-%d] handling result message", t.id) | 296 | logger.Printf(logger.INFO, "[dht-task-%d] handling result message", t.id) |
274 | return t.hdlr(ctx, msg, t.rc) | 297 | return t.hdlr(ctx, msg, pth, t.rc) |
275 | } | 298 | } |
276 | return false | 299 | return false |
277 | } | 300 | } |
@@ -318,7 +341,7 @@ func NewResultHandlerList() *ResultHandlerList { | |||
318 | // Add handler to list | 341 | // Add handler to list |
319 | func (t *ResultHandlerList) Add(hdlr ResultHandler) bool { | 342 | func (t *ResultHandlerList) Add(hdlr ResultHandler) bool { |
320 | // get current list of handlers for key | 343 | // get current list of handlers for key |
321 | key := hdlr.Key() | 344 | key := hdlr.Key().String() |
322 | list, ok := t.list.Get(key, 0) | 345 | list, ok := t.list.Get(key, 0) |
323 | modified := false | 346 | modified := false |
324 | if !ok { | 347 | if !ok { |
diff --git a/src/gnunet/service/dht/routingtable.go b/src/gnunet/service/dht/routingtable.go index 98f0819..20bf5fc 100644 --- a/src/gnunet/service/dht/routingtable.go +++ b/src/gnunet/service/dht/routingtable.go | |||
@@ -25,6 +25,7 @@ import ( | |||
25 | "gnunet/config" | 25 | "gnunet/config" |
26 | "gnunet/crypto" | 26 | "gnunet/crypto" |
27 | "gnunet/service/dht/blocks" | 27 | "gnunet/service/dht/blocks" |
28 | "gnunet/service/store" | ||
28 | "gnunet/util" | 29 | "gnunet/util" |
29 | "sync" | 30 | "sync" |
30 | "time" | 31 | "time" |
@@ -84,11 +85,7 @@ func (addr *PeerAddress) Equals(p *PeerAddress) bool { | |||
84 | // Distance between two addresses: returns a distance value and a | 85 | // Distance between two addresses: returns a distance value and a |
85 | // bucket index (smaller index = less distant). | 86 | // bucket index (smaller index = less distant). |
86 | func (addr *PeerAddress) Distance(p *PeerAddress) (*math.Int, int) { | 87 | func (addr *PeerAddress) Distance(p *PeerAddress) (*math.Int, int) { |
87 | d := make([]byte, 64) | 88 | r := util.Distance(addr.Key.Bits, p.Key.Bits) |
88 | for i := range d { | ||
89 | d[i] = addr.Key.Bits[i] ^ p.Key.Bits[i] | ||
90 | } | ||
91 | r := math.NewIntFromBytes(d) | ||
92 | return r, 512 - r.BitLen() | 89 | return r, 512 - r.BitLen() |
93 | } | 90 | } |
94 | 91 | ||
@@ -371,18 +368,29 @@ func (rt *RoutingTable) heartbeat(ctx context.Context) { | |||
371 | 368 | ||
372 | //---------------------------------------------------------------------- | 369 | //---------------------------------------------------------------------- |
373 | 370 | ||
374 | func (rt *RoutingTable) BestHello(addr *PeerAddress, rf blocks.ResultFilter) (hb *blocks.HelloBlock, dist *math.Int) { | 371 | // LookupHello returns blocks from the HELLO cache for given query. |
375 | // iterate over cached HELLOs to find (best) match first | 372 | func (rt *RoutingTable) LookupHello(addr *PeerAddress, rf blocks.ResultFilter, approx bool) (results []*store.DHTResult) { |
376 | _ = rt.helloCache.ProcessRange(func(key string, val *blocks.HelloBlock, _ int) error { | 373 | // iterate over cached HELLOs to find matches; |
374 | // approximate search is limited by distance (max. diff for bucket index is 16) | ||
375 | _ = rt.helloCache.ProcessRange(func(key string, hb *blocks.HelloBlock, _ int) error { | ||
377 | // check if block is excluded by result filter | 376 | // check if block is excluded by result filter |
378 | if !rf.Contains(val) { | 377 | var result *store.DHTResult |
379 | // check for better match | 378 | if !rf.Contains(hb) { |
380 | p := NewPeerAddress(val.PeerID) | 379 | // no: possible result, compute distance |
381 | d, _ := addr.Distance(p) | 380 | p := NewPeerAddress(hb.PeerID) |
382 | if hb == nil || d.Cmp(dist) < 0 { | 381 | dist, idx := addr.Distance(p) |
383 | hb = val | 382 | result = &store.DHTResult{ |
384 | dist = d | 383 | Entry: &store.DHTEntry{ |
384 | Blk: hb, | ||
385 | }, | ||
386 | Dist: dist, | ||
387 | } | ||
388 | // check if we need to add result | ||
389 | if (approx && idx < 16) || idx == 0 { | ||
390 | results = append(results, result) | ||
385 | } | 391 | } |
392 | } else { | ||
393 | logger.Printf(logger.DBG, "[RT] GET-HELLO: cache block is filtered") | ||
386 | } | 394 | } |
387 | return nil | 395 | return nil |
388 | }, true) | 396 | }, true) |
diff --git a/src/gnunet/service/dht/routingtable_test.go b/src/gnunet/service/dht/routingtable_test.go index 42e2349..02ddc52 100644 --- a/src/gnunet/service/dht/routingtable_test.go +++ b/src/gnunet/service/dht/routingtable_test.go | |||
@@ -70,7 +70,7 @@ func TestRT(t *testing.T) { | |||
70 | // helper functions | 70 | // helper functions |
71 | genRemotePeer := func() *PeerAddress { | 71 | genRemotePeer := func() *PeerAddress { |
72 | d := make([]byte, 32) | 72 | d := make([]byte, 32) |
73 | _, _ = rand.Read(d) | 73 | _, _ = rand.Read(d) //nolint:gosec // good enough for testing |
74 | return NewPeerAddress(util.NewPeerID(d)) | 74 | return NewPeerAddress(util.NewPeerID(d)) |
75 | } | 75 | } |
76 | 76 | ||
@@ -86,10 +86,10 @@ func TestRT(t *testing.T) { | |||
86 | for i := range tasks { | 86 | for i := range tasks { |
87 | tasks[i] = new(Entry) | 87 | tasks[i] = new(Entry) |
88 | tasks[i].addr = genRemotePeer() | 88 | tasks[i].addr = genRemotePeer() |
89 | tasks[i].born = rand.Int63n(EPOCHS) | 89 | tasks[i].born = rand.Int63n(EPOCHS) //nolint:gosec // good enough for testing |
90 | tasks[i].ttl = 1000 + rand.Int63n(7000) | 90 | tasks[i].ttl = 1000 + rand.Int63n(7000) //nolint:gosec // good enough for testing |
91 | tasks[i].drop = 2000 + rand.Int63n(3000) | 91 | tasks[i].drop = 2000 + rand.Int63n(3000) //nolint:gosec // good enough for testing |
92 | tasks[i].revive = rand.Int63n(2000) | 92 | tasks[i].revive = rand.Int63n(2000) //nolint:gosec // good enough for testing |
93 | tasks[i].online = false | 93 | tasks[i].online = false |
94 | } | 94 | } |
95 | 95 | ||
diff --git a/src/gnunet/service/gns/module.go b/src/gnunet/service/gns/module.go index c431d48..0f4781d 100644 --- a/src/gnunet/service/gns/module.go +++ b/src/gnunet/service/gns/module.go | |||
@@ -200,6 +200,7 @@ func (m *Module) ResolveAbsolute( | |||
200 | // ResolveRelative resolves a relative path (to a given zone) recursively by | 200 | // ResolveRelative resolves a relative path (to a given zone) recursively by |
201 | // processing simple (PKEY,Label) lookups in sequence and handle intermediate | 201 | // processing simple (PKEY,Label) lookups in sequence and handle intermediate |
202 | // GNS record types | 202 | // GNS record types |
203 | //nolint:gocyclo // life sometimes is complex... | ||
203 | func (m *Module) ResolveRelative( | 204 | func (m *Module) ResolveRelative( |
204 | ctx context.Context, | 205 | ctx context.Context, |
205 | labels []string, | 206 | labels []string, |
diff --git a/src/gnunet/service/namecache/module.go b/src/gnunet/service/namecache/module.go index 642355b..616c0b5 100644 --- a/src/gnunet/service/namecache/module.go +++ b/src/gnunet/service/namecache/module.go | |||
@@ -20,6 +20,7 @@ package namecache | |||
20 | 20 | ||
21 | import ( | 21 | import ( |
22 | "context" | 22 | "context" |
23 | "errors" | ||
23 | "gnunet/config" | 24 | "gnunet/config" |
24 | "gnunet/core" | 25 | "gnunet/core" |
25 | "gnunet/service" | 26 | "gnunet/service" |
@@ -39,7 +40,7 @@ import ( | |||
39 | type Module struct { | 40 | type Module struct { |
40 | service.ModuleImpl | 41 | service.ModuleImpl |
41 | 42 | ||
42 | cache store.DHTStore // transient block cache | 43 | cache *store.DHTStore // transient block cache |
43 | } | 44 | } |
44 | 45 | ||
45 | // NewModule creates a new module instance. | 46 | // NewModule creates a new module instance. |
@@ -67,13 +68,18 @@ func (m *Module) Import(fcm map[string]any) { | |||
67 | 68 | ||
68 | //---------------------------------------------------------------------- | 69 | //---------------------------------------------------------------------- |
69 | 70 | ||
70 | // Get an entry from the cache if available. | 71 | // Get entry from the cache if available. |
71 | func (m *Module) Get(ctx context.Context, query *blocks.GNSQuery) (block *blocks.GNSBlock, err error) { | 72 | func (m *Module) Get(ctx context.Context, query *blocks.GNSQuery) (block *blocks.GNSBlock, err error) { |
72 | var e *store.DHTEntry | 73 | var e []*store.DHTEntry |
73 | if e, err = m.cache.Get(query); err != nil { | 74 | rf := blocks.NewGenericResultFilter() |
75 | if e, err = m.cache.Get("namecache", query, rf); err != nil { | ||
74 | return | 76 | return |
75 | } | 77 | } |
76 | err = blocks.Unwrap(e.Blk, block) | 78 | if len(e) != 1 { |
79 | err = errors.New("only one DHT entry exppected") | ||
80 | } else { | ||
81 | err = blocks.Unwrap(e[0].Blk, block) | ||
82 | } | ||
77 | return | 83 | return |
78 | } | 84 | } |
79 | 85 | ||
diff --git a/src/gnunet/service/store/store_fs.go b/src/gnunet/service/store/store_dht.go index bbbe7cc..802cf60 100644 --- a/src/gnunet/service/store/store_fs.go +++ b/src/gnunet/service/store/store_dht.go | |||
@@ -21,6 +21,7 @@ package store | |||
21 | import ( | 21 | import ( |
22 | "encoding/hex" | 22 | "encoding/hex" |
23 | "fmt" | 23 | "fmt" |
24 | "gnunet/crypto" | ||
24 | "gnunet/service/dht/blocks" | 25 | "gnunet/service/dht/blocks" |
25 | "gnunet/service/dht/path" | 26 | "gnunet/service/dht/path" |
26 | "gnunet/util" | 27 | "gnunet/util" |
@@ -35,9 +36,60 @@ import ( | |||
35 | // Filesystem-based storage | 36 | // Filesystem-based storage |
36 | //============================================================ | 37 | //============================================================ |
37 | 38 | ||
38 | // FileStore implements a filesystem-based storage mechanism for | 39 | //------------------------------------------------------------ |
40 | // DHT entry is an entity stored in the DHT | ||
41 | //------------------------------------------------------------ | ||
42 | |||
43 | // DHTEntry to be stored to/retrieved from local storage | ||
44 | type DHTEntry struct { | ||
45 | Blk blocks.Block // reference to DHT block | ||
46 | Path *path.Path // associated put path | ||
47 | } | ||
48 | |||
49 | //------------------------------------------------------------ | ||
50 | // DHT result is a single DHT result | ||
51 | //------------------------------------------------------------ | ||
52 | |||
53 | // Result as returned by local DHT queries | ||
54 | type DHTResult struct { | ||
55 | Entry *DHTEntry // reference to DHT entry | ||
56 | Dist *math.Int // distance of entry to query key | ||
57 | } | ||
58 | |||
59 | //------------------------------------------------------------ | ||
60 | |||
61 | type DHTResultSet struct { | ||
62 | list []*DHTResult // list of DHT results | ||
63 | pos int // iterator position | ||
64 | } | ||
65 | |||
66 | func NewDHTResultSet() *DHTResultSet { | ||
67 | return &DHTResultSet{ | ||
68 | list: make([]*DHTResult, 0), | ||
69 | pos: 0, | ||
70 | } | ||
71 | } | ||
72 | |||
73 | func (rs *DHTResultSet) Add(r *DHTResult) { | ||
74 | rs.list = append(rs.list, r) | ||
75 | } | ||
76 | |||
77 | func (rs *DHTResultSet) Next() (result *DHTResult) { | ||
78 | if rs.pos == len(rs.list) { | ||
79 | return nil | ||
80 | } | ||
81 | result = rs.list[rs.pos] | ||
82 | rs.pos++ | ||
83 | return | ||
84 | } | ||
85 | |||
86 | //------------------------------------------------------------ | ||
87 | // DHT store | ||
88 | //------------------------------------------------------------ | ||
89 | |||
90 | // DHTStore implements a filesystem-based storage mechanism for | ||
39 | // DHT queries and blocks. | 91 | // DHT queries and blocks. |
40 | type FileStore struct { | 92 | type DHTStore struct { |
41 | path string // storage path | 93 | path string // storage path |
42 | cache bool // storage works as cache | 94 | cache bool // storage works as cache |
43 | args util.ParameterSet // arguments / settings | 95 | args util.ParameterSet // arguments / settings |
@@ -53,10 +105,10 @@ type FileStore struct { | |||
53 | size int // size of cache (number of entries) | 105 | size int // size of cache (number of entries) |
54 | } | 106 | } |
55 | 107 | ||
56 | // NewFileStore instantiates a new file storage. | 108 | // NewDHTStore instantiates a new file storage handler. |
57 | func NewFileStore(spec util.ParameterSet) (DHTStore, error) { | 109 | func NewDHTStore(spec util.ParameterSet) (*DHTStore, error) { |
58 | // create file store handler | 110 | // create file store handler |
59 | fs := new(FileStore) | 111 | fs := new(DHTStore) |
60 | fs.args = spec | 112 | fs.args = spec |
61 | 113 | ||
62 | // get parameter | 114 | // get parameter |
@@ -93,7 +145,7 @@ func NewFileStore(spec util.ParameterSet) (DHTStore, error) { | |||
93 | } | 145 | } |
94 | 146 | ||
95 | // Close file storage. | 147 | // Close file storage. |
96 | func (s *FileStore) Close() (err error) { | 148 | func (s *DHTStore) Close() (err error) { |
97 | if !s.cache { | 149 | if !s.cache { |
98 | // close database connection | 150 | // close database connection |
99 | err = s.meta.Close() | 151 | err = s.meta.Close() |
@@ -102,7 +154,7 @@ func (s *FileStore) Close() (err error) { | |||
102 | } | 154 | } |
103 | 155 | ||
104 | // Put block into storage under given key | 156 | // Put block into storage under given key |
105 | func (s *FileStore) Put(query blocks.Query, entry *DHTEntry) (err error) { | 157 | func (s *DHTStore) Put(query blocks.Query, entry *DHTEntry) (err error) { |
106 | // check for free space | 158 | // check for free space |
107 | if !s.cache { | 159 | if !s.cache { |
108 | if int(s.totalSize>>30) > s.maxSpace { | 160 | if int(s.totalSize>>30) > s.maxSpace { |
@@ -122,9 +174,10 @@ func (s *FileStore) Put(query blocks.Query, entry *DHTEntry) (err error) { | |||
122 | // compile metadata | 174 | // compile metadata |
123 | now := util.AbsoluteTimeNow() | 175 | now := util.AbsoluteTimeNow() |
124 | meta := &FileMetadata{ | 176 | meta := &FileMetadata{ |
125 | key: query.Key().Bits, | 177 | key: query.Key(), |
126 | size: uint64(blkSize), | 178 | size: uint64(blkSize), |
127 | btype: btype, | 179 | btype: btype, |
180 | bhash: crypto.Hash(entry.Blk.Bytes()), | ||
128 | expires: expire, | 181 | expires: expire, |
129 | stored: now, | 182 | stored: now, |
130 | lastUsed: now, | 183 | lastUsed: now, |
@@ -146,75 +199,73 @@ func (s *FileStore) Put(query blocks.Query, entry *DHTEntry) (err error) { | |||
146 | } | 199 | } |
147 | 200 | ||
148 | // Get block with given key from storage | 201 | // Get block with given key from storage |
149 | func (s *FileStore) Get(query blocks.Query) (entry *DHTEntry, err error) { | 202 | func (s *DHTStore) Get(label string, query blocks.Query, rf blocks.ResultFilter) (results []*DHTEntry, err error) { |
150 | // check if we have metadata for the query | 203 | // check if we have metadata for the query |
151 | key := query.Key().Bits | 204 | var mds []*FileMetadata |
152 | btype := query.Type() | 205 | if mds, err = s.meta.Get(query); err != nil || len(mds) == 0 { |
153 | var md *FileMetadata | ||
154 | if md, err = s.meta.Get(key, btype); err != nil || md == nil { | ||
155 | return | 206 | return |
156 | } | 207 | } |
157 | // check for expired entry | 208 | // traverse list of results |
158 | if md.expires.Expired() { | 209 | for _, md := range mds { |
159 | err = s.dropFile(md) | 210 | // check for expired entry |
160 | return | 211 | if md.expires.Expired() { |
161 | } | 212 | if err = s.dropFile(md); err != nil { |
162 | // mark the block as newly used | 213 | logger.Printf(logger.ERROR, "[%s] can't drop DHT file: %s", label, err) |
163 | if err = s.meta.Used(key, btype); err != nil { | 214 | } |
164 | return | 215 | continue |
216 | } | ||
217 | // check for filtered block | ||
218 | if rf.ContainsHash(md.bhash) { | ||
219 | continue | ||
220 | } | ||
221 | // read entry from storage | ||
222 | var entry *DHTEntry | ||
223 | if entry, err = s.readEntry(md.key.Bits); err != nil { | ||
224 | logger.Printf(logger.ERROR, "[%s] can't read DHT entry: %s", label, err) | ||
225 | continue | ||
226 | } | ||
227 | results = append(results, entry) | ||
228 | // mark the block as newly used | ||
229 | if err = s.meta.Used(md.key.Bits, md.btype); err != nil { | ||
230 | logger.Printf(logger.ERROR, "[%s] can't flag DHT entry as used: %s", label, err) | ||
231 | continue | ||
232 | } | ||
165 | } | 233 | } |
166 | return s.readEntry(key) | 234 | return |
167 | } | 235 | } |
168 | 236 | ||
169 | // GetApprox returns the best-matching value with given key from storage | 237 | // GetApprox returns the best-matching value with given key from storage |
170 | // that is not excluded | 238 | // that is not excluded |
171 | func (s *FileStore) GetApprox(query blocks.Query, excl func(*DHTEntry) bool) (entry *DHTEntry, key any, err error) { | 239 | func (s *DHTStore) GetApprox(label string, query blocks.Query, rf blocks.ResultFilter) (results []*DHTResult, err error) { |
172 | var bestKey []byte | 240 | // iterate over all keys; process each metadata instance |
173 | var bestEntry *DHTEntry | 241 | // (append to results if appropriate) |
174 | var bestDist *math.Int | 242 | process := func(md *FileMetadata) { |
175 | // distance function | 243 | // check for filtered block. |
176 | dist := func(a, b []byte) *math.Int { | 244 | if rf.ContainsHash(md.bhash) { |
177 | c := make([]byte, len(a)) | 245 | // filtered out... |
178 | for i := range a { | 246 | return |
179 | c[i] = a[i] ^ b[i] | ||
180 | } | 247 | } |
181 | return math.NewIntFromBytes(c) | 248 | // check distance (max. 16 bucktes off) |
182 | } | 249 | dist := util.Distance(md.key.Bits, query.Key().Bits) |
183 | // iterate over all keys | 250 | if (512 - dist.BitLen()) > 16 { |
184 | check := func(md *FileMetadata) { | 251 | return |
185 | // check for better match | ||
186 | d := dist(md.key, query.Key().Bits) | ||
187 | var entry *DHTEntry | ||
188 | if bestKey == nil || d.Cmp(bestDist) < 0 { | ||
189 | // we might have a match. check block for exclusion | ||
190 | if entry, err = s.readEntry(md.key); err != nil { | ||
191 | logger.Printf(logger.ERROR, "[dhtstore] failed to retrieve block for %s", hex.EncodeToString(md.key)) | ||
192 | return | ||
193 | } | ||
194 | if excl(entry) { | ||
195 | return | ||
196 | } | ||
197 | // remember best match | ||
198 | bestKey = md.key | ||
199 | bestEntry = entry | ||
200 | bestDist = d | ||
201 | } | 252 | } |
202 | } | 253 | // read entry from storage |
203 | if err = s.meta.Traverse(check); err != nil { | 254 | var entry *DHTEntry |
204 | return | 255 | if entry, err = s.readEntry(md.key.Bits); err != nil { |
205 | } | 256 | logger.Printf(logger.ERROR, "[%s] failed to retrieve block for %s", label, md.key.String()) |
206 | if bestEntry != nil { | ||
207 | // mark the block as newly used | ||
208 | if err = s.meta.Used(bestKey, bestEntry.Blk.Type()); err != nil { | ||
209 | return | 257 | return |
210 | } | 258 | } |
259 | // add to result list | ||
260 | result := &DHTResult{ | ||
261 | Entry: entry, | ||
262 | Dist: dist, | ||
263 | } | ||
264 | results = append(results, result) | ||
211 | } | 265 | } |
212 | return bestEntry, bestDist, nil | 266 | // traverse mestadata database |
213 | } | 267 | err = s.meta.Traverse(process) |
214 | 268 | return | |
215 | // Get a list of all stored block keys (generic query). | ||
216 | func (s *FileStore) List() ([]blocks.Query, error) { | ||
217 | return nil, ErrStoreNoList | ||
218 | } | 269 | } |
219 | 270 | ||
220 | //---------------------------------------------------------------------- | 271 | //---------------------------------------------------------------------- |
@@ -227,7 +278,7 @@ type entryLayout struct { | |||
227 | } | 278 | } |
228 | 279 | ||
229 | // read entry from storage for given key | 280 | // read entry from storage for given key |
230 | func (s *FileStore) readEntry(key []byte) (entry *DHTEntry, err error) { | 281 | func (s *DHTStore) readEntry(key []byte) (entry *DHTEntry, err error) { |
231 | // get path and filename from key | 282 | // get path and filename from key |
232 | folder, fname := s.expandPath(key) | 283 | folder, fname := s.expandPath(key) |
233 | 284 | ||
@@ -255,7 +306,7 @@ func (s *FileStore) readEntry(key []byte) (entry *DHTEntry, err error) { | |||
255 | } | 306 | } |
256 | 307 | ||
257 | // write entry to storage for given key | 308 | // write entry to storage for given key |
258 | func (s *FileStore) writeEntry(key []byte, entry *DHTEntry) (err error) { | 309 | func (s *DHTStore) writeEntry(key []byte, entry *DHTEntry) (err error) { |
259 | // get folder and filename from key | 310 | // get folder and filename from key |
260 | folder, fname := s.expandPath(key) | 311 | folder, fname := s.expandPath(key) |
261 | // make sure the folder exists | 312 | // make sure the folder exists |
@@ -287,14 +338,14 @@ func (s *FileStore) writeEntry(key []byte, entry *DHTEntry) (err error) { | |||
287 | //---------------------------------------------------------------------- | 338 | //---------------------------------------------------------------------- |
288 | 339 | ||
289 | // expandPath returns the full path to the file for given key. | 340 | // expandPath returns the full path to the file for given key. |
290 | func (s *FileStore) expandPath(key []byte) (string, string) { | 341 | func (s *DHTStore) expandPath(key []byte) (string, string) { |
291 | h := hex.EncodeToString(key) | 342 | h := hex.EncodeToString(key) |
292 | return fmt.Sprintf("%s/%s/%s", s.path, h[:2], h[2:4]), h[4:] | 343 | return fmt.Sprintf("%s/%s/%s", s.path, h[:2], h[2:4]), h[4:] |
293 | } | 344 | } |
294 | 345 | ||
295 | // Prune list of file headers so we drop at least n entries. | 346 | // Prune list of file headers so we drop at least n entries. |
296 | // returns number of removed entries. | 347 | // returns number of removed entries. |
297 | func (s *FileStore) prune(n int) (del int) { | 348 | func (s *DHTStore) prune(n int) (del int) { |
298 | // collect obsolete records | 349 | // collect obsolete records |
299 | obsolete, err := s.meta.Obsolete(n) | 350 | obsolete, err := s.meta.Obsolete(n) |
300 | if err != nil { | 351 | if err != nil { |
@@ -311,16 +362,17 @@ func (s *FileStore) prune(n int) (del int) { | |||
311 | } | 362 | } |
312 | 363 | ||
313 | // drop file removes a file from metadatabase and the physical storage. | 364 | // drop file removes a file from metadatabase and the physical storage. |
314 | func (s *FileStore) dropFile(md *FileMetadata) (err error) { | 365 | func (s *DHTStore) dropFile(md *FileMetadata) (err error) { |
315 | // adjust total size | 366 | // adjust total size |
316 | s.totalSize -= md.size | 367 | s.totalSize -= md.size |
317 | // remove from database | 368 | // remove from database |
318 | if err = s.meta.Drop(md.key, md.btype); err != nil { | 369 | if err = s.meta.Drop(md.key.Bits, md.btype); err != nil { |
319 | logger.Printf(logger.ERROR, "[store] can't remove metadata (%s,%d): %s", md.key, md.btype, err.Error()) | 370 | logger.Printf(logger.ERROR, "[store] can't remove metadata (%s,%d): %s", md.key, md.btype, err.Error()) |
320 | return | 371 | return |
321 | } | 372 | } |
322 | // remove from filesystem | 373 | // remove from filesystem |
323 | path := fmt.Sprintf("%s/%s/%s/%s", s.path, md.key[:2], md.key[2:4], md.key[4:]) | 374 | h := hex.EncodeToString(md.key.Bits) |
375 | path := fmt.Sprintf("%s/%s/%s/%s", s.path, h[:2], h[2:4], h[4:]) | ||
324 | if err = os.Remove(path); err != nil { | 376 | if err = os.Remove(path); err != nil { |
325 | logger.Printf(logger.ERROR, "[store] can't remove file %s: %s", path, err.Error()) | 377 | logger.Printf(logger.ERROR, "[store] can't remove file %s: %s", path, err.Error()) |
326 | } | 378 | } |
diff --git a/src/gnunet/service/store/store_fs_meta.go b/src/gnunet/service/store/store_dht_meta.go index 5710286..9ebe387 100644 --- a/src/gnunet/service/store/store_fs_meta.go +++ b/src/gnunet/service/store/store_dht_meta.go | |||
@@ -21,7 +21,9 @@ package store | |||
21 | import ( | 21 | import ( |
22 | "database/sql" | 22 | "database/sql" |
23 | _ "embed" | 23 | _ "embed" |
24 | "gnunet/crypto" | ||
24 | "gnunet/enums" | 25 | "gnunet/enums" |
26 | "gnunet/service/dht/blocks" | ||
25 | "gnunet/util" | 27 | "gnunet/util" |
26 | "os" | 28 | "os" |
27 | ) | 29 | ) |
@@ -33,21 +35,30 @@ import ( | |||
33 | // FileMetadata holds information about a file (raw block data) | 35 | // FileMetadata holds information about a file (raw block data) |
34 | // and is stored in a SQL database for faster access. | 36 | // and is stored in a SQL database for faster access. |
35 | type FileMetadata struct { | 37 | type FileMetadata struct { |
36 | key []byte // storage key | 38 | key *crypto.HashCode // storage key |
37 | size uint64 // size of file | 39 | size uint64 // size of file |
38 | btype enums.BlockType // block type | 40 | btype enums.BlockType // block type |
41 | bhash *crypto.HashCode // block hash | ||
39 | stored util.AbsoluteTime // time added to store | 42 | stored util.AbsoluteTime // time added to store |
40 | expires util.AbsoluteTime // expiration time | 43 | expires util.AbsoluteTime // expiration time |
41 | lastUsed util.AbsoluteTime // time last used | 44 | lastUsed util.AbsoluteTime // time last used |
42 | usedCount uint64 // usage count | 45 | usedCount uint64 // usage count |
43 | } | 46 | } |
44 | 47 | ||
48 | // NewFileMetadata creates a new file metadata instance | ||
49 | func NewFileMetadata() *FileMetadata { | ||
50 | return &FileMetadata{ | ||
51 | key: crypto.NewHashCode(nil), | ||
52 | bhash: crypto.NewHashCode(nil), | ||
53 | } | ||
54 | } | ||
55 | |||
45 | //------------------------------------------------------------ | 56 | //------------------------------------------------------------ |
46 | // Metadata database: A SQLite3 database to hold metadata about | 57 | // Metadata database: A SQLite3 database to hold metadata about |
47 | // blocks in file storage | 58 | // blocks in file storage |
48 | //------------------------------------------------------------ | 59 | //------------------------------------------------------------ |
49 | 60 | ||
50 | //go:embed store_fs_meta.sql | 61 | //go:embed store_dht_meta.sql |
51 | var initScript []byte | 62 | var initScript []byte |
52 | 63 | ||
53 | // FileMetaDB is a SQLite3 database for block metadata | 64 | // FileMetaDB is a SQLite3 database for block metadata |
@@ -86,42 +97,67 @@ func OpenMetaDB(path string) (db *FileMetaDB, err error) { | |||
86 | // Store metadata in database: creates or updates a record for the metadata | 97 | // Store metadata in database: creates or updates a record for the metadata |
87 | // in the database; primary key is the query key | 98 | // in the database; primary key is the query key |
88 | func (db *FileMetaDB) Store(md *FileMetadata) (err error) { | 99 | func (db *FileMetaDB) Store(md *FileMetadata) (err error) { |
89 | sql := "replace into meta(qkey,btype,size,stored,expires,lastUsed,usedCount) values(?,?,?,?,?,?,?)" | 100 | sql := "replace into meta(qkey,btype,bhash,size,stored,expires,lastUsed,usedCount) values(?,?,?,?,?,?,?,?)" |
90 | _, err = db.conn.Exec(sql, md.key, md.btype, md.size, md.stored.Epoch(), md.expires.Epoch(), md.lastUsed.Epoch(), md.usedCount) | 101 | _, err = db.conn.Exec(sql, |
102 | md.key.Bits, md.btype, md.bhash.Bits, md.size, md.stored.Epoch(), | ||
103 | md.expires.Epoch(), md.lastUsed.Epoch(), md.usedCount) | ||
91 | return | 104 | return |
92 | } | 105 | } |
93 | 106 | ||
94 | // Get block metadata from database | 107 | // Get block metadata from database |
95 | func (db *FileMetaDB) Get(key []byte, btype enums.BlockType) (md *FileMetadata, err error) { | 108 | func (db *FileMetaDB) Get(query blocks.Query) (mds []*FileMetadata, err error) { |
96 | md = new(FileMetadata) | 109 | // select rows in database matching the query |
97 | md.key = util.Clone(key) | 110 | stmt := "select size,bhash,stored,expires,lastUsed,usedCount from meta where qkey=?" |
98 | md.btype = btype | 111 | btype := query.Type() |
99 | stmt := "select size,stored,expires,lastUsed,usedCount from meta where qkey=? and btype=?" | 112 | var rows *sql.Rows |
100 | row := db.conn.QueryRow(stmt, key, btype) | 113 | if btype == enums.BLOCK_TYPE_ANY { |
101 | var st, exp, lu uint64 | 114 | rows, err = db.conn.Query(stmt, query.Key().Bits) |
102 | if err = row.Scan(&md.size, &st, &exp, &lu, &md.usedCount); err != nil { | ||
103 | if err == sql.ErrNoRows { | ||
104 | md = nil | ||
105 | err = nil | ||
106 | } | ||
107 | } else { | 115 | } else { |
116 | rows, err = db.conn.Query(stmt+" and btype=?", query.Key().Bits, btype) | ||
117 | } | ||
118 | if err != nil { | ||
119 | return | ||
120 | } | ||
121 | // process results | ||
122 | for rows.Next() { | ||
123 | md := NewFileMetadata() | ||
124 | md.key = query.Key() | ||
125 | md.btype = btype | ||
126 | var st, exp, lu uint64 | ||
127 | if err = rows.Scan(&md.size, &md.bhash.Bits, &st, &exp, &lu, &md.usedCount); err != nil { | ||
128 | if err == sql.ErrNoRows { | ||
129 | md = nil | ||
130 | err = nil | ||
131 | } | ||
132 | return | ||
133 | } | ||
108 | md.stored.Val = st * 1000000 | 134 | md.stored.Val = st * 1000000 |
109 | md.expires.Val = exp * 1000000 | 135 | md.expires.Val = exp * 1000000 |
110 | md.lastUsed.Val = lu * 1000000 | 136 | md.lastUsed.Val = lu * 1000000 |
137 | mds = append(mds, md) | ||
111 | } | 138 | } |
112 | return | 139 | return |
113 | } | 140 | } |
114 | 141 | ||
115 | // Drop metadata for block from database | 142 | // Drop metadata for block from database |
116 | func (db *FileMetaDB) Drop(key []byte, btype enums.BlockType) error { | 143 | func (db *FileMetaDB) Drop(key []byte, btype enums.BlockType) (err error) { |
117 | _, err := db.conn.Exec("delete from meta where qkey=? and btype=?", key, btype) | 144 | if btype != enums.BLOCK_TYPE_ANY { |
118 | return err | 145 | _, err = db.conn.Exec("delete from meta where qkey=? and btype=?", key, btype) |
146 | } else { | ||
147 | _, err = db.conn.Exec("delete from meta where qkey=?", key) | ||
148 | } | ||
149 | return | ||
119 | } | 150 | } |
120 | 151 | ||
121 | // Used a block from store: increment usage count and lastUsed time. | 152 | // Used a block from store: increment usage count and lastUsed time. |
122 | func (db *FileMetaDB) Used(key []byte, btype enums.BlockType) error { | 153 | func (db *FileMetaDB) Used(key []byte, btype enums.BlockType) (err error) { |
123 | _, err := db.conn.Exec("update meta set usedCount=usedCount+1,lastUsed=unixepoch() where qkey=? and btype=?", key, btype) | 154 | stmt := "update meta set usedCount=usedCount+1,lastUsed=unixepoch() where qkey=?" |
124 | return err | 155 | if btype != enums.BLOCK_TYPE_ANY { |
156 | _, err = db.conn.Exec(stmt+" and btype=?", key, btype) | ||
157 | } else { | ||
158 | _, err = db.conn.Exec(stmt, key) | ||
159 | } | ||
160 | return | ||
125 | } | 161 | } |
126 | 162 | ||
127 | // Obsolete collects records from the meta database that are considered | 163 | // Obsolete collects records from the meta database that are considered |
@@ -150,15 +186,15 @@ func (db *FileMetaDB) Obsolete(n int) (removable []*FileMetadata, err error) { | |||
150 | 186 | ||
151 | // Traverse metadata records and call function on each record | 187 | // Traverse metadata records and call function on each record |
152 | func (db *FileMetaDB) Traverse(f func(*FileMetadata)) error { | 188 | func (db *FileMetaDB) Traverse(f func(*FileMetadata)) error { |
153 | sql := "select qkey,btype,size,stored,expires,lastUsed,usedCount from meta" | 189 | sql := "select qkey,btype,bhash,size,stored,expires,lastUsed,usedCount from meta" |
154 | rows, err := db.conn.Query(sql) | 190 | rows, err := db.conn.Query(sql) |
155 | if err != nil { | 191 | if err != nil { |
156 | return err | 192 | return err |
157 | } | 193 | } |
158 | md := new(FileMetadata) | 194 | md := NewFileMetadata() |
159 | for rows.Next() { | 195 | for rows.Next() { |
160 | var st, exp, lu uint64 | 196 | var st, exp, lu uint64 |
161 | err = rows.Scan(&md.key, &md.btype, &md.size, &st, &exp, &lu, &md.usedCount) | 197 | err = rows.Scan(&md.key.Bits, &md.btype, &md.bhash.Bits, &md.size, &st, &exp, &lu, &md.usedCount) |
162 | if err != nil { | 198 | if err != nil { |
163 | return err | 199 | return err |
164 | } | 200 | } |
diff --git a/src/gnunet/service/store/store_fs_meta.sql b/src/gnunet/service/store/store_dht_meta.sql index a2692ab..ba9fe1f 100644 --- a/src/gnunet/service/store/store_fs_meta.sql +++ b/src/gnunet/service/store/store_dht_meta.sql | |||
@@ -19,6 +19,7 @@ | |||
19 | create table meta ( | 19 | create table meta ( |
20 | qkey blob, -- key (SHA512 hash) | 20 | qkey blob, -- key (SHA512 hash) |
21 | btype integer, -- block type | 21 | btype integer, -- block type |
22 | bhash blob, -- block hash | ||
22 | size integer, -- size of file | 23 | size integer, -- size of file |
23 | stored integer, -- time added to store | 24 | stored integer, -- time added to store |
24 | expires integer, -- expiration time | 25 | expires integer, -- expiration time |
diff --git a/src/gnunet/service/store/dhtstore_test.go b/src/gnunet/service/store/store_dht_test.go index ba5fae2..9f80380 100644 --- a/src/gnunet/service/store/dhtstore_test.go +++ b/src/gnunet/service/store/store_dht_test.go | |||
@@ -52,21 +52,27 @@ func TestDHTFilesStore(t *testing.T) { | |||
52 | 52 | ||
53 | // create file store | 53 | // create file store |
54 | if _, err := os.Stat(path); err != nil { | 54 | if _, err := os.Stat(path); err != nil { |
55 | os.MkdirAll(path, 0755) | 55 | if err = os.MkdirAll(path, 0755); err != nil { |
56 | t.Fatal(err) | ||
57 | } | ||
56 | } | 58 | } |
57 | fs, err := NewFileStore(cfg) | 59 | fs, err := NewDHTStore(cfg) |
58 | if err != nil { | 60 | if err != nil { |
59 | t.Fatal(err) | 61 | t.Fatal(err) |
60 | } | 62 | } |
61 | // allocate keys | 63 | // allocate keys |
62 | keys := make([]blocks.Query, 0, fsNumBlocks) | 64 | keys := make([]blocks.Query, 0, fsNumBlocks) |
65 | // create result filter | ||
66 | rf := blocks.NewGenericResultFilter() | ||
63 | 67 | ||
64 | // First round: save blocks | 68 | // First round: save blocks |
65 | for i := 0; i < fsNumBlocks; i++ { | 69 | for i := 0; i < fsNumBlocks; i++ { |
66 | // generate random block | 70 | // generate random block |
67 | size := 1024 + rand.Intn(62000) | 71 | size := 1024 + rand.Intn(62000) //nolint:gosec // good enough for testing |
68 | buf := make([]byte, size) | 72 | buf := make([]byte, size) |
69 | rand.Read(buf) | 73 | if _, err = rand.Read(buf); err != nil { //nolint:gosec // good enough for testing |
74 | t.Fatal(err) | ||
75 | } | ||
70 | blk := blocks.NewGenericBlock(buf) | 76 | blk := blocks.NewGenericBlock(buf) |
71 | // generate associated key | 77 | // generate associated key |
72 | k := crypto.Hash(buf) | 78 | k := crypto.Hash(buf) |
@@ -86,11 +92,14 @@ func TestDHTFilesStore(t *testing.T) { | |||
86 | // Second round: retrieve blocks and check | 92 | // Second round: retrieve blocks and check |
87 | for i, key := range keys { | 93 | for i, key := range keys { |
88 | // get block | 94 | // get block |
89 | val, err := fs.Get(key) | 95 | vals, err := fs.Get("test", key, rf) |
90 | if err != nil { | 96 | if err != nil { |
91 | t.Fatalf("[%d] %s", i, err) | 97 | t.Fatalf("[%d] %s", i, err) |
92 | } | 98 | } |
93 | buf := val.Blk.Bytes() | 99 | if len(vals) != 1 { |
100 | t.Fatalf("[%d] only one result expected", i) | ||
101 | } | ||
102 | buf := vals[0].Blk.Bytes() | ||
94 | 103 | ||
95 | // re-create key | 104 | // re-create key |
96 | k := crypto.Hash(buf) | 105 | k := crypto.Hash(buf) |
diff --git a/src/gnunet/service/store/store.go b/src/gnunet/service/store/store_kv.go index ed55547..8bcb711 100644 --- a/src/gnunet/service/store/store.go +++ b/src/gnunet/service/store/store_kv.go | |||
@@ -24,8 +24,6 @@ import ( | |||
24 | _ "embed" // use embedded filesystem | 24 | _ "embed" // use embedded filesystem |
25 | "errors" | 25 | "errors" |
26 | "fmt" | 26 | "fmt" |
27 | "gnunet/service/dht/blocks" | ||
28 | "gnunet/service/dht/path" | ||
29 | "gnunet/util" | 27 | "gnunet/util" |
30 | 28 | ||
31 | redis "github.com/go-redis/redis/v8" | 29 | redis "github.com/go-redis/redis/v8" |
@@ -41,68 +39,24 @@ var ( | |||
41 | ) | 39 | ) |
42 | 40 | ||
43 | //------------------------------------------------------------ | 41 | //------------------------------------------------------------ |
44 | // Generic storage interface. Can be used for persistent or | ||
45 | // transient (caching) storage of key/value data. | ||
46 | //------------------------------------------------------------ | ||
47 | 42 | ||
48 | // Store is a key/value storage where the type of the key is either | 43 | // KVStore us a eneric key/value storage interface. Can be used for |
49 | // a SHA512 hash value or a string and the value is either a DHT | 44 | // persistent or transient (caching) storage of stringed key/value data. |
50 | // block or a string. It is possiblle to mix any key/value types, | 45 | type KVStore interface { |
51 | // but not used in this implementation. | ||
52 | type Store[K, V any] interface { | ||
53 | // Put value into storage under given key | 46 | // Put value into storage under given key |
54 | Put(key K, val V) error | 47 | Put(key string, val string) error |
55 | 48 | ||
56 | // Get value with given key from storage | 49 | // Get value with given key from storage |
57 | Get(key K) (V, error) | 50 | Get(key string) (string, error) |
58 | |||
59 | // GetApprox returns the best-matching value with given key from storage | ||
60 | // that is not excluded. | ||
61 | GetApprox(key K, excl func(V) bool) (V, any, error) | ||
62 | 51 | ||
63 | // List all store keys | 52 | // List all store keys |
64 | List() ([]K, error) | 53 | List() ([]string, error) |
65 | 54 | ||
66 | // Close store | 55 | // Close store |
67 | Close() error | 56 | Close() error |
68 | } | 57 | } |
69 | 58 | ||
70 | //------------------------------------------------------------ | 59 | //------------------------------------------------------------ |
71 | // Types for custom store requirements | ||
72 | //------------------------------------------------------------ | ||
73 | |||
74 | // DHTEntry to be stored/retrieved | ||
75 | type DHTEntry struct { | ||
76 | Blk blocks.Block | ||
77 | Path *path.Path | ||
78 | } | ||
79 | |||
80 | // DHTStore for DHT queries and blocks | ||
81 | type DHTStore Store[blocks.Query, *DHTEntry] | ||
82 | |||
83 | // KVStore for key/value string pairs | ||
84 | type KVStore Store[string, string] | ||
85 | |||
86 | //------------------------------------------------------------ | ||
87 | // NewDHTStore creates a new storage handler with given spec | ||
88 | // for use with DHT queries and blocks | ||
89 | func NewDHTStore(spec util.ParameterSet) (DHTStore, error) { | ||
90 | // get the mode parameter | ||
91 | mode, ok := util.GetParam[string](spec, "mode") | ||
92 | if !ok { | ||
93 | return nil, ErrStoreInvalidSpec | ||
94 | } | ||
95 | switch mode { | ||
96 | //------------------------------------------------------------------ | ||
97 | // File-base storage | ||
98 | //------------------------------------------------------------------ | ||
99 | case "file": | ||
100 | return NewFileStore(spec) | ||
101 | } | ||
102 | return nil, ErrStoreUnknown | ||
103 | } | ||
104 | |||
105 | //------------------------------------------------------------ | ||
106 | // NewKVStore creates a new storage handler with given spec | 60 | // NewKVStore creates a new storage handler with given spec |
107 | // for use with key/value string pairs. | 61 | // for use with key/value string pairs. |
108 | func NewKVStore(spec util.ParameterSet) (KVStore, error) { | 62 | func NewKVStore(spec util.ParameterSet) (KVStore, error) { |
@@ -178,11 +132,6 @@ func (s *RedisStore) Get(key string) (value string, err error) { | |||
178 | return s.client.Get(context.TODO(), key).Result() | 132 | return s.client.Get(context.TODO(), key).Result() |
179 | } | 133 | } |
180 | 134 | ||
181 | // GetApprox returns the best-matching value for given key from storage | ||
182 | func (s *RedisStore) GetApprox(key string, crit func(string) bool) (value string, vkey any, err error) { | ||
183 | return "", "", ErrStoreNoApprox | ||
184 | } | ||
185 | |||
186 | // List all keys in store | 135 | // List all keys in store |
187 | func (s *RedisStore) List() (keys []string, err error) { | 136 | func (s *RedisStore) List() (keys []string, err error) { |
188 | var ( | 137 | var ( |
@@ -255,11 +204,6 @@ func (s *SQLStore) Get(key string) (value string, err error) { | |||
255 | return | 204 | return |
256 | } | 205 | } |
257 | 206 | ||
258 | // GetApprox returns the best-matching value for given key from storage | ||
259 | func (s *SQLStore) GetApprox(key string, crit func(string) bool) (value string, vkey any, err error) { | ||
260 | return "", "", ErrStoreNoApprox | ||
261 | } | ||
262 | |||
263 | // List all keys in store | 207 | // List all keys in store |
264 | func (s *SQLStore) List() (keys []string, err error) { | 208 | func (s *SQLStore) List() (keys []string, err error) { |
265 | var ( | 209 | var ( |
diff --git a/src/gnunet/transport/responder.go b/src/gnunet/transport/responder.go index 7032c78..be958bf 100644 --- a/src/gnunet/transport/responder.go +++ b/src/gnunet/transport/responder.go | |||
@@ -33,8 +33,9 @@ type Responder interface { | |||
33 | // Handle outgoing message | 33 | // Handle outgoing message |
34 | Send(ctx context.Context, msg message.Message) error | 34 | Send(ctx context.Context, msg message.Message) error |
35 | 35 | ||
36 | // Receiver returns the receiving peer (string representation) | 36 | // Receiver returns the receiving peer. Returns nil if |
37 | Receiver() string | 37 | // this is a local responder (service.Connection) |
38 | Receiver() *util.PeerID | ||
38 | } | 39 | } |
39 | 40 | ||
40 | //---------------------------------------------------------------------- | 41 | //---------------------------------------------------------------------- |
@@ -55,6 +56,6 @@ func (r *TransportResponder) Send(ctx context.Context, msg message.Message) erro | |||
55 | } | 56 | } |
56 | 57 | ||
57 | // Receiver returns the receiving peer id | 58 | // Receiver returns the receiving peer id |
58 | func (r *TransportResponder) Receiver() string { | 59 | func (r *TransportResponder) Receiver() *util.PeerID { |
59 | return r.Peer.String() | 60 | return r.Peer |
60 | } | 61 | } |
diff --git a/src/gnunet/util/map.go b/src/gnunet/util/map.go index ab48089..2237188 100644 --- a/src/gnunet/util/map.go +++ b/src/gnunet/util/map.go | |||
@@ -152,7 +152,7 @@ func (m *Map[K, V]) GetRandom(pid int) (key K, value V, ok bool) { | |||
152 | 152 | ||
153 | ok = false | 153 | ok = false |
154 | if size := m.Size(); size > 0 { | 154 | if size := m.Size(); size > 0 { |
155 | idx := rand.Intn(size) | 155 | idx := rand.Intn(size) //nolint:gosec // good enough for selection |
156 | for key, value = range m.list { | 156 | for key, value = range m.list { |
157 | if idx == 0 { | 157 | if idx == 0 { |
158 | ok = true | 158 | ok = true |
diff --git a/src/gnunet/util/misc.go b/src/gnunet/util/misc.go index 562bdaa..a157ec4 100644 --- a/src/gnunet/util/misc.go +++ b/src/gnunet/util/misc.go | |||
@@ -24,6 +24,7 @@ import ( | |||
24 | "strings" | 24 | "strings" |
25 | 25 | ||
26 | "github.com/bfix/gospel/data" | 26 | "github.com/bfix/gospel/data" |
27 | "github.com/bfix/gospel/math" | ||
27 | ) | 28 | ) |
28 | 29 | ||
29 | //---------------------------------------------------------------------- | 30 | //---------------------------------------------------------------------- |
@@ -76,6 +77,16 @@ func GetParam[V any](params ParameterSet, key string) (i V, ok bool) { | |||
76 | // additional helpers | 77 | // additional helpers |
77 | //---------------------------------------------------------------------- | 78 | //---------------------------------------------------------------------- |
78 | 79 | ||
80 | // Distance returns the XOR distance between to byte arrays | ||
81 | func Distance(a, b []byte) *math.Int { | ||
82 | size := len(a) | ||
83 | d := make([]byte, size) | ||
84 | for i := range d { | ||
85 | d[i] = a[i] ^ b[i] | ||
86 | } | ||
87 | return math.NewIntFromBytes(d) | ||
88 | } | ||
89 | |||
79 | // StripPathRight returns a dot-separated path without | 90 | // StripPathRight returns a dot-separated path without |
80 | // its last (right-most) element. | 91 | // its last (right-most) element. |
81 | func StripPathRight(s string) string { | 92 | func StripPathRight(s string) string { |
diff --git a/src/gnunet/util/peer.go b/src/gnunet/util/peer.go index 62ae2e2..bcddad6 100644 --- a/src/gnunet/util/peer.go +++ b/src/gnunet/util/peer.go | |||
@@ -38,7 +38,7 @@ func NewPeerPublicKey(data []byte) *PeerPublicKey { | |||
38 | pk := new(PeerPublicKey) | 38 | pk := new(PeerPublicKey) |
39 | size := pk.Size() | 39 | size := pk.Size() |
40 | v := make([]byte, size) | 40 | v := make([]byte, size) |
41 | if data != nil && len(data) > 0 { | 41 | if len(data) > 0 { |
42 | if uint(len(data)) < size { | 42 | if uint(len(data)) < size { |
43 | CopyAlignedBlock(v, data) | 43 | CopyAlignedBlock(v, data) |
44 | } else { | 44 | } else { |
@@ -68,7 +68,7 @@ func (pk *PeerPublicKey) Verify(data []byte, sig *PeerSignature) (bool, error) { | |||
68 | // Peer identifier: | 68 | // Peer identifier: |
69 | //---------------------------------------------------------------------- | 69 | //---------------------------------------------------------------------- |
70 | 70 | ||
71 | // PeerID is a wrpped PeerPublicKey | 71 | // PeerID is a wrapped PeerPublicKey |
72 | type PeerID struct { | 72 | type PeerID struct { |
73 | PeerPublicKey | 73 | PeerPublicKey |
74 | } | 74 | } |
@@ -109,7 +109,7 @@ func NewPeerSignature(data []byte) *PeerSignature { | |||
109 | s := new(PeerSignature) | 109 | s := new(PeerSignature) |
110 | size := s.Size() | 110 | size := s.Size() |
111 | v := make([]byte, size) | 111 | v := make([]byte, size) |
112 | if data != nil && len(data) > 0 { | 112 | if len(data) > 0 { |
113 | if uint(len(data)) < size { | 113 | if uint(len(data)) < size { |
114 | CopyAlignedBlock(v, data) | 114 | CopyAlignedBlock(v, data) |
115 | } else { | 115 | } else { |