aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBernd Fix <brf@hoi-polloi.org>2022-08-24 17:27:59 +0200
committerBernd Fix <brf@hoi-polloi.org>2022-08-24 17:27:59 +0200
commit3cee953814e10b8e8bca10164e7f25e93f4a6d3f (patch)
tree83d57d9ee618881fcf77c0865f9d5c407e91307e
parent21d7292dbd062ff11194fdc235a3d54830d7ba57 (diff)
downloadgnunet-go-3cee953814e10b8e8bca10164e7f25e93f4a6d3f.tar.gz
gnunet-go-3cee953814e10b8e8bca10164e7f25e93f4a6d3f.zip
Integration tests: more bug fixes.v0.1.33
-rw-r--r--src/gnunet/crypto/signature.go7
-rw-r--r--src/gnunet/service/dht/blocks/default.go162
-rw-r--r--src/gnunet/service/dht/blocks/filters.go96
-rw-r--r--src/gnunet/service/dht/blocks/generic.go31
-rw-r--r--src/gnunet/service/dht/blocks/gns.go4
-rw-r--r--src/gnunet/service/dht/blocks/handlers.go1
-rw-r--r--src/gnunet/service/dht/blocks/hello.go96
-rw-r--r--src/gnunet/service/dht/messages.go70
-rw-r--r--src/gnunet/service/dht/module.go10
-rw-r--r--src/gnunet/service/dht/path/handling.go13
-rw-r--r--src/gnunet/service/dht/resulthandler.go22
-rw-r--r--src/gnunet/service/dht/routingtable.go26
-rw-r--r--src/gnunet/service/namecache/module.go3
-rw-r--r--src/gnunet/service/store/store_dht.go121
-rw-r--r--src/gnunet/service/store/store_dht_test.go2
-rw-r--r--src/gnunet/transport/reader_writer.go16
-rw-r--r--src/gnunet/util/peer.go3
17 files changed, 466 insertions, 217 deletions
diff --git a/src/gnunet/crypto/signature.go b/src/gnunet/crypto/signature.go
index 1325f18..9f6bee1 100644
--- a/src/gnunet/crypto/signature.go
+++ b/src/gnunet/crypto/signature.go
@@ -29,7 +29,7 @@ type SignaturePurpose struct {
29 Purpose enums.SigPurpose `order:"big"` // Signature purpose 29 Purpose enums.SigPurpose `order:"big"` // Signature purpose
30} 30}
31 31
32// Signable interface for objects that can get signed by peer 32// Signable interface for objects that can get signed by a Signer
33type Signable interface { 33type Signable interface {
34 // SignedData returns the byte array to be signed 34 // SignedData returns the byte array to be signed
35 SignedData() []byte 35 SignedData() []byte
@@ -37,3 +37,8 @@ type Signable interface {
37 // SetSignature returns the signature to the signable object 37 // SetSignature returns the signature to the signable object
38 SetSignature(*util.PeerSignature) error 38 SetSignature(*util.PeerSignature) error
39} 39}
40
41// Signer instance for creating signatures
42type Signer interface {
43 Sign(Signable) error
44}
diff --git a/src/gnunet/service/dht/blocks/default.go b/src/gnunet/service/dht/blocks/default.go
new file mode 100644
index 0000000..9d09572
--- /dev/null
+++ b/src/gnunet/service/dht/blocks/default.go
@@ -0,0 +1,162 @@
1// This file is part of gnunet-go, a GNUnet-implementation in Golang.
2// Copyright (C) 2019-2022 Bernd Fix >Y<
3//
4// gnunet-go is free software: you can redistribute it and/or modify it
5// under the terms of the GNU Affero General Public License as published
6// by the Free Software Foundation, either version 3 of the License,
7// or (at your option) any later version.
8//
9// gnunet-go is distributed in the hope that it will be useful, but
10// WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12// Affero General Public License for more details.
13//
14// You should have received a copy of the GNU Affero General Public License
15// along with this program. If not, see <http://www.gnu.org/licenses/>.
16//
17// SPDX-License-Identifier: AGPL3.0-or-later
18
19package blocks
20
21import (
22 "fmt"
23 "gnunet/crypto"
24 "gnunet/enums"
25 "gnunet/util"
26)
27
28//----------------------------------------------------------------------
29// TEST block
30//----------------------------------------------------------------------
31
32// TestBlock (BLOCK_TYPE_TEST) is a block for testing the DHT with non-HELLO
33// blocks. Applications using the DHT are encouraged to define custom blocks
34// with appropriate internal logic. TestBlocks are just a pile of bits that
35// never expire...
36type TestBlock struct {
37 expire util.AbsoluteTime `` // expiry (transient!)
38 Data []byte `size:"*"` // block data
39}
40
41// NewTestBlock creates a new empty test block
42func NewTestBlock() Block {
43 return &TestBlock{
44 expire: util.AbsoluteTimeNever(),
45 Data: nil,
46 }
47}
48
49// Prepare a block to be of given type and expiration.
50// Use expiration date for test block.
51func (t *TestBlock) Prepare(_ enums.BlockType, expire util.AbsoluteTime) {
52 t.expire = expire
53}
54
55// Return the block type
56func (t *TestBlock) Type() enums.BlockType {
57 return enums.BLOCK_TYPE_TEST
58}
59
60// Bytes returns the raw block data
61func (t *TestBlock) Bytes() []byte {
62 return util.Clone(t.Data)
63}
64
65// Expire returns the block expiration
66func (t *TestBlock) Expire() util.AbsoluteTime {
67 return t.expire
68}
69
70// String returns the human-readable representation of a block
71func (t *TestBlock) String() string {
72 return fmt.Sprintf("TestBlock{%d bytes}", len(t.Data))
73}
74
75// Verify the integrity of a block (optional). Override in custom query
76// types to implement block-specific integrity checks (see GNSBlock for
77// example). This verification is usually weaker than the verification
78// method from a Query (see GNSBlock.Verify for explanation).
79func (t *TestBlock) Verify() (bool, error) {
80 // no internal verification defined. All good.
81 return true, nil
82}
83
84//----------------------------------------------------------------------
85// TEST block handler
86//----------------------------------------------------------------------
87
88// TestBlockHandler methods related to HELLO blocks
89type TestBlockHandler struct{}
90
91// Parse a block instance from binary data
92func (bh *TestBlockHandler) ParseBlock(buf []byte) (Block, error) {
93 return &TestBlock{
94 Data: util.Clone(buf),
95 }, nil
96}
97
98// ValidateHelloBlockQuery validates query parameters for a
99// DHT-GET request for HELLO blocks.
100func (bh *TestBlockHandler) ValidateBlockQuery(key *crypto.HashCode, xquery []byte) bool {
101 // no internal logic
102 return true
103}
104
105// ValidateBlockKey returns true if the block key is the same as the
106// query key used to access the block.
107func (bh *TestBlockHandler) ValidateBlockKey(b Block, key *crypto.HashCode) bool {
108 // no internal logic
109 return true
110}
111
112// DeriveBlockKey is used to synthesize the block key from the block
113// payload as part of PutMessage and ResultMessage processing. The special
114// return value of 'nil' implies that this block type does not permit
115// deriving the key from the block. A Key may be returned for a block that
116// is ill-formed.
117func (bh *TestBlockHandler) DeriveBlockKey(b Block) *crypto.HashCode {
118 return nil
119}
120
121// ValidateBlockStoreRequest is used to evaluate a block payload as part of
122// PutMessage and ResultMessage processing.
123// To validate a block store request is to verify the EdDSA SIGNATURE over
124// the hashed ADDRESSES against the public key from the peer ID field. If the
125// signature is valid true is returned.
126func (bh *TestBlockHandler) ValidateBlockStoreRequest(b Block) bool {
127 // no internal logic
128 return true
129}
130
131// SetupResultFilter is used to setup an empty result filter. The arguments
132// are the set of results that must be filtered at the initiator, and a
133// MUTATOR value which MAY be used to deterministically re-randomize
134// probabilistic data structures.
135func (bh *TestBlockHandler) SetupResultFilter(filterSize int, mutator uint32) ResultFilter {
136 return NewGenericResultFilter(filterSize, mutator)
137}
138
139// ParseResultFilter from binary data
140func (bh *TestBlockHandler) ParseResultFilter(data []byte) ResultFilter {
141 return NewGenericResultFilterFromBytes(data)
142}
143
144// FilterResult is used to filter results against specific queries. This
145// function does not check the validity of the block itself or that it
146// matches the given key, as this must have been checked earlier. Thus,
147// locally stored blocks from previously observed ResultMessages and
148// PutMessages use this function to perform filtering based on the request
149// parameters of a particular GET operation. Possible values for the
150// FilterEvaluationResult are defined above. If the main evaluation result
151// is RF_MORE, the function also returns and updated result filter where
152// the block is added to the set of filtered replies. An implementation is
153// not expected to actually differentiate between the RF_DUPLICATE and
154// RF_IRRELEVANT return values: in both cases the block is ignored for
155// this query.
156func (bh *TestBlockHandler) FilterResult(b Block, key *crypto.HashCode, rf ResultFilter, xQuery []byte) int {
157 if rf.Contains(b) {
158 return RF_DUPLICATE
159 }
160 rf.Add(b)
161 return RF_LAST
162}
diff --git a/src/gnunet/service/dht/blocks/filters.go b/src/gnunet/service/dht/blocks/filters.go
index a807f1c..e7d961f 100644
--- a/src/gnunet/service/dht/blocks/filters.go
+++ b/src/gnunet/service/dht/blocks/filters.go
@@ -125,81 +125,87 @@ type ResultFilter interface {
125} 125}
126 126
127//---------------------------------------------------------------------- 127//----------------------------------------------------------------------
128// Generic result filter: 128// Generic result filter
129// Filter duplicate blocks (identical hash value over content)
130//---------------------------------------------------------------------- 129//----------------------------------------------------------------------
131 130
132// GenericResultFilter is a dummy result filter with no state. 131// GenericResultFilter is the default resultfilter implementation for
132// DHT blocks. It is used by the two predefined block types (BLOCK_TYPE_TEST
133// and BLOCK_TYPE_DHT_URL_HELLO) and can serve custom blocks as well if
134// no custom result filter is required.
133type GenericResultFilter struct { 135type GenericResultFilter struct {
134 bf *BloomFilter 136 bf *BloomFilter
135} 137}
136 138
137// NewGenericResultFilter creates a new empty result bloom filter 139// NewGenericResultFilter initializes an empty result filter
138func NewGenericResultFilter() *GenericResultFilter { 140func NewGenericResultFilter(filterSize int, mutator uint32) *GenericResultFilter {
139 return &GenericResultFilter{ 141 // HELLO result filters are BloomFilters with a mutator
140 bf: NewBloomFilter(128), 142 rf := new(GenericResultFilter)
143 rf.bf = NewBloomFilter(filterSize)
144 rf.bf.SetMutator(mutator)
145 return rf
146}
147
148// NewGenericResultFilterFromBytes creates a new result filter from a binary
149// representation: 'data' is the concatenaion 'mutator|bloomfilter'.
150// If 'withMutator' is false, no mutator is used.
151func NewGenericResultFilterFromBytes(data []byte) *GenericResultFilter {
152 //logger.Printf(logger.DBG, "[filter] FromBytes = %d:%s (mutator: %v)",len(data), hex.EncodeToString(data), withMutator)
153
154 // handle mutator input
155 mSize := 4
156 rf := new(GenericResultFilter)
157 rf.bf = &BloomFilter{
158 Bits: util.Clone(data[mSize:]),
141 } 159 }
160 if mSize > 0 {
161 rf.bf.SetMutator(data[:mSize])
162 }
163 return rf
142} 164}
143 165
144// Add a block to the result filter. 166// Add a HELLO block to th result filter
145func (rf *GenericResultFilter) Add(b Block) { 167func (rf *GenericResultFilter) Add(b Block) {
146 bh := crypto.Hash(b.Bytes()) 168 if hb, ok := b.(*HelloBlock); ok {
147 rf.bf.Add(bh.Data) 169 hAddr := sha512.Sum512(hb.AddrBin)
170 rf.bf.Add(hAddr[:])
171 }
148} 172}
149 173
150// Contains returns true if a block is filtered 174// Contains checks if a block is contained in the result filter
151func (rf *GenericResultFilter) Contains(b Block) bool { 175func (rf *GenericResultFilter) Contains(b Block) bool {
152 bh := crypto.Hash(b.Bytes()) 176 if hb, ok := b.(*HelloBlock); ok {
153 return rf.bf.Contains(bh.Data) 177 hAddr := sha512.Sum512(hb.AddrBin)
178 return rf.bf.Contains(hAddr[:])
179 }
180 return false
154} 181}
155 182
156// ContainsHash returns true if a block hash is filtered 183// ContainsHash checks if a block hash is contained in the result filter
157func (rf *GenericResultFilter) ContainsHash(bh *crypto.HashCode) bool { 184func (rf *GenericResultFilter) ContainsHash(bh *crypto.HashCode) bool {
158 return rf.bf.Contains(bh.Data) 185 return rf.bf.Contains(bh.Data)
159} 186}
160 187
161// Bytes returns the binary representation of a result filter 188// Bytes returns a binary representation of a HELLO result filter
162func (rf *GenericResultFilter) Bytes() (buf []byte) { 189func (rf *GenericResultFilter) Bytes() []byte {
163 return rf.bf.Bytes() 190 return rf.bf.Bytes()
164} 191}
165 192
166// Merge two result filters 193// Compare two HELLO result filters
167func (rf *GenericResultFilter) Merge(t ResultFilter) bool { 194func (rf *GenericResultFilter) Compare(t ResultFilter) int {
168 // check for correct type
169 trf, ok := t.(*GenericResultFilter) 195 trf, ok := t.(*GenericResultFilter)
170 if !ok { 196 if !ok {
171 return false 197 return CMP_DIFFER
172 }
173 // check for identical mutator (if any)
174 if !bytes.Equal(rf.bf.mInput, trf.bf.mInput) {
175 return false
176 }
177 // check for same size
178 if len(rf.bf.Bits) != len(trf.bf.Bits) {
179 return false
180 }
181 // merge bloomfilters
182 for i := range rf.bf.Bits {
183 rf.bf.Bits[i] ^= trf.bf.Bits[i]
184 } 198 }
185 return true 199 return rf.bf.Compare(trf.bf)
186} 200}
187 201
188// Compare two result filters 202// Merge two HELLO result filters
189func (rf *GenericResultFilter) Compare(t ResultFilter) int { 203func (rf *GenericResultFilter) Merge(t ResultFilter) bool {
190 trf, ok := t.(*GenericResultFilter) 204 trf, ok := t.(*GenericResultFilter)
191 if !ok { 205 if !ok {
192 return CMP_DIFFER 206 return false
193 }
194 // check for identical mutator (if any)
195 if !bytes.Equal(rf.bf.mInput, trf.bf.mInput) {
196 return CMP_DIFFER
197 }
198 // check for identical bits
199 if bytes.Equal(rf.bf.Bits, trf.bf.Bits) {
200 return CMP_SAME
201 } 207 }
202 return CMP_MERGE 208 return rf.bf.Merge(trf.bf)
203} 209}
204 210
205//====================================================================== 211//======================================================================
diff --git a/src/gnunet/service/dht/blocks/generic.go b/src/gnunet/service/dht/blocks/generic.go
index 5741d57..374bfc6 100644
--- a/src/gnunet/service/dht/blocks/generic.go
+++ b/src/gnunet/service/dht/blocks/generic.go
@@ -80,6 +80,10 @@ type Block interface {
80 80
81 // String returns the human-readable representation of a block 81 // String returns the human-readable representation of a block
82 String() string 82 String() string
83
84 // Prepare a block to be of given type and expiration. Block types
85 // decide if and which information to change/set in the block instance.
86 Prepare(enums.BlockType, util.AbsoluteTime)
83} 87}
84 88
85// Unwrap (raw) block to a specific block type 89// Unwrap (raw) block to a specific block type
@@ -173,6 +177,12 @@ func NewGenericBlock(btype enums.BlockType, expire util.AbsoluteTime, blk []byte
173 } 177 }
174} 178}
175 179
180// Prepare a block to be of given type and expiration.
181func (b *GenericBlock) Prepare(btype enums.BlockType, expire util.AbsoluteTime) {
182 b.BType = btype
183 b.Expire_ = expire
184}
185
176// Bytes returns the DHT block data (unstructured without type and 186// Bytes returns the DHT block data (unstructured without type and
177// expiration information. 187// expiration information.
178func (b *GenericBlock) Bytes() []byte { 188func (b *GenericBlock) Bytes() []byte {
@@ -184,9 +194,9 @@ func (b *GenericBlock) Type() enums.BlockType {
184 return b.BType 194 return b.BType
185} 195}
186 196
187// Expire returns the block expiration (never for custom blocks) 197// Expire returns the block expiration
188func (b *GenericBlock) Expire() util.AbsoluteTime { 198func (b *GenericBlock) Expire() util.AbsoluteTime {
189 return util.AbsoluteTimeNever() 199 return b.Expire_
190} 200}
191 201
192// Verify the integrity of a block (optional). Override in custom query 202// Verify the integrity of a block (optional). Override in custom query
@@ -199,7 +209,7 @@ func (b *GenericBlock) Verify() (bool, error) {
199 209
200// String returns the human-readable representation of a block 210// String returns the human-readable representation of a block
201func (b *GenericBlock) String() string { 211func (b *GenericBlock) String() string {
202 return fmt.Sprintf("Block{type=%s,expire=%s,data=[%d]", b.BType, b.Expire_, len(b.Data)) 212 return fmt.Sprintf("Block{type=%s,expire=%s,data=[%d]}", b.BType, b.Expire_, len(b.Data))
203} 213}
204 214
205//---------------------------------------------------------------------- 215//----------------------------------------------------------------------
@@ -211,16 +221,19 @@ var (
211 blkFactory = map[enums.BlockType]func() Block{ 221 blkFactory = map[enums.BlockType]func() Block{
212 enums.BLOCK_TYPE_GNS_NAMERECORD: NewGNSBlock, 222 enums.BLOCK_TYPE_GNS_NAMERECORD: NewGNSBlock,
213 enums.BLOCK_TYPE_DHT_URL_HELLO: NewHelloBlock, 223 enums.BLOCK_TYPE_DHT_URL_HELLO: NewHelloBlock,
224 enums.BLOCK_TYPE_TEST: NewTestBlock,
214 } 225 }
215) 226)
216 227
217// NewGenericBlock creates a Block from binary data. 228// NewGenericBlock creates a Block from binary data.
218func NewBlock(btype enums.BlockType, expires util.AbsoluteTime, blk []byte) (b Block, err error) { 229func NewBlock(btype enums.BlockType, expire util.AbsoluteTime, blk []byte) (b Block, err error) {
219 fac, ok := blkFactory[btype] 230 if fac, ok := blkFactory[btype]; ok {
220 if !ok { 231 b = fac()
221 return NewGenericBlock(btype, expires, blk), nil 232 if err = data.Unmarshal(b, blk); err == nil {
233 b.Prepare(btype, expire)
234 }
235 } else {
236 b, err = NewGenericBlock(btype, expire, blk), nil
222 } 237 }
223 b = fac()
224 err = data.Unmarshal(b, blk)
225 return 238 return
226} 239}
diff --git a/src/gnunet/service/dht/blocks/gns.go b/src/gnunet/service/dht/blocks/gns.go
index ec2cb71..6b41b0b 100644
--- a/src/gnunet/service/dht/blocks/gns.go
+++ b/src/gnunet/service/dht/blocks/gns.go
@@ -177,6 +177,10 @@ func NewGNSBlock() Block {
177 } 177 }
178} 178}
179 179
180// Prepare a block to be of given type and expiration.
181// Not required for GNS blocks
182func (b *GNSBlock) Prepare(enums.BlockType, util.AbsoluteTime) {}
183
180// Verify the integrity of the block data from a signature. 184// Verify the integrity of the block data from a signature.
181// Only the cryptographic signature is verified; the formal correctness of 185// Only the cryptographic signature is verified; the formal correctness of
182// the association between the block and a GNS label in a GNS zone can't 186// the association between the block and a GNS label in a GNS zone can't
diff --git a/src/gnunet/service/dht/blocks/handlers.go b/src/gnunet/service/dht/blocks/handlers.go
index c166504..d8db6ae 100644
--- a/src/gnunet/service/dht/blocks/handlers.go
+++ b/src/gnunet/service/dht/blocks/handlers.go
@@ -84,4 +84,5 @@ func init() {
84 84
85 // add validation functions 85 // add validation functions
86 BlockHandlers[enums.BLOCK_TYPE_DHT_URL_HELLO] = new(HelloBlockHandler) 86 BlockHandlers[enums.BLOCK_TYPE_DHT_URL_HELLO] = new(HelloBlockHandler)
87 BlockHandlers[enums.BLOCK_TYPE_TEST] = new(TestBlockHandler)
87} 88}
diff --git a/src/gnunet/service/dht/blocks/hello.go b/src/gnunet/service/dht/blocks/hello.go
index 8e2cd77..5b14aa1 100644
--- a/src/gnunet/service/dht/blocks/hello.go
+++ b/src/gnunet/service/dht/blocks/hello.go
@@ -20,7 +20,6 @@ package blocks
20 20
21import ( 21import (
22 "bytes" 22 "bytes"
23 "crypto/sha512"
24 "errors" 23 "errors"
25 "fmt" 24 "fmt"
26 "gnunet/crypto" 25 "gnunet/crypto"
@@ -192,6 +191,10 @@ func ParseHelloBlockFromBytes(buf []byte) (h *HelloBlock, err error) {
192 return 191 return
193} 192}
194 193
194// Prepare a block to be of given type and expiration.
195// Not required for HELLO blocks
196func (h *HelloBlock) Prepare(enums.BlockType, util.AbsoluteTime) {}
197
195// finalize block data (generate dependent fields) 198// finalize block data (generate dependent fields)
196func (h *HelloBlock) finalize() (err error) { 199func (h *HelloBlock) finalize() (err error) {
197 if h.addrs == nil { 200 if h.addrs == nil {
@@ -199,15 +202,19 @@ func (h *HelloBlock) finalize() (err error) {
199 pos := 0 202 pos := 0
200 h.addrs = make([]*util.Address, 0) 203 h.addrs = make([]*util.Address, 0)
201 for { 204 for {
205 // reconstruct address string
202 var as string 206 var as string
203 as, pos = util.ReadCString(h.AddrBin, pos) 207 as, pos = util.ReadCString(h.AddrBin, pos)
204 if pos == -1 { 208 if pos == -1 {
205 break 209 break
206 } 210 }
211 // convert to target address type
207 var addr *util.Address 212 var addr *util.Address
208 if addr, err = util.ParseAddress(as); err != nil { 213 if addr, err = util.ParseAddress(as); err != nil {
209 return 214 return
210 } 215 }
216 addr.Expire = h.Expire_
217 // append to list
211 h.addrs = append(h.addrs, addr) 218 h.addrs = append(h.addrs, addr)
212 } 219 }
213 } else if h.AddrBin == nil { 220 } else if h.AddrBin == nil {
@@ -245,7 +252,7 @@ func (h *HelloBlock) Expire() util.AbsoluteTime {
245// String returns the human-readable representation of a block 252// String returns the human-readable representation of a block
246func (h *HelloBlock) String() string { 253func (h *HelloBlock) String() string {
247 return fmt.Sprintf("HelloBlock{peer=%s,expires=%s,addrs=[%d]}", 254 return fmt.Sprintf("HelloBlock{peer=%s,expires=%s,addrs=[%d]}",
248 h.PeerID, h.Expire_, len(h.Addresses())) 255 h.PeerID.Short(), h.Expire_, len(h.Addresses()))
249} 256}
250 257
251// URL returns the HELLO URL for the data. 258// URL returns the HELLO URL for the data.
@@ -405,12 +412,12 @@ func (bh *HelloBlockHandler) ValidateBlockStoreRequest(b Block) bool {
405// MUTATOR value which MAY be used to deterministically re-randomize 412// MUTATOR value which MAY be used to deterministically re-randomize
406// probabilistic data structures. 413// probabilistic data structures.
407func (bh *HelloBlockHandler) SetupResultFilter(filterSize int, mutator uint32) ResultFilter { 414func (bh *HelloBlockHandler) SetupResultFilter(filterSize int, mutator uint32) ResultFilter {
408 return NewHelloResultFilter(filterSize, mutator) 415 return NewGenericResultFilter(filterSize, mutator)
409} 416}
410 417
411// ParseResultFilter from binary data 418// ParseResultFilter from binary data
412func (bh *HelloBlockHandler) ParseResultFilter(data []byte) ResultFilter { 419func (bh *HelloBlockHandler) ParseResultFilter(data []byte) ResultFilter {
413 return NewHelloResultFilterFromBytes(data) 420 return NewGenericResultFilterFromBytes(data)
414} 421}
415 422
416// FilterResult is used to filter results against specific queries. This 423// FilterResult is used to filter results against specific queries. This
@@ -432,84 +439,3 @@ func (bh *HelloBlockHandler) FilterResult(b Block, key *crypto.HashCode, rf Resu
432 rf.Add(b) 439 rf.Add(b)
433 return RF_LAST 440 return RF_LAST
434} 441}
435
436//----------------------------------------------------------------------
437// HELLO result filter
438//----------------------------------------------------------------------
439
440// HelloResultFilter is a result filter implementation for HELLO blocks
441type HelloResultFilter struct {
442 bf *BloomFilter
443}
444
445// NewHelloResultFilter initializes an empty resut filter
446func NewHelloResultFilter(filterSize int, mutator uint32) *HelloResultFilter {
447 // HELLO result filters are BloomFilters with a mutator
448 rf := new(HelloResultFilter)
449 rf.bf = NewBloomFilter(filterSize)
450 rf.bf.SetMutator(mutator)
451 return rf
452}
453
454// NewHelloResultFilterFromBytes creates a new result filter from a binary
455// representation: 'data' is the concatenaion 'mutator|bloomfilter'.
456// If 'withMutator' is false, no mutator is used.
457func NewHelloResultFilterFromBytes(data []byte) *HelloResultFilter {
458 //logger.Printf(logger.DBG, "[filter] FromBytes = %d:%s (mutator: %v)",len(data), hex.EncodeToString(data), withMutator)
459
460 // handle mutator input
461 mSize := 4
462 rf := new(HelloResultFilter)
463 rf.bf = &BloomFilter{
464 Bits: util.Clone(data[mSize:]),
465 }
466 if mSize > 0 {
467 rf.bf.SetMutator(data[:mSize])
468 }
469 return rf
470}
471
472// Add a HELLO block to th result filter
473func (rf *HelloResultFilter) Add(b Block) {
474 if hb, ok := b.(*HelloBlock); ok {
475 hAddr := sha512.Sum512(hb.AddrBin)
476 rf.bf.Add(hAddr[:])
477 }
478}
479
480// Contains checks if a block is contained in the result filter
481func (rf *HelloResultFilter) Contains(b Block) bool {
482 if hb, ok := b.(*HelloBlock); ok {
483 hAddr := sha512.Sum512(hb.AddrBin)
484 return rf.bf.Contains(hAddr[:])
485 }
486 return false
487}
488
489// ContainsHash checks if a block hash is contained in the result filter
490func (rf *HelloResultFilter) ContainsHash(bh *crypto.HashCode) bool {
491 return rf.bf.Contains(bh.Data)
492}
493
494// Bytes returns a binary representation of a HELLO result filter
495func (rf *HelloResultFilter) Bytes() []byte {
496 return rf.bf.Bytes()
497}
498
499// Compare two HELLO result filters
500func (rf *HelloResultFilter) Compare(t ResultFilter) int {
501 trf, ok := t.(*HelloResultFilter)
502 if !ok {
503 return CMP_DIFFER
504 }
505 return rf.bf.Compare(trf.bf)
506}
507
508// Merge two HELLO result filters
509func (rf *HelloResultFilter) Merge(t ResultFilter) bool {
510 trf, ok := t.(*HelloResultFilter)
511 if !ok {
512 return false
513 }
514 return rf.bf.Merge(trf.bf)
515}
diff --git a/src/gnunet/service/dht/messages.go b/src/gnunet/service/dht/messages.go
index 020c7f5..6383dc6 100644
--- a/src/gnunet/service/dht/messages.go
+++ b/src/gnunet/service/dht/messages.go
@@ -98,11 +98,12 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m
98 } 98 }
99 } else { 99 } else {
100 // ... or create a new one 100 // ... or create a new one
101 mut := util.RndUInt32()
101 if blockHdlr != nil { 102 if blockHdlr != nil {
102 rf = blockHdlr.SetupResultFilter(128, util.RndUInt32()) 103 rf = blockHdlr.SetupResultFilter(128, mut)
103 } else { 104 } else {
104 logger.Printf(logger.WARN, "[%s] using default result filter", label) 105 logger.Printf(logger.WARN, "[%s] using default result filter", label)
105 rf = blocks.NewGenericResultFilter() 106 rf = blocks.NewGenericResultFilter(128, mut)
106 } 107 }
107 } 108 }
108 // clone peer filter 109 // clone peer filter
@@ -131,6 +132,10 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m
131 if btype == enums.BLOCK_TYPE_DHT_URL_HELLO { 132 if btype == enums.BLOCK_TYPE_DHT_URL_HELLO {
132 // try to find results in HELLO cache 133 // try to find results in HELLO cache
133 results = m.lookupHelloCache(label, addr, rf, approx) 134 results = m.lookupHelloCache(label, addr, rf, approx)
135 // DEBUG:
136 for i, res := range results {
137 logger.Printf(logger.DBG, "[%s] cache #%d = %s", label, i, res)
138 }
134 } 139 }
135 140
136 //-------------------------------------------------------------- 141 //--------------------------------------------------------------
@@ -142,8 +147,29 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m
142 // get results from local storage 147 // get results from local storage
143 lclResults, err := m.getLocalStorage(label, query, rf) 148 lclResults, err := m.getLocalStorage(label, query, rf)
144 if err == nil { 149 if err == nil {
145 // append local results 150 // DEBUG:
146 results = append(results, lclResults...) 151 for i, res := range lclResults {
152 logger.Printf(logger.DBG, "[%s] local #%d = %s", label, i, res)
153 }
154 // create total result list
155 if len(results) == 0 {
156 results = lclResults
157 } else if len(results)+len(lclResults) <= 10 {
158 // handle few results directly
159 results = append(results, lclResults...)
160 } else {
161 // compile a new sorted list from results.
162 list := store.NewSortedDHTResults(10)
163 for pos, res := range results {
164 list.Add(res, pos)
165 }
166 for _, res := range lclResults {
167 if pos := list.Accepts(res.Dist); pos != -1 {
168 list.Add(res, pos)
169 }
170 }
171 results = list.GetResults()
172 }
147 } 173 }
148 } 174 }
149 // if we have results, send them as response on the back channel 175 // if we have results, send them as response on the back channel
@@ -159,7 +185,11 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m
159 pth = result.Entry.Path.Clone() 185 pth = result.Entry.Path.Clone()
160 pth.SplitPos = pth.NumList 186 pth.SplitPos = pth.NumList
161 pe := pth.NewElement(pth.LastHop, local, back.Receiver()) 187 pe := pth.NewElement(pth.LastHop, local, back.Receiver())
162 pth.Add(pe) 188 if err := m.core.Sign(pe); err != nil {
189 logger.Printf(logger.ERROR, "[%s] failed to sign path element: %s", label, err.Error())
190 } else {
191 pth.Add(pe)
192 }
163 } 193 }
164 194
165 logger.Printf(logger.INFO, "[%s] sending result message to %s", label, rcv) 195 logger.Printf(logger.INFO, "[%s] sending result message to %s", label, rcv)
@@ -186,7 +216,7 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m
186 } 216 }
187 pf.Add(p.Peer) 217 pf.Add(p.Peer)
188 // create open get-forward result handler 218 // create open get-forward result handler
189 rh := NewResultHandler(msg, rf, back) 219 rh := NewResultHandler(msg, rf, back, m.core)
190 logger.Printf(logger.INFO, "[%s] result handler task #%d (key %s) started", 220 logger.Printf(logger.INFO, "[%s] result handler task #%d (key %s) started",
191 label, rh.ID(), rh.Key().Short()) 221 label, rh.ID(), rh.Key().Short())
192 m.reshdlrs.Add(rh) 222 m.reshdlrs.Add(rh)
@@ -318,7 +348,11 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m
318 // yes: add path element 348 // yes: add path element
319 pp = entry.Path.Clone() 349 pp = entry.Path.Clone()
320 pe := pp.NewElement(sender, local, p.Peer) 350 pe := pp.NewElement(sender, local, p.Peer)
321 pp.Add(pe) 351 if err := m.core.Sign(pe); err != nil {
352 logger.Printf(logger.ERROR, "[%s] failed to sign path element: %s", label, err.Error())
353 } else {
354 pp.Add(pe)
355 }
322 } 356 }
323 // build updated PUT message 357 // build updated PUT message
324 msgOut := msg.Update(pp, pf, msg.HopCount+1) 358 msgOut := msg.Update(pp, pf, msg.HopCount+1)
@@ -409,8 +443,20 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m
409 key := msg.Query.String() 443 key := msg.Query.String()
410 if list, ok := m.reshdlrs.Get(key); ok { 444 if list, ok := m.reshdlrs.Get(key); ok {
411 for _, rh := range list { 445 for _, rh := range list {
412 logger.Printf(logger.DBG, "[%s] Result handler task #%d found", label, rh.ID()) 446 logger.Printf(logger.DBG, "[%s] Result handler task #%d found (receiver %s)", label, rh.ID(), rh.Receiver().Short())
413 447
448 // check if the handler can really handle the result
449 if rh.Type() != btype {
450 // this is another block type, we don't handle it
451 logger.Printf(logger.DBG, "[%s] Result handler not suitable (%s != %s) -- skipped", label, rh.Type(), btype)
452 continue
453 }
454 /*
455 if rh.Flags()&enums.DHT_RO_FIND_APPROXIMATE != msg.Flags&enums.DHT_RO_FIND_APPROXIMATE {
456 logger.Printf(logger.DBG, "[%s] Result handler asked for match, got approx -- ignored", label)
457 continue
458 }
459 */
414 //-------------------------------------------------------------- 460 //--------------------------------------------------------------
415 // check task list for handler (9.5.2.6) 461 // check task list for handler (9.5.2.6)
416 if rh.Flags()&enums.DHT_RO_FIND_APPROXIMATE == 0 && blkKey != nil && !blkKey.Equal(rh.Key()) { 462 if rh.Flags()&enums.DHT_RO_FIND_APPROXIMATE == 0 && blkKey != nil && !blkKey.Equal(rh.Key()) {
@@ -485,6 +531,7 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m
485 } 531 }
486 // we need to cache a new(er) HELLO 532 // we need to cache a new(er) HELLO
487 if isNew { 533 if isNew {
534 logger.Printf(logger.INFO, "[%s] caching HELLO from %s", label, sender.Short())
488 m.rtable.CacheHello(&blocks.HelloBlock{ 535 m.rtable.CacheHello(&blocks.HelloBlock{
489 PeerID: sender, 536 PeerID: sender,
490 Signature: msg.Signature, 537 Signature: msg.Signature,
@@ -552,7 +599,12 @@ func (m *Module) sendResult(ctx context.Context, query blocks.Query, blk blocks.
552 out.Block = blk.Bytes() 599 out.Block = blk.Bytes()
553 out.MsgSize += uint16(len(out.Block)) 600 out.MsgSize += uint16(len(out.Block))
554 out.SetPath(pth) 601 out.SetPath(pth)
555 602 /*
603 // DEBUG:
604 if out.BType == enums.BLOCK_TYPE_TEST {
605 logger.Printf(logger.DBG, "result message = %s", util.Dump(out, "json"))
606 }
607 */
556 // send message 608 // send message
557 return back.Send(ctx, out) 609 return back.Send(ctx, out)
558} 610}
diff --git a/src/gnunet/service/dht/module.go b/src/gnunet/service/dht/module.go
index fcf5d1c..718b730 100644
--- a/src/gnunet/service/dht/module.go
+++ b/src/gnunet/service/dht/module.go
@@ -56,7 +56,7 @@ type LocalBlockResponder struct {
56func NewLocalBlockResponder() *LocalBlockResponder { 56func NewLocalBlockResponder() *LocalBlockResponder {
57 return &LocalBlockResponder{ 57 return &LocalBlockResponder{
58 ch: make(chan blocks.Block), 58 ch: make(chan blocks.Block),
59 rf: blocks.NewGenericResultFilter(), 59 rf: blocks.NewGenericResultFilter(128, util.RndUInt32()),
60 } 60 }
61} 61}
62 62
@@ -77,6 +77,10 @@ func (lr *LocalBlockResponder) Send(ctx context.Context, msg message.Message) er
77 lr.ch <- blk 77 lr.ch <- blk
78 } else { 78 } else {
79 logger.Println(logger.WARN, "[local] DHT-RESULT block problem: "+err.Error()) 79 logger.Println(logger.WARN, "[local] DHT-RESULT block problem: "+err.Error())
80 // DEBUG:
81 logger.Printf(logger.DBG, "[local] btype=%s, expire=%s", res.BType, res.Expire)
82 logger.Printf(logger.DBG, "[local] block=%s", hex.EncodeToString(res.Block))
83 panic("@@@")
80 } 84 }
81 }() 85 }()
82 default: 86 default:
@@ -161,7 +165,7 @@ func NewModule(ctx context.Context, c *core.Core, cfg *config.DHTConfig) (m *Mod
161 if !ok { 165 if !ok {
162 logger.Println(logger.WARN, "[dht-discovery] received invalid block data") 166 logger.Println(logger.WARN, "[dht-discovery] received invalid block data")
163 logger.Printf(logger.DBG, "[dht-discovery] -> %s", hex.EncodeToString(res.Bytes())) 167 logger.Printf(logger.DBG, "[dht-discovery] -> %s", hex.EncodeToString(res.Bytes()))
164 } else { 168 } else if !hb.PeerID.Equal(m.core.PeerID()) {
165 // cache HELLO block 169 // cache HELLO block
166 m.rtable.CacheHello(hb) 170 m.rtable.CacheHello(hb)
167 // add sender to routing table 171 // add sender to routing table
@@ -385,7 +389,7 @@ func (m *Module) getHello(label string) (msg *message.DHTP2PHelloMsg, err error)
385 // save for later use 389 // save for later use
386 m.lastHello = msg 390 m.lastHello = msg
387 391
388 // DEBUG 392 // DEBUG:
389 var ok bool 393 var ok bool
390 if ok, err = msg.Verify(m.core.PeerID()); !ok || err != nil { 394 if ok, err = msg.Verify(m.core.PeerID()); !ok || err != nil {
391 if !ok { 395 if !ok {
diff --git a/src/gnunet/service/dht/path/handling.go b/src/gnunet/service/dht/path/handling.go
index 7ad91df..38fce68 100644
--- a/src/gnunet/service/dht/path/handling.go
+++ b/src/gnunet/service/dht/path/handling.go
@@ -43,7 +43,7 @@ type Path struct {
43 NumList uint16 `order:"big"` // number of list entries 43 NumList uint16 `order:"big"` // number of list entries
44 SplitPos uint16 `order:"big"` // optional split position 44 SplitPos uint16 `order:"big"` // optional split position
45 List []*Entry `size:"NumList"` // list of path entries 45 List []*Entry `size:"NumList"` // list of path entries
46 LastSig *util.PeerSignature `opt:"(Isused)"` // last hop signature 46 LastSig *util.PeerSignature `opt:"(IsUsed)"` // last hop signature
47 LastHop *util.PeerID `opt:"(IsUsed)"` // last hop peer id 47 LastHop *util.PeerID `opt:"(IsUsed)"` // last hop peer id
48} 48}
49 49
@@ -90,16 +90,21 @@ func (p *Path) Size() uint {
90 if p.TruncOrigin != nil { 90 if p.TruncOrigin != nil {
91 size += p.TruncOrigin.Size() 91 size += p.TruncOrigin.Size()
92 } 92 }
93 size += uint(p.NumList) * p.List[0].Size() 93 if p.NumList > 0 {
94 size += uint(p.NumList) * p.List[0].Size()
95 }
94 if p.LastSig != nil { 96 if p.LastSig != nil {
95 size += p.LastSig.Size() + p.LastHop.Size() 97 size += p.LastSig.Size()
96 } 98 }
97 return size 99 return size
98} 100}
99 101
100// Bytes returns a binary representation 102// Bytes returns a binary representation
101func (p *Path) Bytes() []byte { 103func (p *Path) Bytes() []byte {
102 buf, _ := data.Marshal(p) 104 buf, err := data.Marshal(p)
105 if err != nil {
106 logger.Println(logger.WARN, "[path] failed serialization: "+err.Error())
107 }
103 return buf 108 return buf
104} 109}
105 110
diff --git a/src/gnunet/service/dht/resulthandler.go b/src/gnunet/service/dht/resulthandler.go
index 7d9a94f..59ab9fc 100644
--- a/src/gnunet/service/dht/resulthandler.go
+++ b/src/gnunet/service/dht/resulthandler.go
@@ -65,11 +65,12 @@ type ResultHandler struct {
65 started util.AbsoluteTime // Timestamp of session start 65 started util.AbsoluteTime // Timestamp of session start
66 active bool // is the task active? 66 active bool // is the task active?
67 resp transport.Responder // back-channel to deliver result 67 resp transport.Responder // back-channel to deliver result
68 signer crypto.Signer // signing instance
68} 69}
69 70
70// NewResultHandler creates an instance from a DHT-GET message and a 71// NewResultHandler creates an instance from a DHT-GET message and a
71// result filter instance. 72// result filter instance.
72func NewResultHandler(msg *message.DHTP2PGetMsg, rf blocks.ResultFilter, back transport.Responder) *ResultHandler { 73func NewResultHandler(msg *message.DHTP2PGetMsg, rf blocks.ResultFilter, back transport.Responder, signer crypto.Signer) *ResultHandler {
73 return &ResultHandler{ 74 return &ResultHandler{
74 id: util.NextID(), 75 id: util.NextID(),
75 key: msg.Query.Clone(), 76 key: msg.Query.Clone(),
@@ -80,6 +81,7 @@ func NewResultHandler(msg *message.DHTP2PGetMsg, rf blocks.ResultFilter, back tr
80 started: util.AbsoluteTimeNow(), 81 started: util.AbsoluteTimeNow(),
81 active: true, 82 active: true,
82 resp: back, 83 resp: back,
84 signer: signer,
83 } 85 }
84} 86}
85 87
@@ -93,6 +95,16 @@ func (t *ResultHandler) Key() *crypto.HashCode {
93 return t.key 95 return t.key
94} 96}
95 97
98// Receiver returns the destination peer
99func (t *ResultHandler) Receiver() *util.PeerID {
100 return t.resp.Receiver()
101}
102
103// Type returns the requested block type
104func (t *ResultHandler) Type() enums.BlockType {
105 return t.btype
106}
107
96// Flags returns the query flags 108// Flags returns the query flags
97func (t *ResultHandler) Flags() uint16 { 109func (t *ResultHandler) Flags() uint16 {
98 return t.flags 110 return t.flags
@@ -161,7 +173,11 @@ func (t *ResultHandler) Handle(ctx context.Context, msg *message.DHTP2PResultMsg
161 pp = pth.Clone() 173 pp = pth.Clone()
162 // yes: add path element 174 // yes: add path element
163 pe := pp.NewElement(sender, local, rcv) 175 pe := pp.NewElement(sender, local, rcv)
164 pp.Add(pe) 176 if err := t.signer.Sign(pe); err == nil {
177 logger.Printf(logger.ERROR, "[dht-task-%d] failed to sign path element: %s", t.id, err.Error())
178 } else {
179 pp.Add(pe)
180 }
165 } 181 }
166 // build updated PUT message 182 // build updated PUT message
167 msg = msg.Update(pp) 183 msg = msg.Update(pp)
@@ -213,7 +229,7 @@ func (t *ResultHandlerList) Add(hdlr *ResultHandler) bool {
213 case RHC_MERGE: 229 case RHC_MERGE:
214 // merge the two result handlers 230 // merge the two result handlers
215 oldMod := modified 231 oldMod := modified
216 modified = h.Merge(hdlr) || modified 232 modified = h.Merge(hdlr)
217 logger.Printf(logger.DBG, "[rhl] resultfilter compare: MERGE (%v -- %v)", oldMod, modified) 233 logger.Printf(logger.DBG, "[rhl] resultfilter compare: MERGE (%v -- %v)", oldMod, modified)
218 break loop 234 break loop
219 case RHC_REPLACE: 235 case RHC_REPLACE:
diff --git a/src/gnunet/service/dht/routingtable.go b/src/gnunet/service/dht/routingtable.go
index aa3c32f..a119bbe 100644
--- a/src/gnunet/service/dht/routingtable.go
+++ b/src/gnunet/service/dht/routingtable.go
@@ -342,7 +342,7 @@ func (rt *RoutingTable) heartbeat(ctx context.Context) {
342 // check if we can/need to drop a peer 342 // check if we can/need to drop a peer
343 drop := timeout.Compare(p.lastSeen.Elapsed()) < 0 343 drop := timeout.Compare(p.lastSeen.Elapsed()) < 0
344 if drop || timeout.Compare(p.lastUsed.Elapsed()) < 0 { 344 if drop || timeout.Compare(p.lastUsed.Elapsed()) < 0 {
345 logger.Printf(logger.DBG, "[dht-rt-hb] removing %v: %v, %v", p, p.lastSeen.Elapsed(), p.lastUsed.Elapsed()) 345 logger.Printf(logger.DBG, "[dht-rt-hb] removing %s: lastSeen %s, lastUsed %v", p.Peer.Short(), p.lastSeen.Elapsed(), p.lastUsed.Elapsed())
346 rt.Remove(p, "dht-rt-hb", pid) 346 rt.Remove(p, "dht-rt-hb", pid)
347 } 347 }
348 return nil 348 return nil
@@ -367,29 +367,29 @@ func (rt *RoutingTable) heartbeat(ctx context.Context) {
367// LookupHello returns blocks from the HELLO cache for given query. 367// LookupHello returns blocks from the HELLO cache for given query.
368func (rt *RoutingTable) LookupHello(addr *PeerAddress, rf blocks.ResultFilter, approx bool, label string) (results []*store.DHTResult) { 368func (rt *RoutingTable) LookupHello(addr *PeerAddress, rf blocks.ResultFilter, approx bool, label string) (results []*store.DHTResult) {
369 // iterate over cached HELLOs to find matches; 369 // iterate over cached HELLOs to find matches;
370 // approximate search is limited by distance (max. diff for bucket index is 16) 370 // approximate search is guided by distance
371 list := store.NewSortedDHTResults(10)
371 _ = rt.helloCache.ProcessRange(func(key string, hb *blocks.HelloBlock, _ int) error { 372 _ = rt.helloCache.ProcessRange(func(key string, hb *blocks.HelloBlock, _ int) error {
372 // check if block is excluded by result filter 373 // check if block is excluded by result filter
373 var result *store.DHTResult
374 if !rf.Contains(hb) { 374 if !rf.Contains(hb) {
375 // no: possible result, compute distance 375 // no: possible result, compute distance
376 p := NewPeerAddress(hb.PeerID) 376 p := NewPeerAddress(hb.PeerID)
377 dist, idx := addr.Distance(p) 377 dist, _ := addr.Distance(p)
378 result = &store.DHTResult{ 378 if pos := list.Accepts(dist); pos != -1 {
379 Entry: &store.DHTEntry{ 379 result := &store.DHTResult{
380 Blk: hb, 380 Entry: &store.DHTEntry{
381 }, 381 Blk: hb,
382 Dist: dist, 382 },
383 } 383 Dist: dist,
384 // check if we need to add result 384 }
385 if (approx && idx < 16) || idx == 0 { 385 list.Add(result, pos)
386 results = append(results, result)
387 } 386 }
388 } else { 387 } else {
389 logger.Println(logger.DBG, "[%s] LookupHello: cached HELLO block is filtered") 388 logger.Println(logger.DBG, "[%s] LookupHello: cached HELLO block is filtered")
390 } 389 }
391 return nil 390 return nil
392 }, true) 391 }, true)
392 results = list.GetResults()
393 return 393 return
394} 394}
395 395
diff --git a/src/gnunet/service/namecache/module.go b/src/gnunet/service/namecache/module.go
index 616c0b5..0e8053a 100644
--- a/src/gnunet/service/namecache/module.go
+++ b/src/gnunet/service/namecache/module.go
@@ -26,6 +26,7 @@ import (
26 "gnunet/service" 26 "gnunet/service"
27 "gnunet/service/dht/blocks" 27 "gnunet/service/dht/blocks"
28 "gnunet/service/store" 28 "gnunet/service/store"
29 "gnunet/util"
29) 30)
30 31
31//====================================================================== 32//======================================================================
@@ -71,7 +72,7 @@ func (m *Module) Import(fcm map[string]any) {
71// Get entry from the cache if available. 72// Get entry from the cache if available.
72func (m *Module) Get(ctx context.Context, query *blocks.GNSQuery) (block *blocks.GNSBlock, err error) { 73func (m *Module) Get(ctx context.Context, query *blocks.GNSQuery) (block *blocks.GNSBlock, err error) {
73 var e []*store.DHTEntry 74 var e []*store.DHTEntry
74 rf := blocks.NewGenericResultFilter() 75 rf := blocks.NewGenericResultFilter(128, util.RndUInt32())
75 if e, err = m.cache.Get("namecache", query, rf); err != nil { 76 if e, err = m.cache.Get("namecache", query, rf); err != nil {
76 return 77 return
77 } 78 }
diff --git a/src/gnunet/service/store/store_dht.go b/src/gnunet/service/store/store_dht.go
index 3e0eb29..2673cae 100644
--- a/src/gnunet/service/store/store_dht.go
+++ b/src/gnunet/service/store/store_dht.go
@@ -22,6 +22,7 @@ import (
22 "encoding/hex" 22 "encoding/hex"
23 "fmt" 23 "fmt"
24 "gnunet/crypto" 24 "gnunet/crypto"
25 "gnunet/enums"
25 "gnunet/service/dht/blocks" 26 "gnunet/service/dht/blocks"
26 "gnunet/service/dht/path" 27 "gnunet/service/dht/path"
27 "gnunet/util" 28 "gnunet/util"
@@ -46,6 +47,11 @@ type DHTEntry struct {
46 Path *path.Path // associated put path 47 Path *path.Path // associated put path
47} 48}
48 49
50// String returns a human-readable representation
51func (e *DHTEntry) String() string {
52 return fmt.Sprintf("DHTEntry{%s,path=%s}", e.Blk, e.Path)
53}
54
49//------------------------------------------------------------ 55//------------------------------------------------------------
50// DHT result is a single DHT result 56// DHT result is a single DHT result
51//------------------------------------------------------------ 57//------------------------------------------------------------
@@ -56,31 +62,62 @@ type DHTResult struct {
56 Dist *math.Int // distance of entry to query key 62 Dist *math.Int // distance of entry to query key
57} 63}
58 64
59//------------------------------------------------------------ 65// String returns a human-readable representation
66func (r *DHTResult) String() string {
67 return fmt.Sprintf("DHTResult{%s,dist=%d}", r.Entry, r.Dist.BitLen())
68}
69
70//----------------------------------------------------------------------
71// Sorted DHT result list
72//----------------------------------------------------------------------
73
74// SortedDHTResults is a length-limit result list which only adds entries
75// if they are "better" than another listed entry. "better" means "less
76// distant" from the search key
77type SortedDHTResults struct {
78 list []*DHTResult
79}
60 80
61type DHTResultSet struct { 81// NewSortedDHTResults creates a new sorted result list
62 list []*DHTResult // list of DHT results 82func NewSortedDHTResults(n int) *SortedDHTResults {
63 pos int // iterator position 83 return &SortedDHTResults{
84 list: make([]*DHTResult, n),
85 }
64} 86}
65 87
66func NewDHTResultSet() *DHTResultSet { 88// Accepts checks if given distance would be inserted into the list
67 return &DHTResultSet{ 89// at pos. If pos < 0 the distance is rejected.
68 list: make([]*DHTResult, 0), 90func (rl *SortedDHTResults) Accepts(dist *math.Int) int {
69 pos: 0, 91 for pos, entry := range rl.list {
92 if entry == nil || entry.Dist.Cmp(dist) > 0 {
93 return pos
94 }
70 } 95 }
96 return -1
71} 97}
72 98
73func (rs *DHTResultSet) Add(r *DHTResult) { 99// Add result at given position with sanity check
74 rs.list = append(rs.list, r) 100func (rl *SortedDHTResults) Add(res *DHTResult, pos int) {
101 // check index
102 if pos < 0 || pos > len(rl.list)-1 {
103 return
104 }
105 // check entry
106 entry := rl.list[pos]
107 if entry == nil || entry.Dist.Cmp(res.Dist) > 0 {
108 rl.list[pos] = res
109 }
75} 110}
76 111
77func (rs *DHTResultSet) Next() (result *DHTResult) { 112// GetResults returns the final result list
78 if rs.pos == len(rs.list) { 113func (rl *SortedDHTResults) GetResults() []*DHTResult {
79 return nil 114 out := make([]*DHTResult, 0)
115 for _, res := range rl.list {
116 if res != nil {
117 out = append(out, res)
118 }
80 } 119 }
81 result = rs.list[rs.pos] 120 return out
82 rs.pos++
83 return
84} 121}
85 122
86//------------------------------------------------------------ 123//------------------------------------------------------------
@@ -167,8 +204,8 @@ func (s *DHTStore) Put(query blocks.Query, entry *DHTEntry) (err error) {
167 expire := entry.Blk.Expire() 204 expire := entry.Blk.Expire()
168 blkSize := len(entry.Blk.Bytes()) 205 blkSize := len(entry.Blk.Bytes())
169 206
170 logger.Printf(logger.INFO, "[dht-store] storing %d bytes @ %s (path %s)", 207 logger.Printf(logger.INFO, "[dht-store] storing %d bytes @ %s (path %s), expires %s",
171 blkSize, query.Key().Short(), entry.Path) 208 blkSize, query.Key().Short(), entry.Path, expire)
172 209
173 // write entry to file for storage 210 // write entry to file for storage
174 if err = s.writeEntry(query.Key().Data, entry); err != nil { 211 if err = s.writeEntry(query.Key().Data, entry); err != nil {
@@ -233,41 +270,53 @@ func (s *DHTStore) Get(label string, query blocks.Query, rf blocks.ResultFilter)
233 logger.Printf(logger.ERROR, "[%s] can't flag DHT entry as used: %s", label, err) 270 logger.Printf(logger.ERROR, "[%s] can't flag DHT entry as used: %s", label, err)
234 continue 271 continue
235 } 272 }
273 logger.Printf(logger.INFO, "[dht-store] retrieving %d bytes @ %s (path %s)",
274 len(entry.Blk.Bytes()), query.Key().Short(), entry.Path)
236 } 275 }
237 return 276 return
238} 277}
239 278
240// GetApprox returns the best-matching value with given key from storage 279// GetApprox returns the best-matching values with given key from storage
241// that is not excluded 280// that are not excluded
242func (s *DHTStore) GetApprox(label string, query blocks.Query, rf blocks.ResultFilter) (results []*DHTResult, err error) { 281func (s *DHTStore) GetApprox(label string, query blocks.Query, rf blocks.ResultFilter) (results []*DHTResult, err error) {
282 btype := query.Type()
283
284 // List of possible results (size limited)
285 list := NewSortedDHTResults(10)
286
243 // iterate over all keys; process each metadata instance 287 // iterate over all keys; process each metadata instance
244 // (append to results if appropriate)
245 process := func(md *FileMetadata) { 288 process := func(md *FileMetadata) {
289 // filter by block type
290 if btype != enums.BLOCK_TYPE_ANY && btype != md.btype {
291 // block type not matching
292 return
293 }
246 // check for filtered block. 294 // check for filtered block.
247 if rf.ContainsHash(md.bhash) { 295 if rf.ContainsHash(md.bhash) {
248 // filtered out... 296 // filtered out...
249 return 297 return
250 } 298 }
251 // check distance (max. 16 bucktes off) 299 // check distance in result list
252 dist := util.Distance(md.key.Data, query.Key().Data) 300 dist := util.Distance(md.key.Data, query.Key().Data)
253 if (512 - dist.BitLen()) > 16 { 301 if pos := list.Accepts(dist); pos != -1 {
254 return 302
255 } 303 // read entry from storage
256 // read entry from storage 304 var entry *DHTEntry
257 var entry *DHTEntry 305 if entry, err = s.readEntry(md); err != nil {
258 if entry, err = s.readEntry(md); err != nil { 306 logger.Printf(logger.ERROR, "[%s] failed to retrieve block for %s", label, md.key.String())
259 logger.Printf(logger.ERROR, "[%s] failed to retrieve block for %s", label, md.key.String()) 307 return
260 return 308 }
261 } 309 // add to result list
262 // add to result list 310 result := &DHTResult{
263 result := &DHTResult{ 311 Entry: entry,
264 Entry: entry, 312 Dist: dist,
265 Dist: dist, 313 }
314 list.Add(result, pos)
266 } 315 }
267 results = append(results, result)
268 } 316 }
269 // traverse mestadata database 317 // traverse mestadata database
270 err = s.meta.Traverse(process) 318 err = s.meta.Traverse(process)
319 results = list.GetResults()
271 return 320 return
272} 321}
273 322
diff --git a/src/gnunet/service/store/store_dht_test.go b/src/gnunet/service/store/store_dht_test.go
index 5278634..263ac41 100644
--- a/src/gnunet/service/store/store_dht_test.go
+++ b/src/gnunet/service/store/store_dht_test.go
@@ -63,7 +63,7 @@ func TestDHTFilesStore(t *testing.T) {
63 // allocate keys 63 // allocate keys
64 keys := make([]blocks.Query, 0, fsNumBlocks) 64 keys := make([]blocks.Query, 0, fsNumBlocks)
65 // create result filter 65 // create result filter
66 rf := blocks.NewGenericResultFilter() 66 rf := blocks.NewGenericResultFilter(128, 236742)
67 67
68 // First round: save blocks 68 // First round: save blocks
69 btype := enums.BLOCK_TYPE_TEST 69 btype := enums.BLOCK_TYPE_TEST
diff --git a/src/gnunet/transport/reader_writer.go b/src/gnunet/transport/reader_writer.go
index 349dad7..2cee0a9 100644
--- a/src/gnunet/transport/reader_writer.go
+++ b/src/gnunet/transport/reader_writer.go
@@ -20,7 +20,6 @@ package transport
20 20
21import ( 21import (
22 "context" 22 "context"
23 "errors"
24 "fmt" 23 "fmt"
25 "gnunet/message" 24 "gnunet/message"
26 "io" 25 "io"
@@ -42,10 +41,13 @@ func WriteMessage(ctx context.Context, wrt io.WriteCloser, msg message.Message)
42 return 41 return
43 } 42 }
44 /* 43 /*
45 // debug for outgoing messages 44 // DEBUG: outgoing messages
46 if msg.Type() == enums.MSG_DHT_P2P_HELLO { 45 if msg.Type() == enums.MSG_DHT_P2P_RESULT {
47 logger.Printf(logger.DBG, "[rw_msg] msg=%s", hex.EncodeToString(buf)) 46 tmsg, _ := msg.(*message.DHTP2PResultMsg)
48 logger.Printf(logger.DBG, "[rw_msg] msg=%s", util.Dump(msg, "json")) 47 if tmsg.BType == enums.BLOCK_TYPE_TEST {
48 logger.Printf(logger.DBG, "[rw_msg] msg=%s", hex.EncodeToString(buf))
49 logger.Printf(logger.DBG, "[rw_msg] msg=%s", util.Dump(msg, "json"))
50 }
49 } 51 }
50 */ 52 */
51 // check message header size and packet size 53 // check message header size and packet size
@@ -54,7 +56,7 @@ func WriteMessage(ctx context.Context, wrt io.WriteCloser, msg message.Message)
54 return 56 return
55 } 57 }
56 if len(buf) != int(mh.MsgSize) { 58 if len(buf) != int(mh.MsgSize) {
57 return errors.New("WriteMessage: message size mismatch") 59 return fmt.Errorf("WriteMessage: message size mismatch (%d != %d)", len(buf), mh.MsgSize)
58 } 60 }
59 // perform write operation 61 // perform write operation
60 var n int 62 var n int
@@ -116,7 +118,7 @@ func ReadMessage(ctx context.Context, rdr io.ReadCloser, buf []byte) (msg messag
116 } 118 }
117 err = data.Unmarshal(msg, buf[:mh.MsgSize]) 119 err = data.Unmarshal(msg, buf[:mh.MsgSize])
118 /* 120 /*
119 // debug for incoming messages 121 // DEBUG: incoming messages
120 if mh.MsgType == enums.MSG_DHT_P2P_RESULT { 122 if mh.MsgType == enums.MSG_DHT_P2P_RESULT {
121 logger.Printf(logger.DBG, "[rw_msg] msg=%s", hex.EncodeToString(buf[:mh.MsgSize])) 123 logger.Printf(logger.DBG, "[rw_msg] msg=%s", hex.EncodeToString(buf[:mh.MsgSize]))
122 logger.Printf(logger.DBG, "[rw_msg] msg=%s", util.Dump(msg, "json")) 124 logger.Printf(logger.DBG, "[rw_msg] msg=%s", util.Dump(msg, "json"))
diff --git a/src/gnunet/util/peer.go b/src/gnunet/util/peer.go
index d16f2df..9646966 100644
--- a/src/gnunet/util/peer.go
+++ b/src/gnunet/util/peer.go
@@ -95,6 +95,9 @@ func (p *PeerID) String() string {
95 95
96// SHort returns a shortened peer id for display 96// SHort returns a shortened peer id for display
97func (p *PeerID) Short() string { 97func (p *PeerID) Short() string {
98 if p == nil {
99 return "local"
100 }
98 return p.String()[:8] 101 return p.String()[:8]
99} 102}
100 103