aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBernd Fix <brf@hoi-polloi.org>2022-08-15 14:02:05 +0200
committerBernd Fix <brf@hoi-polloi.org>2022-08-15 14:02:05 +0200
commitb62071330cd7e0445e89660d07b7aed098f80285 (patch)
tree85c1bbe055dff8bf027459121dc2fb958a846af4
parent835c8e8b45487c1276426034b5afb915853b6eb1 (diff)
downloadgnunet-go-b62071330cd7e0445e89660d07b7aed098f80285.tar.gz
gnunet-go-b62071330cd7e0445e89660d07b7aed098f80285.zip
Milestone 2+3 (NLnet funding)v0.1.30
-rw-r--r--src/gnunet/cmd/gnunet-service-dht-go/main.go2
-rw-r--r--src/gnunet/config/config.go9
-rw-r--r--src/gnunet/config/gnunet-config.json5
-rw-r--r--src/gnunet/core/hello_test.go4
-rw-r--r--src/gnunet/core/peer_test.go2
-rw-r--r--src/gnunet/crypto/gns.go4
-rw-r--r--src/gnunet/crypto/hash.go2
-rw-r--r--src/gnunet/go.mod4
-rw-r--r--src/gnunet/go.sum4
-rw-r--r--src/gnunet/message/msg_dht_p2p.go256
-rw-r--r--src/gnunet/service/connection.go4
-rw-r--r--src/gnunet/service/dht/blocks/filters.go16
-rw-r--r--src/gnunet/service/dht/blocks/handlers.go13
-rw-r--r--src/gnunet/service/dht/blocks/hello.go70
-rw-r--r--src/gnunet/service/dht/blocks/hello_test.go164
-rw-r--r--src/gnunet/service/dht/local.go58
-rw-r--r--src/gnunet/service/dht/messages.go191
-rw-r--r--src/gnunet/service/dht/module.go128
-rw-r--r--src/gnunet/service/dht/path/elements.go2
-rw-r--r--src/gnunet/service/dht/path/handling.go9
-rw-r--r--src/gnunet/service/dht/resulthandler.go45
-rw-r--r--src/gnunet/service/dht/routingtable.go38
-rw-r--r--src/gnunet/service/dht/routingtable_test.go10
-rw-r--r--src/gnunet/service/gns/module.go1
-rw-r--r--src/gnunet/service/namecache/module.go16
-rw-r--r--src/gnunet/service/store/store_dht.go (renamed from src/gnunet/service/store/store_fs.go)194
-rw-r--r--src/gnunet/service/store/store_dht_meta.go (renamed from src/gnunet/service/store/store_fs_meta.go)86
-rw-r--r--src/gnunet/service/store/store_dht_meta.sql (renamed from src/gnunet/service/store/store_fs_meta.sql)1
-rw-r--r--src/gnunet/service/store/store_dht_test.go (renamed from src/gnunet/service/store/dhtstore_test.go)21
-rw-r--r--src/gnunet/service/store/store_kv.go (renamed from src/gnunet/service/store/store.go)68
-rw-r--r--src/gnunet/transport/responder.go9
-rw-r--r--src/gnunet/util/map.go2
-rw-r--r--src/gnunet/util/misc.go11
-rw-r--r--src/gnunet/util/peer.go6
34 files changed, 1010 insertions, 445 deletions
diff --git a/src/gnunet/cmd/gnunet-service-dht-go/main.go b/src/gnunet/cmd/gnunet-service-dht-go/main.go
index d8244d9..28d2100 100644
--- a/src/gnunet/cmd/gnunet-service-dht-go/main.go
+++ b/src/gnunet/cmd/gnunet-service-dht-go/main.go
@@ -137,7 +137,7 @@ func main() {
137 // check for HELLO URL 137 // check for HELLO URL
138 if strings.HasPrefix(bs, "gnunet://hello/") { 138 if strings.HasPrefix(bs, "gnunet://hello/") {
139 var hb *blocks.HelloBlock 139 var hb *blocks.HelloBlock
140 if hb, err = blocks.ParseHelloURL(bs, true); err != nil { 140 if hb, err = blocks.ParseHelloBlockFromURL(bs, true); err != nil {
141 logger.Printf(logger.ERROR, "[dht] failed bootstrap HELLO URL %s: %s", bs, err.Error()) 141 logger.Printf(logger.ERROR, "[dht] failed bootstrap HELLO URL %s: %s", bs, err.Error())
142 continue 142 continue
143 } 143 }
diff --git a/src/gnunet/config/config.go b/src/gnunet/config/config.go
index cfbf705..897926f 100644
--- a/src/gnunet/config/config.go
+++ b/src/gnunet/config/config.go
@@ -90,9 +90,9 @@ type ServiceConfig struct {
90 90
91// GNSConfig contains parameters for the GNU Name System service 91// GNSConfig contains parameters for the GNU Name System service
92type GNSConfig struct { 92type GNSConfig struct {
93 Service *ServiceConfig `json:"service"` // socket for GNS service 93 Service *ServiceConfig `json:"service"` // socket for GNS service
94 DHTReplLevel int `json:"dhtReplLevel"` // DHT replication level 94 ReplLevel int `json:"replLevel"` // DHT replication level
95 MaxDepth int `json:"maxDepth"` // maximum recursion depth in resolution 95 MaxDepth int `json:"maxDepth"` // maximum recursion depth in resolution
96} 96}
97 97
98//---------------------------------------------------------------------- 98//----------------------------------------------------------------------
@@ -109,7 +109,8 @@ type DHTConfig struct {
109 109
110// RoutingConfig holds parameters for routing tables 110// RoutingConfig holds parameters for routing tables
111type RoutingConfig struct { 111type RoutingConfig struct {
112 PeerTTL int `json:"peerTTL"` // time-out for peers in table 112 PeerTTL int `json:"peerTTL"` // time-out for peers in table
113 ReplLevel int `json:"replLevel"` // replication level
113} 114}
114 115
115//---------------------------------------------------------------------- 116//----------------------------------------------------------------------
diff --git a/src/gnunet/config/gnunet-config.json b/src/gnunet/config/gnunet-config.json
index 27678ab..2052b33 100644
--- a/src/gnunet/config/gnunet-config.json
+++ b/src/gnunet/config/gnunet-config.json
@@ -37,7 +37,8 @@
37 "maxGB": 10 37 "maxGB": 10
38 }, 38 },
39 "routing": { 39 "routing": {
40 "peerTTL": 10800 40 "peerTTL": 10800,
41 "replLevel": 5
41 }, 42 },
42 "heartbeat": 900 43 "heartbeat": 900
43 }, 44 },
@@ -48,7 +49,7 @@
48 "perm": "0770" 49 "perm": "0770"
49 } 50 }
50 }, 51 },
51 "dhtReplLevel": 10, 52 "replLevel": 10,
52 "maxDepth": 250 53 "maxDepth": 250
53 }, 54 },
54 "namecache": { 55 "namecache": {
diff --git a/src/gnunet/core/hello_test.go b/src/gnunet/core/hello_test.go
index 0590c90..211250c 100644
--- a/src/gnunet/core/hello_test.go
+++ b/src/gnunet/core/hello_test.go
@@ -64,7 +64,7 @@ var (
64 64
65func TestHelloURLDirect(t *testing.T) { 65func TestHelloURLDirect(t *testing.T) {
66 for _, hu := range helloURL { 66 for _, hu := range helloURL {
67 if _, err := blocks.ParseHelloURL(hu, false); err != nil { 67 if _, err := blocks.ParseHelloBlockFromURL(hu, false); err != nil {
68 t.Fatal(err) 68 t.Fatal(err)
69 } 69 }
70 } 70 }
@@ -93,7 +93,7 @@ func TestHelloURL(t *testing.T) {
93 93
94 // convert to and from HELLO URL 94 // convert to and from HELLO URL
95 url1 := hd.URL() 95 url1 := hd.URL()
96 hd2, err := blocks.ParseHelloURL(url1, true) 96 hd2, err := blocks.ParseHelloBlockFromURL(url1, true)
97 if err != nil { 97 if err != nil {
98 t.Fatal(err) 98 t.Fatal(err)
99 } 99 }
diff --git a/src/gnunet/core/peer_test.go b/src/gnunet/core/peer_test.go
index 1be4d42..29fe801 100644
--- a/src/gnunet/core/peer_test.go
+++ b/src/gnunet/core/peer_test.go
@@ -67,7 +67,7 @@ func TestPeerHello(t *testing.T) {
67 // convert to URL and back 67 // convert to URL and back
68 u := h.URL() 68 u := h.URL()
69 t.Log(u) 69 t.Log(u)
70 h2, err := blocks.ParseHelloURL(u, true) 70 h2, err := blocks.ParseHelloBlockFromURL(u, true)
71 if err != nil { 71 if err != nil {
72 t.Fatal(err) 72 t.Fatal(err)
73 } 73 }
diff --git a/src/gnunet/crypto/gns.go b/src/gnunet/crypto/gns.go
index 27a52d3..39eb8c5 100644
--- a/src/gnunet/crypto/gns.go
+++ b/src/gnunet/crypto/gns.go
@@ -417,7 +417,9 @@ func NewZoneSignature(d []byte) (sig *ZoneSignature, err error) {
417 } 417 }
418 // set signature implementation 418 // set signature implementation
419 zs := impl.NewSignature() 419 zs := impl.NewSignature()
420 err = zs.Init(sig.Signature) 420 if err = zs.Init(sig.Signature); err != nil {
421 return
422 }
421 sig.impl = zs 423 sig.impl = zs
422 // set public key implementation 424 // set public key implementation
423 zk := impl.NewPublic() 425 zk := impl.NewPublic()
diff --git a/src/gnunet/crypto/hash.go b/src/gnunet/crypto/hash.go
index 49e57ff..a5716a0 100644
--- a/src/gnunet/crypto/hash.go
+++ b/src/gnunet/crypto/hash.go
@@ -58,7 +58,7 @@ func NewHashCode(data []byte) *HashCode {
58 hc := new(HashCode) 58 hc := new(HashCode)
59 size := hc.Size() 59 size := hc.Size()
60 v := make([]byte, size) 60 v := make([]byte, size)
61 if data != nil && len(data) > 0 { 61 if len(data) > 0 {
62 if uint(len(data)) < size { 62 if uint(len(data)) < size {
63 util.CopyAlignedBlock(v, data) 63 util.CopyAlignedBlock(v, data)
64 } else { 64 } else {
diff --git a/src/gnunet/go.mod b/src/gnunet/go.mod
index 8a5db53..9e01d7c 100644
--- a/src/gnunet/go.mod
+++ b/src/gnunet/go.mod
@@ -3,7 +3,7 @@ module gnunet
3go 1.18 3go 1.18
4 4
5require ( 5require (
6 github.com/bfix/gospel v1.2.17 6 github.com/bfix/gospel v1.2.18
7 github.com/go-redis/redis/v8 v8.11.5 7 github.com/go-redis/redis/v8 v8.11.5
8 github.com/go-sql-driver/mysql v1.6.0 8 github.com/go-sql-driver/mysql v1.6.0
9 github.com/gorilla/mux v1.8.0 9 github.com/gorilla/mux v1.8.0
@@ -24,4 +24,4 @@ require (
24 golang.org/x/tools v0.1.11 // indirect 24 golang.org/x/tools v0.1.11 // indirect
25) 25)
26 26
27// replace github.com/bfix/gospel v1.2.17 => ../gospel 27//replace github.com/bfix/gospel v1.2.18 => ../gospel
diff --git a/src/gnunet/go.sum b/src/gnunet/go.sum
index 22c7e09..2a2a36a 100644
--- a/src/gnunet/go.sum
+++ b/src/gnunet/go.sum
@@ -1,5 +1,5 @@
1github.com/bfix/gospel v1.2.17 h1:Stvm+OiCA2GIWIhI/HKc6uaLDMtrJNxXgw/g+v9witw= 1github.com/bfix/gospel v1.2.18 h1:X9hYudt5dvjYTGGmKC4T7qcLdb7ORblVD4kAC/ZYXdU=
2github.com/bfix/gospel v1.2.17/go.mod h1:cdu63bA9ZdfeDoqZ+vnWOcbY9Puwdzmf5DMxMGMznRI= 2github.com/bfix/gospel v1.2.18/go.mod h1:cdu63bA9ZdfeDoqZ+vnWOcbY9Puwdzmf5DMxMGMznRI=
3github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= 3github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
4github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= 4github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
5github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= 5github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
diff --git a/src/gnunet/message/msg_dht_p2p.go b/src/gnunet/message/msg_dht_p2p.go
index ccc0406..638ff16 100644
--- a/src/gnunet/message/msg_dht_p2p.go
+++ b/src/gnunet/message/msg_dht_p2p.go
@@ -113,20 +113,20 @@ func (m *DHTP2PGetMsg) Update(pf *blocks.PeerFilter, rf blocks.ResultFilter, hop
113 113
114// DHTP2PPutMsg wire layout 114// DHTP2PPutMsg wire layout
115type DHTP2PPutMsg struct { 115type DHTP2PPutMsg struct {
116 MsgSize uint16 `order:"big"` // total size of message 116 MsgSize uint16 `order:"big"` // total size of message
117 MsgType uint16 `order:"big"` // DHT_P2P_PUT (146) 117 MsgType uint16 `order:"big"` // DHT_P2P_PUT (146)
118 BType uint32 `order:"big"` // block type 118 BType uint32 `order:"big"` // block type
119 Flags uint16 `order:"big"` // processing flags 119 Flags uint16 `order:"big"` // processing flags
120 HopCount uint16 `order:"big"` // message hops 120 HopCount uint16 `order:"big"` // message hops
121 ReplLvl uint16 `order:"big"` // replication level 121 ReplLvl uint16 `order:"big"` // replication level
122 PathL uint16 `order:"big"` // path length 122 PathL uint16 `order:"big"` // path length
123 Expiration util.AbsoluteTime `` // expiration date 123 Expiration util.AbsoluteTime `` // expiration date
124 PeerFilter *blocks.PeerFilter `` // peer bloomfilter 124 PeerFilter *blocks.PeerFilter `` // peer bloomfilter
125 Key *crypto.HashCode `` // query key to block 125 Key *crypto.HashCode `` // query key to block
126 TruncOrigin []byte `size:"(PESize)"` // truncated origin (if TRUNCATED flag set) 126 TruncOrigin *util.PeerID `opt:"(IsUsed)"` // truncated origin (if TRUNCATED flag set)
127 PutPath []*path.Entry `size:"PathL"` // PUT path 127 PutPath []*path.Entry `size:"PathL"` // PUT path
128 LastSig []byte `size:"(PESize)"` // signature of last hop (if RECORD_ROUTE flag is set) 128 LastSig *util.PeerSignature `opt:"(IsUsed)"` // signature of last hop (if RECORD_ROUTE flag is set)
129 Block []byte `size:"*"` // block data 129 Block []byte `size:"*"` // block data
130} 130}
131 131
132// NewDHTP2PPutMsg creates an empty new DHTP2PPutMsg 132// NewDHTP2PPutMsg creates an empty new DHTP2PPutMsg
@@ -149,19 +149,15 @@ func NewDHTP2PPutMsg() *DHTP2PPutMsg {
149 } 149 }
150} 150}
151 151
152// PESize calculates field sizes based on flags and attributes 152// IsUsed returns true if an optional field is used
153func (m *DHTP2PPutMsg) PESize(field string) uint { 153func (m *DHTP2PPutMsg) IsUsed(field string) bool {
154 switch field { 154 switch field {
155 case "Origin": 155 case "Origin":
156 if m.Flags&enums.DHT_RO_TRUNCATED != 0 { 156 return m.Flags&enums.DHT_RO_TRUNCATED != 0
157 return util.NewPeerID(nil).Size()
158 }
159 case "LastSig": 157 case "LastSig":
160 if m.Flags&enums.DHT_RO_RECORD_ROUTE != 0 { 158 return m.Flags&enums.DHT_RO_RECORD_ROUTE != 0
161 return util.NewPeerSignature(nil).Size()
162 }
163 } 159 }
164 return 0 160 return false
165} 161}
166 162
167//---------------------------------------------------------------------- 163//----------------------------------------------------------------------
@@ -171,13 +167,13 @@ func (m *DHTP2PPutMsg) Update(p *path.Path, pf *blocks.PeerFilter, hop uint16) *
171 msg := NewDHTP2PPutMsg() 167 msg := NewDHTP2PPutMsg()
172 msg.Flags = m.Flags 168 msg.Flags = m.Flags
173 msg.HopCount = hop 169 msg.HopCount = hop
174 msg.PathL = m.PathL 170 msg.PathL = p.NumList
175 msg.Expiration = m.Expiration 171 msg.Expiration = m.Expiration
176 msg.PeerFilter = pf 172 msg.PeerFilter = pf
177 msg.Key = m.Key.Clone() 173 msg.Key = m.Key.Clone()
178 msg.TruncOrigin = m.TruncOrigin 174 msg.TruncOrigin = p.TruncOrigin
179 msg.PutPath = util.Clone(m.PutPath) 175 msg.PutPath = util.Clone(p.List)
180 msg.LastSig = m.LastSig 176 msg.LastSig = p.LastSig
181 msg.Block = util.Clone(m.Block) 177 msg.Block = util.Clone(m.Block)
182 msg.SetPath(p) 178 msg.SetPath(p)
183 return msg 179 return msg
@@ -199,11 +195,11 @@ func (m *DHTP2PPutMsg) Path(sender *util.PeerID) *path.Path {
199 195
200 // handle truncate origin 196 // handle truncate origin
201 if m.Flags&enums.DHT_RO_TRUNCATED == 1 { 197 if m.Flags&enums.DHT_RO_TRUNCATED == 1 {
202 if m.TruncOrigin == nil || len(m.TruncOrigin) == 0 { 198 if m.TruncOrigin == nil {
203 logger.Printf(logger.WARN, "[path] truncated but no origin - flag reset") 199 logger.Printf(logger.WARN, "[path] truncated but no origin - flag reset")
204 m.Flags &^= enums.DHT_RO_TRUNCATED 200 m.Flags &^= enums.DHT_RO_TRUNCATED
205 } else { 201 } else {
206 pth.TruncOrigin = util.NewPeerID(m.TruncOrigin) 202 pth.TruncOrigin = m.TruncOrigin
207 pth.Flags |= path.PathTruncated 203 pth.Flags |= path.PathTruncated
208 } 204 }
209 } 205 }
@@ -213,12 +209,12 @@ func (m *DHTP2PPutMsg) Path(sender *util.PeerID) *path.Path {
213 pth.NumList = uint16(len(pth.List)) 209 pth.NumList = uint16(len(pth.List))
214 210
215 // handle last hop signature 211 // handle last hop signature
216 if m.LastSig == nil || len(m.LastSig) == 0 { 212 if m.LastSig == nil {
217 logger.Printf(logger.WARN, "[path] - last hop signature missing - path reset") 213 logger.Printf(logger.WARN, "[path] - last hop signature missing - path reset")
218 return path.NewPath(crypto.Hash(m.Block), m.Expiration) 214 return path.NewPath(crypto.Hash(m.Block), m.Expiration)
219 } 215 }
220 pth.Flags |= path.PathLastHop 216 pth.Flags |= path.PathLastHop
221 pth.LastSig = util.NewPeerSignature(m.LastSig) 217 pth.LastSig = m.LastSig
222 pth.LastHop = sender 218 pth.LastHop = sender
223 return pth 219 return pth
224} 220}
@@ -235,7 +231,13 @@ func (m *DHTP2PPutMsg) SetPath(p *path.Path) {
235 if len(m.PutPath) > 0 { 231 if len(m.PutPath) > 0 {
236 pes = m.PutPath[0].Size() 232 pes = m.PutPath[0].Size()
237 } 233 }
238 oldSize := uint(len(m.PutPath))*pes + m.PESize("Origin") + m.PESize("LastSig") 234 oldSize := uint(len(m.PutPath)) * pes
235 if m.TruncOrigin != nil {
236 oldSize += m.TruncOrigin.Size()
237 }
238 if m.LastSig != nil {
239 oldSize += m.LastSig.Size()
240 }
239 // if no new path is defined,... 241 // if no new path is defined,...
240 if p == nil { 242 if p == nil {
241 // ... remove existing path 243 // ... remove existing path
@@ -254,12 +256,12 @@ func (m *DHTP2PPutMsg) SetPath(p *path.Path) {
254 if p.TruncOrigin != nil { 256 if p.TruncOrigin != nil {
255 // truncated path 257 // truncated path
256 m.Flags |= enums.DHT_RO_TRUNCATED 258 m.Flags |= enums.DHT_RO_TRUNCATED
257 m.TruncOrigin = p.TruncOrigin.Bytes() 259 m.TruncOrigin = p.TruncOrigin
258 } 260 }
259 m.PutPath = util.Clone(p.List) 261 m.PutPath = util.Clone(p.List)
260 m.PathL = uint16(len(m.PutPath)) 262 m.PathL = uint16(len(m.PutPath))
261 if p.LastSig != nil { 263 if p.LastSig != nil {
262 m.LastSig = p.LastSig.Bytes() 264 m.LastSig = p.LastSig
263 } 265 }
264} 266}
265 267
@@ -283,52 +285,174 @@ func (m *DHTP2PPutMsg) Header() *Header {
283 285
284// DHTP2PResultMsg wire layout 286// DHTP2PResultMsg wire layout
285type DHTP2PResultMsg struct { 287type DHTP2PResultMsg struct {
286 MsgSize uint16 `order:"big"` // total size of message 288 MsgSize uint16 `order:"big"` // total size of message
287 MsgType uint16 `order:"big"` // DHT_P2P_RESULT (148) 289 MsgType uint16 `order:"big"` // DHT_P2P_RESULT (148)
288 BType uint32 `order:"big"` // Block type of result 290 BType uint32 `order:"big"` // Block type of result
289 Reserved uint32 `order:"big"` // Reserved for further use 291 Flags uint32 `order:"big"` // Message flags
290 PutPathL uint16 `order:"big"` // size of PUTPATH field 292 PutPathL uint16 `order:"big"` // size of PUTPATH field
291 GetPathL uint16 `order:"big"` // size of GETPATH field 293 GetPathL uint16 `order:"big"` // size of GETPATH field
292 Expires util.AbsoluteTime `` // expiration date 294 Expires util.AbsoluteTime `` // expiration date
293 Query *crypto.HashCode `` // Query key for block 295 Query *crypto.HashCode `` // Query key for block
294 Origin []byte `size:"(PESize)"` // truncated origin (if TRUNCATED flag set) 296 TruncOrigin *util.PeerID `opt:"(IsUsed)"` // truncated origin (if TRUNCATED flag set)
295 PutPath []*path.Entry `size:"PutPathL"` // PUTPATH 297 PathList []*path.Entry `size:"(NumPath)"` // PATH
296 GetPath []*path.Entry `size:"GetPathL"` // GETPATH 298 LastSig *util.PeerSignature `size:"(IsUsed)"` // signature of last hop (if RECORD_ROUTE flag is set)
297 LastSig []byte `size:"(PESize)"` // signature of last hop (if RECORD_ROUTE flag is set) 299 Block []byte `size:"*"` // block data
298 Block []byte `size:"*"` // block data
299} 300}
300 301
301// NewDHTP2PResultMsg creates a new empty DHTP2PResultMsg 302// NewDHTP2PResultMsg creates a new empty DHTP2PResultMsg
302func NewDHTP2PResultMsg() *DHTP2PResultMsg { 303func NewDHTP2PResultMsg() *DHTP2PResultMsg {
303 return &DHTP2PResultMsg{ 304 return &DHTP2PResultMsg{
304 MsgSize: 88, // size of empty message 305 MsgSize: 88, // size of empty message
305 MsgType: DHT_P2P_RESULT, // DHT_P2P_RESULT (148) 306 MsgType: DHT_P2P_RESULT, // DHT_P2P_RESULT (148)
306 BType: uint32(enums.BLOCK_TYPE_ANY), // type of returned block 307 BType: uint32(enums.BLOCK_TYPE_ANY), // type of returned block
307 Origin: nil, // no truncated origin 308 TruncOrigin: nil, // no truncated origin
308 PutPathL: 0, // empty putpath 309 PutPathL: 0, // empty putpath
309 PutPath: nil, // -"- 310 GetPathL: 0, // empty getpath
310 GetPathL: 0, // empty getpath 311 PathList: nil, // empty path list (put+get)
311 GetPath: nil, // -"- 312 LastSig: nil, // no recorded route
312 LastSig: nil, // no recorded route 313 Block: nil, // empty block
313 Block: nil, // empty block
314 } 314 }
315} 315}
316 316
317// PESize calculates field sizes based on flags and attributes 317// IsUsed returns if an optional field is present
318func (m *DHTP2PResultMsg) PESize(field string) uint { 318func (m *DHTP2PResultMsg) IsUsed(field string) bool {
319 switch field { 319 switch field {
320 case "Origin": 320 case "Origin":
321 //if m.Flags&enums.DHT_RO_TRUNCATED != 0 { 321 return m.Flags&enums.DHT_RO_TRUNCATED != 0
322 return 32
323 //}
324 case "LastSig": 322 case "LastSig":
325 //if m.Flags&enums.DHT_RO_RECORD_ROUTE != 0 { 323 return m.Flags&enums.DHT_RO_RECORD_ROUTE != 0
326 return 64 324 }
327 //} 325 return false
326}
327
328// NumPath returns the total number of entries in path
329func (m *DHTP2PResultMsg) NumPath(field string) uint {
330 return uint(m.GetPathL + m.PutPathL)
331}
332
333//----------------------------------------------------------------------
334// Path handling (get/set path in message)
335//----------------------------------------------------------------------
336
337// Path returns the current path from message
338func (m *DHTP2PResultMsg) Path(sender *util.PeerID) *path.Path {
339 // create a "real" path list from message data
340 pth := path.NewPath(crypto.Hash(m.Block), m.Expires)
341
342 // return empty path if recording is switched off
343 if m.Flags&enums.DHT_RO_RECORD_ROUTE == 0 {
344 return pth
345 }
346 // handle truncate origin
347 if m.Flags&enums.DHT_RO_TRUNCATED == 1 {
348 if m.TruncOrigin == nil {
349 logger.Printf(logger.WARN, "[path] truncated but no origin - flag reset")
350 m.Flags &^= enums.DHT_RO_TRUNCATED
351 } else {
352 pth.TruncOrigin = m.TruncOrigin
353 pth.Flags |= path.PathTruncated
354 }
355 }
356 // copy path elements
357 pth.List = util.Clone(m.PathList)
358 pth.NumList = uint16(len(pth.List))
359
360 // check consistent length values; adjust if mismatched
361 if m.GetPathL+m.PutPathL != pth.NumList {
362 logger.Printf(logger.WARN, "[path] Inconsistent PATH length -- adjusting...")
363 if sp := pth.NumList - m.PutPathL; sp > 0 {
364 pth.SplitPos = sp
365 } else {
366 pth.SplitPos = 0
367 }
368 } else {
369 pth.SplitPos = pth.NumList - m.PutPathL
370 }
371 // handle last hop signature
372 if m.LastSig == nil {
373 logger.Printf(logger.WARN, "[path] - last hop signature missing - path reset")
374 return path.NewPath(crypto.Hash(m.Block), m.Expires)
375 }
376 pth.Flags |= path.PathLastHop
377 pth.LastSig = m.LastSig
378 pth.LastHop = sender
379 return pth
380}
381
382// Set path in message; corrects the message size accordingly
383func (m *DHTP2PResultMsg) SetPath(p *path.Path) {
384
385 // return if recording is switched off (don't touch path)
386 if m.Flags&enums.DHT_RO_RECORD_ROUTE == 0 {
387 return
388 }
389 // compute old path size
390 var pes uint
391 if len(m.PathList) > 0 {
392 pes = m.PathList[0].Size()
393 }
394 oldSize := uint(len(m.PathList)) * pes
395 if m.TruncOrigin != nil {
396 oldSize += m.TruncOrigin.Size()
397 }
398 if m.LastSig != nil {
399 oldSize += m.LastSig.Size()
400 }
401 // if no new path is defined,...
402 if p == nil {
403 // ... remove existing path
404 m.TruncOrigin = nil
405 m.PathList = make([]*path.Entry, 0)
406 m.LastSig = nil
407 m.GetPathL = 0
408 m.PutPathL = 0
409 m.Flags &^= enums.DHT_RO_TRUNCATED
410 m.MsgSize -= uint16(oldSize)
411 return
412 }
413 // adjust message size
414 m.MsgSize += uint16(p.Size() - oldSize)
415
416 // transfer path data
417 if p.TruncOrigin != nil {
418 // truncated path
419 m.Flags |= enums.DHT_RO_TRUNCATED
420 m.TruncOrigin = p.TruncOrigin
421 }
422 m.PathList = util.Clone(p.List)
423 m.PutPathL = p.SplitPos
424 m.GetPathL = p.NumList - p.SplitPos
425 if p.LastSig != nil {
426 m.LastSig = p.LastSig
427 }
428}
429
430//----------------------------------------------------------------------
431
432// Update message (forwarding)
433func (m *DHTP2PResultMsg) Update(pth *path.Path) *DHTP2PResultMsg {
434 // clone old message
435 msg := &DHTP2PResultMsg{
436 MsgSize: m.MsgSize,
437 MsgType: m.MsgType,
438 BType: m.BType,
439 Flags: m.Flags,
440 PutPathL: m.PutPathL,
441 GetPathL: m.GetPathL,
442 Expires: m.Expires,
443 Query: m.Query.Clone(),
444 TruncOrigin: m.TruncOrigin,
445 PathList: util.Clone(m.PathList),
446 LastSig: m.LastSig,
447 Block: util.Clone(m.Block),
328 } 448 }
329 return 0 449 // set new path
450 msg.SetPath(pth)
451 return msg
330} 452}
331 453
454//----------------------------------------------------------------------
455
332// String returns a human-readable representation of the message. 456// String returns a human-readable representation of the message.
333func (m *DHTP2PResultMsg) String() string { 457func (m *DHTP2PResultMsg) String() string {
334 return fmt.Sprintf("DHTP2PResultMsg{btype=%s,putl=%d,getl=%d}", 458 return fmt.Sprintf("DHTP2PResultMsg{btype=%s,putl=%d,getl=%d}",
diff --git a/src/gnunet/service/connection.go b/src/gnunet/service/connection.go
index d443160..fbc24d0 100644
--- a/src/gnunet/service/connection.go
+++ b/src/gnunet/service/connection.go
@@ -138,8 +138,8 @@ func (s *Connection) Receive(ctx context.Context) (message.Message, error) {
138} 138}
139 139
140// Receiver returns the receiving client (string representation) 140// Receiver returns the receiving client (string representation)
141func (s *Connection) Receiver() string { 141func (s *Connection) Receiver() *util.PeerID {
142 return fmt.Sprintf("uds:%d", s.id) 142 return nil
143} 143}
144 144
145//---------------------------------------------------------------------- 145//----------------------------------------------------------------------
diff --git a/src/gnunet/service/dht/blocks/filters.go b/src/gnunet/service/dht/blocks/filters.go
index 26af0b3..f617479 100644
--- a/src/gnunet/service/dht/blocks/filters.go
+++ b/src/gnunet/service/dht/blocks/filters.go
@@ -22,6 +22,7 @@ import (
22 "bytes" 22 "bytes"
23 "crypto/sha512" 23 "crypto/sha512"
24 "encoding/binary" 24 "encoding/binary"
25 "gnunet/crypto"
25 "gnunet/util" 26 "gnunet/util"
26 27
27 "github.com/bfix/gospel/logger" 28 "github.com/bfix/gospel/logger"
@@ -105,9 +106,12 @@ type ResultFilter interface {
105 // Add entry to filter 106 // Add entry to filter
106 Add(Block) 107 Add(Block)
107 108
108 // Contains returns true if entry is filtered 109 // Contains returns true if block is filtered
109 Contains(Block) bool 110 Contains(Block) bool
110 111
112 // ContainsHash returns true if block hash is filtered
113 ContainsHash(bh *crypto.HashCode) bool
114
111 // Bytes returns the binary representation of a result filter 115 // Bytes returns the binary representation of a result filter
112 Bytes() []byte 116 Bytes() []byte
113 117
@@ -140,9 +144,15 @@ func (rf *GenericResultFilter) Add(b Block) {
140 rf.bf.Add(b.Bytes()) 144 rf.bf.Add(b.Bytes())
141} 145}
142 146
143// Contains returns true if entry (binary representation) is filtered 147// Contains returns true if a block is filtered
144func (rf *GenericResultFilter) Contains(b Block) bool { 148func (rf *GenericResultFilter) Contains(b Block) bool {
145 return rf.bf.Contains(b.Bytes()) 149 bh := crypto.Hash(b.Bytes())
150 return rf.bf.Contains(bh.Bits)
151}
152
153// ContainsHash returns true if a block hash is filtered
154func (rf *GenericResultFilter) ContainsHash(bh *crypto.HashCode) bool {
155 return rf.bf.Contains(bh.Bits)
146} 156}
147 157
148// Bytes returns the binary representation of a result filter 158// Bytes returns the binary representation of a result filter
diff --git a/src/gnunet/service/dht/blocks/handlers.go b/src/gnunet/service/dht/blocks/handlers.go
index 9df3867..c166504 100644
--- a/src/gnunet/service/dht/blocks/handlers.go
+++ b/src/gnunet/service/dht/blocks/handlers.go
@@ -29,15 +29,22 @@ type BlockHandler interface {
29 // Parse a block instance from binary data 29 // Parse a block instance from binary data
30 ParseBlock(buf []byte) (Block, error) 30 ParseBlock(buf []byte) (Block, error)
31 31
32 // ValidateBlockQuery is used to evaluate the request for a block as part of 32 // ValidateBlockQuery is used to evaluate the request for a block as part
33 // DHT-P2P-GET processing. Here, the block payload is unknown, but if possible 33 // of DHT-P2P-GET processing. Here, the block payload is unknown, but if
34 // the XQuery and Key SHOULD be verified. 34 // possible the XQuery and Key SHOULD be verified.
35 ValidateBlockQuery(key *crypto.HashCode, xquery []byte) bool 35 ValidateBlockQuery(key *crypto.HashCode, xquery []byte) bool
36 36
37 // ValidateBlockKey returns true if the block key is the same as the 37 // ValidateBlockKey returns true if the block key is the same as the
38 // query key used to access the block. 38 // query key used to access the block.
39 ValidateBlockKey(b Block, key *crypto.HashCode) bool 39 ValidateBlockKey(b Block, key *crypto.HashCode) bool
40 40
41 // DeriveBlockKey is used to synthesize the block key from the block
42 // payload as part of PutMessage and ResultMessage processing. The special
43 // return value of 'nil' implies that this block type does not permit
44 // deriving the key from the block. A Key may be returned for a block that
45 // is ill-formed.
46 DeriveBlockKey(b Block) *crypto.HashCode
47
41 // ValidateBlockStoreRequest is used to evaluate a block payload as part of 48 // ValidateBlockStoreRequest is used to evaluate a block payload as part of
42 // PutMessage and ResultMessage processing. 49 // PutMessage and ResultMessage processing.
43 ValidateBlockStoreRequest(b Block) bool 50 ValidateBlockStoreRequest(b Block) bool
diff --git a/src/gnunet/service/dht/blocks/hello.go b/src/gnunet/service/dht/blocks/hello.go
index 32f803f..082e914 100644
--- a/src/gnunet/service/dht/blocks/hello.go
+++ b/src/gnunet/service/dht/blocks/hello.go
@@ -30,6 +30,7 @@ import (
30 "net/url" 30 "net/url"
31 "strconv" 31 "strconv"
32 "strings" 32 "strings"
33 "time"
33 34
34 "github.com/bfix/gospel/crypto/ed25519" 35 "github.com/bfix/gospel/crypto/ed25519"
35 "github.com/bfix/gospel/data" 36 "github.com/bfix/gospel/data"
@@ -63,8 +64,21 @@ type HelloBlock struct {
63 addrs []*util.Address // cooked address data 64 addrs []*util.Address // cooked address data
64} 65}
65 66
67// NewHelloBlock initializes a new HELLO block (unsigned)
68func NewHelloBlock(peer *util.PeerID, addrs []*util.Address, ttl time.Duration) *HelloBlock {
69 hb := new(HelloBlock)
70 hb.PeerID = peer
71 // limit expiration to second precision (HELLO-URL compatibility)
72 hb.Expires = util.NewAbsoluteTimeEpoch(uint64(time.Now().Add(ttl).Unix()))
73 hb.SetAddresses(addrs)
74 return hb
75}
76
66// SetAddresses adds a bulk of addresses for this HELLO block. 77// SetAddresses adds a bulk of addresses for this HELLO block.
67func (h *HelloBlock) SetAddresses(a []*util.Address) { 78func (h *HelloBlock) SetAddresses(a []*util.Address) {
79 if len(a) == 0 {
80 return
81 }
68 h.addrs = util.Clone(a) 82 h.addrs = util.Clone(a)
69 if err := h.finalize(); err != nil { 83 if err := h.finalize(); err != nil {
70 logger.Printf(logger.ERROR, "[HelloBlock.SetAddresses] failed: %s", err.Error()) 84 logger.Printf(logger.ERROR, "[HelloBlock.SetAddresses] failed: %s", err.Error())
@@ -76,10 +90,10 @@ func (h *HelloBlock) Addresses() []*util.Address {
76 return util.Clone(h.addrs) 90 return util.Clone(h.addrs)
77} 91}
78 92
79// ParseHelloURL parses a HELLO URL of the following form: 93// ParseHelloBlockFromURL parses a HELLO URL of the following form:
80// gnunet://hello/<PeerID>/<signature>/<expire>?<addrs> 94// gnunet://hello/<PeerID>/<signature>/<expire>?<addrs>
81// The addresses are encoded. 95// The addresses are encoded.
82func ParseHelloURL(u string, checkExpiry bool) (h *HelloBlock, err error) { 96func ParseHelloBlockFromURL(u string, checkExpiry bool) (h *HelloBlock, err error) {
83 // check and trim prefix 97 // check and trim prefix
84 if !strings.HasPrefix(u, helloPrefix) { 98 if !strings.HasPrefix(u, helloPrefix) {
85 err = fmt.Errorf("invalid HELLO-URL prefix: '%s'", u) 99 err = fmt.Errorf("invalid HELLO-URL prefix: '%s'", u)
@@ -158,8 +172,8 @@ func ParseHelloURL(u string, checkExpiry bool) (h *HelloBlock, err error) {
158 return 172 return
159} 173}
160 174
161// ParseHelloFromBytes converts a byte array into a HelloBlock instance. 175// ParseHelloBlockFromBytes converts a byte array into a HelloBlock instance.
162func ParseHelloFromBytes(buf []byte) (h *HelloBlock, err error) { 176func ParseHelloBlockFromBytes(buf []byte) (h *HelloBlock, err error) {
163 h = new(HelloBlock) 177 h = new(HelloBlock)
164 if err = data.Unmarshal(h, buf); err == nil { 178 if err = data.Unmarshal(h, buf); err == nil {
165 err = h.finalize() 179 err = h.finalize()
@@ -289,7 +303,7 @@ func (h *HelloBlock) SignedData() []byte {
289 err := binary.Write(buf, binary.BigEndian, size) 303 err := binary.Write(buf, binary.BigEndian, size)
290 if err == nil { 304 if err == nil {
291 if err = binary.Write(buf, binary.BigEndian, purpose); err == nil { 305 if err = binary.Write(buf, binary.BigEndian, purpose); err == nil {
292 if err = binary.Write(buf, binary.BigEndian, h.Expires.Epoch()*1000000); err == nil { 306 if err = binary.Write(buf, binary.BigEndian, h.Expires /*.Epoch()*1000000*/); err == nil {
293 if n, err = buf.Write(hAddr[:]); err == nil { 307 if n, err = buf.Write(hAddr[:]); err == nil {
294 if n != len(hAddr[:]) { 308 if n != len(hAddr[:]) {
295 err = errors.New("signed data size mismatch") 309 err = errors.New("signed data size mismatch")
@@ -313,7 +327,7 @@ type HelloBlockHandler struct{}
313 327
314// Parse a block instance from binary data 328// Parse a block instance from binary data
315func (bh *HelloBlockHandler) ParseBlock(buf []byte) (Block, error) { 329func (bh *HelloBlockHandler) ParseBlock(buf []byte) (Block, error) {
316 return ParseHelloFromBytes(buf) 330 return ParseHelloBlockFromBytes(buf)
317} 331}
318 332
319// ValidateHelloBlockQuery validates query parameters for a 333// ValidateHelloBlockQuery validates query parameters for a
@@ -326,20 +340,49 @@ func (bh *HelloBlockHandler) ValidateBlockQuery(key *crypto.HashCode, xquery []b
326// ValidateBlockKey returns true if the block key is the same as the 340// ValidateBlockKey returns true if the block key is the same as the
327// query key used to access the block. 341// query key used to access the block.
328func (bh *HelloBlockHandler) ValidateBlockKey(b Block, key *crypto.HashCode) bool { 342func (bh *HelloBlockHandler) ValidateBlockKey(b Block, key *crypto.HashCode) bool {
343 // check for matching keys
344 bkey := bh.DeriveBlockKey(b)
345 if bkey == nil {
346 logger.Println(logger.WARN, "[HelloHdlr] ValidateBlockKey: not a HELLO block")
347 return false
348 }
349 return key.Equals(bkey)
350}
351
352// DeriveBlockKey is used to synthesize the block key from the block
353// payload as part of PutMessage and ResultMessage processing. The special
354// return value of 'nil' implies that this block type does not permit
355// deriving the key from the block. A Key may be returned for a block that
356// is ill-formed.
357func (bh *HelloBlockHandler) DeriveBlockKey(b Block) *crypto.HashCode {
358 // check for correct type
329 hb, ok := b.(*HelloBlock) 359 hb, ok := b.(*HelloBlock)
330 if !ok { 360 if !ok {
331 return false 361 logger.Println(logger.WARN, "[HelloHdlr] DeriveBlockKey: not a HELLO block")
362 return nil
332 } 363 }
333 // key must be the hash of the peer id 364 // key must be the hash of the peer id
334 bkey := crypto.Hash(hb.PeerID.Bytes()) 365 return crypto.Hash(hb.PeerID.Bytes())
335 return key.Equals(bkey)
336} 366}
337 367
338// ValidateBlockStoreRequest is used to evaluate a block payload as part of 368// ValidateBlockStoreRequest is used to evaluate a block payload as part of
339// PutMessage and ResultMessage processing. 369// PutMessage and ResultMessage processing.
370// To validate a block store request is to verify the EdDSA SIGNATURE over
371// the hashed ADDRESSES against the public key from the peer ID field. If the
372// signature is valid true is returned.
340func (bh *HelloBlockHandler) ValidateBlockStoreRequest(b Block) bool { 373func (bh *HelloBlockHandler) ValidateBlockStoreRequest(b Block) bool {
341 // TODO: verify block payload 374 // check for correct type
342 return true 375 hb, ok := b.(*HelloBlock)
376 if !ok {
377 logger.Println(logger.WARN, "[HelloHdlr] ValidateBlockStoreRequest: not a HELLO block")
378 return false
379 }
380 // verify signature
381 ok, err := hb.Verify()
382 if err != nil {
383 ok = false
384 }
385 return ok
343} 386}
344 387
345// SetupResultFilter is used to setup an empty result filter. The arguments 388// SetupResultFilter is used to setup an empty result filter. The arguments
@@ -428,6 +471,11 @@ func (rf *HelloResultFilter) Contains(b Block) bool {
428 return false 471 return false
429} 472}
430 473
474// ContainsHash checks if a block hash is contained in the result filter
475func (rf *HelloResultFilter) ContainsHash(bh *crypto.HashCode) bool {
476 return rf.bf.Contains(bh.Bits)
477}
478
431// Bytes returns a binary representation of a HELLO result filter 479// Bytes returns a binary representation of a HELLO result filter
432func (rf *HelloResultFilter) Bytes() []byte { 480func (rf *HelloResultFilter) Bytes() []byte {
433 return rf.bf.Bytes() 481 return rf.bf.Bytes()
diff --git a/src/gnunet/service/dht/blocks/hello_test.go b/src/gnunet/service/dht/blocks/hello_test.go
new file mode 100644
index 0000000..deb8041
--- /dev/null
+++ b/src/gnunet/service/dht/blocks/hello_test.go
@@ -0,0 +1,164 @@
1// This file is part of gnunet-go, a GNUnet-implementation in Golang.
2// Copyright (C) 2019-2022 Bernd Fix >Y<
3//
4// gnunet-go is free software: you can redistribute it and/or modify it
5// under the terms of the GNU Affero General Public License as published
6// by the Free Software Foundation, either version 3 of the License,
7// or (at your option) any later version.
8//
9// gnunet-go is distributed in the hope that it will be useful, but
10// WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12// Affero General Public License for more details.
13//
14// You should have received a copy of the GNU Affero General Public License
15// along with this program. If not, see <http://www.gnu.org/licenses/>.
16//
17// SPDX-License-Identifier: AGPL3.0-or-later
18
19package blocks
20
21import (
22 "bytes"
23 "encoding/base64"
24 "encoding/hex"
25 "gnunet/util"
26 "strings"
27 "testing"
28 "time"
29
30 "github.com/bfix/gospel/crypto/ed25519"
31 "github.com/bfix/gospel/data"
32)
33
34var (
35 block *HelloBlock
36 sk *ed25519.PrivateKey
37)
38
39func setup(t *testing.T) {
40 t.Helper()
41
42 // check for initialized values
43 if block != nil {
44 return
45 }
46 // generate keys
47 var pk *ed25519.PublicKey
48 pk, sk = ed25519.NewKeypair()
49 peer := util.NewPeerID(pk.Bytes())
50
51 // set addresses
52 addrs := []string{
53 "ip+udp://172.17.0.6:2086",
54 "ip+udp://245.23.42.67:2086",
55 }
56 addrList := make([]*util.Address, 0)
57 for _, addr := range addrs {
58 frag := strings.Split(addr, "://")
59 e := util.NewAddress(frag[0], frag[1])
60 if e == nil {
61 t.Fatal("invalid address: " + addr)
62 }
63 addrList = append(addrList, e)
64 }
65
66 // create new HELLO block
67 block = NewHelloBlock(peer, addrList, time.Hour)
68
69 // sign block.
70 sig, err := sk.EdSign(block.SignedData())
71 if err != nil {
72 t.Fatal(err)
73 }
74 block.Signature = util.NewPeerSignature(sig.Bytes())
75}
76
77func TestHelloVerify(t *testing.T) {
78 setup(t)
79
80 // verify signature
81 ok, err := block.Verify()
82 if err != nil {
83 t.Fatal(err)
84 }
85 if !ok {
86 t.Fatal("HELLO verify failed")
87 }
88}
89
90func TestHelloURL(t *testing.T) {
91 setup(t)
92
93 // create HELLO URL
94 url := block.URL()
95 t.Log(url)
96
97 // read back
98 tblk, err := ParseHelloBlockFromURL(url, true)
99 if err != nil {
100 t.Fatal(err)
101 }
102 // verify identical blocks
103 if !bytes.Equal(tblk.Bytes(), block.Bytes()) {
104 t.Log(hex.EncodeToString(tblk.Bytes()))
105 t.Log(hex.EncodeToString(block.Bytes()))
106 t.Fatal("URL readback failed")
107 }
108}
109
110func TestHelloBytes(t *testing.T) {
111 setup(t)
112
113 buf := block.Bytes()
114 tblk, err := ParseHelloBlockFromBytes(buf)
115 if err != nil {
116 t.Fatal(err)
117 }
118 // verify identical blocks
119 if !bytes.Equal(tblk.Bytes(), block.Bytes()) {
120 t.Log(hex.EncodeToString(tblk.Bytes()))
121 t.Log(hex.EncodeToString(block.Bytes()))
122 t.Fatal("Bytes readback failed")
123 }
124}
125
126func TestHelloDebug(t *testing.T) {
127 blkData := "QKObXJUbnnghRh9McDDjHaB9IIL6MhhEiQHc8VfO3QMABeZZJJhsA" +
128 "GlwK3VkcDovLzEyNy4wLjAuMToxMDAwMQBpcCt1ZHA6Ly8xNzIuMT" +
129 "cuMC40OjEwMDAxAGlwK3VkcDovL1s6OmZmZmY6MTcyLjE3LjAuNF06MTAwMDEA"
130 buf, err := base64.RawStdEncoding.DecodeString(blkData)
131 if err != nil {
132 t.Fatal(err)
133 }
134 hb, err := ParseHelloBlockFromBytes(buf)
135 if err != nil {
136 t.Fatal(err)
137 }
138 ok, err := hb.Verify()
139 if err != nil {
140 t.Fatal(err)
141 }
142 if !ok {
143 // trace problem
144 t.Log("Block: " + hex.EncodeToString(buf))
145 t.Log("PeerID: " + hb.PeerID.String())
146 t.Log(" -> " + hex.EncodeToString(hb.PeerID.Bytes()))
147 t.Logf("Expire: %d", hb.Expires.Val)
148 t.Logf(" -> " + hb.Expires.String())
149 var exp util.AbsoluteTime
150 if err = data.Unmarshal(&exp, buf[32:40]); err != nil {
151 t.Fatal(err)
152 }
153 t.Logf(" -> " + exp.String())
154 t.Log("AddrBin: " + hex.EncodeToString(hb.AddrBin))
155 sd := hb.SignedData()
156 t.Log("SignedData: " + hex.EncodeToString(sd))
157 t.Log("Addresses:")
158 for _, addr := range hb.Addresses() {
159 t.Logf("* " + addr.URI())
160 }
161 t.Log("Signature: " + hex.EncodeToString(hb.Signature.Bytes()))
162 t.Fatal("debug HELLO verify failed")
163 }
164}
diff --git a/src/gnunet/service/dht/local.go b/src/gnunet/service/dht/local.go
index 1e6b100..ba76892 100644
--- a/src/gnunet/service/dht/local.go
+++ b/src/gnunet/service/dht/local.go
@@ -19,7 +19,6 @@
19package dht 19package dht
20 20
21import ( 21import (
22 "errors"
23 "gnunet/enums" 22 "gnunet/enums"
24 "gnunet/service/dht/blocks" 23 "gnunet/service/dht/blocks"
25 "gnunet/service/store" 24 "gnunet/service/store"
@@ -28,58 +27,37 @@ import (
28 "github.com/bfix/gospel/math" 27 "github.com/bfix/gospel/math"
29) 28)
30 29
31// getHelloCache tries to find the requested HELLO block in the HELLO cache 30// lookupHelloCache tries to find the requested HELLO block in the HELLO cache
32func (m *Module) getHelloCache(label string, addr *PeerAddress, rf blocks.ResultFilter) (entry *store.DHTEntry, dist *math.Int) { 31func (m *Module) lookupHelloCache(label string, addr *PeerAddress, rf blocks.ResultFilter, approx bool) (results []*store.DHTResult) {
33 logger.Printf(logger.DBG, "[%s] GET message for HELLO: check cache", label) 32 logger.Printf(logger.DBG, "[%s] GET message for HELLO: check cache", label)
34 // find best cached HELLO 33 // find best cached HELLO
35 var block blocks.Block 34 return m.rtable.LookupHello(addr, rf, approx)
36 block, dist = m.rtable.BestHello(addr, rf)
37
38 // if block is filtered, skip it
39 if block != nil {
40 if !rf.Contains(block) {
41 entry = &store.DHTEntry{Blk: block}
42 } else {
43 logger.Printf(logger.DBG, "[%s] GET message for HELLO: matching DHT block is filtered", label)
44 entry = nil
45 dist = nil
46 }
47 }
48 return
49} 35}
50 36
51// getLocalStorage tries to find the requested block in local storage 37// getLocalStorage tries to find the requested block in local storage
52func (m *Module) getLocalStorage(label string, query blocks.Query, rf blocks.ResultFilter) (entry *store.DHTEntry, dist *math.Int, err error) { 38func (m *Module) getLocalStorage(label string, query blocks.Query, rf blocks.ResultFilter) (results []*store.DHTResult, err error) {
53 39
54 // query DHT store for exact match (9.4.3.3c) 40 // query DHT store for exact matches (9.4.3.3c)
55 if entry, err = m.store.Get(query); err != nil { 41 var entries []*store.DHTEntry
42 if entries, err = m.store.Get(label, query, rf); err != nil {
56 logger.Printf(logger.ERROR, "[%s] Failed to get DHT block from storage: %s", label, err.Error()) 43 logger.Printf(logger.ERROR, "[%s] Failed to get DHT block from storage: %s", label, err.Error())
57 return 44 return
58 } 45 }
59 if entry != nil { 46 for _, entry := range entries {
60 dist = math.ZERO 47 // add entry to result list
61 // check if we are filtered out 48 result := &store.DHTResult{
62 if rf.Contains(entry.Blk) { 49 Entry: entry,
63 logger.Printf(logger.DBG, "[%s] matching DHT block is filtered", label) 50 Dist: math.ZERO,
64 entry = nil
65 dist = nil
66 } 51 }
52 results = append(results, result)
53 // add to result filter
54 rf.Add(entry.Blk)
67 } 55 }
68 // if we have no exact match, find approximate block if requested 56 // if we have no exact match, find approximate block if requested
69 if entry == nil || query.Flags()&enums.DHT_RO_FIND_APPROXIMATE != 0 { 57 if len(results) == 0 || query.Flags()&enums.DHT_RO_FIND_APPROXIMATE != 0 {
70 // no exact match: find approximate (9.4.3.3b) 58 // no exact match: find approximate (9.4.3.3b)
71 match := func(e *store.DHTEntry) bool { 59 if results, err = m.store.GetApprox(label, query, rf); err != nil {
72 return rf.Contains(e.Blk) 60 logger.Printf(logger.ERROR, "[%s] Failed to get (approx.) DHT blocks from storage: %s", label, err.Error())
73 }
74 var d any
75 entry, d, err = m.store.GetApprox(query, match)
76 var ok bool
77 dist, ok = d.(*math.Int)
78 if !ok {
79 err = errors.New("no approx distance")
80 }
81 if err != nil {
82 logger.Printf(logger.ERROR, "[%s] Failed to get (approx.) DHT block from storage: %s", label, err.Error())
83 } 61 }
84 } 62 }
85 return 63 return
diff --git a/src/gnunet/service/dht/messages.go b/src/gnunet/service/dht/messages.go
index deb8461..76a92b6 100644
--- a/src/gnunet/service/dht/messages.go
+++ b/src/gnunet/service/dht/messages.go
@@ -20,6 +20,7 @@ package dht
20 20
21import ( 21import (
22 "context" 22 "context"
23 "gnunet/crypto"
23 "gnunet/enums" 24 "gnunet/enums"
24 "gnunet/message" 25 "gnunet/message"
25 "gnunet/service/dht/blocks" 26 "gnunet/service/dht/blocks"
@@ -29,7 +30,6 @@ import (
29 "gnunet/util" 30 "gnunet/util"
30 31
31 "github.com/bfix/gospel/logger" 32 "github.com/bfix/gospel/logger"
32 "github.com/bfix/gospel/math"
33) 33)
34 34
35//---------------------------------------------------------------------- 35//----------------------------------------------------------------------
@@ -38,6 +38,7 @@ import (
38 38
39// HandleMessage handles a DHT request/response message. Responses are sent 39// HandleMessage handles a DHT request/response message. Responses are sent
40// to the specified responder. 40// to the specified responder.
41//nolint:gocyclo // life sometimes is complex...
41func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn message.Message, back transport.Responder) bool { 42func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn message.Message, back transport.Responder) bool {
42 // assemble log label 43 // assemble log label
43 label := "dht" 44 label := "dht"
@@ -58,15 +59,18 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m
58 // process message 59 // process message
59 switch msg := msgIn.(type) { 60 switch msg := msgIn.(type) {
60 61
62 //==================================================================
63 // DHT-P2P-GET
64 //==================================================================
61 case *message.DHTP2PGetMsg: 65 case *message.DHTP2PGetMsg:
62 //-------------------------------------------------------------- 66 //--------------------------------------------------------------
63 // DHT-P2P GET 67 // DHT-P2P GET
64 //-------------------------------------------------------------- 68 //--------------------------------------------------------------
65 logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-GET message", label) 69 logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-GET message", label)
66 query := blocks.NewGenericQuery(msg.Query, enums.BlockType(msg.BType), msg.Flags)
67 70
68 var entry *store.DHTEntry 71 // assemble query and initialize (cache) results
69 var dist *math.Int 72 query := blocks.NewGenericQuery(msg.Query, enums.BlockType(msg.BType), msg.Flags)
73 var results []*store.DHTResult
70 74
71 //-------------------------------------------------------------- 75 //--------------------------------------------------------------
72 // validate query (based on block type requested) (9.4.3.1) 76 // validate query (based on block type requested) (9.4.3.1)
@@ -113,7 +117,8 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m
113 closest := m.rtable.IsClosestPeer(nil, addr, msg.PeerFilter, 0) 117 closest := m.rtable.IsClosestPeer(nil, addr, msg.PeerFilter, 0)
114 demux := int(msg.Flags)&enums.DHT_RO_DEMULTIPLEX_EVERYWHERE != 0 118 demux := int(msg.Flags)&enums.DHT_RO_DEMULTIPLEX_EVERYWHERE != 0
115 approx := int(msg.Flags)&enums.DHT_RO_FIND_APPROXIMATE != 0 119 approx := int(msg.Flags)&enums.DHT_RO_FIND_APPROXIMATE != 0
116 // actions 120
121 // enforced actions
117 doResult := closest || (demux && approx) 122 doResult := closest || (demux && approx)
118 doForward := !closest || (demux && !approx) 123 doForward := !closest || (demux && !approx)
119 logger.Printf(logger.DBG, "[%s] GET message: closest=%v, demux=%v, approx=%v --> result=%v, forward=%v", 124 logger.Printf(logger.DBG, "[%s] GET message: closest=%v, demux=%v, approx=%v --> result=%v, forward=%v",
@@ -122,53 +127,43 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m
122 //------------------------------------------------------ 127 //------------------------------------------------------
123 // query for a HELLO? (9.4.3.3a) 128 // query for a HELLO? (9.4.3.3a)
124 if btype == enums.BLOCK_TYPE_DHT_URL_HELLO { 129 if btype == enums.BLOCK_TYPE_DHT_URL_HELLO {
125 // try to find result in HELLO cache 130 // try to find results in HELLO cache
126 entry, dist = m.getHelloCache(label, addr, rf) 131 results = m.lookupHelloCache(label, addr, rf, approx)
127 } 132 }
133
128 //-------------------------------------------------------------- 134 //--------------------------------------------------------------
129 // find the closest block that has that is not filtered by the result 135 // query flags demand a result
130 // filter (in case we did not find an appropriate block in cache).
131 if doResult { 136 if doResult {
132 // save best-match values from cache 137 // if we don't have a result from cache or are in approx mode,
133 entryCache := entry 138 // try storage lookup
134 distCache := dist 139 if len(results) == 0 || approx {
135 dist = nil 140 // get results from local storage
136 141 lclResults, err := m.getLocalStorage(label, query, rf)
137 // if we don't have an exact match, try storage lookup 142 if err == nil {
138 if entryCache == nil || (distCache != nil && !distCache.Equals(math.ZERO)) { 143 // append local results
139 // get entry from local storage 144 results = append(results, lclResults...)
140 var err error
141 if entry, dist, err = m.getLocalStorage(label, query, rf); err != nil {
142 entry = nil
143 dist = nil
144 }
145 // if we have a block from cache, check if it is better than the
146 // block found in the DHT
147 if entryCache != nil && dist != nil && distCache.Cmp(dist) < 0 {
148 entry = entryCache
149 dist = distCache
150 } 145 }
151 } 146 }
152 // if we have a block, send it as response 147 // if we have results, send them as response
153 if entry != nil { 148 for _, result := range results {
149 var pth *path.Path
150 // check if record the route
151 if msg.Flags&enums.DHT_RO_RECORD_ROUTE != 0 && result.Entry.Path != nil {
152 // update get path
153 pth = result.Entry.Path.Clone()
154 pth.SplitPos = pth.NumList
155 pe := pth.NewElement(pth.LastHop, local, back.Receiver())
156 pth.Add(pe)
157 }
158
154 logger.Printf(logger.INFO, "[%s] sending DHT result message to caller", label) 159 logger.Printf(logger.INFO, "[%s] sending DHT result message to caller", label)
155 if err := m.sendResult(ctx, query, entry.Blk, back); err != nil { 160 if err := m.sendResult(ctx, query, result.Entry.Blk, pth, back); err != nil {
156 logger.Printf(logger.ERROR, "[%s] Failed to send DHT result message: %s", label, err.Error()) 161 logger.Printf(logger.ERROR, "[%s] Failed to send DHT result message: %s", label, err.Error())
157 } 162 }
158 } 163 }
159 } 164 }
160 // check if we need to forward message based on filter result 165 //--------------------------------------------------------------
161 if entry != nil && blockHdlr != nil { 166 // query flags demand a result
162 switch blockHdlr.FilterResult(entry.Blk, query.Key(), rf, msg.XQuery) {
163 case blocks.RF_LAST:
164 // no need for further results
165 case blocks.RF_MORE:
166 // possibly more results
167 doForward = true
168 case blocks.RF_DUPLICATE, blocks.RF_IRRELEVANT:
169 // do not forward
170 }
171 }
172 if doForward { 167 if doForward {
173 // build updated GET message 168 // build updated GET message
174 pf.Add(local) 169 pf.Add(local)
@@ -195,6 +190,9 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m
195 } 190 }
196 logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-GET message done", label) 191 logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-GET message done", label)
197 192
193 //==================================================================
194 // DHT-P2P-PUT
195 //==================================================================
198 case *message.DHTP2PPutMsg: 196 case *message.DHTP2PPutMsg:
199 //---------------------------------------------------------- 197 //----------------------------------------------------------
200 // DHT-P2P PUT 198 // DHT-P2P PUT
@@ -267,29 +265,29 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m
267 logger.Printf(logger.ERROR, "[%s] failed to store DHT entry: %s", label, err.Error()) 265 logger.Printf(logger.ERROR, "[%s] failed to store DHT entry: %s", label, err.Error())
268 } 266 }
269 } 267 }
270
271 //-------------------------------------------------------------- 268 //--------------------------------------------------------------
272 // if the put is for a HELLO block, add the sender to the 269 // if the put is for a HELLO block, add the sender to the
273 // routing table (9.3.2.9) 270 // routing table (9.3.2.9)
274 if btype == enums.BLOCK_TYPE_DHT_HELLO { 271 if btype == enums.BLOCK_TYPE_DHT_HELLO {
275 // get addresses from HELLO block 272 // get addresses from HELLO block
276 hello, err := blocks.ParseHelloFromBytes(msg.Block) 273 hello, err := blocks.ParseHelloBlockFromBytes(msg.Block)
277 if err != nil { 274 if err != nil {
278 logger.Printf(logger.ERROR, "[%s] failed to parse HELLO block: %s", label, err.Error()) 275 logger.Printf(logger.ERROR, "[%s] failed to parse HELLO block: %s", label, err.Error())
279 } else { 276 } else {
280 // check state of bucket for given address 277 // check state of bucket for given address
281 if m.rtable.Check(NewPeerAddress(sender)) == 0 { 278 if m.rtable.Check(NewPeerAddress(hello.PeerID)) == 0 {
282 // we could add the sender to the routing table 279 // we could add the sender to the routing table
283 for _, addr := range hello.Addresses() { 280 for _, addr := range hello.Addresses() {
284 if transport.CanHandleAddress(addr) { 281 if transport.CanHandleAddress(addr) {
285 // try to connect to peer (triggers EV_CONNECTED on success) 282 // try to connect to peer (triggers EV_CONNECTED on success)
286 m.core.TryConnect(sender, addr) 283 if err := m.core.TryConnect(sender, addr); err != nil {
284 logger.Printf(logger.ERROR, "[%s] try-connection to %s failed: %s", label, addr.URI(), err.Error())
285 }
287 } 286 }
288 } 287 }
289 } 288 }
290 } 289 }
291 } 290 }
292
293 //-------------------------------------------------------------- 291 //--------------------------------------------------------------
294 // check if we need to forward 292 // check if we need to forward
295 if !closest || demux { 293 if !closest || demux {
@@ -325,24 +323,100 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m
325 } 323 }
326 logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-PUT message done", label) 324 logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-PUT message done", label)
327 325
326 //==================================================================
327 // DHT-P2P-RESULT
328 //==================================================================
328 case *message.DHTP2PResultMsg: 329 case *message.DHTP2PResultMsg:
329 //---------------------------------------------------------- 330 //----------------------------------------------------------
330 // DHT-P2P RESULT 331 // DHT-P2P RESULT
331 //---------------------------------------------------------- 332 //----------------------------------------------------------
332 logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-RESULT message", label) 333 logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-RESULT message for type %s",
334 label, enums.BlockType(msg.BType).String())
333 335
334 // check task list for handler 336 //--------------------------------------------------------------
337 // check if request is expired (9.5.2.1)
338 if msg.Expires.Expired() {
339 logger.Printf(logger.WARN, "[%s] DHT-P2P-RESULT message expired (%s)",
340 label, msg.Expires.String())
341 return false
342 }
343 //--------------------------------------------------------------
344 btype := enums.BlockType(msg.BType)
345 var blkKey *crypto.HashCode
346 blockHdlr, ok := blocks.BlockHandlers[btype]
347 if ok {
348 // reconstruct block instance
349 if block, err := blockHdlr.ParseBlock(msg.Block); err == nil {
350 // validate block (9.5.2.2)
351 if !blockHdlr.ValidateBlockStoreRequest(block) {
352 logger.Printf(logger.WARN, "[%s] DHT-P2P-RESULT invalid block -- discarded", label)
353 return false
354 }
355 // Compute block key (9.5.2.4)
356 blkKey = blockHdlr.DeriveBlockKey(block)
357 }
358 } else {
359 logger.Printf(logger.INFO, "[%s] No validator defined for block type %s", label, btype.String())
360 blockHdlr = nil
361 }
362 //--------------------------------------------------------------
363 // verify path (9.5.2.3)
364 var pth *path.Path
365 if msg.GetPathL+msg.PutPathL > 0 {
366 pth = msg.Path(sender)
367 pth.Verify(local)
368 }
369 //--------------------------------------------------------------
370 // if the put is for a HELLO block, add the originator to the
371 // routing table (9.5.2.5)
372 if btype == enums.BLOCK_TYPE_DHT_HELLO {
373 // get addresses from HELLO block
374 hello, err := blocks.ParseHelloBlockFromBytes(msg.Block)
375 if err != nil {
376 logger.Printf(logger.ERROR, "[%s] failed to parse HELLO block: %s", label, err.Error())
377 } else {
378 // check state of bucket for given address
379 if m.rtable.Check(NewPeerAddress(hello.PeerID)) == 0 {
380 // we could add the originator to the routing table
381 for _, addr := range hello.Addresses() {
382 if transport.CanHandleAddress(addr) {
383 // try to connect to peer (triggers EV_CONNECTED on success)
384 if err := m.core.TryConnect(sender, addr); err != nil {
385 logger.Printf(logger.ERROR, "[%s] try-connection to %s failed: %s", label, addr.URI(), err.Error())
386 }
387 }
388 }
389 }
390 }
391 }
392 // message forwarding to responder
335 key := msg.Query.String() 393 key := msg.Query.String()
336 logger.Printf(logger.DBG, "[%s] DHT-P2P-RESULT key = %s", label, key) 394 logger.Printf(logger.DBG, "[%s] DHT-P2P-RESULT key = %s", label, key)
337 handled := false 395 handled := false
338 if list, ok := m.reshdlrs.Get(key); ok { 396 if list, ok := m.reshdlrs.Get(key); ok {
339 for _, rh := range list { 397 for _, rh := range list {
340 logger.Printf(logger.DBG, "[%s] Task #%d for DHT-P2P-RESULT found", label, rh.ID()) 398 logger.Printf(logger.DBG, "[%s] Task #%d for DHT-P2P-RESULT found", label, rh.ID())
341 // handle the message 399
342 go rh.Handle(ctx, msg) 400 //--------------------------------------------------------------
401 // check task list for handler (9.5.2.6)
402 if rh.Flags()&enums.DHT_RO_FIND_APPROXIMATE == 0 && blkKey != nil && !blkKey.Equals(rh.Key()) {
403 // (9.5.2.6.a) derived key mismatch
404 logger.Printf(logger.ERROR, "[%s] derived block key / query key mismatch:", label)
405 logger.Printf(logger.ERROR, "[%s] --> %s != %s", label, blkKey.String(), rh.Key().String())
406 return false
407 }
408 // (9.5.2.6.b+c) check block against query
409 /*
410 if blockHdlr != nil {
411 blockHdlr.FilterBlockResult(block, rh.Key())
412 }
413 */
414
415 //--------------------------------------------------------------
416 // handle the message (forwarding)
417 go rh.Handle(ctx, msg, pth, sender, local)
343 handled = true 418 handled = true
344 } 419 }
345 return true
346 } 420 }
347 if !handled { 421 if !handled {
348 logger.Printf(logger.WARN, "[%s] DHT-P2P-RESULT not processed (no handler)", label) 422 logger.Printf(logger.WARN, "[%s] DHT-P2P-RESULT not processed (no handler)", label)
@@ -350,6 +424,9 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m
350 logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-RESULT message done", label) 424 logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-RESULT message done", label)
351 return handled 425 return handled
352 426
427 //==================================================================
428 // DHT-P2P-HELLO
429 //==================================================================
353 case *message.DHTP2PHelloMsg: 430 case *message.DHTP2PHelloMsg:
354 //---------------------------------------------------------- 431 //----------------------------------------------------------
355 // DHT-P2P HELLO 432 // DHT-P2P HELLO
@@ -379,8 +456,8 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m
379 logger.Printf(logger.INFO, "[%s] Sending HELLO to %s: %s", label, sender, msgOut) 456 logger.Printf(logger.INFO, "[%s] Sending HELLO to %s: %s", label, sender, msgOut)
380 err = m.core.Send(ctx, sender, msgOut) 457 err = m.core.Send(ctx, sender, msgOut)
381 // no error if the message might have been sent 458 // no error if the message might have been sent
382 if err == transport.ErrEndpMaybeSent { 459 if err != nil && err != transport.ErrEndpMaybeSent {
383 err = nil 460 logger.Printf(logger.ERROR, "[%s] Failed to send HELLO message: %s", label, err.Error())
384 } 461 }
385 } 462 }
386 463
@@ -401,9 +478,9 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m
401 }) 478 })
402 } 479 }
403 480
404 //-------------------------------------------------------------- 481 //==================================================================
405 // Legacy message types (not implemented) 482 // Legacy message types (not implemented)
406 //-------------------------------------------------------------- 483 //==================================================================
407 484
408 case *message.DHTClientPutMsg: 485 case *message.DHTClientPutMsg:
409 //---------------------------------------------------------- 486 //----------------------------------------------------------
@@ -446,7 +523,7 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m
446} 523}
447 524
448// send a result back to caller 525// send a result back to caller
449func (m *Module) sendResult(ctx context.Context, query blocks.Query, blk blocks.Block, back transport.Responder) error { 526func (m *Module) sendResult(ctx context.Context, query blocks.Query, blk blocks.Block, pth *path.Path, back transport.Responder) error {
450 // assemble result message 527 // assemble result message
451 out := message.NewDHTP2PResultMsg() 528 out := message.NewDHTP2PResultMsg()
452 out.BType = uint32(query.Type()) 529 out.BType = uint32(query.Type())
diff --git a/src/gnunet/service/dht/module.go b/src/gnunet/service/dht/module.go
index 6207f94..5278b13 100644
--- a/src/gnunet/service/dht/module.go
+++ b/src/gnunet/service/dht/module.go
@@ -24,6 +24,7 @@ import (
24 "gnunet/config" 24 "gnunet/config"
25 "gnunet/core" 25 "gnunet/core"
26 "gnunet/crypto" 26 "gnunet/crypto"
27 "gnunet/enums"
27 "gnunet/message" 28 "gnunet/message"
28 "gnunet/service" 29 "gnunet/service"
29 "gnunet/service/dht/blocks" 30 "gnunet/service/dht/blocks"
@@ -33,7 +34,6 @@ import (
33 "time" 34 "time"
34 35
35 "github.com/bfix/gospel/logger" 36 "github.com/bfix/gospel/logger"
36 "github.com/bfix/gospel/math"
37) 37)
38 38
39//====================================================================== 39//======================================================================
@@ -41,22 +41,51 @@ import (
41//====================================================================== 41//======================================================================
42 42
43//---------------------------------------------------------------------- 43//----------------------------------------------------------------------
44// Responder for local message handling 44// Responder for local message handling (API, not message-based)
45//---------------------------------------------------------------------- 45//----------------------------------------------------------------------
46 46
47type LocalResponder struct { 47// LocalBlockResponder is a message handler used to handle results for
48 ch chan blocks.Block // out-going channel for incoming blocks 48// locally initiated GET calls
49type LocalBlockResponder struct {
50 ch chan blocks.Block // out-going channel for incoming block results
51 rf blocks.ResultFilter // filter out duplicates
49} 52}
50 53
51func (lr *LocalResponder) Send(ctx context.Context, msg message.Message) error { 54// NewLocalBlockResponder returns a new instance
55func NewLocalBlockResponder() *LocalBlockResponder {
56 return &LocalBlockResponder{
57 ch: make(chan blocks.Block),
58 rf: blocks.NewGenericResultFilter(),
59 }
60}
61
62// C returns the back-channel
63func (lr *LocalBlockResponder) C() <-chan blocks.Block {
64 return lr.ch
65}
66
67// Send interface method: dissect message and relay block if appropriate
68func (lr *LocalBlockResponder) Send(ctx context.Context, msg message.Message) error {
69 // check if incoming message is a DHT-RESULT
70 switch res := msg.(type) {
71 case *message.DHTP2PResultMsg:
72 // deliver incoming blocks
73 go func() {
74 lr.ch <- blocks.NewGenericBlock(res.Block)
75 }()
76 default:
77 logger.Println(logger.WARN, "[local] not a DHT-RESULT -- skipped")
78 }
52 return nil 79 return nil
53} 80}
54 81
55func (lr *LocalResponder) Receiver() string { 82// Receiver is nil for local responders.
56 return "@" 83func (lr *LocalBlockResponder) Receiver() *util.PeerID {
84 return nil
57} 85}
58 86
59func (lr *LocalResponder) Close() { 87// Close back-channel
88func (lr *LocalBlockResponder) Close() {
60 close(lr.ch) 89 close(lr.ch)
61} 90}
62 91
@@ -68,8 +97,9 @@ func (lr *LocalResponder) Close() {
68type Module struct { 97type Module struct {
69 service.ModuleImpl 98 service.ModuleImpl
70 99
71 store store.DHTStore // reference to the block storage mechanism 100 cfg *config.DHTConfig // configuraion parameters
72 core *core.Core // reference to core services 101 store *store.DHTStore // reference to the block storage mechanism
102 core *core.Core // reference to core services
73 103
74 rtable *RoutingTable // routing table 104 rtable *RoutingTable // routing table
75 lastHello *message.DHTP2PHelloMsg // last own HELLO message used; re-create if expired 105 lastHello *message.DHTP2PHelloMsg // last own HELLO message used; re-create if expired
@@ -80,7 +110,7 @@ type Module struct {
80// mechanism for persistence. 110// mechanism for persistence.
81func NewModule(ctx context.Context, c *core.Core, cfg *config.DHTConfig) (m *Module, err error) { 111func NewModule(ctx context.Context, c *core.Core, cfg *config.DHTConfig) (m *Module, err error) {
82 // create permanent storage handler 112 // create permanent storage handler
83 var storage store.DHTStore 113 var storage *store.DHTStore
84 if storage, err = store.NewDHTStore(cfg.Storage); err != nil { 114 if storage, err = store.NewDHTStore(cfg.Storage); err != nil {
85 return 115 return
86 } 116 }
@@ -90,6 +120,7 @@ func NewModule(ctx context.Context, c *core.Core, cfg *config.DHTConfig) (m *Mod
90 // return module instance 120 // return module instance
91 m = &Module{ 121 m = &Module{
92 ModuleImpl: *service.NewModuleImpl(), 122 ModuleImpl: *service.NewModuleImpl(),
123 cfg: cfg,
93 store: storage, 124 store: storage,
94 core: c, 125 core: c,
95 rtable: rt, 126 rtable: rt,
@@ -99,18 +130,58 @@ func NewModule(ctx context.Context, c *core.Core, cfg *config.DHTConfig) (m *Mod
99 pulse := time.Duration(cfg.Heartbeat) * time.Second 130 pulse := time.Duration(cfg.Heartbeat) * time.Second
100 listener := m.Run(ctx, m.event, m.Filter(), pulse, m.heartbeat) 131 listener := m.Run(ctx, m.event, m.Filter(), pulse, m.heartbeat)
101 c.Register("dht", listener) 132 c.Register("dht", listener)
133
134 // run periodic tasks (8.2. peer discovery)
135 ticker := time.NewTicker(5 * time.Minute)
136 key := crypto.Hash(m.core.PeerID().Data)
137 flags := uint16(enums.DHT_RO_FIND_APPROXIMATE | enums.DHT_RO_DEMULTIPLEX_EVERYWHERE)
138 var resCh <-chan blocks.Block
139 go func() {
140 for {
141 select {
142 // initiate peer discovery
143 case <-ticker.C:
144 // query DHT for our own HELLO block
145 query := blocks.NewGenericQuery(key, enums.BLOCK_TYPE_DHT_HELLO, flags)
146 resCh = m.Get(ctx, query)
147
148 // handle peer discover results
149 case res := <-resCh:
150 // check for correct type
151 btype := res.Type()
152 if btype == enums.BLOCK_TYPE_DHT_HELLO {
153 hb, ok := res.(*blocks.HelloBlock)
154 if !ok {
155 logger.Printf(logger.WARN, "[dht] peer discovery received invalid block data")
156 } else {
157 // cache HELLO block
158 m.rtable.CacheHello(hb)
159 // add sender to routing table
160 m.rtable.Add(NewPeerAddress(hb.PeerID), "dht")
161 }
162 } else {
163 logger.Printf(logger.WARN, "[dht] peer discovery received invalid block type %s", btype.String())
164 }
165
166 // termination
167 case <-ctx.Done():
168 ticker.Stop()
169 return
170 }
171 }
172 }()
102 return 173 return
103} 174}
104 175
105//---------------------------------------------------------------------- 176//----------------------------------------------------------------------
106// DHT methods for local use 177// DHT methods for local use (API)
107//---------------------------------------------------------------------- 178//----------------------------------------------------------------------
108 179
109// Get blocks from the DHT ["dht:get"] 180// Get blocks from the DHT ["dht:get"]
110// Locally request blocks for a given query. The res channel will deliver the 181// Locally request blocks for a given query. The res channel will deliver the
111// returned results to the caller; the channel is closed if no further blocks 182// returned results to the caller; the channel is closed if no further blocks
112// are expected or the query times out. 183// are expected or the query times out.
113func (m *Module) Get(ctx context.Context, query blocks.Query) (res chan blocks.Block) { 184func (m *Module) Get(ctx context.Context, query blocks.Query) <-chan blocks.Block {
114 // get the block handler for given block type to construct an empty 185 // get the block handler for given block type to construct an empty
115 // result filter. If no handler is defined, a default PassResultFilter 186 // result filter. If no handler is defined, a default PassResultFilter
116 // is created. 187 // is created.
@@ -123,14 +194,14 @@ func (m *Module) Get(ctx context.Context, query blocks.Query) (res chan blocks.B
123 logger.Println(logger.WARN, "[dht] unknown result filter implementation -- skipped") 194 logger.Println(logger.WARN, "[dht] unknown result filter implementation -- skipped")
124 } 195 }
125 // get additional query parameters 196 // get additional query parameters
126 xquery, ok := util.GetParam[[]byte](query.Params(), "xquery") 197 xquery, _ := util.GetParam[[]byte](query.Params(), "xquery")
127 198
128 // assemble a new GET message 199 // assemble a new GET message
129 msg := message.NewDHTP2PGetMsg() 200 msg := message.NewDHTP2PGetMsg()
130 msg.BType = uint32(query.Type()) 201 msg.BType = uint32(query.Type())
131 msg.Flags = query.Flags() 202 msg.Flags = query.Flags()
132 msg.HopCount = 0 203 msg.HopCount = 0
133 msg.ReplLevel = 10 204 msg.ReplLevel = uint16(m.cfg.Routing.ReplLevel)
134 msg.PeerFilter = blocks.NewPeerFilter() 205 msg.PeerFilter = blocks.NewPeerFilter()
135 msg.ResFilter = rf.Bytes() 206 msg.ResFilter = rf.Bytes()
136 msg.RfSize = uint16(len(msg.ResFilter)) 207 msg.RfSize = uint16(len(msg.ResFilter))
@@ -138,15 +209,13 @@ func (m *Module) Get(ctx context.Context, query blocks.Query) (res chan blocks.B
138 msg.MsgSize += msg.RfSize + uint16(len(xquery)) 209 msg.MsgSize += msg.RfSize + uint16(len(xquery))
139 210
140 // compose a response channel and handler 211 // compose a response channel and handler
141 res = make(chan blocks.Block) 212 hdlr := NewLocalBlockResponder()
142 hdlr := &LocalResponder{ 213
143 ch: res,
144 }
145 // time-out handling 214 // time-out handling
146 ttl, ok := util.GetParam[time.Duration](query.Params(), "timeout") 215 ttl, ok := util.GetParam[time.Duration](query.Params(), "timeout")
147 if !ok { 216 if !ok {
148 // defaults to 10 minutes 217 // defaults to 10 minutes
149 ttl = 600 * time.Second 218 ttl = 10 * time.Minute
150 } 219 }
151 lctx, cancel := context.WithTimeout(ctx, ttl) 220 lctx, cancel := context.WithTimeout(ctx, ttl)
152 221
@@ -159,23 +228,12 @@ func (m *Module) Get(ctx context.Context, query blocks.Query) (res chan blocks.B
159 hdlr.Close() 228 hdlr.Close()
160 cancel() 229 cancel()
161 }() 230 }()
162 return res 231 return hdlr.C()
163} 232}
164 233
165// GetApprox returns the first block not excluded ["dht:getapprox"] 234// GetApprox returns the first block not excluded ["dht:getapprox"]
166func (m *Module) GetApprox(ctx context.Context, query blocks.Query, excl func(*store.DHTEntry) bool) (entry *store.DHTEntry, dist *math.Int, err error) { 235func (m *Module) GetApprox(ctx context.Context, query blocks.Query, rf blocks.ResultFilter) (results []*store.DHTResult, err error) {
167 var val any 236 return m.store.GetApprox("dht", query, rf)
168 if entry, val, err = m.store.GetApprox(query, excl); err != nil {
169 return
170 }
171 hc, ok := val.(*crypto.HashCode)
172 if !ok {
173 err = errors.New("no approx result")
174 }
175 asked := NewQueryAddress(query.Key())
176 found := NewQueryAddress(hc)
177 dist, _ = found.Distance(asked)
178 return
179} 237}
180 238
181// Put a block into the DHT ["dht:put"] 239// Put a block into the DHT ["dht:put"]
@@ -191,7 +249,7 @@ func (m *Module) Put(ctx context.Context, query blocks.Query, block blocks.Block
191 msg.Flags = query.Flags() 249 msg.Flags = query.Flags()
192 msg.HopCount = 0 250 msg.HopCount = 0
193 msg.PeerFilter = blocks.NewPeerFilter() 251 msg.PeerFilter = blocks.NewPeerFilter()
194 msg.ReplLvl = 10 252 msg.ReplLvl = uint16(m.cfg.Routing.ReplLevel)
195 msg.Expiration = expire 253 msg.Expiration = expire
196 msg.Block = block.Bytes() 254 msg.Block = block.Bytes()
197 msg.Key = query.Key().Clone() 255 msg.Key = query.Key().Clone()
diff --git a/src/gnunet/service/dht/path/elements.go b/src/gnunet/service/dht/path/elements.go
index 5e26e57..53d1cfd 100644
--- a/src/gnunet/service/dht/path/elements.go
+++ b/src/gnunet/service/dht/path/elements.go
@@ -38,8 +38,8 @@ var (
38//---------------------------------------------------------------------- 38//----------------------------------------------------------------------
39// Entry is an element of the path list 39// Entry is an element of the path list
40type Entry struct { 40type Entry struct {
41 Signer *util.PeerID // path element signer
42 Signature *util.PeerSignature // path element signature 41 Signature *util.PeerSignature // path element signature
42 Signer *util.PeerID // path element signer
43} 43}
44 44
45// Size returns the size of a path element in wire format 45// Size returns the size of a path element in wire format
diff --git a/src/gnunet/service/dht/path/handling.go b/src/gnunet/service/dht/path/handling.go
index 412f646..b225c82 100644
--- a/src/gnunet/service/dht/path/handling.go
+++ b/src/gnunet/service/dht/path/handling.go
@@ -35,8 +35,8 @@ import (
35 35
36// path flags 36// path flags
37const ( 37const (
38 PathTruncated = iota 38 PathTruncated = 1
39 PathLastHop 39 PathLastHop = 2
40) 40)
41 41
42// Path is the complete list of verified hops a message travelled. 42// Path is the complete list of verified hops a message travelled.
@@ -48,6 +48,7 @@ type Path struct {
48 Expire util.AbsoluteTime `` // expiration time 48 Expire util.AbsoluteTime `` // expiration time
49 TruncOrigin *util.PeerID `opt:"(IsUsed)"` // truncated origin (optional) 49 TruncOrigin *util.PeerID `opt:"(IsUsed)"` // truncated origin (optional)
50 NumList uint16 `order:"big"` // number of list entries 50 NumList uint16 `order:"big"` // number of list entries
51 SplitPos uint16 `order:"big"` // optional split position
51 List []*Entry `size:"NumList"` // list of path entries 52 List []*Entry `size:"NumList"` // list of path entries
52 LastSig *util.PeerSignature `opt:"(Isused)"` // last hop signature 53 LastSig *util.PeerSignature `opt:"(Isused)"` // last hop signature
53 LastHop *util.PeerID `opt:"(IsUsed)"` // last hop peer id 54 LastHop *util.PeerID `opt:"(IsUsed)"` // last hop peer id
@@ -72,6 +73,7 @@ func NewPath(bh *crypto.HashCode, expire util.AbsoluteTime) *Path {
72 Expire: expire, 73 Expire: expire,
73 TruncOrigin: nil, 74 TruncOrigin: nil,
74 NumList: 0, 75 NumList: 0,
76 SplitPos: 0,
75 List: make([]*Entry, 0), 77 List: make([]*Entry, 0),
76 LastSig: nil, 78 LastSig: nil,
77 LastHop: nil, 79 LastHop: nil,
@@ -81,7 +83,7 @@ func NewPath(bh *crypto.HashCode, expire util.AbsoluteTime) *Path {
81// NewPathFromBytes reconstructs a path instance from binary data. The layout 83// NewPathFromBytes reconstructs a path instance from binary data. The layout
82// of the data must match with the layout used in Path.Bytes(). 84// of the data must match with the layout used in Path.Bytes().
83func NewPathFromBytes(buf []byte) (path *Path, err error) { 85func NewPathFromBytes(buf []byte) (path *Path, err error) {
84 if buf == nil || len(buf) == 0 { 86 if len(buf) == 0 {
85 return 87 return
86 } 88 }
87 path = new(Path) 89 path = new(Path)
@@ -116,6 +118,7 @@ func (p *Path) Clone() *Path {
116 Expire: p.Expire, 118 Expire: p.Expire,
117 TruncOrigin: p.TruncOrigin, 119 TruncOrigin: p.TruncOrigin,
118 NumList: p.NumList, 120 NumList: p.NumList,
121 SplitPos: p.SplitPos,
119 List: util.Clone(p.List), 122 List: util.Clone(p.List),
120 LastSig: p.LastSig, 123 LastSig: p.LastSig,
121 LastHop: p.LastHop, 124 LastHop: p.LastHop,
diff --git a/src/gnunet/service/dht/resulthandler.go b/src/gnunet/service/dht/resulthandler.go
index faab93e..85895f8 100644
--- a/src/gnunet/service/dht/resulthandler.go
+++ b/src/gnunet/service/dht/resulthandler.go
@@ -22,8 +22,10 @@ import (
22 "bytes" 22 "bytes"
23 "context" 23 "context"
24 "gnunet/crypto" 24 "gnunet/crypto"
25 "gnunet/enums"
25 "gnunet/message" 26 "gnunet/message"
26 "gnunet/service/dht/blocks" 27 "gnunet/service/dht/blocks"
28 "gnunet/service/dht/path"
27 "gnunet/transport" 29 "gnunet/transport"
28 "gnunet/util" 30 "gnunet/util"
29 "time" 31 "time"
@@ -52,7 +54,10 @@ type ResultHandler interface {
52 Done() bool 54 Done() bool
53 55
54 // Key returns the query/store key as string 56 // Key returns the query/store key as string
55 Key() string 57 Key() *crypto.HashCode
58
59 // Flags returns the query flags
60 Flags() uint16
56 61
57 // Compare two result handlers 62 // Compare two result handlers
58 Compare(ResultHandler) int 63 Compare(ResultHandler) int
@@ -61,7 +66,7 @@ type ResultHandler interface {
61 Merge(ResultHandler) bool 66 Merge(ResultHandler) bool
62 67
63 // Handle result message 68 // Handle result message
64 Handle(context.Context, *message.DHTP2PResultMsg) bool 69 Handle(ctx context.Context, msg *message.DHTP2PResultMsg, pth *path.Path, sender, local *util.PeerID) bool
65} 70}
66 71
67// Compare return values 72// Compare return values
@@ -108,8 +113,13 @@ func (t *GenericResultHandler) ID() int {
108} 113}
109 114
110// Key returns the key string 115// Key returns the key string
111func (t *GenericResultHandler) Key() string { 116func (t *GenericResultHandler) Key() *crypto.HashCode {
112 return t.key.String() 117 return t.key
118}
119
120// Flags returns the query flags
121func (t *GenericResultHandler) Flags() uint16 {
122 return t.flags
113} 123}
114 124
115// Done returns true if the result handler is no longer active. 125// Done returns true if the result handler is no longer active.
@@ -176,15 +186,28 @@ func NewForwardResultHandler(msgIn message.Message, rf blocks.ResultFilter, back
176} 186}
177 187
178// Handle incoming DHT-P2P-RESULT message 188// Handle incoming DHT-P2P-RESULT message
179func (t *ForwardResultHandler) Handle(ctx context.Context, msg *message.DHTP2PResultMsg) bool { 189func (t *ForwardResultHandler) Handle(ctx context.Context, msg *message.DHTP2PResultMsg, pth *path.Path, sender, local *util.PeerID) bool {
180 // don't send result if it is filtered out 190 // don't send result if it is filtered out
181 if !t.Proceed(ctx, msg) { 191 if !t.Proceed(ctx, msg) {
182 logger.Printf(logger.DBG, "[dht-task-%d] result filtered out -- already known", t.id) 192 logger.Printf(logger.DBG, "[dht-task-%d] result filtered out -- already known", t.id)
183 return false 193 return false
184 } 194 }
195 // extend path if route is recorded
196 pp := pth.Clone()
197 if msg.Flags&enums.DHT_RO_RECORD_ROUTE != 0 {
198 // yes: add path element for remote receivers
199 if rcv := t.resp.Receiver(); rcv != nil {
200 pe := pp.NewElement(sender, local, rcv)
201 pp.Add(pe)
202 }
203 }
204
205 // build updated PUT message
206 msgOut := msg.Update(pp)
207
185 // send result message back to originator (result forwarding). 208 // send result message back to originator (result forwarding).
186 logger.Printf(logger.INFO, "[dht-task-%d] sending result back to originator", t.id) 209 logger.Printf(logger.INFO, "[dht-task-%d] sending result back to originator", t.id)
187 if err := t.resp.Send(ctx, msg); err != nil && err != transport.ErrEndpMaybeSent { 210 if err := t.resp.Send(ctx, msgOut); err != nil && err != transport.ErrEndpMaybeSent {
188 logger.Printf(logger.ERROR, "[dht-task-%d] sending result back to originator failed: %s", t.id, err.Error()) 211 logger.Printf(logger.ERROR, "[dht-task-%d] sending result back to originator failed: %s", t.id, err.Error())
189 return false 212 return false
190 } 213 }
@@ -200,7 +223,7 @@ func (t *ForwardResultHandler) Compare(h ResultHandler) int {
200 return RHC_DIFFER 223 return RHC_DIFFER
201 } 224 }
202 // check for same recipient 225 // check for same recipient
203 if ht.resp.Receiver() != t.resp.Receiver() { 226 if ht.resp.Receiver().Equals(t.resp.Receiver()) {
204 logger.Printf(logger.DBG, "[frh] recipients differ: %s -- %s", ht.resp.Receiver(), t.resp.Receiver()) 227 logger.Printf(logger.DBG, "[frh] recipients differ: %s -- %s", ht.resp.Receiver(), t.resp.Receiver())
205 return RHC_DIFFER 228 return RHC_DIFFER
206 } 229 }
@@ -237,7 +260,7 @@ func (t *ForwardResultHandler) Merge(h ResultHandler) bool {
237//---------------------------------------------------------------------- 260//----------------------------------------------------------------------
238 261
239// ResultHandlerFcn is the function prototype for custom handlers: 262// ResultHandlerFcn is the function prototype for custom handlers:
240type ResultHandlerFcn func(context.Context, *message.DHTP2PResultMsg, chan<- any) bool 263type ResultHandlerFcn func(context.Context, *message.DHTP2PResultMsg, *path.Path, chan<- any) bool
241 264
242// DirectResultHandler for local DHT-P2P-GET requests 265// DirectResultHandler for local DHT-P2P-GET requests
243type DirectResultHandler struct { 266type DirectResultHandler struct {
@@ -262,7 +285,7 @@ func NewDirectResultHandler(msgIn message.Message, rf blocks.ResultFilter, hdlr
262} 285}
263 286
264// Handle incoming DHT-P2P-RESULT message 287// Handle incoming DHT-P2P-RESULT message
265func (t *DirectResultHandler) Handle(ctx context.Context, msg *message.DHTP2PResultMsg) bool { 288func (t *DirectResultHandler) Handle(ctx context.Context, msg *message.DHTP2PResultMsg, pth *path.Path, sender, local *util.PeerID) bool {
266 // don't send result if it is filtered out 289 // don't send result if it is filtered out
267 if !t.Proceed(ctx, msg) { 290 if !t.Proceed(ctx, msg) {
268 logger.Printf(logger.DBG, "[dht-task-%d] result filtered out -- already known", t.id) 291 logger.Printf(logger.DBG, "[dht-task-%d] result filtered out -- already known", t.id)
@@ -271,7 +294,7 @@ func (t *DirectResultHandler) Handle(ctx context.Context, msg *message.DHTP2PRes
271 // check for correct message type and handler function 294 // check for correct message type and handler function
272 if t.hdlr != nil { 295 if t.hdlr != nil {
273 logger.Printf(logger.INFO, "[dht-task-%d] handling result message", t.id) 296 logger.Printf(logger.INFO, "[dht-task-%d] handling result message", t.id)
274 return t.hdlr(ctx, msg, t.rc) 297 return t.hdlr(ctx, msg, pth, t.rc)
275 } 298 }
276 return false 299 return false
277} 300}
@@ -318,7 +341,7 @@ func NewResultHandlerList() *ResultHandlerList {
318// Add handler to list 341// Add handler to list
319func (t *ResultHandlerList) Add(hdlr ResultHandler) bool { 342func (t *ResultHandlerList) Add(hdlr ResultHandler) bool {
320 // get current list of handlers for key 343 // get current list of handlers for key
321 key := hdlr.Key() 344 key := hdlr.Key().String()
322 list, ok := t.list.Get(key, 0) 345 list, ok := t.list.Get(key, 0)
323 modified := false 346 modified := false
324 if !ok { 347 if !ok {
diff --git a/src/gnunet/service/dht/routingtable.go b/src/gnunet/service/dht/routingtable.go
index 98f0819..20bf5fc 100644
--- a/src/gnunet/service/dht/routingtable.go
+++ b/src/gnunet/service/dht/routingtable.go
@@ -25,6 +25,7 @@ import (
25 "gnunet/config" 25 "gnunet/config"
26 "gnunet/crypto" 26 "gnunet/crypto"
27 "gnunet/service/dht/blocks" 27 "gnunet/service/dht/blocks"
28 "gnunet/service/store"
28 "gnunet/util" 29 "gnunet/util"
29 "sync" 30 "sync"
30 "time" 31 "time"
@@ -84,11 +85,7 @@ func (addr *PeerAddress) Equals(p *PeerAddress) bool {
84// Distance between two addresses: returns a distance value and a 85// Distance between two addresses: returns a distance value and a
85// bucket index (smaller index = less distant). 86// bucket index (smaller index = less distant).
86func (addr *PeerAddress) Distance(p *PeerAddress) (*math.Int, int) { 87func (addr *PeerAddress) Distance(p *PeerAddress) (*math.Int, int) {
87 d := make([]byte, 64) 88 r := util.Distance(addr.Key.Bits, p.Key.Bits)
88 for i := range d {
89 d[i] = addr.Key.Bits[i] ^ p.Key.Bits[i]
90 }
91 r := math.NewIntFromBytes(d)
92 return r, 512 - r.BitLen() 89 return r, 512 - r.BitLen()
93} 90}
94 91
@@ -371,18 +368,29 @@ func (rt *RoutingTable) heartbeat(ctx context.Context) {
371 368
372//---------------------------------------------------------------------- 369//----------------------------------------------------------------------
373 370
374func (rt *RoutingTable) BestHello(addr *PeerAddress, rf blocks.ResultFilter) (hb *blocks.HelloBlock, dist *math.Int) { 371// LookupHello returns blocks from the HELLO cache for given query.
375 // iterate over cached HELLOs to find (best) match first 372func (rt *RoutingTable) LookupHello(addr *PeerAddress, rf blocks.ResultFilter, approx bool) (results []*store.DHTResult) {
376 _ = rt.helloCache.ProcessRange(func(key string, val *blocks.HelloBlock, _ int) error { 373 // iterate over cached HELLOs to find matches;
374 // approximate search is limited by distance (max. diff for bucket index is 16)
375 _ = rt.helloCache.ProcessRange(func(key string, hb *blocks.HelloBlock, _ int) error {
377 // check if block is excluded by result filter 376 // check if block is excluded by result filter
378 if !rf.Contains(val) { 377 var result *store.DHTResult
379 // check for better match 378 if !rf.Contains(hb) {
380 p := NewPeerAddress(val.PeerID) 379 // no: possible result, compute distance
381 d, _ := addr.Distance(p) 380 p := NewPeerAddress(hb.PeerID)
382 if hb == nil || d.Cmp(dist) < 0 { 381 dist, idx := addr.Distance(p)
383 hb = val 382 result = &store.DHTResult{
384 dist = d 383 Entry: &store.DHTEntry{
384 Blk: hb,
385 },
386 Dist: dist,
387 }
388 // check if we need to add result
389 if (approx && idx < 16) || idx == 0 {
390 results = append(results, result)
385 } 391 }
392 } else {
393 logger.Printf(logger.DBG, "[RT] GET-HELLO: cache block is filtered")
386 } 394 }
387 return nil 395 return nil
388 }, true) 396 }, true)
diff --git a/src/gnunet/service/dht/routingtable_test.go b/src/gnunet/service/dht/routingtable_test.go
index 42e2349..02ddc52 100644
--- a/src/gnunet/service/dht/routingtable_test.go
+++ b/src/gnunet/service/dht/routingtable_test.go
@@ -70,7 +70,7 @@ func TestRT(t *testing.T) {
70 // helper functions 70 // helper functions
71 genRemotePeer := func() *PeerAddress { 71 genRemotePeer := func() *PeerAddress {
72 d := make([]byte, 32) 72 d := make([]byte, 32)
73 _, _ = rand.Read(d) 73 _, _ = rand.Read(d) //nolint:gosec // good enough for testing
74 return NewPeerAddress(util.NewPeerID(d)) 74 return NewPeerAddress(util.NewPeerID(d))
75 } 75 }
76 76
@@ -86,10 +86,10 @@ func TestRT(t *testing.T) {
86 for i := range tasks { 86 for i := range tasks {
87 tasks[i] = new(Entry) 87 tasks[i] = new(Entry)
88 tasks[i].addr = genRemotePeer() 88 tasks[i].addr = genRemotePeer()
89 tasks[i].born = rand.Int63n(EPOCHS) 89 tasks[i].born = rand.Int63n(EPOCHS) //nolint:gosec // good enough for testing
90 tasks[i].ttl = 1000 + rand.Int63n(7000) 90 tasks[i].ttl = 1000 + rand.Int63n(7000) //nolint:gosec // good enough for testing
91 tasks[i].drop = 2000 + rand.Int63n(3000) 91 tasks[i].drop = 2000 + rand.Int63n(3000) //nolint:gosec // good enough for testing
92 tasks[i].revive = rand.Int63n(2000) 92 tasks[i].revive = rand.Int63n(2000) //nolint:gosec // good enough for testing
93 tasks[i].online = false 93 tasks[i].online = false
94 } 94 }
95 95
diff --git a/src/gnunet/service/gns/module.go b/src/gnunet/service/gns/module.go
index c431d48..0f4781d 100644
--- a/src/gnunet/service/gns/module.go
+++ b/src/gnunet/service/gns/module.go
@@ -200,6 +200,7 @@ func (m *Module) ResolveAbsolute(
200// ResolveRelative resolves a relative path (to a given zone) recursively by 200// ResolveRelative resolves a relative path (to a given zone) recursively by
201// processing simple (PKEY,Label) lookups in sequence and handle intermediate 201// processing simple (PKEY,Label) lookups in sequence and handle intermediate
202// GNS record types 202// GNS record types
203//nolint:gocyclo // life sometimes is complex...
203func (m *Module) ResolveRelative( 204func (m *Module) ResolveRelative(
204 ctx context.Context, 205 ctx context.Context,
205 labels []string, 206 labels []string,
diff --git a/src/gnunet/service/namecache/module.go b/src/gnunet/service/namecache/module.go
index 642355b..616c0b5 100644
--- a/src/gnunet/service/namecache/module.go
+++ b/src/gnunet/service/namecache/module.go
@@ -20,6 +20,7 @@ package namecache
20 20
21import ( 21import (
22 "context" 22 "context"
23 "errors"
23 "gnunet/config" 24 "gnunet/config"
24 "gnunet/core" 25 "gnunet/core"
25 "gnunet/service" 26 "gnunet/service"
@@ -39,7 +40,7 @@ import (
39type Module struct { 40type Module struct {
40 service.ModuleImpl 41 service.ModuleImpl
41 42
42 cache store.DHTStore // transient block cache 43 cache *store.DHTStore // transient block cache
43} 44}
44 45
45// NewModule creates a new module instance. 46// NewModule creates a new module instance.
@@ -67,13 +68,18 @@ func (m *Module) Import(fcm map[string]any) {
67 68
68//---------------------------------------------------------------------- 69//----------------------------------------------------------------------
69 70
70// Get an entry from the cache if available. 71// Get entry from the cache if available.
71func (m *Module) Get(ctx context.Context, query *blocks.GNSQuery) (block *blocks.GNSBlock, err error) { 72func (m *Module) Get(ctx context.Context, query *blocks.GNSQuery) (block *blocks.GNSBlock, err error) {
72 var e *store.DHTEntry 73 var e []*store.DHTEntry
73 if e, err = m.cache.Get(query); err != nil { 74 rf := blocks.NewGenericResultFilter()
75 if e, err = m.cache.Get("namecache", query, rf); err != nil {
74 return 76 return
75 } 77 }
76 err = blocks.Unwrap(e.Blk, block) 78 if len(e) != 1 {
79 err = errors.New("only one DHT entry exppected")
80 } else {
81 err = blocks.Unwrap(e[0].Blk, block)
82 }
77 return 83 return
78} 84}
79 85
diff --git a/src/gnunet/service/store/store_fs.go b/src/gnunet/service/store/store_dht.go
index bbbe7cc..802cf60 100644
--- a/src/gnunet/service/store/store_fs.go
+++ b/src/gnunet/service/store/store_dht.go
@@ -21,6 +21,7 @@ package store
21import ( 21import (
22 "encoding/hex" 22 "encoding/hex"
23 "fmt" 23 "fmt"
24 "gnunet/crypto"
24 "gnunet/service/dht/blocks" 25 "gnunet/service/dht/blocks"
25 "gnunet/service/dht/path" 26 "gnunet/service/dht/path"
26 "gnunet/util" 27 "gnunet/util"
@@ -35,9 +36,60 @@ import (
35// Filesystem-based storage 36// Filesystem-based storage
36//============================================================ 37//============================================================
37 38
38// FileStore implements a filesystem-based storage mechanism for 39//------------------------------------------------------------
40// DHT entry is an entity stored in the DHT
41//------------------------------------------------------------
42
43// DHTEntry to be stored to/retrieved from local storage
44type DHTEntry struct {
45 Blk blocks.Block // reference to DHT block
46 Path *path.Path // associated put path
47}
48
49//------------------------------------------------------------
50// DHT result is a single DHT result
51//------------------------------------------------------------
52
53// Result as returned by local DHT queries
54type DHTResult struct {
55 Entry *DHTEntry // reference to DHT entry
56 Dist *math.Int // distance of entry to query key
57}
58
59//------------------------------------------------------------
60
61type DHTResultSet struct {
62 list []*DHTResult // list of DHT results
63 pos int // iterator position
64}
65
66func NewDHTResultSet() *DHTResultSet {
67 return &DHTResultSet{
68 list: make([]*DHTResult, 0),
69 pos: 0,
70 }
71}
72
73func (rs *DHTResultSet) Add(r *DHTResult) {
74 rs.list = append(rs.list, r)
75}
76
77func (rs *DHTResultSet) Next() (result *DHTResult) {
78 if rs.pos == len(rs.list) {
79 return nil
80 }
81 result = rs.list[rs.pos]
82 rs.pos++
83 return
84}
85
86//------------------------------------------------------------
87// DHT store
88//------------------------------------------------------------
89
90// DHTStore implements a filesystem-based storage mechanism for
39// DHT queries and blocks. 91// DHT queries and blocks.
40type FileStore struct { 92type DHTStore struct {
41 path string // storage path 93 path string // storage path
42 cache bool // storage works as cache 94 cache bool // storage works as cache
43 args util.ParameterSet // arguments / settings 95 args util.ParameterSet // arguments / settings
@@ -53,10 +105,10 @@ type FileStore struct {
53 size int // size of cache (number of entries) 105 size int // size of cache (number of entries)
54} 106}
55 107
56// NewFileStore instantiates a new file storage. 108// NewDHTStore instantiates a new file storage handler.
57func NewFileStore(spec util.ParameterSet) (DHTStore, error) { 109func NewDHTStore(spec util.ParameterSet) (*DHTStore, error) {
58 // create file store handler 110 // create file store handler
59 fs := new(FileStore) 111 fs := new(DHTStore)
60 fs.args = spec 112 fs.args = spec
61 113
62 // get parameter 114 // get parameter
@@ -93,7 +145,7 @@ func NewFileStore(spec util.ParameterSet) (DHTStore, error) {
93} 145}
94 146
95// Close file storage. 147// Close file storage.
96func (s *FileStore) Close() (err error) { 148func (s *DHTStore) Close() (err error) {
97 if !s.cache { 149 if !s.cache {
98 // close database connection 150 // close database connection
99 err = s.meta.Close() 151 err = s.meta.Close()
@@ -102,7 +154,7 @@ func (s *FileStore) Close() (err error) {
102} 154}
103 155
104// Put block into storage under given key 156// Put block into storage under given key
105func (s *FileStore) Put(query blocks.Query, entry *DHTEntry) (err error) { 157func (s *DHTStore) Put(query blocks.Query, entry *DHTEntry) (err error) {
106 // check for free space 158 // check for free space
107 if !s.cache { 159 if !s.cache {
108 if int(s.totalSize>>30) > s.maxSpace { 160 if int(s.totalSize>>30) > s.maxSpace {
@@ -122,9 +174,10 @@ func (s *FileStore) Put(query blocks.Query, entry *DHTEntry) (err error) {
122 // compile metadata 174 // compile metadata
123 now := util.AbsoluteTimeNow() 175 now := util.AbsoluteTimeNow()
124 meta := &FileMetadata{ 176 meta := &FileMetadata{
125 key: query.Key().Bits, 177 key: query.Key(),
126 size: uint64(blkSize), 178 size: uint64(blkSize),
127 btype: btype, 179 btype: btype,
180 bhash: crypto.Hash(entry.Blk.Bytes()),
128 expires: expire, 181 expires: expire,
129 stored: now, 182 stored: now,
130 lastUsed: now, 183 lastUsed: now,
@@ -146,75 +199,73 @@ func (s *FileStore) Put(query blocks.Query, entry *DHTEntry) (err error) {
146} 199}
147 200
148// Get block with given key from storage 201// Get block with given key from storage
149func (s *FileStore) Get(query blocks.Query) (entry *DHTEntry, err error) { 202func (s *DHTStore) Get(label string, query blocks.Query, rf blocks.ResultFilter) (results []*DHTEntry, err error) {
150 // check if we have metadata for the query 203 // check if we have metadata for the query
151 key := query.Key().Bits 204 var mds []*FileMetadata
152 btype := query.Type() 205 if mds, err = s.meta.Get(query); err != nil || len(mds) == 0 {
153 var md *FileMetadata
154 if md, err = s.meta.Get(key, btype); err != nil || md == nil {
155 return 206 return
156 } 207 }
157 // check for expired entry 208 // traverse list of results
158 if md.expires.Expired() { 209 for _, md := range mds {
159 err = s.dropFile(md) 210 // check for expired entry
160 return 211 if md.expires.Expired() {
161 } 212 if err = s.dropFile(md); err != nil {
162 // mark the block as newly used 213 logger.Printf(logger.ERROR, "[%s] can't drop DHT file: %s", label, err)
163 if err = s.meta.Used(key, btype); err != nil { 214 }
164 return 215 continue
216 }
217 // check for filtered block
218 if rf.ContainsHash(md.bhash) {
219 continue
220 }
221 // read entry from storage
222 var entry *DHTEntry
223 if entry, err = s.readEntry(md.key.Bits); err != nil {
224 logger.Printf(logger.ERROR, "[%s] can't read DHT entry: %s", label, err)
225 continue
226 }
227 results = append(results, entry)
228 // mark the block as newly used
229 if err = s.meta.Used(md.key.Bits, md.btype); err != nil {
230 logger.Printf(logger.ERROR, "[%s] can't flag DHT entry as used: %s", label, err)
231 continue
232 }
165 } 233 }
166 return s.readEntry(key) 234 return
167} 235}
168 236
169// GetApprox returns the best-matching value with given key from storage 237// GetApprox returns the best-matching value with given key from storage
170// that is not excluded 238// that is not excluded
171func (s *FileStore) GetApprox(query blocks.Query, excl func(*DHTEntry) bool) (entry *DHTEntry, key any, err error) { 239func (s *DHTStore) GetApprox(label string, query blocks.Query, rf blocks.ResultFilter) (results []*DHTResult, err error) {
172 var bestKey []byte 240 // iterate over all keys; process each metadata instance
173 var bestEntry *DHTEntry 241 // (append to results if appropriate)
174 var bestDist *math.Int 242 process := func(md *FileMetadata) {
175 // distance function 243 // check for filtered block.
176 dist := func(a, b []byte) *math.Int { 244 if rf.ContainsHash(md.bhash) {
177 c := make([]byte, len(a)) 245 // filtered out...
178 for i := range a { 246 return
179 c[i] = a[i] ^ b[i]
180 } 247 }
181 return math.NewIntFromBytes(c) 248 // check distance (max. 16 bucktes off)
182 } 249 dist := util.Distance(md.key.Bits, query.Key().Bits)
183 // iterate over all keys 250 if (512 - dist.BitLen()) > 16 {
184 check := func(md *FileMetadata) { 251 return
185 // check for better match
186 d := dist(md.key, query.Key().Bits)
187 var entry *DHTEntry
188 if bestKey == nil || d.Cmp(bestDist) < 0 {
189 // we might have a match. check block for exclusion
190 if entry, err = s.readEntry(md.key); err != nil {
191 logger.Printf(logger.ERROR, "[dhtstore] failed to retrieve block for %s", hex.EncodeToString(md.key))
192 return
193 }
194 if excl(entry) {
195 return
196 }
197 // remember best match
198 bestKey = md.key
199 bestEntry = entry
200 bestDist = d
201 } 252 }
202 } 253 // read entry from storage
203 if err = s.meta.Traverse(check); err != nil { 254 var entry *DHTEntry
204 return 255 if entry, err = s.readEntry(md.key.Bits); err != nil {
205 } 256 logger.Printf(logger.ERROR, "[%s] failed to retrieve block for %s", label, md.key.String())
206 if bestEntry != nil {
207 // mark the block as newly used
208 if err = s.meta.Used(bestKey, bestEntry.Blk.Type()); err != nil {
209 return 257 return
210 } 258 }
259 // add to result list
260 result := &DHTResult{
261 Entry: entry,
262 Dist: dist,
263 }
264 results = append(results, result)
211 } 265 }
212 return bestEntry, bestDist, nil 266 // traverse mestadata database
213} 267 err = s.meta.Traverse(process)
214 268 return
215// Get a list of all stored block keys (generic query).
216func (s *FileStore) List() ([]blocks.Query, error) {
217 return nil, ErrStoreNoList
218} 269}
219 270
220//---------------------------------------------------------------------- 271//----------------------------------------------------------------------
@@ -227,7 +278,7 @@ type entryLayout struct {
227} 278}
228 279
229// read entry from storage for given key 280// read entry from storage for given key
230func (s *FileStore) readEntry(key []byte) (entry *DHTEntry, err error) { 281func (s *DHTStore) readEntry(key []byte) (entry *DHTEntry, err error) {
231 // get path and filename from key 282 // get path and filename from key
232 folder, fname := s.expandPath(key) 283 folder, fname := s.expandPath(key)
233 284
@@ -255,7 +306,7 @@ func (s *FileStore) readEntry(key []byte) (entry *DHTEntry, err error) {
255} 306}
256 307
257// write entry to storage for given key 308// write entry to storage for given key
258func (s *FileStore) writeEntry(key []byte, entry *DHTEntry) (err error) { 309func (s *DHTStore) writeEntry(key []byte, entry *DHTEntry) (err error) {
259 // get folder and filename from key 310 // get folder and filename from key
260 folder, fname := s.expandPath(key) 311 folder, fname := s.expandPath(key)
261 // make sure the folder exists 312 // make sure the folder exists
@@ -287,14 +338,14 @@ func (s *FileStore) writeEntry(key []byte, entry *DHTEntry) (err error) {
287//---------------------------------------------------------------------- 338//----------------------------------------------------------------------
288 339
289// expandPath returns the full path to the file for given key. 340// expandPath returns the full path to the file for given key.
290func (s *FileStore) expandPath(key []byte) (string, string) { 341func (s *DHTStore) expandPath(key []byte) (string, string) {
291 h := hex.EncodeToString(key) 342 h := hex.EncodeToString(key)
292 return fmt.Sprintf("%s/%s/%s", s.path, h[:2], h[2:4]), h[4:] 343 return fmt.Sprintf("%s/%s/%s", s.path, h[:2], h[2:4]), h[4:]
293} 344}
294 345
295// Prune list of file headers so we drop at least n entries. 346// Prune list of file headers so we drop at least n entries.
296// returns number of removed entries. 347// returns number of removed entries.
297func (s *FileStore) prune(n int) (del int) { 348func (s *DHTStore) prune(n int) (del int) {
298 // collect obsolete records 349 // collect obsolete records
299 obsolete, err := s.meta.Obsolete(n) 350 obsolete, err := s.meta.Obsolete(n)
300 if err != nil { 351 if err != nil {
@@ -311,16 +362,17 @@ func (s *FileStore) prune(n int) (del int) {
311} 362}
312 363
313// drop file removes a file from metadatabase and the physical storage. 364// drop file removes a file from metadatabase and the physical storage.
314func (s *FileStore) dropFile(md *FileMetadata) (err error) { 365func (s *DHTStore) dropFile(md *FileMetadata) (err error) {
315 // adjust total size 366 // adjust total size
316 s.totalSize -= md.size 367 s.totalSize -= md.size
317 // remove from database 368 // remove from database
318 if err = s.meta.Drop(md.key, md.btype); err != nil { 369 if err = s.meta.Drop(md.key.Bits, md.btype); err != nil {
319 logger.Printf(logger.ERROR, "[store] can't remove metadata (%s,%d): %s", md.key, md.btype, err.Error()) 370 logger.Printf(logger.ERROR, "[store] can't remove metadata (%s,%d): %s", md.key, md.btype, err.Error())
320 return 371 return
321 } 372 }
322 // remove from filesystem 373 // remove from filesystem
323 path := fmt.Sprintf("%s/%s/%s/%s", s.path, md.key[:2], md.key[2:4], md.key[4:]) 374 h := hex.EncodeToString(md.key.Bits)
375 path := fmt.Sprintf("%s/%s/%s/%s", s.path, h[:2], h[2:4], h[4:])
324 if err = os.Remove(path); err != nil { 376 if err = os.Remove(path); err != nil {
325 logger.Printf(logger.ERROR, "[store] can't remove file %s: %s", path, err.Error()) 377 logger.Printf(logger.ERROR, "[store] can't remove file %s: %s", path, err.Error())
326 } 378 }
diff --git a/src/gnunet/service/store/store_fs_meta.go b/src/gnunet/service/store/store_dht_meta.go
index 5710286..9ebe387 100644
--- a/src/gnunet/service/store/store_fs_meta.go
+++ b/src/gnunet/service/store/store_dht_meta.go
@@ -21,7 +21,9 @@ package store
21import ( 21import (
22 "database/sql" 22 "database/sql"
23 _ "embed" 23 _ "embed"
24 "gnunet/crypto"
24 "gnunet/enums" 25 "gnunet/enums"
26 "gnunet/service/dht/blocks"
25 "gnunet/util" 27 "gnunet/util"
26 "os" 28 "os"
27) 29)
@@ -33,21 +35,30 @@ import (
33// FileMetadata holds information about a file (raw block data) 35// FileMetadata holds information about a file (raw block data)
34// and is stored in a SQL database for faster access. 36// and is stored in a SQL database for faster access.
35type FileMetadata struct { 37type FileMetadata struct {
36 key []byte // storage key 38 key *crypto.HashCode // storage key
37 size uint64 // size of file 39 size uint64 // size of file
38 btype enums.BlockType // block type 40 btype enums.BlockType // block type
41 bhash *crypto.HashCode // block hash
39 stored util.AbsoluteTime // time added to store 42 stored util.AbsoluteTime // time added to store
40 expires util.AbsoluteTime // expiration time 43 expires util.AbsoluteTime // expiration time
41 lastUsed util.AbsoluteTime // time last used 44 lastUsed util.AbsoluteTime // time last used
42 usedCount uint64 // usage count 45 usedCount uint64 // usage count
43} 46}
44 47
48// NewFileMetadata creates a new file metadata instance
49func NewFileMetadata() *FileMetadata {
50 return &FileMetadata{
51 key: crypto.NewHashCode(nil),
52 bhash: crypto.NewHashCode(nil),
53 }
54}
55
45//------------------------------------------------------------ 56//------------------------------------------------------------
46// Metadata database: A SQLite3 database to hold metadata about 57// Metadata database: A SQLite3 database to hold metadata about
47// blocks in file storage 58// blocks in file storage
48//------------------------------------------------------------ 59//------------------------------------------------------------
49 60
50//go:embed store_fs_meta.sql 61//go:embed store_dht_meta.sql
51var initScript []byte 62var initScript []byte
52 63
53// FileMetaDB is a SQLite3 database for block metadata 64// FileMetaDB is a SQLite3 database for block metadata
@@ -86,42 +97,67 @@ func OpenMetaDB(path string) (db *FileMetaDB, err error) {
86// Store metadata in database: creates or updates a record for the metadata 97// Store metadata in database: creates or updates a record for the metadata
87// in the database; primary key is the query key 98// in the database; primary key is the query key
88func (db *FileMetaDB) Store(md *FileMetadata) (err error) { 99func (db *FileMetaDB) Store(md *FileMetadata) (err error) {
89 sql := "replace into meta(qkey,btype,size,stored,expires,lastUsed,usedCount) values(?,?,?,?,?,?,?)" 100 sql := "replace into meta(qkey,btype,bhash,size,stored,expires,lastUsed,usedCount) values(?,?,?,?,?,?,?,?)"
90 _, err = db.conn.Exec(sql, md.key, md.btype, md.size, md.stored.Epoch(), md.expires.Epoch(), md.lastUsed.Epoch(), md.usedCount) 101 _, err = db.conn.Exec(sql,
102 md.key.Bits, md.btype, md.bhash.Bits, md.size, md.stored.Epoch(),
103 md.expires.Epoch(), md.lastUsed.Epoch(), md.usedCount)
91 return 104 return
92} 105}
93 106
94// Get block metadata from database 107// Get block metadata from database
95func (db *FileMetaDB) Get(key []byte, btype enums.BlockType) (md *FileMetadata, err error) { 108func (db *FileMetaDB) Get(query blocks.Query) (mds []*FileMetadata, err error) {
96 md = new(FileMetadata) 109 // select rows in database matching the query
97 md.key = util.Clone(key) 110 stmt := "select size,bhash,stored,expires,lastUsed,usedCount from meta where qkey=?"
98 md.btype = btype 111 btype := query.Type()
99 stmt := "select size,stored,expires,lastUsed,usedCount from meta where qkey=? and btype=?" 112 var rows *sql.Rows
100 row := db.conn.QueryRow(stmt, key, btype) 113 if btype == enums.BLOCK_TYPE_ANY {
101 var st, exp, lu uint64 114 rows, err = db.conn.Query(stmt, query.Key().Bits)
102 if err = row.Scan(&md.size, &st, &exp, &lu, &md.usedCount); err != nil {
103 if err == sql.ErrNoRows {
104 md = nil
105 err = nil
106 }
107 } else { 115 } else {
116 rows, err = db.conn.Query(stmt+" and btype=?", query.Key().Bits, btype)
117 }
118 if err != nil {
119 return
120 }
121 // process results
122 for rows.Next() {
123 md := NewFileMetadata()
124 md.key = query.Key()
125 md.btype = btype
126 var st, exp, lu uint64
127 if err = rows.Scan(&md.size, &md.bhash.Bits, &st, &exp, &lu, &md.usedCount); err != nil {
128 if err == sql.ErrNoRows {
129 md = nil
130 err = nil
131 }
132 return
133 }
108 md.stored.Val = st * 1000000 134 md.stored.Val = st * 1000000
109 md.expires.Val = exp * 1000000 135 md.expires.Val = exp * 1000000
110 md.lastUsed.Val = lu * 1000000 136 md.lastUsed.Val = lu * 1000000
137 mds = append(mds, md)
111 } 138 }
112 return 139 return
113} 140}
114 141
115// Drop metadata for block from database 142// Drop metadata for block from database
116func (db *FileMetaDB) Drop(key []byte, btype enums.BlockType) error { 143func (db *FileMetaDB) Drop(key []byte, btype enums.BlockType) (err error) {
117 _, err := db.conn.Exec("delete from meta where qkey=? and btype=?", key, btype) 144 if btype != enums.BLOCK_TYPE_ANY {
118 return err 145 _, err = db.conn.Exec("delete from meta where qkey=? and btype=?", key, btype)
146 } else {
147 _, err = db.conn.Exec("delete from meta where qkey=?", key)
148 }
149 return
119} 150}
120 151
121// Used a block from store: increment usage count and lastUsed time. 152// Used a block from store: increment usage count and lastUsed time.
122func (db *FileMetaDB) Used(key []byte, btype enums.BlockType) error { 153func (db *FileMetaDB) Used(key []byte, btype enums.BlockType) (err error) {
123 _, err := db.conn.Exec("update meta set usedCount=usedCount+1,lastUsed=unixepoch() where qkey=? and btype=?", key, btype) 154 stmt := "update meta set usedCount=usedCount+1,lastUsed=unixepoch() where qkey=?"
124 return err 155 if btype != enums.BLOCK_TYPE_ANY {
156 _, err = db.conn.Exec(stmt+" and btype=?", key, btype)
157 } else {
158 _, err = db.conn.Exec(stmt, key)
159 }
160 return
125} 161}
126 162
127// Obsolete collects records from the meta database that are considered 163// Obsolete collects records from the meta database that are considered
@@ -150,15 +186,15 @@ func (db *FileMetaDB) Obsolete(n int) (removable []*FileMetadata, err error) {
150 186
151// Traverse metadata records and call function on each record 187// Traverse metadata records and call function on each record
152func (db *FileMetaDB) Traverse(f func(*FileMetadata)) error { 188func (db *FileMetaDB) Traverse(f func(*FileMetadata)) error {
153 sql := "select qkey,btype,size,stored,expires,lastUsed,usedCount from meta" 189 sql := "select qkey,btype,bhash,size,stored,expires,lastUsed,usedCount from meta"
154 rows, err := db.conn.Query(sql) 190 rows, err := db.conn.Query(sql)
155 if err != nil { 191 if err != nil {
156 return err 192 return err
157 } 193 }
158 md := new(FileMetadata) 194 md := NewFileMetadata()
159 for rows.Next() { 195 for rows.Next() {
160 var st, exp, lu uint64 196 var st, exp, lu uint64
161 err = rows.Scan(&md.key, &md.btype, &md.size, &st, &exp, &lu, &md.usedCount) 197 err = rows.Scan(&md.key.Bits, &md.btype, &md.bhash.Bits, &md.size, &st, &exp, &lu, &md.usedCount)
162 if err != nil { 198 if err != nil {
163 return err 199 return err
164 } 200 }
diff --git a/src/gnunet/service/store/store_fs_meta.sql b/src/gnunet/service/store/store_dht_meta.sql
index a2692ab..ba9fe1f 100644
--- a/src/gnunet/service/store/store_fs_meta.sql
+++ b/src/gnunet/service/store/store_dht_meta.sql
@@ -19,6 +19,7 @@
19create table meta ( 19create table meta (
20 qkey blob, -- key (SHA512 hash) 20 qkey blob, -- key (SHA512 hash)
21 btype integer, -- block type 21 btype integer, -- block type
22 bhash blob, -- block hash
22 size integer, -- size of file 23 size integer, -- size of file
23 stored integer, -- time added to store 24 stored integer, -- time added to store
24 expires integer, -- expiration time 25 expires integer, -- expiration time
diff --git a/src/gnunet/service/store/dhtstore_test.go b/src/gnunet/service/store/store_dht_test.go
index ba5fae2..9f80380 100644
--- a/src/gnunet/service/store/dhtstore_test.go
+++ b/src/gnunet/service/store/store_dht_test.go
@@ -52,21 +52,27 @@ func TestDHTFilesStore(t *testing.T) {
52 52
53 // create file store 53 // create file store
54 if _, err := os.Stat(path); err != nil { 54 if _, err := os.Stat(path); err != nil {
55 os.MkdirAll(path, 0755) 55 if err = os.MkdirAll(path, 0755); err != nil {
56 t.Fatal(err)
57 }
56 } 58 }
57 fs, err := NewFileStore(cfg) 59 fs, err := NewDHTStore(cfg)
58 if err != nil { 60 if err != nil {
59 t.Fatal(err) 61 t.Fatal(err)
60 } 62 }
61 // allocate keys 63 // allocate keys
62 keys := make([]blocks.Query, 0, fsNumBlocks) 64 keys := make([]blocks.Query, 0, fsNumBlocks)
65 // create result filter
66 rf := blocks.NewGenericResultFilter()
63 67
64 // First round: save blocks 68 // First round: save blocks
65 for i := 0; i < fsNumBlocks; i++ { 69 for i := 0; i < fsNumBlocks; i++ {
66 // generate random block 70 // generate random block
67 size := 1024 + rand.Intn(62000) 71 size := 1024 + rand.Intn(62000) //nolint:gosec // good enough for testing
68 buf := make([]byte, size) 72 buf := make([]byte, size)
69 rand.Read(buf) 73 if _, err = rand.Read(buf); err != nil { //nolint:gosec // good enough for testing
74 t.Fatal(err)
75 }
70 blk := blocks.NewGenericBlock(buf) 76 blk := blocks.NewGenericBlock(buf)
71 // generate associated key 77 // generate associated key
72 k := crypto.Hash(buf) 78 k := crypto.Hash(buf)
@@ -86,11 +92,14 @@ func TestDHTFilesStore(t *testing.T) {
86 // Second round: retrieve blocks and check 92 // Second round: retrieve blocks and check
87 for i, key := range keys { 93 for i, key := range keys {
88 // get block 94 // get block
89 val, err := fs.Get(key) 95 vals, err := fs.Get("test", key, rf)
90 if err != nil { 96 if err != nil {
91 t.Fatalf("[%d] %s", i, err) 97 t.Fatalf("[%d] %s", i, err)
92 } 98 }
93 buf := val.Blk.Bytes() 99 if len(vals) != 1 {
100 t.Fatalf("[%d] only one result expected", i)
101 }
102 buf := vals[0].Blk.Bytes()
94 103
95 // re-create key 104 // re-create key
96 k := crypto.Hash(buf) 105 k := crypto.Hash(buf)
diff --git a/src/gnunet/service/store/store.go b/src/gnunet/service/store/store_kv.go
index ed55547..8bcb711 100644
--- a/src/gnunet/service/store/store.go
+++ b/src/gnunet/service/store/store_kv.go
@@ -24,8 +24,6 @@ import (
24 _ "embed" // use embedded filesystem 24 _ "embed" // use embedded filesystem
25 "errors" 25 "errors"
26 "fmt" 26 "fmt"
27 "gnunet/service/dht/blocks"
28 "gnunet/service/dht/path"
29 "gnunet/util" 27 "gnunet/util"
30 28
31 redis "github.com/go-redis/redis/v8" 29 redis "github.com/go-redis/redis/v8"
@@ -41,68 +39,24 @@ var (
41) 39)
42 40
43//------------------------------------------------------------ 41//------------------------------------------------------------
44// Generic storage interface. Can be used for persistent or
45// transient (caching) storage of key/value data.
46//------------------------------------------------------------
47 42
48// Store is a key/value storage where the type of the key is either 43// KVStore us a eneric key/value storage interface. Can be used for
49// a SHA512 hash value or a string and the value is either a DHT 44// persistent or transient (caching) storage of stringed key/value data.
50// block or a string. It is possiblle to mix any key/value types, 45type KVStore interface {
51// but not used in this implementation.
52type Store[K, V any] interface {
53 // Put value into storage under given key 46 // Put value into storage under given key
54 Put(key K, val V) error 47 Put(key string, val string) error
55 48
56 // Get value with given key from storage 49 // Get value with given key from storage
57 Get(key K) (V, error) 50 Get(key string) (string, error)
58
59 // GetApprox returns the best-matching value with given key from storage
60 // that is not excluded.
61 GetApprox(key K, excl func(V) bool) (V, any, error)
62 51
63 // List all store keys 52 // List all store keys
64 List() ([]K, error) 53 List() ([]string, error)
65 54
66 // Close store 55 // Close store
67 Close() error 56 Close() error
68} 57}
69 58
70//------------------------------------------------------------ 59//------------------------------------------------------------
71// Types for custom store requirements
72//------------------------------------------------------------
73
74// DHTEntry to be stored/retrieved
75type DHTEntry struct {
76 Blk blocks.Block
77 Path *path.Path
78}
79
80// DHTStore for DHT queries and blocks
81type DHTStore Store[blocks.Query, *DHTEntry]
82
83// KVStore for key/value string pairs
84type KVStore Store[string, string]
85
86//------------------------------------------------------------
87// NewDHTStore creates a new storage handler with given spec
88// for use with DHT queries and blocks
89func NewDHTStore(spec util.ParameterSet) (DHTStore, error) {
90 // get the mode parameter
91 mode, ok := util.GetParam[string](spec, "mode")
92 if !ok {
93 return nil, ErrStoreInvalidSpec
94 }
95 switch mode {
96 //------------------------------------------------------------------
97 // File-base storage
98 //------------------------------------------------------------------
99 case "file":
100 return NewFileStore(spec)
101 }
102 return nil, ErrStoreUnknown
103}
104
105//------------------------------------------------------------
106// NewKVStore creates a new storage handler with given spec 60// NewKVStore creates a new storage handler with given spec
107// for use with key/value string pairs. 61// for use with key/value string pairs.
108func NewKVStore(spec util.ParameterSet) (KVStore, error) { 62func NewKVStore(spec util.ParameterSet) (KVStore, error) {
@@ -178,11 +132,6 @@ func (s *RedisStore) Get(key string) (value string, err error) {
178 return s.client.Get(context.TODO(), key).Result() 132 return s.client.Get(context.TODO(), key).Result()
179} 133}
180 134
181// GetApprox returns the best-matching value for given key from storage
182func (s *RedisStore) GetApprox(key string, crit func(string) bool) (value string, vkey any, err error) {
183 return "", "", ErrStoreNoApprox
184}
185
186// List all keys in store 135// List all keys in store
187func (s *RedisStore) List() (keys []string, err error) { 136func (s *RedisStore) List() (keys []string, err error) {
188 var ( 137 var (
@@ -255,11 +204,6 @@ func (s *SQLStore) Get(key string) (value string, err error) {
255 return 204 return
256} 205}
257 206
258// GetApprox returns the best-matching value for given key from storage
259func (s *SQLStore) GetApprox(key string, crit func(string) bool) (value string, vkey any, err error) {
260 return "", "", ErrStoreNoApprox
261}
262
263// List all keys in store 207// List all keys in store
264func (s *SQLStore) List() (keys []string, err error) { 208func (s *SQLStore) List() (keys []string, err error) {
265 var ( 209 var (
diff --git a/src/gnunet/transport/responder.go b/src/gnunet/transport/responder.go
index 7032c78..be958bf 100644
--- a/src/gnunet/transport/responder.go
+++ b/src/gnunet/transport/responder.go
@@ -33,8 +33,9 @@ type Responder interface {
33 // Handle outgoing message 33 // Handle outgoing message
34 Send(ctx context.Context, msg message.Message) error 34 Send(ctx context.Context, msg message.Message) error
35 35
36 // Receiver returns the receiving peer (string representation) 36 // Receiver returns the receiving peer. Returns nil if
37 Receiver() string 37 // this is a local responder (service.Connection)
38 Receiver() *util.PeerID
38} 39}
39 40
40//---------------------------------------------------------------------- 41//----------------------------------------------------------------------
@@ -55,6 +56,6 @@ func (r *TransportResponder) Send(ctx context.Context, msg message.Message) erro
55} 56}
56 57
57// Receiver returns the receiving peer id 58// Receiver returns the receiving peer id
58func (r *TransportResponder) Receiver() string { 59func (r *TransportResponder) Receiver() *util.PeerID {
59 return r.Peer.String() 60 return r.Peer
60} 61}
diff --git a/src/gnunet/util/map.go b/src/gnunet/util/map.go
index ab48089..2237188 100644
--- a/src/gnunet/util/map.go
+++ b/src/gnunet/util/map.go
@@ -152,7 +152,7 @@ func (m *Map[K, V]) GetRandom(pid int) (key K, value V, ok bool) {
152 152
153 ok = false 153 ok = false
154 if size := m.Size(); size > 0 { 154 if size := m.Size(); size > 0 {
155 idx := rand.Intn(size) 155 idx := rand.Intn(size) //nolint:gosec // good enough for selection
156 for key, value = range m.list { 156 for key, value = range m.list {
157 if idx == 0 { 157 if idx == 0 {
158 ok = true 158 ok = true
diff --git a/src/gnunet/util/misc.go b/src/gnunet/util/misc.go
index 562bdaa..a157ec4 100644
--- a/src/gnunet/util/misc.go
+++ b/src/gnunet/util/misc.go
@@ -24,6 +24,7 @@ import (
24 "strings" 24 "strings"
25 25
26 "github.com/bfix/gospel/data" 26 "github.com/bfix/gospel/data"
27 "github.com/bfix/gospel/math"
27) 28)
28 29
29//---------------------------------------------------------------------- 30//----------------------------------------------------------------------
@@ -76,6 +77,16 @@ func GetParam[V any](params ParameterSet, key string) (i V, ok bool) {
76// additional helpers 77// additional helpers
77//---------------------------------------------------------------------- 78//----------------------------------------------------------------------
78 79
80// Distance returns the XOR distance between to byte arrays
81func Distance(a, b []byte) *math.Int {
82 size := len(a)
83 d := make([]byte, size)
84 for i := range d {
85 d[i] = a[i] ^ b[i]
86 }
87 return math.NewIntFromBytes(d)
88}
89
79// StripPathRight returns a dot-separated path without 90// StripPathRight returns a dot-separated path without
80// its last (right-most) element. 91// its last (right-most) element.
81func StripPathRight(s string) string { 92func StripPathRight(s string) string {
diff --git a/src/gnunet/util/peer.go b/src/gnunet/util/peer.go
index 62ae2e2..bcddad6 100644
--- a/src/gnunet/util/peer.go
+++ b/src/gnunet/util/peer.go
@@ -38,7 +38,7 @@ func NewPeerPublicKey(data []byte) *PeerPublicKey {
38 pk := new(PeerPublicKey) 38 pk := new(PeerPublicKey)
39 size := pk.Size() 39 size := pk.Size()
40 v := make([]byte, size) 40 v := make([]byte, size)
41 if data != nil && len(data) > 0 { 41 if len(data) > 0 {
42 if uint(len(data)) < size { 42 if uint(len(data)) < size {
43 CopyAlignedBlock(v, data) 43 CopyAlignedBlock(v, data)
44 } else { 44 } else {
@@ -68,7 +68,7 @@ func (pk *PeerPublicKey) Verify(data []byte, sig *PeerSignature) (bool, error) {
68// Peer identifier: 68// Peer identifier:
69//---------------------------------------------------------------------- 69//----------------------------------------------------------------------
70 70
71// PeerID is a wrpped PeerPublicKey 71// PeerID is a wrapped PeerPublicKey
72type PeerID struct { 72type PeerID struct {
73 PeerPublicKey 73 PeerPublicKey
74} 74}
@@ -109,7 +109,7 @@ func NewPeerSignature(data []byte) *PeerSignature {
109 s := new(PeerSignature) 109 s := new(PeerSignature)
110 size := s.Size() 110 size := s.Size()
111 v := make([]byte, size) 111 v := make([]byte, size)
112 if data != nil && len(data) > 0 { 112 if len(data) > 0 {
113 if uint(len(data)) < size { 113 if uint(len(data)) < size {
114 CopyAlignedBlock(v, data) 114 CopyAlignedBlock(v, data)
115 } else { 115 } else {